Configure Spark in tune with your application

By Hari Doredla

The configurations discussed below let Spark jobs process large data sets efficiently and return the desired output with low latency. Here we describe the Spark configuration parameters we applied on AWS to resolve issues and get efficient performance while processing a data set of about 30 GB.

Spark version: 1.2.1

  • spark-defaults.conf:
    In AWS EMR: /home/hadoop/spark/conf/spark-defaults.conf
    In Cloudera (CDH 5.3.1): /etc/spark/conf/spark-defaults.conf

    Properties:
    spark.executor.memory        8G
    spark.driver.memory          8G
    spark.driver.maxResultSize   4G
    spark.akka.frameSize         256

Spark on YARN: set the two switches below when submitting the job through spark-submit.

--num-executors NUM        Number of executors to launch (Default: 2).
--executor-cores NUM       Number of cores per executor (Default: 1).
Note: Choose the values for these switches according to the capacity of the cluster.
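As a rough sketch, the two switches can be combined into a spark-submit call like the one below. The script only prints the command (a dry run), so it can be reviewed without a cluster. The class name, the jar name, and the values 4 and 2 are hypothetical placeholders, and yarn-cluster is assumed as the master for a Spark 1.2.x deployment on YARN.

```shell
# Dry run: print the spark-submit command instead of executing it.
# APP_CLASS and APP_JAR are hypothetical placeholders, not names from this article.
APP_CLASS="com.example.CardReport"
APP_JAR="card-report.jar"
NUM_EXECUTORS=4     # assumed value; size it to the cluster
EXECUTOR_CORES=2    # assumed value; size it to the cluster

submit_cmd() {
  printf 'spark-submit --master yarn-cluster --class %s --num-executors %s --executor-cores %s %s\n' \
    "$APP_CLASS" "$NUM_EXECUTORS" "$EXECUTOR_CORES" "$APP_JAR"
}

submit_cmd
```

Once the printed line looks right, it can be pasted into a shell on a node where spark-submit is installed.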

  • Troubleshooting:

Issue 1:

Exception in thread “main” org.apache.spark.SparkException: A master URL must be configured.

Resolution: 

Spark properties can be configured in three ways:

  1. Setting the configuration properties in the code using SparkConf
  2. Setting the switches in the spark-submit tool
  3. Keeping the configuration properties in spark-defaults.conf

Properties take precedence in the order listed: values set in the code come first, then spark-submit switches, then spark-defaults.conf.

The error above appears when none of the three sources provides a master URL. To avoid mismatches among the three, we chose the approach the Spark documentation calls “Dynamically Loading Spark Properties”. Instead of hard-coding values in a SparkConf, for instance when you want to run the same application with different masters or different amounts of memory, Spark lets you create an empty conf: val sc = new SparkContext(new SparkConf()). The master and the other properties are then supplied at submit time through spark-submit switches or spark-defaults.conf.
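The precedence rule can be illustrated with a toy lookup: the first source that sets a value wins. This is only a model of the rule as described here, not Spark's own code, and the function name resolve_property is made up for the illustration.

```shell
# Toy model of the precedence rule: the first non-empty source wins.
# Arguments, in order: value set in code (SparkConf), value from a
# spark-submit switch, value from spark-defaults.conf.
resolve_property() {
  for candidate in "$1" "$2" "$3"; do
    if [ -n "$candidate" ]; then
      printf '%s\n' "$candidate"
      return 0
    fi
  done
  return 1   # no source set the property
}

# Empty SparkConf in code, --master yarn-cluster on the command line,
# local[2] in spark-defaults.conf: the switch wins.
resolve_property "" "yarn-cluster" "local[2]"

# No source sets the master: the lookup fails, which mirrors Issue 1.
resolve_property "" "" "" || echo "no master configured"
```

With an empty SparkConf in the code, the first argument is always empty, so the same jar picks up whatever master the submit command or the defaults file provides.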

Issue 2:

15/04/01 11:25:56 INFO scheduler.DAGScheduler: Job 12 failed: collect at CardTransformations.java:225, took 308.106770 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 193 tasks (1028.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)

Resolution:

spark.driver.maxResultSize: Limit on the total size of serialized results of all partitions for each Spark action (e.g. collect). It should be at least 1M, or 0 for unlimited. Jobs are aborted if the total size exceeds this limit. A high limit may cause out-of-memory errors in the driver (depending on spark.driver.memory and the memory overhead of objects in the JVM), while a proper limit protects the driver from them.

Increase the limit (Default: 1G) by setting the spark.driver.maxResultSize property. We set it to 4G.
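Because a large result limit only helps if the driver heap can hold the results, it may be worth checking the two settings against each other. The sketch below uses the values from this article (8G driver memory, 4G result limit). The helper to_mb is a made-up name, and the rule that the limit should stay below the driver memory is a rule of thumb assumed here, not a requirement stated by Spark.

```shell
# Convert a JVM-style size such as 512M or 8G to whole megabytes.
# Only M and G suffixes are handled in this sketch.
to_mb() {
  number="${1%[MmGg]}"
  case "$1" in
    *[Gg]) echo $((number * 1024)) ;;
    *[Mm]) echo "$number" ;;
    *) echo "unsupported size: $1" >&2; return 1 ;;
  esac
}

DRIVER_MEMORY="8G"      # spark.driver.memory used in this article
MAX_RESULT_SIZE="4G"    # spark.driver.maxResultSize used in this article

# Assumed rule of thumb: keep the result limit below the driver heap.
if [ "$(to_mb "$MAX_RESULT_SIZE")" -lt "$(to_mb "$DRIVER_MEMORY")" ]; then
  echo "ok: maxResultSize is smaller than driver memory"
else
  echo "warning: maxResultSize is not smaller than driver memory"
fi
```

The driver heap also holds other objects, so passing this check does not by itself rule out an out-of-memory error.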

Issue 3:

15/04/01 12:00:42 INFO scheduler.DAGScheduler: Job 13 failed: saveAsTextFile at JavaSchemaRDD.scala:42, took 211.893538 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12662:0 was 57503819 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)

Resolution:

spark.akka.frameSize: Maximum message size, in MB, allowed in “control plane” communication (for serialized tasks and task results). Increase it if tasks need to send large results back to the driver (e.g. using collect() on a large dataset).

Increase the frame size (Default: 10, i.e. 10 MB) by setting the spark.akka.frameSize property. We set it to 256.
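The numbers in the error message are enough to estimate the smallest frame size that would have let this task through. The sketch below assumes, as the message suggests, that the usable limit is the frame size in bytes minus the reserved bytes, and that 1 MB is 1048576 bytes (consistent with the default of 10 being reported as 10485760 bytes).

```shell
TASK_BYTES=57503819      # serialized task size from the log above
RESERVED_BYTES=204800    # reserved bytes from the log above
MB=1048576               # bytes per MB

# Smallest whole-MB frame size whose limit (frame size minus reserved)
# still holds the task: round (task + reserved) up to the next MB.
NEEDED_BYTES=$((TASK_BYTES + RESERVED_BYTES))
MIN_FRAME_MB=$(( (NEEDED_BYTES + MB - 1) / MB ))

echo "spark.akka.frameSize must be at least $MIN_FRAME_MB"
```

For this log the result is 56, so the value of 256 used here leaves generous headroom for larger tasks.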

Issue 4:

15/04/02 15:27:28 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xc05241a5, /172.31.31.17:55446 => /172.31.31.15:33175] EXCEPTION: java.lang.OutOfMemoryError: Java heap space
    at java.lang.Object.clone(Native Method)
    at akka.util.CompactByteString$.apply(ByteString.scala:410)
    at akka.util.ByteString$.apply(ByteString.scala:22)

Resolution:

spark.driver.memory: Amount of memory to use for the driver process, i.e. where the SparkContext is initialized.

Increase the driver memory (Default: 512M) by setting the spark.driver.memory property. We set it to 8G.

Further log output from the same failure:

[sparkDriver]
java.lang.OutOfMemoryError: Java heap space
15/04/02 15:28:04 INFO scheduler.DAGScheduler: Job 0 failed: collect at LevelOneTotalReportsManager.java:99, took 895.599089 s
15/04/02 15:28:04 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job cancelled because SparkContext was shut down)

Issue 5:

15/03/19 17:13:19 ERROR Executor: Exception in task 55.0 in stage 12.0 (TID 894)
java.lang.OutOfMemoryError: GC overhead limit exceeded

Resolution:

spark.executor.memory: Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g).

Increase the executor memory (Default: 1G) by setting the spark.executor.memory property. We set it to 8G.