How do I receive Avro messages through Kafka in a Spark Streaming Scala application?



I’m encoding messages in Avro format via PHP and want to decode them in a Kafka streaming application. I can already access the messages in the Kafka topic. Any suggestions will be appreciated.
Thanks for your help.
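
For illustration, here is a minimal sketch of the decoding step on the JVM side, using Avro's `GenericDatumReader` and `DecoderFactory`. It assumes the PHP producer writes plain Avro binary (no container-file header and no schema-registry prefix) and that the writer schema is known to the consumer. The schema, its field names, and the `AvroDecodeSketch` object are hypothetical placeholders, not taken from the actual setup.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroDecodeSketch {
  // Hypothetical schema; the real one must match what the PHP producer uses.
  val schemaJson: String =
    """{"type":"record","name":"Message","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"text","type":"string"}
      |]}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Decode one raw Avro binary payload into a generic record.
  def decode(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  // Encode helper, used here only to produce sample bytes for a round trip.
  def encode(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val record = new GenericData.Record(schema)
    record.put("id", Long.box(42L))
    record.put("text", "hello")
    // Round trip: encode to bytes, then decode back into a GenericRecord.
    println(decode(encode(record)))
  }
}
```

In a streaming job, a function like `decode` would be applied to each record's value. For that to work the value would likely need to arrive as raw bytes, i.e. with `org.apache.kafka.common.serialization.ByteArrayDeserializer` as the `value.deserializer` and the stream typed as `[String, Array[Byte]]`, because decoding a binary payload as a string can corrupt it.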




Currently, I’m trying to stream unstructured, Avro-encoded message data from a Kafka topic in Scala. Plain (non-Avro) messages work fine; the problem is decoding the Avro-encoded ones. Thanks for any help. The relevant code is below:

import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010._ 

object Consumer4 { 
  def main(args: Array[String]): Unit = { 

val spark = SparkSession
  .builder
  .appName("Kafka test")
  .getOrCreate()

val sparkContext = SparkContext.getOrCreate() 
val streamingContext = new StreamingContext(sparkContext, Seconds(2)) 
val preferredHosts = LocationStrategies.PreferConsistent 
val topics = List("xyz")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  preferredHosts,
  Subscribe[String, String](topics, kafkaParams)
)
// stream.map(record => (record.key, record.value))
stream.foreachRDD { rdd => 
  //      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
  rdd.foreachPartition { iter => 

    while (iter.hasNext) { 
      val item = iter.next()

    //        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 
    //        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")