diff --git a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java index 63eb1b7f..af7f0d0f 100644 --- a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java +++ b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; import org.eclipse.jetty.client.HttpClient; @@ -222,15 +223,27 @@ private CompletableFuture process() { // The method returns a CompletableFuture, but the implementation // uses Callbacks that need to reference the innermost CompletableFuture. - List> allPromises = new ArrayList<>(); - Callback.Completable anyFailure = new Callback.Completable(); - HttpClient[] clients = new HttpClient[config.getUsersPerThread()]; + Callback.Completable anyFailure = new Callback.Completable(); + // This is the callback to use for warmup iterations. int warmupIterations = config.getWarmupIterationsPerThread(); WarmupCallback warmupCallback = new WarmupCallback(anyFailure, warmupIterations); + // This is the callback to use for run iterations. + RunCallback runCallback = new RunCallback(); + // Fail fast in case of run failures. + runCallback.exceptionally(x -> { + anyFailure.completeExceptionally(x); + return null; + }); + // Fail the run iterations if there is any failure. + anyFailure.exceptionally(x -> { + runCallback.completeExceptionally(x); + return null; + }); + try { // Wait for all the sender threads to arrive here. awaitBarrier(); @@ -307,13 +320,8 @@ private CompletableFuture process() { } else { lastIteration = runFor > 0 && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin) >= runFor; } - Callback.Completable promise = new Callback.Completable(); - callback = promise; - // Fail fast in case of failures. - allPromises.add(promise.exceptionally(x -> { - anyFailure.completeExceptionally(x); - return null; - })); + runCallback.increment(lastIteration); + callback = runCallback; } HttpClient client = clients[clientIndex]; @@ -340,21 +348,14 @@ private CompletableFuture process() { anyFailure.completeExceptionally(x); } - // If there was a failure, fail all the resource tree promises. - anyFailure.whenComplete((r, x) -> { - if (x != null) { - allPromises.forEach(p -> p.completeExceptionally(x)); - } - }); - - return CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new)) + return runCallback .whenComplete((r, x) -> { - // When the resource tree promises are complete, - // try to succeed anyFailure, if was not already failed. + // When the resource trees are complete, try to + // succeed anyFailure, if was not already failed. anyFailure.complete(null); }) - // FlatMap anyFailure with the promises, so that if all the - // promises have succeeded, the failure is reported anyway. + // FlatMap anyFailure so that even if all the promises have succeeded, + // the failure is reported anyway (for example, manual interruption). .thenCompose(y -> anyFailure) .whenComplete((r, x) -> { if (LOGGER.isDebugEnabled()) { @@ -1355,4 +1356,24 @@ private void failure(Throwable failure) { super.failed(failure); } } + + private static class RunCallback extends Callback.Completable { + private final AtomicLong counter = new AtomicLong(); + private boolean last; + + @Override + public void succeeded() { + if (counter.decrementAndGet() == 0 && last) { + super.succeeded(); + } + } + + public void increment(boolean last) { + this.last = last; + counter.incrementAndGet(); + } + + + } + }