From 02881a005d0849cbdba58c34c11f0b002e92ed07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ram=C3=B3n=20Berrutti?= Date: Thu, 22 Feb 2024 19:39:57 +0000 Subject: [PATCH] AddPeer API --- server/jetstream_api.go | 96 ++++++++++++++++++++++++++++++ server/jetstream_cluster.go | 5 ++ server/jetstream_cluster_1_test.go | 40 +++++++++++++ 3 files changed, 141 insertions(+) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 03c55ea5935..1accbf44f6f 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -195,6 +195,12 @@ const ( // Will return JSON response. JSApiLeaderStepDown = "$JS.API.META.LEADER.STEPDOWN" + // JSApiAddServer is the endpoint to add a peer server to the cluster. + // Only allow to add peer previously removed. + // Only works from system account. + // Will return JSON response. + JSApiAddServer = "$JS.API.SERVER.ADD" + // JSApiRemoveServer is the endpoint to remove a peer server from the cluster. // Only works from system account. // Will return JSON response. @@ -591,6 +597,23 @@ type JSApiLeaderStepDownResponse struct { const JSApiLeaderStepDownResponseType = "io.nats.jetstream.api.v1.meta_leader_stepdown_response" +// JSApiMetaServerAddRequest will add a peer to the meta group. +type JSApiMetaServerAddRequest struct { + // Server name of the peer to be added. + Server string `json:"peer"` + // Peer ID of the peer to be added. If specified this is used + // instead of the server name. + Peer string `json:"peer_id,omitempty"` +} + +// JSApiMetaServerAddResponse is the response to a peer addition request in the meta group. +type JSApiMetaServerAddResponse struct { + ApiResponse + Success bool `json:"success,omitempty"` +} + +const JSApiMetaServerAddResponseType = "io.nats.jetstream.api.v1.meta_server_add_response" + // JSApiMetaServerRemoveRequest will remove a peer from the meta group. type JSApiMetaServerRemoveRequest struct { // Server name of the peer to be removed. @@ -757,6 +780,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub // Ignore system level directives meta stepdown and peer remove requests here. if subject == JSApiLeaderStepDown || subject == JSApiRemoveServer || + subject == JSApiAddServer || strings.HasPrefix(subject, jsAPIAccountPre) { return } @@ -2292,6 +2316,78 @@ func (s *Server) jsStreamRemovePeerRequest(sub *subscription, c *client, _ *Acco s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp)) } +// Request to have the metaleader add a peer to the system. +func (s *Server) jsLeaderServerAddRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { + if c == nil || !s.JetStreamEnabled() { + return + } + + ci, acc, _, msg, err := s.getRequestInfo(c, rmsg) + if err != nil { + s.Warnf(badAPIRequestT, msg) + return + } + + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil || cc.meta == nil { + return + } + + js.mu.RLock() + isLeader := cc.isLeader() + js.mu.RUnlock() + + if !isLeader { + return + } + + var resp = JSApiMetaServerAddResponse{ApiResponse: ApiResponse{Type: JSApiMetaServerAddResponseType}} + + if isEmptyRequest(msg) { + resp.Error = NewJSBadRequestError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + var req JSApiMetaServerAddRequest + if err := json.Unmarshal(msg, &req); err != nil { + resp.Error = NewJSInvalidJSONError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + return + } + + var found string + js.mu.RLock() + // Use nodeToInfo because cc.meta.Peers() does not have peer. + if req.Peer != _EMPTY_ { + _, ok := s.nodeToInfo.Load(req.Peer) + if ok { + found = req.Peer + } + } else { + s.nodeToInfo.Range(func(id, ni interface{}) bool { + if req.Server == ni.(nodeInfo).name { + found = id.(string) + return false + } + return true + }) + } + js.mu.RUnlock() + + if found == _EMPTY_ { + resp.Error = NewJSClusterServerNotMemberError() + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) + } + + js.mu.Lock() + cc.meta.ProposeAddPeer(found) + js.mu.Unlock() + + resp.Success = true + s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp)) +} + // Request to have the metaleader remove a peer from the system. func (s *Server) jsLeaderServerRemoveRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { if c == nil || !s.JetStreamEnabled() { diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index f25ec4efd7f..cb500588b7c 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -59,6 +59,8 @@ type jetStreamCluster struct { consumerResults *subscription // System level request to have the leader stepdown. stepdown *subscription + // System level requests to add a peer. + peerAdd *subscription // System level requests to remove a peer. peerRemove *subscription // System level request to move a stream @@ -5310,6 +5312,9 @@ func (js *jetStream) startUpdatesSub() { if cc.stepdown == nil { cc.stepdown, _ = s.systemSubscribe(JSApiLeaderStepDown, _EMPTY_, false, c, s.jsLeaderStepDownRequest) } + if cc.peerAdd == nil { + cc.peerAdd, _ = s.systemSubscribe(JSApiAddServer, _EMPTY_, false, c, s.jsLeaderServerAddRequest) + } if cc.peerRemove == nil { cc.peerRemove, _ = s.systemSubscribe(JSApiRemoveServer, _EMPTY_, false, c, s.jsLeaderServerRemoveRequest) } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 0c2b0552978..f5f34b41888 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -3568,6 +3568,10 @@ func TestJetStreamClusterPeerRemovalAndStreamReassignment(t *testing.T) { if resp.Error != nil { t.Fatalf("Unexpected error: %+v", resp.Error) } + if !resp.Success { + t.Fatal("Unexpected success") + } + // In case that server was also meta-leader. c.waitOnLeader() @@ -3600,6 +3604,42 @@ func TestJetStreamClusterPeerRemovalAndStreamReassignment(t *testing.T) { } return nil }) + + // Restore the node. + toAdd := toRemove + reqAdd := JSApiMetaServerAddRequest{Server: toAdd} + jsAddReq, err := json.Marshal(reqAdd) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + rAddMsg, err := nc.Request(JSApiAddServer, jsAddReq, time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + var addResp JSApiMetaServerAddResponse + if err := json.Unmarshal(rAddMsg.Data, &addResp); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if addResp.Error != nil { + t.Fatalf("Unexpected error: %+v", addResp.Error) + } + + if !addResp.Success { + t.Fatal("Unexpected success") + } + + // In case that server was also meta-leader. + c.waitOnLeader() + + checkFor(t, 15*time.Second, 250*time.Millisecond, func() error { + for _, s := range ml.JetStreamClusterPeers() { + if s == toAdd { + return nil + } + } + return fmt.Errorf("Server not in the peer list") + }) } func TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace(t *testing.T) {