By Hari Doredla
With the configuration discussed below, Spark jobs can process large data sets efficiently and return the desired output with low latency. Here we discuss the Spark configuration parameters we applied to resolve issues and get efficient performance on AWS while processing big data of 30 GB…
Spark version: 1.2.1
In AWS EMR: /home/hadoop/spark/conf/spark-defaults.conf
In Cloudera (CDH 5.3.1): /etc/spark/conf/spark-defaults.conf
Spark on YARN environment: set the two switches below when submitting a job through spark-submit.
--num-executors NUM Number of executors to launch (Default: 2).
--executor-cores NUM Number of cores per executor (Default: 1).
Note: Choose values for these switches according to the cluster's capacity.
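As a rough illustration of where the two switches sit in a submit command, the sketch below assembles a spark-submit argument list in Python. The jar name, the main class, and the values 4 and 2 are placeholders invented for this example, not settings from our job, and yarn-cluster is assumed as the master. The task-slot estimate assumes each task uses one core.

```python
import shlex


def build_spark_submit(app_jar, main_class, num_executors=2, executor_cores=1):
    """Assemble a spark-submit argument list for a job on YARN.

    The keyword defaults mirror the switch defaults quoted above.
    """
    return [
        "spark-submit",
        "--master", "yarn-cluster",  # assumption: YARN cluster mode
        "--class", main_class,
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        app_jar,
    ]


def max_parallel_tasks(num_executors, executor_cores):
    """Upper bound on concurrently running tasks, assuming one core per task."""
    return num_executors * executor_cores


# Hypothetical jar and class names, for illustration only.
cmd = build_spark_submit("my-app.jar", "com.example.MyJob",
                         num_executors=4, executor_cores=2)
print(" ".join(shlex.quote(part) for part in cmd))
print(max_parallel_tasks(4, 2))
```

Keeping the command in a list makes it easy to pass to a process launcher later without worrying about shell quoting.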
Exception in thread "main" org.apache.spark.SparkException: A master URL must be configured.
This exception is thrown when no master URL has been set through any configuration source. Spark properties can be configured in three ways:
- Setting the configuration properties in the code using SparkConf
- Setting the switches in the spark-submit tool
- Keeping the configuration properties in spark-defaults.conf
Properties configured this way take precedence in the order listed: first the values set in code, second the spark-submit switches, and third the values in spark-defaults.conf.
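The precedence order can be modelled in a few lines of plain Python. This is only a sketch of the rule, not Spark's own code; the function name and the property values are made up for the example.

```python
def resolve_properties(defaults_file, submit_switches, code_conf):
    """Merge the three property sources; each later update overrides the earlier one."""
    resolved = dict(defaults_file)      # spark-defaults.conf: lowest precedence
    resolved.update(submit_switches)    # spark-submit switches override the file
    resolved.update(code_conf)          # values set in code override everything
    return resolved


# Illustrative values only.
defaults_file = {"spark.master": "yarn-cluster", "spark.executor.memory": "1g"}
submit_switches = {"spark.executor.memory": "2g"}
code_conf = {"spark.executor.memory": "4g"}

resolved = resolve_properties(defaults_file, submit_switches, code_conf)
print(resolved["spark.executor.memory"])  # the value set in code wins
print(resolved["spark.master"])           # only the defaults file sets this one
```

A value hard-coded in the application therefore silently overrides whatever is passed at submit time, which is the mismatch we wanted to avoid.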
To avoid configuration mismatches among the three, we chose the approach the Spark documentation calls "Dynamically Loading Spark Properties". Rather than hard-coding values in a SparkConf, for instance when you want to run the same application with different masters or different amounts of memory, Spark lets you create an empty conf:
val sc = new SparkContext(new SparkConf())
The configuration values are then supplied at runtime, through spark-submit switches or spark-defaults.conf.
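With an empty conf in the code, the same jar can be launched with different settings purely from the command line. The sketch below builds two such commands using spark-submit's --master and --conf switches. The helper names, jar, class, and property values are hypothetical and chosen only to show the idea.

```python
def conf_flags(properties):
    """Turn a dict of Spark properties into spark-submit --conf arguments."""
    args = []
    for key in sorted(properties):
        args += ["--conf", "{}={}".format(key, properties[key])]
    return args


def submit_command(master, properties, app_jar, main_class):
    """Build a spark-submit argument list; nothing is hard-coded in the application."""
    return (["spark-submit", "--master", master, "--class", main_class]
            + conf_flags(properties)
            + [app_jar])


# Same (hypothetical) application, two different runtime configurations.
local_run = submit_command("local[4]", {"spark.executor.memory": "1g"},
                           "my-app.jar", "com.example.MyJob")
cluster_run = submit_command("yarn-cluster", {"spark.executor.memory": "4g"},
                             "my-app.jar", "com.example.MyJob")
print(" ".join(local_run))
print(" ".join(cluster_run))
```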
15/04/01 11:25:56 INFO scheduler.DAGScheduler: Job 12 failed: collect at CardTransformations.java:225, took 308.106770 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 193 tasks (1028.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
spark.driver.maxResultSize: Limit of total size of serialized results of all partitions for each Spark action (e.g. collect). (Should be at least 1M, or 0 for unlimited.) Jobs will be aborted if the total size is above this limit. Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory and memory overhead of objects in JVM). Setting a proper limit can protect the driver from out-of-memory errors.
Increase the limit (default: 1G) by setting the spark.driver.maxResultSize property.
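To make the rule concrete, here is a small Python model of the check described above, fed with the numbers from our log (1028.5 MB of results against a 1024 MB limit). It is a simplified sketch: the size parser only understands the m and g suffixes, and the function names are our own.

```python
def size_to_mb(size):
    """Convert a size string such as '512m' or '1g' to megabytes.

    Simplified for this sketch: only the 'm' and 'g' suffixes and a bare '0'.
    """
    size = size.strip().lower()
    if size == "0":
        return 0.0
    units = {"m": 1.0, "g": 1024.0}
    return float(size[:-1]) * units[size[-1]]


def exceeds_max_result_size(total_result_mb, max_result_size):
    """Return True if the job would be aborted for exceeding the limit."""
    limit_mb = size_to_mb(max_result_size)
    # A limit of 0 means unlimited, so nothing can exceed it.
    return limit_mb > 0 and total_result_mb > limit_mb


print(exceeds_max_result_size(1028.5, "1g"))  # the failing run from the log
print(exceeds_max_result_size(1028.5, "2g"))  # after raising the limit
print(exceeds_max_result_size(1028.5, "0"))   # unlimited
```

Raising the limit only moves the ceiling. The collected results still have to fit in the driver's heap, so the limit and spark.driver.memory should be sized together.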
15/04/01 12:00:42 INFO scheduler.DAGScheduler: Job 13 failed: saveAsTextFile at JavaSchemaRDD.scala:42, took 211.893538 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 12662:0 was 57503819 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
spark.akka.frameSize: Maximum message size to allow in “control plane” communication (for serialized tasks and task results), in MB. Increase this if the tasks need to send back large results to the driver (e.g. using collect() on a large dataset).
Increase the frame size (default: 10 MB) by setting the spark.akka.frameSize property; the value is given in MB.
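The numbers in the error message are enough to estimate how large the frame size has to be. The sketch below redoes that arithmetic in Python. It assumes, based on the wording of the message, that a serialized task must be strictly smaller than the frame size minus the reserved bytes; the function names are ours.

```python
MB = 1024 * 1024
RESERVED_BYTES = 204800  # the "reserved" figure printed in the error message


def fits_in_frame(task_bytes, frame_size_mb):
    """True if a serialized task fits under frameSize minus the reserved bytes."""
    return task_bytes < frame_size_mb * MB - RESERVED_BYTES


def min_frame_size_mb(task_bytes):
    """Smallest whole-MB frame size that would accept a task of this size."""
    return (task_bytes + RESERVED_BYTES) // MB + 1


task_bytes = 57503819  # size of serialized task 12662:0 in the log
print(fits_in_frame(task_bytes, 10))   # the default of 10 MB is too small
print(min_frame_size_mb(task_bytes))   # smallest value that would have worked
```

In practice it is safer to leave some headroom above the computed minimum, or to follow the hint in the message and move large values into broadcast variables.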
15/04/02 15:27:28 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xc05241a5, /172.31.31.17:55446 => /172.31.31.15:33175] EXCEPTION: java.lang.OutOfMemoryError: Java heap space
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
spark.driver.memory: Amount of memory to use for the driver process, i.e. where SparkContext is initialized.
Increase the driver memory (default: 512M) by setting the spark.driver.memory property.
java.lang.OutOfMemoryError: Java heap space
15/04/02 15:28:04 INFO scheduler.DAGScheduler: Job 0 failed: collect at LevelOneTotalReportsManager.java:99, took 895.599089 s
15/04/02 15:28:04 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Job cancelled because SparkContext was shut down)
15/03/19 17:13:19 ERROR Executor: Exception in task 55.0 in stage 12.0 (TID 894)
java.lang.OutOfMemoryError: GC overhead limit exceeded
spark.executor.memory: Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g).
Increase the executor memory (default: 1G) by setting the spark.executor.memory property.
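Pulling the four properties together, the following Python sketch renders them in the whitespace-separated key/value layout that spark-defaults.conf uses. The values shown are placeholders for illustration, not the ones we settled on; suitable numbers depend on the data volume and the cluster.

```python
def render_spark_defaults(properties):
    """Render properties as spark-defaults.conf text, one 'key value' pair per line."""
    lines = ["{} {}".format(key, properties[key]) for key in sorted(properties)]
    return "\n".join(lines) + "\n"


# Placeholder values, for illustration only.
tuned = {
    "spark.driver.maxResultSize": "2g",
    "spark.akka.frameSize": "64",     # in MB
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
}

conf_text = render_spark_defaults(tuned)
print(conf_text)
```

Keeping these settings in the defaults file, or passing them at submit time, leaves the application code free of hard-coded values.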