Upgrading the Kafka output plugin for Logstash 2.x

V1

Preface

The logstash-output-kafka plugin that ships with Logstash 2.x only supports Kafka 0.8.x. In practice, however, we may need to write to a 0.9.x Kafka cluster, so the logstash-output-kafka plugin has to be upgraded to the 3.x series.

Install dependencies

yum -y install ruby rubygems ruby-devel
gem sources --add https://ruby.taobao.org/ --remove http://rubygems.org/
gem install jar-dependencies -v '0.3.4'
gem install ruby-maven -v '3.3.11'
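The jar-dependencies and ruby-maven gems are what the plugin update below uses to resolve the plugin's Java jars (including the Kafka client) from Maven. As an optional sanity check before moving on (the expected output noted in the comments is an assumption based on the versions installed above):

gem sources -l                          # should list https://ruby.taobao.org/, with rubygems.org removed
gem list jar-dependencies ruby-maven    # should show 0.3.4 and 3.3.11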

Upgrade output-kafka

/usr/local/logstash/bin/logstash-plugin update logstash-output-kafka
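You can confirm which version the update pulled in; the paths later in this post assume logstash-output-kafka 3.0.1, so adjust them if yours differs:

/usr/local/logstash/bin/logstash-plugin list --verbose logstash-output-kafka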

Starting Logstash now prints the following warnings:

./logstash -f /usr/local/logstash/conf/kafka.conf
Settings: Default pipeline workers: 8
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Pipeline main started

Solution

The warnings come from the Kafka producer client: it logs through log4j, and no appender is configured in this setup. The replacement kafka.rb below calls LogStash::Logger.setup_log4j(@logger) in its register method, which initializes log4j via Logstash's own logger so the producer's log output has somewhere to go.


1. Change into the /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/ directory

cd /usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-3.0.1/lib/logstash/outputs/

2. Back up the existing kafka.rb file

mv kafka.rb{,.backup}

3. Create a new kafka.rb with the following content:

require 'logstash/namespace'
require 'logstash/outputs/base'
require 'jruby-kafka'

# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# The only required configuration is the topic name. The default codec is json,
# so events will be persisted on the broker in json format. If you select a codec of plain,
# Logstash will encode your messages with not only the message but also with a timestamp and
# hostname. If you do not want anything but your message passing through, you should make the output
# configuration something like:
# [source,ruby]
#     output {
#       kafka {
#         codec => plain {
#           format => "%{message}"
#         }
#       }
#     }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#newproducerconfigs
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
  config_name 'kafka'

  default :codec, 'json'

  # The topic to produce messages to
  config :topic_id, :validate => :string, :required => true

  # This is for bootstrapping and the producer will only use it for getting metadata (topics,
  # partitions and replicas). The socket connections for sending the actual data will be
  # established based on the broker information returned in the metadata. The format is
  # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
  # subset of brokers.
  config :bootstrap_servers, :validate => :string, :default => 'localhost:9092'

  # Serializer class for the key of the message
  config :key_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'

  # Serializer class for the value of the message
  config :value_serializer, :validate => :string, :default => 'org.apache.kafka.common.serialization.StringSerializer'

  # The key that will be included with the record
  #
  # If a `message_key` is present, a partition will be chosen using a hash of the key.
  # If not present, a partition for the message will be assigned in a round-robin fashion.
  config :message_key, :validate => :string

  # The number of acknowledgments the producer requires the leader to have received
  # before considering a request complete.
  #
  # acks=0,   the producer will not wait for any acknowledgment from the server at all.
  # acks=1,   the leader will write the record to its local log but
  #           will respond without awaiting full acknowledgement from all followers.
  # acks=all, the leader will wait for the full set of in-sync replicas to acknowledge the record.
  config :acks, :validate => ["0", "1", "all"], :default => "1"

  # The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
  config :buffer_memory, :validate => :number, :default => 33554432

  # The compression type for all data generated by the producer.
  # The default is none (i.e. no compression). Valid values are none, gzip, or snappy.
  config :compression_type, :validate => ["none", "gzip", "snappy"], :default => "none"

  # Setting a value greater than zero will cause the client to
  # resend any record whose send fails with a potentially transient error.
  config :retries, :validate => :number, :default => 0

  # The producer will attempt to batch records together into fewer requests whenever multiple
  # records are being sent to the same partition. This helps performance on both the client
  # and the server. This configuration controls the default batch size in bytes.
  config :batch_size, :validate => :number, :default => 16384

  # The id string to pass to the server when making requests.
  # The purpose of this is to be able to track the source of requests beyond just
  # ip/port by allowing a logical application name to be included with the request
  config :client_id, :validate => :string

  # The producer groups together any records that arrive in between request
  # transmissions into a single batched request. Normally this occurs only under
  # load when records arrive faster than they can be sent out. However in some circumstances
  # the client may want to reduce the number of requests even under moderate load.
  # This setting accomplishes this by adding a small amount of artificial delay; that is,
  # rather than immediately sending out a record the producer will wait for up to the given delay
  # to allow other records to be sent so that the sends can be batched together.
  config :linger_ms, :validate => :number, :default => 0

  # The maximum size of a request
  config :max_request_size, :validate => :number, :default => 1048576

  # The size of the TCP receive buffer to use when reading data
  config :receive_buffer_bytes, :validate => :number, :default => 32768

  # The size of the TCP send buffer to use when sending data.
  config :send_buffer_bytes, :validate => :number, :default => 131072

  # The configuration controls the maximum amount of time the server will wait for acknowledgments
  # from followers to meet the acknowledgment requirements the producer has specified with the
  # acks configuration. If the requested number of acknowledgments are not met when the timeout
  # elapses an error will be returned. This timeout is measured on the server side and does not
  # include the network latency of the request.
  config :timeout_ms, :validate => :number, :default => 30000

  # When our memory buffer is exhausted we must either stop accepting new
  # records (block) or throw errors. By default this setting is true and we block,
  # however in some scenarios blocking is not desirable and it is better to immediately give an error.
  config :block_on_buffer_full, :validate => :boolean, :default => true

  # the timeout setting for initial metadata request to fetch topic metadata.
  config :metadata_fetch_timeout_ms, :validate => :number, :default => 60000

  # the max time in milliseconds before a metadata refresh is forced.
  config :metadata_max_age_ms, :validate => :number, :default => 300000

  # The amount of time to wait before attempting to reconnect to a given host when a connection fails.
  config :reconnect_backoff_ms, :validate => :number, :default => 10

  # The amount of time to wait before attempting to retry a failed produce request to a given topic partition.
  config :retry_backoff_ms, :validate => :number, :default => 100

  public
  def register
    # Initialize log4j through Logstash's logger; this is what silences the
    # "No appenders could be found" warnings from the Kafka producer client.
    LogStash::Logger.setup_log4j(@logger)

    options = {
      :key_serializer => @key_serializer,
      :value_serializer => @value_serializer,
      :bootstrap_servers => @bootstrap_servers,
      :acks => @acks,
      :buffer_memory => @buffer_memory,
      :compression_type => @compression_type,
      :retries => @retries,
      :batch_size => @batch_size,
      :client_id => @client_id,
      :linger_ms => @linger_ms,
      :max_request_size => @max_request_size,
      :receive_buffer_bytes => @receive_buffer_bytes,
      :send_buffer_bytes => @send_buffer_bytes,
      :timeout_ms => @timeout_ms,
      :block_on_buffer_full => @block_on_buffer_full,
      :metadata_fetch_timeout_ms => @metadata_fetch_timeout_ms,
      :metadata_max_age_ms => @metadata_max_age_ms,
      :reconnect_backoff_ms => @reconnect_backoff_ms,
      :retry_backoff_ms => @retry_backoff_ms
    }
    @producer = Kafka::KafkaProducer.new(options)
    @producer.connect

    @logger.info('Registering kafka producer', :topic_id => @topic_id, :bootstrap_servers => @bootstrap_servers)

    @codec.on_event do |event, data|
      begin
        key = if @message_key.nil? then nil else event.sprintf(@message_key) end
        @producer.send_msg(event.sprintf(@topic_id), nil, key, data)
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka producer got shutdown signal')
      rescue => e
        @logger.warn('kafka producer threw exception, restarting', :exception => e)
      end
    end
  end # def register

  def receive(event)
    if event == LogStash::SHUTDOWN
      return
    end
    @codec.encode(event)
  end

  def close
    @producer.close
  end
end # class LogStash::Outputs::Kafka
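With the patched kafka.rb in place, restart Logstash against the same pipeline and the log4j warnings should no longer appear. The kafka.conf referenced earlier is not reproduced in these notes; as a minimal sketch (the stdin input, broker address, and topic name below are placeholder assumptions, not the original config):

input {
  stdin { }
}
output {
  kafka {
    bootstrap_servers => "localhost:9092"   # Kafka 0.9.x broker list
    topic_id => "logstash-test"             # placeholder topic name
  }
}

./logstash -f /usr/local/logstash/conf/kafka.conf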