From 220b1b7b4c46c1f9b26c5bed28a719a0144fdb61 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 4 Jun 2021 12:11:45 +0200 Subject: [PATCH] Fixes #88 - Make request sending more time-accurate. Fixed send algorithm to improve ramp-up accuracy and sleep accuracy. Signed-off-by: Simone Bordet --- .../jetty/load/generator/LoadGenerator.java | 144 +++++++++--------- 1 file changed, 74 insertions(+), 70 deletions(-) diff --git a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java index b1371ecf..c865ce4c 100644 --- a/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java +++ b/jetty-load-generator-client/src/main/java/org/mortbay/jetty/load/generator/LoadGenerator.java @@ -35,8 +35,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpRequest; import org.eclipse.jetty.client.api.Request; @@ -260,91 +260,86 @@ private CompletableFuture process() { } int rate = config.getResourceRate(); - long period = rate > 0 ? TimeUnit.SECONDS.toNanos(config.getThreads()) / rate : 0; + long ratePeriod = rate > 0 ? TimeUnit.SECONDS.toNanos(config.getThreads()) / rate : 0; long rateRampUpPeriod = TimeUnit.SECONDS.toNanos(config.getRateRampUpPeriod()); long runFor = config.getRunFor(); int iterations = runFor > 0 ? 0 : config.getIterationsPerThread(); - long alreadySent = 0; - long rampUpUnsent = 0; int clientIndex = 0; boolean warmup = true; long begin = System.nanoTime(); long warmupWait = 0; + long loops = 0; + long rampUpLoops = 0; - send: while (true) { - // Typically only one batch is sent. - // However, for high rates the period may be smaller than the - // timer resolution so the sleep may last more than expected. - // Also in case of GC pauses time may be lost. - // To compensate for oversleeping, the batch is adjusted. - long batchToSend = 1; - if (period > 0) { - TimeUnit.NANOSECONDS.sleep(period); + ++loops; + if (ratePeriod > 0) { + // If there is a ramp-up, calculate how + // long to sleep before sending a request. + long expectedSendTime; long elapsedNanos = System.nanoTime() - begin - warmupWait; - long expectedSent = Math.round((double)elapsedNanos / period); - if (rateRampUpPeriod > 0 && elapsedNanos < rateRampUpPeriod) { - // The rate ramp-up is linear: it will bring the rate up in the - // given time, so that the rate over time graph is a right triangle. - double rampUpRate = ((double)elapsedNanos / rateRampUpPeriod) / period; - // The accumulated number of requests is the area of the triangle. - long rampUpExpectedSent = Math.round(elapsedNanos * rampUpRate / 2); - rampUpUnsent = expectedSent - rampUpExpectedSent; - expectedSent = rampUpExpectedSent; + if (rateRampUpPeriod > 0) { + if (elapsedNanos < rateRampUpPeriod) { + ++rampUpLoops; + // The curve of the rate over time during the ramp-up is a straight line: y = k * x. + // The area (i.e. the integral) below the curve is the number of requests sent. + // We want to calculate the x coordinate (i.e. the send time) when the area is 1, 2, etc. + // The area (i.e. the number of ramp-up loops) is: a = 1/2 * k * x^2 and we want to find x. + expectedSendTime = begin + Math.round(Math.sqrt(2.0D * rampUpLoops * ratePeriod * rateRampUpPeriod)); + } else { + // Correct by 1 to take into account that the last ramp-up + // expected send time is likely to be past the rampUpPeriod. + expectedSendTime = begin + rateRampUpPeriod + (loops - rampUpLoops + 1) * ratePeriod; + } } else { - // Adjust for those requests that could - // not be sent in the last ramp-up step. - expectedSent -= rampUpUnsent; + expectedSendTime = begin + loops * ratePeriod; } - batchToSend = expectedSent - alreadySent; - alreadySent = expectedSent; + long sleep = Math.max(0, expectedSendTime - System.nanoTime()); + // Note that the sleep below may over-sleep or under-sleep. + // Either case, the next loop will adjust the next sleep time. + // Even in case of long pauses (e.g. GC) during one loop, the + // next loops will catch up by sleeping very little or zero. + sleep(sleep); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("sending batch: {} resources", batchToSend); - } - - while (batchToSend > 0) { - Callback callback; - boolean lastIteration = false; - if (warmup) { - if (warmupIterations == 0) { - warmup = false; - long start = System.nanoTime(); - warmupCallback.join(); - warmupWait = System.nanoTime() - start; - continue; - } else { - --warmupIterations; - callback = warmupCallback; - } + Callback callback; + boolean lastIteration = false; + if (warmup) { + if (warmupIterations == 0) { + warmup = false; + long start = System.nanoTime(); + warmupCallback.join(); + warmupWait = System.nanoTime() - start; + continue; } else { - if (iterations > 0) { - lastIteration = --iterations == 0; - } else { - lastIteration = runFor > 0 && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin) >= runFor; - } - runCallback.increment(lastIteration); - callback = runCallback; + --warmupIterations; + callback = warmupCallback; + } + } else { + if (iterations > 0) { + lastIteration = --iterations == 0; + } else { + lastIteration = runFor > 0 && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin) >= runFor; } + runCallback.increment(lastIteration); + callback = runCallback; + } - HttpClient client = clients[clientIndex]; - sendResourceTree(client, config.getResource(), warmup, callback); - --batchToSend; + HttpClient client = clients[clientIndex]; + sendResourceTree(client, config.getResource(), warmup, callback); - if (lastIteration || anyFailure.isCompletedExceptionally()) { - break send; - } + if (lastIteration || anyFailure.isCompletedExceptionally()) { + break; + } - if (isInterrupted()) { - throw new InterruptedException("sender thread interrupted"); - } + if (isInterrupted()) { + throw new InterruptedException("sender thread interrupted"); + } - if (++clientIndex == clients.length) { - clientIndex = 0; - } + if (++clientIndex == clients.length) { + clientIndex = 0; } } } catch (Throwable x) { @@ -565,6 +560,14 @@ private void invokeResourceTreeListener(Resource.TreeListener listener, Resource } } + private void sleep(long nanos) { + long start = System.nanoTime(); + while (nanos > 0) { + LockSupport.parkNanos(nanos); + nanos = start + nanos - System.nanoTime(); + } + } + private class Sender { private final Queue queue = new ArrayDeque<>(); private final Set pushCache = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -660,9 +663,13 @@ private void send(List resources) { private void sendChildren(Resource resource) { List children = resource.getResources(); if (!children.isEmpty()) { - offer(children.stream() - .map(child -> child.newInfo(LoadGenerator.this)) - .collect(Collectors.toList())); + List infos = new ArrayList<>(children.size()); + for (Resource child : children) { + Resource.Info info = child.newInfo(LoadGenerator.this); + info.setRequestTime(System.nanoTime()); + infos.add(info); + } + offer(infos); send(); } } @@ -1390,8 +1397,5 @@ public void increment(boolean last) { this.last = last; counter.incrementAndGet(); } - - } - }