Tag Archives: Spark

Time difference on handling the file larger than memory

Apache Spark handles big files well, even when the file is larger than the available memory.

For instance, I have the file “rate.csv” which is 3.2GB in size:

$ du -h rate.csv 
3.2G	rate.csv

But my VPS has only 2GB total memory:

$ free -m
              total        used        free      shared  buff/cache   available
Mem:           1992        1652         206           4         134         197
Swap:          1023         521         502

Even so, Spark can read and process this file. For example, I load the file into Spark and run an aggregation query through its DataFrame API.

// define the schema
scala> val schema="uid STRING,item STRING,rate FLOAT,time INT"
val schema: String = uid STRING,item STRING,rate FLOAT,time INT

// load csv file into spark as a dataframe
scala> val df = spark.read.format("csv").schema(schema).load("skydrive/rate.csv")
val df: org.apache.spark.sql.DataFrame = [uid: string, item: string ... 2 more fields]

// print the schema
scala> df.printSchema()
root
 |-- uid: string (nullable = true)
 |-- item: string (nullable = true)
 |-- rate: float (nullable = true)
 |-- time: integer (nullable = true)

// count the rows
scala> df.count()
val res1: Long = 82677131   

// get the partitions number
scala> df.rdd.getNumPartitions
val res3: Int = 26

// issue a query by grouping item and aggregating rate
scala> df.groupBy("item").agg(avg("rate").alias("avg_rate")).orderBy(desc("avg_rate")).show()
+----------+--------+                                                           
|      item|avg_rate|
+----------+--------+
|0001061100|     5.0|
|0001543849|     5.0|
|0001061127|     5.0|
|0001019880|     5.0|
|0001062395|     5.0|
|0000143502|     5.0|
|000014357X|     5.0|
|0001527665|     5.0|
|000107461X|     5.0|
|0000191639|     5.0|
|0001127748|     5.0|
|0000791156|     5.0|
|0001203088|     5.0|
|0001053744|     5.0|
|0001360183|     5.0|
|0001042335|     5.0|
|0001374400|     5.0|
|0001046810|     5.0|
|0001380877|     5.0|
|0001050230|     5.0|
+----------+--------+
only showing top 20 rows

As shown above, the query ran fine. Spark didn’t throw any out-of-memory error. The total query time was 1 min 46 sec.
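
A rough way to see why a query like this does not need the whole file in memory: a group-by average only needs a running sum and a running count per key, so each row can be read and then discarded. The sketch below is plain Python, not Spark's actual implementation. The function name `streaming_avg_by_item` and the sample rows are made up for illustration; the column layout (uid, item, rate, time) follows the schema above.

```python
import csv
import io
from collections import defaultdict


def streaming_avg_by_item(lines):
    """Average the rate per item while holding only one row at a time.

    `lines` is any iterable of CSV text lines laid out as uid,item,rate,time.
    Memory use grows with the number of distinct items, not with file size.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for _uid, item, rate, _time in csv.reader(lines):
        sums[item] += float(rate)
        counts[item] += 1
    return {item: sums[item] / counts[item] for item in sums}


# Tiny made-up sample standing in for rate.csv (hypothetical data)
sample = io.StringIO(
    "u1,0001,5.0,100\n"
    "u2,0001,4.0,101\n"
    "u3,0002,4.0,102\n"
)
averages = streaming_avg_by_item(sample)
# Same shape as the query: order by average descending, keep the top 20
top = sorted(averages.items(), key=lambda kv: kv[1], reverse=True)[:20]
print(top)
```

Spark additionally splits the file into partitions and can spill intermediate data to disk, which this sketch does not attempt to model.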

Apache Drill is quite memory-sensitive, but even so it can get the job done within this limited memory. For example:

> select columns[1] as `item`, avg(cast(columns[2] as Float)) as `avg_rate`
> from `rate.csv` group by `item` order by `avg_rate` desc limit 20;
+------------+----------+
|    item    | avg_rate |
+------------+----------+
| 0004133900 | 5.0      |
| 0005019222 | 5.0      |
| 0002557991 | 5.0      |
| 0002214830 | 5.0      |
| 0002720213 | 5.0      |
| 0002326817 | 5.0      |
| 0002254352 | 5.0      |
| 000225316X | 5.0      |
| 0002199203 | 5.0      |
| 0001712705 | 5.0      |
| 0002617501 | 5.0      |
| 0002113848 | 5.0      |
| 0001840266 | 5.0      |
| 0002117576 | 5.0      |
| 0001360183 | 5.0      |
| 0000202010 | 5.0      |
| 0002179083 | 5.0      |
| 0001954792 | 5.0      |
| 0002005468 | 5.0      |
| 0005097231 | 5.0      |
+------------+----------+
20 rows selected (119.311 seconds)

As you can see, Drill ran well in this scenario too. The total query time was 1 min 59 sec.

Then I gave MySQL a try. I knew that with limited memory MySQL handles data at this scale very slowly. Although the VM has only 2GB of memory, I still had to adjust two important memory settings for MySQL.

mysql> show variables like 'innodb_buffer_pool_size';
+-------------------------+-----------+
| Variable_name           | Value     |
+-------------------------+-----------+
| innodb_buffer_pool_size | 402653184 |
+-------------------------+-----------+
1 row in set (0.01 sec)

mysql> show variables like 'key_buffer_size';
+-----------------+-----------+
| Variable_name   | Value     |
+-----------------+-----------+
| key_buffer_size | 134217728 |
+-----------------+-----------+
1 row in set (0.02 sec)

The MySQL tuning: ‘innodb_buffer_pool_size’ was set to 384MB and ‘key_buffer_size’ was set to 128MB.

After that I created the table and loaded the data into it with the statements below.

mysql> create table rate(userId varchar(24), itemId varchar(24),rating float, time int);
Query OK, 0 rows affected (0.11 sec)

mysql> load data local infile 'skydrive/rate.csv' into table rate FIELDS TERMINATED BY ',';
Query OK, 82677131 rows affected (21 min 21.25 sec)
Records: 82677131  Deleted: 0  Skipped: 0  Warnings: 0

Loading the data took 21 min 21 sec. I also needed to add an index on the queried column, which in this case is itemId.

mysql> create index itemIndex on rate(itemId);
Query OK, 0 rows affected (9 min 22.60 sec)
Records: 0  Duplicates: 0  Warnings: 0

Finally I ran the query in MySQL:

mysql> select itemId, avg(rating) as avg_rate from rate group by itemId order by avg_rate desc limit 20;
+------------+----------+
| itemId     | avg_rate |
+------------+----------+
| 0000191639 |        5 |
| 0000143529 |        5 |
| 0000143502 |        5 |
| 0000202010 |        5 |
| 0000053155 |        5 |
| 0001019880 |        5 |
| 0001018043 |        5 |
| 000014357X |        5 |
| 000077135X |        5 |
| 0001026038 |        5 |
| 0000401048 |        5 |
| 0000000078 |        5 |
| 0000230022 |        5 |
| 0000913154 |        5 |
| 0000143588 |        5 |
| 0000466603 |        5 |
| 0001024388 |        5 |
| 0001006657 |        5 |
| 0000791156 |        5 |
| 0000174076 |        5 |
+------------+----------+
20 rows in set (16 min 29.75 sec)

MySQL’s total query time was 16 min 30 sec.

For comparison with MySQL, I installed PostgreSQL on this VM. It’s the default installation from the Ubuntu apt source, with the basic tuning below.

postgres=# show shared_buffers;
 shared_buffers 
----------------
 384MB
(1 row)

postgres=# show work_mem;
 work_mem 
----------
 16MB
(1 row)

I created the same table as in MySQL and set up the index on the queried column.

bigdata=# \d rate
                       Table "public.rate"
 Column |         Type          | Collation | Nullable | Default 
--------+-----------------------+-----------+----------+---------
 userid | character varying(24) |           |          | 
 itemid | character varying(24) |           |          | 
 rating | double precision      |           |          | 
 time   | integer               |           |          | 
Indexes:
    "itemindex" btree (itemid)

Then I loaded the data from csv to the table:

bigdata=# \copy rate FROM '/home/pyh/skydrive/rate.csv' DELIMITER ',' CSV;
COPY 82677131

The same query was then issued to PostgreSQL. As the output of top below shows, PostgreSQL runs 3 processes while executing the query (it parallelizes with worker processes rather than threads). Together they use up nearly all of the available memory.

0945 postgres  20   0 1311264 640036   3336 D  20.9 31.4   0:20.64 postgres                                                                      
30944 postgres  20   0 1335840 637304   3356 D  18.6 31.2   0:20.86 postgres                                                                      
30330 postgres  20   0 1383316 558332   3272 R  49.3 27.4   4:45.60 postgres 

But in the end PostgreSQL finished the job as well:

bigdata=# select itemId, avg(rating) as avg_rate from rate group by itemId order by avg_rate desc limit 20;
   itemid   | avg_rate 
------------+----------
 0001006657 |        5
 0001024388 |        5
 0000913154 |        5
 0000791156 |        5
 0001019880 |        5
 0000466603 |        5
 000077135X |        5
 0000191639 |        5
 0000174076 |        5
 000014357X |        5
 0001018043 |        5
 0000202010 |        5
 0000143529 |        5
 0000230022 |        5
 0000000078 |        5
 0000401048 |        5
 0000053155 |        5
 0000143588 |        5
 0000143502 |        5
 0001026038 |        5
(20 rows)

Time: 156100.056 ms (02:36.100)

The total query time in PostgreSQL was 2 min 36 sec.

So, with this limited memory and a dataset of this scale, the dedicated big data tools are much more efficient than traditional MySQL. Although PostgreSQL is also an RDBMS, it is much faster than MySQL too, because it spreads the query across multiple parallel worker processes. The drawback is that it consumes more memory than any of the other three.

All queries above were run several times, and the minimum time was taken as the final result.

The query time comparison:

Data App     Spark     Drill     Mysql     Postgresql
Query Time   106 sec   119 sec   990 sec   156 sec

Here are the versions of each app:

// Mysql version
mysql> select version();
+-------------------------+
| version()               |
+-------------------------+
| 5.7.37-0ubuntu0.18.04.1 |
+-------------------------+
1 row in set (0.01 sec)

// Postgresql version
bigdata=# select version();
                                                               version                                                                
--------------------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 10.19 (Ubuntu 10.19-0ubuntu0.18.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0, 64-bit
(1 row)

// Drill version
apache drill (dfs.skydrive)> select version from sys.version;
+---------+
| version |
+---------+
| 1.20.0  |
+---------+
1 row selected (0.641 seconds)

// Spark version
version 3.2.1        
Using Scala version 2.13.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_321)

The VM I used for this test has 2GB of dedicated memory, 2 Intel cores, and a 50GB NVMe disk. It’s good to see that all the apps ran well under this limited memory, but MySQL is really slow compared to the others.

Programming with Spark Structured Streaming

In my last post I experimented with Spark Streaming, whose API is called DStream.

In this article I program with Spark Structured Streaming, the newer way of handling streams.

What’s the difference between DStream and Structured Streaming? The former is based on Spark’s traditional RDD API, while the latter is based on Spark’s SQL engine.

In my view, if you need more transformations of the raw data, such as data cleaning, DStream is more useful, since RDDs provide many higher-order functions for transforming data, such as map and reduce.

If you need more aggregation, such as statistics and reports, Structured Streaming is more convenient, since the SQL engine is well suited to jobs like grouping, counting, and sorting.

That said, Structured Streaming is much easier to use than DStream, and it is better optimized. You can program with it much as you would with a regular Spark session.

To play with the demo, we first need a socket server that continuously writes data to the connected client. A streaming client then receives the data from the socket.

The socket server was given in my last post. Please start it in a separate terminal.

Then I followed the official demo to get the Scala code below, which handles the stream.

import org.apache.spark.sql.SparkSession

object Myjob {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

How do you package and deploy the app? Please see my last post; the steps are almost the same.

The final step is to run the Structured Streaming job with Spark:

$ spark-submit --class "Myjob" --master local[2] target/scala-2.12/my-strucutre-streaming-job_2.12-1.0.jar localhost 9999

We can see the word counts printed continuously to the screen:

-------------------------------------------
Batch: 190
-------------------------------------------
+-------+-----+
|  value|count|
+-------+-----+
| orange|  360|
|  apple|  372|
|  mango|  380|
| tomato|  362|
|apricot|  357|
| cherry|  366|
| banana|  328|
|  lemon|  341|
|   plum|  365|
|  peach|  372|
+-------+-----+


-------------------------------------------
Batch: 191
-------------------------------------------
+-------+-----+
|  value|count|
+-------+-----+
| orange|  362|
|  apple|  378|
|  mango|  382|
| tomato|  363|
|apricot|  358|
| cherry|  368|
| banana|  332|
|  lemon|  342|
|   plum|  367|
|  peach|  377|
+-------+-----+

-------------------------------------------
Batch: 192
-------------------------------------------
+-------+-----+
|  value|count|
+-------+-----+
| orange|  363|
|  apple|  381|
|  mango|  386|
| tomato|  364|
|apricot|  361|
| cherry|  374|
| banana|  333|
|  lemon|  344|
|   plum|  369|
|  peach|  378|
+-------+-----+
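
The totals in the batches above keep growing because the "complete" output mode re-emits the whole running aggregate on every trigger. A job that counts each batch on its own would instead print small, independent counts. The difference can be sketched in plain Python. This is not Spark code; the function names and the sample batches are made up for illustration.

```python
from collections import Counter


def per_batch_counts(batches):
    """Count words in each micro-batch independently."""
    return [Counter(batch) for batch in batches]


def running_counts(batches):
    """Emit the cumulative counts after each micro-batch,
    similar in spirit to the 'complete' output mode."""
    total = Counter()
    snapshots = []
    for batch in batches:
        total.update(batch)
        snapshots.append(Counter(total))  # copy so later updates don't leak in
    return snapshots


# Made-up micro-batches of words (hypothetical data)
batches = [["apple", "plum", "apple"], ["plum", "mango"]]
print(per_batch_counts(batches))
print(running_counts(batches))
```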

The application environment:

  • OS: ubuntu 18.04 x86_64, a KVM instance
  • Ram: 4GB dedicated
  • CPU: double AMD 7302 processor
  • Spark: 3.2.0
  • Scala: 2.12.15

Programming with Spark Streaming

Spark can read streams from many sources, including file systems, object storage, sockets, and message queues. Here I run an app that receives a stream from a socket server, processes it, and prints the results.

First I have to prepare a socket server. I wrote one in Perl; its content follows:

package MyPackage;
 
use strict;
use base qw(Net::Server::PreFork);
 
MyPackage->run(host=>'localhost',port=>9999);
 
# over-ride the default echo handler
sub process_request {
    my $self = shift;
    my @fruits=qw(apple orange tomato plum cherry peach apricot banana mango lemon);

    while(1) {
        my $fruit = $fruits[int(rand(scalar @fruits))];
        print "$fruit\r\n" or return;
        select(undef, undef, undef, 0.25);
    }
}
 
1;

Please open a terminal and run this script. It runs as a socket server, listening on port 9999 for connections. Once a connection has been established, it writes words to the remote socket, one word per line.
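
If Perl or Net::Server is not at hand, a server with roughly the same behavior can be sketched with Python's standard library. This is an untested-against-Spark alternative, not the script used in this post; the names `FruitHandler`, `FruitServer`, `random_fruit_line`, and `serve` are made up for illustration, while the word list, port, and 0.25 second delay are copied from the Perl script above.

```python
import random
import socketserver
import time

FRUITS = ["apple", "orange", "tomato", "plum", "cherry",
          "peach", "apricot", "banana", "mango", "lemon"]


def random_fruit_line(rng=random):
    """Return one random fruit name as a CRLF-terminated line of bytes."""
    return (rng.choice(FRUITS) + "\r\n").encode("ascii")


class FruitHandler(socketserver.BaseRequestHandler):
    """Send one word every 0.25 s until the client disconnects."""

    def handle(self):
        try:
            while True:
                self.request.sendall(random_fruit_line())
                time.sleep(0.25)
        except OSError:
            # The client closed the connection; stop serving it.
            return


class FruitServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True  # allow quick restarts on the same port
    daemon_threads = True       # don't let client threads block shutdown


def serve(host="localhost", port=9999):
    """Listen on host:port and serve each client in its own thread."""
    with FruitServer((host, port), FruitHandler) as server:
        server.serve_forever()
```

Calling `serve()` blocks until interrupted, so it would be run in its own terminal, just like the Perl script.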

Here is the Scala program that reads the stream from the socket and processes it.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.storage.StorageLevel


object Myjob {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 15 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(15))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>

      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}


/** Case class for converting RDD to DataFrame */
case class Record(word: String)


/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

The source code is based on Spark’s official example.

How do you deploy it? First we need Apache Spark running on localhost.

Then we start a new project by creating a project root directory. Inside it, the directory tree looks like this:

./build.sbt
./src/main/scala/Myjob.scala

“build.sbt” is the build configuration file for the project. Its content is as follows:

name := "My Streaming Job"

version := "1.0"

scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.0"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.2.0" % "provided"

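Here “name” specifies the project name, “version” specifies the project version, and “scalaVersion” specifies the Scala version. Please note that the Scala version must match the one your Spark distribution was built with: the default Spark 3.2.x download is built with Scala 2.12, although a Scala 2.13 build is also available. “libraryDependencies” lists the packages the project depends on.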

In the project dir, we run the following command to build the project:

$ sbt package
[info] welcome to sbt 1.6.1 (Ubuntu Java 11.0.11)
[info] loading project definition from /home/pyh/ops/spark/job6/project
[info] loading settings for project job6 from build.sbt ...
[info] set current project to My Streaming Job (in build file:/home/pyh/ops/spark/job6/)
[success] Total time: 3 s, completed Feb 7, 2022, 3:56:56 PM

After the build, the project directory looks like this:

$ ls
build.sbt  project  src  target

Two new directories, “project” and “target”, have been added. The packaged jar is located at:

target/scala-2.12/my-streaming-job_2.12-1.0.jar

Finally we run the project by submitting the job to Spark in local mode:

$ spark-submit --class "Myjob" --master local[2] target/scala-2.12/my-streaming-job_2.12-1.0.jar localhost 9999

Since the VM has only 2 cores, I specify local[2] so that Spark uses at most 2 worker threads.

The results are printed out to the screen periodically:

========= 1644221235000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
| banana|    6|
|  lemon|    5|
|  mango|    9|
|  apple|    6|
|   plum|    5|
| tomato|   12|
| cherry|    8|
|  peach|    2|
|apricot|    5|
| orange|    2|
+-------+-----+

========= 1644221250000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
|  peach|    9|
|   plum|    8|
| banana|    7|
|  apple|    5|
|apricot|    6|
| tomato|    5|
| cherry|    6|
|  lemon|    4|
|  mango|    8|
| orange|    2|
+-------+-----+

========= 1644221265000 ms =========
+-------+-----+
|   word|total|
+-------+-----+
|  apple|    2|
|  mango|    4|
|  lemon|    4|
|  peach|    9|
| orange|    6|
|apricot|    9|
| cherry|   10|
| banana|    4|
|   plum|    6|
| tomato|    6|
+-------+-----+

With that, the whole job is done.

The application environment:

  • OS: ubuntu 18.04 x86_64, a KVM instance
  • Ram: 4GB dedicated
  • CPU: double AMD 7302 processor
  • Spark: 3.2.0
  • Scala: 2.12.15

Computing performance comparison for word statistics

Here I provide a dataset of words that appear in public tech lists.

The file is a text file with one word per line. Its line count and size:

$ wc -l words.txt 
283218160 words.txt

$ du -h words.txt 
1.5G	words.txt

You can download this file from here (tgz compressed).

I ran a test that counts the words in this file by grouping and sorting, using three methods: the Spark RDD API, the Spark DataFrame API, and a plain Scala program.

This is the Spark RDD API version in pyspark:

>>> rdd = sc.textFile("/tmp/words.txt")
>>> rdd.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending=False).take(20)

This is the Spark DataFrame API version in pyspark:

>>> df = spark.read.text("/tmp/words.txt")
>>> df.select("*").groupBy("value").count().orderBy("count",ascending=False).show()

This is the scala program:

import scala.io.Source

object BigWords extends App {

  val file = Source.fromFile("/tmp/words.txt").getLines()
  val hash = scala.collection.mutable.Map[String,Int]()

  for (x <- file) {
    if (hash.contains(x)) hash(x) += 1 else hash(x) = 1
  }

  hash.toList.sortBy(-_._2).take(20).foreach(println)
}
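
For readers who don't use Scala, the same single-process approach (one pass over the file, a hash map of counts, then a sort) can be sketched in Python. This version was not part of the timing comparison; the function name `top_words` and the tiny sample file are made up for illustration.

```python
import os
import tempfile
from collections import Counter


def top_words(path, n=20):
    """Count one-word-per-line entries and return the n most frequent."""
    counts = Counter()
    with open(path, encoding="utf-8") as f:
        for line in f:  # reads line by line, so the file is never fully in memory
            counts[line.rstrip("\n")] += 1
    return counts.most_common(n)


# Tiny made-up file standing in for /tmp/words.txt (hypothetical data)
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("the\nto\nthe\na\nthe\nto\n")
result = top_words(tmp.name, n=2)
os.remove(tmp.name)
print(result)
```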

I compiled the Scala program and ran it:

$ scalac bigwords.scala 
$ time scala BigWords
(the,14218320)
(to,11045040)
(a,5677600)
(and,5205760)
(is,4972080)
(i,4447440)
(in,4228200)
(of,3982280)
(on,3899320)
(for,3760800)
(this,3684640)
(you,3485360)
(at,3238480)
(that,3230920)
(it,2925320)
(with,2181160)
(be,2172400)
(not,2124320)
(from,2097120)
(if,1993560)

real	0m49.031s
user	0m50.390s
sys	0m0.734s

As you can see, it took 49s to finish the job.

Spark’s DataFrame API was a bit slower: it took 56s to finish the job (timed with my iOS stopwatch app):

+-----+--------+                                                                
|value|   count|
+-----+--------+
|  the|14218320|
|   to|11045040|
|    a| 5677600|
|  and| 5205760|
|   is| 4972080|
|    i| 4447440|
|   in| 4228200|
|   of| 3982280|
|   on| 3899320|
|  for| 3760800|
| this| 3684640|
|  you| 3485360|
|   at| 3238480|
| that| 3230920|
|   it| 2925320|
| with| 2181160|
|   be| 2172400|
|  not| 2124320|
| from| 2097120|
|   if| 1993560|
+-----+--------+
only showing top 20 rows

But the Spark RDD API was quite slow in my use case. It took 7m15s to finish the job:

[('the', 14218320), ('to', 11045040), ('a', 5677600), ('and', 5205760), ('is', 4972080), ('i', 4447440), ('in', 4228200), ('of', 3982280), ('on', 3899320), ('for', 3760800), ('this', 3684640), ('you', 3485360), ('at', 3238480), ('that', 3230920), ('it', 2925320), ('with', 2181160), ('be', 2172400), ('not', 2124320), ('from', 2097120), ('if', 1993560)]

I suspected this was due to the slow Python interpreter, so I re-ran the RDD version in Spark’s Scala shell. The code and results are below:

scala> val rdd = sc.textFile("/tmp/words.txt")

scala> rdd.map{(_,1)}.reduceByKey{_ + _}.sortBy{-_._2}.take(20)

res1: Array[(String, Int)] = Array((the,14218320), (to,11045040), (a,5677600), (and,5205760), (is,4972080), (i,4447440), (in,4228200), (of,3982280), (on,3899320), (for,3760800), (this,3684640), (you,3485360), (at,3238480), (that,3230920), (it,2925320), (with,2181160), (be,2172400), (not,2124320), (from,2097120), (if,1993560))

This time it took 1m31s to finish the job.

The results summary in a table:

program             time
scala program       49s
pyspark dataframe   56s
scala RDD           1m31s
pyspark RDD         7m15s
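
To put these timings on one scale, they can be converted to seconds and compared with the fastest run. The snippet below is only a small arithmetic sketch; the helper name `to_seconds` is made up, and the timings are copied from the table above.

```python
import re


def to_seconds(text):
    """Convert a duration such as '7m15s' or '49s' to seconds."""
    match = re.fullmatch(r"(?:(\d+)m)?(\d+)s", text)
    if match is None:
        raise ValueError(f"unrecognized duration: {text!r}")
    minutes, seconds = match.groups()
    return int(minutes or 0) * 60 + int(seconds)


# Timings copied from the results table
timings = {
    "scala program": "49s",
    "pyspark dataframe": "56s",
    "scala RDD": "1m31s",
    "pyspark RDD": "7m15s",
}
seconds = {name: to_seconds(t) for name, t in timings.items()}
fastest = min(seconds.values())
for name, secs in seconds.items():
    print(f"{name}: {secs}s ({secs / fastest:.1f}x the fastest)")
```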

I am surprised that pyspark’s RDD API is this slow. I want to look into the cause.

Application environment:

  • OS: Ubuntu 18.04.6 LTS, a KVM instance
  • Spark version: 3.2.0 (with scala 2.12.15, and python 3.6.9)
  • Scala version: 2.13.7
  • CPU: double AMD EPYC 7302
  • Ram: 4GB dedicated
  • Disk: 40GB SSD