在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。

Java代码

defhasNext():Boolean={

if(state==FAILED)//处于FAILED状态时,另外线程访问会直接异常

thrownewIllegalStateException("Iteratorisinfailedstate")

statematch{

caseDONE=>false

caseREADY=>true

case_=>maybeComputeNext()

}

}

defmaybeComputeNext():Boolean={

state=FAILED//重置了状态

nextItem=Some(makeNext())

if(state==DONE){

false

}else{

state=READY

true

}

}

下载

protecteddefmakeNext():MessageAndMetadata[K,V]={

varcurrentDataChunk:FetchedDataChunk=null

//ifwedon'thaveaniterator,getone

varlocalCurrent=current.get()

if(localCurrent==null||!localCurrent.hasNext){

if(consumerTimeoutMs<0)

currentDataChunk=channel.take//channel是BlockingQueue这里会阻塞

else{

currentDataChunk=channel.poll(consumerTimeoutMs,TimeUnit.MILLISECONDS)

if(currentDataChunk==null){

//resetstatetomaketheiteratorre-iterable

resetState()

thrownewConsumerTimeoutException

}

}

//省略部分代码

}