diff --git a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java index 78edc2a1..63eb1b7f 100644 --- a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java +++ b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java @@ -117,9 +117,9 @@ public static Builder builder() { private final Config config; private final CyclicBarrier barrier; private ExecutorService executorService; - private volatile boolean interrupt; + private volatile boolean interrupted; - private LoadGenerator(Config config) { + LoadGenerator(Config config) { this.config = config; this.barrier = new CyclicBarrier(config.threads); addBean(config); @@ -141,7 +141,7 @@ private CompletableFuture spawn() { @Override protected void doStart() throws Exception { executorService = Executors.newCachedThreadPool(); - interrupt = false; + interrupted = false; super.doStart(); } @@ -208,7 +208,11 @@ public void interrupt() { if (LOGGER.isDebugEnabled()) { LOGGER.debug("interrupting {}", this); } - interrupt = true; + interrupted = true; + } + + boolean isInterrupted() { + return interrupted; } private CompletableFuture process() { @@ -320,7 +324,7 @@ private CompletableFuture process() { break send; } - if (interrupt) { + if (isInterrupted()) { throw new InterruptedException("sender thread interrupted"); } @@ -336,8 +340,22 @@ private CompletableFuture process() { anyFailure.completeExceptionally(x); } - CompletableFuture allResults = CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new)); - return CompletableFuture.anyOf(allResults, anyFailure) + // If there was a failure, fail all the resource tree promises. + anyFailure.whenComplete((r, x) -> { + if (x != null) { + allPromises.forEach(p -> p.completeExceptionally(x)); + } + }); + + return CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new)) + .whenComplete((r, x) -> { + // When the resource tree promises are complete, + // try to succeed anyFailure, if was not already failed. + anyFailure.complete(null); + }) + // FlatMap anyFailure with the promises, so that if all the + // promises have succeeded, the failure is reported anyway. + .thenCompose(y -> anyFailure) .whenComplete((r, x) -> { if (LOGGER.isDebugEnabled()) { if (x == null) { @@ -353,9 +371,7 @@ private CompletableFuture process() { LOGGER.debug("stopping http clients"); } Arrays.stream(clients).forEach(this::stopHttpClient); - }, executorService) - // Just to please the compiler and convert back to CompletableFuture. - .thenRun(() -> {}); + }, executorService); } protected HttpClient newHttpClient(Config config) { @@ -920,7 +936,7 @@ private Resource asResource(Map map) { return new Resource("/"); } Resource result = new Resource(); - result.fromJSON((Map)obj); + result.fromJSON((Map)obj); return result; } diff --git a/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java b/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java index 47eb2288..f492d541 100644 --- a/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java +++ b/jetty-load-generator-client/src/test/java/org/mortbay/jetty/load/generator/LoadGeneratorTest.java @@ -194,6 +194,42 @@ public void testInterrupt() throws Exception { }).get(5, TimeUnit.SECONDS); } + @Test + public void testInterruptAfterResourceComplete() throws Exception { + startServer(new AbstractHandler() { + @Override + public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) { + jettyRequest.setHandled(true); + } + }); + + LoadGenerator.Builder config = new LoadGenerator.Builder() + .port(connector.getLocalPort()) + .httpClientTransportBuilder(clientTransportBuilder) + .iterationsPerThread(0) + .resourceListener((Resource.NodeListener)info -> { + info.getLoadGenerator().interrupt(); + }); + LoadGenerator loadGenerator = new LoadGenerator(config) { + @Override + boolean isInterrupted() { + try { + Thread.sleep(1000); + return super.isInterrupted(); + } catch (InterruptedException x) { + return true; + } + } + }; + + CompletableFuture cf = loadGenerator.begin(); + + cf.handle((r, x) -> { + Assert.assertNotNull(x); + return r; + }).get(5, TimeUnit.SECONDS); + } + @Test public void testRunFor() throws Exception { startServer(new TestHandler()); @@ -352,8 +388,6 @@ public void testJMX() throws Exception { mbeanContainer.getMBeanServer().invoke(objectName, "interrupt", new Object[0], new String[0]); cf.handle((r, x) -> { - if (x == null) - Thread.dumpStack(); // Load generation was interrupted. Assert.assertNotNull(x); Throwable cause = x.getCause(); @@ -572,6 +606,7 @@ public void testServerSlowOnFirstIterationFastOnLastIteration() throws Exception int resourceRate = 3; startServer(new AbstractHandler() { private final AtomicInteger requests = new AtomicInteger(); + @Override public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { jettyRequest.setHandled(true);