################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from enum import Enum
from typing import Dict, List, Union
from pyflink.common import typeinfo, Duration
from pyflink.common.serialization import DeserializationSchema, Encoder, SerializationSchema
from pyflink.common.typeinfo import RowTypeInfo, TypeInformation
from pyflink.datastream.functions import SourceFunction, SinkFunction, JavaFunctionWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray
from py4j.java_gateway import JavaObject
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
'FileEnumeratorProvider',
'FileSink',
'FileSource',
'FileSourceBuilder',
'FileSplitAssignerProvider',
'FlinkKafkaConsumer',
'FlinkKafkaProducer',
'JdbcSink',
'JdbcConnectionOptions',
'JdbcExecutionOptions',
'NumberSequenceSource',
'OutputFileConfig',
'RollingPolicy',
'Sink',
'Source',
'StreamFormat',
'StreamingFileSink']
class FlinkKafkaConsumerBase(SourceFunction, abc.ABC):
    """
    Common base of the Flink Kafka consumer data sources.

    It holds the behavior shared across all Kafka versions; the version specific behavior is
    defined mainly in the concrete subclasses.
    """

    def __init__(self, j_flink_kafka_consumer):
        """
        :param j_flink_kafka_consumer: The Java Kafka consumer object that is passed on as the
                                       wrapped source function.
        """
        super().__init__(source_func=j_flink_kafka_consumer)

    def set_commit_offsets_on_checkpoints(self,
                                          commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
        """
        Chooses whether the consumer commits offsets back to Kafka on checkpoints.

        The setting only takes effect when checkpointing is enabled for the job. Without
        checkpointing, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
        property settings are used.

        :param commit_on_checkpoints: Whether offsets are committed on checkpoints.
        :return: This consumer, to allow call chaining.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
        return self

    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
        """
        Makes the consumer start reading from the earliest offset of all partitions, ignoring any
        committed group offsets in Zookeeper / Kafka brokers.

        Restores from a checkpoint or savepoint are not affected: in that case only the offsets
        in the restored state are used.

        :return: This consumer, to allow call chaining.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromEarliest()
        return self

    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
        """
        Makes the consumer start reading from the latest offset of all partitions, ignoring any
        committed group offsets in Zookeeper / Kafka brokers.

        Restores from a checkpoint or savepoint are not affected: in that case only the offsets
        in the restored state are used.

        :return: This consumer, to allow call chaining.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromLatest()
        return self

    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
        """
        Makes the consumer start reading partitions from a given timestamp, ignoring any committed
        group offsets in Zookeeper / Kafka brokers. The timestamp must be before the current
        timestamp.

        The consumer looks up the earliest offset whose timestamp is greater than or equal to the
        given timestamp. If there is no such offset, the latest offset is used instead.

        Restores from a checkpoint or savepoint are not affected: in that case only the offsets
        in the restored state are used.

        :param startup_offsets_timestamp: Timestamp for the startup offsets, in milliseconds since
                                          epoch.
        :return: This consumer, to allow call chaining.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromTimestamp(startup_offsets_timestamp)
        return self

    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
        """
        Makes the consumer start reading from the committed group offsets found in Zookeeper /
        Kafka brokers.

        The 'group.id' property must be set in the configuration properties. For a partition
        without a committed offset, the behaviour configured by 'auto.offset.reset' in the
        configuration properties is used.

        Restores from a checkpoint or savepoint are not affected: in that case only the offsets
        in the restored state are used.

        :return: This consumer, to allow call chaining.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromGroupOffsets()
        return self

    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
        """
        Turns off the filtering of restored partitions against the subscribed topics.

        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
        restored partitions that are no longer associated with the currently specified topics or
        topic pattern to subscribe to. Calling this method disables that filter.

        :return: This consumer, to allow call chaining.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.disableFilterRestoredPartitionsWithSubscribedTopics()
        return self

    def get_produced_type(self) -> TypeInformation:
        """
        Returns the type information of the produced records, converted from the Java type
        reported by the wrapped consumer.
        """
        j_produced_type = self._j_function.getProducedType()
        return typeinfo._from_java_type(j_produced_type)
class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
    """
    Streaming data source that pulls a parallel data stream from Apache Kafka.

    The consumer can run in multiple parallel instances, each of which pulls data from one or
    more Kafka partitions.

    The consumer participates in checkpointing and guarantees that no data is lost during a
    failure, and that the computation processes elements 'exactly once'. (These guarantees
    naturally assume that Kafka itself does not lose any data.)

    Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
    committed to Kafka / Zookeeper only bring the outside view of progress in sync with Flink's
    view of the progress, so that monitoring and other jobs can see how far the consumer has
    consumed a topic.

    See Kafka's documentation for the available configuration properties:
    http://kafka.apache.org/documentation.html#newconsumerconfigs
    """

    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
                 properties: Dict):
        """
        Creates a new Kafka streaming source consumer for Kafka 0.10.x.

        A single topic name or a list of topic names may be given.

        :param topics: The Kafka topic(s) to read from.
        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
                                       messages and Flink's objects.
        :param properties: The properties used to configure both the fetcher and the offset
                           handler.
        """
        jvm = get_gateway().jvm
        j_consumer_class = jvm.org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        j_consumer = _get_kafka_consumer(
            topics, properties, deserialization_schema, j_consumer_class)
        super().__init__(j_flink_kafka_consumer=j_consumer)
class FlinkKafkaProducerBase(SinkFunction, abc.ABC):
    """
    Flink sink that produces data into a Kafka topic.

    The producer provides at-least-once reliability guarantees when checkpoints are enabled and
    set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide any reliability
    guarantees.
    """

    def __init__(self, j_flink_kafka_producer):
        """
        :param j_flink_kafka_producer: The Java Kafka producer object that is passed on as the
                                       wrapped sink function.
        """
        super().__init__(sink_func=j_flink_kafka_producer)

    def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase':
        """
        Chooses between failing on errors and only logging them.

        With true, exceptions are only logged; with false, exceptions are eventually thrown and
        cause the streaming program to fail (and enter recovery).

        :param log_failures_only: The flag to indicate logging-only on exceptions.
        :return: This producer, to allow call chaining.
        """
        j_producer = self._j_function
        j_producer.setLogFailuresOnly(log_failures_only)
        return self

    def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase':
        """
        Chooses whether the producer flushes on checkpoints.

        With true, the Flink producer waits on a checkpoint for all outstanding messages in the
        Kafka buffers to be acknowledged by the Kafka producer. This way the producer can
        guarantee that messages in the Kafka buffers are part of the checkpoint.

        :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint).
        :return: This producer, to allow call chaining.
        """
        j_producer = self._j_function
        j_producer.setFlushOnCheckpoint(flush_on_checkpoint)
        return self

    def set_write_timestamp_to_kafka(self,
                                     write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase':
        """
        Chooses whether the (event time) timestamp attached to each record is written into Kafka.
        Timestamps must be positive for Kafka to accept them.

        :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are
                                         written to Kafka.
        :return: This producer, to allow call chaining.
        """
        j_producer = self._j_function
        j_producer.setWriteTimestampToKafka(write_timestamp_to_kafka)
        return self
class Semantic(Enum):
    """
    Semantics that can be chosen.

    :data: `EXACTLY_ONCE`:

    The Flink producer will write all messages in a Kafka transaction that will be committed to
    Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of
    FlinkKafkaProducers. Between each checkpoint a new Kafka transaction is created, which is
    committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint complete
    notifications are running late, FlinkKafkaProducer can run out of FlinkKafkaProducers in the
    pool. In that case any subsequent FlinkKafkaProducer.snapshotState() requests will fail and
    the FlinkKafkaProducer will keep using the FlinkKafkaProducer from the previous checkpoint.
    To decrease the chances of failing checkpoints there are four options:

        1. decrease number of max concurrent checkpoints
        2. make checkpoints more reliable (so that they complete faster)
        3. increase delay between checkpoints
        4. increase size of FlinkKafkaProducers pool

    :data: `AT_LEAST_ONCE`:

    The Flink producer will wait for all outstanding messages in the Kafka buffers to be
    acknowledged by the Kafka producer on a checkpoint.

    :data: `NONE`:

    Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of
    failure.
    """

    # Plain int values: a trailing comma here would turn the value into a one-element tuple,
    # making the members inconsistent and breaking lookups such as ``Semantic(0)``.
    EXACTLY_ONCE = 0
    AT_LEAST_ONCE = 1
    NONE = 2

    def _to_j_semantic(self):
        """
        Returns the Java ``FlinkKafkaProducer.Semantic`` constant with the same name as this
        member. The lookup is by member name, so it does not depend on the member values.
        """
        JSemantic = get_gateway().jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
        return getattr(JSemantic, self.name)
class FlinkKafkaProducer(FlinkKafkaProducerBase):
    """
    Flink sink that produces data into a Kafka topic.

    By default the producer uses the AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please
    refer to Flink's Kafka connector documentation.
    """

    def __init__(self, topic: str, serialization_schema: SerializationSchema,
                 producer_config: Dict, kafka_producer_pool_size: int = 5,
                 semantic=Semantic.AT_LEAST_ONCE):
        """
        Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the
        topic.

        Using this constructor, the default FlinkFixedPartitioner is used as the partitioner.
        This default partitioner maps each sink subtask to a single Kafka partition (i.e. all
        records received by a sink subtask end up in the same Kafka partition).

        :param topic: ID of the Kafka topic.
        :param serialization_schema: User defined key-less serialization schema.
        :param producer_config: Properties with the producer configuration; every entry is copied
                                into a ``java.util.Properties`` object.
        :param kafka_producer_pool_size: Value handed to the Java producer as its producer pool
                                         size (5 by default).
        :param semantic: The :class:`Semantic` to use (AT_LEAST_ONCE by default).
        """
        jvm = get_gateway().jvm

        # Copy the Python dict into a java.util.Properties instance.
        j_config = jvm.java.util.Properties()
        for name, setting in producer_config.items():
            j_config.setProperty(name, setting)

        j_producer_class = jvm.org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
        j_schema = serialization_schema._j_serialization_schema
        j_semantic = semantic._to_j_semantic()
        # NOTE(review): the None argument presumably stands for the optional custom partitioner
        # (so the default one is used) - confirm against the Java constructor.
        j_producer = j_producer_class(
            topic, j_schema, j_config, None, j_semantic, kafka_producer_pool_size)
        super().__init__(j_flink_kafka_producer=j_producer)

    def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer':
        """
        Disables the propagation of exceptions thrown when committing presumably timed out Kafka
        transactions during recovery of the job.

        If a Kafka transaction is timed out, a commit will never be successful. Hence, use this
        feature to avoid recovery loops of the job. Exceptions will still be logged to inform the
        user that data loss might have occurred.

        Note that System.currentTimeMillis() is used to track the age of a transaction. Moreover,
        only exceptions thrown during the recovery are caught, i.e., the producer will attempt at
        least one commit of the transaction before giving up.

        :return: This FlinkKafkaProducer.
        """
        j_producer = self._j_function
        j_producer.ignoreFailuresAfterTransactionTimeout()
        return self
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    """
    Instantiates a Java Kafka consumer of the given class.

    :param topics: A list of topic names, or a single value which is wrapped into a
                   one-element list.
    :param properties: Dict whose entries are copied into a ``java.util.Properties`` object.
    :param deserialization_schema: Python schema wrapper; its ``_j_deserialization_schema`` is
                                   handed to the Java constructor.
    :param j_consumer_clz: The Java consumer class to instantiate.
    :return: The created Java consumer object.
    """
    topic_list = topics if isinstance(topics, list) else [topics]

    jvm = get_gateway().jvm
    j_config = jvm.java.util.Properties()
    for name, setting in properties.items():
        j_config.setProperty(name, setting)

    return j_consumer_clz(
        topic_list, deserialization_schema._j_deserialization_schema, j_config)
class JdbcSink(SinkFunction):
    """
    Sink function wrapping a Java JDBC sink. Instances are created through :func:`sink`.
    """

    def __init__(self, j_jdbc_sink):
        """
        :param j_jdbc_sink: The Java JDBC sink object that is passed on as the wrapped sink
                            function.
        """
        super().__init__(sink_func=j_jdbc_sink)

    @staticmethod
    def sink(sql: str, type_info: RowTypeInfo, jdbc_connection_options: 'JdbcConnectionOptions',
             jdbc_execution_options: 'JdbcExecutionOptions' = None):
        """
        Create a JDBC sink.

        :param sql: arbitrary DML query (e.g. insert, update, upsert)
        :param type_info: A RowTypeInfo for query field types.
        :param jdbc_connection_options: parameters of connection, such as JDBC URL.
        :param jdbc_execution_options: parameters of execution, such as batch size and maximum
                                       retries. ``JdbcExecutionOptions.defaults()`` is used when
                                       it is None.
        :return: A JdbcSink.
        """
        gateway = get_gateway()
        jvm = gateway.jvm

        # Translate every field type of the row into its SQL type via JdbcTypeUtil.
        j_type_util = jvm.org.apache.flink.connector.jdbc.utils.JdbcTypeUtil
        sql_type_codes = [
            j_type_util.typeInformationToSqlType(field_type.get_java_type_info())
            for field_type in type_info.get_field_types()]
        j_sql_type_array = to_jarray(jvm.int, sql_type_codes)

        # JdbcOutputFormat.createRowJdbcStatementBuilder(int[]) is resolved and invoked through
        # reflection; setAccessible(True) is called first, presumably because the method is not
        # public.
        context_loader = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
        j_output_format_class = jvm.Class.forName(
            'org.apache.flink.connector.jdbc.internal.JdbcOutputFormat', False, context_loader)
        j_int_array_class = to_jarray(jvm.int, []).getClass()
        j_factory_method = j_output_format_class.getDeclaredMethod(
            'createRowJdbcStatementBuilder', to_jarray(jvm.Class, [j_int_array_class]))
        j_factory_method.setAccessible(True)
        j_statement_builder = j_factory_method.invoke(
            None, to_jarray(jvm.Object, [j_sql_type_array]))

        if jdbc_execution_options is None:
            jdbc_execution_options = JdbcExecutionOptions.defaults()

        j_sink_factory = jvm.org.apache.flink.connector.jdbc.JdbcSink
        j_sink = j_sink_factory.sink(
            sql,
            j_statement_builder,
            jdbc_execution_options._j_jdbc_execution_options,
            jdbc_connection_options._j_jdbc_connection_options)
        return JdbcSink(j_jdbc_sink=j_sink)
class JdbcConnectionOptions(object):
    """
    JDBC connection options, backed by a Java JdbcConnectionOptions object.
    """

    def __init__(self, j_jdbc_connection_options):
        """
        :param j_jdbc_connection_options: The Java connection options object to wrap.
        """
        self._j_jdbc_connection_options = j_jdbc_connection_options

    def get_db_url(self) -> str:
        """Returns the database URL reported by the wrapped Java options."""
        j_options = self._j_jdbc_connection_options
        return j_options.getDbURL()

    def get_driver_name(self) -> str:
        """Returns the driver name reported by the wrapped Java options."""
        j_options = self._j_jdbc_connection_options
        return j_options.getDriverName()

    def get_user_name(self) -> str:
        """Returns the user name reported by the wrapped Java options."""
        j_options = self._j_jdbc_connection_options
        return j_options.getUsername()

    def get_password(self) -> str:
        """Returns the password reported by the wrapped Java options."""
        j_options = self._j_jdbc_connection_options
        return j_options.getPassword()

    class JdbcConnectionOptionsBuilder(object):
        """
        Builder for JdbcConnectionOptions. Every ``with_*`` method forwards its value to the Java
        builder and returns this builder so that calls can be chained.
        """

        def __init__(self):
            jvm = get_gateway().jvm
            j_options_class = jvm.org.apache.flink.connector.jdbc.JdbcConnectionOptions
            self._j_options_builder = j_options_class.JdbcConnectionOptionsBuilder()

        def with_url(self, url: str) -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """Sets the JDBC URL."""
            j_builder = self._j_options_builder
            j_builder.withUrl(url)
            return self

        def with_driver_name(self, driver_name: str) \
                -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """Sets the driver name."""
            j_builder = self._j_options_builder
            j_builder.withDriverName(driver_name)
            return self

        def with_user_name(self, user_name: str) \
                -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """Sets the user name."""
            j_builder = self._j_options_builder
            j_builder.withUsername(user_name)
            return self

        def with_password(self, password: str) \
                -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """Sets the password."""
            j_builder = self._j_options_builder
            j_builder.withPassword(password)
            return self

        def build(self) -> 'JdbcConnectionOptions':
            """Builds the Java options and wraps them into a JdbcConnectionOptions."""
            j_options = self._j_options_builder.build()
            return JdbcConnectionOptions(j_jdbc_connection_options=j_options)
class JdbcExecutionOptions(object):
    """
    JDBC sink batch options, backed by a Java JdbcExecutionOptions object.
    """

    def __init__(self, j_jdbc_execution_options):
        """
        :param j_jdbc_execution_options: The Java execution options object to wrap.
        """
        self._j_jdbc_execution_options = j_jdbc_execution_options

    def get_batch_interval_ms(self) -> int:
        """Returns the batch interval (in milliseconds) reported by the wrapped Java options."""
        return self._j_jdbc_execution_options.getBatchIntervalMs()

    def get_batch_size(self) -> int:
        """Returns the batch size reported by the wrapped Java options."""
        return self._j_jdbc_execution_options.getBatchSize()

    def get_max_retries(self) -> int:
        """Returns the maximum number of retries reported by the wrapped Java options."""
        return self._j_jdbc_execution_options.getMaxRetries()

    @staticmethod
    def defaults() -> 'JdbcExecutionOptions':
        """Returns options wrapping the Java side ``JdbcExecutionOptions.defaults()``."""
        return JdbcExecutionOptions(
            j_jdbc_execution_options=get_gateway().jvm
            .org.apache.flink.connector.jdbc.JdbcExecutionOptions.defaults())

    @staticmethod
    def builder() -> 'JdbcExecutionOptions.Builder':
        """
        Returns a new :class:`JdbcExecutionOptions.Builder`.

        The return annotation is fully qualified: a bare 'Builder' cannot be resolved at module
        level because Builder is nested inside this class.
        """
        return JdbcExecutionOptions.Builder()

    class Builder(object):
        """
        Builder for JdbcExecutionOptions. Every ``with_*`` method forwards its value to the Java
        builder and returns this builder so that calls can be chained.
        """

        def __init__(self):
            self._j_builder = get_gateway().jvm\
                .org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder()

        def with_batch_size(self, size: int) -> 'JdbcExecutionOptions.Builder':
            """Sets the batch size."""
            self._j_builder.withBatchSize(size)
            return self

        def with_batch_interval_ms(self, interval_ms: int) -> 'JdbcExecutionOptions.Builder':
            """Sets the batch interval in milliseconds."""
            self._j_builder.withBatchIntervalMs(interval_ms)
            return self

        def with_max_retries(self, max_retries: int) -> 'JdbcExecutionOptions.Builder':
            """Sets the maximum number of retries."""
            self._j_builder.withMaxRetries(max_retries)
            return self

        def build(self) -> 'JdbcExecutionOptions':
            """Builds the Java options and wraps them into a JdbcExecutionOptions."""
            return JdbcExecutionOptions(j_jdbc_execution_options=self._j_builder.build())
class RollingPolicy(object):
    """
    The policy based on which a Bucket in the FileSink rolls its currently open part file and
    opens a new one.
    """

    def __init__(self, j_rolling_policy):
        """
        :param j_rolling_policy: The Java rolling policy object to wrap.
        """
        self._j_rolling_policy = j_rolling_policy

    @staticmethod
    def default_rolling_policy(
            part_size: int = 1024 * 1024 * 128,
            rollover_interval: int = 60 * 1000,
            inactivity_interval: int = 60 * 1000) -> 'RollingPolicy':
        """
        Returns the default implementation of the RollingPolicy.

        This policy rolls a part file if:

            - there is no open part file,
            - the current file has reached the maximum bucket size (by default 128MB),
            - the current file is older than the roll over interval (by default 60 sec), or
            - the current file has not been written to for more than the allowed inactivity
              time (by default 60 sec).

        :param part_size: The maximum part file size before rolling.
        :param rollover_interval: The maximum time duration a part file can stay open before
                                  rolling.
        :param inactivity_interval: The time duration of allowed inactivity after which a part
                                    file will have to roll.
        """
        jvm = get_gateway().jvm
        j_policy_class = jvm.org.apache.flink.streaming.api.functions.sink.filesystem \
            .rollingpolicies.DefaultRollingPolicy

        # Each with* call is applied to the object returned by the previous call.
        j_builder = j_policy_class.builder()
        j_builder = j_builder.withMaxPartSize(part_size)
        j_builder = j_builder.withRolloverInterval(rollover_interval)
        j_builder = j_builder.withInactivityInterval(inactivity_interval)
        return RollingPolicy(j_builder.build())

    @staticmethod
    def on_checkpoint_rolling_policy() -> 'RollingPolicy':
        """
        Returns a RollingPolicy which rolls (ONLY) on every checkpoint.
        """
        jvm = get_gateway().jvm
        j_policy_class = jvm.org.apache.flink.streaming.api.functions.sink.filesystem \
            .rollingpolicies.OnCheckpointRollingPolicy
        j_policy = j_policy_class.build()
        return RollingPolicy(j_policy)
class BucketAssigner(object):
    """
    A BucketAssigner is used with a file sink to determine the bucket each incoming element
    should be put into.

    The StreamingFileSink can be writing to many buckets at a time, and it is responsible for
    managing a set of active buckets. Whenever a new element arrives it asks the BucketAssigner
    for the bucket the element should fall in. The BucketAssigner can, for example, determine
    buckets based on system time.
    """

    def __init__(self, j_bucket_assigner):
        """
        :param j_bucket_assigner: The Java bucket assigner object to wrap.
        """
        self._j_bucket_assigner = j_bucket_assigner

    @staticmethod
    def base_path_bucket_assigner() -> 'BucketAssigner':
        """
        Creates a BucketAssigner that does not perform any bucketing of files. All files are
        written to the base path.
        """
        jvm = get_gateway().jvm
        j_assigner_class = jvm.org.apache.flink.streaming.api.functions.sink.filesystem \
            .bucketassigners.BasePathBucketAssigner
        return BucketAssigner(j_assigner_class())

    @staticmethod
    def date_time_bucket_assigner(format_str: str = "yyyy-MM-dd--HH", timezone_id: str = None):
        """
        Creates a BucketAssigner that assigns to buckets based on current system time.

        It will create directories of the following form: /{basePath}/{dateTimePath}/.
        The basePath is the path that was specified as a base path when creating the new bucket.
        The dateTimePath is determined based on the current system time and the user provided
        format string.

        The Java DateTimeFormatter is used to derive a date string from the current system time
        and the date format string. The default format string is "yyyy-MM-dd--HH" so the rolling
        files will have a granularity of hours.

        :param format_str: The format string used to determine the bucket id.
        :param timezone_id: The timezone id, either an abbreviation such as "PST", a full name
                            such as "America/Los_Angeles", or a custom timezone_id such as
                            "GMT-08:00". The system default time zone is used if it is None (or
                            not a str).
        """
        j_zone_id_class = get_gateway().jvm.java.time.ZoneId
        # Only a str selects an explicit zone; anything else falls back to the system default.
        if isinstance(timezone_id, str):
            j_zone = j_zone_id_class.of(timezone_id)
        else:
            j_zone = j_zone_id_class.systemDefault()

        j_assigner_class = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink \
            .filesystem.bucketassigners.DateTimeBucketAssigner
        return BucketAssigner(j_assigner_class(format_str, j_zone))
class StreamingFileSink(SinkFunction):
    """
    Sink that emits its input elements to `FileSystem` files within buckets. This is integrated
    with the checkpointing mechanism to provide exactly once semantics.

    When creating the sink a `basePath` must be specified. The base directory contains one
    directory for every bucket. The bucket directories themselves contain several part files,
    with at least one for each parallel subtask of the sink which is writing data to that bucket.
    These part files contain the actual output data.
    """

    def __init__(self, j_obj):
        """
        :param j_obj: The object handed positionally to the SinkFunction constructor.
        """
        super().__init__(j_obj)
class OutputFileConfig(object):
    """
    Part file name configuration.

    This allows defining a prefix and a suffix for the part file name.
    """

    def __init__(self, part_prefix: str, part_suffix: str):
        """
        Creates the backing Java OutputFileConfig from the given prefix and suffix.

        :param part_prefix: The prefix for the part name.
        :param part_suffix: The suffix for the part name.
        """
        jvm = get_gateway().jvm
        j_config_class = jvm.org.apache.flink.streaming.api.functions.sink.filesystem \
            .OutputFileConfig
        self._j_output_file_config = j_config_class(part_prefix, part_suffix)

    @staticmethod
    def builder():
        """Returns a new :class:`OutputFileConfig.OutputFileConfigBuilder`."""
        return OutputFileConfig.OutputFileConfigBuilder()

    def get_part_prefix(self) -> str:
        """
        The prefix for the part name.
        """
        j_config = self._j_output_file_config
        return j_config.getPartPrefix()

    def get_part_suffix(self) -> str:
        """
        The suffix for the part name.
        """
        j_config = self._j_output_file_config
        return j_config.getPartSuffix()

    class OutputFileConfigBuilder(object):
        """
        A builder to create the part file configuration. The prefix defaults to "part" and the
        suffix to the empty string.
        """

        def __init__(self):
            self.part_prefix = "part"
            self.part_suffix = ""

        def with_part_prefix(self, prefix) -> 'OutputFileConfig.OutputFileConfigBuilder':
            """Sets the part file prefix and returns this builder."""
            self.part_prefix = prefix
            return self

        def with_part_suffix(self, suffix) -> 'OutputFileConfig.OutputFileConfigBuilder':
            """Sets the part file suffix and returns this builder."""
            self.part_suffix = suffix
            return self

        def build(self) -> 'OutputFileConfig':
            """Creates the OutputFileConfig from the configured prefix and suffix."""
            prefix, suffix = self.part_prefix, self.part_suffix
            return OutputFileConfig(prefix, suffix)
class Source(JavaFunctionWrapper):
    """
    Base class for all unified data sources in Flink.
    """

    def __init__(self, source: Union[str, JavaObject]):
        """
        Constructor of Source.

        :param source: The java Source object.
        """
        super().__init__(source)
class Sink(JavaFunctionWrapper):
    """
    Base class for all unified data sinks in Flink.
    """

    def __init__(self, sink: Union[str, JavaObject]):
        """
        Constructor of Sink.

        :param sink: The java Sink object.
        """
        super().__init__(sink)
class FileEnumeratorProvider(object):
    """
    Factory for FileEnumerator whose task is to discover all files to be read and to split them
    into a set of file source splits. This possibly includes path traversals, file filtering (by
    name or other patterns) and deciding whether to split files into multiple splits, and how to
    split them.
    """

    def __init__(self, j_file_enumerator_provider):
        """
        :param j_file_enumerator_provider: The Java file enumerator provider to wrap.
        """
        self._j_file_enumerator_provider = j_file_enumerator_provider

    @staticmethod
    def default_splittable_file_enumerator() -> 'FileEnumeratorProvider':
        """
        The default file enumerator used for splittable formats. The enumerator recursively
        enumerates files, splits files that consist of multiple distributed storage blocks into
        multiple splits, and filters hidden files (files starting with '.' or '_'). Files with
        suffixes of common compression formats (for example '.gzip', '.bz2', '.zip', ...) will
        not be split.
        """
        j_file_source_class = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
        j_provider = j_file_source_class.DEFAULT_SPLITTABLE_FILE_ENUMERATOR
        return FileEnumeratorProvider(j_provider)

    @staticmethod
    def default_non_splittable_file_enumerator() -> 'FileEnumeratorProvider':
        """
        The default file enumerator used for non-splittable formats. The enumerator recursively
        enumerates files, creates one split for the file, and filters hidden files (files
        starting with '.' or '_').
        """
        j_file_source_class = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
        j_provider = j_file_source_class.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR
        return FileEnumeratorProvider(j_provider)
class FileSplitAssignerProvider(object):
    """
    Factory for FileSplitAssigner which is responsible for deciding what split should be
    processed next by which node. It determines split processing order and locality.
    """

    def __init__(self, j_file_split_assigner):
        """
        :param j_file_split_assigner: The Java file split assigner provider to wrap.
        """
        self._j_file_split_assigner = j_file_split_assigner

    @staticmethod
    def locality_aware_split_assigner() -> 'FileSplitAssignerProvider':
        """
        A FileSplitAssigner that assigns to each host preferably splits that are local, before
        assigning splits that are not local.
        """
        j_file_source_class = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
        j_assigner = j_file_source_class.DEFAULT_SPLIT_ASSIGNER
        return FileSplitAssignerProvider(j_assigner)
class FileSourceBuilder(object):
    """
    The builder for the :class:`~pyflink.datastream.connectors.FileSource`, to configure the
    various behaviors.

    Start building the source via one of the following methods:

        - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
    """

    def __init__(self, j_file_source_builder):
        """
        :param j_file_source_builder: The Java file source builder to wrap.
        """
        self._j_file_source_builder = j_file_source_builder

    def monitor_continuously(
            self,
            discovery_interval: Duration) -> 'FileSourceBuilder':
        """
        Sets this source to streaming ("continuous monitoring") mode.

        This makes the source a "continuous streaming" source that keeps running, monitoring for
        new files, and reads these files when they appear and are discovered by the monitoring.

        The interval in which the source checks for new files is the discovery_interval. Shorter
        intervals mean that files are discovered more quickly, but also imply more frequent
        listing or directory traversal of the file system / object store.

        :param discovery_interval: The interval in which the source checks for new files.
        :return: This builder, to allow call chaining.
        """
        j_builder = self._j_file_source_builder
        j_builder.monitorContinuously(discovery_interval._j_duration)
        return self

    def process_static_file_set(self) -> 'FileSourceBuilder':
        """
        Sets this source to bounded (batch) mode.

        In this mode, the source processes the files that are under the given paths when the
        application is started. Once all files are processed, the source will finish.

        This setting is also the default behavior. This method is mainly here to "switch back"
        to bounded (batch) mode, or to make it explicit in the source construction.

        :return: This builder, to allow call chaining.
        """
        j_builder = self._j_file_source_builder
        j_builder.processStaticFileSet()
        return self

    def set_file_enumerator(
            self,
            file_enumerator: 'FileEnumeratorProvider') -> 'FileSourceBuilder':
        """
        Configures the FileEnumerator for the source. The File Enumerator is responsible for
        selecting from the input path the set of files that should be processed (and which to
        filter out). Furthermore, the File Enumerator may split the files further into
        sub-regions, to enable parallelization beyond the number of files.

        :param file_enumerator: The provider of the file enumerator to use.
        :return: This builder, to allow call chaining.
        """
        j_builder = self._j_file_source_builder
        j_builder.setFileEnumerator(file_enumerator._j_file_enumerator_provider)
        return self

    def set_split_assigner(
            self,
            split_assigner: 'FileSplitAssignerProvider') -> 'FileSourceBuilder':
        """
        Configures the FileSplitAssigner for the source. The File Split Assigner determines which
        parallel reader instance gets which file source split, and in which order these splits
        are assigned.

        :param split_assigner: The provider of the split assigner to use.
        :return: This builder, to allow call chaining.
        """
        j_builder = self._j_file_source_builder
        j_builder.setSplitAssigner(split_assigner._j_file_split_assigner)
        return self

    def build(self) -> 'FileSource':
        """
        Creates the file source with the settings applied to this builder.
        """
        j_file_source = self._j_file_source_builder.build()
        return FileSource(j_file_source)
[docs]class FileSource(Source):
"""
A unified data source that reads files - both in batch and in streaming mode.
This source supports all (distributed) file systems and object stores that can be accessed via
the Flink's FileSystem class.
Start building a file source via one of the following calls:
- :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
This creates a :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` on which
you can configure all the properties of the file source.
<h2>Batch and Streaming</h2>
This source supports both bounded/batch and continuous/streaming data inputs. For the
bounded/batch case, the file source processes all files under the given path(s). In the
continuous/streaming case, the source periodically checks the paths for new files and will start
reading those.
When you start creating a file source (via the
:class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` created
through one of the above-mentioned methods) the source is by default in bounded/batch mode. Call
:func:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder.monitor_continuously` to put
the source into continuous streaming mode.
<h2>Format Types</h2>
The reading of each file happens through file readers defined by <i>file formats</i>. These
define the parsing logic for the contents of the file. There are multiple classes that the
source supports. Their interfaces trade of simplicity of implementation and
flexibility/efficiency.
- A :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` reads the contents of
a file from a file stream. It is the simplest format to implement, and provides many
features out-of-the-box (like checkpointing logic) but is limited in the optimizations it
can apply (such as object reuse, batching, etc.).
<h2>Discovering / Enumerating Files</h2>
The way that the source lists the files to be processes is defined by the
:class:`~pyflink.datastream.connectors.FileSource.FileEnumeratorProvider`. The
FileEnumeratorProvider is responsible to select the relevant files (for example filter out
hidden files) and to optionally splits files into multiple regions (= file source splits) that
can be read in parallel).
"""
def __init__(self, j_file_source):
    """
    Creates the Python-side wrapper around an already constructed file source.

    :param j_file_source: The underlying file source object; it is handed unchanged to the
                          parent class constructor as its ``source`` argument.
    """
    super().__init__(source=j_file_source)
class NumberSequenceSource(Source):
    """
    A bounded data source emitting a sequence of numbers (longs). It is handy for tests and for
    any job that merely needs a stream of N arbitrary events.

    The sequence is divided into as many parallel sub-sequences as there are parallel source
    readers, and every sub-sequence is emitted in order. With a parallelism of one, the result
    is therefore a single, fully ordered sequence.

    The source is always bounded. For very long sequences (for example one spanning the entire
    domain of long integer values), consider running the application in a streaming manner:
    although the produced stream is bounded, its end is very far away.
    """

    def __init__(self, start, end):
        """
        Creates a new NumberSequenceSource that produces parallel sequences covering the range
        from start to end (both boundaries are inclusive).

        :param start: The first number of the sequence (inclusive).
        :param end: The last number of the sequence (inclusive).
        """
        jvm = get_gateway().jvm
        j_source_class = jvm.org.apache.flink.api.connector.source.lib.NumberSequenceSource
        super().__init__(source=j_source_class(start, end))
[docs]class FileSink(Sink):
"""
A unified sink that emits its input elements to FileSystem files within buckets. This
sink achieves exactly-once semantics for both BATCH and STREAMING.
When creating the sink a basePath must be specified. The base directory contains one
directory for every bucket. The bucket directories themselves contain several part files, with
at least one for each parallel subtask of the sink which is writing data to that bucket.
These part files contain the actual output data.
The sink uses a BucketAssigner to determine in which bucket directory each element
should be written to inside the base directory. The BucketAssigner can, for example, roll
on every checkpoint or use time or a property of the element to determine the bucket directory.
The default BucketAssigner is a DateTimeBucketAssigner which will create one new
bucket every hour. You can specify a custom BucketAssigner using
:func:`~pyflink.datastream.connectors.FileSink.RowFormatBuilder.with_bucket_assigner`
after calling :func:`~pyflink.datastream.connectors.FileSink.for_row_format`.
The names of the part files could be defined using OutputFileConfig. This
configuration contains a part prefix and a part suffix that will be used with a random uid
assigned to each subtask of the sink and a rolling counter to determine the file names. For
example, with a prefix "prefix" and a suffix ".ext", a file named
``prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext`` contains the data from the subtask with
uid ``81fc4980-a6af-41c8-9937-9939408a734b`` of the sink and is the 17th part file created by
that subtask.
Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy
is used for row-encoded sink output; an OnCheckpointRollingPolicy is
used for bulk-encoded sink output.
In some scenarios, the open buckets are required to change based on time. In these cases, the
user can specify a bucket_check_interval (by default 1m) and the sink will check
periodically and roll the part file if the specified rolling policy says so.
Part files can be in one of three states: in-progress, pending or finished. The reason for this
is how the sink works to provide exactly-once semantics and fault-tolerance. The part file that
is currently being written to is in-progress. Once a part file is closed for writing it becomes
pending. When a checkpoint is successful (for STREAMING) or at the end of the job (for BATCH)
the currently pending files will be moved to finished.
For STREAMING in order to guarantee exactly-once semantics in case of a failure, the
sink should roll back to the state it had when that last successful checkpoint occurred. To this
end, when restoring, the restored files in pending state are transferred into the finished state
while any in-progress files are rolled back, so that they do not contain data that arrived after
the checkpoint from which we restore.
"""
def __init__(self, j_file_sink):
    """
    Creates the Python-side wrapper around an already constructed file sink.

    :param j_file_sink: The underlying file sink object; it is handed unchanged to the parent
                        class constructor as its ``sink`` argument.
    """
    super().__init__(sink=j_file_sink)