Skip to content

Commit

Permalink
No commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
Yury-Fridlyand committed Oct 1, 2024
1 parent 2c7ec4d commit 42f5484
Show file tree
Hide file tree
Showing 13 changed files with 868 additions and 578 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,11 @@ public abstract class BaseClientConfiguration {
*/
private final ThreadPoolResource threadPoolResource;

/**
* Serialization protocol to be used with the server. If not set, {@link ProtocolVersion#RESP3}
* will be used.
*/
private final ProtocolVersion protocol;

public abstract BaseSubscriptionConfiguration getSubscriptionConfiguration();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

/** Represents the communication protocol with the server. */
public enum ProtocolVersion {
/** Use RESP2 to communicate with the server nodes. */
RESP3,
/** Use RESP3 to communicate with the server nodes. */
RESP2
}
11 changes: 10 additions & 1 deletion java/client/src/main/java/glide/managers/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.ProtocolVersion;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConfigurationError;
import glide.connectors.handlers.ChannelHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -118,6 +120,10 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderBaseConfiguration
connectionRequestBuilder.setClientName(configuration.getClientName());
}

if (configuration.getProtocol() != null) {
connectionRequestBuilder.setProtocolValue(configuration.getProtocol().ordinal());
}

return connectionRequestBuilder;
}

Expand Down Expand Up @@ -145,7 +151,10 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderGlideClient(
}

if (configuration.getSubscriptionConfiguration() != null) {
// TODO throw ConfigurationError if RESP2
if (configuration.getProtocol() == ProtocolVersion.RESP2) {
throw new ConfigurationError(
"PubSub subscriptions require RESP3 protocol, but RESP2 was configured.");
}
var subscriptionsBuilder = PubSubSubscriptions.newBuilder();
for (var entry : configuration.getSubscriptionConfiguration().getSubscriptions().entrySet()) {
var channelsBuilder = PubSubChannelsOrPatterns.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.GlideClusterClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.models.configuration.ProtocolVersion;
import glide.api.models.configuration.ReadFrom;
import glide.api.models.configuration.ServerCredentials;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
Expand Down Expand Up @@ -143,6 +144,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
.build())
.databaseId(DATABASE_ID)
.clientName(CLIENT_NAME)
.protocol(ProtocolVersion.RESP2)
.subscriptionConfiguration(
StandaloneSubscriptionConfiguration.builder()
.subscription(EXACT, gs("channel_1"))
Expand Down Expand Up @@ -176,6 +178,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
.build())
.setDatabaseId(DATABASE_ID)
.setClientName(CLIENT_NAME)
.setProtocol(ConnectionRequestOuterClass.ProtocolVersion.RESP2)
.setPubsubSubscriptions(
PubSubSubscriptions.newBuilder()
.putAllChannelsOrPatternsByType(
Expand Down
36 changes: 17 additions & 19 deletions java/integTest/src/test/java/glide/ConnectionTests.java
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
/** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */
package glide;

import static glide.TestUtilities.commonClientConfig;
import static glide.TestUtilities.commonClusterClientConfig;

import glide.api.GlideClient;
import glide.api.models.configuration.GlideClientConfiguration;
import glide.api.models.configuration.NodeAddress;
import glide.api.GlideClusterClient;
import glide.api.models.configuration.ProtocolVersion;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

@Timeout(10) // seconds
public class ConnectionTests {

@Test
@ParameterizedTest
@EnumSource(ProtocolVersion.class)
@SneakyThrows
public void basic_client() {
public void basic_client(ProtocolVersion protocol) {
var regularClient =
GlideClient.createClient(
GlideClientConfiguration.builder()
.address(
NodeAddress.builder().port(TestConfiguration.STANDALONE_PORTS[0]).build())
.build())
.get();
GlideClient.createClient(commonClientConfig().protocol(protocol).build()).get();
regularClient.close();
}

@Test
@ParameterizedTest
@EnumSource(ProtocolVersion.class)
@SneakyThrows
public void cluster_client() {
var regularClient =
GlideClient.createClient(
GlideClientConfiguration.builder()
.address(NodeAddress.builder().port(TestConfiguration.CLUSTER_PORTS[0]).build())
.build())
public void cluster_client(ProtocolVersion protocol) {
var clusterClient =
GlideClusterClient.createClient(commonClusterClientConfig().protocol(protocol).build())
.get();
regularClient.close();
clusterClient.close();
}
}
27 changes: 27 additions & 0 deletions java/integTest/src/test/java/glide/PubSubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback;
import glide.api.models.configuration.ClusterSubscriptionConfiguration;
import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode;
import glide.api.models.configuration.ProtocolVersion;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotType;
import glide.api.models.configuration.StandaloneSubscriptionConfiguration;
Expand Down Expand Up @@ -280,6 +281,32 @@ private void skipTestsOnMac() {
"PubSub doesn't work on mac OS");
}

@SneakyThrows
@ParameterizedTest(name = "standalone = {0}")
@ValueSource(booleans = {true, false})
public void config_error_on_resp2(boolean standalone) {
final GlideString channel = gs(UUID.randomUUID().toString());

Map<? extends ChannelMode, Set<GlideString>> subscriptions =
standalone
? Map.of(PubSubChannelMode.EXACT, Set.of(channel))
: Map.of(PubSubClusterChannelMode.EXACT, Set.of(channel));

var exception =
assertThrows(
ConfigurationError.class,
() ->
GlideClient.createClient(
commonClientConfig()
.subscriptionConfiguration(
StandaloneSubscriptionConfiguration
.builder() /*.subscriptions(subscriptions)*/
.build())
.protocol(ProtocolVersion.RESP2)
.build()));
assertTrue(exception.getMessage().contains("PubSub subscriptions require RESP3 protocol"));
}

/** Similar to `test_pubsub_exact_happy_path` in python client tests. */
@SneakyThrows
@ParameterizedTest(name = "standalone = {0}, read messages via {1}")
Expand Down
Loading

0 comments on commit 42f5484

Please sign in to comment.