# Source code for pyflink.datastream.formats.json
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.common import SerializationSchema, TypeInformation, typeinfo, DeserializationSchema
from pyflink.java_gateway import get_gateway
# Public names exported by ``from ... import *``: the JSON row deserializer and serializer.
__all__ = ['JsonRowDeserializationSchema', 'JsonRowSerializationSchema']
class JsonRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from JSON to Flink types.

    Deserializes a byte[] message as a JSON object and reads the specified fields.

    Failures during deserialization are forwarded as wrapped IOExceptions.
    """

    def __init__(self, j_deserialization_schema):
        # Wraps an already-built Java deserialization schema; use builder() to create one.
        super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)

    @staticmethod
    def builder():
        """
        A static method to get a Builder for JsonRowDeserializationSchema.
        """
        return JsonRowDeserializationSchema.Builder()

    class Builder(object):
        """
        Builder for JsonRowDeserializationSchema.

        The result type must be configured with either :func:`type_info` or
        :func:`json_schema` before :func:`build` is called.
        """

        def __init__(self):
            # Result type; stays None until type_info() or json_schema() is called.
            self._type_info = None
            self._fail_on_missing_field = False
            self._ignore_parse_errors = False

        def type_info(self, type_info: TypeInformation):
            """
            Creates a JSON deserialization schema for the given type information.

            :param type_info: Type information describing the result type. The field names of Row
                              are used to parse the JSON properties.
            :return: This builder, for method chaining.
            """
            self._type_info = type_info
            return self

        def json_schema(self, json_schema: str):
            """
            Creates a JSON deserialization schema for the given JSON schema.

            The JSON schema is converted to Flink type information on the Java side via
            ``JsonRowSchemaConverter.convert``.

            :param json_schema: JSON schema describing the result type.
            :return: This builder, for method chaining.
            :raises TypeError: If ``json_schema`` is None.
            """
            if json_schema is None:
                raise TypeError("The json_schema must not be None.")
            j_type_info = get_gateway().jvm \
                .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
            self._type_info = typeinfo._from_java_type(j_type_info)
            return self

        def fail_on_missing_field(self):
            """
            Configures schema to fail if a JSON field is missing.

            By default, a missing field is ignored and the field is set to null.

            :return: This builder, for method chaining.
            """
            self._fail_on_missing_field = True
            return self

        def ignore_parse_errors(self):
            """
            Configures schema to ignore JSON parse errors instead of failing.

            By default (when this is not called), an exception is thrown when parsing JSON
            fails. What is emitted for unparseable input is decided by the Java schema
            (presumably null) — confirm against the Java JsonRowDeserializationSchema.

            :return: This builder, for method chaining.
            """
            self._ignore_parse_errors = True
            return self

        def build(self):
            """
            Builds a JsonRowDeserializationSchema from the configured options.

            :return: A new JsonRowDeserializationSchema backed by the Java implementation.
            :raises TypeError: If no type information was configured via type_info() or
                               json_schema().
            """
            # Fail with a clear message instead of an AttributeError on None below;
            # mirrors JsonRowSerializationSchema.Builder.build().
            if self._type_info is None:
                raise TypeError("Typeinfo should be set.")
            JBuilder = get_gateway().jvm.org.apache.flink.formats.json.JsonRowDeserializationSchema\
                .Builder
            j_builder = JBuilder(self._type_info.get_java_type_info())
            if self._fail_on_missing_field:
                j_builder = j_builder.failOnMissingField()
            if self._ignore_parse_errors:
                j_builder = j_builder.ignoreParseErrors()
            j_deserialization_schema = j_builder.build()
            return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
class JsonRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes an object of Flink types into JSON bytes. Serializes the
    input Flink object into a JSON string and converts it into byte[].

    Result byte[] message can be deserialized using JsonRowDeserializationSchema.
    """

    def __init__(self, j_serialization_schema):
        # Wraps an already-built Java serialization schema; use builder() to create one.
        super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)

    @staticmethod
    def builder():
        """
        A static method to get a Builder for JsonRowSerializationSchema.
        """
        return JsonRowSerializationSchema.Builder()

    class Builder(object):
        """
        Builder for JsonRowSerializationSchema.

        The input type must be configured with :func:`with_type_info` before :func:`build`
        is called.
        """

        def __init__(self):
            # Input type; stays None until with_type_info() is called.
            self._type_info = None

        def with_type_info(self, type_info: TypeInformation):
            """
            Creates a JSON serialization schema for the given type information.

            :param type_info: Type information describing the input type to serialize. The field
                              names of Row are used as the JSON property names that are written.
            :return: This builder, for method chaining.
            """
            self._type_info = type_info
            return self

        def build(self):
            """
            Builds a JsonRowSerializationSchema from the configured type information.

            :return: A new JsonRowSerializationSchema backed by the Java implementation.
            :raises TypeError: If with_type_info() was not called.
            """
            if self._type_info is None:
                raise TypeError("Typeinfo should be set.")
            j_builder = get_gateway().jvm \
                .org.apache.flink.formats.json.JsonRowSerializationSchema.builder()
            j_schema = j_builder.withTypeInfo(self._type_info.get_java_type_info()).build()
            return JsonRowSerializationSchema(j_serialization_schema=j_schema)