Skip to content

Commit

Permalink
Remove avro deserialization fallback ratelimit removed (allegro#1541)
Browse files Browse the repository at this point in the history
* Remove deserialization fallback using any schema

* Check if schemaId exists before avro message deserialization

* CR fixes

* Do some names refactoring

* Improve refreshing schemas by versions and id

* Remove unnecessary imports

* Add test for ratelimit using schema id checking

* Add some cleanup

* Add some cleanup

* Improve retrying deserialization when schema is missing in consumer process

* Add some code cleanup

* Add some code cleanup

* Remove ratelimiter for schema-registry

* Remove unnecessary gradle package
  • Loading branch information
faderskd authored Jul 20, 2022
1 parent 76991c1 commit d38c050
Show file tree
Hide file tree
Showing 23 changed files with 317 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,6 @@ public enum Configs {
SCHEMA_REPOSITORY_SERVER_URL("schema.repository.serverUrl", "http://localhost:8888/"),
SCHEMA_REPOSITORY_HTTP_READ_TIMEOUT_MS("schema.repository.http.read.timeout.ms", 2000),
SCHEMA_REPOSITORY_HTTP_CONNECT_TIMEOUT_MS("schema.repository.http.connect.timeout.ms", 2000),
SCHEMA_REPOSITORY_ONLINE_CHECK_PERMITS_PER_SECOND("schema.repository.online.check.permits.per.second", 100.0),
SCHEMA_REPOSITORY_ONLINE_CHECK_ACQUIRE_WAIT_MS("schema.repository.online.check.acquire.wait.ms", 500),
SCHEMA_REPOSITORY_SUBJECT_SUFFIX_ENABLED("schema.repository.subject.suffix.enabled", false),
SCHEMA_REPOSITORY_SUBJECT_NAMESPACE_ENABLED("schema.repository.subject.namespace.enabled", false),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ public CompositeMessageContentWrapper(JsonMessageContentWrapper jsonMessageConte
AvroMessageSchemaIdAwareContentWrapper schemaIdAwareContentWrapper,
AvroMessageHeaderSchemaVersionContentWrapper headerSchemaVersionContentWrapper,
AvroMessageHeaderSchemaIdContentWrapper headerSchemaIdContentWrapper,
AvroMessageAnySchemaVersionContentWrapper anySchemaVersionContentWrapper,
AvroMessageSchemaVersionTruncationContentWrapper schemaVersionTruncationContentWrapper) {

this.jsonMessageContentWrapper = jsonMessageContentWrapper;
this.avroMessageContentWrapper = avroMessageContentWrapper;
this.avroMessageContentUnwrappers =
asList(schemaIdAwareContentWrapper, schemaVersionTruncationContentWrapper, headerSchemaVersionContentWrapper, headerSchemaIdContentWrapper, anySchemaVersionContentWrapper);
asList(schemaIdAwareContentWrapper, schemaVersionTruncationContentWrapper, headerSchemaVersionContentWrapper, headerSchemaIdContentWrapper);
}

public UnwrappedMessageContent unwrapJson(byte[] data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ public Counter errorsForSchemaIdAwarePayload() {
return metricRegistry.counter(name(deserializationErrorsPath(), "payloadWithSchemaId"));
}

public Counter errorsForAnySchemaVersion() {
return metricRegistry.counter(name(deserializationErrorsPath(), "anySchemaVersion"));
}

public Counter errorsForAnyOnlineSchemaVersion() {
return metricRegistry.counter(name(deserializationErrorsPath(), "anyOnlineSchemaVersion"));
}

public Counter errorsForSchemaVersionTruncation() {
return metricRegistry.counter(name(deserializationErrorsPath(), "schemaVersionTruncation"));
}
Expand All @@ -52,10 +44,6 @@ public Counter usingSchemaIdAware() {
return metricRegistry.counter(name(deserializationPath(), "using", "schemaIdAware"));
}

public Counter usingAnySchemaVersion() {
return metricRegistry.counter(name(deserializationPath(), "using", "anySchemaVersion"));
}

public Counter usingSchemaVersionTruncation() {
return metricRegistry.counter(name(deserializationPath(), "using", "schemaVersionTruncation"));
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@

import java.util.List;

class SchemaCacheRefresherCallback implements TopicCallback {
class SchemaCacheRefresherCallback<T> implements TopicCallback {

private static final Logger logger = LoggerFactory.getLogger(SchemaVersionsRepositoryFactory.class);

public static final boolean REFRESH_ONLINE = true;

private final CachedSchemaVersionsRepository schemaVersionsRepository;
private final CachedCompiledSchemaRepository compiledSchemaRepository;
private final CachedCompiledSchemaRepository<T> compiledSchemaRepository;

public SchemaCacheRefresherCallback(CachedSchemaVersionsRepository schemaVersionsRepository,
CachedCompiledSchemaRepository compiledSchemaRepository) {
CachedCompiledSchemaRepository<T> compiledSchemaRepository) {
this.schemaVersionsRepository = schemaVersionsRepository;
this.compiledSchemaRepository = compiledSchemaRepository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public SchemaVersionsRepository provide() {
configFactory.getIntProperty(Configs.SCHEMA_CACHE_EXPIRE_AFTER_WRITE_MINUTES));

notificationsBus.registerTopicCallback(
new SchemaCacheRefresherCallback(
new SchemaCacheRefresherCallback<>(
cachedSchemaVersionsRepository,
(CachedCompiledSchemaRepository<?>) compiledSchemaRepository));

Expand Down
Loading

0 comments on commit d38c050

Please sign in to comment.