Skip to content

Commit

Permalink
chore(logger): move trim log to broker (#1819)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Aug 15, 2024
1 parent 34eb8c1 commit 08b520b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamState;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.LogContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -55,11 +56,9 @@
import org.apache.kafka.common.requests.s3.TrimStreamsRequest;
import org.apache.kafka.server.common.automq.AutoMQVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ControllerStreamManager implements StreamManager {

private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStreamManager.class);
private final Logger logger;
private final StreamMetadataManager streamMetadataManager;
private final int nodeId;
private final long nodeEpoch;
Expand All @@ -70,6 +69,7 @@ public class ControllerStreamManager implements StreamManager {

public ControllerStreamManager(StreamMetadataManager streamMetadataManager, ControllerRequestSender requestSender,
int nodeId, long nodeEpoch, Supplier<AutoMQVersion> version, boolean failoverMode) {
this.logger = new LogContext(String.format("[nodeId=%s nodeEpoch=%s]", nodeId, nodeEpoch)).logger(ControllerStreamManager.class);
this.streamMetadataManager = streamMetadataManager;
this.nodeId = nodeId;
this.nodeEpoch = nodeEpoch;
Expand Down Expand Up @@ -112,10 +112,10 @@ public String toString() {
.map(m -> new StreamMetadata(m.streamId(), m.epoch(), m.startOffset(), m.endOffset(), StreamState.OPENED))
.collect(Collectors.toList()));
case NODE_EPOCH_EXPIRED:
LOGGER.error("Node epoch expired: {}, code: {}", req, code);
logger.error("Node epoch expired: {}, code: {}", req, code);
throw code.exception();
default:
LOGGER.error("Error while getting streams offset: {}, code: {}, retry later", req, code);
logger.error("Error while getting streams offset: {}, code: {}, retry later", req, code);
return ResponseHandleResult.withRetry();
}
});
Expand Down Expand Up @@ -169,10 +169,10 @@ public Builder toRequestBuilder() {
return ResponseHandleResult.withSuccess(resp.streamId());
case NODE_EPOCH_EXPIRED:
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist: {}, code: {}", req, Errors.forCode(resp.errorCode()));
logger.error("Node epoch expired or not exist: {}, code: {}", req, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
default:
LOGGER.error("Error while creating stream: {}, code: {}, retry later", req, Errors.forCode(resp.errorCode()));
logger.error("Error while creating stream: {}, code: {}, retry later", req, Errors.forCode(resp.errorCode()));
return ResponseHandleResult.withRetry();
}
});
Expand Down Expand Up @@ -230,20 +230,20 @@ public String toString() {
new StreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset(), StreamState.OPENED));
case NODE_EPOCH_EXPIRED:
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist, stream {}, epoch {}, code: {}", streamId, epoch, code);
logger.error("Node epoch expired or not exist, stream {}, epoch {}, code: {}", streamId, epoch, code);
throw code.exception();
case STREAM_FENCED:
LOGGER.warn("[STREAM_FENCED] open stream failed streamId={}, epoch {}, code: {}", streamId, epoch, code);
logger.warn("[STREAM_FENCED] open stream failed streamId={}, epoch {}, code: {}", streamId, epoch, code);
throw code.exception();
case STREAM_NOT_EXIST:
case STREAM_INNER_ERROR:
LOGGER.error("Unexpected error while opening stream: {}, epoch {}, code: {}", streamId, epoch, code);
logger.error("Unexpected error while opening stream: {}, epoch {}, code: {}", streamId, epoch, code);
throw code.exception();
case STREAM_NOT_CLOSED:
LOGGER.warn("open stream fail: {}, epoch {}, code: STREAM_NOT_CLOSED, retry later", streamId, epoch);
logger.warn("open stream fail: {}, epoch {}, code: STREAM_NOT_CLOSED, retry later", streamId, epoch);
return ResponseHandleResult.withRetry();
default:
LOGGER.error("Error while opening stream: {}, epoch {}, code: {}, retry later", streamId, epoch, code);
logger.error("Error while opening stream: {}, epoch {}, code: {}, retry later", streamId, epoch, code);
return ResponseHandleResult.withRetry();
}
});
Expand Down Expand Up @@ -290,22 +290,28 @@ public Builder toRequestBuilder() {
return ResponseHandleResult.withSuccess(null);
case NODE_EPOCH_EXPIRED:
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
logger.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
case STREAM_NOT_EXIST:
case STREAM_FENCED:
case STREAM_NOT_OPENED:
case OFFSET_NOT_MATCHED:
case STREAM_INNER_ERROR:
LOGGER.error("Unexpected error while trimming stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
logger.error("Unexpected error while trimming stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
default:
LOGGER.warn("Error while trimming stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
logger.warn("Error while trimming stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
return ResponseHandleResult.withRetry();
}
});
this.requestSender.send(task);
return future;
return future.whenComplete((nil, ex) -> {
if (ex != null) {
logger.error("[TRIM_STREAM_FAIL],request={}", request, ex);
} else {
logger.info("[TRIM_STREAM],request={}", request);
}
});
}

@Override
Expand Down Expand Up @@ -350,15 +356,15 @@ public Builder toRequestBuilder() {
return ResponseHandleResult.withSuccess(null);
case NODE_EPOCH_EXPIRED:
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
logger.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
case STREAM_NOT_EXIST:
case STREAM_FENCED:
case STREAM_INNER_ERROR:
LOGGER.error("Unexpected error while closing stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
logger.error("Unexpected error while closing stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
default:
LOGGER.warn("Error while closing stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
logger.warn("Error while closing stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
return ResponseHandleResult.withRetry();
}
});
Expand Down Expand Up @@ -404,15 +410,15 @@ public Builder toRequestBuilder() {
return ResponseHandleResult.withSuccess(null);
case NODE_EPOCH_EXPIRED:
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
logger.error("Node epoch expired or not exist: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
case STREAM_NOT_EXIST:
case STREAM_FENCED:
case STREAM_INNER_ERROR:
LOGGER.error("Unexpected error while deleting stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
logger.error("Unexpected error while deleting stream: {}, code: {}", request, Errors.forCode(resp.errorCode()));
throw Errors.forCode(resp.errorCode()).exception();
default:
LOGGER.warn("Error while deleting stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
logger.warn("Error while deleting stream: {}, code: {}, retry later", request, Errors.forCode(resp.errorCode()));
return ResponseHandleResult.withRetry();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,6 @@ public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoc
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
log.info("[TrimStream] successfully trim the stream. streamId={}, streamEpoch={}, trimOffset={}, nodeId={}, nodeEpoch={}",
streamId, epoch, newStartOffset, nodeId, nodeEpoch);
return ControllerResult.atomicOf(records, resp);
}

Expand Down

0 comments on commit 08b520b

Please sign in to comment.