From d5e9b4de55e352ac2193874e5a6d100ecb08c693 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 23 Mar 2021 13:24:46 +0100 Subject: [PATCH] Fixes #69 - LoadGenerator.interrupt() does not work reliably. Now always reporting failures in the CompletableFuture returned by begin(). Before, CompletableFuture.anyOf() could have returned either the failure, or the reduction of the resource promises, which may have succeeded. This was causing random failures in testJMX(). Signed-off-by: Simone Bordet --- .../jetty/load/generator/LoadGenerator.java | 38 ++++++++++++------ .../load/generator/LoadGeneratorTest.java | 39 ++++++++++++++++++- 2 files changed, 64 insertions(+), 13 deletions(-) diff --git a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java index 78edc2a1..63eb1b7f 100644 --- a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java +++ b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java @@ -117,9 +117,9 @@ public static Builder builder() { private final Config config; private final CyclicBarrier barrier; private ExecutorService executorService; - private volatile boolean interrupt; + private volatile boolean interrupted; - private LoadGenerator(Config config) { + LoadGenerator(Config config) { this.config = config; this.barrier = new CyclicBarrier(config.threads); addBean(config); @@ -141,7 +141,7 @@ private CompletableFuture spawn() { @Override protected void doStart() throws Exception { executorService = Executors.newCachedThreadPool(); - interrupt = false; + interrupted = false; super.doStart(); } @@ -208,7 +208,11 @@ public void interrupt() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("interrupting {}", this); } - interrupt = true; + interrupted = true; + } + + boolean isInterrupted() { + return interrupted; } private CompletableFuture process() { @@ -320,7 +324,7 @@ private CompletableFuture process() { break send; } - if (interrupt) { + if (isInterrupted()) { throw new InterruptedException("sender thread interrupted"); } @@ -336,8 +340,22 @@ private CompletableFuture process() { anyFailure.completeExceptionally(x); } - CompletableFuture allResults = CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new)); - return CompletableFuture.anyOf(allResults, anyFailure) + // If there was a failure, fail all the resource tree promises. + anyFailure.whenComplete((r, x) -> { + if (x != null) { + allPromises.forEach(p -> p.completeExceptionally(x)); + } + }); + + return CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new)) + .whenComplete((r, x) -> { + // When the resource tree promises are complete, + // try to succeed anyFailure, if was not already failed. + anyFailure.complete(null); + }) + // FlatMap anyFailure with the promises, so that if all the + // promises have succeeded, the failure is reported anyway. + .thenCompose(y -> anyFailure) .whenComplete((r, x) -> { if (LOGGER.isDebugEnabled()) { if (x == null) { @@ -353,9 +371,7 @@ private CompletableFuture process() { LOGGER.debug("stopping http clients"); } Arrays.stream(clients).forEach(this::stopHttpClient); - }, executorService) - // Just to please the compiler and convert back to CompletableFuture. - .thenRun(() -> {}); + }, executorService); } protected HttpClient newHttpClient(Config config) { @@ -920,7 +936,7 @@ private Resource asResource(Map map) { return new Resource("/"); } Resource result = new Resource(); - result.fromJSON((Map)obj); + result.fromJSON((Map)obj); return result; } diff --git a/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java b/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java index 47eb2288..f492d541 100644 --- a/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java +++ b/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java @@ -194,6 +194,42 @@ public void testInterrupt() throws Exception { }).get(5, TimeUnit.SECONDS); } + @Test + public void testInterruptAfterResourceComplete() throws Exception { + startServer(new AbstractHandler() { + @Override + public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) { + jettyRequest.setHandled(true); + } + }); + + LoadGenerator.Builder config = new LoadGenerator.Builder() + .port(connector.getLocalPort()) + .httpClientTransportBuilder(clientTransportBuilder) + .iterationsPerThread(0) + .resourceListener((Resource.NodeListener)info -> { + info.getLoadGenerator().interrupt(); + }); + LoadGenerator loadGenerator = new LoadGenerator(config) { + @Override + boolean isInterrupted() { + try { + Thread.sleep(1000); + return super.isInterrupted(); + } catch (InterruptedException x) { + return true; + } + } + }; + + CompletableFuture cf = loadGenerator.begin(); + + cf.handle((r, x) -> { + Assert.assertNotNull(x); + return r; + }).get(5, TimeUnit.SECONDS); + } + @Test public void testRunFor() throws Exception { startServer(new TestHandler()); @@ -352,8 +388,6 @@ public void testJMX() throws Exception { mbeanContainer.getMBeanServer().invoke(objectName, "interrupt", new Object[0], new String[0]); cf.handle((r, x) -> { - if (x == null) - Thread.dumpStack(); // Load generation was interrupted. Assert.assertNotNull(x); Throwable cause = x.getCause(); @@ -572,6 +606,7 @@ public void testServerSlowOnFirstIterationFastOnLastIteration() throws Exception int resourceRate = 3; startServer(new AbstractHandler() { private final AtomicInteger requests = new AtomicInteger(); + @Override public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { jettyRequest.setHandled(true);