This documentation is for an out-of-date version of Apache Flink. We recommend you use the latest stable version.


Catalogs provide metadata, such as databases, tables, partitions, views, and functions and information needed to access data stored in a database or other external systems.

One of the most crucial aspects of data processing is managing metadata. It may be transient metadata like temporary tables, or UDFs registered against the table environment. Or permanent metadata, like that in a Hive Metastore. Catalogs provide a unified API for managing metadata and making it accessible from the Table API and SQL Queries.

Catalog Types


The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session.


The HiveCatalog serves two purposes; as persistent storage for pure Flink metadata, and as an interface for reading and writing existing Hive metadata. Flink’s Hive documentation provides full details on setting up the catalog and interfacing with an existing Hive installation.

Warning The Hive Metastore stores all meta-object names in lower case. This is unlike GenericInMemoryCatalog which is case-sensitive

User-Defined Catalog

Catalogs are pluggable and users can develop custom catalogs by implementing the Catalog interface. To use custom catalogs in SQL CLI, users should develop both a catalog and its corresponding catalog factory by implementing the CatalogFactory interface.

The catalog factory defines a set of properties for configuring the catalog when the SQL CLI bootstraps. The set of properties will be passed to a discovery service where the service tries to match the properties to a CatalogFactory and initiate a corresponding catalog instance.


Users can use SQL DDL to create tables in catalogs in both Table API and SQL.

For Table API:

TableEnvironment tableEnv = ...

// Create a HiveCatalog 
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database
tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");

// Create a catalog table
tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // should return the tables in current catalog and database.

For SQL Client:

// the catalog should have been registered via yaml file

Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);


For detailed information, please check out Flink SQL CREATE DDL.

Using Java/Scala/Python API

Users can use Java, Scala, or Python API to create catalog tables programmatically.

TableEnvironment tableEnv = ...

// Create a HiveCatalog 
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");

// Register the catalog
tableEnv.registerCatalog("myhive", catalog);

// Create a catalog database 
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))

// Create a catalog table
TableSchema schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())

        new ObjectPath("mydb", "mytable"), 
        new CatalogTableImpl(
            new Kafka()
            "my comment"
List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"

Catalog API

Note: only catalog program APIs are listed here. Users can achieve many of the same funtionalities with SQL DDL. For detailed DDL information, please refer to SQL CREATE DDL.

Database operations

// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get databse

// check if a database exist

// list databases in a catalog

Table operations

// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");

// get table

// check if a table exist or not

// list tables in a database

View operations

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view");

// get view

// check if a view exist or not

// list views in a database

Partition operations

// create view
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitions(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));

Function operations

// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function

// check if a function exist or not

// list functions in a database

Table API and SQL for Catalog

Registering a Catalog

Users have access to a default in-memory catalog named default_catalog, that is always created by default. This catalog by default has a single database called default_database. Users can also register additional catalogs into an existing Flink session.

tableEnv.registerCatalog(new CustomCatalog("myCatalog"));

All catalogs defined using YAML must provide a type property that specifies the type of catalog. The following types are supported out of the box.

Catalog Type Value
GenericInMemory generic_in_memory
Hive hive
   - name: myCatalog
     type: custom_catalog
     hive-conf-dir: ...

Changing the Current Catalog And Database

Flink will always search for tables, views, and UDF’s in the current catalog and database.

Flink SQL> USE CATALOG myCatalog;
Flink SQL> USE myDB;

Metadata from catalogs that are not the current catalog are accessible by providing fully qualified names in the form catalog.database.object.

Flink SQL> SELECT * FROM not_the_current_catalog.not_the_current_db.my_table;

List Available Catalogs

Flink SQL> show catalogs;

List Available Databases

Flink SQL> show databases;

List Available Tables

Flink SQL> show tables;