Real-time traffic data

The Roads and Traffic Agency (Agentschap Wegen en Verkeer) [2] and the Flemish Traffic Centre (Vlaams Verkeerscentrum) [3] provide open data about the current traffic situation [1]. More of 4000 measurement points dispersed all over Flanders – but mainly located on highways, detect each minute the number of vehicles and their speed.

We use the same front-end and data pipeline as presented for the pollution data [4]. Because the amount of data is pretty big (more than 4000 measurements each minute), only a small selection of measurement points (~ 100) is captured every 10 minutes by an edge node and send to flume for further processing [5]. Two streams are captured, one with the list of measurement points and the second with the real-time traffic data (see figure).

Real Time Traffic Data Pipeline
Real Time Traffic Data Pipeline

The traffic data are send to Apache Kafka [6] and two topics are created. The topic with measurement points are stored in Apache Hive [7] to have a structured query interface for further processing. The topic with the real-time traffic data is consumed with a python script and enriched with the measurement points stored in Hive to create a new Kafka topic with the enriched traffic data (see figure). This new topic is read by Druid [8] for visualization and data discovery using several tools such as Apache Superset [9].

The two other Kafka topics are send to Kudu tables which are used for the visualizations (see live map here).

 

[1] Meten-in-Vlaanderen (MIV) http://opendata.vlaanderen.be/dataset/minuutwaarden-verkeersmetingen-vlaanderen

[2] Agentschap Wegen en Verkeer http://www.wegenenverkeer.be/

[3] Vlaams Verkeerscentrum http://www.verkeerscentrum.be/

[4] Map of pollution in Flanders http://www.bigdatareflections.net/blog/?p=1

[5] Sending data to Flume using Python http://www.bigdatareflections.net/blog/?p=35

[6] Apache Kafka https://kafka.apache.org/

[7] Apache Hive https://hive.apache.org/

[8] Druid http://druid.io/

[9] Using Superset with Kudu http://www.bigdatareflections.net/blog/?p=52

 

 

Using Superset with Kudu

Apache Superset is a visualization tool, originally developed by Airbnb [1]. The goal to develop this tool is to empower every user of data, especially users for which SQL is often a too high barrier. A second goal is to allow users to explore and discover insights with a user-friendly interface. The solution to both of these challenges is the development of Superset. Only recently it became an Apache project [2].

Once Apache superset is running, a new database connection is made using Impala. The URI has the format impala://<host>:21050/<database>.

Note that first the impala driver needs to be installed (pip install impyla) [3]

Next, a table is created to enable the usage of a dataset inside a slice or dashboard. Because only one data source can be used for each slice, the tables will often be a view of several tables that are joined beforehand. [4] This means if your tables are designed as dimensions and facts, a join between all tables needs to be made when the “table” is defined, or else a single table needs to be materialized that contain all table in the data source.

Once that table/view is defined, columns and metrics need to be defined before it can be used to define a slice.

At last, the slice can be created and added to a dashboard.

Links

[1] Superset: Scaling Data Access and Visual Insights at Airbnb  https://medium.com/airbnb-engineering/superset-scaling-data-access-and-visual-insights-at-airbnb-3ce3e9b88a7f

[2] Apache Superset (incubating) https://superset.incubator.apache.org/

[3] Installation & Configuration http://superset.apache.org/installation.html

[4] FAQ http://superset.apache.org/faq.html

 

 

Uses cases for Kudu

The big data landscape was until 1-3 years ago dominated by several storage systems, the first was Hadoop HDFS and later followed by Apache HBase, a NoSQL database. HDFS is great for high-speed writes and scans while the latter is well suited for random-access queries. A new storage engine, Apache Kudu tries to bridge the gap between those two uses cases.  Apache Kudu is a distributed, columnar database for structured, real-time data. Because Kudu has a schema, it is only suited for structured data, contrary to HBase which is schemaless. The data model resembles that of more traditional databases such as SybaseIQ or SAP Hana which uses columnar storage, that makes it well suited for OLAP queries. It is not an in-memory database such as SAP Hana, but uses persistent memory integrated in the block cache.

Because Apache Kudu allows both fast scan and random access, a big data architecture can be much simplified to address any analytics and business intelligence use case.  Kudu enables the use of the same storage engine for large scale batch jobs and complex data processing jobs that require fast random access and updates. As a result, applications that require both batch as well as real-time data processing capabilities can use Kudu for both types of workloads.

Another step in the data pipeline: from Kafka to Kudu

The data pipeline described in other post (link) gave an overview of the complete data flow from external api to visualization.

Streaming Data Pipeline

A Kafka consumer is created using the Python Client for Kafka and the python Kudu interfaces allows the data to be sent to a Kudu table.

The Kafka consumers is created using a consumer group id which allows offset management to be handled by Kafka.

from kafka import KafkaConsumer
import kudu
import json
import datetime

consumer = KafkaConsumer('aircheckr_city', bootstrap_servers='192.168.1.238:9092', group_id='test_kudu', auto_offset_reset='earliest')
while True:
    msg=next(consumer)

To be fault-tolerant, committing offsets is mandatory.

….
consumer.commit_async()

Before starting the infinite loop, a connection to the Kudu master is established:

client = kudu.connect(host='ubuntu243', port=7051)

When a message is read from the kafka topic, the records is send to Kudu (after transformation – not shown):

    table = client.table('impala::default.kudu_aircheckr_stream')
    session = client.new_session()
    op = table.new_insert(dictval)
    session.apply(op)
    try:
        session.flush()
    except kudu.KuduBadStatus as e:
        print("kudu bad status during flush")
        print(session.get_pending_errors())

Note the naming conventions for Kudu tables that are created via impala: impala::database_name.table_name.

Note that this is a solution for low volumes because the python script is executed on the node where the script is installed and not the complete cluster. If execution on the cluster is needed other solution such as spark streaming need to be considered.

 

Links

Python client for Apache Kafka: https://github.com/dpkp/kafka-python

Python interface for Apache Kudu: https://github.com/cloudera/kudu/tree/master/python

Using Kafka Consumer Groups: https://docs.confluent.io/current/clients/consumer.html

Kudu tables naming conventions: https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_tables.html#kudu_tables

Sending data to Flume using Python

The pollution data are retrieved from an external api (see this post for more information) and send to Apache Flume. Apache Flume is a service for streaming data into Hadoop and other streaming applications. Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of streaming data.

For the streaming data pipeline on pollution in Flanders, the data was send to Hadoop HDFS and to Apache Kafka. A Flume source captures the data received from the external api.

The python code to send the data to the flume agent using json format. The standard JSONHandler needs a header and a body section. Also the headers of the request requires to specify the content type :

url_flume = 'http://<ip-address>:<port>'
payload = [{'headers': {}, 'body': data_clean }]
headers = {'content-type': 'application/json'}
response = requests.post(url_flume, data=json.dumps(payload), 
            headers=headers)

Every Flume agent has normally one source, a memory channel and a sink. The incoming data can however be sent to more than one sink. For each additional sink, the source needs another memory channel.
A flow multiplexer is defined that can replicate or selectively route an event to one or more channels.

The configuration of the agent to receive and send the data is given below.

# Name the components on this agent
aircheckr1.sources = http_aircheckr
aircheckr1.sinks = hdfs_sink kafka_sink
aircheckr1.channels = channel_hdfs channel_kafka

# Describe/configure the source
aircheckr1.sources.http_aircheckr.type = http
aircheckr1.sources.http_aircheckr.bind = 0.0.0.0
aircheckr1.sources.http_aircheckr.port = 9260

# Describe the sink
aircheckr1.sinks.hdfs_sink.type = hdfs
aircheckr1.sinks.hdfs_sink.hdfs.path = hdfs://192.168.1.242/flume/aircheckr
aircheckr1.sinks.hdfs_sink.hdfs.rollInterval = 86400
aircheckr1.sinks.hdfs_sink.hdfs.rollSize = 0

aircheckr1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
aircheckr1.sinks.kakfa_sink.kafka.bootstrap.servers = ubuntu238:9092
aircheckr1.sinks.kafka_sink.kafka.topic = aircheckr
aircheckr1.sinks.kafka_sink.flumeBatchSize = 10

# Use a channel which buffers events in memory
aircheckr1.channels.channel_hdfs.type = memory
aircheckr1.channels.channel_hdfs.capacity = 1000
aircheckr1.channels.channel_hdfs.transactionCapacity = 500

aircheckr1.channels.channel_kafka.type = memory
aircheckr1.channels.channel_kafka.capacity = 1000
aircheckr1.channels.channel_kafka.transactionCapacity = 10

# Bind the source and sinks to the channel
aircheckr1.sources.http_aircheckr.channels = channel_hdfs channel_kafka
aircheckr1.sinks.hdfs_sink.channel = channel_hdfs
aircheckr1.sinks.kafka_sink.channel = channel_kafka

Links:
Apache Flume: https://flume.apache.org/