Scala - KafkaUtils.createDirectStream to a String object in Spark


I have a Kafka producer that reads a directory and writes the contents of each file to a topic:

  def main(args: Array[String]) {
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) =
      Array("quickstart.cloudera:9092", "test", "10", "10")
    val directoryPath = "/home/cloudera/documents/config/"

    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val myDirectory = new File(directoryPath)
    var lines = ""
    for (file <- myDirectory.listFiles) {
      lines = scala.io.Source.fromFile(file).mkString
      val message = new ProducerRecord[String, String](topic, null, lines)
      producer.send(message)
      print(lines)
      Thread.sleep(1000)
    }
  }

And, similarly, a Spark direct-streaming consumer:

val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc, kafkaConf, Set(topic)).map(_._2)

lines.print(10)

With this I am able to print the content of the file. I am using a single topic. Now I need to fetch the RDDs from the DStream and take the entire content as a String object so I can pass it to a method. How can I do that?

The API you are looking for is:

dstream.foreachRDD(func)

It applies the function func to each RDD generated by the stream. So, for your use case, you can write the following code:

lines.foreachRDD { rdd =>
  val data = rdd.collect().mkString("\n")
  println(data)
}

Please note that since this code runs on the driver process, you have to make sure the driver has enough resources to process the given file. Usually, one should use this API to push the data in each RDD to an external system instead, such as saving the RDD to files, or writing it over the network to a database.
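For example, instead of collecting everything to the driver, each batch can be persisted directly from the executors. A minimal sketch (the output path here is a placeholder, not from your setup):

```scala
import org.apache.spark.streaming.dstream.DStream

// Sketch: persist each micro-batch to storage rather than collecting
// it on the driver. saveAsTextFile is a standard RDD action; the
// /tmp/output path is just an illustrative placeholder.
def saveBatches(lines: DStream[String]): Unit = {
  lines.foreachRDD { (rdd, time) =>
    if (!rdd.isEmpty()) {
      // One output directory per batch, keyed by the batch timestamp
      rdd.saveAsTextFile(s"/tmp/output/batch-${time.milliseconds}")
    }
  }
}
```

This keeps the data distributed across the executors, so it scales with file size in a way that `collect()` on the driver does not.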

You can read further about the other output operations of DStreams in Spark's programming guide.

