Real-time traffic data

The Roads and Traffic Agency (Agentschap Wegen en Verkeer) [2] and the Flemish Traffic Centre (Vlaams Verkeerscentrum) [3] provide open data about the current traffic situation [1]. More than 4,000 measurement points, dispersed all over Flanders but mainly located on highways, record the number of vehicles and their speed every minute.

We use the same front-end and data pipeline as presented for the pollution data [4]. Because the volume is substantial (more than 4,000 measurements every minute), only a small selection of measurement points (about 100) is captured every 10 minutes by an edge node and sent to Flume for further processing [5]. Two streams are captured: one with the list of measurement points and one with the real-time traffic data (see figure).
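As a rough illustration, the capture on the edge node could look like the sketch below. It assumes a Flume HTTPSource with the default JSONHandler; the feed URL, the Flume endpoint and the point selection are placeholders, not the actual setup (see [5] for the Flume side).

import json
import time
import requests

MIV_URL = 'http://example.org/miv/verkeersdata'   # placeholder for the MIV feed, see [1]
FLUME_URL = 'http://flume-host:8085'              # assumed Flume HTTPSource endpoint

while True:
    xml = requests.get(MIV_URL).text
    # filtering the feed down to the ~100 selected measurement points is omitted here
    events = [{'headers': {}, 'body': xml}]       # JSON event format of the HTTPSource
    requests.post(FLUME_URL, data=json.dumps(events),
                  headers={'Content-Type': 'application/json'})
    time.sleep(600)                               # capture every 10 minutes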

Real Time Traffic Data Pipeline

The traffic data are sent to Apache Kafka [6] and two topics are created. The topic with the measurement points is stored in Apache Hive [7] to provide a structured query interface for further processing. The topic with the real-time traffic data is consumed by a Python script and enriched with the measurement points stored in Hive, creating a new Kafka topic with the enriched traffic data (see figure). This new topic is read by Druid [8] for visualization and data discovery using tools such as Apache Superset [9].
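A minimal sketch of that enrichment step is shown below; the topic names, record fields and addresses are illustrative, and in practice the points dict would be loaded from the Hive table (e.g. with PyHive) instead of being hard-coded.

import json
from kafka import KafkaConsumer, KafkaProducer

# unique_id -> metadata of the measurement point (placeholder for the Hive lookup)
points = {'3640': {'lat': 51.02, 'lon': 3.74, 'road': 'E40'}}

consumer = KafkaConsumer('miv_traffic',
                         bootstrap_servers='192.168.1.238:9092',
                         value_deserializer=lambda v: json.loads(v.decode('utf-8')))
producer = KafkaProducer(bootstrap_servers='192.168.1.238:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

for msg in consumer:
    record = msg.value
    # join the measurement with the metadata of its measurement point
    record.update(points.get(record.get('unique_id'), {}))
    producer.send('miv_traffic_enriched', record)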

The other two Kafka topics are sent to Kudu tables, which are used for the visualizations (see live map here).

[1] Meten-in-Vlaanderen (MIV) http://opendata.vlaanderen.be/dataset/minuutwaarden-verkeersmetingen-vlaanderen

[2] Agentschap Wegen en Verkeer http://www.wegenenverkeer.be/

[3] Vlaams Verkeerscentrum http://www.verkeerscentrum.be/

[4] Map of pollution in Flanders http://www.bigdatareflections.net/blog/?p=1

[5] Sending data to Flume using Python http://www.bigdatareflections.net/blog/?p=35

[6] Apache Kafka https://kafka.apache.org/

[7] Apache Hive https://hive.apache.org/

[8] Druid http://druid.io/

[9] Using Superset with Kudu http://www.bigdatareflections.net/blog/?p=52

Another step in the data pipeline: from Kafka to Kudu

The data pipeline described in an earlier post (link) gave an overview of the complete data flow from the external API to the visualization.

Streaming Data Pipeline

A Kafka consumer is created using the Python client for Kafka, and the Python interface for Kudu allows the data to be sent to a Kudu table.

The Kafka consumer is created with a consumer group id, which allows offset management to be handled by Kafka.

from kafka import KafkaConsumer
import kudu
import json        # used in the transformation step (not shown)
import datetime    # used in the transformation step (not shown)

# The group_id lets Kafka keep track of the committed offsets for this consumer group.
consumer = KafkaConsumer('aircheckr_city',
                         bootstrap_servers='192.168.1.238:9092',
                         group_id='test_kudu',
                         auto_offset_reset='earliest')
while True:
    msg = next(consumer)

To be fault-tolerant, committing offsets is mandatory.

...
consumer.commit_async()
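One possible pattern, sketched below rather than taken from the full script: commit asynchronously inside the loop and make one final blocking commit on shutdown, so committed offsets are not lost when the script stops.

try:
    while True:
        msg = next(consumer)
        # ... transform the message and write it to Kudu (see below) ...
        consumer.commit_async()
finally:
    consumer.commit()   # synchronous commit before closing
    consumer.close()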

Before starting the infinite loop, a connection to the Kudu master is established:

# connect to the Kudu master (default RPC port 7051)
client = kudu.connect(host='ubuntu243', port=7051)

When a message is read from the Kafka topic, the record is sent to Kudu (after a transformation, not shown):

    # dictval is the transformed message: a dict mapping column names to values
    table = client.table('impala::default.kudu_aircheckr_stream')
    session = client.new_session()
    op = table.new_insert(dictval)   # build an insert operation for the row
    session.apply(op)
    try:
        session.flush()              # send the buffered operation to Kudu
    except kudu.KuduBadStatus:
        print("kudu bad status during flush")
        print(session.get_pending_errors())
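Because offsets are committed asynchronously, a restart can replay a few messages and insert the same row twice. A possible refinement, assuming the primary key of the Kudu table identifies the record, is to use an upsert instead of an insert:

    op = table.new_upsert(dictval)   # idempotent: a replayed message overwrites the same row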

Note the naming convention for Kudu tables created via Impala: impala::database_name.table_name.

Note that this is a solution for low volumes, because the Python script runs on a single node and not on the complete cluster. If execution on the cluster is needed, other solutions such as Spark Streaming need to be considered.
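As a rough sketch of such an alternative, assuming Spark 2.4+ with the kudu-spark connector on the classpath (and, as above, omitting the JSON-to-column transformation), the same flow with Spark Structured Streaming could look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('kafka_to_kudu').getOrCreate()

stream = (spark.readStream
          .format('kafka')
          .option('kafka.bootstrap.servers', '192.168.1.238:9092')
          .option('subscribe', 'aircheckr_city')
          .load()
          .selectExpr('CAST(value AS STRING) AS json'))

def write_batch(batch_df, batch_id):
    # transform the 'json' column into the columns of the Kudu table here, then:
    (batch_df.write
     .format('org.apache.kudu.spark.kudu')
     .option('kudu.master', 'ubuntu243:7051')
     .option('kudu.table', 'impala::default.kudu_aircheckr_stream')
     .mode('append')
     .save())

(stream.writeStream
 .foreachBatch(write_batch)
 .option('checkpointLocation', '/tmp/kafka_to_kudu_checkpoint')
 .start()
 .awaitTermination())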

 

Links

Python client for Apache Kafka: https://github.com/dpkp/kafka-python

Python interface for Apache Kudu: https://github.com/cloudera/kudu/tree/master/python

Using Kafka Consumer Groups: https://docs.confluent.io/current/clients/consumer.html

Kudu tables naming conventions: https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_tables.html#kudu_tables