Data Types #
Flink SQL has a rich set of native data types available to users.
Data Type #
A data type describes the logical type of a value in the table ecosystem. It can be used to declare input and/or output types of operations.
Flink’s data types are similar to the SQL standard’s data type terminology but also contain information about the nullability of a value for efficient handling of scalar expressions.
Examples of data types are:
INT
INT NOT NULL
INTERVAL DAY TO SECOND(3)
ROW<myField ARRAY<BOOLEAN>, myOtherField TIMESTAMP(3)>
A list of all pre-defined data types can be found below.
Data Types in the Table API #
Users of the JVM-based API work with instances of org.apache.flink.table.types.DataType within the Table API or when defining connectors, catalogs, or user-defined functions.
A DataType instance has two responsibilities:
- Declaration of a logical type, which does not imply a concrete physical representation for transmission or storage but defines the boundaries between JVM-based/Python languages and the table ecosystem.
- Optional: Giving hints about the physical representation of data to the planner, which is useful at the edges to other APIs.
For JVM-based languages, all pre-defined data types are available in org.apache.flink.table.api.DataTypes.
Users of the Python API work with instances of pyflink.table.types.DataType within the Python Table API or when defining Python user-defined functions.
A DataType instance has one responsibility:
- Declaration of a logical type, which does not imply a concrete physical representation for transmission or storage but defines the boundaries between Python and the table ecosystem.
For Python, these types are available in pyflink.table.types.DataTypes.
It is recommended to add a star import to your table programs to get a fluent API:
Java:
import static org.apache.flink.table.api.DataTypes.*;
DataType t = INTERVAL(DAY(), SECOND(3));
Scala:
import org.apache.flink.table.api.DataTypes._
val t: DataType = INTERVAL(DAY(), SECOND(3))
Python:
from pyflink.table.types import DataTypes
t = DataTypes.INTERVAL(DataTypes.DAY(), DataTypes.SECOND(3))
Physical Hints #
Physical hints are required at the edges of the table ecosystem where the SQL-based type system ends and programming-specific data types are required. Hints indicate the data format that an implementation expects.
For example, a data source could express that it produces values for logical TIMESTAMPs using the java.sql.Timestamp class instead of java.time.LocalDateTime, which would be the default. With this information, the runtime is able to convert the produced class into its internal data format. In return, a data sink can declare the data format it consumes from the runtime.
Here are some examples of how to declare a bridging conversion class:
Java:
// tell the runtime to not produce or consume java.time.LocalDateTime instances
// but java.sql.Timestamp
DataType t = DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class);
// tell the runtime to not produce or consume boxed integer arrays
// but primitive int arrays
DataType t = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(int[].class);
Scala:
// tell the runtime to not produce or consume java.time.LocalDateTime instances
// but java.sql.Timestamp
val t: DataType = DataTypes.TIMESTAMP(3).bridgedTo(classOf[java.sql.Timestamp])
// tell the runtime to not produce or consume boxed integer arrays
// but primitive int arrays
val t: DataType = DataTypes.ARRAY(DataTypes.INT().notNull()).bridgedTo(classOf[Array[Int]])
Attention Physical hints are usually only required if the API is extended. Users of predefined sources/sinks/functions do not need to define such hints. Hints within a table program (e.g. field.cast(TIMESTAMP(3).bridgedTo(Timestamp.class))) are ignored.
List of Data Types #
This section lists all pre-defined data types.
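For the JVM-based Table API, these types are also available in org.apache.flink.table.api.DataTypes. For the Python Table API, they are available in pyflink.table.types.DataTypes.
The default planner supports the following set of SQL types: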
Data Type | Remarks for Data Type |
---|---|
CHAR | |
VARCHAR | |
STRING | |
BOOLEAN | |
BINARY | |
VARBINARY | |
BYTES | |
DECIMAL | Supports fixed precision and scale. |
TINYINT | |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DATE | |
TIME | Supports only a precision of 0. |
TIMESTAMP | |
TIMESTAMP_LTZ | |
INTERVAL | Supports only interval of MONTH and SECOND(3). |
ARRAY | |
MULTISET | |
MAP | |
ROW | |
RAW | |
Structured types | Only exposed in user-defined functions yet. |
Character Strings #
CHAR
#
Data type of a fixed-length character string.
Declaration
CHAR
CHAR(n)
DataTypes.CHAR(n)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.String | X | X | Default |
byte[] | X | X | Assumes UTF-8 encoding. |
org.apache.flink.table.data.StringData | X | X | Internal data structure. |
Python: Not supported.
The type can be declared using CHAR(n) where n is the number of code points. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
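Because the length n counts code points while the byte[] bridging assumes UTF-8, the two lengths can differ for non-ASCII text. The following is a minimal plain-Python sketch of that difference; it does not use any Flink API, and the helper names code_points, utf8_length, and check_char_length are hypothetical.

```python
MAX_LENGTH = 2_147_483_647  # upper bound for n stated above


def code_points(text: str) -> int:
    """Count Unicode code points (a Python str is indexed by code point)."""
    return len(text)


def utf8_length(text: str) -> int:
    """Count the bytes of the UTF-8 encoding of text."""
    return len(text.encode("utf-8"))


def check_char_length(n: int = 1) -> int:
    """Validate a declared length n; the default of 1 mirrors CHAR without a length."""
    if not 1 <= n <= MAX_LENGTH:
        raise ValueError(f"n must be between 1 and {MAX_LENGTH}, got {n}")
    return n


word = "h\u00e9llo"  # the second character is U+00E9, two bytes in UTF-8
print(code_points(word), utf8_length(word))
```

A string of five code points may therefore need more than five bytes once it is bridged to a byte array.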
VARCHAR
/ STRING
#
Data type of a variable-length character string.
Declaration
VARCHAR
VARCHAR(n)
STRING
DataTypes.VARCHAR(n)
DataTypes.STRING()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.String | X | X | Default |
byte[] | X | X | Assumes UTF-8 encoding. |
org.apache.flink.table.data.StringData | X | X | Internal data structure. |
Python:
DataTypes.VARCHAR(n)
DataTypes.STRING()
Attention In the Python API, the maximum number of code points n specified in DataTypes.VARCHAR(n) must currently be 2,147,483,647.
The type can be declared using VARCHAR(n) where n is the maximum number of code points. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
STRING is a synonym for VARCHAR(2147483647).
Binary Strings #
BINARY
#
Data type of a fixed-length binary string (=a sequence of bytes).
Declaration
BINARY
BINARY(n)
DataTypes.BINARY(n)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
byte[] | X | X | Default |
Python: Not supported.
The type can be declared using BINARY(n) where n is the number of bytes. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
VARBINARY
/ BYTES
#
Data type of a variable-length binary string (=a sequence of bytes).
Declaration
VARBINARY
VARBINARY(n)
BYTES
DataTypes.VARBINARY(n)
DataTypes.BYTES()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
byte[] | X | X | Default |
Python:
DataTypes.VARBINARY(n)
DataTypes.BYTES()
Attention In the Python API, the maximum number of bytes n specified in DataTypes.VARBINARY(n) must currently be 2,147,483,647.
The type can be declared using VARBINARY(n) where n is the maximum number of bytes. n must have a value between 1 and 2,147,483,647 (both inclusive). If no length is specified, n is equal to 1.
BYTES is a synonym for VARBINARY(2147483647).
Exact Numerics #
DECIMAL
#
Data type of a decimal number with fixed precision and scale.
Declaration
DECIMAL
DECIMAL(p)
DECIMAL(p, s)
DEC
DEC(p)
DEC(p, s)
NUMERIC
NUMERIC(p)
NUMERIC(p, s)
DataTypes.DECIMAL(p, s)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.math.BigDecimal | X | X | Default |
org.apache.flink.table.data.DecimalData | X | X | Internal data structure. |
Python:
DataTypes.DECIMAL(p, s)
Attention In the Python API, the precision and scale specified in DataTypes.DECIMAL(p, s) must currently be 38 and 18, respectively.
The type can be declared using DECIMAL(p, s) where p is the number of digits in a number (precision) and s is the number of digits to the right of the decimal point in a number (scale). p must have a value between 1 and 38 (both inclusive). s must have a value between 0 and p (both inclusive). The default value for p is 10. The default value for s is 0.
NUMERIC(p, s) and DEC(p, s) are synonyms for this type.
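To make the roles of precision and scale concrete, here is a small plain-Python sketch built on the standard decimal module. It is not Flink code: the function names are hypothetical, and the "fits without rounding" check is an assumption chosen for illustration, not a statement about how Flink handles values that exceed the declared scale.

```python
from decimal import Decimal


def check_decimal_type(p: int = 10, s: int = 0) -> tuple:
    """Validate DECIMAL(p, s) bounds as described above (defaults: p=10, s=0)."""
    if not 1 <= p <= 38:
        raise ValueError(f"precision must be between 1 and 38, got {p}")
    if not 0 <= s <= p:
        raise ValueError(f"scale must be between 0 and {p}, got {s}")
    return p, s


def fits_without_rounding(value: Decimal, p: int, s: int) -> bool:
    """True if value needs at most s fractional and p - s integer digits."""
    check_decimal_type(p, s)
    if not value.is_finite():
        return False
    _, digits, exponent = value.as_tuple()
    fractional_digits = max(-exponent, 0)
    # Zero needs no integer digits; otherwise count digits left of the point.
    integer_digits = max(len(digits) + exponent, 0) if any(digits) else 0
    return fractional_digits <= s and integer_digits <= p - s


print(fits_without_rounding(Decimal("123.45"), 5, 2))
print(fits_without_rounding(Decimal("1234.5"), 5, 2))
```

With DECIMAL(5, 2), three digits remain for the integer part, so 123.45 fits while 1234.5 does not.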
TINYINT
#
Data type of a 1-byte signed integer with values from -128 to 127.
Declaration
TINYINT
DataTypes.TINYINT()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Byte | X | X | Default |
byte | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.TINYINT()
SMALLINT
#
Data type of a 2-byte signed integer with values from -32,768 to 32,767.
Declaration
SMALLINT
DataTypes.SMALLINT()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Short | X | X | Default |
short | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.SMALLINT()
INT
#
Data type of a 4-byte signed integer with values from -2,147,483,648 to 2,147,483,647.
Declaration
INT
INTEGER
DataTypes.INT()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Integer | X | X | Default |
int | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.INT()
INTEGER is a synonym for this type.
BIGINT
#
Data type of an 8-byte signed integer with values from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807.
Declaration
BIGINT
DataTypes.BIGINT()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Long | X | X | Default |
long | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.BIGINT()
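The value ranges of the four integer types follow directly from their byte widths under two's complement. A short plain-Python sketch (the names signed_range and INTEGER_RANGES are hypothetical and not part of any Flink API) derives them:

```python
def signed_range(num_bytes: int) -> tuple:
    """Return (min, max) of a two's-complement signed integer of the given width."""
    bits = 8 * num_bytes
    return -(1 << (bits - 1)), (1 << (bits - 1)) - 1


# Byte widths as stated in the sections above.
INTEGER_RANGES = {
    "TINYINT": signed_range(1),
    "SMALLINT": signed_range(2),
    "INT": signed_range(4),
    "BIGINT": signed_range(8),
}

for name, (low, high) in INTEGER_RANGES.items():
    print(f"{name}: {low:,} to {high:,}")
```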
Approximate Numerics #
FLOAT
#
Data type of a 4-byte single precision floating point number.
Compared to the SQL standard, the type does not take parameters.
Declaration
FLOAT
DataTypes.FLOAT()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Float | X | X | Default |
float | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.FLOAT()
DOUBLE
#
Data type of an 8-byte double precision floating point number.
Declaration
DOUBLE
DOUBLE PRECISION
DataTypes.DOUBLE()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Double | X | X | Default |
double | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.DOUBLE()
DOUBLE PRECISION is a synonym for this type.
Date and Time #
DATE
#
Data type of a date consisting of year-month-day with values ranging from 0000-01-01 to 9999-12-31.
Compared to the SQL standard, the range starts at year 0000.
Declaration
DATE
DataTypes.DATE()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.LocalDate | X | X | Default |
java.sql.Date | X | X | |
java.lang.Integer | X | X | Describes the number of days since epoch. |
int | X | (X) | Describes the number of days since epoch. Output only if type is not nullable. |
Python:
DataTypes.DATE()
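The integer bridging of DATE describes the number of days since epoch. The plain-Python sketch below illustrates that representation, assuming the epoch is 1970-01-01; the helper names are hypothetical and no Flink API is involved. Note that Python's date type starts at year 1, so it cannot cover the year 0000 that the type allows.

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)  # assumed epoch for the days-since-epoch encoding


def to_epoch_days(d: date) -> int:
    """Encode a date as the number of days since 1970-01-01."""
    return (d - EPOCH).days


def from_epoch_days(days: int) -> date:
    """Decode a days-since-epoch integer back into a date."""
    return EPOCH + timedelta(days=days)


print(to_epoch_days(date(2020, 1, 1)))
print(from_epoch_days(-1))
```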
TIME
#
Data type of a time without time zone consisting of hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 00:00:00.000000000 to 23:59:59.999999999.
Leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.LocalTime. A time with time zone is not provided.
Declaration
TIME
TIME(p)
DataTypes.TIME(p)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.LocalTime | X | X | Default |
java.sql.Time | X | X | |
java.lang.Integer | X | X | Describes the number of milliseconds of the day. |
int | X | (X) | Describes the number of milliseconds of the day. Output only if type is not nullable. |
java.lang.Long | X | X | Describes the number of nanoseconds of the day. |
long | X | (X) | Describes the number of nanoseconds of the day. Output only if type is not nullable. |
Python:
DataTypes.TIME(p)
Attention In the Python API, the precision specified in DataTypes.TIME(p) must currently be 0.
The type can be declared using TIME(p) where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 0.
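The integer and long bridgings of TIME describe milliseconds and nanoseconds of the day. A plain-Python sketch of both encodings follows; the function names are hypothetical, no Flink API is used, and Python's time type only carries microseconds, so the last three nanosecond digits are always zero here.

```python
from datetime import time


def seconds_of_day(t: time) -> int:
    """Whole seconds elapsed since midnight."""
    return (t.hour * 60 + t.minute) * 60 + t.second


def millis_of_day(t: time) -> int:
    """Milliseconds since midnight, truncating sub-millisecond digits."""
    return seconds_of_day(t) * 1_000 + t.microsecond // 1_000


def nanos_of_day(t: time) -> int:
    """Nanoseconds since midnight (microsecond resolution in this sketch)."""
    return (seconds_of_day(t) * 1_000_000 + t.microsecond) * 1_000


print(millis_of_day(time(12, 30, 15)))
print(nanos_of_day(time(12, 30, 15)))
```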
TIMESTAMP
#
Data type of a timestamp without time zone consisting of year-month-day hour:minute:second[.fractional] with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999.
Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.LocalDateTime.
A conversion from and to BIGINT (a JVM long type) is not supported as this would imply a time zone. However, this type is time zone free. For more java.time.Instant-like semantics use TIMESTAMP_LTZ.
Declaration
TIMESTAMP
TIMESTAMP(p)
TIMESTAMP WITHOUT TIME ZONE
TIMESTAMP(p) WITHOUT TIME ZONE
DataTypes.TIMESTAMP(p)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.LocalDateTime | X | X | Default |
java.sql.Timestamp | X | X | |
org.apache.flink.table.data.TimestampData | X | X | Internal data structure. |
Python:
DataTypes.TIMESTAMP(p)
Attention In the Python API, the precision specified in DataTypes.TIMESTAMP(p) must currently be 3.
The type can be declared using TIMESTAMP(p) where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 6.
TIMESTAMP(p) WITHOUT TIME ZONE is a synonym for this type.
TIMESTAMP WITH TIME ZONE
#
Data type of a timestamp with time zone consisting of year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59.
Leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.OffsetDateTime.
Compared to TIMESTAMP_LTZ, the time zone offset information is physically stored in every datum. It is used individually for every computation, visualization, or communication to external systems.
Declaration
TIMESTAMP WITH TIME ZONE
TIMESTAMP(p) WITH TIME ZONE
DataTypes.TIMESTAMP_WITH_TIME_ZONE(p)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.OffsetDateTime | X | X | Default |
java.time.ZonedDateTime | X | Ignores the zone ID. |
Python: Not supported.
The type can be declared using TIMESTAMP(p) WITH TIME ZONE where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 6.
TIMESTAMP_LTZ
#
Data type of a timestamp with local time zone consisting of year-month-day hour:minute:second[.fractional] zone with up to nanosecond precision and values ranging from 0000-01-01 00:00:00.000000000 +14:59 to 9999-12-31 23:59:59.999999999 -14:59.
Leap seconds (23:59:60 and 23:59:61) are not supported as the semantics are closer to java.time.OffsetDateTime.
Compared to TIMESTAMP WITH TIME ZONE, the time zone offset information is not stored physically in every datum. Instead, the type assumes java.time.Instant semantics in UTC time zone at the edges of the table ecosystem. Every datum is interpreted in the local time zone configured in the current session for computation and visualization.
This type fills the gap between time zone free and time zone mandatory timestamp types by allowing the interpretation of UTC timestamps according to the configured session time zone.
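The idea of storing a UTC instant and interpreting it in a session time zone can be sketched in plain Python. This is only an illustration of the semantics, not Flink code: the function name is hypothetical, and the session zone is modelled as a fixed UTC offset in hours, which is a simplification of a real time zone setting.

```python
from datetime import datetime, timedelta, timezone

UTC_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def render_in_session_zone(epoch_millis: int, session_offset_hours: int) -> str:
    """Interpret a UTC instant (milliseconds since epoch) in a fixed-offset zone."""
    instant = UTC_EPOCH + timedelta(milliseconds=epoch_millis)
    session_zone = timezone(timedelta(hours=session_offset_hours))
    return instant.astimezone(session_zone).strftime("%Y-%m-%d %H:%M:%S")


# The same stored instant is displayed differently per session.
print(render_in_session_zone(0, 0))
print(render_in_session_zone(0, 8))
print(render_in_session_zone(0, -8))
```

The stored value never changes; only its rendering depends on the session configuration.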
Declaration
TIMESTAMP_LTZ
TIMESTAMP_LTZ(p)
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP(p) WITH LOCAL TIME ZONE
DataTypes.TIMESTAMP_LTZ(p)
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(p)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.Instant | X | X | Default |
java.lang.Integer | X | X | Describes the number of seconds since epoch. |
int | X | (X) | Describes the number of seconds since epoch. Output only if type is not nullable. |
java.lang.Long | X | X | Describes the number of milliseconds since epoch. |
long | X | (X) | Describes the number of milliseconds since epoch. Output only if type is not nullable. |
java.sql.Timestamp | X | X | Describes the number of milliseconds since epoch. |
org.apache.flink.table.data.TimestampData | X | X | Internal data structure. |
Python:
DataTypes.TIMESTAMP_LTZ(p)
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(p)
Attention In the Python API, the precision specified in DataTypes.TIMESTAMP_LTZ(p) must currently be 3.
The type can be declared using TIMESTAMP_LTZ(p) where p is the number of digits of fractional seconds (precision). p must have a value between 0 and 9 (both inclusive). If no precision is specified, p is equal to 6.
TIMESTAMP(p) WITH LOCAL TIME ZONE is a synonym for this type.
INTERVAL YEAR TO MONTH
#
Data type for a group of year-month interval types.
The type must be parameterized to one of the following resolutions:
- interval of years,
- interval of years to months,
- or interval of months.
An interval of year-month consists of +years-months with values ranging from -9999-11 to +9999-11.
The value representation is the same for all types of resolutions. For example, an interval of months of 50 is always represented in an interval-of-years-to-months format (with default year precision): +04-02.
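The normalisation of a month count into the +years-months format can be reproduced with a few lines of plain Python. This is a sketch of the arithmetic only; the function name format_year_month is hypothetical and the zero-padding to the year precision is an assumption based on the example above.

```python
def format_year_month(total_months: int, year_precision: int = 2) -> str:
    """Render a number of months in the +years-months representation."""
    sign = "-" if total_months < 0 else "+"
    years, months = divmod(abs(total_months), 12)
    return f"{sign}{years:0{year_precision}d}-{months:02d}"


# 50 months are 4 years and 2 months.
print(format_year_month(50))
```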
Declaration
INTERVAL YEAR
INTERVAL YEAR(p)
INTERVAL YEAR(p) TO MONTH
INTERVAL MONTH
DataTypes.INTERVAL(DataTypes.YEAR())
DataTypes.INTERVAL(DataTypes.YEAR(p))
DataTypes.INTERVAL(DataTypes.YEAR(p), DataTypes.MONTH())
DataTypes.INTERVAL(DataTypes.MONTH())
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.Period | X | X | Ignores the days part. Default |
java.lang.Integer | X | X | Describes the number of months. |
int | X | (X) | Describes the number of months. Output only if type is not nullable. |
Python:
DataTypes.INTERVAL(DataTypes.YEAR())
DataTypes.INTERVAL(DataTypes.YEAR(p))
DataTypes.INTERVAL(DataTypes.YEAR(p), DataTypes.MONTH())
DataTypes.INTERVAL(DataTypes.MONTH())
The type can be declared using the above combinations where p is the number of digits of years (year precision). p must have a value between 1 and 4 (both inclusive). If no year precision is specified, p is equal to 2.
INTERVAL DAY TO SECOND
#
Data type for a group of day-time interval types.
The type must be parameterized to one of the following resolutions with up to nanosecond precision:
- interval of days,
- interval of days to hours,
- interval of days to minutes,
- interval of days to seconds,
- interval of hours,
- interval of hours to minutes,
- interval of hours to seconds,
- interval of minutes,
- interval of minutes to seconds,
- or interval of seconds.
An interval of day-time consists of +days hours:minutes:seconds.fractional with values ranging from -999999 23:59:59.999999999 to +999999 23:59:59.999999999. The value representation is the same for all types of resolutions. For example, an interval of seconds of 70 is always represented in an interval-of-days-to-seconds format (with default precisions): +00 00:01:10.000000.
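The same normalisation applies to day-time intervals. The plain-Python sketch below renders a nanosecond count in the +days hours:minutes:seconds.fractional format; the function name is hypothetical, and truncating (rather than rounding) the fraction to the fractional precision is an assumption made for this illustration.

```python
def format_day_time(total_nanos: int, day_precision: int = 2,
                    fractional_precision: int = 6) -> str:
    """Render nanoseconds in the +days hours:minutes:seconds.fractional format."""
    sign = "-" if total_nanos < 0 else "+"
    seconds, nanos = divmod(abs(total_nanos), 1_000_000_000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    text = f"{sign}{days:0{day_precision}d} {hours:02d}:{minutes:02d}:{seconds:02d}"
    if fractional_precision > 0:
        # Keep only the leading digits of the nine-digit nanosecond fraction.
        text += "." + f"{nanos:09d}"[:fractional_precision]
    return text


# 70 seconds are 1 minute and 10 seconds.
print(format_day_time(70 * 1_000_000_000))
```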
Declaration
INTERVAL DAY
INTERVAL DAY(p1)
INTERVAL DAY(p1) TO HOUR
INTERVAL DAY(p1) TO MINUTE
INTERVAL DAY(p1) TO SECOND(p2)
INTERVAL HOUR
INTERVAL HOUR TO MINUTE
INTERVAL HOUR TO SECOND(p2)
INTERVAL MINUTE
INTERVAL MINUTE TO SECOND(p2)
INTERVAL SECOND
INTERVAL SECOND(p2)
DataTypes.INTERVAL(DataTypes.DAY())
DataTypes.INTERVAL(DataTypes.DAY(p1))
DataTypes.INTERVAL(DataTypes.DAY(p1), DataTypes.HOUR())
DataTypes.INTERVAL(DataTypes.DAY(p1), DataTypes.MINUTE())
DataTypes.INTERVAL(DataTypes.DAY(p1), DataTypes.SECOND(p2))
DataTypes.INTERVAL(DataTypes.HOUR())
DataTypes.INTERVAL(DataTypes.HOUR(), DataTypes.MINUTE())
DataTypes.INTERVAL(DataTypes.HOUR(), DataTypes.SECOND(p2))
DataTypes.INTERVAL(DataTypes.MINUTE())
DataTypes.INTERVAL(DataTypes.MINUTE(), DataTypes.SECOND(p2))
DataTypes.INTERVAL(DataTypes.SECOND())
DataTypes.INTERVAL(DataTypes.SECOND(p2))
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.time.Duration | X | X | Default |
java.lang.Long | X | X | Describes the number of milliseconds. |
long | X | (X) | Describes the number of milliseconds. Output only if type is not nullable. |
Python:
DataTypes.INTERVAL(DataTypes.DAY())
DataTypes.INTERVAL(DataTypes.DAY(p1))
DataTypes.INTERVAL(DataTypes.DAY(p1), DataTypes.HOUR())
DataTypes.INTERVAL(DataTypes.DAY(p1), DataTypes.MINUTE())
DataTypes.INTERVAL(DataTypes.DAY(p1), DataTypes.SECOND(p2))
DataTypes.INTERVAL(DataTypes.HOUR())
DataTypes.INTERVAL(DataTypes.HOUR(), DataTypes.MINUTE())
DataTypes.INTERVAL(DataTypes.HOUR(), DataTypes.SECOND(p2))
DataTypes.INTERVAL(DataTypes.MINUTE())
DataTypes.INTERVAL(DataTypes.MINUTE(), DataTypes.SECOND(p2))
DataTypes.INTERVAL(DataTypes.SECOND())
DataTypes.INTERVAL(DataTypes.SECOND(p2))
The type can be declared using the above combinations where p1 is the number of digits of days (day precision) and p2 is the number of digits of fractional seconds (fractional precision). p1 must have a value between 1 and 6 (both inclusive). p2 must have a value between 0 and 9 (both inclusive). If no p1 is specified, it is equal to 2 by default. If no p2 is specified, it is equal to 6 by default.
Constructed Data Types #
ARRAY
#
Data type of an array of elements with the same subtype.
Compared to the SQL standard, the maximum cardinality of an array cannot be specified but is fixed at 2,147,483,647. Also, any valid type is supported as a subtype.
Declaration
ARRAY<t>
t ARRAY
DataTypes.ARRAY(t)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
t[] | (X) | (X) | Depends on the subtype. Default |
java.util.List<t> | X | X | |
subclass of java.util.List<t> | X | ||
org.apache.flink.table.data.ArrayData | X | X | Internal data structure. |
Python:
DataTypes.ARRAY(t)
The type can be declared using ARRAY<t> where t is the data type of the contained elements.
t ARRAY is a synonym for being closer to the SQL standard. For example, INT ARRAY is equivalent to ARRAY<INT>.
MAP
#
Data type of an associative array that maps keys (including NULL) to values (including NULL). A map cannot contain duplicate keys; each key can map to at most one value.
There is no restriction of element types; it is the responsibility of the user to ensure uniqueness.
The map type is an extension to the SQL standard.
Declaration
MAP<kt, vt>
DataTypes.MAP(kt, vt)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.util.Map<kt, vt> | X | X | Default |
subclass of java.util.Map<kt, vt> | X | ||
org.apache.flink.table.data.MapData | X | X | Internal data structure. |
Python:
DataTypes.MAP(kt, vt)
The type can be declared using MAP<kt, vt> where kt is the data type of the key elements and vt is the data type of the value elements.
MULTISET
#
Data type of a multiset (=bag). Unlike a set, it allows for multiple instances for each of its elements with a common subtype. Each unique value (including NULL) is mapped to some multiplicity.
There is no restriction of element types; it is the responsibility of the user to ensure uniqueness.
Declaration
MULTISET<t>
t MULTISET
DataTypes.MULTISET(t)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.util.Map<t, java.lang.Integer> | X | X | Assigns each value to an integer multiplicity. Default |
subclass of java.util.Map<t, java.lang.Integer> | X | ||
org.apache.flink.table.data.MapData | X | X | Internal data structure. |
Python:
DataTypes.MULTISET(t)
The type can be declared using MULTISET<t> where t is the data type of the contained elements.
t MULTISET is a synonym for being closer to the SQL standard. For example, INT MULTISET is equivalent to MULTISET<INT>.
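The default bridging represents a multiset as a map from each distinct value to an integer multiplicity. A plain-Python sketch of that shape, using collections.Counter and a hypothetical helper name, looks as follows; it only illustrates the data layout and is not Flink code.

```python
from collections import Counter


def to_multiset(values) -> dict:
    """Map every distinct value (None included) to its multiplicity."""
    return dict(Counter(values))


bag = to_multiset(["a", "b", "a", None])
print(bag)
```

Here "a" occurs twice, so its multiplicity is 2, while "b" and the NULL-like None each have multiplicity 1.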
ROW
#
Data type of a sequence of fields.
A field consists of a field name, field type, and an optional description. The most specific type of a row of a table is a row type. In this case, each column of the row corresponds to the field of the row type that has the same ordinal position as the column.
Compared to the SQL standard, an optional field description simplifies the handling of complex structures.
A row type is similar to the STRUCT type known from other non-standard-compliant frameworks.
Declaration
ROW<n0 t0, n1 t1, ...>
ROW<n0 t0 'd0', n1 t1 'd1', ...>
ROW(n0 t0, n1 t1, ...)
ROW(n0 t0 'd0', n1 t1 'd1', ...)
DataTypes.ROW(DataTypes.FIELD(n0, t0), DataTypes.FIELD(n1, t1), ...)
DataTypes.ROW(DataTypes.FIELD(n0, t0, d0), DataTypes.FIELD(n1, t1, d1), ...)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
org.apache.flink.types.Row | X | X | Default |
org.apache.flink.table.data.RowData | X | X | Internal data structure. |
Python:
DataTypes.ROW([DataTypes.FIELD(n0, t0), DataTypes.FIELD(n1, t1), ...])
DataTypes.ROW([DataTypes.FIELD(n0, t0, d0), DataTypes.FIELD(n1, t1, d1), ...])
The type can be declared using ROW<n0 t0 'd0', n1 t1 'd1', ...> where n is the unique name of a field, t is the logical type of a field, and d is the description of a field.
ROW(...) is a synonym for being closer to the SQL standard. For example, ROW(myField INT, myOtherField BOOLEAN) is equivalent to ROW<myField INT, myOtherField BOOLEAN>.
User-Defined Data Types #
Attention User-defined data types are not fully supported yet. They are currently (as of Flink 1.11) only exposed as unregistered structured types in parameters and return types of functions.
A structured type is similar to an object in an object-oriented programming language. It contains zero, one or more attributes. Each attribute consists of a name and a type.
There are two kinds of structured types:
- Types that are stored in a catalog and are identified by a catalog identifier (like cat.db.MyType). Those are equal to the SQL standard definition of structured types.
- Anonymously defined, unregistered types (usually reflectively extracted) that are identified by an implementation class (like com.myorg.model.MyType). Those are useful when programmatically defining a table program. They enable reusing existing JVM classes without manually defining the schema of a data type again.
Registered Structured Types #
Currently, registered structured types are not supported. Thus, they cannot be stored in a catalog or referenced in a CREATE TABLE DDL.
Unregistered Structured Types #
Unregistered structured types can be created from regular POJOs (Plain Old Java Objects) using automatic reflective extraction.
The implementation class of a structured type must meet the following requirements:
- The class must be globally accessible which means it must be declared public, static, and not abstract.
- The class must offer a default constructor with zero arguments or a full constructor that assigns all fields.
- All fields of the class must be readable by either public declaration or a getter that follows common coding style such as getField(), isField(), field().
- All fields of the class must be writable by either public declaration, fully assigning constructor, or a setter that follows common coding style such as setField(...), field(...).
- All fields must be mapped to a data type either implicitly via reflective extraction or explicitly using the @DataTypeHint annotations.
- Fields that are declared static or transient are ignored.
The reflective extraction supports arbitrary nesting of fields as long as a field type does not (transitively) refer to itself.
The declared field class (e.g. public int age;) must be contained in the list of supported JVM bridging classes defined for every data type in this document (e.g. java.lang.Integer or int for INT).
For some classes an annotation is required in order to map the class to a data type (e.g. @DataTypeHint("DECIMAL(10, 2)") to assign a fixed precision and scale for java.math.BigDecimal).
Declaration
Java:
class User {
    // extract fields automatically
    public int age;
    public String name;
    // enrich the extraction with precision information
    public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
    // enrich the extraction by forcing the use of a RAW type
    public @DataTypeHint("RAW") Class<?> modelClass;
}
DataTypes.of(User.class);
Scala:
case class User(
    // extract fields automatically
    age: Int,
    name: String,
    // enrich the extraction with precision information
    @DataTypeHint("DECIMAL(10, 2)") totalBalance: java.math.BigDecimal,
    // enrich the extraction by forcing the use of a RAW type
    @DataTypeHint("RAW") modelClass: Class[_]
)
DataTypes.of(classOf[User])
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
class | X | X | Originating class or subclasses (for input) or superclasses (for output). Default |
org.apache.flink.types.Row | X | X | Represent the structured type as a row. |
org.apache.flink.table.data.RowData | X | X | Internal data structure. |
Python: Not supported.
Other Data Types #
BOOLEAN
#
Data type of a boolean with a (possibly) three-valued logic of TRUE, FALSE, and UNKNOWN.
Declaration
BOOLEAN
DataTypes.BOOLEAN()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Boolean | X | X | Default |
boolean | X | (X) | Output only if type is not nullable. |
Python:
DataTypes.BOOLEAN()
RAW
#
Data type of an arbitrary serialized type. This type is a black box within the table ecosystem and is only deserialized at the edges.
The raw type is an extension to the SQL standard.
Declaration
RAW('class', 'snapshot')
DataTypes.RAW(class, serializer)
DataTypes.RAW(class)
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
class | X | X | Originating class or subclasses (for input) or superclasses (for output). Default |
byte[] | X | ||
org.apache.flink.table.data.RawValueData | X | X | Internal data structure. |
Python: Not supported.
The type can be declared using RAW('class', 'snapshot') where class is the originating class and snapshot is the serialized TypeSerializerSnapshot in Base64 encoding. Usually, the type string is not declared directly but is generated while persisting the type.
In the API, the RAW type can be declared either by directly supplying a Class + TypeSerializer or by passing Class and letting the framework extract Class + TypeSerializer from there.
NULL
#
Data type for representing untyped NULL values.
The null type is an extension to the SQL standard. A null type has no other value except NULL, thus, it can be cast to any nullable type similar to JVM semantics.
This type helps in representing unknown types in API calls that use a NULL literal as well as bridging to formats such as JSON or Avro that define such a type as well.
This type is not very useful in practice and is just mentioned here for completeness.
Declaration
NULL
DataTypes.NULL()
Bridging to JVM Types
Java Type | Input | Output | Remarks |
---|---|---|---|
java.lang.Object | X | X | Default |
any class | (X) | Any non-primitive type. |
Python: Not supported.
CAST 方法 #
Flink Table API 和 Flink SQL 支持从 输入
数据类型 到 目标
数据类型的转换。有的转换
无论输入值是什么都能保证转换成功,而有些转换则会在运行时失败(即不可能转换为 目标
数据类型对应的值)。
例如,将 INT
数据类型的值转换为 STRING
数据类型一定能转换成功,但无法保证将 STRING
数据类型转换为 INT
数据类型。
在生成执行计划时,Flink 的 SQL 检查器会拒绝提交那些不可能直接转换为 目标
数据类型的SQL,并抛出 ValidationException
异常,
例如从 TIMESTAMP
类型转化到 INTERVAL
类型。
然而有些查询即使通过了 SQL 检查器的验证,依旧可能会在运行期间转换失败,这就需要用户正确处理这些失败了。
在 Flink Table API 和 Flink SQL 中,可以用下面两个内置方法来进行转换操作:
CAST
:定义在 SQL 标准的 CAST 方法。在某些容易发生转换失败的查询场景中,当实际输入数据不合法时,作业便会运行失败。类型推导会保留输入类型的可空性。TRY_CAST
:常规 CAST 方法的扩展,当转换失败时返回NULL
。该方法的返回值允许为空。
For example:
CAST('42' AS INT) --- returns 42 of type INT NOT NULL
CAST(NULL AS VARCHAR) --- returns NULL of type VARCHAR
CAST('non-number' AS INT) --- throws an exception and fails the job
TRY_CAST('42' AS INT) --- returns 42 of type INT
TRY_CAST(NULL AS VARCHAR) --- returns NULL of type VARCHAR
TRY_CAST('non-number' AS INT) --- returns NULL of type INT
COALESCE(TRY_CAST('non-number' AS INT), 0) --- returns 0 of type INT NOT NULL
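The difference between the two functions can also be pictured in plain Java. The sketch below only mimics the semantics for a string-to-integer cast; the class CastSemanticsSketch and its methods are hypothetical and do not reflect how Flink implements casting internally.

```java
// Hypothetical helper for illustration only; not part of the Flink API.
class CastSemanticsSketch {

    // Mimics CAST(s AS INT): NULL in gives NULL out, invalid input throws.
    static Integer cast(String s) {
        if (s == null) {
            return null;
        }
        return Integer.parseInt(s);
    }

    // Mimics TRY_CAST(s AS INT): invalid input yields null instead of failing.
    static Integer tryCast(String s) {
        try {
            return cast(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Mimics COALESCE(TRY_CAST(s AS INT), fallback): the result is never null.
    static int tryCastOrDefault(String s, int fallback) {
        Integer result = tryCast(s);
        return result != null ? result : fallback;
    }

    public static void main(String[] args) {
        System.out.println(tryCast("42"));                     // 42
        System.out.println(tryCast("non-number"));             // null
        System.out.println(tryCastOrDefault("non-number", 0)); // 0
    }
}
```

The return types carry the same idea as the nullability rules: tryCast has to return a nullable Integer, while wrapping it with a default value allows a primitive int, much like COALESCE restores NOT NULL.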
The matrix below shows the supported cast pairs, where "Y" means supported, "!" means the cast can fail, and "N" means unsupported:
Input \ Target | CHAR¹/VARCHAR¹/STRING | BINARY¹/VARBINARY¹/BYTES | BOOLEAN | DECIMAL | TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DATE | TIME | TIMESTAMP | TIMESTAMP_LTZ | INTERVAL | ARRAY | MULTISET | MAP | ROW | STRUCTURED | RAW |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
CHAR/VARCHAR/STRING | Y | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | ! | N | N | N | N | N | N | N |
BINARY/VARBINARY/BYTES | Y | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N |
BOOLEAN | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
DECIMAL | Y | N | N | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
TINYINT | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N² | N² | N | N | N | N | N | N | N |
SMALLINT | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N² | N² | N | N | N | N | N | N | N |
INTEGER | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N² | N² | Y⁵ | N | N | N | N | N | N |
BIGINT | Y | N | Y | Y | Y | Y | Y | Y | Y | Y | N | N | N² | N² | Y⁶ | N | N | N | N | N | N |
FLOAT | Y | N | N | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
DOUBLE | Y | N | N | Y | Y | Y | Y | Y | Y | Y | N | N | N | N | N | N | N | N | N | N | N |
DATE | Y | N | N | N | N | N | N | N | N | N | Y | N | Y | Y | N | N | N | N | N | N | N |
TIME | Y | N | N | N | N | N | N | N | N | N | N | Y | Y | Y | N | N | N | N | N | N | N |
TIMESTAMP | Y | N | N | N | N | N | N | N | N | N | Y | Y | Y | Y | N | N | N | N | N | N | N |
TIMESTAMP_LTZ | Y | N | N | N | N | N | N | N | N | N | Y | Y | Y | Y | N | N | N | N | N | N | N |
INTERVAL | Y | N | N | N | N | N | Y⁵ | Y⁶ | N | N | N | N | N | N | Y | N | N | N | N | N | N |
ARRAY | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | !³ | N | N | N | N | N |
MULTISET | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | !³ | N | N | N | N |
MAP | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | !³ | N | N | N |
ROW | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | !³ | N | N |
STRUCTURED | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | !³ | N |
RAW | Y | ! | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y⁴ |
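Notes:
1. All casts to a fixed-length or variable-length type also trim or pad the data according to the type definition.
2. Use TO_TIMESTAMP and TO_TIMESTAMP_LTZ for these conversions instead of CAST or TRY_CAST.
3. Supported if and only if the nested (child) type pairs are supported. Fallible if and only if the nested type pairs are fallible.
4. Supported if and only if the RAW class and serializer are equal.
5. Supported if and only if the INTERVAL is a MONTH TO YEAR range.
6. Supported if and only if the INTERVAL is a DAY TO TIME range.
Also note that a cast of a NULL value always returns NULL, regardless of whether CAST or TRY_CAST is used.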
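The trimming and padding mentioned in the first note can be pictured with a small Java sketch. It assumes that a fixed-length character type pads short values with spaces and cuts long ones, while a variable-length type only cuts; the class LengthAdjustSketch and its methods are hypothetical and not Flink's implementation.

```java
// Hypothetical helper for illustration only; not part of the Flink API.
class LengthAdjustSketch {

    // Assumed CHAR(n) behaviour: cut values that are too long, pad short ones with spaces.
    static String toChar(String value, int length) {
        if (value == null) {
            return null; // a cast of NULL always returns NULL
        }
        if (value.length() >= length) {
            return value.substring(0, length);
        }
        StringBuilder padded = new StringBuilder(value);
        while (padded.length() < length) {
            padded.append(' ');
        }
        return padded.toString();
    }

    // Assumed VARCHAR(n) behaviour: only cut values that exceed the maximum length.
    static String toVarchar(String value, int maxLength) {
        if (value == null) {
            return null; // a cast of NULL always returns NULL
        }
        return value.length() > maxLength ? value.substring(0, maxLength) : value;
    }

    public static void main(String[] args) {
        System.out.println("[" + toChar("abcdef", 3) + "]");    // [abc]
        System.out.println("[" + toChar("ab", 5) + "]");        // [ab   ]
        System.out.println("[" + toVarchar("abcdef", 3) + "]"); // [abc]
    }
}
```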
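Legacy Casting #
The casting behaviour from before Flink 1.15 can be enabled by setting table.exec.legacy-cast-behaviour to enabled.
In Flink 1.15 this option is disabled by default.
When it is enabled, note the following:
- Casting to CHAR/VARCHAR/BINARY/VARBINARY no longer trims or pads automatically.
- CAST never fails the job on an invalid cast but returns NULL instead, without inferring the correct type the way TRY_CAST does.
- The results of casting to CHAR/VARCHAR/STRING differ slightly.
We discourage the use of this option and strongly recommend that new projects keep it at its default (disabled) to use the current casting behaviour. This option will be removed in the next version.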
Data Type Extraction #
At many locations in the API, Flink tries to automatically extract the data type from class information using reflection to avoid repetitive manual schema work. However, extracting a data type reflectively is not always successful because logical information might be missing. Therefore, it might be necessary to add additional information close to a class or field declaration to support the extraction logic.
The following table lists classes that can be implicitly mapped to a data type without requiring further information.
If you intend to implement classes in Scala, it is recommended to use boxed types (e.g. java.lang.Integer) instead of Scala’s primitives. Scala’s primitives (e.g. Int or Double) are compiled to JVM primitives (e.g. int/double) and result in NOT NULL semantics as shown in the table below. Furthermore, Scala primitives that are used in generics (e.g. java.util.Map[Int, Double]) are erased during compilation and lead to class information similar to java.util.Map[java.lang.Object, java.lang.Object].
Class | Data Type |
---|---|
java.lang.String | STRING |
java.lang.Boolean | BOOLEAN |
boolean | BOOLEAN NOT NULL |
java.lang.Byte | TINYINT |
byte | TINYINT NOT NULL |
java.lang.Short | SMALLINT |
short | SMALLINT NOT NULL |
java.lang.Integer | INT |
int | INT NOT NULL |
java.lang.Long | BIGINT |
long | BIGINT NOT NULL |
java.lang.Float | FLOAT |
float | FLOAT NOT NULL |
java.lang.Double | DOUBLE |
double | DOUBLE NOT NULL |
java.sql.Date | DATE |
java.time.LocalDate | DATE |
java.sql.Time | TIME(0) |
java.time.LocalTime | TIME(9) |
java.sql.Timestamp | TIMESTAMP(9) |
java.time.LocalDateTime | TIMESTAMP(9) |
java.time.OffsetDateTime | TIMESTAMP(9) WITH TIME ZONE |
java.time.Instant | TIMESTAMP_LTZ(9) |
java.time.Duration | INTERVAL SECOND(9) |
java.time.Period | INTERVAL YEAR(4) TO MONTH |
byte[] | BYTES |
T[] | ARRAY<T> |
java.util.Map<K, V> | MAP<K, V> |
structured type T | anonymous structured type T |
Other JVM bridging classes mentioned in this document require a @DataTypeHint annotation.
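The implicit mapping listed above, including the NOT NULL semantics of JVM primitives, can be pictured with a small lookup in plain Java. This is only a sketch covering a handful of entries; the class ExtractionSketch and its method are hypothetical and do not reflect Flink's actual extraction logic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Flink API.
class ExtractionSketch {

    // A small subset of the class-to-data-type table.
    private static final Map<Class<?>, String> TYPES = new HashMap<>();

    static {
        TYPES.put(String.class, "STRING");
        TYPES.put(Boolean.class, "BOOLEAN");
        TYPES.put(boolean.class, "BOOLEAN");
        TYPES.put(Integer.class, "INT");
        TYPES.put(int.class, "INT");
        TYPES.put(Long.class, "BIGINT");
        TYPES.put(long.class, "BIGINT");
        TYPES.put(Double.class, "DOUBLE");
        TYPES.put(double.class, "DOUBLE");
        TYPES.put(java.time.LocalDate.class, "DATE");
        TYPES.put(java.time.Instant.class, "TIMESTAMP_LTZ(9)");
    }

    // Returns the data type string, or throws if the class has no implicit mapping here.
    static String extract(Class<?> clazz) {
        String type = TYPES.get(clazz);
        if (type == null) {
            throw new IllegalArgumentException("No implicit mapping for " + clazz.getName());
        }
        // JVM primitives can never hold null, hence the NOT NULL suffix.
        return clazz.isPrimitive() ? type + " NOT NULL" : type;
    }

    public static void main(String[] args) {
        System.out.println(extract(Integer.class)); // INT
        System.out.println(extract(int.class));     // INT NOT NULL
    }
}
```

A class such as java.math.BigDecimal is deliberately absent from the map: its precision and scale cannot be derived from the class alone, which is the kind of missing logical information that a hint has to supply.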
Data type hints can parameterize or replace the default extraction logic of individual function parameters and return types, structured classes, or fields of structured classes. An implementer can choose to what extent the default extraction logic should be modified by declaring a @DataTypeHint annotation.
The @DataTypeHint annotation provides a set of optional hint parameters. Some of those parameters are shown in the following example. More information can be found in the documentation of the annotation class.
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.HintFlag;

// AccountStatement and ComplexModel stand for user-defined classes
class User {

    // defines an INT data type with a default conversion class `java.lang.Integer`
    public @DataTypeHint("INT") Object o;

    // defines a TIMESTAMP data type of millisecond precision with an explicit conversion class
    public @DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = java.sql.Timestamp.class) Object ts;

    // forces the extraction to use a RAW type
    public @DataTypeHint("RAW") Class<?> modelClass;

    // defines that all occurrences of java.math.BigDecimal (also in nested fields) will be
    // extracted as DECIMAL(12, 2)
    public @DataTypeHint(defaultDecimalPrecision = 12, defaultDecimalScale = 2) AccountStatement stmt;

    // defines that whenever a type cannot be mapped to a data type, instead of throwing
    // an exception, always treat it as a RAW type
    public @DataTypeHint(allowRawGlobally = HintFlag.TRUE) ComplexModel model;
}
import org.apache.flink.table.annotation.{DataTypeHint, HintFlag}

// AccountStatement and ComplexModel stand for user-defined classes
class User {

  // defines an INT data type with a default conversion class `java.lang.Integer`
  @DataTypeHint("INT")
  var o: AnyRef = _

  // defines a TIMESTAMP data type of millisecond precision with an explicit conversion class
  @DataTypeHint(value = "TIMESTAMP(3)", bridgedTo = classOf[java.sql.Timestamp])
  var ts: AnyRef = _

  // forces the extraction to use a RAW type
  @DataTypeHint("RAW")
  var modelClass: Class[_] = _

  // defines that all occurrences of java.math.BigDecimal (also in nested fields) will be
  // extracted as DECIMAL(12, 2)
  @DataTypeHint(defaultDecimalPrecision = 12, defaultDecimalScale = 2)
  var stmt: AccountStatement = _

  // defines that whenever a type cannot be mapped to a data type, instead of throwing
  // an exception, always treat it as a RAW type
  @DataTypeHint(allowRawGlobally = HintFlag.TRUE)
  var model: ComplexModel = _
}
Not supported.