Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The lite-core module decouples ZooKeeper #2182

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions elasticjob-lite/elasticjob-lite-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
Expand All @@ -24,7 +25,7 @@
</parent>
<artifactId>elasticjob-lite-core</artifactId>
<name>${project.artifactId}</name>

<dependencies>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
Expand All @@ -41,11 +42,6 @@
<artifactId>elasticjob-registry-center-api</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-registry-center-zookeeper-curator</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-simple-executor</artifactId>
Expand All @@ -71,7 +67,7 @@
<artifactId>elasticjob-tracing-rdb</artifactId>
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand All @@ -88,12 +84,12 @@
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
Expand Down Expand Up @@ -125,5 +121,11 @@
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-registry-center-zookeeper-curator</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consider setting it runtime? Otherwise users need to introduce it sperately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, after decoupling, users are expected to actively introduce.

</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.shardingsphere.elasticjob.lite.api.registry;

import java.util.Arrays;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many unrelated changes in this PR. Please revert them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caused by reformatting code.

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;

import org.apache.curator.utils.ThreadUtils;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
Expand All @@ -28,30 +33,23 @@
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodePath;
import org.apache.shardingsphere.elasticjob.lite.internal.util.ThreadUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEvent;
import org.apache.shardingsphere.elasticjob.reg.listener.DataChangedEventListener;

import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Job instance registry.
*/
@RequiredArgsConstructor
public final class JobInstanceRegistry {

private static final Pattern JOB_CONFIG_COMPILE = Pattern.compile("/(\\w+)/config");

private final CoordinatorRegistryCenter regCenter;

private final JobInstance jobInstance;

/**
* Register.
*/
Expand All @@ -60,9 +58,9 @@ public void register() {
Executor executor = Executors.newSingleThreadExecutor(threadFactory);
regCenter.watch("/", new JobInstanceRegistryListener(), executor);
}

public class JobInstanceRegistryListener implements DataChangedEventListener {

@Override
public void onChange(final DataChangedEvent event) {
if (event.getType() != DataChangedEvent.Type.ADDED || !isJobConfigPath(event.getKey())) {
Expand All @@ -78,13 +76,13 @@ public void onChange(final DataChangedEvent event) {
new OneOffJobBootstrap(regCenter, newElasticJobInstance(jobConfig), jobConfig).execute();
}
}

private boolean isAllShardingItemsCompleted(final JobConfiguration jobConfig) {
JobNodePath jobNodePath = new JobNodePath(jobConfig.getJobName());
return IntStream.range(0, jobConfig.getShardingTotalCount())
.allMatch(each -> regCenter.isExisted(jobNodePath.getShardingNodePath(String.valueOf(each), "completed")));
.allMatch(each -> regCenter.isExisted(jobNodePath.getShardingNodePath(String.valueOf(each), "completed")));
}

private ElasticJob newElasticJobInstance(final JobConfiguration jobConfig) {
String clazz = regCenter.get(String.format("/%s", jobConfig.getJobName()));
try {
Expand All @@ -95,7 +93,7 @@ private ElasticJob newElasticJobInstance(final JobConfiguration jobConfig) {
throw new RuntimeException(String.format("new elastic job instance by class '%s' failure", clazz), ex);
}
}

private boolean isLabelMatch(final JobConfiguration jobConfig) {
if (jobConfig.getLabel() == null) {
return false;
Expand All @@ -105,7 +103,7 @@ private boolean isLabelMatch(final JobConfiguration jobConfig) {
}
return Arrays.stream(jobInstance.getLabels().split(",")).collect(Collectors.toSet()).contains(jobConfig.getLabel());
}

private boolean isJobConfigPath(final String path) {
return JOB_CONFIG_COMPILE.matcher(path).matches();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.shardingsphere.elasticjob.lite.internal.listener;

import org.apache.curator.utils.ThreadUtils;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.shardingsphere.elasticjob.lite.internal.util.ThreadUtils;

/**
* Manage listener's notify executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@
package org.apache.shardingsphere.elasticjob.lite.internal.snapshot;

import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.shardingsphere.elasticjob.lite.internal.util.SensitiveInfoUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
Expand All @@ -35,29 +28,32 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.lite.internal.util.SensitiveInfoUtils;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;

/**
* Snapshot service.
*/
@Slf4j
public final class SnapshotService {

public static final String DUMP_COMMAND = "dump@";

private final int port;

private final CoordinatorRegistryCenter regCenter;

private ServerSocket serverSocket;

private volatile boolean closed;

public SnapshotService(final CoordinatorRegistryCenter regCenter, final int port) {
Preconditions.checkArgument(port >= 0 && port <= 0xFFFF, "Port value out of range: " + port);
this.regCenter = regCenter;
this.port = port;
}

/**
* Start to listen.
*/
Expand All @@ -68,7 +64,7 @@ public void listen() {
log.error("ElasticJob: Snapshot service listen failure, error is: ", ex);
}
}

private int openSocket(final int port) throws IOException {
closed = false;
serverSocket = new ServerSocket(port);
Expand All @@ -88,16 +84,16 @@ private int openSocket(final int port) throws IOException {
}, threadName).start();
return localPort;
}

private boolean isIgnoredException() {
return serverSocket.isClosed();
}

private void process(final Socket socket) throws IOException {
try (
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
Socket ignored = socket) {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
Socket ignored = socket) {
String cmdLine = reader.readLine();
if (null != cmdLine && cmdLine.startsWith(DUMP_COMMAND) && cmdLine.split("@").length == 2) {
String jobName = cmdLine.split("@")[1];
Expand All @@ -106,45 +102,32 @@ private void process(final Socket socket) throws IOException {
}
}
}
private void dumpDirectly(final String path, final String jobName, final List<String> result) {

private void dumpDirectly(final String path, final List<String> result) {
for (String each : regCenter.getChildrenKeys(path)) {
String zkPath = path + "/" + each;
String zkValue = Optional.ofNullable(regCenter.get(zkPath)).orElse("");
String cachePath = zkPath;
String cacheValue = zkValue;
// TODO Decoupling ZooKeeper
if (regCenter instanceof ZookeeperRegistryCenter) {
CuratorCache cache = (CuratorCache) regCenter.getRawCache("/" + jobName);
if (null != cache) {
Optional<ChildData> cacheData = cache.get(zkPath);
cachePath = cacheData.map(ChildData::getPath).orElse("");
cacheValue = cacheData.map(ChildData::getData).map(String::new).orElse("");
}
}
if (zkValue.equals(cacheValue) && zkPath.equals(cachePath)) {
result.add(String.join(" | ", zkPath, zkValue));
} else {
result.add(String.join(" | ", zkPath, zkValue, cachePath, cacheValue));
}
dumpDirectly(zkPath, jobName, result);
String childrenPath = path + "/" + each;
String childrenValue = Optional.ofNullable(regCenter.get(childrenPath)).orElse("");
result.add(String.join(" | ", childrenPath, childrenValue));
dumpDirectly(childrenPath, result);
}
}

/**
* Dump job.
*
* @param jobName job's name
* @return dump job's info
*/
public String dumpJobDirectly(final String jobName) {
String path = "/" + jobName;
final List<String> result = new ArrayList<>();
dumpDirectly(path, jobName, result);
dumpDirectly(path, result);
return String.join("\n", SensitiveInfoUtils.filterSensitiveIps(result)) + "\n";
}

/**
* Dump job.
*
* @param instanceIp job instance ip addr
* @param dumpPort dump port
* @param jobName job's name
Expand All @@ -153,9 +136,9 @@ public String dumpJobDirectly(final String jobName) {
*/
public static String dumpJob(final String instanceIp, final int dumpPort, final String jobName) throws IOException {
try (
Socket socket = new Socket(instanceIp, dumpPort);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))
Socket socket = new Socket(instanceIp, dumpPort);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))
) {
writer.write(DUMP_COMMAND + jobName);
writer.newLine();
Expand All @@ -173,7 +156,7 @@ private void outputMessage(final BufferedWriter outputWriter, final String msg)
outputWriter.append(msg);
outputWriter.flush();
}

/**
* Close listener.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.elasticjob.lite.internal.util;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ThreadFactory;
import lombok.extern.slf4j.Slf4j;

/**
* Thread utility.
*/
@Slf4j
public final class ThreadUtils {

private ThreadUtils() {

}

/**
* Create a new generic thread factory instance.
*
* @param processName Process thread name prefix.
* @return Return generic thread factory instance.
*/
public static ThreadFactory newGenericThreadFactory(final String processName) {
Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (t, e) -> {
log.error("Unexpected exception in thread: " + t, e);
Throwables.throwIfUnchecked(e);
};
return new ThreadFactoryBuilder()
.setNameFormat(processName + "-%d")
.setDaemon(true)
.setUncaughtExceptionHandler(uncaughtExceptionHandler)
.build();
}
}
Loading