kafka high-level consumer 多线程访
在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。
Java代码
defhasNext():Boolean={
if(state==FAILED)//处于FAILED状态时,另外线程访问会直接异常
thrownewIllegalStateException("Iteratorisinfailedstate")
statematch{
caseDONE=>false
caseREADY=>true
case_=>maybeComputeNext()
}
}
defmaybeComputeNext():Boolean={
state=FAILED//重置了状态
nextItem=Some(makeNext())
if(state==DONE){
false
}else{
state=READY
true
}
}
下载
protecteddefmakeNext():MessageAndMetadata[K,V]={
varcurrentDataChunk:FetchedDataChunk=null
//ifwedon'thaveaniterator,getone
varlocalCurrent=current.get()
if(localCurrent==null||!localCurrent.hasNext){
if(consumerTimeoutMs<0)
currentDataChunk=channel.take//channel是BlockingQueue这里会阻塞
else{
currentDataChunk=channel.poll(consumerTimeoutMs,TimeUnit.MILLISECONDS)
if(currentDataChunk==null){
//resetstatetomaketheiteratorre-iterable
resetState()
thrownewConsumerTimeoutException
}
}
//省略部分代码
}
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。