Skip to content

Commit

Permalink
now follows async naming convention (#73)
Browse files Browse the repository at this point in the history
* add .cache folder to gitiginore

* now follows async naming convention
  • Loading branch information
runeanielsen authored Aug 15, 2022
1 parent db213dd commit 0956ded
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 82 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
bin/
obj/
publish/
publish/
.cache/
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Get the start_lsn column value for the specified capture instance from the cdc.c
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var minLsn = await Cdc.GetMinLsn(connection, "dbo_Employee");
var minLsn = await Cdc.GetMinLsnAsync(connection, "dbo_Employee");
```

### Get max LSN
Expand All @@ -29,7 +29,7 @@ Get the maximum log sequence number (LSN) from the start_lsn column in the cdc.l
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var maxLsn = await Cdc.GetMaxLsn(connection);
var maxLsn = await Cdc.GetMaxLsnAsync(connection);
```

### Get previous LSN
Expand All @@ -39,7 +39,7 @@ Get the previous log sequence number (LSN) in the sequence based upon the specif
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var previousLsn = await Cdc.GetPreviousLsn(connection, 120000);
var previousLsn = await Cdc.GetPreviousLsnAsync(connection, 120000);
```

### Get next LSN
Expand All @@ -49,7 +49,7 @@ Get the next log sequence number (LSN) in the sequence based upon the specified
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var nextLsn = await Cdc.GetNextLsn(connection, 120000);
var nextLsn = await Cdc.GetNextLsnAsync(connection, 120000);
```

### Map time to LSN
Expand All @@ -59,7 +59,7 @@ Map the log sequence number (LSN) value from the start_lsn column in the cdc.lsn
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var lsn = await Cdc.MapTimeToLsn(connection, DateTime.UtcNow, RelationalOperator.LargestLessThan);
var lsn = await Cdc.MapTimeToLsnAsync(connection, DateTime.UtcNow, RelationalOperator.LargestLessThan);
```

### Map LSN to time
Expand All @@ -69,7 +69,7 @@ Map date and time value from the tran_end_time column in the cdc.lsn_time_mappin
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var time = await Cdc.MapLsnToTime(connection, 120000);
var time = await Cdc.MapLsnToTimeAsync(connection, 120000);
```

### Get all changes
Expand All @@ -79,7 +79,7 @@ Get one row for each change applied to the source table within the specified log
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var allChanges = await Cdc.GetAllChanges(connection, "dbo_Employee", 120000, 120020);
var allChanges = await Cdc.GetAllChangesAsync(connection, "dbo_Employee", 120000, 120020);
```

### Get net changes
Expand All @@ -89,7 +89,7 @@ Get one net change row for each source row changed within the specified Log Sequ
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var netChanges = await Cdc.GetNetChanges(connection, "dbo_Employee", 120000, 120020);
var netChanges = await Cdc.GetNetChangesAsync(connection, "dbo_Employee", 120000, 120020);
```

### Get column ordinal
Expand All @@ -99,7 +99,7 @@ Get the column ordinal of the specified column as it appears in the change table
```c#
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();
var columnOrdinal = await Cdc.GetColumnOrdinal(connection, "dbo_Employee", "Salary");
var columnOrdinal = await Cdc.GetColumnOrdinalAsync(connection, "dbo_Employee", "Salary");
```

### Has column changed
Expand All @@ -112,11 +112,11 @@ var captureInstance = "dbo_Employee";
using var connection = new SqlConnection("myConnectionString");
await connection.OpenAsync();

var minLsn = await Cdc.GetMinLsn(connection, captureInstance);
var maxLsn = await Cdc.GetMaxLsn(connection);
var minLsn = await Cdc.GetMinLsnAsync(connection, captureInstance);
var maxLsn = await Cdc.GetMaxLsnAsync(connection);

var columnOrdinal = await Cdc.GetColumnOrdinal(connection, captureInstance, columnName);
var changes = await Cdc.GetAllChanges(
var columnOrdinal = await Cdc.GetColumnOrdinalAsync(connection, captureInstance, columnName);
var changes = await Cdc.GetAllChangesAsync(
connection,
captureInstance,
minLsn,
Expand All @@ -126,7 +126,7 @@ var changes = await Cdc.GetAllChanges(
// We just pick the first one here as an example.
var updateMask = changes.First().GetUpdateMask();

var hasColumnChanged = await Cdc.HasColumnChanged(connection, captureInstance, columnName, updateMask);
var hasColumnChanged = await Cdc.HasColumnChangedAsync(connection, captureInstance, columnName, updateMask);
```

## Setup CDC on MS-SQL Server
Expand Down
10 changes: 5 additions & 5 deletions examples/Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ public static void Main(string[] args)
{
await connection.OpenAsync();

var highBoundLsn = await Cdc.GetMaxLsn(connection);
var highBoundLsn = await Cdc.GetMaxLsnAsync(connection);
if (lowBoundLsn <= highBoundLsn)
{
Console.WriteLine($"Polling with from '{lowBoundLsn}' to '{highBoundLsn}");

var changes = new List<AllChangeRow>();
foreach (var table in tables)
{
var changeSets = await Cdc.GetAllChanges(
var changeSets = await Cdc.GetAllChangesAsync(
connection, table, lowBoundLsn, highBoundLsn, AllChangesRowFilterOption.AllUpdateOld);
changes.AddRange(changeSets);
}

var orderedChanges = changes.OrderBy(x => x.SequenceValue).ToList();
await changeDataChannel.Writer.WriteAsync(orderedChanges);

lowBoundLsn = await Cdc.GetNextLsn(connection, highBoundLsn);
lowBoundLsn = await Cdc.GetNextLsnAsync(connection, highBoundLsn);
}
else
{
Expand Down Expand Up @@ -99,8 +99,8 @@ private static async Task<BigInteger> GetStartLsn(string connectionString)
{
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
var currentMaxLsn = await Cdc.GetMaxLsn(connection);
return await Cdc.GetNextLsn(connection, currentMaxLsn);
var currentMaxLsn = await Cdc.GetMaxLsnAsync(connection);
return await Cdc.GetNextLsnAsync(connection, currentMaxLsn);
}

private static string CreateConnectionString()
Expand Down
38 changes: 19 additions & 19 deletions src/MsSqlCdc/Cdc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ public static class Cdc
/// Returns whether the specified update mask indicates that the specified column
/// has been updated in the associated change row.
/// </returns>
public static async Task<bool> HasColumnChanged(
public static async Task<bool> HasColumnChangedAsync(
SqlConnection connection,
string captureInstance,
string columnName,
byte[] updateMask)
{
var hasColumnChanged = await CdcDatabase.HasColumnChanged(
var hasColumnChanged = await CdcDatabase.HasColumnChangedAsync(
connection, captureInstance, columnName, updateMask).ConfigureAwait(false);

return hasColumnChanged ?? throw new CdcException(
Expand All @@ -112,12 +112,12 @@ public static async Task<bool> HasColumnChanged(
/// table associated with the specified capture instance.
/// If the column ordinal could not be found -1 is returned.
/// </returns>
public static async Task<int> GetColumnOrdinal(
public static async Task<int> GetColumnOrdinalAsync(
SqlConnection connection,
string captureInstance,
string columnName)
{
return await CdcDatabase.GetColumnOrdinal(connection, captureInstance, columnName)
return await CdcDatabase.GetColumnOrdinalAsync(connection, captureInstance, columnName)
.ConfigureAwait(false) ?? -1;
}

Expand All @@ -135,13 +135,13 @@ public static async Task<int> GetColumnOrdinal(
/// Returns the log sequence number (LSN) value from the start_lsn column
/// in the cdc.lsn_time_mapping system table for the specified time.
/// </returns>
public static async Task<BigInteger> MapTimeToLsn(
public static async Task<BigInteger> MapTimeToLsnAsync(
SqlConnection connection,
DateTime trackingTime,
RelationalOperator relationalOperator)
{
var convertedRelationOperator = DataConvert.ConvertRelationOperator(relationalOperator);
var lsnBytes = await CdcDatabase.MapTimeToLsn(
var lsnBytes = await CdcDatabase.MapTimeToLsnAsync(
connection, trackingTime, convertedRelationOperator).ConfigureAwait(false);

return lsnBytes is not null
Expand All @@ -162,7 +162,7 @@ public static async Task<BigInteger> MapTimeToLsn(
/// Returns the date and time value from the tran_end_time column in the cdc.lsn_time_mapping
/// system table for the specified log sequence number (LSN).
/// </returns>
public static async Task<DateTime> MapLsnToTime(SqlConnection connection, BigInteger lsn)
public static async Task<DateTime> MapLsnToTimeAsync(SqlConnection connection, BigInteger lsn)
{
var binaryLsn = DataConvert.ConvertLsnBigEndian(lsn);
return await CdcDatabase.MapLsnToTime(connection, binaryLsn).ConfigureAwait(false) ??
Expand All @@ -176,9 +176,9 @@ public static async Task<DateTime> MapLsnToTime(SqlConnection connection, BigInt
/// <param name="connection">An open connection to a MS-SQL database.</param>
/// <param name="captureInstance">The name of the capture instance.</param>
/// <returns>Return the low endpoint of the change data capture timeline for any capture instance.</returns>
public static async Task<BigInteger> GetMinLsn(SqlConnection connection, string captureInstance)
public static async Task<BigInteger> GetMinLsnAsync(SqlConnection connection, string captureInstance)
{
var minLsnBytes = await CdcDatabase.GetMinLsn(connection, captureInstance).ConfigureAwait(false);
var minLsnBytes = await CdcDatabase.GetMinLsnAsync(connection, captureInstance).ConfigureAwait(false);

return minLsnBytes is not null
? DataConvert.ConvertBinaryLsn(minLsnBytes)
Expand All @@ -193,9 +193,9 @@ public static async Task<BigInteger> GetMinLsn(SqlConnection connection, string
/// </summary>
/// <param name="connection">An open connection to a MS-SQL database.</param>
/// <returns>Return the high endpoint of the change data capture timeline for any capture instance.</returns>
public static async Task<BigInteger> GetMaxLsn(SqlConnection connection)
public static async Task<BigInteger> GetMaxLsnAsync(SqlConnection connection)
{
var maxLsnBytes = await CdcDatabase.GetMaxLsn(connection).ConfigureAwait(false);
var maxLsnBytes = await CdcDatabase.GetMaxLsnAsync(connection).ConfigureAwait(false);

return maxLsnBytes is not null
? DataConvert.ConvertBinaryLsn(maxLsnBytes)
Expand All @@ -208,10 +208,10 @@ public static async Task<BigInteger> GetMaxLsn(SqlConnection connection)
/// <param name="connection">An open connection to a MS-SQL database.</param>
/// <param name="lsn">The LSN number that should be used as the point to get the previous LSN.</param>
/// <returns>Return the high endpoint of the change data capture timeline for any capture instance.</returns>
public static async Task<BigInteger> GetPreviousLsn(SqlConnection connection, BigInteger lsn)
public static async Task<BigInteger> GetPreviousLsnAsync(SqlConnection connection, BigInteger lsn)
{
var binaryLsn = DataConvert.ConvertLsnBigEndian(lsn);
var previousLsnBytes = await CdcDatabase.DecrementLsn(connection, binaryLsn).ConfigureAwait(false);
var previousLsnBytes = await CdcDatabase.DecrementLsnAsync(connection, binaryLsn).ConfigureAwait(false);

return previousLsnBytes is not null
? DataConvert.ConvertBinaryLsn(previousLsnBytes)
Expand All @@ -224,10 +224,10 @@ public static async Task<BigInteger> GetPreviousLsn(SqlConnection connection, Bi
/// <param name="connection">An open connection to a MS-SQL database.</param>
/// <param name="lsn">The LSN number that should be used as the point to get the next LSN.</param>
/// <returns>Get the next log sequence number (LSN) in the sequence based upon the specified LSN.</returns>
public static async Task<BigInteger> GetNextLsn(SqlConnection connection, BigInteger lsn)
public static async Task<BigInteger> GetNextLsnAsync(SqlConnection connection, BigInteger lsn)
{
var lsnBinary = DataConvert.ConvertLsnBigEndian(lsn);
var nextLsnBytes = await CdcDatabase.IncrementLsn(connection, lsnBinary).ConfigureAwait(false);
var nextLsnBytes = await CdcDatabase.IncrementLsnAsync(connection, lsnBinary).ConfigureAwait(false);

return nextLsnBytes is not null
? DataConvert.ConvertBinaryLsn(nextLsnBytes)
Expand All @@ -244,7 +244,7 @@ public static async Task<BigInteger> GetNextLsn(SqlConnection connection, BigInt
/// <returns>
/// Returns one net change row for each source row changed within the specified Log Sequence Numbers (LSN) range.
/// </returns>
public static async Task<IReadOnlyCollection<NetChangeRow>> GetNetChanges(
public static async Task<IReadOnlyCollection<NetChangeRow>> GetNetChangesAsync(
SqlConnection connection,
string captureInstance,
BigInteger fromLsn,
Expand All @@ -254,7 +254,7 @@ public static async Task<IReadOnlyCollection<NetChangeRow>> GetNetChanges(
var beginLsnBinary = DataConvert.ConvertLsnBigEndian(fromLsn);
var endLsnBinary = DataConvert.ConvertLsnBigEndian(toLsn);
var filterOption = DataConvert.ConvertNetChangesRowFilterOption(netChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetNetChanges(
var cdcColumns = await CdcDatabase.GetNetChangesAsync(
connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption).ConfigureAwait(false);

return cdcColumns.Select(x => NetChangeRowFactory.Create(x, captureInstance)).ToList();
Expand All @@ -272,7 +272,7 @@ public static async Task<IReadOnlyCollection<NetChangeRow>> GetNetChanges(
/// Returns one row for each change applied to the source table within the specified log sequence number (LSN) range.
/// If a source row had multiple changes during the interval, each change is represented in the returned result set.
/// </returns>
public static async Task<IReadOnlyCollection<AllChangeRow>> GetAllChanges(
public static async Task<IReadOnlyCollection<AllChangeRow>> GetAllChangesAsync(
SqlConnection connection,
string captureInstance,
BigInteger beginLsn,
Expand All @@ -282,7 +282,7 @@ public static async Task<IReadOnlyCollection<AllChangeRow>> GetAllChanges(
var beginLsnBinary = DataConvert.ConvertLsnBigEndian(beginLsn);
var endLsnBinary = DataConvert.ConvertLsnBigEndian(endLsn);
var filterOption = DataConvert.ConvertAllChangesRowFilterOption(allChangesRowFilterOption);
var cdcColumns = await CdcDatabase.GetAllChanges(
var cdcColumns = await CdcDatabase.GetAllChangesAsync(
connection, captureInstance, beginLsnBinary, endLsnBinary, filterOption).ConfigureAwait(false);

return cdcColumns.Select(x => AllChangeRowFactory.Create(x, captureInstance)).ToList();
Expand Down
24 changes: 12 additions & 12 deletions src/MsSqlCdc/CdcDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace MsSqlCdc;

internal static class CdcDatabase
{
public static async Task<bool?> HasColumnChanged(
public static async Task<bool?> HasColumnChangedAsync(
SqlConnection connection,
string captureInstance,
string columnName,
Expand All @@ -25,7 +25,7 @@ internal static class CdcDatabase
return hasColumnChanged != DBNull.Value ? (bool?)hasColumnChanged : null;
}

public static async Task<int?> GetColumnOrdinal(
public static async Task<int?> GetColumnOrdinalAsync(
SqlConnection connection,
string captureInstance,
string columnName)
Expand Down Expand Up @@ -53,7 +53,7 @@ internal static class CdcDatabase
return lsnTime != DBNull.Value ? (DateTime?)lsnTime : null;
}

public static async Task<byte[]?> MapTimeToLsn(
public static async Task<byte[]?> MapTimeToLsnAsync(
SqlConnection connection,
DateTime trackingTime,
string relationOperator)
Expand All @@ -69,7 +69,7 @@ internal static class CdcDatabase
return lsnBasedOnTime != DBNull.Value ? (byte[]?)lsnBasedOnTime : null;
}

public static async Task<byte[]?> GetMinLsn(SqlConnection connection, string captureInstance)
public static async Task<byte[]?> GetMinLsnAsync(SqlConnection connection, string captureInstance)
{
var sql = "SELECT sys.fn_cdc_get_min_lsn(@capture_instance)";

Expand All @@ -81,7 +81,7 @@ internal static class CdcDatabase
return minLsn != DBNull.Value ? (byte[]?)minLsn : null;
}

public static async Task<byte[]?> GetMaxLsn(SqlConnection connection)
public static async Task<byte[]?> GetMaxLsnAsync(SqlConnection connection)
{
var sql = "SELECT sys.fn_cdc_get_max_lsn()";
using var command = new SqlCommand(sql, connection);
Expand All @@ -90,7 +90,7 @@ internal static class CdcDatabase
return maxLsn != DBNull.Value ? (byte[]?)maxLsn : null;
}

public static async Task<byte[]?> DecrementLsn(SqlConnection connection, byte[] lsn)
public static async Task<byte[]?> DecrementLsnAsync(SqlConnection connection, byte[] lsn)
{
var sql = "SELECT sys.fn_cdc_decrement_lsn(@lsn)";

Expand All @@ -102,7 +102,7 @@ internal static class CdcDatabase
return decrementedLsn != DBNull.Value ? (byte[]?)decrementedLsn : null;
}

public static async Task<byte[]?> IncrementLsn(SqlConnection connection, byte[] lsn)
public static async Task<byte[]?> IncrementLsnAsync(SqlConnection connection, byte[] lsn)
{
var sql = "SELECT sys.fn_cdc_increment_lsn(@lsn)";

Expand All @@ -114,14 +114,14 @@ internal static class CdcDatabase
return incrementedLsn != DBNull.Value ? (byte[]?)incrementedLsn : null;
}

public static async Task<List<IReadOnlyDictionary<string, object>>> GetAllChanges(
public static async Task<List<IReadOnlyDictionary<string, object>>> GetAllChangesAsync(
SqlConnection connection,
string captureInstance,
byte[] beginLsn,
byte[] endLsn,
string filterOption)
{
return await GetChanges(
return await GetChangesAsync(
connection,
"cdc.fn_cdc_get_all_changes",
captureInstance,
Expand All @@ -130,14 +130,14 @@ public static async Task<List<IReadOnlyDictionary<string, object>>> GetAllChange
filterOption).ConfigureAwait(false);
}

public static async Task<List<IReadOnlyDictionary<string, object>>> GetNetChanges(
public static async Task<List<IReadOnlyDictionary<string, object>>> GetNetChangesAsync(
SqlConnection connection,
string captureInstance,
byte[] beginLsn,
byte[] endLsn,
string filterOption)
{
return await GetChanges(
return await GetChangesAsync(
connection,
"cdc.fn_cdc_get_net_changes",
captureInstance,
Expand All @@ -149,7 +149,7 @@ public static async Task<List<IReadOnlyDictionary<string, object>>> GetNetChange
[System.Diagnostics.CodeAnalysis.SuppressMessage
("Security", "CA2100:Review SQL queries for security vulnerabilities",
Justification = "No user input.")]
private static async Task<List<IReadOnlyDictionary<string, object>>> GetChanges(
private static async Task<List<IReadOnlyDictionary<string, object>>> GetChangesAsync(
SqlConnection connection,
string cdcFunction,
string captureInstance,
Expand Down
Loading

0 comments on commit 0956ded

Please sign in to comment.