Spark performance regression with sum aggregations

Apr 6, 2018

We found an interesting bug during our latest round of performance tuning on Spark 2.2 (2.3 is also affected). The job in question is a batch Spark job that runs hourly and processes about 1 TB of data stored in Parquet format. We tuned a number of things, which gave us roughly a 10% performance boost.

While looking through known issues we came across SPARK-21657, which is related to explode. Although our job uses explode, it did not cause any problems there because the number of exploded rows is low. It did, however, prompt us to take a closer look at whole-stage codegen. A simple test with spark.sql.codegen.wholeStage disabled resulted in 30-40% time savings. We spent another day going through each piece of the code and ended up with a test case containing nothing but sum aggregations.

[code language="scala"]
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, sum, udf}

val cnt = 50        // number of columns to aggregate
val rows = 5000000  // number of input rows
val dummy = udf(() => 1)  // UDF that always returns 1

// Append cnt columns named col_0, col_1, ... each filled by the dummy UDF
def addConstColumns(inputDF: DataFrame) =
  (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"col_$idx", dummy()))

spark.range(rows).toDF()
  .withColumn("grp", lit(1))  // every row falls into a single group
  .transform(addConstColumns)
  .groupBy("grp")
  .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
  .collect()
[/code]

This code creates a DataFrame with 50 columns that are sum aggregated. I ran this test locally on a laptop (HotSpot JVM) with WHOLESTAGE_CODEGEN_ENABLED set to true and then to false, and the timing was about 15 sec vs 3 sec. As a result I filed Spark Jira issue SPARK-23791. I also found two very similar issues, SPARK-20184 and SPARK-20479, both targeted at Spark 2.4.
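For anyone who wants to repeat the on/off comparison, here is a minimal sketch of how it could be scripted. It assumes a local SparkSession and toggles spark.sql.codegen.wholeStage at runtime; the object name CodegenTiming and the helpers timeMs and runSums are made up for illustration and are not part of the original job.

[code language="scala"]
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, sum, udf}

// Hypothetical helper object, for illustration only
object CodegenTiming {
  // Runs `body` once and returns the elapsed wall-clock time in milliseconds
  def timeMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  // Same shape as the test case above: cnt constant columns, all summed under one group
  def runSums(spark: SparkSession, cnt: Int, rows: Long): Unit = {
    val dummy = udf(() => 1)
    val base = spark.range(rows).toDF().withColumn("grp", lit(1))
    val wide = (0 until cnt).foldLeft(base) { (df, idx) =>
      df.withColumn(s"col_$idx", dummy())
    }
    wide.groupBy("grp")
      .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, as in the laptop test
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("codegen-timing")
      .getOrCreate()

    for (enabled <- Seq(true, false)) {
      // Toggle whole-stage codegen for the queries planned after this point
      spark.conf.set("spark.sql.codegen.wholeStage", enabled.toString)
      val elapsed = timeMs(runSums(spark, cnt = 50, rows = 5000000L))
      println(s"wholeStage=$enabled took $elapsed ms")
    }
    spark.stop()
  }
}
[/code]

Note that the first run also pays for JVM and Spark warm-up, so for a fairer comparison it is probably worth running each setting a few times and ignoring the first measurement.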
After a series of tests we also found that for cnt <= 13 and cnt >= 100 there was no noticeable difference; for 14 <= cnt <= 83 the difference was about 4-7x; and for 84 <= cnt <= 99 the code simply failed with a nasty error:

[code language="scala"]
java.lang.ClassFormatError: Too many arguments in method signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2
  at java.lang.ClassLoader.defineClass1(Native Method)
[/code]

If you have a performance-critical Spark job that performs a lot of aggregations with the DataFrame API, be careful: there may be significant time savings to be had by disabling whole-stage code generation.
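As a rough illustration, whole-stage codegen can be switched off for a whole job when the session is built. This is only a sketch: the object and application names are placeholders, and whether disabling it helps should be measured on your own workload.

[code language="scala"]
import org.apache.spark.sql.SparkSession

// Hypothetical job entry point, for illustration only
object AggregationJob {
  def main(args: Array[String]): Unit = {
    // The master is expected to come from spark-submit
    val spark = SparkSession.builder()
      .appName("aggregation-job") // placeholder name
      .config("spark.sql.codegen.wholeStage", "false") // disable whole-stage codegen
      .getOrCreate()

    // ... job logic goes here ...

    spark.stop()
  }
}
[/code]

The same setting can also be changed on an existing session with spark.conf.set("spark.sql.codegen.wholeStage", "false"), which makes it easy to compare both modes in spark-shell.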

