Sending data to Flume using Python

The pollution data are retrieved from an external API (see this post for more information) and sent to Apache Flume. Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming data into Hadoop and other streaming applications.

For the streaming data pipeline on pollution in Flanders, the data was sent to Hadoop HDFS and to Apache Kafka. A Flume source captures the data received from the external API.

The Python code below sends the data to the Flume agent in JSON format. The standard JSONHandler expects each event to have a headers and a body section. The headers of the HTTP request must also specify the content type:

import json
import requests

# data_clean holds the cleaned pollution data prepared earlier
url_flume = 'http://<ip-address>:<port>'
payload = [{'headers': {}, 'body': data_clean}]
headers = {'content-type': 'application/json'}
response = requests.post(url_flume, data=json.dumps(payload),
                         headers=headers)
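
The payload structure can be wrapped in a small helper. The following is a minimal sketch, not part of the original pipeline: the function name build_flume_payload is hypothetical, and it assumes that the JSONHandler reads each event body as a string, as the Flume documentation describes, so records that are not already strings are serialized to JSON text first.

```python
import json


def build_flume_payload(bodies, headers=None):
    """Serialize event bodies into a JSON array of Flume-style events.

    Hypothetical helper: each event gets its own copy of the headers
    and a body that is always a string.
    """
    events = []
    for body in bodies:
        # Assumption: the handler expects a string body, so dicts and
        # lists are turned into JSON text before being wrapped.
        if not isinstance(body, str):
            body = json.dumps(body)
        events.append({'headers': dict(headers or {}), 'body': body})
    return json.dumps(events)


if __name__ == '__main__':
    # Example record with made-up values, for illustration only.
    print(build_flume_payload([{'station': 'demo', 'pm10': 21.5}]))
```

The returned string can be passed as the data argument of requests.post, with the same content-type header as above.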

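A Flume agent normally has one source, one channel (here a memory channel) and one sink. The incoming data can, however, be sent to more than one sink. Each additional sink needs its own channel, and the source writes to all of them.
A channel selector on the source acts as a flow multiplexer: it either replicates each event to every channel, which is the default, or routes events selectively to one or more channels based on the value of an event header.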
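
The difference between replicating and selective routing can be illustrated with a toy model in Python. This is only a sketch of the idea, not Flume's implementation: the function names and the pollutant header are hypothetical, chosen for the example.

```python
def replicating_selector(event, channels):
    """Replicating: every configured channel receives the event."""
    return list(channels)


def multiplexing_selector(event, header, mapping, default):
    """Multiplexing: choose channels from the value of one event header.

    Events whose header value is missing or unmapped go to the
    default channels.
    """
    value = event['headers'].get(header)
    return list(mapping.get(value, default))


# Hypothetical event; the 'pollutant' header is an assumption.
event = {'headers': {'pollutant': 'pm10'}, 'body': 'sample'}

if __name__ == '__main__':
    print(replicating_selector(event, ['channel_hdfs', 'channel_kafka']))
    print(multiplexing_selector(event, 'pollutant',
                                {'pm10': ['channel_kafka']},
                                ['channel_hdfs']))
```

In the replicating case both the HDFS and the Kafka channel get the event, which is the behaviour needed when the same data has to reach two sinks.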

The configuration of the agent to receive and send the data is given below.

# Name the components on this agent
aircheckr1.sources = http_aircheckr
aircheckr1.sinks = hdfs_sink kafka_sink
aircheckr1.channels = channel_hdfs channel_kafka

# Describe/configure the source
aircheckr1.sources.http_aircheckr.type = http
aircheckr1.sources.http_aircheckr.bind = 0.0.0.0
aircheckr1.sources.http_aircheckr.port = 9260

# Describe the sinks
aircheckr1.sinks.hdfs_sink.type = hdfs
aircheckr1.sinks.hdfs_sink.hdfs.path = hdfs://192.168.1.242/flume/aircheckr
aircheckr1.sinks.hdfs_sink.hdfs.rollInterval = 86400
aircheckr1.sinks.hdfs_sink.hdfs.rollSize = 0

aircheckr1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
aircheckr1.sinks.kafka_sink.kafka.bootstrap.servers = ubuntu238:9092
aircheckr1.sinks.kafka_sink.kafka.topic = aircheckr
aircheckr1.sinks.kafka_sink.flumeBatchSize = 10

# Use a channel which buffers events in memory
aircheckr1.channels.channel_hdfs.type = memory
aircheckr1.channels.channel_hdfs.capacity = 1000
aircheckr1.channels.channel_hdfs.transactionCapacity = 500

aircheckr1.channels.channel_kafka.type = memory
aircheckr1.channels.channel_kafka.capacity = 1000
aircheckr1.channels.channel_kafka.transactionCapacity = 10

# Bind the source and sinks to the channels
aircheckr1.sources.http_aircheckr.channels = channel_hdfs channel_kafka
aircheckr1.sinks.hdfs_sink.channel = channel_hdfs
aircheckr1.sinks.kafka_sink.channel = channel_kafka
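
Component names recur in many property keys, and a misspelled name simply becomes a property for a component that was never declared. The sketch below is a hypothetical pre-flight check in Python, not a Flume tool: it parses the properties text in a simplified way (no line continuations or escapes) and lists keys whose source, sink or channel name does not appear in the agent's declarations.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        key, _, value = line.partition('=')
        props[key.strip()] = value.strip()
    return props


def find_undeclared_components(props, agent):
    """Return keys that use a component name missing from the
    agent's sources, sinks or channels declaration."""
    problems = []
    for kind in ('sources', 'sinks', 'channels'):
        declared = set(props.get(f'{agent}.{kind}', '').split())
        prefix = f'{agent}.{kind}.'
        for key in props:
            if key.startswith(prefix):
                # The component name is the first segment after the prefix.
                name = key[len(prefix):].split('.', 1)[0]
                if name not in declared:
                    problems.append(key)
    return problems


if __name__ == '__main__':
    # Made-up two-line configuration with a deliberate typo.
    sample = """
    agent1.sinks = kafka_sink
    agent1.sinks.kakfa_sink.kafka.topic = demo
    """
    print(find_undeclared_components(parse_properties(sample), 'agent1'))
```

Running such a check on the configuration file before starting the agent is a cheap way to catch naming slips.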

Links:
Apache Flume: https://flume.apache.org/
