-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy pathFetchBytesExample.cs
170 lines (151 loc) · 7.25 KB
/
FetchBytesExample.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Diagnostics;
using NATS.Client;
using NATS.Client.JetStream;
namespace NATSExamples
{
/// <summary>
/// Example that fetches JetStream messages through a consumer context while
/// limiting each fetch by a maximum number of bytes, optionally combined with
/// a maximum number of messages. Three scenarios (A, B, C) are run with
/// different limits and a summary is printed for each.
/// </summary>
internal static class FetchBytesExample
{
    private static readonly string STREAM = "fetch-bytes-stream";
    private static readonly string SUBJECT = "fetch-bytes-subject";
    private static readonly string MESSAGE_TEXT = "fetch-bytes";
    private static readonly string CONSUMER_NAME_PREFIX = "fetch-bytes-consumer";
    private static readonly int MESSAGES = 20;
    private static readonly int EXPIRES_SECONDS = 2;

    /// <summary>URL of the NATS server the example connects to.</summary>
    public static string SERVER = "nats://localhost:4222";

    /// <summary>
    /// Connects to <see cref="SERVER"/>, (re)creates the stream, publishes
    /// <c>MESSAGES</c> test messages and runs the three fetch scenarios.
    /// </summary>
    /// <param name="args">Command line arguments; not used.</param>
    public static void Main(string[] args)
    {
        Options opts = ConnectionFactory.GetDefaultOptions(SERVER);

        using (IConnection c = new ConnectionFactory().CreateConnection(opts))
        {
            // bytes don't work before server v2.9.1
            if (c.ServerInfo.IsOlderThanVersion("2.9.1"))
            {
                // Say why nothing happens instead of exiting silently.
                Console.WriteLine("This example requires server version 2.9.1 or later.");
                return;
            }

            IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
            IJetStream js = c.CreateJetStreamContext();

            // sets up the stream and publishes data
            JsUtils.CreateOrReplaceStream(jsm, STREAM, SUBJECT);
            JsUtils.Publish(js, SUBJECT, MESSAGE_TEXT, MESSAGES, false);

            // Different max bytes sizes demonstrate expiration behavior

            // A. max bytes is reached before message count
            //    Each test message consumeByteCount is 138
            simpleFetch(c, js, "A", 0, 1000);

            // B. fetch max messages is reached before byte count
            //    Each test message consumeByteCount is 131 or 134
            simpleFetch(c, js, "B", 10, 2000);

            // C. fewer bytes available than the byte count
            //    Each test message consumeByteCount is 138, 140 or 141
            simpleFetch(c, js, "C", 0, 4000);
        }
    }

    /// <summary>
    /// Creates (or updates) a durable consumer named after the limits, fetches
    /// messages from it until a limit is reached or no more messages arrive,
    /// acknowledges each message and prints a summary.
    /// </summary>
    /// <param name="c">Connection used to obtain the stream context.</param>
    /// <param name="js">JetStream context used to obtain the consumer context.</param>
    /// <param name="label">Scenario label ("A", "B" or "C") used for the printed explanation.</param>
    /// <param name="maxMessages">Maximum number of messages to fetch; 0 means limit by bytes only.</param>
    /// <param name="maxBytes">Maximum number of bytes to fetch.</param>
    private static void simpleFetch(IConnection c, IJetStream js, string label, int maxMessages, int maxBytes)
    {
        string consumerName = generateConsumerName(maxMessages, maxBytes);

        // get stream context, create consumer and get the consumer context
        IStreamContext streamContext;
        IConsumerContext consumerContext;
        try
        {
            streamContext = c.GetStreamContext(STREAM);
            streamContext.CreateOrUpdateConsumer(ConsumerConfiguration.Builder().WithDurable(consumerName).Build());
            consumerContext = js.GetConsumerContext(STREAM, consumerName);
        }
        catch (Exception e)
        {
            // possible exceptions
            // - a connection problem
            // - the stream or consumer did not exist
            // Report the failure rather than skipping the scenario silently.
            Console.Error.WriteLine("Unable to prepare consumer '" + consumerName + "': " + e.Message);
            return;
        }

        // Custom FetchConsumeOptions
        FetchConsumeOptions.FetchConsumeOptionsBuilder builder
            = FetchConsumeOptions.Builder().WithExpiresIn(EXPIRES_SECONDS * 1000);
        if (maxMessages == 0)
        {
            builder.WithMaxBytes(maxBytes);
        }
        else
        {
            builder.WithMax(maxBytes, maxMessages);
        }
        FetchConsumeOptions fetchConsumeOptions = builder.Build();

        printExplanation(label, consumerName, maxMessages, maxBytes);

        // create the consumer then use it
        // note: no need to catch NATSTimeoutException because the NextMessage will return null
        try
        {
            int receivedMessages = 0;
            long receivedBytes = 0;

            // StartNew() both creates and starts the stopwatch. A stopwatch made
            // with "new Stopwatch()" is stopped, so the elapsed time stayed at 0.
            Stopwatch sw = Stopwatch.StartNew();

            IFetchConsumer consumer = consumerContext.Fetch(fetchConsumeOptions);
            Msg msg;
            while ((msg = consumer.NextMessage()) != null)
            {
                msg.Ack();
                receivedMessages++;
                receivedBytes += msg.ConsumeByteCount;

                // Stop asking for messages once a requested limit has been hit.
                // When maxMessages is 0 the count comparison can never be true
                // (the counter is already at least 1), so only bytes apply.
                if (receivedBytes >= maxBytes || receivedMessages == maxMessages)
                {
                    break;
                }
            }
            sw.Stop();

            printSummary(receivedMessages, receivedBytes, sw.ElapsedMilliseconds);
        }
        catch (NATSJetStreamStatusException e)
        {
            // Either the consumer was deleted in the middle
            // of the pull or there is a new status from the
            // server that this client is not aware of
            Console.Error.WriteLine("Fetch for consumer '" + consumerName + "' ended with a status error: " + e.Message);
        }
    }

    /// <summary>
    /// Builds the durable consumer name from the configured limits.
    /// </summary>
    /// <param name="maxMessages">Message limit; 0 produces the "unlimited-messages" form of the name.</param>
    /// <param name="maxBytes">Byte limit embedded in the name.</param>
    /// <returns>The consumer name, prefixed with <c>CONSUMER_NAME_PREFIX</c>.</returns>
    private static string generateConsumerName(int maxMessages, int maxBytes)
    {
        if (maxMessages == 0)
        {
            return CONSUMER_NAME_PREFIX + "-" + maxBytes + "-bytes-unlimited-messages";
        }
        return CONSUMER_NAME_PREFIX + "-" + maxBytes + "-bytes-" + maxMessages + "-messages";
    }

    /// <summary>
    /// Prints how many bytes and messages were received and how long it took.
    /// </summary>
    /// <param name="receivedMessages">Number of messages received.</param>
    /// <param name="receivedBytes">Sum of the received messages' consume byte counts.</param>
    /// <param name="elapsed">Elapsed time in milliseconds.</param>
    private static void printSummary(int receivedMessages, long receivedBytes, long elapsed)
    {
        Console.WriteLine("+++ " + receivedBytes + "/" + receivedMessages + " bytes/message(s) were received in " +
                          elapsed + "ms\n");
    }

    /// <summary>
    /// Prints a header for the scenario and, for labels "A", "B" and "C",
    /// a short explanation of the expected behavior. Other labels print
    /// only the header.
    /// </summary>
    /// <param name="label">Scenario label.</param>
    /// <param name="name">Consumer name shown in the header.</param>
    /// <param name="maxMessages">Message limit shown in the explanation.</param>
    /// <param name="maxBytes">Byte limit shown in the explanation.</param>
    private static void printExplanation(string label, string name, int maxMessages, int maxBytes)
    {
        Console.WriteLine("--------------------------------------------------------------------------------");
        Console.WriteLine(label + ". " + name);
        switch (label)
        {
            case "A":
                Console.WriteLine("=== Max bytes (" + maxBytes + ") threshold will be met since the next message would put the byte count over " + maxBytes + " bytes");
                Console.WriteLine("=== nextMessage() will return null when consume is done.");
                break;
            case "B":
                Console.WriteLine("=== Fetch max messages (" + maxMessages + ") will be reached before max bytes (" + maxBytes + ")");
                Console.WriteLine("=== nextMessage() will return null when consume is done.");
                break;
            case "C":
                Console.WriteLine("=== Max bytes (" + maxBytes + ") is larger than available bytes (about 2700).");
                Console.WriteLine("=== FetchConsumeOption \"expires in\" is " + EXPIRES_SECONDS + " seconds.");
                Console.WriteLine("=== nextMessage() blocks until expiration when there are no messages available, then returns null.");
                break;
        }
    }
}
}