# Source code for pyflink.datastream.connectors

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from enum import Enum
from typing import Dict, List, Union

from pyflink.common import typeinfo, Duration, ConfigOption, ExecutionConfig
from pyflink.common.serialization import DeserializationSchema, Encoder, SerializationSchema
from pyflink.common.typeinfo import RowTypeInfo, TypeInformation
from pyflink.datastream.functions import SourceFunction, SinkFunction, JavaFunctionWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray

from py4j.java_gateway import JavaObject

# Names exported by ``from pyflink.datastream.connectors import *``.
# NOTE(review): several entries (e.g. 'FileSink', 'FileSource', 'PulsarSource', 'RMQSource') are
# not defined in the part of the module reviewed here - presumably they are defined further down;
# confirm. 'BucketAssigner' and 'Semantic' are defined in this module but are not listed.
__all__ = [
    'FileEnumeratorProvider',
    'FileSink',
    'FileSource',
    'FileSourceBuilder',
    'FileSplitAssignerProvider',
    'FlinkKafkaConsumer',
    'FlinkKafkaProducer',
    'JdbcSink',
    'JdbcConnectionOptions',
    'JdbcExecutionOptions',
    'NumberSequenceSource',
    'OutputFileConfig',
    'PulsarDeserializationSchema',
    'PulsarSource',
    'PulsarSourceBuilder',
    'RMQConnectionConfig',
    'RMQSource',
    'RMQSink',
    'RollingPolicy',
    'Sink',
    'Source',
    'StartCursor',
    'StopCursor',
    'StreamFormat',
    'StreamingFileSink',
    'SubscriptionType']


class FlinkKafkaConsumerBase(SourceFunction, abc.ABC):
    """
    Base class of all Flink Kafka Consumer data sources. It implements the behavior that is common
    across all Kafka versions; the Kafka version specific behavior is defined mainly in the
    specific subclasses.

    Every ``set_*`` / ``disable_*`` method forwards to the wrapped Java consumer
    (``self._j_function``), keeps the object returned by that Java call as the wrapped function
    and returns this Python object so that calls can be chained.
    """

    def __init__(self, j_flink_kafka_consumer):
        """
        :param j_flink_kafka_consumer: The Java Kafka consumer object wrapped by this source.
        """
        super().__init__(source_func=j_flink_kafka_consumer)

    def set_commit_offsets_on_checkpoints(self,
                                          commit_on_checkpoints: bool) -> 'FlinkKafkaConsumerBase':
        """
        Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.

        The setting only has an effect if checkpointing is enabled for the job. Without
        checkpointing only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
        property settings are used.

        :param commit_on_checkpoints: Whether offsets should be committed on checkpoints.
        :return: This consumer.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setCommitOffsetsOnCheckpoints(commit_on_checkpoints)
        return self

    def set_start_from_earliest(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from the earliest offset for all partitions. This
        lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. In that case only the offsets in the restored state are
        used.

        :return: This consumer.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromEarliest()
        return self

    def set_start_from_latest(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from the latest offset for all partitions. This
        lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. In that case only the offsets in the restored state are
        used.

        :return: This consumer.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromLatest()
        return self

    def set_start_from_timestamp(self, startup_offsets_timestamp: int) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading partitions from a specified timestamp. The
        timestamp must be before the current timestamp. This lets the consumer ignore any
        committed group offsets in Zookeeper / Kafka brokers.

        The consumer looks up the earliest offset whose timestamp is greater than or equal to the
        given timestamp from Kafka. If there is no such offset, the latest offset is used to read
        data from Kafka.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. In that case only the offsets in the restored state are
        used.

        :param startup_offsets_timestamp: Timestamp for the startup offsets, as milliseconds from
                                          epoch.
        :return: This consumer.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromTimestamp(startup_offsets_timestamp)
        return self

    def set_start_from_group_offsets(self) -> 'FlinkKafkaConsumerBase':
        """
        Specifies the consumer to start reading from any committed group offsets found in
        Zookeeper / Kafka brokers. The 'group.id' property must be set in the configuration
        properties. If no offset can be found for a partition, the behaviour configured through
        'auto.offset.reset' in the configuration properties is used for that partition.

        This method does not affect where partitions are read from when the consumer is restored
        from a checkpoint or savepoint. In that case only the offsets in the restored state are
        used.

        :return: This consumer.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.setStartFromGroupOffsets()
        return self

    def disable_filter_restored_partitions_with_subscribed_topics(self) -> 'FlinkKafkaConsumerBase':
        """
        By default, when restoring from a checkpoint / savepoint, the consumer always ignores
        restored partitions that are no longer associated with the current specified topics or
        topic pattern to subscribe to.

        Calling this method asks the wrapped Java consumer to disable that filtering (it forwards
        to ``disableFilterRestoredPartitionsWithSubscribedTopics``).

        :return: This consumer.
        """
        j_consumer = self._j_function
        self._j_function = j_consumer.disableFilterRestoredPartitionsWithSubscribedTopics()
        return self

    def get_produced_type(self) -> TypeInformation:
        """
        Returns the type information reported by the wrapped Java consumer's ``getProducedType``,
        converted into its Python counterpart.
        """
        j_produced_type = self._j_function.getProducedType()
        return typeinfo._from_java_type(j_produced_type)


class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
    """
    The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
    Apache Kafka. The consumer can run in multiple parallel instances, each of which will pull data
    from one or more Kafka partitions.

    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
    during a failure, and that the computation processes elements 'exactly once'. (These
    guarantees naturally assume that Kafka itself does not lose any data.)

    Please note that Flink snapshots the offsets internally as part of its distributed
    checkpoints. The offsets committed to Kafka / Zookeeper are only to bring the outside view of
    progress in sync with Flink's view of the progress. That way, monitoring and other jobs can
    get a view of how far the Flink Kafka consumer has consumed a topic.

    Please refer to Kafka's documentation for the available configuration properties:
    http://kafka.apache.org/documentation.html#newconsumerconfigs
    """

    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
                 properties: Dict):
        """
        Creates a new Kafka streaming source consumer for Kafka 0.10.x.

        This constructor allows passing multiple topics to the consumer; a single topic name is
        accepted as well.

        :param topics: The Kafka topics to read from.
        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
                                       messages and Flink's objects.
        :param properties: The properties that are used to configure both the fetcher and the
                           offset handler.
        """
        j_consumer_cls = get_gateway().jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        j_consumer = _get_kafka_consumer(
            topics, properties, deserialization_schema, j_consumer_cls)
        super().__init__(j_flink_kafka_consumer=j_consumer)
class FlinkKafkaProducerBase(SinkFunction, abc.ABC):
    """
    Flink Sink to produce data into a Kafka topic.

    Please note that this producer provides at-least-once reliability guarantees when checkpoints
    are enabled and set_flush_on_checkpoint(True) is set. Otherwise, the producer doesn't provide
    any reliability guarantees.
    """

    def __init__(self, j_flink_kafka_producer):
        """
        :param j_flink_kafka_producer: The Java Kafka producer object wrapped by this sink.
        """
        super().__init__(sink_func=j_flink_kafka_producer)

    def set_log_failures_only(self, log_failures_only: bool) -> 'FlinkKafkaProducerBase':
        """
        Defines whether the producer should fail on errors, or only log them. If this is set to
        true, then exceptions will be only logged, if set to false, exceptions will be eventually
        thrown and cause the streaming program to fail (and enter recovery).

        :param log_failures_only: The flag to indicate logging-only on exceptions.
        :return: This producer.
        """
        j_producer = self._j_function
        j_producer.setLogFailuresOnly(log_failures_only)
        return self

    def set_flush_on_checkpoint(self, flush_on_checkpoint: bool) -> 'FlinkKafkaProducerBase':
        """
        If set to true, the Flink producer will wait for all outstanding messages in the Kafka
        buffers to be acknowledged by the Kafka producer on a checkpoint.

        This way, the producer can guarantee that messages in the Kafka buffers are part of the
        checkpoint.

        :param flush_on_checkpoint: Flag indicating the flush mode (true = flush on checkpoint)
        :return: This producer.
        """
        j_producer = self._j_function
        j_producer.setFlushOnCheckpoint(flush_on_checkpoint)
        return self

    def set_write_timestamp_to_kafka(self,
                                     write_timestamp_to_kafka: bool) -> 'FlinkKafkaProducerBase':
        """
        If set to true, Flink will write the (event time) timestamp attached to each record into
        Kafka. Timestamps must be positive for Kafka to accept them.

        :param write_timestamp_to_kafka: Flag indicating if Flink's internal timestamps are written
                                         to Kafka.
        :return: This producer.
        """
        j_producer = self._j_function
        j_producer.setWriteTimestampToKafka(write_timestamp_to_kafka)
        return self


class Semantic(Enum):
    """
    Semantics that can be chosen.

    :data: `EXACTLY_ONCE`:

    The Flink producer will write all messages in a Kafka transaction that will be committed to
    the Kafka on a checkpoint. In this mode FlinkKafkaProducer sets up a pool of
    FlinkKafkaProducer. Between each checkpoint there is created new Kafka transaction, which is
    being committed on FlinkKafkaProducer.notifyCheckpointComplete(long). If checkpoint complete
    notifications are running late, FlinkKafkaProducer can run out of FlinkKafkaProducers in the
    pool. In that case any subsequent FlinkKafkaProducer.snapshotState() requests will fail and
    the FlinkKafkaProducer will keep using the FlinkKafkaProducer from previous checkpoint. To
    decrease chances of failing checkpoints there are four options:

        1. decrease number of max concurrent checkpoints
        2. make checkpoints more reliable (so that they complete faster)
        3. increase delay between checkpoints
        4. increase size of FlinkKafkaProducers pool

    :data: `AT_LEAST_ONCE`:

    The Flink producer will wait for all outstanding messages in the Kafka buffers to be
    acknowledged by the Kafka producer on a checkpoint.

    :data: `NONE`:

    Means that nothing will be guaranteed. Messages can be lost and/or duplicated in case of
    failure.
    """

    # NOTE(review): the first two members were written with a trailing comma, so their values are
    # the one-element tuples (0,) and (1,) rather than plain ints. The values are kept exactly as
    # they were; only the member *name* is used when mapping to the Java enum below.
    EXACTLY_ONCE = (0,)
    AT_LEAST_ONCE = (1,)
    NONE = 2

    def _to_j_semantic(self):
        """
        Looks up the constant with the same name as this member on the Java
        ``FlinkKafkaProducer.Semantic`` enum and returns it.
        """
        j_semantic_enum = get_gateway().jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
        return getattr(j_semantic_enum, self.name)
class FlinkKafkaProducer(FlinkKafkaProducerBase):
    """
    Flink Sink to produce data into a Kafka topic. By default producer will use AT_LEAST_ONCE
    semantic. Before using EXACTLY_ONCE please refer to Flink's Kafka connector documentation.
    """

    def __init__(self, topic: str, serialization_schema: SerializationSchema,
                 producer_config: Dict, kafka_producer_pool_size: int = 5,
                 semantic=Semantic.AT_LEAST_ONCE):
        """
        Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to the
        topic.

        Using this constructor, the default FlinkFixedPartitioner will be used as the partitioner.
        This default partitioner maps each sink subtask to a single Kafka partition (i.e. all
        records received by a sink subtask will end up in the same Kafka partition).

        :param topic: ID of the Kafka topic.
        :param serialization_schema: User defined key-less serialization schema.
        :param producer_config: Properties with the producer configuration. Every entry is copied
                                into a ``java.util.Properties`` object.
        :param kafka_producer_pool_size: Forwarded as the last argument of the Java
                                         FlinkKafkaProducer constructor (default 5) - presumably
                                         the size of the producer pool described for
                                         ``Semantic.EXACTLY_ONCE``.
        :param semantic: The :class:`Semantic` to use, ``Semantic.AT_LEAST_ONCE`` by default.
        """
        gateway = get_gateway()

        # Copy the Python dict into the java.util.Properties expected by the Java constructor.
        j_producer_config = gateway.jvm.java.util.Properties()
        for config_key, config_value in producer_config.items():
            j_producer_config.setProperty(config_key, config_value)

        j_producer_cls = gateway.jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

        # The fourth constructor argument is left as None - presumably the custom partitioner
        # slot; confirm against the Java constructor.
        j_producer = j_producer_cls(
            topic,
            serialization_schema._j_serialization_schema,
            j_producer_config,
            None,
            semantic._to_j_semantic(),
            kafka_producer_pool_size)
        super().__init__(j_flink_kafka_producer=j_producer)

    def ignore_failures_after_transaction_timeout(self) -> 'FlinkKafkaProducer':
        """
        Disables the propagation of exceptions thrown when committing presumably timed out Kafka
        transactions during recovery of the job. If a Kafka transaction is timed out, a commit will
        never be successful. Hence, use this feature to avoid recovery loops of the Job. Exceptions
        will still be logged to inform the user that data loss might have occurred.

        Note that we use the System.currentTimeMillis() to track the age of a transaction.
        Moreover, only exceptions thrown during the recovery are caught, i.e., the producer will
        attempt at least one commit of the transaction before giving up.

        :return: This FlinkKafkaProducer.
        """
        j_producer = self._j_function
        j_producer.ignoreFailuresAfterTransactionTimeout()
        return self
def _get_kafka_consumer(topics, properties, deserialization_schema, j_consumer_clz):
    """
    Instantiates the given Java Kafka consumer class.

    :param topics: A list of topic names; any value that is not a ``list`` is wrapped into a
                   single-element list.
    :param properties: Mapping whose entries are copied into a ``java.util.Properties`` object.
    :param deserialization_schema: Object whose ``_j_deserialization_schema`` is handed to the
                                   Java constructor.
    :param j_consumer_clz: The Java consumer class to call.
    :return: The object created by calling ``j_consumer_clz``.
    """
    topic_list = topics if isinstance(topics, list) else [topics]

    j_props = get_gateway().jvm.java.util.Properties()
    for prop_key, prop_value in properties.items():
        j_props.setProperty(prop_key, prop_value)

    return j_consumer_clz(
        topic_list, deserialization_schema._j_deserialization_schema, j_props)
class JdbcSink(SinkFunction):
    """
    Sink function wrapping a Java JDBC sink. Instances are normally created through
    :func:`JdbcSink.sink`.
    """

    def __init__(self, j_jdbc_sink):
        """
        :param j_jdbc_sink: The Java JDBC sink object to wrap.
        """
        super().__init__(sink_func=j_jdbc_sink)

    @staticmethod
    def sink(sql: str, type_info: RowTypeInfo, jdbc_connection_options: 'JdbcConnectionOptions',
             jdbc_execution_options: 'JdbcExecutionOptions' = None):
        """
        Create a JDBC sink.

        :param sql: arbitrary DML query (e.g. insert, update, upsert)
        :param type_info: A RowTypeInfo for query field types.
        :param jdbc_connection_options: parameters of connection, such as JDBC URL.
        :param jdbc_execution_options: parameters of execution, such as batch size and maximum
                                       retries. ``JdbcExecutionOptions.defaults()`` is used when
                                       this is None.
        :return: A JdbcSink.
        """
        gateway = get_gateway()

        # Translate every field type of the row into its SQL type code on the Java side.
        j_type_util = gateway.jvm.org.apache.flink.connector.jdbc.utils.JdbcTypeUtil
        sql_types = [j_type_util.typeInformationToSqlType(field_type.get_java_type_info())
                     for field_type in type_info.get_field_types()]
        j_sql_types = to_jarray(gateway.jvm.int, sql_types)

        # createRowJdbcStatementBuilder is looked up reflectively on JdbcOutputFormat and made
        # accessible before it is invoked with the int[] of SQL types.
        j_output_format_cls = gateway.jvm.Class.forName(
            'org.apache.flink.connector.jdbc.internal.JdbcOutputFormat',
            False,
            get_gateway().jvm.Thread.currentThread().getContextClassLoader())
        j_int_array_cls = to_jarray(gateway.jvm.int, []).getClass()
        j_factory_method = j_output_format_cls.getDeclaredMethod(
            'createRowJdbcStatementBuilder',
            to_jarray(gateway.jvm.Class, [j_int_array_cls]))
        j_factory_method.setAccessible(True)
        j_statement_builder = j_factory_method.invoke(
            None, to_jarray(gateway.jvm.Object, [j_sql_types]))

        if jdbc_execution_options is None:
            jdbc_execution_options = JdbcExecutionOptions.defaults()

        j_sink = gateway.jvm.org.apache.flink.connector.jdbc.JdbcSink.sink(
            sql,
            j_statement_builder,
            jdbc_execution_options._j_jdbc_execution_options,
            jdbc_connection_options._j_jdbc_connection_options)
        return JdbcSink(j_jdbc_sink=j_sink)
class JdbcConnectionOptions(object):
    """
    JDBC connection options.

    Thin wrapper around a Java JdbcConnectionOptions object; every getter delegates to it.
    """

    def __init__(self, j_jdbc_connection_options):
        """
        :param j_jdbc_connection_options: The Java connection options object to wrap.
        """
        self._j_jdbc_connection_options = j_jdbc_connection_options

    def get_db_url(self) -> str:
        """Returns the result of ``getDbURL()`` on the wrapped Java object."""
        j_options = self._j_jdbc_connection_options
        return j_options.getDbURL()

    def get_driver_name(self) -> str:
        """Returns the result of ``getDriverName()`` on the wrapped Java object."""
        j_options = self._j_jdbc_connection_options
        return j_options.getDriverName()

    def get_user_name(self) -> str:
        """Returns the result of ``getUsername()`` on the wrapped Java object."""
        j_options = self._j_jdbc_connection_options
        return j_options.getUsername()

    def get_password(self) -> str:
        """Returns the result of ``getPassword()`` on the wrapped Java object."""
        j_options = self._j_jdbc_connection_options
        return j_options.getPassword()

    class JdbcConnectionOptionsBuilder(object):
        """
        Builder for JdbcConnectionOptions.

        Each ``with_*`` method forwards its argument to the wrapped Java builder and returns this
        Python builder so that calls can be chained.
        """

        def __init__(self):
            self._j_options_builder = get_gateway().jvm.org.apache.flink.connector \
                .jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

        def with_url(self, url: str) -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """
            :param url: Value passed to ``withUrl`` of the Java builder.
            """
            j_builder = self._j_options_builder
            j_builder.withUrl(url)
            return self

        def with_driver_name(self, driver_name: str) \
                -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """
            :param driver_name: Value passed to ``withDriverName`` of the Java builder.
            """
            j_builder = self._j_options_builder
            j_builder.withDriverName(driver_name)
            return self

        def with_user_name(self, user_name: str) \
                -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """
            :param user_name: Value passed to ``withUsername`` of the Java builder.
            """
            j_builder = self._j_options_builder
            j_builder.withUsername(user_name)
            return self

        def with_password(self, password: str) \
                -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder':
            """
            :param password: Value passed to ``withPassword`` of the Java builder.
            """
            j_builder = self._j_options_builder
            j_builder.withPassword(password)
            return self

        def build(self) -> 'JdbcConnectionOptions':
            """
            :return: A JdbcConnectionOptions wrapping the object built by the Java builder.
            """
            j_options = self._j_options_builder.build()
            return JdbcConnectionOptions(j_jdbc_connection_options=j_options)
class JdbcExecutionOptions(object):
    """
    JDBC sink batch options.

    Thin wrapper around a Java JdbcExecutionOptions object; every getter delegates to it.
    """

    def __init__(self, j_jdbc_execution_options):
        """
        :param j_jdbc_execution_options: The Java execution options object to wrap.
        """
        self._j_jdbc_execution_options = j_jdbc_execution_options

    def get_batch_interval_ms(self) -> int:
        """Returns the result of ``getBatchIntervalMs()`` on the wrapped Java object."""
        return self._j_jdbc_execution_options.getBatchIntervalMs()

    def get_batch_size(self) -> int:
        """Returns the result of ``getBatchSize()`` on the wrapped Java object."""
        return self._j_jdbc_execution_options.getBatchSize()

    def get_max_retries(self) -> int:
        """Returns the result of ``getMaxRetries()`` on the wrapped Java object."""
        return self._j_jdbc_execution_options.getMaxRetries()

    @staticmethod
    def defaults() -> 'JdbcExecutionOptions':
        """
        :return: A JdbcExecutionOptions wrapping the object returned by the Java
                 ``JdbcExecutionOptions.defaults()``.
        """
        return JdbcExecutionOptions(
            j_jdbc_execution_options=get_gateway().jvm
            .org.apache.flink.connector.jdbc.JdbcExecutionOptions.defaults())

    @staticmethod
    def builder() -> 'JdbcExecutionOptions.Builder':
        """
        :return: A new :class:`JdbcExecutionOptions.Builder`.
        """
        # The return annotation is fully qualified: a bare 'Builder' cannot be resolved from
        # module scope (e.g. by typing.get_type_hints) because Builder is a nested class.
        return JdbcExecutionOptions.Builder()

    class Builder(object):
        """
        Builder for JdbcExecutionOptions.

        Each ``with_*`` method forwards its argument to the wrapped Java builder and returns this
        Python builder so that calls can be chained.
        """

        def __init__(self):
            self._j_builder = get_gateway().jvm \
                .org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder()

        def with_batch_size(self, size: int) -> 'JdbcExecutionOptions.Builder':
            """
            :param size: Value passed to ``withBatchSize`` of the Java builder.
            """
            self._j_builder.withBatchSize(size)
            return self

        def with_batch_interval_ms(self, interval_ms: int) -> 'JdbcExecutionOptions.Builder':
            """
            :param interval_ms: Value passed to ``withBatchIntervalMs`` of the Java builder.
            """
            self._j_builder.withBatchIntervalMs(interval_ms)
            return self

        def with_max_retries(self, max_retries: int) -> 'JdbcExecutionOptions.Builder':
            """
            :param max_retries: Value passed to ``withMaxRetries`` of the Java builder.
            """
            self._j_builder.withMaxRetries(max_retries)
            return self

        def build(self) -> 'JdbcExecutionOptions':
            """
            :return: A JdbcExecutionOptions wrapping the object built by the Java builder.
            """
            return JdbcExecutionOptions(j_jdbc_execution_options=self._j_builder.build())
class RollingPolicy(object):
    """
    The policy based on which a Bucket in the FileSink rolls its currently open part file and opens
    a new one.
    """

    def __init__(self, j_rolling_policy):
        """
        :param j_rolling_policy: The Java rolling policy object to wrap.
        """
        self._j_rolling_policy = j_rolling_policy

    @staticmethod
    def default_rolling_policy(
            part_size: int = 1024 * 1024 * 128,
            rollover_interval: int = 60 * 1000,
            inactivity_interval: int = 60 * 1000) -> 'RollingPolicy':
        """
        Returns the default implementation of the RollingPolicy.

        This policy rolls a part file if:

            - there is no open part file,
            - the current file has reached the maximum bucket size (by default 128MB),
            - the current file is older than the roll over interval (by default 60 sec), or
            - the current file has not been written to for more than the allowed inactivityTime
              (by default 60 sec).

        :param part_size: The maximum part file size before rolling.
        :param rollover_interval: The maximum time duration a part file can stay open before
                                  rolling.
        :param inactivity_interval: The time duration of allowed inactivity after which a part
                                    file will have to roll.
        """
        j_policy_cls = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink \
            .filesystem.rollingpolicies.DefaultRollingPolicy
        # Each with* call's result is used for the next step, exactly like a chained call.
        j_builder = j_policy_cls.builder()
        j_builder = j_builder.withMaxPartSize(part_size)
        j_builder = j_builder.withRolloverInterval(rollover_interval)
        j_builder = j_builder.withInactivityInterval(inactivity_interval)
        return RollingPolicy(j_builder.build())

    @staticmethod
    def on_checkpoint_rolling_policy() -> 'RollingPolicy':
        """
        Returns a RollingPolicy which rolls (ONLY) on every checkpoint.
        """
        j_policy_cls = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink \
            .filesystem.rollingpolicies.OnCheckpointRollingPolicy
        j_policy = j_policy_cls.build()
        return RollingPolicy(j_policy)
class BucketAssigner(object):
    """
    A BucketAssigner is used with a file sink to determine the bucket each incoming element should
    be put into.

    The StreamingFileSink can be writing to many buckets at a time, and it is responsible for
    managing a set of active buckets. Whenever a new element arrives it will ask the BucketAssigner
    for the bucket the element should fall in. The BucketAssigner can, for example, determine
    buckets based on system time.
    """

    def __init__(self, j_bucket_assigner):
        """
        :param j_bucket_assigner: The Java bucket assigner object to wrap.
        """
        self._j_bucket_assigner = j_bucket_assigner

    @staticmethod
    def base_path_bucket_assigner() -> 'BucketAssigner':
        """
        Creates a BucketAssigner that does not perform any bucketing of files. All files are
        written to the base path.
        """
        j_assigner_cls = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink \
            .filesystem.bucketassigners.BasePathBucketAssigner
        return BucketAssigner(j_assigner_cls())

    @staticmethod
    def date_time_bucket_assigner(format_str: str = "yyyy-MM-dd--HH", timezone_id: str = None):
        """
        Creates a BucketAssigner that assigns to buckets based on current system time.

        It will create directories of the following form: /{basePath}/{dateTimePath}/.
        The basePath is the path that was specified as a base path when creating the new bucket.
        The dateTimePath is determined based on the current system time and the user provided
        format string.

        The Java DateTimeFormatter is used to derive a date string from the current system time
        and the date format string. The default format string is "yyyy-MM-dd--HH" so the rolling
        files will have a granularity of hours.

        :param format_str: The format string used to determine the bucket id.
        :param timezone_id: The timezone id, either an abbreviation such as "PST", a full name
                            such as "America/Los_Angeles", or a custom timezone_id such as
                            "GMT-08:00". The default time zone will be used if it's None (or any
                            other value that is not a str).
        """
        j_zone_id_cls = get_gateway().jvm.java.time.ZoneId
        # isinstance() is False for None, so this single check also covers the "not given" case.
        if isinstance(timezone_id, str):
            j_timezone = j_zone_id_cls.of(timezone_id)
        else:
            j_timezone = j_zone_id_cls.systemDefault()

        j_assigner_cls = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink \
            .filesystem.bucketassigners.DateTimeBucketAssigner
        return BucketAssigner(j_assigner_cls(format_str, j_timezone))
class StreamingFileSink(SinkFunction):
    """
    Sink that emits its input elements to `FileSystem` files within buckets. This is
    integrated with the checkpointing mechanism to provide exactly once semantics.

    When creating the sink a `basePath` must be specified. The base directory contains
    one directory for every bucket. The bucket directories themselves contain several part files,
    with at least one for each parallel subtask of the sink which is writing data to that bucket.
    These part files contain the actual output data.
    """

    def __init__(self, j_obj):
        """
        :param j_obj: The Java streaming file sink object to wrap.
        """
        super(StreamingFileSink, self).__init__(j_obj)

    class DefaultRowFormatBuilder(object):
        """
        Builder for the vanilla `StreamingFileSink` using a row format.

        Each ``with_*`` method forwards to the wrapped Java builder and returns this Python
        builder so that calls can be chained.
        """

        def __init__(self, j_default_row_format_builder):
            """
            :param j_default_row_format_builder: The Java row format builder to wrap.
            """
            self.j_default_row_format_builder = j_default_row_format_builder

        def with_bucket_check_interval(
                self, interval: int) -> 'StreamingFileSink.DefaultRowFormatBuilder':
            """
            :param interval: Value passed to ``withBucketCheckInterval`` of the Java builder.
            """
            self.j_default_row_format_builder.withBucketCheckInterval(interval)
            return self

        def with_bucket_assigner(
                self,
                bucket_assigner: BucketAssigner) -> 'StreamingFileSink.DefaultRowFormatBuilder':
            """
            :param bucket_assigner: The BucketAssigner whose Java object is passed to
                                    ``withBucketAssigner`` of the Java builder.
            """
            self.j_default_row_format_builder.withBucketAssigner(bucket_assigner._j_bucket_assigner)
            return self

        def with_rolling_policy(
                self, policy: RollingPolicy) -> 'StreamingFileSink.DefaultRowFormatBuilder':
            """
            :param policy: The RollingPolicy whose Java object is passed to
                           ``withRollingPolicy`` of the Java builder.
            """
            self.j_default_row_format_builder.withRollingPolicy(policy._j_rolling_policy)
            return self

        def with_output_file_config(
                self, output_file_config: 'OutputFileConfig') \
                -> 'StreamingFileSink.DefaultRowFormatBuilder':
            """
            :param output_file_config: The OutputFileConfig whose Java object is passed to
                                       ``withOutputFileConfig`` of the Java builder.
            """
            self.j_default_row_format_builder.withOutputFileConfig(
                output_file_config._j_output_file_config)
            return self

        def build(self) -> 'StreamingFileSink':
            """
            :return: A StreamingFileSink wrapping the sink built by the Java builder.
            """
            j_stream_file_sink = self.j_default_row_format_builder.build()
            return StreamingFileSink(j_stream_file_sink)

    @staticmethod
    def for_row_format(base_path: str,
                       encoder: Encoder) -> 'StreamingFileSink.DefaultRowFormatBuilder':
        """
        Creates a row format builder for the given base path and encoder.

        :param base_path: The base path, converted into a Flink ``Path`` on the Java side.
        :param encoder: The Encoder whose Java object is handed to ``forRowFormat``.
        :return: A :class:`StreamingFileSink.DefaultRowFormatBuilder`.
        """
        # The return annotation is fully qualified: a bare 'DefaultRowFormatBuilder' cannot be
        # resolved from module scope because the builder is a nested class.
        j_path = get_gateway().jvm.org.apache.flink.core.fs.Path(base_path)
        j_default_row_format_builder = get_gateway().jvm.org.apache.flink.streaming.api. \
            functions.sink.filesystem.StreamingFileSink.forRowFormat(j_path, encoder._j_encoder)
        return StreamingFileSink.DefaultRowFormatBuilder(j_default_row_format_builder)
class OutputFileConfig(object):
    """
    Part file name configuration.
    This allows to define a prefix and a suffix for the part file name.
    """

    @staticmethod
    def builder():
        """
        :return: A new :class:`OutputFileConfig.OutputFileConfigBuilder`.
        """
        return OutputFileConfig.OutputFileConfigBuilder()

    def __init__(self, part_prefix: str, part_suffix: str):
        """
        Creates the Java OutputFileConfig for the given prefix and suffix and wraps it.

        :param part_prefix: The prefix for the part name.
        :param part_suffix: The suffix for the part name.
        """
        j_config_cls = get_gateway().jvm.org.apache.flink.streaming.api. \
            functions.sink.filesystem.OutputFileConfig
        self._j_output_file_config = j_config_cls(part_prefix, part_suffix)

    def get_part_prefix(self) -> str:
        """
        The prefix for the part name.
        """
        j_config = self._j_output_file_config
        return j_config.getPartPrefix()

    def get_part_suffix(self) -> str:
        """
        The suffix for the part name.
        """
        j_config = self._j_output_file_config
        return j_config.getPartSuffix()

    class OutputFileConfigBuilder(object):
        """
        A builder to create the part file configuration.

        The prefix starts out as "part" and the suffix as the empty string.
        """

        def __init__(self):
            self.part_prefix = "part"
            self.part_suffix = ""

        def with_part_prefix(self, prefix) -> 'OutputFileConfig.OutputFileConfigBuilder':
            """
            :param prefix: The prefix to use for the part name.
            :return: This builder.
            """
            self.part_prefix = prefix
            return self

        def with_part_suffix(self, suffix) -> 'OutputFileConfig.OutputFileConfigBuilder':
            """
            :param suffix: The suffix to use for the part name.
            :return: This builder.
            """
            self.part_suffix = suffix
            return self

        def build(self) -> 'OutputFileConfig':
            """
            :return: An OutputFileConfig created from the current prefix and suffix.
            """
            prefix, suffix = self.part_prefix, self.part_suffix
            return OutputFileConfig(prefix, suffix)
class Source(JavaFunctionWrapper):
    """
    Base class for all unified data sources in Flink.
    """

    def __init__(self, source: Union[str, JavaObject]):
        """
        Constructor of Source.

        :param source: The java Source object.
        """
        super().__init__(source)


class Sink(JavaFunctionWrapper):
    """
    Base class for all unified data sinks in Flink.
    """

    def __init__(self, sink: Union[str, JavaObject]):
        """
        Constructor of Sink.

        :param sink: The java Sink object.
        """
        super().__init__(sink)
class StreamFormat(object):
    """
    A reader format that reads individual records from a stream.

    Compared to the :class:`~pyflink.datastream.connectors.FileSource.BulkFormat`, the stream
    format handles a few things out-of-the-box, like deciding how to batch records or dealing
    with compression.

    Internally in the file source, the readers pass batches of records from the reading threads
    (that perform the typically blocking I/O operations) to the async mailbox threads that do
    the streaming and batch data processing. Passing records in batches (rather than
    one-at-a-time) much reduces the thread-to-thread handover overhead.

    This batching is by default based on I/O fetch size for the StreamFormat, meaning the set of
    records derived from one I/O buffer will be handed over as one. See config option
    `source.file.stream.io-fetch-size` to configure that fetch size.
    """

    def __init__(self, j_stream_format):
        """
        :param j_stream_format: The Java stream format object to wrap.
        """
        self._j_stream_format = j_stream_format

    @staticmethod
    def text_line_format(charset_name: str = "UTF-8") -> 'StreamFormat':
        """
        Creates a reader format that reads text lines from a file.

        The reader uses Java's built-in java.io.InputStreamReader to decode the byte stream
        using various supported charset encodings.

        This format does not support optimized recovery from checkpoints. On recovery, it will
        re-read and discard the number of lines that were processed before the last checkpoint.
        That is due to the fact that the offsets of lines in the file cannot be tracked through
        the charset decoders with their internal buffering of stream input and charset decoder
        state.

        :param charset_name: The charset to decode the byte stream.
        """
        j_reader_package = get_gateway().jvm.org.apache.flink.connector.file.src.reader
        return StreamFormat(j_reader_package.TextLineInputFormat(charset_name))
class FileEnumeratorProvider(object):
    """
    Factory for FileEnumerator, whose task is to discover all files to be read and to split
    them into a set of file source splits. This possibly includes path traversals, file
    filtering (by name or other patterns) and deciding whether to split files into multiple
    splits, and how to split them.
    """

    def __init__(self, j_file_enumerator_provider):
        """
        :param j_file_enumerator_provider: The Java file enumerator provider to wrap.
        """
        self._j_file_enumerator_provider = j_file_enumerator_provider

    @staticmethod
    def default_splittable_file_enumerator() -> 'FileEnumeratorProvider':
        """
        The default file enumerator used for splittable formats. The enumerator recursively
        enumerates files, splits files that consist of multiple distributed storage blocks into
        multiple splits, and filters hidden files (files starting with '.' or '_'). Files with
        suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...)
        will not be split.
        """
        j_provider = get_gateway().jvm.org.apache.flink.connector.file.src \
            .FileSource.DEFAULT_SPLITTABLE_FILE_ENUMERATOR
        return FileEnumeratorProvider(j_provider)

    @staticmethod
    def default_non_splittable_file_enumerator() -> 'FileEnumeratorProvider':
        """
        The default file enumerator used for non-splittable formats. The enumerator recursively
        enumerates files, creates one split for the file, and filters hidden files
        (files starting with '.' or '_').
        """
        j_provider = get_gateway().jvm.org.apache.flink.connector.file.src \
            .FileSource.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR
        return FileEnumeratorProvider(j_provider)
class FileSplitAssignerProvider(object):
    """
    Factory for FileSplitAssigner, which is responsible for deciding what split should be
    processed next by which node. It determines split processing order and locality.
    """

    def __init__(self, j_file_split_assigner):
        """
        :param j_file_split_assigner: The Java split assigner provider to wrap.
        """
        self._j_file_split_assigner = j_file_split_assigner

    @staticmethod
    def locality_aware_split_assigner() -> 'FileSplitAssignerProvider':
        """
        A FileSplitAssigner that assigns to each host preferably splits that are local, before
        assigning splits that are not local.
        """
        j_assigner = get_gateway().jvm.org.apache.flink.connector.file.src \
            .FileSource.DEFAULT_SPLIT_ASSIGNER
        return FileSplitAssignerProvider(j_assigner)
class FileSourceBuilder(object):
    """
    The builder for the :class:`~pyflink.datastream.connectors.FileSource`, to configure the
    various behaviors.

    Start building the source via one of the following methods:

        - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`
    """

    def __init__(self, j_file_source_builder):
        """
        :param j_file_source_builder: The Java file source builder to wrap.
        """
        self._j_file_source_builder = j_file_source_builder

    def monitor_continuously(
            self,
            discovery_interval: Duration) -> 'FileSourceBuilder':
        """
        Sets this source to streaming ("continuous monitoring") mode.

        This makes the source a "continuous streaming" source that keeps running, monitoring
        for new files, and reads these files when they appear and are discovered by the
        monitoring.

        The interval in which the source checks for new files is the discovery_interval. Shorter
        intervals mean that files are discovered more quickly, but also imply more frequent
        listing or directory traversal of the file system / object store.
        """
        j_interval = discovery_interval._j_duration
        self._j_file_source_builder.monitorContinuously(j_interval)
        return self

    def process_static_file_set(self) -> 'FileSourceBuilder':
        """
        Sets this source to bounded (batch) mode.

        In this mode, the source processes the files that are under the given paths when the
        application is started. Once all files are processed, the source will finish.

        This setting is also the default behavior. This method is mainly here to "switch back"
        to bounded (batch) mode, or to make it explicit in the source construction.
        """
        self._j_file_source_builder.processStaticFileSet()
        return self

    def set_file_enumerator(
            self,
            file_enumerator: 'FileEnumeratorProvider') -> 'FileSourceBuilder':
        """
        Configures the FileEnumerator for the source. The File Enumerator is responsible for
        selecting from the input path the set of files that should be processed (and which to
        filter out). Furthermore, the File Enumerator may split the files further into
        sub-regions, to enable parallelization beyond the number of files.
        """
        j_enumerator_provider = file_enumerator._j_file_enumerator_provider
        self._j_file_source_builder.setFileEnumerator(j_enumerator_provider)
        return self

    def set_split_assigner(
            self,
            split_assigner: 'FileSplitAssignerProvider') -> 'FileSourceBuilder':
        """
        Configures the FileSplitAssigner for the source. The File Split Assigner determines
        which parallel reader instance gets which file source split, and in which order these
        splits are assigned.
        """
        j_assigner_provider = split_assigner._j_file_split_assigner
        self._j_file_source_builder.setSplitAssigner(j_assigner_provider)
        return self

    def build(self) -> 'FileSource':
        """
        Creates the file source with the settings applied to this builder.
        """
        j_file_source = self._j_file_source_builder.build()
        return FileSource(j_file_source)
class FileSource(Source):
    """
    A unified data source that reads files - both in batch and in streaming mode.

    This source supports all (distributed) file systems and object stores that can be accessed
    via Flink's FileSystem class.

    Start building a file source via one of the following calls:

        - :func:`~pyflink.datastream.connectors.FileSource.for_record_stream_format`

    This creates a :class:`~pyflink.datastream.connectors.FileSourceBuilder` on which you can
    configure all the properties of the file source.

    **Batch and Streaming**

    This source supports both bounded/batch and continuous/streaming data inputs. For the
    bounded/batch case, the file source processes all files under the given path(s). In the
    continuous/streaming case, the source periodically checks the paths for new files and will
    start reading those.

    When you start creating a file source (via the
    :class:`~pyflink.datastream.connectors.FileSourceBuilder` created through one of the
    above-mentioned methods) the source is by default in bounded/batch mode. Call
    :func:`~pyflink.datastream.connectors.FileSourceBuilder.monitor_continuously` to put the
    source into continuous streaming mode.

    **Format Types**

    The reading of each file happens through file readers defined by file formats. These define
    the parsing logic for the contents of the file. There are multiple classes that the source
    supports. Their interfaces trade off simplicity of implementation and
    flexibility/efficiency.

        - A :class:`~pyflink.datastream.connectors.StreamFormat` reads the contents of a file
          from a file stream. It is the simplest format to implement, and provides many features
          out-of-the-box (like checkpointing logic) but is limited in the optimizations it can
          apply (such as object reuse, batching, etc.).

    **Discovering / Enumerating Files**

    The way that the source lists the files to be processed is defined by the
    :class:`~pyflink.datastream.connectors.FileEnumeratorProvider`. The FileEnumeratorProvider
    is responsible for selecting the relevant files (for example filtering out hidden files) and
    for optionally splitting files into multiple regions (= file source splits) that can be read
    in parallel.
    """

    def __init__(self, j_file_source):
        """
        :param j_file_source: The Java FileSource object to wrap.
        """
        super().__init__(source=j_file_source)

    @staticmethod
    def for_record_stream_format(stream_format: StreamFormat, *paths: str) -> FileSourceBuilder:
        """
        Builds a new FileSource using a :class:`~pyflink.datastream.connectors.StreamFormat` to
        read record-by-record from a file stream.

        When possible, stream-based formats are generally easier (preferable) to file-based
        formats, because they support better default behavior around I/O batching or progress
        tracking (checkpoints).

        Stream formats also automatically de-compress files based on the file extension. This
        supports files ending in ".deflate" (Deflate), ".xz" (XZ), ".bz2" (BZip2), ".gz",
        ".gzip" (GZip).

        :param stream_format: The format whose wrapped Java object reads the records.
        :param paths: The paths to read; each one is converted to a Flink ``Path``.
        """
        jvm = get_gateway().jvm
        JPath = jvm.org.apache.flink.core.fs.Path
        # The Java method takes a Path varargs array, so the Python strings are converted first.
        j_path_array = to_jarray(JPath, [JPath(path) for path in paths])
        j_builder = jvm.org.apache.flink.connector.file.src.FileSource \
            .forRecordStreamFormat(stream_format._j_stream_format, j_path_array)
        return FileSourceBuilder(j_builder)
class NumberSequenceSource(Source):
    """
    A data source that produces a sequence of numbers (longs). This source is useful for testing
    and for cases that just need a stream of N events of any kind.

    The source splits the sequence into as many parallel sub-sequences as there are parallel
    source readers. Each sub-sequence will be produced in order. Consequently, if the
    parallelism is limited to one, this will produce one sequence in order.

    This source is always bounded. For very long sequences (for example over the entire domain
    of long integer values), user may want to consider executing the application in a streaming
    manner, because, despite the fact that the produced stream is bounded, the end bound is
    pretty far away.
    """

    def __init__(self, start, end):
        """
        Creates a new NumberSequenceSource that produces parallel sequences covering the
        range start to end (both boundaries are inclusive).

        :param start: The first number of the sequence.
        :param end: The last number of the sequence.
        """
        j_source_lib = get_gateway().jvm.org.apache.flink.api.connector.source.lib
        super().__init__(source=j_source_lib.NumberSequenceSource(start, end))
class FileSink(Sink):
    """
    A unified sink that emits its input elements to FileSystem files within buckets. This
    sink achieves exactly-once semantics for both BATCH and STREAMING.

    When creating the sink a basePath must be specified. The base directory contains one
    directory for every bucket. The bucket directories themselves contain several part files,
    with at least one for each parallel subtask of the sink which is writing data to that
    bucket. These part files contain the actual output data.

    The sink uses a BucketAssigner to determine in which bucket directory each element should
    be written to inside the base directory. The BucketAssigner can, for example, roll on
    every checkpoint or use time or a property of the element to determine the bucket
    directory. The default BucketAssigner is a DateTimeBucketAssigner which will create one
    new bucket every hour. You can specify a custom BucketAssigner using
    :func:`~pyflink.datastream.connectors.FileSink.RowFormatBuilder.with_bucket_assigner`,
    after calling :func:`~pyflink.datastream.connectors.FileSink.for_row_format`.

    The names of the part files could be defined using OutputFileConfig. This configuration
    contains a part prefix and a part suffix that will be used with a random uid assigned to
    each subtask of the sink and a rolling counter to determine the file names. For example
    with a prefix "prefix" and a suffix ".ext", a file named
    ``prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext`` contains the data from the subtask
    with uid ``81fc4980-a6af-41c8-9937-9939408a734b`` of the sink and is the 17th part-file
    created by that subtask.

    Part files roll based on the user-specified RollingPolicy. By default, a
    DefaultRollingPolicy is used for row-encoded sink output; an OnCheckpointRollingPolicy is
    used for bulk-encoded sink output.

    In some scenarios, the open buckets are required to change based on time. In these cases,
    the user can specify a bucket_check_interval (by default 1m) and the sink will check
    periodically and roll the part file if the specified rolling policy says so.

    Part files can be in one of three states: in-progress, pending or finished. The reason for
    this is how the sink works to provide exactly-once semantics and fault-tolerance. The part
    file that is currently being written to is in-progress. Once a part file is closed for
    writing it becomes pending. When a checkpoint is successful (for STREAMING) or at the end
    of the job (for BATCH) the currently pending files will be moved to finished.

    For STREAMING in order to guarantee exactly-once semantics in case of a failure, the sink
    should roll back to the state it had when that last successful checkpoint occurred. To
    this end, when restoring, the restored files in pending state are transferred into the
    finished state while any in-progress files are rolled back, so that they do not contain
    data that arrived after the checkpoint from which we restore.
    """

    def __init__(self, j_file_sink):
        """
        :param j_file_sink: The Java FileSink object to wrap.
        """
        super(FileSink, self).__init__(sink=j_file_sink)

    class RowFormatBuilder(object):
        """
        Builder for the vanilla FileSink using a row format.

        Every ``with_*`` method forwards to the wrapped Java builder and returns this Python
        builder so that calls can be chained.
        """

        def __init__(self, j_row_format_builder):
            """
            :param j_row_format_builder: The Java row format builder to wrap.
            """
            self._j_row_format_builder = j_row_format_builder

        def with_bucket_check_interval(self, interval: int) -> 'FileSink.RowFormatBuilder':
            """
            :param interval: The check interval in milliseconds.
            """
            self._j_row_format_builder.withBucketCheckInterval(interval)
            return self

        def with_bucket_assigner(
                self, bucket_assigner: BucketAssigner) -> 'FileSink.RowFormatBuilder':
            """
            :param bucket_assigner: The assigner whose Java counterpart is handed to the
                                    Java builder's ``withBucketAssigner``.
            """
            self._j_row_format_builder.withBucketAssigner(bucket_assigner._j_bucket_assigner)
            return self

        def with_rolling_policy(
                self, rolling_policy: RollingPolicy) -> 'FileSink.RowFormatBuilder':
            """
            :param rolling_policy: The policy whose Java counterpart is handed to the Java
                                   builder's ``withRollingPolicy``.
            """
            self._j_row_format_builder.withRollingPolicy(rolling_policy._j_rolling_policy)
            return self

        def with_output_file_config(
                self, output_file_config: OutputFileConfig) -> 'FileSink.RowFormatBuilder':
            """
            :param output_file_config: The part file naming config whose Java counterpart is
                                       handed to the Java builder's ``withOutputFileConfig``.
            """
            self._j_row_format_builder.withOutputFileConfig(
                output_file_config._j_output_file_config)
            return self

        def build(self) -> 'FileSink':
            """
            Builds the Java sink and wraps it in a Python FileSink.
            """
            return FileSink(self._j_row_format_builder.build())

    @staticmethod
    def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuilder':
        """
        Creates a row format builder writing below ``base_path`` with the given encoder.

        :param base_path: The base directory, turned into a Flink ``Path``.
        :param encoder: The encoder whose Java counterpart is passed to ``forRowFormat``.
        """
        JPath = get_gateway().jvm.org.apache.flink.core.fs.Path
        JFileSink = get_gateway().jvm.org.apache.flink.connector.file.sink.FileSink

        return FileSink.RowFormatBuilder(
            JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
class PulsarDeserializationSchema(object):
    """
    A schema bridge for deserializing Pulsar's Message into a Flink managed instance. Both
    Pulsar's self managed schema and Flink managed schema are supported.
    """

    def __init__(self, _j_pulsar_deserialization_schema):
        """
        :param _j_pulsar_deserialization_schema: The Java PulsarDeserializationSchema to wrap.
        """
        self._j_pulsar_deserialization_schema = _j_pulsar_deserialization_schema

    @staticmethod
    def flink_schema(deserialization_schema: DeserializationSchema) \
            -> 'PulsarDeserializationSchema':
        """
        Create a PulsarDeserializationSchema by using Flink's DeserializationSchema. It would
        consume the Pulsar message as a byte array and decode the message by using Flink's
        logic.
        """
        j_schema_class = get_gateway().jvm.org.apache.flink \
            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
        j_schema = j_schema_class.flinkSchema(deserialization_schema._j_deserialization_schema)
        return PulsarDeserializationSchema(j_schema)

    @staticmethod
    def flink_type_info(type_information: TypeInformation,
                        execution_config: ExecutionConfig = None) -> 'PulsarDeserializationSchema':
        """
        Create a PulsarDeserializationSchema by using the given TypeInformation. This method is
        only used for treating messages that were written into Pulsar by TypeInformation.

        :param type_information: The type information handed to Java ``flinkTypeInfo``.
        :param execution_config: Optional execution config; when omitted, a new Java
                                 ExecutionConfig is created.
        """
        j_schema_class = get_gateway().jvm.org.apache.flink \
            .connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema
        JExecutionConfig = get_gateway().jvm.org.apache.flink.api.common.ExecutionConfig

        if execution_config is None:
            j_execution_config = JExecutionConfig()
        else:
            j_execution_config = execution_config._j_execution_config

        j_schema = j_schema_class.flinkTypeInfo(
            type_information.get_java_type_info(), j_execution_config)
        return PulsarDeserializationSchema(j_schema)
class SubscriptionType(Enum):
    """
    Types of subscription supported by Pulsar.

    :data: `Exclusive`:

    There can be only 1 consumer on the same topic with the same subscription name.

    :data: `Shared`:

    Multiple consumers will be able to use the same subscription name and the messages will be
    dispatched according to a round-robin rotation between the connected consumers. In this
    mode, the consumption order is not guaranteed.

    :data: `Failover`:

    Multiple consumers will be able to use the same subscription name but only 1 consumer will
    receive the messages. If that consumer disconnects, one of the other connected consumers
    will start receiving messages. In failover mode, the consumption ordering is guaranteed. In
    case of partitioned topics, the ordering is guaranteed on a per-partition basis. The
    partition assignments will be split across the available consumers. On each partition, at
    most one consumer will be active at a given point in time.

    :data: `Key_Shared`:

    Multiple consumers will be able to use the same subscription and all messages with the same
    key will be dispatched to only one consumer. Use ordering_key to overwrite the message key
    for message ordering.
    """

    # Deliberately no trailing commas: ``Exclusive = 0,`` would turn the member value into
    # the tuple ``(0,)`` instead of the integer ``0``.
    Exclusive = 0
    Shared = 1
    Failover = 2
    Key_Shared = 3

    def _to_j_subscription_type(self):
        """
        Returns the Java ``SubscriptionType`` constant with the same name as this member.
        """
        JSubscriptionType = get_gateway().jvm.org.apache.pulsar.client.api.SubscriptionType
        return getattr(JSubscriptionType, self.name)
class StartCursor(object):
    """
    A factory class for users to specify the start position of a Pulsar subscription.
    Since it would be serialized into the split, the implementation should be well considered
    and adding extra internal state is not recommended.

    This class would be used only for SubscriptionType.Exclusive and SubscriptionType.Failover.
    """

    def __init__(self, _j_start_cursor):
        """
        :param _j_start_cursor: The Java StartCursor to wrap.
        """
        self._j_start_cursor = _j_start_cursor

    @staticmethod
    def default_start_cursor() -> 'StartCursor':
        """
        Returns the default start cursor, which is :func:`earliest`.
        """
        return StartCursor.earliest()

    @staticmethod
    def earliest() -> 'StartCursor':
        """
        Wraps the cursor returned by Java ``StartCursor.earliest()``.
        """
        j_cursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor.earliest()
        return StartCursor(j_cursor)

    @staticmethod
    def latest() -> 'StartCursor':
        """
        Wraps the cursor returned by Java ``StartCursor.latest()``.
        """
        j_cursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor.latest()
        return StartCursor(j_cursor)

    @staticmethod
    def from_message_time(timestamp: int) -> 'StartCursor':
        """
        Wraps the cursor returned by Java ``StartCursor.fromMessageTime(timestamp)``.

        :param timestamp: The message time to start from (presumably epoch milliseconds -
                          confirm against the Java API).
        """
        j_cursor_class = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor
        return StartCursor(j_cursor_class.fromMessageTime(timestamp))
class StopCursor(object):
    """
    A factory class for users to specify the stop position of a Pulsar subscription. Since it
    would be serialized into the split, the implementation should be well considered and adding
    extra internal state is not recommended.
    """

    def __init__(self, _j_stop_cursor):
        """
        :param _j_stop_cursor: The Java StopCursor to wrap.
        """
        self._j_stop_cursor = _j_stop_cursor

    @staticmethod
    def default_stop_cursor() -> 'StopCursor':
        """
        Returns the default stop cursor, which is :func:`never`.
        """
        return StopCursor.never()

    @staticmethod
    def never() -> 'StopCursor':
        """
        Wraps the cursor returned by Java ``StopCursor.never()``.
        """
        j_cursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.never()
        return StopCursor(j_cursor)

    @staticmethod
    def latest() -> 'StopCursor':
        """
        Wraps the cursor returned by Java ``StopCursor.latest()``.
        """
        j_cursor = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest()
        return StopCursor(j_cursor)

    @staticmethod
    def at_event_time(timestamp: int) -> 'StopCursor':
        """
        Wraps the cursor returned by Java ``StopCursor.atEventTime(timestamp)``.

        :param timestamp: The event time to stop at (presumably epoch milliseconds -
                          confirm against the Java API).
        """
        j_cursor_class = get_gateway().jvm \
            .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
        return StopCursor(j_cursor_class.atEventTime(timestamp))
class PulsarSource(Source):
    """
    The Source implementation of Pulsar. Please use a PulsarSourceBuilder to construct a
    PulsarSource. The following example shows how to create a PulsarSource emitting records of
    String type.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_service_url(get_service_url()) \\
        ...     .set_admin_url(get_admin_url()) \\
        ...     .set_subscription_name("test") \\
        ...     .set_deserialization_schema(
        ...         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .set_bounded_stop_cursor(StopCursor.default_stop_cursor()) \\
        ...     .build()

    See PulsarSourceBuilder for more details.
    """

    def __init__(self, j_pulsar_source):
        """
        :param j_pulsar_source: The Java PulsarSource object to wrap.
        """
        super().__init__(source=j_pulsar_source)

    @staticmethod
    def builder() -> 'PulsarSourceBuilder':
        """
        Get a PulsarSourceBuilder to build a PulsarSource.
        """
        return PulsarSourceBuilder()
class PulsarSourceBuilder(object):
    """
    The builder class for PulsarSource to make it easier for the users to construct a
    PulsarSource.

    The following example shows the minimum setup to create a PulsarSource that reads the
    String values from a Pulsar topic.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_subscription_name("flink-source-1") \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_deserialization_schema(
        ...         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .build()

    The service url, admin url, subscription name, topics to consume, and the record
    deserializer are required fields that must be set.

    To specify the starting position of PulsarSource, one can call set_start_cursor(StartCursor).

    By default the PulsarSource runs in a Boundedness.CONTINUOUS_UNBOUNDED mode and never
    stops until the Flink job is canceled or fails. To let the PulsarSource run in
    Boundedness.CONTINUOUS_UNBOUNDED but stop at some given offsets, one can call
    set_unbounded_stop_cursor(StopCursor).

    For example the following PulsarSource stops after it consumes up to an event time taken
    when the Flink job was started.

    Example:
    ::

        >>> source = PulsarSource.builder() \\
        ...     .set_service_url(PULSAR_BROKER_URL) \\
        ...     .set_admin_url(PULSAR_BROKER_HTTP_URL) \\
        ...     .set_subscription_name("flink-source-1") \\
        ...     .set_topics([TOPIC1, TOPIC2]) \\
        ...     .set_deserialization_schema(
        ...         PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
        ...     .set_unbounded_stop_cursor(
        ...         StopCursor.at_event_time(int(time.time() * 1000))) \\
        ...     .build()
    """

    def __init__(self):
        JPulsarSource = \
            get_gateway().jvm.org.apache.flink.connector.pulsar.source.PulsarSource
        self._j_pulsar_source_builder = JPulsarSource.builder()

    def set_admin_url(self, admin_url: str) -> 'PulsarSourceBuilder':
        """
        Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
        """
        self._j_pulsar_source_builder.setAdminUrl(admin_url)
        return self

    def set_service_url(self, service_url: str) -> 'PulsarSourceBuilder':
        """
        Sets the server's link for the PulsarConsumer of the PulsarSource.
        """
        self._j_pulsar_source_builder.setServiceUrl(service_url)
        return self

    def set_subscription_name(self, subscription_name: str) -> 'PulsarSourceBuilder':
        """
        Sets the name for this pulsar subscription.
        """
        self._j_pulsar_source_builder.setSubscriptionName(subscription_name)
        return self

    def set_subscription_type(
            self, subscription_type: 'SubscriptionType') -> 'PulsarSourceBuilder':
        """
        SubscriptionType is the consuming behavior for pulsar, different splits are generated
        depending on the given subscription type. Please take some time to consider which
        subscription type matches your application best. Default is SubscriptionType.Shared.
        """
        self._j_pulsar_source_builder.setSubscriptionType(
            subscription_type._to_j_subscription_type())
        return self

    def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
        """
        Set a pulsar topic list for flink source. Some topic may not exist currently, consuming
        this non-existed topic wouldn't throw any exception. But the best solution is just
        consuming by using a topic regex. You can set topics once either with set_topics or
        set_topic_pattern in this builder.

        :param topics: A single topic name, or a list (any other iterable such as a tuple is
                       accepted as well) of topic names.
        """
        if isinstance(topics, str):
            # A bare string is one topic; it must not be iterated character by character.
            topics = [topics]
        else:
            # Materialize tuples/sets/generators so Java always receives a list of names
            # rather than a single nested element.
            topics = list(topics)
        self._j_pulsar_source_builder.setTopics(topics)
        return self

    def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
        """
        Set a topic pattern to consume from the java regex str. You can set topics once either
        with set_topics or set_topic_pattern in this builder.

        This is an alias of :func:`set_topic_pattern`; both forward to Java
        ``setTopicPattern``.
        """
        return self.set_topic_pattern(topics_pattern)

    def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder':
        """
        Set a topic pattern to consume from the java regex str. You can set topics once either
        with set_topics or set_topic_pattern in this builder.
        """
        self._j_pulsar_source_builder.setTopicPattern(topic_pattern)
        return self

    def set_start_cursor(self, start_cursor: 'StartCursor') -> 'PulsarSourceBuilder':
        """
        Specify from which offsets the PulsarSource should start consuming by providing a
        StartCursor.
        """
        self._j_pulsar_source_builder.setStartCursor(start_cursor._j_start_cursor)
        return self

    def set_unbounded_stop_cursor(self, stop_cursor: 'StopCursor') -> 'PulsarSourceBuilder':
        """
        By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner
        and thus never stops until the Flink job fails or is canceled. To let the PulsarSource
        run as a streaming source but still stop at some point, one can set a StopCursor to
        specify the stopping offsets for each partition. When all the partitions have reached
        their stopping offsets, the PulsarSource will then exit.

        This method is different from set_bounded_stop_cursor(StopCursor) in that after setting
        the stopping offsets with this method, PulsarSource.getBoundedness() will still return
        Boundedness.CONTINUOUS_UNBOUNDED even though it will stop at the stopping offsets
        specified by the StopCursor.
        """
        self._j_pulsar_source_builder.setUnboundedStopCursor(stop_cursor._j_stop_cursor)
        return self

    def set_bounded_stop_cursor(self, stop_cursor: 'StopCursor') -> 'PulsarSourceBuilder':
        """
        By default the PulsarSource is set to run in Boundedness.CONTINUOUS_UNBOUNDED manner
        and thus never stops until the Flink job fails or is canceled. To let the PulsarSource
        run in Boundedness.BOUNDED manner and stop at some point, one can set a StopCursor to
        specify the stopping offsets for each partition. When all the partitions have reached
        their stopping offsets, the PulsarSource will then exit.

        This method is different from set_unbounded_stop_cursor(StopCursor) in that after
        setting the stopping offsets with this method, PulsarSource.getBoundedness() will
        return Boundedness.BOUNDED instead of Boundedness.CONTINUOUS_UNBOUNDED.
        """
        self._j_pulsar_source_builder.setBoundedStopCursor(stop_cursor._j_stop_cursor)
        return self

    def set_deserialization_schema(
            self,
            pulsar_deserialization_schema: 'PulsarDeserializationSchema') \
            -> 'PulsarSourceBuilder':
        """
        DeserializationSchema is required for getting the Schema for deserializing messages
        from pulsar and getting the TypeInformation for message serialization in flink.

        A set of implementations is available; use PulsarDeserializationSchema#flink_type_info
        or PulsarDeserializationSchema#flink_schema to create the desired schema.
        """
        self._j_pulsar_source_builder.setDeserializationSchema(
            pulsar_deserialization_schema._j_pulsar_deserialization_schema)
        return self

    def set_config(self, key: 'ConfigOption', value) -> 'PulsarSourceBuilder':
        """
        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        Make sure the option is set only once or always with the same value.
        """
        self._j_pulsar_source_builder.setConfig(key._j_config_option, value)
        return self

    def set_config_with_dict(self, config: Dict) -> 'PulsarSourceBuilder':
        """
        Set arbitrary properties for the PulsarSource and PulsarConsumer. The valid keys can be
        found in PulsarSourceOptions and PulsarOptions.

        :param config: The options, converted into a Java ``Configuration`` via ``fromMap``.
        """
        JConfiguration = get_gateway().jvm.org.apache.flink.configuration.Configuration
        self._j_pulsar_source_builder.setConfig(JConfiguration.fromMap(config))
        return self

    def build(self) -> 'PulsarSource':
        """
        Build the PulsarSource.
        """
        return PulsarSource(self._j_pulsar_source_builder.build())
class RMQConnectionConfig(object):
    """
    Connection Configuration for RMQ.

    Thin wrapper around the Java RMQConnectionConfig; every getter delegates to the Java
    getter of the corresponding name. Instances are created through
    :class:`RMQConnectionConfig.Builder`.
    """

    def __init__(self, j_rmq_connection_config):
        """
        :param j_rmq_connection_config: The Java RMQConnectionConfig to wrap.
        """
        self._j_rmq_connection_config = j_rmq_connection_config

    def get_host(self) -> str:
        """Returns the result of Java ``getHost()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getHost()

    def get_port(self) -> int:
        """Returns the result of Java ``getPort()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getPort()

    def get_virtual_host(self) -> str:
        """Returns the result of Java ``getVirtualHost()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getVirtualHost()

    def get_user_name(self) -> str:
        """Returns the result of Java ``getUsername()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getUsername()

    def get_password(self) -> str:
        """Returns the result of Java ``getPassword()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getPassword()

    def get_uri(self) -> str:
        """Returns the result of Java ``getUri()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getUri()

    def get_network_recovery_interval(self) -> int:
        """Returns the result of Java ``getNetworkRecoveryInterval()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getNetworkRecoveryInterval()

    def is_automatic_recovery(self) -> bool:
        """Returns the result of Java ``isAutomaticRecovery()``."""
        j_config = self._j_rmq_connection_config
        return j_config.isAutomaticRecovery()

    def is_topology_recovery(self) -> bool:
        """Returns the result of Java ``isTopologyRecovery()``."""
        j_config = self._j_rmq_connection_config
        return j_config.isTopologyRecovery()

    def get_connection_timeout(self) -> int:
        """Returns the result of Java ``getConnectionTimeout()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getConnectionTimeout()

    def get_requested_channel_max(self) -> int:
        """Returns the result of Java ``getRequestedChannelMax()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getRequestedChannelMax()

    def get_requested_frame_max(self) -> int:
        """Returns the result of Java ``getRequestedFrameMax()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getRequestedFrameMax()

    def get_requested_heartbeat(self) -> int:
        """Returns the result of Java ``getRequestedHeartbeat()``."""
        j_config = self._j_rmq_connection_config
        return j_config.getRequestedHeartbeat()

    class Builder(object):
        """
        Builder for RMQConnectionConfig.

        Every setter forwards its argument to the Java builder and returns this Python
        builder so that calls can be chained.
        """

        def __init__(self):
            j_config_class = get_gateway().jvm.org.apache.flink.streaming.connectors\
                .rabbitmq.common.RMQConnectionConfig
            self._j_options_builder = j_config_class.Builder()

        def set_port(self, port: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setPort``."""
            self._j_options_builder.setPort(port)
            return self

        def set_host(self, host: str) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setHost``."""
            self._j_options_builder.setHost(host)
            return self

        def set_virtual_host(self, vhost: str) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setVirtualHost``."""
            self._j_options_builder.setVirtualHost(vhost)
            return self

        def set_user_name(self, user_name: str) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setUserName``."""
            self._j_options_builder.setUserName(user_name)
            return self

        def set_password(self, password: str) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setPassword``."""
            self._j_options_builder.setPassword(password)
            return self

        def set_uri(self, uri: str) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setUri``."""
            self._j_options_builder.setUri(uri)
            return self

        def set_topology_recovery_enabled(
                self,
                topology_recovery_enabled: bool) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setTopologyRecoveryEnabled``."""
            self._j_options_builder.setTopologyRecoveryEnabled(topology_recovery_enabled)
            return self

        def set_requested_heartbeat(
                self,
                requested_heartbeat: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setRequestedHeartbeat``."""
            self._j_options_builder.setRequestedHeartbeat(requested_heartbeat)
            return self

        def set_requested_frame_max(
                self,
                requested_frame_max: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setRequestedFrameMax``."""
            self._j_options_builder.setRequestedFrameMax(requested_frame_max)
            return self

        def set_requested_channel_max(
                self,
                requested_channel_max: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setRequestedChannelMax``."""
            self._j_options_builder.setRequestedChannelMax(requested_channel_max)
            return self

        def set_network_recovery_interval(
                self,
                network_recovery_interval: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setNetworkRecoveryInterval``."""
            self._j_options_builder.setNetworkRecoveryInterval(network_recovery_interval)
            return self

        def set_connection_timeout(
                self,
                connection_timeout: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setConnectionTimeout``."""
            self._j_options_builder.setConnectionTimeout(connection_timeout)
            return self

        def set_automatic_recovery(
                self,
                automatic_recovery: bool) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setAutomaticRecovery``."""
            self._j_options_builder.setAutomaticRecovery(automatic_recovery)
            return self

        def set_prefetch_count(self, prefetch_count: int) -> 'RMQConnectionConfig.Builder':
            """Forwards to Java ``setPrefetchCount``."""
            self._j_options_builder.setPrefetchCount(prefetch_count)
            return self

        def build(self) -> 'RMQConnectionConfig':
            """Builds the Java config and wraps it in a Python RMQConnectionConfig."""
            j_config = self._j_options_builder.build()
            return RMQConnectionConfig(j_config)
class RMQSource(SourceFunction):
    """
    Source function backed by the Java
    ``org.apache.flink.streaming.connectors.rabbitmq.RMQSource``.
    """

    def __init__(self,
                 connection_config: 'RMQConnectionConfig',
                 queue_name: str,
                 use_correlation_id: bool,
                 deserialization_schema: DeserializationSchema
                 ):
        """
        Creates a new RabbitMQ source.

        For exactly-once, you must set the correlation ids of messages at the producer.
        The correlation id must be unique. Otherwise the behavior of the source is undefined.
        If in doubt, set use_correlation_id to False.
        When correlation ids are not used, this source has at-least-once processing semantics
        when checkpointing is enabled.

        :param connection_config: The RabbitMQ connection configuration.
        :param queue_name: The queue to receive messages from.
        :param use_correlation_id: Whether the messages received are supplied with a unique id
                                   to deduplicate messages (in case of failed acknowledgments).
                                   Only used when checkpointing is enabled.
        :param deserialization_schema: A deserializer used to convert between RabbitMQ's
                                       messages and Flink's objects.
        """
        JRMQSource = get_gateway().jvm.org.apache.flink.streaming.connectors.rabbitmq.RMQSource
        # The Java constructor takes the underlying Java objects, so unwrap the Python
        # wrappers for the connection config and the deserialization schema.
        j_rmq_source = JRMQSource(
            connection_config._j_rmq_connection_config,
            queue_name,
            use_correlation_id,
            deserialization_schema._j_deserialization_schema
        )
        super(RMQSource, self).__init__(source_func=j_rmq_source)
class RMQSink(SinkFunction):
    """
    Sink function backed by the Java
    ``org.apache.flink.streaming.connectors.rabbitmq.RMQSink``.
    """

    def __init__(self,
                 connection_config: 'RMQConnectionConfig',
                 queue_name: str,
                 serialization_schema: SerializationSchema):
        """
        Creates a new RabbitMQ sink.

        :param connection_config: The RabbitMQ connection configuration.
        :param queue_name: The queue to publish messages to.
        :param serialization_schema: A serializer used to convert Flink objects to bytes.
        """
        JRMQSink = get_gateway().jvm.org.apache.flink.streaming.connectors.rabbitmq.RMQSink
        # The Java constructor takes the underlying Java objects, so unwrap the Python
        # wrappers for the connection config and the serialization schema.
        j_rmq_sink = JRMQSink(
            connection_config._j_rmq_connection_config,
            queue_name,
            serialization_schema._j_serialization_schema,
        )
        super(RMQSink, self).__init__(sink_func=j_rmq_sink)