The open data of traffic in Flanders contains not only data on traffic density, but also on the average speed per minute for 5 vehicle classes. In this blog post and the next blog posts, an attempt is made to create a predictive model for speed estimations (and not traffic density and traffic jams). Mainly during the night, one of the five vehicle classes (no. 2, cars) speeds above 120 km/h are often detected. A more detailed analysis of detected speed will be the subject for a next blog post, but most likely the results will not be different from those observed in The Netherlands. There are two limitations to these data: The speeds that are measured are an average speed during one minute and speeds above 250 km/h are not reliable.
I will start with the creation of a batch regression model based solely on the historical data on the average speed. In later iterations an attempt will be made to make more complex models such as:
- include the information from previous time periods to make predictions about the current time period.
- include weather information
The next chart shows average speed per hour for a specific measure point (Boomsesteenweg, Schelle).
The data were retrieved using impala in a pyspark script:
from impala.dbapi import connect conn = connect(host='kudu_master_ip', port=21050) cur = conn.cursor() uid = 2301 sqlStr = "select from_unixtime(cast(time as bigint), 'yyyy-MM-dd') as dy, dayofweek(time) as day_of_week, " sqlStr+= " case when dayofweek(time) in (6,7) then 1 else 0 end as weekend_day, from_unixtime(cast(time as bigint), 'HH') as hr, " sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2, " sqlStr+= " from default.kudu_verkeer_stream " sqlStr+= " where from_unixtime(cast(time as bigint), 'yyyy-MM-dd') >= '2017-01-01' and valid = 0 and uid = " + str(uid) sqlStr+= " group by " sqlStr+= " from_unixtime(cast(time as bigint), 'yyyy-MM-dd'), dayofweek(time), from_unixtime(cast(time as bigint), 'HH') " cur.execute(sqlStr) sqldata=cur.fetchall()
A simple feature selection is done:
datalist =  for row in sqldata: datarow = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] datarow[int(row)] = 1 datarow = row datarow = row datarow = int(row) datalist.append(LabeledPoint(row, datarow)) data=sc.parallelize(datalist)
Next a regression model is created using Spark MLLib. The Spark ecosystem contains several modules including a library for machine learning, MLLib, graph computations (via GraphX), streaming (real-time calculations), and real-time interactive query processing with Spark SQL and DataFrames.
from pyspark.mllib.regression import LabeledPoint, RidgeRegressionWithSGD lrm = RidgeRegressionWithSGD.train(data, iterations=500, step=0.025, intercept=True, regParam=0.005)
The weights for each feature are:
I have not created a training, test or validation set here, but I still want to show the result of the prediction (so most likely overfitted):
The same behavior is seen for other measure points (E40, Kortenberg).
Note than this predictive model indicates that the average speed increases during the week, with the lowest average speed on Monday and a steady increase until Sunday.
When the average (and maximum) speed is calculated per day of the week, this trend is clear for the observed data. The chart below shows the speed for a measure point on the E40 at Kortenberg. The average speed is low on Monday (70 km/h), but gradually increases the next days with the highest average speed recorded on Sunday (95 km/h). The maximum speed does not change that much: 139 km/h during all days, expect Sunday where a slight increase is recorded (143 km/h). However, this might be an artifact of the measurement method, because the average speed per minute is provided which might increase the maximum speed when the traffic density is low.
A second trend is the increase in average and maximum speed during the night