
groupMapReduce in Scala, Ruby and Spark

Given the following dataset:

$ head -10 fruits.txt 
peach	1
apricot	2
apple	3
haw	1
persimmon	9
orange	2
litchi	9
orange	5
jackfruit	8
crab apple	0

We can group the records by fruit name and aggregate to sum each fruit's count.

In Scala this is quite easy because the language has built-in higher-order functions for this kind of computation. The implementation in the Scala REPL:

scala> import scala.io.Source
import scala.io.Source

scala> val fd = Source.fromFile("tmp/fruits.txt").getLines().toList

scala> val regex = """(.+)\s+(\d+)""".r
val regex: scala.util.matching.Regex = (.+)\s+(\d+)

scala> fd.map{ case regex(x,y) => (x,y.toInt) }.groupMapReduce(_._1)(_._2)(_+_).toList.sortBy(-_._2).foreach(println)
(blueberry,166)
(cumquat,145)
(lemon,145)
(areca nut,139)
(haw,137)
(banana,134)
(greengage,134)
(raspberry,133)
(longan,132)
(nectarine,128)
(flat peach,125)
(tangerine,122)
(blackberry,121)
(litchi,120)
(watermelon,120)
(peach,117)
(bitter orange,117)
(strawberry,116)
(pawpaw papaya,113)
(persimmon,108)
(orange,104)
(tomato,102)
(mango,102)
(plum,99)
(wax apple,98)
(waxberry red bayberry,95)
(grape,92)
(jackfruit,92)
(pineapple,91)
(pear,89)
(loquat,89)
(coconut,86)
(apple,86)
(pomegranate,84)
(shaddock pomelo,84)
(musk melon,84)
(guava,82)
(honey peach,82)
(apricot,81)
(starfruit,80)
(sugar cane,79)
(crab apple,75)
(cherry,73)

Ruby also handles this job well with its Array methods. Ruby has no built-in groupMapReduce method, but I have implemented one, which you can get from this repo. The code below is run in Ruby's interactive shell (irb).

irb(main):001:0> require './bitfox'
=> true
irb(main):002:0> file = File.readlines('tmp/fruits.txt')

irb(main):011:0> file.map {|s| (x,y) = s.chomp.split("\t"); [x,y.to_i] }.reduceByKey {|x,y| x+y}.sort_by{ |s| -s[1] }
=> 
[["blueberry", 166],                                                                                                                               
 ["cumquat", 145],                                                                                                                                 
 ["lemon", 145],                                                                                                                                   
 ["areca nut", 139],                                                                                                                               
 ["haw", 137],                                                                                                                                     
 ["greengage", 134],                                                                                                                               
 ["banana", 134],                                                                                                                                  
 ["raspberry", 133],                                                                                                                               
 ["longan", 132],                                                                                                                                  
 ["nectarine", 128],                                                                                                                               
 ["flat peach", 125],                                                                                                                              
 ["tangerine", 122],                                                                                                                               
 ["blackberry", 121],                                                                                                                              
 ["litchi", 120],                                                                                                                                  
 ["watermelon", 120],
 ["peach", 117],
 ["bitter orange", 117],
 ["strawberry", 116],
 ["pawpaw papaya", 113],
 ["persimmon", 108],
 ["orange", 104],
 ["tomato", 102],
 ["mango", 102],
 ["plum", 99],
 ["wax apple", 98],
 ["waxberry red bayberry", 95],
 ["jackfruit", 92],
 ["grape", 92],
 ["pineapple", 91],
 ["pear", 89],
 ["loquat", 89],
 ["apple", 86],
 ["coconut", 86],
 ["pomegranate", 84],
 ["shaddock pomelo", 84],
 ["musk melon", 84],
 ["honey peach", 82],
 ["guava", 82],
 ["apricot", 81],
 ["starfruit", 80],
 ["sugar cane", 79],
 ["crab apple", 75],
 ["cherry", 73]]

In my implementation above, Ruby uses the method reduceByKey rather than Scala's groupMapReduce. That is to say, Ruby and Spark use the method name "reduceByKey", while Scala uses the name "groupMapReduce". All three methods have the same effect.

The last is Spark's implementation. It's quite simple to run these statistics in spark-shell.

scala> val rdd = sc.textFile("tmp/fruits.txt")
rdd: org.apache.spark.rdd.RDD[String] = tmp/fruits.txt MapPartitionsRDD[1] at textFile at <console>:23

scala> val regex = """(.+)\s+(\d+)""".r
regex: scala.util.matching.Regex = (.+)\s+(\d+)

scala> rdd.map{ case regex(x,y) => (x,y.toInt) }.reduceByKey( _+_ ).sortBy(-_._2).foreach(println)
(blueberry,166)
(cumquat,145)
(lemon,145)
(areca nut,139)
(haw,137)
(banana,134)
(greengage,134)
(raspberry,133)
(longan,132)
(nectarine,128)
(flat peach,125)
(tangerine,122)
(blackberry,121)
(watermelon,120)
(litchi,120)
(bitter orange,117)
(peach,117)
(strawberry,116)
(pawpaw papaya,113)
(persimmon,108)
(orange,104)
(tomato,102)
(mango,102)
(plum,99)
(wax apple,98)
(waxberry red bayberry,95)
(grape,92)
(jackfruit,92)
(pineapple,91)
(pear,89)
(loquat,89)
(apple,86)
(coconut,86)
(musk melon,84)
(shaddock pomelo,84)
(pomegranate,84)
(guava,82)
(honey peach,82)
(apricot,81)
(starfruit,80)
(sugar cane,79)
(crab apple,75)
(cherry,73)

It's not hard to understand how groupMapReduce is implemented. If you have checked my code on GitHub, you will find there is very little of it.

    # Defined on Array in the bitfox repo: group the [key, value] pairs by key,
    # then reduce the values of each group with the given block.
    def reduceByKey
        if block_given?
            group_by {|x| x[0]}.map {|x,y| y.reduce {|x,y| [ x[0], yield(x[1],y[1]) ]}}
        else
            raise "no block given"
        end
    end
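
For reference, here is a minimal Scala sketch of the same idea. It is not the standard library's actual implementation, just groupBy, map and reduce composed by hand; the name groupMapReduce and its parameters simply mirror the built-in method.

// A hand-rolled sketch: group the elements by key, map each element to a value,
// then reduce the values within each group.
def groupMapReduce[A, K, B](xs: List[A])(key: A => K)(f: A => B)(op: (B, B) => B): Map[K, B] =
  xs.groupBy(key).map { case (k, group) => k -> group.map(f).reduce(op) }

Calling it as groupMapReduce(pairs)(_._1)(_._2)(_ + _), where pairs is the List[(String, Int)] built from the file earlier, gives the same map as the built-in method used above.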

Perl and Python can implement this method as well, but they are not as convenient. For data programming, Scala and Spark are really the best. If performance is not a concern, Ruby is fun too.

Time difference when handling a file larger than memory

Apache Spark can deal with a big file well even if the file size is much larger than the available memory.

For instance, I have the file “rate.csv” which is 3.2GB in size:

$ du -h rate.csv 
3.2G	rate.csv

But my VPS has only 2GB total memory:

$ free -m
              total        used        free      shared  buff/cache   available
Mem:           1992        1652         206           4         134         197
Swap:          1023         521         502

Even so, Spark can read and handle this file well. For example, I load the file into Spark and issue a query via its Dataframe API.

// define the schema
scala> val schema="uid STRING,item STRING,rate FLOAT,time INT"
val schema: String = uid STRING,item STRING,rate FLOAT,time INT

// load csv file into spark as a dataframe
scala> val df = spark.read.format("csv").schema(schema).load("skydrive/rate.csv")
val df: org.apache.spark.sql.DataFrame = [uid: string, item: string ... 2 more fields]

// print the schema
scala> df.printSchema()
root
 |-- uid: string (nullable = true)
 |-- item: string (nullable = true)
 |-- rate: float (nullable = true)
 |-- time: integer (nullable = true)

// get the items count
scala> df.count()
val res1: Long = 82677131   

// get the partitions number
scala> df.rdd.getNumPartitions
val res3: Int = 26

// issue a query by grouping item and aggregating rate
scala> df.groupBy("item").agg(avg("rate").alias("avg_rate")).orderBy(desc("avg_rate")).show()
+----------+--------+                                                           
|      item|avg_rate|
+----------+--------+
|0001061100|     5.0|
|0001543849|     5.0|
|0001061127|     5.0|
|0001019880|     5.0|
|0001062395|     5.0|
|0000143502|     5.0|
|000014357X|     5.0|
|0001527665|     5.0|
|000107461X|     5.0|
|0000191639|     5.0|
|0001127748|     5.0|
|0000791156|     5.0|
|0001203088|     5.0|
|0001053744|     5.0|
|0001360183|     5.0|
|0001042335|     5.0|
|0001374400|     5.0|
|0001046810|     5.0|
|0001380877|     5.0|
|0001050230|     5.0|
+----------+--------+
only showing top 20 rows

As you can see above, the query ran pretty well; Spark didn't throw any out-of-memory error. The total query time was 1 min 46 seconds.
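
Incidentally, the same aggregation can also be issued as literal SQL by registering the dataframe as a temporary view. This is a minimal sketch, assuming the df loaded above; it is just another way to express the query, not a separate benchmark run.

// Register the dataframe as a temp view and run the aggregation through Spark SQL.
df.createOrReplaceTempView("rate")
spark.sql("SELECT item, AVG(rate) AS avg_rate FROM rate GROUP BY item ORDER BY avg_rate DESC").show()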

Apache Drill is quite memory sensitive; even so, it can get the job done with this limited memory. For example:

> select columns[1] as `item`, avg(cast(columns[2] as Float)) as `avg_rate`
> from `rate.csv` group by `item` order by `avg_rate` desc limit 20;
+------------+----------+
|    item    | avg_rate |
+------------+----------+
| 0004133900 | 5.0      |
| 0005019222 | 5.0      |
| 0002557991 | 5.0      |
| 0002214830 | 5.0      |
| 0002720213 | 5.0      |
| 0002326817 | 5.0      |
| 0002254352 | 5.0      |
| 000225316X | 5.0      |
| 0002199203 | 5.0      |
| 0001712705 | 5.0      |
| 0002617501 | 5.0      |
| 0002113848 | 5.0      |
| 0001840266 | 5.0      |
| 0002117576 | 5.0      |
| 0001360183 | 5.0      |
| 0000202010 | 5.0      |
| 0002179083 | 5.0      |
| 0001954792 | 5.0      |
| 0002005468 | 5.0      |
| 0005097231 | 5.0      |
+------------+----------+
20 rows selected (119.311 seconds)

As you can see, Drill ran well in this scenario too. The total query time was 1 min 59 seconds.

Then I gave Mysql a try. I know that with limited memory Mysql handles data at this scale very slowly. Since the VM has only 2GB of memory, I had to adjust two important memory parameters for Mysql.

mysql> show variables like 'innodb_buffer_pool_size';
+-------------------------+-----------+
| Variable_name           | Value     |
+-------------------------+-----------+
| innodb_buffer_pool_size | 402653184 |
+-------------------------+-----------+
1 row in set (0.01 sec)

mysql> show variables like 'key_buffer_size';
+-----------------+-----------+
| Variable_name   | Value     |
+-----------------+-----------+
| key_buffer_size | 134217728 |
+-----------------+-----------+
1 row in set (0.02 sec)

The Mysql tuning: 'innodb_buffer_pool_size' was set to 384MB and 'key_buffer_size' to 128MB.

After that I created the table and loaded the data into it using the statements below.

mysql> create table rate(userId varchar(24), itemId varchar(24),rating float, time int);
Query OK, 0 rows affected (0.11 sec)

mysql> load data local infile 'skydrive/rate.csv' into table rate FIELDS TERMINATED BY ',';
Query OK, 82677131 rows affected (21 min 21.25 sec)
Records: 82677131  Deleted: 0  Skipped: 0  Warnings: 0

Loading the data took 21 min 21 sec. I also needed to add an index on the queried column, in this case itemId.

mysql> create index itemIndex on rate(itemId);
Query OK, 0 rows affected (9 min 22.60 sec)
Records: 0  Duplicates: 0  Warnings: 0

Finally I ran the query in Mysql:

mysql> select itemId, avg(rating) as avg_rate from rate group by itemId order by avg_rate desc limit 20;
+------------+----------+
| itemId     | avg_rate |
+------------+----------+
| 0000191639 |        5 |
| 0000143529 |        5 |
| 0000143502 |        5 |
| 0000202010 |        5 |
| 0000053155 |        5 |
| 0001019880 |        5 |
| 0001018043 |        5 |
| 000014357X |        5 |
| 000077135X |        5 |
| 0001026038 |        5 |
| 0000401048 |        5 |
| 0000000078 |        5 |
| 0000230022 |        5 |
| 0000913154 |        5 |
| 0000143588 |        5 |
| 0000466603 |        5 |
| 0001024388 |        5 |
| 0001006657 |        5 |
| 0000791156 |        5 |
| 0000174076 |        5 |
+------------+----------+
20 rows in set (16 min 29.75 sec)

Mysql's total query time was 16 min 30 seconds.

For comparison with Mysql, I installed Postgresql on this VM. It's the default installation from the Ubuntu apt source, with basic tuning as shown below.

postgres=# show shared_buffers;
 shared_buffers 
----------------
 384MB
(1 row)

postgres=# show work_mem;
 work_mem 
----------
 16MB
(1 row)

I created the same table as in Mysql and set up an index on the queried column.

bigdata=# \d rate
                       Table "public.rate"
 Column |         Type          | Collation | Nullable | Default 
--------+-----------------------+-----------+----------+---------
 userid | character varying(24) |           |          | 
 itemid | character varying(24) |           |          | 
 rating | double precision      |           |          | 
 time   | integer               |           |          | 
Indexes:
    "itemindex" btree (itemid)

Then I loaded the data from csv to the table:

bigdata=# \copy rate FROM '/home/pyh/skydrive/rate.csv' DELIMITER ',' CSV;
COPY 82677131

Then the same query was issued to Postgresql. As you can see from the top output below, Postgresql uses 3 processes during the shuffle, which consumes nearly all the available memory.

30945 postgres  20   0 1311264 640036   3336 D  20.9 31.4   0:20.64 postgres
30944 postgres  20   0 1335840 637304   3356 D  18.6 31.2   0:20.86 postgres                                                                      
30330 postgres  20   0 1383316 558332   3272 R  49.3 27.4   4:45.60 postgres 

But finally Postgresql finished the job as well:

bigdata=# select itemId, avg(rating) as avg_rate from rate group by itemId order by avg_rate desc limit 20;
   itemid   | avg_rate 
------------+----------
 0001006657 |        5
 0001024388 |        5
 0000913154 |        5
 0000791156 |        5
 0001019880 |        5
 0000466603 |        5
 000077135X |        5
 0000191639 |        5
 0000174076 |        5
 000014357X |        5
 0001018043 |        5
 0000202010 |        5
 0000143529 |        5
 0000230022 |        5
 0000000078 |        5
 0000401048 |        5
 0000053155 |        5
 0000143588 |        5
 0000143502 |        5
 0001026038 |        5
(20 rows)

Time: 156100.056 ms (02:36.100)

The total query time in Postgresql was 2 min 36 seconds.

So, with this limited memory and this scale of data, the dedicated big data apps are much more efficient than traditional Mysql. Though Postgresql is also an RDBMS, it's much faster than Mysql too, because it parallelizes the shuffle across multiple processes; the drawback is that it consumes the most memory of the four.

All queries above were run several times; the minimum values were taken as the final results.

The query time comparison:

Data App      Query Time
Spark         106 sec
Drill         119 sec
Mysql         990 sec
Postgresql    156 sec

Here are the apps' versions:

// Mysql version
mysql> select version();
+-------------------------+
| version()               |
+-------------------------+
| 5.7.37-0ubuntu0.18.04.1 |
+-------------------------+
1 row in set (0.01 sec)

// Postgresql version
bigdata=# select version();
                                                               version                                                                
--------------------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 10.19 (Ubuntu 10.19-0ubuntu0.18.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0, 64-bit
(1 row)

// Drill version
apache drill (dfs.skydrive)> select version from sys.version;
+---------+
| version |
+---------+
| 1.20.0  |
+---------+
1 row selected (0.641 seconds)

// Spark version
version 3.2.1        
Using Scala version 2.13.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_321)

The VM I was using for this test has 2GB dedicated memory, 2 Intel cores and a 50GB NVMe disk. It's good to see all the apps run well under this limited memory, but Mysql is really slow compared to the others.

A simple comparison of three SQL engines

I have a dataset from our production environment. The data is just a word list, which contains 11922695 items.

I loaded the data into Mysql, Spark and Drill for a simple test of their query performance.

The test environment:

  • OS: Ubuntu 18.04 Linux x86_64, KVM instance
  • Hardware specs: 2 AMD cores, 4GB memory, 40GB NVME disk
  • Java: Openjdk version 11.0.13
  • Mysql: version 5.7.36, default installation, with only one change: key_buffer_size set to 256MB
  • Spark: version 3.2.0, deployed as local mode
  • Drill: version 1.20, run as drill-embedded

I loaded the words into the above databases first.

In spark:

scala> val list = sc.textFile("words.txt").toDF("word")
list: org.apache.spark.sql.DataFrame = [word: string]

In mysql:

mysql> create table words (word varchar(32));
Query OK, 0 rows affected (0.01 sec)

mysql> load data local infile './words.txt' into table words;
Query OK, 11922695 rows affected (29.88 sec)

In Drill I needed to rename the file to a .csv, and then I could query it directly:

apache drill (dfs.pyh)> select * from `words.csv` limit 2;
+------+
| WORD |
+------+
| on   |
| jan  |
+------+
2 rows selected (0.235 seconds)

Now I can run a simple SQL query against the three data sources. The purpose is to group the words and count their occurrences.

First query in spark:

scala> list.groupBy("word").count.orderBy(desc("count")).show(20)
+----+------+                                                                   
|word| count|
+----+------+
| the|597563|
|  to|466569|
|   a|228042|
|  is|223282|
| and|215099|
|  in|176847|
|   i|174579|
| for|165258|
|  on|164483|
|  of|161266|
|this|159244|
| you|128854|
|that|126949|
|  at|126562|
|  it|117224|
|  be| 89777|
|from| 87112|
|with| 86998|
| not| 85245|
|  if| 82798|
+----+------+
only showing top 20 rows

Spark doesn't show its query time, so I measured it roughly with iOS's Stopwatch app. I ran the above query several times; the average query time is about 3.7 seconds.
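
If you want Spark itself to report the elapsed time, the SparkSession in spark-shell has a time() helper that prints the wall-clock time of the wrapped expression. A minimal sketch, assuming the list dataframe defined above:

// Let Spark print the elapsed time of the query instead of using a stopwatch.
spark.time {
  list.groupBy("word").count.orderBy(desc("count")).show(20)
}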

Second query in mysql:

mysql> select word,count(*) as dd from words group by word order by dd desc limit 20;
+------+--------+
| word | dd     |
+------+--------+
| the  | 597563 |
| to   | 466569 |
| a    | 228042 |
| is   | 223282 |
| and  | 215099 |
| in   | 176847 |
| i    | 174579 |
| for  | 165258 |
| on   | 164483 |
| of   | 161266 |
| this | 159244 |
| you  | 128854 |
| that | 126949 |
| at   | 126562 |
| it   | 117224 |
| be   |  89777 |
| from |  87112 |
| with |  86998 |
| not  |  85245 |
| if   |  82798 |
+------+--------+
20 rows in set (11.05 sec)

Then I created an index on this Mysql table and queried again:

mysql> create index wordIndex on words(word);
Query OK, 0 rows affected (40.21 sec)
Records: 0  Duplicates: 0  Warnings: 0

mysql> select word,count(*) as dd from words group by word order by dd desc limit 20;
+------+--------+
| word | dd     |
+------+--------+
| the  | 597563 |
| to   | 466569 |
| a    | 228042 |
| is   | 223282 |
| and  | 215099 |
| in   | 176847 |
| i    | 174579 |
| for  | 165258 |
| on   | 164483 |
| of   | 161266 |
| this | 159244 |
| you  | 128854 |
| that | 126949 |
| at   | 126562 |
| it   | 117224 |
| be   |  89777 |
| from |  87112 |
| with |  86998 |
| not  |  85245 |
| if   |  82798 |
+------+--------+
20 rows in set (5.10 sec)

As you can see, Mysql gets a huge performance improvement after adding the index.

The last query is in Drill:

apache drill (dfs.pyh)> select word,count(*) as dd from `words.csv` group by word order by dd desc limit 20;
+------+--------+
| word |   dd   |
+------+--------+
| the  | 597563 |
| to   | 466569 |
| a    | 228042 |
| is   | 223282 |
| and  | 215099 |
| in   | 176847 |
| i    | 174579 |
| for  | 165258 |
| on   | 164483 |
| of   | 161266 |
| this | 159244 |
| you  | 128854 |
| that | 126949 |
| at   | 126562 |
| it   | 117224 |
| be   | 89777  |
| from | 87112  |
| with | 86998  |
| not  | 85245  |
| if   | 82798  |
+------+--------+
20 rows selected (3.507 seconds)

I ran the above query several times; the average running time is about 3.5 seconds.

So here is the query time comparison for the three applications:

Engine   Query time
Spark    3.7s
Mysql    5.1s (with index), 11s (without index)
Drill    3.5s

Query time comparison in three SQL engines

It's not surprising that for big data Mysql is not as efficient as the other two. Spark and Drill have similar performance in this scenario, while Drill is easier to use since you don't have to know Scala programming.

Computing performance comparison for word statistics

Here I provide a dataset of words appearing on public tech mailing lists.

The file is a text file, one word per line. The total line count and size:

$ wc -l words.txt 
283218160 words.txt

$ du -h words.txt 
1.5G	words.txt

You can download this file from here (tgz compressed).

I did a test to count the words in this file by grouping and sorting, using three methods: the Spark RDD API, the Spark dataframe API, and a Scala program.

This is the syntax of spark RDD API in pyspark:

>>> rdd = sc.textFile("/tmp/words.txt")
>>> rdd.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1],ascending=False).take(20)

This is the syntax of spark dataframe API in pyspark:

>>> df = spark.read.text("/tmp/words.txt")
>>> df.select("*").groupBy("value").count().orderBy("count",ascending=False).show()

This is the scala program:

import scala.io.Source

object BigWords extends App {

  // read the file lazily, one line (one word) at a time
  val file = Source.fromFile("/tmp/words.txt").getLines()
  val hash = scala.collection.mutable.Map[String,Int]()

  // count the occurrences of each word in a mutable map
  for (x <- file) {
    if (hash.contains(x)) hash(x) += 1 else hash(x) = 1
  }

  // print the 20 most frequent words
  hash.toList.sortBy(-_._2).take(20).foreach(println)
}

I compiled the Scala program and ran it:

$ scalac bigwords.scala 
$ time scala BigWords
(the,14218320)
(to,11045040)
(a,5677600)
(and,5205760)
(is,4972080)
(i,4447440)
(in,4228200)
(of,3982280)
(on,3899320)
(for,3760800)
(this,3684640)
(you,3485360)
(at,3238480)
(that,3230920)
(it,2925320)
(with,2181160)
(be,2172400)
(not,2124320)
(from,2097120)
(if,1993560)

real	0m49.031s
user	0m50.390s
sys	0m0.734s

As you can see, it takes 49s to finish the job.
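
Just for comparison, the same counting can be written functionally over the line iterator. This is an untimed sketch, so I can't say how it compares to the mutable-Map loop above; it still reads the file one line at a time.

import scala.io.Source

// Fold over the line iterator, accumulating the counts in an immutable map.
val counts = Source.fromFile("/tmp/words.txt").getLines()
  .foldLeft(Map.empty[String, Int]) { (acc, w) => acc.updated(w, acc.getOrElse(w, 0) + 1) }
counts.toList.sortBy(-_._2).take(20).foreach(println)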

Spark's dataframe API is a bit slower; it takes 56s to finish the job (timed with my iOS stopwatch app):

+-----+--------+                                                                
|value|   count|
+-----+--------+
|  the|14218320|
|   to|11045040|
|    a| 5677600|
|  and| 5205760|
|   is| 4972080|
|    i| 4447440|
|   in| 4228200|
|   of| 3982280|
|   on| 3899320|
|  for| 3760800|
| this| 3684640|
|  you| 3485360|
|   at| 3238480|
| that| 3230920|
|   it| 2925320|
| with| 2181160|
|   be| 2172400|
|  not| 2124320|
| from| 2097120|
|   if| 1993560|
+-----+--------+
only showing top 20 rows

But the RDD API in pyspark is quite slow in my use case. It takes 7m15s to finish the job:

[('the', 14218320), ('to', 11045040), ('a', 5677600), ('and', 5205760), ('is', 4972080), ('i', 4447440), ('in', 4228200), ('of', 3982280), ('on', 3899320), ('for', 3760800), ('this', 3684640), ('you', 3485360), ('at', 3238480), ('that', 3230920), ('it', 2925320), ('with', 2181160), ('be', 2172400), ('not', 2124320), ('from', 2097120), ('if', 1993560)]

I suspect it's due to the slow Python interpreter, so I re-ran the RDD API in Spark's Scala shell. The syntax and results are below:

scala> val rdd = sc.textFile("/tmp/words.txt")

scala> rdd.map{(_,1)}.reduceByKey{_ + _}.sortBy{-_._2}.take(20)

res1: Array[(String, Int)] = Array((the,14218320), (to,11045040), (a,5677600), (and,5205760), (is,4972080), (i,4447440), (in,4228200), (of,3982280), (on,3899320), (for,3760800), (this,3684640), (you,3485360), (at,3238480), (that,3230920), (it,2925320), (with,2181160), (be,2172400), (not,2124320), (from,2097120), (if,1993560))

This time it takes 1m31s to finish the job.

The results summarized in a table:

Program             Time
scala program       49s
pyspark dataframe   56s
scala RDD           1m31s
pyspark RDD         7m15s

I am surprised that pyspark's RDD API is as slow as this. I want to do more research on this case.

Application environment:

  • OS: Ubuntu 18.04.6 LTS, a KVM instance
  • Spark version: 3.2.0 (with scala 2.12.15, and python 3.6.9)
  • Scala version: 2.13.7
  • CPU: two AMD EPYC 7302 cores
  • Ram: 4GB dedicated
  • Disk: 40GB SSD

Why mmap has no significant effect in my code

In my last blog post there was a small benchmark of Scala, Perl and Ruby.

Many geeks on the mailing lists pointed out problems and gave suggestions for improvement. I really appreciate them.

Why did I want to benchmark them? Because we have a real production system that uses similar counting technology. In production the input is a stream: every second new words come into the service, delivered from a message queue like Kafka or RabbitMQ. Then we run the real-time computation on Spark: Spark reads the stream and runs the DSL for filtering and counting.

Here is roughly similar syntax on Spark:

rdd=spark.readStream.format("socket")... # similar to words.txt
df=spark.createDataFrame(rdd.map(lambda x:(x,1)),["word","count"])
df2=df.filter(~col("word").isin(stopwords)) # filter out stopwords
df2.select("*").groupBy("word").count().orderBy("count",ascending=False).show(20)
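
For completeness, here is a hedged Scala sketch of the same streaming idea using Structured Streaming; the socket source, the host/port and the placeholder stopword list are assumptions for illustration, not our production setup.

import org.apache.spark.sql.functions._

// Read a stream of words (one per line, column "value") and count them,
// dropping stopwords first; complete output mode is needed for the ordering.
val stopwords = Seq("the", "to", "a")   // placeholder list
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val counts = lines
  .filter(!col("value").isin(stopwords: _*))
  .groupBy("value")
  .count()
counts.orderBy(desc("count"))
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()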

Thanks to Paul, who pointed out that I can use mmap() to speed up the file reads. I followed Paul's approach and tested again, but strangely mmap() gives no noticeable benefit in my case. Here is the comparison.

The common-read version for perl:

use strict;

my %stopwords;

open HD,"stopwords.txt" or die $!;
while(<HD>) {
    chomp;
    $stopwords{$_} =1;
}
close HD;

my %count;

open HD,"words.txt" or die $!;
while(<HD>) {
    chomp;
    unless ( $stopwords{$_} ) {
        $count{$_} ++;
    }
}
close HD;

my $i=0;
for (sort {$count{$b} <=> $count{$a}} keys %count) {
    if ($i < 20) {
        print "$_ -> $count{$_}\n"
    } else {
       last; 
    }
    $i ++;
}

The mmap version for perl:

use strict;

my %stopwords;

open my $fh, '<:mmap', 'stopwords.txt' or die $!;
while(<$fh>) {
    chomp;
    $stopwords{$_} =1;
}
close $fh;

my %count;

open $fh, '<:mmap', 'words.txt' or die $!;
while(<$fh>) {
    chomp;
    unless ( $stopwords{$_} ) {
        $count{$_} ++;
    }
}
close $fh;

my $i=0;
for (sort {$count{$b} <=> $count{$a}} keys %count) {
    if ($i < 20) {
        print "$_ -> $count{$_}\n"
    } else {
       last; 
    }
    $i ++;
}

The common-read version for ruby:

stopwords = {}
File.open('stopwords.txt').each_line do |s|
  s.chomp!
  stopwords[s] = 1
end

count = Hash.new(0)
File.open('words.txt').each_line do |s|
  s.chomp!
  count[s] += 1 unless stopwords[s]
end

count.sort_by{|_,c| -c}.take(20).each do |s|
  puts "#{s[0]} -> #{s[1]}"
end

The mmap version for ruby:

require 'mmap'

stopwords = {}
mmap_s = Mmap.new('stopwords.txt')
mmap_s.advise(Mmap::MADV_SEQUENTIAL)
mmap_s.each_line do |s|
  s.chomp!
  stopwords[s] = 1
end

count = Hash.new(0)
mmap_c = Mmap.new('words.txt')
mmap_c.advise(Mmap::MADV_SEQUENTIAL)
mmap_c.each_line do |s|
  s.chomp!
  count[s] += 1 unless stopwords[s]
end

count.sort_by{|_,c| -c}.take(20).each do |s|
  puts "#{s[0]} -> #{s[1]}"
end

The Ruby code was optimized by Frank; thanks.

So, this is the comparison for Perl (the first run is the common version, the second is the mmap version):

$ time perl perl-hash.pl 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m2.018s
user	0m2.003s
sys	0m0.012s

$ time perl perl-mmap.pl 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m1.905s
user	0m1.888s
sys	0m0.016s

And this is the comparison for Ruby (the first run is the common version, the second is the mmap version):

$ time ruby ruby-hash.rb 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m2.690s
user	0m2.660s
sys	0m0.028s

$ time ruby ruby-mmap.rb 
send -> 20987
message -> 17516
unsubscribe -> 15541
2021 -> 15221
list -> 13017
mailing -> 12402
mail -> 11647
file -> 11133
flink -> 10114
email -> 9919
pm -> 9248
group -> 8865
problem -> 8853
code -> 8659
data -> 8657
2020 -> 8398
received -> 8246
google -> 7921
discussion -> 7920
jan -> 7893

real	0m2.695s
user	0m2.689s
sys	0m0.004s

I have run the above comparison many times, and the results were similar. My case shows no visible speed improvement from using mmap() for reading files.

The OS is Ubuntu 18.04 x86_64, running on a KVM VPS, with 4GB dedicated RAM, two AMD EPYC 7302 cores and a 40GB SSD. Ruby version 2.5.1, Perl version 5.26.1.

Perl has built-in mmap support via the :mmap PerlIO layer. Ruby's mmap library was installed this way:

sudo apt install ruby-mmap2

Why does this happen? I will continue to research the problem.