Skip to content

Commit

Permalink
Fixes #88 - Make request sending more time-accurate.
Browse files Browse the repository at this point in the history
Fixed send algorithm to improve ramp-up accuracy and sleep accuracy.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Jun 4, 2021
1 parent f2d05d4 commit 41b51a5
Showing 1 changed file with 74 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.api.Request;
Expand Down Expand Up @@ -260,91 +260,86 @@ private CompletableFuture<Void> process() {
}

int rate = config.getResourceRate();
long period = rate > 0 ? TimeUnit.SECONDS.toNanos(config.getThreads()) / rate : 0;
long ratePeriod = rate > 0 ? TimeUnit.SECONDS.toNanos(config.getThreads()) / rate : 0;
long rateRampUpPeriod = TimeUnit.SECONDS.toNanos(config.getRateRampUpPeriod());

long runFor = config.getRunFor();
int iterations = runFor > 0 ? 0 : config.getIterationsPerThread();

long alreadySent = 0;
long rampUpUnsent = 0;
int clientIndex = 0;
boolean warmup = true;
long begin = System.nanoTime();
long warmupWait = 0;
long loops = 0;
long rampUpLoops = 0;

send:
while (true) {
// Typically only one batch is sent.
// However, for high rates the period may be smaller than the
// timer resolution so the sleep may last more than expected.
// Also in case of GC pauses time may be lost.
// To compensate for oversleeping, the batch is adjusted.
long batchToSend = 1;
if (period > 0) {
TimeUnit.NANOSECONDS.sleep(period);
++loops;
if (ratePeriod > 0) {
// If there is a ramp-up, calculate how
// long to sleep before sending a request.
long expectedSendTime;
long elapsedNanos = System.nanoTime() - begin - warmupWait;
long expectedSent = Math.round((double)elapsedNanos / period);
if (rateRampUpPeriod > 0 && elapsedNanos < rateRampUpPeriod) {
// The rate ramp-up is linear: it will bring the rate up in the
// given time, so that the rate over time graph is a right triangle.
double rampUpRate = ((double)elapsedNanos / rateRampUpPeriod) / period;
// The accumulated number of requests is the area of the triangle.
long rampUpExpectedSent = Math.round(elapsedNanos * rampUpRate / 2);
rampUpUnsent = expectedSent - rampUpExpectedSent;
expectedSent = rampUpExpectedSent;
if (rateRampUpPeriod > 0) {
if (elapsedNanos < rateRampUpPeriod) {
++rampUpLoops;
// The curve of the rate over time during the ramp-up is a straight line: y = k * x.
// The area (i.e. the integral) below the curve is the number of requests sent.
// We want to calculate the x coordinate (i.e. the send time) when the area is 1, 2, etc.
// The area (i.e. the number of ramp-up loops) is: a = 1/2 * k * x^2 and we want to find x.
expectedSendTime = begin + Math.round(Math.sqrt(2.0D * rampUpLoops * ratePeriod * rateRampUpPeriod));
} else {
// Correct by 1 to take into account that the last ramp-up
// expected send time is likely to be past the rampUpPeriod.
expectedSendTime = begin + rateRampUpPeriod + (loops - rampUpLoops + 1) * ratePeriod;
}
} else {
// Adjust for those requests that could
// not be sent in the last ramp-up step.
expectedSent -= rampUpUnsent;
expectedSendTime = begin + loops * ratePeriod;
}
batchToSend = expectedSent - alreadySent;
alreadySent = expectedSent;
long sleep = Math.max(0, expectedSendTime - System.nanoTime());
// Note that the sleep below may over-sleep or under-sleep.
// Either case, the next loop will adjust the next sleep time.
// Even in case of long pauses (e.g. GC) during one loop, the
// next loops will catch up by sleeping very little or zero.
sleep(sleep);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("sending batch: {} resources", batchToSend);
}

while (batchToSend > 0) {
Callback callback;
boolean lastIteration = false;
if (warmup) {
if (warmupIterations == 0) {
warmup = false;
long start = System.nanoTime();
warmupCallback.join();
warmupWait = System.nanoTime() - start;
continue;
} else {
--warmupIterations;
callback = warmupCallback;
}
Callback callback;
boolean lastIteration = false;
if (warmup) {
if (warmupIterations == 0) {
warmup = false;
long start = System.nanoTime();
warmupCallback.join();
warmupWait = System.nanoTime() - start;
continue;
} else {
if (iterations > 0) {
lastIteration = --iterations == 0;
} else {
lastIteration = runFor > 0 && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin) >= runFor;
}
runCallback.increment(lastIteration);
callback = runCallback;
--warmupIterations;
callback = warmupCallback;
}
} else {
if (iterations > 0) {
lastIteration = --iterations == 0;
} else {
lastIteration = runFor > 0 && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - begin) >= runFor;
}
runCallback.increment(lastIteration);
callback = runCallback;
}

HttpClient client = clients[clientIndex];
sendResourceTree(client, config.getResource(), warmup, callback);
--batchToSend;
HttpClient client = clients[clientIndex];
sendResourceTree(client, config.getResource(), warmup, callback);

if (lastIteration || anyFailure.isCompletedExceptionally()) {
break send;
}
if (lastIteration || anyFailure.isCompletedExceptionally()) {
break;
}

if (isInterrupted()) {
throw new InterruptedException("sender thread interrupted");
}
if (isInterrupted()) {
throw new InterruptedException("sender thread interrupted");
}

if (++clientIndex == clients.length) {
clientIndex = 0;
}
if (++clientIndex == clients.length) {
clientIndex = 0;
}
}
} catch (Throwable x) {
Expand Down Expand Up @@ -565,6 +560,14 @@ private void invokeResourceTreeListener(Resource.TreeListener listener, Resource
}
}

private void sleep(long nanos) {
long start = System.nanoTime();
while (nanos > 0) {
LockSupport.parkNanos(nanos);
nanos = start + nanos - System.nanoTime();
}
}

private class Sender {
private final Queue<Resource.Info> queue = new ArrayDeque<>();
private final Set<URI> pushCache = Collections.newSetFromMap(new ConcurrentHashMap<>());
Expand Down Expand Up @@ -660,9 +663,13 @@ private void send(List<Resource.Info> resources) {
private void sendChildren(Resource resource) {
List<Resource> children = resource.getResources();
if (!children.isEmpty()) {
offer(children.stream()
.map(child -> child.newInfo(LoadGenerator.this))
.collect(Collectors.toList()));
List<Resource.Info> infos = new ArrayList<>(children.size());
for (Resource child : children) {
Resource.Info info = child.newInfo(LoadGenerator.this);
info.setRequestTime(System.nanoTime());
infos.add(info);
}
offer(infos);
send();
}
}
Expand Down Expand Up @@ -1390,8 +1397,5 @@ public void increment(boolean last) {
this.last = last;
counter.incrementAndGet();
}


}

}

0 comments on commit 41b51a5

Please sign in to comment.