Window#
Tumble Window#
Tumbling Time Window#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import argparse
from typing import Iterable
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow
class MyTimestampAssigner(TimestampAssigner):
    """Timestamp assigner that takes the event time from the record itself."""

    def extract_timestamp(self, value, record_timestamp) -> int:
        """Return the record's element at index 1, converted to ``int``.

        :param value: The incoming record; ``value[1]`` carries the timestamp.
        :param record_timestamp: Not used by this implementation.
        :return: ``int(value[1])``.
        """
        event_time = value[1]
        return int(event_time)
class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    """Window function that reports how many elements a keyed time window holds."""

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        """Summarise one window as a single result tuple.

        :param key: The key the window belongs to.
        :param context: Context giving access to the window being evaluated.
        :param elements: The elements collected in the window.
        :return: A one-element list holding ``(key, window start, window end, count)``.
        """
        window = context.window()
        # Count by iterating so that no intermediate list of elements is built.
        element_count = sum(1 for _ in elements)
        return [(key, window.start, window.end, element_count)]
if __name__ == '__main__':
    # Command line: an optional --output path; unrecognised arguments are ignored.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    known_args, _ = parser.parse_known_args(sys.argv[1:])
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source: (key, timestamp) records
    data_stream = env.from_collection(
        [('hi', ts) for ts in (1, 2, 3, 4, 5, 8, 9, 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(MyTimestampAssigner()))

    # The window start/end fields are millisecond timestamps, so they are declared
    # as LONG: a 32-bit INT would overflow as soon as realistic event times are used.
    result_type = Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])

    # key by the first field and count the elements of each 5 ms tumbling window
    ds = (
        data_stream.assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda x: x[0], key_type=Types.STRING())
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))
        .process(CountWindowProcessFunction(), result_type))

    # define the sink
    if output_path is not None:
        file_config = (
            OutputFileConfig.builder()
            .with_part_prefix("prefix")
            .with_part_suffix(".ext")
            .build())
        file_sink = (
            FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(file_config)
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build())
        ds.sink_to(sink=file_sink)
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
Tumbling Count Window#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import argparse
from typing import Iterable
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, Encoder
from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
from pyflink.datastream.window import CountWindow
class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
    """Window function that adds up the first field of every element in a count window."""

    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
        """Sum the element at index 0 of each input record.

        :param key: The key the window belongs to.
        :param window: The window being evaluated; not used by this implementation.
        :param inputs: The records collected in the window.
        :return: A one-element list holding ``(key, total)``.
        """
        total = sum(record[0] for record in inputs)
        return [(key, total)]
if __name__ == '__main__':
    # Command line: an optional --output path; unrecognised arguments are ignored.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    known_args, _ = parser.parse_known_args(sys.argv[1:])
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source: (value, key) records
    records = [
        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')]
    data_stream = env.from_collection(
        records,
        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

    # key by the second field and sum the first field over count windows of size 2
    ds = (
        data_stream.key_by(lambda x: x[1], key_type=Types.STRING())
        .count_window(2)
        .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])))

    # define the sink
    if output_path is not None:
        file_config = (
            OutputFileConfig.builder()
            .with_part_prefix("prefix")
            .with_part_suffix(".ext")
            .build())
        file_sink = (
            FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(file_config)
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build())
        ds.sink_to(sink=file_sink)
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
Sliding Window#
Sliding Time Window#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import argparse
from typing import Iterable
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
class MyTimestampAssigner(TimestampAssigner):
    """Timestamp assigner that takes the event time from the record itself."""

    def extract_timestamp(self, value, record_timestamp) -> int:
        """Return the record's element at index 1, converted to ``int``.

        :param value: The incoming record; ``value[1]`` carries the timestamp.
        :param record_timestamp: Not used by this implementation.
        :return: ``int(value[1])``.
        """
        event_time = value[1]
        return int(event_time)
class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    """Window function that reports how many elements a keyed time window holds."""

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        """Summarise one window as a single result tuple.

        :param key: The key the window belongs to.
        :param context: Context giving access to the window being evaluated.
        :param elements: The elements collected in the window.
        :return: A one-element list holding ``(key, window start, window end, count)``.
        """
        window = context.window()
        # Count by iterating so that no intermediate list of elements is built.
        element_count = sum(1 for _ in elements)
        return [(key, window.start, window.end, element_count)]
if __name__ == '__main__':
    # Command line: an optional --output path; unrecognised arguments are ignored.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    known_args, _ = parser.parse_known_args(sys.argv[1:])
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source: (key, timestamp) records
    data_stream = env.from_collection(
        [('hi', ts) for ts in (1, 2, 3, 4, 5, 8, 9, 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(MyTimestampAssigner()))

    # The window start/end fields are millisecond timestamps, so they are declared
    # as LONG: a 32-bit INT would overflow as soon as realistic event times are used.
    result_type = Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])

    # key by the first field and count the elements of each sliding window
    # (window size 5 ms, slide 2 ms)
    ds = (
        data_stream.assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda x: x[0], key_type=Types.STRING())
        .window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2)))
        .process(CountWindowProcessFunction(), result_type))

    # define the sink
    if output_path is not None:
        file_config = (
            OutputFileConfig.builder()
            .with_part_prefix("prefix")
            .with_part_suffix(".ext")
            .build())
        file_sink = (
            FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(file_config)
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build())
        ds.sink_to(sink=file_sink)
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
Session Window#
Session With Gap Window#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import argparse
from typing import Iterable
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import EventTimeSessionWindows, \
SessionWindowTimeGapExtractor, TimeWindow
class MyTimestampAssigner(TimestampAssigner):
    """Timestamp assigner that takes the event time from the record itself."""

    def extract_timestamp(self, value, record_timestamp) -> int:
        """Return the record's element at index 1, converted to ``int``.

        :param value: The incoming record; ``value[1]`` carries the timestamp.
        :param record_timestamp: Not used by this implementation.
        :return: ``int(value[1])``.
        """
        event_time = value[1]
        return int(event_time)
class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
    """Gap extractor that uses the record's element at index 1 as the session gap."""

    # NOTE(review): this script's pipeline builds its windows with a fixed gap
    # (``EventTimeSessionWindows.with_gap``), so this class looks unused here.

    def extract(self, element: tuple) -> int:
        """Return ``element[1]`` as the session gap for this element.

        :param element: The incoming record.
        :return: The record's element at index 1 (presumably interpreted by Flink
            as milliseconds -- confirm against the extractor's API docs).
        """
        gap = element[1]
        return gap
class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    """Window function that reports how many elements a keyed time window holds."""

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        """Summarise one window as a single result tuple.

        :param key: The key the window belongs to.
        :param context: Context giving access to the window being evaluated.
        :param elements: The elements collected in the window.
        :return: A one-element list holding ``(key, window start, window end, count)``.
        """
        window = context.window()
        # Count by iterating so that no intermediate list of elements is built.
        element_count = sum(1 for _ in elements)
        return [(key, window.start, window.end, element_count)]
if __name__ == '__main__':
    # Command line: an optional --output path; unrecognised arguments are ignored.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    known_args, _ = parser.parse_known_args(sys.argv[1:])
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source: (key, timestamp) records
    data_stream = env.from_collection(
        [('hi', ts) for ts in (1, 2, 3, 4, 8, 9, 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(MyTimestampAssigner()))

    # The window start/end fields are millisecond timestamps, so they are declared
    # as LONG: a 32-bit INT would overflow as soon as realistic event times are used.
    result_type = Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])

    # key by the first field and count the elements of each session window
    # (fixed gap of 5 ms)
    ds = (
        data_stream.assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda x: x[0], key_type=Types.STRING())
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5)))
        .process(CountWindowProcessFunction(), result_type))

    # define the sink
    if output_path is not None:
        file_config = (
            OutputFileConfig.builder()
            .with_part_prefix("prefix")
            .with_part_suffix(".ext")
            .build())
        file_sink = (
            FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(file_config)
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build())
        ds.sink_to(sink=file_sink)
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()
Session With Dynamic Gap Window#
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import argparse
from typing import Iterable
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy
from pyflink.common import Types, WatermarkStrategy, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import EventTimeSessionWindows, \
SessionWindowTimeGapExtractor, TimeWindow
class MyTimestampAssigner(TimestampAssigner):
    """Timestamp assigner that takes the event time from the record itself."""

    def extract_timestamp(self, value, record_timestamp) -> int:
        """Return the record's element at index 1, converted to ``int``.

        :param value: The incoming record; ``value[1]`` carries the timestamp.
        :param record_timestamp: Not used by this implementation.
        :return: ``int(value[1])``.
        """
        event_time = value[1]
        return int(event_time)
class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
    """Gap extractor that uses the record's element at index 1 as the session gap."""

    def extract(self, element: tuple) -> int:
        """Return ``element[1]`` as the session gap for this element.

        :param element: The incoming record.
        :return: The record's element at index 1 (presumably interpreted by Flink
            as milliseconds -- confirm against the extractor's API docs).
        """
        gap = element[1]
        return gap
class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
    """Window function that reports how many elements a keyed time window holds."""

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        """Summarise one window as a single result tuple.

        :param key: The key the window belongs to.
        :param context: Context giving access to the window being evaluated.
        :param elements: The elements collected in the window.
        :return: A one-element list holding ``(key, window start, window end, count)``.
        """
        window = context.window()
        # Count by iterating so that no intermediate list of elements is built.
        element_count = sum(1 for _ in elements)
        return [(key, window.start, window.end, element_count)]
if __name__ == '__main__':
    # Command line: an optional --output path; unrecognised arguments are ignored.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')
    known_args, _ = parser.parse_known_args(sys.argv[1:])
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source: (key, timestamp) records
    data_stream = env.from_collection(
        [('hi', ts) for ts in (1, 2, 3, 4, 8, 9, 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = (
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(MyTimestampAssigner()))

    # The window start/end fields are millisecond timestamps, so they are declared
    # as LONG: a 32-bit INT would overflow as soon as realistic event times are used.
    result_type = Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])

    # key by the first field and count the elements of each session window; the
    # gap is taken per element from MySessionWindowTimeGapExtractor
    ds = (
        data_stream.assign_timestamps_and_watermarks(watermark_strategy)
        .key_by(lambda x: x[0], key_type=Types.STRING())
        .window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor()))
        .process(CountWindowProcessFunction(), result_type))

    # define the sink
    if output_path is not None:
        file_config = (
            OutputFileConfig.builder()
            .with_part_prefix("prefix")
            .with_part_suffix(".ext")
            .build())
        file_sink = (
            FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(file_config)
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build())
        ds.sink_to(sink=file_sink)
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()