@Experimental
public class FlinkKafkaShuffle extends Object

FlinkKafkaShuffle uses Kafka as a message bus to shuffle and persist data at the same time.

Persisting shuffle data is useful when

- you would like to reuse the shuffle data, and/or
- you would like to avoid a full restart of a pipeline during failure recovery.

Persisting a shuffle is achieved by wrapping a FlinkKafkaShuffleProducer and a FlinkKafkaShuffleConsumer together into a FlinkKafkaShuffle. Here is an example of how to use a FlinkKafkaShuffle:
    StreamExecutionEnvironment env = ...       // create execution environment
    DataStream<X> source = env.addSource(...)  // add data stream source
    DataStream<Y> dataStream = ...             // some transformation(s) based on source

    KeyedStream<Y, KEY> keyedStream = FlinkKafkaShuffle
        .persistentKeyBy(            // keyBy shuffle through Kafka
            dataStream,              // data stream to be shuffled
            topic,                   // Kafka topic written to
            producerParallelism,     // the number of tasks of the Kafka producer
            numberOfPartitions,      // the number of partitions of the Kafka topic written to
            kafkaProperties,         // Kafka properties for the Kafka producer and consumer
            keySelector<Y, KEY>);    // key selector to retrieve the key from `dataStream`

    keyedStream.transform...         // some other transformation(s)

    KeyedStream<Y, KEY> keyedStreamReuse = FlinkKafkaShuffle
        .readKeyBy(                  // read the Kafka shuffle data again for other usages
            topic,                   // the Kafka topic where data is persisted
            env,                     // execution environment; it can be a new environment
            typeInformation<Y>,      // type information of the data persisted in Kafka
            kafkaProperties,         // Kafka properties for the Kafka consumer
            keySelector<Y, KEY>);    // key selector to retrieve the key

    keyedStreamReuse.transform...    // some other transformation(s)
Usage of persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>) is similar to DataStream.keyBy(KeySelector). The differences are:

1). Partitioning is done through FlinkKafkaShuffleProducer. FlinkKafkaShuffleProducer decides which partition a key goes to when writing to Kafka.

2). Shuffle data can be reused through readKeyBy(String, StreamExecutionEnvironment, TypeInformation<T>, Properties, KeySelector<T, K>), as shown in the example above.

3). Job execution is decoupled by the persistent Kafka message bus. In the example, the job execution graph is decoupled into three regions: `KafkaShuffleProducer`, `KafkaShuffleConsumer` and `KafkaShuffleConsumerReuse` through the `PERSISTENT DATA` shown below. If any one region fails, the other two keep progressing.

    source -> ... KafkaShuffleProducer -> PERSISTENT DATA -> KafkaShuffleConsumer -> ...
                                               |
                                               | ----------> KafkaShuffleConsumerReuse -> ...
| Constructor and Description |
| --- |
| `FlinkKafkaShuffle()` |
| Modifier and Type | Method and Description |
| --- | --- |
| `static <T> KeyedStream<T,Tuple>` | `persistentKeyBy(DataStream<T> dataStream, String topic, int producerParallelism, int numberOfPartitions, Properties properties, int... fields)` Uses Kafka as a message bus to persist keyBy shuffle. |
| `static <T,K> KeyedStream<T,K>` | `persistentKeyBy(DataStream<T> dataStream, String topic, int producerParallelism, int numberOfPartitions, Properties properties, KeySelector<T,K> keySelector)` Uses Kafka as a message bus to persist keyBy shuffle. |
| `static <T,K> KeyedStream<T,K>` | `readKeyBy(String topic, StreamExecutionEnvironment env, TypeInformation<T> typeInformation, Properties kafkaProperties, KeySelector<T,K> keySelector)` The read side of persistentKeyBy. |
| `static <T> void` | `writeKeyBy(DataStream<T> dataStream, String topic, Properties kafkaProperties, int... fields)` The write side of persistentKeyBy. |
| `static <T,K> void` | `writeKeyBy(DataStream<T> dataStream, String topic, Properties kafkaProperties, KeySelector<T,K> keySelector)` The write side of persistentKeyBy. |
public static <T,K> KeyedStream<T,K> persistentKeyBy(DataStream<T> dataStream, String topic, int producerParallelism, int numberOfPartitions, Properties properties, KeySelector<T,K> keySelector)
Uses Kafka as a message bus to persist keyBy shuffle. Persisting keyBy shuffle is achieved by wrapping a FlinkKafkaShuffleProducer and a FlinkKafkaShuffleConsumer together.

On the producer side, FlinkKafkaShuffleProducer is similar to DataStream.keyBy(KeySelector). They use the same key group assignment function KeyGroupRangeAssignment.assignKeyToParallelOperator(Object, int, int) to decide which partition a key goes to. Hence, each producer task can potentially write to each Kafka partition based on where the key goes. Here, `numberOfPartitions` equals the key group size. In the case of using TimeCharacteristic.EventTime, each producer task broadcasts its watermark to ALL of the Kafka partitions to make sure watermark information is propagated correctly. A small sketch of this partition assignment follows the parameter list below.

On the consumer side, each consumer task reads the partitions corresponding to the key group indices it is assigned. `numberOfPartitions` is the maximum parallelism of the consumer. This version only supports numberOfPartitions = consumerParallelism. In the case of using TimeCharacteristic.EventTime, a consumer task is responsible for emitting watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts to emit a watermark after reading at least one watermark from each producer task, to make sure watermarks are monotonically increasing. Hence, a consumer task needs to know `producerParallelism` as well.
Type Parameters:
T - Type of the input data stream
K - Type of key

Parameters:
dataStream - Data stream to be shuffled
topic - Kafka topic written to
producerParallelism - Parallelism of the producer
numberOfPartitions - Number of partitions
properties - Kafka properties
keySelector - Key selector to retrieve the key from `dataStream`

See Also:
writeKeyBy(DataStream<T>, String, Properties, KeySelector<T, K>), readKeyBy(String, StreamExecutionEnvironment, TypeInformation<T>, Properties, KeySelector<T, K>)
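For intuition, the following minimal sketch reproduces the partition assignment described above by calling the KeyGroupRangeAssignment utility directly. The key values and partition count are illustrative, and treating `numberOfPartitions` as both the maximum parallelism and the parallelism mirrors the description; this is a sketch, not the producer's internal code.

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class PartitionAssignmentSketch {

        public static void main(String[] args) {
            // `numberOfPartitions` plays the role of the key group size (max parallelism).
            int numberOfPartitions = 8;

            for (String key : new String[] {"user-1", "user-2", "user-3"}) {
                // key -> key group -> operator index; with parallelism equal to
                // maxParallelism, each operator index maps to one Kafka partition.
                int partition = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                        key, numberOfPartitions, numberOfPartitions);
                System.out.println(key + " -> partition " + partition);
            }
        }
    }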
public static <T> KeyedStream<T,Tuple> persistentKeyBy(DataStream<T> dataStream, String topic, int producerParallelism, int numberOfPartitions, Properties properties, int... fields)
Uses Kafka as a message bus to persist keyBy shuffle. Persisting keyBy shuffle is achieved by wrapping a FlinkKafkaShuffleProducer and a FlinkKafkaShuffleConsumer together.

On the producer side, FlinkKafkaShuffleProducer is similar to DataStream.keyBy(KeySelector). They use the same key group assignment function KeyGroupRangeAssignment.assignKeyToParallelOperator(Object, int, int) to decide which partition a key goes to. Hence, each producer task can potentially write to each Kafka partition based on where the key goes. Here, `numberOfPartitions` equals the key group size. In the case of using TimeCharacteristic.EventTime, each producer task broadcasts its watermark to ALL of the Kafka partitions to make sure watermark information is propagated correctly.

On the consumer side, each consumer task reads the partitions corresponding to the key group indices it is assigned. `numberOfPartitions` is the maximum parallelism of the consumer. This version only supports numberOfPartitions = consumerParallelism. In the case of using TimeCharacteristic.EventTime, a consumer task is responsible for emitting watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts to emit a watermark after reading at least one watermark from each producer task, to make sure watermarks are monotonically increasing. Hence, a consumer task needs to know `producerParallelism` as well. A usage sketch of this overload follows the parameter list below.
Type Parameters:
T - Type of the input data stream

Parameters:
dataStream - Data stream to be shuffled
topic - Kafka topic written to
producerParallelism - Parallelism of the producer
numberOfPartitions - Number of partitions
properties - Kafka properties
fields - Key positions from the input data stream

See Also:
writeKeyBy(DataStream<T>, String, Properties, KeySelector<T, K>), readKeyBy(String, StreamExecutionEnvironment, TypeInformation<T>, Properties, KeySelector<T, K>)
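As a usage sketch of this overload, assuming a Tuple2 stream keyed by its first field; the topic name, broker address, and parallelism values are illustrative:

    import java.util.Properties;

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

    public class PersistentKeyByFieldsSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Tuple2<String, Long>> counts =
                    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

            Properties kafkaProperties = new Properties();
            kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); // illustrative

            // Key by tuple field 0; records are shuffled and persisted through Kafka.
            KeyedStream<Tuple2<String, Long>, Tuple> keyed = FlinkKafkaShuffle.persistentKeyBy(
                    counts,
                    "shuffle-topic", // illustrative topic name
                    2,               // producerParallelism
                    4,               // numberOfPartitions (key group size)
                    kafkaProperties,
                    0);              // key position in the tuple

            keyed.sum(1).print();
            env.execute("persistent-key-by-sketch");
        }
    }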
public static <T,K> void writeKeyBy(DataStream<T> dataStream, String topic, Properties kafkaProperties, KeySelector<T,K> keySelector)
The write side of persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>).

This function contains a FlinkKafkaShuffleProducer to shuffle and persist data in Kafka. FlinkKafkaShuffleProducer uses the same key group assignment function KeyGroupRangeAssignment.assignKeyToParallelOperator(Object, int, int) to decide which partition a key goes to. Hence, each producer task can potentially write to each Kafka partition based on the key. Here, the number of partitions equals the key group size. In the case of using TimeCharacteristic.EventTime, each producer task broadcasts each watermark to all of the Kafka partitions to make sure watermark information is propagated properly. A usage sketch follows the parameter list below.

Attention: make sure kafkaProperties includes PRODUCER_PARALLELISM and PARTITION_NUMBER explicitly. PRODUCER_PARALLELISM is the parallelism of the producer. PARTITION_NUMBER is the number of partitions. They are not necessarily the same and are allowed to be set independently.
Type Parameters:
T - Type of the input data stream
K - Type of key

Parameters:
dataStream - Data stream to be shuffled
topic - Kafka topic written to
kafkaProperties - Kafka properties for the Kafka producer
keySelector - Key selector to retrieve the key from `dataStream`

See Also:
persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>), readKeyBy(String, StreamExecutionEnvironment, TypeInformation<T>, Properties, KeySelector<T, K>)
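A minimal write-side sketch, assuming the PRODUCER_PARALLELISM and PARTITION_NUMBER property keys are accessible as constants on FlinkKafkaShuffle as shown; the broker address, topic name, and values are illustrative:

    import java.util.Properties;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

    public class WriteKeyBySketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Tuple2<String, Long>> records =
                    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

            Properties kafkaProperties = new Properties();
            kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
            // Both settings must be present explicitly, as noted above.
            kafkaProperties.setProperty(FlinkKafkaShuffle.PRODUCER_PARALLELISM, "2");
            kafkaProperties.setProperty(FlinkKafkaShuffle.PARTITION_NUMBER, "4");

            FlinkKafkaShuffle.writeKeyBy(
                    records,
                    "shuffle-topic", // illustrative topic name
                    kafkaProperties,
                    new KeySelector<Tuple2<String, Long>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Long> value) {
                            return value.f0; // key on the first tuple field
                        }
                    });

            env.execute("write-key-by-sketch");
        }
    }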
public static <T> void writeKeyBy(DataStream<T> dataStream, String topic, Properties kafkaProperties, int... fields)
The write side of persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>).

This function contains a FlinkKafkaShuffleProducer to shuffle and persist data in Kafka. FlinkKafkaShuffleProducer uses the same key group assignment function KeyGroupRangeAssignment.assignKeyToParallelOperator(Object, int, int) to decide which partition a key goes to. Hence, each producer task can potentially write to each Kafka partition based on the key. Here, the number of partitions equals the key group size. In the case of using TimeCharacteristic.EventTime, each producer task broadcasts each watermark to all of the Kafka partitions to make sure watermark information is propagated properly.

Attention: make sure kafkaProperties includes PRODUCER_PARALLELISM and PARTITION_NUMBER explicitly. PRODUCER_PARALLELISM is the parallelism of the producer. PARTITION_NUMBER is the number of partitions. They are not necessarily the same and are allowed to be set independently.
Type Parameters:
T - Type of the input data stream

Parameters:
dataStream - Data stream to be shuffled
topic - Kafka topic written to
kafkaProperties - Kafka properties for the Kafka producer
fields - Key positions from the input data stream

See Also:
persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>), readKeyBy(String, StreamExecutionEnvironment, TypeInformation<T>, Properties, KeySelector<T, K>)
public static <T,K> KeyedStream<T,K> readKeyBy(String topic, StreamExecutionEnvironment env, TypeInformation<T> typeInformation, Properties kafkaProperties, KeySelector<T,K> keySelector)
The read side of persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>).

Each consumer task reads the Kafka partitions corresponding to the key group indices it is assigned. The number of Kafka partitions is the maximum parallelism of the consumer. This version only supports numberOfPartitions = consumerParallelism. In the case of using TimeCharacteristic.EventTime, a consumer task is responsible for emitting watermarks. Watermarks are read from the corresponding Kafka partitions. Notice that a consumer task only starts to emit a watermark after receiving at least one watermark from each producer task, to make sure watermarks are monotonically increasing. Hence, a consumer task needs to know `producerParallelism` as well. A usage sketch follows the parameter list below.

Attention: make sure kafkaProperties includes PRODUCER_PARALLELISM and PARTITION_NUMBER explicitly. PRODUCER_PARALLELISM is the parallelism of the producer. PARTITION_NUMBER is the number of partitions. They are not necessarily the same and are allowed to be set independently.
Type Parameters:
T - Schema type
K - Key type

Parameters:
topic - The Kafka topic where data is persisted
env - Execution environment; readKeyBy's environment can be different from writeKeyBy's
typeInformation - Type information of the data persisted in Kafka
kafkaProperties - Kafka properties for the Kafka consumer
keySelector - Key selector to retrieve the key

See Also:
persistentKeyBy(DataStream<T>, String, int, int, Properties, KeySelector<T, K>), writeKeyBy(DataStream<T>, String, Properties, KeySelector<T, K>)
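A matching read-side sketch; the environment may belong to a different job from the write side, and the broker address, group id, topic name, and settings are illustrative (the PRODUCER_PARALLELISM and PARTITION_NUMBER constants are assumed accessible as shown):

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffle;

    public class ReadKeyBySketch {

        public static void main(String[] args) throws Exception {
            // May be a different environment (even a different job) from the write side.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties kafkaProperties = new Properties();
            kafkaProperties.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
            kafkaProperties.setProperty("group.id", "shuffle-read");            // consumer group id
            // The read side also needs both settings explicitly, as noted above.
            kafkaProperties.setProperty(FlinkKafkaShuffle.PRODUCER_PARALLELISM, "2");
            kafkaProperties.setProperty(FlinkKafkaShuffle.PARTITION_NUMBER, "4");

            KeyedStream<Tuple2<String, Long>, String> keyed = FlinkKafkaShuffle.readKeyBy(
                    "shuffle-topic", // same topic the data was written to
                    env,
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}),
                    kafkaProperties,
                    new KeySelector<Tuple2<String, Long>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Long> value) {
                            return value.f0; // same key as the write side
                        }
                    });

            keyed.sum(1).print();
            env.execute("read-key-by-sketch");
        }
    }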