top of page
Search
  • Writer's pictureSteve Flowers

Cosmos DB All Versions and Deletes Change Feed with .NET and Fabric Spark



Quick links


Introduction


Azure Cosmos DB All Versions and Deletes change feed mode (also known as full fidelity change feed) is a preview feature which expands the change feed to include intermediate updates and deletes. The change feed is a way to access documents based on their modification date. This allows developers to react to changes to documents by listening to the change feed.


Popular use cases include creating a materialized view pattern, duplicating data in additional containers for broader partition key support, and event-based actions. The MV pattern has been implemented for many use cases to create a new view of your data such as aggregate sales or total orders. Managed MV's are in preview currently. Duplicating data into additional containers has been necessary for uses cases where a single partition key does not address the queries required by the workload. Queries which omit a partition key are expensive and it is much more efficient to duplicate the data into a new container. Finally, event-based actions such as and Azure Function that kicks off a downstream process or updates service state are common.


There are 2 change feed modes. Latest version mode has been supported for some time and works well for most use cases. Latest version mode has 2 primary limitations: deletes are not captured in the change feed and intermediary updates are lost. When a document is deleted, there is not accounting for this action in the change feed when using latest version mode. This limits the use cases where change feed could be beneficial for applications needing to react to deletes for data consistency. Intermediary updates include a document that is change several times in between polling of the change feed. The change feed only represents the current state of the document and a client reading the change feed would not know what happened to the document since the last time the change feed was read.


As of this writing All Versions and Deletes mode aka Full Fidelity Change Feed is in preview. This mode of the change feed includes all updates and deletes. This is quite different from latest version mode. Latest version mode returns documents when polled just as they are stored in containers. The change feed simply indexes them based on their modified time stamp. In All Version and Deletes (AVAD) mode a document is returned that describes the changes. You must account for this schema change to fully leverage the AVAD change feed.


A document returned from the latest version change feed mode looks identical to the document stored in your container:



{
    "value": "some data",
    "time": "12:26 pm",
    "id": "e73cc32e-bfb7-4f64-911f-4a28dc192ab2",
    "_rid": "alAdAO12GpgTAAAAAAAAAA==",
    "_self": "...redacted...",
    "_etag": "\"1a00a336-0000-0500-0000-653006f60000\"",
    "_attachments": "attachments/",
    "_ts": 1697646326
}



Whereas in AVAD mode the list of changes is returned. The schema will be determined by the language and the library you are using. Here is an example of the data returned using the azure-cosmos-spark library.



{
    "fields": [...array redacted...],
    "rows": [
        {
            "_rawBody": "{}",
            "id": "fb9ed13c-ef6e-4370-b8f2-1430d3761acf",
            "original_ts": "1697640489",
            "_etag": "...",
            "_lsn": "41",
            "metadata": "{\"lsn\":41,\"crts\":1697640499,\"operationType\":\"delete\",\"previousImageLSN\":40}",
            "previous": "<< PREVIOUS VERSION OF DOCUMENT >>",
            "operationType": "delete",
            "crts": "1697640499",
            "previousImageLSN": "40",
            "copiedAt": "1697640500601"
        },
        {
            "_rawBody": "<< CURRENT DOCUMENT AFTER UPDATE >>",
            "id": "fb9ed13c-ef6e-4370-b8f2-1430d3761acf",
            "original_ts": "1697640489",
            "_etag": "\"1200368d-0000-0500-0000-652ff0290000\"",
            "_lsn": "40",
            "metadata": "{\"lsn\":40,\"crts\":1697640489,\"operationType\":\"replace\",\"previousImageLSN\":39}",
            "previous": "NULL",
            "operationType": "replace",
            "crts": "1697640489",
            "previousImageLSN": "39",
            "copiedAt": "1697640490731"
        }
    ]
}


The "fields" field provides a list of the document's properties. The "rows" field is an array of changes from the change feed. Each row includes several fields that describe the operation that took place. "_rawBody" is the output of the change. As you can see the output from a delete is null. The output from the replace operation is the new document. Currently previous version is not supported for replace operations, but it is for the delete operations.


Using this metadata, you can retrieve a delete operation then take action to find and delete the document based on the data in the "previous" field. When updating data to reflect a replace operation, you can use the data from the raw body. The LSN field allows you to find various versions of the document for greater control.


Currently, only .NET and Java are supported for interacting with AVAD change feed mode. When using the Azure Cosmos .NET library, the following properties of interest are returned:


  • Resource - IEnumerable of document changes since the last check

    • Current - current version of document

    • Previous - previous version of document

    • Metadata

      • OperationType - create, update, delete

      • TimeToLiveExpired - was the delete due to TTL expiration


Enabling All Versions and Deletes Mode


AVAD mode is only supported for the Azure Cosmos DB API for NoSQL. Continuous backups must be configured on the account. Enable the feature by using the "Preview Features" blade in the Azure portal. Once enabled, the feature will be in a "pending" state and you will receive an email requesting additional information such as the subscription and the Azure Cosmos DB account.



Using Spark


In this example I will be using Microsoft Fabric Spark, but Synapse Spark or Azure Databricks will also work. I leveraged the azure-cosmos library from PyPI version 4.3.1.


The change feed configuration is the most important aspect of the code to follow. Most of it is standard for interacting with the change feed in Spark. The configuration property to pay note to is the change feed mode. This must be set to "AllVersionsAndDeletes" instead of "Incremental". Also, the "startFrom" must be set to "Now". If your write stream includes a valid checkpoint, then the stream will always read from the checkpoint regardless of the value of the "startFrom" property.



import uuid
import datetime
import time
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
 
changeFeedCfg = {
  "spark.cosmos.accountEndpoint": cosmos_uri,
  "spark.cosmos.accountKey": cosmos_key,
  "spark.cosmos.database": "myDatabase",
  "spark.cosmos.container": "myContainer",
  "spark.cosmos.read.partitioning.strategy": "Restrictive",
  "spark.cosmos.read.inferSchema.enabled" : "false",
  "spark.cosmos.changeFeed.startFrom" : "Now",
  "spark.cosmos.changeFeed.mode" : "AllVersionsAndDeletes",
}
 
nowUdf= udf(lambda : int(time.time() * 1000),LongType())

Next, we will look at the read stream and write stream from Spark.


changeFeedDF = spark \
    .readStream  \
    .format("cosmos.oltp.changeFeed") \
    .options(**changeFeedCfg) \
    .load()

df_withTimestamps = changeFeedDF \
    .withColumnRenamed("_ts","original_ts") \
    .withColumnRenamed("insertedAt","original_insertedAt") \
    .withColumn("copiedAt", nowUdf())


microBatchQuery = df_withTimestamps \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .queryName("myQueryName") \
    .option("checkpointLocation", "Files/cosmos_ffcf_checkpoint/colors/") \
    .toTable("myTable")

The write stream writes the output data to a Delta Lake table in Fabric OneLake. I can then query this table to see the operations and write additional code to take actions based on these entries.



df = spark.sql("SELECT * FROM myLakehouse.mytable")
display(df)


Using .NET


The required version of the .NET library is Microsoft.Azure.Cosmos -Version 3.32.0-preview. Once this library is installed you can interact with the change feed using the pull model. This is well documented here and I will only reference relevant pieces of code in the following snippets.


FeedIterator<VersionsAndDeletesResponse> 
iteratorForTheEntireContainer = 
container.GetChangeFeedIterator<VersionsAndDeletesResponse>(
                ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

When the "GetChangeFeedIterator()" method of the container class is called you must provide a definition for the document class (VersionsAndDeleteResponse). "ChangefeedStartFrom" can be defined as "Now()" or a continutation token.



string allVersionsContinuationToken = "<< YOUR TOKEN >>";

FeedIterator<VersionsAndDeletesResponse> 
iteratorForTheEntireContainer =                container.GetChangeFeedIterator<VersionsAndDeletesResponse>(              ChangeFeedStartFrom.ContinuationToken(allVersionsContinuationToken), ChangeFeedMode.AllVersionsAndDeletes);

Once a response is received you can iterate through the objects array called "resources" as outlined earlier in the introduction. Make sure to store your checkpoint somewhere to be used later. You can now take whatever action that makes sense for your use case.


Here is a more thorough example using .NET:



Summary


The All Versions and Deletes mode of the Azure Cosmos DB change feed is a preview feature that has been requested by my customers for a long time. Try this feature in your environment and don't forget to provide feedback to the Azure Cosmos DB team on your use case and experience working with the new mode.






70 views0 comments
bottom of page