From 0e21907c76dbefed11f382bcf949143f0716567f Mon Sep 17 00:00:00 2001 From: Jake Mannens Date: Fri, 20 Sep 2024 16:21:09 +1000 Subject: v0.3 --- PagerFetchService.cs | 132 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 21 deletions(-) (limited to 'PagerFetchService.cs') diff --git a/PagerFetchService.cs b/PagerFetchService.cs index 97a03d5..98723aa 100644 --- a/PagerFetchService.cs +++ b/PagerFetchService.cs @@ -1,52 +1,109 @@ -using PagerParser.Sites; +using PagerParser.PagerProviders; +using System.Text.RegularExpressions; namespace PagerParser; -// Self-explanatory... Hosted background services that periodically fetches -// pager messages, tries to parse them, then adds both the raw pager message -// as well as the parsed message contents to the DB, ignoring already fetched -// messages in the process. +public class PagerMessageEventArgs : EventArgs { + public PagerMessage Message { get; init; } +} + +public delegate void PagerMessageHandler(object sender, PagerMessageEventArgs e); + +public interface IPagerProvider { + public event EventHandler OnConnect; + public event PagerMessageHandler OnPagerMessage; + + public void Connect(); + public void Disconnect(); + + public Task FetchAsync(CancellationToken ct, DateTime until); +} + +[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] +public class PagerProviderAttribute : Attribute {} + public class PagerFetchService : IHostedService { - private readonly TimeSpan FetchInterval = TimeSpan.FromMinutes(15); + private const int DefaultFetchLimit = 500; + private readonly TimeSpan MinFetchInterval = TimeSpan.FromMinutes(10); + + private IPagerProvider pagerProvider = new PagerMon(); + + private DateTime? LastFetch; + + private Regex[] MessageInclude; + private Regex[] MessageExclude; + private CancellationTokenSource cts = new(); + + private IConfiguration config; + private ILogger logger; private IServiceProvider serviceProvider; private IPagerMessageParserService parser; - private Timer fetchTimer; - - private PagerMon pagerMon = new(); + public PagerFetchService( + IConfiguration config, + ILogger logger, + IServiceProvider serviceProvider, + IPagerMessageParserService parser) { - public PagerFetchService(IServiceProvider serviceProvider, IPagerMessageParserService parser) { + this.config = config; + this.logger = logger; this.serviceProvider = serviceProvider; this.parser = parser; - fetchTimer = new Timer(Fetch); + + // Pre-compile regex message filters if configured + MessageInclude = config.GetValue("PagerParser:MessageInclude")? + .Select(x => new Regex(x, RegexOptions.Compiled | RegexOptions.IgnoreCase)) + .ToArray() ?? Array.Empty(); + MessageExclude = config.GetValue("PagerParser:MessageExclude")? + .Select(x => new Regex(x, RegexOptions.Compiled | RegexOptions.IgnoreCase)) + .ToArray() ?? Array.Empty(); } public Task StartAsync(CancellationToken cancellationToken) { - fetchTimer.Change(TimeSpan.Zero, FetchInterval); + logger.LogInformation("Pager fetch service starting..."); + cts = new(); + pagerProvider.OnConnect += OnConnect; + pagerProvider.OnPagerMessage += PagerMessageReceived; + pagerProvider.Connect(); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { - fetchTimer.Change(Timeout.Infinite, Timeout.Infinite); + logger.LogInformation("Pager fetch service stopping..."); + cts.Cancel(); + pagerProvider.OnConnect -= OnConnect; + pagerProvider.OnPagerMessage -= PagerMessageReceived; + pagerProvider.Disconnect(); return Task.CompletedTask; } - private void Fetch(object? state) { + // Merge incoming messages with messages already in the DB + private void AddMessages(PagerMessage[] messages) { using var scope = serviceProvider.CreateScope(); - var db = scope.ServiceProvider.GetRequiredService(); + using var db = scope.ServiceProvider.GetRequiredService(); - // Fetch pager messages, ignoring message we've already got - var messages = pagerMon - .Fetch() + var toAdd = messages + .Where(m => + MessageInclude + .Select(mi => mi.IsMatch(m.Message)) + .DefaultIfEmpty(true) + .Contains(true)) + .Where(m => + MessageExclude + .Select(me => !me.IsMatch(m.Message)) + .DefaultIfEmpty(true) + .Contains(true)) .Where(m => !db.PagerMessages .Select(m => m.Message) .Contains(m.Message)); + logger.LogDebug($"Adding {toAdd.Count()}/{messages.Count()} message(s) to the DB"); + // Attempt to parse each newly-fetched message. // Failure to parse isn't critical, as we still keep the raw message. - foreach(var message in messages) { + foreach(var message in toAdd) { message.ParsedMessage = parser.TryParse(message.Message); if(message.ParsedMessage is not null) message.ParsedMessage.GpsPosition = @@ -57,6 +114,39 @@ public class PagerFetchService : IHostedService { db.SaveChanges(); } - public void Dispose() => - fetchTimer.Dispose(); + // If we were disconnected, or have just started up, attempt to + // fetch a bulk of messages in case we missed any. We store the + // time of the last successful fetch to ensure we don't fetch + // unless we've been disconnected for a significant amount of + // time (or have just started up). + public async void OnConnect(object? sender, EventArgs e) { + var fetchUntil = LastFetch ?? LastPageTimestamp ?? DateTime.MinValue; + + logger.LogInformation($"Fetching messages up to {fetchUntil}"); + + try { + var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil); + logger.LogInformation($"Fetched {messages.Count()} message(s)"); + AddMessages(messages); + LastFetch = DateTime.Now; + } catch { + logger.LogError("Failed to fetch messages"); + } + } + + public void PagerMessageReceived(object sender, PagerMessageEventArgs e) { + logger.LogInformation($"PagerMessage: {e.Message.Message}"); + AddMessages(new[] { e.Message }); + } + + private DateTime? LastPageTimestamp { + get { + using var scope = serviceProvider.CreateScope(); + using var db = scope.ServiceProvider.GetRequiredService(); + + return db.PagerMessages + .OrderByDescending(m => m.Timestamp) + .FirstOrDefault()?.Timestamp; + } + } } -- cgit v1.3