using PagerParser.PagerProviders; using System.Text.RegularExpressions; namespace PagerParser; public class PagerMessageEventArgs : EventArgs { public PagerMessage Message { get; init; } } public delegate void PagerMessageHandler(object sender, PagerMessageEventArgs e); public interface IPagerProvider { public event EventHandler OnConnect; public event EventHandler OnDisconnect; public event PagerMessageHandler OnPagerMessage; public void Connect(); public void Disconnect(); public Task FetchAsync(CancellationToken ct, DateTime until); } [AttributeUsage(AttributeTargets.Class, AllowMultiple = true)] public class PagerProviderAttribute : Attribute {} public class PagerFetchService : IHostedService { private readonly TimeSpan FetchInterval = TimeSpan.FromMinutes(1); private System.Timers.Timer fetchTimer; private IPagerProvider pagerProvider = new PagerMon(); private IRootPagerHandler rootPagerHandler; private DateTime? lastFetch; private SemaphoreSlim lastFetchLock; private Regex[] MessageInclude; private Regex[] MessageExclude; private CancellationTokenSource cts = new(); private IConfiguration config; private ILogger logger; private IServiceProvider serviceProvider; private IPagerMessageParserService parser; public PagerFetchService( IConfiguration config, ILogger logger, IServiceProvider serviceProvider, IRootPagerHandler rootPagerHandler, IPagerMessageParserService parser) { this.config = config; this.logger = logger; this.serviceProvider = serviceProvider; this.rootPagerHandler = rootPagerHandler; this.parser = parser; fetchTimer = new() { Interval = FetchInterval.TotalMilliseconds }; fetchTimer.Elapsed += (obj, e) => FetchAsync().GetAwaiter().GetResult(); // Pre-compile regex message filters if configured MessageInclude = config.GetValue("PagerParser:MessageInclude")? .Select(x => new Regex(x, RegexOptions.Compiled | RegexOptions.IgnoreCase)) .ToArray() ?? Array.Empty(); MessageExclude = config.GetValue("PagerParser:MessageExclude")? .Select(x => new Regex(x, RegexOptions.Compiled | RegexOptions.IgnoreCase)) .ToArray() ?? Array.Empty(); } public Task StartAsync(CancellationToken cancellationToken) { logger.LogInformation("Pager fetch service starting..."); lastFetchLock = new(1, 1); fetchTimer.AutoReset = true; fetchTimer.Start(); cts = new(); cancellationToken.Register(() => cts.Cancel()); pagerProvider.OnConnect += OnConnect; pagerProvider.OnDisconnect += OnDisconnect; pagerProvider.OnPagerMessage += PagerMessageReceived; pagerProvider.Connect(); _ = Task.Run(FetchAsync); return Task.CompletedTask; } public Task StopAsync(CancellationToken ct) { logger.LogInformation("Pager fetch service stopping..."); lastFetchLock.Dispose(); fetchTimer.Stop(); cts.Cancel(); pagerProvider.OnConnect -= OnConnect; pagerProvider.OnDisconnect -= OnDisconnect; pagerProvider.OnPagerMessage -= PagerMessageReceived; pagerProvider.Disconnect(); return Task.CompletedTask; } // Merge incoming messages with messages already in the DB private void AddMessages(PagerMessage[] messages) { using var scope = serviceProvider.CreateScope(); using var db = scope.ServiceProvider.GetRequiredService(); var toAdd = messages .Where(m => MessageInclude .Select(mi => mi.IsMatch(m.Message)) .DefaultIfEmpty(true) .Contains(true)) .Where(m => MessageExclude .Select(me => !me.IsMatch(m.Message)) .DefaultIfEmpty(true) .Contains(true)) .Where(m => !db.PagerMessages .Select(m => m.Message) .Contains(m.Message)); logger.LogDebug($"Adding {toAdd.Count()}/{messages.Count()} message(s) to the DB"); // Attempt to parse each newly-fetched message. // Failure to parse isn't critical, as we still keep the raw message. foreach(var message in toAdd) { message.ParsedMessage = parser.TryParse(message.Message); if(message.ParsedMessage is not null) message.ParsedMessage.GpsPosition = PositionCalculator.GetGpsPosition(message.ParsedMessage); } db.AddRange(messages); db.SaveChanges(); } public async Task FetchAsync() { await lastFetchLock.WaitAsync(cts.Token); try { var fetchUntil = lastFetch ?? LastPageTimestamp ?? DateTime.MinValue; logger.LogInformation($"Fetching messages up to {fetchUntil}"); try { var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil); logger.LogInformation($"Fetched {messages.Count()} message(s)"); AddMessages(messages); // If our provider is currently connected and receiving messages // in realtime, we can set lastFetch to null, allowing it to be // updated when the next disconnect occurs. Otherwise if we're // currently disconnected, store the current time as the last // fetch, in preparation for the next fetch cycle. lastFetch = fetchTimer.AutoReset ? DateTime.Now : null; } catch { logger.LogError("Failed to fetch messages"); } } finally { lastFetchLock.Release(); } } public void OnConnect(object? sender, EventArgs e) { logger.LogInformation( $"Connected to pager message provider {pagerProvider.GetType().Name}"); // Stop polling, allowing any pending fetch to proceed to get messages // potentially missed whilst the stream was disconnected. fetchTimer.AutoReset = false; } public async void OnDisconnect(object? sender, string e) { logger.LogInformation( $"Disconnected from pager message provider {pagerProvider.GetType().Name}"); await lastFetchLock.WaitAsync(cts.Token); try { lastFetch ??= DateTime.Now; } finally { lastFetchLock.Release(); } // Fallback to polling until the stream is re-established fetchTimer.AutoReset = true; fetchTimer.Start(); } public async void PagerMessageReceived(object sender, PagerMessageEventArgs e) { logger.LogInformation($"PagerMessage: {e.Message.Message}"); AddMessages([ e.Message ]); await rootPagerHandler.HandleMessageAsync( e.Message, parser.TryParse(e.Message.Message)); } private DateTime? LastPageTimestamp { get { using var scope = serviceProvider.CreateScope(); using var db = scope.ServiceProvider.GetRequiredService(); return db.PagerMessages .OrderByDescending(m => m.Timestamp) .FirstOrDefault()?.Timestamp; } } }