Integrate Spark Streaming With Kafka
- Add the following dependencies to your SBT build (build.sbt):
name := "spark-streaming-demo" version := "1.0" scalaVersion := "2.11.12" resolvers += "spark-packages" at "https://dl.bintray.com/spark-packages/maven/" val SPARK_VERSION = "2.4.0" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided", "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided", "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided", "org.apache.spark" %% "spark-streaming-kafka-0-10" % SPARK_VERSION ) scalacOptions in ThisBuild ++= Seq("-J-Xss8M") assemblyMergeStrategy in assembly := { case PathList("META-INF", xs@_*) => xs map { _.toLowerCase } match { case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard case _ => MergeStrategy.discard } case "conf/application.conf" => MergeStrategy.concat case _ => MergeStrategy.first } test in assembly := {} parallelExecution in Test := false
- Create the Spark session and streaming context with the following Kafka consumer configuration:
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Kafka consumer settings used by the direct stream
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "<host>:<port>",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my-group-id",
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("my-topic")

val session = SparkSession.builder()
  .appName("spark-demo")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

// Micro-batch interval of 30 seconds
val ssc = new StreamingContext(session.sparkContext, Seconds(30))
```
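If you want some test data on the topic before starting the stream, a small standalone producer is enough. This is a sketch, not part of the original walkthrough: it assumes the same <host>:<port> broker and my-topic topic as above, and uses the kafka-clients producer API that the spark-streaming-kafka-0-10 dependency pulls in transitively:

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Minimal test producer: sends a handful of string messages to the topic read below.
val props = new Properties()
props.put("bootstrap.servers", "<host>:<port>")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)
(1 to 5).foreach { i =>
  producer.send(new ProducerRecord[String, String]("my-topic", s"key-$i", s"message $i"))
}
producer.close()
```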
- You can now read messages from the Kafka topic by running the following code:
```scala
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Extract the message payload from each Kafka record and print a sample of each batch
val values = stream.map(record => record.value)
values.print()

ssc.start()            // Start the computation
ssc.awaitTermination() // Wait for the streaming job to terminate
```
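Because enable.auto.commit is disabled in the configuration above, the consumer group's offsets are not stored in Kafka automatically. If you need the group's progress to survive restarts, one option, following the pattern described in Spark's Kafka 0-10 integration guide, is to commit the processed offset ranges yourself. A minimal sketch, to be registered before ssc.start():

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// After each batch has been processed, commit the offset ranges that were
// just consumed back to Kafka asynchronously.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```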