Source code for pyflink.datastream.connectors.rabbitmq
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common import SerializationSchema, DeserializationSchema
from pyflink.datastream.functions import SinkFunction, SourceFunction
from pyflink.java_gateway import get_gateway
__all__ = [
'RMQConnectionConfig',
'RMQSource',
'RMQSink'
]
[docs]class RMQConnectionConfig(object):
"""
Connection Configuration for RMQ.
"""
def __init__(self, j_rmq_connection_config):
self._j_rmq_connection_config = j_rmq_connection_config
def get_host(self) -> str:
return self._j_rmq_connection_config.getHost()
def get_port(self) -> int:
return self._j_rmq_connection_config.getPort()
def get_virtual_host(self) -> str:
return self._j_rmq_connection_config.getVirtualHost()
def get_user_name(self) -> str:
return self._j_rmq_connection_config.getUsername()
def get_password(self) -> str:
return self._j_rmq_connection_config.getPassword()
def get_uri(self) -> str:
return self._j_rmq_connection_config.getUri()
def get_network_recovery_interval(self) -> int:
return self._j_rmq_connection_config.getNetworkRecoveryInterval()
def is_automatic_recovery(self) -> bool:
return self._j_rmq_connection_config.isAutomaticRecovery()
def is_topology_recovery(self) -> bool:
return self._j_rmq_connection_config.isTopologyRecovery()
def get_connection_timeout(self) -> int:
return self._j_rmq_connection_config.getConnectionTimeout()
def get_requested_channel_max(self) -> int:
return self._j_rmq_connection_config.getRequestedChannelMax()
def get_requested_frame_max(self) -> int:
return self._j_rmq_connection_config.getRequestedFrameMax()
def get_requested_heartbeat(self) -> int:
return self._j_rmq_connection_config.getRequestedHeartbeat()
class Builder(object):
"""
Builder for RMQConnectionConfig.
"""
def __init__(self):
self._j_options_builder = get_gateway().jvm.org.apache.flink.streaming.connectors\
.rabbitmq.common.RMQConnectionConfig.Builder()
def set_port(self, port: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setPort(port)
return self
def set_host(self, host: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setHost(host)
return self
def set_virtual_host(self, vhost: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setVirtualHost(vhost)
return self
def set_user_name(self, user_name: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setUserName(user_name)
return self
def set_password(self, password: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setPassword(password)
return self
def set_uri(self, uri: str) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setUri(uri)
return self
def set_topology_recovery_enabled(
self, topology_recovery_enabled: bool) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setTopologyRecoveryEnabled(topology_recovery_enabled)
return self
def set_requested_heartbeat(
self, requested_heartbeat: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setRequestedHeartbeat(requested_heartbeat)
return self
def set_requested_frame_max(
self, requested_frame_max: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setRequestedFrameMax(requested_frame_max)
return self
def set_requested_channel_max(
self, requested_channel_max: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setRequestedChannelMax(requested_channel_max)
return self
def set_network_recovery_interval(
self, network_recovery_interval: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setNetworkRecoveryInterval(network_recovery_interval)
return self
def set_connection_timeout(self, connection_timeout: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setConnectionTimeout(connection_timeout)
return self
def set_automatic_recovery(self, automatic_recovery: bool) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setAutomaticRecovery(automatic_recovery)
return self
def set_prefetch_count(self, prefetch_count: int) -> 'RMQConnectionConfig.Builder':
self._j_options_builder.setPrefetchCount(prefetch_count)
return self
def build(self) -> 'RMQConnectionConfig':
return RMQConnectionConfig(self._j_options_builder.build())
[docs]class RMQSource(SourceFunction):
def __init__(self,
connection_config: 'RMQConnectionConfig',
queue_name: str,
use_correlation_id: bool,
deserialization_schema: DeserializationSchema
):
"""
Creates a new RabbitMQ source.
For exactly-once, you must set the correlation ids of messages at the producer.
The correlation id must be unique. Otherwise the behavior of the source is undefined.
If in doubt, set use_correlation_id to False.
When correlation ids are not used, this source has at-least-once processing semantics
when checkpointing is enabled.
:param connection_config: The RabbiMQ connection configuration.
:param queue_name: The queue to receive messages from.
:param use_correlation_id: Whether the messages received are supplied with a unique id
to deduplicate messages (in case of failed acknowledgments).
Only used when checkpointing is enabled.
:param deserialization_schema: A deserializer used to convert between RabbitMQ's
messages and Flink's objects.
"""
JRMQSource = get_gateway().jvm.org.apache.flink.streaming.connectors.rabbitmq.RMQSource
j_rmq_source = JRMQSource(
connection_config._j_rmq_connection_config,
queue_name,
use_correlation_id,
deserialization_schema._j_deserialization_schema
)
super(RMQSource, self).__init__(source_func=j_rmq_source)
[docs]class RMQSink(SinkFunction):
def __init__(self, connection_config: 'RMQConnectionConfig',
queue_name: str, serialization_schema: SerializationSchema):
"""
Creates a new RabbitMQ sink.
:param connection_config: The RabbiMQ connection configuration.
:param queue_name: The queue to publish messages to.
:param serialization_schema: A serializer used to convert Flink objects to bytes.
"""
JRMQSink = get_gateway().jvm.org.apache.flink.streaming.connectors.rabbitmq.RMQSink
j_rmq_sink = JRMQSink(
connection_config._j_rmq_connection_config,
queue_name,
serialization_schema._j_serialization_schema,
)
super(RMQSink, self).__init__(sink_func=j_rmq_sink)