diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/AbstractClusterSubscribeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/AbstractClusterSubscribeListener.java index 677687eaf13b..b730f9042680 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/AbstractClusterSubscribeListener.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/AbstractClusterSubscribeListener.java @@ -30,8 +30,8 @@ public void notify(Event event) { try { // make sure the event is processed in order synchronized (this) { - Event.Type type = event.type(); - T server = parseServerFromHeartbeat(event.data()); + Event.Type type = event.getType(); + T server = parseServerFromHeartbeat(event.getEventData()); if (server == null) { log.error("Unknown cluster change event: {}", event); return; @@ -58,6 +58,11 @@ public void notify(Event event) { } } + @Override + public SubscribeScope getSubscribeScope() { + return SubscribeScope.CHILDREN_ONLY; + } + abstract T parseServerFromHeartbeat(String serverHeartBeatJson); public abstract void onServerAdded(T serverHeartBeat); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java index d99445498285..b260a591e799 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/Event.java @@ -17,115 +17,30 @@ package org.apache.dolphinscheduler.registry.api; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString +@Builder +@AllArgsConstructor public class Event { - // The prefix which is watched - private String key; + // The path which is watched + private final String watchedPath; // The full path where the event was generated - private String path; + private final String eventPath; // The value corresponding to the path - private String data; + private final String eventData; // The event type {ADD, REMOVE, UPDATE} private Type type; - public Event(String key, String path, String data, Type type) { - this.key = key; - this.path = path; - this.data = data; - this.type = type; - } - - public Event() { - } - - public static EventBuilder builder() { - return new EventBuilder(); - } - - public String key() { - return this.key; - } - - public String path() { - return this.path; - } - - public String data() { - return this.data; - } - - public Type type() { - return this.type; - } - - public Event key(String key) { - this.key = key; - return this; - } - - public Event path(String path) { - this.path = path; - return this; - } - - public Event data(String data) { - this.data = data; - return this; - } - - public Event type(Type type) { - this.type = type; - return this; - } - - public String toString() { - return "Event(key=" + this.key() + ", path=" + this.path() + ", data=" + this.data() + ", type=" + this.type() - + ")"; - } - public enum Type { ADD, REMOVE, UPDATE } - public static class EventBuilder { - - private String key; - private String path; - private String data; - private Type type; - - EventBuilder() { - } - - public EventBuilder key(String key) { - this.key = key; - return this; - } - - public EventBuilder path(String path) { - this.path = path; - return this; - } - - public EventBuilder data(String data) { - this.data = data; - return this; - } - - public EventBuilder type(Type type) { - this.type = type; - return this; - } - - public Event build() { - return new Event(key, path, data, type); - } - - public String toString() { - return "Event.EventBuilder(key=" + this.key + ", path=" + this.path + ", data=" + this.data + ", type=" - + this.type + ")"; - } - } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java index e7b434e5f6df..e63d50a2af85 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/SubscribeListener.java @@ -19,5 +19,23 @@ public interface SubscribeListener { - void notify(Event event); + void notify(final Event event); + + SubscribeScope getSubscribeScope(); + + enum SubscribeScope { + /** + * Only watch the path itself + */ + PATH_ONLY, + /** + * Only watch the children of the path + */ + CHILDREN_ONLY, + /** + * Watch the path and all its children and the parent path + */ + ALL + + } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java index c87f341a30a9..a79b14d99d5c 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-api/src/main/java/org/apache/dolphinscheduler/registry/api/ha/AbstractHAServer.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Registry; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; import java.util.List; @@ -56,16 +57,25 @@ public AbstractHAServer(final Registry registry, final String selectorPath, fina @Override public void start() { - registry.subscribe(selectorPath, event -> { - if (Event.Type.REMOVE.equals(event.type())) { - if (serverIdentify.equals(event.data())) { - statusChange(ServerStatus.STAND_BY); - } else { - if (participateElection()) { - statusChange(ServerStatus.ACTIVE); + registry.subscribe(selectorPath, new SubscribeListener() { + + @Override + public void notify(Event event) { + if (Event.Type.REMOVE.equals(event.getType())) { + if (serverIdentify.equals(event.getEventData())) { + statusChange(ServerStatus.STAND_BY); + } else { + if (participateElection()) { + statusChange(ServerStatus.ACTIVE); + } } } } + + @Override + public SubscribeScope getSubscribeScope() { + return SubscribeScope.PATH_ONLY; + } }); if (participateElection()) { diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java index 54a270f047f9..9f1d031131ce 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -158,7 +159,24 @@ public void subscribe(String path, SubscribeListener listener) { watcherMap.computeIfAbsent(path, $ -> client.getWatchClient().watch(watchKey, watchOption, watchResponse -> { for (WatchEvent event : watchResponse.getEvents()) { - listener.notify(new EventAdaptor(event, path)); + final String eventPath = event.getKeyValue().getKey().toString(StandardCharsets.UTF_8); + switch (listener.getSubscribeScope()) { + case PATH_ONLY: + if (eventPath.equals(path)) { + listener.notify(toEvent(event, path)); + } + break; + case CHILDREN_ONLY: + if (!eventPath.equals(path)) { + listener.notify(toEvent(event, path)); + } + break; + case ALL: + listener.notify(toEvent(event, path)); + break; + default: + throw new RegistryException("Unknown event scope: " + listener.getSubscribeScope()); + } } })); } catch (Exception e) { @@ -373,30 +391,31 @@ private static ByteSequence byteSequence(String val) { return ByteSequence.from(val, StandardCharsets.UTF_8); } - static final class EventAdaptor extends Event { - - public EventAdaptor(WatchEvent event, String key) { - key(key); - - switch (event.getEventType()) { - case PUT: - if (event.getPrevKV().getKey().isEmpty()) { - type(Type.ADD); - } else { - type(Type.UPDATE); - } - break; - case DELETE: - type(Type.REMOVE); - break; - default: - break; - } - KeyValue keyValue = event.getKeyValue(); - if (keyValue != null) { - path(keyValue.getKey().toString(StandardCharsets.UTF_8)); - data(keyValue.getValue().toString(StandardCharsets.UTF_8)); - } + private Event toEvent(final WatchEvent watchEvent, final String watchedPath) { + Event.Type eventType = null; + switch (watchEvent.getEventType()) { + case PUT: + if (watchEvent.getPrevKV().getKey().isEmpty()) { + eventType = Event.Type.ADD; + } else { + eventType = Event.Type.UPDATE; + } + break; + case DELETE: + eventType = Event.Type.REMOVE; + break; + default: + break; } + final KeyValue keyValue = watchEvent.getKeyValue(); + return Event.builder() + .type(eventType) + .watchedPath(watchedPath) + .eventPath(Optional.ofNullable(keyValue).map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)) + .orElse(null)) + .eventData(Optional.ofNullable(keyValue).map(kv -> kv.getValue().toString(StandardCharsets.UTF_8)) + .orElse(null)) + .build(); } + } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java index a9a23dfa0fc3..5db9798d505d 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java @@ -81,16 +81,24 @@ public void testSubscribe() { final AtomicBoolean subscribeRemoved = new AtomicBoolean(false); final AtomicBoolean subscribeUpdated = new AtomicBoolean(false); - SubscribeListener subscribeListener = event -> { - System.out.println("Receive event: " + event); - if (event.type() == Event.Type.ADD) { - subscribeAdded.compareAndSet(false, true); + final SubscribeListener subscribeListener = new SubscribeListener() { + + @Override + public void notify(Event event) { + if (event.getType() == Event.Type.ADD) { + subscribeAdded.compareAndSet(false, true); + } + if (event.getType() == Event.Type.REMOVE) { + subscribeRemoved.compareAndSet(false, true); + } + if (event.getType() == Event.Type.UPDATE) { + subscribeUpdated.compareAndSet(false, true); + } } - if (event.type() == Event.Type.REMOVE) { - subscribeRemoved.compareAndSet(false, true); - } - if (event.type() == Event.Type.UPDATE) { - subscribeUpdated.compareAndSet(false, true); + + @Override + public SubscribeScope getSubscribeScope() { + return SubscribeScope.PATH_ONLY; } }; String key = "/nodes/master" + System.nanoTime(); diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index 37a301bcadf1..f9614d3c388d 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -24,10 +24,8 @@ import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataDTO; import org.apache.dolphinscheduler.plugin.registry.jdbc.server.ConnectionStateListener; import org.apache.dolphinscheduler.plugin.registry.jdbc.server.IJdbcRegistryServer; -import org.apache.dolphinscheduler.plugin.registry.jdbc.server.JdbcRegistryDataChangeListener; import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.ConnectionState; -import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Registry; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.SubscribeListener; @@ -97,56 +95,11 @@ public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryExcept } @Override - public void subscribe(String subscribePath, SubscribeListener listener) { - checkNotNull(subscribePath); + public void subscribe(String watchedPath, SubscribeListener listener) { + checkNotNull(watchedPath); checkNotNull(listener); - jdbcRegistryClient.subscribeJdbcRegistryDataChange(new JdbcRegistryDataChangeListener() { - - @Override - public void onJdbcRegistryDataChanged(String eventPath, String value) { - if (!isPathMatch(subscribePath, eventPath)) { - return; - } - final Event event = Event.builder() - .key(subscribePath) - .path(eventPath) - .data(value) - .type(Event.Type.UPDATE) - .build(); - listener.notify(event); - } - - @Override - public void onJdbcRegistryDataDeleted(String eventPath) { - if (!isPathMatch(subscribePath, eventPath)) { - return; - } - final Event event = Event.builder() - .key(subscribePath) - .path(eventPath) - .type(Event.Type.REMOVE) - .build(); - listener.notify(event); - } - - @Override - public void onJdbcRegistryDataAdded(String eventPath, String value) { - if (!isPathMatch(subscribePath, eventPath)) { - return; - } - final Event event = Event.builder() - .key(subscribePath) - .path(eventPath) - .data(value) - .type(Event.Type.ADD) - .build(); - listener.notify(event); - } - - private boolean isPathMatch(String subscribePath, String eventPath) { - return KeyUtils.isParent(subscribePath, eventPath) || KeyUtils.isSamePath(subscribePath, eventPath); - } - }); + jdbcRegistryClient + .subscribeJdbcRegistryDataChange(new JdbcRegistryDataChangeListenerAdapter(watchedPath, listener)); } @Override diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java new file mode 100644 index 000000000000..ccfb95f96391 --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryDataChangeListenerAdapter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.jdbc; + +import org.apache.dolphinscheduler.plugin.registry.jdbc.server.JdbcRegistryDataChangeListener; +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; + +public class JdbcRegistryDataChangeListenerAdapter implements JdbcRegistryDataChangeListener { + + private final String watchedPath; + private final SubscribeListener listener; + + public JdbcRegistryDataChangeListenerAdapter(final String watchedPath, final SubscribeListener listener) { + this.watchedPath = watchedPath; + this.listener = listener; + } + + @Override + public void onJdbcRegistryDataChanged(String eventPath, String value) { + if (!isPathMatch(watchedPath, eventPath, listener.getSubscribeScope())) { + return; + } + final Event event = Event.builder() + .watchedPath(watchedPath) + .eventPath(eventPath) + .eventData(value) + .type(Event.Type.UPDATE) + .build(); + listener.notify(event); + } + + @Override + public void onJdbcRegistryDataDeleted(String eventPath) { + if (!isPathMatch(watchedPath, eventPath, listener.getSubscribeScope())) { + return; + } + final Event event = Event.builder() + .watchedPath(watchedPath) + .eventPath(eventPath) + .type(Event.Type.REMOVE) + .build(); + listener.notify(event); + } + + @Override + public void onJdbcRegistryDataAdded(String eventPath, String value) { + if (!isPathMatch(watchedPath, eventPath, listener.getSubscribeScope())) { + return; + } + final Event event = Event.builder() + .watchedPath(watchedPath) + .eventPath(eventPath) + .eventData(value) + .type(Event.Type.ADD) + .build(); + listener.notify(event); + } + + private boolean isPathMatch(final String subscribePath, + final String eventPath, + final SubscribeListener.SubscribeScope subscribeScope) { + switch (subscribeScope) { + case PATH_ONLY: + return KeyUtils.isSamePath(subscribePath, eventPath); + case CHILDREN_ONLY: + return KeyUtils.isParent(subscribePath, eventPath); + case ALL: + return KeyUtils.isParent(subscribePath, eventPath) + || KeyUtils.isSamePath(subscribePath, eventPath); + default: + throw new IllegalArgumentException("Invalid subscribe scope " + subscribeScope); + } + } + +} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/KeyUtils.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/KeyUtils.java index 84576243ca90..2512eb0a3fe4 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/KeyUtils.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/KeyUtils.java @@ -40,8 +40,10 @@ public static boolean isParent(final String parentPath, final String childPath) if (StringUtils.isEmpty(childPath)) { throw new IllegalArgumentException("Invalid child path " + childPath); } - final String[] parentSplit = parentPath.split(RegistryConstants.PATH_SEPARATOR); - final String[] childSplit = childPath.split(RegistryConstants.PATH_SEPARATOR); + final String[] parentSplit = removeLastSlash(parentPath).split(RegistryConstants.PATH_SEPARATOR); + final String[] childSplit = removeLastSlash(childPath).split(RegistryConstants.PATH_SEPARATOR); + // If the parent path is longer than or equals the child path, it is impossible to be the parent path of the + // child path if (parentSplit.length >= childSplit.length) { return false; } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java index 70754f6a6ed7..be059ad423b5 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java @@ -20,7 +20,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import org.apache.dolphinscheduler.registry.api.ConnectionListener; -import org.apache.dolphinscheduler.registry.api.Event; import org.apache.dolphinscheduler.registry.api.Registry; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.SubscribeListener; @@ -30,9 +29,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; -import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; @@ -142,9 +139,9 @@ public void connectUntilTimeout(@NonNull Duration timeout) throws RegistryExcept } @Override - public void subscribe(String path, SubscribeListener listener) { + public void subscribe(final String path, final SubscribeListener listener) { final TreeCache treeCache = treeCacheMap.computeIfAbsent(path, $ -> new TreeCache(client, path)); - treeCache.getListenable().addListener(($, event) -> listener.notify(new EventAdaptor(event, path))); + treeCache.getListenable().addListener(new ZookeeperTreeCacheListenerAdapter(path, listener)); try { treeCache.start(); } catch (Exception e) { @@ -305,31 +302,4 @@ public void close() { treeCacheMap.values().forEach(CloseableUtils::closeQuietly); CloseableUtils.closeQuietly(client); } - - static final class EventAdaptor extends Event { - - public EventAdaptor(TreeCacheEvent event, String key) { - key(key); - - switch (event.getType()) { - case NODE_ADDED: - type(Type.ADD); - break; - case NODE_UPDATED: - type(Type.UPDATE); - break; - case NODE_REMOVED: - type(Type.REMOVE); - break; - default: - break; - } - - final ChildData data = event.getData(); - if (data != null) { - path(data.getPath()); - data(new String(data.getData())); - } - } - } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperTreeCacheListenerAdapter.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperTreeCacheListenerAdapter.java new file mode 100644 index 000000000000..da35f21bbf0f --- /dev/null +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperTreeCacheListenerAdapter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.registry.zookeeper; + +import org.apache.dolphinscheduler.registry.api.Event; +import org.apache.dolphinscheduler.registry.api.RegistryException; +import org.apache.dolphinscheduler.registry.api.SubscribeListener; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +public class ZookeeperTreeCacheListenerAdapter implements TreeCacheListener { + + private final String watchedPath; + + private final SubscribeListener listener; + + public ZookeeperTreeCacheListenerAdapter(final String watchedPath, final SubscribeListener listener) { + this.listener = listener; + this.watchedPath = watchedPath; + } + + @Override + public void childEvent(final CuratorFramework curatorFramework, final TreeCacheEvent event) { + final String eventPath = event.getData().getPath(); + switch (listener.getSubscribeScope()) { + case PATH_ONLY: + if (eventPath.equals(watchedPath)) { + listener.notify(convertToEvent(event, watchedPath)); + } + break; + case CHILDREN_ONLY: + if (!eventPath.equals(watchedPath)) { + listener.notify(convertToEvent(event, watchedPath)); + } + break; + case ALL: + listener.notify(convertToEvent(event, watchedPath)); + break; + default: + throw new RegistryException("Unknown event scope: " + listener.getSubscribeScope()); + } + } + + private Event convertToEvent(TreeCacheEvent event, String watchedPath) { + + Event.Type type; + switch (event.getType()) { + case NODE_ADDED: + type = Event.Type.ADD; + break; + case NODE_UPDATED: + type = Event.Type.UPDATE; + break; + case NODE_REMOVED: + type = Event.Type.REMOVE; + break; + default: + throw new IllegalArgumentException("Unsupported event type: " + event.getType()); + } + + final ChildData data = event.getData(); + return Event.builder() + .type(type) + .watchedPath(watchedPath) + .eventPath(data.getPath()) + .eventData(new String(data.getData())) + .build(); + } +}