Another step in the data pipeline: from Kafka to Kudu

Another post (link) gave an overview of the complete data pipeline, from external API to visualization.

Streaming Data Pipeline

A Kafka consumer is created using the Python client for Kafka, and the Python interface for Kudu allows the data to be sent to a Kudu table.

The Kafka consumer is created with a consumer group id, which allows offset management to be handled by Kafka.

from kafka import KafkaConsumer
import kudu
import json
import datetime

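# The group id lets Kafka store this consumer's offsets; 'earliest' only applies when no offset has been committed yet
consumer = KafkaConsumer('aircheckr_city', bootstrap_servers='', group_id='test_kudu', auto_offset_reset='earliest')
while True:
    # loop body: read a message, transform it and write it to Kudu (see the snippets further down)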

To be fault-tolerant, committing offsets is mandatory: after a restart, the consumer resumes from the last offset committed for its group.
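
As a minimal sketch of committing only after a record has been handled, the loop below assumes a consumer created with enable_auto_commit=False (a kafka-python option that the snippet above does not set). The names consume_and_commit, handle and FakeConsumer are hypothetical; the stand-in consumer exists only so that the sketch runs without a broker.

```python
from collections import namedtuple


def consume_and_commit(consumer, handle, max_messages=None):
    """Handle each message, then commit its offset (at-least-once delivery).

    `consumer` is expected to behave like kafka-python's KafkaConsumer created
    with enable_auto_commit=False: iterable over messages, with a commit() method.
    `handle` is a hypothetical callback that writes one message to the sink.
    """
    processed = 0
    for message in consumer:
        handle(message)      # if this raises, the offset is not committed
        consumer.commit()    # commit only after the write succeeded
        processed += 1
        if max_messages is not None and processed >= max_messages:
            break
    return processed


# Stand-in objects (assumptions for illustration, not part of kafka-python).
Message = namedtuple("Message", ["offset", "value"])


class FakeConsumer:
    def __init__(self, messages):
        self._messages = list(messages)
        self.commits = 0

    def __iter__(self):
        return iter(self._messages)

    def commit(self):
        self.commits += 1


fake = FakeConsumer([Message(0, b'{"a": 1}'), Message(1, b'{"a": 2}')])
seen = []
count = consume_and_commit(fake, lambda m: seen.append(m.offset))
print(count, fake.commits, seen)
```

With a real broker, the same function could be called with a KafkaConsumer instead of the stand-in. Committing after the write means a crash between the write and the commit replays the record on restart, so the write to Kudu should tolerate duplicates.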


Before starting the infinite loop, a connection to the Kudu master is established:

client = kudu.connect(host='ubuntu243', port=7051)

When a message is read from the Kafka topic, the record is sent to Kudu (after transformation, not shown):

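    table = client.table('impala::default.kudu_aircheckr_stream')
    session = client.new_session()
    # dictval holds the transformed record as a dict of column name to value
    op = table.new_insert(dictval)
    session.apply(op)
    try:
        # flush sends the pending insert to Kudu
        session.flush()
    except kudu.KuduBadStatus as e:
        print("kudu bad status during flush")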
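
The transformation step that the snippet above leaves out might look like the following. This is only a sketch under assumptions: the post does not show the message format or the table schema, so the JSON layout, the field names (city, timestamp, value) and the function name to_kudu_row are all hypothetical.

```python
import json
import datetime


def to_kudu_row(raw_value):
    """Decode a Kafka message value (JSON bytes) into a dict of column values.

    The field names below are hypothetical; the real columns depend on the
    Kudu table schema, which the post does not show.
    """
    payload = json.loads(raw_value.decode("utf-8"))
    # Assumed: the source sends epoch seconds; convert to a UTC datetime.
    measured_at = datetime.datetime.fromtimestamp(
        payload["timestamp"], tz=datetime.timezone.utc
    )
    return {
        "city": payload["city"],
        "measured_at": measured_at,
        "value": float(payload["value"]),
    }


dictval = to_kudu_row(b'{"city": "Antwerp", "timestamp": 0, "value": "12.5"}')
print(dictval)
```

The resulting dict plays the role of dictval in the insert above: its keys have to match the column names of the Kudu table.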

Note the naming convention for Kudu tables that are created via Impala: impala::database_name.table_name.
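
To avoid typing the prefix by hand, the convention can be wrapped in a small helper. The function name impala_kudu_table_name is hypothetical; it only applies the pattern stated above.

```python
def impala_kudu_table_name(database, table):
    """Build the Kudu-side name of a table that was created through Impala."""
    return "impala::{}.{}".format(database, table)


name = impala_kudu_table_name("default", "kudu_aircheckr_stream")
print(name)
```

The result is the string passed to client.table() in the snippet above.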

Note that this solution only suits low volumes, because the Python script runs on the single node where it is installed and not across the complete cluster. If execution on the cluster is needed, other solutions such as Spark Streaming need to be considered.



Python client for Apache Kafka:

Python interface for Apache Kudu:

Using Kafka Consumer Groups:

Kudu tables naming conventions:
