
Spark shuffle – Case #3 – using salt in repartition

Author: TantusData


Why use salt in repartition? In the previous blog entry we saw how skew in a processed dataset affects the performance of Spark jobs. We resolved the problem by repartitioning the dataset by a column which naturally splits the data into reasonably sized chunks. But what if we don't have such a column in our dataset? Or what if you would like to control the number of files (and therefore the file size) produced by your Spark job? For the sake of this exercise let's assume that we have 1.1 TB of data: 7 days of events. This means roughly 165 GB per day.

Keep in mind that the hour attribute we used before splits one day of data into 24 files. A quick back-of-the-envelope calculation leads to the conclusion that the expected file size is 165/24 ≈ 7 GB. If you are unlucky, two different hours might end up in the same file, which means a 14 GB file. This is problematic because it might require you to have beefy executors. Other than that, it limits the parallelism: the maximum number of tasks you can effectively throw at the problem is 24×7 = 168, which would be a problem if your logic required heavy calculations. Because of that, using the hour column and creating 24 files per day is not what you aim for in this case.
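
To make the arithmetic explicit, here is the same back-of-the-envelope calculation as a small sketch (the ~165 GB per day, 24 hourly files and 7 days are simply the assumptions stated above):

// Back-of-the-envelope sizing for the hour-based repartitioning from the previous post.
val gbPerDay       = 165.0                    // ~165 GB of events per day (assumption above)
val filesPerDay    = 24                       // one file per hour
val days           = 7
val gbPerFile      = gbPerDay / filesPerDay   // ~7 GB per file, ~14 GB if two hours collide
val maxParallelism = filesPerDay * days       // at most 24 x 7 = 168 tasks in flight

println(f"per file: $gbPerFile%.1f GB, max parallel tasks: $maxParallelism")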

What can you do if you want to produce more files per output directory, but still keep control over their number?

The trick is quite simple: add a column with a random value (salt). The value will be random, but you can control the range of generated values. Then you can just use that column when repartitioning. The range you choose will be reflected in the number of files generated for each directory. After repartitioning, the data will be organised into Spark partitions in the expected way. As the last step you need to remove the salt column you just introduced, since it holds no business value. This is what the code would look like:

import org.apache.spark.sql.functions.{col, dayofmonth, month, rand, year}

df
      .withColumn("year", year(col("eventTimestamp")))
      .withColumn("month", month(col("eventTimestamp")))
      .withColumn("day", dayofmonth(col("eventTimestamp")))
      .withColumn("salt", (rand() * 540).cast("int"))       // random int in [0, 540)
      .repartition(col("year"), col("month"), col("day"), col("salt"))
      .drop("salt")                                          // salt is only needed for the shuffle
      .write
      .partitionBy("year", "month", "day")
      .save(output)

I want to stress this part:

.withColumn("salt", toInt(rand() * 540))
      .repartition(col("year"), col("month"), col("day"), col("salt"))
      .drop("salt")

… which generates a new 'salt' column with a random int in the range [0, 540), then repartitions using the salt and eventually removes the column. The salt is only needed to distribute your data into 540 separate partitions instead of having a single partition process an entire day (which is ~165 GB in our case).
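
If you would rather derive the salt range from a target file size instead of hard-coding 540, a minimal sketch could look like the following (the ~165 GB per day and ~300 MB target file size are assumptions carried over from this example; df and eventTimestamp are the same as above):

import org.apache.spark.sql.functions.{col, dayofmonth, month, rand, year}

// Pick the salt range so that each day is split into files of roughly the target size.
val bytesPerDay    = 165L * 1024 * 1024 * 1024             // ~165 GB of events per day (assumption)
val targetFileSize = 300L * 1024 * 1024                    // aim for ~300 MB output files (assumption)
val saltRange      = (bytesPerDay / targetFileSize).toInt  // ~563 salt values, i.e. roughly that many files per day

val salted = df
  .withColumn("year", year(col("eventTimestamp")))
  .withColumn("month", month(col("eventTimestamp")))
  .withColumn("day", dayofmonth(col("eventTimestamp")))
  .withColumn("salt", (rand() * saltRange).cast("int"))    // uniform int in [0, saltRange)
  .repartition(col("year"), col("month"), col("day"), col("salt"))
  .drop("salt")

From here the write with partitionBy stays exactly the same as in the full snippet above.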

Salting prevents the creation of a single file per partition, which could be too large and could lead to OOM and other errors. By salting you get much better control over the number of files (and therefore their size) per output directory.

When you run the job and look at the Spark UI, you can observe that most of the tasks are processing a few hundred megabytes each. You don't have very heavy tasks and you don't have many tasks doing nothing, which is exactly what you want to achieve. Other than that, you can confirm this observation by looking at the files written: there are hundreds of them and each is a few hundred megabytes in size.

303.4 M  910.2 M  /user/marcin/result/year=2018/month=3/day=1/part-00013-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.4 M  910.1 M  /user/marcin/result/year=2018/month=3/day=1/part-00019-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.6 M  910.8 M  /user/marcin/result/year=2018/month=3/day=1/part-00023-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
605.6 M  1.8 G    /user/marcin/result/year=2018/month=3/day=1/part-00031-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
606.2 M  1.8 G    /user/marcin/result/year=2018/month=3/day=1/part-00043-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.4 M  910.1 M  /user/marcin/result/year=2018/month=3/day=1/part-00044-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.6 M  910.9 M  /user/marcin/result/year=2018/month=3/day=1/part-00049-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.2 M  909.6 M  /user/marcin/result/year=2018/month=3/day=1/part-00055-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.7 M  911.0 M  /user/marcin/result/year=2018/month=3/day=1/part-00062-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.9 M  911.7 M  /user/marcin/result/year=2018/month=3/day=1/part-00066-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.7 M  911.2 M  /user/marcin/result/year=2018/month=3/day=1/part-00071-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.5 M  910.5 M  /user/marcin/result/year=2018/month=3/day=1/part-00073-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.2 M  909.7 M  /user/marcin/result/year=2018/month=3/day=1/part-00078-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.4 M  910.1 M  /user/marcin/result/year=2018/month=3/day=1/part-00079-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.7 M  911.2 M  /user/marcin/result/year=2018/month=3/day=1/part-00082-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.6 M  910.9 M  /user/marcin/result/year=2018/month=3/day=1/part-00083-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.6 M  910.7 M  /user/marcin/result/year=2018/month=3/day=1/part-00085-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.8 M  911.4 M  /user/marcin/result/year=2018/month=3/day=1/part-00086-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.7 M  911.1 M  /user/marcin/result/year=2018/month=3/day=1/part-00090-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
303.3 M  910.0 M  /user/marcin/result/year=2018/month=3/day=1/part-00096-bcbb5461-1df0-46a1-abe9-c11382f6a94c.snappy.parquet
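
A quick sanity check on these numbers: the first column of the listing is the file size and the second is presumably the space consumed including HDFS replication (roughly 3x here). With ~165 GB per day spread over 540 salt values you would expect about 165 GB / 540 ≈ 0.3 GB per file, which matches the ~303 MB files above. The occasional ~605 MB file is most likely two salt values that were hashed into the same shuffle partition, the same kind of unlucky collision described earlier for the hour column.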

To conclude:

Firstly, Spark keeps repartitioned data in the same task if the values of the columns used for repartitioning are the same. Secondly, if the amount of data in a single task is too large for your use case, you can repartition by more columns in the repartition clause. Lastly, you can use salt to artificially split the data into more tasks and keep better control over the size of the data processed per task.
