Prediction of Traffic Speed using Spark and Kudu

The open traffic data for Flanders contain not only traffic density, but also the average speed per minute for 5 vehicle classes. In this blog post and the following ones, an attempt is made to create a predictive model for speed estimation (rather than for traffic density and traffic jams). For one of the five vehicle classes (no. 2, cars), speeds above 120 km/h are often detected, mainly during the night. A more detailed analysis of the detected speeds will be the subject of a future blog post, but most likely the results will not differ much from those observed in the Netherlands. There are two limitations to these data: the measured speeds are averages over one minute, and speeds above 250 km/h are not reliable.

I will start with the creation of a batch regression model based solely on the historical data on average speed. In later iterations, an attempt will be made to build more complex models that:

    • include the information from previous time periods to make predictions about the current time period.
    • include weather information

The next chart shows the average speed per hour for a specific measurement point (Boomsesteenweg, Schelle).

Average Speed per hour

The data were retrieved via Impala in a PySpark script:

from impala.dbapi import connect

# connect to Impala and query the Kudu table with the traffic measurements
conn = connect(host='kudu_master_ip', port=21050)
cur = conn.cursor()

# aggregate the class-2 (car) speeds per day and per hour for one measurement point
uid = 2301
sqlStr = "select from_unixtime(cast(time as bigint), 'yyyy-MM-dd') as dy, dayofweek(time) as day_of_week, "
sqlStr+= " case when dayofweek(time) in (6,7) then 1 else 0 end as weekend_day, from_unixtime(cast(time as bigint), 'HH') as hr, "
sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr+= " from default.kudu_verkeer_stream "
sqlStr+= " where from_unixtime(cast(time as bigint), 'yyyy-MM-dd') >= '2017-01-01' and valid = 0 and uid = " + str(uid)
sqlStr+= " group by "
sqlStr+= " from_unixtime(cast(time as bigint), 'yyyy-MM-dd'), dayofweek(time), "
sqlStr+= " case when dayofweek(time) in (6,7) then 1 else 0 end, from_unixtime(cast(time as bigint), 'HH') "
cur.execute(sqlStr)
sqldata = cur.fetchall()

A simple set of features is then constructed:

# build the feature vector: 24 one-hot slots for the hour of the day,
# followed by the day of week, the weekend flag and the hour as an integer
# (LabeledPoint comes from pyspark.mllib.regression, imported below)
datalist = []
for row in sqldata:
    datarow = [0] * 27
    datarow[int(row[3])] = 1       # one-hot encoding of the hour (row[3] = hr)
    datarow[24] = row[1]           # day of week
    datarow[25] = row[2]           # weekend flag
    datarow[26] = int(row[3])      # hour as integer
    datalist.append(LabeledPoint(row[4], datarow))   # label = avg_speed2

data = sc.parallelize(datalist)
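One of the extensions listed earlier, using information from previous time periods, would mainly mean appending lag features to this vector. A hypothetical sketch, assuming the query is extended with an extra column prev_avg_speed2 (the average speed of the previous hour) returned as row[6]:

datalist = []
for row in sqldata:
    datarow = [0] * 28                     # one extra slot for the lag feature
    datarow[int(row[3])] = 1
    datarow[24] = row[1]
    datarow[25] = row[2]
    datarow[26] = int(row[3])
    datarow[27] = row[6]                   # hypothetical prev_avg_speed2 column (lag feature)
    datalist.append(LabeledPoint(row[4], datarow))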

Next, a regression model is created using Spark MLlib.  The Spark ecosystem contains several modules, including a machine learning library (MLlib), graph computation (GraphX), streaming (real-time calculations), and interactive query processing with Spark SQL and DataFrames.

Most modules have a Python API.  Here I have used the regression algorithms module. A predictive model was created using Ridge regression.

from pyspark.mllib.regression import LabeledPoint, RidgeRegressionWithSGD

lrm = RidgeRegressionWithSGD.train(data, iterations=500, step=0.025, intercept=True, regParam=0.005)
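Once trained, the model can be queried with a feature vector laid out in the same way as above. A small illustrative example (the values are arbitrary):

features = [0] * 27
features[8] = 1      # one-hot slot for hour 08
features[24] = 4     # day of week, as returned by the query (illustrative value)
features[25] = 0     # not a weekend day
features[26] = 8     # hour as integer
print(lrm.predict(features))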

The weights for each feature are:

Weights Value
intercept 4.36571
0h 0.63734
1h 0.10966
2h -0.21648
3h -0.23896
4h 0.15326
5h 0.48998
6h 0.58346
7h 0.76605
8h 0.62572
9h 0.49076
10h 0.42505
11h 0.41065
12h 0.30644
13h 0.21880
14h 0.10592
15h 0.07667
16h 0.02955
17h 0.00451
18h 0.04812
19h 0.01284
20h -0.00428
21h -0.27189
22h -0.48977
23h -0.90398
dow 4.61585
weekend -0.70381
hr 2.52832

I have not created a training, test or validation set here, but I still want to show the result of the prediction (so most likely overfitted):


The same behavior is seen for other measurement points (E40, Kortenberg).
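For completeness, a proper evaluation would split the data before training; a minimal sketch using the same MLlib calls (the split ratio is arbitrary, and this was not part of the model shown above):

train, test = data.randomSplit([0.8, 0.2], seed=42)
model = RidgeRegressionWithSGD.train(train, iterations=500, step=0.025, intercept=True, regParam=0.005)
pairs = test.map(lambda p: (p.label, model.predict(p.features)))
rmse = pairs.map(lambda lp: (lp[0] - lp[1]) ** 2).mean() ** 0.5
print("RMSE on the held-out set:", rmse)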

Note that this predictive model indicates that the average speed increases during the week, with the lowest average speed on Monday and a steady increase until Sunday.

When the average (and maximum) speed is calculated per day of the week, this trend is clear for the observed data. The chart below shows the speed for a measurement point on the E40 at Kortenberg.  The average speed is low on Monday (70 km/h), but gradually increases over the following days, with the highest average speed recorded on Sunday (95 km/h). The maximum speed does not change that much: 139 km/h on all days, except Sunday, where a slight increase is recorded (143 km/h). However, this might be an artifact of the measurement method: because the average speed per minute is provided, the maximum speed may appear higher when the traffic density is low.

A second trend is the increase in average and maximum speed during the night.
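The per-day aggregation can be obtained with a query in the same style as the one used earlier; a sketch, where uid_e40 is a placeholder for the id of the E40/Kortenberg measurement point:

sqlStr = "select dayofweek(time) as day_of_week, "
sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr+= " from default.kudu_verkeer_stream "
sqlStr+= " where valid = 0 and uid = " + str(uid_e40)
sqlStr+= " group by dayofweek(time) order by day_of_week "
cur.execute(sqlStr)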

 

Real-time traffic data

The Roads and Traffic Agency (Agentschap Wegen en Verkeer) [2] and the Flemish Traffic Centre (Vlaams Verkeerscentrum) [3] provide open data about the current traffic situation [1]. More than 4000 measurement points, dispersed all over Flanders but mainly located on highways, detect the number of vehicles and their speed every minute.

We use the same front-end and data pipeline as presented for the pollution data [4]. Because the amount of data is quite large (more than 4000 measurements each minute), only a small selection of measurement points (~100) is captured every 10 minutes by an edge node and sent to Flume for further processing [5]. Two streams are captured, one with the list of measurement points and the second with the real-time traffic data (see figure).

Real Time Traffic Data Pipeline
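The edge-node step can be as small as posting the selected measurements as JSON events to a Flume HTTP source, along the lines of [5]; a sketch, in which the host, port and payload layout are illustrative:

import json
import requests

def send_to_flume(records):
    # Flume's HTTP source (JSONHandler) expects a JSON array of events,
    # each with a 'headers' map and a 'body' string
    events = [{"headers": {}, "body": json.dumps(rec)} for rec in records]
    requests.post("http://edge_node_ip:51515", data=json.dumps(events),
                  headers={"Content-Type": "application/json"})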

The traffic data are sent to Apache Kafka [6] and two topics are created. The topic with the measurement points is stored in Apache Hive [7] to provide a structured query interface for further processing. The topic with the real-time traffic data is consumed by a Python script and enriched with the measurement points stored in Hive to create a new Kafka topic with the enriched traffic data (see figure). This new topic is read by Druid [8] for visualization and data discovery using tools such as Apache Superset [9].
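The enrichment step itself can be sketched as a small consumer/producer script; the topic names, the table with the measurement points and its columns are illustrative here, and the lookup is done through the same Impala interface used earlier:

import json
from kafka import KafkaConsumer, KafkaProducer
from impala.dbapi import connect

# load the measurement points (stored in Hive) once into a lookup dictionary
conn = connect(host='kudu_master_ip', port=21050)
cur = conn.cursor()
cur.execute("select uid, name, lat, lon from default.measurement_points")
points = {r[0]: {"name": r[1], "lat": r[2], "lon": r[3]} for r in cur.fetchall()}

consumer = KafkaConsumer('traffic_raw', bootstrap_servers='192.168.1.238:9092')
producer = KafkaProducer(bootstrap_servers='192.168.1.238:9092')
for msg in consumer:
    rec = json.loads(msg.value)
    rec.update(points.get(rec["uid"], {}))   # enrich with the measurement point details
    producer.send('traffic_enriched', json.dumps(rec).encode('utf-8'))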

The two other Kafka topics are sent to Kudu tables, which are used for the visualizations (see the live map here).

 

[1] Meten-in-Vlaanderen (MIV) http://opendata.vlaanderen.be/dataset/minuutwaarden-verkeersmetingen-vlaanderen

[2] Agentschap Wegen en Verkeer http://www.wegenenverkeer.be/

[3] Vlaams Verkeerscentrum http://www.verkeerscentrum.be/

[4] Map of pollution in Flanders http://www.bigdatareflections.net/blog/?p=1

[5] Sending data to Flume using Python http://www.bigdatareflections.net/blog/?p=35

[6] Apache Kafka https://kafka.apache.org/

[7] Apache Hive https://hive.apache.org/

[8] Druid http://druid.io/

[9] Using Superset with Kudu http://www.bigdatareflections.net/blog/?p=52

 

 

Use cases for Kudu

Until a few years ago, the big data landscape was dominated by two storage systems: first Hadoop HDFS, later followed by Apache HBase, a NoSQL database. HDFS is great for high-speed writes and scans, while HBase is well suited for random-access queries. A new storage engine, Apache Kudu, tries to bridge the gap between those two use cases.  Apache Kudu is a distributed, columnar database for structured, real-time data. Because Kudu has a schema, it is only suited for structured data, contrary to HBase, which is schemaless. The data model resembles that of more traditional columnar databases such as Sybase IQ or SAP HANA, which makes it well suited for OLAP queries. It is not an in-memory database such as SAP HANA, but it can use persistent memory integrated in the block cache.

Because Apache Kudu allows both fast scans and random access, a big data architecture can be much simplified to address many analytics and business intelligence use cases.  Kudu enables the use of the same storage engine for large-scale batch jobs and for complex data processing jobs that require fast random access and updates. As a result, applications that require both batch and real-time data processing capabilities can use Kudu for both types of workloads.

Another step in the data pipeline: from Kafka to Kudu

The data pipeline described in another post (link) gives an overview of the complete data flow from the external API to visualization.

Streaming Data Pipeline

A Kafka consumer is created using the Python client for Kafka, and the Python Kudu interface allows the data to be sent to a Kudu table.

The Kafka consumer is created with a consumer group id, which allows offset management to be handled by Kafka.

from kafka import KafkaConsumer
import kudu
import json
import datetime

# the group_id lets Kafka track the committed offsets for this consumer group
consumer = KafkaConsumer('aircheckr_city', bootstrap_servers='192.168.1.238:9092', group_id='test_kudu', auto_offset_reset='earliest')
while True:
    msg = next(consumer)

To be fault-tolerant, committing offsets is mandatory.

….
consumer.commit_async()
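Schematically, the offset is committed only after the message has been handled, so that an unprocessed message is read again after a restart; a sketch of how the pieces fit together:

while True:
    msg = next(consumer)
    # ... transform the message and write it to Kudu (shown below) ...
    consumer.commit_async()    # commit only after the record has been processed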

Before starting the infinite loop, a connection to the Kudu master is established:

client = kudu.connect(host='ubuntu243', port=7051)

When a message is read from the Kafka topic, the record is sent to Kudu (after transformation – not shown):

    # the table was created via Impala, hence the 'impala::' prefix (see note below)
    table = client.table('impala::default.kudu_aircheckr_stream')
    session = client.new_session()
    op = table.new_insert(dictval)     # dictval holds the transformed record
    session.apply(op)
    try:
        session.flush()
    except kudu.KuduBadStatus as e:
        print("kudu bad status during flush")
        print(session.get_pending_errors())

Note the naming convention for Kudu tables that are created via Impala: impala::database_name.table_name.

Note that this is a solution for low volumes only, because the Python script is executed on the single node where it is installed and not on the complete cluster. If execution on the cluster is needed, other solutions such as Spark Streaming need to be considered.
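For completeness, a sketch of what such a cluster-side alternative could look like with Spark Streaming's Kafka integration; this assumes Spark 2.x with the spark-streaming-kafka-0-8 package, and write_partition_to_kudu is a hypothetical function wrapping the Kudu client calls shown above:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka_to_kudu")
ssc = StreamingContext(sc, 10)    # 10-second micro-batches
stream = KafkaUtils.createDirectStream(ssc, ['aircheckr_city'],
                                       {"metadata.broker.list": "192.168.1.238:9092"})
# each message value is written to Kudu per partition (write_partition_to_kudu is hypothetical)
stream.map(lambda kv: kv[1]).foreachRDD(
    lambda rdd: rdd.foreachPartition(write_partition_to_kudu))
ssc.start()
ssc.awaitTermination()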

 

Links

Python client for Apache Kafka: https://github.com/dpkp/kafka-python

Python interface for Apache Kudu: https://github.com/cloudera/kudu/tree/master/python

Using Kafka Consumer Groups: https://docs.confluent.io/current/clients/consumer.html

Kudu tables naming conventions: https://www.cloudera.com/documentation/enterprise/5-11-x/topics/impala_tables.html#kudu_tables