
performance optimization about CompletableFuture #107

Open
Cczzzz opened this issue Jan 26, 2022 · 9 comments

Labels
enhancement

Comments

@Cczzzz
Contributor

Cczzzz commented Jan 26, 2022

DLedger uses a lot of CompletableFutures, but they have one pitfall: if you chain callbacks with thenAccept or thenApply, the thread that runs the callback is the thread that calls future.complete().

In the following example:

    CompletableFuture<String> completableFuture = new CompletableFuture<>();
    completableFuture.thenApply(String -> {
        System.out.println(Thread.currentThread());
        System.out.println("thenApply");
        return 123;
    }).thenAccept(num -> {
        System.out.println(Thread.currentThread());
        System.out.println("thenAccept");
    });

    completableFuture.complete("abc");
    Thread.sleep(10000);

The output is:

Thread[main,5,main]
thenApply
Thread[main,5,main]
thenAccept

Both callbacks are executed by the main thread.
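
For contrast, a minimal sketch (ExecutorService/Executors are from java.util.concurrent; callbackExecutor is just an illustrative name) showing that the *Async variants with an explicit executor move the callbacks off the completing thread:

    ExecutorService callbackExecutor = Executors.newFixedThreadPool(2);

    CompletableFuture<String> completableFuture = new CompletableFuture<>();
    completableFuture.thenApplyAsync(s -> {
        // Runs on callbackExecutor, not on the thread that called complete().
        System.out.println(Thread.currentThread());
        return 123;
    }, callbackExecutor).thenAcceptAsync(num -> {
        System.out.println(Thread.currentThread());
    }, callbackExecutor);

    completableFuture.complete("abc"); // main only completes; callbacks are dispatched to the pool

Here the two Thread.currentThread() prints show pool threads instead of Thread[main,5,main].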

In DLedger these CompletableFutures are usually passed very deep down the call chain, while complete() is called from single-threaded tasks whose thread time is precious.
For example, in the put-message flow I used arthas to watch which thread runs the callback:
watch *.StoreStatsService setPutMessageEntireTimeMax "@java.lang.Thread@currentThread().name" -x 1 -n 10

[screenshot: arthas watch output showing the callback thread]

DefaultMessageStore, line 437:

    putResultFuture.thenAccept((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });

The thread that executes line 437 of DefaultMessageStore turns out to be QuorumAckChecker-n0.
QuorumAckChecker-n0 is the thread the master uses to acknowledge replicated entries, and it is a single-threaded task. Spending this critical thread's time on callbacks like these hurts performance considerably.
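
As a rough sketch of the kind of change this points to (not an actual patch; putMessageCallbackExecutor is a hypothetical name), the callback could be registered with thenAcceptAsync on a dedicated executor so the stats work no longer runs on QuorumAckChecker-n0:

    // Hypothetical executor owned by DefaultMessageStore; not part of the current code.
    ExecutorService putMessageCallbackExecutor = Executors.newSingleThreadExecutor(
        r -> new Thread(r, "PutMessageCallbackThread"));

    putResultFuture.thenAcceptAsync((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    }, putMessageCallbackExecutor); // complete() on QuorumAckChecker-n0 returns immediately

The cost is one extra hand-off per message, which should be much cheaper than blocking the ack-checking loop on this work.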

@Cczzzz
Contributor Author

Cczzzz commented Feb 7, 2022

I added a number of logs to the ack-checking task to pin down where the time goes, as follows:

    @Override
    public void doWork() {
        long start = System.nanoTime();
        try {
            if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
                logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
                    memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
                lastPrintWatermarkTimeMs = System.currentTimeMillis();
            }
            if (!memberState.isLeader()) {
                waitForRunning(1);
                return;
            }
            long currTerm = memberState.currTerm();
            checkTermForPendingMap(currTerm, "QuorumAckChecker");
            checkTermForWaterMark(currTerm, "QuorumAckChecker");
            long partA = System.nanoTime();
            if (pendingAppendResponsesByTerm.size() > 1) {
                for (Long term : pendingAppendResponsesByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setIndex(futureEntry.getKey());
                        response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                        response.setLeaderId(memberState.getLeaderId());
                        logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
                        futureEntry.getValue().complete(response);
                    }
                    pendingAppendResponsesByTerm.remove(term);
                }
            }
            if (peerWaterMarksByTerm.size() > 1) {
                for (Long term : peerWaterMarksByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
                    peerWaterMarksByTerm.remove(term);
                }
            }
            long partB = System.nanoTime();
            Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);

            List<Long> sortedWaterMarks = peerWaterMarks.values()
                .stream()
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
            long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
            dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
            long partC = System.nanoTime();
            ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
            int pendingSize = responses.size();
            boolean needCheck = false;
            int ackNum = 0;
            for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
                try {
                    CompletableFuture<AppendEntryResponse> future = responses.remove(i);
                    if (future == null) {
                        needCheck = true;
                        break;
                    } else if (!future.isDone()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setTerm(currTerm);
                        response.setIndex(i);
                        response.setLeaderId(memberState.getSelfId());
                        response.setPos(((AppendFuture) future).getPos());
                        future.complete(response);
                    }
                    ackNum++;
                } catch (Throwable t) {
                    logger.error("Error in ack to index={} term={}", i, currTerm, t);
                }
            }
            long partD = System.nanoTime();
            if (ackNum == 0) {
                for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {
                    TimeoutFuture<AppendEntryResponse> future = responses.get(i);
                    if (future == null) {
                        break;
                    } else if (future.isTimeOut()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());
                        response.setTerm(currTerm);
                        response.setIndex(i);
                        response.setLeaderId(memberState.getSelfId());
                        future.complete(response);
                    } else {
                        break;
                    }
                }
                waitForRunning(1);
            }
            long partE = System.nanoTime();

            if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
                updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
                for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {
                    if (futureEntry.getKey() < quorumIndex) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setTerm(currTerm);
                        response.setIndex(futureEntry.getKey());
                        response.setLeaderId(memberState.getSelfId());
                        response.setPos(((AppendFuture) futureEntry.getValue()).getPos());
                        futureEntry.getValue().complete(response);
                        responses.remove(futureEntry.getKey());
                    }
                }
                lastCheckLeakTimeMs = System.currentTimeMillis();
            }
            lastQuorumIndex = quorumIndex;
            long partF = System.nanoTime();
            if (partF - start > 1500000 || ackNum > 100) {
                logger.info("partA {} , partB {} , partC {} , partD {} , partE {} , partF {}, ackNum {} , pending size {} ",
                    partA - start,
                    partB - partA,
                    partC - partB,
                    partD - partC,
                    partE - partD,
                    partF - partE,
                    ackNum,
                    pendingSize );
            }

        } catch (Throwable t) {
            DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
            DLedgerUtils.sleep(100);
        }
    }
}

Under a 4k TPS write load, the logs show the following:
2022-02-07 10:21:29 INFO QuorumAckChecker-n0 - partA 238 , partB 67 , partC 1972 , partD 1971882 , partE 34 , partF 162, ackNum 10 , pending size 141
2022-02-07 10:21:29 INFO QuorumAckChecker-n0 - partA 423 , partB 73 , partC 1753 , partD 653803 , partE 84 , partF 118, ackNum 120 , pending size 225

2022-02-07 10:21:40 INFO QuorumAckChecker-n0 - partA 201 , partB 43 , partC 1133 , partD 2594866 , partE 73 , partF 148, ackNum 3 , pending size 231
2022-02-07 10:21:40 INFO QuorumAckChecker-n0 - partA 485 , partB 103 , partC 2869 , partD 826787 , partE 72 , partF 98, ackNum 293 , pending size 378
2022-02-07 10:21:40 INFO QuorumAckChecker-n0 - partA 503 , partB 81 , partC 2663 , partD 260621 , partE 42 , partF 63, ackNum 121 , pending size 206
2022-02-07 10:21:40 INFO QuorumAckChecker-n0 - partA 178 , partB 47 , partC 465 , partD 65 , partE 6132364 , partF 180, ackNum 0 , pending size 45
2022-02-07 10:21:40 INFO QuorumAckChecker-n0 - partA 254 , partB 82 , partC 2357 , partD 1146029 , partE 96 , partF 123, ackNum 322 , pending size 379
2022-02-07 10:21:41 INFO QuorumAckChecker-n0 - partA 313 , partB 85 , partC 2537 , partD 429066 , partE 70 , partF 128, ackNum 140 , pending size 178

Sometimes the partD section exceeds 1 ms and even reaches 3-5 ms. The partD code loops downward over an index, removing entries from a ConcurrentHashMap (which holds fewer than 1000 entries at that point) and calling future.complete(response).
Perhaps future.complete(response) itself is costly, or the ConcurrentHashMap performs poorly here, since writes keep arriving while it is being read.

  • Improvement: run the callbacks triggered by future.complete asynchronously instead of on the completing thread.
  • Replace the ConcurrentHashMap with a different concurrent structure. In this scenario every key is a number and keys are written in strictly increasing order, so we could borrow the RingBuffer idea from Disruptor (see the sketch after this list).
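
A rough sketch of the second idea (illustrative names only, not DLedger code; the capacity must stay above the number of in-flight entries, cf. maxPendingRequestsNum): a fixed-size slot array indexed by index & mask, in the spirit of Disruptor's RingBuffer, which avoids hashing and bin locking entirely.

    // Illustrative only: a slot array for pending futures keyed by a monotonically
    // increasing entry index, standing in for the ConcurrentHashMap.
    // Requires java.util.concurrent.atomic.AtomicReferenceArray.
    class PendingFutureRing<T> {
        private final AtomicReferenceArray<CompletableFuture<T>> slots;
        private final int mask; // capacity must be a power of two

        PendingFutureRing(int capacityPowerOfTwo) {
            this.slots = new AtomicReferenceArray<>(capacityPowerOfTwo);
            this.mask = capacityPowerOfTwo - 1;
        }

        void put(long index, CompletableFuture<T> future) {
            slots.set((int) (index & mask), future);
        }

        CompletableFuture<T> remove(long index) {
            // getAndSet gives the same remove-and-return semantics the ack loop relies on.
            return slots.getAndSet((int) (index & mask), null);
        }
    }

With maxPendingRequestsNum at its default of 10000, a capacity of 16384 would keep concurrently pending indexes in distinct slots, assuming in-flight indexes always stay within one capacity-sized window.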

@lifepuzzlefun
Contributor

I guess the most costly part may be the iteration over the ConcurrentHashMap. Can we add a counter of iterated entries to the log?

I agree with the first point. The current RocketMQ already uses thenAcceptAsync in some places, and the other code executed on the completing thread is mostly stats code, so I think its time cost is small.

@Cczzzz
Contributor Author

Cczzzz commented Feb 7, 2022

pending size 206 is the map size before the loop, so it is below 1000. Maybe it's caused by read-write locking and hash collisions.
All keys in the map are increasing numbers; could the hash collisions be severe? Hash collisions on top of read-write locking may be the key cause of the high time cost.

@lifepuzzlefun
Contributor

ConcurrentHashMap re-spreads the key's hash code, so even when keys are inserted sequentially, hash collisions won't be a problem. Maybe we can profile this case. I also wonder about your write QPS and your config for maxPendingRequestsNum, whose default is 10000.
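
For reference, a small self-contained sketch of the spreading step ConcurrentHashMap applies: it XORs the high half of the hash into the low half, and for small sequential Long keys the resulting bins stay distinct, so sequential indexes do not pile into one bin.

    // Mirrors ConcurrentHashMap.spread(int h).
    static int spread(int h) {
        return (h ^ (h >>> 16)) & 0x7fffffff;
    }

    public static void main(String[] args) {
        int tableSize = 64; // ConcurrentHashMap tables are always powers of two
        for (long index = 1000; index < 1008; index++) {
            int h = Long.hashCode(index);          // for small values this is just (int) index
            int bin = spread(h) & (tableSize - 1); // bin selection as in ConcurrentHashMap
            System.out.println(index + " -> bin " + bin);
        }
    }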

@lifepuzzlefun
Contributor

I think your idea is a good direction for optimizing DLedger performance, and I hope for more information from you.

@Cczzzz
Contributor Author

Cczzzz commented Feb 8, 2022

maxPendingRequestsNum is at its default of 10000.

@Cczzzz
Contributor Author

Cczzzz commented Feb 8, 2022

[screenshot]
Master-to-slave replication is also a serious performance problem; there are far too many cases here that cost 10 ms. I haven't looked at the slave-side request-handling logic yet.

@Cczzzz
Contributor Author

Cczzzz commented Feb 8, 2022

To replicate an entry to the slave, the message first has to be read back from the page cache, decoded, and then JSON-encoded before it can be sent. Would a binary protocol (bp) be better here than JSON?
[screenshot]
Should this JSON encoding at least be moved onto multiple threads? A sketch follows below.
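
A rough sketch of that last idea (entry, sendToSlave, and encodeExecutor are placeholders, not DLedger code; JSON is fastjson's, as used elsewhere in this thread): offload the JSON serialization to a pool with supplyAsync so the replication thread only consumes the already encoded bytes.

    // Hypothetical pool for encoding work; not part of DLedger.
    ExecutorService encodeExecutor = Executors.newFixedThreadPool(4,
        r -> new Thread(r, "EntryEncodeThread"));

    CompletableFuture<byte[]> encodedBody = CompletableFuture.supplyAsync(
        () -> JSON.toJSONString(entry).getBytes(StandardCharsets.UTF_8), // encode off the replication thread
        encodeExecutor);

    // The replication thread later consumes the encoded bytes when they are ready,
    // instead of doing the serialization itself. sendToSlave is a placeholder method.
    encodedBody.thenAccept(bytes -> sendToSlave(bytes));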

@RongtongJin added the enhancement label Feb 18, 2022
@RongtongJin
Contributor

Looking forward to your pull request
