Elasticsearch Connector
This connector provides a Sink that can write to an Elasticsearch Index. To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch_2.10</artifactId>
  <version>1.1.5</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
Installing Elasticsearch
Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name; it must be specified when creating a Sink that writes to your cluster.
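The cluster name is set in the config/elasticsearch.yml file of every node, for example (my-cluster-name is a placeholder):
cluster.name: my-cluster-name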
Elasticsearch Sink
The connector provides a Sink that can send data to an Elasticsearch Index.
The sink can use two different methods for communicating with Elasticsearch:
- An embedded Node
- The TransportClient
See here for information about the differences between the two modes.
This code shows how to create a sink that uses an embedded Node for communication:
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");
input.addSink(new ElasticsearchSink<>(config, new IndexRequestBuilder<String>() {
@Override
public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}));
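The same example in Scala: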
val input: DataStream[String] = ...
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")
input.addSink(new ElasticsearchSink(config, new IndexRequestBuilder[String] {
override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
val json = new util.HashMap[String, AnyRef]
json.put("data", element)
Requests.indexRequest.index("my-index").`type`("my-type").source(json)
}
}))
Note how a Map of Strings is used to configure the Sink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter, which must correspond to the name of your cluster.
Internally, the sink uses a BulkProcessor to send index requests to the cluster. The BulkProcessor buffers elements before sending them to the cluster in one request. Its behaviour can be configured using these config keys:
* bulk.flush.max.actions: Maximum number of elements to buffer before flushing
* bulk.flush.max.size.mb: Maximum amount of data (in megabytes) to buffer before flushing
* bulk.flush.interval.ms: Interval (in milliseconds) at which to flush, regardless of the other two settings
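For example, a configuration that flushes after 1000 buffered elements, after 5 MB of buffered data, or at the latest every second, whichever comes first, could look like this (the values here are illustrative):
Map<String, String> config = new HashMap<>();
config.put("bulk.flush.max.actions", "1000"); // flush after 1000 buffered elements...
config.put("bulk.flush.max.size.mb", "5");    // ...or after 5 MB of buffered data...
config.put("bulk.flush.interval.ms", "1000"); // ...or at the latest every second
config.put("cluster.name", "my-cluster-name");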
This example code does the same, but with a TransportClient:
DataStream<String> input = ...;
Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");
List<TransportAddress> transports = new ArrayList<>();
transports.add(new InetSocketTransportAddress("node-1", 9300));
transports.add(new InetSocketTransportAddress("node-2", 9300));
input.addSink(new ElasticsearchSink<>(config, transports, new IndexRequestBuilder<String>() {
@Override
public IndexRequest createIndexRequest(String element, RuntimeContext ctx) {
Map<String, Object> json = new HashMap<>();
json.put("data", element);
return Requests.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}
}));
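The same example in Scala: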
val input: DataStream[String] = ...
val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")
val transports = new util.ArrayList[TransportAddress]
transports.add(new InetSocketTransportAddress("node-1", 9300))
transports.add(new InetSocketTransportAddress("node-2", 9300))
input.addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[String] {
override def createIndexRequest(element: String, ctx: RuntimeContext): IndexRequest = {
val json = new util.HashMap[String, AnyRef]
json.put("data", element)
Requests.indexRequest.index("my-index").`type`("my-type").source(json)
}
}))
The difference is that we now need to provide a list of Elasticsearch Nodes to which the sink should connect using a TransportClient.
More information about Elasticsearch can be found here.