Uses a file share to store attachments for messages.
Two settings are required as part of the default usage:
- A file share or directory location.
- A default time to keep for attachments.
configuration.EnableAttachments(
fileShare: "networkSharePath",
timeToKeep: _ => TimeSpan.FromDays(7));
configuration.EnableAttachments(
connectionFactory: async cancel =>
{
var connection = new SqlConnection(connectionString);
try
{
await connection.OpenAsync(cancel).ConfigureAwait(false);
return connection;
}
catch
{
await connection.DisposeAsync();
throw;
}
},
timeToKeep: _ => TimeSpan.FromDays(7));
Uses the NServiceBus.Attachments.FileShare.TimeToKeep.Default
method for attachment cleanup.
This usage results in the following:
configuration.EnableAttachments(
fileShare: "networkSharePath",
timeToKeep: TimeToKeep.Default);
configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
Attachment cleanup is enabled by default. It can be disabled using the following:
var attachments = configuration.EnableAttachments(
fileShare: "networkSharePath",
timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();
var attachments = configuration.EnableAttachments(
connectionFactory: OpenConnection,
timeToKeep: TimeToKeep.Default);
attachments.DisableCleanupTask();
When the cleanup task runs it uses the Expiry
column to determine if a given attachment should be deleted. This column is populated when an attachment is written. When adding an attachment to an outgoing message, all methods accept an optional parameter timeToKeep
of the type GetTimeToKeep
. GetTimeToKeep
is defined as:
public delegate TimeSpan GetTimeToKeep(TimeSpan? messageTimeToBeReceived);
Where messageTimeToBeReceived
is value of TimeToBeReceived. If no timeToKeep
parameter for a specific attachment is defined then the endpoint level timeToKeep
is used.
The result of timeToKeep
is then added to the current date and persisted to the Expiry
column.
The method TimeToKeep.Default
provides a recommended default for for attachment lifetime calculation:
- If TimeToBeReceived is defined then keep attachment for twice that time.
- Else; keep for 10 days.
Approaches to using attachments for an outgoing message.
Note: Stream.Dispose is called after the data has been persisted. As such it is not necessary for any code using attachments to perform this cleanup.
While the below examples illustrate adding an attachment to SendOptions
, equivalent operations can be performed on PublishOptions
and ReplyOptions
The recommended approach for adding an attachment is by providing a delegate that constructs the stream. The execution of this delegate is then deferred until later in the outgoing pipeline, when the instance of the stream is required to be persisted.
There are both async and sync variants.
class HandlerFactory :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => File.OpenRead("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerFactory :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => File.OpenRead("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerFactoryAsync :
IHandleMessages<MyMessage>
{
static HttpClient httpClient = new();
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => httpClient.GetStreamAsync("theUrl"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerFactoryAsync :
IHandleMessages<MyMessage>
{
static HttpClient httpClient = new();
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
attachments.Add(
name: "attachment1",
streamFactory: () => httpClient.GetStreamAsync("theUrl"));
return context.Send(new OtherMessage(), sendOptions);
}
}
In some cases an instance of a stream is already available in scope and as such it can be passed directly.
class HandlerInstance :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
var stream = File.OpenRead("FilePath.txt");
attachments.Add(
name: "attachment1",
stream: stream,
cleanup: () => File.Delete("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
class HandlerInstance :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var sendOptions = new SendOptions();
var attachments = sendOptions.Attachments();
var stream = File.OpenRead("FilePath.txt");
attachments.Add(
name: "attachment1",
stream: stream,
cleanup: () => File.Delete("FilePath.txt"));
return context.Send(new OtherMessage(), sendOptions);
}
}
Approaches to using attachments for the current incoming message.
Processes an attachment with a specific name.
class HandlerProcessStream :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStream(
name: "attachment1",
action: async(stream, token) =>
{
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, token);
},
context.CancellationToken);
}
}
class HandlerProcessStream :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStream(
name: "attachment1",
action: async (stream, token) =>
{
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, token);
},
context.CancellationToken);
}
}
Processes all attachments.
class HandlerProcessStreams :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreams(
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
class HandlerProcessStreams :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreams(
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
Copy an attachment with a specific name to another stream.
class HandlerCopyTo :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachments.CopyTo("attachment1", fileToCopyTo, context.CancellationToken);
}
}
class HandlerCopyTo :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachments.CopyTo("attachment1", fileToCopyTo, context.CancellationToken);
}
}
Get a stream for an attachment with a specific name.
class HandlerGetStream :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var stream = await attachments.GetStream("attachment1", context.CancellationToken);
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await stream.CopyToAsync(fileToCopyTo, context.CancellationToken);
}
}
class HandlerGetStream :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
await using var attachment = await attachments.GetStream("attachment1", context.CancellationToken);
// Use the attachment stream. in this example copy to a file
await using var fileToCopyTo = File.Create("FilePath.txt");
await attachment.CopyToAsync(fileToCopyTo, context.CancellationToken);
}
}
Get a byte array for an attachment with a specific name.
WARNING: This should only be used the data size is know to be small as it causes the full size of the attachment to be allocated in memory.
class HandlerGetBytes :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
var bytes = await attachments.GetBytes("attachment1", context.CancellationToken);
// use the byte array
}
}
class HandlerGetBytes :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
var bytes = await attachments.GetBytes("attachment1", context.CancellationToken);
// use the byte array
}
}
All of the above examples have companion methods that are suffixed with ForMessage
. These methods allow a handler or saga to read any attachments as long as the message id for that attachment is known. For example processing all attachments for a specific message could be done as follows
class HandlerProcessStreamsForMessage :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreamsForMessage(
messageId: "theMessageId",
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var toCopyTo = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(toCopyTo, cancel);
},
context.CancellationToken);
}
}
class HandlerProcessStreamsForMessage :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var attachments = context.Attachments();
return attachments.ProcessStreamsForMessage(
messageId: "theMessageId",
action: async (stream, cancel) =>
{
// Use the attachment stream. in this example copy to a file
await using var file = File.Create($"{stream.Name}.txt");
await stream.CopyToAsync(file, cancel);
},
context.CancellationToken);
}
}
This can be helpful in a saga that is operating in a Scatter-Gather mode. So instead of storing all binaries inside the saga persister, the saga can instead store the message ids and then, at a latter point in time, access those attachments.
The below examples also use the NServiceBus.Testing extension.
public class Handler :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var options = new SendOptions();
var attachments = options.Attachments();
attachments.Add("theName", () => File.OpenRead("aFilePath"));
return context.Send(new OtherMessage(), options);
}
}
public class Handler :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, HandlerContext context)
{
var options = new SendOptions();
var attachments = options.Attachments();
attachments.Add("theName", () => File.OpenRead("aFilePath"));
return context.Send(new OtherMessage(), options);
}
}
[Fact]
public async Task TestOutgoingAttachments()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
//Act
await handler.Handle(new(), context);
// Assert
var sentMessage = context.Sent.Single();
var attachments = sentMessage.Options.Attachments();
var attachment = attachments.Items.Single();
Assert.Contains("theName", attachment.Name);
Assert.True(attachments.HasPendingAttachments);
}
[Fact]
public async Task TestOutgoingAttachments()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
//Act
await handler.Handle(new(), context);
// Assert
var sentMessage = context.Sent.Single();
var attachments = sentMessage.Options.Attachments();
var attachment = attachments.Items.Single();
Assert.Contains("theName", attachment.Name);
Assert.True(attachments.HasPendingAttachments);
}
To mock or verify incoming attachments is it necessary to inject a instance of IMessageAttachments
into the current IMessageHandlerContext
. This can be done using the MockAttachmentHelper.InjectAttachmentsInstance()
extension method which exists in the NServiceBus.Attachments.Testing
namespace.
var context = new RecordingHandlerContext();
var mockMessageAttachments = new MyMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
var context = new RecordingHandlerContext();
var mockMessageAttachments = new MyMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
The implementation of IMessageHandlerContext
can be a custom coded mock or constructed using any of the popular mocking/assertion frameworks.
There is a default implementation of IMessageAttachments
named MockMessageAttachments
. This implementation stubs out all methods. All members are virtual so it can be used as simplified base class for custom mocks.
public class CustomMockMessageAttachments :
MockMessageAttachments
{
public override Task<AttachmentBytes> GetBytes(Cancel cancel = default)
{
GetBytesWasCalled = true;
return Task.FromResult(new AttachmentBytes("name", [5]));
}
public bool GetBytesWasCalled { get; private set; }
}
public class CustomMockMessageAttachments :
MockMessageAttachments
{
public override Task<AttachmentBytes> GetBytes(Cancel cancel = default)
{
GetBytesWasCalled = true;
return Task.FromResult(new AttachmentBytes("name", [5]));
}
public bool GetBytesWasCalled { get; private set; }
}
Putting these parts together allows a handler, using incoming attachments, to be tested.
public class Handler :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachment = context.Attachments();
var bytes = await attachment.GetBytes(context.CancellationToken);
}
}
public class Handler :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, HandlerContext context)
{
var attachment = context.Attachments();
var bytes = await attachment.GetBytes(context.CancellationToken);
}
}
[Fact]
public async Task TestIncomingAttachment()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
var mockMessageAttachments = new CustomMockMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
//Act
await handler.Handle(new(), context);
//Assert
Assert.True(mockMessageAttachments.GetBytesWasCalled);
}
[Fact]
public async Task TestIncomingAttachment()
{
//Arrange
var context = new RecordingHandlerContext();
var handler = new Handler();
var mockMessageAttachments = new CustomMockMessageAttachments();
context.InjectAttachmentsInstance(mockMessageAttachments);
//Act
await handler.Handle(new(), context);
//Assert
Assert.True(mockMessageAttachments.GetBytesWasCalled);
}