Skip to content

Commit

Permalink
Fixes #76 - Long runs retain too much memory.
Browse files Browse the repository at this point in the history
Now using a counter instead of a list of `CompletableFuture`s.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Apr 30, 2021
1 parent 9b6b831 commit cdd128d
Showing 1 changed file with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.HttpClient;
Expand Down Expand Up @@ -222,15 +223,27 @@ private CompletableFuture<Void> process() {
// The method returns a CompletableFuture, but the implementation
// uses Callbacks that need to reference the innermost CompletableFuture.

List<CompletableFuture<Void>> allPromises = new ArrayList<>();
Callback.Completable anyFailure = new Callback.Completable();

HttpClient[] clients = new HttpClient[config.getUsersPerThread()];

Callback.Completable anyFailure = new Callback.Completable();

// This is the callback to use for warmup iterations.
int warmupIterations = config.getWarmupIterationsPerThread();
WarmupCallback warmupCallback = new WarmupCallback(anyFailure, warmupIterations);

// This is the callback to use for run iterations.
RunCallback runCallback = new RunCallback();
// Fail fast in case of run failures.
runCallback.exceptionally(x -> {
anyFailure.completeExceptionally(x);
return null;
});
// Fail the run iterations if there is any failure.
anyFailure.exceptionally(x -> {
runCallback.completeExceptionally(x);
return null;
});

try {
// Wait for all the sender threads to arrive here.
awaitBarrier();
Expand Down Expand Up @@ -307,13 +320,8 @@ private CompletableFuture<Void> process() {
} else {
lastIteration = runFor > 0 && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin) >= runFor;
}
Callback.Completable promise = new Callback.Completable();
callback = promise;
// Fail fast in case of failures.
allPromises.add(promise.exceptionally(x -> {
anyFailure.completeExceptionally(x);
return null;
}));
runCallback.increment(lastIteration);
callback = runCallback;
}

HttpClient client = clients[clientIndex];
Expand All @@ -340,21 +348,14 @@ private CompletableFuture<Void> process() {
anyFailure.completeExceptionally(x);
}

// If there was a failure, fail all the resource tree promises.
anyFailure.whenComplete((r, x) -> {
if (x != null) {
allPromises.forEach(p -> p.completeExceptionally(x));
}
});

return CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new))
return runCallback
.whenComplete((r, x) -> {
// When the resource tree promises are complete,
// try to succeed anyFailure, if was not already failed.
// When the resource trees are complete, try to
// succeed anyFailure, if was not already failed.
anyFailure.complete(null);
})
// FlatMap anyFailure with the promises, so that if all the
// promises have succeeded, the failure is reported anyway.
// FlatMap anyFailure so that even if all the promises have succeeded,
// the failure is reported anyway (for example, manual interruption).
.thenCompose(y -> anyFailure)
.whenComplete((r, x) -> {
if (LOGGER.isDebugEnabled()) {
Expand Down Expand Up @@ -1355,4 +1356,24 @@ private void failure(Throwable failure) {
super.failed(failure);
}
}

private static class RunCallback extends Callback.Completable {
private final AtomicLong counter = new AtomicLong();
private boolean last;

@Override
public void succeeded() {
if (counter.decrementAndGet() == 0 && last) {
super.succeeded();
}
}

public void increment(boolean last) {
this.last = last;
counter.incrementAndGet();
}


}

}

0 comments on commit cdd128d

Please sign in to comment.