################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import warnings
from abc import abstractmethod
from typing import TYPE_CHECKING, Optional

from pyflink.common.serialization import BulkWriterFactory, RowDataBulkWriterFactory

if TYPE_CHECKING:
    from pyflink.table.types import RowType

from pyflink.common import Duration, Encoder
from pyflink.datastream.connectors import Source, Sink
from pyflink.datastream.connectors.base import SupportsPreprocessing, StreamTransformer
from pyflink.datastream.functions import SinkFunction
from pyflink.common.utils import JavaObjectWrapper
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray, is_instance_of
# Public names exported by ``from <this module> import *``.
# NOTE(review): 'StreamFormat' and 'BulkFormat' are exported here but their definitions are not
# visible in this portion of the file - confirm they are defined elsewhere in the module.
__all__ = [
'FileCompactor',
'FileCompactStrategy',
'OutputFileConfig',
'FileSource',
'FileSourceBuilder',
'FileSink',
'StreamingFileSink',
'StreamFormat',
'BulkFormat',
'FileEnumeratorProvider',
'FileSplitAssignerProvider',
'RollingPolicy',
'BucketAssigner'
]
# ---- FileSource ----
class FileEnumeratorProvider(object):
    """
    Factory for FileEnumerator, whose task is to discover all files to be read and to split them
    into a set of file source splits. This possibly includes path traversals, file filtering
    (by name or other patterns) and deciding whether to split files into multiple splits, and
    how to split them.

    Instances only hold a reference to the Java-side provider; obtain one through the static
    factory methods below.
    """

    def __init__(self, j_file_enumerator_provider):
        """
        :param j_file_enumerator_provider: The Java enumerator provider object to wrap.
        """
        self._j_file_enumerator_provider = j_file_enumerator_provider

    @staticmethod
    def default_splittable_file_enumerator() -> 'FileEnumeratorProvider':
        """
        The default file enumerator used for splittable formats. The enumerator recursively
        enumerates files, splits files that consist of multiple distributed storage blocks into
        multiple splits, and filters hidden files (files starting with '.' or '_'). Files with
        suffixes of common compression formats (for example '.gzip', '.bz2', '.xz', '.zip', ...)
        will not be split.

        :return: A provider wrapping the Java ``FileSource.DEFAULT_SPLITTABLE_FILE_ENUMERATOR``.
        """
        j_file_source = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
        return FileEnumeratorProvider(j_file_source.DEFAULT_SPLITTABLE_FILE_ENUMERATOR)

    @staticmethod
    def default_non_splittable_file_enumerator() -> 'FileEnumeratorProvider':
        """
        The default file enumerator used for non-splittable formats. The enumerator recursively
        enumerates files, creates one split for each file, and filters hidden files
        (files starting with '.' or '_').

        :return: A provider wrapping the Java
                 ``FileSource.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR``.
        """
        j_file_source = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
        return FileEnumeratorProvider(j_file_source.DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR)
class FileSplitAssignerProvider(object):
    """
    Factory for FileSplitAssigner which is responsible for deciding what split should be
    processed next by which node. It determines split processing order and locality.

    Instances only hold a reference to the Java-side provider.
    """

    def __init__(self, j_file_split_assigner):
        """
        :param j_file_split_assigner: The Java split assigner provider object to wrap.
        """
        self._j_file_split_assigner = j_file_split_assigner

    @staticmethod
    def locality_aware_split_assigner() -> 'FileSplitAssignerProvider':
        """
        A FileSplitAssigner that assigns to each host preferably splits that are local, before
        assigning splits that are not local.

        :return: A provider wrapping the Java ``FileSource.DEFAULT_SPLIT_ASSIGNER``.
        """
        j_file_source = get_gateway().jvm.org.apache.flink.connector.file.src.FileSource
        return FileSplitAssignerProvider(j_file_source.DEFAULT_SPLIT_ASSIGNER)
class FileSourceBuilder(object):
    """
    The builder for the :class:`FileSource`, to configure the various behaviors.

    Start building the source via one of the following methods:

        - :func:`FileSource.for_record_stream_format`

    Every configuration method forwards to the wrapped Java builder and returns this builder,
    so calls can be chained.
    """

    def __init__(self, j_file_source_builder):
        """
        :param j_file_source_builder: The Java file source builder to wrap.
        """
        self._j_file_source_builder = j_file_source_builder

    def monitor_continuously(
            self,
            discovery_interval: 'Duration') -> 'FileSourceBuilder':
        """
        Sets this source to streaming ("continuous monitoring") mode.

        This makes the source a "continuous streaming" source that keeps running, monitoring
        for new files, and reads these files when they appear and are discovered by the
        monitoring.

        The interval in which the source checks for new files is the discovery_interval. Shorter
        intervals mean that files are discovered more quickly, but also imply more frequent
        listing or directory traversal of the file system / object store.

        :param discovery_interval: The interval between two checks for new files.
        :return: This builder.
        """
        self._j_file_source_builder.monitorContinuously(discovery_interval._j_duration)
        return self

    def process_static_file_set(self) -> 'FileSourceBuilder':
        """
        Sets this source to bounded (batch) mode.

        In this mode, the source processes the files that are under the given paths when the
        application is started. Once all files are processed, the source will finish.

        This setting is also the default behavior. This method is mainly here to "switch back"
        to bounded (batch) mode, or to make it explicit in the source construction.

        :return: This builder.
        """
        self._j_file_source_builder.processStaticFileSet()
        return self

    def set_file_enumerator(
            self,
            file_enumerator: 'FileEnumeratorProvider') -> 'FileSourceBuilder':
        """
        Configures the FileEnumerator for the source. The File Enumerator is responsible
        for selecting from the input path the set of files that should be processed (and which
        to filter out). Furthermore, the File Enumerator may split the files further into
        sub-regions, to enable parallelization beyond the number of files.

        :param file_enumerator: The provider of the file enumerator to use.
        :return: This builder.
        """
        self._j_file_source_builder.setFileEnumerator(
            file_enumerator._j_file_enumerator_provider)
        return self

    def set_split_assigner(
            self,
            split_assigner: 'FileSplitAssignerProvider') -> 'FileSourceBuilder':
        """
        Configures the FileSplitAssigner for the source. The File Split Assigner
        determines which parallel reader instance gets which file source split, and in
        which order these splits are assigned.

        :param split_assigner: The provider of the split assigner to use.
        :return: This builder.
        """
        self._j_file_source_builder.setSplitAssigner(split_assigner._j_file_split_assigner)
        return self

    def build(self) -> 'FileSource':
        """
        Creates the file source with the settings applied to this builder.

        :return: A :class:`FileSource` wrapping the source built by the Java builder.
        """
        return FileSource(self._j_file_source_builder.build())
class FileSource(Source):
    """
    A unified data source that reads files - both in batch and in streaming mode.

    This source supports all (distributed) file systems and object stores that can be accessed via
    the Flink's FileSystem class.

    Start building a file source via one of the following calls:

        - :func:`FileSource.for_record_stream_format`

    This creates a :class:`FileSourceBuilder` on which you can configure all the
    properties of the file source.

    **Batch and Streaming**

    This source supports both bounded/batch and continuous/streaming data inputs. For the
    bounded/batch case, the file source processes all files under the given path(s). In the
    continuous/streaming case, the source periodically checks the paths for new files and will
    start reading those.

    When you start creating a file source (via the :class:`FileSourceBuilder` created through one
    of the above-mentioned methods) the source is by default in bounded/batch mode. Call
    :func:`FileSourceBuilder.monitor_continuously` to put the source into continuous
    streaming mode.

    **Format Types**

    The reading of each file happens through file readers defined by *file formats*. These
    define the parsing logic for the contents of the file. There are multiple classes that the
    source supports. Their interfaces trade off simplicity of implementation and
    flexibility/efficiency.

        - A StreamFormat reads the contents of a file from a file stream.
          It is the simplest format to implement, and provides many features out-of-the-box
          (like checkpointing logic) but is limited in the optimizations it
          can apply (such as object reuse, batching, etc.).

    **Discovering / Enumerating Files**

    The way that the source lists the files to be processed is defined by the
    :class:`FileEnumeratorProvider`. The FileEnumeratorProvider is responsible for
    selecting the relevant files (for example filtering out hidden files) and for optionally
    splitting files into multiple regions (= file source splits) that can be read in parallel.
    """

    def __init__(self, j_file_source):
        """
        :param j_file_source: The Java file source to wrap.
        """
        super(FileSource, self).__init__(source=j_file_source)

    @staticmethod
    def for_record_stream_format(stream_format: 'StreamFormat', *paths: str) \
            -> 'FileSourceBuilder':
        """
        Builds a new FileSource using a StreamFormat to read record-by-record
        from a file stream.

        When possible, stream-based formats are generally easier (preferable) to file-based
        formats, because they support better default behavior around I/O batching or progress
        tracking (checkpoints).

        Stream formats also automatically de-compress files based on the file extension. This
        supports files ending in ".deflate" (Deflate), ".xz" (XZ), ".bz2" (BZip2), ".gz", ".gzip"
        (GZip).

        :param stream_format: The stream format used to read the files.
        :param paths: The paths to read from.
        :return: A :class:`FileSourceBuilder` for further configuration.
        """
        jvm = get_gateway().jvm
        j_path_class = jvm.org.apache.flink.core.fs.Path
        j_file_source_class = jvm.org.apache.flink.connector.file.src.FileSource
        # The Java factory takes a Path[] rather than varargs of strings.
        j_paths = to_jarray(j_path_class, [j_path_class(p) for p in paths])
        return FileSourceBuilder(
            j_file_source_class.forRecordStreamFormat(stream_format._j_stream_format, j_paths))

    @staticmethod
    def for_bulk_file_format(bulk_format: 'BulkFormat', *paths: str) -> 'FileSourceBuilder':
        """
        Builds a new FileSource using a BulkFormat, by delegating to the Java
        ``FileSource.forBulkFileFormat`` factory.

        :param bulk_format: The bulk format used to read the files.
        :param paths: The paths to read from.
        :return: A :class:`FileSourceBuilder` for further configuration.
        """
        jvm = get_gateway().jvm
        j_path_class = jvm.org.apache.flink.core.fs.Path
        j_file_source_class = jvm.org.apache.flink.connector.file.src.FileSource
        # The Java factory takes a Path[] rather than varargs of strings.
        j_paths = to_jarray(j_path_class, [j_path_class(p) for p in paths])
        return FileSourceBuilder(
            j_file_source_class.forBulkFileFormat(bulk_format._j_bulk_format, j_paths))
# ---- FileSink ----
class BucketAssigner(JavaObjectWrapper):
    """
    A BucketAssigner is used with a file sink to determine the bucket each incoming element should
    be put into.

    The StreamingFileSink can be writing to many buckets at a time, and it is responsible
    for managing a set of active buckets. Whenever a new element arrives it will ask the
    BucketAssigner for the bucket the element should fall in. The BucketAssigner can, for
    example, determine buckets based on system time.
    """

    def __init__(self, j_bucket_assigner):
        """
        :param j_bucket_assigner: The Java bucket assigner to wrap.
        """
        super().__init__(j_bucket_assigner)

    @staticmethod
    def base_path_bucket_assigner() -> 'BucketAssigner':
        """
        Creates a BucketAssigner that does not perform any bucketing of files. All files are
        written to the base path.
        """
        j_bucket_assigners = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink. \
            filesystem.bucketassigners
        return BucketAssigner(j_bucket_assigners.BasePathBucketAssigner())

    @staticmethod
    def date_time_bucket_assigner(format_str: str = "yyyy-MM-dd--HH",
                                  timezone_id: Optional[str] = None) -> 'BucketAssigner':
        """
        Creates a BucketAssigner that assigns to buckets based on current system time.

        It will create directories of the following form: /{basePath}/{dateTimePath}/.
        The basePath is the path that was specified as a base path when creating the new bucket.
        The dateTimePath is determined based on the current system time and the user provided
        format string.

        The Java DateTimeFormatter is used to derive a date string from the current system time
        and the date format string. The default format string is "yyyy-MM-dd--HH" so the rolling
        files will have a granularity of hours.

        :param format_str: The format string used to determine the bucket id.
        :param timezone_id: The timezone id, either an abbreviation such as "PST", a full name
                            such as "America/Los_Angeles", or a custom timezone_id such as
                            "GMT-08:00". The default time zone will be used if it's None.
        :return: A BucketAssigner wrapping a Java DateTimeBucketAssigner.
        """
        jvm = get_gateway().jvm
        # ``None`` is not a ``str``, so this single check also covers the "not given" case; any
        # other non-string value falls back to the system default zone as well.
        if isinstance(timezone_id, str):
            j_timezone = jvm.java.time.ZoneId.of(timezone_id)
        else:
            j_timezone = jvm.java.time.ZoneId.systemDefault()
        j_bucket_assigners = jvm.org.apache.flink.streaming.api.functions.sink. \
            filesystem.bucketassigners
        return BucketAssigner(j_bucket_assigners.DateTimeBucketAssigner(format_str, j_timezone))
class RollingPolicy(JavaObjectWrapper):
    """
    The policy based on which a Bucket in the FileSink rolls its currently
    open part file and opens a new one.
    """

    def __init__(self, j_rolling_policy):
        """
        :param j_rolling_policy: The Java rolling policy to wrap.
        """
        super().__init__(j_rolling_policy)

    @staticmethod
    def default_rolling_policy(
            part_size: int = 1024 * 1024 * 128,
            rollover_interval: int = 60 * 1000,
            inactivity_interval: int = 60 * 1000) -> 'DefaultRollingPolicy':
        """
        Returns the default implementation of the RollingPolicy.

        This policy rolls a part file if:

            - there is no open part file,
            - the current file has reached the maximum bucket size (by default 128MB),
            - the current file is older than the roll over interval (by default 60 sec), or
            - the current file has not been written to for more than the allowed inactivityTime
              (by default 60 sec).

        :param part_size: The maximum part file size before rolling, in bytes.
        :param rollover_interval: The maximum time duration a part file can stay open before
                                  rolling, in milliseconds.
        :param inactivity_interval: The time duration of allowed inactivity after which a part
                                    file will have to roll, in milliseconds.
        :return: A :class:`DefaultRollingPolicy` configured with the given limits.
        """
        j_policy_class = get_gateway().jvm.org.apache.flink.streaming.api.functions. \
            sink.filesystem.rollingpolicies.DefaultRollingPolicy
        j_policy_builder = j_policy_class.builder()
        j_policy_builder = j_policy_builder.withMaxPartSize(part_size)
        j_policy_builder = j_policy_builder.withRolloverInterval(rollover_interval)
        j_policy_builder = j_policy_builder.withInactivityInterval(inactivity_interval)
        return DefaultRollingPolicy(j_policy_builder.build())

    @staticmethod
    def on_checkpoint_rolling_policy() -> 'OnCheckpointRollingPolicy':
        """
        Returns a RollingPolicy which rolls (ONLY) on every checkpoint.
        """
        j_policy_class = get_gateway().jvm.org.apache.flink.streaming.api.functions. \
            sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy
        return OnCheckpointRollingPolicy(j_policy_class.build())
class DefaultRollingPolicy(RollingPolicy):
    """
    The default implementation of the RollingPolicy.

    This policy rolls a part file if:

        - there is no open part file,
        - the current file has reached the maximum bucket size (by default 128MB),
        - the current file is older than the roll over interval (by default 60 sec), or
        - the current file has not been written to for more than the allowed inactivityTime (by
          default 60 sec).
    """

    def __init__(self, j_rolling_policy):
        """
        :param j_rolling_policy: The Java rolling policy to wrap.
        """
        super().__init__(j_rolling_policy)
class OnCheckpointRollingPolicy(RollingPolicy):
    """
    A RollingPolicy which rolls (ONLY) on every checkpoint.
    """

    def __init__(self, j_rolling_policy):
        """
        :param j_rolling_policy: The Java rolling policy to wrap.
        """
        super().__init__(j_rolling_policy)
class OutputFileConfig(JavaObjectWrapper):
    """
    Part file name configuration.
    This allows defining a prefix and a suffix for the part file name.
    """

    @staticmethod
    def builder() -> 'OutputFileConfig.OutputFileConfigBuilder':
        """
        Creates a builder for the part file configuration, starting from the builder's defaults.
        """
        return OutputFileConfig.OutputFileConfigBuilder()

    def __init__(self, part_prefix: str, part_suffix: str):
        """
        :param part_prefix: The prefix for the part name.
        :param part_suffix: The suffix for the part name.
        """
        j_filesystem = get_gateway().jvm.org.apache.flink.streaming.api.functions.sink.filesystem
        self._j_output_file_config = j_filesystem.OutputFileConfig(part_prefix, part_suffix)
        super().__init__(self._j_output_file_config)

    def get_part_prefix(self) -> str:
        """
        The prefix for the part name.
        """
        return self._j_output_file_config.getPartPrefix()

    def get_part_suffix(self) -> str:
        """
        The suffix for the part name.
        """
        return self._j_output_file_config.getPartSuffix()

    class OutputFileConfigBuilder(object):
        """
        A builder to create the part file configuration.

        Defaults to the prefix "part" and an empty suffix.
        """

        def __init__(self):
            self.part_prefix = "part"
            self.part_suffix = ""

        def with_part_prefix(self, prefix: str) -> 'OutputFileConfig.OutputFileConfigBuilder':
            """
            Sets the prefix for the part name and returns this builder.
            """
            self.part_prefix = prefix
            return self

        def with_part_suffix(self, suffix: str) -> 'OutputFileConfig.OutputFileConfigBuilder':
            """
            Sets the suffix for the part name and returns this builder.
            """
            self.part_suffix = suffix
            return self

        def build(self) -> 'OutputFileConfig':
            """
            Creates the OutputFileConfig from the prefix and suffix currently set.
            """
            return OutputFileConfig(self.part_prefix, self.part_suffix)
class FileCompactStrategy(JavaObjectWrapper):
    """
    Strategy for compacting the files written in FileSink before committing.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_file_compact_strategy):
        """
        :param j_file_compact_strategy: The Java compact strategy to wrap.
        """
        super().__init__(j_file_compact_strategy)

    @staticmethod
    def builder() -> 'FileCompactStrategy.Builder':
        """
        Creates a new builder for a FileCompactStrategy.
        """
        return FileCompactStrategy.Builder()

    class Builder(object):
        """
        Builder for :class:`FileCompactStrategy`. Each setter forwards to the wrapped Java
        builder and returns this builder, so calls can be chained.
        """

        def __init__(self):
            j_strategy_class = get_gateway().jvm.org.apache.flink.connector.file.sink. \
                compactor.FileCompactStrategy
            self._j_builder = j_strategy_class.Builder.newBuilder()

        def build(self) -> 'FileCompactStrategy':
            """
            Creates the FileCompactStrategy with the settings applied to this builder.
            """
            return FileCompactStrategy(self._j_builder.build())

        def enable_compaction_on_checkpoint(self, num_checkpoints_before_compaction: int) \
                -> 'FileCompactStrategy.Builder':
            """
            Optional, compaction will be triggered when N checkpoints passed since the last
            triggering, -1 by default indicating no compaction on checkpoint.
            """
            self._j_builder.enableCompactionOnCheckpoint(num_checkpoints_before_compaction)
            return self

        def set_size_threshold(self, size_threshold: int) -> 'FileCompactStrategy.Builder':
            """
            Optional, compaction will be triggered when the total size of compacting files
            reaches the threshold. -1 by default, indicating the size is unlimited.
            """
            self._j_builder.setSizeThreshold(size_threshold)
            return self

        def set_num_compact_threads(self, num_compact_threads: int) \
                -> 'FileCompactStrategy.Builder':
            """
            Optional, the count of compacting threads in a compactor operator, 1 by default.
            """
            self._j_builder.setNumCompactThreads(num_compact_threads)
            return self
class FileCompactor(JavaObjectWrapper):
    """
    The FileCompactor is responsible for compacting files into one file.

    .. versionadded:: 1.16.0
    """

    def __init__(self, j_file_compactor):
        """
        :param j_file_compactor: The Java file compactor to wrap.
        """
        super().__init__(j_file_compactor)

    @staticmethod
    def concat_file_compactor(file_delimiter: Optional[bytes] = None) -> 'FileCompactor':
        """
        Returns a file compactor that simply concatenates the compacting files. The
        file_delimiter will be added between neighbouring files if provided.

        :param file_delimiter: The bytes to put between neighbouring files. ``None`` and an
                               empty value both select the compactor without a delimiter.
        """
        j_compactor_class = get_gateway().jvm.org.apache.flink.connector.file.sink.compactor. \
            ConcatFileCompactor
        if not file_delimiter:
            return FileCompactor(j_compactor_class())
        return FileCompactor(j_compactor_class(file_delimiter))

    @staticmethod
    def identical_file_compactor() -> 'FileCompactor':
        """
        Returns a file compactor that directly copies the content of the only input file to the
        output.
        """
        j_compactor_class = get_gateway().jvm.org.apache.flink.connector.file.sink.compactor. \
            IdenticalFileCompactor
        return FileCompactor(j_compactor_class())
class FileSink(Sink, SupportsPreprocessing):
    """
    A unified sink that emits its input elements to FileSystem files within buckets. This
    sink achieves exactly-once semantics for both BATCH and STREAMING.

    When creating the sink a basePath must be specified. The base directory contains one
    directory for every bucket. The bucket directories themselves contain several part files, with
    at least one for each parallel subtask of the sink which is writing data to that bucket.
    These part files contain the actual output data.

    The sink uses a BucketAssigner to determine in which bucket directory each element
    should be written to inside the base directory. The BucketAssigner can, for example, roll
    on every checkpoint or use time or a property of the element to determine the bucket directory.
    The default BucketAssigner is a DateTimeBucketAssigner which will create one new
    bucket every hour. You can specify a custom BucketAssigner using the
    :func:`FileSink.RowFormatBuilder.with_bucket_assigner`, after calling
    :func:`FileSink.for_row_format`.

    The names of the part files could be defined using OutputFileConfig. This
    configuration contains a part prefix and a part suffix that will be used with a random uid
    assigned to each subtask of the sink and a rolling counter to determine the file names. For
    example with a prefix "prefix" and a suffix ".ext", a file named
    "prefix-81fc4980-a6af-41c8-9937-9939408a734b-17.ext" contains the data from subtask with uid
    81fc4980-a6af-41c8-9937-9939408a734b of the sink and is the 17th part-file
    created by that subtask.

    Part files roll based on the user-specified RollingPolicy. By default, a DefaultRollingPolicy
    is used for row-encoded sink output; an OnCheckpointRollingPolicy is
    used for bulk-encoded sink output.

    In some scenarios, the open buckets are required to change based on time. In these cases, the
    user can specify a bucket_check_interval (by default 1m) and the sink will check
    periodically and roll the part file if the specified rolling policy says so.

    Part files can be in one of three states: in-progress, pending or finished. The reason for this
    is how the sink works to provide exactly-once semantics and fault-tolerance. The part file that
    is currently being written to is in-progress. Once a part file is closed for writing it becomes
    pending. When a checkpoint is successful (for STREAMING) or at the end of the job (for BATCH)
    the currently pending files will be moved to finished.

    For STREAMING in order to guarantee exactly-once semantics in case of a failure, the
    sink should roll back to the state it had when that last successful checkpoint occurred. To this
    end, when restoring, the restored files in pending state are transferred into the finished state
    while any in-progress files are rolled back, so that they do not contain data that arrived after
    the checkpoint from which we restore.
    """

    def __init__(self, j_file_sink, transformer: Optional[StreamTransformer] = None):
        """
        :param j_file_sink: The Java file sink to wrap.
        :param transformer: Optional stream transformer exposed through
                            :func:`get_transformer`.
        """
        super(FileSink, self).__init__(sink=j_file_sink)
        self._transformer = transformer

    def get_transformer(self) -> Optional[StreamTransformer]:
        """
        Returns the stream transformer given at construction time, or None if there is none.
        """
        return self._transformer

    class BaseBuilder(object):
        """
        Common configuration shared by the row format and bulk format builders. Every setter
        forwards to the wrapped Java builder and returns this builder for chaining.
        """

        def __init__(self, j_builder):
            """
            :param j_builder: The Java sink builder to wrap.
            """
            self._j_builder = j_builder

        def with_bucket_check_interval(self, interval: int):
            """
            :param interval: The check interval in milliseconds.
            """
            self._j_builder.withBucketCheckInterval(interval)
            return self

        def with_bucket_assigner(self, bucket_assigner: BucketAssigner):
            """
            :param bucket_assigner: The assigner deciding the bucket of each element.
            """
            self._j_builder.withBucketAssigner(bucket_assigner.get_java_object())
            return self

        def with_output_file_config(self, output_file_config: OutputFileConfig):
            """
            :param output_file_config: The part file name configuration.
            """
            self._j_builder.withOutputFileConfig(output_file_config.get_java_object())
            return self

        def enable_compact(self, strategy: FileCompactStrategy, compactor: FileCompactor):
            """
            :param strategy: The strategy deciding when compaction is triggered.
            :param compactor: The compactor used to merge the files.
            """
            self._j_builder.enableCompact(strategy.get_java_object(), compactor.get_java_object())
            return self

        def disable_compact(self):
            """
            Forwards to the Java builder's ``disableCompact``.
            """
            self._j_builder.disableCompact()
            return self

        @abstractmethod
        def with_rolling_policy(self, rolling_policy):
            """
            Sets the rolling policy; implemented by the concrete builders.
            """
            pass

        def build(self):
            """
            Creates the FileSink with the settings applied to this builder.
            """
            return FileSink(self._j_builder.build())

    class RowFormatBuilder(BaseBuilder):
        """
        Builder for the vanilla FileSink using a row format.

        .. versionchanged:: 1.16.0
           Support compaction.
        """

        def __init__(self, j_row_format_builder):
            super().__init__(j_row_format_builder)

        def with_rolling_policy(self, rolling_policy: RollingPolicy):
            """
            :param rolling_policy: The policy deciding when part files are rolled.
            """
            self._j_builder.withRollingPolicy(rolling_policy.get_java_object())
            return self

    @staticmethod
    def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuilder':
        """
        Starts building a row-encoded FileSink.

        :param base_path: The base directory the sink writes its buckets to.
        :param encoder: The encoder used to serialize each element.
        :return: A :class:`FileSink.RowFormatBuilder` for further configuration.
        """
        jvm = get_gateway().jvm
        j_path = jvm.org.apache.flink.core.fs.Path(base_path)
        j_file_sink_class = jvm.org.apache.flink.connector.file.sink.FileSink
        return FileSink.RowFormatBuilder(
            j_file_sink_class.forRowFormat(j_path, encoder._j_encoder))

    class BulkFormatBuilder(BaseBuilder):
        """
        Builder for the vanilla FileSink using a bulk format.

        .. versionadded:: 1.16.0
        """

        def __init__(self, j_bulk_format_builder):
            super().__init__(j_bulk_format_builder)
            # Set by _with_row_type when the input stream needs converting before the sink.
            self._transformer = None

        def with_rolling_policy(self, rolling_policy: OnCheckpointRollingPolicy):
            """
            :param rolling_policy: The rolling policy; must be an OnCheckpointRollingPolicy.
            :raises ValueError: If rolling_policy is not an OnCheckpointRollingPolicy.
            """
            if not isinstance(rolling_policy, OnCheckpointRollingPolicy):
                raise ValueError('rolling_policy must be OnCheckpointRollingPolicy for bulk format')
            # Hand the validated policy to the Java builder instead of silently dropping it,
            # matching what StreamingFileSink.DefaultBulkFormatBuilder does.
            self._j_builder.withRollingPolicy(rolling_policy.get_java_object())
            return self

        def _with_row_type(self, row_type: 'RowType') -> 'FileSink.BulkFormatBuilder':
            """
            Registers a stream transformer that maps the input stream through the Java
            ``PythonConnectorUtils.RowRowMapper`` built from row_type, unless the stream's type
            info is already an InternalTypeInfo of a logical RowType.

            :param row_type: The row type handed to the Java mapper.
            :return: This builder.
            """
            # Imported here rather than at module level.
            # NOTE(review): presumably to avoid an import cycle - confirm.
            from pyflink.datastream.data_stream import DataStream
            from pyflink.table.types import _to_java_data_type

            def _check_if_row_data_type(ds) -> bool:
                j_type_info = ds._j_data_stream.getType()
                if not is_instance_of(
                    j_type_info,
                    'org.apache.flink.table.runtime.typeutils.InternalTypeInfo'
                ):
                    return False
                return is_instance_of(
                    j_type_info.toLogicalType(),
                    'org.apache.flink.table.types.logical.RowType'
                )

            class RowRowTransformer(StreamTransformer):

                def apply(self, ds):
                    jvm = get_gateway().jvm

                    # Streams matching the type check above are passed through unchanged.
                    if _check_if_row_data_type(ds):
                        return ds

                    j_map_function = jvm.org.apache.flink.python.util.PythonConnectorUtils \
                        .RowRowMapper(_to_java_data_type(row_type))
                    return DataStream(ds._j_data_stream.process(j_map_function))

            self._transformer = RowRowTransformer()
            return self

        def build(self) -> 'FileSink':
            """
            Creates the FileSink, attaching the transformer registered on this builder (if any).
            """
            return FileSink(self._j_builder.build(), self._transformer)

    @staticmethod
    def for_bulk_format(base_path: str, writer_factory: BulkWriterFactory) \
            -> 'FileSink.BulkFormatBuilder':
        """
        Starts building a bulk-encoded FileSink.

        :param base_path: The base directory the sink writes its buckets to.
        :param writer_factory: The factory creating the bulk writers. For a
                               RowDataBulkWriterFactory the builder is additionally configured
                               with the factory's row type.
        :return: A :class:`FileSink.BulkFormatBuilder` for further configuration.
        """
        jvm = get_gateway().jvm
        j_path = jvm.org.apache.flink.core.fs.Path(base_path)
        j_file_sink_class = jvm.org.apache.flink.connector.file.sink.FileSink

        builder = FileSink.BulkFormatBuilder(
            j_file_sink_class.forBulkFormat(j_path, writer_factory.get_java_object())
        )
        if isinstance(writer_factory, RowDataBulkWriterFactory):
            return builder._with_row_type(writer_factory.get_row_type())
        return builder
# ---- StreamingFileSink ----
class StreamingFileSink(SinkFunction):
    """
    Sink that emits its input elements to `FileSystem` files within buckets. This is
    integrated with the checkpointing mechanism to provide exactly once semantics.

    When creating the sink a `basePath` must be specified. The base directory contains
    one directory for every bucket. The bucket directories themselves contain several part files,
    with at least one for each parallel subtask of the sink which is writing data to that bucket.
    These part files contain the actual output data.

    Constructing an instance emits a DeprecationWarning pointing to FileSink.
    """

    def __init__(self, j_obj):
        """
        :param j_obj: The Java streaming file sink to wrap.
        """
        warnings.warn("Deprecated in 1.15. Use FileSink instead.", DeprecationWarning)
        super(StreamingFileSink, self).__init__(j_obj)

    class BaseBuilder(object):
        """
        Common configuration shared by the row format and bulk format builders. Every setter
        forwards to the wrapped Java builder and returns this builder for chaining.
        """

        def __init__(self, j_builder):
            """
            :param j_builder: The Java sink builder to wrap.
            """
            self._j_builder = j_builder

        def with_bucket_check_interval(self, interval: int):
            """
            :param interval: The bucket check interval passed to the Java builder.
            """
            self._j_builder.withBucketCheckInterval(interval)
            return self

        def with_bucket_assigner(self, bucket_assigner: BucketAssigner):
            """
            :param bucket_assigner: The assigner deciding the bucket of each element.
            """
            self._j_builder.withBucketAssigner(bucket_assigner.get_java_object())
            return self

        @abstractmethod
        def with_rolling_policy(self, policy):
            """
            Sets the rolling policy; implemented by the concrete builders.
            """
            pass

        def with_output_file_config(self, output_file_config: OutputFileConfig):
            """
            :param output_file_config: The part file name configuration.
            """
            self._j_builder.withOutputFileConfig(output_file_config.get_java_object())
            return self

        def build(self) -> 'StreamingFileSink':
            """
            Creates the StreamingFileSink with the settings applied to this builder.
            """
            return StreamingFileSink(self._j_builder.build())

    class DefaultRowFormatBuilder(BaseBuilder):
        """
        Builder for the vanilla `StreamingFileSink` using a row format.
        """

        def __init__(self, j_default_row_format_builder):
            super().__init__(j_default_row_format_builder)

        def with_rolling_policy(self, policy: RollingPolicy):
            """
            :param policy: The policy deciding when part files are rolled.
            """
            self._j_builder.withRollingPolicy(policy.get_java_object())
            return self

    @staticmethod
    def for_row_format(base_path: str, encoder: Encoder) \
            -> 'StreamingFileSink.DefaultRowFormatBuilder':
        """
        Starts building a row-encoded StreamingFileSink.

        :param base_path: The base directory the sink writes its buckets to.
        :param encoder: The encoder used to serialize each element.
        :return: A :class:`StreamingFileSink.DefaultRowFormatBuilder` for further configuration.
        """
        jvm = get_gateway().jvm
        j_path = jvm.org.apache.flink.core.fs.Path(base_path)
        j_sink_class = jvm.org.apache.flink.streaming.api.functions.sink.filesystem. \
            StreamingFileSink
        return StreamingFileSink.DefaultRowFormatBuilder(
            j_sink_class.forRowFormat(j_path, encoder._j_encoder))

    class DefaultBulkFormatBuilder(BaseBuilder):
        """
        Builder for the vanilla `StreamingFileSink` using a bulk format.
        """

        def __init__(self, j_default_bulk_format_builder):
            super().__init__(j_default_bulk_format_builder)

        def with_rolling_policy(self, policy: OnCheckpointRollingPolicy):
            """
            :param policy: The policy deciding when part files are rolled.
            """
            self._j_builder.withRollingPolicy(policy.get_java_object())
            return self

    @staticmethod
    def for_bulk_format(base_path: str, writer_factory: BulkWriterFactory) \
            -> 'StreamingFileSink.DefaultBulkFormatBuilder':
        """
        Starts building a bulk-encoded StreamingFileSink.

        :param base_path: The base directory the sink writes its buckets to.
        :param writer_factory: The factory creating the bulk writers.
        :return: A :class:`StreamingFileSink.DefaultBulkFormatBuilder` for further
                 configuration.
        """
        jvm = get_gateway().jvm
        j_path = jvm.org.apache.flink.core.fs.Path(base_path)
        j_sink_class = jvm.org.apache.flink.streaming.api.functions.sink.filesystem. \
            StreamingFileSink
        return StreamingFileSink.DefaultBulkFormatBuilder(
            j_sink_class.forBulkFormat(j_path, writer_factory.get_java_object()))