Here is a simple Scala program that uses Kafka in Flink:
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.api.common.typeinfo._

/**
 * Minimal Flink streaming job that attaches a Kafka source on topic "test"
 * directly to a Kafka sink on the same topic "test".
 *
 * The object name is kept exactly as posted so the job's main-class name
 * does not change.
 */
object testkafka {

  /**
   * Builds the source -> sink pipeline and submits it for execution.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // NOTE(review): the source and the sink use the same topic ("test"), so
    // every record written by the sink is presumably read again by the
    // source - confirm this loop is intended.
    // NOTE(review): both connectors are given "localhost:2181". The stack
    // trace shows the source going through a ZooKeeper-based consumer, but the
    // sink may expect a Kafka broker list (e.g. "localhost:9092") instead -
    // verify against the connector version in use.
    env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .addSink(new KafkaSink[String]("localhost:2181", "test", new JavaDefaultStringSchema))

    env.execute("test kafka")
  }
}

// In build.sbt:
// Flink core (Scala API) and the client needed to submit jobs.
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-scala" % "0.9.0",
  "org.apache.flink" % "flink-clients" % "0.9.0"
)

// Kafka itself, resolved for the project's Scala binary version via %%.
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"

libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0"

// NOTE(review): the excluded artifact name is a plain (non-interpolated)
// string, so it matches an artifact literally named
// "kafka_${scala.binary.version}". Presumably this targets an unexpanded Maven
// property in the connector's POM, with Kafka added explicitly above - confirm
// before changing it.
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")

libraryDependencies += "com.101tec" % "zkclient" % "0.5"

// I'm using "sbt assembly" to build a fat jar, so the target jar file is
// supposed to contain everything. However, there are errors when running the
// target jar file:
java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175)
    at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.open(KafkaSource.java:207)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openOperator(StreamTask.java:158)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:52)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.serialize.ZkSerializer
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    .........
I put zkclient-0.5.jar under /lib. Can anyone shed light on this?
Comments
Post a Comment