pyflink.datastream.connectors.kinesis.FlinkKinesisConsumer
class FlinkKinesisConsumer(streams: Union[str, List[str]], deserializer: Union[pyflink.common.serialization.DeserializationSchema, pyflink.datastream.connectors.kinesis.KinesisDeserializationSchema], config_props: Dict)
The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to one or more AWS Kinesis streams within the same AWS service region and can handle resharding of streams. Each subtask of the consumer fetches data records from zero or more Kinesis shards, and the number of shards fetched by each subtask changes as Kinesis closes and creates shards.
To leverage Flink’s checkpointing mechanism for exactly-once stream processing guarantees, the Flink Kinesis consumer is implemented with the AWS Java SDK instead of the officially recommended AWS Kinesis Client Library, which gives it low-level control over the management of stream state. The Flink Kinesis Connector also supports setting the initial starting position of Kinesis streams, namely TRIM_HORIZON and LATEST.
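The sketch below shows one way to construct the consumer from Python and select an initial position. It assumes PyFlink with the Kinesis connector JAR on the classpath. The property keys aws.region and flink.stream.initpos are assumed to be the string values of AWSConfigConstants.AWS_REGION and ConsumerConfigConstants.STREAM_INITIAL_POSITION and should be checked against the connector version in use; the helper build_consumer_config and the stream name example-stream are hypothetical.

```python
from typing import Dict

# This sketch accepts only the two initial positions named in the text above.
INITIAL_POSITIONS = ("TRIM_HORIZON", "LATEST")


def build_consumer_config(region: str, initial_position: str = "LATEST") -> Dict[str, str]:
    """Hypothetical helper: build the config_props dict for FlinkKinesisConsumer."""
    if initial_position not in INITIAL_POSITIONS:
        raise ValueError(f"unsupported initial position: {initial_position}")
    return {
        "aws.region": region,                      # assumed key of AWSConfigConstants.AWS_REGION
        "flink.stream.initpos": initial_position,  # assumed key of STREAM_INITIAL_POSITION
    }


def main() -> None:
    # PyFlink imports stay local so the helper above works without a JVM.
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer

    env = StreamExecutionEnvironment.get_execution_environment()
    consumer = FlinkKinesisConsumer(
        "example-stream",  # hypothetical stream name; a list of names is also accepted
        SimpleStringSchema(),
        build_consumer_config("us-east-1", "TRIM_HORIZON"),
    )
    env.add_source(consumer).print()
    env.execute("kinesis-consumer-sketch")
```

build_consumer_config runs without a JVM; call main() only where PyFlink and the Kinesis connector are installed. The connector may accept positions beyond the two this sketch allows.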
Kinesis and the Flink consumer support dynamic re-sharding. Shard IDs, while sequential, cannot be assumed to be consecutive, so there is no perfect generic default assignment function. The default shard-to-subtask assignment, which is based on a hash code, may result in skew, with some subtasks assigned many shards and others none.
It is recommended to monitor the shard distribution and adjust the assignment accordingly. A custom assigner implementation can be set via setShardAssigner(KinesisShardAssigner) (set_shard_assigner in Python) to optimize the hash function or to use static overrides that limit skew.
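Because skew depends on the actual shard IDs, it helps to compute per-subtask shard counts before choosing an assigner. The following is a pure-Python model, not the connector's code: hash_assigner hashes only the shard ID string with Java's String.hashCode (the real default hashes more than the ID), and make_static_assigner is a hypothetical static-override table that falls back to the hash for unpinned shards.

```python
from collections import Counter
from typing import Callable, Dict, List

Assigner = Callable[[str, int], int]


def java_string_hash(s: str) -> int:
    """Java's String.hashCode() for ASCII input, as a signed 32-bit int."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def hash_assigner(shard_id: str, parallelism: int) -> int:
    # Simplified stand-in for hash-based assignment. Java's Math.abs(hash % n)
    # equals abs(hash) % n in Python. The real default hashes more than the ID.
    return abs(java_string_hash(shard_id)) % parallelism


def make_static_assigner(overrides: Dict[str, int],
                         fallback: Assigner = hash_assigner) -> Assigner:
    """Hypothetical static override: pinned shards use the table, others fall back."""
    def assign(shard_id: str, parallelism: int) -> int:
        if shard_id in overrides:
            return overrides[shard_id] % parallelism
        return fallback(shard_id, parallelism)
    return assign


def distribution(shard_ids: List[str], parallelism: int, assigner: Assigner) -> Dict[int, int]:
    """Shards per subtask, listing subtasks that received none with a count of 0."""
    counts = Counter(assigner(s, parallelism) for s in shard_ids)
    return {subtask: counts.get(subtask, 0) for subtask in range(parallelism)}


# Open shards after some splits: IDs increase but are not consecutive.
open_shards = ["shardId-000000000003", "shardId-000000000004",
               "shardId-000000000007", "shardId-000000000008"]
print(distribution(open_shards, 4, hash_assigner))
pinned = make_static_assigner({s: i for i, s in enumerate(open_shards)})
print(distribution(open_shards, 4, pinned))  # {0: 1, 1: 1, 2: 1, 3: 1}
```

With these IDs the simplified hash leaves two of the four subtasks without shards, while the pinned table gives each subtask one shard. In a real job, an override would be implemented as a KinesisShardAssigner and passed to set_shard_assigner.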
In order for the consumer to emit watermarks, a timestamp assigner needs to be set via setPeriodicWatermarkAssigner(AssignerWithPeriodicWatermarks) and the auto watermark emit interval configured via ExecutionConfig.setAutoWatermarkInterval(long).
Watermarks can only advance when all shards of a subtask continuously deliver records. To prevent an inactive or closed shard from blocking watermark progress, configure the idle timeout via the configuration property ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS. By default, shards are never considered idle, and the watermark calculation waits for newer records to arrive from all shards.
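The effect of the idle timeout can be seen in a simplified model of one subtask: its watermark is the minimum over its shards, and when an idle interval is set, shards that have delivered nothing for longer than that interval are ignored. This is an illustration, not the connector's implementation. The string flink.shard.idle.interval is assumed to be the value of ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, and the shard names and timestamps are made up.

```python
from typing import Dict, Optional

# Assumed string value of ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS.
SHARD_IDLE_INTERVAL_KEY = "flink.shard.idle.interval"


def subtask_watermark(shard_watermarks: Dict[str, int],
                      last_record_time: Dict[str, int],
                      now: int,
                      idle_interval_ms: Optional[int] = None) -> Optional[int]:
    """Minimum watermark over the shards that still count as active.

    With no idle interval (the default), every shard counts, so one silent
    shard holds the watermark back. None stands for "no active shard".
    """
    active = [
        wm for shard, wm in shard_watermarks.items()
        if idle_interval_ms is None or now - last_record_time[shard] <= idle_interval_ms
    ]
    return min(active) if active else None


props = {"aws.region": "us-east-1", SHARD_IDLE_INTERVAL_KEY: "60000"}
idle_ms = int(props[SHARD_IDLE_INTERVAL_KEY])

watermarks = {"shard-a": 1_000, "shard-b": 95_000}  # shard-a went silent at t=1s
last_seen = {"shard-a": 1_000, "shard-b": 100_000}
print(subtask_watermark(watermarks, last_seen, 100_000))           # 1000
print(subtask_watermark(watermarks, last_seen, 100_000, idle_ms))  # 95000
```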
Note that re-sharding a Kinesis stream while an application that relies on the Kinesis records for watermarking is running can cause events to be incorrectly treated as late. Whether this happens depends on how shards are assigned to subtasks, and it applies regardless of whether watermarks are generated in the source or in a downstream operator.
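A simplified model of the re-sharding case: a closed parent shard was read by a lagging subtask, and its child shard, whose record timestamps continue from where the parent left off, is assigned either back to that subtask or to one whose watermark is already further ahead. A record counts as late when its timestamp is not greater than the watermark the reading subtask has already emitted. The function name and numbers are hypothetical, and downstream watermark merging is left out.

```python
from typing import Dict, List


def late_records(subtask_watermarks: Dict[int, int],
                 reading_subtask: int,
                 record_timestamps: List[int]) -> List[int]:
    """Return the timestamps the reading subtask would treat as late.

    A record is late if its timestamp is not greater than the watermark that
    subtask has already emitted (watermarks never move backwards).
    """
    emitted = subtask_watermarks[reading_subtask]
    return [ts for ts in record_timestamps if ts <= emitted]


# Subtask 0 read the closed parent shard and lags; subtask 1 is further ahead.
watermarks = {0: 50_000, 1: 90_000}
child_shard_records = [51_000, 60_000, 95_000]

print(late_records(watermarks, 0, child_shard_records))  # []
print(late_records(watermarks, 1, child_shard_records))  # [51000, 60000]
```

The same child records are on time or late depending only on which subtask the child shard lands on.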
Methods
get_java_function()
set_periodic_watermark_assigner(...): Set the assigner that will extract the timestamp from T and calculate the watermark.
set_shard_assigner(shard_assigner): Provide a custom assigner to influence how shards are distributed over subtasks.
set_watermark_tracker(watermark_tracker): Set the global watermark tracker.