Starting Spark 3.3 + Kafka 3.2 streaming from scratch

I have not used Spark for a long time. Today I tried to set up Spark to integrate with Kafka streaming, and I found the ecosystem has changed a lot.

The main branches for spark and kafka are these:

Though Scala 2.13 has been out for quite a while, both Spark and Kafka are still built mainly against Scala 2.12. When downloading the software, we should choose the build that uses Scala 2.12.

First, we want to install Scala 2.12 on the system. My system is Ubuntu 20.04, x64. Follow the steps below to install Java, SDKMAN, Scala, and sbt:

$ sudo apt install openjdk-11-jre
$ curl -s "https://get.sdkman.io" | bash
$ source "$HOME/.sdkman/bin/sdkman-init.sh"   # make the sdk command available in the current shell
$ sdk install scala 2.12.15
$ sdk install sbt

After that, download Spark and Kafka from the links above. Untar the packages and move them to the /opt directory. So in /opt I have:

$ ls /opt
kafka   spark

Then put these settings in the .bash_profile file in the user’s home dir:

source "/home/pyh/.sdkman/bin/sdkman-init.sh"

export SPARK_HOME=/opt/spark
export JAVA_HOME=/usr
export PATH=/opt/kafka/bin:/opt/spark/bin:$PATH

I have a script “kafka.sh” for managing the Kafka service, whose content is as follows:

#!/bin/bash

ACT=$1
TOP=$2
PRE="/opt/kafka"

if [ -z "$ACT" ];then
  echo "$0 action [topic]"
  exit 1
fi

if [ -z "$TOP" ];then
  TOP="quickstart-events"
fi

if [ "$ACT" == "produce" ];then
  $PRE/bin/kafka-console-producer.sh --topic $TOP --bootstrap-server localhost:9092

elif [ "$ACT" == "consume" ];then
  $PRE/bin/kafka-console-consumer.sh --topic $TOP --from-beginning --bootstrap-server localhost:9092

elif [ "$ACT" == "create" ];then
  $PRE/bin/kafka-topics.sh --create --partitions 2 --replication-factor 1 --topic $TOP --bootstrap-server localhost:9092

elif [ "$ACT" == "desc" ];then
  $PRE/bin/kafka-topics.sh --describe --topic $TOP --bootstrap-server localhost:9092

elif [ "$ACT" == "startzk" ];then
  $PRE/bin/zookeeper-server-start.sh $PRE/config/zookeeper.properties

elif [ "$ACT" == "start" ];then
  $PRE/bin/kafka-server-start.sh $PRE/config/server.properties

fi

So I use this script to start the Kafka services and create a topic:

$ kafka.sh startzk  # startup zookeeper
$ kafka.sh start  # startup kafka
$ kafka.sh create mytest  # create a topic

These three commands should be run in three separate terminals. The last step creates a topic named “mytest”.

Now I produce messages to Kafka with a Ruby script (using the ruby-kafka gem):

$ cat produce.rb 
require 'kafka'

kafka = Kafka.new("localhost:9092", client_id: "ruby-client", resolve_seed_brokers: true)

producer = kafka.producer(required_acks: :all, max_buffer_size: 50_000)

1000.times do
    message = rand.to_s
    producer.produce(message, key: "key1", topic: "mytest")
end

producer.deliver_messages

To keep publishing messages continuously, we can loop in the shell:

$ while true; do ruby produce.rb; sleep 5; done

Now the messages are being published to Kafka successfully. The next step is to read the stream from Kafka into Spark and aggregate the messages within Spark.

To set up a Scala project:

$ mkdir myproject
$ cd myproject
$ mkdir -p src/main/scala
$ touch build.sbt
$ touch src/main/scala/sparkafka.scala

Here is the content of build.sbt:

name := "sparkafka"

version := "0.1"

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.12" % "3.3.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"
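A note on % versus %% here: sbt’s %% operator appends the project’s Scala binary version to the artifact name, so the first dependency could equivalently be written as `"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.0"` and it would resolve the same spark-sql-kafka-0-10_2.12 artifact. The naming convention itself is easy to see (a small sketch, nothing Spark-specific):

```scala
object ArtifactName {
  def main(args: Array[String]): Unit = {
    // %% appends "_" plus the Scala binary version to the artifact name,
    // which is how "spark-sql-kafka-0-10" becomes "spark-sql-kafka-0-10_2.12".
    val scalaBinaryVersion = "2.12"
    val artifact = s"spark-sql-kafka-0-10_$scalaBinaryVersion"
    println(artifact) // prints spark-sql-kafka-0-10_2.12
  }
}
```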

And the source code in sparkafka.scala:

import org.apache.spark.sql.SparkSession

object Sparkafka {
    def main(args:Array[String]):Unit = {

      val spark = SparkSession.builder.appName("Mykafka").getOrCreate()

      // Subscribe to the "mytest" topic as a streaming source.
      val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "mytest")
      .load()

      import spark.implicits._

      // Kafka delivers key and value as binary, so cast them to strings.
      // Note: this returns a new Dataset and does not modify df, so the
      // grouping below still runs on the raw binary key -- which is why the
      // console output later shows the key as bytes like [6B 65 79 31].
      df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

      // Count the messages per key.
      val myCount = df.groupBy("key").count()

      // Write the running counts to the console on every trigger.
      val query = myCount.writeStream
      .outputMode("complete")
      .format("console")
      .start()

      query.awaitTermination()

  }
}

In myproject directory, I run this command to compile and package the project:

$ sbt package
...
[success] Total time: 4 s, completed Jul 7, 2022, 4:39:19 PM

Go to Spark’s configuration dir and change the log level in log4j2.properties to error (if the file does not exist yet, copy it from log4j2.properties.template first):

$ cat /opt/spark/conf/log4j2.properties |grep error
rootLogger.level = error

The last step is to submit the job to Spark. In the myproject dir, run the command below:

$ spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 --class "Sparkafka" --master local[2] target/scala-2.12/sparkafka_2.12-0.1.jar

Here is the output in terminal:

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-----+
|          key|count|
+-------------+-----+
|[6B 65 79 31]| 2000|
+-------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------------+-----+
|          key|count|
+-------------+-----+
|[6B 65 79 31]| 4000|
+-------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------------+-----+
|          key|count|
+-------------+-----+
|[6B 65 79 31]| 5000|
+-------------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------+-----+
|          key|count|
+-------------+-----+
|[6B 65 79 31]| 7000|
+-------------+-----+
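The key column prints as raw bytes because the string cast in the source code was never applied to the DataFrame that gets grouped. Those bytes are simply the UTF-8 encoding of the producer’s key; a quick sketch to decode them:

```scala
object DecodeKey {
  def main(args: Array[String]): Unit = {
    // [6B 65 79 31] is the binary key as printed by the console sink.
    val bytes = Array(0x6B, 0x65, 0x79, 0x31).map(_.toByte)
    val key = new String(bytes, "UTF-8")
    println(key) // prints key1
  }
}
```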

Finally we got the job running correctly. In a production deployment, after reading the stream from Kafka and aggregating it, we would write the results to a store such as Redis or MySQL instead of to the terminal as in this sample.
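As a sketch of that production path, Structured Streaming’s foreachBatch sink hands each micro-batch to an ordinary function, inside which the aggregated DataFrame can be written with any batch writer. This fragment assumes the myCount DataFrame from the program above; the JDBC URL, table name, and credentials are placeholders, not part of this sample:

```scala
// Sketch only: replace the console sink with foreachBatch and write each
// micro-batch of counts to MySQL over JDBC. Requires a MySQL JDBC driver
// on the classpath; URL, table, and credentials below are placeholders.
import org.apache.spark.sql.{DataFrame, SaveMode}

val query = myCount.writeStream
  .outputMode("complete")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/mydb")
      .option("dbtable", "key_counts")
      .option("user", "myuser")
      .option("password", "mypassword")
      .mode(SaveMode.Overwrite) // complete mode re-emits all counts each batch
      .save()
  }
  .start()
```

Since complete output mode re-emits the full aggregation on every trigger, overwriting the target table keeps it consistent with the latest counts.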