Client-Side HTTP/2 (Preview)

Warning

Client-Side HTTP/2 support in pekko-http is currently available as a preview. This means it is ready to be evaluated, but the APIs and behavior are likely to change.

Note

It is recommended to first read the Implications of the streaming nature of Request/Response Entities and Host-Level Client-Side API sections. They explain the underlying full-stack streaming concepts, how to tune the client settings and HTTPS context, and how to handle the Request-Response Cycle, which may be unexpected when coming from HTTP clients that are not “streaming first”.

Create the client

There are three mechanisms for a client to establish an HTTP/2 connection. Apache Pekko HTTP supports:

  • HTTP/2 over TLS
  • HTTP/2 over a plain TCP connection (“h2c with prior knowledge”)

Apache Pekko HTTP doesn’t support:

  • HTTP Upgrade mechanism

HTTP/2 over TLS

To create a client, use the Http() fluent API to connect and use the http2() creator:

Scala
import org.apache.pekko.http.scaladsl.Http

Http().connectionTo("localhost").toPort(8443).http2()
Java
import org.apache.pekko.http.javadsl.Http;

Http.get(system).connectionTo("127.0.0.1").toPort(8443).http2();

HTTP/2 over TLS needs Application-Layer Protocol Negotiation (ALPN) to negotiate whether both client and server support HTTP/2. The JVM provides ALPN support starting from JDK 8u252. Make sure to use at least that version.
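A quick way to confirm that the running JVM exposes the ALPN API is to look for one of its methods by reflection. The sketch below is an illustration only and uses nothing from Pekko HTTP: the class name `AlpnCheck` is hypothetical, and it assumes that the presence of `SSLParameters.setApplicationProtocols` is a sufficient indicator of ALPN support.

```java
import javax.net.ssl.SSLParameters;

final class AlpnCheck {
    // Returns true when the JDK's ALPN API is present on this JVM.
    // Assumption: setApplicationProtocols is a good proxy for ALPN support.
    static boolean alpnAvailable() {
        try {
            SSLParameters.class.getMethod("setApplicationProtocols", String[].class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("ALPN API available: " + alpnAvailable());
    }
}
```

Running such a check at startup surfaces an outdated JDK early, instead of as a failed stream at connection time.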

Apache Pekko HTTP does not currently support protocol negotiation to fall back to HTTP/1.1 for this API. When the server does not support HTTP/2, the stream will fail.

h2c with prior knowledge

The other option is to connect and start communicating in HTTP/2 immediately. You must know beforehand that the target server supports HTTP/2 over a plain TCP connection. For this reason, this approach is known as h2c with Prior Knowledge of HTTP/2 support.
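To make "start communicating in HTTP/2 immediately" concrete: an HTTP/2 client opens the connection with the fixed client connection preface defined by the HTTP/2 specification, without a preceding HTTP/1.1 exchange. The sketch below only builds those bytes for illustration. The class name `Http2Preface` is hypothetical, and the HTTP/2 implementation takes care of sending the preface, so application code does not need to.

```java
import java.nio.charset.StandardCharsets;

final class Http2Preface {
    // Client connection preface as defined by the HTTP/2 specification.
    static final String PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

    // The preface encoded as the octets that go on the wire.
    static byte[] prefaceBytes() {
        return PREFACE.getBytes(StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // The preface is 24 octets long.
        System.out.println("preface length in octets: " + prefaceBytes().length);
    }
}
```

A server that only speaks HTTP/1.1 cannot make sense of these bytes, which is why the client has to know in advance that the target accepts HTTP/2 on that port.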

To create a client, use the Http() fluent API to connect and use the http2WithPriorKnowledge() creator:

Scala
sourceimport org.apache.pekko.http.scaladsl.Http

Http().connectionTo("localhost").toPort(8080).http2WithPriorKnowledge()
Java
sourceimport org.apache.pekko.http.javadsl.Http;

Http.get(system).connectionTo("127.0.0.1").toPort(8080).http2WithPriorKnowledge();

HTTP Upgrade mechanism

The Apache Pekko HTTP client doesn’t support HTTP/1 to HTTP/2 negotiation over plaintext using the Upgrade mechanism.

Request-response ordering

For HTTP/2 connections, responses are not guaranteed to arrive in the same order in which the requests were sent to the server. For example, a request whose response is quickly available may overtake an earlier request that the server is slower to answer. It is therefore important to have a way to correlate each response with the request it was made for. This can be achieved by setting a RequestResponseAssociation on the request: Apache Pekko HTTP passes such association objects on to the response.
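The correlation idea can be sketched without any Pekko types. In the JDK-only illustration below, `Request`, `Response` and `answerInReverseOrder` are hypothetical stand-ins (not Pekko HTTP classes): each request carries a `CompletableFuture` as its association object, the simulated connection answers in reverse order, and every response still completes the future of the request it belongs to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CorrelationSketch {
    // Hypothetical stand-in for a request that carries an association object.
    static final class Request {
        final String path;
        final CompletableFuture<Response> association;
        Request(String path, CompletableFuture<Response> association) {
            this.path = path;
            this.association = association;
        }
    }

    // Hypothetical stand-in for a response; the association is passed through unchanged.
    static final class Response {
        final String body;
        final CompletableFuture<Response> association;
        Response(String body, CompletableFuture<Response> association) {
            this.body = body;
            this.association = association;
        }
    }

    // Simulated connection that answers the requests in reverse order.
    static List<Response> answerInReverseOrder(List<Request> requests) {
        List<Response> responses = new ArrayList<>();
        for (int i = requests.size() - 1; i >= 0; i--) {
            Request req = requests.get(i);
            responses.add(new Response("body of " + req.path, req.association));
        }
        return responses;
    }

    // Sends two requests and returns the response bodies in request order.
    static List<String> run() {
        List<Request> requests = new ArrayList<>();
        List<CompletableFuture<Response>> futures = new ArrayList<>();
        for (String path : Arrays.asList("/slow", "/fast")) {
            CompletableFuture<Response> future = new CompletableFuture<>();
            futures.add(future);
            requests.add(new Request(path, future));
        }
        // Responses arrive out of order; each one completes its own request's future.
        for (Response res : answerInReverseOrder(requests)) {
            res.association.complete(res);
        }
        List<String> bodies = new ArrayList<>();
        for (CompletableFuture<Response> future : futures) {
            bodies.add(future.join().body);
        }
        return bodies;
    }
}
```

Because the association travels with the message, the caller never has to rely on arrival order to decide which response belongs to which request.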

The following sample uses the built-in RequestResponseAssociation, org.apache.pekko.http.scaladsl.model.ResponsePromise in Scala and org.apache.pekko.http.javadsl.model.ResponseFuture in Java, to return a Future (Scala) or CompletionStage (Java) for the response:

Scala
val dispatch = singleRequest(Http().connectionTo("pekko.apache.org").http2())

dispatch(
  HttpRequest(
    uri = "https://pekko.apache.org/api/pekko/current/org/apache/pekko/actor/typed/scaladsl/index.html",
    headers = headers.`Accept-Encoding`(HttpEncodings.gzip) :: Nil)).onComplete { res =>
  println(s"[1] Got index.html: $res")
  res.get.entity.dataBytes.runWith(Sink.ignore).onComplete(res => println(s"Finished reading [1] $res"))
}

def singleRequest(
    connection: Flow[HttpRequest, HttpResponse, Any], bufferSize: Int = 100): HttpRequest => Future[HttpResponse] = {
  val queue =
    Source.queue(bufferSize)
      .via(connection)
      .to(Sink.foreach { response =>
        // complete the response promise with the response when it arrives
        val responseAssociation = response.attribute(ResponsePromise.Key).get
        responseAssociation.promise.trySuccess(response)
      })
      .run()

  req => {
    // create a promise of the response for each request and set it as an attribute on the request
    val p = Promise[HttpResponse]()
    queue.offer(req.addAttribute(ResponsePromise.Key, ResponsePromise(p))) match {
      // return the future response
      case QueueOfferResult.Enqueued    => p.future
      case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed => Future.failed(
          new RuntimeException("Queue was closed (pool shut down)."))
    }
  }
}
Java
  Function<HttpRequest, CompletionStage<HttpResponse>> dispatch =
      singleRequest(system, Http.get(system).connectionTo("pekko.apache.org").http2());

  dispatch
      .apply(
          HttpRequest.create(
                  "https://pekko.apache.org/api/pekko/current/org/apache/pekko/actor/typed/scaladsl/index.html")
              .withHeaders(Arrays.asList(AcceptEncoding.create(HttpEncodings.GZIP))))
      .thenAccept(
          res -> {
            System.out.println("[1] Got index.html: " + res);
            res.entity()
                .getDataBytes()
                .runWith(Sink.ignore(), mat)
                .thenAccept(
                    consumedRes -> System.out.println("Finished reading [1] " + consumedRes));
          });

private static Function<HttpRequest, CompletionStage<HttpResponse>> singleRequest(
    ActorSystem system, Flow<HttpRequest, HttpResponse, ?> connection) {
  BoundedSourceQueue<HttpRequest> queue =
      Source.<HttpRequest>queue(100)
          .via(connection)
          .to(
              Sink.foreach(
                  res -> {
                    try {
                      // complete the future with the response when it arrives
                      ResponseFuture responseFuture =
                          res.getAttribute(ResponseFuture.KEY()).get();
                      responseFuture.future().complete(res);
                    } catch (Exception ex) {
                      ex.printStackTrace();
                    }
                  }))
          .run(SystemMaterializer.get(system).materializer());

  return (HttpRequest req) -> {
    // create a future of the response for each request and set it as an attribute on the request
    CompletableFuture<HttpResponse> future = new CompletableFuture<>();
    ResponseFuture attribute = new ResponseFuture(future);
    queue.offer(req.addAttribute(ResponseFuture.KEY(), attribute));
    // return the future response
    return attribute.future();
  };
}
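
Note that the Scala variant inspects the result of queue.offer and fails the returned Future when the request is dropped, while the Java variant above ignores that result, so a request rejected by a full buffer would leave its future uncompleted. The JDK-only sketch below illustrates the idea of failing fast. It uses `java.util.concurrent.ArrayBlockingQueue` as a stand-in for the stream's bounded queue, and `OfferSketch` and `submit` are hypothetical names, not part of Pekko HTTP.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;

final class OfferSketch {
    // Enqueues a pending response future; when the buffer is full, the future is
    // failed immediately instead of being left uncompleted forever.
    static CompletableFuture<String> submit(ArrayBlockingQueue<CompletableFuture<String>> queue) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (!queue.offer(future)) {
            future.completeExceptionally(new IllegalStateException("Queue overflowed."));
        }
        return future;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<CompletableFuture<String>> queue = new ArrayBlockingQueue<>(1);
        CompletableFuture<String> accepted = submit(queue);
        CompletableFuture<String> rejected = submit(queue);
        System.out.println("first request pending: " + !accepted.isDone());
        System.out.println("second request failed: " + rejected.isCompletedExceptionally());
    }
}
```

In the Pekko sample, the equivalent signal is the value returned by queue.offer, which the Scala code matches on (Enqueued, Dropped, Failure, QueueClosed).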