How do I receive Avro messages through Kafka in a Spark Streaming Scala application?


#1

Hi,

I’m encoding messages in Avro format with PHP and want to decode them in a Kafka streaming application. I can already read the messages from the Kafka topic. Any suggestions would be appreciated.
Thanks for your help.

Regards.


#2

Hi,

Currently, I’m trying to stream unstructured, Avro-encoded message data from a Kafka topic using Scala. Normal (non-Avro-encoded) messages work fine; the problem is decoding the Avro-encoded ones. Thanks for any help. The relevant code is below:

import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010._ 

/**
 * Spark Streaming consumer that reads Kafka record values as raw bytes so that
 * Avro-encoded payloads reach the application intact.
 *
 * Avro is a binary encoding. Deserializing it with `StringDeserializer` decodes the
 * bytes as UTF-8 text, which mangles any byte sequence that is not valid UTF-8, so
 * the original payload cannot be recovered afterwards. Values are therefore consumed
 * as `Array[Byte]`, ready to be handed to an Avro decoder. The decoder itself is not
 * included here: it needs the `org.apache.avro` library and the writer schema used by
 * the PHP producer.
 */
object Consumer4 {
  import org.apache.kafka.common.serialization.ByteArrayDeserializer

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Kafka test")
      .getOrCreate()

    // Reuse the session's context instead of looking it up again via SparkContext.getOrCreate().
    val streamingContext = new StreamingContext(spark.sparkContext, Seconds(2))
    val topics = List("xyz")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "xyz.xyz.com.tr:1111",
      "key.deserializer" -> classOf[StringDeserializer],
      // Keep the value as raw bytes: Avro binary data is not UTF-8 text.
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      // Offsets are committed explicitly below, after each batch has been processed.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
      streamingContext,
      PreferConsistent,
      Subscribe[String, Array[Byte]](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      // The cast to HasOffsetRanges only works on the RDD produced directly by the stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { records =>
        // This closure runs on the executors, so output goes to executor stdout,
        // not to the driver console.
        records.foreach { record =>
          // Decode `record.value` with an Avro reader here
          // (e.g. GenericDatumReader + a binary decoder).
          // The value may be null (e.g. a tombstone record), hence the Option.
          val valueInfo = Option(record.value).fold("null")(bytes => s"${bytes.length} bytes")
          println(s"${record.topic} ${record.partition} ${record.offset} key=${record.key} value=$valueInfo")
        }
      }

      // With enable.auto.commit=false, nothing else records this group's progress.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}