python连接kafka的方法
小编给大家分享一下python连接kafka的方法,希望大家阅读完这篇文章后大所收获,下面让我们一起去探讨方法吧!
1、kafka-python安装:
#PyPI安装pipinstallkafka-python#conda安装condainstall-cconda-forgekafka-python#anaconda自带pip安装/root/anaconda3/bin/pipinstallkafka-python
2、kafka-python生产者
producer.py
#!/usr/bin/envpython#-*-coding:utf-8-*-importdatetimeimportjsonimporttimeimportuuidfromkafkaimportKafkaProducerfromkafka.errorsimportKafkaErrorproducer=KafkaProducer(bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')topic='test_20181105'deftest():print('begin')try:n=0whileTrue:dic={}dic['id']=nn=n+1dic['myuuid']=str(uuid.uuid4().hex)dic['time']=datetime.datetime.now().strftime("%Y%m%d%H:%M:%S")producer.send(topic,json.dumps(dic).encode())print("send:"+json.dumps(dic))time.sleep(0.5)exceptKafkaErrorase:print(e)finally:producer.close()print('done')if__name__=='__main__':test()
服务器集群中配置好Kafka, 修改上面程序中的ip地址和端口号, 执行python脚本就可以成功将消息发送到 topic: test_20181105
send:{"id":1411,"myuuid":"a25a3d0361f94d3b8fffd5967ab5df01","time":"2018110516:11:14"}send:{"id":1412,"myuuid":"784efd5389564194941240dca66233b6","time":"2018110516:11:14"}send:{"id":1413,"myuuid":"6a211195319e447aa559614662f70590","time":"2018110516:11:15"}send:{"id":1414,"myuuid":"2cc45bd82baf4a1cb41ea4786e50a0df","time":"2018110516:11:15"}send:{"id":1415,"myuuid":"b7dfed4919c74164b83cf3ec28e257b6","time":"2018110516:11:16"}send:{"id":1416,"myuuid":"9218eceb17834c228f5ab01ca7595272","time":"2018110516:11:16"}send:{"id":1417,"myuuid":"c2751c54c390453f9eedd417fb1e5a31","time":"2018110516:11:17"}send:{"id":1418,"myuuid":"9bbc4ef2cfbb42148332eb979b1142cb","time":"2018110516:11:17"}send:{"id":1419,"myuuid":"f4998a862494445c976137793b55ed73","time":"2018110516:11:18"}
3、kafka-python消费者
consumer.py
#!/bin/envpythonfromkafkaimportKafkaConsumer#connecttoKafkaserverandpassthetopicwewanttoconsumeconsumer=KafkaConsumer('test_20181105',group_id='test_group2',bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')try:formsginconsumer:print(msg)#print("%s:%d:%d:key=%svalue=%s"%(msg.topic,msg.partition,msg.offset,msg.key,msg.value))exceptKeyboardInterruptase:print(e)
同样修改上面的Ip地址和端口号,就可以接收 topic: test_20181105上的消息:
ConsumerRecord(topic='test_20181105',partition=1,offset=951,timestamp=1541405600340,timestamp_type=0,key=None,value=b'{"id":1663,"myuuid":"0f744021b2d9468886908ee6685a0fdb","time":"2018110516:13:20"}',checksum=1357895145,serialized_key_size=-1,serialized_value_size=87)ConsumerRecord(topic='test_20181105',partition=0,offset=935,timestamp=1541405600841,timestamp_type=0,key=None,value=b'{"id":1664,"myuuid":"9379f68f656644bdb2d30911f06240e4","time":"2018110516:13:20"}',checksum=-715594646,serialized_key_size=-1,serialized_value_size=87)ConsumerRecord(topic='test_20181105',partition=1,offset=952,timestamp=1541405601341,timestamp_type=0,key=None,value=b'{"id":1665,"myuuid":"f4a5fa5b32cd4b7991612b626bea4b0e","time":"2018110516:13:21"}',checksum=-2068072013,serialized_key_size=-1,serialized_value_size=87)
可以通过设置不同的group_id 来实现消息队列或消息订阅:
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
看完了这篇文章,相信你对python连接kafka的方法有了一定的了解,想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。