Trouble with deserializing Avro data in Scala




I am building an Apache Flink application in Scala that reads streaming data from a Kafka bus and then performs summarizing operations on it. The data from Kafka is in Avro format and needs a special deserialization class. I found this Scala class, AvroDeserializationSchema (http://codegists.com/snippet/scala/avrodeserializationschemascala_saveveltri_scala):


package org.myorg.quickstart

import java.io.IOException

import org.apache.avro.io.{BinaryDecoder, DatumReader, DecoderFactory}
import org.apache.avro.reflect.ReflectDatumReader
import org.apache.avro.specific.{SpecificDatumReader, SpecificRecordBase}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor

class AvroDeserializationSchema[T](val avroType: Class[T]) extends DeserializationSchema[T] {
  // Avro readers and decoders are not serializable, so they are created lazily on first use.
  @transient private var reader: DatumReader[T] = null
  @transient private var decoder: BinaryDecoder = null

  override def deserialize(message: Array[Byte]): T = {
    ensureInitialized()
    try {
      // Passing the previous decoder lets Avro reuse it instead of allocating one per record.
      decoder = DecoderFactory.get.binaryDecoder(message, decoder)
      reader.read(null.asInstanceOf[T], decoder)
    } catch {
      case e: IOException => throw new RuntimeException(e)
    }
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = TypeExtractor.getForClass(avroType)

  private def ensureInitialized(): Unit = {
    if (reader == null) {
      // Generated Avro classes use the specific reader; any other class falls back to reflection.
      reader =
        if (classOf[SpecificRecordBase].isAssignableFrom(avroType)) new SpecificDatumReader[T](avroType)
        else new ReflectDatumReader[T](avroType)
    }
  }
}
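Why `reader` and `decoder` start out as `null`: as I understand it, Flink ships the schema object to the workers using Java serialization (`DeserializationSchema` extends `Serializable`), while Avro's `DatumReader` implementations are not serializable, so they are built lazily on first use. The stdlib-only sketch below reproduces that pattern; `FakeReader` and `LazySchema` are hypothetical stand-ins, not Avro or Flink classes. In the sketch the field is also marked `@transient`, a common safeguard so the schema can still be serialized after the reader has been created.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for an Avro DatumReader: deliberately NOT Serializable.
class FakeReader(prefix: String) {
  def read(bytes: Array[Byte]): String = prefix + new String(bytes, "UTF-8")
}

// Same shape as AvroDeserializationSchema: the schema is Serializable,
// but the non-serializable reader is created lazily after deserialization.
class LazySchema(val prefix: String) extends Serializable {
  @transient private var reader: FakeReader = null

  private def ensureInitialized(): Unit =
    if (reader == null) reader = new FakeReader(prefix)

  def deserialize(message: Array[Byte]): String = {
    ensureInitialized()
    reader.read(message)
  }
}

object LazySchemaDemo {
  // Java-serialization round trip, roughly what happens when a job is shipped to a worker.
  def roundTrip[A <: Serializable](value: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val schema = new LazySchema("device:")
    schema.deserialize("warm-up".getBytes("UTF-8")) // creates the non-serializable reader
    val copy = roundTrip(schema)                    // succeeds because reader is @transient
    println(copy.deserialize("abc".getBytes("UTF-8"))) // prints "device:abc"
  }
}
```

Without `@transient`, the round trip after the warm-up call would fail with a `NotSerializableException`; with it, the copy simply re-creates its reader on first use.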



In my streaming class I use it as follows:


val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test",
    new AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))



where DeviceData is a Scala case class defined in the same project:


/** Case class to hold the Device data. */
case class DeviceData(
  deviceId: String,
  sw_version: String,
  timestamp: String,
  reading: Double
)



I get the following error when compiling the StreamingKafkaClient.scala class:


Error:(24, 102) object java.lang.Class is not a value
.addSource(new FlinkKafkaConsumer010[String]("test", new
AvroDeserializationSchema[DeviceData](Class[DeviceData]), properties))



I also tried:


val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test",
    new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))



With that I get a different error:


Error:(21, 20) overloaded method constructor FlinkKafkaConsumer010 with alternatives:
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.regex.Pattern,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: java.util.List[String],x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: String,x$2: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String] <and>
(x$1: String,x$2: org.apache.flink.api.common.serialization.DeserializationSchema[String],x$3: java.util.Properties)org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010[String]
cannot be applied to (String, org.myorg.quickstart.AvroDeserializationSchema[org.myorg.quickstart.DeviceData], java.util.Properties)
.addSource(new FlinkKafkaConsumer010[String]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties))



I am completely new to Scala (this is my first Scala program), so I know I am missing something fundamental here. As I try to learn Scala, could someone please point out what I am doing wrong? My intent is basically to read Avro-encoded data from Kafka into Flink and do some operations on the streaming data. I could not find any examples of the usage of the AvroDeserializationSchema class; it seems to me this is something that should be natively built into the Flink packages.





By chance, does your Avro data come from the Confluent Schema Registry? If so, this won't work exactly as written anyway. See stackoverflow.com/a/45710854/2308683. Plus, if you know Java and not Scala, it's best to use Java 8.
– cricket_007
Jul 1 at 18:41






Thanks, but I want to use Scala only.
– aarora
Jul 2 at 0:16





Did you see that link? It uses Scala. And you didn't answer my first question.
– cricket_007
Jul 2 at 1:38





I will review the link again. No, I am not using the Confluent Schema Registry.
– aarora
Jul 2 at 3:00




1 Answer



To get a class object in Scala, you want classOf[DeviceData], not Class[DeviceData]:

new AvroDeserializationSchema[DeviceData](classOf[DeviceData])
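The distinction in a nutshell: `Class[DeviceData]` names a type, so it cannot be passed as a constructor argument (hence "object java.lang.Class is not a value"), whereas `classOf[DeviceData]` is an expression that evaluates to the `java.lang.Class` object. As an optional convenience that is my suggestion and not part of the class above, a `ClassTag` can let the compiler supply that object. The sketch is stdlib-only; `schemaClassFor` is a hypothetical helper name.

```scala
import scala.reflect.ClassTag

case class DeviceData(deviceId: String, sw_version: String, timestamp: String, reading: Double)

object ClassOfDemo {
  // classOf[T] is a value of type Class[T]; Class[T] on its own is only a type.
  val viaClassOf: Class[DeviceData] = classOf[DeviceData]

  // Hypothetical helper: recover the same Class object from an implicit ClassTag,
  // so a caller could write schemaClassFor[DeviceData] instead of classOf[DeviceData].
  def schemaClassFor[T](implicit ct: ClassTag[T]): Class[T] =
    ct.runtimeClass.asInstanceOf[Class[T]]

  def main(args: Array[String]): Unit =
    println(schemaClassFor[DeviceData] eq viaClassOf) // prints "true"
}
```

With such a helper, a secondary factory like `def apply[T: ClassTag]` on a companion object could build the schema without an explicit `classOf` argument; the plain `classOf[DeviceData]` call shown above works just as well.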



I could not find any examples of the usage of AvroDeserializationSchema class



I found one (in Java)



Also, it looks like the Flink 1.6 release will add this class, so you won't have to copy it from elsewhere (FLINK-9337 and FLINK-9338).



As mentioned in the comments, if you would like to use the Confluent Avro Schema Registry rather than passing a class type, see this answer, or refer to the code in the GitHub link above.



Additionally, if you are running Kafka 0.11+ (or Confluent 3.3+), then you should ideally be using FlinkKafkaConsumer011 along with the class you are deserializing to




new FlinkKafkaConsumer011[DeviceData]





Edited my original question to include the compiler error I got with "classOf[DeviceData]". Just saw the Java example (github.com/okkam-it/flink-examples/blob/master/src/main/java/…); it is EXACTLY what I am trying to do in Scala.
– aarora
Jul 2 at 3:01






You're still using [String] in your consumer type, not the type you're deserializing to.
– cricket_007
Jul 2 at 3:42
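The second compiler error is a type-parameter mismatch: `FlinkKafkaConsumer010[String]` only accepts a `DeserializationSchema[String]`, while `new AvroDeserializationSchema[DeviceData](...)` is a `DeserializationSchema[DeviceData]`, and Java generic types are invariant when seen from Scala. The sketch below reproduces the same shape with hypothetical stand-ins (`Schema`, `Consumer`, `CsvDeviceSchema`) instead of the real Flink classes, so it runs without Flink on the classpath; the CSV parsing is only a toy substitute for Avro decoding.

```scala
// Hypothetical stand-ins for DeserializationSchema[T] and FlinkKafkaConsumer0xx[T].
trait Schema[T] {
  def deserialize(message: Array[Byte]): T
}

class Consumer[T](val topic: String, val schema: Schema[T]) {
  def poll(message: Array[Byte]): T = schema.deserialize(message)
}

case class DeviceData(deviceId: String, sw_version: String, timestamp: String, reading: Double)

// Toy schema for illustration only: parses "id,version,timestamp,reading" (NOT Avro).
object CsvDeviceSchema extends Schema[DeviceData] {
  def deserialize(message: Array[Byte]): DeviceData =
    new String(message, "UTF-8").split(",") match {
      case Array(id, version, ts, reading) => DeviceData(id, version, ts, reading.toDouble)
      case other => throw new IllegalArgumentException(s"expected 4 fields, got ${other.length}")
    }
}

object TypeParamDemo {
  // Does not compile: a Schema[DeviceData] is not a Schema[String].
  // val wrong = new Consumer[String]("test", CsvDeviceSchema)

  // Compiles: the consumer's type parameter matches the schema's.
  val consumer = new Consumer[DeviceData]("test", CsvDeviceSchema)

  def main(args: Array[String]): Unit =
    println(consumer.poll("d1,1.0,2018-07-02T04:24,21.5".getBytes("UTF-8")))
}
```

The fix in the real code is the same one-word change: make the consumer's type argument `DeviceData` so it agrees with the schema.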







That was it; how did I miss that? Fixed it and it worked perfectly, thank you! I do have a next problem, though: as soon as the streaming data is received, the program crashes, because it seems the deserialization routine requires an "init" method (a no-argument constructor): Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: java.lang.NoSuchMethodException: org.myorg.quickstart.DeviceData.<init>()
– aarora
Jul 2 at 4:24





Not sure about that one. You have a case class, which should build a class initialization method. Feel free to accept this answer using the checkmark and post a new question.
– cricket_007
Jul 2 at 4:48
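A likely reading of `NoSuchMethodException: ... DeviceData.<init>()`: `<init>()` is the JVM name of a constructor with no parameters. For a class that is not a `SpecificRecordBase`, the schema above falls back to `ReflectDatumReader`, which (as far as I understand Avro's reflect API) instantiates each record through such a constructor before filling in its fields, and a Scala case class only has a constructor taking all of its fields. The stdlib-only sketch below just checks this with Java reflection and shows one possible workaround, an auxiliary no-argument constructor; whether Avro's reflect reader then populates the case class's `val` fields correctly is an assumption not verified here. `DeviceDataWithDefault` and `hasNoArgConstructor` are hypothetical names.

```scala
case class DeviceData(deviceId: String, sw_version: String, timestamp: String, reading: Double)

// Hypothetical variant with an auxiliary no-argument constructor (placeholder values).
case class DeviceDataWithDefault(deviceId: String, sw_version: String, timestamp: String, reading: Double) {
  def this() = this(null, null, null, 0.0)
}

object NoArgCtorDemo {
  // True if the class declares a constructor with no parameters, i.e. <init>().
  def hasNoArgConstructor(c: Class[_]): Boolean =
    try { c.getDeclaredConstructor(); true }
    catch { case _: NoSuchMethodException => false }

  def main(args: Array[String]): Unit = {
    println(hasNoArgConstructor(classOf[DeviceData]))            // prints "false"
    println(hasNoArgConstructor(classOf[DeviceDataWithDefault])) // prints "true"
  }
}
```

An alternative that avoids reflection altogether is to generate a `SpecificRecordBase` class from an Avro schema, which makes `ensureInitialized` pick `SpecificDatumReader` instead.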






Accepting this answer! To summarize, I used FlinkKafkaConsumer011 (although I don't think that was needed for this problem), and the code that worked was: val stream = env.addSource(new FlinkKafkaConsumer011[DeviceData]("test", new AvroDeserializationSchema[DeviceData](classOf[DeviceData]), properties)). I will create a new issue for the init-method error.
– aarora
Jul 2 at 4:57






