While making my first data stream through Kafka into Neo4j, I found it quite difficult to figure out exactly how to properly set everything up. I'd like to share what I've learned by showing and explaining parts of the docker-compose configuration of a proof of concept that I've made. The proof of concept showcases the power of Neo4j in a food traceability context. The entire docker-compose.yml and Neo4j sink connector configuration can be found here.
We use this setup to walk through the whole pipeline, from the Kafka brokers to the sink connector configuration. After this blog post, you will understand how to set up a Neo4j sink connector for Kafka, as we did in our Traceability proof of concept, so that all event messages in Kafka are automatically ingested into our Neo4j database.
Kafka and Zookeeper are the two microservices that form the basis of every Kafka setup.
Zookeeper is the distributed consensus manager for the Kafka brokers (the container(s) running Kafka). In a production-ready Kafka deployment, multiple brokers run side by side and the data sent to Kafka is replicated across brokers, so that it persists even if a broker goes down. The Zookeeper service manages the collaboration between the brokers and ensures the high availability of the Kafka service.
Even though our example will only use 1 Kafka broker, we still have to spin up zookeeper for it to function (We use docker-compose version 3):
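As a sketch of what this service looks like (the image tag and exact values are illustrative, not copied from the proof of concept):

```yaml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181   # port on which clients (the brokers) reach Zookeeper
      ZOOKEEPER_TICK_TIME: 2000     # basic time unit (ms) used for heartbeats and timeouts
```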
We set ZOOKEEPER_CLIENT_PORT: 2181, so inside the internal Docker network we connect to Zookeeper on zookeeper:2181.
The Kafka container contains our broker. We will be writing messages to and reading messages from the topics in this broker. [More info on topics](https://dattell.com/data-architecture-blog/what-is-a-kafka-topic/).
The Kafka container has the following configuration:
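A sketch of the broker service, with the settings discussed below (image tag and broker id are illustrative):

```yaml
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"               # expose the host listener
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # internal listener on kafka:9092, host listener on localhost:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # only 1 broker, so internal topics cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```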
The most important configuration is the advertised listeners, which we set so that clients can properly connect to the broker: inside the internal Docker network the broker is reachable on kafka:9092, and from the host machine on localhost:29092. We then set the security protocol for each listener and tell Kafka which listener to use for inter-broker communication (less important for us, since we only have 1 broker).
We then have to set the replication factor for the "offsets" topic (an internal topic that stores topic offsets for consumers) to 1. With only 1 broker, topics cannot be replicated, and since the default offsets replication factor is 3, we would run into problems otherwise.
In our case, we want to create some topics on start-up. We use another Kafka container for this:
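A sketch of such a topic-creation container (the service name and image tag are illustrative; the `kafka-topics` CLI ships with the Confluent Kafka image):

```yaml
  init-kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - kafka
    entrypoint: ["/bin/sh", "-c"]
    command: |
      # the first call blocks until the broker is reachable
      kafka-topics --bootstrap-server kafka:9092 --list
      # create the five topics (idempotent thanks to --if-not-exists)
      for topic in grainbatches flourbatches breadbatches distributions purchases; do
        kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists \
          --topic "$topic" --partitions 1 --replication-factor 1
      done
```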
The five topics grainbatches, flourbatches, breadbatches, distributions, and purchases are now created when we docker-compose up. After the creation of the topics, this container will be obsolete and will shut itself down. If for some reason you want to prevent the container from shutting down, you can simply add "sleep infinity" at the end of the command.
There are a couple of different APIs to write data to and read data from Kafka topics. In the code on GitHub, we use a producer in Python to write data to our Kafka topics, and then we use the Connect API to read this data into Neo4j. An overview of the different APIs can be found here.
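To make the producer side concrete, here is a minimal Python sketch. The field names (farmerId, grainBatchId, timestamp) are illustrative, not taken from the actual proof of concept, and the commented-out send uses the kafka-python library:

```python
import json

# Hypothetical grain batch event; the field names are illustrative.
payload = {"farmerId": 17, "grainBatchId": 0, "timestamp": 1625140800000}

# JsonConverter-style envelope: schema and payload together in one message value.
message = {
    "schema": {
        "type": "struct",
        "name": "grainbatch",
        "optional": False,
        "fields": [
            {"field": "farmerId", "type": "int32", "optional": False},
            {"field": "grainBatchId", "type": "int32", "optional": False},
            {"field": "timestamp", "type": "int64", "optional": False},
        ],
    },
    "payload": payload,
}

# Serialize to the bytes that end up in the Kafka topic.
value = json.dumps(message).encode("utf-8")

# With kafka-python, this value could then be produced like so:
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:29092")
# producer.send("grainbatches", value)
# producer.flush()
```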
We will now focus on configuring a Neo4j sink connector. A (sink) connector setup consists of 3 different parts: the connect worker that runs the connector, the converters that deserialize the messages, and the schema registry.
A more in-depth discussion on connectors, converters, and schema-registry can be found here.
The schema-registry is configured as follows:
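A sketch of the schema-registry service (image tag is illustrative):

```yaml
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
```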
Nothing special is happening here. We will be able to connect to the service in the internal docker network on schema-registry:8081.
The connector configuration is a bit more involved:
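A sketch of the connect service, following the Confluent cp-kafka-connect image conventions (image tag, group id, and topic names for the internal bookkeeping topics are illustrative):

```yaml
  connect:
    image: confluentinc/cp-kafka-connect:6.2.0
    hostname: connect
    container_name: connect
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./connectConfFiles:/connectConfFiles
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      # internal bookkeeping topics; replication factor 1 because we have 1 broker
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      # converters: string keys, JSON-with-schema values
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      # where the connect worker looks for connector plugins (.jar files)
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
```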
Important configuration parameters are:
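First, the converter settings (environment variable names follow the Confluent cp-kafka-connect convention):

```yaml
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
```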
This is the configuration for the converters. Data in Kafka topics is stored as key/value pairs, where the value contains the main data. Usually, the string converter is appropriate for CONNECT_KEY_CONVERTER.
For the value converter, we have to choose the right type according to how our data was serialized. I serialized my data as JSON schema, hence why we use the JSON converter with schema enabled.
There are a couple of possibilities for the serialization, most notably Avro, Protobuf, JSON schema, and plain JSON. JSON schema is pretty much equivalent to JSON serialization; the difference is that JSON schema requires the schema of the messages to be explicitly defined. For example, a message in JSON schema could look like this:
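An illustrative message in the schema-plus-payload envelope that the JSON converter expects (the field names are made up for this example):

```json
{
  "schema": {
    "type": "struct",
    "name": "example",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": false }
    ]
  },
  "payload": { "id": 1, "name": "example" }
}
```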
while its plain JSON counterpart would consist of just the 'payload'.
If we only want a Neo4j sink connector, plain JSON serialization works. It is, however, good practice to always use a serialization type with a schema, since an SQL sink connector, for example, will not function on plain JSON data.
Another important configuration parameter is:
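```yaml
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
```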
This is the path to the .jar-files of the connector. We get these .jar-files in that path with the command:
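A sketch of the install step, run inside the connect container before the worker starts (the connector version number is illustrative; check Confluent Hub for the current one):

```shell
confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.9
```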
After that, we launch the connect worker and wait for it to start listening on localhost:8083. We then create the Neo4j sink connector with:
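A sketch of this call against the Kafka Connect REST API; here the configuration file path assumes the command runs inside the connect container, where the file is mounted:

```shell
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @/connectConfFiles/contrib.sink.string-json.neo4j.json
```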
where we mounted the configuration file 'contrib.sink.string-json.neo4j.json' located in the 'connectConfFiles' folder from our host machine to the connect container via:
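```yaml
    volumes:
      - ./connectConfFiles:/connectConfFiles
```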
In this section, we look more in-depth at the configuration file 'contrib.sink.string-json.neo4j.json' of the Neo4j sink connector and examine some best practices for data ingestion into Neo4j.
The configuration file that I used, looks like this:
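A sketch of such a sink connector configuration, following the neo4j-streams Kafka Connect plugin's property names. The Bolt URI, credentials, and the Cypher field names are placeholders; there is one `neo4j.topic.cypher.<topic>` entry per topic, of which only grainbatches is shown here:

```json
{
  "name": "Neo4jSinkConnector",
  "config": {
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "topics": "grainbatches,flourbatches,breadbatches,distributions,purchases",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "secret",
    "neo4j.topic.cypher.grainbatches": "MERGE (f:Farmer {id: event.farmerId}) MERGE (g:Grainbatch {id: event.grainBatchId}) MERGE (f)-[:PRODUCED {timestamp: event.timestamp}]->(g)"
  }
}
```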
The sink connector reads data from the 5 topics that we previously created.
We set errors.tolerance to all, meaning that the connector will not stop working if it comes across unexpected data. For example, if our connector somehow comes across some Avro-serialized data in one of the topics, it will not be able to process this data (we told our connector that the data was serialized as JSON schema).
It is possible to send this unprocessed data to another topic called a dead-letter queue and have it be processed there. More info can be found here. I did not implement this.
The last 5 blocks in the configuration file contain the Cypher queries used to process the data from each of the topics. Every message read from a Kafka topic can be seen as one row of a table. We refer to the specific message entries that we want to use in our query as event.<entry>.
For example, a message in our grainbatches topic looks like this:
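An illustrative payload (the field names are hypothetical, but the values match the example below):

```json
{ "farmerId": 17, "grainBatchId": 0, "timestamp": "2021-07-01T12:00:00" }
```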
which we use as follows:
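A sketch of the Cypher query for the grainbatches topic (field names match the hypothetical message above):

```cypher
MERGE (f:Farmer {id: event.farmerId})
MERGE (g:Grainbatch {id: event.grainBatchId})
MERGE (f)-[:PRODUCED {timestamp: event.timestamp}]->(g)
```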
So, if they do not yet exist, we create a Farmer node with id 17, a Grainbatch node with id 0, and a PRODUCED relation with its timestamp from the Farmer to the Grainbatch.
Here, I would like to share some personal best practices for the Neo4j sink connector queries that I found while attempting to properly load my data from the Kafka topics into Neo4j. They come down to 4 rules:

1. Always use MERGE instead of CREATE or MATCH.
2. MERGE nodes and relationships separately, never as one complete path.
3. MERGE on the id property only, and SET the remaining properties afterwards.
4. Initialize the database with unicity constraints on the id properties.
This list is by no means exhaustive, but for any relatively simple data ingestion, it should suffice. These rules are all in place to avoid duplicated data and race conditions.
We will now explain, for every rule, why it is in place. In the explanations, we use messages from the following 2 fictitious Kafka topics, people and houses:
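The payloads below are made up for this example. A message in the people topic:

```json
{ "id": 1, "name": "John Smith", "houseId": 23 }
```

and a message in the houses topic:

```json
{ "id": 23, "address": "Example Street 1" }
```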
From each pair of messages, we want to create a Person node (with the id and name from the people message) and a LIVES_IN relation to a House node (with the id and remaining properties from the houses message).
Following our best practices, the queries to populate the Neo4j database with this data are:
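A sketch of both queries, assuming the people messages carry id, name, and houseId fields and the houses messages carry id and address. For the people topic:

```cypher
MERGE (p:Person {id: event.id})
SET p.name = event.name
MERGE (h:House {id: event.houseId})
MERGE (p)-[:LIVES_IN]->(h)
```

and for the houses topic:

```cypher
MERGE (h:House {id: event.id})
SET h.address = event.address
```

Note that every MERGE matches on the id alone, and the remaining properties are applied with SET.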
When we use MERGE (= CREATE if it does not exist, MATCH if it does), we do not have to think about whether or not some entity already exists in our database.
Suppose, for example, that we used CREATE and MATCH instead:
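A sketch of such a People query without MERGE (same hypothetical field names as before):

```cypher
MATCH (h:House {id: event.houseId})
CREATE (p:Person {id: event.id, name: event.name})
CREATE (p)-[:LIVES_IN]->(h)
```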
But then the Houses query must run before the People query for the LIVES_IN relation to be established. These types of race conditions are not present if we simply use MERGE everywhere.
The MERGE clause MATCHes or CREATEs the entire specified path; it does not look at the individual entities.
For example, we could try to use:
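A sketch of a People query that MERGEs the whole path in one go (same hypothetical field names):

```cypher
MERGE (p:Person {id: event.id})-[:LIVES_IN]->(h:House {id: event.houseId})
```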
If now the Houses query is run before the People query, the People query will create the entire specified path and so another house with id 23 will be created and related to John Smith. We want to avoid these types of data duplication.
This is again necessary because MERGE only looks at the entirety of what is specified.
For example, we could try to use:
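A sketch of a Houses query that MERGEs on all properties at once instead of on the id alone:

```cypher
MERGE (h:House {id: event.id, address: event.address})
```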
If the Houses query is run after the People query, it will not be able to MATCH a house that has all of the specified properties (the People query created the house with only an id). Such a house will therefore be CREATEd, and we again end up with 2 houses with id 23: another source of data duplication that we want to avoid.
Despite all of the measures that we took, it is still possible for ingested data to become duplicated because of concurrent query execution: two tasks can run the same MERGE at the same time, each concluding that the entity does not exist yet. Luckily, we can completely prevent this duplication by initializing our Neo4j database with unicity constraints.
To initialize unicity constraints on our dockerized Neo4j instance, we have to copy an apoc.conf file and a file containing the constraint Cypher queries into the /var/lib/neo4j/conf directory of our container:
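A sketch of the Neo4j service with these mounts (the image tag, credentials, and the local neo4jConfFiles folder name are illustrative):

```yaml
  neo4j:
    image: neo4j:4.2
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/secret
      NEO4JLABS_PLUGINS: '["apoc"]'
    volumes:
      - ./neo4jConfFiles/apoc.conf:/var/lib/neo4j/conf/apoc.conf
      - ./neo4jConfFiles/unicity.cypher:/var/lib/neo4j/conf/unicity.cypher
```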
The apoc.conf file looks like this:
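A sketch using APOC's initializer mechanism; the exact property names depend on your APOC version (these follow APOC 4.x):

```
apoc.import.file.enabled=true
apoc.import.file.use_neo4j_config=false
apoc.initializer.neo4j=CALL apoc.cypher.runSchemaFile('/var/lib/neo4j/conf/unicity.cypher')
```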
On startup of the container, APOC calls runSchemaFile on the file /var/lib/neo4j/conf/unicity.cypher, which in my case looks like this:
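A sketch of the constraint file; only the Farmer and Grainbatch labels appear in this post, the other labels follow the same pattern (the `ASSERT` syntax shown here is the Neo4j 3.5/4.x form):

```cypher
// one unicity constraint per node label
CREATE CONSTRAINT ON (f:Farmer) ASSERT f.id IS UNIQUE;
CREATE CONSTRAINT ON (g:Grainbatch) ASSERT g.id IS UNIQUE;
// ...and likewise for every other node label in the graph
```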
We assert, for every node type, that its id must be unique.
More info about initializing your dockerized Neo4j instance can be found here.
We went over the most important configuration for setting up a Neo4j sink connector in Kafka, as I did in the project https://github.com/blqblqblq159/Traceability. I hope this was helpful to you. If you still have questions after reading this post or if I forgot to mention some important configuration parameters, definitely let me know.
Any questions, remarks, or suggestions can be directed at firstname.lastname@example.org. We'd be delighted to hear from you.