Using Spark for Data Profiling or Exploratory Data Analysis

Data profiling is the process of examining the data available in an existing data source (e.g. a database or a file) and collecting statistics and information about that data. The purpose of these statistics may be to find out whether existing data can easily be used for other purposes. 

Before any dataset is used for advanced data analytics, an exploratory data analysis (EDA) or data profiling step is necessary. Data profiling is an ideal approach for datasets containing personal data, because only aggregated data are shown. The Social-3 Personal Data Framework provides metadata and data profiling information for each available dataset. One of the earliest steps after data ingestion is the automated creation of a data profile.

Exploratory data analysis (EDA) or data profiling helps assess which data might be useful and reveals the as yet unknown characteristics of a new dataset, including data quality issues and data transformation requirements, before the data are used for analytics.

Data consumers can browse and get insight into the available datasets in the data lake of the Social-3 Personal Data Framework, and can make informed decisions on their usage and privacy requirements. The Social-3 Personal Data Framework contains a data catalogue that allows data consumers to select interesting datasets and put them in a “shopping basket” to indicate which datasets they want to use and how they want to use them.

Before using a dataset with any algorithm, it is essential to understand what the data look like and what the edge cases and the distribution of each attribute are. The questions that need to be answered relate to the distribution of the attributes (the columns of the table), their completeness and the amount of missing data.

EDA can in a subsequent cleansing step be translated into constraints or rules that are then enforced. For instance, after discovering that the most frequent pattern for phone numbers is (ddd)ddd-dddd, this pattern can be promoted to the rule that all phone numbers must be formatted accordingly. Most cleansing tools can then either transform differently formatted numbers or at least mark them as violations.
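As a minimal sketch of how such a rule could be checked with Spark (the DataFrame df and its phone column are hypothetical, assuming a running pyspark session):

from pyspark.sql.functions import col

# Promote the discovered pattern (ddd)ddd-dddd to a rule and flag the violations.
phone_pattern = r'^\(\d{3}\)\d{3}-\d{4}$'
violations = df.filter(~col('phone').rlike(phone_pattern))
violations.show()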

Most EDA provides summary statistics for each attribute independently, although some statistics are based on pairs or combinations of attributes. Data profiling should address the following topics:

  • Completeness: How complete is the data? What percentage of records has missing or null values?
  • Uniqueness: How many unique values does an attribute have? Does an attribute that is supposed to be a unique key contain only unique values?
  • Distribution: What is the distribution of values of an attribute?
  • Basic statistics: The mean, standard deviation, minimum, maximum for numerical attributes.
  • Pattern matching: What patterns are matched by data values of an attribute?
  • Outliers: Are there outliers in the numerical data?
  • Correlation: What is the correlation between two given attributes? This kind of profiling may be important for feature analysis prior to building predictive models.
  • Functional dependency: Is there a functional dependency between two attributes? (A sketch of these two pair-wise checks follows below.)
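A minimal sketch of the last two pair-wise checks, assuming a pyspark session with an already loaded DataFrame df and hypothetical column names:

# Pearson correlation between two (hypothetical) numeric attributes.
corr_value = df.stat.corr('age', 'income')

# Contingency table of two (hypothetical) categorical attributes,
# a rough first check for a functional dependency.
df.stat.crosstab('zip_code', 'city').show()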

The advantages of EDA can be summarized as:

  • Find out what is in the data before using it
  • Get data quality metrics
  • Get an early assessment on the difficulties in creating business rules
  • Provide input to a subsequent cleansing step
  • Discover value patterns and distributions
  • Understand data challenges early to avoid delays and cost overruns
  • Improve the ability to search the data

Data volumes can be so large that traditional EDA or data profiling, for example computing descriptive statistics with a Python script, becomes intractable. Even with scalable infrastructure like Hadoop, aggressive optimization and statistical approximation techniques must sometimes be used.

However, using Spark for data profiling or EDA might provide enough capabilities to compute summary statistics on very large datasets.
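For instance, Spark DataFrames (introduced below) offer approximate aggregations that keep profiling of very large tables tractable. A minimal sketch, assuming Spark 2.1 or later and an already loaded DataFrame df with a hypothetical numeric column amount:

from pyspark.sql.functions import approx_count_distinct

# Approximate distinct count instead of a more expensive exact distinct().count().
df.agg(approx_count_distinct('amount')).show()

# Approximate quartiles with a 1% relative error.
quartiles = df.approxQuantile('amount', [0.25, 0.5, 0.75], 0.01)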

Exploratory data analysis or data profiling is typically performed using Python or R, but since Spark introduced DataFrames, it is possible to do the exploratory data analysis step in Spark itself, especially for larger datasets.

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

The data are stored in RDDs (with a schema), which means you can also process DataFrames with the original RDD API, as well as with the algorithms and utilities in MLLib.
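A minimal sketch of this interoperability, assuming a pyspark shell (Spark 2.x) where sc and spark are available:

from pyspark.sql import Row

# Build a DataFrame from an existing RDD of Rows ...
rdd = sc.parallelize([Row(name='a', value=1), Row(name='b', value=2)])
df = spark.createDataFrame(rdd)

# ... and drop back to the RDD API when needed.
total = df.rdd.map(lambda r: r.value).sum()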

One of the useful functions of Spark DataFrames is the describe method. It takes the names of one or more columns as arguments and returns summary statistics for the numeric columns of the source DataFrame: count, mean, standard deviation, minimum and maximum.
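A minimal sketch of such a call, assuming the ingested dataset has been loaded into a DataFrame df with the columns mentioned below (StartYear, StopYear, ICD9Code):

# Summary statistics (count, mean, stddev, min, max) for three columns.
df.describe('StartYear', 'StopYear', 'ICD9Code').show()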

The resulting statistics provide information about missing data (e.g. a StartYear = 0, or an empty StopYear) and about the type and range of the data. Also notice that numeric calculations are sometimes made on a non-numeric field such as the ICD9Code.

The most basic form of data profiling is the analysis of individual columns in a given table. Typically, the generated metadata comprises various counts, such as the number of values, the number of unique values and the number of non-null values. The following statistics are calculated (a sketch of some of the corresponding DataFrame calls follows the list):

  • Count: DataFrame describe method
  • Average: DataFrame describe method
  • Minimum: DataFrame describe method
  • Maximum: DataFrame describe method
  • Standard deviation: DataFrame describe method
  • Missing values: DataFrame filter method
  • Density: ratio calculation (non-missing values over the total number of records)
  • Min. string length: DataFrame expr, groupBy, agg, min, max and avg methods
  • Max. string length: DataFrame expr, groupBy, agg, min, max and avg methods
  • Number of unique values: DataFrame distinct and count methods
  • Top 100 of most frequent values: DataFrame groupBy, count, filter, orderBy and limit methods
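A minimal sketch of how some of these statistics could be computed, assuming a pyspark session with a DataFrame df and a hypothetical string column name:

from pyspark.sql.functions import col, length, min, max, avg

# Missing values and density (ratio of non-missing values to the total number of records).
total = df.count()
missing = df.filter(col('name').isNull()).count()
density = float(total - missing) / total

# Number of unique values.
uniques = df.select('name').distinct().count()

# Minimum, maximum and average string length.
df.agg(min(length(col('name'))), max(length(col('name'))), avg(length(col('name')))).show()

# Top 100 of most frequent values.
df.groupBy('name').count().orderBy(col('count').desc()).limit(100).show()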


This article was originally posted at: http://www.social-3.com/solutions/personal_data_profiling.php


Prediction of Traffic Speed using Spark and Kudu

The open traffic data of Flanders contain not only data on traffic density, but also the average speed per minute for 5 vehicle classes. In this blog post and the next ones, an attempt is made to create a predictive model for speed estimation (and not for traffic density or traffic jams). Mainly during the night, speeds above 120 km/h are often detected for one of the five vehicle classes (no. 2, cars). A more detailed analysis of the detected speeds will be the subject of a next blog post, but most likely the results will not differ from those observed in The Netherlands. There are two limitations to these data: the measured speeds are one-minute averages, and speeds above 250 km/h are not reliable.

I will start with the creation of a batch regression model based solely on the historical data on the average speed. In later iterations an attempt will be made to build more complex models that, for example:

    • include the information from previous time periods to make predictions about the current time period.
    • include weather information

The next chart shows the average speed per hour for a specific measurement point (Boomsesteenweg, Schelle).

Chart: average speed per hour (Boomsesteenweg, Schelle)

The data were retrieved using Impala from within a PySpark script:

from impala.dbapi import connect

# Connect to Impala via the impyla DB-API client ('kudu_master_ip' is a placeholder host).
conn = connect(host='kudu_master_ip', port=21050)
cur = conn.cursor()

# Aggregate the class-2 (car) speeds per day and hour for one measurement point (uid).
uid = 2301
sqlStr = "select from_unixtime(cast(time as bigint), 'yyyy-MM-dd') as dy, dayofweek(time) as day_of_week, "
sqlStr += " case when dayofweek(time) in (6,7) then 1 else 0 end as weekend_day, from_unixtime(cast(time as bigint), 'HH') as hr, "
sqlStr += " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr += " from default.kudu_verkeer_stream "
sqlStr += " where from_unixtime(cast(time as bigint), 'yyyy-MM-dd') >= '2017-01-01' and valid = 0 and uid = " + str(uid)
sqlStr += " group by "
sqlStr += " from_unixtime(cast(time as bigint), 'yyyy-MM-dd'), dayofweek(time), case when dayofweek(time) in (6,7) then 1 else 0 end, from_unixtime(cast(time as bigint), 'HH') "
cur.execute(sqlStr)
sqldata = cur.fetchall()

Next, a simple set of features is constructed: the hour of the day is one-hot encoded and combined with the day of the week, a weekend flag and the numeric hour:

from pyspark.mllib.regression import LabeledPoint

datalist = []
for row in sqldata:
    # 24 one-hot slots for the hour of the day, plus day of week, weekend flag and numeric hour.
    datarow = [0] * 27
    datarow[int(row[3])] = 1    # one-hot encoded hour
    datarow[24] = row[1]        # day of the week
    datarow[25] = row[2]        # weekend flag
    datarow[26] = int(row[3])   # hour as a numeric value
    # The label is the average class-2 speed for this day/hour combination.
    datalist.append(LabeledPoint(row[4], datarow))

# sc is the SparkContext provided by the pyspark shell.
data = sc.parallelize(datalist)

Next, a regression model is created using Spark MLLib. The Spark ecosystem contains several modules, including a machine learning library (MLLib), graph computation (GraphX), streaming (real-time calculations) and interactive query processing with Spark SQL and DataFrames.

Most modules have a Python API. Here I have used the regression algorithms module: a predictive model was created using ridge regression.

from pyspark.mllib.regression import LabeledPoint, RidgeRegressionWithSGD

# Train a ridge regression model with stochastic gradient descent (500 iterations,
# step size 0.025, with intercept and L2 regularization parameter 0.005).
lrm = RidgeRegressionWithSGD.train(data, iterations=500, step=0.025, intercept=True, regParam=0.005)
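As a rough check of the fit, the trained model can be applied back to the same RDD to compare observed and predicted average speeds:

# Mean squared error on the training data itself (no held-out set).
values_and_preds = data.map(lambda p: (p.label, lrm.predict(p.features)))
mse = values_and_preds.map(lambda vp: (vp[0] - vp[1]) ** 2).mean()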

The weights for each feature are:

Feature Weight
intercept 4.36571
0h 0.63734
1h 0.10966
2h -0.21648
3h -0.23896
4h 0.15326
5h 0.48998
6h 0.58346
7h 0.76605
8h 0.62572
9h 0.49076
10h 0.42505
11h 0.41065
12h 0.30644
13h 0.21880
14h 0.10592
15h 0.07667
16h 0.02955
17h 0.00451
18h 0.04812
19h 0.01284
20h -0.00428
21h -0.27189
22h -0.48977
23h -0.90398
dow 4.61585
weekend -0.70381
hr 2.52832
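These values correspond to the intercept and the weight vector exposed by the trained model:

# Intercept and weights of the trained MLLib ridge regression model.
print(lrm.intercept)
print(lrm.weights)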

I have not created a training, test or validation set here, but I still want to show the result of the prediction (so the model is most likely overfitted):


The same behavior is seen for other measurement points (e.g. the E40 at Kortenberg).

Note that this predictive model indicates that the average speed increases during the week, with the lowest average speed on Monday and a steady increase until Sunday.

When the average (and maximum) speed is calculated per day of the week, this trend is clear in the observed data. The chart below shows the speed for a measurement point on the E40 at Kortenberg. The average speed is low on Monday (70 km/h), but gradually increases over the following days, with the highest average speed recorded on Sunday (95 km/h). The maximum speed does not change that much: 139 km/h on all days, except Sunday, where a slight increase is recorded (143 km/h). However, this might be an artifact of the measurement method: because an average speed per minute is provided, the recorded maximum tends to be higher when the traffic density is low.
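These per-day averages and maxima can be retrieved with a small variation of the earlier Impala query, reusing the cursor (cur) and the uid of the measurement point of interest:

# Average and maximum class-2 speed per day of the week for one measurement point.
sqlStr = "select dayofweek(time) as day_of_week, "
sqlStr += " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr += " from default.kudu_verkeer_stream "
sqlStr += " where valid = 0 and uid = " + str(uid)
sqlStr += " group by dayofweek(time) "
cur.execute(sqlStr)
per_day = cur.fetchall()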

A second trend is the increase in the average and maximum speed during the night.