Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2244. Reduce the number of log messages during bootstrap #1217

Merged
merged 2 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private void sendRequestWithRetry(PendingOrderedRequest pending) {
final Throwable exception = e;
final String key = client.getId() + "-" + request.getCallId() + "-" + exception;
final Consumer<String> op = suffix -> LOG.error("{} {}: Failed* {}", suffix, client.getId(), request, exception);
BatchLogger.warn(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
BatchLogger.print(BatchLogKey.SEND_REQUEST_EXCEPTION, key, op);
handleException(pending, request, e);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ default TimeDuration getBatchDuration() {

private static final class UniqueId {
private final Key key;
private final String name;
private final Object name;

private UniqueId(Key key, String name) {
private UniqueId(Key key, Object name) {
this.key = Objects.requireNonNull(key, "key == null");
this.name = name;
}
Expand Down Expand Up @@ -99,15 +99,15 @@ private synchronized boolean tryStartBatch(Consumer<String> op) {
private static final TimeoutExecutor SCHEDULER = TimeoutExecutor.getInstance();
private static final ConcurrentMap<UniqueId, BatchedLogEntry> LOG_CACHE = new ConcurrentHashMap<>();

public static void warn(Key key, String name, Consumer<String> op) {
warn(key, name, op, key.getBatchDuration(), true);
public static void print(Key key, Object name, Consumer<String> op) {
print(key, name, op, key.getBatchDuration(), true);
}

public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration) {
warn(key, name, op, batchDuration, true);
public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration) {
print(key, name, op, batchDuration, true);
}

public static void warn(Key key, String name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) {
public static void print(Key key, Object name, Consumer<String> op, TimeDuration batchDuration, boolean shouldBatch) {
if (!shouldBatch || batchDuration.isNonPositive()) {
op.accept("");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class GrpcLogAppender extends LogAppenderBase {

private enum BatchLogKey implements BatchLogger.Key {
RESET_CLIENT,
INCONSISTENCY_REPLY,
APPEND_LOG_RESPONSE_HANDLER_ON_ERROR
}

Expand Down Expand Up @@ -217,7 +218,7 @@ private void resetClient(AppendEntriesRequest request, Event event) {
.orElseGet(f::getMatchIndex);
if (event.isError() && request == null) {
final long followerNextIndex = f.getNextIndex();
BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix ->
BatchLogger.print(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix ->
LOG.warn("{}: Follower failed (request=null, errorCount={}); keep nextIndex ({}) unchanged and retry.{}",
this, errorCount, followerNextIndex, suffix), logMessageBatchDuration);
return;
Expand Down Expand Up @@ -534,8 +535,9 @@ private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto re
break;
case INCONSISTENCY:
grpcServerMetrics.onRequestInconsistency(getFollowerId().toString());
LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={}",
this, reply.getResult(), reply.getNextIndex(), errorCount, request);
BatchLogger.print(BatchLogKey.INCONSISTENCY_REPLY, getFollower().getName() + "_" + reply.getNextIndex(),
suffix -> LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={} {}",
this, reply.getResult(), reply.getNextIndex(), errorCount, request, suffix));
final long requestFirstIndex = request != null? request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX;
updateNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex()));
break;
Expand All @@ -555,7 +557,7 @@ public void onError(Throwable t) {
LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name,
BatchLogger.print(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name,
suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" + suffix, t),
logMessageBatchDuration, t instanceof StatusRuntimeException);
grpcServerMetrics.onRequestRetry(); // Update try counter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReferenceCountedObject;
import org.slf4j.Logger;
Expand All @@ -49,6 +51,11 @@
class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);

private enum BatchLogKey implements BatchLogger.Key {
COMPLETED_REQUEST,
COMPLETED_REPLY
}

static class PendingServerRequest<REQUEST> {
private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
private final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -76,6 +83,7 @@ void release() {

abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
private final RaftServer.Op op;
private final Supplier<String> nameSupplier;
private final StreamObserver<REPLY> responseObserver;
/** For ordered {@link #onNext(Object)} requests. */
private final AtomicReference<PendingServerRequest<REQUEST>> previousOnNext = new AtomicReference<>();
Expand All @@ -86,9 +94,14 @@ abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObse

ServerRequestStreamObserver(RaftServer.Op op, StreamObserver<REPLY> responseObserver) {
this.op = op;
this.nameSupplier = MemoizedSupplier.valueOf(() -> getId() + "_" + op);
this.responseObserver = responseObserver;
}

String getName() {
return nameSupplier.get();
}

private String getPreviousRequestString() {
return Optional.ofNullable(previousOnNext.get())
.map(PendingServerRequest::getRequest)
Expand Down Expand Up @@ -197,9 +210,12 @@ public void onNext(REQUEST request) {
@Override
public void onCompleted() {
if (isClosed.compareAndSet(false, true)) {
LOG.info("{}: Completed {}, lastRequest: {}", getId(), op, getPreviousRequestString());
BatchLogger.print(BatchLogKey.COMPLETED_REQUEST, getName(),
suffix -> LOG.info("{}: Completed {}, lastRequest: {} {}",
getId(), op, getPreviousRequestString(), suffix));
requestFuture.get().thenAccept(reply -> {
LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
BatchLogger.print(BatchLogKey.COMPLETED_REPLY, getName(),
suffix -> LOG.info("{}: Completed {}, lastReply: {} {}", getId(), op, reply, suffix));
responseObserver.onCompleted();
});
releaseLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
Expand All @@ -59,6 +60,11 @@
class SnapshotInstallationHandler {
static final Logger LOG = LoggerFactory.getLogger(SnapshotInstallationHandler.class);

private enum BatchLogKey implements BatchLogger.Key {
INSTALL_SNAPSHOT_REQUEST,
INSTALL_SNAPSHOT_REPLY
}

static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0, INVALID_LOG_INDEX);

private final RaftServerImpl server;
Expand Down Expand Up @@ -93,21 +99,19 @@ long getInProgressInstallSnapshotIndex() {
}

InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("{}: receive installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotRequestString(request));
}
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(),
suffix -> LOG.info("{}: receive installSnapshot: {} {}",
getMemberId(), ServerStringUtils.toInstallSnapshotRequestString(request), suffix));
final InstallSnapshotReplyProto reply;
try {
reply = installSnapshotImpl(request);
} catch (Exception e) {
LOG.error("{}: installSnapshot failed", getMemberId(), e);
throw e;
}
if (LOG.isInfoEnabled()) {
LOG.info("{}: reply installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotReplyString(reply));
}
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REPLY, getMemberId(),
suffix -> LOG.info("{}: reply installSnapshot: {} {}",
getMemberId(), ServerStringUtils.toInstallSnapshotReplyString(reply), suffix));
return reply;
}

Expand Down
Loading