Produce and Consume a Message in the Kafka Cluster

Before you begin:

To follow this procedure, you must have a Repository containing the information of your Saagie platform. To this end, create a context group and define the required contexts and variables.

In this case, define the following context variables:

Name       Type    Value
---------  ------  --------------
zookeeper  String  localhost:2181
brokers    String  localhost:9092

Once your context group is created and stored in the Repository, you can apply the Repository context variables to your jobs.

For more information, read the section on using contexts and variables in the Talend Studio User Guide.
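In the Java code that Talend generates for a job, these Repository context variables become fields of a context object, which component fields reference as context.zookeeper and context.brokers. A minimal illustrative analog (the Context class below is a stand-in for Talend's generated class, not its actual code):

```java
public class ContextSketch {
    // Illustrative analog of Talend's generated context object:
    // each Repository context variable becomes a typed field.
    static class Context {
        String zookeeper = "localhost:2181"; // referenced as context.zookeeper
        String brokers   = "localhost:9092"; // referenced as context.brokers
    }

    public static void main(String[] args) {
        Context context = new Context();
        // A component field such as "Broker list" would hold: context.brokers
        System.out.println(context.brokers);
    }
}
```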

Produce a Message to the Kafka Cluster

  1. Create a new job in Talend.

  2. Add the following components:

    • tKafkaConnection, to establish a Kafka connection reusable by other Kafka components.

    • tKafkaCreateTopic, to create the Kafka topic that the other Kafka components can use.

    • tRowGenerator, the source component, to generate as many rows and fields as required.

    • tKafkaOutput, to send the message to the Kafka cluster.

    Figure: Kafka produce message job with Talend.

  3. Double-click each component and configure its settings as follows:

    • tKafkaConnection

    • tKafkaCreateTopic

    • tRowGenerator

    • tKafkaOutput

    In the Basic settings tab:

    1. From the Version list, select the version of the Kafka cluster to be used.

    2. In the Zookeeper quorum list field, enter the address of the Zookeeper service of the Kafka cluster to be used.

      Example 1. Zookeeper IP address

      localhost:2181

    3. In the Broker list field, enter the addresses of the broker nodes of the Kafka cluster to be used.

      Example 2. Brokers IP address

      localhost:9092

    For more information, you can refer to Talend’s documentation on the tKafkaConnection component.

    In the Basic settings tab:

    1. From the Version list, select the version of the Kafka cluster to be used.

    2. Select the Use an existing connection option.

    3. From the Component List list, select the tKafkaConnection connection component to reuse the connection details already defined.

    4. From the Action on topic list, select how a topic is created.

    5. In the Topic name field, enter the name of the topic to be created.

    6. In the Replication factor field, specify the number of the replicas to be created for the logs of the topic partitions.

    7. In the Number of partitions field, specify the number of partitions to be created for the topic.

    8. Select the Set topic retention time (ms) option, and define the maximum time in milliseconds during which the contents of the topic are retained.

    For more information, you can refer to Talend’s documentation on the tKafkaCreateTopic component.
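    The tKafkaCreateTopic settings above correspond to Kafka's per-topic configuration. A stdlib-only sketch collecting them (the topicSettings helper and all values are illustrative; actually creating the topic would go through kafka-clients' AdminClient, omitted here to stay self-contained):

```java
import java.util.Properties;

public class TopicSettingsSketch {
    // Illustrative helper: gathers the tKafkaCreateTopic Basic settings
    // under the standard Kafka configuration names.
    static Properties topicSettings(String name, int partitions,
                                    short replicationFactor, long retentionMs) {
        Properties settings = new Properties();
        settings.setProperty("name", name);                                // Topic name field
        settings.setProperty("partitions", Integer.toString(partitions));  // Number of partitions field
        settings.setProperty("replication.factor",
                Short.toString(replicationFactor));                        // Replication factor field
        settings.setProperty("retention.ms", Long.toString(retentionMs));  // Set topic retention time (ms)
        return settings;
    }

    public static void main(String[] args) {
        // 7 days of retention, expressed in milliseconds
        Properties settings = topicSettings("mytopic", 1, (short) 1, 7L * 24 * 60 * 60 * 1000);
        System.out.println(settings.getProperty("retention.ms"));
    }
}
```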

    Define the structure of data to be generated as follows:

    1. Add a column by clicking the plus (+) button.

    2. Define the nature of the data: in the Type column, select byte[] from the list.

    3. Set the function generating the data as follows: TalendDataGenerator.getUsCity().getBytes().

    4. In the Number of Rows for RowGenerator field, enter the number of rows to generate.

    For more information, you can refer to Talend’s documentation on the tRowGenerator component.
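    Conceptually, the schema above makes tRowGenerator produce N rows, each holding one byte[] value. A minimal stdlib-only sketch of the same idea (the CITIES array and generateRows helper are illustrative stand-ins for TalendDataGenerator.getUsCity()):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class RowGeneratorSketch {
    // Illustrative stand-in for TalendDataGenerator.getUsCity()
    private static final String[] CITIES = {"Chicago", "Houston", "Seattle", "Denver"};

    static List<byte[]> generateRows(int numberOfRows, long seed) {
        Random random = new Random(seed);
        List<byte[]> rows = new ArrayList<>();
        for (int i = 0; i < numberOfRows; i++) {
            String city = CITIES[random.nextInt(CITIES.length)];
            // Same conversion as getUsCity().getBytes() in the schema above
            rows.add(city.getBytes(StandardCharsets.UTF_8));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<byte[]> rows = generateRows(10, 42L);
        System.out.println(rows.size() + " rows generated");
    }
}
```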

    In the Basic settings tab:

    1. From the Schema list, select Built-In to create and store the schema locally for this component only.

    2. Select the Use an existing connection option.

    3. From the Component List list, select the tKafkaConnection connection component to reuse the connection details already defined.

    4. In the Topic name field, enter the name of the topic you want to publish messages to.

      This topic must already exist.
    For more information, you can refer to Talend’s documentation on the tKafkaOutput component.
  4. Run the job.
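When the job runs, tKafkaConnection and tKafkaOutput together boil down to standard Kafka producer configuration. A minimal sketch of that configuration using only java.util.Properties (the buildProducerConfig helper is illustrative; the actual send would go through kafka-clients' KafkaProducer, shown only in comments to stay self-contained):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Illustrative helper: maps the tKafkaConnection "Broker list" field
    // onto the standard Kafka producer configuration keys.
    static Properties buildProducerConfig(String brokerList) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList); // Broker list field
        // byte[] keys and values, matching the byte[] schema of the job
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerConfig("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
        // With kafka-clients on the classpath, the job's send step would be roughly:
        // try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
        //     producer.send(new ProducerRecord<>("mytopic", message));
        // }
    }
}
```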

Consume a Message from the Kafka Cluster

  1. Create a new job in Talend.

  2. Add the following components:

    • tKafkaConnection, to establish a Kafka connection reusable by other Kafka components.

    • tKafkaInput, to consume messages from a given topic.

    • tLogRow, to display data or results in the Run console to monitor data processed.

    Figure: Kafka consume message job with Talend.

  3. Double-click each component and configure its settings as follows:

    • tKafkaConnection

    • tKafkaInput

    • tLogRow

    In the Basic settings tab:

    1. From the Version list, select the version of the Kafka cluster to be used.

    2. In the Zookeeper quorum list field, enter the address of the Zookeeper service of the Kafka cluster to be used.

      Example 3. Zookeeper IP address

      localhost:2181

    3. In the Broker list field, enter the addresses of the broker nodes of the Kafka cluster to be used.

      Example 4. Brokers IP address

      localhost:9092

    For more information, you can refer to Talend’s documentation on the tKafkaConnection component.

    In the Basic settings tab:

    1. In the Output type list, select the type of the data to be sent to the next component: byte[].

    2. From the Schema list, select Built-In to create and store the schema locally for this component only.

    3. Select the Use an existing connection option.

    4. From the Component List list, select the tKafkaConnection connection component to reuse the connection details already defined.

    5. In the Topic name field, enter the name of the topic from which tKafkaInput receives the feed of messages.

    6. In the Consumer group ID field, enter the name of the consumer group to which you want the current consumer (the tKafkaInput component) to belong.

    7. From the New consumer group starts from list, select the starting point from which the messages of a topic are consumed.

    8. Select the Auto-commit offsets option to make tKafkaInput automatically save its consumption state at the end of each given time interval. You must then define this interval in the Interval field that is displayed.

    9. Select the Stop after maximum time waiting between messages (ms) option, then enter the maximum time tKafkaInput waits for a new message before the job stops.

    For more information, you can refer to Talend’s documentation on the tKafkaInput component.
    For more information, you can refer to Talend’s documentation on the tLogRow component.
  4. Run the job.
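The consumer-side settings above map onto standard Kafka consumer configuration keys in the same way. A stdlib-only sketch (the buildConsumerConfig helper and the interval value are illustrative; polling itself would use kafka-clients' KafkaConsumer, shown only in comments to stay self-contained):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Illustrative helper: maps the tKafkaInput Basic settings onto
    // standard Kafka consumer configuration keys.
    static Properties buildConsumerConfig(String brokerList, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);   // from tKafkaConnection
        props.setProperty("group.id", groupId);               // Consumer group ID field
        props.setProperty("auto.offset.reset", "earliest");   // New consumer group starts from
        props.setProperty("enable.auto.commit", "true");      // Auto-commit offsets option
        props.setProperty("auto.commit.interval.ms", "5000"); // Interval field (illustrative value)
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerConfig("localhost:9092", "talend-consumer-group");
        System.out.println(props.getProperty("group.id"));
        // With kafka-clients on the classpath, the job's read step would be roughly:
        // try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        //     consumer.subscribe(Collections.singletonList("mytopic"));
        //     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        // }
    }
}
```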