新版插件:


说明: 从5.0开始,插件都独立拆分成gem包,每个插件可独立更新,无需等待Logstash自身整体更新,具体管理命令可参考./bin/logstash-plugin --help帮助信息../bin/logstash-plugin list其实所有的插件就位于本地./vendor/bundle/jruby/1.9/gems/目录下

扩展: 如果GitHub上面(https://github.com/logstash-plugins/)发布了扩展插件,可通过./bin/logstash-plugin install <plugin-name>,当然升级也很方便./bin/logstash-plugin update <plugin-name>,如果要安装更新本地已有的插件可通过./bin/logstash-plugin install/update <plugin-path>即可.

注意: 默认./bin/logstash-plugin install/update时是到https://rubygems.org/下载包,速度非常慢,所以强烈推荐手动从https://github.com/logstash-plugins/或https://rubygems.org/下载下来更新


输入插件: https://www.elastic.co/guide/en/logstash/current/input-plugins.html


说明: 输入插件只用于input区段中,可以通过不同的方式来采集数据,如上为目前Logstash支持的所有输入插件.


插件名称: udp (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html)

input{udp{port=>25826workers=>2queue_size=>20000buffer_size=>1452codec=>collectd{}type=>"collectd"}}output{stdout{codec=>rubydebug}}

生产案例:

yum-yinstallcollectd*cp-rfp/etc/collectd.conf/etc/collectd.conf_orgvim/etc/collectd.conf#Collectd基础信息采集配置Hostname"xm-server-00001"FQDNLookupfalseLoadPlugindfLoadPlugincpuLoadPlugindiskLoadPluginmemoryLoadPluginnetwork<Pluginnetwork>Server"10.2.5.51""25826"</Plugin>LoadPlugininterface<Plugininterface>Interface"eth0"IgnoreSelectedfalse</Plugin>Include"/etc/collectd.d"


说明: 如上是Logstash Udp插件的生产案例,Logstash配置文件中udp区段port表示监听的udp端口,workers表示读取socket数据的线程数,一般设置为CPU个数即可,queue_size表示udp包可堆积在内存中个数,buffer_size表示读取缓冲区最大大小,codec 表示编码器,type表示手工定义的类型,可用于filter区段中以及后面前端Kibana检索


插件名称: file (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html)

input{file{path=>["/xm-workspace/xm-webs/xmcloud/logs/*.log"]type=>"dss-pubserver"codec=>jsonstart_position=>"beginning"}}output{stdout{codec=>rubydebug}}

生产案例:

log_formatjson'{"@timestamp":"$time_iso8601",''"host":"$server_addr",''"clientip":"$remote_addr",''"size":$body_bytes_sent,''"responsetime":$request_time,''"upstreamtime":"$upstream_response_time",''"upstreamhost":"$upstream_addr",''"http_host":"$host",''"url":"$uri",''"xff":"$http_x_forwarded_for",''"referer":"$http_referer",''"agent":"$http_user_agent",''"status":"$status"}';


说明: 分析网站/接口访问日志时可监听匹配日志文件,同时会生成.sincedb数据库文件,记录被监听文件的inode/major/minor/position,file区段中还支持delimiter表示行分割符,discover_interval表示检查是否有新文件频率,默认15秒,exclude表示排除监听的匹配列表,close_older表示多久没修改就关闭监听句柄,默认3600秒,ignore_older表示只检查多久内的文件,默认86400,sincedb_path表示sincedb存放位置,默认位于(Linux: $HOME/.,sincedb | Windows: C:\Windows\System32\config\systemprofile\.sincedb),sincedb_write_interval表示多久写一次sincedb,默认为15秒,stat_interval为每隔多久检查一次被监听的文件状态,默认为1秒,start_position表示从什么位置读取文件数据,默认为结束位置,类似tail -f,当然也可设定beginning从头读取.

注意: 通常导入原有数据进Elasticsearch前需通过filter/date插件规范默认的@timestamp字段值,file插件并不支持递归监视,如果有需求可以数组形式匹配,支持/path/to/*/*/*/*.log写法,如果需要反复测试可将sincedb_path定义为/dev/null,这样每次重启都会自动从头开始读,还有一点儿需要注意的是Windows由于没有inode概念,所以监听文件不靠谱,推荐使用nxlog作为收集端.


插件名称: stdin (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html)

input{stdin{codec=>jsontags=>["dss-pubserver"]type=>"stdin"add_field=>{"runenv"=>"docker"}}}output{stdout{codec=>rubydebug}}

生产案例:

{"status":"","content_length":"","body_bytes_sent":"","http_x_forwarded_for":"","request_time":"","errors":{"text":"resbodyisinvalidprotocolformatargs","line":128,"func":"process_msg()","file":"dsspub-server.lua"},"request_body":"","http_referer":"","level":"","hostname":"","request":"","remote_addr":"","upstream_response_time":"","time_local":"","upstream_addr":""}


说明: type和tags是Logstash事件中两个特殊的字段,通常会在输入区段中来标记事件类型,然后在数据处理区域根据事件类型来调用插件处理消息或添加删除tags,最后在输出中通过tags来判断并选择输出


插件名称: syslog(https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html)

input{syslog{port=>"6514"}}output{stdout{codec=>rubydebug}}

生产案例:

vim/etc/rsyslog.conf*.*@@127.0.0.1:6514


说明: syslog在收集网络设备端日志时也许是几乎唯一可行办法,简单测试只需修改/etc/rsyslog.conf配置,加入网络传输支持,发送到本地的Logstash监听端口6514,默认接收过滤操作都在input中完成,虽然启动时可通过-w指定worker数但是同一个客户端数据的处理以及过滤都在同一个线程中完成,导致处理性能严重下降,如果使用udp+grok的match的%{SYSLOGLINE}和syslog_pri实现,虽然将input和grok拆分拆分到两个线程,但udp接收缓冲区有限制(netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'),当大于228096时就开始丢包,所以还是推荐使用tcp方式

扩展: 如果实在没法切换到tcp协议,可通过这个小工具来实现异步IO的UDP监听数据输入写入Elasticsearch(https://gist.github.com/chenryn/7c922ac424324ee0d695)


插件名称: tcp(https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html)

input{tcp{port=>"8888"mode=>"server"ssl_enable=>false}}output{stdout{codec=>rubydebug}}

生产实例:

nc127.0.0.18888<error.log


说明: Logstash也可通过Tcp组件实现简单消息队列,但千万别用于生产环境,生产环境还是换用专业的Logstash broker,比较常用的就是配合nc实现旧数据导入,这种导入旧数据的方式相比file好处在于可以准确的知道何时导入完成.