Integrate Spark Streaming With Kafka
-
Declare the following sbt (Scala Build Tool) dependencies in your `build.sbt` file:
```scala
// Project configuration
name := "spark-streaming-demo"
version := "1.0"
scalaVersion := "2.11.12"

resolvers += "spark-packages" at "https://dl.bintray.com/spark-packages/maven/"

val SPARK_VERSION = "2.4.0"

// Dependencies
// Note: Dependencies already present in Saagie are marked as "provided" to avoid building a heavy JAR file.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % SPARK_VERSION
)

// Scala compiler options
scalacOptions in ThisBuild ++= Seq("-J-Xss8M")

// File merge strategy during assembly
// (all META-INF entries are discarded, so the nested match on file names is unnecessary)
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", _*) => MergeStrategy.discard
  case "conf/application.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

// Skip tests during assembly
test in assembly := {}

// Disable parallel test execution
parallelExecution in Test := false
```
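The assembly settings above (`assemblyMergeStrategy`, `test in assembly`) come from the sbt-assembly plugin, which the build file does not pull in by itself. A minimal `project/plugins.sbt` might look like the sketch below; the exact plugin version is an assumption, so pick a release compatible with your sbt version (the `in assembly` setting syntax used above matches the 0.14.x line):

```scala
// project/plugins.sbt
// sbt-assembly provides the `assembly` task and the merge-strategy settings used in build.sbt.
// The version shown here is an assumption; adjust it to match your sbt version.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")
```

Running `sbt assembly` then produces a single fat JAR containing your code plus all dependencies not marked as "provided".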
-
Create your Spark Streaming session with the following Kafka configuration:
```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Kafka configuration
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "<host>:<port>",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group-id",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("my-topic")

// Spark session and streaming context setup
val session = SparkSession.builder()
  .appName("spark-demo")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

// Micro-batches are collected every 30 seconds
val ssc = new StreamingContext(session.sparkContext, Seconds(30))
```
-
You can now read messages from a Kafka topic by running the following lines of code:
```scala
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Create a direct stream reading from the Kafka topic
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// Extract and print the message values of each micro-batch
val values = stream.map(record => record.value)
values.print()

// Start the computation and wait for it to terminate
ssc.start()
ssc.awaitTermination()
```
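Because `enable.auto.commit` is set to `false` in the Kafka parameters, offsets are never committed automatically. A hedged sketch of committing them manually after each micro-batch, using the `HasOffsetRanges` and `CanCommitOffsets` traits from the same `spark-streaming-kafka-0-10` package (this replaces the simple `values.print()` processing above, before `ssc.start()` is called):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Capture the offset ranges of this micro-batch before any transformation
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Process the batch; printing the values stands in for real processing here
  rdd.map(record => record.value).foreach(println)

  // Commit the offsets back to Kafka once the batch has been processed,
  // so a restarted job resumes from the last processed position
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

`commitAsync` is asynchronous and at-least-once: a batch may be reprocessed if the job fails between processing and commit, so downstream outputs should tolerate duplicates.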