SQL queries are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SQL query as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataSet or DataStream, or written to a TableSink. SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, Table, DataStream, or DataSet. Alternatively, users can also register external catalogs in a TableEnvironment to specify the location of the data sources.
For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. Hence, Table objects can be directly inlined into SQL queries (by string concatenation) as shown in the examples below.
Note: Flink’s SQL support is not yet feature complete. Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.
The following examples show how to specify SQL queries on registered and inlined tables.
Flink parses SQL using Apache Calcite, which supports standard ANSI SQL. DDL statements are not supported by Flink.
The following BNF grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.
Flink SQL uses a lexical policy for identifiers (table, attribute, and function names) similar to Java. Identifiers can be quoted with backticks (e.g., "SELECT a AS `my field` FROM t").

| Operation | Supported in | Description |
| --- | --- | --- |
| Scan / Select / As | Batch, Streaming | |
| Where / Filter | Batch, Streaming | |
| User-defined Scalar Functions (Scalar UDF) | Batch, Streaming | UDFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register scalar UDFs. |
| Operation | Supported in | Description |
| --- | --- | --- |
| GroupBy Aggregation | Batch, Streaming, Result Updating | Note: GroupBy on a streaming table produces an updating result. See the Streaming Concepts page for details. |
| GroupBy Window Aggregation | Batch, Streaming | Use a group window to compute a single result row per group. See the Group Windows section for more details. |
| Over Window Aggregation | Streaming | Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with a PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute. |
| Distinct | Batch, Streaming, Result Updating | Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
| Grouping sets, Rollup, Cube | Batch | |
| Having | Batch, Streaming | |
| User-defined Aggregate Functions (UDAGG) | Batch, Streaming | UDAGGs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDAGGs. |
| Operation | Supported in | Description |
| --- | --- | --- |
| Inner Equijoin | Batch, Streaming | Currently, only equijoins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported. Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause a query to fail. Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
| Outer Equijoin | Batch, Streaming, Result Updating | Currently, only equijoins are supported, i.e., joins that have at least one conjunctive condition with an equality predicate. Arbitrary cross or theta joins are not supported. Note: The order of joins is not optimized. Tables are joined in the order in which they are specified in the FROM clause. Make sure to specify tables in an order that does not yield a cross join (Cartesian product), which is not supported and would cause a query to fail. Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
| Time-windowed Join | Batch, Streaming | Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion. A time-windowed join requires at least one equijoin predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates. For example, the following predicates are valid window join conditions: |
| Expanding arrays into a relation | Batch, Streaming | Unnesting WITH ORDINALITY is not supported yet. |
| Join with Table Function | Batch, Streaming | UDTFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDTFs. Inner Join. Left Outer Join. Note: Currently, only literal |
| Operation | Supported in | Description |
| --- | --- | --- |
| Union | Batch | |
| UnionAll | Batch, Streaming | |
| Intersect / Except | Batch | |
| In | Batch, Streaming | Returns true if an expression exists in a given table subquery. The subquery table must consist of one column. This column must have the same data type as the expression. Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
| Exists | Batch, Streaming | Returns true if the subquery returns at least one row. Only supported if the operation can be rewritten into a join and group operation. Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
| Operation | Supported in | Description |
| --- | --- | --- |
| Order By | Batch, Streaming | Note: The result of streaming queries must be primarily sorted on an ascending time attribute. Additional sorting attributes are supported. |
| Limit | Batch | |
| Operation | Supported in | Description |
| --- | --- | --- |
| Insert Into | Batch, Streaming | Output tables must be registered in the TableEnvironment (see Register a TableSink). Moreover, the schema of the registered table must match the schema of the query. |
Group windows are defined in the GROUP BY clause of a SQL query. Just like queries with regular GROUP BY clauses, queries with a GROUP BY clause that includes a group window function compute a single result row per group. The following group window functions are supported for SQL on batch and streaming tables.
| Group Window Function | Description |
| --- | --- |
| TUMBLE(time_attr, interval) | Defines a tumbling time window. A tumbling time window assigns rows to non-overlapping, continuous windows with a fixed duration (interval). For example, a tumbling window of 5 minutes groups rows in 5-minute intervals. Tumbling windows can be defined on event-time (stream + batch) or processing-time (stream). |
| HOP(time_attr, interval, interval) | Defines a hopping time window (called sliding window in the Table API). A hopping time window has a fixed duration (second interval parameter) and hops by a specified hop interval (first interval parameter). If the hop interval is smaller than the window size, hopping windows overlap, so rows can be assigned to multiple windows. For example, a hopping window with a size of 15 minutes and a hop interval of 5 minutes assigns each row to 3 different windows of 15 minutes, which are evaluated every 5 minutes. Hopping windows can be defined on event-time (stream + batch) or processing-time (stream). |
| SESSION(time_attr, interval) | Defines a session time window. Session time windows do not have a fixed duration; their bounds are defined by a time interval of inactivity, i.e., a session window is closed if no event appears for a defined gap period. For example, a session window with a 30-minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time (stream + batch) or processing-time (stream). |
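The assignment rules for tumbling and hopping windows described above can be illustrated with a small, Flink-independent Java sketch. It is only a model of the described semantics, not Flink's implementation; the class and method names (`WindowAssignmentSketch`, `tumbleStart`, `hopStarts`) are hypothetical, and timestamps are assumed to be non-negative epoch milliseconds with windows aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of tumbling and hopping window assignment (not Flink code). */
class WindowAssignmentSketch {

    /** Start of the tumbling window of length sizeMs that contains ts. */
    static long tumbleStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /**
     * Starts of all hopping windows (length sizeMs, hop interval slideMs)
     * that contain ts, listed from the latest window to the earliest.
     */
    static List<Long> hopStarts(long ts, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        long latestStart = ts - Math.floorMod(ts, slideMs);
        // A window [start, start + sizeMs) contains ts while start > ts - sizeMs.
        for (long start = latestStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 17 * minute; // a row observed 17 minutes after the epoch
        System.out.println("tumbling window start (min): " + tumbleStart(ts, 5 * minute) / minute);
        System.out.println("hopping window starts (ms): " + hopStarts(ts, 5 * minute, 15 * minute));
    }
}
```

With a 15-minute size and a 5-minute hop interval, the sketch assigns the row to three windows, which matches the example in the HOP description.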
For SQL queries on streaming tables, the time_attr argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the documentation of time attributes to learn how to define time attributes.
For SQL on batch tables, the time_attr argument of the group window function must be an attribute of type TIMESTAMP.
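Session windows, unlike tumbling and hopping windows, depend on the data itself. The following Java sketch models the gap-based grouping described for SESSION. It is a simplified illustration, not Flink's implementation: the names `SessionWindowSketch` and `sessions` are hypothetical, input timestamps are assumed to be sorted in ascending order, and the treatment of a row that arrives exactly one gap after the previous row is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of gap-based session windows (not Flink code). */
class SessionWindowSketch {

    /**
     * Groups ascending timestamps into sessions. Each result entry is
     * {start, end}, where end is the last timestamp of the session plus the gap.
     * Assumption: a new session starts only if the distance to the previous
     * row is strictly larger than gapMs.
     */
    static List<long[]> sessions(long[] sortedTs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        if (sortedTs.length == 0) {
            return result;
        }
        long start = sortedTs[0];
        long last = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            if (sortedTs[i] - last > gapMs) {
                // Inactivity exceeded the gap: close the current session.
                result.add(new long[] {start, last + gapMs});
                start = sortedTs[i];
            }
            last = sortedTs[i];
        }
        result.add(new long[] {start, last + gapMs});
        return result;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long[] rows = {0, 10 * minute, 25 * minute, 70 * minute, 80 * minute};
        for (long[] s : sessions(rows, 30 * minute)) {
            System.out.println("session from minute " + s[0] / minute + " to minute " + s[1] / minute);
        }
    }
}
```

In this example the rows at minutes 0, 10, and 25 form one session, and the rows at minutes 70 and 80 form a second one, because 45 minutes of inactivity separate them.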
The start and end timestamps of group windows as well as time attributes can be selected with the following auxiliary functions:
| Auxiliary Function | Description |
| --- | --- |
| TUMBLE_START(time_attr, interval), HOP_START(time_attr, interval, interval), SESSION_START(time_attr, interval) | Returns the timestamp of the inclusive lower bound of the corresponding tumbling, hopping, or session window. |
| TUMBLE_END(time_attr, interval), HOP_END(time_attr, interval, interval), SESSION_END(time_attr, interval) | Returns the timestamp of the exclusive upper bound of the corresponding tumbling, hopping, or session window. Note: The exclusive upper bound timestamp cannot be used as a rowtime attribute in subsequent time-based operations, such as time-windowed joins and group window or over window aggregations. |
| TUMBLE_ROWTIME(time_attr, interval), HOP_ROWTIME(time_attr, interval, interval), SESSION_ROWTIME(time_attr, interval) | Returns the timestamp of the inclusive upper bound of the corresponding tumbling, hopping, or session window. The resulting attribute is a rowtime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. |
| TUMBLE_PROCTIME(time_attr, interval), HOP_PROCTIME(time_attr, interval, interval), SESSION_PROCTIME(time_attr, interval) | Returns a proctime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. |
Note: Auxiliary functions must be called with exactly the same arguments as the group window function in the GROUP BY clause.
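The difference between the exclusive upper bound (the `_END` functions) and the inclusive upper bound (the `_ROWTIME` functions) can be made concrete with a short Java sketch for a tumbling window. This is an illustration under stated assumptions, not Flink code: the names are hypothetical, and timestamps are assumed to have millisecond precision, so the inclusive upper bound is taken to be one millisecond before the exclusive one.

```java
/** Illustrative model of the bounds of a tumbling window (not Flink code). */
class WindowBoundsSketch {

    /** Inclusive lower bound of the window containing ts. */
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Exclusive upper bound of the window containing ts. */
    static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs;
    }

    /** Inclusive upper bound, assuming millisecond timestamp precision. */
    static long windowRowtime(long ts, long sizeMs) {
        return windowEnd(ts, sizeMs) - 1;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long ts = 17 * minute;
        System.out.println("start:   " + windowStart(ts, 5 * minute));
        System.out.println("end:     " + windowEnd(ts, 5 * minute));
        System.out.println("rowtime: " + windowRowtime(ts, 5 * minute));
    }
}
```

The inclusive bound still lies inside the window it describes, which is consistent with the note that only the `_ROWTIME` result, and not the `_END` result, can serve as a rowtime attribute in subsequent time-based operations.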
The following examples show how to specify SQL queries with group windows on streaming tables.
The SQL runtime is built on top of Flink’s DataSet and DataStream APIs. Internally, it also uses Flink’s TypeInformation to define data types. Fully supported types are listed in org.apache.flink.table.api.Types. The following table summarizes the relation between SQL Types, Table API types, and the resulting Java class.
| Table API | SQL | Java type |
| --- | --- | --- |
| Types.STRING | VARCHAR | java.lang.String |
| Types.BOOLEAN | BOOLEAN | java.lang.Boolean |
| Types.BYTE | TINYINT | java.lang.Byte |
| Types.SHORT | SMALLINT | java.lang.Short |
| Types.INT | INTEGER, INT | java.lang.Integer |
| Types.LONG | BIGINT | java.lang.Long |
| Types.FLOAT | REAL, FLOAT | java.lang.Float |
| Types.DOUBLE | DOUBLE | java.lang.Double |
| Types.DECIMAL | DECIMAL | java.math.BigDecimal |
| Types.SQL_DATE | DATE | java.sql.Date |
| Types.SQL_TIME | TIME | java.sql.Time |
| Types.SQL_TIMESTAMP | TIMESTAMP(3) | java.sql.Timestamp |
| Types.INTERVAL_MONTHS | INTERVAL YEAR TO MONTH | java.lang.Integer |
| Types.INTERVAL_MILLIS | INTERVAL DAY TO SECOND(3) | java.lang.Long |
| Types.PRIMITIVE_ARRAY | ARRAY | e.g. int[] |
| Types.OBJECT_ARRAY | ARRAY | e.g. java.lang.Byte[] |
| Types.MAP | MAP | java.util.HashMap |
| Types.MULTISET | MULTISET | e.g. java.util.HashMap<String, Integer> for a multiset of String |
Generic types and composite types (e.g., POJOs or Tuples) can be fields of a row as well. Generic types are treated as a black box and can be passed on or processed by user-defined functions. Composite types can be accessed with built-in functions (see Value access functions section).
Flink’s SQL support comes with a set of built-in functions for data transformations. This section gives a brief overview of the available functions.
The Flink SQL functions (including their syntax) are a subset of Apache Calcite’s built-in functions. Most of the documentation has been adopted from the Calcite SQL reference.
Comparison functions  Description 

Equals. 

Not equal. 

Greater than. 

Greater than or equal. 

Less than. 

Less than or equal. 

Returns TRUE if value is null. 

Returns TRUE if value is not null. 

Returns TRUE if two values are not equal, treating null values as the same. 

Returns TRUE if two values are equal, treating null values as the same. 

Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. 

Returns TRUE if value1 is less than value2 or greater than value3. 

Returns TRUE if string1 matches pattern string2. An escape character can be defined if necessary. 

Returns TRUE if string1 does not match pattern string2. An escape character can be defined if necessary. 

Returns TRUE if string1 matches regular expression string2. An escape character can be defined if necessary. 

Returns TRUE if string1 does not match regular expression string2. An escape character can be defined if necessary. 

Returns TRUE if an expression exists in a given list of expressions. This is a shorthand for multiple OR conditions. If the testing set contains NULL, the result is NULL if the element cannot be found and TRUE if it can be found. If the element is NULL, the result is always NULL. E.g. "42 IN (1, 2, 3)" leads to FALSE. 

Returns TRUE if value is not equal to every value in a list. 

Returns TRUE if subquery returns at least one row. Only supported if the operation can be rewritten into a join and group operation. Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. 

Returns TRUE if value is equal to a row returned by subquery. Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. 

Returns TRUE if value is not equal to every row returned by subquery. Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details. 
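The NULL handling described above for membership in a list of expressions is easy to get wrong, so here is a minimal Java sketch of those rules. It only models the described behaviour and is not Flink code; the names `InPredicateSketch` and `in` are hypothetical, and a Java `null` stands in for SQL NULL both as an input and as the result.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative model of list membership with SQL NULL semantics (not Flink code). */
class InPredicateSketch {

    /** Returns TRUE, FALSE, or null (standing in for SQL NULL). */
    static Boolean in(Integer value, List<Integer> candidates) {
        if (value == null) {
            return null; // a NULL element always yields NULL
        }
        boolean sawNull = false;
        for (Integer candidate : candidates) {
            if (candidate == null) {
                sawNull = true;
            } else if (candidate.equals(value)) {
                return Boolean.TRUE; // found, even if the list also contains NULL
            }
        }
        // Not found: NULL if the list contained NULL, FALSE otherwise.
        return sawNull ? null : Boolean.FALSE;
    }

    public static void main(String[] args) {
        System.out.println(in(42, Arrays.asList(1, 2, 3)));
        System.out.println(in(2, Arrays.asList(1, null, 2)));
        System.out.println(in(42, Arrays.asList(1, null)));
    }
}
```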
Logical functions  Description 

Returns TRUE if boolean1 is TRUE or boolean2 is TRUE. Supports three-valued logic. 

Returns TRUE if boolean1 and boolean2 are both TRUE. Supports three-valued logic. 

Returns TRUE if boolean is FALSE; returns UNKNOWN if boolean is UNKNOWN. 

Returns TRUE if boolean is FALSE; returns FALSE if boolean is UNKNOWN. 

Returns TRUE if boolean is not FALSE; returns TRUE if boolean is UNKNOWN. 

Returns TRUE if boolean is TRUE; returns FALSE if boolean is UNKNOWN. 

Returns TRUE if boolean is not TRUE; returns TRUE if boolean is UNKNOWN. 

Returns TRUE if boolean is UNKNOWN. 

Returns TRUE if boolean is not UNKNOWN. 
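The three-valued logic referred to in this table can be sketched in Java by letting a `null` `Boolean` stand in for UNKNOWN. This is an illustrative model of standard SQL three-valued logic, not Flink code, and all names in it are hypothetical.

```java
/** Illustrative model of SQL three-valued logic; null stands in for UNKNOWN. */
class ThreeValuedLogicSketch {

    static Boolean or(Boolean a, Boolean b) {
        if (Boolean.TRUE.equals(a) || Boolean.TRUE.equals(b)) {
            return Boolean.TRUE; // one TRUE operand is enough
        }
        return (a == null || b == null) ? null : Boolean.FALSE;
    }

    static Boolean and(Boolean a, Boolean b) {
        if (Boolean.FALSE.equals(a) || Boolean.FALSE.equals(b)) {
            return Boolean.FALSE; // one FALSE operand is enough
        }
        return (a == null || b == null) ? null : Boolean.TRUE;
    }

    static Boolean not(Boolean a) {
        return a == null ? null : Boolean.valueOf(!a);
    }

    // The IS tests never return UNKNOWN, so they use primitive booleans.
    static boolean isTrue(Boolean a) { return Boolean.TRUE.equals(a); }
    static boolean isNotTrue(Boolean a) { return !Boolean.TRUE.equals(a); }
    static boolean isFalse(Boolean a) { return Boolean.FALSE.equals(a); }
    static boolean isNotFalse(Boolean a) { return !Boolean.FALSE.equals(a); }
    static boolean isUnknown(Boolean a) { return a == null; }

    public static void main(String[] args) {
        System.out.println("UNKNOWN OR TRUE   = " + or(null, true));
        System.out.println("UNKNOWN AND FALSE = " + and(null, false));
        System.out.println("UNKNOWN AND TRUE  = " + and(null, true));
    }
}
```

Note how the IS tests map UNKNOWN to a definite TRUE or FALSE, as the descriptions above state, whereas OR, AND, and NOT can propagate UNKNOWN.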
Arithmetic functions  Description 

Returns numeric. 

Returns negative numeric. 

Returns numeric1 plus numeric2. 

Returns numeric1 minus numeric2. 

Returns numeric1 multiplied by numeric2. 

Returns numeric1 divided by numeric2. 

Returns numeric1 raised to the power of numeric2. 

Returns the absolute value of numeric. 

Returns the remainder (modulus) of numeric1 divided by numeric2. The result is negative only if numeric1 is negative. 

Returns the square root of numeric. 

Returns the natural logarithm (base e) of numeric. 

Returns the base 10 logarithm of numeric. 

Returns the logarithm of a numeric. If called with one parameter, this function returns the natural logarithm of 

Returns e raised to the power of numeric. 

Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. 

Rounds numeric down, and returns the largest number that is less than or equal to numeric. 

Calculates the sine of a given number. 

Calculates the cosine of a given number. 

Calculates the tangent of a given number. 

Calculates the cotangent of a given number. 

Calculates the arc sine of a given number. 

Calculates the arc cosine of a given number. 

Calculates the arc tangent of a given number. 

Converts numeric from radians to degrees. 

Converts numeric from degrees to radians. 

Calculates the signum of a given number. 

Rounds the given number to integer places right of the decimal point. 

Returns a value that is closer than any other value to pi. 

Returns a value that is closer than any other value to e. 

Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive). 

Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two RAND functions will return identical sequences of numbers if they have the same initial seed. 

Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive). 

Returns a pseudorandom integer value between 0 (inclusive) and the specified value (exclusive) with an initial seed. Two RAND_INTEGER functions will return identical sequences of numbers if they have the same initial seed and the same bound. 

Returns a string representation of an integer numeric value in binary format. Returns null if numeric is null. E.g. "4" leads to "100", "12" leads to "1100". 
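A few of the rules in this table (the sign of a remainder, the binary string representation, and seeded pseudorandom sequences) have close counterparts in plain Java, which the following sketch uses for illustration. It is not Flink code: the names are hypothetical, and the use of Java's `%` operator, `Long.toBinaryString`, and `java.util.Random` as stand-ins for the SQL functions is an assumption of this sketch.

```java
import java.util.Random;

/** Illustrative Java counterparts of some arithmetic rules above (not Flink code). */
class ArithmeticSketch {

    /** Remainder whose sign follows the dividend, like Java's % operator. */
    static long mod(long numeric1, long numeric2) {
        return numeric1 % numeric2;
    }

    /** Binary string of an integer value; null in, null out. */
    static String bin(Long numeric) {
        return numeric == null ? null : Long.toBinaryString(numeric);
    }

    /** First n values in [0.0, 1.0) of a generator created with the given seed. */
    static double[] seededSequence(long seed, int n) {
        Random random = new Random(seed);
        double[] values = new double[n];
        for (int i = 0; i < n; i++) {
            values[i] = random.nextDouble();
        }
        return values;
    }

    public static void main(String[] args) {
        System.out.println(mod(-7, 3)); // negative, because the dividend is negative
        System.out.println(mod(7, -3)); // positive, because the dividend is positive
        System.out.println(bin(12L));
    }
}
```

Two calls to `seededSequence` with the same seed return the same values, which mirrors the statement about identical sequences for identical seeds.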
String functions  Description 

Concatenates two character strings. 

Returns the number of characters in a character string. 

As CHAR_LENGTH(string). 

Returns a character string converted to upper case. 

Returns a character string converted to lower case. 

Returns the position of the first occurrence of string1 in string2. 

Removes leading and/or trailing characters from string2. By default, whitespace on both sides is removed. 

Replaces a substring of string1 with string2. 

Returns a substring of a character string starting at a given point. 

Returns a substring of a character string starting at a given point with a given length. 

Returns string with the first letter of each word converted to upper case and the rest to lower case. Words are sequences of alphanumeric characters separated by non-alphanumeric characters. 

Returns the string that results from concatenating the arguments. Returns NULL if any argument is NULL. E.g. 

Returns the string that results from concatenating the arguments using a separator. The separator is added between the strings to be concatenated. Returns NULL if the separator is NULL. CONCAT_WS() does not skip empty strings. However, it does skip any NULL argument. E.g. 

Returns the string text left-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. 

Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. 
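The NULL handling of the concatenation functions and the padding rules above can be modelled in a few lines of Java. This is a sketch of the described behaviour, not Flink code, and its names are hypothetical. Several details are assumptions of the sketch rather than statements from this page: a text longer than len is shortened by keeping its leading characters, a multi-character pad is repeated and cut off on its right side, and an empty pad or negative len is rejected with an exception.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of CONCAT, CONCAT_WS, LPAD, and RPAD (not Flink code). */
class StringFunctionSketch {

    /** Returns null as soon as any argument is null. */
    static String concat(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            if (part == null) {
                return null;
            }
            sb.append(part);
        }
        return sb.toString();
    }

    /** Skips null arguments but keeps empty strings; null separator yields null. */
    static String concatWs(String separator, String... parts) {
        if (separator == null) {
            return null;
        }
        List<String> kept = new ArrayList<>();
        for (String part : parts) {
            if (part != null) {
                kept.add(part);
            }
        }
        return String.join(separator, kept);
    }

    /** Builds the padding needed to extend text to len characters. */
    private static String padding(String text, int len, String pad) {
        if (len < 0 || pad.isEmpty()) {
            throw new IllegalArgumentException("len must be >= 0 and pad non-empty");
        }
        StringBuilder sb = new StringBuilder();
        while (sb.length() < len - text.length()) {
            sb.append(pad);
        }
        sb.setLength(Math.max(0, len - text.length()));
        return sb.toString();
    }

    static String lpad(String text, int len, String pad) {
        String padding = padding(text, len, pad);
        return text.length() >= len ? text.substring(0, len) : padding + text;
    }

    static String rpad(String text, int len, String pad) {
        String padding = padding(text, len, pad);
        return text.length() >= len ? text.substring(0, len) : text + padding;
    }

    public static void main(String[] args) {
        System.out.println(concatWs("~", "AA", null, "", "BB"));
        System.out.println(lpad("hi", 4, "??"));
        System.out.println(rpad("hi", 1, "??"));
    }
}
```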
Conditional functions  Description 

Simple case. 

Searched case. 

Returns NULL if the values are the same. For example, 

Provides a value if the first value is null. For example, 
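The last two conditional functions in this table have simple null-based semantics, which the following Java sketch illustrates. It is a model only, not Flink code; the names `nullIf` and `coalesce` are hypothetical helper names, and the generalization of the second function to "first non-null value of any number of arguments" is an assumption based on standard SQL.

```java
/** Illustrative model of the null-related conditional functions (not Flink code). */
class ConditionalSketch {

    /** Returns null if both values are the same, otherwise the first value. */
    static <T> T nullIf(T first, T second) {
        return (first != null && first.equals(second)) ? null : first;
    }

    /** Returns the first value that is not null, or null if there is none. */
    @SafeVarargs
    static <T> T coalesce(T... values) {
        for (T value : values) {
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(nullIf(5, 5));
        System.out.println(nullIf(5, 0));
        System.out.println(coalesce(null, 5));
    }
}
```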
Type conversion functions  Description 

Converts a value to a given type. 
Temporal functions  Description 

Parses a date string in the form "yy-mm-dd" to a SQL date. 

Parses a time string in the form "hh:mm:ss" to a SQL time. 

Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL timestamp. 

Parses an interval string in the form "dd hh:mm:ss.fff" for SQL intervals of milliseconds or "yyyy-mm" for SQL intervals of months. An interval range might be e.g. 

Returns the current SQL date in UTC time zone. 

Returns the current SQL time in UTC time zone. 

Returns the current SQL timestamp in UTC time zone. 

Returns the current SQL time in local time zone. 

Returns the current SQL timestamp in local time zone. 

Extracts parts of a time point or time interval. Returns the part as a long value. E.g. 

Returns the year from a SQL date. Equivalent to 

Rounds a time point down to the given unit. E.g. 

Rounds a time point up to the given unit. E.g. 

Returns the quarter of a year from a SQL date (an integer between 1 and 4). Equivalent to 

Returns the month of a year from a SQL date (an integer between 1 and 12). Equivalent to 

Returns the week of a year from a SQL date (an integer between 1 and 53). Equivalent to 

Returns the day of a year from a SQL date (an integer between 1 and 366). Equivalent to 

Returns the day of a month from a SQL date (an integer between 1 and 31). Equivalent to 

Returns the day of a week from a SQL date (an integer between 1 and 7; Sunday = 1). Equivalent to 

Returns the hour of a day from a SQL timestamp (an integer between 0 and 23). Equivalent to 

Returns the minute of an hour from a SQL timestamp (an integer between 0 and 59). Equivalent to 

Returns the second of a minute from a SQL timestamp (an integer between 0 and 59). Equivalent to 

Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates 

Attention This function has serious bugs and should not be used for now. Please implement a custom UDF instead or use EXTRACT as a workaround. 

Adds a (signed) integer interval to a timestamp. The unit for the interval is given by the unit argument, which should be one of the following values: 
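Some of the value ranges stated in this table, such as Sunday = 1 for the day of a week or quarters numbered 1 to 4, can be reproduced with `java.time`. The sketch below is an illustration of those numbering conventions and of rounding a time point down or up to a unit. It is not Flink code, its names are hypothetical, and the rounding helpers only work for units up to days because they rely on `LocalDateTime.truncatedTo`.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

/** Illustrative java.time counterparts of some temporal functions (not Flink code). */
class TemporalSketch {

    /** Quarter of the year, between 1 and 4. */
    static int quarter(LocalDate date) {
        return (date.getMonthValue() - 1) / 3 + 1;
    }

    /** Day of the week between 1 and 7 with Sunday = 1 (java.time uses Monday = 1). */
    static int dayOfWeek(LocalDate date) {
        return date.getDayOfWeek().getValue() % 7 + 1;
    }

    /** Rounds a time point down to the given unit (units up to DAYS only). */
    static LocalDateTime floorTo(LocalDateTime timePoint, ChronoUnit unit) {
        return timePoint.truncatedTo(unit);
    }

    /** Rounds a time point up to the given unit (units up to DAYS only). */
    static LocalDateTime ceilTo(LocalDateTime timePoint, ChronoUnit unit) {
        LocalDateTime floor = timePoint.truncatedTo(unit);
        return floor.equals(timePoint) ? timePoint : floor.plus(1, unit);
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(1994, 9, 27);
        System.out.println("quarter: " + quarter(date));
        System.out.println("day of year: " + date.getDayOfYear());
        LocalDateTime t = LocalDateTime.of(2020, 1, 1, 12, 44, 31);
        System.out.println("floor to minute: " + floorTo(t, ChronoUnit.MINUTES));
        System.out.println("ceil to minute:  " + ceilTo(t, ChronoUnit.MINUTES));
    }
}
```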
Aggregate functions  Description 

Returns the number of input rows for which value is not null. Use 

Returns the number of input rows. 

Returns the average (arithmetic mean) of numeric across all input values. 

Returns the sum of numeric across all input values. Use 

Returns the maximum value of value across all input values. 

Returns the minimum value of value across all input values. 

Returns the population standard deviation of the numeric field across all input values. 

Returns the sample standard deviation of the numeric field across all input values. 

Returns the population variance (square of the population standard deviation) of the numeric field across all input values. 

Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values. 

Returns a multiset of the values. Null input values are ignored. Returns an empty multiset if only null values are added. 
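The difference between the population and sample variants of variance and standard deviation lies only in the divisor (n versus n - 1). The following Java sketch spells out the textbook formulas for illustration. It is not Flink's implementation, the names are hypothetical, and NULL handling as well as numeric type handling are left out.

```java
/** Textbook population/sample variance and standard deviation (not Flink code). */
class VarianceSketch {

    static double mean(double[] values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.length;
    }

    /** Sum of squared deviations from the mean. */
    private static double squaredDeviations(double[] values) {
        double m = mean(values);
        double total = 0.0;
        for (double v : values) {
            total += (v - m) * (v - m);
        }
        return total;
    }

    /** Population variance: divides by n. */
    static double varPop(double[] values) {
        return squaredDeviations(values) / values.length;
    }

    /** Sample variance: divides by n - 1 (requires at least two values). */
    static double varSamp(double[] values) {
        return squaredDeviations(values) / (values.length - 1);
    }

    static double stddevPop(double[] values) {
        return Math.sqrt(varPop(values));
    }

    static double stddevSamp(double[] values) {
        return Math.sqrt(varSamp(values));
    }

    public static void main(String[] args) {
        double[] data = {2, 4, 4, 4, 5, 5, 7, 9};
        System.out.println("population variance: " + varPop(data));
        System.out.println("sample variance:     " + varSamp(data));
    }
}
```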
Grouping functions  Description 

Returns an integer that uniquely identifies the combination of grouping keys. 

Returns 1 if expression is rolled up in the current row’s grouping set, 0 otherwise. 

Returns a bit vector of the given grouping expressions. 
Value access functions  Description 

Accesses the field of a Flink composite type (such as Tuple, POJO, etc.) by name and returns its value. 

Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field. 
Value constructor functions  Description 

Creates a row from a list of values. 

Creates a row from a list of values. 

Creates an array from a list of values. 

Creates a map from a list of key-value pairs. 
Array functions  Description 

Returns the number of elements of an array. 

Returns the element at a particular position in an array. The index starts at 1. 

Returns the sole element of an array with a single element. Returns 
Map functions  Description 

Returns the number of entries of a map. 

Returns the value specified by a particular key in a map. 
Hash functions  Description 

Returns the MD5 hash of the string argument as a string of 32 hexadecimal digits; null if string is null. 

Returns the SHA-1 hash of the string argument as a string of 40 hexadecimal digits; null if string is null. 

Returns the SHA-224 hash of the string argument as a string of 56 hexadecimal digits; null if string is null. 

Returns the SHA-256 hash of the string argument as a string of 64 hexadecimal digits; null if string is null. 

Returns the SHA-384 hash of the string argument as a string of 96 hexadecimal digits; null if string is null. 

Returns the SHA-512 hash of the string argument as a string of 128 hexadecimal digits; null if string is null. 

Returns the hash using the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, or SHA-512). The first argument string is the string to be hashed. hashLength is the bit length of the result (either 224, 256, 384, or 512). Returns null if string or hashLength is null. 
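The digest lengths listed in this table can be checked with the JDK's `MessageDigest`. The sketch below is a Flink-independent illustration, not Flink's implementation: the names `HashSketch`, `hex`, and `sha2` are hypothetical, the input is assumed to be encoded as UTF-8 before hashing, and rejecting an unsupported hashLength with an exception is a choice made for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustrative hex digests with the JDK's MessageDigest (not Flink code). */
class HashSketch {

    /** Lower-case hex digest of input for a JDK algorithm name; null in, null out. */
    static String hex(String algorithm, String input) {
        if (input == null) {
            return null;
        }
        try {
            // Assumption of this sketch: the string is hashed as UTF-8 bytes.
            byte[] digest = MessageDigest.getInstance(algorithm)
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Algorithm not available: " + algorithm, e);
        }
    }

    /** SHA-2 digest selected by bit length; null if either argument is null. */
    static String sha2(String input, Integer hashLength) {
        if (input == null || hashLength == null) {
            return null;
        }
        switch (hashLength) {
            case 224:
            case 256:
            case 384:
            case 512:
                return hex("SHA-" + hashLength, input);
            default:
                throw new IllegalArgumentException("Unsupported hash length: " + hashLength);
        }
    }

    public static void main(String[] args) {
        System.out.println(hex("MD5", "").length());   // number of hex digits
        System.out.println(sha2("abc", 256).length()); // number of hex digits
    }
}
```

Each hex digit encodes 4 bits, so a digest of n bits is printed as n / 4 hexadecimal digits, which explains the lengths given in the descriptions above.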
The following functions are not supported yet:
Although not every SQL feature is implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround them with backticks (e.g. `value`, `count`).
Specifier  Description 

Abbreviated weekday name (Sun .. Sat) 

Abbreviated month name (Jan .. Dec) 

Month, numeric (1 .. 12) 

Day of the month with English suffix (0th, 1st, 2nd, 3rd, ...) 

Day of the month, numeric (01 .. 31) 

Day of the month, numeric (1 .. 31) 

Fraction of second (6 digits for printing: 000000 .. 999000; 1 - 9 digits for parsing: 0 .. 999999999) (Timestamp is truncated to milliseconds.) 

Hour (00 .. 23) 

Hour (01 .. 12) 

Hour (01 .. 12) 

Minutes, numeric (00 .. 59) 

Day of year (001 .. 366) 

Hour (0 .. 23) 

Hour (1 .. 12) 

Month name (January .. December) 

Month, numeric (01 .. 12) 

AM or PM 

Time, 12-hour (hh:mm:ss followed by AM or PM) 

Seconds (00 .. 59) 

Seconds (00 .. 59) 

Time, 24-hour (hh:mm:ss) 

Week (00 .. 53), where Sunday is the first day of the week 

Week (00 .. 53), where Monday is the first day of the week 

Week (01 .. 53), where Sunday is the first day of the week; used with %X 

Week (01 .. 53), where Monday is the first day of the week; used with %x 

Weekday name (Sunday .. Saturday) 

Day of the week (0 .. 6), where Sunday is the first day of the week 

Year for the week where Sunday is the first day of the week, numeric, four digits; used with %V 

Year for the week, where Monday is the first day of the week, numeric, four digits; used with %v 

Year, numeric, four digits 

Year, numeric (two digits) 

A literal % character 

x, for any x not listed above 