Class FactoryUtil
- java.lang.Object
-
- org.apache.flink.table.factories.FactoryUtil
-
@PublicEvolving public final class FactoryUtil extends Object
Utility for working withFactory
s.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
FactoryUtil.CatalogFactoryHelper
Helper utility for validating all options for aCatalogFactory
.static class
FactoryUtil.CatalogStoreFactoryHelper
Helper utility for validating all options for aCatalogStoreFactory
.static class
FactoryUtil.DefaultCatalogContext
Default implementation ofCatalogFactory.Context
.static class
FactoryUtil.DefaultCatalogStoreContext
Default implementation ofCatalogStoreFactory.Context
.static class
FactoryUtil.DefaultDynamicTableContext
Default implementation ofDynamicTableFactory.Context
.static class
FactoryUtil.DefaultModuleContext
Default implementation ofModuleFactory.Context
.static class
FactoryUtil.FactoryHelper<F extends Factory>
Base helper utility for validating all options for aFactory
.static class
FactoryUtil.ModuleFactoryHelper
Helper utility for validating all options for aModuleFactory
.static class
FactoryUtil.TableFactoryHelper
Helper utility for discovering formats and validating all options for aDynamicTableFactory
.
-
Field Summary
Fields Modifier and Type Field Description static ConfigOption<String>
CONNECTOR
static ConfigOption<String>
FORMAT
static String
FORMAT_SUFFIX
Suffix for keys ofConfigOption
in case a connector requires multiple formats (e.g.static String
PLACEHOLDER_SYMBOL
The placeholder symbol to be used for keys of options which can be templated.static ConfigOption<Integer>
PROPERTY_VERSION
Describes the property version.static ConfigOption<Integer>
SINK_PARALLELISM
static ConfigOption<Duration>
SOURCE_IDLE_TIMEOUT
static ConfigOption<Integer>
SOURCE_PARALLELISM
static ConfigOption<List<String>>
SQL_GATEWAY_ENDPOINT_TYPE
static ConfigOption<String>
WATERMARK_ALIGNMENT_GROUP
static ConfigOption<Duration>
WATERMARK_ALIGNMENT_MAX_DRIFT
static ConfigOption<Duration>
WATERMARK_ALIGNMENT_UPDATE_INTERVAL
static ConfigOption<WatermarkEmitStrategy>
WATERMARK_EMIT_STRATEGY
static ConfigOption<String>
WORKFLOW_SCHEDULER_TYPE
-
Method Summary
All Methods Static Methods Concrete Methods Deprecated Methods Modifier and Type Method Description static Optional<String>
checkWatermarkOptions(ReadableConfig conf)
Check watermark-related options and return error messages.static Catalog
createCatalog(String catalogName, Map<String,String> options, ReadableConfig configuration, ClassLoader classLoader)
Attempts to discover an appropriate catalog factory and creates an instance of the catalog.static FactoryUtil.CatalogFactoryHelper
createCatalogFactoryHelper(CatalogFactory factory, CatalogFactory.Context context)
Creates a utility that helps validating options for aCatalogFactory
.static FactoryUtil.CatalogStoreFactoryHelper
createCatalogStoreFactoryHelper(CatalogStoreFactory factory, CatalogStoreFactory.Context context)
Creates a utility that helps validating options for aCatalogStoreFactory
.static DynamicTableSink
createDynamicTableSink(DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String,String> enrichmentOptions, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
Creates aDynamicTableSink
from aCatalogTable
.static DynamicTableSink
createDynamicTableSink(DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
static DynamicTableSource
createDynamicTableSource(DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String,String> enrichmentOptions, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
Creates aDynamicTableSource
from aCatalogTable
.static DynamicTableSource
createDynamicTableSource(DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
static Module
createModule(String moduleName, Map<String,String> options, ReadableConfig configuration, ClassLoader classLoader)
Discovers a matching module factory and creates an instance of it.static FactoryUtil.ModuleFactoryHelper
createModuleFactoryHelper(ModuleFactory factory, ModuleFactory.Context context)
Creates a utility that helps validating options for aModuleFactory
.static FactoryUtil.TableFactoryHelper
createTableFactoryHelper(DynamicTableFactory factory, DynamicTableFactory.Context context)
Creates a utility that helps in discovering formats, merging options withDynamicTableFactory.Context.getEnrichmentOptions()
and validating them all for aDynamicTableFactory
.static DynamicTableSink
createTableSink(Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
static DynamicTableSource
createTableSource(Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
static <T extends Factory>
TdiscoverFactory(ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier)
Discovers a factory using the given factory base class and identifier.static <T extends DynamicTableFactory>
Optional<T>getDynamicTableFactory(Class<T> factoryClass, Catalog catalog)
Returns theDynamicTableFactory
viaCatalog
.static String
getFormatPrefix(ConfigOption<String> formatOption, String formatIdentifier)
Returns the required option prefix for options of the given format.static String
stringifyOption(String key, String value)
static void
validateFactoryOptions(Set<ConfigOption<?>> requiredOptions, Set<ConfigOption<?>> optionalOptions, ReadableConfig options)
Validates the required options and optional options.static void
validateFactoryOptions(Factory factory, ReadableConfig options)
Validates the required and optionalConfigOption
s of a factory.static void
validateUnconsumedKeys(String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys)
Validates unconsumed option keys.static void
validateUnconsumedKeys(String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys, Set<String> deprecatedOptionKeys)
Validates unconsumed option keys.static void
validateWatermarkOptions(String factoryIdentifier, ReadableConfig conf)
Validate watermark options from table options.
-
-
-
Field Detail
-
PROPERTY_VERSION
public static final ConfigOption<Integer> PROPERTY_VERSION
Describes the property version. This can be used for backwards compatibility in case the property format changes.
-
CONNECTOR
public static final ConfigOption<String> CONNECTOR
-
FORMAT
public static final ConfigOption<String> FORMAT
-
SINK_PARALLELISM
public static final ConfigOption<Integer> SINK_PARALLELISM
-
SQL_GATEWAY_ENDPOINT_TYPE
public static final ConfigOption<List<String>> SQL_GATEWAY_ENDPOINT_TYPE
-
SOURCE_PARALLELISM
public static final ConfigOption<Integer> SOURCE_PARALLELISM
-
WATERMARK_EMIT_STRATEGY
public static final ConfigOption<WatermarkEmitStrategy> WATERMARK_EMIT_STRATEGY
-
WATERMARK_ALIGNMENT_GROUP
public static final ConfigOption<String> WATERMARK_ALIGNMENT_GROUP
-
WATERMARK_ALIGNMENT_MAX_DRIFT
public static final ConfigOption<Duration> WATERMARK_ALIGNMENT_MAX_DRIFT
-
WATERMARK_ALIGNMENT_UPDATE_INTERVAL
public static final ConfigOption<Duration> WATERMARK_ALIGNMENT_UPDATE_INTERVAL
-
SOURCE_IDLE_TIMEOUT
public static final ConfigOption<Duration> SOURCE_IDLE_TIMEOUT
-
WORKFLOW_SCHEDULER_TYPE
public static final ConfigOption<String> WORKFLOW_SCHEDULER_TYPE
-
FORMAT_SUFFIX
public static final String FORMAT_SUFFIX
Suffix for keys ofConfigOption
in case a connector requires multiple formats (e.g. for both key and value).See
createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
for more information.- See Also:
- Constant Field Values
-
PLACEHOLDER_SYMBOL
public static final String PLACEHOLDER_SYMBOL
The placeholder symbol to be used for keys of options which can be templated. SeeFactory
for details.- See Also:
- Constant Field Values
-
-
Method Detail
-
createDynamicTableSource
public static DynamicTableSource createDynamicTableSource(@Nullable DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String,String> enrichmentOptions, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
Creates aDynamicTableSource
from aCatalogTable
.If {@param preferredFactory} is passed, the table source is created from that factory. Otherwise, an attempt is made to discover a matching factory using Java SPI (see
Factory
for details).
-
createDynamicTableSource
@Deprecated public static DynamicTableSource createDynamicTableSource(@Nullable DynamicTableSourceFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
-
createTableSource
@Deprecated public static DynamicTableSource createTableSource(@Nullable Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
Deprecated.
-
createDynamicTableSink
public static DynamicTableSink createDynamicTableSink(@Nullable DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, Map<String,String> enrichmentOptions, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
Creates aDynamicTableSink
from aCatalogTable
.If {@param preferredFactory} is passed, the table sink is created from that factory. Otherwise, an attempt is made to discover a matching factory using Java SPI (see
Factory
for details).
-
createDynamicTableSink
@Deprecated public static DynamicTableSink createDynamicTableSink(@Nullable DynamicTableSinkFactory preferredFactory, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
-
createTableSink
@Deprecated public static DynamicTableSink createTableSink(@Nullable Catalog catalog, ObjectIdentifier objectIdentifier, ResolvedCatalogTable catalogTable, ReadableConfig configuration, ClassLoader classLoader, boolean isTemporary)
Deprecated.
-
createCatalogFactoryHelper
public static FactoryUtil.CatalogFactoryHelper createCatalogFactoryHelper(CatalogFactory factory, CatalogFactory.Context context)
Creates a utility that helps validating options for aCatalogFactory
.Note: This utility checks for left-over options in the final step.
-
createCatalogStoreFactoryHelper
public static FactoryUtil.CatalogStoreFactoryHelper createCatalogStoreFactoryHelper(CatalogStoreFactory factory, CatalogStoreFactory.Context context)
Creates a utility that helps validating options for aCatalogStoreFactory
.Note: This utility checks for left-over options in the final step.
-
createModuleFactoryHelper
public static FactoryUtil.ModuleFactoryHelper createModuleFactoryHelper(ModuleFactory factory, ModuleFactory.Context context)
Creates a utility that helps validating options for aModuleFactory
.Note: This utility checks for left-over options in the final step.
-
createTableFactoryHelper
public static FactoryUtil.TableFactoryHelper createTableFactoryHelper(DynamicTableFactory factory, DynamicTableFactory.Context context)
Creates a utility that helps in discovering formats, merging options withDynamicTableFactory.Context.getEnrichmentOptions()
and validating them all for aDynamicTableFactory
.The following example sketches the usage:
// in createDynamicTableSource() helper = FactoryUtil.createTableFactoryHelper(this, context); keyFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT); valueFormat = helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT); helper.validate(); ... // construct connector with discovered formats
Note: The format option parameter of
FactoryUtil.TableFactoryHelper.discoverEncodingFormat(Class, ConfigOption)
andFactoryUtil.TableFactoryHelper.discoverDecodingFormat(Class, ConfigOption)
must beFORMAT
or end withFORMAT_SUFFIX
. The discovery logic will replace 'format' with the factory identifier value as the format prefix. For example, assuming the identifier is 'json', if the format option key is 'format', then the format prefix is 'json.'. If the format option key is 'value.format', then the format prefix is 'value.json'. The format prefix is used to project the options for the format factory.Note: When created, this utility merges the options from
DynamicTableFactory.Context.getEnrichmentOptions()
usingDynamicTableFactory.forwardOptions()
. When invokingFactoryUtil.FactoryHelper.validate()
, this utility checks for left-over options in the final step.
-
createCatalog
public static Catalog createCatalog(String catalogName, Map<String,String> options, ReadableConfig configuration, ClassLoader classLoader)
Attempts to discover an appropriate catalog factory and creates an instance of the catalog.This first uses the legacy
TableFactory
stack to discover a matchingCatalogFactory
. If none is found, it falls back to the new stack usingFactory
instead.
-
createModule
public static Module createModule(String moduleName, Map<String,String> options, ReadableConfig configuration, ClassLoader classLoader)
Discovers a matching module factory and creates an instance of it.This first uses the legacy
TableFactory
stack to discover a matchingModuleFactory
. If none is found, it falls back to the new stack usingFactory
instead.
-
discoverFactory
public static <T extends Factory> T discoverFactory(ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier)
Discovers a factory using the given factory base class and identifier.This method is meant for cases where
createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
createTableSource(Catalog, ObjectIdentifier, ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)
, andcreateTableSink(Catalog, ObjectIdentifier, ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)
are not applicable.
-
validateFactoryOptions
public static void validateFactoryOptions(Factory factory, ReadableConfig options)
Validates the required and optionalConfigOption
s of a factory.Note: It does not check for left-over options.
-
validateFactoryOptions
public static void validateFactoryOptions(Set<ConfigOption<?>> requiredOptions, Set<ConfigOption<?>> optionalOptions, ReadableConfig options)
Validates the required options and optional options.Note: It does not check for left-over options.
-
validateUnconsumedKeys
public static void validateUnconsumedKeys(String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys, Set<String> deprecatedOptionKeys)
Validates unconsumed option keys.
-
validateUnconsumedKeys
public static void validateUnconsumedKeys(String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys)
Validates unconsumed option keys.
-
getFormatPrefix
public static String getFormatPrefix(ConfigOption<String> formatOption, String formatIdentifier)
Returns the required option prefix for options of the given format.
-
getDynamicTableFactory
public static <T extends DynamicTableFactory> Optional<T> getDynamicTableFactory(Class<T> factoryClass, @Nullable Catalog catalog)
Returns theDynamicTableFactory
viaCatalog
.
-
validateWatermarkOptions
public static void validateWatermarkOptions(String factoryIdentifier, ReadableConfig conf)
Validate watermark options from table options.- Parameters:
factoryIdentifier
- identifier of tableconf
- table options
-
checkWatermarkOptions
public static Optional<String> checkWatermarkOptions(ReadableConfig conf)
Check watermark-related options and return error messages.- Parameters:
conf
- table options- Returns:
- Optional of error messages
-
-