Skip to content

Commit

Permalink
Fixes #69 - LoadGenerator.interrupt() does not work reliably.
Browse files Browse the repository at this point in the history
Now always reporting failures in the CompletableFuture returned by begin().

Before, CompletableFuture.anyOf() could have returned either the failure,
or the reduction of the resource promises, which may have succeeded.
This was causing random failures in testJMX().

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Mar 23, 2021
1 parent 9fa9345 commit d5e9b4d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ public static Builder builder() {
private final Config config;
private final CyclicBarrier barrier;
private ExecutorService executorService;
private volatile boolean interrupt;
private volatile boolean interrupted;

private LoadGenerator(Config config) {
LoadGenerator(Config config) {
this.config = config;
this.barrier = new CyclicBarrier(config.threads);
addBean(config);
Expand All @@ -141,7 +141,7 @@ private CompletableFuture<Void> spawn() {
@Override
protected void doStart() throws Exception {
executorService = Executors.newCachedThreadPool();
interrupt = false;
interrupted = false;
super.doStart();
}

Expand Down Expand Up @@ -208,7 +208,11 @@ public void interrupt() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("interrupting {}", this);
}
interrupt = true;
interrupted = true;
}

boolean isInterrupted() {
return interrupted;
}

private CompletableFuture<Void> process() {
Expand Down Expand Up @@ -320,7 +324,7 @@ private CompletableFuture<Void> process() {
break send;
}

if (interrupt) {
if (isInterrupted()) {
throw new InterruptedException("sender thread interrupted");
}

Expand All @@ -336,8 +340,22 @@ private CompletableFuture<Void> process() {
anyFailure.completeExceptionally(x);
}

CompletableFuture<Void> allResults = CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new));
return CompletableFuture.anyOf(allResults, anyFailure)
// If there was a failure, fail all the resource tree promises.
anyFailure.whenComplete((r, x) -> {
if (x != null) {
allPromises.forEach(p -> p.completeExceptionally(x));
}
});

return CompletableFuture.allOf(allPromises.toArray(CompletableFuture[]::new))
.whenComplete((r, x) -> {
// When the resource tree promises are complete,
// try to succeed anyFailure, if was not already failed.
anyFailure.complete(null);
})
// FlatMap anyFailure with the promises, so that if all the
// promises have succeeded, the failure is reported anyway.
.thenCompose(y -> anyFailure)
.whenComplete((r, x) -> {
if (LOGGER.isDebugEnabled()) {
if (x == null) {
Expand All @@ -353,9 +371,7 @@ private CompletableFuture<Void> process() {
LOGGER.debug("stopping http clients");
}
Arrays.stream(clients).forEach(this::stopHttpClient);
}, executorService)
// Just to please the compiler and convert back to CompletableFuture<Void>.
.thenRun(() -> {});
}, executorService);
}

protected HttpClient newHttpClient(Config config) {
Expand Down Expand Up @@ -920,7 +936,7 @@ private Resource asResource(Map<?, ?> map) {
return new Resource("/");
}
Resource result = new Resource();
result.fromJSON((Map<?,?>)obj);
result.fromJSON((Map<?, ?>)obj);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,42 @@ public void testInterrupt() throws Exception {
}).get(5, TimeUnit.SECONDS);
}

@Test
public void testInterruptAfterResourceComplete() throws Exception {
startServer(new AbstractHandler() {
@Override
public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) {
jettyRequest.setHandled(true);
}
});

LoadGenerator.Builder config = new LoadGenerator.Builder()
.port(connector.getLocalPort())
.httpClientTransportBuilder(clientTransportBuilder)
.iterationsPerThread(0)
.resourceListener((Resource.NodeListener)info -> {
info.getLoadGenerator().interrupt();
});
LoadGenerator loadGenerator = new LoadGenerator(config) {
@Override
boolean isInterrupted() {
try {
Thread.sleep(1000);
return super.isInterrupted();
} catch (InterruptedException x) {
return true;
}
}
};

CompletableFuture<Void> cf = loadGenerator.begin();

cf.handle((r, x) -> {
Assert.assertNotNull(x);
return r;
}).get(5, TimeUnit.SECONDS);
}

@Test
public void testRunFor() throws Exception {
startServer(new TestHandler());
Expand Down Expand Up @@ -352,8 +388,6 @@ public void testJMX() throws Exception {
mbeanContainer.getMBeanServer().invoke(objectName, "interrupt", new Object[0], new String[0]);

cf.handle((r, x) -> {
if (x == null)
Thread.dumpStack();
// Load generation was interrupted.
Assert.assertNotNull(x);
Throwable cause = x.getCause();
Expand Down Expand Up @@ -572,6 +606,7 @@ public void testServerSlowOnFirstIterationFastOnLastIteration() throws Exception
int resourceRate = 3;
startServer(new AbstractHandler() {
private final AtomicInteger requests = new AtomicInteger();

@Override
public void handle(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException {
jettyRequest.setHandled(true);
Expand Down

0 comments on commit d5e9b4d

Please sign in to comment.