Source code for pyflink.datastream.connectors.jdbc

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common.typeinfo import RowTypeInfo
from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_jarray


__all__ = [
    'JdbcSink',
    'JdbcConnectionOptions',
    'JdbcExecutionOptions'
]


[docs]class JdbcSink(SinkFunction): def __init__(self, j_jdbc_sink): super(JdbcSink, self).__init__(sink_func=j_jdbc_sink) @staticmethod def sink(sql: str, type_info: RowTypeInfo, jdbc_connection_options: 'JdbcConnectionOptions', jdbc_execution_options: 'JdbcExecutionOptions' = None): """ Create a JDBC sink. :param sql: arbitrary DML query (e.g. insert, update, upsert) :param type_info: A RowTypeInfo for query field types. :param jdbc_execution_options: parameters of execution, such as batch size and maximum retries. :param jdbc_connection_options: parameters of connection, such as JDBC URL. :return: A JdbcSink. """ sql_types = [] gateway = get_gateway() JJdbcTypeUtil = gateway.jvm.org.apache.flink.connector.jdbc.utils.JdbcTypeUtil for field_type in type_info.get_field_types(): sql_types.append(JJdbcTypeUtil .typeInformationToSqlType(field_type.get_java_type_info())) j_sql_type = to_jarray(gateway.jvm.int, sql_types) output_format_clz = gateway.jvm.Class\ .forName('org.apache.flink.connector.jdbc.internal.JdbcOutputFormat', False, get_gateway().jvm.Thread.currentThread().getContextClassLoader()) j_int_array_type = to_jarray(gateway.jvm.int, []).getClass() j_builder_method = output_format_clz.getDeclaredMethod('createRowJdbcStatementBuilder', to_jarray(gateway.jvm.Class, [j_int_array_type])) j_builder_method.setAccessible(True) j_statement_builder = j_builder_method.invoke(None, to_jarray(gateway.jvm.Object, [j_sql_type])) jdbc_execution_options = jdbc_execution_options if jdbc_execution_options is not None \ else JdbcExecutionOptions.defaults() j_jdbc_sink = gateway.jvm.org.apache.flink.connector.jdbc.JdbcSink\ .sink(sql, j_statement_builder, jdbc_execution_options._j_jdbc_execution_options, jdbc_connection_options._j_jdbc_connection_options) return JdbcSink(j_jdbc_sink=j_jdbc_sink)
[docs]class JdbcConnectionOptions(object): """ JDBC connection options. """ def __init__(self, j_jdbc_connection_options): self._j_jdbc_connection_options = j_jdbc_connection_options def get_db_url(self) -> str: return self._j_jdbc_connection_options.getDbURL() def get_driver_name(self) -> str: return self._j_jdbc_connection_options.getDriverName() def get_user_name(self) -> str: return self._j_jdbc_connection_options.getUsername() def get_password(self) -> str: return self._j_jdbc_connection_options.getPassword() class JdbcConnectionOptionsBuilder(object): """ Builder for JdbcConnectionOptions. """ def __init__(self): self._j_options_builder = get_gateway().jvm.org.apache.flink.connector\ .jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder() def with_url(self, url: str) -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withUrl(url) return self def with_driver_name(self, driver_name: str) \ -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withDriverName(driver_name) return self def with_user_name(self, user_name: str) \ -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withUsername(user_name) return self def with_password(self, password: str) \ -> 'JdbcConnectionOptions.JdbcConnectionOptionsBuilder': self._j_options_builder.withPassword(password) return self def build(self) -> 'JdbcConnectionOptions': return JdbcConnectionOptions(j_jdbc_connection_options=self._j_options_builder.build())
[docs]class JdbcExecutionOptions(object): """ JDBC sink batch options. """ def __init__(self, j_jdbc_execution_options): self._j_jdbc_execution_options = j_jdbc_execution_options def get_batch_interval_ms(self) -> int: return self._j_jdbc_execution_options.getBatchIntervalMs() def get_batch_size(self) -> int: return self._j_jdbc_execution_options.getBatchSize() def get_max_retries(self) -> int: return self._j_jdbc_execution_options.getMaxRetries() @staticmethod def defaults() -> 'JdbcExecutionOptions': return JdbcExecutionOptions( j_jdbc_execution_options=get_gateway().jvm .org.apache.flink.connector.jdbc.JdbcExecutionOptions.defaults()) @staticmethod def builder() -> 'Builder': return JdbcExecutionOptions.Builder() class Builder(object): """ Builder for JdbcExecutionOptions. """ def __init__(self): self._j_builder = get_gateway().jvm\ .org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder() def with_batch_size(self, size: int) -> 'JdbcExecutionOptions.Builder': self._j_builder.withBatchSize(size) return self def with_batch_interval_ms(self, interval_ms: int) -> 'JdbcExecutionOptions.Builder': self._j_builder.withBatchIntervalMs(interval_ms) return self def with_max_retries(self, max_retries: int) -> 'JdbcExecutionOptions.Builder': self._j_builder.withMaxRetries(max_retries) return self def build(self) -> 'JdbcExecutionOptions': return JdbcExecutionOptions(j_jdbc_execution_options=self._j_builder.build())