The implementation of the working hour monitoring example demonstrates how Flink applications operate with state and time, the core ingredients of any slightly advanced stream processing application. Flink provides many features for working with state and time, such as support for processing and event time. In addition, Flink provides several strategies for dealing with late records, different types of state, an efficient checkpointing mechanism to guarantee exactly-once state consistency, and many unique features centered around the concept of “savepoints,” just to name a few. The combination of all of these features results in a stream processor that provides the flexibility that is required to build very sophisticated event-driven applications and run them at scale and with operational ease.
Analyzing data streams with SQL in Flink
The previous example showed the expressiveness and low-level control that you have at hand when implementing applications with Flink’s DataStream API and ProcessFunctions. However, a large fraction of applications (and subtasks of more advanced applications) have very similar requirements and do not need this level of expressiveness. They can be defined more concisely and conveniently using SQL, the standard language for data processing and analytics.
Flink features unified SQL support for batch and streaming data. For a given query Flink computes the same result regardless of whether it is executed on a stream of continuously ingested records or on a bounded set of records, given that the stream and data set provide the same data. Flink supports the syntax and semantics of ANSI SQL; it does not define a language that looks similar to SQL and comes with proprietary syntax and semantics.
Supporting standard SQL as an API for unified batch and stream processing provides many advantages. Users who are familiar with standard SQL immediately know how to write queries and what results to expect. Queries can be applied to bounded and unbounded data, as well as recorded and real-time streams. Therefore, batch-stream unification enables use cases such as bootstrapping state from historical data and backfilling results to incorporate late data or to compensate for outages, all without query modifications. It’s also easy to perform data exploration on a small static data set, and later run the same query on a continuous stream.
So, how does Flink evaluate SQL queries on streams? In principle, running a SQL query on a stream of data is the same as maintaining a materialized view. Materialized views are a well-known feature of relational database systems that are commonly used to speed up analytical workloads. They can be understood as a hybrid of regular virtual views and indexes. A materialized view is defined as a query (similar to a regular view), but the query result is immediately computed, stored as a table on disk or in memory, and automatically updated when the view’s base tables change (similar to an index). When planning a query, the query optimizer aims to rewrite queries to read pre-computed results from a materialized view instead of computing them again.
The similarity between materialized view maintenance and the evaluation of SQL queries on streams should become clear when we treat the continuous SQL query as the query defining a materialized view, the input stream of a SQL query as a stream of changes that are applied on the view’s base table, and the result of the continuous query as the stream of changes to update the materialized view. Hence, evaluating a SQL query on a stream is the same as maintaining a materialized view. Basically, Flink maintains materialized views by running incremental SQL queries on a distributed, stateful stream processor.
We can appreciate the elegance and power of Flink’s SQL support by taking a look at two queries. The first query computes the average total ride fare per hour and location. To group by location, we discretize the drop-off locations’ coordinates into areas of approximately 250 by 250 meters. The query to compute this result is shown below. You can find the source code of the application that runs the query in our repository.
SELECT
  toCellId(dropOffLon, dropOffLat) AS area,
  TUMBLE_START(dropOffTime, INTERVAL '1' HOUR) AS t,
  AVG(total) AS avgTotal
FROM Rides
GROUP BY
  toCellId(dropOffLon, dropOffLat),
  TUMBLE(dropOffTime, INTERVAL '1' HOUR)
The query looks and behaves just like a regular SELECT-FROM-GROUP-BY query. However, there are three functions that we need to explain:
TUMBLE is a function to assign records to windows of a fixed length, here one hour.
TUMBLE_START is a function that returns the start timestamp of a window, e.g., TUMBLE_START will return “2018-06-01 13:00:00.000” for the window that starts at 1 p.m. and ends at 2 p.m. on June 1, 2018.
toCellId is a user-defined function that computes a cell ID, representing an area of 250 by 250 meters, from a pair of longitude-latitude values.
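To make the window arithmetic concrete, here is a minimal Python sketch of how a timestamp could be mapped to the start and end of its tumbling window. It only illustrates the semantics described above and is not Flink’s implementation. The helper names tumble_start and tumble_end are hypothetical, and the sketch assumes windows are aligned to the Unix epoch. The toCellId function is not sketched because its logic is not shown in this article.

```python
from datetime import datetime, timedelta

# Assumption: windows are aligned to the Unix epoch.
EPOCH = datetime(1970, 1, 1)


def tumble_start(ts: datetime, size: timedelta) -> datetime:
    """Return the start of the fixed-length window that contains ts."""
    offset = (ts - EPOCH) % size  # time elapsed since the window began
    return ts - offset


def tumble_end(ts: datetime, size: timedelta) -> datetime:
    """Return the (exclusive) end of the window that contains ts."""
    return tumble_start(ts, size) + size


# A ride that was dropped off at 1:42 p.m. falls into the 1 p.m. window.
drop_off = datetime(2018, 6, 1, 13, 42, 7)
print(tumble_start(drop_off, timedelta(hours=1)))  # 2018-06-01 13:00:00
print(tumble_end(drop_off, timedelta(hours=1)))    # 2018-06-01 14:00:00
```

Every row whose dropOffTime lies between 1 p.m. (inclusive) and 2 p.m. (exclusive) maps to the same window start, which is why grouping by the window is enough to collect an hour of rides.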
The query will return a result similar to this:
8> 29126,2013-01-04 11:00:00.0,44.0
4> 46551,2013-01-04 11:00:00.0,14.4625
1> 44565,2013-01-04 12:00:00.0,8.188571
3> 57031,2013-01-04 12:00:00.0,13.066667
8> 46847,2013-01-04 12:00:00.0,20.5
7> 56781,2013-01-04 12:00:00.0,7.5
An important detail of our example query is that dropOffTime is an event-time timestamp attribute. The timestamps of an event-time attribute are (roughly) increasing and are guarded by watermarks. Watermarks are special records that provide the system with a lower bound for the event-time timestamps of all future rows. Among other things, Flink’s SQL engine uses watermarks to determine when the result of a window can be computed and emitted.
For example, from a watermark with a timestamp of “2018-06-14 15:00:00.0” Flink infers that all windows that end before “2018-06-14 15:00:00.0” can be finalized because no more records with a timestamp equal to or less than the watermark should arrive. Event-time attributes and their watermarks are declared in the table definition.
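The interplay of out-of-order rows and watermarks can be sketched in a few lines of Python. The class below is a toy model under stated assumptions, not Flink’s window operator: the name HourlyAverage and its methods are hypothetical, a window is treated as complete once the watermark reaches its end, and late rows are not handled at all (a row that arrives after its window was emitted would simply open a fresh accumulator).

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: windows are aligned to the epoch
HOUR = timedelta(hours=1)


class HourlyAverage:
    """Toy event-time tumbling-window average that is driven by watermarks."""

    def __init__(self):
        # (area, window start) -> [sum of fares, number of rides]
        self.open_windows = defaultdict(lambda: [0.0, 0])

    def on_row(self, area, drop_off_time, total):
        """Add a ride to the accumulator of its one-hour window."""
        start = drop_off_time - (drop_off_time - EPOCH) % HOUR
        acc = self.open_windows[(area, start)]
        acc[0] += total
        acc[1] += 1

    def on_watermark(self, watermark):
        """Emit and discard every window whose end the watermark has reached."""
        done = sorted(k for k in self.open_windows if k[1] + HOUR <= watermark)
        results = []
        for area, start in done:
            fare_sum, count = self.open_windows.pop((area, start))
            results.append((area, start, fare_sum / count))
        return results


agg = HourlyAverage()
agg.on_row(29126, datetime(2018, 6, 14, 14, 10), 40.0)
agg.on_row(29126, datetime(2018, 6, 14, 15, 5), 10.0)   # belongs to the next window
agg.on_row(29126, datetime(2018, 6, 14, 14, 50), 48.0)  # arrives out of order
first = agg.on_watermark(datetime(2018, 6, 14, 14, 30))  # nothing is complete yet
second = agg.on_watermark(datetime(2018, 6, 14, 15, 0))  # the 14:00 window is emitted
print(first, second)
```

In this run the first watermark produces no output, while the second one emits a single row for the window that starts at 14:00 with an average of 44.0. The ride at 15:05 stays in state until a later watermark closes its window.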
The second query that I’d like to discuss is very similar to the first one. It computes the average total ride fare per location and hour of day. The difference from the first query is that we do not compute the average for each hour but for each hour of the day; i.e., we aggregate the fares of rides that happened every day at the same time. You can check out the application’s source code in our repository.
SELECT
  toCellId(dropOffLon, dropOffLat) AS area,
  EXTRACT(HOUR FROM dropOffTime) AS hourOfDay,
  AVG(total) AS avgTotal
FROM Rides
GROUP BY
  toCellId(dropOffLon, dropOffLat),
  EXTRACT(HOUR FROM dropOffTime)
The query looks very similar to the first one. Syntactically the only difference is that we replaced the TUMBLE function with the EXTRACT function that extracts the hour of day of a timestamp. However, this modification affects how the query is processed. For the first query, Flink tracks watermarks to decide when to compute the result of a windowed aggregation. This is not possible for the second query: each hour-of-day group keeps receiving rows on every subsequent day, so no watermark can ever declare a group complete, and Flink cannot wait until all data are present before emitting a result. Therefore, Flink emits results and continuously updates them as new rows are ingested.
The query will return a result similar to the one below.
5> (false,45563,14,7.125)
5> (true,45563,14,7.25)
7> (true,49792,14,10.095588)
3> (false,48295,15,6.7)
3> (true,48295,15,6.76)
Comparing this result with the result of the first query, you will notice the boolean flags (false, true) at the start of each row. These flags indicate whether a row is added to the result (true) or removed from the result (false). Whenever a result needs to be updated, Flink emits two rows, one to remove the previous result and one to add a new result. Also note that in the second set of results the time of day appears as an integer representing the hour, such as 14, rather than something like 2013-01-04 14:00:00.0, as seen in the first set of results.
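The remove-then-add pattern can be reproduced with a small Python sketch of a continuously updated average. This is a simplified model of the behavior described above, not Flink’s aggregation operator. The class name HourOfDayAverage and the fare values are made up for illustration; the fares were chosen so that the averages match the first two rows of the sample output.

```python
from datetime import datetime


class HourOfDayAverage:
    """Toy non-windowed average that emits (flag, area, hourOfDay, avgTotal) rows."""

    def __init__(self):
        self.state = {}  # (area, hour of day) -> (sum of fares, number of rides)

    def on_row(self, area, drop_off_time, total):
        key = (area, drop_off_time.hour)
        messages = []
        if key in self.state:
            old_sum, old_count = self.state[key]
            # Remove the previously emitted result for this group.
            messages.append((False, area, key[1], old_sum / old_count))
        else:
            old_sum, old_count = 0.0, 0
        new_sum, new_count = old_sum + total, old_count + 1
        self.state[key] = (new_sum, new_count)
        # Add the updated result for this group.
        messages.append((True, area, key[1], new_sum / new_count))
        return messages


agg = HourOfDayAverage()
# Two rides in the same area and hour of day, but on different days.
day_one = agg.on_row(45563, datetime(2013, 1, 4, 14, 12), 7.125)
day_two = agg.on_row(45563, datetime(2013, 1, 5, 14, 40), 7.375)
print(day_one)  # [(True, 45563, 14, 7.125)]
print(day_two)  # [(False, 45563, 14, 7.125), (True, 45563, 14, 7.25)]
```

The first ride of a group produces only an add row. Every later ride of the same group produces a pair of rows, which is the pattern visible in the sample output.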
Using insert and delete messages, Flink is able to encode arbitrary changes and continuously maintain the result of a streaming SQL query in an external storage system, such as PostgreSQL or Elasticsearch. Dashboards or other external applications can connect to such storage systems and retrieve and visualize the results, which are updated with low latency as the input streams are ingested.
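As a rough illustration of how a consumer could keep a result table in sync, the following Python sketch applies flagged rows to a dictionary keyed by area and hour of day. The dictionary is only a stand-in for an external store such as PostgreSQL or Elasticsearch; the function apply_changes is hypothetical and does not represent any Flink connector. The messages are the rows from the sample output above, and the initial table contents are assumed.

```python
def apply_changes(table, messages):
    """Apply (flag, area, hourOfDay, avgTotal) rows to a table keyed by (area, hourOfDay)."""
    for is_add, area, hour, avg_total in messages:
        key = (area, hour)
        if is_add:
            table[key] = avg_total  # insert the new result row
        elif table.get(key) == avg_total:
            del table[key]  # delete the row that is being retracted
    return table


# Assumed state of the result table before the messages arrive.
table = {(45563, 14): 7.125, (48295, 15): 6.7}

messages = [
    (False, 45563, 14, 7.125),
    (True, 45563, 14, 7.25),
    (True, 49792, 14, 10.095588),
    (False, 48295, 15, 6.7),
    (True, 48295, 15, 6.76),
]

apply_changes(table, messages)
print(table)
```

After all five messages are applied, the table holds one current row per group: 7.25 for (45563, 14), 10.095588 for (49792, 14), and 6.76 for (48295, 15). A dashboard that reads this table always sees the latest averages.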
The example queries that we have examined demonstrate the versatility of Flink’s SQL support. Besides traditional batch analytics, SQL queries can perform common stream analytics operations such as windowed joins and aggregations. At the same time, Flink can maintain and update materialized views with low latency. All use cases benefit from the unified standard SQL syntax and semantics for batch and streaming input data. Flink SQL supports the most common relational operations including projection, selection, aggregation, and joins. Moreover, Flink offers many built-in functions and provides excellent support for user-defined functions, such that custom logic can be easily embedded in SQL queries.
Time to start exploring
The applications and queries discussed in this article should have given you a good intuition of the capabilities and style of Flink’s APIs. While Flink’s DataStream API and ProcessFunctions provide low-level control of time and state, Flink’s SQL support offers a well-known and concise interface to process streaming and batch data in a unified fashion.
The GitHub repository that we published for this article is a good starting point to learn more about Flink and its APIs. We encourage you to fork and clone it. You can run the examples that I have presented, modify them, or create completely new applications or queries. Please check out the documentation or reach out to the friendly Flink community via the mailing lists if you have any questions. Happy hacking!
Fabian Hueske is a committer and Project Management Committee member of the Apache Flink project. He is one of the three original creators of Apache Flink and a co-founder of data Artisans, a Berlin-based startup devoted to fostering Flink. Fabian is currently writing a book, “Stream Processing with Apache Flink,” to be published by O’Reilly later this year. Follow him on Twitter at @fhueske.
—
New Tech Forum provides a venue to explore and discuss emerging enterprise technology in unprecedented depth and breadth. The selection is subjective, based on our pick of the technologies we believe to be important and of greatest interest to InfoWorld readers. InfoWorld does not accept marketing collateral for publication and reserves the right to edit all contributed content. Send all inquiries to newtechforum@infoworld.com.