Skip to content

Commit

Permalink
[BUG] Missed inheriting code caused failure (#795)
Browse files Browse the repository at this point in the history
* removing .NetCoreApp3.1 build components
  • Loading branch information
scottf authored Jul 12, 2023
1 parent 6dc24d5 commit 7a773e5
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 90 deletions.
76 changes: 38 additions & 38 deletions az-templates/stage-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ stages:
projects: 'src/*.sln'
arguments: '-c $(BuildConfiguration) --no-incremental --nologo -p:TreatWarningsAsErrors=true -p:Version=$(SemVer) -p:InformationalVersion=$(InfoVer)'

- task: VSTest@2
displayName: 'UnitTests .NetCoreApp3.1'
inputs:
testSelector: 'testAssemblies'
testAssemblyVer2: 'src/Tests/UnitTests/bin/$(BuildConfiguration)/netcoreapp3.1/UnitTests.dll'
configuration: $(BuildConfiguration)
testRunTitle: 'UnitTests .NetCoreApp3.1'
# - task: VSTest@2
# displayName: 'UnitTests .NetCoreApp3.1'
# inputs:
# testSelector: 'testAssemblies'
# testAssemblyVer2: 'src/Tests/UnitTests/bin/$(BuildConfiguration)/netcoreapp3.1/UnitTests.dll'
# configuration: $(BuildConfiguration)
# testRunTitle: 'UnitTests .NetCoreApp3.1'

- task: VSTest@2
displayName: 'UnitTests .Net4.6'
Expand All @@ -46,17 +46,17 @@ stages:
Write-Host "Appending nats-server directory $nats to path."
Write-Host "##vso[task.setvariable variable=PATH;]${env:PATH};$nats"
- task: VSTest@2
displayName: 'IntegrationTests .NetCoreApp3.1'
env:
GODEBUG: x509sha1=1
inputs:
testSelector: 'testAssemblies'
testAssemblyVer2: 'src/Tests/IntegrationTests/bin/$(BuildConfiguration)/netcoreapp3.1/IntegrationTests.dll'
configuration: $(BuildConfiguration)
rerunFailedTests: True
rerunMaxAttempts: 2
testRunTitle: 'IntegrationTests .NetCoreApp3.1'
# - task: VSTest@2
# displayName: 'IntegrationTests .NetCoreApp3.1'
# env:
# GODEBUG: x509sha1=1
# inputs:
# testSelector: 'testAssemblies'
# testAssemblyVer2: 'src/Tests/IntegrationTests/bin/$(BuildConfiguration)/netcoreapp3.1/IntegrationTests.dll'
# configuration: $(BuildConfiguration)
# rerunFailedTests: True
# rerunMaxAttempts: 2
# testRunTitle: 'IntegrationTests .NetCoreApp3.1'

- task: VSTest@2
displayName: 'IntegrationTests .Net4.6'
Expand All @@ -70,17 +70,17 @@ stages:
rerunMaxAttempts: 2
testRunTitle: 'IntegrationTests .Net4.6'

- task: VSTest@2
displayName: 'IntegrationTests Internal .NetCoreApp3.1'
env:
GODEBUG: x509sha1=1
inputs:
testSelector: 'testAssemblies'
testAssemblyVer2: 'src/Tests/IntegrationTestsInternal/bin/$(BuildConfiguration)/netcoreapp3.1/IntegrationTestsInternal.dll'
configuration: $(BuildConfiguration)
rerunFailedTests: True
rerunMaxAttempts: 2
testRunTitle: 'IntegrationTests Internal .NetCoreApp3.1'
# - task: VSTest@2
# displayName: 'IntegrationTests Internal .NetCoreApp3.1'
# env:
# GODEBUG: x509sha1=1
# inputs:
# testSelector: 'testAssemblies'
# testAssemblyVer2: 'src/Tests/IntegrationTestsInternal/bin/$(BuildConfiguration)/netcoreapp3.1/IntegrationTestsInternal.dll'
# configuration: $(BuildConfiguration)
# rerunFailedTests: True
# rerunMaxAttempts: 2
# testRunTitle: 'IntegrationTests Internal .NetCoreApp3.1'

- task: VSTest@2
displayName: 'IntegrationTests Internal .Net4.6'
Expand All @@ -94,15 +94,15 @@ stages:
rerunMaxAttempts: 2
testRunTitle: 'IntegrationTests Internal .Net4.6'

- task: VSTest@2
displayName: 'IntegrationTests Using Nito AsyncEx .NetCoreApp3.1'
inputs:
testSelector: 'testAssemblies'
testAssemblyVer2: 'src/Tests/IntegrationTestsUsingNitoAsyncEx/bin/$(BuildConfiguration)/netcoreapp3.1/IntegrationTestsUsingNitoAsyncEx.dll'
configuration: $(BuildConfiguration)
rerunFailedTests: True
rerunMaxAttempts: 2
testRunTitle: 'IntegrationTests Using Nito AsyncEx .NetCoreApp3.1'
# - task: VSTest@2
# displayName: 'IntegrationTests Using Nito AsyncEx .NetCoreApp3.1'
# inputs:
# testSelector: 'testAssemblies'
# testAssemblyVer2: 'src/Tests/IntegrationTestsUsingNitoAsyncEx/bin/$(BuildConfiguration)/netcoreapp3.1/IntegrationTestsUsingNitoAsyncEx.dll'
# configuration: $(BuildConfiguration)
# rerunFailedTests: True
# rerunMaxAttempts: 2
# testRunTitle: 'IntegrationTests Using Nito AsyncEx .NetCoreApp3.1'

- task: DotNetCoreCLI@2
displayName: 'Pack Nupkg'
Expand Down
43 changes: 3 additions & 40 deletions src/NATS.Client/JetStream/JetStreamPushAsyncSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,49 +13,12 @@

namespace NATS.Client.JetStream
{
public class JetStreamPushAsyncSubscription : AsyncSubscription, IJetStreamPushAsyncSubscription
public class JetStreamPushAsyncSubscription : JetStreamAbstractAsyncSubscription, IJetStreamPushAsyncSubscription
{
internal readonly MessageManager messageManager;
internal string _consumer;

// properties of IJetStreamSubscription
public JetStream Context { get; }
public string Stream { get; }
public string Consumer => _consumer;
public string DeliverSubject { get; }

internal JetStreamPushAsyncSubscription(Connection conn, string subject, string queue,
JetStream js, string stream, string consumer, string deliver, MessageManager messageManager)
: base(conn, subject, queue)
{
Context = js;
Stream = stream;
_consumer = consumer; // might be null, JetStream will call UpdateConsumer later
DeliverSubject = deliver;

this.messageManager = messageManager;
messageManager.Startup(this);
}

internal void UpdateConsumer(string consumer)
{
_consumer = consumer;
}

public ConsumerInfo GetConsumerInformation() => Context.LookupConsumerInfo(Stream, Consumer);

: base(conn, subject, queue, js, stream, consumer, deliver, messageManager) {}

public bool IsPullMode() => false;

public override void Unsubscribe()
{
messageManager.Shutdown();
base.Unsubscribe();
}

internal override void close()
{
messageManager.Shutdown();
base.close();
}
}
}
34 changes: 24 additions & 10 deletions src/NATS.Client/KeyValue/KeyValueWatchSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,31 @@ public class KeyValueWatchSubscription : IDisposable
{
private IJetStreamPushAsyncSubscription sub;
private readonly InterlockedBoolean endOfDataSent;
private readonly object subLock;

public KeyValueWatchSubscription(KeyValue kv, string keyPattern,
IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions)
{
subLock = new object();
string subscribeSubject = kv.ReadSubject(keyPattern);

// figure out the result options
bool headersOnly = false;
bool includeDeletes = true;
DeliverPolicy deliverPolicy = DeliverPolicy.LastPerSubject;
foreach (KeyValueWatchOption wo in watchOptions) {
switch (wo) {
foreach (KeyValueWatchOption wo in watchOptions)
{
switch (wo)
{
case KeyValueWatchOption.MetaOnly: headersOnly = true; break;
case KeyValueWatchOption.IgnoreDelete: includeDeletes = false; break;
case KeyValueWatchOption.UpdatesOnly: deliverPolicy = DeliverPolicy.New; break;
case KeyValueWatchOption.IncludeHistory: deliverPolicy = DeliverPolicy.All; break;
}
}

if (deliverPolicy == DeliverPolicy.New
|| kv._getLast(subscribeSubject) == null) {
if (deliverPolicy == DeliverPolicy.New || kv._getLast(subscribeSubject) == null)
{
endOfDataSent = new InterlockedBoolean(true);
watcher.EndOfData();
}
Expand Down Expand Up @@ -91,13 +95,23 @@ public KeyValueWatchSubscription(KeyValue kv, string keyPattern,

public void Unsubscribe()
{
try
lock (subLock)
{
sub?.Unsubscribe();
}
finally
{
sub = null;
if (sub != null)
{
try
{
sub?.Unsubscribe();
}
catch (Exception)
{
// ignore all exceptions, nothing we can really do.
}
finally
{
sub = null;
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Tests/IntegrationTests/TestJetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,11 @@ public async Task TestMaxPayloadJs()

Options opts = ConnectionFactory.GetDefaultOptions();
opts.AllowReconnect = false;
opts.Url = $"nats://127.0.0.1:{Context.Server1.Port}";
NATSServer.QuietOptionsModifier.Invoke(opts);

ulong expectedSeq = 0;
using (NATSServer.CreateJetStreamFast())
using (NATSServer.CreateJetStreamFast(Context.Server1.Port))
{
using (var c = Context.ConnectionFactory.CreateConnection(opts))
{
Expand Down
44 changes: 43 additions & 1 deletion src/Tests/IntegrationTests/TestKeyValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,49 @@ private void _testMirror(IKeyValue okv, IKeyValue mkv, int num) {
}
ValidateWatcher(new Object[]{"bb0", "aaa" + num, KeyValueOperation.Delete}, oWatcher);
}
}
}

[Fact]
public void TestDontGetNoResponders()
{
const int NUM_MESSAGES = 1000;

Context.RunInJsServer(c =>
{
// get the kv management context
IKeyValueManagement kvm = c.CreateKeyValueManagementContext();

// create the bucket
kvm.Create(KeyValueConfiguration.Builder()
.WithName(BUCKET)
.WithMaxHistoryPerKey(10)
.WithStorageType(StorageType.Memory)
.Build());

IKeyValue kvContext = c.CreateKeyValueContext(BUCKET);

for (int i = 0; i < NUM_MESSAGES; i++)
{
kvContext.Put(i.ToString(), i.ToString());
}

TestKeyValueWatcher watcher = new TestKeyValueWatcher(true);

var watch = kvContext.Watch(">", watcher, watcher.WatchOptions);

int count = 0;
while (watcher.EndOfDataReceived == 0 && count < 100)
{
Thread.Sleep(10);
count++;
}

Assert.True(watcher.EndOfDataReceived == 1);
Assert.True(watcher.Entries.Count == 1000);

watch.Unsubscribe();
});
}
}

class TestKeyValueWatcher : IKeyValueWatcher
Expand Down

0 comments on commit 7a773e5

Please sign in to comment.