Prediction of Traffic Speed using Spark and Kudu

The open data on traffic in Flanders contains not only data on traffic density, but also the average speed per minute for 5 vehicle classes. In this blog post and the following ones, an attempt is made to create a predictive model for speed estimations (rather than for traffic density and traffic jams). Mainly during the night, speeds above 120 km/h are often detected for one of the five vehicle classes (no. 2, cars). A more detailed analysis of the detected speeds will be the subject of a next blog post, but most likely the results will not differ from those observed in The Netherlands. These data have two limitations: the measured speeds are one-minute averages, and speeds above 250 km/h are not reliable.

I will start by creating a batch regression model based solely on the historical data on the average speed. In later iterations an attempt will be made to build more complex models, for example:

    • include information from previous time periods to make predictions about the current time period;
    • include weather information.

The next chart shows the average speed per hour for a specific measurement point (Boomsesteenweg, Schelle).

(Chart: average speed per hour)

The data were retrieved using Impala in a PySpark script:

from impala.dbapi import connect

# Connect to the Impala daemon (port 21050) that queries the Kudu table
conn = connect(host='kudu_master_ip', port=21050)
cur = conn.cursor()

uid = 2301
# Note: Impala's dayofweek() returns 1 (Sunday) through 7 (Saturday),
# so days 1 and 7 mark the weekend
sqlStr = "select from_unixtime(cast(time as bigint), 'yyyy-MM-dd') as dy, dayofweek(time) as day_of_week, "
sqlStr+= " case when dayofweek(time) in (1,7) then 1 else 0 end as weekend_day, from_unixtime(cast(time as bigint), 'HH') as hr, "
sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr+= " from default.kudu_verkeer_stream "
sqlStr+= " where from_unixtime(cast(time as bigint), 'yyyy-MM-dd') >= '2017-01-01' and valid = 0 and uid = " + str(uid)
sqlStr+= " group by "
sqlStr+= " from_unixtime(cast(time as bigint), 'yyyy-MM-dd'), dayofweek(time), from_unixtime(cast(time as bigint), 'HH') "
cur.execute(sqlStr)
sqldata = cur.fetchall()
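Each row returned by fetchall() follows the column order of the select list, so a quick sanity check could look like this (a sketch; printing three rows is arbitrary):

# Columns: dy, day_of_week, weekend_day, hr, avg_speed2, max_speed2
for dy, day_of_week, weekend_day, hr, avg_speed2, max_speed2 in sqldata[:3]:
    print(dy, hr, avg_speed2, max_speed2)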

A simple set of features is then constructed: the hour of the day is one-hot encoded, and the day of the week, the weekend flag, and the hour itself are appended as numeric features:

from pyspark.mllib.regression import LabeledPoint

datalist = []
for row in sqldata:
    # 27 features: 24 one-hot encoded hours, day of week, weekend flag, hour as integer
    datarow = [0] * 27
    datarow[int(row[3])] = 1       # one-hot encoding of the hour (row[3] = hr)
    datarow[24] = row[1]           # day of week
    datarow[25] = row[2]           # weekend flag
    datarow[26] = int(row[3])      # hour as a numeric feature
    datalist.append(LabeledPoint(row[4], datarow))  # label = average speed (avg_speed2)

# sc is the SparkContext provided by the pyspark shell
data = sc.parallelize(datalist)

Next, a regression model is created using Spark MLlib. The Spark ecosystem contains several modules, including a machine-learning library (MLlib), graph computation (GraphX), streaming (real-time computation), and interactive query processing with Spark SQL and DataFrames.

Most modules have a Python API. Here I have used the regression algorithms from MLlib; the predictive model is created using ridge regression.

from pyspark.mllib.regression import RidgeRegressionWithSGD

# Ridge regression trained with stochastic gradient descent:
# 500 iterations, step size 0.025, an intercept term, and L2 regularization of 0.005
lrm = RidgeRegressionWithSGD.train(data, iterations=500, step=0.025, intercept=True, regParam=0.005)
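
Once trained, the model can score a hand-built feature vector. Below is a minimal sketch; the make_features helper is hypothetical and mirrors the 27-feature layout constructed above:

# Hypothetical helper mirroring the feature layout used for training:
# 24 one-hot hour slots, day of week, weekend flag, hour as integer
def make_features(hour, day_of_week):
    row = [0] * 27
    row[hour] = 1
    row[24] = day_of_week
    row[25] = 1 if day_of_week in (1, 7) else 0  # Impala convention: 1 = Sunday, 7 = Saturday
    row[26] = hour
    return row

# Predicted average speed for a Monday (day_of_week = 2) at 08:00
print(lrm.predict(make_features(8, 2)))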

The weights for each feature are:

Feature Weight
intercept 4.36571
0h 0.63734
1h 0.10966
2h -0.21648
3h -0.23896
4h 0.15326
5h 0.48998
6h 0.58346
7h 0.76605
8h 0.62572
9h 0.49076
10h 0.42505
11h 0.41065
12h 0.30644
13h 0.21880
14h 0.10592
15h 0.07667
16h 0.02955
17h 0.00451
18h 0.04812
19h 0.01284
20h -0.00428
21h -0.27189
22h -0.48977
23h -0.90398
dow 4.61585
weekend -0.70381
hr 2.52832

I have not created training, test, or validation sets here, but I still want to show the result of the prediction (which is therefore most likely overfitted):


The same behavior is seen for other measurement points (E40, Kortenberg).
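
For reference, a proper evaluation would first split the data; a minimal sketch with MLlib (not what was done for the charts above):

import math

# Hold out 20% of the data for testing (the seed is arbitrary)
train, test = data.randomSplit([0.8, 0.2], seed=42)
model = RidgeRegressionWithSGD.train(train, iterations=500, step=0.025, intercept=True, regParam=0.005)

# Root mean squared error on the held-out set
mse = test.map(lambda p: (p.label - model.predict(p.features)) ** 2).mean()
print(math.sqrt(mse))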

Note that this predictive model indicates that the average speed increases during the week, with the lowest average speed on Monday and a steady increase until Sunday.

When the average (and maximum) speed is calculated per day of the week, this trend is clear in the observed data. The chart below shows the speed for a measurement point on the E40 at Kortenberg. The average speed is lowest on Monday (70 km/h) and gradually increases over the following days, with the highest average speed recorded on Sunday (95 km/h). The maximum speed does not change much: 139 km/h on all days, except Sunday, where a slight increase is recorded (143 km/h). However, this might be an artifact of the measurement method: because only the average speed per minute is provided, a single fast vehicle can dominate the one-minute average when traffic density is low, inflating the recorded maximum.
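
The aggregation behind that chart can be computed with the same cursor as before; a sketch, with table and column names as in the earlier query:

# Average and maximum speed per day of the week for one measurement point
sqlStr = "select dayofweek(time) as day_of_week, "
sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr+= " from default.kudu_verkeer_stream "
sqlStr+= " where valid = 0 and uid = " + str(uid)
sqlStr+= " group by dayofweek(time) "
cur.execute(sqlStr)
print(cur.fetchall())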

A second trend is the increase in average and maximum speed during the night.


Using Superset with Kudu

Apache Superset is a visualization tool originally developed at Airbnb [1]. Its goal is to empower every user of data, especially those for whom SQL is too high a barrier, and to let users explore and discover insights through a user-friendly interface. Only recently has it become an Apache (incubating) project [2].

Once Apache Superset is running, a new database connection is made using Impala. The URI has the format impala://<host>:21050/<database>.
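For example, with an Impala daemon on a hypothetical host 192.168.1.10 and the default database, the URI would be impala://192.168.1.10:21050/default.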

Note that the Impala driver needs to be installed first (pip install impyla) [3].

Next, a table is created to enable the use of a dataset inside a slice or dashboard. Because only one data source can be used per slice, the tables will often be views of several tables that are joined beforehand [4]. This means that if your tables are designed as dimensions and facts, a join between all tables needs to be made when the "table" is defined, or a single table needs to be materialized that contains all the data for the data source. An example of such a view is sketched below.
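
As an illustration, such a view could be created through Impala beforehand (a sketch; the measure_points dimension table and its location_name column are assumed names):

# Hypothetical: join the fact table to a dimension table so that Superset
# can treat the result as a single data source
cur.execute("""
create view default.verkeer_with_location as
select f.*, d.location_name
from default.kudu_verkeer_stream f
join default.measure_points d on f.uid = d.uid
""")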

Once that table/view is defined, its columns and metrics need to be configured before it can be used in a slice.

Finally, the slice can be created and added to a dashboard.

Links

[1] Superset: Scaling Data Access and Visual Insights at Airbnb  https://medium.com/airbnb-engineering/superset-scaling-data-access-and-visual-insights-at-airbnb-3ce3e9b88a7f

[2] Apache Superset (incubating) https://superset.incubator.apache.org/

[3] Installation & Configuration http://superset.apache.org/installation.html

[4] FAQ http://superset.apache.org/faq.html