pyflink.table.Table.execute_insert

Table.execute_insert(table_path_or_descriptor: Union[str, pyflink.table.table_descriptor.TableDescriptor], overwrite: bool = False) → pyflink.table.table_result.TableResult
  1. When table_path_or_descriptor is a table path:

    Writes the Table to a TableSink that was registered under the specified name, and then executes the insert operation. For the path resolution algorithm see use_database().

    Example:

    >>> tab.execute_insert("sink")
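
    The overwrite flag applies to this form too; a minimal sketch, assuming the registered sink "sink" supports overwriting existing data:

    >>> tab.execute_insert("sink", overwrite=True)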
    
  2. When table_path_or_descriptor is a table descriptor:

    Declares that the pipeline defined by the given Table object should be written to a table (backed by a DynamicTableSink) expressed via the given TableDescriptor. It executes the insert operation.

    TableDescriptor is registered as an inline (i.e. anonymous) temporary catalog table (see create_temporary_table()) using a unique identifier. Note that calling this method multiple times, even with the same descriptor, results in multiple sink tables being registered.

    This method allows declaring a Schema for the sink descriptor. The declaration is similar to a CREATE TABLE DDL in SQL and makes it possible to:

    1. overwrite automatically derived columns with a custom DataType

    2. add metadata columns next to the physical columns

    3. declare a primary key

    It is possible to declare a schema without physical/regular columns. In this case, those columns will be automatically derived and implicitly put at the beginning of the schema declaration.

    Examples:

    >>> schema = Schema.new_builder() \
    ...      .column("f0", DataTypes.STRING()) \
    ...      .build()
    >>> table = table_env.from_descriptor(TableDescriptor.for_connector("datagen")
    ...      .schema(schema)
    ...      .build())
    >>> table.execute_insert(TableDescriptor.for_connector("blackhole")
    ...      .schema(schema)
    ...      .build())
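
    The schema capabilities listed above can be combined in one declaration. A hedged sketch with an illustrative connector and options (a real "upsert-kafka" sink would need further format options), showing a metadata column and a primary key:

    >>> sink_schema = Schema.new_builder() \
    ...      .column("id", DataTypes.BIGINT().not_null()) \
    ...      .column("payload", DataTypes.STRING()) \
    ...      .column_by_metadata("ts", DataTypes.TIMESTAMP(3)) \
    ...      .primary_key("id") \
    ...      .build()
    >>> table.execute_insert(TableDescriptor.for_connector("upsert-kafka")
    ...      .schema(sink_schema)
    ...      .option("topic", "events")
    ...      .build())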
    

    If multiple pipelines should insert data into one or more sink tables as part of a single execution, use a StatementSet (see create_statement_set()).
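
    For example, two inserts can be bundled into a single job roughly like this (a sketch; it assumes two sinks registered as "sink1" and "sink2"):

    >>> stmt_set = table_env.create_statement_set()
    >>> stmt_set.add_insert("sink1", tab)
    >>> stmt_set.add_insert("sink2", tab)
    >>> stmt_set.execute().wait()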

    By default, all insertion operations are executed asynchronously. Use TableResult.wait() or TableResult.get_job_client() to monitor the execution.
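
    A minimal monitoring sketch, assuming a registered sink "sink":

    >>> result = tab.execute_insert("sink")
    >>> job_client = result.get_job_client()
    >>> if job_client is not None:
    ...     print(job_client.get_job_id())  # handle for external monitoring
    >>> result.wait()  # block until the insert job finishes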

    Note

    execute_insert for a table descriptor (case 2) was added in Flink 1.14.0.

Parameters
  • table_path_or_descriptor – The path of the registered TableSink, or the descriptor describing the sink table into which the Table's data should be inserted.

  • overwrite – Indicates whether the insert should overwrite existing data.

Returns

The table result.

New in version 1.11.0.
