-
Notifications
You must be signed in to change notification settings - Fork 355
Cap total number of concurrent requests per HTTP/2 connection #592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |
| import java.util.List; | ||
| import java.util.Set; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
|
|
||
| import org.apache.hc.core5.annotation.Internal; | ||
| import org.apache.hc.core5.concurrent.Cancellable; | ||
|
|
@@ -87,6 +88,12 @@ public class H2MultiplexingRequester extends AsyncRequester { | |
|
|
||
| private final H2ConnPool connPool; | ||
|
|
||
| /** | ||
| * Hard cap on per-connection queued / in-flight requests. | ||
| * {@code <= 0} disables the cap. | ||
| */ | ||
| private final int maxRequestsPerConnection; | ||
|
|
||
| /** | ||
| * Use {@link H2MultiplexingRequesterBootstrap} to create instances of this class. | ||
| */ | ||
|
|
@@ -100,11 +107,13 @@ public H2MultiplexingRequester( | |
| final Resolver<HttpHost, InetSocketAddress> addressResolver, | ||
| final TlsStrategy tlsStrategy, | ||
| final IOReactorMetricsListener threadPoolListener, | ||
| final IOWorkerSelector workerSelector) { | ||
| final IOWorkerSelector workerSelector, | ||
| final int maxRequestsPerConnection) { | ||
| super(eventHandlerFactory, ioReactorConfig, ioSessionDecorator, exceptionCallback, sessionListener, | ||
| ShutdownCommand.GRACEFUL_IMMEDIATE_CALLBACK, DefaultAddressResolver.INSTANCE, | ||
| threadPoolListener, workerSelector); | ||
| this.connPool = new H2ConnPool(this, addressResolver, tlsStrategy); | ||
| this.maxRequestsPerConnection = maxRequestsPerConnection; | ||
| } | ||
|
|
||
| public void closeIdle(final TimeValue idleTime) { | ||
|
|
@@ -245,6 +254,16 @@ public void failed(final Exception cause) { | |
| } | ||
|
|
||
| }; | ||
| final int max = maxRequestsPerConnection; | ||
ok2c marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (max > 0) { | ||
| final int current = ioSession.getPendingCommandCount(); | ||
| if (current >= 0 && current >= max) { | ||
| exchangeHandler.failed(new RejectedExecutionException( | ||
| "Maximum number of pending requests per connection reached (max=" + max + ")")); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arturobernalg Same as above. Let say "command" instead of "request" here |
||
| exchangeHandler.releaseResources(); | ||
| return; | ||
| } | ||
| } | ||
| final Timeout socketTimeout = ioSession.getSocketTimeout(); | ||
| ioSession.enqueue(new RequestExecutionCommand( | ||
| handlerProxy, | ||
|
|
@@ -349,5 +368,4 @@ public final <T> Future<T> execute( | |
| public H2ConnPool getConnPool() { | ||
| return connPool; | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,7 @@ | |
| import java.util.ArrayList; | ||
| import java.util.List; | ||
|
|
||
| import org.apache.hc.core5.annotation.Experimental; | ||
| import org.apache.hc.core5.function.Callback; | ||
| import org.apache.hc.core5.function.Decorator; | ||
| import org.apache.hc.core5.function.Supplier; | ||
|
|
@@ -76,6 +77,8 @@ public class H2MultiplexingRequesterBootstrap { | |
|
|
||
| private IOReactorMetricsListener threadPoolListener; | ||
|
|
||
| private int maxRequestsPerConnection; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arturobernalg Same. |
||
|
|
||
| private H2MultiplexingRequesterBootstrap() { | ||
| this.routeEntries = new ArrayList<>(); | ||
| } | ||
|
|
@@ -180,6 +183,23 @@ public final H2MultiplexingRequesterBootstrap setIOReactorMetricsListener(final | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets a hard limit on the number of pending request execution commands that can be queued per connection. | ||
| * When the limit is reached, new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}. | ||
| * A value {@code <= 0} disables the limit (default). | ||
| * Note: this limit applies to commands waiting in the connection's internal queue (backlog). HTTP/2 in-flight | ||
| * concurrency is governed separately by protocol settings (e.g. MAX_CONCURRENT_STREAMS). | ||
| * | ||
| * @param max maximum number of pending requests per connection; {@code <= 0} to disable the limit. | ||
| * @return this instance. | ||
| * @since 5.5 | ||
| */ | ||
| @Experimental | ||
| public final H2MultiplexingRequesterBootstrap setMaxRequestsPerConnection(final int max) { | ||
| this.maxRequestsPerConnection = max; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets {@link H2StreamListener} instance. | ||
| * | ||
|
|
@@ -274,7 +294,8 @@ public H2MultiplexingRequester create() { | |
| DefaultAddressResolver.INSTANCE, | ||
| tlsStrategy != null ? tlsStrategy : new H2ClientTlsStrategy(), | ||
| threadPoolListener, | ||
| null); | ||
| null, | ||
| maxRequestsPerConnection); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,245 @@ | ||
| /* | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arturobernalg Let's drop all examples and integration tests. They look heavy and require a specially crafted server side listener. I wish we had unit test coverage for This functionality can have integration tests and examples in client along with other client request per connection policy configuration. |
||
| * ==================================================================== | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| * ==================================================================== | ||
| * | ||
| * This software consists of voluntary contributions made by many | ||
| * individuals on behalf of the Apache Software Foundation. For more | ||
| * information on the Apache Software Foundation, please see | ||
| * <http://www.apache.org/>. | ||
| * | ||
| */ | ||
| package org.apache.hc.core5.http2.examples; | ||
|
|
||
| import java.net.InetSocketAddress; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| import org.apache.hc.core5.concurrent.FutureCallback; | ||
| import org.apache.hc.core5.http.ContentType; | ||
| import org.apache.hc.core5.http.EntityDetails; | ||
| import org.apache.hc.core5.http.HttpHost; | ||
| import org.apache.hc.core5.http.HttpRequest; | ||
| import org.apache.hc.core5.http.HttpResponse; | ||
| import org.apache.hc.core5.http.Message; | ||
| import org.apache.hc.core5.http.URIScheme; | ||
| import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncServer; | ||
| import org.apache.hc.core5.http.nio.AsyncRequestConsumer; | ||
| import org.apache.hc.core5.http.nio.AsyncServerRequestHandler; | ||
| import org.apache.hc.core5.http.nio.entity.DiscardingEntityConsumer; | ||
| import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; | ||
| import org.apache.hc.core5.http.nio.support.AsyncRequestBuilder; | ||
| import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder; | ||
| import org.apache.hc.core5.http.nio.support.BasicRequestConsumer; | ||
| import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; | ||
| import org.apache.hc.core5.http.protocol.HttpContext; | ||
| import org.apache.hc.core5.http.protocol.HttpCoreContext; | ||
| import org.apache.hc.core5.http2.HttpVersionPolicy; | ||
| import org.apache.hc.core5.http2.config.H2Config; | ||
| import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; | ||
| import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; | ||
| import org.apache.hc.core5.http2.impl.nio.bootstrap.H2ServerBootstrap; | ||
| import org.apache.hc.core5.io.CloseMode; | ||
| import org.apache.hc.core5.reactor.IOReactorConfig; | ||
| import org.apache.hc.core5.reactor.ListenerEndpoint; | ||
| import org.apache.hc.core5.util.Timeout; | ||
|
|
||
| /** | ||
| * Local integration example that exercises {@code H2MultiplexingRequesterBootstrap#setMaxRequestsPerConnection(int)}. | ||
| * <p> | ||
| * The example starts a local HTTP/2 server and a single-connection HTTP/2 requester. The server responds with a fixed | ||
| * delay to keep streams busy and make the client build up a backlog of request execution commands on the connection. | ||
| * With {@code maxRequestsPerConnection} set to a small value, submissions beyond the configured cap fail fast with | ||
| * {@link java.util.concurrent.RejectedExecutionException}. This demonstrates a per-connection hard cap on queued | ||
| * (pending) requests using the {@link org.apache.hc.core5.reactor.IOSession} command queue, not a separate client-side | ||
| * queue. | ||
| * <p> | ||
| * Note this cap limits the number of pending execution commands associated with a single connection. Protocol-level | ||
| * concurrency (in-flight streams) is still governed by HTTP/2 settings (for example {@code MAX_CONCURRENT_STREAMS}) | ||
| * and server behaviour. | ||
| * | ||
| * @since 5.5 | ||
| */ | ||
| public final class H2MaxRequestsPerConnectionLocalExample { | ||
|
|
||
| public static void main(final String[] args) throws Exception { | ||
| final int maxPerConn = 2; // keep small | ||
| final int totalRequests = 50; // make larger | ||
| final Timeout timeout = Timeout.ofSeconds(30); | ||
|
|
||
| final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); | ||
|
|
||
| final IOReactorConfig ioReactorConfig = IOReactorConfig.custom() | ||
| .setIoThreadCount(1) | ||
| .build(); | ||
|
|
||
| final H2Config serverH2Config = H2Config.custom() | ||
| .setPushEnabled(false) | ||
| .build(); | ||
|
|
||
| final HttpAsyncServer server = H2ServerBootstrap.bootstrap() | ||
| .setIOReactorConfig(ioReactorConfig) | ||
| .setH2Config(serverH2Config) | ||
| .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) | ||
| .setCanonicalHostName("127.0.0.1") // avoids 421 | ||
| .register("*", new AsyncServerRequestHandler<Message<HttpRequest, Void>>() { | ||
|
|
||
| @Override | ||
| public AsyncRequestConsumer<Message<HttpRequest, Void>> prepare( | ||
| final HttpRequest request, | ||
| final EntityDetails entityDetails, | ||
| final HttpContext context) { | ||
| return new BasicRequestConsumer<>( | ||
| entityDetails != null ? new DiscardingEntityConsumer<>() : null); | ||
| } | ||
|
|
||
| @Override | ||
| public void handle( | ||
| final Message<HttpRequest, Void> message, | ||
| final ResponseTrigger responseTrigger, | ||
| final HttpContext context) { | ||
|
|
||
| final String path = message.getHead().getPath(); | ||
| System.out.println("server accepted " + path + " (reply in 2s)"); | ||
|
|
||
| scheduler.schedule(() -> { | ||
| try { | ||
| responseTrigger.submitResponse( | ||
| AsyncResponseBuilder.create(200) | ||
| .setEntity("ok\n", ContentType.TEXT_PLAIN) | ||
| .build(), | ||
| context); | ||
| } catch (final Exception ex) { | ||
| try { | ||
| responseTrigger.submitResponse( | ||
| AsyncResponseBuilder.create(500) | ||
| .setEntity(ex.toString(), ContentType.TEXT_PLAIN) | ||
| .build(), | ||
| context); | ||
| } catch (final Exception ignore) { | ||
| // ignore | ||
| } | ||
| } | ||
| }, 2, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| }) | ||
| .create(); | ||
|
|
||
| server.start(); | ||
| final ListenerEndpoint ep = server.listen(new InetSocketAddress("127.0.0.1", 0), URIScheme.HTTP).get(); | ||
| final int port = ((InetSocketAddress) ep.getAddress()).getPort(); | ||
| System.out.println("server on 127.0.0.1:" + port); | ||
|
|
||
| final H2Config clientH2Config = H2Config.custom() | ||
| .setPushEnabled(false) | ||
| .build(); | ||
|
|
||
| final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() | ||
| .setIOReactorConfig(ioReactorConfig) | ||
| .setH2Config(clientH2Config) | ||
| .setMaxRequestsPerConnection(maxPerConn) | ||
| .create(); | ||
|
|
||
| requester.start(); | ||
|
|
||
| final HttpHost target = new HttpHost("http", "127.0.0.1", port); | ||
|
|
||
| // Warmup (establish connection) | ||
| requester.execute( | ||
| target, | ||
| AsyncRequestBuilder.get().setHttpHost(target).setPath("/warmup").build(), | ||
| new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), | ||
| timeout, | ||
| HttpCoreContext.create(), | ||
| new FutureCallback<Message<HttpResponse, String>>() { | ||
| @Override | ||
| public void completed(final Message<HttpResponse, String> result) { | ||
| System.out.println("warmup -> " + result.getHead().getCode()); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(final Exception ex) { | ||
| System.out.println("warmup failed -> " + ex.getClass().getName() + ": " + ex.getMessage()); | ||
| } | ||
|
|
||
| @Override | ||
| public void cancelled() { | ||
| System.out.println("warmup cancelled"); | ||
| } | ||
| }).get(); | ||
|
|
||
| final AtomicInteger ok = new AtomicInteger(0); | ||
| final AtomicInteger rejected = new AtomicInteger(0); | ||
| final AtomicInteger failed = new AtomicInteger(0); | ||
| final CountDownLatch latch = new CountDownLatch(totalRequests); | ||
|
|
||
| final ExecutorService exec = Executors.newFixedThreadPool(16); | ||
|
|
||
| for (int i = 0; i < totalRequests; i++) { | ||
| final int id = i; | ||
| exec.execute(() -> { | ||
| final String path = "/slow?i=" + id; | ||
| requester.execute( | ||
| target, | ||
| AsyncRequestBuilder.get().setHttpHost(target).setPath(path).build(), | ||
| new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), | ||
| timeout, | ||
| HttpCoreContext.create(), | ||
| new FutureCallback<Message<HttpResponse, String>>() { | ||
|
|
||
| @Override | ||
| public void completed(final Message<HttpResponse, String> message) { | ||
| ok.incrementAndGet(); | ||
| latch.countDown(); | ||
| } | ||
|
|
||
| @Override | ||
| public void failed(final Exception ex) { | ||
| if (ex instanceof RejectedExecutionException) { | ||
| rejected.incrementAndGet(); | ||
| } else { | ||
| failed.incrementAndGet(); | ||
| } | ||
| latch.countDown(); | ||
| } | ||
|
|
||
| @Override | ||
| public void cancelled() { | ||
| failed.incrementAndGet(); | ||
| latch.countDown(); | ||
| } | ||
| }); | ||
| }); | ||
| } | ||
|
|
||
| final boolean done = latch.await(60, TimeUnit.SECONDS); | ||
| System.out.println("done=" + done + " ok=" + ok.get() + ", rejected=" + rejected.get() + ", failed=" + failed.get()); | ||
|
|
||
| exec.shutdownNow(); | ||
|
|
||
| requester.close(CloseMode.GRACEFUL); | ||
| server.close(CloseMode.GRACEFUL); | ||
| scheduler.shutdownNow(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arturobernalg Let's use the term "command" instead of "request" here to make sure there is no confusion with HTTP requests.