Source code for pyflink.datastream.connectors

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from enum import Enum
from typing import Dict, List, Union

from pyflink.common import typeinfo, Duration
from pyflink.common.serialization import DeserializationSchema, Encoder, SerializationSchema
from pyflink.common.typeinfo import RowTypeInfo, TypeInformation
from pyflink.datastream.functions import SourceFunction, SinkFunction, JavaFunctionWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray

from py4j.java_gateway import JavaObject

__all__ = [
    'FileEnumeratorProvider',
    'FileSink',
    'FileSource',
    'FileSourceBuilder',
    'FileSplitAssignerProvider',
    'FlinkKafkaConsumer',
    'FlinkKafkaProducer',
    'JdbcSink',
    'JdbcConnectionOptions',
    'JdbcExecutionOptions',
    'NumberSequenceSource',
    'OutputFileConfig',
    'RollingPolicy',
    'Sink',
    'Source',
    'StreamFormat',
    'StreamingFileSink']


class FlinkKafkaConsumerBase(SourceFunction, abc.ABC):
    """
    Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
    all kafka versions.

    The Kafka version specific behavior is defined mainly in the specific subclasses.
    """

    def __init__(self, j_flink_kafka_consumer):
        super(FlinkKafkaConsumerBase, self).__init__(source_func=j_flink_kafka_consumer)

    def set_commit_offsets_on_checkpoints(self,
                                          commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
        """
        Specifies whether or not the consumer should commit offsets back to kafka on checkpoints.
        This setting will only have effect if checkpointing is enabled for the job. If checkpointing
        isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
        property settings will be used.
        """
        self._j_function = self._j_function \
            .setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
        return self

    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from the earliest offset for all partitions. This
        lets the consumer ignore any committed group offsets in Zookeeper/ Kafka brokers.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function.setStartFromEarliest()
        return self

    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consuer to start reading from the latest offset for all partitions. This lets
        the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function.setStartFromLatest()
        return self

    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading partitions from a specified timestamp. The specified
        timestamp must be before the current timestamp. This lets the consumer ignore any committed
        group offsets in Zookeeper / Kafka brokers.

        The consumer will look up the earliest offset whose timestamp is greater than or equal to
        the specific timestamp from Kafka. If there's no such offset, the consumer will use the
        latest offset to read data from Kafka.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.

        :param startup_offsets_timestamp: timestamp for the startup offsets, as milliseconds for
                                          epoch.
        """
        self._j_function = self._j_function.setStartFromTimestamp(
            startup_offsets_timestamp)
        return self

    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from any committed group offsets found in Zookeeper/
        Kafka brokers. The 'group.id' property must be set in the configuration properties. If no
        offset can be found for a partition, the behaviour in 'auto.offset.reset' set in the
        configuration properties will be used for the partition.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function.setStartFromGroupOffsets()
        return self

    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
        """
        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
        restored partitions that are no longer associated with the current specified topics or topic
        pattern to subscribe to.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
        savepoint, only the offsets in the restored state will be used.
        """
        self._j_function = self._j_function \
            .disableFilterRestoredPartitionsWithSubscribedTopics()
        return self

    def get_produced_type(self) -> TypeInformation:
        return typeinfo._from_java_type(self._j_function.getProducedType())


[docs]class FlinkKafkaConsumer(FlinkKafkaConsumerBase): """ The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull data from one or more Kafka partitions. The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost during a failure, and that the computation processes elements 'exactly once. (These guarantees naturally assume that Kafka itself does not lose any data.) Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how far the Flink Kafka consumer has consumed a topic. Please refer to Kafka's documentation for the available configuration properties: http://kafka.apache.org/documentation.html#newconsumerconfigs """ def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema, properties: Dict): """ Creates a new Kafka streaming source consumer for Kafka 0.10.x. This constructor allows passing multiple topics to the consumer. :param topics: The Kafka topics to read from. :param deserialization_schema: The de-/serializer used to convert between Kafka's byte messages and Flink's objects. :param properties: The properties that are used to configure both the fetcher and the offset handler. """ JFlinkKafkaConsumer = get_gateway().jvm \ .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema, JFlinkKafkaConsumer) super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
class FlinkKafkaProducerBase(SinkFunction, abc.ABC): """ Flink Sink to produce data into a Kafka topic. Please note that this producer provides at-least-once reliability guarantees when checkpoints are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn;t provid any reliability guarantees. """ def __init__(self, j_flink_kafka_producer): super(FlinkKafkaProducerBase, self).__init__(sink_func=j_flink_kafka_producer) def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase': """ Defines whether the producer should fail on errors, or only log them. If this is set to true, then exceptions will be only logged, if set to false, exceptions will be eventually thrown and cause the streaming program to fail (and enter recovery). :param log_failures_only: The flag to indicate logging-only on exceptions. """ self._j_function.setLogFailuresOnly(log_failures_only) return self def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase': """ If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint. :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint) """ self._j_function.setFlushOnCheckpoint(flush_on_checkpoint) return self def set_write_timestamp_to_kafka(self, write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase': """ If set to true, Flink will write the (event time) timestamp attached to each record into Kafka. Timestamps must be positive for Kafka to accept them. :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written to Kafka. """ self._j_function.setWriteTimestampToKafka(write_timestamp_to_kafka) return self class Semantic(Enum): """ Semantics that can be chosen. :data: `EXACTLY_ONCE`: The Flink producer will write all messages in a Kafka transaction that will be committed to the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of FlinkKafkaProducer. Between each checkpoint there is created new Kafka transaction, which is being committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint complete notifications are running late, FlinkKafkaProducer can run out of FlinkKafkaProducers in the pool. In that case any subsequent FlinkKafkaProducer.snapshot- State() requests will fail and the FlinkKafkaProducer will keep using the FlinkKafkaProducer from previous checkpoint. To decrease chances of failing checkpoints there are four options: 1. decrease number of max concurrent checkpoints 2. make checkpoints mre reliable (so that they complete faster) 3. increase delay between checkpoints 4. increase size of FlinkKafkaProducers pool :data: `AT_LEAST_ONCE`: The Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. :data: `NONE`: Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of failure. """ EXACTLY_ONCE = 0, AT_LEAST_ONCE = 1, NONE = 2 def _to_j_semantic(self): JSemantic = get_gateway().jvm \ .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic return getattr(JSemantic, self.name)
[docs]class FlinkKafkaProducer(FlinkKafkaProducerBase): """ Flink Sink to produce data into a Kafka topic. By default producer will use AT_LEAST_ONCE semantic. Before using EXACTLY_ONCE please refer to Flink's Kafka connector documentation. """ def __init__(self, topic: str, serialization_schema: SerializationSchema, producer_config: Dict, kafka_producer_pool_size: int = 5, semantic=Semantic.AT_LEAST_ONCE): """ Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the topic. Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner. This default partitioner maps each sink subtask to a single Kafka partition (i.e. all records received by a sink subtask will end up in the same Kafka partition). :param topic: ID of the Kafka topic. :param serialization_schema: User defined key-less serialization schema. :param producer_config: Properties with the producer configuration. """ gateway = get_gateway() j_properties = gateway.jvm.java.util.Properties() for key, value in producer_config.items(): j_properties.setProperty(key, value) JFlinkKafkaProducer = gateway.jvm \ .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer j_flink_kafka_producer = JFlinkKafkaProducer( topic, serialization_schema._j_serialization_schema, j_properties, None, semantic._to_j_semantic(), kafka_producer_pool_size) super(FlinkKafkaProducer, self).__init__(j_flink_kafka_producer=j_flink_kafka_producer)
[docs] def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer': """ Disables the propagation of exceptions thrown when committing presumably timed out Kafka transactions during recovery of the job. If a Kafka transaction is timed out, a commit will never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions will still be logged to inform the user that data loss might have occurred. Note that we use the System.currentTimeMillis() to track the age of a transaction. Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will attempt at least one commit of the transaction before giving up. :return: This FlinkKafkaProducer. """ self._j_function.ignoreFailuresAfterTransactionTimeout() return self
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz): if not isinstance(topics, list): topics = [topics] gateway = get_gateway() j_properties = gateway.jvm.java.util.Properties() for key, value in properties.items(): j_properties.setProperty(key, value) j_flink_kafka_consumer = j_consumer_clz(topics, deserialization_schema._j_deserialization_schema, j_properties) return j_flink_kafka_consumer
[docs]class JdbcSink(SinkFunction): def __init__(self, j_jdbc_sink): super(JdbcSink, self).__init__(sink_func=j_jdbc_sink)
[docs] @staticmethod def sink(sql: str, type_info: RowTypeInfo, jdbc_connection_options: 'JdbcConnectionOptions', jdbc_execution_options: 'JdbcExecutionOptions' = None): """ Create a JDBC sink. :param sql: arbitrary DML query (e.g. insert, update, upsert) :param type_info: A RowTypeInfo for query field types. :param jdbc_execution_options: parameters of execution, such as batch size and maximum retries. :param jdbc_connection_options: parameters of connection, such as JDBC URL. :return: A JdbcSink. """ sql_types = [] gateway = get_gateway() JJdbcTypeUtil = gateway.jvm.org.apache.flink.connector.jdbc.utils.JdbcTypeUtil for field_type in type_info.get_field_types(): sql_types.append(JJdbcTypeUtil .typeInformationToSqlType(field_type.get_java_type_info())) j_sql_type = to_jarray(gateway.jvm.int, sql_types) output_format_clz = gateway.jvm.Class\ .forName('org.apache.flink.connector.jdbc.internal.JdbcOutputFormat', False, get_gateway().jvm.Thread.currentThread().getContextClassLoader()) j_int_array_type = to_jarray(gateway.jvm.int, []).getClass() j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder', to_jarray(gateway.jvm.Class, [j_int_array_type])) j_builder_method.setAccessible(True) j_statement_builder = j_builder_method.invoke(None, to_jarray(gateway.jvm.Object, [j_sql_type])) jdbc_execution_options = jdbc_execution_options if jdbc_execution_options is not None \ else JdbcExecutionOptions.defaults() j_jdbc_sink = gateway.jvm.org.apache.flink.connector.jdbc.JdbcSink\ .sink(sql, j_statement_builder, jdbc_execution_options._j_jdbc_execution_options, jdbc_connection_options._j_jdbc_connection_options) return JdbcSink(j_jdbc_sink=j_jdbc_sink)
[docs]class JdbcConnectionOptions(object): """ JDBC connection options. """ def __init__(self, j_jdbc_connection_options): self._j_jdbc_connection_options = j_jdbc_connection_options
[docs] def get_db_url(self) -> str: return self._j_jdbc_connection_options.getDbURL()
[docs] def get_driver_name(self) -> str: return self._j_jdbc_connection_options.getDriverName()
[docs] def get_user_name(self) -> str: return self._j_jdbc_connection_options.getUsername()
[docs] def get_password(self) -> str: return self._j_jdbc_connection_options.getPassword()
[docs] class JdbcConnectionOptionsBuilder(object): """ Builder for JdbcConnectionOptions. """ def __init__(self): self._j_options_builder = get_gateway().jvm.org.apache.flink.connector\ .jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
[docs] def with_url(self, url: str) -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withUrl(url) return self
[docs] def with_driver_name(self, driver_name: str) \ -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withDriverName(driver_name) return self
[docs] def with_user_name(self, user_name: str) \ -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withUsername(user_name) return self
[docs] def with_password(self, password: str) \ -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withPassword(password) return self
[docs] def build(self) -> 'JdbcConnectionOptions': return JdbcConnectionOptions(j_jdbc_connection_options=self._j_options_builder.build())
[docs]class JdbcExecutionOptions(object): """ JDBC sink batch options. """ def __init__(self, j_jdbc_execution_options): self._j_jdbc_execution_options = j_jdbc_execution_options
[docs] def get_batch_interval_ms(self) -> int: return self._j_jdbc_execution_options.getBatchIntervalMs()
[docs] def get_batch_size(self) -> int: return self._j_jdbc_execution_options.getBatchSize()
[docs] def get_max_retries(self) -> int: return self._j_jdbc_execution_options.getMaxRetries()
[docs] @staticmethod def defaults() -> 'JdbcExecutionOptions': return JdbcExecutionOptions( j_jdbc_execution_options=get_gateway().jvm .org.apache.flink.connector.jdbc.JdbcExecutionOptions.defaults())
[docs] @staticmethod def builder() -> 'Builder': return JdbcExecutionOptions.Builder()
[docs] class Builder(object): """ Builder for JdbcExecutionOptions. """ def __init__(self): self._j_builder = get_gateway().jvm\ .org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder()
[docs] def with_batch_size(self, size: int) -> 'JdbcExecutionOptions.Builder': self._j_builder.withBatchSize(size) return self
[docs] def with_batch_interval_ms(self, interval_ms: int) -> 'JdbcExecutionOptions.Builder': self._j_builder.withBatchIntervalMs(interval_ms) return self
[docs] def with_max_retries(self, max_retries: int) -> 'JdbcExecutionOptions.Builder': self._j_builder.withMaxRetries(max_retries) return self
[docs] def build(self) -> 'JdbcExecutionOptions': return JdbcExecutionOptions(j_jdbc_execution_options=self._j_builder.build())
[docs]class RollingPolicy(object): """ The policy based on which a Bucket in the FileSink rolls its currently open part file and opens a new one. """ def __init__(self, j_rolling_policy): self._j_rolling_policy = j_rolling_policy
[docs] @staticmethod def default_rolling_policy( part_size: int = 1024 * 1024 * 128, rollover_interval: int = 60 * 1000, inactivity_interval: int = 60 * 1000) -> 'RollingPolicy': """ Returns the default implementation of the RollingPolicy. This policy rolls a part file if: - there is no open part file, - the current file has reached the maximum bucket size (by default 128MB), - the current file is older than the roll over interval (by default 60 sec), or - the current file has not been written to for more than the allowed inactivityTime (by default 60 sec). :param part_size: The maximum part file size before rolling. :param rollover_interval: The maximum time duration a part file can stay open before rolling. :param inactivity_interval: The time duration of allowed inactivity after which a part file will have to roll. """ JDefaultRollingPolicy = get_gateway().jvm.org.apache.flink.streaming.api.functions.\ sink.filesystem.rollingpolicies.DefaultRollingPolicy j_rolling_policy = JDefaultRollingPolicy.builder()\ .withMaxPartSize(part_size) \ .withRolloverInterval(rollover_interval) \ .withInactivityInterval(inactivity_interval) \ .build() return RollingPolicy(j_rolling_policy)
[docs] @staticmethod def on_checkpoint_rolling_policy() -> 'RollingPolicy': """ Returns a RollingPolicy which rolls (ONLY) on every checkpoint. """ JOnCheckpointRollingPolicy = get_gateway().jvm.org.apache.flink.streaming.api.functions. \ sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy return RollingPolicy(JOnCheckpointRollingPolicy.build())
class BucketAssigner(object): """ A BucketAssigner is used with a file sink to determine the bucket each incoming element should be put into. The StreamingFileSink can be writing to many buckets at a time, and it is responsible for managing a set of active buckets. Whenever a new element arrives it will ask the BucketAssigner for the bucket the element should fall in. The BucketAssigner can, for example, determine buckets based on system time. """ def __init__(self, j_bucket_assigner): self._j_bucket_assigner = j_bucket_assigner @staticmethod def base_path_bucket_assigner() -> 'BucketAssigner': """ Creates a BucketAssigner that does not perform any bucketing of files. All files are written to the base path. """ return BucketAssigner(get_gateway().jvm.org.apache.flink.streaming.api.functions.sink. filesystem.bucketassigners.BasePathBucketAssigner()) @staticmethod def date_time_bucket_assigner(format_str: str = "yyyy-MM-dd--HH", timezone_id: str = None): """ Creates a BucketAssigner that assigns to buckets based on current system time. It will create directories of the following form: /{basePath}/{dateTimePath}/}. The basePath is the path that was specified as a base path when creating the new bucket. The dateTimePath is determined based on the current system time and the user provided format string. The Java DateTimeFormatter is used to derive a date string from the current system time and the date format string. The default format string is "yyyy-MM-dd--HH" so the rolling files will have a granularity of hours. :param format_str: The format string used to determine the bucket id. :param timezone_id: The timezone id, either an abbreviation such as "PST", a full name such as "America/Los_Angeles", or a custom timezone_id such as "GMT-08:00". Th e default time zone will b used if it's None. """ if timezone_id is not None and isinstance(timezone_id, str): j_timezone = get_gateway().jvm.java.time.ZoneId.of(timezone_id) else: j_timezone = get_gateway().jvm.java.time.ZoneId.systemDefault() return BucketAssigner( get_gateway().jvm.org.apache.flink.streaming.api.functions.sink. filesystem.bucketassigners.DateTimeBucketAssigner(format_str, j_timezone))
[docs]class StreamingFileSink(SinkFunction): """ Sink that emits its input elements to `FileSystem` files within buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics. When creating the sink a `basePath` must be specified. The base directory contains one directory for every bucket. The bucket directories themselves contain several part files, with at least one for each parallel subtask of the sink which is writing data to that bucket. These part files contain the actual output data. """ def __init__(self, j_obj): super(StreamingFileSink, self).__init__(j_obj)
[docs] class DefaultRowFormatBuilder(object): """ Builder for the vanilla `StreamingFileSink` using a row format. """ def __init__(self, j_default_row_format_builder): self.j_default_row_format_builder = j_default_row_format_builder
[docs] def with_bucket_check_interval( self, interval: int) -> 'StreamingFileSink.DefaultRowFormatBuilder': self.j_default_row_format_builder.withBucketCheckInterval(interval) return self
[docs] def with_bucket_assigner( self, bucket_assigner: BucketAssigner) -> 'StreamingFileSink.DefaultRowFormatBuilder': self.j_default_row_format_builder.withBucketAssigner(bucket_assigner._j_bucket_assigner) return self
[docs] def with_rolling_policy( self, policy: RollingPolicy) -> 'StreamingFileSink.DefaultRowFormatBuilder': self.j_default_row_format_builder.withRollingPolicy(policy._j_rolling_policy) return self
[docs] def with_output_file_config( self, output_file_config: 'OutputFileConfig') \ -> 'StreamingFileSink.DefaultRowFormatBuilder': self.j_default_row_format_builder.withOutputFileConfig( output_file_config._j_output_file_config) return self
[docs] def build(self) -> 'StreamingFileSink': j_stream_file_sink = self.j_default_row_format_builder.build() return StreamingFileSink(j_stream_file_sink)
[docs] @staticmethod def for_row_format(base_path: str, encoder: Encoder) -> 'DefaultRowFormatBuilder': j_path = get_gateway().jvm.org.apache.flink.core.fs.Path(base_path) j_default_row_format_builder = get_gateway().jvm.org.apache.flink.streaming.api.\ functions.sink.filesystem.StreamingFileSink.forRowFormat(j_path, encoder._j_encoder) return StreamingFileSink.DefaultRowFormatBuilder(j_default_row_format_builder)
[docs]class OutputFileConfig(object): """ Part file name configuration. This allow to define a prefix and a suffix to the part file name. """
[docs] @staticmethod def builder(): return OutputFileConfig.OutputFileConfigBuilder()
def __init__(self, part_prefix: str, part_suffix: str): self._j_output_file_config = get_gateway().jvm.org.apache.flink.streaming.api.\ functions.sink.filesystem.OutputFileConfig(part_prefix, part_suffix)
[docs] def get_part_prefix(self) -> str: """ The prefix for the part name. """ return self._j_output_file_config.getPartPrefix()
[docs] def get_part_suffix(self) -> str: """ The suffix for the part name. """ return self._j_output_file_config.getPartSuffix()
[docs] class OutputFileConfigBuilder(object): """ A builder to create the part file configuration. """ def __init__(self): self.part_prefix = "part" self.part_suffix = ""
[docs] def with_part_prefix(self, prefix) -> 'OutputFileConfig.OutputFileConfigBuilder': self.part_prefix = prefix return self
[docs] def with_part_suffix(self, suffix) -> 'OutputFileConfig.OutputFileConfigBuilder': self.part_suffix = suffix return self
[docs] def build(self) -> 'OutputFileConfig': return OutputFileConfig(self.part_prefix, self.part_suffix)
[docs]class Source(JavaFunctionWrapper): """ Base class for all unified data source in Flink. """ def __init__(self, source: Union[str, JavaObject]): """ Constructor of Source. :param source: The java Source object. """ super(Source, self).__init__(source)
[docs]class Sink(JavaFunctionWrapper): """ Base class for all unified data sink in Flink. """ def __init__(self, sink: Union[str, JavaObject]): """ Constructor of Sink. :param sink: The java Sink object. """ super(Sink, self).__init__(sink)
[docs]class StreamFormat(object): """ A reader format that reads individual records from a stream. Compared to the :class:`~pyflink.datastream.connectors.FileSource.BulkFormat`, the stream format handles a few things out-of-the-box, like deciding how to batch records or dealing with compression. Internally in the file source, the readers pass batches of records from the reading threads (that perform the typically blocking I/O operations) to the async mailbox threads that do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time) much reduces the thread-to-thread handover overhead. This batching is by default based on I/O fetch size for the StreamFormat, meaning the set of records derived from one I/O buffer will be handed over as one. See config option `source.file.stream.io-fetch-size` to configure that fetch size. """ def __init__(self, j_stream_format): self._j_stream_format = j_stream_format
[docs] @staticmethod def text_line_format(charset_name: str = "UTF-8") -> 'StreamFormat': """ Creates a reader format that text lines from a file. The reader uses Java's built-in java.io.InputStreamReader to decode the byte stream using various supported charset encodings. This format does not support optimized recovery from checkpoints. On recovery, it will re-read and discard the number of lined that were processed before the last checkpoint. That is due to the fact that the offsets of lines in the file cannot be tracked through the charset decoders with their internal buffering of stream input and charset decoder state. :param charset_name: The charset to decode the byte stream. """ j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \ TextLineFormat(charset_name) return StreamFormat(j_stream_format)
[docs]class FileEnumeratorProvider(object): """ Factory for FileEnumerator which task is to discover all files to be read and to split them into a set of file source splits. This includes possibly, path traversals, file filtering (by name or other patterns) and deciding whether to split files into multiple splits, and how to split them. """ def __init__(self, j_file_enumerator_provider): self._j_file_enumerator_provider = j_file_enumerator_provider
[docs] @staticmethod def default_splittable_file_enumerator() -> 'FileEnumeratorProvider': """ The default file enumerator used for splittable formats. The enumerator recursively enumerates files, split files that consist of multiple distributed storage blocks into multiple splits, and filters hidden files (files starting with '.' or '_'). Files with suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...) will not be split. """ JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource return FileEnumeratorProvider(JFileSource.DEFAULT_SPLITTABLE_FILE_ENUMERATOR)
[docs] @staticmethod def default_non_splittable_file_enumerator() -> 'FileEnumeratorProvider': """ The default file enumerator used for non-splittable formats. The enumerator recursively enumerates files, creates one split for the file, and filters hidden files (files starting with '.' or '_'). """ JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource return FileEnumeratorProvider(JFileSource.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR)
[docs]class FileSplitAssignerProvider(object): """ Factory for FileSplitAssigner which is responsible for deciding what split should be processed next by which node. It determines split processing order and locality. """ def __init__(self, j_file_split_assigner): self._j_file_split_assigner = j_file_split_assigner
[docs] @staticmethod def locality_aware_split_assigner() -> 'FileSplitAssignerProvider': """ A FileSplitAssigner that assigns to each host preferably splits that are local, before assigning splits that are not local. """ JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource return FileSplitAssignerProvider(JFileSource.DEFAULT_SPLIT_ASSIGNER)
[docs]class FileSourceBuilder(object): """ The builder for the :class:`~pyflink.datastream.connectors.FileSource`, to configure the various behaviors. Start building the source via one of the following methods: - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format` """ def __init__(self, j_file_source_builder): self._j_file_source_builder = j_file_source_builder
[docs] def monitor_continuously( self, discovery_interval: Duration) -> 'FileSourceBuilder': """ Sets this source to streaming ("continuous monitoring") mode. This makes the source a "continuous streaming" source that keeps running, monitoring for new files, and reads these files when they appear and are discovered by the monitoring. The interval in which the source checks for new files is the discovery_interval. Shorter intervals mean that files are discovered more quickly, but also imply more frequent listing or directory traversal of the file system / object store. """ self._j_file_source_builder.monitorContinuously(discovery_interval._j_duration) return self
[docs] def process_static_file_set(self) -> 'FileSourceBuilder': """ Sets this source to bounded (batch) mode. In this mode, the source processes the files that are under the given paths when the application is started. Once all files are processed, the source will finish. This setting is also the default behavior. This method is mainly here to "switch back" to bounded (batch) mode, or to make it explicit in the source construction. """ self._j_file_source_builder.processStaticFileSet() return self
[docs] def set_file_enumerator( self, file_enumerator: 'FileEnumeratorProvider') -> 'FileSourceBuilder': """ Configures the FileEnumerator for the source. The File Enumerator is responsible for selecting from the input path the set of files that should be processed (and which to filter out). Furthermore, the File Enumerator may split the files further into sub-regions, to enable parallelization beyond the number of files. """ self._j_file_source_builder.setFileEnumerator( file_enumerator._j_file_enumerator_provider) return self
[docs] def set_split_assigner( self, split_assigner: 'FileSplitAssignerProvider') -> 'FileSourceBuilder': """ Configures the FileSplitAssigner for the source. The File Split Assigner determines which parallel reader instance gets which {@link FileSourceSplit}, and in which order these splits are assigned. """ self._j_file_source_builder.setSplitAssigner(split_assigner._j_file_split_assigner) return self
[docs] def build(self) -> 'FileSource': """ Creates the file source with the settings applied to this builder. """ return FileSource(self._j_file_source_builder.build())
[docs]class FileSource(Source): """ A unified data source that reads files - both in batch and in streaming mode. This source supports all (distributed) file systems and object stores that can be accessed via the Flink's FileSystem class. Start building a file source via one of the following calls: - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format` This creates a :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` on which you can configure all the properties of the file source. <h2>Batch and Streaming</h2> This source supports both bounded/batch and continuous/streaming data inputs. For the bounded/batch case, the file source processes all files under the given path(s). In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those. When you start creating a file source (via the :class:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder` created through one of the above-mentioned methods) the source is by default in bounded/batch mode. Call :func:`~pyflink.datastream.connectors.FileSource.FileSourceBuilder.monitor_continuously` to put the source into continuous streaming mode. <h2>Format Types</h2> The reading of each file happens through file readers defined by <i>file formats</i>. These define the parsing logic for the contents of the file. There are multiple classes that the source supports. Their interfaces trade of simplicity of implementation and flexibility/efficiency. - A :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply (such as object reuse, batching, etc.). <h2>Discovering / Enumerating Files</h2> The way that the source lists the files to be processes is defined by the :class:`~pyflink.datastream.connectors.FileSource.FileEnumeratorProvider`. The FileEnumeratorProvider is responsible to select the relevant files (for example filter out hidden files) and to optionally splits files into multiple regions (= file source splits) that can be read in parallel). """ def __init__(self, j_file_source): super(FileSource, self).__init__(source=j_file_source)
[docs] @staticmethod def for_record_stream_format(stream_format: StreamFormat, *paths: str) -> FileSourceBuilder: """ Builds a new FileSource using a :class:`~pyflink.datastream.connectors.FileSource.StreamFormat` to read record-by-record from a file stream. When possible, stream-based formats are generally easier (preferable) to file-based formats, because they support better default behavior around I/O batching or progress tracking (checkpoints). Stream formats also automatically de-compress files based on the file extension. This supports files ending in ".deflate" (Deflate), ".xz" (XZ), ".bz2" (BZip2), ".gz", ".gzip" (GZip). """ JPath = get_gateway().jvm.org.apache.flink.core.fs.Path JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource j_paths = to_jarray(JPath, [JPath(p) for p in paths]) return FileSourceBuilder( JFileSource.forRecordStreamFormat(stream_format._j_stream_format, j_paths))
[docs]class NumberSequenceSource(Source): """ A data source that produces a sequence of numbers (longs). This source is useful for testing and for cases that just need a stream of N events of any kind. The source splits the sequence into as many parallel sub-sequences as there are parallel source readers. Each sub-sequence will be produced in order. Consequently, if the parallelism is limited to one, this will produce one sequence in order. This source is always bounded. For very long sequences (for example over the entire domain of long integer values), user may want to consider executing the application in a streaming manner, because, despite the fact that the produced stream is bounded, the end bound is pretty far away. """ def __init__(self, start, end): """ Creates a new NumberSequenceSource that produces parallel sequences covering the range start to end (both boundaries are inclusive). """ JNumberSequenceSource = get_gateway().jvm.org.apache.flink.api.connector.source.lib.\ NumberSequenceSource j_seq_source = JNumberSequenceSource(start, end) super(NumberSequenceSource, self).__init__(source=j_seq_source)
[docs]class FileSink(Sink): """ A unified sink that emits its input elements to FileSystem files within buckets. This sink achieves exactly-once semantics for both BATCH and STREAMING. When creating the sink a basePath must be specified. The base directory contains one directory for every bucket. The bucket directories themselves contain several part files, with at least one for each parallel subtask of the sink which is writing data to that bucket. These part files contain the actual output data. The sink uses a BucketAssigner to determine in which bucket directory each element should be written to inside the base directory. The BucketAssigner can, for example, roll on every checkpoint or use time or a property of the element to determine the bucket directory. The default BucketAssigner is a DateTimeBucketAssigner which will create one new bucket every hour. You can specify a custom BucketAssigner using the :func:`~pyflink.datastream.connectors.FileSink.RowFormatBuilder.with_bucket_assigner`, after calling :class:`~pyflink.datastream.connectors.FileSink.for_row_format`. The names of the part files could be defined using OutputFileConfig. This configuration contains a part prefix and a part suffix that will be used with a random uid assigned to each subtask of the sink and a rolling counter to determine the file names. For example with a prefix "prefix" and a suffix ".ext", a file named {@code "prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext"} contains the data from subtask with uid {@code 81fc4980-a6af-41c8-9937-9939408a734b} of the sink and is the {@code 17th} part-file created by that subtask. Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy is used for row-encoded sink output; a OnCheckpointRollingPolicy is used for bulk-encoded sink output. In some scenarios, the open buckets are required to change based on time. In these cases, the user can specify a bucket_check_interval (by default 1m) and the sink will check periodically and roll the part file if the specified rolling policy says so. Part files can be in one of three states: in-progress, pending or finished. The reason for this is how the sink works to provide exactly-once semantics and fault-tolerance. The part file that is currently being written to is in-progress. Once a part file is closed for writing it becomes pending. When a checkpoint is successful (for STREAMING) or at the end of the job (for BATCH) the currently pending files will be moved to finished. For STREAMING in order to guarantee exactly-once semantics in case of a failure, the sink should roll back to the state it had when that last successful checkpoint occurred. To this end, when restoring, the restored files in pending state are transferred into the finished state while any in-progress files are rolled back, so that they do not contain data that arrived after the checkpoint from which we restore. """ def __init__(self, j_file_sink): super(FileSink, self).__init__(sink=j_file_sink)
[docs] class RowFormatBuilder(object): """ Builder for the vanilla FileSink using a row format. """ def __init__(self, j_row_format_builder): self._j_row_format_builder = j_row_format_builder
[docs] def with_bucket_check_interval(self, interval: int): """ :param interval: The check interval in milliseconds. """ self._j_row_format_builder.withBucketCheckInterval(interval) return self
[docs] def with_bucket_assigner(self, bucket_assigner: BucketAssigner): self._j_row_format_builder.withBucketAssigner(bucket_assigner._j_bucket_assigner) return self
[docs] def with_rolling_policy(self, rolling_policy: RollingPolicy): self._j_row_format_builder.withRollingPolicy(rolling_policy._j_rolling_policy) return self
[docs] def with_output_file_config(self, output_file_config: OutputFileConfig): self._j_row_format_builder.withOutputFileConfig( output_file_config._j_output_file_config) return self
[docs] def build(self): return FileSink(self._j_row_format_builder.build())
[docs] @staticmethod def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuilder': JPath = get_gateway().jvm.org.apache.flink.core.fs.Path JFileSink = get_gateway().jvm.org.apache.flink.connector.file.sink.FileSink return FileSink.RowFormatBuilder( JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))