################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Callable, Union
from pyflink.common import typeinfo, ExecutionConfig, Row
from pyflink.common.typeinfo import RowTypeInfo, PickledBytesTypeInfo, Types, WrapperTypeInfo
from pyflink.common.typeinfo import TypeInformation
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.functions import _get_python_env, FlatMapFunctionWrapper, FlatMapFunction, \
MapFunction, MapFunctionWrapper, Function, FunctionWrapper, SinkFunction, FilterFunction, \
FilterFunctionWrapper, KeySelectorFunctionWrapper, KeySelector, ReduceFunction, \
ReduceFunctionWrapper, CoMapFunction, CoFlatMapFunction, Partitioner, \
PartitionerFunctionWrapper, RuntimeContext, ProcessFunction, KeyedProcessFunction
from pyflink.java_gateway import get_gateway
[docs]class DataStream(object):
"""
A DataStream represents a stream of elements of the same type. A DataStream can be transformed
into another DataStream by applying a transformation as for example:
::
>>> DataStream.map(MapFunctionImpl())
>>> DataStream.filter(FilterFunctionImpl())
"""
def __init__(self, j_data_stream):
self._j_data_stream = j_data_stream
[docs] def get_name(self) -> str:
"""
Gets the name of the current data stream. This name is used by the visualization and logging
during runtime.
:return: Name of the stream.
"""
return self._j_data_stream.getName()
[docs] def name(self, name: str) -> 'DataStream':
"""
Sets the name of the current data stream. This name is used by the visualization and logging
during runting.
:param name: Name of the stream.
:return: The named operator.
"""
self._j_data_stream.name(name)
return self
[docs] def uid(self, uid: str) -> 'DataStream':
"""
Sets an ID for this operator. The specified ID is used to assign the same operator ID across
job submissions (for example when starting a job from a savepoint).
Important: this ID needs to be unique per transformation and job. Otherwise, job submission
will fail.
:param uid: The unique user-specified ID of this transformation.
:return: The operator with the specified ID.
"""
self._j_data_stream.uid(uid)
return self
[docs] def set_uid_hash(self, uid_hash: str) -> 'DataStream':
"""
Sets an user provided hash for this operator. This will be used AS IS the create the
JobVertexID. The user provided hash is an alternative to the generated hashed, that is
considered when identifying an operator through the default hash mechanics fails (e.g.
because of changes between Flink versions).
Important: this should be used as a workaround or for trouble shooting. The provided hash
needs to be unique per transformation and job. Otherwise, job submission will fail.
Furthermore, you cannot assign user-specified hash to intermediate nodes in an operator
chain and trying so will let your job fail.
A use case for this is in migration between Flink versions or changing the jobs in a way
that changes the automatically generated hashes. In this case, providing the previous hashes
directly through this method (e.g. obtained from old logs) can help to reestablish a lost
mapping from states to their target operator.
:param uid_hash: The user provided hash for this operator. This will become the jobVertexID,
which is shown in the logs and web ui.
:return: The operator with the user provided hash.
"""
self._j_data_stream.setUidHash(uid_hash)
return self
[docs] def set_parallelism(self, parallelism: int) -> 'DataStream':
"""
Sets the parallelism for this operator.
:param parallelism: THe parallelism for this operator.
:return: The operator with set parallelism.
"""
self._j_data_stream.setParallelism(parallelism)
return self
[docs] def set_max_parallelism(self, max_parallelism: int) -> 'DataStream':
"""
Sets the maximum parallelism of this operator.
The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the
number of key groups used for partitioned state.
:param max_parallelism: Maximum parallelism.
:return: The operator with set maximum parallelism.
"""
self._j_data_stream.setMaxParallelism(max_parallelism)
return self
[docs] def get_type(self) -> TypeInformation:
"""
Gets the type of the stream.
:return: The type of the DataStream.
"""
return typeinfo._from_java_type(self._j_data_stream.getType())
[docs] def get_execution_environment(self):
"""
Returns the StreamExecutionEnvironment that was used to create this DataStream.
:return: The Execution Environment.
"""
from pyflink.datastream import StreamExecutionEnvironment
return StreamExecutionEnvironment(
j_stream_execution_environment=self._j_data_stream.getExecutionEnvironment())
[docs] def get_execution_config(self) -> ExecutionConfig:
return ExecutionConfig(j_execution_config=self._j_data_stream.getExecutionConfig())
[docs] def force_non_parallel(self) -> 'DataStream':
"""
Sets the parallelism and maximum parallelism of this operator to one. And mark this operator
cannot set a non-1 degree of parallelism.
:return: The operator with only one parallelism.
"""
self._j_data_stream.forceNonParallel()
return self
[docs] def set_buffer_timeout(self, timeout_millis: int) -> 'DataStream':
"""
Sets the buffering timeout for data produced by this operation. The timeout defines how long
data may linger ina partially full buffer before being sent over the network.
Lower timeouts lead to lower tail latencies, but may affect throughput. Timeouts of 1 ms
still sustain high throughput, even for jobs with high parallelism.
A value of '-1' means that the default buffer timeout should be used. A value of '0'
indicates that no buffering should happen, and all records/events should be immediately sent
through the network, without additional buffering.
:param timeout_millis: The maximum time between two output flushes.
:return: The operator with buffer timeout set.
"""
self._j_data_stream.setBufferTimeout(timeout_millis)
return self
[docs] def start_new_chain(self) -> 'DataStream':
"""
Starts a new task chain beginning at this operator. This operator will be chained (thread
co-located for increased performance) to any previous tasks even if possible.
:return: The operator with chaining set.
"""
self._j_data_stream.startNewChain()
return self
[docs] def disable_chaining(self) -> 'DataStream':
"""
Turns off chaining for this operator so thread co-location will not be used as an
optimization.
Chaining can be turned off for the whole job by
StreamExecutionEnvironment.disableOperatorChaining() however it is not advised for
performance consideration.
:return: The operator with chaining disabled.
"""
self._j_data_stream.disableChaining()
return self
[docs] def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStream':
"""
Sets the slot sharing group of this operation. Parallel instances of operations that are in
the same slot sharing group will be co-located in the same TaskManager slot, if possible.
Operations inherit the slot sharing group of input operations if all input operations are in
the same slot sharing group and no slot sharing group was explicitly specified.
Initially an operation is in the default slot sharing group. An operation can be put into
the default group explicitly by setting the slot sharing group to 'default'.
:param slot_sharing_group: The slot sharing group name.
:return: This operator.
"""
self._j_data_stream.slotSharingGroup(slot_sharing_group)
return self
[docs] def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \
-> 'DataStream':
"""
Applies a Map transformation on a DataStream. The transformation calls a MapFunction for
each element of the DataStream. Each MapFunction call returns exactly one element. The user
can also extend RichMapFunction to gain access to other features provided by the
RichFunction interface.
Note that If user does not specify the output data type, the output data will be serialized
as pickle primitive byte array.
:param func: The MapFunction that is called for each element of the DataStream.
:param output_type: The type information of the MapFunction output data.
:return: The transformed DataStream.
"""
if not isinstance(func, MapFunction):
if callable(func):
func = MapFunctionWrapper(func) # type: ignore
else:
raise TypeError("The input must be a MapFunction or a callable function")
from pyflink.fn_execution import flink_fn_execution_pb2
j_operator, j_output_type_info = _get_one_input_stream_operator(
self,
func, # type: ignore
flink_fn_execution_pb2.UserDefinedDataStreamFunction.MAP, # type: ignore
output_type)
return DataStream(self._j_data_stream.transform(
"Map",
j_output_type_info,
j_operator
))
[docs] def flat_map(self, func: Union[Callable, FlatMapFunction],
result_type: TypeInformation = None) -> 'DataStream':
"""
Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction
for each element of the DataStream. Each FlatMapFunction call can return any number of
elements including none. The user can also extend RichFlatMapFunction to gain access to
other features provided by the RichFUnction.
:param func: The FlatMapFunction that is called for each element of the DataStream.
:param result_type: The type information of output data.
:return: The transformed DataStream.
"""
if not isinstance(func, FlatMapFunction):
if callable(func):
func = FlatMapFunctionWrapper(func) # type: ignore
else:
raise TypeError("The input must be a FlatMapFunction or a callable function")
from pyflink.fn_execution import flink_fn_execution_pb2
j_operator, j_output_type_info = _get_one_input_stream_operator(
self,
func, # type: ignore
flink_fn_execution_pb2.UserDefinedDataStreamFunction.FLAT_MAP, # type: ignore
result_type)
return DataStream(self._j_data_stream.transform(
"FLAT_MAP",
j_output_type_info,
j_operator
))
[docs] def key_by(self, key_selector: Union[Callable, KeySelector],
key_type_info: TypeInformation = None) -> 'KeyedStream':
"""
Creates a new KeyedStream that uses the provided key for partitioning its operator states.
:param key_selector: The KeySelector to be used for extracting the key for partitioning.
:param key_type_info: The type information describing the key type.
:return: The DataStream with partitioned state(i.e. KeyedStream).
"""
if callable(key_selector):
key_selector = KeySelectorFunctionWrapper(key_selector) # type: ignore
if not isinstance(key_selector, (KeySelector, KeySelectorFunctionWrapper)):
raise TypeError("Parameter key_selector should be a type of KeySelector.")
output_type_info = typeinfo._from_java_type(
self._j_data_stream.getTransformation().getOutputType())
is_key_pickled_byte_array = False
if key_type_info is None:
key_type_info = Types.PICKLED_BYTE_ARRAY()
is_key_pickled_byte_array = True
if not isinstance(key_type_info, WrapperTypeInfo):
raise ValueError('key_type_info must be WrapperTypeInfo')
intermediate_map_stream = self.map(
lambda x: Row(key_selector.get_key(x), x), # type: ignore
output_type=Types.ROW([key_type_info, output_type_info]))
gateway = get_gateway()
JKeyByKeySelector = gateway.jvm.KeyByKeySelector
intermediate_map_stream.name(gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
.STREAM_KEY_BY_MAP_OPERATOR_NAME)
key_stream = KeyedStream(
intermediate_map_stream._j_data_stream.keyBy(
JKeyByKeySelector(is_key_pickled_byte_array),
key_type_info.get_java_type_info()), output_type_info,
self)
return key_stream
[docs] def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
"""
Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
for each element of the DataStream and retains only those element for which the function
returns true. Elements for which the function returns false are filtered. The user can also
extend RichFilterFunction to gain access to other features provided by the RichFunction
interface.
:param func: The FilterFunction that is called for each element of the DataStream.
:return: The filtered DataStream.
"""
class FilterFlatMap(FlatMapFunction):
def __init__(self, filter_func):
self._func = filter_func
def flat_map(self, value):
if self._func.filter(value):
yield value
if callable(func):
func = FilterFunctionWrapper(func) # type: ignore
elif not isinstance(func, FilterFunction):
raise TypeError("func must be a Callable or instance of FilterFunction.")
type_info = typeinfo._from_java_type(
self._j_data_stream.getTransformation().getOutputType())
data_stream = self.flat_map(FilterFlatMap(func), result_type=type_info)
data_stream.name("Filter")
return data_stream
[docs] def union(self, *streams: 'DataStream') -> 'DataStream':
"""
Creates a new DataStream by merging DataStream outputs of the same type with each other. The
DataStreams merged using this operator will be transformed simultaneously.
:param streams: The DataStream to union outputwith.
:return: The DataStream.
"""
j_data_streams = []
for data_stream in streams:
j_data_streams.append(data_stream._j_data_stream)
gateway = get_gateway()
JDataStream = gateway.jvm.org.apache.flink.streaming.api.datastream.DataStream
j_data_stream_arr = get_gateway().new_array(JDataStream, len(j_data_streams))
for i in range(len(j_data_streams)):
j_data_stream_arr[i] = j_data_streams[i]
j_united_stream = self._j_data_stream.union(j_data_stream_arr)
return DataStream(j_data_stream=j_united_stream)
[docs] def connect(self, ds: 'DataStream') -> 'ConnectedStreams':
"""
Creates a new 'ConnectedStreams' by connecting 'DataStream' outputs of (possible)
different types with each other. The DataStreams connected using this operator can
be used with CoFunctions to apply joint transformations.
:param ds: The DataStream with which this stream will be connected.
:return: The `ConnectedStreams`.
"""
return ConnectedStreams(self, ds)
[docs] def shuffle(self) -> 'DataStream':
"""
Sets the partitioning of the DataStream so that the output elements are shuffled uniformly
randomly to the next operation.
:return: The DataStream with shuffle partitioning set.
"""
return DataStream(self._j_data_stream.shuffle())
[docs] def project(self, *field_indexes: int) -> 'DataStream':
"""
Initiates a Project transformation on a Tuple DataStream.
Note that only Tuple DataStreams can be projected.
:param field_indexes: The field indexes of the input tuples that are retained. The order of
fields in the output tuple corresponds to the order of field indexes.
:return: The projected DataStream.
"""
if not isinstance(self.get_type(), typeinfo.TupleTypeInfo):
raise Exception('Only Tuple DataStreams can be projected.')
gateway = get_gateway()
j_index_arr = gateway.new_array(gateway.jvm.int, len(field_indexes))
for i in range(len(field_indexes)):
j_index_arr[i] = field_indexes[i]
return DataStream(self._j_data_stream.project(j_index_arr))
[docs] def rescale(self) -> 'DataStream':
"""
Sets the partitioning of the DataStream so that the output elements are distributed evenly
to a subset of instances of the next operation in a round-robin fashion.
The subset of downstream operations to which the upstream operation sends elements depends
on the degree of parallelism of both the upstream and downstream operation. For example, if
the upstream operation has parallelism 2 and the downstream operation has parallelism 4,
then one upstream operation would distribute elements to two downstream operations. If, on
the other hand, the downstream operation has parallelism 4 then two upstream operations will
distribute to one downstream operation while the other two upstream operations will
distribute to the other downstream operations.
In cases where the different parallelisms are not multiples of each one or several
downstream operations will have a differing number of inputs from upstream operations.
:return: The DataStream with rescale partitioning set.
"""
return DataStream(self._j_data_stream.rescale())
[docs] def rebalance(self) -> 'DataStream':
"""
Sets the partitioning of the DataStream so that the output elements are distributed evenly
to instances of the next operation in a round-robin fashion.
:return: The DataStream with rebalance partition set.
"""
return DataStream(self._j_data_stream.rebalance())
[docs] def forward(self) -> 'DataStream':
"""
Sets the partitioning of the DataStream so that the output elements are forwarded to the
local sub-task of the next operation.
:return: The DataStream with forward partitioning set.
"""
return DataStream(self._j_data_stream.forward())
[docs] def broadcast(self) -> 'DataStream':
"""
Sets the partitioning of the DataStream so that the output elements are broadcasted to every
parallel instance of the next operation.
:return: The DataStream with broadcast partitioning set.
"""
return DataStream(self._j_data_stream.broadcast())
[docs] def process(self, func: ProcessFunction, output_type: TypeInformation = None) -> 'DataStream':
"""
Applies the given ProcessFunction on the input stream, thereby creating a transformed output
stream.
The function will be called for every element in the input streams and can produce zero or
more output elements.
:param func: The ProcessFunction that is called for each element in the stream.
:param output_type: TypeInformation for the result type of the function.
:return: The transformed DataStream.
"""
from pyflink.fn_execution import flink_fn_execution_pb2
j_python_data_stream_function_operator, j_output_type_info = \
_get_one_input_stream_operator(
self,
func,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS, # type: ignore
output_type)
return DataStream(self._j_data_stream.transform(
"PROCESS",
j_output_type_info,
j_python_data_stream_function_operator))
[docs] def assign_timestamps_and_watermarks(self, watermark_strategy: WatermarkStrategy) -> \
'DataStream':
"""
Assigns timestamps to the elements in the data stream and generates watermarks to signal
event time progress. The given {@link WatermarkStrategy} is used to create a
TimestampAssigner and WatermarkGenerator.
:param watermark_strategy: The strategy to generate watermarks based on event timestamps.
:return: The stream after the transformation, with assigned timestamps and watermarks.
"""
if watermark_strategy._timestamp_assigner is not None:
# user implement a TimestampAssigner, we need to extracted and generate watermarks with
# a custom Operator.
from pyflink.fn_execution import flink_fn_execution_pb2 as ffpb2
gateway = get_gateway()
import cloudpickle
serialized_func = cloudpickle.dumps(watermark_strategy._timestamp_assigner)
JDataStreamPythonFunction = gateway.jvm.DataStreamPythonFunction
j_data_stream_python_function = JDataStreamPythonFunction(
bytearray(serialized_func),
_get_python_env())
JDataStreamPythonFunctionInfo = gateway.jvm.DataStreamPythonFunctionInfo
j_data_stream_python_function_info = JDataStreamPythonFunctionInfo(
j_data_stream_python_function,
ffpb2.UserDefinedDataStreamFunction.TIMESTAMP_ASSIGNER) # type: ignore
j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
j_output_type = self._j_data_stream.getType()
j_operator = gateway.jvm\
.org.apache.flink.streaming.api.operators.python\
.PythonTimestampsAndWatermarksOperator(
j_conf,
j_output_type,
j_data_stream_python_function_info,
watermark_strategy._j_watermark_strategy)
operator_name = gateway.jvm.org.apache.flink.streaming.api.operators.python\
.PythonTimestampsAndWatermarksOperator.STREAM_TIMESTAMP_AND_WATERMARK_OPERATOR_NAME
return DataStream(self._j_data_stream.transform(
operator_name,
j_output_type,
j_operator))
else:
# if user not specify a TimestampAssigner, then return directly assign the Java
# watermark strategy.
return DataStream(self._j_data_stream.assignTimestampsAndWatermarks(
watermark_strategy._j_watermark_strategy))
[docs] def partition_custom(self, partitioner: Union[Callable, Partitioner],
key_selector: Union[Callable, KeySelector]) -> 'DataStream':
"""
Partitions a DataStream on the key returned by the selector, using a custom partitioner.
This method takes the key selector to get the key to partition on, and a partitioner that
accepts the key type.
Note that this method works only on single field keys, i.e. the selector cannet return
tuples of fields.
:param partitioner: The partitioner to assign partitions to keys.
:param key_selector: The KeySelector with which the DataStream is partitioned.
:return: The partitioned DataStream.
"""
if callable(key_selector):
key_selector = KeySelectorFunctionWrapper(key_selector) # type: ignore
if not isinstance(key_selector, (KeySelector, KeySelectorFunctionWrapper)):
raise TypeError("Parameter key_selector should be a type of KeySelector.")
if callable(partitioner):
partitioner = PartitionerFunctionWrapper(partitioner) # type: ignore
if not isinstance(partitioner, (Partitioner, PartitionerFunctionWrapper)):
raise TypeError("Parameter partitioner should be a type of Partitioner.")
gateway = get_gateway()
class PartitionCustomMapFunction(MapFunction):
"""
A wrapper class for partition_custom map function. It indicates that it is a partition
custom operation that we need to apply PythonPartitionCustomOperator
to run the map function.
"""
def __init__(self):
self.num_partitions = None
def open(self, runtime_context: RuntimeContext):
self.num_partitions = int(runtime_context.get_job_parameter(
"NUM_PARTITIONS", "-1"))
if self.num_partitions <= 0:
raise ValueError(
"The partition number should be a positive value, got %s"
% self.num_partitions)
def map(self, value):
partition = partitioner.partition(key_selector.get_key(value), self.num_partitions)
return Row(partition, value)
def __repr__(self) -> str:
return '_Flink_PartitionCustomMapFunction'
original_type_info = self.get_type()
intermediate_map_stream = self.map(PartitionCustomMapFunction(),
output_type=Types.ROW([Types.INT(), original_type_info]))
intermediate_map_stream.name(
gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
.STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME)
JPartitionCustomKeySelector = gateway.jvm.PartitionCustomKeySelector
JIdParitioner = gateway.jvm.org.apache.flink.api.java.functions.IdPartitioner
intermediate_map_stream = DataStream(intermediate_map_stream._j_data_stream
.partitionCustom(JIdParitioner(),
JPartitionCustomKeySelector()))
values_map_stream = intermediate_map_stream.map(lambda x: x[1], original_type_info)
values_map_stream.name(gateway.jvm.org.apache.flink.python.util.PythonConfigUtil
.KEYED_STREAM_VALUE_OPERATOR_NAME)
return DataStream(values_map_stream._j_data_stream)
[docs] def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink':
"""
Adds the given sink to this DataStream. Only streams with sinks added will be executed once
the StreamExecutionEnvironment.execute() method is called.
:param sink_func: The SinkFunction object.
:return: The closed DataStream.
"""
return DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
[docs] def print(self, sink_identifier: str = None) -> 'DataStreamSink':
"""
Writes a DataStream to the standard output stream (stdout).
For each element of the DataStream the object string is writen.
NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink
worker, and is not fault tolerant.
:param sink_identifier: The string to prefix the output with.
:return: The closed DataStream.
"""
if sink_identifier is not None:
j_data_stream_sink = self._align_output_type()._j_data_stream.print(sink_identifier)
else:
j_data_stream_sink = self._align_output_type()._j_data_stream.print()
return DataStreamSink(j_data_stream_sink)
def _align_output_type(self) -> 'DataStream':
"""
Transform the pickled python object into String if the output type is PickledByteArrayInfo.
"""
output_type_info_class = self._j_data_stream.getTransformation().getOutputType().getClass()
if output_type_info_class.isAssignableFrom(
PickledBytesTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO().get_java_type_info()
.getClass()):
def python_obj_to_str_map_func(value):
if not isinstance(value, (str, bytes)):
value = str(value)
return value
transformed_data_stream = DataStream(
self.map(python_obj_to_str_map_func,
output_type=Types.STRING())._j_data_stream)
return transformed_data_stream
else:
return self
class DataStreamSink(object):
"""
A Stream Sink. This is used for emitting elements from a streaming topology.
"""
def __init__(self, j_data_stream_sink):
"""
The constructor of DataStreamSink.
:param j_data_stream_sink: A DataStreamSink java object.
"""
self._j_data_stream_sink = j_data_stream_sink
def name(self, name: str) -> 'DataStreamSink':
"""
Sets the name of this sink. THis name is used by the visualization and logging during
runtime.
:param name: The name of this sink.
:return: The named sink.
"""
self._j_data_stream_sink.name(name)
return self
def uid(self, uid: str) -> 'DataStreamSink':
"""
Sets an ID for this operator. The specified ID is used to assign the same operator ID across
job submissions (for example when starting a job from a savepoint).
Important: this ID needs to be unique per transformation and job. Otherwise, job submission
will fail.
:param uid: The unique user-specified ID of this transformation.
:return: The operator with the specified ID.
"""
self._j_data_stream_sink.uid(uid)
return self
def set_uid_hash(self, uid_hash: str) -> 'DataStreamSink':
"""
Sets an user provided hash for this operator. This will be used AS IS the create the
JobVertexID. The user provided hash is an alternative to the generated hashed, that is
considered when identifying an operator through the default hash mechanics fails (e.g.
because of changes between Flink versions).
Important: this should be used as a workaround or for trouble shooting. The provided hash
needs to be unique per transformation and job. Otherwise, job submission will fail.
Furthermore, you cannot assign user-specified hash to intermediate nodes in an operator
chain and trying so will let your job fail.
A use case for this is in migration between Flink versions or changing the jobs in a way
that changes the automatically generated hashes. In this case, providing the previous hashes
directly through this method (e.g. obtained from old logs) can help to reestablish a lost
mapping from states to their target operator.
:param uid_hash: The user provided hash for this operator. This will become the jobVertexID,
which is shown in the logs and web ui.
:return: The operator with the user provided hash.
"""
self._j_data_stream_sink.setUidHash(uid_hash)
return self
def set_parallelism(self, parallelism: int) -> 'DataStreamSink':
"""
Sets the parallelism for this operator.
:param parallelism: THe parallelism for this operator.
:return: The operator with set parallelism.
"""
self._j_data_stream_sink.setParallelism(parallelism)
return self
def disable_chaining(self) -> 'DataStreamSink':
"""
Turns off chaining for this operator so thread co-location will not be used as an
optimization.
Chaining can be turned off for the whole job by
StreamExecutionEnvironment.disableOperatorChaining() however it is not advised for
performance consideration.
:return: The operator with chaining disabled.
"""
self._j_data_stream_sink.disableChaining()
return self
def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStreamSink':
"""
Sets the slot sharing group of this operation. Parallel instances of operations that are in
the same slot sharing group will be co-located in the same TaskManager slot, if possible.
Operations inherit the slot sharing group of input operations if all input operations are in
the same slot sharing group and no slot sharing group was explicitly specified.
Initially an operation is in the default slot sharing group. An operation can be put into
the default group explicitly by setting the slot sharing group to 'default'.
:param slot_sharing_group: The slot sharing group name.
:return: This operator.
"""
self._j_data_stream_sink.slotSharingGroup(slot_sharing_group)
return self
class KeyedStream(DataStream):
"""
A KeyedStream represents a DataStream on which operator state is partitioned by key using a
provided KeySelector. Typical operations supported by a DataStream are also possible on a
KeyedStream, with the exception of partitioning methods such as shuffle, forward and keyBy.
Reduce-style operations, such as reduce and sum work on elements that have the same key.
"""
def __init__(self, j_keyed_stream, original_data_type_info, origin_stream: DataStream):
"""
Constructor of KeyedStream.
:param j_keyed_stream: A java KeyedStream object.
:param original_data_type_info: Original data typeinfo.
:param origin_stream: The DataStream before key by.
"""
super(KeyedStream, self).__init__(j_data_stream=j_keyed_stream)
self._original_data_type_info = original_data_type_info
self._origin_stream = origin_stream
def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \
-> 'DataStream':
return self._values().map(func, output_type)
def flat_map(self, func: Union[Callable, FlatMapFunction], result_type: TypeInformation = None)\
-> 'DataStream':
return self._values().flat_map(func, result_type)
def reduce(self, func: Union[Callable, ReduceFunction]) -> 'DataStream':
"""
Applies a reduce transformation on the grouped data stream grouped on by the given
key position. The `ReduceFunction` will receive input values based on the key value.
Only input values with the same key will go to the same reducer.
Example:
::
>>> ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])
>>> ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])
:param func: The ReduceFunction that is called for each element of the DataStream.
:return: The transformed DataStream.
"""
if not isinstance(func, ReduceFunction):
if callable(func):
func = ReduceFunctionWrapper(func) # type: ignore
else:
raise TypeError("The input must be a ReduceFunction or a callable function!")
from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
j_operator, j_output_type_info = \
_get_one_input_stream_operator(
self, func, UserDefinedDataStreamFunction.REDUCE) # type: ignore
return DataStream(self._j_data_stream.transform(
"Keyed Reduce",
j_output_type_info,
j_operator
))
def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream':
return self._values().filter(func)
def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink':
return self._values().add_sink(sink_func)
def key_by(self, key_selector: Union[Callable, KeySelector],
key_type_info: TypeInformation = None) -> 'KeyedStream':
return self._origin_stream.key_by(key_selector, key_type_info)
def process(self, func: KeyedProcessFunction, # type: ignore
output_type: TypeInformation = None) -> 'DataStream':
"""
Applies the given ProcessFunction on the input stream, thereby creating a transformed output
stream.
The function will be called for every element in the input streams and can produce zero or
more output elements.
:param func: The KeyedProcessFunction that is called for each element in the stream.
:param output_type: TypeInformation for the result type of the function.
:return: The transformed DataStream.
"""
if not isinstance(func, KeyedProcessFunction):
raise TypeError("KeyedProcessFunction is required for KeyedStream.")
from pyflink.fn_execution import flink_fn_execution_pb2
j_python_data_stream_function_operator, j_output_type_info = \
_get_one_input_stream_operator(
self,
func,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.KEYED_PROCESS, # type: ignore
output_type)
return DataStream(self._j_data_stream.transform(
"KEYED PROCESS",
j_output_type_info,
j_python_data_stream_function_operator))
def union(self, *streams) -> 'DataStream':
return self._values().union(*streams)
def shuffle(self) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
def project(self, *field_indexes) -> 'DataStream':
return self._values().project(*field_indexes)
def rescale(self) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
def rebalance(self) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
def forward(self) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
def broadcast(self) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
def partition_custom(self, partitioner: Union[Callable, Partitioner],
key_selector: Union[Callable, KeySelector]) -> 'DataStream':
raise Exception('Cannot override partitioning for KeyedStream.')
def print(self, sink_identifier=None):
return self._values().print()
def _values(self) -> 'DataStream':
"""
Since python KeyedStream is in the format of Row(key_value, original_data), it is used for
getting the original_data.
"""
transformed_stream = super().map(lambda x: x[1], output_type=self._original_data_type_info)
transformed_stream.name(get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil
.KEYED_STREAM_VALUE_OPERATOR_NAME)
return DataStream(transformed_stream._j_data_stream)
def set_parallelism(self, parallelism: int):
raise Exception("Set parallelism for KeyedStream is not supported.")
def name(self, name: str):
raise Exception("Set name for KeyedStream is not supported.")
def get_name(self) -> str:
raise Exception("Get name of KeyedStream is not supported.")
def uid(self, uid: str):
raise Exception("Set uid for KeyedStream is not supported.")
def set_uid_hash(self, uid_hash: str):
raise Exception("Set uid hash for KeyedStream is not supported.")
def set_max_parallelism(self, max_parallelism: int):
raise Exception("Set max parallelism for KeyedStream is not supported.")
def force_non_parallel(self):
raise Exception("Set force non-parallel for KeyedStream is not supported.")
def set_buffer_timeout(self, timeout_millis: int):
raise Exception("Set buffer timeout for KeyedStream is not supported.")
def start_new_chain(self) -> 'DataStream':
raise Exception("Start new chain for KeyedStream is not supported.")
def disable_chaining(self) -> 'DataStream':
raise Exception("Disable chaining for KeyedStream is not supported.")
def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStream':
raise Exception("Setting slot sharing group for KeyedStream is not supported.")
class ConnectedStreams(object):
"""
ConnectedStreams represent two connected streams of (possibly) different data types.
Connected streams are useful for cases where operations on one stream directly
affect the operations on the other stream, usually via shared state between the streams.
An example for the use of connected streams would be to apply rules that change over time
onto another stream. One of the connected streams has the rules, the other stream the
elements to apply the rules to. The operation on the connected stream maintains the
current set of rules in the state. It may receive either a rule update and update the state
or a data element and apply the rules in the state to the element.
The connected stream can be conceptually viewed as a union stream of an Either type, that
holds either the first stream's type or the second stream's type.
"""
def __init__(self, stream1: DataStream, stream2: DataStream):
self.stream1 = stream1
self.stream2 = stream2
def key_by(self, key_selector1: Union[Callable, KeySelector],
key_selector2: Union[Callable, KeySelector],
key_type_info: TypeInformation = None) -> 'ConnectedStreams':
"""
KeyBy operation for connected data stream. Assigns keys to the elements of
input1 and input2 using keySelector1 and keySelector2 with explicit type information
for the common key type.
:param key_selector1: The `KeySelector` used for grouping the first input.
:param key_selector2: The `KeySelector` used for grouping the second input.
:param key_type_info: The type information of the common key type.
:return: The partitioned `ConnectedStreams`
"""
ds1 = self.stream1
ds2 = self.stream2
if isinstance(self.stream1, KeyedStream):
ds1 = self.stream1._origin_stream
if isinstance(self.stream2, KeyedStream):
ds2 = self.stream2._origin_stream
return ConnectedStreams(
ds1.key_by(key_selector1, key_type_info),
ds2.key_by(key_selector2, key_type_info))
def map(self, func: CoMapFunction, output_type: TypeInformation = None) \
-> 'DataStream':
"""
Applies a CoMap transformation on a `ConnectedStreams` and maps the output to a common
type. The transformation calls a `CoMapFunction.map1` for each element of the first
input and `CoMapFunction.map2` for each element of the second input. Each CoMapFunction
call returns exactly one element.
:param func: The CoMapFunction used to jointly transform the two input DataStreams
:param output_type: `TypeInformation` for the result type of the function.
:return: The transformed `DataStream`
"""
if not isinstance(func, CoMapFunction):
raise TypeError("The input function must be a CoMapFunction!")
# get connected stream
j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
from pyflink.fn_execution import flink_fn_execution_pb2
j_operator, j_output_type = _get_two_input_stream_operator(
self,
func,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_MAP, # type: ignore
output_type)
return DataStream(j_connected_stream.transform("Co-Map", j_output_type, j_operator))
def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation = None) \
-> 'DataStream':
"""
Applies a CoFlatMap transformation on a `ConnectedStreams` and maps the output to a
common type. The transformation calls a `CoFlatMapFunction.flatMap1` for each element
of the first input and `CoFlatMapFunction.flatMap2` for each element of the second
input. Each CoFlatMapFunction call returns any number of elements including none.
:param func: The CoFlatMapFunction used to jointly transform the two input DataStreams
:param output_type: `TypeInformation` for the result type of the function.
:return: The transformed `DataStream`
"""
if not isinstance(func, CoFlatMapFunction):
raise TypeError("The input must be a CoFlatMapFunction!")
# get connected stream
j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream)
from pyflink.fn_execution import flink_fn_execution_pb2
j_operator, j_output_type = _get_two_input_stream_operator(
self,
func,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.CO_FLAT_MAP, # type: ignore
output_type)
return DataStream(j_connected_stream.transform("Co-Flat Map", j_output_type, j_operator))
def _is_keyed_stream(self):
return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream)
def _get_one_input_stream_operator(data_stream: DataStream,
func: Union[Function, FunctionWrapper],
func_type: int,
type_info: TypeInformation = None):
"""
Create a Java one input stream operator.
:param func: a function object that implements the Function interface.
:param func_type: function type, supports MAP, FLAT_MAP, etc.
:param type_info: the data type of the function output data.
:return: A Java operator which is responsible for execution user defined python function.
"""
gateway = get_gateway()
import cloudpickle
serialized_func = cloudpickle.dumps(func)
j_input_types = data_stream._j_data_stream.getTransformation().getOutputType()
if type_info is None:
output_type_info = PickledBytesTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO()
elif isinstance(type_info, list):
output_type_info = RowTypeInfo(type_info)
else:
output_type_info = type_info
j_output_type_info = output_type_info.get_java_type_info()
j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction(
bytearray(serialized_func),
_get_python_env())
j_data_stream_python_function_info = gateway.jvm.DataStreamPythonFunctionInfo(
j_data_stream_python_function,
func_type)
j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
if func_type == UserDefinedDataStreamFunction.REDUCE: # type: ignore
# set max bundle size to 1 to force synchronize process for reduce function.
j_conf.setInteger(gateway.jvm.org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE, 1)
j_output_type_info = j_input_types.getTypeAt(1)
JDataStreamPythonFunctionOperator = gateway.jvm.PythonReduceOperator
elif func_type == UserDefinedDataStreamFunction.MAP: # type: ignore
if str(func) == '_Flink_PartitionCustomMapFunction':
JDataStreamPythonFunctionOperator = gateway.jvm.PythonPartitionCustomOperator
else:
JDataStreamPythonFunctionOperator = gateway.jvm.PythonMapOperator
elif func_type == UserDefinedDataStreamFunction.FLAT_MAP: # type: ignore
JDataStreamPythonFunctionOperator = gateway.jvm.PythonFlatMapOperator
elif func_type == UserDefinedDataStreamFunction.PROCESS: # type: ignore
JDataStreamPythonFunctionOperator = gateway.jvm.PythonProcessOperator
elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS: # type: ignore
JDataStreamPythonFunctionOperator = gateway.jvm.PythonKeyedProcessOperator
else:
raise TypeError("Unsupported function type: %s" % func_type)
j_python_function_operator = JDataStreamPythonFunctionOperator(
j_conf,
j_input_types,
j_output_type_info,
j_data_stream_python_function_info)
return j_python_function_operator, j_output_type_info
def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
func: Union[Function, FunctionWrapper],
func_type: int,
type_info: TypeInformation):
"""
Create a Java two input stream operator.
:param func: a function object that implements the Function interface.
:param func_type: function type, supports MAP, FLAT_MAP, etc.
:param type_info: the data type of the function output data.
:return: A Java operator which is responsible for execution user defined python function.
"""
gateway = get_gateway()
import cloudpickle
serialized_func = cloudpickle.dumps(func)
j_input_types1 = connected_streams.stream1._j_data_stream.getTransformation().getOutputType()
j_input_types2 = connected_streams.stream2._j_data_stream.getTransformation().getOutputType()
if type_info is None:
output_type_info = PickledBytesTypeInfo.PICKLED_BYTE_ARRAY_TYPE_INFO()
elif isinstance(type_info, list):
output_type_info = RowTypeInfo(type_info)
else:
output_type_info = type_info
j_output_type_info = output_type_info.get_java_type_info()
j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction(
bytearray(serialized_func),
_get_python_env())
j_data_stream_python_function_info = gateway.jvm.DataStreamPythonFunctionInfo(
j_data_stream_python_function,
func_type)
from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction
if func_type == UserDefinedDataStreamFunction.CO_FLAT_MAP: # type: ignore
JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoFlatMapOperator
elif func_type == UserDefinedDataStreamFunction.CO_MAP: # type: ignore
JTwoInputPythonFunctionOperator = gateway.jvm.PythonCoMapOperator
else:
raise TypeError("Unsupported function type: %s" % func_type)
j_conf = gateway.jvm.org.apache.flink.configuration.Configuration()
j_python_data_stream_function_operator = JTwoInputPythonFunctionOperator(
j_conf,
j_input_types1,
j_input_types2,
j_output_type_info,
j_data_stream_python_function_info,
connected_streams._is_keyed_stream())
return j_python_data_stream_function_operator, j_output_type_info