-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathServiceBusQueue.cs
More file actions
161 lines (136 loc) · 4.83 KB
/
ServiceBusQueue.cs
File metadata and controls
161 lines (136 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace CloudWorker.MessageQueue;
// Adapts a received Service Bus message to IQueueMessage. Every settlement
// operation is forwarded to the receiver the message was obtained from.
public class ServiceBusQueueMessage : IQueueMessage
{
    private readonly ServiceBusReceivedMessage _message;
    private readonly ServiceBusReceiver _receiver;

    // message:  the received message being wrapped.
    // receiver: the receiver used to renew, abandon or complete that message.
    public ServiceBusQueueMessage(ServiceBusReceivedMessage message, ServiceBusReceiver receiver)
        => (_message, _receiver) = (message, receiver);

    // Identifier of the wrapped message (its MessageId).
    public string Id
    {
        get { return _message.MessageId; }
    }

    // The message body rendered through Body.ToString().
    public string Content
    {
        get { return _message.Body.ToString(); }
    }

    //TODO: Retry on throttled for the following methods

    // Renews the lock on the wrapped message via the receiver.
    public Task RenewLeaseAsync() => _receiver.RenewMessageLockAsync(_message);

    // Abandons the wrapped message via the receiver.
    public Task ReturnAsync() => _receiver.AbandonMessageAsync(_message);

    // Completes the wrapped message via the receiver.
    public Task DeleteAsync() => _receiver.CompleteMessageAsync(_message);
}
// Options for the Service Bus queue: everything in QueueOptions plus a switch
// controlling whether throttled operations are retried.
public class ServiceBusQueueOptions : QueueOptions
{
    // Retry-on-throttle switch; starts out as true. A null value is treated
    // by Merge as "not specified" and does not overwrite the current value.
    public bool? RetryOnThrottled { get; set; } = true;

    // A shared instance carrying the initial property values.
    public static ServiceBusQueueOptions Default { get; } = new();

    // Applies the base-class merge first, then copies RetryOnThrottled from
    // `other` when `other` is a ServiceBusQueueOptions that has it set.
    public override void Merge(QueueOptions? other)
    {
        base.Merge(other);

        if (other is not ServiceBusQueueOptions incoming)
        {
            return;
        }

        if (incoming.RetryOnThrottled is bool retry)
        {
            RetryOnThrottled = retry;
        }
    }
}
// IMessageQueue implementation on top of an Azure Service Bus queue.
// One instance owns a client plus a receiver and a sender for the queue named
// in the options. Disposing the instance disposes the client.
public class ServiceBusQueue : IMessageQueue, IAsyncDisposable
{
    public const string QueueType = "servicebus";

    private readonly ServiceBusQueueOptions _options;
    private readonly TimeSpan _messageLease;
    private readonly ServiceBusClient _client;
    private readonly ServiceBusReceiver _receiver;
    private readonly ServiceBusSender _sender;
    private readonly ILogger? _logger;

    // options: connection string, queue name, message lease (in seconds) and
    //          the retry-on-throttle switch. If RetryOnThrottled is null it is
    //          filled in on the passed instance from ServiceBusQueueOptions.Default.
    // logger:  optional; when null nothing is logged.
    // Throws ArgumentNullException when options or options.MessageLease is null.
    public ServiceBusQueue(ServiceBusQueueOptions options, ILogger? logger = null)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        if (_options.MessageLease is null)
        {
            throw new ArgumentNullException(nameof(options.MessageLease));
        }

        _messageLease = TimeSpan.FromSeconds((double)_options.MessageLease);
        _options.RetryOnThrottled ??= ServiceBusQueueOptions.Default.RetryOnThrottled;
        _logger = logger;

        _client = new ServiceBusClient(_options.ConnectionString);
        //TODO: Shall we have either receiver or sender, but not both, for an instance of this class?
        _receiver = _client.CreateReceiver(_options.QueueName);
        _sender = _client.CreateSender(_options.QueueName);
    }

    // The configured message lease, converted from seconds at construction.
    public TimeSpan MessageLease => _messageLease;

    // Waits for one message; equivalent to WaitBatchAsync(1, cancel) and
    // returning its first element.
    public async Task<IQueueMessage> WaitAsync(CancellationToken cancel = default)
    {
        var messages = await WaitBatchAsync(1, cancel).ConfigureAwait(false);
        return messages[0];
    }

    // Waits until at least one message is received and returns the batch
    // (requested with batchSize as the maximum), each wrapped in a
    // ServiceBusQueueMessage bound to this queue's receiver.
    // Throws OperationCanceledException when `cancel` is signalled.
    public async Task<IReadOnlyList<IQueueMessage>> WaitBatchAsync(int batchSize, CancellationToken cancel = default)
    {
        IReadOnlyList<ServiceBusReceivedMessage>? messages = null;
        while (true)
        {
            // RetryWhenThrottled either runs the delegate to completion or
            // throws, so `messages` is always freshly assigned when it returns.
            await RetryWhenThrottled(async () =>
            {
                messages = await _receiver.ReceiveMessagesAsync(batchSize, TimeSpan.MaxValue, cancel).ConfigureAwait(false);
            }, cancel).ConfigureAwait(false);

            if (messages?.Count > 0)
            {
                break;
            }

            // An empty result is not expected here; log it and receive again.
            _logger?.LogWarning("Service Bus ReceiveMessagesAsync returns empty result!");
        }

        return messages!.Select(msg => new ServiceBusQueueMessage(msg, _receiver)).ToImmutableList();
    }

    // Sends `message` as the body of a new Service Bus message.
    // Throws OperationCanceledException when `cancel` is signalled.
    public async Task SendAsync(string message, CancellationToken cancel = default)
    {
        await RetryWhenThrottled(async () =>
        {
            await _sender.SendMessageAsync(new ServiceBusMessage(message), cancel).ConfigureAwait(false);
        }, cancel).ConfigureAwait(false);
    }

    // Runs `task` until it completes. An attempt is repeated, after a
    // 2-second pause, only when it fails with a ServiceBusException whose
    // Reason is ServiceBusy and RetryOnThrottled is true. Any other exception
    // propagates to the caller unchanged.
    private async Task RetryWhenThrottled(Func<Task> task, CancellationToken cancel = default)
    {
        while (true)
        {
            // Fail loudly on cancellation. Returning quietly here would let
            // SendAsync report success without sending, and would make
            // WaitBatchAsync spin forever on a result that is never assigned.
            cancel.ThrowIfCancellationRequested();

            try
            {
                await task().ConfigureAwait(false);
                return;
            }
            catch (ServiceBusException ex)
                when (_options.RetryOnThrottled is true && ex.Reason == ServiceBusFailureReason.ServiceBusy)
            {
                _logger?.LogWarning(ex, "ServiceBusQueue: Being throttled. Sleep 2 seconds before retry.");
                await Task.Delay(TimeSpan.FromSeconds(2), cancel).ConfigureAwait(false);
            }
        }
    }

    // Disposes the underlying ServiceBusClient.
    public ValueTask DisposeAsync()
    {
        return _client.DisposeAsync();
    }
}