Hybrid Transactional/Analytical Processing (HTAP)

It is common to divide data management environments into two classes: online transaction processing (OLTP) and online analytical processing (OLAP). OLTP is characterized by large numbers of online transactions (create, read, update, delete or CRUD operations). These transactions have to be fast; applications running on top of an OLTP database often require instant responses. OLAP, on the other hand, is concerned with historical data on which complex analytical processes need to run. The transaction volumes are much lower than in OLTP, but the queries are often very complex and involve larger amounts of data. OLAP applications typically use machine learning techniques, and the data are stored in a multi-dimensional or star schema.

Because a single database instance is not able to support both activities with their opposing demands, separate databases are set up: one for OLTP, the other supporting the analytical type of processing (OLAP). The consequence of this segregation is that a constant dataflow needs to exist from the OLTP database to the OLAP database. This dataflow is called ETL (extract, transform and load). The ETL process stores the data from the OLTP database in another, isolated environment such as a data warehouse or, more recently, a data lake. Often a second ETL moves data from the data warehouse to yet another data store, the data mart, on which reports and ad-hoc queries are run. Data warehouses rely on traditional RDBMS platforms, and the dimensional models in the data marts are designed to deliver the performance required by frequent and complex business analysis queries.

This process worked well as long as reports were only needed a day later: that time window was used by the ETL processes to move data from the source applications to the data warehouses and data marts. Lately, however, there is an increased demand for real-time reporting with the emergence of streaming data from sensors (IoT, internet of things), machine-to-machine data, social media and other operational data. On top of these new types of data, real-time analytics is required to provide recommendations and more personalized responses to any user interaction.

New database technologies have emerged in recent years that support hybrid workloads within one database instance. These systems take advantage of new hardware such as solid-state disks (SSDs) and the falling cost of RAM. Gartner coined the term “Hybrid Transactional and Analytical Processing” (HTAP) to describe this new type of database [3]. These technologies allow transactional and analytical processing to execute at the same time and on the same data [2]. Consequently, real-time processing and reporting are possible at the moment the transactions occur. Another term used to describe this type of processing is transactional analytics, indicating that insight and decision take place instantaneously with a transaction, e.g. at the moment of engagement with the customer, partner or device.

For example, a customer wants to buy a tablet in an online shop. During the selection process, the customer gets personalized recommendations in real time based on his previous and current surfing behavior. Once a tablet is put in the shopping basket and the payment transaction is initiated, fraud analytics runs in real time, which is another transactional analytics process. In less than a minute all customer interactions are finalized, with the aim to increase customer satisfaction and revenue. Note that these functions, fraud detection and recommendation systems, exist today, but they require a heterogeneous and complex architecture. HTAP simplifies the architecture dramatically because all data are available in the same database and all analytical processes occur in the same environment.
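
As a toy illustration of this pattern (not tied to any specific HTAP product), the sketch below performs the transactional write and an analytical fraud check against the same data within one transaction. SQLite is used here purely as a stand-in, and the table, columns and fraud heuristic are invented for the example.

import sqlite3  # stand-in for an HTAP database, just to illustrate the pattern

conn = sqlite3.connect(":memory:")
conn.execute("create table orders (customer_id int, amount real, ts text)")

def place_order(customer_id, amount):
    with conn:  # one transaction covers both the write and the analytical check
        # analytical part, on the same live data: average of the customer's
        # previous orders (None if this is the first order)
        prev_avg = conn.execute(
            "select avg(amount) from orders where customer_id = ?",
            (customer_id,)).fetchone()[0]
        # transactional part: record the order
        conn.execute("insert into orders values (?, ?, datetime('now'))",
                     (customer_id, amount))
        # naive fraud heuristic: flag orders far above the customer's usual spending
        return prev_avg is not None and amount > 5 * prev_avg

place_order(42, 300.0)          # builds history, not flagged
print(place_order(42, 2500.0))  # True: 2500 is more than 5 x 300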

Traditional data architectures separate the transactions from the analytics: they run on separate systems, each optimized for its particular type of processing and requirements. Data is copied from one system to the other, and these integration pipelines only add more complexity.

When we compare the current architecture with an HTAP-based architecture, the following differences are apparent:

  • Complex architecture. Current data architectures are complex because data is moved from the transactional systems to the analytical systems using ETL and other integration tools to move and transform the data. Also, enterprise service buses (ESB) are used to manage this complex architecture. These tools are not required anymore if transactional and analytical processes occur in the same system.
  • Real-time analytics. Because data has to be moved from the transactional to the analytical systems, a large time gap exists between the moment the transactions occur and the moment they can be analyzed. When both workloads run on the same system, this gap disappears and analytics becomes real-time.
  • Data duplication. Copying data from one system to the other not only duplicates the data, but every complex ETL process might introduce data quality problems, which lead to inconsistencies in the reporting.

There are today several database implementations that offer one or more HTAP features [1], such as SAP HANA, MemSQL, IBM dashDB, HyPer, Apache Kudu, ArangoDB, Aerospike, etc.

 

[1] Hybrid Transactional/Analytical Processing: A Survey, Fatma Özcan et al. SIGMOD ’17, May 14–19, 2017, Chicago, IL, USA.

[2] Hybrid transactional/analytical processing (HTAP)  https://en.wikipedia.org/wiki/Hybrid_transactional/analytical_processing_(HTAP)

[3] Hybrid Transaction/Analytical Processing Will Foster Opportunities for Dramatic Business Innovation. Pezzini, Massimo et al.  Gartner. 28 January 2014

Prediction of Traffic Speed using Spark and Kudu

The open data on traffic in Flanders contains not only data on traffic density, but also the average speed per minute for 5 vehicle classes. In this blog post and the next ones, an attempt is made to create a predictive model for speed estimations (and not for traffic density or traffic jams). Mainly during the night, speeds above 120 km/h are often detected for one of the five vehicle classes (no. 2, cars). A more detailed analysis of the detected speeds will be the subject of a next blog post, but most likely the results will not differ much from those observed in The Netherlands. There are two limitations to these data: the measured speeds are averages over one minute, and speeds above 250 km/h are not reliable.

I will start with the creation of a batch regression model based solely on the historical data on the average speed. In later iterations an attempt will be made to make more complex models such as:

    • include the information from previous time periods to make predictions about the current time period.
    • include weather information

The next chart shows average speed per hour for a specific measure point (Boomsesteenweg, Schelle).

Average speed per hour

The data were retrieved with Impala in a PySpark script:

from impala.dbapi import connect

# Connect to Impala, which serves the Kudu table with the traffic measurements
conn = connect(host='kudu_master_ip', port=21050)
cur = conn.cursor()

# uid of the measure point (Boomsesteenweg, Schelle)
uid = 2301
# Aggregate the per-minute measurements to hourly averages per day
sqlStr = "select from_unixtime(cast(time as bigint), 'yyyy-MM-dd') as dy, dayofweek(time) as day_of_week, "
sqlStr+= " case when dayofweek(time) in (6,7) then 1 else 0 end as weekend_day, from_unixtime(cast(time as bigint), 'HH') as hr, "
sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr+= " from default.kudu_verkeer_stream "
sqlStr+= " where from_unixtime(cast(time as bigint), 'yyyy-MM-dd') >= '2017-01-01' and valid = 0 and uid = " + str(uid)
sqlStr+= " group by "
sqlStr+= " from_unixtime(cast(time as bigint), 'yyyy-MM-dd'), dayofweek(time), "
sqlStr+= " case when dayofweek(time) in (6,7) then 1 else 0 end, from_unixtime(cast(time as bigint), 'HH') "
cur.execute(sqlStr)
sqldata = cur.fetchall()

A simple feature construction is done: the hour of the day is one-hot encoded, and the day of the week, a weekend flag and the hour as a number are added:

from pyspark.mllib.regression import LabeledPoint

datalist = []
for row in sqldata:
    # 27 features: positions 0-23 one-hot encode the hour of the day,
    # 24 = day of week, 25 = weekend flag, 26 = hour as a numeric feature
    datarow = [0] * 27
    datarow[int(row[3])] = 1   # hr (one-hot)
    datarow[24] = row[1]       # day_of_week
    datarow[25] = row[2]       # weekend_day
    datarow[26] = int(row[3])  # hr as a number
    # the label is the average speed of vehicle class 2 (avg_speed2)
    datalist.append(LabeledPoint(row[4], datarow))

data = sc.parallelize(datalist)

Next a regression model is created using Spark MLlib. The Spark ecosystem contains several modules, including a library for machine learning (MLlib), graph computation (GraphX), streaming (real-time calculations), and interactive query processing with Spark SQL and DataFrames.

Most modules have a Python API.  Here I have used the regression algorithms module. A predictive model was created using Ridge regression.

from pyspark.mllib.regression import LabeledPoint, RidgeRegressionWithSGD

lrm = RidgeRegressionWithSGD.train(data, iterations=500, step=0.025, intercept=True, regParam=0.005)

The weights for each feature are:

Feature Weight
intercept 4.36571
0h 0.63734
1h 0.10966
2h -0.21648
3h -0.23896
4h 0.15326
5h 0.48998
6h 0.58346
7h 0.76605
8h 0.62572
9h 0.49076
10h 0.42505
11h 0.41065
12h 0.30644
13h 0.21880
14h 0.10592
15h 0.07667
16h 0.02955
17h 0.00451
18h 0.04812
19h 0.01284
20h -0.00428
21h -0.27189
22h -0.48977
23h -0.90398
dow 4.61585
weekend -0.70381
hr 2.52832

I have not created a training, test or validation set here, but I still want to show the result of the prediction (so most likely overfitted):


The same behavior is seen for other measure points (E40, Kortenberg).
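
As noted above, no training/test split was created. A minimal sketch of how a split and an error estimate could be added is shown below; it reuses the data RDD built earlier, and the 80/20 split and the RMSE metric are my choices, not part of the original analysis.

import math
from pyspark.mllib.regression import RidgeRegressionWithSGD

# Hold out 20% of the observations for testing
train, test = data.randomSplit([0.8, 0.2], seed=42)

model = RidgeRegressionWithSGD.train(train, iterations=500, step=0.025, intercept=True, regParam=0.005)

# Compare the predicted and observed average speed on the held-out set
pairs = test.map(lambda p: (p.label, model.predict(p.features)))
rmse = math.sqrt(pairs.map(lambda t: (t[0] - t[1]) ** 2).mean())
print("Test RMSE: %.2f" % rmse)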

Note that this predictive model indicates that the average speed increases during the week, with the lowest average speed on Monday and a steady increase until Sunday.

When the average (and maximum) speed is calculated per day of the week, this trend is clear in the observed data. The chart below shows the speed for a measure point on the E40 at Kortenberg. The average speed is low on Monday (70 km/h), but gradually increases over the following days, with the highest average speed recorded on Sunday (95 km/h). The maximum speed does not change that much: 139 km/h on all days, except Sunday, where a slight increase is recorded (143 km/h). However, this might be an artifact of the measurement method: because only the average speed per minute is provided, the recorded maximum increases when the traffic density is low and a single fast vehicle dominates the one-minute average.
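
For reference, the day-of-week aggregation behind these numbers can be obtained with a query very similar to the one above; the sketch below reuses the Impala connection and column names from the earlier script, with the uid of the measure point as a placeholder.

# uid of the measure point on the E40 at Kortenberg (placeholder)
uid = 1234
sqlStr = "select dayofweek(time) as day_of_week, "
sqlStr+= " avg(measure_class_2_speed) as avg_speed2, max(measure_class_2_speed) as max_speed2 "
sqlStr+= " from default.kudu_verkeer_stream "
sqlStr+= " where valid = 0 and uid = " + str(uid)
sqlStr+= " group by dayofweek(time) "
sqlStr+= " order by day_of_week "
cur.execute(sqlStr)
weekday_speeds = cur.fetchall()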

A second trend is the increase in average and maximum speed during the night.

 

Flume monitoring

The Flume agents can be monitored individually by adding two parameters:

flume-ng agent -n agent_name -c conf -f conf/config.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=24105

The parameters flume.monitoring.type=http and flume.monitoring.port=24105 enable JSON monitoring.

The metrics are retrieved with the following URL: http://<ip-address-agent>:24105/metrics

Example of a response:

{
    "SOURCE.http_traffic":{"OpenConnectionCount":"0","Type":"SOURCE","AppendBatchAcceptedCount":"2561700","AppendBatchReceivedCount":"2561700","EventAcceptedCount":"2561700","AppendReceivedCount":"0","StopTime":"0","StartTime":"1504012941615","EventReceivedCount":"2561700","AppendAcceptedCount":"0"},
    "SINK.k4":{"Type":"SINK","ConnectionClosedCount":"0","EventDrainSuccessCount":"2561700","KafkaEventSendTimer":"17461960","ConnectionFailedCount":"0","BatchCompleteCount":"0","EventDrainAttemptCount":"0","ConnectionCreatedCount":"0","BatchEmptyCount":"679409","StopTime":"0","RollbackCount":"0","StartTime":"1504012941570","BatchUnderflowCount":"3942"},
    "CHANNEL.c4":{"EventPutSuccessCount":"2561700","ChannelFillPercentage":"0.0","Type":"CHANNEL","StopTime":"0","EventPutAttemptCount":"2561700","ChannelSize":"0","StartTime":"1504012941382","EventTakeSuccessCount":"2561700","ChannelCapacity":"5000","EventTakeAttemptCount":"3245052"},
    "CHANNEL.c1":{"EventPutSuccessCount":"2561700","ChannelFillPercentage":"0.0","Type":"CHANNEL","StopTime":"0","EventPutAttemptCount":"2561700","ChannelSize":"0","StartTime":"1504012941382","EventTakeSuccessCount":"2561700","ChannelCapacity":"5000","EventTakeAttemptCount":"3242098"},
    "CHANNEL.c3":{"EventPutSuccessCount":"2561700","ChannelFillPercentage":"0.0","Type":"CHANNEL","StopTime":"0","EventPutAttemptCount":"2561700","ChannelSize":"0","StartTime":"1504012941381","EventTakeSuccessCount":"2561700","ChannelCapacity":"5000","EventTakeAttemptCount":"3245036"},
    "SINK.k3":{"BatchCompleteCount":"22260","ConnectionFailedCount":"15","EventDrainAttemptCount":"2561701","ConnectionCreatedCount":"2228","Type":"SINK","BatchEmptyCount":"679389","ConnectionClosedCount":"2223","EventDrainSuccessCount":"2561700","StopTime":"0","StartTime":"1504012941383","BatchUnderflowCount":"3942"}
}
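
The endpoint can also be polled programmatically, for example to keep an eye on the channel fill percentage. Below is a small sketch using only the Python standard library; the host name is a placeholder for the address of the agent.

import json
import urllib.request

# Placeholder address of the Flume agent's monitoring endpoint
url = "http://flume-agent-host:24105/metrics"

with urllib.request.urlopen(url) as response:
    metrics = json.loads(response.read().decode("utf-8"))

# Print the fill percentage of every channel
for name, values in metrics.items():
    if name.startswith("CHANNEL."):
        print(name, values["ChannelFillPercentage"])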

The source metrics are listed in the next table.

Table: Source metrics
Metric Description
EventReceivedCount The total number of events that the source has received until now.
EventAcceptedCount The total number of events where the event was successfully written out to the channel and the source returned success to the sink/RPC client/system that created the event.
AppendReceivedCount The total number of events that came in with only one event per batch (the equivalent of an append call in RPC calls).
AppendAcceptedCount The total number of events that came in individually that were written to the channel and returned successfully.
AppendBatchReceivedCount The total number of batches of events received.
AppendBatchAcceptedCount The total number of batches successfully committed to the channel.
StartTime Milliseconds since the epoch when the source was started.
StopTime Milliseconds since the epoch when the source was stopped.
OpenConnectionCount The number of connections currently open with clients/sinks (only an Avro Source currently exposes this).
Type For sources, this always returns SOURCE.

The next table gives more information on the channel metrics.

Table: Channel metrics
Metric Description
ChannelSize The total number of events currently in the channel.
EventPutAttemptCount The total number of events the source(s) attempted to write to the channel.
EventPutSuccessCount The total number of events that were successfully written and committed to the channel.
EventTakeAttemptCount The total number of times the sink(s) attempted to read events from the channel. This does not mean that events were returned each time, since sinks might poll and the channel might not have any data.
EventTakeSuccessCount The total number of events that were successfully taken by the sink(s).
StartTime Milliseconds since the epoch when the channel was started.
StopTime Milliseconds since the epoch when the channel was stopped.
ChannelCapacity The capacity of the channel.
ChannelFillPercentage The percentage of the channel that is full.
Type For channels, this always returns CHANNEL.

The Sink metrics given are:

Table: Sink metrics
Metric Description
ConnectionCreatedCount The number of connections created with the next hop or storage system (like when a new file is created on HDFS).
ConnectionClosedCount The number of connections closed with the next hop or storage system (like when a file on HDFS is closed).
ConnectionFailedCount The number of connections that were closed due to an error with the next hop or storage system (like when a new file on HDFS is closed because of timeouts).
BatchEmptyCount The number of batches that were empty—a high number indicates that the sources are writing data slower than the sinks are clearing it.
BatchUnderflowCount The number of batches that were smaller than the maximum batch size this sink is configured to use—this also indicates sinks are faster than sources if it’s high.
BatchCompleteCount The number of batches that were equal to the maximum batch size.
EventDrainAttemptCount The total number of events the sink tried to write out to storage.
EventDrainSuccessCount The total number of events that the sink successfully wrote out to storage.
StartTime Milliseconds since the epoch when the sink was started.
StopTime Milliseconds since the epoch when the sink was stopped.
Type For sinks, this always returns SINK.

Note that Flume monitoring is also available with Cloudera Manager or Hortonworks Ganglia.

The real-time data mart and its integration in an enterprise data warehouse

These are challenging times: not only are there higher volumes of data to handle, with most predictions expecting an exponential increase, but there are also more requests for real-time or near real-time data. The value of real-time business data decreases as it gets older, but at the same time there is a higher demand for integrated, high-quality data, which requires more processing power.

The main use cases for real-time data marts are [2]:

  • Operational reporting and dashboards: Operational data that needs to be visualized for immediate actions using business intelligence tools.
  • Query offloading: replication of high-cost or legacy OLTP databases to another infrastructure for easy querying without impact on the applications.
  • Real-time analytics: the data and the results of any predictive analytics process often require that actions are taken with the shortest possible delay, for example in fraud detection.

There are large differences between the requirements for a real-time data mart and those for an enterprise data warehouse [6]. The following table lists some of the requirements of each data store:

  | Real-time (RT) data mart | Enterprise data warehouse (EDW)
Description | Data from several source systems is available in reports and dashboards after a few seconds, or a few minutes at the latest. | Data from ALL source systems is available in reports and dashboards the next day or later; the data are integrated and of high quality.
Latency | Seconds/minutes | Days/months
Capture | Event-based/streaming | Batch
Processing requirements | Moderate | High
Target loading impact | Low | High
User access/queries | Yes | No (used to feed the user access layer with data marts)

The techniques used differ mostly in the latency of the data integration, from daily/monthly batches to streaming real-time integration. The capture of changes in the application sources can be performed through queries that filter on a timestamp or a flag, or through a change data capture (CDC) mechanism that detects changes as they happen. In the case of streaming, events are captured as they occur and are immediately integrated into the data mart. With a batch process, the changes after a specified period are detected, rather than individual events. A daily batch mechanism is most suitable if intra-day freshness is not required, for example for longer-term trends or for data that is only calculated once a day, such as financial close information. Batch loads can be performed in a downtime window if the business model does not require 24-hour availability of the data warehouse.
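
As an illustration of the timestamp-based variant (the "queries that filter on a timestamp" mentioned above), the sketch below selects only the rows changed since the previous load. The table and column names are invented for the example; the Impala client is used simply because it already appears earlier on this blog.

from impala.dbapi import connect

# High-water mark stored by the previous load run (hypothetical value)
last_loaded = '2017-09-01 00:00:00'

conn = connect(host='source_db_host', port=21050)
cur = conn.cursor()
# Hypothetical source table with a last_modified timestamp column
cur.execute("select order_id, customer_id, amount, last_modified "
            "from sales.orders "
            "where last_modified > '" + last_loaded + "'")
changed_rows = cur.fetchall()
# changed_rows are then transformed and loaded into the real-time data mart,
# and the high-water mark is advanced to the maximum last_modified of this batch.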

A well-known architecture to solve these conflicting requirements is the lambda architecture [3]. This architecture takes advantage of both batch- and stream-processing methods.

Lambda Architecture

A more recent approach is the Kappa architecture [4], which removes the batch layer from the lambda architecture. This has several advantages, such as the need to write and maintain code only for the streaming layer: there are no longer two code bases or frameworks, one for the batch layer and another for the streaming layer.

Kappa Architecture

The Kappa architecture treats a batch as a special case of a stream. A batch is a data set with a start and an end (bounded), while a stream has no start or end and is infinite (unbounded). Because a batch is a bounded stream, batch processing is a subset of stream processing, and the results of the Lambda batch layer can also be obtained with a streaming engine. This simplification reduces the architecture to a single streaming engine capable of ingesting the required volumes of data for both batch and real-time processing, which significantly decreases the overall system complexity.
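
The "batch is a bounded stream" idea can be made concrete with a tiny sketch: the same processing function consumes a finite list and an unbounded generator, and only the source differs. This is a conceptual illustration only, not tied to any particular streaming engine.

import itertools
import random

def process(events):
    # The same logic serves batch and streaming: a running count per event type
    counts = {}
    for event in events:
        counts[event] = counts.get(event, 0) + 1
        yield dict(counts)

# Batch: a bounded collection of events; we keep only the final result
batch = ["view", "click", "view"]
print(list(process(batch))[-1])

# Stream: an unbounded generator of events; we look at the first five snapshots
stream = (random.choice(["view", "click"]) for _ in itertools.count())
for snapshot in itertools.islice(process(stream), 5):
    print(snapshot)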

The streams (real-time and batch) are sent to the serving back-end after processing. This serving back-end can contain the various components of a BI stack, such as a data warehouse or a real-time data mart.

The BI architecture for the Serving Layer is shown in the next figure.

Serving Layer components

The transactions that are needed in real-time data marts are sent immediately to those data marts, while all other transactions (and referential data) are sent to the data warehouse for long-term storage. The real-time transactions are sent in a second step to the data warehouse, which ensures that all data are present in the data warehouse. Note that this design pattern is also described for the operational data store [1].

There can be many real-time data marts, each for a specific use case or business channel, but the data marts should all use the same platform as the data warehouse and be integrated as described above. Otherwise tactical/operational decisions and strategic decision support will not be based on the same data, which leads to inconsistencies and poor execution [5].

[1] Operational data store https://en.wikipedia.org/wiki/Operational_data_store

[2] Best Practices for Real-Time Data Warehousing  http://www.oracle.com/us/products/middleware/data-integration/realtime-data-warehousing-bp-2167237.pdf

[3] Lambda Architecture http://lambda-architecture.net/

[4] Understand Kappa Architecture in 2 minutes http://dataottam.com/2016/06/02/understand-kappa-architecture-in-2-minutes/

[5] Ten Mistakes to Avoid When Constructing a Real-Time Data Warehouse http://www.bi-bestpractices.com/view-articles/4766

[6] What is operational data store vs. data warehouse technology? http://searchdatamanagement.techtarget.com/answer/What-is-an-operational-data-store-vs-a-data-warehouse