Integrate Spark Streaming With Kafka

Use this tutorial to set up a Spark Streaming app that consumes data from Kafka topics and prints the results.

  1. Add the following sbt (Scala Build Tool) project settings and dependencies to your build.sbt file:

    // Project Configuration
    name := "spark-streaming-demo"
    version := "1.0"
    scalaVersion := "2.11.12"
    // Optional resolver for spark-packages artifacts (the dependencies listed below are all on Maven Central)
    resolvers += "spark-packages" at "https://repos.spark-packages.org/"
    val SPARK_VERSION = "2.4.0"
    
    // Add Dependencies
    // Note: Dependencies already available on the Saagie platform are marked as "provided" to keep the assembled JAR small.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % SPARK_VERSION
    )
    
    // Scala Compiler Options
    scalacOptions in ThisBuild ++= Seq("-J-Xss8M")
    
    // File Merge Strategy During Assembly
    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", _*) => MergeStrategy.discard
      case "conf/application.conf" => MergeStrategy.concat
      case _ => MergeStrategy.first
    }
    
    // Assembly Test Configuration
    test in assembly := {}
    // Parallel Execution Configuration
    parallelExecution in Test := false
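
     The assembly, assemblyMergeStrategy, and test in assembly keys used above come from the sbt-assembly plugin, which is not bundled with sbt itself. A minimal project/plugins.sbt could look like the following sketch; the version shown is an assumption, so pick the release that matches your sbt version:

    // project/plugins.sbt
    // Version 0.14.9 is an assumption; use a release compatible with your sbt version.
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")

     Running sbt assembly then produces a single fat JAR containing everything except the dependencies marked as "provided".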
  2. Create your Spark session and streaming context with the following Kafka configuration:

    // Required Imports (from the dependencies declared in step 1)
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    // Kafka Configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<host>:<port>",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group-id",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    val topics = Array("my-topic")
    
    // Spark Session and Streaming Context Setup
    val session = SparkSession.builder()
      .appName("spark-demo")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.driver.allowMultipleContexts", "true")
      .getOrCreate()
    
    
    val ssc = new StreamingContext(session.sparkContext, Seconds(30))
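
     The <host>:<port> placeholder in the Kafka configuration hard-codes the broker list. One common variation is to read it from an environment variable instead, so the same JAR runs in several environments; the variable name KAFKA_BOOTSTRAP_SERVERS below is an illustrative assumption, not something defined by Spark or Saagie:

    // Hypothetical environment variable; the name is an illustrative assumption.
    val bootstrapServers = sys.env.getOrElse("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    // It can then replace the hard-coded value in kafkaParams:
    //   "bootstrap.servers" -> bootstrapServers,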
  3. You can now read messages from a Kafka topic by running the following lines of code:

    // Required Imports
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
    // Kafka Direct Stream Setup
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    
    // Processing and Output
    val values = stream.map(record => record.value)
    
    values.print()
    
    // Start Computation and Await Termination
    ssc.start()
    ssc.awaitTermination()
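
     Because enable.auto.commit is set to false in the configuration from step 2, offsets are never committed automatically. If you want to track consumption progress in Kafka itself, the 0-10 integration can commit offsets once each batch has been processed. A minimal sketch, reusing the stream value from above (register it before calling ssc.start()):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
    
    stream.foreachRDD { rdd =>
      // Capture the offset ranges of the current batch.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch here ...
      // Commit the processed offsets back to Kafka asynchronously.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }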