Skip to content

Commit

Permalink
feature: support select (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Jan 15, 2025
1 parent 3b20aaa commit ec7da47
Show file tree
Hide file tree
Showing 34 changed files with 300 additions and 135 deletions.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ Redis 3.x - latest

Details can be found here: [redispike-proxy/src/test/java/icu/funkye/redispike/ServerTest.java at main · funky-eyes/redispike-proxy (github.com)](https://github.com/funky-eyes/redispike-proxy/blob/main/src/test/java/icu/funkye/redispike/ServerTest.java)

| feature | support | note |
|----------|--------------------------------------------------------------------------------------------------------------|----------------------------------------------------------|
| String | done | |
| Hash | done | hsetnx only supports the key level, not the column level |
| Scan | | |
| List | | |
| Set | scard done<br/>srem done <br/>sadd done<br/>spop done<br/>smembers done <br/>srandmember done<br/>other wait | |
| ZSet | wait | |
| keys | done | |
| pipeline | done | |
| feature | support | note |
|----------|--------------------------------------------------------------------------------------------------------------|------|
| String | done | |
| Hash | done | |
| Scan | | |
| List | | |
| Set | scard done<br/>srem done <br/>sadd done<br/>spop done<br/>smembers done <br/>srandmember done<br/>other wait | |
| ZSet | wait | |
| keys | done | |
| pipeline | done | |
| select | done | |

### Performance Test Report
aerospike 3.x 2c4g redispike-proxy 2c4g:
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/icu/funkye/redispike/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.aerospike.client.policy.ClientPolicy;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.ProtocolManager;
import icu.funkye.redispike.conts.RedisConstants;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void start(String... args) throws ParseException {
String targetUser = cl.getOptionValue("TU", null);
String targetPassword = cl.getOptionValue("TP", null);
AeroSpikeClientFactory.namespace = cl.getOptionValue("n", "test");
AeroSpikeClientFactory.set = cl.getOptionValue("s", "demoset");
AeroSpikeClientFactory.set = cl.getOptionValue("s", "0");
Host[] hosts = Host.parseHosts(host, targetPort);
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.user = targetUser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,22 @@
*/
package icu.funkye.redispike.common;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventProcessor;
import icu.funkye.redispike.conts.RedisConstants;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import org.junit.jupiter.api.Assertions;

/**
* ConnectionEventProcessor for ConnectionEventType.CONNECT
*
* @author xiaomin.cxm
* @version $Id: CONNECTEventProcessor.java, v 0.1 Apr 8, 2016 10:58:48 AM xiaomin.cxm Exp $
* @author [email protected]
*/
public class CONNECTEventProcessor implements ConnectionEventProcessor {

private AtomicBoolean connected = new AtomicBoolean();
private AtomicInteger connectTimes = new AtomicInteger();
private Connection connection;
private String remoteAddr;
private CountDownLatch latch = new CountDownLatch(1);

@Override
public void onEvent(String remoteAddr, Connection conn) {
Assertions.assertNotNull(remoteAddr);
doCheckConnection(conn);
this.remoteAddr = remoteAddr;
this.connection = conn;
connected.set(true);
connectTimes.incrementAndGet();
latch.countDown();
conn.setAttribute(RedisConstants.REDIS_DB, AeroSpikeClientFactory.set);
}

/**
Expand All @@ -61,30 +46,4 @@ private void doCheckConnection(Connection conn) {
Assertions.assertNotNull(conn.getUrl());
Assertions.assertNotNull(conn.getChannel().attr(Connection.CONNECTION).get());
}

public boolean isConnected() throws InterruptedException {
latch.await();
return this.connected.get();
}

public int getConnectTimes() throws InterruptedException {
latch.await();
return this.connectTimes.get();
}

public Connection getConnection() throws InterruptedException {
latch.await();
return this.connection;
}

public String getRemoteAddr() throws InterruptedException {
latch.await();
return this.remoteAddr;
}

public void reset() {
this.connectTimes.set(0);
this.connected.set(false);
this.connection = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,18 @@
*/
package icu.funkye.redispike.common;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventProcessor;
import org.junit.jupiter.api.Assertions;

/**
* ConnectionEventProcessor for ConnectionEventType.CLOSE
*
* @author xiaomin.cxm
* @version $Id: DISCONNECTEventProcessor.java, v 0.1 Apr 8, 2016 10:58:48 AM xiaomin.cxm Exp $
* @author [email protected]
*/
public class DISCONNECTEventProcessor implements ConnectionEventProcessor {

private AtomicBoolean dicConnected = new AtomicBoolean();
private AtomicInteger disConnectTimes = new AtomicInteger();

@Override
public void onEvent(String remoteAddr, Connection conn) {
Assertions.assertNotNull(conn);
dicConnected.set(true);
disConnectTimes.incrementAndGet();
}

public boolean isDisConnected() {
return this.dicConnected.get();
}

public int getDisConnectTimes() {
return this.disConnectTimes.get();
}

public void reset() {
this.disConnectTimes.set(0);
this.dicConnected.set(false);
}
}
28 changes: 28 additions & 0 deletions src/main/java/icu/funkye/redispike/conts/RedisConstants.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package icu.funkye.redispike.conts;

/**
* @author [email protected]
*/
public interface RedisConstants {

String REDIS_DB = "db";

String REDIS_SUCCESS_RESULT = "OK";

}
21 changes: 10 additions & 11 deletions src/main/java/icu/funkye/redispike/handler/RedisCommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import icu.funkye.redispike.handler.process.impl.AuthRequestProcessor;
import icu.funkye.redispike.handler.process.impl.GetRequestProcessor;
import icu.funkye.redispike.handler.process.impl.NotSupportProcessor;
import icu.funkye.redispike.handler.process.impl.SelectProcessor;
import icu.funkye.redispike.handler.process.impl.hash.HDelRequestProcessor;
import icu.funkye.redispike.handler.process.impl.hash.HExistsRequestProcessor;
import icu.funkye.redispike.handler.process.impl.hash.HGetAllRequestProcessor;
Expand Down Expand Up @@ -100,16 +101,12 @@ public RedisCommandHandler() {
processorMap.put(hValsRequestProcessor.getCmdCode().value(), hValsRequestProcessor);
HIncrbyRequestProcessor hIncrbyRequestProcessor = new HIncrbyRequestProcessor();
processorMap.put(hIncrbyRequestProcessor.getCmdCode().value(), hIncrbyRequestProcessor);
HIncrbyfloatRequestProcessor hIncrbyfloatRequestProcessor = new HIncrbyfloatRequestProcessor();
processorMap.put(hIncrbyfloatRequestProcessor.getCmdCode().value(), hIncrbyfloatRequestProcessor);
HLenRequestProcessor hLenRequestProcessor = new HLenRequestProcessor();
processorMap.put(hLenRequestProcessor.getCmdCode().value(), hLenRequestProcessor);
HKeysRequestProcessor hKeysRequestProcessor = new HKeysRequestProcessor();
registryProcessor(hKeysRequestProcessor);
NotSupportProcessor notSupportProcessor = new NotSupportProcessor();
registryProcessor(notSupportProcessor);
AuthRequestProcessor authRequestProcessor = new AuthRequestProcessor();
registryProcessor(authRequestProcessor);
registryProcessor(new HIncrbyfloatRequestProcessor());
registryProcessor(new HLenRequestProcessor());
registryProcessor(new HKeysRequestProcessor());
registryProcessor(new NotSupportProcessor());
registryProcessor(new AuthRequestProcessor());
registryProcessor(new SelectProcessor());
}

private void registryProcessor(RedisRequestProcessor<?> processor) {
Expand All @@ -135,7 +132,9 @@ private void processSingleRequest(RemotingContext ctx, Object object) {
processorMap.get(request.getCmdCode().value()).process(ctx, request, getDefaultExecutor());
} catch (Exception e) {
logger.error(e.getMessage(), e);
ctx.writeAndFlush(new BulkResponse());
BulkResponse bulkResponse = new BulkResponse();
bulkResponse.setError(e.getMessage());
ctx.writeAndFlush(bulkResponse);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.conts.RedisConstants;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
Expand All @@ -39,7 +40,7 @@ public AuthRequestProcessor() {
public void handle(RemotingContext ctx, AuthRequest request) {
if (AeroSpikeClientFactory.originClientPolicy.password == null
|| Objects.equals(AeroSpikeClientFactory.originClientPolicy.password, request.getPassword())) {
request.setResponse("OK");
request.setResponse(RedisConstants.REDIS_SUCCESS_RESULT);
} else {
request.setErrorResponse("ERR Client sent AUTH, but no password is set");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package icu.funkye.redispike.handler.process.impl;

import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.conts.RedisConstants;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
import icu.funkye.redispike.protocol.request.CommandRequest;
Expand All @@ -30,7 +31,7 @@ public CommandRequestProcessor() {

@Override
public void handle(RemotingContext ctx, CommandRequest request) {
request.setResponse("OK");
request.setResponse(RedisConstants.REDIS_SUCCESS_RESULT);
write(ctx, request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package icu.funkye.redispike.handler.process.impl;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -25,6 +26,7 @@
import com.aerospike.client.Key;
import com.aerospike.client.listener.DeleteListener;
import com.alipay.remoting.RemotingContext;
import icu.funkye.redispike.conts.RedisConstants;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
Expand All @@ -41,7 +43,9 @@ public DelRequestProcessor() {
public void handle(RemotingContext ctx, DelRequest request) {
List<String> keys = request.getKey();
List<Key> list =
keys.stream().map(key -> new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, key))
keys.stream().map(key -> new Key(AeroSpikeClientFactory.namespace, Optional.ofNullable(ctx.getConnection().getAttribute(
RedisConstants.REDIS_DB))
.orElseGet(() -> AeroSpikeClientFactory.set).toString(), key))
.collect(Collectors.toList());
CountDownLatch countDownLatch = new CountDownLatch(list.size());
for (Key key : list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.util.Optional;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.aerospike.client.listener.RecordListener;
import com.alipay.remoting.RemotingContext;
import com.alipay.sofa.common.utils.StringUtil;
import icu.funkye.redispike.conts.RedisConstants;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
Expand All @@ -36,15 +38,17 @@ public GetRequestProcessor() {

@Override
public void handle(RemotingContext ctx, GetRequest request) {
Key key = new Key(AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set, request.getKey());
Key key = new Key(AeroSpikeClientFactory.namespace, Optional.ofNullable(ctx.getConnection().getAttribute(
RedisConstants.REDIS_DB))
.orElseGet(() -> AeroSpikeClientFactory.set).toString(), request.getKey());
client.get(AeroSpikeClientFactory.eventLoops.next(), new RecordListener() {
@Override
public void onSuccess(Key key, Record record) {
if (record == null) {
write(ctx, request);
return;
}
String value = record.getString(" ");
String value = record.getString("");
if (StringUtil.isNotBlank(value)) {
request.setResponse(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package icu.funkye.redispike.handler.process.impl;

import java.util.Optional;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
Expand All @@ -24,6 +25,7 @@
import com.alipay.remoting.RemotingContext;
import com.alipay.remoting.util.StringUtils;

import icu.funkye.redispike.conts.RedisConstants;
import icu.funkye.redispike.factory.AeroSpikeClientFactory;
import icu.funkye.redispike.handler.process.AbstractRedisRequestProcessor;
import icu.funkye.redispike.protocol.RedisRequestCommandCode;
Expand Down Expand Up @@ -92,7 +94,8 @@ public void onFailure(AerospikeException exception) {
logger.error(exception.getMessage(), exception);
write(ctx, request);
}
}, scanPolicy, AeroSpikeClientFactory.namespace, AeroSpikeClientFactory.set);
}, scanPolicy, AeroSpikeClientFactory.namespace, Optional.ofNullable(ctx.getConnection().getAttribute(
RedisConstants.REDIS_DB))
.orElseGet(() -> AeroSpikeClientFactory.set).toString());
}

}
Loading

0 comments on commit ec7da47

Please sign in to comment.