Getting Bootstrap broker ip:9092 disconnected error from kafka spout


Getting Bootstrap broker ip:9092 disconnected error from kafka spout



Versions:


"org.apache.storm" % "storm-kafka-client" % "1.2.1"
"org.apache.storm" % "storm-core" % "1.2.1" % "compile"
Kafka: 0.10.1.0



I am getting following error/warnings, running in localCluster, from my kafka spout:


2018-06-28 00:00:34,930 AppInfoParser [INFO] Kafka version : 0.10.1.0
2018-06-28 00:00:34,930 AppInfoParser [INFO] Kafka commitId : 3402a74efb23d1d4
2018-06-28 00:00:34,931 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip1:9092 disconnected
2018-06-28 00:00:35,092 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip2:9092 disconnected
2018-06-28 00:00:35,251 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip3:9092 disconnected
2018-06-28 00:00:35,524 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip4:9092 disconnected
2018-06-28 00:00:35,629 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip5:9092 disconnected
2018-06-28 00:00:35,822 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip6:9092 disconnected
2018-06-28 00:00:35,927 WARN NetworkClient [Thread-40-KafkaSpout-executor[12 12]] Bootstrap broker ip7:9092 disconnected



Here is the code of Kafka Spout:


private def getKafkaSpoutConfig(source: TopologyConfig) = {
System.clearProperty("java.security.auth.login.config") //tried this after getting error, no impact
KafkaSpoutConfig.builder("ip1:9092,ip2:9092,ip3:9092,.....,ip10:9092", "topicName")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
.setOffsetCommitPeriodMs(100)
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
.setMaxUncommittedOffsets(1000000)
.build()
}
def getKafkaSpout(source: TopologyConfig) = new KafkaSpout(getKafkaSpoutConfig(source: TopologyConfig))



On debugging, I see following is the stack trace for this error:


maybeHandleDisconnection:568, NetworkClient$DefaultMetadataUpdater (org.apache.kafka.clients)
processDisconnection:396, NetworkClient (org.apache.kafka.clients)
handleDisconnections:464, NetworkClient (org.apache.kafka.clients)
poll:270, NetworkClient (org.apache.kafka.clients)
poll:232, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
poll:195, ConsumerNetworkClient (org.apache.kafka.clients.consumer.internals)
getTopicMetadata:253, Fetcher (org.apache.kafka.clients.consumer.internals)
partitionsFor:1318, KafkaConsumer (org.apache.kafka.clients.consumer)
getFilteredTopicPartitions:57, NamedTopicFilter (org.apache.storm.kafka.spout)
refreshAssignment:54, ManualPartitionSubscription (org.apache.storm.kafka.spout)
subscribe:49, ManualPartitionSubscription (org.apache.storm.kafka.spout)
subscribeKafkaConsumer:657, KafkaSpout (org.apache.storm.kafka.spout)
activate:648, KafkaSpout (org.apache.storm.kafka.spout)
invoke:484, util$async_loop$fn__557 (org.apache.storm)
run:22, AFn (clojure.lang)
run:748, Thread (java.lang)



The same code was working for one Kafka setup, but for another setup of kafka of same version it started to give above error.



As par comments, I tried to connect to port 9092 of Kafka, Which I was able to do:


➜ git:(myBranch) ✗ telnet ipn 9092
Trying ipn...
Connected to my-kafka-app-396433.
Escape character is '^]'.





Can you please show you can reach port 9092 of Kafka from your Storm machines?
– cricket_007
Jun 28 at 3:56





@cricket_007 I am able to connect, see edited question. I am trying to run this storm via local mode from my Mac.
– Saurabh
Jun 28 at 4:27





Can you enable TRACE logging, see if you get more information?
– cricket_007
Jun 28 at 4:30





@cricket_007 I already have conf.setDebug(true) for Storm or do you mean anything else?
– Saurabh
Jun 28 at 4:38


conf.setDebug(true)





I don't know the Storm API, but I think you have a log4j config being applied at some level.
– cricket_007
Jun 28 at 6:04




1 Answer
1



This was happening because of some version mismatch of kafka. The installed kafka version was 0.10.0.1 while the code was picking and executing with kafka-clients version: 0.10.1.0.


0.10.0.1


0.10.1.0



It was happening as storm-core has a dependency of kafka-clients version: 0.10.1.0, which can be overwritten, which I did but somehow it was not excluded properly in sbt. After some permutations it was working and final dependencies were looking like this:


storm-core


kafka-clients


0.10.1.0


Seq("org.apache.storm" % "storm-core" % "1.2.1" % "compile" excludeAll(
ExclusionRule(organization = "org.apache.logging.log4j"),
ExclusionRule(organization = "org.apache.kafka", artifact = "kafka-clients"),
ExclusionRule(organization = "ring-cors"),
ExclusionRule(organization = "org.slf4j", artifact = "*"),
ExclusionRule(organization = "log4j", artifact = "*"),
ExclusionRule(organization = "javax", artifact = "*"),
ExclusionRule(organization = "javax.servlet", artifact="*")
)),
"org.apache.kafka" % "kafka-clients" % "0.10.0.1" excludeAll(
ExclusionRule(organization = "org.slf4j", name = "slf4j-log4j12"),
ExclusionRule(organization = "log4j"),
ExclusionRule(organization = "javax.servlet", artifact="*")
),
"org.apache.storm" % "storm-kafka-client" % "1.2.1" excludeAll(
ExclusionRule(organization = "org.apache.kafka", artifact = "*"),
ExclusionRule(organization = "org.apache.logging.log4j"),
ExclusionRule(organization = "ring-cors"),
ExclusionRule(organization = "org.apache.logging.log4j", artifact = "*"),
ExclusionRule(organization = "org.slf4j", artifact = "*"),
ExclusionRule(organization = "log4j", artifact = "*")
),






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Popular posts from this blog

Rothschild family

Cinema of Italy