Data processing is at the core of any data architecture. It involves transforming raw data into useful insights through analysis techniques such as machine learning algorithms or statistical models, depending on the type of problem an organization needs to solve.

We have seen in previous posts that raw data, once extracted from data sources, can be prepared and transformed (using RedisGears) into the target format required by downstream systems. In this post, we push this concept further by coupling the event processing of RedisGears with stream-based ingestion using Redis Data Integration (RDI). Data flowing through your operational systems (e.g., ERPs, CRMs…) is ingested into Redis using Change Data Capture (see Data & Redis - part 1) and processed with RedisGears to drive rapid operational decisions in near real time.

In fact, Redis Data Integration is not only a data integration tool but also a data processing engine built on RedisGears. It provides a more straightforward way to implement data transformations (declarative files) that hides the complexity of RedisGears.

Prerequisites

1 - Create a Redis Database 

For this article, you need to install and set up a few things. First, prepare a Redis Enterprise cluster, which will serve as the target infrastructure for the data transformed in this stage. You can use this project to create a Redis Enterprise cluster on the cloud provider of your choice.

Once you have created a Redis Enterprise cluster, you must create a target database that holds the transformed data. Redis Enterprise Software lets you create and distribute databases across a cluster of nodes. To create a new database, follow the instructions here.

For Redis Data Integration, you need two databases: the config database, exposed on redis-12000.cluster.redis-process.demo.redislabs.com:12000, and the target database, on redis-13000.cluster.redis-process.demo.redislabs.com:13000. Don’t forget to add the RedisJSON module when you create the target database.

2 - Install RedisGears

Now, let’s install RedisGears on the cluster. If it isn’t already installed, follow this guide.

curl -s https://redismodules.s3.amazonaws.com/redisgears/redisgears.Linux-ubuntu20.04-x86_64.1.2.5.zip -o /tmp/redis-gears.zip
curl -v -k -s -u "<REDIS_CLUSTER_USER>:<REDIS_CLUSTER_PASSWORD>" -F "module=@/tmp/redis-gears.zip" https://<REDIS_CLUSTER_HOST>:9443/v2/modules
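As a quick sanity check, you can list the modules registered on the cluster through the same REST API and confirm that RedisGears now appears. This is an optional, illustrative check that assumes the same cluster host and credentials used in the upload command above:

# List the modules known to the cluster; the RedisGears entry should appear in the JSON output.
curl -s -k -u "<REDIS_CLUSTER_USER>:<REDIS_CLUSTER_PASSWORD>" https://<REDIS_CLUSTER_HOST>:9443/v1/modules | python3 -m json.tool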

3 - Install Redis Data Integration (RDI)

For the second part of this article, you will need to install Redis Data Integration (RDI). RDI is installed via the RDI CLI, which must have network access to the Redis Enterprise cluster API (port 9443 by default). First, download the RDI offline package:

UBUNTU20.04

wget https://qa-onprem.s3.amazonaws.com/redis-di/latest/redis-di-offline-ubuntu20.04-latest.tar.gz -O /tmp/redis-di-offline.tar.gz

UBUNTU18.04

wget https://qa-onprem.s3.amazonaws.com/redis-di/latest/redis-di-offline-ubuntu18.04-latest.tar.gz -O /tmp/redis-di-offline.tar.gz

RHEL8

wget https://qa-onprem.s3.amazonaws.com/redis-di/latest/redis-di-offline-rhel8-latest.tar.gz -O /tmp/redis-di-offline.tar.gz

RHEL7

wget https://qa-onprem.s3.amazonaws.com/redis-di/latest/redis-di-offline-rhel7-latest.tar.gz -O /tmp/redis-di-offline.tar.gz 

Then copy the downloaded redis-di-offline.tar.gz to the master node of your Redis Enterprise cluster and unpack it under the /tmp directory:

tar xvf /tmp/redis-di-offline.tar.gz -C /tmp

Then install the RDI CLI by unpacking redis-di.tar.gz into the /usr/local/bin/ directory:

sudo tar xvf /tmp/redis-di-offline/redis-di-cli/redis-di.tar.gz -C /usr/local/bin/

Run the create command to set up the Redis Data Integration config database instance (exposed on port 12000, as described above) within an existing Redis Enterprise cluster:

redis-di create --silent --cluster-host <CLUSTER_HOST> --cluster-user <CLUSTER_USER> --cluster-password <CLUSTER_PASSWORD> --rdi-port <RDI_PORT> --rdi-password <RDI_PASSWORD> --rdi-memory 512

Finally, run the scaffold command to generate configuration files for Redis Data Integration and Debezium Redis Sink Connector:

redis-di scaffold --db-type <cassandra|mysql|oracle|postgresql|sqlserver> --dir <PATH_TO_DIR>

In this article, we will capture a SQL Server database, so choose sqlserver. The following files will be created in the provided directory:

├── debezium
│   └── application.properties
├── jobs
│   └── README.md
└── config.yaml
  • config.yaml - Redis Data Integration configuration file (definitions of target database, applier, etc.)
  • debezium/application.properties - Debezium Server configuration file
  • jobs - Data transformation jobs, read here

To run Debezium as a Docker container, download the Debezium image

wget https://qa-onprem.s3.amazonaws.com/redis-di/debezium/debezium_server_2.1.1.Final_offline.tar.gz -O /tmp/debezium_server.tar.gz

and load it as a Docker image:

docker load < /tmp/debezium_server.tar.gz

Then tag the image:

docker tag debezium/server:2.1.1.Final_offline debezium/server:2.1.1.Final
docker tag debezium/server:2.1.1.Final_offline debezium/server:latest

For the non-containerized deployment, you need to install Java 11 or Java 17. Then download Debezium Server 2.1.1.Final from here.

Unpack Debezium Server:

tar xvfz debezium-server-dist-2.1.1.Final.tar.gz

Copy the scaffolded application.properties file (created by the scaffold command) to the extracted debezium-server/conf directory. Verify that you’ve configured this file based on these instructions.

If you use Oracle as your source DB, please note that Debezium Server does not include the Oracle JDBC driver. You should download it and place it under the debezium-server/lib directory:

cd debezium-server/lib
wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.1.0.0/ojdbc8-21.1.0.0.jar

Then, start Debezium Server from the debezium-server directory:

./run.sh

Data processing using Redis Data Integration

Data transformation is a critical part of the data journey. This process can perform constructive tasks such as adding or copying records and fields, destructive actions like filtering and deleting specific values, aesthetic adjustments to standardize values, or structural changes that include renaming columns, moving them around, and merging them together.

The key functionality offered by RDI is mapping the data coming from Debezium Server (representing a source database row or a row state change) into a Redis key/value pair. The incoming data includes the schema. By default, each source row is converted into one Hash or one JSON key in Redis. RDI uses a set of handlers to automatically convert each source column to a Redis Hash field or a JSON attribute based on the Debezium type in the schema.

Converting captured streams.
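As an illustration, assume an Employee row with the columns EMPNO, ENAME, and SAL and the default Hash output. The captured row ends up as a Redis Hash that you can inspect with redis-cli. This is only a sketch: the key shown below (Employee:EMPNO:7499), the field values, and the credentials are assumptions, and the actual key naming depends on your table and primary key:

# Inspect one of the keys written by RDI in the target database (hypothetical key name and values).
redis-cli -h redis-13000.cluster.redis-process.demo.redislabs.com -p 13000 -a rdi-password HGETALL "Employee:EMPNO:7499"
# Illustrative output: each source column becomes a Hash field, e.g.
# 1) "EMPNO"  2) "7499"
# 3) "ENAME"  4) "ALLEN"
# 5) "SAL"    6) "1600"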

However, if you want to customize this default mapping or add a new transformation, RDI provides declarative data transformations (YAML files). Each YAML file contains a job: a set of transformations for one source table. The source is typically a database table or collection and is specified as the full name of this table/collection. The job contains logical steps to transform data into the desired output and store it in Redis (as Hash or JSON). All of these files are uploaded to Redis Data Integration with the deploy command once they are placed under the jobs folder:

├── debezium
│   └── application.properties
├── jobs
│   └── README.md
└── config.yaml

We’ve seen in Data 101 - part 5 that the pipelines required to run the transformation processes can be implemented using one of these approaches:

  • Code-centric tools: analysis and manipulation libraries built on top of general-purpose programming languages (Scala, Java or Python). These libraries manipulate data using the native data structures of the programming language.
  • Query-centric tools: use a querying language like SQL (Structured Query Language) to manage and manipulate datasets. These languages can be used to create, update and delete records, as well as query the data for specific information.
  • Hybrid tools: implement SQL on top of general-purpose programming languages. This is the case for frameworks like Apache Spark (with Spark SQL) or Apache Kafka (with the KSQL/ksqlDB dialect).

Redis Data Integration (RDI) leverages the hybrid approach since all transformation jobs are implemented using a human-readable format (YAML files) that embeds JMESPath and/or SQL.

The YAML files accept the following blocks/fields: 

source - This section describes the table that this job works on:

  • server_name: logical server name (optional)
  • db: DB name (optional)
  • schema: DB schema (optional)
  • table: DB table
  • row_format: Format of the data to be transformed: data_only (default) - only payload, full - complete change record 

transform - This section includes a series of blocks that the data should go through. See the documentation of the supported blocks and JMESPath custom functions.

output - This section includes the outputs where the data should be written to:

  1. Redis:
    • uses: redis.write - Write to a Redis data structure
    • with:
      • connection: Connection name
      • key: Allows overriding the record’s key by applying custom logic:
        • expression: Expression to execute
        • language: Expression language (JMESPath or SQL)
  2. SQL:
    • uses: relational.write - Write into a SQL-compatible data store
    • with:
      • connection: Connection name
      • schema: Schema
      • table: Target table
      • keys: Array of key columns
      • mapping: Array of mapping columns
      • opcode_field: Name of the field in the payload that holds the operation (c - create, d - delete, u - update) for this record in the DB

I’ve detailed many data transformation archetypes in Data 101 - part 5, and it is interesting to evaluate Redis Data Integration against this list of capabilities. The following sections show how to perform the different kinds of transformations using RDI.

1 - Filtering

This process selects a subset of your dataset that requires transformation, viewing, or analysis. The selection can be based on certain criteria, such as specific values in one or more columns, and it typically results in only part of the original data being used. As a result, filtering allows you to quickly identify trends and patterns within your dataset that may not have been visible before. It also lets you focus on particular aspects of interest without sifting through all the available information. In addition, this technique can reduce complexity by eliminating unnecessary details while still preserving important insights about the underlying data structure.

Filtering a dataset.

Using Redis Data Integration, filtering the Employee data (example above) to keep only people whose salary exceeds 1,000 can be implemented with the following YAML blocks:

source:
  table: Employee
transform:
  - uses: filter
    with:
      language: sql
      expression: SAL>1000

When you put this YAML file under the jobs folder, Redis Data Integration will capture changes from the source table and apply the filter, storing only the records matching the filtering expression (see Data & Redis - part 1 for RDI and SQL Server configuration).

Filtering Employees having salaries of more than 1,000.
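To sanity-check the job, you can scan the target database for Employee keys and confirm that only rows with SAL above 1,000 were written. The key pattern below assumes the default <table>:<primary key>:<value> naming and the target credentials from the prerequisites; adjust both to your setup:

# List the Employee keys written by RDI (assumed default key pattern)...
redis-cli -h redis-13000.cluster.redis-process.demo.redislabs.com -p 13000 -a rdi-password --scan --pattern "Employee:*"
# ...then pick any key and check its SAL field (hypothetical key name).
redis-cli -h redis-13000.cluster.redis-process.demo.redislabs.com -p 13000 -a rdi-password HGET "Employee:EMPNO:7499" SAL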

2 - Enriching

This process fills gaps in the data set. It enhances existing information by supplementing incomplete or missing data with relevant context, aiming to improve accuracy, quality, and value for better results.

Enriching a dataset.

Let’s assume the example above: we need to replace all NULL salaries in the Employee table with a default value of 0. In SQL, the COALESCE function returns the first non-NULL value in the argument list. Thus, COALESCE(SAL, 0) returns the salary if it is not null, or 0 otherwise. With RDI, we can implement this enrichment using the following job:

source:
  table: Employee
transform:
  - uses: map
    with:
      expression:
        EMPNO: EMPNO
        ENAME: ENAME
        SAL: COALESCE(SAL, 0)
      language: sql

In this configuration, we use the map block, which maps each source record into a new output based on the given expressions. Here, only the salary field changes, as it applies the COALESCE expression.

Replacing missing salaries with a default value (0).

If you are using SQL Server, an alternative way to perform this enrichment is the ISNULL function; we can use ISNULL(SAL, 0) in the expression block. The ISNULL function and the COALESCE expression have a similar purpose but can behave differently. Because ISNULL is a function, it is evaluated only once, whereas the input values of the COALESCE expression can be evaluated multiple times. Moreover, the data type of the resulting expression is determined differently: ISNULL uses the data type of the first parameter, while COALESCE follows the CASE expression rules and returns the data type of the value with the highest precedence.

source:
  table: Employee
transform:
  - uses: map
    with:
      expression:
        EMPNO: EMPNO
        ENAME: ENAME
        SAL: ISNULL(SAL, 0)
      language: sql 

3 - Splitting

Splitting fields into multiple ones consists of two atomic operations: adding the new fields according to specific transformation rules, then removing the source field (split column).

Splitting a column.

In the example above, we split EFULLNAME into two fields: EFIRSTNAME and ELASTNAME. The following configuration uses the add_field block to create the new fields EFIRSTNAME and ELASTNAME. To express the transformation rule, we can use the SUBSTRING function from SQL or the split function from JMESPath. In both cases, we need the additional remove_field block to remove the source column EFULLNAME.

source:
  table: Employee
transform:
  - uses: add_field
    with:
      fields:
        - field: EFIRSTNAME
          language: jmespath
          expression: split(EFULLNAME, ', ')[0]
        - field: ELASTNAME
          language: jmespath
          expression: split(EFULLNAME, ', ')[1]
  - uses: remove_field
    with:
      field: EFULLNAME

The split function breaks EFULLNAME down into an array using the separator provided as a parameter (here, a comma followed by a space).

Splitting Full Name into First and Last Name. 

4 - Merging

Merging multiple fields into one consists of two atomic operations: adding the new field according to a specific transformation rule, then removing the source fields (merged columns).

Merging two columns. 

In the example above, we merge EFIRSTNAME and ELASTNAME into one field: EFULLNAME. The following configuration uses the add_field block to create the new field EFULLNAME and a remove_field block to remove the merged columns EFIRSTNAME and ELASTNAME. To express the transformation rule, we can use the CONCAT_WS function from SQL or the JOIN / CONCAT functions from JMESPath.

source:
  table: Employee
transform:
  - uses: add_field
    with:
      fields:
        - field: EFULLNAME
          language: jmespath
          expression: concat([EFIRSTNAME, ' ', ELASTNAME])
          
  - uses: remove_field
    with:
      fields:
        - field: EFIRSTNAME
        - field: ELASTNAME

Merging First Name and Last Name into one field. 

5 - Removing

Besides removing specific columns from the source using the remove_field block, or avoiding the load of some data altogether using filtering, we might need to drop parts of the data according to a specific condition, such as duplicates. Redis Data Integration doesn’t have a specific block or function for dropping duplicates. However, we can use the key block to create a custom key for the output, composed of all the fields that define a duplicate.

Drop duplicates. 

For example, let’s consider the use case above. Looking at the EMPNO column, each record has a distinct ID. However, three records are in fact duplicates. In this case, we want to drop duplicates according to the EFULLNAME and SAL fields, not EMPNO. The solution in RDI is to create a new key that preserves the uniqueness of records: a key composed of the concatenation of EFULLNAME and SAL. Thus, RDI can drop any duplicates based on the newly created key.

source:
  table: Employee
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: hash(concat([EFULLNAME, '-', SAL]), 'sha3_512')
        language: jmespath

In addition, we use the hash function to create an ID instead of a raw concatenation of fields. However, beware that two different concatenations (different strings) may end up with the same hash value, called a collision, since the hash maps arbitrarily long strings to a fixed-size value. In that case, two different combinations of (EFULLNAME ‘-‘ SAL) would map to the same key.

However, the chances of random collisions are negligibly small, even for billions of records, because the SHA-3 family is built to offer 2^(n/2) collision resistance. In our transformation, we’ve chosen SHA3-512, which offers 2^256 collision resistance (roughly 1 chance in 115,792,089,237,316,195,423,570,985,008,687,907,853,269,984,665,640,564,039,457,584,007,913,129,639,936 of another string combination producing the same hash!).

Dropping 3 duplicates. 

6 - Derivation

Derivation consists of cross-column calculations. With RDI, we can easily create a new field based on calculations over existing fields. Let’s consider the example below: we need to calculate the total compensation of each employee based on their salary and bonus.

Derivation. 

The following job implements this kind of derivation using SQL, summing up the SAL and BONUS fields and storing the result in an additional field called TOTALCOMP:

source:
  table: Employee
transform:
  - uses: add_field
    with:
      fields:
        - field: TOTALCOMP
          language: sql
          expression: SAL + BONUS

Deriving Total Compensation from the Salary and Bonus fields.

7 - Data Denormalization

Redis Data Integration (RDI) has a different approach to performing joins between two tables that have one-to-many or many-to-one relationships. This approach, called the nesting strategy, consists of organizing data into a logical structure where the parent-child relationship is converted into a schema based on nesting. Denormalized data often contains duplicate values when represented as tables or hashes, which increases storage requirements but makes querying faster, since all relevant information for a given task can be found in one table instead of having to join multiple tables/hashes before running queries.

Duplicate values when denormalizing. 

However, you can choose to perform the denormalization using the JSON format. In this case, no duplication is introduced, and thus there is no impact on storage, since the parent-child relationship is simply reflected hierarchically.

Let’s assume the two tables: Department and Employee. We will create a declarative data transformation that denormalizes these two tables into one nested structure in JSON. The aim is to get the details of employees in each department.

Denormalization using the nest strategy. 

Let’s create the following file in the jobs directory. This declarative file merges the two tables into a single JSON object. It also demonstrates how easy it is to set up such a complex transformation using a simple declarative YAML file.

source:
  table: Employee
output:
  - uses: redis.write
    with:
      nest:
        parent:
          table: Department
        nesting_key: EMPNO         # cannot be composite
        parent_key: DEPTNO         # cannot be composite
        path: $.Employees          # path must start from root ($)
        structure: map  

When using the Debezium SQL Server connector, it is good practice to have a dedicated user with the minimal required permissions in SQL Server, to limit the blast radius. For that, you need to run the T-SQL script below:

USE master
GO
CREATE LOGIN dbzuser WITH PASSWORD = 'dbz-password'
GO
USE HR
GO
CREATE USER dbzuser FOR LOGIN dbzuser
GO

Then grant the required permissions to the new user:

USE HR
GO
EXEC sp_addrolemember N'db_datareader', N'dbzuser'
GO

Then you must enable Change Data Capture (CDC) for each database and table you want to capture. (The rds_cdc_enable_db procedure below is specific to Amazon RDS for SQL Server; on a self-managed instance, you would use sys.sp_cdc_enable_db.)

EXEC msdb.dbo.rds_cdc_enable_db 'HR'
GO

Run this T-SQL script for each table in the database, substituting @source_name with the table name (Employee, then Department):

USE HR
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'<Table_Name>', 
@role_name     = N'db_cdc',
@supports_net_changes = 0
GO

Finally, the Debezium user created earlier (dbzuser) needs access to the captured change data, so it must be added to the role created in the previous step.

USE HR
GO  
EXEC sp_addrolemember N'db_cdc', N'dbzuser'
GO

You can verify access by running this T-SQL script as user dbzuser:

USE HR
GO  
EXEC sys.sp_cdc_help_change_data_capture
GO

In the RDI configuration file config.yaml, you need to add settings such as the following:

connections:
  target:
    host: redis-13000.cluster.redis-process.demo.redislabs.com
    port: 13000
    user: default
    password: rdi-password  
applier:
  target_data_type: json
  json_update_strategy: merge 

Caution: If you want to execute normalization/denormalization jobs, it is mandatory to use at least release 0.100 of Redis Data Integration.

For UBUNTU20.04

wget https://qa-onprem.s3.amazonaws.com/redis-di/0.100.0/redis-di-ubuntu20.04-0.100.0.tar.gz -O /tmp/redis-di-offline.tar.gz

For UBUNTU18.04

wget https://qa-onprem.s3.amazonaws.com/redis-di/0.100.0/redis-di-ubuntu18.04-0.100.0.tar.gz -O /tmp/redis-di-offline.tar.gz

For RHEL8

wget https://qa-onprem.s3.amazonaws.com/redis-di/0.100.0/redis-di-rhel8-0.100.0.tar.gz -O /tmp/redis-di-offline.tar.gz

For RHEL7

wget https://qa-onprem.s3.amazonaws.com/redis-di/0.100.0/redis-di-rhel7-0.100.0.tar.gz -O /tmp/redis-di-offline.tar.gz

Then install the RDI CLI by unpacking redis-di-offline.tar.gz into the /usr/local/bin/ directory:

sudo tar xvf /tmp/redis-di-offline.tar.gz -C /usr/local/bin/

Upgrade your Redis Data Integration (RDI) engine so that it matches the new redis-di CLI. To do so, run:

redis-di upgrade --cluster-host cluster.redis-process.demo.redislabs.com --cluster-user [CLUSTER_ADMIN_USER] --cluster-password [ADMIN_PASSWORD] --rdi-host redis-13000.cluster.redis-process.demo.redislabs.com --rdi-port 13000 --rdi-password rdi-password

Then, run the deploy command to deploy the local configuration to the remote RDI config database:

redis-di deploy --rdi-host redis-12000.cluster.redis-process.demo.redislabs.com --rdi-port 12000 --rdi-password rdi-password

Change directory to your Redis Data Integration configuration folder created by the scaffold command, then run:

docker run -d --name debezium --network=host --restart always -v $PWD/debezium:/debezium/conf --log-driver local --log-opt max-size=100m --log-opt max-file=4 --log-opt mode=non-blocking debezium/server:2.1.1.Final

Check the Debezium Server log:

docker logs debezium --follow

Denormalizing Employee and Department tables. 

Redis Data Integration (RDI) performs data denormalization in a performant and complete manner. Not only does it combine the source tables into a single structure, but it also handles late-arriving data in the same way: if the nested (child-level) data is captured before the parent-level data, RDI creates a JSON structure for the child-level records; as soon as the parent-level data arrives, RDI creates the structure for the parent record and merges all child (nested) records into it. For example, consider the two tables Invoice and InvoiceLine. If you insert an InvoiceLine belonging to an Invoice before the invoice itself, RDI creates the JSON structure for the InvoiceLine and waits for the Invoice structure. As soon as you insert the containing Invoice, RDI initializes the Invoice JSON structure and merges the InvoiceLines created earlier into it.

Late arriving data with Redis Data Integration. 

One limitation observed so far with RDI’s denormalization is the nesting depth (limited to one level): for the moment, it is only possible to denormalize two tables with a one-to-many or many-to-one relationship.

8 - Data Normalization

In addition to data ingest, Redis Data Integration (RDI) also allows synchronizing the data stored in a Redis DB with downstream data stores. This scenario is called Write-Behind, and you can think of it as a pipeline that starts with Change Data Capture (CDC) events from a Redis DB and then filters, transforms, and maps the data to a target data store (e.g., a relational database).

Redis Data Integration use cases. 

We’ve seen in the last section that we can perform data denormalization to join multiple tables with one-to-many or many-to-one relationships into a single structure in Redis. On the other hand, data normalization is one of the transformations we can perform in the Write-Behind use case. Data normalization is the process of organizing data into a logical structure to improve performance and reduce redundancy. It involves breaking down complex datasets into smaller, more manageable pieces by eliminating redundant information or consolidating related items together. Normalization also helps ensure consistency in storing and accessing data across different systems.

Normalization vs. Denormalization. 

Let’s assume a JSON document stored in Redis that consists of an Invoice together with the details it contains (InvoiceLines). We want to normalize this structure into two separate tables: one containing invoices and another containing invoice lines. For example, with a single nested structure (one invoice composed of three invoice lines), the target should end up with four records across the two tables: one in the Invoice table and three in the InvoiceLine table.

In this section, we will use the database redis-13000.cluster.redis-process.demo.redislabs.com:13000 as a data source. This database must include RedisGears and RedisJSON modules to execute the following actions.

First, you need to create and install the RDI engine on your Redis source database so it is ready to process data. You need to run the configure command if you have not used this Redis database with RDI Write Behind before.

redis-di configure --rdi-host redis-13000.cluster.redis-process.demo.redislabs.com --rdi-port 13000 --rdi-password rdi-password

Then run the scaffold command with the type of data store you want to use, for example:

redis-di scaffold --strategy write_behind --dir . --db-type mysql

This will create a template of config.yaml and a folder named jobs under the current directory. You can specify any folder name with --dir, or use the --preview config.yaml option to print the config.yaml template to the terminal.
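For example, a preview-only invocation could look like the following; this combination of flags is an assumption on my part, so check redis-di scaffold --help for the exact syntax in your version:

# Print the write-behind config.yaml template to the terminal instead of writing files (illustrative).
redis-di scaffold --strategy write_behind --db-type mysql --preview config.yaml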

Let’s assume that your target MySQL database endpoint is rdi-wb-db.cluster-cpqlgenz3kvv.eu-west-3.rds.amazonaws.com. You need to add the connection(s) required for downstream targets in the connections section of the config.yaml, for example:

connections:
  my-sql-target:
    type: mysql
    host: rdi-wb-db.cluster-cpqlgenz3kvv.eu-west-3.rds.amazonaws.com
    port: 3306
    database: sales
    user: admin
    password: rdi-password

In the MySQL server, you need to create the sales database and the two tables, Invoice and InvoiceLine:

USE mysql;
CREATE DATABASE `sales`;

CREATE TABLE `sales`.`Invoice` (
    `InvoiceId` bigint NOT NULL,
    `CustomerId` bigint NOT NULL,
    `InvoiceDate` varchar(100) NOT NULL,
    `BillingAddress` varchar(100) NOT NULL,
    `BillingPostalCode` varchar(100) NOT NULL,
    `BillingCity` varchar(100) NOT NULL,
    `BillingState` varchar(100) NOT NULL,
    `BillingCountry` varchar(100) NOT NULL,
    `Total` int NOT NULL,
    PRIMARY KEY (InvoiceId)
);

CREATE TABLE `sales`.`InvoiceLine` (
    `InvoiceLineId` bigint NOT NULL,
    `TrackId` bigint NOT NULL,
    `InvoiceId` bigint NOT NULL,
    `Quantity` int NOT NULL,
    `UnitPrice` int NOT NULL,
    PRIMARY KEY (InvoiceLineId)
);

Now, let’s create the following file in the jobs directory. This declarative file splits the JSON structure and writes to the two tables in the MySQL database called sales. You can define different targets for these two tables by declaring other connections in the config.yaml file.

source:
  keyspace:
    pattern: invoice:*
output:
  - uses: relational.write
    with:
      connection: my-sql-target
      schema: sales
      table: Invoice
      keys:
        - InvoiceId
      mapping:
        - CustomerId
        - InvoiceId
        - InvoiceDate
        - BillingAddress
        - BillingPostalCode
        - BillingCity
        - BillingState
        - BillingCountry
        - Total
  - uses: relational.write
    with:
      connection: my-sql-target
      schema: sales
      table: InvoiceLine
      foreach: "IL: InvoiceLineItems.values(@)"
      keys:
        - IL: InvoiceLineItems.InvoiceLineId
      mapping:
        - UnitPrice: IL.UnitPrice
        - Quantity: IL.Quantity
        - TrackId: IL.TrackId
        - InvoiceId

To start the pipeline, run the deploy command:

redis-di deploy

You can check that the pipeline is running, receiving, and writing data using the status command:

redis-di status

Once you run the deploy command, the RDI engine registers the job and listens to keyspace notifications on the pattern invoice:*. Thus, if you add such a JSON document (see the sketch below), RDI will run the job and execute the data transformation accordingly.
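Here is a minimal sketch of what such a document could look like (one invoice with three invoice lines), loaded into the source database with JSON.SET. The field names follow the mapping of the job above; the key, values, and credentials are illustrative assumptions:

# Load an example invoice document (one invoice, three invoice lines) into the source DB.
redis-cli -h redis-13000.cluster.redis-process.demo.redislabs.com -p 13000 -a rdi-password JSON.SET invoice:1 $ '{
  "InvoiceId": 1, "CustomerId": 14, "InvoiceDate": "2023-01-05",
  "BillingAddress": "8210 111 ST NW", "BillingPostalCode": "T6G 2C7",
  "BillingCity": "Edmonton", "BillingState": "AB", "BillingCountry": "Canada",
  "Total": 3,
  "InvoiceLineItems": {
    "1": {"InvoiceLineId": 1, "TrackId": 100, "Quantity": 1, "UnitPrice": 1},
    "2": {"InvoiceLineId": 2, "TrackId": 101, "Quantity": 1, "UnitPrice": 1},
    "3": {"InvoiceLineId": 3, "TrackId": 102, "Quantity": 1, "UnitPrice": 1}
  }
}'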

Normalizing a nested JSON into Invoice and InvoiceLine tables. 
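If everything is wired correctly, the normalized rows should appear in MySQL shortly after the document is written. A quick way to confirm, assuming the mysql client and the connection details from config.yaml, is to count the rows in both tables:

# Count the rows written by the write-behind pipeline (expected here: 1 invoice and 3 invoice lines).
mysql -h rdi-wb-db.cluster-cpqlgenz3kvv.eu-west-3.rds.amazonaws.com -u admin -p -e "SELECT COUNT(*) FROM sales.Invoice; SELECT COUNT(*) FROM sales.InvoiceLine;"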

Summary

This article illustrates how to perform complex data transformations using Redis Data Integration (RDI). This is my second post on RDI; I previously presented it as a data ingestion tool. Here, we pushed the data journey further and used RDI as a data processing and transformation engine.

In the previous sections, I presented a set of data transformation scenarios commonly required in any enterprise-grade data platform and assessed RDI’s capabilities against them. The tool is still under heavy development and in private preview, but it already offers many promising capabilities for implementing a complete real-time data platform.

References