update 4/28/2022: Greater flexibility and parameterization have been introduced. See the details at https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-transform-data-using-spark-job-definition?branch=pr-en-us-195215#settings-tab
Azure Synapse is evolving quickly, and working with Data Science workloads using Apache Spark pools brings power and flexibility to the platform. Synapse is an abstraction layer on top of the core Apache Spark services, and it can be helpful to understand how this relationship is built and managed. The goal of this post is to home in on managing executors and other session-related configurations.
First, some brief background information. Synapse uses Apache Livy to submit jobs to the backend Spark cluster. This is how you request resources such as executors and define their configuration: cores, memory, instance count, and so on. So what are executors? When a job is submitted, a driver requests resources from the cluster, coordinates the work, and distributes your tasks across the cluster using executors. This is why in Synapse you will see driver logs as well as Livy logs.
Armed with the above information, a good place to start is configuring your Spark cluster in Azure Synapse. There is plenty of information in the Microsoft Docs for Azure Synapse Spark pools to get started. Also, there is further explanation on pools here. When working with a new notebook, you can configure your session-specific settings by clicking the gear at the top right of the workspace window.
Here you can specify a different executor size than the node size in your pool. This only works for reducing executor size, since you can't consume more resources than you've configured for your pool. For example, an XL pool can run large, medium, and small executors, but a small pool cannot support an XL executor. You'll see the driver size matches the executor size, and you cannot change this setting. Finally, you can set the number of executors you'd like for this session. These values all build the configuration that Livy will use to submit your job to the Spark cluster.
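Under the hood, these settings end up as fields in the Livy session request. A simplified sketch of what such a payload might look like (the field names come from the Livy REST API; the sizes and counts here are illustrative, and Synapse adds further fields of its own):

```
{
  "name": "mySession",
  "numExecutors": 4,
  "executorCores": 4,
  "executorMemory": "28g",
  "driverCores": 4,
  "driverMemory": "28g"
}
```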
So how do you confirm that these settings are being honored? When you run your notebook, a new Spark session is started. Everything gets bundled into a Spark application and run on the cluster for you. In the monitoring tab, you can check Apache Spark Applications to see a list of recently run jobs. Click to open one and then click "Spark History Server."
Click on the app ID link to get the details, then click the Executors tab. The bottom half of the report shows you the number of drivers (1) and the number of executors that ran with your job. You can also see the number of cores and memory that were consumed (useful if you are tweaking these values, which I'll show in a bit). If you click the Environment tab, you'll see a text configuration of your Spark session. Some key attributes to look for: spark.executor.instances, spark.executor.cores, spark.executor.memory, and spark.dynamicAllocation.enabled.
What is Dynamic Allocation? Dynamic allocation allows your Spark application to perform elastic scaling of your executors automatically. As of this writing, my understanding is that the only way to enable this feature is to configure it via the %%configure magic command in your notebook:
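A cell along these lines enables it (the spark.dynamicAllocation.* property names are standard Spark configuration; the min/max values here are only illustrative):

```
%%configure -f
{
    "conf": {
        "spark.dynamicAllocation.enabled": "true",
        "spark.dynamicAllocation.minExecutors": "1",
        "spark.dynamicAllocation.maxExecutors": "4"
    }
}
```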
This must be placed at the top of your Spark notebook. Unfortunately, these values cannot be parameterized (if you know of a way, please share). By being placed at the top of the notebook, the settings are honored when your job is submitted to Livy and subsequently to Spark.
If we don't want Dynamic Allocation, we can ignore it and instead use the numExecutors setting to define a static number of executors. You can confirm whether you are using executors efficiently by opening the Spark application in Spark History Server, navigating to the Diagnostics (preview) page, and looking at "Executor Usage Analysis."
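For a static configuration, the %%configure cell can set the Livy session fields directly (field names are from the Livy API; the sizes here are illustrative):

```
%%configure -f
{
    "numExecutors": 2,
    "executorCores": 4,
    "executorMemory": "28g"
}
```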
Can we do the same for Spark Job Definitions? Can these configurations be parameterized for Spark Job Definitions? The answer: yes.
If you look at a Spark Job Definition, you can configure the executor size and the number of executors.
There is a Synapse REST API endpoint for managing Spark Job Definitions; however, I was unable to get it to actually update a job definition or create a new one. The same seems true for setting Dynamic Allocation. Listing all of the job definitions in a workspace does work fine over REST.
There is another way, however: we can override these settings by using SparkConf() to create a SparkConf object. You can find the complete list of Spark (3.0) properties here. Check out the Python below:
```python
import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # create the Spark context with the necessary configuration;
    # spark.executor.instances comes from the first command-line argument
    conf = SparkConf().setAppName("WordCount")\
        .set("spark.executor.cores", "3")\
        .set("spark.executor.instances", sys.argv[1])
    sc = SparkContext(conf=conf)

    # read data from the text file and split each line into words
    words = sc.textFile(sys.argv[2]).flatMap(lambda line: line.split(" "))

    # count the occurrence of each word
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

    # save the counts to the output directory
    counts.saveAsTextFile(sys.argv[3])
```
As you can see, the executor cores and instances are being changed. In this case, I am using sys.argv[1], which represents the first command-line parameter passed to the Spark Job Definition. This overwrites the Spark context properties without deallocating the context. The behavior is the same in Spark 2.4 and 3.0.
Take a look again at how my Spark Job Definition is defined above.
Pay close attention to the Command line arguments. As I mentioned earlier, I am using the first argument to define the number of executors (2) and the next two parameters as the input file and output directory (per the example posted in the beginning). The Spark pool configured is an XL, but I am manually setting the executor size to medium and the number of executors to 4. However, the code we wrote above overrides this configuration. Once the Spark application has completed, open the Spark History Server UI and navigate to Executors. As you can see, 2 executor instances were used (not 4) and 3 cores per executor (not 8).
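To make the argument handling explicit, here is a minimal sketch of how those command-line arguments reach the script. The helper name parse_args and the sample values are my own, not part of the job definition; in Synapse, the "Command line arguments" field is passed straight through to sys.argv:

```python
def parse_args(argv):
    """Split the job's command-line arguments into named values:
    argv[1] = executor count, argv[2] = input file, argv[3] = output directory."""
    return int(argv[1]), argv[2], argv[3]

if __name__ == "__main__":
    # simulate the argv a Synapse Spark Job Definition would supply
    num_executors, input_path, output_path = parse_args(
        ["wordcount.py", "2", "input.txt", "output-dir"])
    print(num_executors, input_path, output_path)
```

Validating and naming the arguments up front like this also gives a clearer error when the job definition is missing a parameter.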
In conclusion, using command line parameters in your Spark Job Definition allows you to parameterize the number of resources your job uses. This can be very useful if you want to include the "priority" of a job as a parameter to feed to the job definition. Controlling the number of executors helps you control costs, or commit the appropriate amount of resources to get your tasks done faster. I also hope this was a good introduction to the Spark History Server UI and to how Synapse interacts with the backend Apache Spark cluster. Happy hacking.