aboutsummaryrefslogtreecommitdiff
path: root/PagerFetchService.cs
diff options
context:
space:
mode:
authorJake Mannens <jake@asger.xyz>2024-09-20 16:21:09 +1000
committerJake Mannens <jake@asger.xyz>2024-10-29 11:42:54 +1100
commite8e3c4cba8ffa0056e984c113cfbb75319e00022 (patch)
tree8336315e61f9e77207276d478b25fe5dc7c3d06c /PagerFetchService.cs
parent0e21907c76dbefed11f382bcf949143f0716567f (diff)
v0.4-rc1v0.4-rc1
Diffstat (limited to 'PagerFetchService.cs')
-rw-r--r--PagerFetchService.cs111
1 files changed, 85 insertions, 26 deletions
diff --git a/PagerFetchService.cs b/PagerFetchService.cs
index 98723aa..aa014a3 100644
--- a/PagerFetchService.cs
+++ b/PagerFetchService.cs
@@ -10,8 +10,9 @@ public class PagerMessageEventArgs : EventArgs {
public delegate void PagerMessageHandler(object sender, PagerMessageEventArgs e);
public interface IPagerProvider {
- public event EventHandler OnConnect;
- public event PagerMessageHandler OnPagerMessage;
+ public event EventHandler OnConnect;
+ public event EventHandler<string> OnDisconnect;
+ public event PagerMessageHandler OnPagerMessage;
public void Connect();
public void Disconnect();
@@ -23,12 +24,17 @@ public interface IPagerProvider {
public class PagerProviderAttribute : Attribute {}
public class PagerFetchService : IHostedService {
- private const int DefaultFetchLimit = 500;
- private readonly TimeSpan MinFetchInterval = TimeSpan.FromMinutes(10);
+ private readonly TimeSpan FetchInterval = TimeSpan.FromMinutes(1);
+
+ private System.Timers.Timer fetchTimer;
private IPagerProvider pagerProvider = new PagerMon();
- private DateTime? LastFetch;
+ private IRootPagerHandler rootPagerHandler;
+
+ private DateTime? lastFetch;
+
+ private SemaphoreSlim lastFetchLock;
private Regex[] MessageInclude;
private Regex[] MessageExclude;
@@ -44,12 +50,19 @@ public class PagerFetchService : IHostedService {
IConfiguration config,
ILogger<PagerFetchService> logger,
IServiceProvider serviceProvider,
+ IRootPagerHandler rootPagerHandler,
IPagerMessageParserService parser) {
- this.config = config;
- this.logger = logger;
- this.serviceProvider = serviceProvider;
- this.parser = parser;
+ this.config = config;
+ this.logger = logger;
+ this.serviceProvider = serviceProvider;
+ this.rootPagerHandler = rootPagerHandler;
+ this.parser = parser;
+
+ fetchTimer = new() {
+ Interval = FetchInterval.TotalMilliseconds
+ };
+ fetchTimer.Elapsed += (obj, e) => FetchAsync().GetAwaiter().GetResult();
// Pre-compile regex message filters if configured
MessageInclude = config.GetValue<string[]>("PagerParser:MessageInclude")?
@@ -62,17 +75,28 @@ public class PagerFetchService : IHostedService {
public Task StartAsync(CancellationToken cancellationToken) {
logger.LogInformation("Pager fetch service starting...");
+ lastFetchLock = new(1, 1);
+ fetchTimer.AutoReset = true;
+ fetchTimer.Start();
cts = new();
+ cancellationToken.Register(() => cts.Cancel());
pagerProvider.OnConnect += OnConnect;
+ pagerProvider.OnDisconnect += OnDisconnect;
pagerProvider.OnPagerMessage += PagerMessageReceived;
pagerProvider.Connect();
+
+ _ = Task.Run(FetchAsync);
+
return Task.CompletedTask;
}
- public Task StopAsync(CancellationToken cancellationToken) {
+ public Task StopAsync(CancellationToken ct) {
logger.LogInformation("Pager fetch service stopping...");
+ lastFetchLock.Dispose();
+ fetchTimer.Stop();
cts.Cancel();
pagerProvider.OnConnect -= OnConnect;
+ pagerProvider.OnDisconnect -= OnDisconnect;
pagerProvider.OnPagerMessage -= PagerMessageReceived;
pagerProvider.Disconnect();
return Task.CompletedTask;
@@ -114,29 +138,64 @@ public class PagerFetchService : IHostedService {
db.SaveChanges();
}
- // If we were disconnected, or have just started up, attempt to
- // fetch a bulk of messages in case we missed any. We store the
- // time of the last successful fetch to ensure we don't fetch
- // unless we've been disconnected for a significant amount of
- // time (or have just started up).
- public async void OnConnect(object? sender, EventArgs e) {
- var fetchUntil = LastFetch ?? LastPageTimestamp ?? DateTime.MinValue;
+ public async Task FetchAsync() {
+ await lastFetchLock.WaitAsync(cts.Token);
+ try {
+ var fetchUntil = lastFetch ?? LastPageTimestamp ?? DateTime.MinValue;
- logger.LogInformation($"Fetching messages up to {fetchUntil}");
+ logger.LogInformation($"Fetching messages up to {fetchUntil}");
+ try {
+ var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil);
+ logger.LogInformation($"Fetched {messages.Count()} message(s)");
+ AddMessages(messages);
+ // If our provider is currently connected and receiving messages
+ // in realtime, we can set lastFetch to null, allowing it to be
+ // updated when the next disconnect occurs. Otherwise if we're
+ // currently disconnected, store the current time as the last
+ // fetch, in preparation for the next fetch cycle.
+ lastFetch = fetchTimer.AutoReset ? DateTime.Now : null;
+ } catch {
+ logger.LogError("Failed to fetch messages");
+ }
+ } finally {
+ lastFetchLock.Release();
+ }
+ }
+
+ public void OnConnect(object? sender, EventArgs e) {
+ logger.LogInformation(
+ $"Connected to pager message provider {pagerProvider.GetType().Name}");
+
+ // Stop polling, allowing any pending fetch to proceed to get messages
+ // potentially missed whilst the stream was disconnected.
+ fetchTimer.AutoReset = false;
+ }
+
+ public async void OnDisconnect(object? sender, string e) {
+ logger.LogInformation(
+ $"Disconnected from pager message provider {pagerProvider.GetType().Name}");
+
+ await lastFetchLock.WaitAsync(cts.Token);
try {
- var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil);
- logger.LogInformation($"Fetched {messages.Count()} message(s)");
- AddMessages(messages);
- LastFetch = DateTime.Now;
- } catch {
- logger.LogError("Failed to fetch messages");
+ lastFetch ??= DateTime.Now;
+ } finally {
+ lastFetchLock.Release();
}
+
+ // Fallback to polling until the stream is re-established
+ fetchTimer.AutoReset = true;
+ fetchTimer.Start();
}
- public void PagerMessageReceived(object sender, PagerMessageEventArgs e) {
+ public async void PagerMessageReceived(object sender, PagerMessageEventArgs e) {
logger.LogInformation($"PagerMessage: {e.Message.Message}");
- AddMessages(new[] { e.Message });
+
+ AddMessages([ e.Message ]);
+
+ await rootPagerHandler.HandleMessageAsync(
+ e.Message,
+ parser.TryParse(e.Message.Message));
}
private DateTime? LastPageTimestamp {