May 5, 2022
How to set up a Neo4j sink connector in Kafka
Introduction
While making my first data stream through Kafka into Neo4j, I found it quite difficult to figure out exactly how to properly set everything up. I'd like to share what I've learned by showing and explaining parts of the docker-compose configuration of a proof of concept that I've made. The proof of concept showcases the power of Neo4j in a food traceability context. The entire docker-compose.yml and Neo4j sink connector configuration can be found here.
We use this to go over the following topics:
What are all the necessary different Kafka microservices? What are they used for?
How do we set up and configure our Neo4j Sink connector? What are some (personal) best practices for data ingestion into Neo4j?
After this blog post, you will understand how to set up a Neo4j sink connector for Kafka as we did in our Traceability proof of concept, so that all the event messages in Kafka are automatically ingested into our Neo4j database.
Kafka: a quick overview
Kafka and Zookeeper
Kafka and Zookeeper are the two microservices that form the basis of every Kafka setup.
Zookeeper is the distributed consensus manager for the Kafka brokers (these are the container(s) running Kafka). In a production-ready Kafka deployment, there will be multiple brokers running and the data sent to Kafka will be replicated in different brokers, such that data can persist even if a broker goes down. The zookeeper service manages the collaboration between the brokers and ensures the High Availability of the Kafka service.
Even though our example only uses 1 Kafka broker, we still have to spin up Zookeeper for it to function (we use docker-compose version 3).
We set 'ZOOKEEPER_CLIENT_PORT: 2181', so we will connect to zookeeper in the internal docker network on zookeeper:2181.
More info on zookeeper can be found here. This will eventually not be relevant anymore as a new consensus protocol (KRaft) for Kafka is in development at the time of writing.
For the Kafka broker itself, the most important configuration is the advertised listeners. We set them such that we can properly connect to the broker: in the internal docker network we can reach the broker on kafka:9092, and on the host machine on localhost:29092. We then set the security protocol for each listener and tell Kafka which listener it uses for inter-broker communication (not that important for us since we only use 1 broker).
We then have to set the replication factor for the "offsets" topic (an internal topic that stores topic offsets for consumers) to 1. With only 1 broker we cannot replicate topics, and since the default offsets replication factor is 3, we would otherwise run into problems.
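As a rough illustration of the listener settings described above, the sketch below writes the broker environment as a Python dict and splits the advertised listeners into name/address pairs. The variable names follow the Confluent Kafka image as far as I know; the listener names PLAINTEXT and PLAINTEXT_HOST and the exact values are assumptions for illustration, not copied from the project's docker-compose.yml.

```python
# Sketch of the broker environment, assuming the Confluent cp-kafka image.
# The listener names PLAINTEXT and PLAINTEXT_HOST are assumed for illustration.
KAFKA_ENV = {
    "KAFKA_ZOOKEEPER_CONNECT": "zookeeper:2181",
    "KAFKA_ADVERTISED_LISTENERS": "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092",
    "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
    "KAFKA_INTER_BROKER_LISTENER_NAME": "PLAINTEXT",
    "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
}


def parse_advertised_listeners(value):
    """Map each listener name to the host:port that clients should use."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.split("://", 1)
        listeners[name.strip()] = address.strip()
    return listeners


listeners = parse_advertised_listeners(KAFKA_ENV["KAFKA_ADVERTISED_LISTENERS"])
print(listeners["PLAINTEXT"])       # address used inside the docker network
print(listeners["PLAINTEXT_HOST"])  # address used from the host machine
```

Other containers in the same docker network would use the first address, while a producer running on the host machine would use the second.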
In our case, we want to create some topics on start-up. We use another Kafka container for this.
The five topics grainbatches, flourbatches, breadbatches, distributions, and purchases are now created when we run docker-compose up. After the creation of the topics, this container is no longer needed and shuts itself down. If for some reason you want to prevent the container from shutting down, you can simply add "sleep infinity" at the end of the command.
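The start-up command of such a one-off container could be assembled roughly as follows. This is only a sketch: the kafka-topics flags are standard options of that CLI, but the partition count, the bootstrap address, and the helper names are assumptions of mine rather than the project's actual command.

```python
# Build the shell command a one-off container could run to create the topics.
# Partition count and bootstrap address are assumptions for illustration.
TOPICS = ["grainbatches", "flourbatches", "breadbatches", "distributions", "purchases"]


def create_topic_command(topic, bootstrap="kafka:9092"):
    """Return a kafka-topics invocation that creates one topic if it is missing."""
    return (
        "kafka-topics --create --if-not-exists "
        f"--bootstrap-server {bootstrap} "
        f"--topic {topic} --partitions 1 --replication-factor 1"
    )


def startup_command(topics, keep_alive=False):
    """Chain the create commands; optionally keep the container running afterwards."""
    steps = [create_topic_command(topic) for topic in topics]
    if keep_alive:
        steps.append("sleep infinity")
    return " && ".join(steps)


print(startup_command(TOPICS))
```

Calling startup_command(TOPICS, keep_alive=True) appends the "sleep infinity" step mentioned above.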
Schema-registry and connect
There are a couple of different APIs to write data to and read data from Kafka topics. In the code on GitHub, we use a producer in Python to write data to our Kafka topics, and then we use the connect API to read this data into Neo4j. An overview of the different APIs can be found here.
We will now focus on configuring a Neo4j sink connector. A (sink) connector consists of 3 different parts:
converter: converts the data from the Kafka topics back to its original form with its original schema. This is where the schema-registry microservice comes in: the schema registry provides an interface for storing and retrieving your data schemas.
transform(s): apply some transformations to the data before sinking it into the database. This is optional and we do not use this feature here.
connector: sink the data into the database according to its schema and the connector configuration.
A more in-depth discussion on connectors, converters, and schema-registry can be found here.
The converter is configured on the connect worker. Data in Kafka topics gets stored as key/value pairs, where the value contains the main data. Usually, the string converter will be appropriate for CONNECT_KEY_CONVERTER.
For the value converter, we have to choose the right type according to how our data was serialized. I serialized my data as JSON schema, which is why we use the JSON converter with schemas enabled.
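To make the converter choice concrete, here is a minimal sketch of the relevant environment variables and the worker properties they stand for. The converter class names are the standard Kafka Connect ones; the translation function is a simplified approximation of what the Confluent connect image does with CONNECT_ variables (the real rules handle a few more cases), so treat it as an assumption.

```python
# Converter settings for the connect container, written as a Python dict.
CONNECT_ENV = {
    "CONNECT_KEY_CONVERTER": "org.apache.kafka.connect.storage.StringConverter",
    "CONNECT_VALUE_CONVERTER": "org.apache.kafka.connect.json.JsonConverter",
    "CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE": "true",
}


def to_worker_property(env_name):
    """Simplified translation: drop CONNECT_, lowercase, underscores to dots."""
    return env_name[len("CONNECT_"):].lower().replace("_", ".")


for name, value in CONNECT_ENV.items():
    print(f"{to_worker_property(name)}={value}")
```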
There are a couple of possibilities for the serialization, most notably AVRO, protobuf, JSON schema, and plain JSON. JSON schema is pretty much equivalent to JSON serialization. The difference is that JSON schema requires the schema of the messages to be explicitly defined. A message in JSON schema carries both a 'schema' entry describing its fields and a 'payload' entry holding the data, while its plain JSON counterpart would consist of just the 'payload'.
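The difference between the two shapes can be sketched in a few lines of Python. The envelope below follows the schema/payload layout that the JSON converter expects when schemas are enabled, as far as I understand it. The helper names are hypothetical, the type mapping covers only what this example needs, and the payload values are borrowed from the house example used later in this post.

```python
import json


def connect_type(value):
    """Pick a Connect type name for a few Python types (bool before int on purpose)."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported type: {type(value).__name__}")


def with_schema(payload):
    """Wrap a flat payload in a schema/payload envelope."""
    fields = [
        {"field": key, "type": connect_type(value), "optional": False}
        for key, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": payload,
    }


payload = {"id": 23, "house_type": "mansion", "address": "42nd street, New York"}
print(json.dumps(with_schema(payload), indent=2))  # message with schema
print(json.dumps(payload))                         # plain JSON counterpart
```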
If we only want to make a Neo4j sink connector, plain JSON serialization would work. It is however good practice to always use a serialization type with a schema, since an SQL sink connector, for example, will not function on plain JSON data.
After that, we launch the connect worker and wait for it to start listening on localhost:8083. We then create the Neo4j sink connector with:
cd /connectConfFiles
curl -i -X PUT -H Content-Type:application/json http://localhost:8083/connectors/sink-neo4j-orders-00/config -d@contrib.sink.string-json.neo4j.json
where we mounted the configuration file 'contrib.sink.string-json.neo4j.json' located in the 'connectConfFiles' folder from our host machine to the connect container via:
volumes:
- ./connectConfFiles:/connectConfFiles
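For readers who prefer to register the connector from Python instead of curl, the same PUT request can be sketched with the standard library. The function name is hypothetical and the config passed in is a placeholder, not the real connector configuration; the request is only built here, not sent.

```python
import json
import urllib.request


def build_connector_request(config, name="sink-neo4j-orders-00",
                            base_url="http://localhost:8083"):
    """Build (but do not send) the PUT request that creates or updates a connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Placeholder config for illustration only.
request = build_connector_request({"topics": "grainbatches"})
print(request.get_method(), request.full_url)
# Once the worker is listening, the request could be sent with:
# urllib.request.urlopen(request)
```

Because the request is a PUT on the connector's config endpoint, running it a second time updates the existing connector instead of failing.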
Neo4j sink connector
In this section, we look more in-depth at the configuration file 'contrib.sink.string-json.neo4j.json' of the Neo4j sink connector and examine some best practices for data ingestion into Neo4j.
Configuration
The configuration file that I used tells the sink connector to read data from the 5 topics that we previously created.
We set errors.tolerance to all, meaning that the connector will not stop working if it comes across unexpected data. For example, if our connector somehow comes across some AVRO serialized data in one of the topics, it will not be able to process this data (we told our connector that the data was serialized as JSON schema).
It is possible to send this unprocessed data to another topic called a dead-letter queue and have it be processed there. More info can be found here. I did not implement this.
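A configuration along these lines could be assembled as sketched below. The property names are the ones I know from the Neo4j Kafka Connect sink plugin of that period and should be checked against the version you run; the URI, the credentials, and the placeholder queries are assumptions, and this is not the project's actual file.

```python
import json


def sink_config(queries, uri="bolt://neo4j:7687", user="neo4j", password="password"):
    """Assemble a flat connector config with one Cypher query per topic."""
    config = {
        "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
        "topics": ",".join(queries),
        "errors.tolerance": "all",
        "neo4j.server.uri": uri,
        "neo4j.authentication.basic.username": user,
        "neo4j.authentication.basic.password": password,
    }
    for topic, query in queries.items():
        config[f"neo4j.topic.cypher.{topic}"] = query
    return config


# Placeholder queries; the real ones depend on the message layout of each topic.
example = sink_config({
    "grainbatches": "MERGE (g:Grainbatch {id: event.id})",
    "purchases": "MERGE (p:Purchase {id: event.id})",
})
print(json.dumps(example, indent=2))
```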
The last 5 blocks in the configuration file contain the Cypher queries used to process the data from each of the topics. Every message read from the Kafka topic can be seen as 1 row from a table. We refer to the message entries that we want to use in our query through the event variable, as in event.<entry name>.
For example, take a message in our grainbatches topic that describes farmer 17 producing grain batch 0 at a given time. If they do not yet exist, the query creates a Farmer node with id 17, a Grainbatch node with id 0, and a PRODUCED relation from the Farmer to the Grainbatch with its timestamp.
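The grainbatches case could look roughly like the sketch below. Only the ids 17 and 0 come from the text above; the field names, the timestamp value, and the exact query are hypothetical stand-ins. The small helper checks that every event entry used in the query is present in the message, which is a cheap sanity check before deploying a connector.

```python
import re

# Hypothetical message: field names and timestamp are invented for illustration.
message = {"farmer_id": 17, "grainbatch_id": 0, "timestamp": "2022-05-05T10:00:00"}

# Hypothetical query in the style described above.
GRAINBATCHES_QUERY = """
MERGE (f:Farmer {id: event.farmer_id})
MERGE (g:Grainbatch {id: event.grainbatch_id})
MERGE (f)-[r:PRODUCED]->(g)
SET r.timestamp = event.timestamp
"""


def referenced_fields(query):
    """Return the sorted message entries that a query reads through event.<name>."""
    return sorted(set(re.findall(r"\bevent\.(\w+)", query)))


missing = [field for field in referenced_fields(GRAINBATCHES_QUERY) if field not in message]
print(referenced_fields(GRAINBATCHES_QUERY))
print(missing)  # an empty list means the message covers the query
```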
Best practices for Neo4j data ingestion
Here, I would like to share some personal best practices for the Neo4j sink connector queries that I found while attempting to properly load my data from the Kafka topics into Neo4j. They consist of 4 rules:
Use MERGE, never CREATE
Individually MERGE every node and relationship
MERGE a node on its unique id and SET its properties later
Impose unicity constraints to avoid data duplication
This list is by no means exhaustive, but for any relatively simple data ingestion, it should suffice. These rules are all in place to avoid duplicated data and race conditions.
We will now explain for every rule why it is in place. We use messages from the following 2 fictitious Kafka topics in the explanations:
People
Houses
So we want to create a Person node with properties:
id: 5
first_name: John
last_name: Smith
with a LIVES_IN relation to a House node with properties:
id: 23
house_type: mansion
address: 42nd street, New York
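Following our best practices, the People query MERGEs the Person and the House each on its id, MERGEs the LIVES_IN relation between them, and SETs the person's remaining properties. The Houses query MERGEs the House on its id and SETs its remaining properties.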
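Written out, queries that follow the four rules could look like the sketch below. The message layout (in particular the house_id entry on the People message) is an assumption, as are the query texts themselves. The helper only checks rule 1, that no CREATE clause sneaks in.

```python
import re

# Hypothetical messages for the two fictitious topics.
people_message = {"id": 5, "first_name": "John", "last_name": "Smith", "house_id": 23}
houses_message = {"id": 23, "house_type": "mansion", "address": "42nd street, New York"}

# Every node and relation is MERGEd individually, nodes are MERGEd on id only,
# and the remaining properties are SET afterwards.
PEOPLE_QUERY = """
MERGE (p:Person {id: event.id})
MERGE (h:House {id: event.house_id})
MERGE (p)-[:LIVES_IN]->(h)
SET p.first_name = event.first_name, p.last_name = event.last_name
"""

HOUSES_QUERY = """
MERGE (h:House {id: event.id})
SET h.house_type = event.house_type, h.address = event.address
"""


def uses_create(query):
    """Rule 1 check: report whether a query contains a CREATE clause."""
    return re.search(r"\bCREATE\b", query, flags=re.IGNORECASE) is not None


print(uses_create(PEOPLE_QUERY), uses_create(HOUSES_QUERY))
```

With these two queries it should not matter which message arrives first: whichever query runs first creates the house node, and the other one matches it.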
Use MERGE, never CREATE
When we use MERGE (= CREATE if it does not exist, MATCH if it exists), we do not have to think about whether or not some entity already exists in our database.
Suppose the People query instead used MATCH to look up the house. Then the Houses query must be run before the People query for the LIVES_IN relation to be established. These types of race conditions are not present if we simply use MERGE everywhere.
Individually MERGE everything
The MERGE clause MATCHes or CREATEs the entire specified path. It does not look at the individual entities.
Suppose the People query MERGEs the person, the LIVES_IN relation, and the house as a single path. If the Houses query is now run before the People query, the People query will not find that path and will create all of it, so another house with id 23 will be created and related to John Smith. We want to avoid these types of data duplication.
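The effect can be reproduced with a toy in-memory model of MERGE. This is not Neo4j and makes no attempt to cover Cypher in general; it only mimics the match-the-whole-pattern-or-create-all-of-it behaviour described above, with class and method names that are entirely my own.

```python
class ToyGraph:
    """Toy stand-in for a graph database; nodes are (label, id) pairs."""

    def __init__(self):
        self.nodes = []    # list of (label, id)
        self.rels = set()  # set of (start_index, type, end_index)

    def merge_node(self, label, node_id):
        """Return the index of a matching node, creating it if absent."""
        if (label, node_id) in self.nodes:
            return self.nodes.index((label, node_id))
        self.nodes.append((label, node_id))
        return len(self.nodes) - 1

    def merge_rel(self, start, rel_type, end):
        """Create the relation between two bound nodes unless it already exists."""
        self.rels.add((start, rel_type, end))

    def merge_path(self, start, rel_type, end):
        """MERGE on a whole path: match the full pattern or create all of it."""
        for s, t, e in self.rels:
            if self.nodes[s] == start and t == rel_type and self.nodes[e] == end:
                return
        self.nodes.append(start)
        self.nodes.append(end)
        self.rels.add((len(self.nodes) - 2, rel_type, len(self.nodes) - 1))

    def count(self, label, node_id):
        return self.nodes.count((label, node_id))


# Houses message first, then a People query that MERGEs one whole path.
path_style = ToyGraph()
path_style.merge_node("House", 23)
path_style.merge_path(("Person", 5), "LIVES_IN", ("House", 23))
print(path_style.count("House", 23))  # the house is duplicated

# Same order, but every node and relation MERGEd individually.
individual = ToyGraph()
individual.merge_node("House", 23)
person = individual.merge_node("Person", 5)
house = individual.merge_node("House", 23)
individual.merge_rel(person, "LIVES_IN", house)
print(individual.count("House", 23))  # a single house remains
```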
MERGE nodes on their unique ids, then set properties
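This is again necessary because MERGE only looks at the entirety of what is specified.
Suppose the Houses query MERGEs the house on all of its properties at once. If the Houses query is run after the People query, which has already created a house with only its id, the Houses query will not be able to MATCH a house with all properties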
id: 23
house_type: mansion
address: 42nd street, New York
Such a house will be CREATEd, but now we again have 2 houses with id 23. This is another source of data duplication that we want to avoid.
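A few lines of Python are enough to mimic this behaviour. Again, this is a toy model and not Neo4j: the merge function simply matches a node that has the label and all of the given properties, and creates a new node otherwise.

```python
nodes = []  # each node is a dict holding its label and properties


def merge(label, **props):
    """Match a node with the label and ALL given properties, else create one."""
    for node in nodes:
        if node["label"] == label and all(node.get(k) == v for k, v in props.items()):
            return node
    node = {"label": label, **props}
    nodes.append(node)
    return node


# The People query ran first and MERGEd the house on its id only.
merge("House", id=23)

# Risky Houses query: MERGE on all properties cannot match the bare node.
merge("House", id=23, house_type="mansion", address="42nd street, New York")
duplicates = len([n for n in nodes if n["label"] == "House" and n["id"] == 23])
print(duplicates)  # two houses with id 23

# Safer Houses query: MERGE on the id, then SET the remaining properties.
nodes.clear()
merge("House", id=23)
house = merge("House", id=23)
house.update(house_type="mansion", address="42nd street, New York")
print(len(nodes))  # one house, now with all of its properties
```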
Impose unicity constraints
Despite all of the measures that we took, it is still possible for ingested data to become duplicated because of concurrent query execution.
Luckily, we can completely prevent duplication from happening by initializing our Neo4j database with unicity constraints.
To initialize unicity constraints on our dockerized Neo4j instance, we have to copy an apoc.conf file and a file detailing the constraint Cypher queries into the /var/lib/neo4j/conf directory of our container.
On startup of the container, APOC calls runSchemaFile on the file /var/lib/neo4j/conf/unicity.cypher, which in my case looks like:
CREATE CONSTRAINT idx1 IF NOT EXISTS ON (a:Farmer) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx2 IF NOT EXISTS ON (a:Grainbatch) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx3 IF NOT EXISTS ON (a:Processor) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx4 IF NOT EXISTS ON (a:Flourbatch) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx5 IF NOT EXISTS ON (a:Bakery) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx6 IF NOT EXISTS ON (a:Breadbatch) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx7 IF NOT EXISTS ON (a:Machinebatch) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx8 IF NOT EXISTS ON (a:Breadmachine) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx9 IF NOT EXISTS ON (a:Customer) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx10 IF NOT EXISTS ON (a:Minute) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx11 IF NOT EXISTS ON (a:Purchase) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT idx12 IF NOT EXISTS ON (a:Second) ASSERT a.id IS UNIQUE;
We assert for every node label that its id must be unique.
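Since the statements above differ only in the label and the constraint name, they can also be generated. The sketch below reproduces the ON ... ASSERT form used in this post and, as an option, the FOR ... REQUIRE form that newer Neo4j versions expect as far as I know (introduced in 4.4). The function name and the modern flag are my own.

```python
LABELS = [
    "Farmer", "Grainbatch", "Processor", "Flourbatch", "Bakery", "Breadbatch",
    "Machinebatch", "Breadmachine", "Customer", "Minute", "Purchase", "Second",
]


def constraint(index, label, modern=False):
    """Return one uniqueness constraint statement on the id property of a label."""
    if modern:
        # Form used by newer Neo4j versions.
        return (f"CREATE CONSTRAINT idx{index} IF NOT EXISTS "
                f"FOR (a:{label}) REQUIRE a.id IS UNIQUE;")
    # Form used in the unicity.cypher file of this post.
    return (f"CREATE CONSTRAINT idx{index} IF NOT EXISTS "
            f"ON (a:{label}) ASSERT a.id IS UNIQUE;")


for number, label in enumerate(LABELS, start=1):
    print(constraint(number, label))
```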
More info about initializing your dockerized neo4j instance can be found here.
Conclusion
We went over the most important configuration for setting up a Neo4j sink connector in Kafka, as I did in the project https://github.com/blqblqblq159/Traceability. I hope this was helpful to you. If you still have questions after reading this post or if I forgot to mention some important configuration parameters, definitely let me know.
Any questions, remarks, or suggestions can be directed at hello@dotdash.ai. We'd be delighted to hear from you.
Happy coding!