From 8bd75a94b2fa49784bce963ee5b050b0188d7de7 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sat, 30 Jul 2022 21:31:32 +0800 Subject: [PATCH] [ISSUE #176] Realize the ability to read from the replica node,Optimize read performance --- .../dledger/DLedgerRpcNettyService.java | 76 +++++++----- .../storage/dledger/DLedgerServer.java | 111 ++++++++++++++++-- .../storage/dledger/client/DLedgerClient.java | 29 +++-- .../dledger/protocol/DLedgerProtocol.java | 2 + .../protocol/DLedgerProtocolHandler.java | 2 + .../dledger/protocol/DLedgerRequestCode.java | 3 +- .../dledger/protocol/DLedgerResponseCode.java | 4 +- .../protocol/PullReadIndexRequest.java | 21 ++++ .../protocol/PullReadIndexResponse.java | 53 +++++++++ .../store/file/DLedgerMmapFileStore.java | 6 +- 10 files changed, 254 insertions(+), 53 deletions(-) create mode 100644 src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexRequest.java create mode 100644 src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexResponse.java diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java index d59d0268..9a58eb9c 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java @@ -32,6 +32,8 @@ import io.openmessaging.storage.dledger.protocol.MetadataResponse; import io.openmessaging.storage.dledger.protocol.PullEntriesRequest; import io.openmessaging.storage.dledger.protocol.PullEntriesResponse; +import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest; +import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse; import io.openmessaging.storage.dledger.protocol.PushEntryRequest; import io.openmessaging.storage.dledger.protocol.PushEntryResponse; import io.openmessaging.storage.dledger.protocol.RequestOrResponse; @@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) { this(dLedgerServer, null, null, null); } - public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) { + public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, + NettyClientConfig nettyClientConfig) { this(dLedgerServer, nettyServerConfig, nettyClientConfig, null); } - public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { + public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, + NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) { this.dLedgerServer = dLedgerServer; this.memberState = dLedgerServer.getMemberState(); NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() { @@ -109,6 +113,7 @@ public boolean rejectRequest() { this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null); this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null); this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null); + this.remotingServer.registerProcessor(DLedgerRequestCode.PULL_READ_INDEX.getCode(), protocolProcessor, null); //start the remoting client if (nettyClientConfig == null) { @@ -252,9 +257,29 @@ public CompletableFuture push(PushEntryRequest request) throw return future; } + @Override + public CompletableFuture pullReadIndex(PullReadIndexRequest request) throws Exception { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PULL_READ_INDEX.getCode(), null); + wrapperRequest.setBody(JSON.toJSONBytes(request)); + remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> { + RemotingCommand responseCommand = responseFuture.getResponseCommand(); + PullReadIndexResponse response; + if (null != responseCommand) { + response = JSON.parseObject(responseCommand.getBody(), PullReadIndexResponse.class); + } else { + response = new PullReadIndexResponse(); + response.copyBaseInfo(request); + response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode()); + } + future.complete(response); + }); + return future; + } + @Override public CompletableFuture leadershipTransfer( - LeadershipTransferRequest request) throws Exception { + LeadershipTransferRequest request) throws Exception { CompletableFuture future = new CompletableFuture<>(); try { RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null); @@ -283,7 +308,7 @@ public CompletableFuture leadershipTransfer( } private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request, - ChannelHandlerContext ctx) { + ChannelHandlerContext ctx) { RemotingCommand response = null; try { if (t != null) { @@ -319,57 +344,43 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand case METADATA: { MetadataRequest metadataRequest = JSON.parseObject(request.getBody(), MetadataRequest.class); CompletableFuture future = handleMetadata(metadataRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case APPEND: { AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class); CompletableFuture future = handleAppend(appendEntryRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case GET: { GetEntriesRequest getEntriesRequest = JSON.parseObject(request.getBody(), GetEntriesRequest.class); CompletableFuture future = handleGet(getEntriesRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case PULL: { PullEntriesRequest pullEntriesRequest = JSON.parseObject(request.getBody(), PullEntriesRequest.class); CompletableFuture future = handlePull(pullEntriesRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case PUSH: { PushEntryRequest pushEntryRequest = JSON.parseObject(request.getBody(), PushEntryRequest.class); CompletableFuture future = handlePush(pushEntryRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case VOTE: { VoteRequest voteRequest = JSON.parseObject(request.getBody(), VoteRequest.class); CompletableFuture future = handleVote(voteRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case HEART_BEAT: { HeartBeatRequest heartBeatRequest = JSON.parseObject(request.getBody(), HeartBeatRequest.class); CompletableFuture future = handleHeartBeat(heartBeatRequest); - future.whenCompleteAsync((x, y) -> { - writeResponse(x, y, request, ctx); - }, futureExecutor); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); break; } case LEADERSHIP_TRANSFER: { @@ -379,10 +390,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand future.whenCompleteAsync((x, y) -> { writeResponse(x, y, request, ctx); logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms", - request, x, DLedgerUtils.elapsed(start)); + request, x, DLedgerUtils.elapsed(start)); }, futureExecutor); break; } + case PULL_READ_INDEX: { + PullReadIndexRequest pullReadIndexRequest = JSON.parseObject(request.getBody(), PullReadIndexRequest.class); + CompletableFuture future = handlePullReadIndex(pullReadIndexRequest); + future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor); + break; + } default: logger.error("Unknown request code {} from {}", request.getCode(), request); break; @@ -392,7 +409,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand @Override public CompletableFuture handleLeadershipTransfer( - LeadershipTransferRequest leadershipTransferRequest) throws Exception { + LeadershipTransferRequest leadershipTransferRequest) throws Exception { return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest); } @@ -432,6 +449,11 @@ public CompletableFuture handlePush(PushEntryRequest request) return dLedgerServer.handlePush(request); } + @Override + public CompletableFuture handlePullReadIndex(PullReadIndexRequest request) throws Exception { + return dLedgerServer.handlePullReadIndex(request); + } + public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) { RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null); remotingCommand.setBody(JSON.toJSONBytes(response)); diff --git a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index 723e1c9a..3eb823f3 100644 --- a/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -33,6 +33,8 @@ import io.openmessaging.storage.dledger.protocol.MetadataResponse; import io.openmessaging.storage.dledger.protocol.PullEntriesRequest; import io.openmessaging.storage.dledger.protocol.PullEntriesResponse; +import io.openmessaging.storage.dledger.protocol.PullReadIndexRequest; +import io.openmessaging.storage.dledger.protocol.PullReadIndexResponse; import io.openmessaging.storage.dledger.protocol.PushEntryRequest; import io.openmessaging.storage.dledger.protocol.PushEntryResponse; import io.openmessaging.storage.dledger.protocol.VoteRequest; @@ -44,7 +46,6 @@ import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore; import io.openmessaging.storage.dledger.utils.DLedgerUtils; import io.openmessaging.storage.dledger.utils.PreConditions; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -52,11 +53,10 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.CompletableFuture; - import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; @@ -211,7 +211,7 @@ public CompletableFuture handleAppend(AppendEntryRequest re // record positions to return; long[] positions = new long[batchRequest.getBatchMsgs().size()]; DLedgerEntry resEntry = null; - // split bodys to append + // split bodies to append int index = 0; Iterator iterator = batchRequest.getBatchMsgs().iterator(); while (iterator.hasNext()) { @@ -226,8 +226,8 @@ public CompletableFuture handleAppend(AppendEntryRequest re batchAppendFuture.setPositions(positions); return batchAppendFuture; } - throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" + - " with empty bodys"); + throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODIES, "BatchAppendEntryRequest" + + " with empty bodies"); } else { DLedgerEntry dLedgerEntry = new DLedgerEntry(); dLedgerEntry.setBody(request.getBody()); @@ -246,16 +246,58 @@ public CompletableFuture handleAppend(AppendEntryRequest re } @Override - public CompletableFuture handleGet(GetEntriesRequest request) throws IOException { + public CompletableFuture handleGet(GetEntriesRequest request) throws Exception { try { PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId()); PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup()); - PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); - DLedgerEntry entry = dLedgerStore.get(request.getBeginIndex()); + PreConditions.check(!memberState.isCandidate(), DLedgerResponseCode.IS_CANDIDATE); GetEntriesResponse response = new GetEntriesResponse(); response.setGroup(memberState.getGroup()); - if (entry != null) { - response.setEntries(Collections.singletonList(entry)); + Long requestIndex = request.getBeginIndex(); + if (memberState.isFollower()) { + //Get from follower + if (requestIndex <= memberState.getLedgerEndIndex()) { + getEntry(response, requestIndex); + return CompletableFuture.completedFuture(response); + } + + // when requestIndex greater than ledgerEndIndex then send pull readIndex(ledgerEndIndex) request to leader + PullReadIndexRequest indexRequest = new PullReadIndexRequest(); + indexRequest.setGroup(request.getGroup()); + indexRequest.setRemoteId(memberState.getLeaderId()); + CompletableFuture future = dLedgerRpcService.pullReadIndex(indexRequest); + PullReadIndexResponse pullReadIndexResponse = future.get(); + if (pullReadIndexResponse.getCode() != DLedgerResponseCode.SUCCESS.getCode()) { + response.copyBaseInfo(request); + response.setLeaderId(memberState.getLeaderId()); + response.setCode(pullReadIndexResponse.getCode()); + return CompletableFuture.completedFuture(response); + } + + long readIndex = pullReadIndexResponse.getReadIndex(); + if (requestIndex > readIndex) { + response.copyBaseInfo(request); + response.setLeaderId(memberState.getLeaderId()); + response.setCode(DLedgerResponseCode.INDEX_OUT_OF_RANGE.getCode()); + return CompletableFuture.completedFuture(response); + } + + if (readIndex <= memberState.getLedgerEndIndex()) { + getEntry(response, requestIndex); + return CompletableFuture.completedFuture(response); + } + + //wait for follower ledgerEndIndex to update + if (!waitFollowerEndIndex2Update(2, TimeUnit.SECONDS, requestIndex)) { + logger.warn("update follower[{}] ledgerEndIndex time out", memberState.getSelfId()); + response.setCode(DLedgerResponseCode.FOLLOWER_UPDATE_END_INDEX_TIMEOUT.getCode()); + return CompletableFuture.completedFuture(response); + } + getEntry(response, requestIndex); + return CompletableFuture.completedFuture(response); + } else { + //get from leader + getEntry(response, requestIndex); } return CompletableFuture.completedFuture(response); } catch (DLedgerException e) { @@ -268,6 +310,13 @@ public CompletableFuture handleGet(GetEntriesRequest request } } + private void getEntry(GetEntriesResponse response, Long requestIndex) { + DLedgerEntry entry = dLedgerStore.get(requestIndex); + if (entry != null) { + response.setEntries(Collections.singletonList(entry)); + } + } + @Override public CompletableFuture handleMetadata(MetadataRequest request) throws Exception { try { @@ -311,6 +360,30 @@ public CompletableFuture handlePush(PushEntryRequest request) } + @Override + public CompletableFuture handlePullReadIndex(PullReadIndexRequest request) throws Exception { + try { + PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId()); + PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup()); + PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER); + + PullReadIndexResponse response = new PullReadIndexResponse(); + response.setGroup(memberState.getGroup()); + response.setLeaderId(memberState.getLeaderId()); + response.setEndIndex(memberState.getLedgerEndIndex()); + response.setReadIndex(memberState.getLedgerEndIndex()); + + return CompletableFuture.completedFuture(response); + } catch (DLedgerException e) { + logger.error("[{}][HandlePullReadIndex] failed", memberState.getSelfId(), e); + PullReadIndexResponse response = new PullReadIndexResponse(); + response.copyBaseInfo(request); + response.setCode(e.getCode().getCode()); + response.setLeaderId(memberState.getLeaderId()); + return CompletableFuture.completedFuture(response); + } + } + @Override public CompletableFuture handleLeadershipTransfer( LeadershipTransferRequest request) throws Exception { @@ -486,4 +559,20 @@ public NettyRemotingClient getRemotingClient() { return null; } + private boolean waitFollowerEndIndex2Update(long maxWaitTime, TimeUnit unit, long requestIndex) { + long maxWaitMs = unit.toMillis(maxWaitTime); + long start = System.currentTimeMillis(); + while (DLedgerUtils.elapsed(start) < maxWaitMs) { + try { + if (requestIndex <= memberState.getLedgerEndIndex()) { + return true; + } + DLedgerUtils.sleep(1); + } catch (Exception e) { + logger.warn("Wait [{}]Follower update endIndex error",memberState.getSelfId(),e); + break; + } + } + return false; + } } diff --git a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java index 4d45171c..631ec964 100644 --- a/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java +++ b/src/main/java/io/openmessaging/storage/dledger/client/DLedgerClient.java @@ -22,17 +22,18 @@ import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.protocol.GetEntriesRequest; import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; +import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; +import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse; import io.openmessaging.storage.dledger.protocol.MetadataRequest; import io.openmessaging.storage.dledger.protocol.MetadataResponse; -import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse; -import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; import io.openmessaging.storage.dledger.utils.DLedgerUtils; - +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ public class DLedgerClient { private final Map peerMap = new ConcurrentHashMap<>(); private final String group; private String leaderId; + private String remoteId; private DLedgerClientRpcService dLedgerClientRpcService; private MetadataUpdater metadataUpdater = new MetadataUpdater("MetadataUpdater", logger); @@ -51,7 +53,10 @@ public DLedgerClient(String group, String peers) { updatePeers(peers); dLedgerClientRpcService = new DLedgerClientRpcNettyService(); dLedgerClientRpcService.updatePeers(peers); - leaderId = peerMap.keySet().iterator().next(); + List peerList = new ArrayList<>(peerMap.keySet()); + Collections.shuffle(peerList); + this.leaderId = peerList.get(0); + this.remoteId = this.leaderId; } public AppendEntryResponse append(byte[] body) { @@ -86,19 +91,18 @@ public AppendEntryResponse append(byte[] body) { public GetEntriesResponse get(long index) { try { - waitOnUpdatingMetadata(1500, false); - if (leaderId == null) { + if (remoteId == null) { GetEntriesResponse response = new GetEntriesResponse(); response.setCode(DLedgerResponseCode.METADATA_ERROR.getCode()); return response; } - GetEntriesRequest request = new GetEntriesRequest(); request.setGroup(group); - request.setRemoteId(leaderId); + request.setRemoteId(remoteId); request.setBeginIndex(index); GetEntriesResponse response = dLedgerClientRpcService.get(request).get(); - if (response.getCode() == DLedgerResponseCode.NOT_LEADER.getCode()) { + if (response.getCode() == DLedgerResponseCode.FOLLOWER_UPDATE_END_INDEX_TIMEOUT.getCode()) { + //update leader id waitOnUpdatingMetadata(1500, true); if (leaderId != null) { request.setRemoteId(leaderId); @@ -108,7 +112,7 @@ public GetEntriesResponse get(long index) { return response; } catch (Exception t) { needFreshMetadata(); - logger.error("", t); + logger.error("Get error", t); GetEntriesResponse getEntriesResponse = new GetEntriesResponse(); getEntriesResponse.setCode(DLedgerResponseCode.INTERNAL_ERROR.getCode()); return getEntriesResponse; @@ -143,6 +147,9 @@ public void shutdown() { } private void updatePeers(String peers) { + if (peers == null || peers.trim().isEmpty()) { + throw new IllegalArgumentException("peers can't be empty"); + } for (String peerInfo : peers.split(";")) { String nodeId = peerInfo.split("-")[0]; peerMap.put(nodeId, peerInfo.substring(nodeId.length() + 1)); diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocol.java b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocol.java index 592a2fd7..8b021862 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocol.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocol.java @@ -31,4 +31,6 @@ public interface DLedgerProtocol extends DLedgerClientProtocol { CompletableFuture push(PushEntryRequest request) throws Exception; + CompletableFuture pullReadIndex(PullReadIndexRequest request) throws Exception; + } diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocolHandler.java b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocolHandler.java index 0697d53f..87cf3d6d 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocolHandler.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerProtocolHandler.java @@ -30,4 +30,6 @@ public interface DLedgerProtocolHandler extends DLedgerClientProtocolHandler { CompletableFuture handlePull(PullEntriesRequest request) throws Exception; CompletableFuture handlePush(PushEntryRequest request) throws Exception; + + CompletableFuture handlePullReadIndex(PullReadIndexRequest request) throws Exception; } diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java index 521e7fb2..c4dcc232 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java @@ -28,7 +28,8 @@ public enum DLedgerRequestCode { HEART_BEAT(51002, ""), PULL(51003, ""), PUSH(51004, ""), - LEADERSHIP_TRANSFER(51005, ""); + LEADERSHIP_TRANSFER(51005, ""), + PULL_READ_INDEX(51006, ""); private static Map codeMap = new HashMap<>(); diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java index 6022bdca..d8c105cf 100644 --- a/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerResponseCode.java @@ -47,7 +47,9 @@ public enum DLedgerResponseCode { FALL_BEHIND_TOO_MUCH(417, ""), TAKE_LEADERSHIP_FAILED(418, ""), INDEX_LESS_THAN_LOCAL_BEGIN(419, ""), - REQUEST_WITH_EMPTY_BODYS(420, ""), + REQUEST_WITH_EMPTY_BODIES(420, ""), + FOLLOWER_UPDATE_END_INDEX_TIMEOUT(421, ""), + IS_CANDIDATE(422, ""), INTERNAL_ERROR(500, ""), TERM_CHANGED(501, ""), WAIT_QUORUM_ACK_TIMEOUT(502, ""), diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexRequest.java b/src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexRequest.java new file mode 100644 index 00000000..36726ee1 --- /dev/null +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexRequest.java @@ -0,0 +1,21 @@ +/* + * Copyright 2017-2022 The DLedger Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.protocol; + +public class PullReadIndexRequest extends RequestOrResponse { + +} diff --git a/src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexResponse.java b/src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexResponse.java new file mode 100644 index 00000000..c3b2fde0 --- /dev/null +++ b/src/main/java/io/openmessaging/storage/dledger/protocol/PullReadIndexResponse.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017-2022 The DLedger Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.protocol; + +public class PullReadIndexResponse extends RequestOrResponse { + + private long beginIndex; + + private long endIndex; + + private long readIndex; + + public PullReadIndexResponse() { + } + + public long getBeginIndex() { + return beginIndex; + } + + public void setBeginIndex(long beginIndex) { + this.beginIndex = beginIndex; + } + + public long getEndIndex() { + return endIndex; + } + + public void setEndIndex(long endIndex) { + this.endIndex = endIndex; + } + + public long getReadIndex() { + return readIndex; + } + + public void setReadIndex(long readIndex) { + this.readIndex = readIndex; + } +} diff --git a/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index c57900a2..cc17bb73 100644 --- a/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -649,7 +649,8 @@ public FlushDataService(String name, Logger logger) { super(name, logger); } - @Override public void doWork() { + @Override + public void doWork() { try { long start = System.currentTimeMillis(); DLedgerMmapFileStore.this.dataFileList.flush(0); @@ -681,7 +682,8 @@ public CleanSpaceService(String name, Logger logger) { super(name, logger); } - @Override public void doWork() { + @Override + public void doWork() { try { storeBaseRatio = DLedgerUtils.getDiskPartitionSpaceUsedPercent(dLedgerConfig.getStoreBaseDir()); dataRatio = calcDataStorePathPhysicRatio();