How to set up a Neo4j sink connector in Kafka

May 5, 2022

Introduction

While making my first data stream through Kafka into Neo4j, I found it quite difficult to figure out exactly how to properly set everything up. I'd like to share what I've learned by showing and explaining parts of the docker-compose configuration of a proof of concept that I've made. The proof of concept showcases the power of Neo4j in a food traceability context. The entire docker-compose.yml and Neo4j sink connector configuration can be found on https://github.com/blqblqblq159/Traceability.

We use this to go over the following topics:

  • Which Kafka microservices are necessary, and what is each of them used for?
  • How do we set up and configure our Neo4j sink connector? What are some (personal) best practices for data ingestion into Neo4j?

After this blog post, you will understand how to set up a Neo4j sink connector for Kafka, as we did in our Traceability proof of concept, so that all event messages in Kafka are automatically ingested into our Neo4j database.

Kafka: a quick overview

Kafka and Zookeeper

Kafka and Zookeeper are the two microservices that form the basis of every Kafka setup.

Zookeeper is the distributed consensus manager for the Kafka brokers (these are the container(s) running Kafka). In a production-ready Kafka deployment, there will be multiple brokers running and the data sent to Kafka will be replicated in different brokers, such that data can persist even if a broker goes down. The zookeeper service manages the collaboration between the brokers and ensures the High Availability of the Kafka service.

Even though our example will only use 1 Kafka broker, we still have to spin up zookeeper for it to function (we use docker-compose version 3):

    zookeeper:
      image: confluentinc/cp-zookeeper:latest
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000

We set 'ZOOKEEPER_CLIENT_PORT: 2181', so we will connect to zookeeper in the internal docker network on zookeeper:2181.

More info on zookeeper can be found here. This will eventually not be relevant anymore as a new consensus protocol (KRaft) for Kafka is in development at the time of writing.

The Kafka container contains our broker. We will be writing messages to and reading messages from the topics in this broker. [More info on topics](https://dattell.com/data-architecture-blog/what-is-a-kafka-topic/).

The Kafka container has the following configuration:

    kafka:
      image: confluentinc/cp-kafka:latest
      depends_on:
        - zookeeper
      ports:
        - 29092:29092
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

The most important configuration is the advertised listeners. We set the advertised listeners such that we can properly connect to the broker. In the internal docker network, we can reach the broker on kafka:9092 and on the host machine on localhost:29092. We then set the security protocol for each listener and tell Kafka which listener it uses for inter-broker communication (not that important for us since we only use 1 broker). more info
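To make the two listeners more concrete, here is a minimal Python sketch that splits the KAFKA_ADVERTISED_LISTENERS value from the configuration above and picks the address a client should use. The function names and the inside_docker flag are my own illustration and are not part of Kafka or of the Traceability repository.

```python
def parse_advertised_listeners(value: str) -> dict:
    """Split 'NAME://host:port,NAME2://host2:port2' into {NAME: 'host:port'}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        listeners[name] = address
    return listeners


def bootstrap_server(value: str, inside_docker: bool) -> str:
    """Pick the broker address depending on where the client runs."""
    listeners = parse_advertised_listeners(value)
    # PLAINTEXT is advertised for the docker network, PLAINTEXT_HOST for the host machine.
    return listeners["PLAINTEXT"] if inside_docker else listeners["PLAINTEXT_HOST"]


ADVERTISED = "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
print(bootstrap_server(ADVERTISED, inside_docker=True))   # kafka:9092
print(bootstrap_server(ADVERTISED, inside_docker=False))  # localhost:29092
```

A producer running on the host machine would therefore use localhost:29092 as its bootstrap server, while the connect container uses kafka:9092.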

We then have to set the replication factor for the "offsets" topic (an internal topic that stores topic offsets for consumers) to 1. We can not replicate topics as we only have 1 broker. And since the default offset replication factor is 3, we would run into problems otherwise.

In our case, we want to create some topics on start-up. We use another Kafka container for this:

    init-kafka:
      image: confluentinc/cp-kafka:latest
      depends_on:
        - kafka
      entrypoint: [ '/bin/sh', '-c' ]
      command: |
        "
        # blocks until kafka is reachable
        kafka-topics --bootstrap-server kafka:9092 --list

        # create topics
        echo -e 'Creating kafka topics'
        kafka-topics --bootstrap-server kafka:9092 --create --topic grainbatches
        kafka-topics --bootstrap-server kafka:9092 --create --topic flourbatches
        kafka-topics --bootstrap-server kafka:9092 --create --topic breadbatches
        kafka-topics --bootstrap-server kafka:9092 --create --topic distributions
        kafka-topics --bootstrap-server kafka:9092 --create --topic purchases
        "

The five topics grainbatches, flourbatches, breadbatches, distributions, and purchases are now created when we docker-compose up. After the creation of the topics, this container will be obsolete and will shut itself down. If for some reason you want to prevent the container from shutting down, you can simply add "sleep infinity" at the end of the command.

Schema-registry and connect

There are a couple of different APIs to write data to and read data from Kafka topics. In the code on GitHub, we use a producer in Python to write data to our Kafka topics, and then we use the Connect API to read this data into Neo4j. An overview of the different APIs can be found here.

We will now focus on configuring a Neo4j sink connector. A (sink) connector consists of 3 different parts:

  • converter: converts the data from the Kafka topics back to its original form with its original schema. This is where the schema-registry microservice comes in: the schema registry provides an interface for storing and retrieving your data schemas. More info
  • transform(s): apply some transformations to the data before sinking it into the database. This is optional and we do not use this feature here.
  • connector: sink the data into the database according to its schema and the connector configuration.

A more in-depth discussion on connectors, converters, and schema-registry can be found here.

The schema-registry is configured as follows:

    schema-registry:
      image: confluentinc/cp-schema-registry:latest
      environment:
        SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
        SCHEMA_REGISTRY_HOST_NAME: schema-registry
      depends_on:
        - zookeeper
        - kafka
      ports:
        - '8081:8081'

Nothing special is happening here. We will be able to connect to the service in the internal docker network on schema-registry:8081.

The connector configuration is a bit more involved:

    connect:
      hostname: connect
      image: confluentinc/cp-kafka-connect-base:latest
      depends_on:
        - schema-registry
        - kafka
      ports:
        - '8083:8083'
      environment:
        CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
        CONNECT_REST_PORT: 8083
        CONNECT_GROUP_ID: kafka-connect
        CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
        CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
        CONNECT_STATUS_STORAGE_TOPIC: _connect-status
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        # booleans in a compose environment mapping must be quoted
        CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
        CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
        CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
        CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
        CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
      volumes:
        - ./connectConfFiles:/connectConfFiles
      command:
        - bash
        - -c
        - |
          echo "Installing Connectors"
          confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
          #
          echo "Launching Kafka Connect worker"
          /etc/confluent/docker/run &
          #
          # Wait for Kafka Connect listener
          echo "Waiting for Kafka Connect to start listening on localhost"
          while : ; do
            curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
            if [ $$curl_status -eq 200 ] ; then
              break
            fi
            sleep 5
          done
          # create Neo4j sink connector
          cd /connectConfFiles
          curl -i -X PUT -H Content-Type:application/json http://localhost:8083/connectors/sink-neo4j-orders-00/config -d@contrib.sink.string-json.neo4j.json
          #
          sleep infinity

Important configuration parameters are:

    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"

This is the configuration for the converter. Data in Kafka topics gets stored as key/value pairs, where the value contains the main data. Usually, the 'string converter' will be appropriate for CONNECT_KEY_CONVERTER.

For the value converter, we have to choose the right type according to how our data was serialized. I serialized my data as JSON schema, hence why we use the JSON converter with schema enabled.

There are a couple of possibilities for the serialization, most notably AVRO, protobuf, JSON schema, and plain JSON. JSON schema is pretty much equivalent to JSON serialization. The difference is that JSON schema requires the schema of the messages to be explicitly defined. For example, a message in JSON schema could look like this:

    message = {
        "schema": {
            "type": "struct",
            "optional": False,
            "version": 1,
            "fields": [
                { "field": "grainbatch_id", "type": "int64", "optional": True },
                { "field": "parent_farmer_id", "type": "int64", "optional": True },
                { "field": "timestamp", "type": "string", "optional": True }
            ]
        },
        "payload": {
            "grainbatch_id": 3,
            "parent_farmer_id": 1,
            "timestamp": '14:01:10.218524'
        }
    }

while its plain JSON counterpart would consist of just the 'payload'.

If we only wanted a Neo4j sink connector, plain JSON serialization would work. It is, however, good practice to always use a serialization type with a schema, since an SQL sink connector, for example, will not function on plain JSON data.
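On the producer side, the schema envelope can be generated rather than written by hand. The following is a minimal sketch, assuming flat payloads that only contain integers, strings, and booleans; the helper names with_schema and serialize are hypothetical and not taken from the repository.

```python
import json

# Python type -> Kafka Connect JSON schema type (only the types needed here).
CONNECT_TYPES = {int: "int64", str: "string", bool: "boolean"}


def with_schema(payload: dict) -> dict:
    """Wrap a flat payload in the schema/payload envelope read by the JSON converter."""
    fields = [
        {"field": name, "type": CONNECT_TYPES[type(value)], "optional": True}
        for name, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "optional": False, "version": 1, "fields": fields},
        "payload": payload,
    }


def serialize(payload: dict) -> bytes:
    """Encode the envelope as UTF-8 JSON, ready to be sent as a Kafka message value."""
    return json.dumps(with_schema(payload)).encode("utf-8")


message = with_schema(
    {"grainbatch_id": 3, "parent_farmer_id": 1, "timestamp": "14:01:10.218524"}
)
print(json.dumps(message, indent=2))
```

A function like serialize could be passed as the value serializer of whichever Python Kafka client you use.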

Another important configuration parameter is:

CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components

This is the path to the .jar-files of the connector. We get these .jar-files in that path with the command:

confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest

After that, we launch the connect worker and wait for it to start listening on localhost:8083. We then create the Neo4j sink connector with:

    cd /connectConfFiles
    curl -i -X PUT -H Content-Type:application/json http://localhost:8083/connectors/sink-neo4j-orders-00/config -d@contrib.sink.string-json.neo4j.json

where we mounted the configuration file 'contrib.sink.string-json.neo4j.json' located in the 'connectConfFiles' folder from our host machine to the connect container via:

    volumes:
      - ./connectConfFiles:/connectConfFiles
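The same PUT request can also be built from Python with only the standard library, which can be handy if you create connectors from a script on the host. This is a sketch under the assumption that the Connect REST API is reachable on localhost:8083, as exposed by the port mapping above; build_connector_request is a hypothetical helper, and the config passed here is deliberately incomplete.

```python
import json
import urllib.request


def build_connector_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a PUT /connectors/<name>/config request for the Kafka Connect REST API."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_connector_request(
    "http://localhost:8083",
    "sink-neo4j-orders-00",
    {"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector"},
)
print(request.get_method(), request.full_url)

# Sending the request needs a running Connect worker, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```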

Neo4j sink connector

In this section, we look more in-depth at the configuration file 'contrib.sink.string-json.neo4j.json' of the Neo4j sink connector and examine some best practices for data ingestion into Neo4j.

Configuration

The configuration file that I used, looks like this:

    {
      "name": "sink-neo4j-orders-00",
      "topics": "grainbatches, flourbatches, breadbatches, distributions, purchases",
      "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
      "errors.retry.timeout": "-1",
      "errors.retry.delay.max.ms": "1000",
      "errors.tolerance": "all",
      "errors.log.enable": true,
      "errors.log.include.messages": true,
      "neo4j.server.uri": "bolt://neo4j:7687",
      "neo4j.authentication.basic.username": "neo4j",
      "neo4j.authentication.basic.password": "connect",
      "neo4j.encryption.enabled": false,

      #data sink queries
      "neo4j.topic.cypher.grainbatches": "
        MERGE (a:Farmer {id:event.parent_farmer_id})
        MERGE (b:Grainbatch {id:event.grainbatch_id})
        MERGE (a)-[:PRODUCED {timestamp:event.timestamp}]->(b)
      ",
      "neo4j.topic.cypher.flourbatches": "
        MERGE (a:Processor {id:event.parent_processor_id})
        MERGE (b:Flourbatch {id:event.flourbatch_id})
        MERGE (a)-[:PROCESSED {timestamp:event.timestamp}]->(b)
        MERGE (c:Grainbatch {id:event.parent_grainbatch_id})
        MERGE (c)-[:PROCESSED_TO]->(b)
      ",
      "neo4j.topic.cypher.breadbatches": "
        MERGE (a:Bakery {id:event.parent_bakery_id})
        MERGE (b:Breadbatch {id:event.breadbatch_id})
        MERGE (a)-[:BAKED {timestamp:event.timestamp}]->(b)
        MERGE (c:Flourbatch {id:event.parent_flourbatch_id})
        MERGE (c)-[:BAKED_TO]->(b)
      ",
      "neo4j.topic.cypher.distributions": "
        MERGE (a:Breadmachine {id:event.parent_vending_machine_id})
        MERGE (b:Machinebatch {id:event.distribution_id})
        MERGE (a)-[:HOLDS]->(b)
        MERGE (c:Breadbatch {id:event.parent_breadbatch_id})
        MERGE (c)-[:DISTRIBUTED_TO]->(b)
        MERGE (d:Second {id:event.second})
        MERGE (e:Minute {id:event.minute})
        MERGE (e)-[:MINUTE_OF]->(d)
        MERGE (b)-[:DISTRIBUTED_AT]->(d)
      ",
      "neo4j.topic.cypher.purchases": "
        MERGE (a:Machinebatch {id:event.parent_distribution_id})
        MERGE (b:Purchase {id:event.purchase_id})
        SET b.goodrating = event.good_rating
        MERGE (a)-[:SELL_SOURCE]->(b)
        MERGE (c:Customer {id:event.child_customer_id})
        MERGE (b)-[:BOUGHT_BY {timestamp:event.timestamp}]->(c)
      "

      #These queries were split over multiple lines for readability,
      #JSON does not support these kind of multi-line strings
      #Original config file contains single line queries
    }

The sink connector reads data from the 5 topics that we previously created.

We set errors.tolerance to all, meaning that the connector will not stop working if it comes across unexpected data. For example, if our connector somehow comes across some AVRO serialized data in one of the topics, it will not be able to process this data (we told our connector that the data was serialized as JSON schema). With this setting, it skips such messages and keeps running instead of failing.

It is possible to send this unprocessed data to another topic called a dead-letter queue and have it be processed there. More info can be found here. I did not implement this.
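For readers who do want a dead-letter queue, the sketch below shows which settings would be added to the connector configuration. The errors.deadletterqueue.* keys are standard Kafka Connect sink properties, but I have not tested them in this project; the helper name and the topic name neo4j-dlq are made up for illustration.

```python
def with_dead_letter_queue(config: dict, topic: str, replication_factor: int = 1) -> dict:
    """Return a copy of a sink connector config that routes failed records to a DLQ topic."""
    extended = dict(config)
    extended.update({
        # Failed records are only routed to the DLQ when errors are tolerated.
        "errors.tolerance": "all",
        "errors.deadletterqueue.topic.name": topic,
        # With a single broker, the DLQ topic cannot be replicated either.
        "errors.deadletterqueue.topic.replication.factor": str(replication_factor),
        # Adds headers describing why each record failed.
        "errors.deadletterqueue.context.headers.enable": "true",
    })
    return extended


base_config = {"name": "sink-neo4j-orders-00", "errors.tolerance": "all"}
dlq_config = with_dead_letter_queue(base_config, topic="neo4j-dlq")  # hypothetical topic name
for key, value in sorted(dlq_config.items()):
    print(f"{key} = {value}")
```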

The last 5 blocks in the configuration file contain the Cypher queries used to process the data from each of the topics. Every message read from a Kafka topic can be seen as 1 row from a table. In a query, we refer to an individual entry of the message through the event prefix, for example event.grainbatch_id.

For example, a message in our grainbatches topic looks like this:

[Figure: example message from the grainbatches topic]

We use it as follows:

    "neo4j.topic.cypher.grainbatches": "
      MERGE (a:Farmer {id:event.parent_farmer_id})
      MERGE (b:Grainbatch {id:event.grainbatch_id})
      MERGE (a)-[:PRODUCED {timestamp:event.timestamp}]->(b)
    "

So, if they do not yet exist, we create a Farmer node with id 17, a Grainbatch node with id 0, and a PRODUCED relation from the Farmer to the Grainbatch with its timestamp.
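A mismatch between the field names in a query and those in the messages is easy to introduce and hard to spot. The following sketch lists every event.<field> reference in a query and checks it against a message payload. The payload uses the ids mentioned above; its timestamp value is an assumption borrowed from the earlier schema example, and both helper functions are hypothetical.

```python
import re


def referenced_fields(query: str) -> set:
    """Collect every field name that a Cypher query accesses as event.<field>."""
    return set(re.findall(r"\bevent\.(\w+)", query))


def missing_fields(query: str, payload: dict) -> set:
    """Return the fields used by the query that are absent from the payload."""
    return referenced_fields(query) - set(payload)


GRAINBATCHES_QUERY = (
    "MERGE (a:Farmer {id:event.parent_farmer_id}) "
    "MERGE (b:Grainbatch {id:event.grainbatch_id}) "
    "MERGE (a)-[:PRODUCED {timestamp:event.timestamp}]->(b)"
)

# Assumed payload: ids as in the text, timestamp value chosen for illustration.
payload = {"grainbatch_id": 0, "parent_farmer_id": 17, "timestamp": "14:01:10.218524"}

print(sorted(referenced_fields(GRAINBATCHES_QUERY)))
print(missing_fields(GRAINBATCHES_QUERY, payload))
```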

Best practices for Neo4j data ingestion

Here, I would like to share some personal best practices for the Neo4j sink connector queries that I found while attempting to properly load my data from the Kafka topics into Neo4j. They consist of 4 rules:

  • Use MERGE, never CREATE
  • Individually MERGE every node and relationship
  • MERGE a node on its unique id and SET its properties later
  • Impose unicity constraints to avoid data duplication

This list is by no means exhaustive, but for any relatively simple data ingestion, it should suffice. These rules are all in place to avoid duplicated data and race conditions.

We will now explain why each rule is in place. The explanations use messages from 2 fictitious Kafka topics, People and Houses.

[Figures: example messages from the People and Houses topics]

So we want to create a Person node with properties:

  • id: 5
  • first_name: John
  • last_name: Smith

with a LIVES_IN relation to a House node with properties:

  • id: 23
  • house_type: mansion
  • address: 42nd street, New York
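Written as Python dictionaries, messages matching this description could look as follows. This is a reconstruction: the values come from the lists above and the field names from the event.<field> references in the queries of this section, so the exact shape of the original messages is an assumption.

```python
# Payload of a message in the fictitious People topic.
people_message = {
    "person_id": 5,
    "first_name": "John",
    "last_name": "Smith",
    "house_id": 23,  # links the person to the house below
}

# Payload of a message in the fictitious Houses topic.
houses_message = {
    "house_id": 23,
    "house_type": "mansion",
    "address": "42nd street, New York",
}

# Both messages refer to the same house through house_id.
print(people_message["house_id"] == houses_message["house_id"])
```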

Following our best practices, the queries to populate the Neo4j database with this data are:

    "neo4j.topic.cypher.People": "
      MERGE (a:Person {id:event.person_id})
      SET a.last_name = event.last_name, a.first_name = event.first_name
      MERGE (b:House {id:event.house_id})
      MERGE (a)-[:LIVES_IN]->(b)
    ",
    "neo4j.topic.cypher.Houses": "
      MERGE (a:House {id:event.house_id})
      SET a.house_type = event.house_type, a.address = event.address
    "

Use MERGE, not CREATE

When we use MERGE (= CREATE if it does not exist, MATCH if it exists), we do not have to think about whether or not some entity exists in our database already.

For example, we could try to use:

    "neo4j.topic.cypher.People": "
      MERGE (a:Person {id:event.person_id})
      SET a.last_name = event.last_name, a.first_name = event.first_name
      WITH a
      MATCH (b:House {id:event.house_id})
      MERGE (a)-[:LIVES_IN]->(b)
    ",
    "neo4j.topic.cypher.Houses": "
      CREATE (a:House {id:event.house_id})
      SET a.house_type = event.house_type, a.address = event.address
    "

But then the Houses query must be run before the People query for the LIVES_IN relation to be established. These types of race conditions are not present if we simply use MERGE everywhere.

Individually MERGE everything

The MERGE clause MATCHes or CREATEs the entire specified path. It does not look at the individual entities.

For example, we could try to use:

    "neo4j.topic.cypher.People": "
      MERGE (a:Person {id:event.person_id})-[:LIVES_IN]->(b:House {id:event.house_id})
      SET a.last_name = event.last_name, a.first_name = event.first_name
    ",
    "neo4j.topic.cypher.Houses": "
      MERGE (a:House {id:event.house_id})
      SET a.house_type = event.house_type, a.address = event.address
    "

If now the Houses query is run before the People query, the People query will create the entire specified path and so another house with id 23 will be created and related to John Smith. We want to avoid these types of data duplication.

MERGE nodes on their unique ids, then set properties

This is again necessary because MERGE only looks at the entirety of what is specified.

For example, we could try to use:

    "neo4j.topic.cypher.People": "
      MERGE (a:Person {id:event.person_id})
      SET a.last_name = event.last_name, a.first_name = event.first_name
      MERGE (b:House {id:event.house_id})
      MERGE (a)-[:LIVES_IN]->(b)
    ",
    "neo4j.topic.cypher.Houses": "
      MERGE (a:House {
        id: event.house_id,
        house_type: event.house_type,
        address: event.address})
    "

If the Houses query is run after the People query, the Houses query will not be able to MATCH a house with all properties

  • id: 23
  • house_type: mansion
  • address: 42nd street, New York

Such a house will be CREATEd, but now we again have 2 houses with id 23. Another source of data duplication that we want to avoid.
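The effect can be reproduced with a toy in-memory model of MERGE on a single node. This is only an illustration of the matching rule described above (match a node that carries all specified properties, otherwise create one), not Neo4j's actual implementation; merge_node and the list-based graph are invented for this sketch.

```python
def merge_node(nodes: list, label: str, props: dict) -> dict:
    """Toy MERGE: return a node with this label and ALL given properties, else create it."""
    for node in nodes:
        if node["label"] == label and all(node["props"].get(k) == v for k, v in props.items()):
            return node
    node = {"label": label, "props": dict(props)}
    nodes.append(node)
    return node


house = {"id": 23, "house_type": "mansion", "address": "42nd street, New York"}

# Variant A: the People query ran first, then Houses MERGEs on all properties at once.
graph_a = []
merge_node(graph_a, "House", {"id": 23})  # created by the People query
merge_node(graph_a, "House", house)       # no full match, so a second node is created
print(len(graph_a))  # 2

# Variant B: Houses MERGEs on the id only and SETs the other properties afterwards.
graph_b = []
merge_node(graph_b, "House", {"id": 23})  # created by the People query
matched = merge_node(graph_b, "House", {"id": 23})
matched["props"].update(house)            # equivalent of the SET clause
print(len(graph_b))  # 1
```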

Impose unicity constraints

Despite all of the measures that we took, it is still possible for ingested data to become duplicated. This is because of concurrent query execution. More info

Luckily, we can completely prevent duplication from happening by initializing our Neo4j database with unicity constraints. More info about constraints

To initialize unicity constraints on our dockerized Neo4j instance, we have to copy an apoc.conf file and a file containing the constraint Cypher queries into the /var/lib/neo4j/conf directory of our container:

    neo4j:
      image: neo4j:latest
      container_name: neo4j
      ports:
        - "7474:7474"
        - "7687:7687"
      environment:
        NEO4J_AUTH: neo4j/connect
        NEO4J_dbms_memory_heap_max__size: 8G
        NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'
        NEO4J_dbms_directories_import: "/"
        NEO4JLABS_PLUGINS: '["apoc"]'
      volumes:
        - ./neo4j-init-files:/var/lib/neo4j/conf

The apoc.conf file looks like this:

    apoc.import.file.use_neo4j_config=true
    apoc.import.file.enabled=true
    apoc.initializer.neo4j.1=CALL apoc.cypher.runSchemaFile("file:////var/lib/neo4j/conf/unicity.cypher")

On startup of the container it calls runSchemaFile on the file /var/lib/neo4j/conf/unicity.cypher, which in my case looks like:

    CREATE CONSTRAINT idx1 IF NOT EXISTS ON (a:Farmer) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx2 IF NOT EXISTS ON (a:Grainbatch) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx3 IF NOT EXISTS ON (a:Processor) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx4 IF NOT EXISTS ON (a:Flourbatch) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx5 IF NOT EXISTS ON (a:Bakery) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx6 IF NOT EXISTS ON (a:Breadbatch) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx7 IF NOT EXISTS ON (a:Machinebatch) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx8 IF NOT EXISTS ON (a:Breadmachine) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx9 IF NOT EXISTS ON (a:Customer) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx10 IF NOT EXISTS ON (a:Minute) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx11 IF NOT EXISTS ON (a:Purchase) ASSERT a.id IS UNIQUE;
    CREATE CONSTRAINT idx12 IF NOT EXISTS ON (a:Second) ASSERT a.id IS UNIQUE;

For every node label, we assert that its id must be unique.
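Since the statements only differ in their index and label, the file can also be generated. Below is a small sketch that reproduces the statements above from a list of labels, using the same constraint syntax as my unicity.cypher; the function name and the idea of generating the file are illustrative and not part of the repository.

```python
LABELS = [
    "Farmer", "Grainbatch", "Processor", "Flourbatch", "Bakery", "Breadbatch",
    "Machinebatch", "Breadmachine", "Customer", "Minute", "Purchase", "Second",
]


def constraint_statements(labels: list) -> list:
    """Build one uniqueness constraint on the id property per node label."""
    return [
        f"CREATE CONSTRAINT idx{i} IF NOT EXISTS ON (a:{label}) ASSERT a.id IS UNIQUE;"
        for i, label in enumerate(labels, start=1)
    ]


statements = constraint_statements(LABELS)
print("\n".join(statements))

# To write the file that apoc.conf points to, you could use for example:
# with open("neo4j-init-files/unicity.cypher", "w") as f:
#     f.write("\n".join(statements) + "\n")
```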

More info about initializing your dockerized neo4j instance can be found here.

Conclusion

We went over the most important configuration for setting up a Neo4j sink connector in Kafka, as I did in the project https://github.com/blqblqblq159/Traceability. I hope this was helpful to you. If you still have questions after reading this post or if I forgot to mention some important configuration parameters, definitely let me know.

Any questions, remarks, or suggestions can be directed at hello@dotdash.ai. We'd be delighted to hear from you.

Happy coding!
