# Source code for pyflink.table.sources
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.java_gateway import get_gateway
from pyflink.table.types import DataType, _to_java_type
from pyflink.util import utils
__all__ = ['TableSource', 'CsvTableSource']
class TableSource(object):
    """
    Defines a table from an external system or location.

    :param j_table_source: The underlying table source object that this Python
                           class wraps. It is stored unchanged on the instance.
    """

    def __init__(self, j_table_source):
        # Keep a handle on the wrapped object; no copy or validation is done.
        self._j_table_source = j_table_source
class CsvTableSource(TableSource):
    """
    A :class:`TableSource` for simple CSV files with a
    (logically) unlimited number of fields.

    :param source_path: The path to the CSV file.
    :param field_names: The names of the table fields.
    :param field_types: The types of the table fields.
    """

    def __init__(self, source_path, field_names, field_types):
        # type: (str, list[str], list[DataType]) -> None
        gateway = get_gateway()

        # Field names are handed to ``utils.to_jarray`` together with the
        # JVM ``String`` class, which is used as the array element type.
        j_field_names = utils.to_jarray(gateway.jvm.String, field_names)

        # Each Python-side field type is mapped through ``_to_java_type``
        # before the results are packed with ``TypeInformation`` as the
        # element type.
        j_field_types = utils.to_jarray(
            gateway.jvm.TypeInformation,
            [_to_java_type(field_type) for field_type in field_types])

        # Construct the JVM-side CsvTableSource and let the base class store
        # it as the wrapped table source.
        super(CsvTableSource, self).__init__(
            gateway.jvm.CsvTableSource(source_path, j_field_names, j_field_types))