diff options
| author | Jake Mannens <jake@asger.xyz> | 2024-09-20 16:21:09 +1000 |
|---|---|---|
| committer | Jake Mannens <jake@asger.xyz> | 2024-10-29 11:42:54 +1100 |
| commit | e8e3c4cba8ffa0056e984c113cfbb75319e00022 (patch) | |
| tree | 8336315e61f9e77207276d478b25fe5dc7c3d06c /PagerFetchService.cs | |
| parent | 0e21907c76dbefed11f382bcf949143f0716567f (diff) | |
v0.4-rc1v0.4-rc1
Diffstat (limited to 'PagerFetchService.cs')
| -rw-r--r-- | PagerFetchService.cs | 111 |
1 files changed, 85 insertions, 26 deletions
diff --git a/PagerFetchService.cs b/PagerFetchService.cs index 98723aa..aa014a3 100644 --- a/PagerFetchService.cs +++ b/PagerFetchService.cs @@ -10,8 +10,9 @@ public class PagerMessageEventArgs : EventArgs { public delegate void PagerMessageHandler(object sender, PagerMessageEventArgs e); public interface IPagerProvider { - public event EventHandler OnConnect; - public event PagerMessageHandler OnPagerMessage; + public event EventHandler OnConnect; + public event EventHandler<string> OnDisconnect; + public event PagerMessageHandler OnPagerMessage; public void Connect(); public void Disconnect(); @@ -23,12 +24,17 @@ public interface IPagerProvider { public class PagerProviderAttribute : Attribute {} public class PagerFetchService : IHostedService { - private const int DefaultFetchLimit = 500; - private readonly TimeSpan MinFetchInterval = TimeSpan.FromMinutes(10); + private readonly TimeSpan FetchInterval = TimeSpan.FromMinutes(1); + + private System.Timers.Timer fetchTimer; private IPagerProvider pagerProvider = new PagerMon(); - private DateTime? LastFetch; + private IRootPagerHandler rootPagerHandler; + + private DateTime? lastFetch; + + private SemaphoreSlim lastFetchLock; private Regex[] MessageInclude; private Regex[] MessageExclude; @@ -44,12 +50,19 @@ public class PagerFetchService : IHostedService { IConfiguration config, ILogger<PagerFetchService> logger, IServiceProvider serviceProvider, + IRootPagerHandler rootPagerHandler, IPagerMessageParserService parser) { - this.config = config; - this.logger = logger; - this.serviceProvider = serviceProvider; - this.parser = parser; + this.config = config; + this.logger = logger; + this.serviceProvider = serviceProvider; + this.rootPagerHandler = rootPagerHandler; + this.parser = parser; + + fetchTimer = new() { + Interval = FetchInterval.TotalMilliseconds + }; + fetchTimer.Elapsed += (obj, e) => FetchAsync().GetAwaiter().GetResult(); // Pre-compile regex message filters if configured MessageInclude = config.GetValue<string[]>("PagerParser:MessageInclude")? @@ -62,17 +75,28 @@ public class PagerFetchService : IHostedService { public Task StartAsync(CancellationToken cancellationToken) { logger.LogInformation("Pager fetch service starting..."); + lastFetchLock = new(1, 1); + fetchTimer.AutoReset = true; + fetchTimer.Start(); cts = new(); + cancellationToken.Register(() => cts.Cancel()); pagerProvider.OnConnect += OnConnect; + pagerProvider.OnDisconnect += OnDisconnect; pagerProvider.OnPagerMessage += PagerMessageReceived; pagerProvider.Connect(); + + _ = Task.Run(FetchAsync); + return Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken) { + public Task StopAsync(CancellationToken ct) { logger.LogInformation("Pager fetch service stopping..."); + lastFetchLock.Dispose(); + fetchTimer.Stop(); cts.Cancel(); pagerProvider.OnConnect -= OnConnect; + pagerProvider.OnDisconnect -= OnDisconnect; pagerProvider.OnPagerMessage -= PagerMessageReceived; pagerProvider.Disconnect(); return Task.CompletedTask; @@ -114,29 +138,64 @@ public class PagerFetchService : IHostedService { db.SaveChanges(); } - // If we were disconnected, or have just started up, attempt to - // fetch a bulk of messages in case we missed any. We store the - // time of the last successful fetch to ensure we don't fetch - // unless we've been disconnected for a significant amount of - // time (or have just started up). - public async void OnConnect(object? sender, EventArgs e) { - var fetchUntil = LastFetch ?? LastPageTimestamp ?? DateTime.MinValue; + public async Task FetchAsync() { + await lastFetchLock.WaitAsync(cts.Token); + try { + var fetchUntil = lastFetch ?? LastPageTimestamp ?? DateTime.MinValue; - logger.LogInformation($"Fetching messages up to {fetchUntil}"); + logger.LogInformation($"Fetching messages up to {fetchUntil}"); + try { + var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil); + logger.LogInformation($"Fetched {messages.Count()} message(s)"); + AddMessages(messages); + // If our provider is currently connected and receiving messages + // in realtime, we can set lastFetch to null, allowing it to be + // updated when the next disconnect occurs. Otherwise if we're + // currently disconnected, store the current time as the last + // fetch, in preparation for the next fetch cycle. + lastFetch = fetchTimer.AutoReset ? DateTime.Now : null; + } catch { + logger.LogError("Failed to fetch messages"); + } + } finally { + lastFetchLock.Release(); + } + } + + public void OnConnect(object? sender, EventArgs e) { + logger.LogInformation( + $"Connected to pager message provider {pagerProvider.GetType().Name}"); + + // Stop polling, allowing any pending fetch to proceed to get messages + // potentially missed whilst the stream was disconnected. + fetchTimer.AutoReset = false; + } + + public async void OnDisconnect(object? sender, string e) { + logger.LogInformation( + $"Disconnected from pager message provider {pagerProvider.GetType().Name}"); + + await lastFetchLock.WaitAsync(cts.Token); try { - var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil); - logger.LogInformation($"Fetched {messages.Count()} message(s)"); - AddMessages(messages); - LastFetch = DateTime.Now; - } catch { - logger.LogError("Failed to fetch messages"); + lastFetch ??= DateTime.Now; + } finally { + lastFetchLock.Release(); } + + // Fallback to polling until the stream is re-established + fetchTimer.AutoReset = true; + fetchTimer.Start(); } - public void PagerMessageReceived(object sender, PagerMessageEventArgs e) { + public async void PagerMessageReceived(object sender, PagerMessageEventArgs e) { logger.LogInformation($"PagerMessage: {e.Message.Message}"); - AddMessages(new[] { e.Message }); + + AddMessages([ e.Message ]); + + await rootPagerHandler.HandleMessageAsync( + e.Message, + parser.TryParse(e.Message.Message)); } private DateTime? LastPageTimestamp { |
