Source code for pyflink.datastream.connectors.file_system

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings
from abc import abstractmethod

from typing import TYPE_CHECKING, Optional

from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory

if TYPE_CHECKING:
    from pyflink.table.types import RowType

from pyflink.common import Duration, Encoder
from pyflink.datastream.connectors import Source, Sink
from pyflink.datastream.connectors.base import SupportsPreprocessing, StreamTransformer
from pyflink.datastream.functions import SinkFunction
from pyflink.common.utils import JavaObjectWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray, is_instance_of

__all__ = [
    'FileCompactor',
    'FileCompactStrategy',
    'OutputFileConfig',
    'FileSource',
    'FileSourceBuilder',
    'FileSink',
    'StreamingFileSink',
    'StreamFormat',
    'BulkFormat',
    'FileEnumeratorProvider',
    'FileSplitAssignerProvider',
    'RollingPolicy',
    'BucketAssigner'
]


# ---- FileSource ----

[docs]class FileEnumeratorProvider(object): """ Factory for FileEnumerator which task is to discover all files to be read and to split them into a set of file source splits. This includes possibly, path traversals, file filtering (by name or other patterns) and deciding whether to split files into multiple splits, and how to split them. """ def __init__(self, j_file_enumerator_provider): self._j_file_enumerator_provider = j_file_enumerator_provider @staticmethod def default_splittable_file_enumerator() -> 'FileEnumeratorProvider': """ The default file enumerator used for splittable formats. The enumerator recursively enumerates files, split files that consist of multiple distributed storage blocks into multiple splits, and filters hidden files (files starting with '.' or '_'). Files with suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...) will not be split. """ JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource return FileEnumeratorProvider(JFileSource.DEFAULT_SPLITTABLE_FILE_ENUMERATOR) @staticmethod def default_non_splittable_file_enumerator() -> 'FileEnumeratorProvider': """ The default file enumerator used for non-splittable formats. The enumerator recursively enumerates files, creates one split for the file, and filters hidden files (files starting with '.' or '_'). """ JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource return FileEnumeratorProvider(JFileSource.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR)
[docs]class FileSplitAssignerProvider(object): """ Factory for FileSplitAssigner which is responsible for deciding what split should be processed next by which node. It determines split processing order and locality. """ def __init__(self, j_file_split_assigner): self._j_file_split_assigner = j_file_split_assigner @staticmethod def locality_aware_split_assigner() -> 'FileSplitAssignerProvider': """ A FileSplitAssigner that assigns to each host preferably splits that are local, before assigning splits that are not local. """ JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource return FileSplitAssignerProvider(JFileSource.DEFAULT_SPLIT_ASSIGNER)
[docs]class StreamFormat(object): """ A reader format that reads individual records from a stream. Compared to the :class:`~BulkFormat`, the stream format handles a few things out-of-the-box, like deciding how to batch records or dealing with compression. Internally in the file source, the readers pass batches of records from the reading threads (that perform the typically blocking I/O operations) to the async mailbox threads that do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time) much reduces the thread-to-thread handover overhead. This batching is by default based on I/O fetch size for the StreamFormat, meaning the set of records derived from one I/O buffer will be handed over as one. See config option `source.file.stream.io-fetch-size` to configure that fetch size. """ def __init__(self, j_stream_format): self._j_stream_format = j_stream_format @staticmethod def text_line_format(charset_name: str = "UTF-8") -> 'StreamFormat': """ Creates a reader format that text lines from a file. The reader uses Java's built-in java.io.InputStreamReader to decode the byte stream using various supported charset encodings. This format does not support optimized recovery from checkpoints. On recovery, it will re-read and discard the number of lined that were processed before the last checkpoint. That is due to the fact that the offsets of lines in the file cannot be tracked through the charset decoders with their internal buffering of stream input and charset decoder state. :param charset_name: The charset to decode the byte stream. """ j_stream_format = get_gateway().jvm.org.apache.flink.connector.file.src.reader. \ TextLineInputFormat(charset_name) return StreamFormat(j_stream_format)
[docs]class BulkFormat(object): """ The BulkFormat reads and decodes batches of records at a time. Examples of bulk formats are formats like ORC or Parquet. Internally in the file source, the readers pass batches of records from the reading threads (that perform the typically blocking I/O operations) to the async mailbox threads that do the streaming and batch data processing. Passing records in batches (rather than one-at-a-time) much reduce the thread-to-thread handover overhead. For the BulkFormat, one batch is handed over as one. .. versionadded:: 1.16.0 """ def __init__(self, j_bulk_format): self._j_bulk_format = j_bulk_format
[docs]class FileSourceBuilder(object): """ The builder for the :class:`~FileSource`, to configure the various behaviors. Start building the source via one of the following methods: - :func:`~FileSource.for_record_stream_format` """ def __init__(self, j_file_source_builder): self._j_file_source_builder = j_file_source_builder def monitor_continuously( self, discovery_interval: Duration) -> 'FileSourceBuilder': """ Sets this source to streaming ("continuous monitoring") mode. This makes the source a "continuous streaming" source that keeps running, monitoring for new files, and reads these files when they appear and are discovered by the monitoring. The interval in which the source checks for new files is the discovery_interval. Shorter intervals mean that files are discovered more quickly, but also imply more frequent listing or directory traversal of the file system / object store. """ self._j_file_source_builder.monitorContinuously(discovery_interval._j_duration) return self def process_static_file_set(self) -> 'FileSourceBuilder': """ Sets this source to bounded (batch) mode. In this mode, the source processes the files that are under the given paths when the application is started. Once all files are processed, the source will finish. This setting is also the default behavior. This method is mainly here to "switch back" to bounded (batch) mode, or to make it explicit in the source construction. """ self._j_file_source_builder.processStaticFileSet() return self def set_file_enumerator( self, file_enumerator: 'FileEnumeratorProvider') -> 'FileSourceBuilder': """ Configures the FileEnumerator for the source. The File Enumerator is responsible for selecting from the input path the set of files that should be processed (and which to filter out). Furthermore, the File Enumerator may split the files further into sub-regions, to enable parallelization beyond the number of files. """ self._j_file_source_builder.setFileEnumerator( file_enumerator._j_file_enumerator_provider) return self def set_split_assigner( self, split_assigner: 'FileSplitAssignerProvider') -> 'FileSourceBuilder': """ Configures the FileSplitAssigner for the source. The File Split Assigner determines which parallel reader instance gets which {@link FileSourceSplit}, and in which order these splits are assigned. """ self._j_file_source_builder.setSplitAssigner(split_assigner._j_file_split_assigner) return self def build(self) -> 'FileSource': """ Creates the file source with the settings applied to this builder. """ return FileSource(self._j_file_source_builder.build())
[docs]class FileSource(Source): """ A unified data source that reads files - both in batch and in streaming mode. This source supports all (distributed) file systems and object stores that can be accessed via the Flink's FileSystem class. Start building a file source via one of the following calls: - :func:`~FileSource.for_record_stream_format` This creates a :class:`~FileSource.FileSourceBuilder` on which you can configure all the properties of the file source. <h2>Batch and Streaming</h2> This source supports both bounded/batch and continuous/streaming data inputs. For the bounded/batch case, the file source processes all files under the given path(s). In the continuous/streaming case, the source periodically checks the paths for new files and will start reading those. When you start creating a file source (via the :class:`~FileSource.FileSourceBuilder` created through one of the above-mentioned methods) the source is by default in bounded/batch mode. Call :func:`~FileSource.FileSourceBuilder.monitor_continuously` to put the source into continuous streaming mode. <h2>Format Types</h2> The reading of each file happens through file readers defined by <i>file formats</i>. These define the parsing logic for the contents of the file. There are multiple classes that the source supports. Their interfaces trade of simplicity of implementation and flexibility/efficiency. - A :class:`~FileSource.StreamFormat` reads the contents of a file from a file stream. It is the simplest format to implement, and provides many features out-of-the-box (like checkpointing logic) but is limited in the optimizations it can apply (such as object reuse, batching, etc.). <h2>Discovering / Enumerating Files</h2> The way that the source lists the files to be processes is defined by the :class:`~FileSource.FileEnumeratorProvider`. The FileEnumeratorProvider is responsible to select the relevant files (for example filter out hidden files) and to optionally splits files into multiple regions (= file source splits) that can be read in parallel). """ def __init__(self, j_file_source): super(FileSource, self).__init__(source=j_file_source) @staticmethod def for_record_stream_format(stream_format: StreamFormat, *paths: str) -> FileSourceBuilder: """ Builds a new FileSource using a :class:`~FileSource.StreamFormat` to read record-by-record from a file stream. When possible, stream-based formats are generally easier (preferable) to file-based formats, because they support better default behavior around I/O batching or progress tracking (checkpoints). Stream formats also automatically de-compress files based on the file extension. This supports files ending in ".deflate" (Deflate), ".xz" (XZ), ".bz2" (BZip2), ".gz", ".gzip" (GZip). """ JPath = get_gateway().jvm.org.apache.flink.core.fs.Path JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource j_paths = to_jarray(JPath, [JPath(p) for p in paths]) return FileSourceBuilder( JFileSource.forRecordStreamFormat(stream_format._j_stream_format, j_paths)) @staticmethod def for_bulk_file_format(bulk_format: BulkFormat, *paths: str) -> FileSourceBuilder: JPath = get_gateway().jvm.org.apache.flink.core.fs.Path JFileSource = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource j_paths = to_jarray(JPath, [JPath(p) for p in paths]) return FileSourceBuilder( JFileSource.forBulkFileFormat(bulk_format._j_bulk_format, j_paths))
# ---- FileSink ----
[docs]class BucketAssigner(JavaObjectWrapper): """ A BucketAssigner is used with a file sink to determine the bucket each incoming element should be put into. The StreamingFileSink can be writing to many buckets at a time, and it is responsible for managing a set of active buckets. Whenever a new element arrives it will ask the BucketAssigner for the bucket the element should fall in. The BucketAssigner can, for example, determine buckets based on system time. """ def __init__(self, j_bucket_assigner): super().__init__(j_bucket_assigner) @staticmethod def base_path_bucket_assigner() -> 'BucketAssigner': """ Creates a BucketAssigner that does not perform any bucketing of files. All files are written to the base path. """ return BucketAssigner(get_gateway().jvm.org.apache.flink.streaming.api.functions.sink. filesystem.bucketassigners.BasePathBucketAssigner()) @staticmethod def date_time_bucket_assigner(format_str: str = "yyyy-MM-dd--HH", timezone_id: str = None): """ Creates a BucketAssigner that assigns to buckets based on current system time. It will create directories of the following form: /{basePath}/{dateTimePath}/}. The basePath is the path that was specified as a base path when creating the new bucket. The dateTimePath is determined based on the current system time and the user provided format string. The Java DateTimeFormatter is used to derive a date string from the current system time and the date format string. The default format string is "yyyy-MM-dd--HH" so the rolling files will have a granularity of hours. :param format_str: The format string used to determine the bucket id. :param timezone_id: The timezone id, either an abbreviation such as "PST", a full name such as "America/Los_Angeles", or a custom timezone_id such as "GMT-08:00". Th e default time zone will b used if it's None. """ if timezone_id is not None and isinstance(timezone_id, str): j_timezone = get_gateway().jvm.java.time.ZoneId.of(timezone_id) else: j_timezone = get_gateway().jvm.java.time.ZoneId.systemDefault() return BucketAssigner( get_gateway().jvm.org.apache.flink.streaming.api.functions.sink. filesystem.bucketassigners.DateTimeBucketAssigner(format_str, j_timezone))
[docs]class RollingPolicy(JavaObjectWrapper): """ The policy based on which a Bucket in the FileSink rolls its currently open part file and opens a new one. """ def __init__(self, j_rolling_policy): super().__init__(j_rolling_policy) @staticmethod def default_rolling_policy( part_size: int = 1024 * 1024 * 128, rollover_interval: int = 60 * 1000, inactivity_interval: int = 60 * 1000) -> 'DefaultRollingPolicy': """ Returns the default implementation of the RollingPolicy. This policy rolls a part file if: - there is no open part file, - the current file has reached the maximum bucket size (by default 128MB), - the current file is older than the roll over interval (by default 60 sec), or - the current file has not been written to for more than the allowed inactivityTime (by default 60 sec). :param part_size: The maximum part file size before rolling. :param rollover_interval: The maximum time duration a part file can stay open before rolling. :param inactivity_interval: The time duration of allowed inactivity after which a part file will have to roll. """ JDefaultRollingPolicy = get_gateway().jvm.org.apache.flink.streaming.api.functions.\ sink.filesystem.rollingpolicies.DefaultRollingPolicy j_rolling_policy = JDefaultRollingPolicy.builder()\ .withMaxPartSize(part_size) \ .withRolloverInterval(rollover_interval) \ .withInactivityInterval(inactivity_interval) \ .build() return DefaultRollingPolicy(j_rolling_policy) @staticmethod def on_checkpoint_rolling_policy() -> 'OnCheckpointRollingPolicy': """ Returns a RollingPolicy which rolls (ONLY) on every checkpoint. """ JOnCheckpointRollingPolicy = get_gateway().jvm.org.apache.flink.streaming.api.functions. \ sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy return OnCheckpointRollingPolicy(JOnCheckpointRollingPolicy.build())
[docs]class DefaultRollingPolicy(RollingPolicy): """ The default implementation of the RollingPolicy. This policy rolls a part file if: - there is no open part file, - the current file has reached the maximum bucket size (by default 128MB), - the current file is older than the roll over interval (by default 60 sec), or - the current file has not been written to for more than the allowed inactivityTime (by default 60 sec). """ def __init__(self, j_rolling_policy): super().__init__(j_rolling_policy)
[docs]class OnCheckpointRollingPolicy(RollingPolicy): """ A RollingPolicy which rolls (ONLY) on every checkpoint. """ def __init__(self, j_rolling_policy): super().__init__(j_rolling_policy)
[docs]class OutputFileConfig(JavaObjectWrapper): """ Part file name configuration. This allow to define a prefix and a suffix to the part file name. """ @staticmethod def builder(): return OutputFileConfig.OutputFileConfigBuilder() def __init__(self, part_prefix: str, part_suffix: str): filesystem = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink.filesystem self._j_output_file_config = filesystem.OutputFileConfig(part_prefix, part_suffix) super().__init__(self._j_output_file_config) def get_part_prefix(self) -> str: """ The prefix for the part name. """ return self._j_output_file_config.getPartPrefix() def get_part_suffix(self) -> str: """ The suffix for the part name. """ return self._j_output_file_config.getPartSuffix() class OutputFileConfigBuilder(object): """ A builder to create the part file configuration. """ def __init__(self): self.part_prefix = "part" self.part_suffix = "" def with_part_prefix(self, prefix) -> 'OutputFileConfig.OutputFileConfigBuilder': self.part_prefix = prefix return self def with_part_suffix(self, suffix) -> 'OutputFileConfig.OutputFileConfigBuilder': self.part_suffix = suffix return self def build(self) -> 'OutputFileConfig': return OutputFileConfig(self.part_prefix, self.part_suffix)
[docs]class FileCompactStrategy(JavaObjectWrapper): """ Strategy for compacting the files written in {@link FileSink} before committing. .. versionadded:: 1.16.0 """ def __init__(self, j_file_compact_strategy): super().__init__(j_file_compact_strategy) @staticmethod def builder() -> 'FileCompactStrategy.Builder': return FileCompactStrategy.Builder() class Builder(object): def __init__(self): JFileCompactStrategy = get_gateway().jvm.org.apache.flink.connector.file.sink.\ compactor.FileCompactStrategy self._j_builder = JFileCompactStrategy.Builder.newBuilder() def build(self) -> 'FileCompactStrategy': return FileCompactStrategy(self._j_builder.build()) def enable_compaction_on_checkpoint(self, num_checkpoints_before_compaction: int) \ -> 'FileCompactStrategy.Builder': """ Optional, compaction will be triggered when N checkpoints passed since the last triggering, -1 by default indicating no compaction on checkpoint. """ self._j_builder.enableCompactionOnCheckpoint(num_checkpoints_before_compaction) return self def set_size_threshold(self, size_threshold: int) -> 'FileCompactStrategy.Builder': """ Optional, compaction will be triggered when the total size of compacting files reaches the threshold. -1 by default, indicating the size is unlimited. """ self._j_builder.setSizeThreshold(size_threshold) return self def set_num_compact_threads(self, num_compact_threads: int) \ -> 'FileCompactStrategy.Builder': """ Optional, the count of compacting threads in a compactor operator, 1 by default. """ self._j_builder.setNumCompactThreads(num_compact_threads) return self
[docs]class FileCompactor(JavaObjectWrapper): """ The FileCompactor is responsible for compacting files into one file. .. versionadded:: 1.16.0 """ def __init__(self, j_file_compactor): super().__init__(j_file_compactor) @staticmethod def concat_file_compactor(file_delimiter: bytes = None): """ Returns a file compactor that simply concat the compacting files. The file_delimiter will be added between neighbouring files if provided. """ JConcatFileCompactor = get_gateway().jvm.org.apache.flink.connector.file.sink.compactor.\ ConcatFileCompactor if file_delimiter: return FileCompactor(JConcatFileCompactor(file_delimiter)) else: return FileCompactor(JConcatFileCompactor()) @staticmethod def identical_file_compactor(): """ Returns a file compactor that directly copy the content of the only input file to the output. """ JIdenticalFileCompactor = get_gateway().jvm.org.apache.flink.connector.file.sink.compactor.\ IdenticalFileCompactor return FileCompactor(JIdenticalFileCompactor())
[docs]class FileSink(Sink, SupportsPreprocessing): """ A unified sink that emits its input elements to FileSystem files within buckets. This sink achieves exactly-once semantics for both BATCH and STREAMING. When creating the sink a basePath must be specified. The base directory contains one directory for every bucket. The bucket directories themselves contain several part files, with at least one for each parallel subtask of the sink which is writing data to that bucket. These part files contain the actual output data. The sink uses a BucketAssigner to determine in which bucket directory each element should be written to inside the base directory. The BucketAssigner can, for example, roll on every checkpoint or use time or a property of the element to determine the bucket directory. The default BucketAssigner is a DateTimeBucketAssigner which will create one new bucket every hour. You can specify a custom BucketAssigner using the :func:`~FileSink.RowFormatBuilder.with_bucket_assigner`, after calling :class:`~FileSink.for_row_format`. The names of the part files could be defined using OutputFileConfig. This configuration contains a part prefix and a part suffix that will be used with a random uid assigned to each subtask of the sink and a rolling counter to determine the file names. For example with a prefix "prefix" and a suffix ".ext", a file named {@code "prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext"} contains the data from subtask with uid {@code 81fc4980-a6af-41c8-9937-9939408a734b} of the sink and is the {@code 17th} part-file created by that subtask. Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy is used for row-encoded sink output; a OnCheckpointRollingPolicy is used for bulk-encoded sink output. In some scenarios, the open buckets are required to change based on time. In these cases, the user can specify a bucket_check_interval (by default 1m) and the sink will check periodically and roll the part file if the specified rolling policy says so. Part files can be in one of three states: in-progress, pending or finished. The reason for this is how the sink works to provide exactly-once semantics and fault-tolerance. The part file that is currently being written to is in-progress. Once a part file is closed for writing it becomes pending. When a checkpoint is successful (for STREAMING) or at the end of the job (for BATCH) the currently pending files will be moved to finished. For STREAMING in order to guarantee exactly-once semantics in case of a failure, the sink should roll back to the state it had when that last successful checkpoint occurred. To this end, when restoring, the restored files in pending state are transferred into the finished state while any in-progress files are rolled back, so that they do not contain data that arrived after the checkpoint from which we restore. """ def __init__(self, j_file_sink, transformer: Optional[StreamTransformer] = None): super(FileSink, self).__init__(sink=j_file_sink) self._transformer = transformer def get_transformer(self) -> Optional[StreamTransformer]: return self._transformer class BaseBuilder(object): def __init__(self, j_builder): self._j_builder = j_builder def with_bucket_check_interval(self, interval: int): """ :param interval: The check interval in milliseconds. """ self._j_builder.withBucketCheckInterval(interval) return self def with_bucket_assigner(self, bucket_assigner: BucketAssigner): self._j_builder.withBucketAssigner(bucket_assigner.get_java_object()) return self def with_output_file_config(self, output_file_config: OutputFileConfig): self._j_builder.withOutputFileConfig(output_file_config.get_java_object()) return self def enable_compact(self, strategy: FileCompactStrategy, compactor: FileCompactor): self._j_builder.enableCompact(strategy.get_java_object(), compactor.get_java_object()) return self def disable_compact(self): self._j_builder.disableCompact() return self @abstractmethod def with_rolling_policy(self, rolling_policy): pass def build(self): return FileSink(self._j_builder.build()) class RowFormatBuilder(BaseBuilder): """ Builder for the vanilla FileSink using a row format. .. versionchanged:: 1.16.0 Support compaction. """ def __init__(self, j_row_format_builder): super().__init__(j_row_format_builder) def with_rolling_policy(self, rolling_policy: RollingPolicy): self._j_builder.withRollingPolicy(rolling_policy.get_java_object()) return self @staticmethod def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuilder': JPath = get_gateway().jvm.org.apache.flink.core.fs.Path JFileSink = get_gateway().jvm.org.apache.flink.connector.file.sink.FileSink return FileSink.RowFormatBuilder( JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder)) class BulkFormatBuilder(BaseBuilder): """ Builder for the vanilla FileSink using a bulk format. .. versionadded:: 1.16.0 """ def __init__(self, j_bulk_format_builder): super().__init__(j_bulk_format_builder) self._transformer = None def with_rolling_policy(self, rolling_policy: OnCheckpointRollingPolicy): if not isinstance(rolling_policy, OnCheckpointRollingPolicy): raise ValueError('rolling_policy must be OnCheckpointRollingPolicy for bulk format') return self def _with_row_type(self, row_type: 'RowType') -> 'FileSink.BulkFormatBuilder': from pyflink.datastream.data_stream import DataStream from pyflink.table.types import _to_java_data_type def _check_if_row_data_type(ds) -> bool: j_type_info = ds._j_data_stream.getType() if not is_instance_of( j_type_info, 'org.apache.flink.table.runtime.typeutils.InternalTypeInfo' ): return False return is_instance_of( j_type_info.toLogicalType(), 'org.apache.flink.table.types.logical.RowType' ) class RowRowTransformer(StreamTransformer): def apply(self, ds): jvm = get_gateway().jvm if _check_if_row_data_type(ds): return ds j_map_function = jvm.org.apache.flink.python.util.PythonConnectorUtils \ .RowRowMapper(_to_java_data_type(row_type)) return DataStream(ds._j_data_stream.process(j_map_function)) self._transformer = RowRowTransformer() return self def build(self) -> 'FileSink': return FileSink(self._j_builder.build(), self._transformer) @staticmethod def for_bulk_format(base_path: str, writer_factory: BulkWriterFactory) \ -> 'FileSink.BulkFormatBuilder': jvm = get_gateway().jvm j_path = jvm.org.apache.flink.core.fs.Path(base_path) JFileSink = jvm.org.apache.flink.connector.file.sink.FileSink builder = FileSink.BulkFormatBuilder( JFileSink.forBulkFormat(j_path, writer_factory.get_java_object()) ) if isinstance(writer_factory, RowDataBulkWriterFactory): return builder._with_row_type(writer_factory.get_row_type()) else: return builder
# ---- StreamingFileSink ----
[docs]class StreamingFileSink(SinkFunction): """ Sink that emits its input elements to `FileSystem` files within buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics. When creating the sink a `basePath` must be specified. The base directory contains one directory for every bucket. The bucket directories themselves contain several part files, with at least one for each parallel subtask of the sink which is writing data to that bucket. These part files contain the actual output data. """ def __init__(self, j_obj): warnings.warn("Deprecated in 1.15. Use FileSink instead.", DeprecationWarning) super(StreamingFileSink, self).__init__(j_obj) class BaseBuilder(object): def __init__(self, j_builder): self._j_builder = j_builder def with_bucket_check_interval(self, interval: int): self._j_builder.withBucketCheckInterval(interval) return self def with_bucket_assigner(self, bucket_assigner: BucketAssigner): self._j_builder.withBucketAssigner(bucket_assigner.get_java_object()) return self @abstractmethod def with_rolling_policy(self, policy): pass def with_output_file_config(self, output_file_config: OutputFileConfig): self._j_builder.withOutputFileConfig(output_file_config.get_java_object()) return self def build(self) -> 'StreamingFileSink': j_stream_file_sink = self._j_builder.build() return StreamingFileSink(j_stream_file_sink) class DefaultRowFormatBuilder(BaseBuilder): """ Builder for the vanilla `StreamingFileSink` using a row format. """ def __init__(self, j_default_row_format_builder): super().__init__(j_default_row_format_builder) def with_rolling_policy(self, policy: RollingPolicy): self._j_builder.withRollingPolicy(policy.get_java_object()) return self @staticmethod def for_row_format(base_path: str, encoder: Encoder) -> 'DefaultRowFormatBuilder': j_path = get_gateway().jvm.org.apache.flink.core.fs.Path(base_path) j_default_row_format_builder = get_gateway().jvm.org.apache.flink.streaming.api.\ functions.sink.filesystem.StreamingFileSink.forRowFormat(j_path, encoder._j_encoder) return StreamingFileSink.DefaultRowFormatBuilder(j_default_row_format_builder) class DefaultBulkFormatBuilder(BaseBuilder): def __init__(self, j_default_bulk_format_builder): super().__init__(j_default_bulk_format_builder) def with_rolling_policy(self, policy: OnCheckpointRollingPolicy): self._j_builder.withRollingPolicy(policy.get_java_object()) return self @staticmethod def for_bulk_format(base_path: str, writer_factory: BulkWriterFactory): jvm = get_gateway().jvm j_path = jvm.org.apache.flink.core.fs.Path(base_path) j_default_bulk_format_builder = jvm.org.apache.flink.streaming.api.functions.sink \ .filesystem.StreamingFileSink.forBulkFormat(j_path, writer_factory.get_java_object()) return StreamingFileSink.DefaultBulkFormatBuilder(j_default_bulk_format_builder)