Data Ingestion In Snowflake Using Kafka

1. OVERVIEW AND ARCHITECTURE

➢ APACHE KAFKA:

  • Apache Kafka software uses a publish-and-subscribe model, similar to a message queue or enterprise messaging system, to write and read streams of records.
  • An application publishes messages to a topic, and other applications subscribe to that topic to receive those messages.
  • Topics can be divided into partitions to increase scalability.

➢ KAFKA CONNECT:

  • Kafka Connect is a framework for connecting Kafka to external systems, including databases.
  • A Kafka Connect cluster is a separate cluster from the Kafka cluster itself.
  • The Kafka Connect cluster supports running and scaling connectors (components that read from and/or write to external systems).

➢ SNOWFLAKE CONNECTORS:

  • Snowflake offers two variants of the connector:
    • One version is based on the Confluent package of Kafka.
    • The other version utilizes the open-source software (OSS) Apache Kafka package.
  • From Snowflake’s viewpoint, a Kafka topic generates a continuous stream of rows that are intended to be inserted into a Snowflake table. Generally, each Kafka message corresponds to one row in the table.
  • In Snowflake, a single topic is responsible for providing messages (rows) destined for one specific Snowflake table.
  • The current iteration of the Kafka connector is designed exclusively for loading data into Snowflake.

➢ TARGET TABLES FOR KAFKA TOPICS:

  • Kafka topics can be associated with pre-existing Snowflake tables through the Kafka configuration.
  • In the event that topics are not mapped, the Kafka connector generates a fresh table for each topic, using the topic name.
  • The rules for forming table names in the absence of explicit names are as follows:
    • Lowercase topics are converted to uppercase table names.
    • If the topic name starts with A-Z, a-z, or an underscore (_), it is used directly as the table name; otherwise, the connector prepends an underscore to the table name.
    • Any illegal characters in the topic name are replaced with underscores (_).
  • If adjusting a topic name in this way would make the table name collide with that of another table in the schema, the connector appends an underscore followed by a generated hash code to the table name.
  • The structure of tables created for Kafka topics includes:
    • RECORD_CONTENT: This contains the Kafka message itself.
    • RECORD_METADATA: It stores metadata in JSON format, providing information about the message, such as the topic, partition, offset, createTime/LogAppendTime, key, and schema_id, along with headers.
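  • As an illustration, these two columns can be queried with ordinary Snowflake SQL once data arrives; this is only a sketch, and the table name CAR below is a hypothetical example of a table created for a topic named car:

      SELECT RECORD_METADATA:"topic"::STRING  AS kafka_topic,
             RECORD_METADATA:"partition"::INT AS kafka_partition,
             RECORD_METADATA:"offset"::INT    AS kafka_offset,
             RECORD_CONTENT
      FROM   CAR
      LIMIT  10;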

➢ ARCHITECTURE AND WORKFLOW:

  • Utilizing the configuration details from the Kafka configuration file or command line (or the Confluent Control Center in the case of Confluent), the Kafka connector establishes subscriptions to one or more Kafka topics.
  • For every subscribed topic, the connector generates the following entities:
    • An internal stage to temporarily store data files for that topic.
    • One pipe per topic partition, created to ingest the data files for that partition.
    • A dedicated table to hold the data for that topic.
  • Architecture diagram:
    • One or more applications publish JSON or Avro records to a Kafka cluster, and these records are subsequently distributed across one or more topic partitions
    • The Kafka connector is responsible for collecting messages from the Kafka topics and buffering them. Upon reaching a specified threshold (time, memory, or number of messages), the connector proceeds to write the accumulated messages into a temporary file stored in the internal stage. Once the temporary file is ready, the connector triggers Snowpipe, which proceeds to ingest the data by copying a pointer to the data file into a queue
    • A virtual warehouse, provided by Snowflake, takes charge of loading the data from the staged file into the designated target table (the table specified in the configuration file for the relevant topic) via the pipe associated with the Kafka topic partition.
    • To ensure data integrity, the connector actively monitors Snowpipe’s operations, promptly deleting each file from the internal stage after verifying successful data loading into the target table.
  • In the event of a failure preventing successful data loading, the connector takes corrective measures by moving the file into the table stage and producing an error message to report the issue
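  • Once the connector is running, the table, stage, and pipe created for each topic can be inspected from Snowflake. A quick sketch (KAFKA_DB and KAFKA_SCHEMA are hypothetical names for the database and schema used in the connector configuration):

      USE SCHEMA KAFKA_DB.KAFKA_SCHEMA;  -- hypothetical database.schema
      SHOW TABLES;
      SHOW STAGES;
      SHOW PIPES;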

➢ FAULT TOLERANCE:

  • Both Kafka and the Kafka Connector exhibit fault-tolerant characteristics, ensuring high reliability in data processing.
  • Messages undergo a meticulous process to prevent duplicates and data loss. Snowpipe’s data deduplication logic efficiently eliminates redundant copies of repeating data, with very few exceptions.
  • When Snowpipe detects an error while loading a record, the record is not inserted into the system. Instead, it is diverted to a table stage for further handling.
  • However, it’s essential to be aware of certain limitations regarding fault tolerance with the Connector:
    • Changes made to messages in the Kafka topic, such as deletions or updates, may not be automatically reflected in the corresponding Snowflake table.
    • The default retention time for messages is set to seven days. If the system remains offline for more than this retention period, expired records will not be loaded. Additionally, if Kafka’s storage space limit is exceeded, some messages may not be delivered.
  • Performance tuning is crucial for optimal system efficiency, and it involves:
    • Adjusting the number of nodes in the Connect cluster to find the right balance for the workload.
    • Optimizing the allocation of tasks to the connector, ensuring efficient resource utilization.
    • Understanding and considering the impact of network bandwidth between the connector and the Snowflake deployment to ensure smooth data flow.
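  • For the Snowflake Kafka connector specifically, the task count and buffer thresholds mentioned above map to Kafka Connect and connector properties. A tuning sketch (the values shown are illustrative, not recommendations):

      # number of Kafka Connect tasks for this connector
      tasks.max=2
      # flush to the internal stage at 10,000 records, 60 seconds, or ~5 MB, whichever comes first
      buffer.count.records=10000
      buffer.flush.time=60
      buffer.size.bytes=5000000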

2. ENVIRONMENT SETUP: ON-PREMISE

A. UBUNTU SETUP (FOR WINDOWS ONLY):

  • Go to Start -> Turn Windows features on or off.
  • In the Windows Features box, scroll down to "Windows Subsystem for Linux" and enable it by checking the box.
  • Go to the Microsoft Store -> Download and install Ubuntu LTS (any version).
  • Restart the PC.
 
B. JAVA SETUP:
  • Install Java-8:
     
    • $ sudo apt-get install openjdk-8-jdk (might require apt-get update and apt-get upgrade first)

      Note: Complete the Kafka Service Setup (Section 3) before the next two steps, since the kafka/libs directory must exist.
  • Download Bouncy Castle Libraries (OPTIONAL: required for an encrypted private key):

    • Get the latest version of JAR file for bc-fips
    • Use wget with the link to the jar file from the above step
      • $ wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.2.3/bc-fips-1.0.2.3.jar
    • Get the latest version of JAR file for bcpkix-fips
    • Use wget with the link to the jar file from the above step.

      • $ wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar
  • Move the .jar files to the kafka/libs folder (OPTIONAL: required for an encrypted private key):
    • Use the Move command to mv downloaded bc-fips file
      • $ mv bc-fips-1.0.2.3.jar kafka/libs
    • Use the Move command to mv downloaded bcpkix-fips file.
      • $ mv bcpkix-fips-1.0.3.jar kafka/libs

C. RSA KEY SETUP:

  • Encrypted Private Key
      • $ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
  • Public Key (derived from the private key)
      • $ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
  • RSA File Check
      • $ ls -> Check that rsa_key.p8 and rsa_key.pub are listed
  • [IMPORTANT] Copy the content of rsa_key.pub as a single line (no line breaks) for the next step.
      • $ vim rsa_key.pub
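  • One way to print the key on a single line, stripping the BEGIN/END header and footer (a convenience sketch; copying it manually from the editor works just as well):
      • $ grep -v "^-----" rsa_key.pub | tr -d '\n'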

 

D. SNOWFLAKE ACCOUNT SETUP:

  • Database, Schema, Warehouse, and Role Setup
    • Use a SQL script for this step (a combined example script covering this step, the grants, and the user creation appears after this list):
  • Grants and Privileges on created objects
  • Create a User and Assign a Role
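  • A minimal example script (a sketch only; the object names KAFKA_DB, KAFKA_SCHEMA, KAFKA_WH, KAFKA_ROLE, and KAFKA_USER are assumptions, so adjust them, the warehouse size, and the privileges to your environment):

      USE ROLE ACCOUNTADMIN;

      -- Database, schema, warehouse, and role setup
      CREATE DATABASE IF NOT EXISTS KAFKA_DB;
      CREATE SCHEMA IF NOT EXISTS KAFKA_DB.KAFKA_SCHEMA;
      CREATE WAREHOUSE IF NOT EXISTS KAFKA_WH
        WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE;
      CREATE ROLE IF NOT EXISTS KAFKA_ROLE;

      -- Grants and privileges on the created objects
      GRANT USAGE ON DATABASE KAFKA_DB TO ROLE KAFKA_ROLE;
      GRANT USAGE ON SCHEMA KAFKA_DB.KAFKA_SCHEMA TO ROLE KAFKA_ROLE;
      GRANT CREATE TABLE, CREATE STAGE, CREATE PIPE ON SCHEMA KAFKA_DB.KAFKA_SCHEMA TO ROLE KAFKA_ROLE;
      GRANT USAGE ON WAREHOUSE KAFKA_WH TO ROLE KAFKA_ROLE;

      -- Create a user, attach the one-line RSA public key, and assign the role
      CREATE USER IF NOT EXISTS KAFKA_USER
        DEFAULT_ROLE = KAFKA_ROLE
        DEFAULT_WAREHOUSE = KAFKA_WH
        RSA_PUBLIC_KEY = '<one-line content of rsa_key.pub>';
      GRANT ROLE KAFKA_ROLE TO USER KAFKA_USER;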

3. KAFKA SERVER SETUP (LINUX AND WINDOWS)

  • Kafka Service Setup:
    • Download Kafka Service from Apache Kafka Site:

Note: Kafka Version 2.8.1 and above doesn’t require Zookeeper Service.

      • Navigate to the preferred Kafka Service version.
      • Right-click any of the Kafka files ending with .tgz (extension) and Copy the Link Address
      • In Windows, open Ubuntu (from the Start Menu) -> cd Kafka-Server -> wget <Copied_link_address> -> Enter
    • Extract Kafka Tarball(tgz):
      • In Kafka-Server Folder, run the list command to get the kafka-tarball filename       
        • $ ls
      • Extract the tarball from the folder
        • $ tar -xvzf <kafka_file_name (ending with .tgz)>
      • Rename Extracted Kafka Directory.
        • $ mv <kafka_directory> kafka
  • Zookeeper Service Setup (OPTIONAL: FOR KAFKA VERSION BELOW 2.8.1):
    • Download Zookeeper Service from Apache Zookeeper Site:
      • Get the latest TGZ for Zookeeper from Apache Zookeeper site.
      • Navigate to the preferred Zookeeper Service version.
      • Right-click any of the Zookeeper files ending with .tgz (extension) and Copy the Link Address
      • In Windows, open Ubuntu (from the Start Menu) -> cd Kafka-Server -> wget <Copied_link_address> -> Enter
    • Extract Zookeeper Tarball(tgz):
      • In Kafka-Server Folder, run the list command to get the zookeeper-tarball filename

        • $ ls

      • Extract the tarball from the folder.
        • $ tar -xvzf <zookeeper_file_name (ending with .tgz)>
    • Rename Extracted Zookeeper Directory:
      • $ mv <zookeeper_directory> zookeeper
    • Zookeeper Configuration (Use Default):
      • $ mv zookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg

4. SNOWFLAKE KAFKA CONNECTOR:

  • Download Snowflake Kafka Connector Jar:
      •  $ wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.0/snowflake-kafka-connector-1.5.0.jar
  • Move the Jar to the kafka/libs directory:
      • $ mv snowflake-kafka-connector-1.5.0.jar kafka/libs
  • Configuring Kafka Snowflake Connector:
    • Create the Configuration file:
      • $ vi kafka/config/snowflake-kafka-connector.properties
    • Add the required configuration, with changes as needed for your environment (a sample configuration sketch is shown after this list)
    • Save the File. For vi editor, esc -> Shift + : -> wq -> Enter
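    • A sample configuration sketch. The account URL, user, key, database, schema, and topic-to-table mapping are placeholders and assumptions to be replaced with your own values; the converter shown is for JSON data:

      name=snowflake-kafka-connector
      connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
      tasks.max=1
      # topic(s) to read and the Snowflake table(s) to load them into
      topics=car
      snowflake.topic2table.map=car:CAR
      # buffer thresholds that trigger a flush to the internal stage
      buffer.count.records=10000
      buffer.flush.time=60
      buffer.size.bytes=5000000
      # connection details
      snowflake.url.name=<account_identifier>.snowflakecomputing.com:443
      snowflake.user.name=KAFKA_USER
      snowflake.private.key=<one-line content of rsa_key.p8 without header/footer>
      # only needed if the private key is encrypted
      snowflake.private.key.passphrase=<passphrase>
      snowflake.database.name=KAFKA_DB
      snowflake.schema.name=KAFKA_SCHEMA
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter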

5. OPERATION CHECK: KAFKA PRODUCER AND CONSUMER

Note: For the following operations, we'll need multiple terminal windows (in both Linux and Windows)

  • Window 1: Start Zookeeper Service
      • $ kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties
  • Window 2: Start Kafka Service
      • $ kafka/bin/kafka-server-start.sh kafka/config/server.properties
  • Window 3: Sample File
    • Create a sample JSON file
      • $ vi car.json
    • Paste sample JSON data into the file (an illustrative example is shown after this list)
    • Save and Exit. For vi editor, esc -> Shift + : -> wq -> Enter
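    • A hedged example of what car.json might contain (field names and values are purely illustrative; the console producer sends each line as one Kafka message):

      {"make": "Toyota", "model": "Camry", "year": 2020, "price": 24000}
      {"make": "Honda", "model": "Civic", "year": 2021, "price": 22000}
      {"make": "Ford", "model": "Mustang", "year": 2019, "price": 27000}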
  • Window 3: Kafka Topic
    • Create Kafka Topic with 1 Partition:
      • $ kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic car (for Kafka 2.8.1 and below)

                                                          Or

      • $ kafka/bin/kafka-topics.sh --create --topic car --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092 (for Kafka 2.8.1 and above)
    • Verify Topic Creation
      • $ kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181 (for Kafka 2.8.1 and below)

                                                            Or

      • $ kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092 (for Kafka 2.8.1 and above)
  • Window 4: Kafka Console Producer

      • $ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car < car.json
  • Window 5: Kafka Console Consumer
      • $ kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic car --from-beginning
  • Clean-Up
    • Press Ctrl + C to stop the Kafka consumer.
    • Close Windows 4 and 5.

6. OPERATION EXECUTION: KAFKA TO SNOWFLAKE INGESTION

  • Run jps to check that the following services are running:
    • QuorumPeerMain (Zookeeper)
    • Kafka
    • ConnectStandalone (Kafka-Connect Task)
  • Run Kafka Connect Standalone
      • $ kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties kafka/config/snowflake-kafka-connector.properties
  • Log in to Snowflake.
    • Verify that data has been inserted into the table specified in the configuration
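    • A minimal verification sketch (assuming the hypothetical mapping of topic car to table CAR in database KAFKA_DB and schema KAFKA_SCHEMA):

      SELECT COUNT(*) FROM KAFKA_DB.KAFKA_SCHEMA.CAR;
      SELECT RECORD_METADATA, RECORD_CONTENT FROM KAFKA_DB.KAFKA_SCHEMA.CAR LIMIT 5;

    • Note that rows can take a minute or more to appear, since the connector flushes on its buffer thresholds and Snowpipe loads the staged files asynchronously.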

7. OPERATION PROBLEM

  • Producer Error: Request Error
    • Resolution 1: Update kafka/config/producer.properties by adding the line below:
      • max.request.size=101214400
    • Then run the below command:
      • $ kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car < car.json --producer.config kafka/config/producer.properties
    • Resolution 2: Use the below command to upload the file
      • $ kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic car < car.json --producer-property max.request.size=101214400
  • Topic Error: Max Message Size
    • Resolution: Run the Below command on the created topic
      • $ kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name car --add-config max.message.bytes=101214400
  • Kafka Error: On restart, the Kafka service or Zookeeper service throws an error.
    • Resolution: Run the commands below, after which the Kafka topic needs to be created again and all properties on the topic must be set again:
      • $ rm -rf /tmp/kafka-logs/
      • $ rm -rf /tmp/zookeeper/

8. CONCLUSION

In this article, we have seen:

  • Kafka and the Snowflake Kafka Connector: An Overview
    • Kafka is a distributed event streaming platform used to handle real-time data streams efficiently, while the Snowflake Kafka Connector is a tool that facilitates the ingestion of JSON data into Snowflake, a cloud-based data warehousing platform. In this article, we explored the architecture and workflow for ingesting JSON data into Snowflake using Apache Kafka on a local machine running either Windows or Linux.
  • The Architecture and Workflow:
    • We started by understanding the architecture and workflow involved in ingesting JSON data into Snowflake through Kafka, including fault tolerance and performance tuning for Kafka.
  • Setting Up the Environment:
    • We then set up the environment based on the version of Kafka in use and the Snowflake environment, configuring the settings needed for smooth data transmission between the two platforms.
  • Configuring the Snowflake Kafka Connector:
    • With the environment in place, we configured the properties files required by the Snowflake Kafka Connector. These files establish the connection between Kafka and Snowflake and enable data transfer.
  • Using the Producer Console:
    • Once the environment and services were up and running, we used the Producer console to send messages to the Consumer console, confirming that data flows correctly from source to destination.
  • Publishing a JSON File:
    • In practice, we published a JSON file using the standalone Producer console, which uses the properties file configured earlier, and confirmed that the JSON data was successfully loaded into the Snowflake table specified in the configuration.
  • Handling Possible Problems:
    • Lastly, we addressed potential problems that can arise during these operations and explored their resolutions, ensuring a smooth and reliable data ingestion flow between Kafka and Snowflake.

The focus of this article was primarily on using the no-code Apache Kafka service. However, in many real-time data ingestion scenarios, custom producers and consumers are coded in languages such as Python or Java, which opens up far more possibilities for data ingestion. While the no-code approach covers text (JSON) and Avro data, CSV files can also be ingested with a workaround (see the CSV reference link below).

9. REFERENCES:

  • CSV Data Ingestion:
    • https://medium.com/streamthoughts/streaming-data-into-kafka-s01-e01-loading-csv-file-8ea053b232cb
  • Apache Kafka Documentation:
    • https://kafka.apache.org/documentation/
  • Configuration to improve performance:
    • https://towardsdatascience.com/10-configs-to-make-your-kafka-producer-more-resilient-ec6903c63e3f
