Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ public H2AsyncRequester(
final TlsStrategy tlsStrategy,
final Timeout handshakeTimeout,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
final IOWorkerSelector workerSelector,
final int maxPendingCommandsPerConnection) {
super(ioReactorConfig, eventHandlerFactory, ioSessionDecorator, exceptionCallback, sessionListener, connPool,
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector);
tlsStrategy, handshakeTimeout, threadPoolListener, workerSelector, maxPendingCommandsPerConnection);
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.Cancellable;
Expand Down Expand Up @@ -87,6 +88,12 @@ public class H2MultiplexingRequester extends AsyncRequester {

private final H2ConnPool connPool;

/**
* Hard cap on per-connection queued / in-flight requests.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturobernalg Let's use the term "command" instead of "request" here to make sure there is no confusion with HTTP requests.

* {@code <= 0} disables the cap.
*/
private final int maxRequestsPerConnection;

/**
* Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class.
*/
Expand All @@ -100,11 +107,13 @@ public H2MultiplexingRequester(
final Resolver<HttpHost, InetSocketAddress> addressResolver,
final TlsStrategy tlsStrategy,
final IOReactorMetricsListener threadPoolListener,
final IOWorkerSelector workerSelector) {
final IOWorkerSelector workerSelector,
final int maxRequestsPerConnection) {
super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener,
ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE,
threadPoolListener, workerSelector);
this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy);
this.maxRequestsPerConnection = maxRequestsPerConnection;
}

public void closeIdle(final TimeValue idleTime) {
Expand Down Expand Up @@ -245,6 +254,16 @@ public void failed(final Exception cause) {
}

};
final int max = maxRequestsPerConnection;
if (max > 0) {
final int current = ioSession.getPendingCommandCount();
if (current >= 0 && current >= max) {
exchangeHandler.failed(new RejectedExecutionException(
"Maximum number of pending requests per connection reached (max=" + max + ")"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturobernalg Same as above. Let say "command" instead of "request" here

exchangeHandler.releaseResources();
return;
}
}
final Timeout socketTimeout = ioSession.getSocketTimeout();
ioSession.enqueue(new RequestExecutionCommand(
handlerProxy,
Expand Down Expand Up @@ -349,5 +368,4 @@ public final <T> Future<T> execute(
public H2ConnPool getConnPool() {
return connPool;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.function.Supplier;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap {

private IOReactorMetricsListener threadPoolListener;

private int maxRequestsPerConnection;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private H2MultiplexingRequesterBootstrap() {
this.routeEntries = new ArrayList<>();
}
Expand Down Expand Up @@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final
return this;
}

/**
* Sets a hard limit on the number of pending request execution commands that can be queued per connection.
* When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
* A value {@code <= 0} disables the limit (default).
* Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight
* concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS).
*
* @param max maximum number of pending requests per connection; {@code <= 0} to disable the limit.
* @return this instance.
* @since 5.5
*/
@Experimental
public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) {
this.maxRequestsPerConnection = max;
return this;
}

/**
* Sets {@link H2StreamListener} instance.
*
Expand Down Expand Up @@ -274,7 +294,8 @@ public H2MultiplexingRequester create() {
DefaultAddressResolver.INSTANCE,
tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(),
threadPoolListener,
null);
null,
maxRequestsPerConnection);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class H2RequesterBootstrap {
private ConnPoolListener<HttpHost> connPoolListener;
private IOReactorMetricsListener threadPoolListener;
private FrameFactory frameFactory;
private int maxPendingCommandsPerConnection;


private H2RequesterBootstrap() {
Expand Down Expand Up @@ -210,6 +211,11 @@ public final H2RequesterBootstrap setPoolConcurrencyPolicy(final PoolConcurrency
return this;
}

public final H2RequesterBootstrap setMaxPendingCommandsPerConnection(final int maxPendingCommandsPerConnection) {
this.maxPendingCommandsPerConnection = maxPendingCommandsPerConnection;
return this;
}

/**
* Sets {@link TlsStrategy} instance.
*
Expand Down Expand Up @@ -433,7 +439,8 @@ public H2AsyncRequester create() {
actualTlsStrategy,
handshakeTimeout,
threadPoolListener,
null);
null,
maxPendingCommandsPerConnection);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arturobernalg Let's drop all examples and integration tests. They look heavy and require a specially crafted server side listener. I wish we had unit test coverage for HttpAsyncRequester / H2MultiplexingRequester but we do not.

This functionality can have integration tests and examples in client along with other client request per connection policy configuration.

* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.http2.examples;

import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer;
import org.apache.hc.core5.http.nio.AsyncRequestConsumer;
import org.apache.hc.core5.http.nio.AsyncServerRequestHandler;
import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer;
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder;
import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap;
import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.util.Timeout;

/**
* Local integration example that exercises {@code H2MultiplexingRequesterBootstrap#setMaxRequestsPerConnection(int)}.
* <p>
* The example starts a local HTTP/2 server and a single-connection HTTP/2 requester. The server responds with a fixed
* delay to keep streams busy and make the client build up a backlog of request execution commands on the connection.
* With {@code maxRequestsPerConnection} set to a small value, submissions beyond the configured cap fail fast with
* {@link java.util.concurrent.RejectedExecutionException}. This demonstrates a per-connection hard cap on queued
* (pending) requests using the {@link org.apache.hc.core5.reactor.IOSession} command queue, not a separate client-side
* queue.
* <p>
* Note this cap limits the number of pending execution commands associated with a single connection. Protocol-level
* concurrency (in-flight streams) is still governed by HTTP/2 settings (for example {@code MAX_CONCURRENT_STREAMS})
* and server behaviour.
*
* @since 5.5
*/
public final class H2MaxRequestsPerConnectionLocalExample {

public static void main(final String[] args) throws Exception {
final int maxPerConn = 2; // keep small
final int totalRequests = 50; // make larger
final Timeout timeout = Timeout.ofSeconds(30);

final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setIoThreadCount(1)
.build();

final H2Config serverH2Config = H2Config.custom()
.setPushEnabled(false)
.build();

final HttpAsyncServer server = H2ServerBootstrap.bootstrap()
.setIOReactorConfig(ioReactorConfig)
.setH2Config(serverH2Config)
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
.setCanonicalHostName("127.0.0.1") // avoids 421
.register("*", new AsyncServerRequestHandler<Message<HttpRequest, Void>>() {

@Override
public AsyncRequestConsumer<Message<HttpRequest, Void>> prepare(
final HttpRequest request,
final EntityDetails entityDetails,
final HttpContext context) {
return new BasicRequestConsumer<>(
entityDetails != null ? new DiscardingEntityConsumer<>() : null);
}

@Override
public void handle(
final Message<HttpRequest, Void> message,
final ResponseTrigger responseTrigger,
final HttpContext context) {

final String path = message.getHead().getPath();
System.out.println("server accepted " + path + " (reply in 2s)");

scheduler.schedule(() -> {
try {
responseTrigger.submitResponse(
AsyncResponseBuilder.create(200)
.setEntity("ok\n", ContentType.TEXT_PLAIN)
.build(),
context);
} catch (final Exception ex) {
try {
responseTrigger.submitResponse(
AsyncResponseBuilder.create(500)
.setEntity(ex.toString(), ContentType.TEXT_PLAIN)
.build(),
context);
} catch (final Exception ignore) {
// ignore
}
}
}, 2, TimeUnit.SECONDS);
}

})
.create();

server.start();
final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get();
final int port = ((InetSocketAddress) ep.getAddress()).getPort();
System.out.println("server on 127.0.0.1:" + port);

final H2Config clientH2Config = H2Config.custom()
.setPushEnabled(false)
.build();

final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap()
.setIOReactorConfig(ioReactorConfig)
.setH2Config(clientH2Config)
.setMaxRequestsPerConnection(maxPerConn)
.create();

requester.start();

final HttpHost target = new HttpHost("http", "127.0.0.1", port);

// Warmup (establish connection)
requester.execute(
target,
AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
timeout,
HttpCoreContext.create(),
new FutureCallback<Message<HttpResponse, String>>() {
@Override
public void completed(final Message<HttpResponse, String> result) {
System.out.println("warmup -> " + result.getHead().getCode());
}

@Override
public void failed(final Exception ex) {
System.out.println("warmup failed -> " + ex.getClass().getName() + ": " + ex.getMessage());
}

@Override
public void cancelled() {
System.out.println("warmup cancelled");
}
}).get();

final AtomicInteger ok = new AtomicInteger(0);
final AtomicInteger rejected = new AtomicInteger(0);
final AtomicInteger failed = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(totalRequests);

final ExecutorService exec = Executors.newFixedThreadPool(16);

for (int i = 0; i < totalRequests; i++) {
final int id = i;
exec.execute(() -> {
final String path = "/slow?i=" + id;
requester.execute(
target,
AsyncRequestBuilder.get().setHttpHost(target).setPath(path).build(),
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
timeout,
HttpCoreContext.create(),
new FutureCallback<Message<HttpResponse, String>>() {

@Override
public void completed(final Message<HttpResponse, String> message) {
ok.incrementAndGet();
latch.countDown();
}

@Override
public void failed(final Exception ex) {
if (ex instanceof RejectedExecutionException) {
rejected.incrementAndGet();
} else {
failed.incrementAndGet();
}
latch.countDown();
}

@Override
public void cancelled() {
failed.incrementAndGet();
latch.countDown();
}
});
});
}

final boolean done = latch.await(60, TimeUnit.SECONDS);
System.out.println("done=" + done + " ok=" + ok.get() + ", rejected=" + rejected.get() + ", failed=" + failed.get());

exec.shutdownNow();

requester.close(CloseMode.GRACEFUL);
server.close(CloseMode.GRACEFUL);
scheduler.shutdownNow();
}
}
Loading