{"id":3133,"date":"2021-07-06T09:00:11","date_gmt":"2021-07-06T16:00:11","guid":{"rendered":"https:\/\/devblogs.microsoft.com\/cosmosdb\/?p=3133"},"modified":"2021-07-20T17:19:48","modified_gmt":"2021-07-21T00:19:48","slug":"kafka-azure-cosmos-db-docker","status":"publish","type":"post","link":"https:\/\/devblogs.microsoft.com\/cosmosdb\/kafka-azure-cosmos-db-docker\/","title":{"rendered":"Getting started with Kafka Connector for Azure Cosmos DB using Docker"},"content":{"rendered":"<div>\n<div>\n<p class=\"code-line\" data-line=\"2\">Having a local development environment is quite handy when trying out a new service or technology.\u00a0<a title=\"https:\/\/docs.docker.com\/\" href=\"https:\/\/docs.docker.com\/\" data-href=\"https:\/\/docs.docker.com\/\">Docker<\/a>\u00a0has emerged as the de-facto choice in such cases. It is specially useful in scenarios where you&#8217;re trying to integrate multiple services and gives you the ability to to start fresh before each run.<\/p>\n<p class=\"code-line\" data-line=\"4\">This blog post is a getting started guide for the Kafka Connector for Azure Cosmos DB. 
All the components (including Azure Cosmos DB) will run on your local machine, thanks to:<\/p>\n<ul>\n<li class=\"code-line\" data-line=\"6\">The\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu\" data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu\">Azure Cosmos DB Linux Emulator<\/a>\u00a0which can be used for local development and testing purposes without creating an Azure subscription or incurring any costs.<\/li>\n<li class=\"code-line\" data-line=\"7\">And,\u00a0<a title=\"https:\/\/docs.docker.com\/compose\/\" href=\"https:\/\/docs.docker.com\/compose\/\" data-href=\"https:\/\/docs.docker.com\/compose\/\">Docker Compose<\/a>\u00a0which is a tool for defining and running multi-container Docker applications. It will orchestrate all the components required by our setup including Azure Cosmos DB emulator, Kafka, Zookeeper, Kafka connectors etc.<\/li>\n<\/ul>\n<\/div>\n<\/div>\n<div>\n<div>\n<p>&nbsp;<\/p>\n<p><a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/diagram.jpg\"><img decoding=\"async\" class=\"aligncenter wp-image-3136 size-full\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/diagram.jpg\" alt=\"Image diagram\" width=\"747\" height=\"420\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/diagram.jpg 747w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/diagram-300x169.jpg 300w\" sizes=\"(max-width: 747px) 100vw, 747px\" \/><\/a><\/p>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line 
\" data-line=\"11\">To make things easier, we will pick single-focused scenarios and go step by step:<\/p>\n<ul>\n<li class=\"code-line\" data-line=\"13\">Step 0 &#8211; A simple scenario to check if our setup is functional.<\/li>\n<li class=\"code-line\" data-line=\"14\">How to handle streaming JSON data<\/li>\n<li class=\"code-line\" data-line=\"15\">How to handle streaming JSON data which is\u00a0<em>not<\/em>\u00a0compatible with Azure Cosmos DB<\/li>\n<li class=\"code-line\" data-line=\"16\">How to handle Avro data using Schema Registry<\/li>\n<\/ul>\n<\/div>\n<p><div class=\"alert alert-primary\">It is assumed that you&#8217;re comfortable with Kafka and have an understanding of Kafka Connect<\/div><\/p>\n<h2 id=\"first-things-first\" class=\"
\" data-line=\"20\">First things first&#8230;<\/h2>\n<p class=\"code-line\" data-line=\"22\">&#8230; here is a quick overview of the Azure Cosmos DB Emulator and the Kafka Connector.<\/p>\n<p data-line=\"22\">The Azure Cosmos DB connector allows you to move data between Azure Cosmos DB and Kafka. It&#8217;s available as a source as well as a sink. The Azure Cosmos DB Sink connector writes data from a Kafka topic to an Azure Cosmos DB container, and the Source connector writes changes from an Azure Cosmos DB container to a Kafka topic. At the time of writing, the connector is in <code>pre-production<\/code>\u00a0mode. You can read more about it on the\u00a0<a title=\"https:\/\/github.com\/microsoft\/kafka-connect-cosmosdb\" href=\"https:\/\/github.com\/microsoft\/kafka-connect-cosmosdb\" data-href=\"https:\/\/github.com\/microsoft\/kafka-connect-cosmosdb\">GitHub repo<\/a>\u00a0or install\/download it from the\u00a0<a title=\"https:\/\/www.confluent.io\/hub\/microsoftcorporation\/kafka-connect-cosmos\" href=\"https:\/\/www.confluent.io\/hub\/microsoftcorporation\/kafka-connect-cosmos\" data-href=\"https:\/\/www.confluent.io\/hub\/microsoftcorporation\/kafka-connect-cosmos\">Confluent Hub<\/a>.<\/p>\n<p data-line=\"22\">The\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu\" 
data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu\">Azure Cosmos DB Linux Emulator<\/a>\u00a0provides a local environment that emulates the Azure Cosmos DB service for development purposes (currently, it only supports SQL API). It provides a high-fidelity emulation of the Azure Cosmos DB service and supports functionality such as creating data, querying data, provisioning and scaling containers, and executing stored procedures and triggers.<\/p>\n<p data-line=\"22\"><div class=\"alert alert-info\">At the time of writing, the Azure Cosmos DB Linux Emulator is in preview.<\/div><\/p>\n<p data-line=\"22\">You can dive into how to use the emulator on\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#run-on-macos\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#run-on-macos\" data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#run-on-macos\">macOS<\/a>\u00a0or\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#run-on-linux\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#run-on-linux\" data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#run-on-linux\">Linux<\/a>,\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#differences-between-the-linux-emulator-and-the-cloud-service\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#differences-between-the-linux-emulator-and-the-cloud-service\" 
data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#differences-between-the-linux-emulator-and-the-cloud-service\">how is it different<\/a>\u00a0from the Azure Cosmos DB cloud service,\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#troubleshoot-issues\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#troubleshoot-issues\" data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/linux-emulator?tabs=ssl-netstd21&amp;WT.mc_id=data-30458-abhishgu#troubleshoot-issues\">troubleshoot issues<\/a>, etc.<\/p>\n<h2 id=\"before-you-start\" class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"32\">Before you start&#8230;<\/h2>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"34\">Make sure you have\u00a0<a title=\"https:\/\/docs.docker.com\/engine\/install\/\" href=\"https:\/\/docs.docker.com\/engine\/install\/\" data-href=\"https:\/\/docs.docker.com\/engine\/install\/\">Docker<\/a>\u00a0and\u00a0<a title=\"https:\/\/docs.docker.com\/compose\/install\/\" href=\"https:\/\/docs.docker.com\/compose\/install\/\" data-href=\"https:\/\/docs.docker.com\/compose\/install\/\">docker-compose<\/a>\u00a0installed.<\/p>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line 
code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"36\">Also, clone the project from GitHub:<\/p>\n<pre class=\"prettyprint\">git clone https:\/\/github.com\/Azure-Samples\/cosmosdb-kafka-connect-docker\r\ncd cosmosdb-kafka-connect-docker<\/pre>\n<h2 id=\"start-all-the-services\" class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"43\">Start all the services<\/h2>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"45\">All the components are defined in the <a href=\"https:\/\/github.com\/Azure-Samples\/cosmosdb-kafka-connect-docker\">docker-compose<\/a> file:<\/p>\n<ul>\n<li class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"47\">Azure Cosmos DB emulator<\/li>\n<li class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"48\">Kafka and Zookeeper<\/li>\n<li class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line 
\" data-line=\"49\">Azure Cosmos DB and Datagen connectors (run as separate Kafka Connect workers)<\/li>\n<li class=\"code-line\" data-line=\"50\">Confluent Schema Registry<\/li>\n<\/ul>\n<p>Thanks to Docker Compose, the environment can be brought up with a single command. When you run this for the first time, it may take a while for the images to be downloaded (subsequent executions are faster). Optionally, you can also download the images separately before starting Docker Compose:<\/p>\n<pre class=\"prettyprint\">(optional)\r\ndocker pull confluentinc\/cp-zookeeper:latest\r\ndocker pull confluentinc\/cp-kafka:latest\r\ndocker pull confluentinc\/cp-schema-registry:latest<\/pre>\n<p>To start all the services:<\/p>\n<pre class=\"prettyprint\">docker-compose -p cosmosdb-kafka-docker up --build<\/pre>\n<p>After a couple of minutes, check the containers:<\/p>\n<pre class=\"prettyprint\">docker-compose -p cosmosdb-kafka-docker ps\r\n\r\n# expected output\r\n\r\nName                                         Command                          State          Ports\r\n--------------------------------------------------------------------------------------------------\r\ncosmosdb                                     \/usr\/local\/bin\/cosmos\/start.sh   Up             0.0.0.0:8081-&gt;8081\/tcp, 0.0.0.0:10251-&gt;10251\/tcp, 0.0.0.0:10252-&gt;10252\/tcp, 0.0.0.0:10253-&gt;10253\/tcp, 0.0.0.0:10254-&gt;10254\/tcp\r\ncosmosdb-kafka-docker_cosmosdb-connector_1   \/etc\/confluent\/docker\/run        Up (healthy)   0.0.0.0:8083-&gt;8083\/tcp, 9092\/tcp\r\ncosmosdb-kafka-docker_datagen-connector_1    \/etc\/confluent\/docker\/run        Up (healthy)   0.0.0.0:8080-&gt;8080\/tcp, 8083\/tcp, 9092\/tcp\r\nkafka                                        \/etc\/confluent\/docker\/run        Up             0.0.0.0:9092-&gt;9092\/tcp, 0.0.0.0:29092-&gt;29092\/tcp\r\nschema-registry                              \/etc\/confluent\/docker\/run        Up             0.0.0.0:9090-&gt;8081\/tcp\r\nzookeeper                                    \/etc\/confluent\/docker\/run        Up             0.0.0.0:2181-&gt;2181\/tcp, 2888\/tcp, 3888\/tcp<\/pre>\n<p data-line=\"101\">Once all the services are up and running, the next logical step is to install the connector, right? 
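<\/p>\n<p data-line=\"101\">For context, here is a condensed sketch of what the Compose file wires together. The service names and host ports are inferred from the <code>ps<\/code> output above; the compose file in the repo is the source of truth, and images, environment variables, and health checks are omitted here:<\/p>\n<pre class=\"prettyprint\">services:\r\n  zookeeper:           # confluentinc\/cp-zookeeper\r\n    ports: [\"2181:2181\"]\r\n  kafka:               # confluentinc\/cp-kafka\r\n    ports: [\"9092:9092\", \"29092:29092\"]\r\n  schema-registry:     # confluentinc\/cp-schema-registry\r\n    ports: [\"9090:8081\"]\r\n  cosmosdb:            # Azure Cosmos DB Linux emulator\r\n    ports: [\"8081:8081\", \"10251-10254:10251-10254\"]\r\n  cosmosdb-connector:  # Kafka Connect worker hosting the Cosmos DB connector\r\n    ports: [\"8083:8083\"]\r\n  datagen-connector:   # Kafka Connect worker hosting the Datagen connector\r\n    ports: [\"8080:8080\"]<\/pre>\n<p data-line=\"101\">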
Well, there are a couple of things we need to take care of. For Java apps to connect to the Azure Cosmos DB emulator, you need to have\u00a0<a title=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/local-emulator-export-ssl-certificates?WT.mc_id=data-30458-abhishgu#use-the-certificate-with-java-apps\" href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/local-emulator-export-ssl-certificates?WT.mc_id=data-30458-abhishgu#use-the-certificate-with-java-apps\" data-href=\"https:\/\/docs.microsoft.com\/azure\/cosmos-db\/local-emulator-export-ssl-certificates?WT.mc_id=data-30458-abhishgu#use-the-certificate-with-java-apps\">certificates installed in the Java certificate store<\/a>. In this case, we will export the certificate from the Azure Cosmos DB emulator container and import it into the Cosmos DB Kafka Connect container.<\/p>\n<p data-line=\"101\"><div class=\"alert alert-success\">Although this process can be automated, I am doing it manually to make things clear.<\/div><\/p>\n<h3 id=\"configure-emulator-certificates\" class=\"code-line\" data-line=\"101\">Configure Azure Cosmos DB Emulator Certificates<\/h3>\n<p>Execute this command to store the certificate in the Java certificate store (using\u00a0<code>docker exec<\/code>):<\/p>\n<pre class=\"prettyprint\">docker exec --user root -it cosmosdb-kafka-docker_cosmosdb-connector_1 \/bin\/bash\r\n\r\n# execute the below command inside the container\r\ncurl -k https:\/\/cosmosdb:8081\/_explorer\/emulator.pem &gt; ~\/emulatorcert.crt &amp;&amp; keytool -noprompt -storepass changeit -keypass changeit -keystore \/usr\/lib\/jvm\/zulu11-ca\/lib\/security\/cacerts -importcert -alias emulator_cert -file ~\/emulatorcert.crt<\/pre>\n<p><div class=\"alert alert-primary\">You should see this output 
&#8211; Certificate was added to keystore<\/div><\/p>\n<\/div>\n<p>And, one last thing before we proceed&#8230;<\/p>\n<h3 id=\"create-cosmos-db-database-and-containers\" class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"118\">Create Azure Cosmos DB database and containers<\/h3>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"120\">Access the Azure Cosmos DB emulator portal at\u00a0<a title=\"https:\/\/localhost:8081\/_explorer\/index.html\" href=\"https:\/\/localhost:8081\/_explorer\/index.html\" data-href=\"https:\/\/localhost:8081\/_explorer\/index.html\">https:\/\/localhost:8081\/_explorer\/index.html<\/a>\u00a0and create the below resources:<\/p>\n<ul>\n<li class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"122\">Database named\u00a0<code>testdb<\/code><\/li>\n<li class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"123\">Containers &#8211;\u00a0<code>inventory<\/code>,\u00a0<code>orders<\/code>,\u00a0<code>orders_avro<\/code>\u00a0(ensure that the partition key for all the containers is\u00a0<code>\/id<\/code>)<\/li>\n<\/ul>\n<p><a 
href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects.png\"><img decoding=\"async\" class=\"aligncenter wp-image-3139 size-full\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects.png\" alt=\"Image cosmosdb objects\" width=\"2500\" height=\"892\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects.png 2500w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects-300x107.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects-1024x366.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects-768x274.png 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects-1536x548.png 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/cosmosdb-objects-2048x731.png 2048w\" sizes=\"(max-width: 2500px) 100vw, 2500px\" \/><\/a><\/p>\n<h2 id=\"scenarios\" class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"129\">Let&#8217;s explore all the scenarios<\/h2>\n<p data-line=\"131\">To start off let&#8217;s look at the basic scenario. Before trying out other things, we want to make sure everything is functional.<\/p>\n<h3 id=\"1-hello-world\" class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"131\">1. 
Hello world!<\/h3>\n<p class=\"code-line\" data-line=\"135\">Start the inventory data connector for Cosmos DB:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @cosmosdb-inventory-connector_1.json http:\/\/localhost:8083\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8083\/connectors\/inventory-sink\/status<\/pre>\n<p class=\"code-line\" data-line=\"144\">To test the end-to-end flow, send a few records to the\u00a0<code>inventory_topic<\/code> topic in Kafka:<\/p>\n<pre class=\"prettyprint\">docker exec -it kafka bash -c 'cd \/usr\/bin &amp;&amp; kafka-console-producer --topic inventory_topic --bootstrap-server kafka:29092'<\/pre>\n<p data-line=\"144\">Once the prompt is ready, send the JSON records one by one:<\/p>\n<pre class=\"prettyprint\">{\"id\": \"5000\",\"quantity\": 100,\"productid\": 42}\r\n{\"id\": \"5001\",\"quantity\": 99,\"productid\": 43}\r\n{\"id\": \"5002\",\"quantity\": 98,\"productid\": 44}<\/pre>\n<p data-line=\"144\">Check the Cosmos DB container to confirm that the records have been saved. 
Navigate to the portal\u00a0<a title=\"https:\/\/localhost:8081\/_explorer\/index.html\" href=\"https:\/\/localhost:8081\/_explorer\/index.html\" data-href=\"https:\/\/localhost:8081\/_explorer\/index.html\">https:\/\/localhost:8081\/_explorer\/index.html<\/a>\u00a0and check the\u00a0<code>inventory<\/code>\u00a0container:<\/p>\n<p data-line=\"144\"><a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1.png\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-3140\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1.png\" alt=\"Image result1\" width=\"2974\" height=\"902\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1.png 2974w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1-300x91.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1-1024x311.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1-768x233.png 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1-1536x466.png 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result1-2048x621.png 2048w\" sizes=\"(max-width: 2974px) 100vw, 2974px\" \/><\/a><\/p>\n<p data-line=\"144\">Ok, it worked! Let&#8217;s go ahead and do something slightly more useful. 
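<\/p>\n<p data-line=\"144\">For reference, a Cosmos DB sink connector definition broadly follows the shape below. The property names come from the kafka-connect-cosmosdb project; the values are illustrative, the actual <code>cosmosdb-inventory-connector_1.json<\/code> in the repo may differ, and <code>&lt;emulator-key&gt;<\/code> stands for the emulator&#8217;s well-known master key:<\/p>\n<pre class=\"prettyprint\">{\r\n  \"name\": \"inventory-sink\",\r\n  \"config\": {\r\n    \"connector.class\": \"com.azure.cosmos.kafka.connect.sink.CosmosDBSinkConnector\",\r\n    \"tasks.max\": \"1\",\r\n    \"topics\": \"inventory_topic\",\r\n    \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",\r\n    \"value.converter\": \"org.apache.kafka.connect.json.JsonConverter\",\r\n    \"value.converter.schemas.enable\": \"false\",\r\n    \"connect.cosmos.connection.endpoint\": \"https:\/\/cosmosdb:8081\/\",\r\n    \"connect.cosmos.master.key\": \"&lt;emulator-key&gt;\",\r\n    \"connect.cosmos.databasename\": \"testdb\",\r\n    \"connect.cosmos.containers.topicmap\": \"inventory_topic#inventory\"\r\n  }\r\n}<\/pre>\n<p data-line=\"144\">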
Before moving on, delete the\u00a0<code>inventory<\/code>\u00a0connector.<\/p>\n<pre class=\"prettyprint\">curl -X DELETE http:\/\/localhost:8083\/connectors\/inventory-sink\/<\/pre>\n<h3 id=\"2-sync-streaming-data-json-format-from-kafka-to-azure-cosmos-db\" class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"168\">2. Sync streaming data (JSON format) from Kafka to Azure Cosmos DB<\/h3>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"170\">For the remaining scenarios, we will use a producer component to generate records. The\u00a0<a title=\"https:\/\/github.com\/confluentinc\/kafka-connect-datagen\" href=\"https:\/\/github.com\/confluentinc\/kafka-connect-datagen\" data-href=\"https:\/\/github.com\/confluentinc\/kafka-connect-datagen\">Kafka Connect Datagen connector<\/a>\u00a0is our friend. 
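<\/p>\n<p data-line=\"170\">For illustration, a Datagen connector definition for mock inventory data could look like the following, using the connector&#8217;s built-in <code>inventory<\/code> quickstart schema (the actual <code>datagen-inventory-connector.json<\/code> in the repo may be defined differently):<\/p>\n<pre class=\"prettyprint\">{\r\n  \"name\": \"datagen-inventory\",\r\n  \"config\": {\r\n    \"connector.class\": \"io.confluent.kafka.connect.datagen.DatagenConnector\",\r\n    \"tasks.max\": \"1\",\r\n    \"kafka.topic\": \"inventory_topic1\",\r\n    \"quickstart\": \"inventory\",\r\n    \"max.interval\": 1000,\r\n    \"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",\r\n    \"value.converter\": \"org.apache.kafka.connect.json.JsonConverter\",\r\n    \"value.converter.schemas.enable\": \"false\"\r\n  }\r\n}<\/pre>\n<p data-line=\"170\">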
It is meant for generating mock data for testing, so let&#8217;s put it to good use!<\/p>\n<p class=\"code-line\" data-line=\"172\">Start an instance of the Azure Cosmos DB connector:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @cosmosdb-inventory-connector_2.json http:\/\/localhost:8083\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8083\/connectors\/inventory-sink\/status<\/pre>\n<p class=\"code-line\" data-line=\"181\">Once it&#8217;s ready, go ahead and start the Datagen connector, which will generate mock inventory data in JSON format:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @datagen-inventory-connector.json http:\/\/localhost:8080\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8080\/connectors\/datagen-inventory\/status<\/pre>\n<p data-line=\"181\"><div class=\"alert alert-primary\">Note that we use port 8080 for the Datagen connector since it&#8217;s running in a separate Kafka Connect container<\/div><\/p>\n<p data-line=\"181\">To look at the data produced by the Datagen connector, check the\u00a0<code>inventory_topic1<\/code>\u00a0Kafka topic:<\/p>\n<pre class=\"prettyprint\">docker exec -it kafka bash -c 'cd \/usr\/bin &amp;&amp; kafka-console-consumer --topic inventory_topic1 --bootstrap-server kafka:29092'<\/pre>\n<p data-line=\"181\">Notice the data (it may be different in your case):<\/p>\n<pre 
class=\"prettyprint\">{\"id\":5,\"quantity\":5,\"productid\":5}\r\n{\"id\":6,\"quantity\":6,\"productid\":6}\r\n{\"id\":7,\"quantity\":7,\"productid\":7}\r\n...<\/pre>\n<p data-line=\"181\"><div class=\"alert alert-info\">Note that id\u00a0has an\u00a0Integer\u00a0value<\/div><\/p>\n<p data-line=\"181\">Check the Azure Cosmos DB container to confirm that the records have been saved. Navigate to the portal\u00a0<a title=\"https:\/\/localhost:8081\/_explorer\/index.html\" href=\"https:\/\/localhost:8081\/_explorer\/index.html\" data-href=\"https:\/\/localhost:8081\/_explorer\/index.html\">https:\/\/localhost:8081\/_explorer\/index.html<\/a>\u00a0and check the\u00a0<code>inventory<\/code>\u00a0container:<\/p>\n<p data-line=\"181\"><a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2.png\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-3141\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2.png\" alt=\"Image result2\" width=\"3022\" height=\"1332\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2.png 2500w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2-300x132.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2-1024x451.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2-768x339.png 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2-1536x677.png 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result2-2048x903.png 2048w\" sizes=\"(max-width: 3022px) 100vw, 3022px\" \/><\/a><\/p>\n<p class=\"
code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"213\">The records in Cosmos DB have an\u00a0<code>id<\/code>\u00a0attribute of\u00a0<code>String<\/code>\u00a0data type. The original data in the Kafka topic had an\u00a0<code>id<\/code>\u00a0attribute of\u00a0<code>Integer<\/code>\u00a0type &#8211; but that would not have worked, since\u00a0<a title=\"https:\/\/docs.microsoft.com\/rest\/api\/cosmos-db\/documents?WT.mc_id=data-30458-abhishgu\" href=\"https:\/\/docs.microsoft.com\/rest\/api\/cosmos-db\/documents?WT.mc_id=data-30458-abhishgu\" data-href=\"https:\/\/docs.microsoft.com\/rest\/api\/cosmos-db\/documents?WT.mc_id=data-30458-abhishgu\">Azure Cosmos DB requires\u00a0<code>id<\/code>\u00a0to be a unique user-defined string<\/a>. This conversion was made possible by a\u00a0<a title=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/transforms\/cast.html\" href=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/transforms\/cast.html\" data-href=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/transforms\/cast.html\">Kafka Connect transform<\/a>\u00a0&#8211;\u00a0<code>Cast<\/code>\u00a0updates fields (or the entire key or value) to a specific type, updating the schema if one is present.<\/p>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"215\">Here is the part in the <a href=\"https:\/\/github.com\/Azure-Samples\/cosmosdb-kafka-connect-docker\/blob\/master\/cosmosdb-inventory-connector_2.json#L15\">connector configuration<\/a> which did the trick:<\/p>\n<pre class=\"prettyprint\">\"transforms\": \"Cast\",\r\n\"transforms.Cast.type\": \"org.apache.kafka.connect.transforms.Cast$Value\",\r\n\"transforms.Cast.spec\": \"id:string\"<\/pre>\n<p 
data-line=\"215\">Before moving on, delete the Cosmos DB and Datagen\u00a0<code>inventory<\/code>\u00a0connectors.<\/p>\n<pre class=\"prettyprint\">curl -X DELETE http:\/\/localhost:8080\/connectors\/datagen-inventory\r\ncurl -X DELETE http:\/\/localhost:8083\/connectors\/inventory-sink\/<\/pre>\n<h3 id=\"3-push-streaming-orders-data-json-format-from-kafka-to-azure-cosmos-db\" class=\"code-line\" data-line=\"230\">3. Push streaming Orders data (JSON format) from Kafka to Azure Cosmos DB<\/h3>\n<p class=\"code-line\" data-line=\"232\">Now, let&#8217;s switch gears and use the same (JSON formatted) data, but with a slight twist. 
We will use a variant of the Datagen connector to generate mock orders data and adjust the Cosmos DB connector as well.<\/p>\n<p class=\"code-line\" data-line=\"234\">To install a different instance of the Azure Cosmos DB connector:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @cosmosdb-orders-connector_1.json http:\/\/localhost:8083\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8083\/connectors\/orders-sink\/status<\/pre>\n<p data-line=\"234\">Install the Datagen orders connector:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @datagen-orders-connector.json http:\/\/localhost:8080\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8080\/connectors\/datagen-orders\/status<\/pre>\n<p data-line=\"234\">To look at the data produced by the Datagen connector, check the\u00a0<code>orders_topic<\/code>\u00a0Kafka topic:<\/p>\n<pre class=\"prettyprint\">docker exec -it kafka bash -c 'cd \/usr\/bin &amp;&amp; kafka-console-consumer --topic orders_topic --bootstrap-server kafka:29092'<\/pre>\n<p data-line=\"234\">Notice the data (it may be different in your case):<\/p>\n<pre 
class=\"prettyprint\">{\"ordertime\":1496251410176,\"orderid\":3,\"itemid\":\"Item_869\",\"orderunits\":3.2897805449886226,\"address\":{\"city\":\"City_99\",\"state\":\"State_46\",\"zipcode\":50570}}\r\n\r\n{\"ordertime\":1500129505219,\"orderid\":4,\"itemid\":\"Item_339\",\"orderunits\":3.6719921257659918,\"address\":{\"city\":\"City_84\",\"state\":\"State_55\",\"zipcode\":88573}}\r\n\r\n{\"ordertime\":1498873571020,\"orderid\":5,\"itemid\":\"Item_922\",\"orderunits\":8.4506812669258,\"address\":{\"city\":\"City_48\",\"state\":\"State_66\",\"zipcode\":55218}}\r\n\r\n{\"ordertime\":1513855504436,\"orderid\":6,\"itemid\":\"Item_545\",\"orderunits\":7.82561522361042,\"address\":{\"city\":\"City_44\",\"state\":\"State_71\",\"zipcode\":87868}}\r\n...<\/pre>\n<p class=\"code-line\" data-line=\"271\">I chose Orders data on purpose since it is different compared to the Inventory data. Notice that the JSON record produced by the Datagen connector has an\u00a0<code>orderid<\/code>\u00a0attribute (Integer data type), but no\u00a0<code>id<\/code> attribute &#8211; and we know that Azure Cosmos DB won&#8217;t work without one.<\/p>\n<p class=\"code-line\" data-line=\"273\">Check Cosmos DB containers to confirm that records have been saved. 
Navigate to the portal\u00a0<a title=\"https:\/\/localhost:8081\/_explorer\/index.html\" href=\"https:\/\/localhost:8081\/_explorer\/index.html\" data-href=\"https:\/\/localhost:8081\/_explorer\/index.html\">https:\/\/localhost:8081\/_explorer\/index.html<\/a>\u00a0and check the\u00a0<code>orders<\/code>\u00a0container:<\/p>\n<p data-line=\"273\"><a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3.png\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-3142\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3.png\" alt=\"Image result3\" width=\"3032\" height=\"1426\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3.png 3032w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3-300x141.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3-1024x482.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3-768x361.png 768w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3-1536x722.png 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result3-2048x963.png 2048w\" sizes=\"(max-width: 3032px) 100vw, 3032px\" \/><\/a><\/p>\n<p class=\"code-line\" data-line=\"277\">Notice that there is no\u00a0<code>orderid<\/code>\u00a0attribute in the records stored in Azure Cosmos DB. In fact, it has been replaced by the\u00a0<code>id<\/code>\u00a0attribute (with a\u00a0<code>String<\/code> value). 
This was achieved by the <a title=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/transforms\/replacefield.html\" href=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/transforms\/replacefield.html\" data-href=\"https:\/\/docs.confluent.io\/platform\/current\/connect\/transforms\/replacefield.html\">ReplaceField transformer<\/a>.<\/p>\n<p class=\"code-line\" data-line=\"279\">Here is the part in the <a href=\"https:\/\/github.com\/Azure-Samples\/cosmosdb-kafka-connect-docker\/blob\/master\/cosmosdb-orders-connector_1.json#L16\">connector configuration<\/a> which made this possible:<\/p>\n<pre class=\"prettyprint\">\"transforms\": \"RenameField,Cast\",\r\n\"transforms.RenameField.type\": \"org.apache.kafka.connect.transforms.ReplaceField$Value\",\r\n\"transforms.RenameField.renames\": \"orderid:id\",\r\n\"transforms.Cast.type\": \"org.apache.kafka.connect.transforms.Cast$Value\",\r\n\"transforms.Cast.spec\": \"id:string\"<\/pre>\n<p data-line=\"279\"><div class=\"alert alert-warning\">Depending on your use case, removing\/renaming a field altogether may not be an ideal solution. However, it&#8217;s good to know that there are options. Also, remember that the original data in Kafka topics is still there, untouched. 
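<\/div><\/p>\n<p>As a rough illustration (jq is not part of this setup &#8211; it simply stands in for the transform chain), the <code>RenameField<\/code> and <code>Cast<\/code> combination reshapes each record roughly like this:<\/p>\n<pre class=\"prettyprint\"># Illustration only: drop orderid and add a String id in its place,\r\n# similar to what the ReplaceField and Cast transforms do\r\necho '{\"ordertime\":1496251410176,\"orderid\":3,\"itemid\":\"Item_869\"}' | jq -c 'del(.orderid) + {id: (.orderid | tostring)}'\r\n# {\"ordertime\":1496251410176,\"itemid\":\"Item_869\",\"id\":\"3\"}<\/pre>\n<p><div class=\"alert alert-warning\">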
Other downstream apps can still leverage it.<\/div><\/p>\n<p data-line=\"279\">Before moving on, delete the Cosmos DB and Datagen\u00a0<code>orders<\/code>\u00a0connectors.<\/p>\n<pre class=\"prettyprint\">curl -X DELETE http:\/\/localhost:8080\/connectors\/datagen-orders\r\ncurl -X DELETE http:\/\/localhost:8083\/connectors\/orders-sink\/<\/pre>\n<h3 id=\"4-push-streaming-orders-data-avro-format-from-kafka-to-azure-cosmos-db\" class=\"code-line\" data-line=\"298\">4. Push streaming Orders data (AVRO format) from Kafka to Azure Cosmos DB<\/h3>\n<p class=\"code-line\" data-line=\"300\">So far, we have dealt with JSON, a commonly used data format. But\u00a0<a title=\"https:\/\/avro.apache.org\/\" href=\"https:\/\/avro.apache.org\/\" data-href=\"https:\/\/avro.apache.org\/\">Avro<\/a>\u00a0is heavily used in production due to its compact format, which leads to better performance and cost savings. To make it easier to deal with\u00a0<code>Avro<\/code>\u00a0data schemas, there is\u00a0<a title=\"https:\/\/docs.confluent.io\/platform\/current\/schema-registry\/index.html\" href=\"https:\/\/docs.confluent.io\/platform\/current\/schema-registry\/index.html\" data-href=\"https:\/\/docs.confluent.io\/platform\/current\/schema-registry\/index.html\">Confluent Schema Registry<\/a>\u00a0which provides a serving layer for your metadata along with a RESTful interface for storing and retrieving your Avro (as well as JSON and Protobuf) schemas. 
We will use\u00a0<a title=\"https:\/\/hub.docker.com\/r\/confluentinc\/cp-schema-registry\" href=\"https:\/\/hub.docker.com\/r\/confluentinc\/cp-schema-registry\" data-href=\"https:\/\/hub.docker.com\/r\/confluentinc\/cp-schema-registry\">the Docker version<\/a>\u00a0for the purposes of this blog post.<\/p>\n<p class=\"code-line\" data-line=\"302\">Install a new instance of the Azure Cosmos DB connector that can handle\u00a0<code>Avro<\/code>\u00a0data:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @cosmosdb-orders-connector_2.json http:\/\/localhost:8083\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8083\/connectors\/orders-sink\/status<\/pre>\n<p data-line=\"302\">Install the Datagen connector, which will generate mock orders data in\u00a0<code>Avro<\/code>\u00a0format:<\/p>\n<pre class=\"prettyprint\">curl -X POST -H \"Content-Type: application\/json\" -d @datagen-orders-connector-avro.json http:\/\/localhost:8080\/connectors\r\n\r\n# to check the connector status\r\ncurl http:\/\/localhost:8080\/connectors\/datagen-orders\/status<\/pre>\n<p data-line=\"302\">To look at the\u00a0<code>Avro<\/code>\u00a0data produced by the Datagen connector, check the\u00a0<code>orders_avro_topic<\/code>\u00a0Kafka topic:<\/p>\n<pre class=\"prettyprint\">docker exec -it kafka bash -c 'cd \/usr\/bin &amp;&amp; kafka-console-consumer --topic orders_avro_topic --bootstrap-server kafka:29092'<\/pre>\n<p data-line=\"302\">Since the <code>Avro<\/code> data is in binary format, it&#8217;s not human readable:<\/p>\n<pre 
class=\"prettyprint\">\ufffd\ufffd\ufffd\ufffd\ufffdVItem_185lqf\ufffd@City_61State_73\ufffd\ufffd\r\n\ufffd\ufffd\ufffd\ufffdWItem_219[\ufffdC\ufffd\ufffd@City_74State_77\ufffd\ufffd\r\n\ufffd\ufffd\ufffd\ufffd\ufffdVItem_7167Ix\ufffddF\ufffd?City_53State_53\ufffd\ufffd\r\n\ufffd\ufffd\ufffd\u05a9WItem_126*\ufffd\ufffd\ufffd?@City_58State_21\ufffd\ufffd\r\n\ufffd\ufffd\ufffd\ufffd\ufffdVItem_329X\ufffd2,@City_49State_79\ufffd\ufffd\r\n\ufffd\ufffd\ufffd\ufffd\ufffdXItem_886\ufffd\ufffd&gt;\ufffd|\ufffd@City_88State_27\ufffd\ufffd\r\n\ufffd\ue9d8\ufffdV Item_956\ufffdr#\ufffd!@City_45State_96\ufffd\ufffd\r\n\ufffd\u047c\u0495W\"Item_157E\ufffd)$\ufffd\ufffd\ufffd?City_96State_63\ufffd\ufffd\r\n...<\/pre>\n<p data-line=\"302\">Check Cosmos DB containers to confirm that records have been saved. Navigate to the portal\u00a0<a title=\"https:\/\/localhost:8081\/_explorer\/index.html\" href=\"https:\/\/localhost:8081\/_explorer\/index.html\" data-href=\"https:\/\/localhost:8081\/_explorer\/index.html\">https:\/\/localhost:8081\/_explorer\/index.html<\/a>\u00a0and check the\u00a0<code>orders_avro<\/code>\u00a0container:<\/p>\n<p data-line=\"302\"><a href=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4.png\"><img decoding=\"async\" class=\"aligncenter size-full wp-image-3143\" src=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4.png\" alt=\"Image result4\" width=\"3068\" height=\"1464\" srcset=\"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4.png 3068w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4-300x143.png 300w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4-1024x489.png 1024w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4-768x366.png 768w, 
https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4-1536x733.png 1536w, https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-content\/uploads\/sites\/52\/2021\/06\/result4-2048x977.png 2048w\" sizes=\"(max-width: 3068px) 100vw, 3068px\" \/><\/a><\/p>\n<p data-line=\"302\">Great, things work as expected! <a href=\"https:\/\/github.com\/Azure-Samples\/cosmosdb-kafka-connect-docker\/blob\/master\/cosmosdb-orders-connector_2.json\">The connector configuration<\/a> was updated to handle this:<\/p>\n<pre class=\"prettyprint\">\"value.converter\": \"io.confluent.connect.avro.AvroConverter\",\r\n\"value.converter.schemas.enable\": \"true\",\r\n\"value.converter.schema.registry.url\": \"http:\/\/schema-registry:8081\",\r\n...<\/pre>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"354\">The changes include choosing the\u00a0<code>AvroConverter<\/code>, enabling schemas and pointing to Schema Registry (in our case running locally in Docker).<\/p>\n<p class=\"code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line code-line\" data-line=\"356\">That is all for the use cases covered in this blog post. We only covered the Sink connector, but feel free to explore and experiment further! 
For example, you could extend the current setup to include the Source connector and configure it to send records from Azure Cosmos DB containers to Kafka.<\/p>\n<h2 id=\"clean-up\" class=\"code-line\" data-line=\"358\">Clean up<\/h2>\n<p class=\"code-line\" data-line=\"360\">Once you have finished, you can delete the connectors:<\/p>\n<pre class=\"prettyprint\">curl -X DELETE http:\/\/localhost:8080\/connectors\/datagen-orders\r\ncurl -X DELETE http:\/\/localhost:8083\/connectors\/orders-sink\/<\/pre>\n<p data-line=\"360\">To stop all the Docker components:<\/p>\n<pre class=\"prettyprint\">docker-compose -p cosmosdb-kafka-docker down -v<\/pre>\n<h2 id=\"conclusion\" class=\"code-line\" data-line=\"373\">Conclusion<\/h2>\n<p class=\"code-line\" data-line=\"375\">Although we covered simple scenarios for demonstration purposes, they show how you can leverage off-the-shelf solutions (Connectors, Transformers, Schema Registry, etc.) 
and focus on the heavy lifting that your Azure Cosmos DB-based application or data pipeline requires. Since this example adopts a Docker-based approach for local development, it is cost-effective (well, free!) and can be easily customised to suit your requirements.<\/p>\n<p class=\"code-line\" data-line=\"377\">For production scenarios, you would need to set up, configure, and operate these connectors. Kafka Connect workers are simply JVM processes, thus inherently stateless (all the state handling is offloaded to Kafka). There is a lot of flexibility in terms of your overall architecture as well as orchestration &#8211; for instance, you could run them in Kubernetes for fault-tolerance and scalability.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>This blog post is a getting started guide for the Kafka Connector for Azure Cosmos DB.<\/p>\n","protected":false},"author":38710,"featured_media":3136,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[14],"tags":[1783,1784,1775],"class_list":["post-3133","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-core-sql-api","tag-docker","tag-emulator","tag-kafka"],"acf":[],"blog_post_summary":"<p>This blog post is a getting started guide for the Kafka Connector for Azure Cosmos 
DB.<\/p>\n","_links":{"self":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/3133","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/users\/38710"}],"replies":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/comments?post=3133"}],"version-history":[{"count":0,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/posts\/3133\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media\/3136"}],"wp:attachment":[{"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/media?parent=3133"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/categories?post=3133"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/devblogs.microsoft.com\/cosmosdb\/wp-json\/wp\/v2\/tags?post=3133"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}