Source code for pyflink.common.serialization
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common.utils import JavaObjectWrapper
from pyflink.java_gateway import get_gateway
__all__ = [
'SerializationSchema',
'DeserializationSchema',
'SimpleStringSchema',
'Encoder',
'BulkWriterFactory'
]
[docs]class SerializationSchema(object):
"""
Base class for SerializationSchema. The serialization schema describes how to turn a data object
into a different serialized representation. Most data sinks (for example Apache Kafka) require
the data to be handed to them in a specific format (for example as byte strings).
"""
def __init__(self, j_serialization_schema=None):
self._j_serialization_schema = j_serialization_schema
[docs]class DeserializationSchema(object):
"""
Base class for DeserializationSchema. The deserialization schema describes how to turn the byte
messages delivered by certain data sources (for example Apache Kafka) into data types (Java/
Scala objects) that are processed by Flink.
In addition, the DeserializationSchema describes the produced type which lets Flink create
internal serializers and structures to handle the type.
"""
def __init__(self, j_deserialization_schema=None):
self._j_deserialization_schema = j_deserialization_schema
[docs]class SimpleStringSchema(SerializationSchema, DeserializationSchema):
"""
Very simple serialization/deserialization schema for strings. By default, the serializer uses
'UTF-8' for string/byte conversion.
"""
def __init__(self, charset: str = 'UTF-8'):
gate_way = get_gateway()
j_char_set = gate_way.jvm.java.nio.charset.Charset.forName(charset)
j_simple_string_serialization_schema = gate_way \
.jvm.org.apache.flink.api.common.serialization.SimpleStringSchema(j_char_set)
SerializationSchema.__init__(self,
j_serialization_schema=j_simple_string_serialization_schema)
DeserializationSchema.__init__(
self, j_deserialization_schema=j_simple_string_serialization_schema)
[docs]class Encoder(object):
"""
Encoder is used by the file sink to perform the actual writing of the
incoming elements to the files in a bucket.
"""
def __init__(self, j_encoder):
self._j_encoder = j_encoder
@staticmethod
def simple_string_encoder(charset_name: str = "UTF-8") -> 'Encoder':
"""
A simple Encoder that uses toString() on the input elements and writes them to
the output bucket file separated by newline.
"""
j_encoder = get_gateway().jvm.org.apache.flink.api.common.serialization.\
SimpleStringEncoder(charset_name)
return Encoder(j_encoder)
[docs]class BulkWriterFactory(JavaObjectWrapper):
"""
The Python wrapper of Java BulkWriter.Factory interface, which is the base interface for data
sinks that write records into files in a bulk manner.
"""
def __init__(self, j_bulk_writer_factory):
super().__init__(j_bulk_writer_factory)
[docs]class RowDataBulkWriterFactory(BulkWriterFactory):
"""
A :class:`~BulkWriterFactory` that receives records with RowData type. This is for indicating
that Row record from Python must be first converted to RowData.
"""
def __init__(self, j_bulk_writer_factory, row_type):
super().__init__(j_bulk_writer_factory)
self._row_type = row_type
def get_row_type(self):
return self._row_type