Steve Flowers

Azure Data Explorer and Synapse Spark Part 1

Updated: Apr 22




Many organizations are implementing streaming data flows from IoT devices or app telemetry. In the modern data platform, bits and bytes are being ingested at insane speeds, and wrangling that data can be troublesome. I have written about Azure Data Explorer (ADX) in the past and love the service. However, streaming data often needs to be complemented with additional reference data to complete the analytics story.


Natively, ADX can work with external tables that reference files in an Azure Data Lake or a SQL table. However, organizations using Azure Synapse have come to love the ecosystem of Synapse workspaces, and it has become the go-to playground for data engineers and data scientists alike. In this post, I will demonstrate how to add a Kusto cluster as an external, linked data source in Synapse and mash up our telemetry with business data stored in the data lake.


First, let me describe my test data. In this scenario, an IoT device sends messages containing sensor information to IoT Hub. Below is an example message:


{"message":
   [
      {"name":"sensorID","value":"1003"},
      {"name":"temperature","value":"150"},
      {"name":"pressure","value":"25"},
      {"name":"eventT","value":"2021-04-20T21:51:54.861Z"}
   ]
}
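To make the payload shape concrete, here is a minimal sketch of how a device could serialize one reading into this name/value format. The helper `build_message` is hypothetical (the post does not show any device code), and it assumes every value is sent as a string, as in the sample above.

```python
import json


def build_message(sensor_id, temperature, pressure, event_time):
    """Hypothetical device-side helper: wrap one reading as name/value pairs.

    Values are converted to strings to match the sample payload.
    """
    readings = {
        "sensorID": sensor_id,
        "temperature": temperature,
        "pressure": pressure,
        "eventT": event_time,
    }
    payload = {"message": [{"name": k, "value": str(v)} for k, v in readings.items()]}
    # Compact separators drop the optional whitespace to keep the message small.
    return json.dumps(payload, separators=(",", ":"))


msg = build_message(1003, 150, 25, "2021-04-20T21:51:54.861Z")
print(msg)
```

Only the sensor ID and the readings travel over the wire; descriptive details about the device are left to the reference data.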

In a typical IoT scenario, the device reports as little information as possible to keep message sizes small. This matters because upstream services charge based on throughput and impose throughput limits, and messages are commonly batched. So in this case we have a sensor, but from this message alone we know very little about it. Luckily for us, our data engineers have ingested a SQL table into the data lake that describes every device in the field. The table looks like this:


The above table is stored as a CSV file in our data lake. Our architecture looks like this:


As you can see, data is flowing into my ADX table:



We have our telemetry data in ADX and our reference data in Azure Data Lake, so let's use Spark to mash them up.


First, we need to grant Synapse access to the ADX database. This requires either the Synapse managed identity (MSI) or a service principal (SPN). I will not walk through how to set this up here, as the Microsoft docs cover this topic thoroughly.


To grant an identity access to the ADX database, open the ADX cluster in the Azure portal, select "Databases", and then select your database. Click "Permissions" in the navigation bar and add permissions for your SPN or MSI based on your requirements. See this article for further information.


Next, add Azure Data Explorer as a new external data connection in your Azure Synapse workspace:



Create the linked service using the required information from your ADX cluster. Test your connection and create! You'll now see ADX as a linked data source. When you expand the source you will see the databases and tables associated with your cluster based on your permissions.


Perform the same steps to add your Azure Data Lake as a linked service in Synapse; your Synapse MSI or SPN will need access to the data lake as well. Once both linked services are available, create a new Spark notebook. To run it, you will need an Apache Spark pool in your Synapse workspace, so create one if you don't already have one.


To load the ADX data into a data frame, simply right-click the table and choose "New Notebook" > "Load to data frame". It is that easy! Now you only need to define the KQL query that shapes the data you want stored in your data frame.


%%pyspark

# Read data from Azure Data Explorer table(s)
# Full Sample Code available at: https://github.com/Azure/azure-kusto-spark/blob/master/samples/src/main/python/SynapseSample.py

# "AzureDataExplorer1" is the name of the ADX linked service; "messages" is the Kusto database.
# The KQL query flattens each message's name/value array into one column per field.
kustoDf = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("spark.synapse.linkedService", "AzureDataExplorer1") \
    .option("kustoDatabase", "messages") \
    .option("kustoQuery", "telemetry | mv-apply message on (summarize b = make_bag(pack(tostring(message.name), message.value))) | project b | evaluate bag_unpack(b)") \
    .load()

# Preview the first three rows
display(kustoDf.limit(3))

As you can see, we define a kustoQuery option, which is the query run against the telemetry table in the Kusto database called messages. This is the KQL query:


telemetry
| mv-apply message on (
    summarize b = make_bag(pack(tostring(message.name), message.value))
  )
| project b
| evaluate bag_unpack(b)
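For readers less familiar with KQL, here is a rough pure-Python analogue of what this pipeline does. It is only an illustration of the pack/unpack idea, not how ADX executes the query: `unpack_message` mimics `mv-apply` plus `make_bag(pack(...))` for one message, and `to_table` mimics `bag_unpack`, which turns the union of all bag keys into columns and leaves missing values empty. Both helper names are hypothetical, and the columns are sorted here only to make the output deterministic.

```python
import json


def unpack_message(raw):
    """Turn one message's list of name/value pairs into a single dictionary."""
    pairs = json.loads(raw)["message"]  # like mv-apply: visit each array element
    bag = {}
    for pair in pairs:
        # like pack(tostring(name), value) aggregated by make_bag
        bag[str(pair["name"])] = pair["value"]
    return bag


def to_table(rows):
    """Like bag_unpack: every distinct key becomes a column; gaps become None."""
    columns = sorted({key for row in rows for key in row})
    return columns, [[row.get(col) for col in columns] for row in rows]
```

Without the bag step, each message would stay as separate name and value entries instead of one row with a column per sensor field.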

This query is a little more complicated than most. To keep the schema of my ADX table flexible, each message is stored as an array of name/value pairs, so I have to "pack" those pairs into a dictionary (a property bag) and then "unpack" the bag into columns. Without this step, I would end up with one column called "name" and a second column called "value", which is not the table structure I want. This example shows how schema on read is flexible but also arduous at times. This is what the data looks like when queried directly in ADX:



So, we have a data frame with the results from our Kusto query. Now we need to load the CSV of our reference data into a data frame.


%%pyspark
# header=True takes the column names from the first row of the CSV
df = spark.read.load(
    'abfss://data@mydatalake.dfs.core.windows.net/reference/adx-synapse-ref-data.csv',
    format='csv',
    header=True,
)
display(df.limit(3))

This data frame looks like this:



As you can see, it lists device details keyed by sensor ID. Now we can join the two data frames on that column to produce a single, coherent table.
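Conceptually, an inner join keeps only the telemetry rows whose sensor ID also appears in the reference data and appends the matching reference columns. The plain-Python sketch below illustrates that behavior; `inner_join` is a hypothetical helper, not part of Spark. It matches the key column name case-insensitively to mirror Spark's default `spark.sql.caseSensitive=false`, which is what lets a key written as "sensorid" line up with a column named "sensorID". It also assumes both sides hold sensor IDs as strings.

```python
def inner_join(left, right, key):
    """Inner join two lists of dicts on `key`, matching column names case-insensitively."""
    def key_value(row):
        # Return the value of the column whose name equals `key`, ignoring case.
        for col, val in row.items():
            if col.lower() == key.lower():
                return val
        return None

    # Index the reference side by key value for quick lookups.
    index = {}
    for ref in right:
        k = key_value(ref)
        if k is not None:  # null keys never match in an inner join
            index.setdefault(k, []).append(ref)

    joined = []
    for tel in left:
        for ref in index.get(key_value(tel), []):
            merged = dict(tel)
            # Add the reference columns, skipping the duplicate join key.
            merged.update({c: v for c, v in ref.items() if c.lower() != key.lower()})
            joined.append(merged)
    return joined
```

A telemetry row from a sensor missing in the reference table simply drops out of the result. A "left" join would keep it with empty reference columns instead.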



# Inner join: keep only telemetry rows whose sensor ID appears in the reference data
joinedDf = kustoDf.join(df, ["sensorid"], "inner")
display(joinedDf.limit(3))

And our output looks like this:




Now you can aggregate the data to summarize messages by manufacturer or version. If you are also sending device errors to IoT Hub and ADX, this approach lets you easily determine which devices are having problems across your environment. The possibilities are endless.
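As a rough illustration of that kind of aggregation, the sketch below counts messages and averages temperature per manufacturer over joined rows. It assumes the reference data has a column named `manufacturer`; that name, like the helper `summarize_by`, is hypothetical, since the reference table's columns are only shown in a screenshot. Temperatures are converted from strings because the telemetry values arrive as strings.

```python
from collections import defaultdict


def summarize_by(rows, column):
    """Count messages and average temperature for each value of `column`."""
    counts = defaultdict(int)
    temp_sums = defaultdict(float)
    for row in rows:
        group = row.get(column)
        counts[group] += 1
        temp_sums[group] += float(row["temperature"])  # values arrive as strings
    return {
        group: {"messages": counts[group], "avg_temperature": temp_sums[group] / counts[group]}
        for group in counts
    }
```

In the notebook itself, the same idea would be expressed with the data frame API, for example `joinedDf.groupBy("manufacturer").count()`, again assuming that column exists.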



In this post, I showed how to create an external data source in Azure Synapse that points to an Azure Data Explorer cluster. This is a great way to bring your telemetry into your Spark cluster and mash it up with other data. But marrying telemetry and business data is about to get easier. Stay tuned for Part 2, coming soon.

