have around 6-8TB of data each in a sharded table with 5 partitions. This table is in HBase. I have built a Java based spark job that reads data from this table and performs some aggregations to get aggregates for a set of columns treated as key and then finally writes back the results into another table. Initially, i tried with spark map and foreach api, and performed aggregations in memory using data structures such HashMap. This was finally upserted into table using jdbc connection. However, the performance was really bad and the job never completed. Then, wrote a new job using DataFrames. I am pulling the data using HBaseRDD API and converting it into dataframe, then i perform groupBY and aggregations and finally saving the results using
" finalDF.save(“org.apache.phoenix.spark”, SaveMode.Overwrite, output_conf);"
This also was taking time,so i divided the task based on a key range and processed a range (say 1 million users) at a time with repartitioning the data by 2001 to ensure high compression.
DataFrame sessionDF = new PhoenixRDD(sqlContext.sparkContext(),inputTable,JavaConverters.asScalaBufferConverter(cols).asScala().toSeq(),Option.apply(filter),Option.apply(source),hconf).toDataFrame(sqlContext).repartition(partitions);
The spark job properties used are as below:
--spark.app.name test --spark.master yarn --spark.deploy.mode cluster --spark.driver.cores 2 --spark.driver.memory 4G --spark.executor.instances 8 --spark.executor.cores 2 --spark.executor.memory 16G --spark.executor.heartbeatInterval 6000000 --spark.default.parallelism 2001 --spark.yarn.executor.memoryOverhead 4096 --spark.yarn.scheduler.heartbeat.interval-ms 6000000 --spark.network.timeout 6000000 --spark.serializer org.apache.spark.serializer.KryoSerializer --spark.shuffle.io.retryWait 60s --spark.shuffle.io.maxRetries 10
The problem is that this job takes around 8-10hrs to process just one million users which is close to 1TB of data and after that it usually start giving “org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1” and takes another 5-6hrs to finish. I tried increasing executors and memory, but still end up with this issue somewhere during the run and its getting difficult to process this whole data.
Can someone please advise how can i improve the processing of this job? Please let me know if you need any further information.
Here is the cut-down version of aggregation step:
finalDF .select(col("OID"), col("CID") , col(“P”)) .groupBy(col("OID") , col("CID”)) .agg(sum(when(col("P").equalTo(lit("sd")).or(col("P").equalTo(lit("hd"))), lit(1)).otherwise(lit(0))).alias("P"));
There are many more fields and other aggregations as part of this statement.