NiFi
This documentation is for an unreleased version of Apache Flink. We recommend you use the latest stable version.

Apache NiFi Connector #

The NiFi connector is deprecated and will be removed with Flink 1.16.

This connector provides a Source and Sink that can read from and write to Apache NiFi. To use this connector, add the following dependency to your project:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-nifi</artifactId>
    <version>1.16-SNAPSHOT</version>
</dependency>

Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.

Installing Apache NiFi #

Instructions for setting up a Apache NiFi cluster can be found here.

Apache NiFi Source #

The connector provides a Source for reading data from Apache NiFi to Apache Flink.

The class NiFiSource(…) provides 2 constructors for reading data from NiFi.

  • NiFiSource(SiteToSiteConfig config) - Constructs a NiFiSource(…) given the client’s SiteToSiteConfig and a default wait time of 1000 ms.

  • NiFiSource(SiteToSiteConfig config, long waitTimeMs) - Constructs a NiFiSource(…) given the client’s SiteToSiteConfig and the specified wait time (in milliseconds).

Example:

StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("Data for Flink")
        .requestBatchCount(5)
        .buildConfig();

SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()

val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
       .url("http://localhost:8080/nifi")
       .portName("Data for Flink")
       .requestBatchCount(5)
       .buildConfig()

val nifiSource = new NiFiSource(clientConfig)       

Here data is read from the Apache NiFi Output Port called “Data for Flink” which is part of Apache NiFi Site-to-site protocol configuration.

Apache NiFi Sink #

The connector provides a Sink for writing data from Apache Flink to Apache NiFi.

The class NiFiSink(…) provides a constructor for instantiating a NiFiSink.

  • NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder<T>) constructs a NiFiSink(…) given the client’s SiteToSiteConfig and a NiFiDataPacketBuilder that converts data from Flink to NiFiDataPacket to be ingested by NiFi.

Example:

StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();

SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
        .url("http://localhost:8080/nifi")
        .portName("Data from Flink")
        .requestBatchCount(5)
        .buildConfig();

SinkFunction<NiFiDataPacket> nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder<T>() {...});

streamExecEnv.addSink(nifiSink);
val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment()

val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder()
       .url("http://localhost:8080/nifi")
       .portName("Data from Flink")
       .requestBatchCount(5)
       .buildConfig()

val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder<T>() {...})

streamExecEnv.addSink(nifiSink)

More information about Apache NiFi Site-to-Site Protocol can be found here

Back to top