Building a Near-Real-Time (NRT) Data Pipeline using Debezium, Kafka, and Snowflake

Authors: Arun Kumar Ponnurangam, Karunakar Goud

Institutional investors in real estate usually hold several discussions to finalize their investment strategies and goals, and then need to acquire properties on a large scale and at a fast pace. To support this, the pipeline must be refreshed in near-real-time with properties that have just come onto the market.

For this business use case, we needed to get home listing data into the operational data store (ODS), PostgreSQL, and sync it to the cloud data warehouse, Snowflake.

We solve the first part of the challenge, collecting data about new property listings, by exchanging data with Xome, a real estate data aggregator, and loading it into the ODS.

Next, we feed the properties from the ODS to the Snowflake data warehouse in near-real-time (NRT). An analytics engine filters and selects homes that match the buy-box criteria set by investors, enriched with supporting data such as nearby schools and their ratings, neighborhood crime levels, and proximity to healthcare facilities and public transportation. The analytics engine then ranks the properties by cap rate, discount, and yield.

The property rankings are sent back to the ODS, giving underwriters a prioritized list of properties. Underwriters can then adjudicate risks, calculate financials such as the target offer price, renovation cost, and estimated returns, and store their results in the same ODS.

Here is how we built the NRT pipeline from the Amazon Web Services (AWS) Postgres data source to Snowflake. Our solution:

– Uses the database log as the source for transferring data, because reading it is minimally invasive to production systems;
– Employs Debezium, an open-source connector that reads changes from the database log and records them as consumable events;
– Transfers events reliably using Kafka, the distributed messaging system;
– Connects Kafka directly to Snowflake and writes to Snowflake using Snowpipe, stages, files, and tables; and
– Schedules Snowflake tasks to merge the final data sets into the target table.

Solution architecture demonstrating the high-level flow and relationship between components

Here is how to do it, step by step:

A. Configure PostgreSQL in AWS RDS

1. To capture the DML changes committed to the database, set the write-ahead log (WAL) level to logical.

Create a new parameter group and set rds.logical_replication to 1.
Modify the database instance to associate it with this custom parameter group, then reboot the instance. rds.logical_replication is a static parameter, so the change takes effect only after a reboot.

2. Log into PostgreSQL and check the WAL level.

SHOW wal_level;

It should be set to logical.

3. Create a replication slot to stream the sequence of changes from the database.
PostgreSQL's built-in logical decoding, with the wal2json output plugin, extracts DML changes from the database log into an easy-to-read JSON row format.

SELECT * FROM pg_create_logical_replication_slot('<slot_name>', 'wal2json');
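Debezium decodes the slot's output for you, so this step needs no code. The minimal Python sketch below only makes "row format" concrete by flattening one wal2json message into row-level changes. It assumes wal2json's format version 1 layout. The field names change, kind, columnnames, columnvalues and oldkeys come from that plugin, not from this article, and the helper name wal2json_rows is hypothetical.

```python
import json
from typing import Any, Dict, List


def wal2json_rows(message: str) -> List[Dict[str, Any]]:
    """Flatten one wal2json (format version 1) message into row changes.

    Hypothetical helper for illustration; Debezium does this decoding itself.
    """
    rows = []
    for change in json.loads(message).get("change", []):
        row: Dict[str, Any] = {
            "kind": change["kind"],  # "insert", "update", or "delete"
            "table": f'{change["schema"]}.{change["table"]}',
        }
        # Inserts and updates carry the new column values
        if "columnnames" in change:
            row["values"] = dict(zip(change["columnnames"], change["columnvalues"]))
        # Deletes (and key-changing updates) carry the old key values
        if "oldkeys" in change:
            old = change["oldkeys"]
            row["key"] = dict(zip(old["keynames"], old["keyvalues"]))
        rows.append(row)
    return rows
```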

B. Configure the Debezium and Kafka cluster

We use Debezium and Kafka in a cluster as the event messaging system that reads data from the database logs and loads them into Snowflake.

To demonstrate this use case, we used the minimum hardware needed to run this pipeline on a sample of records. To size the cluster for production data volumes, refer to the product documentation.

 

1. Prepare the hardware

For connector nodes, we use memory-optimized machines; for Kafka brokers, CPU-optimized machines with high storage capacity.
Install Java and open-source Kafka on all the machines and set up a $KAFKA_HOME directory.

2. Set up the Kafka-PostgreSQL Connector

This node connects to PostgreSQL and uses the Debezium connector to decode the database log, emitting events in JSON format. The node requires several JAR files, most of them downloaded from Maven repositories.

There are three configuration steps involved:

Config 1

cd $KAFKA_HOME
mkdir source_jars
cd source_jars
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.9.2/avro-1.9.2.jar
wget https://packages.confluent.io/maven/io/confluent/common-utils/5.4.0/common-utils-5.4.0.jar
wget https://repo1.maven.org/maven2/io/debezium/debezium-core/0.9.5.Final/debezium-core-0.9.5.Final.jar
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/0.9.5.Final/debezium-connector-postgres-0.9.5.Final.jar
wget https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
wget http://www.java2s.com/Code/JarDownload/jackson-all/jackson-all-1.7.4.jar.zip
wget http://www.java2s.com/Code/JarDownload/jdbc/jdbc-2.0-sources.jar.zip
wget https://maven.repository.redhat.com/earlyaccess/all/io/confluent/kafka-avro-serializer/5.3.0/kafka-avro-serializer-5.3.0.jar
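wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.7/postgresql-42.2.7.jar
# The two java2s downloads are ZIP archives; extract the JAR files they contain
unzip jackson-all-1.7.4.jar.zip
unzip jdbc-2.0-sources.jar.zip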

Config 2
In the $KAFKA_HOME/config directory, create a new file called postgres-kafka-connector.properties and establish a connection to Postgres to capture changed data.

There are multiple options to control the Debezium connector. Please consult the product documentation for more details.

For example:

name=postgres-debezium-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=
database.port=5432
database.user=postgres
database.password=********
database.dbname=postgres
# Logical name of the source server; it appears as the prefix of the Kafka topic names
database.server.name=postgresRDS
# Provide the schema to sync data from
schema.whitelist=supply
plugin.name=wal2json
# Provide the Postgres replication slot name
slot.name=
snapshot.fetch.size=1000
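Debezium's PostgreSQL connector names each topic `<database.server.name>.<schema>.<table>`. That is the origin of topic names such as postgresRDS.supply.supply, which the Snowflake connector needs later. The small Python sketch below derives the expected topic names from a connector configuration. The helpers parse_properties and debezium_topic are hypothetical and handle only simple `key=value` lines, not the full .properties syntax.

```python
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and comment lines (hypothetical helper)."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def debezium_topic(props: Dict[str, str], schema: str, table: str) -> str:
    """Expected topic for one table: <database.server.name>.<schema>.<table>."""
    return f'{props["database.server.name"]}.{schema}.{table}'
```

For example, `debezium_topic(props, "supply", "buybox")` gives the topic name to list in the Snowflake connector's `topics` setting.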

Config 3

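Set the classpath and execute. Start the connector only after ZooKeeper and the Kafka brokers (steps 3 and 4) are running. connect-standalone.sh takes the Kafka Connect worker configuration first, followed by the connector configuration.

export CLASSPATH="$KAFKA_HOME/source_jars/*"
# Execute the following from $KAFKA_HOME
bin/connect-standalone.sh config/postgres-supply.properties config/postgres-kafka-connector.properties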

3. Start ZooKeeper

By default, ZooKeeper listens on port 2181.

Start the ZooKeeper process from $KAFKA_HOME.

bin/zookeeper-server-start.sh config/zookeeper.properties

4. Set up the Kafka brokers

Unlike related technologies such as Hadoop or Apache Spark, Kafka has no fixed master node for its brokers: the brokers are peers that coordinate among themselves through ZooKeeper.

Within the $KAFKA_HOME/config directory, copy the template file server.properties once per broker (server1.properties and server2.properties) and edit the copies as follows:

Config 1: Kafka broker 1, port number, and ZooKeeper address (server1.properties)
# Each broker in the cluster needs a unique ID
broker.id=1
listeners=PLAINTEXT://<Public IP address of this Kafka broker 1>:9093
zookeeper.connect=<Private IP address of Zookeeper>:2181
zookeeper.connection.timeout.ms=6000

Config 2: Kafka broker 2, port number, and ZooKeeper address (server2.properties)
broker.id=2
listeners=PLAINTEXT://<Public IP address of this Kafka broker 2>:9094
zookeeper.connect=<Private IP address of Zookeeper>:2181
zookeeper.connection.timeout.ms=6000

Start the Kafka brokers by running the following commands from $KAFKA_HOME:

#Run this from Kafka broker 1
bin/kafka-server-start.sh config/server1.properties 

#Run this from Kafka broker 2
bin/kafka-server-start.sh config/server2.properties

With this setup, the Debezium connector publishes changes from Postgres to the two Kafka brokers, with one topic for each source table.
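Each copy of server.properties has to differ from the others. At minimum it needs its own broker.id, which must be unique within a cluster, and in this setup each broker also listens on its own port. The Python sketch below renders only those per-broker overrides. The function name broker_configs and the example IP addresses are hypothetical, and the rest of the template's settings are assumed to stay unchanged.

```python
from typing import Dict, List


def broker_configs(hosts: List[str], zookeeper: str, first_port: int = 9093) -> Dict[str, str]:
    """Render the per-broker overrides for server<N>.properties (hypothetical helper).

    Broker N gets broker.id=N and port first_port + N - 1; all share one ZooKeeper.
    """
    configs: Dict[str, str] = {}
    for i, host in enumerate(hosts, start=1):
        configs[f"server{i}.properties"] = "\n".join([
            f"broker.id={i}",
            f"listeners=PLAINTEXT://{host}:{first_port + i - 1}",
            f"zookeeper.connect={zookeeper}:2181",
            "zookeeper.connection.timeout.ms=6000",
        ])
    return configs
```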

C. Set up the Snowflake connector

This node reads data from each Kafka topic and writes it to Snowflake. Internally, it uses Snowflake stages and Snowpipe to sync the data to the Snowflake tables.

There are four configuration steps involved:

Config 1

Download all the dependent JARs, including the Snowflake-Kafka connector, from the Maven repository and save them under a new directory, $KAFKA_HOME/sink_jars.

cd $KAFKA_HOME
mkdir sink_jars
cd sink_jars
wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar
wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar
wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.1.0/snowflake-kafka-connector-1.1.0.jar
wget https://packages.confluent.io/maven/io/confluent/common-utils/5.4.0/common-utils-5.4.0.jar

Config 2

In $KAFKA_HOME/config/connect-standalone.properties, provide the details of the Kafka brokers and their ports.

Config 2: connect-standalone.properties
# Provide the Kafka broker details under this property
bootstrap.servers=ec2-X-YY-ZZZ-XXX.us-east-2.compute.amazonaws.com:9093,ec2-X-YY-ZZZ-XXX.us-east-2.compute.amazonaws.com:9094

Config 3

In $KAFKA_HOME/config, create a new file called kafka-snowflake-connect-json.properties. In this file, we provide the Snowflake connection details and map each Kafka topic to its corresponding table in Snowflake, like this:

name=<Connector name>
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
# Snowflake account URL, user, and key-pair authentication
snowflake.url.name=<Snowflake account URL>
snowflake.user.name=<Snowflake user>
snowflake.private.key=<Private key>
snowflake.private.key.passphrase=<Password>
# Database and schema configuration
snowflake.database.name=<Target database in Snowflake>
snowflake.schema.name=<Target Schema in Snowflake>
# Data format configuration
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
# Map each Kafka topic to a table in Snowflake
# We have two tables here: 1. Supply 2. Buybox
topics=postgresRDS.supply.supply,postgresRDS.supply.buybox
snowflake.topic2table.map=postgresRDS.supply.supply:dbz_supply,postgresRDS.supply.buybox:dbz_buybox
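A mistyped topic name in snowflake.topic2table.map is easy to miss. As a pre-flight check, the hypothetical Python helpers below split the map into a dictionary and confirm that every topic listed in `topics` has a target table. They only parse the `topic:table,topic:table` format shown above and are not part of the Snowflake connector.

```python
from typing import Dict


def parse_topic2table(value: str) -> Dict[str, str]:
    """Split a "topic:table,topic:table" value into {topic: table} (hypothetical helper)."""
    mapping: Dict[str, str] = {}
    for pair in value.split(","):
        # Topic names contain dots but no colons, so split on the last colon
        topic, sep, table = pair.strip().rpartition(":")
        if not sep or not topic or not table:
            raise ValueError(f"malformed topic2table entry: {pair!r}")
        mapping[topic] = table
    return mapping


def check_topics_mapped(topics: str, mapping: Dict[str, str]) -> None:
    """Raise ValueError if any topic in the comma-separated list has no table mapping."""
    missing = [t.strip() for t in topics.split(",") if t.strip() not in mapping]
    if missing:
        raise ValueError(f"topics without a table mapping: {missing}")
```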

Config 4

Set Classpath and execute.

export CLASSPATH="$KAFKA_HOME/sink_jars/*"

# Execute the following from $KAFKA_HOME
bin/connect-standalone.sh config/connect-standalone.properties config/kafka-snowflake-connect-json.properties

With this setup, data from the Kafka topics is loaded into the Snowflake target tables.

For example, the SUPPLY table that holds the list of homes in PostgreSQL is loaded into the DBZ_SUPPLY table in Snowflake.

The table has only two VARIANT (JSON) columns:

– RECORD_METADATA: JSON metadata about each Kafka message, such as its topic, partition, offset, and key (here, the row's primary key)
– RECORD_CONTENT: the JSON message itself, containing the actual row values

Real-time refresh

We want additions, deletions, and changes to the original data to flow down to Snowflake in real-time, or near-real-time. To achieve this, and to track how each row changes from its original to its updated state, we rely on the change-event payload that Debezium produces for every row. Here is an example from the initial snapshot, with most of the columns in the after node omitted for brevity:

{
  "payload": {
    "after": {
      "actual_photos_count": null,
      "additional_home_details": null,
      "address1": "8146 Lesourdsville West Chester",
      "bathrooms": 1,
      "bedrooms": 2,
      "census_tract_id": "390170111262013",
      "city": "West Chester",
      "close_date": null,
      "close_price": null,
      "country": "US",
      "created_on_utc": 1581089444809073,
      "latitude": 39.3491554260254,
      "laundry_features": null
    },
    "before": null,
    "op": "r",
    "source": {
      "connector": "postgresql",
      "db": "postgres",
      "last_snapshot_record": false,
      "lsn": 309438972864,
      "name": "postgresRDS",
      "schema": "supply",
      "snapshot": true,
      "table": "supply",
      "ts_usec": 1582796538603000,
      "txId": 5834,
      "version": "0.9.5.Final",
      "xmin": null
    },
    "ts_ms": 1582796538603
  }
}

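The op field in the payload identifies one of four event types:

– r: read (initial data extract, or snapshot)
– c: create (insert)
– u: update
– d: delete

The row values themselves are held in the before and after JSON nodes: before is null for snapshot reads and inserts, and after is null for deletes. By default, PostgreSQL logs only the primary key of a deleted row. Unless the table's REPLICA IDENTITY is set to FULL, the before node of a delete event therefore carries only the key columns.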

Debezium Postgres payload data and event types
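The minimal Python sketch below makes the roles of the before and after nodes concrete. It picks the row image a downstream consumer should use for each event type, assuming the event has already been parsed from JSON into a dict with the payload envelope shown above. The function name row_from_event is hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def row_from_event(event: Dict[str, Any]) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Return (op, row image) for one parsed Debezium change event (hypothetical helper).

    Snapshot reads, inserts, and updates carry the new row in "after";
    deletes carry the old row (or just its key) in "before".
    """
    payload = event["payload"]
    op = payload["op"]
    if op in ("r", "c", "u"):
        return op, payload["after"]
    if op == "d":
        return op, payload["before"]
    raise ValueError(f"unexpected op: {op!r}")
```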

 

D. Create views in Snowflake

Snowflake natively supports semi-structured JSON data, so we can parse and normalize data from the table into columns simply by using database views in Snowflake.

Create a view to parse inserts, updates, and snapshots

CREATE OR REPLACE VIEW DBZ_SUPPLY_I_U_VIEW AS
SELECT
    -- Get contents from the after JSON node for snapshots, inserts, and updates. ID is the primary key
    record_content:"payload"."after"."id"::FLOAT AS id
    , record_content:"payload"."after"."actual_photos_count"::FLOAT AS actual_photos_count
    , record_content:"payload"."after"."additional_home_details"::STRING AS additional_home_details
    , record_content:"payload"."after"."address1"::STRING AS address1
    , record_content:"payload"."after"."bathrooms"::VARIANT AS bathrooms
    , record_content:"payload"."after"."bedrooms"::VARIANT AS bedrooms
    , record_content:"payload"."after"."census_tract_id"::STRING AS census_tract_id
    , record_content:"payload"."after"."city"::STRING AS city
    , record_content:"payload"."after"."close_date"::STRING::DATE AS close_date
    , record_content:"payload"."after"."close_price"::VARIANT AS close_price
    , record_content:"payload"."after"."country"::STRING AS country
    , record_content:"payload"."after"."created_on_utc"::STRING::TIMESTAMP_NTZ AS created_on_utc
    , record_content:"payload"."after"."latitude"::STRING AS latitude
    , record_content:"payload"."after"."longitude"::STRING AS longitude
    , record_content:"payload"."after"."laundry_features"::STRING AS laundry_features
    -- Get additional fields: the operation, when Debezium captured the event (ts_ms, milliseconds),
    -- when Postgres applied the transaction (ts_usec, microseconds), and where the event came from
    , record_content:"payload"."op"::STRING AS dml_operator
    , TO_TIMESTAMP_NTZ(record_content:"payload"."ts_ms"::NUMBER, 3) AS debezium_processed_ts
    , TO_TIMESTAMP_NTZ(record_content:"payload"."source"."ts_usec"::NUMBER, 6) AS source_processed_ts
    , record_content:"payload"."source"."name"::STRING AS source_server
    , record_content:"payload"."source"."db"::STRING AS source_db
    , record_content:"payload"."source"."table"::STRING AS source_table
    , record_content:"payload"."source"."schema"::STRING AS source_schema
FROM <Database>.<Schema>.DBZ_SUPPLY
WHERE record_content:"payload"."op"::STRING IN ('r', 'c', 'u');

Create a view to parse deletes

CREATE OR REPLACE VIEW DBZ_SUPPLY_D_VIEW AS
SELECT
    -- Get contents from the before JSON node for deletes. ID is the primary key
    record_content:"payload"."before"."id"::FLOAT AS id
    , record_content:"payload"."before"."actual_photos_count"::FLOAT AS actual_photos_count
    , record_content:"payload"."before"."additional_home_details"::STRING AS additional_home_details
    , record_content:"payload"."before"."address1"::STRING AS address1
    , record_content:"payload"."before"."bathrooms"::VARIANT AS bathrooms
    , record_content:"payload"."before"."bedrooms"::VARIANT AS bedrooms
    , record_content:"payload"."before"."census_tract_id"::STRING AS census_tract_id
    , record_content:"payload"."before"."city"::STRING AS city
    , record_content:"payload"."before"."close_date"::STRING::DATE AS close_date
    , record_content:"payload"."before"."close_price"::VARIANT AS close_price
    , record_content:"payload"."before"."country"::STRING AS country
    , record_content:"payload"."before"."created_on_utc"::STRING::TIMESTAMP_NTZ AS created_on_utc
    , record_content:"payload"."before"."latitude"::STRING AS latitude
    , record_content:"payload"."before"."longitude"::STRING AS longitude
    , record_content:"payload"."before"."laundry_features"::STRING AS laundry_features
    -- Get additional fields: the operation, when Debezium captured the event (ts_ms, milliseconds),
    -- when Postgres applied the transaction (ts_usec, microseconds), and where the event came from
    , record_content:"payload"."op"::STRING AS dml_operator
    , TO_TIMESTAMP_NTZ(record_content:"payload"."ts_ms"::NUMBER, 3) AS debezium_processed_ts
    , TO_TIMESTAMP_NTZ(record_content:"payload"."source"."ts_usec"::NUMBER, 6) AS source_processed_ts
    , record_content:"payload"."source"."name"::STRING AS source_server
    , record_content:"payload"."source"."db"::STRING AS source_db
    , record_content:"payload"."source"."table"::STRING AS source_table
    , record_content:"payload"."source"."schema"::STRING AS source_schema
FROM <Database>.<Schema>.DBZ_SUPPLY
WHERE record_content:"payload"."op"::STRING IN ('d');

You can automate the creation of these views using the Information Schema tables in Snowflake. To create stored procedures to automatically create these views for all tables involved in the data pipeline, please refer to the product documentation.
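The two views differ only in the JSON node they read and the operations they keep, so they are easy to generate from a column list. The Python sketch below shows one way to build the DDL string. It rests on assumptions: in Snowflake, the column list would come from INFORMATION_SCHEMA.COLUMNS and the same string-building would live in a stored procedure. The function name build_view_sql and its parameters are hypothetical, and only the dml_operator metadata column is generated.

```python
from typing import Dict, Sequence


def build_view_sql(view: str, source_table: str, node: str,
                   ops: Sequence[str], columns: Dict[str, str]) -> str:
    """Build a CREATE VIEW statement that flattens one Debezium JSON node (hypothetical helper).

    node is "after" (snapshots, inserts, updates) or "before" (deletes);
    columns maps each source column name to the Snowflake type to cast to.
    """
    select_items = [
        f'record_content:"payload"."{node}"."{name}"::{sql_type} AS {name}'
        for name, sql_type in columns.items()
    ]
    # Keep the operation code so the merge can tell deletes from upserts
    select_items.append('record_content:"payload"."op"::STRING AS dml_operator')
    op_list = ", ".join(f"'{op}'" for op in ops)
    lines = [f"CREATE OR REPLACE VIEW {view} AS", "SELECT"]
    lines.append("    " + "\n    , ".join(select_items))
    lines.append(f"FROM {source_table}")
    lines.append(f'WHERE record_content:"payload"."op"::STRING IN ({op_list});')
    return "\n".join(lines)
```

For example, calling it with "after" and ["r", "c", "u"] yields the skeleton of the snapshot/insert/update view, and "before" with ["d"] the delete view.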

E. Merge the data into the target table

Using the two views, DBZ_SUPPLY_I_U_VIEW and DBZ_SUPPLY_D_VIEW, as the source, you can merge data to the final target table, SUPPLY, using the SQL merge command.

To automate this using Snowflake Tasks:

CREATE OR REPLACE TASK SUPPLY_MERGE
  WAREHOUSE = <WAREHOUSE_NAME>
  SCHEDULE = '5 MINUTE'
AS
MERGE INTO <Database>.<Tgt_Schema>.SUPPLY AS tgt
USING (
    -- DBZ_SUPPLY keeps every event it has received, so combine both views
    -- and apply only the latest event for each id
    SELECT *
    FROM (
        SELECT * FROM <Database>.<Src_Schema>.DBZ_SUPPLY_I_U_VIEW
        UNION ALL
        SELECT * FROM <Database>.<Src_Schema>.DBZ_SUPPLY_D_VIEW
    )
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY source_processed_ts DESC, debezium_processed_ts DESC) = 1
) AS src
ON tgt.id = src.id
-- Deletes
WHEN MATCHED AND src.dml_operator = 'd' THEN DELETE
-- Snapshots, inserts, and updates of rows already in the target
WHEN MATCHED THEN UPDATE SET
    tgt.ACTUAL_PHOTOS_COUNT = src.ACTUAL_PHOTOS_COUNT
    , tgt.ADDRESS1 = src.ADDRESS1
    , tgt.BATHROOMS = src.BATHROOMS
    , tgt.BEDROOMS = src.BEDROOMS
    , tgt.CENSUS_TRACT_ID = src.CENSUS_TRACT_ID
    , tgt.CITY = src.CITY
    , tgt.CLOSE_DATE = src.CLOSE_DATE
    , tgt.CLOSE_PRICE = src.CLOSE_PRICE
    , tgt.COUNTRY = src.COUNTRY
    , tgt.CREATED_ON_UTC = src.CREATED_ON_UTC
    , tgt.LATITUDE = src.LATITUDE
    , tgt.LONGITUDE = src.LONGITUDE
    , tgt.LAUNDRY_FEATURES = src.LAUNDRY_FEATURES
-- Snapshots, inserts, and updates of rows not yet in the target
WHEN NOT MATCHED AND src.dml_operator IN ('r', 'c', 'u') THEN
    INSERT (ID, ACTUAL_PHOTOS_COUNT, ADDRESS1, BATHROOMS, BEDROOMS,
            CENSUS_TRACT_ID, CITY, CLOSE_DATE, CLOSE_PRICE, COUNTRY,
            CREATED_ON_UTC, LATITUDE, LONGITUDE, LAUNDRY_FEATURES)
    VALUES (src.ID, src.ACTUAL_PHOTOS_COUNT, src.ADDRESS1, src.BATHROOMS, src.BEDROOMS,
            src.CENSUS_TRACT_ID, src.CITY, src.CLOSE_DATE, src.CLOSE_PRICE, src.COUNTRY,
            src.CREATED_ON_UTC, src.LATITUDE, src.LONGITUDE, src.LAUNDRY_FEATURES);

-- Tasks are created in a suspended state; resume the task so that the schedule takes effect
ALTER TASK SUPPLY_MERGE RESUME;

This task is configured to execute every five minutes.

You can monitor it using the task history:

select *
from table(information_schema.task_history())
order by scheduled_time;
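The landing table DBZ_SUPPLY keeps every event it has ever received, so the same id can appear several times in the views. A merge therefore has to apply only the most recent event for each id. The Python sketch below models those semantics on an in-memory table. It is illustrative only: the function name apply_cdc is hypothetical, and it assumes each event carries source and Debezium timestamps (source_ts, debezium_ts) that order the events for the same id.

```python
from typing import Any, Dict, List


def apply_cdc(target: Dict[Any, Dict[str, Any]], events: List[Dict[str, Any]]) -> None:
    """Apply Debezium-style events to an in-memory table keyed by primary key.

    Hypothetical model of the merge: only the latest event per id (by source_ts,
    then debezium_ts; ties go to the event seen later) is applied. 'd' deletes
    the row; 'r', 'c', and 'u' insert it or overwrite it.
    """
    latest: Dict[Any, Dict[str, Any]] = {}
    for ev in events:
        order = (ev["source_ts"], ev["debezium_ts"])
        current = latest.get(ev["id"])
        if current is None or order >= (current["source_ts"], current["debezium_ts"]):
            latest[ev["id"]] = ev
    for key, ev in latest.items():
        if ev["dml_operator"] == "d":
            target.pop(key, None)    # delete if present; ignore ids never loaded
        else:
            target[key] = ev["row"]  # update if present, insert otherwise
```

A scheduled task rereads the whole landing table on every run. Applying only the latest event per id gives the same result however many times the task runs, which the second call in the test below checks.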

The NRT pipeline is complete!

F. Things to keep in mind

When setting up an NRT pipeline for your own use case, keep these caveats in mind:

– All source tables must have a primary key to propagate DML changes. If a table lacks one, ask the source database administrator or application team to define one using the data elements in that table.

If you still cannot add a primary key, build a separate data pipeline that performs a full data load for those tables.

– PostgreSQL logical decoding with wal2json does not capture DDL changes (for example, added or dropped columns).

However, the JSON payload does contain values for recently added columns. To identify DDL changes within a given time frame, you will need a separate process that uses database metadata tables and/or query history to scan for and capture them.

These DDL events must be processed before the regular DML changes are merged into the Snowflake data warehouse.
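Debezium includes every column of the row in the after node. A name that shows up there but not in the target table's definition therefore signals an added column. The minimal Python sketch below checks for this, assuming the known column list comes from the target table's metadata. The function name new_columns is hypothetical, and the comparison ignores case because Snowflake stores unquoted identifiers in uppercase.

```python
from typing import Any, Dict, Iterable, Optional, Set


def new_columns(known: Iterable[str],
                after_nodes: Iterable[Optional[Dict[str, Any]]]) -> Set[str]:
    """Return column names found in Debezium "after" nodes that the target lacks (hypothetical helper)."""
    known_lower = {c.lower() for c in known}
    seen: Set[str] = set()
    for after in after_nodes:
        if after:  # delete events have after = null (None)
            seen.update(after)
    return {c for c in seen if c.lower() not in known_lower}
```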

G. Success!

This CDC-based solution reduced the wait for newly posted listings from a daily batch window to under 30 minutes, after which the analytics engine ranked the listings against the investors’ criteria and pushed them to the queue.

Underwriters could review the listings, estimate values, and successfully meet their target of completing 1,000 single-family home acquisitions for a large investor in a very short time.

Setting up the NRT data pipeline involves configuring multiple systems to talk to each other. If set up correctly, these components will work well together to handle this and many other use cases.

Gathering and compiling data from multiple sources and making them usable in a short time is often the greatest challenge to be overcome to get value from business analytics. Write to [email protected] so that we can help.


©2017 Tiger Analytics. All rights reserved.
