
Programming with Spark Streaming

Spark can read streams from many sources, including file systems, object storage, sockets, message queues, etc. Here I run an app that receives a stream from a socket server, processes it, and prints out the results.

First I have to prepare a socket server. I wrote one in Perl; the content is as follows:

package MyPackage;
 
use strict;
use base qw(Net::Server::PreFork);
 
MyPackage->run(host=>'localhost',port=>9999);
 
# over-ride the default echo handler
sub process_request {
    my $self = shift;
    my @fruits=qw(apple orange tomato plum cherry peach apricot banana mango lemon);

    while(1) {
        my $fruit = $fruits[int(rand(scalar @fruits))];
        print "$fruit\r\n" or return;
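        # sleep 0.25s between words (4-argument select with undef filehandles is Perl's sub-second sleep)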
        select(undef, undef, undef, 0.25);
    }
}
 
1;

Open a terminal and run this script. It runs as a socket server, listening on port 9999 for connections. Once a connection has been established, it writes words to the remote socket, one word per line.
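
Before wiring Spark to it, you can sanity-check the server with nc localhost 9999, or with a tiny client such as the sketch below. CheckSocket is just a throwaway helper of mine (not part of the Spark job) and assumes the Perl server above is already listening on localhost:9999:

import java.net.Socket
import scala.io.Source

// Minimal check: connect to the word server and print the first 10 lines it sends.
object CheckSocket extends App {
  val sock = new Socket("localhost", 9999)
  Source.fromInputStream(sock.getInputStream).getLines().take(10).foreach(println)
  sock.close()
}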

Here is the Scala program that reads the stream from the socket and processes it.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.storage.StorageLevel


object Myjob {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: Myjob <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 15 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(15))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>

      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

This code is adapted from Spark's official examples.

How do we deploy it? First we need Apache Spark running on localhost.

Then we start a new project by creating a project root directory. Inside the project directory, the tree looks like this:

./build.sbt
./src/main/scala/Myjob.scala

“build.sbt” is the build configuration file for the project, whose content is as follows:

name := "My Streaming Job"

version := "1.0"

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.2.0" % "provided"

Here “name” specifies the project name, “version” the project version, and “scalaVersion” the Scala version. Please note that the default pre-built Spark 3.2.x distribution is compiled against Scala 2.12.x, so the Scala version here must match the Spark build. “libraryDependencies” lists the project’s dependencies; spark-streaming is marked “provided” because the Spark installation already supplies it at runtime when the job is launched with spark-submit.

In the project directory, we run the following command to build the project:

$ sbt package
[info] welcome to sbt 1.6.1 (Ubuntu Java 11.0.11)
[info] loading project definition from /home/pyh/ops/spark/job6/project
[info] loading settings for project job6 from build.sbt ...
[info] set current project to My Streaming Job (in build file:/home/pyh/ops/spark/job6/)
[success] Total time: 3 s, completed Feb 7, 2022, 3:56:56 PM

After the build, the project directory looks like this:

$ ls
build.sbt  project  src  target

New directories “project” and “target” have been added, and the packaged jar is located at:

target/scala-2.12/my-streaming-job_2.12-1.0.jar

Finally we run the project by submitting the job to the local Spark server:

$ spark-submit --class "Myjob" --master local[2] target/scala-2.12/my-streaming-job_2.12-1.0.jar localhost 9999

Since I have only 2 cores in the VM, I specify local[2] here to use at most 2 worker threads.

The results are printed out to the screen periodically:

========= 1644221235000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
| banana|    6|
|  lemon|    5|
|  mango|    9|
|  apple|    6|
|   plum|    5|
| tomato|   12|
| cherry|    8|
|  peach|    2|
|apricot|    5|
| orange|    2|
+-------+-----+

========= 1644221250000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
|  peach|    9|
|   plum|    8|
| banana|    7|
|  apple|    5|
|apricot|    6|
| tomato|    5|
| cherry|    6|
|  lemon|    4|
|  mango|    8|
| orange|    2|
+-------+-----+

========= 1644221265000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
|  apple|    2|
|  mango|    4|
|  lemon|    4|
|  peach|    9|
| orange|    6|
|apricot|    9|
| cherry|   10|
| banana|    4|
|   plum|    6|
| tomato|    6|
+-------+-----+

At this point the whole job is done.

The application environment:

  • OS: Ubuntu 18.04 x86_64, a KVM instance
  • RAM: 4GB dedicated
  • CPU: dual AMD EPYC 7302 processors
  • Spark: 3.2.0
  • Scala: 2.12.15

Computing performance comparison for word statistics

Here I provide a dataset of words appearing on public tech mailing lists.

The file is a plain text file with one word per line. The total line count and size:

$ wc -l words.txt 
283218160 words.txt

$ du -h words.txt 
1.5G	words.txt

You can download this file from here (tgz compressed).

I ran a test to count the words in this file by grouping and sorting, using three methods: the Spark RDD API, the Spark DataFrame API, and a plain Scala program.

This is the syntax of the Spark RDD API in pyspark:

>>> rdd = sc.textFile("/tmp/words.txt")
>>> rdd.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending=False).take(20)

This is the syntax of the Spark DataFrame API in pyspark:

>>> df = spark.read.text("/tmp/words.txt")
>>> df.select("*").groupBy("value").count().orderBy("count",ascending=False).show()

This is the Scala program:

import scala.io.Source

object BigWords extends App {

  val file = Source.fromFile("/tmp/words.txt").getLines()
  val hash = scala.collection.mutable.Map[String,Int]()

  for (x <- file) {
    if (hash.contains(x)) hash(x) += 1 else hash(x) = 1
  }

  hash.toList.sortBy(-_._2).take(20).foreach(println)
}

I compiled the Scala program and ran it:

$ scalac bigwords.scala 
$ time scala BigWords
(the,14218320)
(to,11045040)
(a,5677600)
(and,5205760)
(is,4972080)
(i,4447440)
(in,4228200)
(of,3982280)
(on,3899320)
(for,3760800)
(this,3684640)
(you,3485360)
(at,3238480)
(that,3230920)
(it,2925320)
(with,2181160)
(be,2172400)
(not,2124320)
(from,2097120)
(if,1993560)

real	0m49.031s
user	0m50.390s
sys	0m0.734s

As you can see, it takes 49s to finish the job.

Spark’s DataFrame API is a bit slower: it takes 56s to finish the job (timed with my iOS stopwatch app):

+-----+--------+                                                                
|value|   count|
+-----+--------+
|  the|14218320|
|   to|11045040|
|    a| 5677600|
|  and| 5205760|
|   is| 4972080|
|    i| 4447440|
|   in| 4228200|
|   of| 3982280|
|   on| 3899320|
|  for| 3760800|
| this| 3684640|
|  you| 3485360|
|   at| 3238480|
| that| 3230920|
|   it| 2925320|
| with| 2181160|
|   be| 2172400|
|  not| 2124320|
| from| 2097120|
|   if| 1993560|
+-----+--------+
only showing top 20 rows

But the Spark RDD API in pyspark is quite slow in my use case. It takes 7m15s to finish the job:

[('the', 14218320), ('to', 11045040), ('a', 5677600), ('and', 5205760), ('is', 4972080), ('i', 4447440), ('in', 4228200), ('of', 3982280), ('on', 3899320), ('for', 3760800), ('this', 3684640), ('you', 3485360), ('at', 3238480), ('that', 3230920), ('it', 2925320), ('with', 2181160), ('be', 2172400), ('not', 2124320), ('from', 2097120), ('if', 1993560)]

I suspected this was due to slow Python execution, so I re-ran the RDD API in Spark’s Scala shell. The syntax and results are below:

scala> val rdd = sc.textFile("/tmp/words.txt")

scala> rdd.map{(_,1)}.reduceByKey{_ + _}.sortBy{-_._2}.take(20)

res1: Array[(String, Int)] = Array((the,14218320), (to,11045040), (a,5677600), (and,5205760), (is,4972080), (i,4447440), (in,4228200), (of,3982280), (on,3899320), (for,3760800), (this,3684640), (you,3485360), (at,3238480), (that,3230920), (it,2925320), (with,2181160), (be,2172400), (not,2124320), (from,2097120), (if,1993560))

This time it takes 1m31s to finish the job.
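
For reference, the same DataFrame query can also be run from Spark’s Scala shell. I did not time this variant; it is shown only to compare the syntax:

scala> val df = spark.read.text("/tmp/words.txt")

scala> df.groupBy("value").count().orderBy($"count".desc).show(20)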

The results summary in a table:

program              time
scala program        49s
pyspark dataframe    56s
scala RDD            1m31s
pyspark RDD          7m15s

I am surprised that pyspark’s RDD API is as slow as this, and I want to dig into the case further. Most likely the overhead comes from shipping every record between the JVM and the Python worker processes to run the Python lambdas, while the DataFrame query is executed entirely inside the JVM.

Application environment:

  • OS: Ubuntu 18.04.6 LTS, a KVM instance
  • Spark version: 3.2.0 (with scala 2.12.15, and python 3.6.9)
  • Scala version: 2.13.7
  • CPU: dual AMD EPYC 7302
  • RAM: 4GB dedicated
  • Disk: 40GB SSD

RabbitMQ admin script based on REST API

Here I use the Ruby script below to create a vhost and a user, with the corresponding permissions set up. It can delete the vhost and its users as well. The script is based on RabbitMQ’s RESTful management (admin) API.

require "rabbitmq/http/client"
require 'securerandom'

class MQManage

    def initialize(admin_host,mq_host,mq_port,mq_user,mq_pass)
        @mq_host = mq_host
        @mq_port = mq_port
        endpoint = "http://#{admin_host}:15672"
        @client = RabbitMQ::HTTP::Client.new(endpoint, :username => mq_user, :password => mq_pass)
    end

    # Create a random vhost, user and password, grant the user full
    # permissions on the vhost, and return the connection DSN as a hash.
    def create_mq_dsn
        vhost = "/" + SecureRandom.hex(6)
        user = SecureRandom.hex(6)
        pass = SecureRandom.hex(8)
        @client.create_vhost(vhost)
        @client.update_user(user, :tags => "autodeploy", :password => pass)
        @client.update_permissions_of(vhost, user, :write => ".*", :read => ".*", :configure => ".*")

        dsn = {:host => @mq_host, :port => @mq_port, :vhost => vhost, :user => user, :pass => pass}
        return dsn
    end

    # Delete a vhost together with every user that has permissions on it.
    # Refuses the default "/" vhost (-1) and unknown vhosts (-2).
    def drop_mq_dsn(vhost)
        vs = @client.list_vhosts
        names = []
        vs.each do |s| names << s.name end

        if vhost == "/"
            return -1
        end

        if not names.include? vhost
            return -2
        end

        ps = @client.list_permissions(vhost)
        ps.each do |s|
           @client.delete_user(s.user)
        end

        @client.delete_vhost(vhost)
    end
end

Why mmap has no significant effect in my code

In my last blog post there was a small benchmark of Scala, Perl and Ruby.

Many geeks on the mailing lists pointed out problems and gave suggestions for improvement. I really appreciate them.

Why do I want to benchmark them? Because we have a real production system that uses similar counting logic. In production the input is a stream: every second new words arrive at the service from a message queue such as Kafka or RabbitMQ. We then run real-time computation on Spark, which reads the stream and applies DSL-style queries for filtering and counting.

Here is roughly the equivalent logic on Spark (a sketch in Structured Streaming form, reading from the same localhost:9999 socket as above and assuming stopwords is already defined as a Python list):

from pyspark.sql.functions import col

# streaming source: one word per line, similar to reading words.txt
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
words = lines.select(col("value").alias("word"))
counts = words.filter(~col("word").isin(stopwords)).groupBy("word").count()  # filter out stopwords, then count
counts.orderBy(col("count").desc()).writeStream.outputMode("complete").format("console").start()

Thanks to Paul, who pointed out that I can use mmap() to speed up the file reads. I followed Paul’s code and tested again, but strangely mmap() shows no particular benefit for me. Here is the comparison.

The common-read (non-mmap) version for Perl:

use strict;

my %stopwords;

open HD,"stopwords.txt" or die $!;
while(<HD>) {
    chomp;
    $stopwords{$_} =1;
}
close HD;

my %count;

open HD,"words.txt" or die $!;
while(<HD>) {
    chomp;
    unless ( $stopwords{$_} ) {
        $count{$_} ++;
    }
}
close HD;

my $i=0;
for (sort {$count{$b} <=> $count{$a}} keys %count) {
    if ($i < 20) {
        print "$_ -> $count{$_}\n"
    } else {
       last; 
    }
    $i ++;
}

The mmap version for Perl:

use strict;

my %stopwords;

open my $fh, '<:mmap', 'stopwords.txt' or die $!;
while(<$fh>) {
    chomp;
    $stopwords{$_} =1;
}
close $fh;

my %count;

open my $fh, '<:mmap', 'words.txt' or die $!;
while(<$fh>) {
    chomp;
    unless ( $stopwords{$_} ) {
        $count{$_} ++;
    }
}
close $fh;

my $i=0;
for (sort {$count{$b} <=> $count{$a}} keys %count) {
    if ($i < 20) {
        print "$_ -> $count{$_}\n"
    } else {
       last; 
    }
    $i ++;
}

The common-read (non-mmap) version for Ruby:

stopwords = {}
File.open('stopwords.txt').each_line do |s|
  s.chomp!
  stopwords[s] = 1
end

count = Hash.new(0)
File.open('words.txt').each_line do |s|
  s.chomp!
  count[s] += 1 unless stopwords[s]
end

count.sort_by{|_,c| -c}.take(20).each do |s|
  puts "#{s[0]} -> #{s[1]}"
end

The mmap version for Ruby:

require 'mmap'

stopwords = {}
mmap_s = Mmap.new('stopwords.txt')
mmap_s.advise(Mmap::MADV_SEQUENTIAL)
mmap_s.each_line do |s|
  s.chomp!
  stopwords[s] = 1
end

count = Hash.new(0)
mmap_c = Mmap.new('words.txt')
mmap_c.advise(Mmap::MADV_SEQUENTIAL)
mmap_c.each_line do |s|
  s.chomp!
  count[s] += 1 unless stopwords[s]
end

count.sort_by{|_,c| -c}.take(20).each do |s|
  puts "#{s[0]} -> #{s[1]}"
end

The Ruby code was optimized by Frank, thanks.

So, this is the comparison for Perl (the first run is the common-read version, the second the mmap version):

$ time perl perl-hash.pl 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m2.018s
user	0m2.003s
sys	0m0.012s

$ time perl perl-mmap.pl 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m1.905s
user	0m1.888s
sys	0m0.016s

And this is the comparison for Ruby (the first run is the common-read version, the second the mmap version):

$ time ruby ruby-hash.rb 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m2.690s
user	0m2.660s
sys	0m0.028s

$ time ruby ruby-mmap.rb 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m2.695s
user	0m2.689s
sys	0m0.004s

I have run the above comparison many times and the results were similar. In my case there is no visible speed improvement from using mmap() to read the files.

The OS is Ubuntu 18.04 x86_64, running on a KVM VPS with 4GB dedicated RAM, dual AMD EPYC 7302 cores, and a 40GB SSD. Ruby version is 2.5.1, Perl version is 5.26.1.

Perl has built-in mmap support via the :mmap PerlIO layer. Ruby’s mmap library is installed this way:

sudo apt install ruby-mmap2

Why does this happen? Most likely the input files already sit in the OS page cache and the run time is dominated by hashing and string handling rather than I/O, so the read path hardly matters. I will keep investigating the problem.

Benchmark for Scala, Ruby and Perl

I know this benchmark may be of limited meaning, but I would like to give a simple comparison of the running speed of Scala, Ruby and Perl.

To state the results directly: for this job Perl is the fastest, taking 1.9s; Ruby is second, taking 3.0s; the Scala script is the slowest, taking 4.0s.

The two input files used by the scripts can be downloaded from here:

words.txt.tgz (11MB)

stopwords.txt.tgz (4KB)

Here is the Scala script:

import scala.io.Source

val li = Source.fromFile("words.txt").getLines()
val set_sw = Source.fromFile("stopwords.txt").getLines().toSet
val hash = scala.collection.mutable.Map[String,Int]()

for (x <- li) {
    if ( ! set_sw.contains(x) ) {
      if (hash.contains(x)) hash(x) += 1 else hash(x) = 1
    }
}

val sorted = hash.toList.sortBy(-_._2)
sorted.take(20).foreach {println}

Here is the Ruby script:

stopwords = {}
File.open("stopwords.txt").each_line do |s|
  s.strip!
  stopwords[s] =1
end

count = {}
File.open("words.txt").each_line do |s|
  s.strip!
  if ! stopwords.has_key?(s)
    if count.has_key?(s) 
       count[s] += 1
    else
       count[s] = 1
    end
  end
end
      
z = count.sort {|a1,a2| a2[1]<=>a1[1]}
z.take(20).each do |s| puts "#{s[0]} -> #{s[1]}" end

Here is the Perl script:

use strict;

my %stopwords;

open HD,"stopwords.txt" or die $!;
while(<HD>) {
    chomp;
    $stopwords{$_} =1;
}
close HD;

my %count;

open HD,"words.txt" or die $!;
while(<HD>) {
    chomp;
    unless ( $stopwords{$_} ) {
        $count{$_} ++;
    }
}
close HD;

my $i=0;
for (sort {$count{$b} <=> $count{$a}} keys %count) {
    if ($i < 20) {
        print "$_ -> $count{$_}\n"
    } else {
       last; 
    }
    $i ++;
}

The basic idea of the scripts above is the same. The difference is that in Scala I use a Set to hold the stopwords, while in Perl and Ruby I use a Hash.

And this is Scala’s run result:

$ time scala scala-set.sc 
(send,20987)
(message,17516)
(unsubscribe,15541)
(2021,15221)
(list,13017)
(mailing,12402)
(mail,11647)
(file,11133)
(flink,10114)
(email,9919)
(pm,9248)
(group,8865)
(problem,8853)
(code,8659)
(data,8657)
(2020,8398)
(received,8246)
(google,7921)
(discussion,7920)
(jan,7893)

real	0m4.096s
user	0m6.725s
sys	0m0.187s

This is Ruby’s run result:

$ time ruby ruby-hash.rb 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m3.062s
user	0m3.028s
sys	0m0.032s

And finally, this is Perl’s run result:

$ time perl perl-hash.pl 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m1.924s
user	0m1.893s
sys	0m0.029s

I have run the above three scripts many times. Their results are similar.

Language versions:

$ ruby -v
ruby 2.5.1p57 (2018-03-29 revision 63029) [x86_64-linux-gnu]

$ perl -v
This is perl 5, version 26, subversion 1 (v5.26.1) built for x86_64-linux-gnu-thread-multi
(with 71 registered patches, see perl -V for more detail)

Copyright 1987-2017, Larry Wall

$ scala -version
Scala code runner version 2.13.7 -- Copyright 2002-2021, LAMP/EPFL and Lightbend, Inc.

The OS is Ubuntu 18.04 on a KVM VPS. The hardware includes 4GB RAM, a 40GB SSD, and dual AMD EPYC 7302 processors.

I am surprised to see how fast Perl is among these three languages. Although I may not have written the best-performing Ruby or Scala programs, this simple test still suggests Perl has a clear performance advantage for common text-parsing jobs.

[updated 2022-01-29] Below is the updated content:

After I compiled the Scala script, the running time became much shorter. So I think the reason the Scala script above is slow is startup cost: the script runner has to compile the code and spin up the JVM on every run.

The Scala code was changed to this:

import scala.io.Source

object CountWords {
  def main(args: Array[String]):Unit = {

    val li = Source.fromFile("words.txt").getLines()
    val stopwords = Source.fromFile("stopwords.txt").getLines().toSet
    val hash = scala.collection.mutable.Map[String,Int]()

    for (x <- li) {
        if ( ! stopwords.contains(x) ) {
            if (hash.contains(x)) hash(x) += 1 else hash(x) = 1
        }
    }

    hash.toList
     .sortBy(-_._2)
     .take(20)
     .foreach {println}
  }
}

I compiled it with:

$ scalac CountWords.scala 

Here is the running-speed comparison with Perl:

$ time scala CountWords
(send,21919)
(message,19347)
(unsubscribe,16617)
(2021,15344)
(list,14271)
(mailing,13098)
(file,12537)
(mail,12122)
(jan,12070)
(email,10701)
(flink,10249)
(pm,9940)
(code,9562)
(group,9547)
(problem,9536)
(data,9373)
(2022,8932)
(received,8760)
(return,8566)
(discussion,8441)

real	0m2.107s
user	0m2.979s
sys	0m0.142s

$ time perl perl-hash.pl 
send -> 21919
message -> 19347
unsubscribe -> 16617
2021 -> 15344
list -> 14271
mailing -> 13098
file -> 12537
mail -> 12122
jan -> 12070
email -> 10701
flink -> 10249
pm -> 9940
code -> 9562
group -> 9547
problem -> 9536
data -> 9373
2022 -> 8932
received -> 8760
return -> 8566
discussion -> 8441

real	0m2.418s
user	0m2.380s
sys	0m0.036s

Now Perl runs in 2.4s while the compiled Scala runs in 2.1s; the latter is faster.

For this simple comparison, the running speed ends up in this order:

compiled scala > perl > ruby > scala script