Produce and Consume a Message in the Kafka Cluster
To follow this procedure, you must have a Repository containing the Saagie platform information. To do so, create a context group and define the required contexts and variables.
In this case, define the following context variables:
Name | Type | Value |
---|---|---|
zookeeper | String | |
brokers | String | |
Once your context group is created and stored in the Repository, you can apply the Repository context variables to your jobs.
For more information, read the section Using contexts and variables in the Talend Studio User Guide.
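Both context variables hold host:port values (for example localhost:2181 for zookeeper and localhost:9092 for brokers), and a broker list typically contains one or more comma-separated host:port entries. In Talend Studio, a component field usually refers to a context variable as context.<name>, for example context.brokers. The Java sketch below is a hypothetical helper, not part of Talend or Saagie, that shows one way to check such a value before storing it in the context group.

```java
import java.util.ArrayList;
import java.util.List;

class HostPortList {
    // Hypothetical value type for one "host:port" entry.
    record HostPort(String host, int port) {}

    // Parses a comma-separated "host:port" list, such as the value of the
    // zookeeper or brokers context variable, and rejects malformed entries.
    static List<HostPort> parse(String value) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas, e.g. "host1:9092,"
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Non-numeric port in '" + trimmed + "'", e);
            }
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + trimmed + "'");
            }
            result.add(new HostPort(trimmed.substring(0, colon), port));
        }
        if (result.isEmpty()) {
            throw new IllegalArgumentException("No host:port entries found");
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the examples used later in this procedure.
        System.out.println(parse("localhost:2181"));
        System.out.println(parse("localhost:9092"));
    }
}
```

Splitting on the last colon keeps the helper simple; it does not attempt to handle bracketed IPv6 addresses.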
Produce a Message to the Kafka Cluster
- Create a new job in Talend.
- Add the following components, including a source component:
  - tKafkaConnection, to establish a Kafka connection reusable by other Kafka components.
  - tKafkaCreateTopic, to create the Kafka topic that the other Kafka components can use.
  - tRowGenerator, the source component, to generate as many rows and fields as required.
  - tKafkaOutput, to send the message to the Kafka cluster.
- Double-click each component and configure its settings as follows:
  tKafkaConnection
  In the Basic settings tab:
    - From the Version list, select the version of the Kafka cluster to be used.
    - In the Zookeeper quorum list field, enter the address of the Zookeeper service of the Kafka cluster to be used.
      Example 1. Zookeeper address: localhost:2181
    - In the Broker list field, enter the addresses of the broker nodes of the Kafka cluster to be used.
      Example 2. Broker address: localhost:9092
  For more information, you can refer to Talend’s documentation on the tKafkaConnection component.
  tKafkaCreateTopic
  In the Basic settings tab:
    - From the Version list, select the version of the Kafka cluster to be used.
    - Select the Use an existing connection option.
    - From the Component List list, select the tKafkaConnection connection component to reuse the connection details already defined.
    - From the Action on topic list, select how the topic is created.
    - In the Topic name field, enter the name of the topic to be created.
    - In the Replication factor field, specify the number of replicas to be created for the logs of the topic partitions.
    - In the Number of partitions field, specify the number of partitions to be created for the topic.
    - Select the Set topic retention time (ms) option, and define the maximum time, in milliseconds, during which the contents of the topic are retained.
  For more information, you can refer to Talend’s documentation on the tKafkaCreateTopic component.
  tRowGenerator
  Define the structure of the data to be generated as follows:
    - Add a column by clicking the plus (+) button.
    - Define the data type: in the Type column, select bytes[] from the list.
    - Set the environment variable as follows:
      TalendDataGenerator.getUsCity().getBytes()
    - In the Number of Rows for RowGenerator field, enter the number of rows to generate.
  For more information, you can refer to Talend’s documentation on the tRowGenerator component.
  tKafkaOutput
  In the Basic settings tab:
    - From the Schema list, select Built-In to create and store the schema locally for this component only.
    - Select the Use an existing connection option.
    - From the Component List list, select the tKafkaConnection connection component to reuse the connection details already defined.
    - In the Topic name field, enter the name of the topic you want to publish messages to. This topic must already exist.
  For more information, you can refer to Talend’s documentation on the tKafkaOutput component.
- Run the job.
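The producer job combines three kinds of settings: topic settings (tKafkaCreateTopic), generated byte[] rows (tRowGenerator), and producer connection settings (tKafkaConnection with tKafkaOutput). The Java sketch below restates them with plain JDK types. It assumes a Kafka client version that uses the standard bootstrap.servers, key.serializer and value.serializer properties (very old clients used different property names); it is not how Talend generates its code. The TopicSettings record, the city list standing in for TalendDataGenerator.getUsCity(), and the topic name are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

class ProducerJobSketch {
    // Hypothetical holder for the tKafkaCreateTopic fields.
    record TopicSettings(String name, int partitions, short replicationFactor, long retentionMs) {
        TopicSettings {
            if (partitions < 1) throw new IllegalArgumentException("partitions must be >= 1");
            if (replicationFactor < 1) throw new IllegalArgumentException("replication factor must be >= 1");
        }

        // Kafka cannot place more replicas than there are available brokers.
        void checkAgainst(int brokerCount) {
            if (replicationFactor > brokerCount) {
                throw new IllegalStateException("replication factor " + replicationFactor
                        + " exceeds " + brokerCount + " broker(s)");
            }
        }

        // Topic-level configuration entry corresponding to "Set topic retention time (ms)".
        Map<String, String> topicConfig() {
            return Map.of("retention.ms", Long.toString(retentionMs));
        }
    }

    // Producer properties roughly matching tKafkaConnection + tKafkaOutput with byte[] data.
    static Properties producerProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    // Hypothetical stand-in for TalendDataGenerator.getUsCity().
    private static final List<String> CITIES = List.of("Boston", "Chicago", "Denver", "Seattle");

    // Generates rowCount byte[] values, one random city per row.
    static List<byte[]> generateRows(int rowCount, long seed) {
        Random random = new Random(seed);
        List<byte[]> rows = new ArrayList<>(rowCount);
        for (int i = 0; i < rowCount; i++) {
            String city = CITIES.get(random.nextInt(CITIES.size()));
            rows.add(city.getBytes(StandardCharsets.UTF_8)); // explicit charset
        }
        return rows;
    }

    public static void main(String[] args) {
        // Seven days of retention: 7 * 24 * 3600 * 1000 = 604800000 ms.
        TopicSettings topic = new TopicSettings("hypothetical-topic", 3, (short) 1, TimeUnit.DAYS.toMillis(7));
        topic.checkAgainst(1); // a single broker at localhost:9092
        System.out.println(topic.topicConfig());
        System.out.println(producerProperties("localhost:9092"));
        System.out.println(generateRows(5, 42L).size());
    }
}
```

The sketch passes an explicit UTF-8 charset to getBytes, whereas the no-argument getBytes() in the tRowGenerator expression uses the JVM's default charset; using the same explicit charset on both the producer and consumer sides avoids decoding surprises.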
Consume a Message from the Kafka Cluster
- Create a new job in Talend.
- Add the following components:
  - tKafkaConnection, to establish a Kafka connection reusable by other Kafka components.
  - tKafkaInput, to read (consume) messages from the Kafka topic.
  - tLogRow, to display data or results in the Run console so that you can monitor the processed data.
- Double-click each component and configure its settings as follows:
  tKafkaConnection
  In the Basic settings tab:
    - From the Version list, select the version of the Kafka cluster to be used.
    - In the Zookeeper quorum list field, enter the address of the Zookeeper service of the Kafka cluster to be used.
      Example 3. Zookeeper address: localhost:2181
    - In the Broker list field, enter the addresses of the broker nodes of the Kafka cluster to be used.
      Example 4. Broker address: localhost:9092
  For more information, you can refer to Talend’s documentation on the tKafkaConnection component.
  tKafkaInput
  In the Basic settings tab:
    - From the Output type list, select bytes[] as the type of the data to be sent to the next component.
    - From the Schema list, select Built-In to create and store the schema locally for this component only.
    - Select the Use an existing connection option.
    - From the Component List list, select the tKafkaConnection connection component to reuse the connection details already defined.
    - In the Topic name field, enter the name of the topic from which tKafkaInput receives the feed of messages.
    - In the Consumer group ID field, enter the name of the consumer group to which you want the current consumer (the tKafkaInput component) to belong.
    - From the New consumer group starts from list, select the starting point from which the messages of the topic are consumed.
    - Select the Auto-commit offsets option to make tKafkaInput automatically save its consumption state at the end of each given time interval. Then define this interval in the Interval field that is displayed.
    - Select the Stop after maximum time waiting between messages (ms) option, then enter how long, in milliseconds, tKafkaInput waits for a new message before it stops.
  For more information, you can refer to Talend’s documentation on the tKafkaInput component.
  tLogRow
  For more information, you can refer to Talend’s documentation on the tLogRow component.
- Run the job.
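The tKafkaInput settings correspond closely to standard Kafka consumer properties. The Java sketch below builds them with java.util.Properties, under the assumption that a client using bootstrap.servers, group.id, auto.offset.reset, enable.auto.commit and auto.commit.interval.ms is involved, and that the New consumer group starts from choices map to the earliest and latest reset policies. The StartFrom enum, the shouldStop idle-timeout check and the group name are hypothetical illustrations, not Talend's internal implementation. Because the output type is bytes[], the sketch also decodes a value back to text, as a console display such as tLogRow needs readable strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class ConsumerJobSketch {
    // Hypothetical enum for the "New consumer group starts from" choice.
    enum StartFrom { BEGINNING, LATEST }

    static Properties consumerProperties(String brokers, String groupId, StartFrom startFrom,
                                         boolean autoCommit, long commitIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        // Only used when the consumer group has no committed offset yet.
        props.setProperty("auto.offset.reset", startFrom == StartFrom.BEGINNING ? "earliest" : "latest");
        props.setProperty("enable.auto.commit", Boolean.toString(autoCommit));
        if (autoCommit) {
            // Corresponds to the Interval field shown when Auto-commit offsets is selected.
            props.setProperty("auto.commit.interval.ms", Long.toString(commitIntervalMs));
        }
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    // Hypothetical model of "Stop after maximum time waiting between messages (ms)".
    static boolean shouldStop(long lastMessageAtMs, long nowMs, long maxWaitMs) {
        return nowMs - lastMessageAtMs >= maxWaitMs;
    }

    // Decodes a byte[] message value for display, assuming it was encoded as UTF-8.
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "hypothetical-group",
                StartFrom.BEGINNING, true, 5000L);
        System.out.println(props);
        System.out.println(decode("Boston".getBytes(StandardCharsets.UTF_8)));
        System.out.println(shouldStop(1_000L, 7_000L, 5_000L));
    }
}
```

Because auto.offset.reset only applies to a group without committed offsets, rerunning the job with the same consumer group ID resumes from the last committed position rather than from the chosen starting point.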
- Example of a Git project to produce a message in Kafka (GitHub page)
- Example of a Git project to consume a message in Kafka (GitHub page)
- Kafka documentation about Kafka Producers and Kafka Consumers