Spark Sql Auto Broadcast Join Tuning
Spark Configuration
Jul 18, 2016 Spark is a component of IBM® Open Platform with Apache Spark and Apache Hadoop. Apache Spark is a fast and general-purpose cluster computing system that allows you to process massive amounts of data using your favorite programming languages, including Java, Scala and Python. Spark also automatically uses spark.sql.autoBroadcastJoinThreshold to determine if a table should be broadcast. Dataset Joins Joining Datasets is done with joinWith, and this behaves similarly to a regular relational join, except the result is a tuple of the different record types as shown in Example 4-11.
- Nov 14, 2017 The default implementation of a join in Spark is a shuffled hash join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition.
- Broadcast Join Besides enabling CBO, another way to optimize joining datasets in Spark is by using the broadcast join. In a shuffle join, records from both tables will be transferred through the network to executors, which is suboptimal when one table is substantially bigger than the other.
- Issue with broadcast join in Spark 1.6.0: Spark SQL is configured with a broadcast parameter of 1 GB, which is more than the size of the tables.
The main spot-ml component uses Spark and Spark SQL to analyze network events and identify those considered the most unlikely or most suspicious.
To run spot-ml with its best performance and scalability, it will probably be necessary to configure Yarn, Spark and Spot. Here are our recommended settings.
General Yarn tuning
The spot-ml Spark application has been developed and tested on CDH Yarn clusters. Careful tuning of the Yarn cluster may be necessary before analyzing large amounts of data with spot-ml.
For small data sets, under 100 GB of parquet files, default Yarn configurations should be enough. If users try to analyze hundreds of gigabytes of data in parquet format, it probably will not work: Yarn will most likely start killing containers or terminate the application with Out Of Memory errors.
For more on how to tune Yarn for general use, we suggest these links:
Users need to keep in mind that a Spark application, especially one for big data sets, might take more than one try before producing any results.
Configuring Spot's Usage of Spark
When running Spark on Yarn, users can set a number of properties to get the best performance and consume resources more effectively. Since not all clusters are the same and not all users plan to have the same computing capacity, we have created variables that users need to configure before running spot-ml.
After installing spot-setup, users will find the spot.conf file under the /etc folder. This file contains all the required configuration to run spot-ml, as explained in INSTALL.md. The file has a section for Spark properties; below is the explanation for each of those variables:
Besides the variables in spot.conf, users can modify the rest of the properties in ml_ops.sh based on their needs.
Setting Spark properties
After the Yarn cluster has been tuned, the next step is to set Spark properties by assigning the right values to the spot.conf Spark variables.
Number of Executors, Executor Memory, Executor Cores and Executor Memory Overhead
The first thing users need to know is how to set the number of executors, the memory per executor and the number of cores. To get those numbers, users should know the total memory available per node after Yarn tuning. This total memory is determined by the yarn.nodemanager.resource.memory-mb property, and the total number of available cores is given by yarn.nodemanager.resource.cpu-vcores.
Depending on the total physical memory available for Yarn containers, the memory per executor can serve as the starting point for setting the total number of executors. To calculate the memory per executor and the number of executors, we suggest users follow these steps:
- Divide the total of physical memory per node by a number between 3 and 5.
- If the result of the division is 30 GB or more, continue to calculate the number of executors.
- If the result of the division is less than 30 GB, try a smaller number.
- The result of the division will be the memory per executor. Multiply the number used in the first division by the number of nodes.
- The result of the multiplication could be the total number of executors, but users need to consider resources for the application driver. Depending on the result of the multiplication, we recommend subtracting 2 or 3 executors.
See example below:
Having a cluster with 9 nodes, each node with 152 GB physical memory available: 152/5 ~ 30 GB per executor. 5 executors x 9 nodes = 45 executors; 45 - 2 = 43 executors.
In the previous example, taking off 2 executors ensures enough memory for the application driver.
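The steps above can be sketched as a small calculation. This is only an illustration of the arithmetic, not part of spot-ml; the function name plan_executors and its parameters are hypothetical.

```python
def plan_executors(mem_per_node_gb, nodes, divisor=5, driver_reserve=2):
    """Return (memory per executor in GB, total executors) for a Yarn cluster.

    divisor and driver_reserve are illustrative parameter names: divisor is the
    number between 3 and 5 used in the first step, driver_reserve is the 2 or 3
    executors left free for the application driver.
    """
    if not 3 <= divisor <= 5:
        raise ValueError("divisor should be between 3 and 5")
    # Memory per executor: node memory split among `divisor` executors.
    mem_per_executor_gb = mem_per_node_gb // divisor
    # Executors across the whole cluster, minus the ones reserved for the driver.
    total_executors = divisor * nodes - driver_reserve
    return mem_per_executor_gb, total_executors


mem_gb, executors = plan_executors(mem_per_node_gb=152, nodes=9)
print(mem_gb, executors)  # 30 43
```

If the first value comes out below 30 GB, the steps suggest retrying with a smaller divisor.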
Users can determine the number of cores per executor from the total number of executors and the available vcpus per node:
- Divide the available vcpus by the number of executors.
Example:
Having a total of 432 vcpus, 48 per node: 432/43 ~ 10 cores per executor.
Although it sounds like a good idea to allocate all the available cores, we have seen cases where many cores per executor cause Spark to assign more tasks to every executor, which can potentially cause OOM errors. It is recommended to keep a close relation between cores and executor memory.
Lastly, for overhead memory we recommend using something between 8% and 10% of executor memory.
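As a rough sketch of the two calculations above, assuming overhead is expressed in megabytes (an assumption; the text only gives percentages). The helper names are hypothetical and not part of spot-ml.

```python
def cores_per_executor(total_vcpus, total_executors):
    # Integer division, so the executors never request more vcpus than exist.
    return total_vcpus // total_executors


def overhead_mb_range(executor_mem_gb, low_pct=8, high_pct=10):
    # Overhead between 8% and 10% of executor memory, converted to MB.
    mem_mb = executor_mem_gb * 1024
    return mem_mb * low_pct // 100, mem_mb * high_pct // 100


print(cores_per_executor(432, 43))  # 10
print(overhead_mb_range(30))        # (2457, 3072)
```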
Following the example, the values for the Spark variables in spot.conf would look like this:
Driver Memory, Driver Maximum Results and Driver Memory Overhead
The spot-ml application executes actions such as .collect, .orderBy and .saveAsTextFile, so we recommend assigning a considerable amount of memory to the driver.
The same way, driver maximum results should be enough for the serialized results.
Depending on the user's data volume, these two properties can be as small as tens of gigabytes for the driver and a couple of gigabytes for driver maximum results, or grow up to 50 GB and 8 GB respectively. Users can follow these steps to determine the amount of memory for the driver and maximum results:
- If executor memory is 30 GB or more, make driver memory the same as executor memory and driver maximum results 6 GB or more.
- If executor memory is less than 30 GB but the data to be analyzed is 100 GB or more, set driver memory to something between 30 GB and 50 GB. Driver maximum results should be 8 GB or more.
Following the example in the previous section, with 9 nodes and 43 executors of 30 GB each, we can set driver memory to 30 GB and driver maximum results to 8 GB.
Memory overhead for driver can be set to something between 8% and 10% of driver memory.
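The driver rules above can be written down as a small helper. This is a sketch under stated assumptions: the function names are hypothetical, the result is a suggested (low, high) range rather than a single value, and the case the text does not cover (executor memory under 30 GB with less than 100 GB of data) returns None instead of guessing.

```python
def driver_settings(executor_mem_gb, data_size_gb):
    """Suggest a driver memory range and a minimum for driver maximum results (GB)."""
    if executor_mem_gb >= 30:
        # Driver memory mirrors executor memory; maximum results of at least 6 GB.
        return {"driver_mem_gb": (executor_mem_gb, executor_mem_gb),
                "min_max_results_gb": 6}
    if data_size_gb >= 100:
        # Smaller executors but large input: driver between 30 GB and 50 GB.
        return {"driver_mem_gb": (30, 50), "min_max_results_gb": 8}
    return None  # not covered by the rules in the text


def driver_overhead_mb(driver_mem_gb, pct=10):
    # Overhead as a percentage (8 to 10) of driver memory, converted to MB.
    return driver_mem_gb * 1024 * pct // 100


print(driver_settings(30, 500))
print(driver_overhead_mb(30))  # 3072
```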
This is what the Spark variables look like for driver properties:
Representation of memory allocation in driver node.
For more information about Spark properties click here.
Spark autoBroadcastJoinThreshold in spot-ml
After Spark LDA runs, the Topics Matrix and Topics Distribution are joined with the original data set (NetFlow records, DNS records or Proxy records) to determine the probability of each event happening. This joining process is similar to joining a big data set with a lookup table. In this case, the big data set is the entire set of records, and the lookup table is a dictionary of documents and probabilities per topic, or words and probabilities per topic.
Because of the possible diversity of documents/IPs, the lookup table containing the document probability distribution can grow to something bigger than 10 MB. Taking into account that 10 MB is Spark's default auto broadcast threshold for joins, a join with a lookup table bigger than that threshold will result in the execution of a traditional join with lots of shuffling.
Setting SPK_AUTO_BRDCST_JOIN_THR and PRECISION correctly can help to always broadcast the document probability distribution lookup table and avoid slow joins.
As a first step, users need to decide whether they want to change from 64 bit floating point probabilities to 32 bit floating point probabilities; if users decide to change from 64 to 32 bit, the document probability distribution lookup table will be half the size and more easily broadcasted.
If users want to cut payload memory consumption roughly in half, they should set the precision option to 32.
PRECISION='32'
If users prefer to keep 64 bit floating point numbers, they should set precision option to 64 (default).
PRECISION='64'
Given the approximate number of distinct IPs in every batch or data set being analyzed, users should set SPK_AUTO_BRDCST_JOIN_THR to something that can fit the document probability distribution lookup table.
For instance, if a user knows there can be 2,000,000 distinct IP addresses and is using 20 topics, each row of the document probability distribution lookup table can take something like 190 bytes with PRECISION set to 64 and 110 bytes with PRECISION set to 32.
Document probability distribution lookup table record example:
(192.169.111.110, [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05])
In that case, users should set the auto broadcast join threshold to something that can fit 365 MB (380000000 bytes) for 64-bit floating point numbers or 210 MB (220000000 bytes) for 32-bit floating point numbers.
PRECISION='32'
SPK_AUTO_BRDCST_JOIN_THR='220000000'
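The sizing arithmetic behind these values can be sketched as follows. The per-row byte counts are the estimates quoted above for 20 topics; the helper name lookup_table_bytes is hypothetical, and the comparison against the default threshold is a simplification of how Spark decides to broadcast.

```python
# Spark's default spark.sql.autoBroadcastJoinThreshold is 10 MB.
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024

# Approximate bytes per lookup-table row for 20 topics, keyed by PRECISION.
BYTES_PER_ROW = {64: 190, 32: 110}


def lookup_table_bytes(distinct_ips, precision):
    # Estimated size of the document probability distribution lookup table.
    return distinct_ips * BYTES_PER_ROW[precision]


for precision in (64, 32):
    size = lookup_table_bytes(2_000_000, precision)
    # True means the table exceeds the default threshold, so the threshold
    # would need to be raised to at least `size` for a broadcast join.
    print(precision, size, size > DEFAULT_THRESHOLD_BYTES)
```

Both estimates are far above the 10 MB default, which is why the threshold needs to be raised explicitly for this join.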
Known Spark error messages running spot-ml
Out Of Memory Error
This issue includes java.lang.OutOfMemoryError: Java heap space and java.lang.OutOfMemoryError: GC overhead limit exceeded. An OOME can have many different causes, but we have identified a couple of reasons for this error in spot-ml.
The main reason for this error in spot-ml is the ML algorithm returning large results for word probabilities per topic. Since ML algorithm results are broadcast, each executor needs more memory.
Another possible reason for this error is that the driver is running out of memory; try increasing driver memory.
Container killed by Yarn for exceeding memory limits. X.Y GB of X GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead
This issue is caused by certain operations, mainly during the scoring stage, when document probabilities per topic are joined with the rest of the data. If users receive this error, they should try increasing memory overhead up to 10% of executor memory, or increase executor memory.
org.apache.spark.serializer.KryoSerializer
KryoSerializer can cause issues if the value of spark.kryoserializer.buffer.max is not large enough for the data being serialized. Try increasing it up to 2 GB, keeping in mind the total available memory.