aboutsummaryrefslogtreecommitdiff
path: root/PagerFetchService.cs
blob: 98723aa4df69416f3c667a0cf5c6a950cff1e093 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
using PagerParser.PagerProviders;
using System.Text.RegularExpressions;

namespace PagerParser;

/// <summary>
/// Event data carrying a single <see cref="PagerMessage"/>, used with
/// <see cref="PagerMessageHandler"/>.
/// </summary>
public class PagerMessageEventArgs : EventArgs {
    /// <summary>
    /// The pager message this event is about. Init-only: it can be set in an
    /// object initializer and not reassigned afterwards.
    /// </summary>
    public PagerMessage Message { get; init; }
}

/// <summary>
/// Handler signature for the <see cref="IPagerProvider.OnPagerMessage"/> event.
/// </summary>
/// <param name="sender">Source of the event (by convention, the object raising it).</param>
/// <param name="e">Event data holding the pager message.</param>
public delegate void PagerMessageHandler(object sender, PagerMessageEventArgs e);

/// <summary>
/// Contract for a source of pager messages: it exposes connection and message
/// events, connect/disconnect control, and an asynchronous bulk fetch.
/// </summary>
public interface IPagerProvider {
    /// <summary>
    /// Connection notification. Presumably raised once a connection has been
    /// established — confirm against the implementations.
    /// </summary>
    event EventHandler OnConnect;

    /// <summary>
    /// Notification carrying one pager message in its event args.
    /// </summary>
    event PagerMessageHandler OnPagerMessage;

    /// <summary>Starts the provider's connection.</summary>
    void Connect();

    /// <summary>Ends the provider's connection.</summary>
    void Disconnect();

    /// <summary>
    /// Asynchronously retrieves a batch of pager messages.
    /// </summary>
    /// <param name="ct">Token used to cancel the fetch.</param>
    /// <param name="until">
    /// Timestamp bounding the fetch. Presumably the provider returns messages
    /// going back to this point in time — confirm against the implementations.
    /// </param>
    /// <returns>The fetched messages.</returns>
    Task<PagerMessage[]> FetchAsync(CancellationToken ct, DateTime until);
}

/// <summary>
/// Marker attribute applicable to classes (multiple instances allowed); it
/// carries no data of its own.
/// NOTE(review): presumably tags <see cref="IPagerProvider"/> implementations
/// for discovery — nothing in this file reads it, so confirm the intended use.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public class PagerProviderAttribute : Attribute {}

/// <summary>
/// Hosted service that connects to a pager provider, stores incoming pager
/// messages in the database (after include/exclude filtering and
/// de-duplication), and performs a bulk fetch each time the provider reports
/// a connection.
/// </summary>
public class PagerFetchService : IHostedService {
    // NOTE(review): neither of these two values is referenced anywhere in this
    // class. A minimum-interval check between bulk fetches is not implemented.
    private const int         DefaultFetchLimit = 500;
    private readonly TimeSpan MinFetchInterval  = TimeSpan.FromMinutes(10);

    private readonly IPagerProvider pagerProvider = new PagerMon();

    // Time of the last successful bulk fetch; null until one has succeeded.
    private DateTime? LastFetch;

    // Pre-compiled message filters. An empty array means "no filter configured".
    private readonly Regex[] MessageInclude;
    private readonly Regex[] MessageExclude;

    // Replaced on every StartAsync and cancelled by StopAsync.
    private CancellationTokenSource cts = new();

    private readonly IConfiguration config;
    private readonly ILogger logger;
    private readonly IServiceProvider serviceProvider;
    private readonly IPagerMessageParserService parser;

    /// <summary>
    /// Creates the service and pre-compiles the optional message filters from
    /// the <c>PagerParser:MessageInclude</c> and <c>PagerParser:MessageExclude</c>
    /// configuration sections.
    /// </summary>
    /// <param name="config">Application configuration holding the filter patterns.</param>
    /// <param name="logger">Logger for this service.</param>
    /// <param name="serviceProvider">Root provider used to create a scope per DB operation.</param>
    /// <param name="parser">Parser applied to each newly stored message.</param>
    public PagerFetchService(
        IConfiguration config,
        ILogger<PagerFetchService> logger,
        IServiceProvider serviceProvider,
        IPagerMessageParserService parser) {

        this.config          = config;
        this.logger          = logger;
        this.serviceProvider = serviceProvider;
        this.parser          = parser;

        // Pre-compile regex message filters if configured
        MessageInclude = CompileFilters(config, "PagerParser:MessageInclude");
        MessageExclude = CompileFilters(config, "PagerParser:MessageExclude");
    }

    // Reads a string-array configuration section and compiles each entry into a
    // case-insensitive regex. GetValue<string[]> only converts scalar values and
    // so yields null for array sections; the section has to be bound with Get<T>.
    private static Regex[] CompileFilters(IConfiguration config, string key) =>
        config.GetSection(key).Get<string[]>()?
            .Select(pattern => new Regex(pattern, RegexOptions.Compiled | RegexOptions.IgnoreCase))
            .ToArray() ?? Array.Empty<Regex>();

    /// <summary>
    /// Subscribes to the provider's events and asks it to connect.
    /// </summary>
    /// <param name="cancellationToken">Not used; start-up completes synchronously.</param>
    /// <returns>A completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken) {
        logger.LogInformation("Pager fetch service starting...");
        cts = new();
        pagerProvider.OnConnect      += OnConnect;
        pagerProvider.OnPagerMessage += PagerMessageReceived;
        pagerProvider.Connect();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels any in-flight fetch, unsubscribes from the provider's events and
    /// asks it to disconnect.
    /// </summary>
    /// <param name="cancellationToken">Not used; shutdown completes synchronously.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken) {
        logger.LogInformation("Pager fetch service stopping...");
        cts.Cancel();
        pagerProvider.OnConnect      -= OnConnect;
        pagerProvider.OnPagerMessage -= PagerMessageReceived;
        pagerProvider.Disconnect();
        return Task.CompletedTask;
    }

    // Merge incoming messages with messages already in the DB.
    // A message is stored only if it matches at least one include filter (when
    // any are configured), matches none of the exclude filters, and no stored
    // message already has the same text.
    private void AddMessages(PagerMessage[] messages) {
        using var scope = serviceProvider.CreateScope();
        using var db    = scope.ServiceProvider.GetRequiredService<PagerContext>();

        // Materialized once: the last predicate queries the DB per message, so
        // the sequence must not be re-enumerated for counting and iterating.
        var toAdd = messages
            .Where(m =>
                MessageInclude.Length == 0 ||
                MessageInclude.Any(include => include.IsMatch(m.Message)))
            .Where(m =>
                !MessageExclude.Any(exclude => exclude.IsMatch(m.Message)))
            .Where(m =>
                !db.PagerMessages
                .Select(stored => stored.Message)
                .Contains(m.Message))
            .ToArray();

        logger.LogDebug($"Adding {toAdd.Length}/{messages.Length} message(s) to the DB");

        if(toAdd.Length == 0) {
            return;
        }

        // Attempt to parse each newly-fetched message.
        // Failure to parse isn't critical, as we still keep the raw message.
        foreach(var message in toAdd) {
            message.ParsedMessage = parser.TryParse(message.Message);
            if(message.ParsedMessage is not null) {
                message.ParsedMessage.GpsPosition =
                    PositionCalculator.GetGpsPosition(message.ParsedMessage);
            }
        }

        // Only the filtered, de-duplicated messages are persisted.
        db.AddRange(toAdd);
        db.SaveChanges();
    }

    /// <summary>
    /// Provider connection handler: performs a bulk fetch in case messages were
    /// missed. The fetch bound is the time of the last successful fetch, else
    /// the newest stored message timestamp, else <see cref="DateTime.MinValue"/>.
    /// </summary>
    /// <remarks>
    /// This is an <c>async void</c> event handler, so every failure is caught and
    /// logged here; nothing is allowed to escape.
    /// </remarks>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="e">Event data; not used.</param>
    public async void OnConnect(object? sender, EventArgs e) {
        try {
            // Inside the try: LastPageTimestamp queries the DB and may throw.
            var fetchUntil = LastFetch ?? LastPageTimestamp ?? DateTime.MinValue;

            logger.LogInformation($"Fetching messages up to {fetchUntil}");

            var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil);
            logger.LogInformation($"Fetched {messages.Length} message(s)");
            AddMessages(messages);

            // NOTE(review): local time; confirm it matches the timezone of the
            // timestamps the provider compares against.
            LastFetch = DateTime.Now;
        } catch(Exception ex) {
            logger.LogError(ex, "Failed to fetch messages");
        }
    }

    /// <summary>
    /// Provider message handler: logs the received message and stores it.
    /// </summary>
    /// <param name="sender">Event source; not used.</param>
    /// <param name="e">Event data holding the received message.</param>
    public void PagerMessageReceived(object sender, PagerMessageEventArgs e) {
        logger.LogInformation($"PagerMessage: {e.Message.Message}");
        AddMessages(new[] { e.Message });
    }

    // Timestamp of the most recent message stored in the DB, or null when the
    // table is empty. Opens its own scope/context for the query.
    private DateTime? LastPageTimestamp {
        get {
            using var scope = serviceProvider.CreateScope();
            using var db    = scope.ServiceProvider.GetRequiredService<PagerContext>();

            return db.PagerMessages
                .OrderByDescending(m => m.Timestamp)
                .FirstOrDefault()?.Timestamp;
        }
    }
}