Programming with Spark Streaming

Spark can read streams from many sources, including file systems, object storage, sockets, and message queues. Here I run an app that receives a stream from a socket server, processes it, and prints the results.

First I have to prepare a socket server. I wrote one in Perl; its content is as follows:

package MyPackage;
 
use strict;
use base qw(Net::Server::PreFork);
 
MyPackage->run(host=>'localhost',port=>9999);
 
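# Override the default echo handler. Net::Server connects STDOUT to the
# client socket, so print sends data to the connected client.
sub process_request {
    my $self = shift;
    my @fruits=qw(apple orange tomato plum cherry peach apricot banana mango lemon);

    while(1) {
        # pick a random fruit and send it as one line; stop if the write fails
        my $fruit = $fruits[int(rand(scalar @fruits))];
        print "$fruit\r\n" or return;
        # sleep for 0.25 seconds
        select(undef, undef, undef, 0.25);
    }
}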
 
1;

Please open a terminal and run this script. It runs as a socket server, listening on port 9999 for incoming connections. Once a connection has been established, it writes random words to the client socket, one word per line, every 0.25 seconds.
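If you don't have Perl at hand, the same idea can be sketched in Scala with plain java.net sockets. This is only a rough equivalent, not the script I actually ran: the object name FruitServerSketch and its two helpers are made up for illustration, and the demo binds to any free port instead of 9999 so that it does not collide with the real server.

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.{ServerSocket, Socket}
import scala.util.Random

// Hypothetical stand-in for the Perl server (illustration only).
object FruitServerSketch {
  val fruits: Vector[String] =
    Vector("apple", "orange", "tomato", "plum", "cherry",
           "peach", "apricot", "banana", "mango", "lemon")

  // Accept one client and send one random fruit per line
  // until writing to the client fails.
  def serveOne(server: ServerSocket, delayMs: Long): Unit = {
    val client = server.accept()
    val out = new PrintWriter(client.getOutputStream)
    try {
      while (!out.checkError()) {
        out.print(fruits(Random.nextInt(fruits.size)) + "\r\n")
        out.flush()
        Thread.sleep(delayMs)
      }
    } finally client.close()
  }

  // Connect to host:port and read the first n lines.
  def readLines(host: String, port: Int, n: Int): List[String] = {
    val socket = new Socket(host, port)
    try {
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream))
      List.fill(n)(in.readLine())
    } finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(0) // port 0 = let the OS pick a free port
    val worker = new Thread(() => serveOne(server, 250L))
    worker.setDaemon(true)
    worker.start()
    // Print five words, one per line, as the Spark job would receive them
    readLines("localhost", server.getLocalPort, 5).foreach(println)
    server.close()
  }
}
```

The readLines helper on its own is also a quick way to check that the Perl server is up: point it at localhost and port 9999 and it should return fruit names.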

Here is the Scala program that reads the stream from the socket and processes it.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.storage.StorageLevel


object Myjob {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Myjob <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 15 second batch interval
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(15))

    // Create a socket stream on the target host:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>

      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

The code is adapted from the SqlNetworkWordCount example in Spark’s official samples.
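To make clear what the job computes for each batch, here is the same aggregation written with plain Scala collections, without Spark. It is only a sketch of the logic (split each line on spaces, then count per word); the object name BatchCountSketch and the sample batch are invented for illustration.

```scala
// Plain-Scala sketch of the per-batch aggregation; no Spark required.
object BatchCountSketch {
  // Same result in spirit as:
  //   select word, count(*) as total from words group by word
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // same split as in the streaming job
      .groupBy(identity)      // group equal words together
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    val batch = Seq("apple", "mango", "apple", "plum") // made-up sample batch
    countWords(batch).toSeq.sortBy(_._1).foreach {
      case (word, total) => println(s"$word $total")
    }
    // prints:
    // apple 2
    // mango 1
    // plum 1
  }
}
```

In the real job Spark does this work on each 15-second RDD, and the counts start again from zero with every batch.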

How to deploy it? First we need Apache Spark installed on localhost.

Then we start a new project by creating a project root directory. Inside it, the directory tree looks like this:

./build.sbt
./src/main/scala/Myjob.scala

“build.sbt” is the build configuration file for the project. Its content is as follows:

name := "My Streaming Job"

version := "1.0"

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.2.0" % "provided"

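Here “name” specifies the project name, “version” the project version, and “scalaVersion” the Scala version. Please note that the Scala version must match the one your Spark distribution was built with; the default pre-built Spark 3.2.x packages use Scala 2.12.x. “libraryDependencies” lists the packages the project depends on. The “provided” scope means the dependency is needed for compiling but is supplied by the Spark installation at run time.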
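As a variation, the Spark version can be kept in one place and both Spark modules marked as “provided”. This is a sketch of an alternative layout, not the file I built with; the value name sparkVersion is my own choice. It should behave the same when the job is started through spark-submit, because Spark then puts its own jars on the classpath.

```scala
// build.sbt (sketch): one shared version value, both Spark modules "provided"
val sparkVersion = "3.2.0" // hypothetical helper value, not in the original file

name := "My Streaming Job"
version := "1.0"
scalaVersion := "2.12.15"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
)
```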

In the project dir, we run the following command to build the project:

$ sbt package
[info] welcome to sbt 1.6.1 (Ubuntu Java 11.0.11)
[info] loading project definition from /home/pyh/ops/spark/job6/project
[info] loading settings for project job6 from build.sbt ...
[info] set current project to My Streaming Job (in build file:/home/pyh/ops/spark/job6/)
[success] Total time: 3 s, completed Feb 7, 2022, 3:56:56 PM

After the build, the project directory looks like this:

$ ls
build.sbt  project  src  target

Two new directories, “project” and “target”, have been added. The packaged jar is located at:

target/scala-2.12/my-streaming-job_2.12-1.0.jar

Finally we run the project by submitting the job to Spark in local mode:

$ spark-submit --class "Myjob" --master local[2] target/scala-2.12/my-streaming-job_2.12-1.0.jar localhost 9999

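Since I have only 2 cores in the VM, I specify local[2] to use at most 2 threads. Two is also the minimum here: the socket receiver occupies one thread, so at least one more is needed to process the received data.

The results are printed to the screen once per 15-second batch: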

========= 1644221235000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
| banana|    6|
|  lemon|    5|
|  mango|    9|
|  apple|    6|
|   plum|    5|
| tomato|   12|
| cherry|    8|
|  peach|    2|
|apricot|    5|
| orange|    2|
+-------+-----+

========= 1644221250000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
|  peach|    9|
|   plum|    8|
| banana|    7|
|  apple|    5|
|apricot|    6|
| tomato|    5|
| cherry|    6|
|  lemon|    4|
|  mango|    8|
| orange|    2|
+-------+-----+

========= 1644221265000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
|  apple|    2|
|  mango|    4|
|  lemon|    4|
|  peach|    9|
| orange|    6|
|apricot|    9|
| cherry|   10|
| banana|    4|
|   plum|    6|
| tomato|    6|
+-------+-----+
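The output can be sanity-checked with a little arithmetic. The server sends one word every 0.25 seconds and the batch interval is 15 seconds, so each batch should hold about 60 words, and the header of each table is the batch time in epoch milliseconds. The sketch below redoes that calculation; the object name BatchCheckSketch and its helpers are hypothetical, and the numbers are copied from the first table above.

```scala
import java.time.Instant

// Sanity check for the printed batches (illustration only).
object BatchCheckSketch {
  val batchSeconds = 15     // Seconds(15) in the streaming job
  val sleepSeconds = 0.25   // pause between words in the Perl server

  // Number of words one connection should deliver per batch.
  def expectedWordsPerBatch: Int = (batchSeconds / sleepSeconds).toInt

  // Convert a batch header such as "1644221235000 ms" to a UTC instant.
  def batchInstant(ms: Long): Instant = Instant.ofEpochMilli(ms)

  def main(args: Array[String]): Unit = {
    val firstBatch = Seq(6, 5, 9, 6, 5, 12, 8, 2, 5, 2) // totals from the first table
    println(firstBatch.sum)                        // 60
    println(expectedWordsPerBatch)                 // 60
    println(batchInstant(1644221235000L))          // 2022-02-07T08:07:15Z
    println(1644221250000L - 1644221235000L)       // 15000 ms between two batches
  }
}
```

All three tables above add up to 60. On a busier machine a batch may come out slightly lower or higher, since the server's sleep and the batch boundaries are not synchronized.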

That completes the job.

The application environment:

  • OS: Ubuntu 18.04 x86_64, a KVM instance
  • RAM: 4GB dedicated
  • CPU: 2 cores, AMD 7302 processor
  • Spark: 3.2.0
  • Scala: 2.12.15