Due to historical reasons, before Flink 1.9, Flink’s Table & SQL API data types were tightly coupled to Flink’s TypeInformation. TypeInformation is used in the DataStream and DataSet API and is sufficient to describe all information needed to serialize and deserialize JVM-based objects in a distributed setting.
However, TypeInformation was not designed to represent logical types independent of an actual JVM class. In the past, it was difficult to map SQL standard types to this abstraction. Furthermore, some types were not SQL-compliant and introduced without a bigger picture in mind.
Starting with Flink 1.9, the Table & SQL API receives a new type system that serves as a long-term solution for API stability and standard compliance.
Reworking the type system is a major effort that touches almost all user-facing interfaces. Therefore, its introduction spans multiple releases, and the community aims to finish this effort by Flink 1.10.
Due to the simultaneous addition of a new planner for table programs (see FLINK-11439), not every combination of planner and data type is supported. Furthermore, planners might not support every data type with the desired precision or parameter.
Attention: Please see the planner compatibility table and limitations section before using a data type.
A data type describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations.
Flink’s data types are similar to the SQL standard’s data type terminology but also contain information about the nullability of a value for efficient handling of scalar expressions.
Examples of data types are:
INT
INT NOT NULL
INTERVAL DAY TO SECOND(3)
ROW<myField ARRAY<BOOLEAN>, myOtherField TIMESTAMP(3)>
A list of all pre-defined data types can be found below.
Users of the JVM-based API work with instances of org.apache.flink.table.types.DataType within the Table API or when defining connectors, catalogs, or user-defined functions.
A DataType instance has two responsibilities: it declares a logical type, and it optionally gives hints about the physical representation of the data to the planner.
For JVM-based languages, all pre-defined data types are available in org.apache.flink.table.api.DataTypes.
It is recommended to add a static star import of this class to your table programs for a more fluent API.
Physical hints are required at the edges of the table ecosystem where the SQL-based type system ends and programming-specific data types are required. Hints indicate the data format that an implementation expects.
For example, a data source could express that it produces values for logical TIMESTAMPs using a java.sql.Timestamp class instead of using java.time.LocalDateTime, which would be the default. With this information, the runtime is able to convert the produced class into its internal data format. In return, a data sink can declare the data format it consumes from the runtime.
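The conversion that such a hint asks for can be sketched with plain JDK calls. This is only an illustration of moving between the two classes; it is not Flink's internal converter, and the class and method names are hypothetical.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Illustrative only: plain JDK conversions, not Flink's internal converter.
final class TimestampBridgeSketch {

    // A source bridged to java.sql.Timestamp hands over this class;
    // the value is turned into the default class java.time.LocalDateTime.
    static LocalDateTime toDefaultClass(Timestamp produced) {
        return produced.toLocalDateTime();
    }

    // The reverse direction, for a sink that declares java.sql.Timestamp.
    static Timestamp toBridgedClass(LocalDateTime value) {
        return Timestamp.valueOf(value);
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2019, 8, 22, 10, 15, 30, 123_000_000);
        Timestamp bridged = toBridgedClass(value);
        // Compare the round-tripped value with the original.
        System.out.println(toDefaultClass(bridged).equals(value));
    }
}
```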
A bridging conversion class is declared by calling bridgedTo on a data type, for example DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
Attention: Please note that physical hints are usually only required if the API is extended. Users of predefined sources/sinks/functions do not need to define such hints. Hints within a table program (e.g. field.cast(TIMESTAMP(3).bridgedTo(Timestamp.class))) are ignored.
As mentioned in the introduction, reworking the type system will span multiple releases, and the support of each data type depends on the used planner. This section aims to summarize the most significant differences.
Flink’s old planner, introduced before Flink 1.9, primarily supports type information. It has only limited support for data types. It is possible to declare data types that can be translated into type information such that the old planner understands them.
The following table summarizes the difference between data type and type information. Most simple types, as well as the row type, remain the same. Time types, array types, and the decimal type need special attention. Hints other than the ones mentioned are not allowed.
For the Type Information column the table omits the prefix org.apache.flink.table.api.Types.
For the Data Type Representation column the table omits the prefix org.apache.flink.table.api.DataTypes.
Type Information | Java Expression String | Data Type Representation | Remarks for Data Type |
---|---|---|---|
STRING() | STRING | STRING() | |
BOOLEAN() | BOOLEAN | BOOLEAN() | |
BYTE() | BYTE | TINYINT() | |
SHORT() | SHORT | SMALLINT() | |
INT() | INT | INT() | |
LONG() | LONG | BIGINT() | |
FLOAT() | FLOAT | FLOAT() | |
DOUBLE() | DOUBLE | DOUBLE() | |
ROW(...) | ROW<...> | ROW(...) | |
BIG_DEC() | DECIMAL | [DECIMAL()] | Not a 1:1 mapping as precision and scale are ignored and Java’s variable precision and scale are used. |
SQL_DATE() | SQL_DATE | DATE().bridgedTo(java.sql.Date.class) | |
SQL_TIME() | SQL_TIME | TIME(0).bridgedTo(java.sql.Time.class) | |
SQL_TIMESTAMP() | SQL_TIMESTAMP | TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) | |
INTERVAL_MONTHS() | INTERVAL_MONTHS | INTERVAL(MONTH()).bridgedTo(Integer.class) | |
INTERVAL_MILLIS() | INTERVAL_MILLIS | INTERVAL(DataTypes.SECOND(3)).bridgedTo(Long.class) | |
PRIMITIVE_ARRAY(...) | PRIMITIVE_ARRAY<...> | ARRAY(DATATYPE.notNull().bridgedTo(PRIMITIVE.class)) | Applies to all JVM primitive types except for byte. |
PRIMITIVE_ARRAY(BYTE()) | PRIMITIVE_ARRAY<BYTE> | BYTES() | |
OBJECT_ARRAY(...) | OBJECT_ARRAY<...> | ARRAY(DATATYPE.bridgedTo(OBJECT.class)) | |
MULTISET(...) | | MULTISET(...) | |
MAP(..., ...) | MAP<...,...> | MAP(...) | |
other generic types | | ANY(...) | |
Attention: If there is a problem with the new type system, users can fall back to type information defined in org.apache.flink.table.api.Types at any time.
The new Blink planner supports all types of the old planner. This includes in particular the listed Java expression strings and type information.
The following data types are supported:
Data Type | Remarks for Data Type |
---|---|
STRING | CHAR and VARCHAR are not supported yet. |
BOOLEAN | |
BYTES | BINARY and VARBINARY are not supported yet. |
DECIMAL | Supports fixed precision and scale. |
TINYINT | |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DATE | |
TIME | Supports only a precision of 0. |
TIMESTAMP | Supports only a precision of 3. |
TIMESTAMP WITH LOCAL TIME ZONE | Supports only a precision of 3. |
INTERVAL | Supports only interval of MONTH and SECOND(3). |
ARRAY | |
MULTISET | |
MAP | |
ROW | |
ANY | |
Java Expression String: Java expression strings in the Table API such as table.select("field.cast(STRING)") have not been updated to the new type system yet. Use the string representations declared in the old planner section.
Connector Descriptors and SQL Client: Descriptor string representations have not been updated to the new type system yet. Use the string representation declared in the Connect to External Systems section.
User-defined Functions: User-defined functions cannot declare a data type yet.
This section lists all pre-defined data types. For the JVM-based Table API those types are also available in org.apache.flink.table.api.DataTypes.
CHAR
Data type of a fixed-length character string.
Declaration
The type can be declared using CHAR(n) where n is the number of code points. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.String | X | X | Default |
byte[] | X | X | Assumes UTF-8 encoding. |
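Because n counts code points rather than Java char values or bytes, the three measures can differ for the same string. The following sketch shows the difference with JDK calls only; the helper names are hypothetical and nothing here is part of the Flink API.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: compares the length measures mentioned above.
final class CharLengthSketch {

    // Length in Unicode code points, the unit used by CHAR(n) and VARCHAR(n).
    static int codePoints(String s) {
        return s.codePointCount(0, s.length());
    }

    // Hypothetical helper: does the string stay within a declared length n?
    static boolean fitsDeclaredLength(String s, int n) {
        return codePoints(s) <= n;
    }

    // The byte[] bridge assumes UTF-8, so this is the matching encoding step.
    static byte[] toUtf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 'a' followed by U+1F600, which needs a surrogate pair in UTF-16.
        String s = "a\uD83D\uDE00";
        System.out.println(s.length());        // UTF-16 code units
        System.out.println(codePoints(s));     // code points
        System.out.println(toUtf8(s).length);  // UTF-8 bytes
    }
}
```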
VARCHAR / STRING
Data type of a variable-length character string.
Declaration
The type can be declared using VARCHAR(n) where n is the maximum number of code points. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
STRING is a synonym for VARCHAR(2147483647).
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.String | X | X | Default |
byte[] | X | X | Assumes UTF-8 encoding. |
BINARY
Data type of a fixed-length binary string (=a sequence of bytes).
Declaration
The type can be declared using BINARY(n) where n is the number of bytes. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
byte[] | X | X | Default |
VARBINARY / BYTES
Data type of a variable-length binary string (=a sequence of bytes).
Declaration
The type can be declared using VARBINARY(n) where n is the maximum number of bytes. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
BYTES is a synonym for VARBINARY(2147483647).
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
byte[] | X | X | Default |
DECIMAL
Data type of a decimal number with fixed precision and scale.
Declaration
The type can be declared using DECIMAL(p, s) where p is the number of digits in a number (precision) and s is the number of digits to the right of the decimal point in a number (scale). p must have a value between 1 and 38 (both inclusive). s must have a value between 0 and p (both inclusive). The default value for p is 10. The default value for s is 0.
NUMERIC(p, s) and DEC(p, s) are synonyms for this type.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.math.BigDecimal | X | X | Default |
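To make the meaning of p and s concrete, the sketch below checks whether a java.math.BigDecimal value can be represented in DECIMAL(p, s) without rounding. The helper is hypothetical and only mirrors the rules stated above; it does not describe how a planner validates or rounds values.

```java
import java.math.BigDecimal;

// Illustrative only: applies the precision/scale rules of DECIMAL(p, s).
final class DecimalSketch {

    // True if the value needs at most s fractional digits and
    // at most (p - s) digits in front of the decimal point.
    static boolean fits(BigDecimal value, int p, int s) {
        if (p < 1 || p > 38 || s < 0 || s > p) {
            throw new IllegalArgumentException("require 1 <= p <= 38 and 0 <= s <= p");
        }
        if (value.signum() == 0) {
            return true; // zero fits every valid DECIMAL(p, s)
        }
        // Trailing zeros do not carry information, so ignore them.
        BigDecimal v = value.stripTrailingZeros();
        int integerDigits = v.precision() - v.scale();
        return v.scale() <= s && integerDigits <= p - s;
    }

    public static void main(String[] args) {
        System.out.println(fits(new BigDecimal("123.45"), 5, 2));
        System.out.println(fits(new BigDecimal("1234.5"), 5, 2));
        System.out.println(fits(new BigDecimal("123.456"), 5, 2));
    }
}
```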
TINYINT
Data type of a 1-byte signed integer with values from -128 to 127.
Declaration
The type can be declared using TINYINT.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Byte | X | X | Default |
byte | X | (X) | Output only if type is not nullable. |
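The restriction on primitive output classes, which recurs in the tables of the following numeric types, follows from the JVM itself: a primitive cannot hold NULL. A minimal JDK-only sketch of the failure that the restriction avoids (the class name is hypothetical):

```java
// Illustrative only: shows why a nullable type cannot be emitted as a primitive.
final class NullableOutputSketch {

    // Returns true if assigning the boxed value to a primitive byte fails.
    static boolean unboxingFails(Byte boxed) {
        try {
            byte primitive = boxed; // auto-unboxing calls boxed.byteValue()
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unboxingFails(Byte.valueOf((byte) 1)));
        System.out.println(unboxingFails(null));
    }
}
```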
SMALLINT
Data type of a 2-byte signed integer with values from -32,768 to 32,767.
Declaration
The type can be declared using SMALLINT.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Short | X | X | Default |
short | X | (X) | Output only if type is not nullable. |
INT
Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
Declaration
The type can be declared using INT. INTEGER is a synonym for this type.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Integer | X | X | Default |
int | X | (X) | Output only if type is not nullable. |
BIGINT
Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807.
Declaration
The type can be declared using BIGINT.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Long | X | X | Default |
long | X | (X) | Output only if type is not nullable. |
FLOAT
Data type of a 4-byte single precision floating point number.
Compared to the SQL standard, the type does not take parameters.
Declaration
The type can be declared using FLOAT.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Float | X | X | Default |
float | X | (X) | Output only if type is not nullable. |
DOUBLE
Data type of an 8-byte double precision floating point number.
Declaration
The type can be declared using DOUBLE. DOUBLE PRECISION is a synonym for this type.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Double | X | X | Default |
double | X | (X) | Output only if type is not nullable. |
DATE
Data type of a date consisting of year-month-day with values ranging from 0000-01-01 to 9999-12-31.
Compared to the SQL standard, the range starts at year 0000.
Declaration
The type can be declared using DATE.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.LocalDate | X | X | Default |
java.sql.Date | X | X | |
java.lang.Integer | X | X | Describes the number of days since epoch. |
int | X | (X) | Describes the number of days since epoch. Output only if type is not nullable. |
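The integer representation counts days since 1970-01-01. A short JDK-only sketch of the mapping between java.time.LocalDate and that count (the helper names are hypothetical):

```java
import java.time.LocalDate;

// Illustrative only: days-since-epoch representation of a date.
final class DateBridgeSketch {

    // toEpochDay() returns a long; the supported range fits into an int.
    static int toEpochDays(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    static LocalDate fromEpochDays(int days) {
        return LocalDate.ofEpochDay(days);
    }

    public static void main(String[] args) {
        LocalDate date = LocalDate.of(2000, 1, 1);
        int days = toEpochDays(date);
        System.out.println(days);
        System.out.println(fromEpochDays(days));
    }
}
```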
TIME
Data type of a time without time zone consisting of hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 00:00:00.000000000 to 23:59:59.999999999.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.LocalTime. A time with time zone is not provided.
Declaration
The type can be declared using TIME(p) where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 0.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.LocalTime | X | X | Default |
java.sql.Time | X | X | |
java.lang.Integer | X | X | Describes the number of milliseconds of the day. |
int | X | (X) | Describes the number of milliseconds of the day. Output only if type is not nullable. |
java.lang.Long | X | X | Describes the number of nanoseconds of the day. |
long | X | (X) | Describes the number of nanoseconds of the day. Output only if type is not nullable. |
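The two numeric representations differ only in resolution. A JDK-only sketch that derives both from a java.time.LocalTime (the helper names are hypothetical):

```java
import java.time.LocalTime;
import java.time.temporal.ChronoField;

// Illustrative only: numeric representations of a time of day.
final class TimeBridgeSketch {

    // Milliseconds of the day; sub-millisecond digits are truncated.
    static int toMillisOfDay(LocalTime time) {
        return time.get(ChronoField.MILLI_OF_DAY);
    }

    // Nanoseconds of the day; keeps the full precision of LocalTime.
    static long toNanosOfDay(LocalTime time) {
        return time.toNanoOfDay();
    }

    public static void main(String[] args) {
        LocalTime noon = LocalTime.NOON;
        System.out.println(toMillisOfDay(noon));
        System.out.println(toNanosOfDay(noon));
    }
}
```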
TIMESTAMP
Data type of a timestamp without time zone consisting of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.LocalDateTime.
A conversion from and to BIGINT (a JVM long type) is not supported as this would imply a time zone. However, this type is time zone free. For more java.time.Instant-like semantics use TIMESTAMP WITH LOCAL TIME ZONE.
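Why a long value would imply a time zone can be seen with the JDK alone: the same java.time.LocalDateTime maps to different epoch values depending on the offset that is assumed. The sketch below is illustrative and uses a hypothetical helper name.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustrative only: an epoch value exists only once an offset is chosen.
final class TimestampEpochSketch {

    static long epochMillisAt(LocalDateTime timestamp, ZoneOffset assumedOffset) {
        return timestamp.toInstant(assumedOffset).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(1970, 1, 1, 0, 0);
        // Same wall-clock value, two different points on the time line.
        System.out.println(epochMillisAt(ts, ZoneOffset.UTC));
        System.out.println(epochMillisAt(ts, ZoneOffset.ofHours(2)));
    }
}
```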
Declaration
The type can be declared using TIMESTAMP(p) where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 6.
TIMESTAMP(p) WITHOUT TIME ZONE is a synonym for this type.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.LocalDateTime | X | X | Default |
java.sql.Timestamp | X | X | |
TIMESTAMP WITH TIME ZONE
Data type of a timestamp with time zone consisting of year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.OffsetDateTime.
Compared to TIMESTAMP WITH LOCAL TIME ZONE, the time zone offset information is physically stored in every datum. It is used individually for every computation, visualization, or communication to external systems.
Declaration
The type can be declared using TIMESTAMP(p) WITH TIME ZONE where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 6.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.OffsetDateTime | X | X | Default |
java.time.ZonedDateTime | X | | Ignores the zone ID. |
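Ignoring the zone ID means that only the offset valid at that moment is kept. With the JDK this corresponds to the following conversion; the sketch is illustrative, uses a hypothetical class name, and makes no claim about how Flink performs the step internally.

```java
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Illustrative only: keeps the offset of a zoned value and drops the region ID.
final class ZoneIdSketch {

    static OffsetDateTime dropZoneId(ZonedDateTime zoned) {
        return zoned.toOffsetDateTime();
    }

    public static void main(String[] args) {
        ZonedDateTime zoned =
            ZonedDateTime.of(2019, 8, 22, 10, 0, 0, 0, ZoneId.of("Europe/Berlin"));
        // The region "Europe/Berlin" is gone; only its current offset remains.
        System.out.println(dropZoneId(zoned));
    }
}
```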
TIMESTAMP WITH LOCAL TIME ZONE
Data type of a timestamp with local time zone consisting of year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59.
Leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.OffsetDateTime.
Compared to TIMESTAMP WITH TIME ZONE, the time zone offset information is not stored physically in every datum. Instead, the type assumes java.time.Instant semantics in UTC time zone at the edges of the table ecosystem. Every datum is interpreted in the local time zone configured in the current session for computation and visualization.
This type fills the gap between time zone free and time zone mandatory timestamp types by allowing the interpretation of UTC timestamps according to the configured session time zone.
Declaration
The type can be declared using TIMESTAMP(p) WITH LOCAL TIME ZONE where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 6.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.Instant | X | X | Default |
java.lang.Integer | X | X | Describes the number of seconds since epoch. |
int | X | (X) | Describes the number of seconds since epoch. Output only if type is not nullable. |
java.lang.Long | X | X | Describes the number of milliseconds since epoch. |
long | X | (X) | Describes the number of milliseconds since epoch. Output only if type is not nullable. |
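The interpretation step described for this type can be sketched with the JDK: one stored instant yields different wall-clock values depending on the zone used to read it. The sessionZone parameter below is a stand-in for the configured session time zone and is an assumption of this sketch, not a Flink setting name.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Illustrative only: one stored instant, read in different session zones.
final class SessionZoneSketch {

    // sessionZone is a hypothetical stand-in for the session's time zone.
    static LocalDateTime interpret(Instant stored, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(stored, sessionZone);
    }

    public static void main(String[] args) {
        Instant stored = Instant.ofEpochMilli(0L);
        System.out.println(interpret(stored, ZoneOffset.UTC));
        System.out.println(interpret(stored, ZoneOffset.ofHours(9)));
    }
}
```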
INTERVAL YEAR TO MONTH
Data type for a group of year-month interval types.
The type must be parameterized to one of the following resolutions: interval of years, interval of years to months, or interval of months.
An interval of year-month consists of +years-months with values ranging from -9999-11 to +9999-11.
The value representation is the same for all types of resolutions. For example, an interval of months of 50 is always represented in an interval-of-years-to-months format (with default year precision): +04-02.
Declaration
The type can be declared using the above combinations where p is the number of digits of years (year precision). p must have a value between 1 and 4 (both inclusive). If no year precision is specified, p is equal to 2.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.Period | X | X | Ignores the days part. Default |
java.lang.Integer | X | X | Describes the number of months. |
int | X | (X) | Describes the number of months. Output only if type is not nullable. |
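The example of 50 months and the java.time.Period bridge can both be reproduced with a few lines of JDK code. The formatter below is a hypothetical helper that assumes the default year precision of 2; it is not Flink's own formatting code.

```java
import java.time.Period;
import java.util.Locale;

// Illustrative only: month count and years-to-months rendering of an interval.
final class YearMonthIntervalSketch {

    // The days part of the Period is ignored, as in the table above.
    static int toMonths(Period period) {
        return Math.toIntExact(period.toTotalMonths());
    }

    // Renders a month count as +years-months with a two-digit year field.
    static String format(int months) {
        String sign = months < 0 ? "-" : "+";
        int abs = Math.abs(months);
        return String.format(Locale.ROOT, "%s%02d-%02d", sign, abs / 12, abs % 12);
    }

    public static void main(String[] args) {
        System.out.println(format(50));
        System.out.println(toMonths(Period.of(4, 2, 15)));
    }
}
```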
INTERVAL DAY TO SECOND
Data type for a group of day-time interval types.
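The type must be parameterized to one of the following resolutions with up to nanosecond precision: interval of days, interval of days to hours, interval of days to minutes, interval of days to seconds, interval of hours, interval of hours to minutes, interval of hours to seconds, interval of minutes, interval of minutes to seconds, or interval of seconds.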
An interval of day-time consists of +days hours:minutes:seconds.fractional with values ranging from -999999 23:59:59.999999999 to +999999 23:59:59.999999999. The value representation is the same for all types of resolutions. For example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds format (with default precisions): +00 00:01:10.000000.
Declaration
The type can be declared using the above combinations where p1 is the number of digits of days (day precision) and p2 is the number of digits of fractional seconds (fractional precision). p1 must have a value between 1 and 6 (both inclusive). p2 must have a value between 0 and 9 (both inclusive). If no p1 is specified, it is equal to 2 by default. If no p2 is specified, it is equal to 6 by default.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.Duration | X | X | Default |
java.lang.Long | X | X | Describes the number of milliseconds. |
long | X | (X) | Describes the number of milliseconds. Output only if type is not nullable. |
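The representation of 70 seconds given above can be reproduced from a java.time.Duration. The formatter is a hypothetical helper that assumes the default precisions (2 day digits, 6 fractional digits); it is a sketch, not Flink's formatting code.

```java
import java.time.Duration;
import java.util.Locale;

// Illustrative only: days-to-seconds rendering of a day-time interval.
final class DayTimeIntervalSketch {

    static String format(Duration interval) {
        String sign = interval.isNegative() ? "-" : "+";
        Duration abs = interval.abs();
        long seconds = abs.getSeconds();
        long micros = abs.getNano() / 1_000; // fractional precision of 6 digits
        return String.format(Locale.ROOT, "%s%02d %02d:%02d:%02d.%06d",
            sign,
            seconds / 86_400,            // days
            (seconds % 86_400) / 3_600,  // hours
            (seconds % 3_600) / 60,      // minutes
            seconds % 60,                // seconds
            micros);
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofSeconds(70);
        System.out.println(format(interval));
        System.out.println(interval.toMillis()); // the long representation
    }
}
```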
ARRAY
Data type of an array of elements with same subtype.
Compared to the SQL standard, the maximum cardinality of an array cannot be specified but is fixed at 2,147,483,647. Also, any valid type is supported as a subtype.
Declaration
The type can be declared using ARRAY<t> where t is the data type of the contained elements.
t ARRAY is a synonym for being closer to the SQL standard. For example, INT ARRAY is equivalent to ARRAY<INT>.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
t[] | (X) | (X) | Depends on the subtype. Default |
MULTISET
Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its elements with a common subtype. Each unique value (including NULL) is mapped to some multiplicity.
There is no restriction of element types; it is the responsibility of the user to ensure uniqueness.
Declaration
The type can be declared using MULTISET<t> where t is the data type of the contained elements.
t MULTISET is a synonym for being closer to the SQL standard. For example, INT MULTISET is equivalent to MULTISET<INT>.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.util.Map<t, java.lang.Integer> | X | X | Assigns each value to an integer multiplicity. Default |
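The map-based representation can be built from any collection of elements by counting occurrences. A JDK-only sketch with a hypothetical helper name:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a multiset as a map from element to multiplicity.
final class MultisetSketch {

    static <T> Map<T, Integer> toMultiset(Iterable<T> elements) {
        Map<T, Integer> multiset = new HashMap<>(); // HashMap permits a null key
        for (T element : elements) {
            multiset.merge(element, 1, Integer::sum);
        }
        return multiset;
    }

    public static void main(String[] args) {
        Map<String, Integer> multiset = toMultiset(Arrays.asList("a", "b", "a", null));
        System.out.println(multiset.get("a"));
        System.out.println(multiset.get(null));
    }
}
```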
ROW
Data type of a sequence of fields.
A field consists of a field name, field type, and an optional description. The most specific type of a row of a table is a row type. In this case, each column of the row corresponds to the field of the row type that has the same ordinal position as the column.
Compared to the SQL standard, an optional field description simplifies the handling with complex structures.
A row type is similar to the STRUCT type known from other non-standard-compliant frameworks.
Declaration
The type can be declared using ROW<n0 t0 'd0', n1 t1 'd1', ...> where n is the unique name of a field, t is the logical type of a field, d is the description of a field.
ROW(...) is a synonym for being closer to the SQL standard. For example, ROW(myField INT, myOtherField BOOLEAN) is equivalent to ROW<myField INT, myOtherField BOOLEAN>.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
org.apache.flink.types.Row | X | X | Default |
BOOLEAN
Data type of a boolean with a (possibly) three-valued logic of TRUE, FALSE, and UNKNOWN.
Declaration
The type can be declared using BOOLEAN.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Boolean | X | X | Default |
boolean | X | (X) | Output only if type is not nullable. |
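With java.lang.Boolean, the third value UNKNOWN is naturally carried by null. The sketch below implements the SQL three-valued AND on boxed booleans under that assumption; the class and method names are hypothetical.

```java
// Illustrative only: SQL-style three-valued AND, with null standing for UNKNOWN.
final class ThreeValuedLogicSketch {

    static Boolean and(Boolean left, Boolean right) {
        // FALSE dominates: FALSE AND anything is FALSE.
        if (Boolean.FALSE.equals(left) || Boolean.FALSE.equals(right)) {
            return Boolean.FALSE;
        }
        // No operand is FALSE, so any UNKNOWN makes the result UNKNOWN.
        if (left == null || right == null) {
            return null;
        }
        return Boolean.TRUE;
    }

    public static void main(String[] args) {
        System.out.println(and(Boolean.FALSE, null));
        System.out.println(and(Boolean.TRUE, null));
        System.out.println(and(Boolean.TRUE, Boolean.TRUE));
    }
}
```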
NULL
Data type for representing untyped NULL values.
The null type is an extension to the SQL standard. A null type has no other value except NULL, thus, it can be cast to any nullable type similar to JVM semantics.
This type helps in representing unknown types in API calls that use a NULL literal as well as bridging to formats such as JSON or Avro that define such a type as well.
This type is not very useful in practice and is just mentioned here for completeness.
Declaration
The type can be declared using NULL.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Object | X | X | Default |
any class | | (X) | Any non-primitive type. |
ANY
Data type of an arbitrary serialized type. This type is a black box within the table ecosystem and is only deserialized at the edges.
The any type is an extension to the SQL standard.
Declaration
The type can be declared using ANY('class', 'snapshot') where class is the originating class and snapshot is the serialized TypeSerializerSnapshot in Base64 encoding. Usually, the type string is not declared directly but is generated while persisting the type.
In the API, the ANY type can be declared either by directly supplying a Class + TypeSerializer or by passing TypeInformation and letting the framework extract Class + TypeSerializer from there.
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
class | X | X | Originating class or subclasses (for input) or superclasses (for output). Default |
byte[] | | X | |