aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJake Mannens <jake@asger.xyz>2024-09-20 16:21:09 +1000
committerJake Mannens <jake@asger.xyz>2024-10-29 11:42:48 +1100
commit0e21907c76dbefed11f382bcf949143f0716567f (patch)
tree3b470c01fb441517bc794daa179e6b2159fc8b46
parent0ae271bebd4a43b14bd4c215c539e16adbe073fb (diff)
v0.3v0.3
-rw-r--r--CHANGELOG.md7
-rw-r--r--PagerFetchService.cs132
-rw-r--r--PagerParser.csproj2
-rw-r--r--Sites/PagerMon.cs103
-rw-r--r--appsettings.Development.json10
-rw-r--r--appsettings.json8
6 files changed, 229 insertions, 33 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index df35f02..5ef1e09 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+# Version 0.3
+
+### New Features
+- Added framework for realtime message handling
+- Added SocketIO support to PagerMon provider for realtime message reception functionality
+- Added configurable incoming message regex filters
+
# Version 0.2
### Bug Fixes
diff --git a/PagerFetchService.cs b/PagerFetchService.cs
index 97a03d5..98723aa 100644
--- a/PagerFetchService.cs
+++ b/PagerFetchService.cs
@@ -1,52 +1,109 @@
-using PagerParser.Sites;
+using PagerParser.PagerProviders;
+using System.Text.RegularExpressions;
namespace PagerParser;
-// Self-explanatory... Hosted background services that periodically fetches
-// pager messages, tries to parse them, then adds both the raw pager message
-// as well as the parsed message contents to the DB, ignoring already fetched
-// messages in the process.
+public class PagerMessageEventArgs : EventArgs {
+ public PagerMessage Message { get; init; }
+}
+
+public delegate void PagerMessageHandler(object sender, PagerMessageEventArgs e);
+
+public interface IPagerProvider {
+ public event EventHandler OnConnect;
+ public event PagerMessageHandler OnPagerMessage;
+
+ public void Connect();
+ public void Disconnect();
+
+ public Task<PagerMessage[]> FetchAsync(CancellationToken ct, DateTime until);
+}
+
+[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
+public class PagerProviderAttribute : Attribute {}
+
public class PagerFetchService : IHostedService {
- private readonly TimeSpan FetchInterval = TimeSpan.FromMinutes(15);
+ private const int DefaultFetchLimit = 500;
+ private readonly TimeSpan MinFetchInterval = TimeSpan.FromMinutes(10);
+
+ private IPagerProvider pagerProvider = new PagerMon();
+
+ private DateTime? LastFetch;
+
+ private Regex[] MessageInclude;
+ private Regex[] MessageExclude;
+ private CancellationTokenSource cts = new();
+
+ private IConfiguration config;
+ private ILogger logger;
private IServiceProvider serviceProvider;
private IPagerMessageParserService parser;
- private Timer fetchTimer;
-
- private PagerMon pagerMon = new();
+ public PagerFetchService(
+ IConfiguration config,
+ ILogger<PagerFetchService> logger,
+ IServiceProvider serviceProvider,
+ IPagerMessageParserService parser) {
- public PagerFetchService(IServiceProvider serviceProvider, IPagerMessageParserService parser) {
+ this.config = config;
+ this.logger = logger;
this.serviceProvider = serviceProvider;
this.parser = parser;
- fetchTimer = new Timer(Fetch);
+
+ // Pre-compile regex message filters if configured
+ MessageInclude = config.GetValue<string[]>("PagerParser:MessageInclude")?
+ .Select(x => new Regex(x, RegexOptions.Compiled | RegexOptions.IgnoreCase))
+ .ToArray() ?? Array.Empty<Regex>();
+ MessageExclude = config.GetValue<string[]>("PagerParser:MessageExclude")?
+ .Select(x => new Regex(x, RegexOptions.Compiled | RegexOptions.IgnoreCase))
+ .ToArray() ?? Array.Empty<Regex>();
}
public Task StartAsync(CancellationToken cancellationToken) {
- fetchTimer.Change(TimeSpan.Zero, FetchInterval);
+ logger.LogInformation("Pager fetch service starting...");
+ cts = new();
+ pagerProvider.OnConnect += OnConnect;
+ pagerProvider.OnPagerMessage += PagerMessageReceived;
+ pagerProvider.Connect();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken) {
- fetchTimer.Change(Timeout.Infinite, Timeout.Infinite);
+ logger.LogInformation("Pager fetch service stopping...");
+ cts.Cancel();
+ pagerProvider.OnConnect -= OnConnect;
+ pagerProvider.OnPagerMessage -= PagerMessageReceived;
+ pagerProvider.Disconnect();
return Task.CompletedTask;
}
- private void Fetch(object? state) {
+ // Merge incoming messages with messages already in the DB
+ private void AddMessages(PagerMessage[] messages) {
using var scope = serviceProvider.CreateScope();
- var db = scope.ServiceProvider.GetRequiredService<PagerContext>();
+ using var db = scope.ServiceProvider.GetRequiredService<PagerContext>();
- // Fetch pager messages, ignoring message we've already got
- var messages = pagerMon
- .Fetch()
+ var toAdd = messages
+ .Where(m =>
+ MessageInclude
+ .Select(mi => mi.IsMatch(m.Message))
+ .DefaultIfEmpty(true)
+ .Contains(true))
+ .Where(m =>
+ MessageExclude
+ .Select(me => !me.IsMatch(m.Message))
+ .DefaultIfEmpty(true)
+ .Contains(true))
.Where(m =>
!db.PagerMessages
.Select(m => m.Message)
.Contains(m.Message));
+ logger.LogDebug($"Adding {toAdd.Count()}/{messages.Count()} message(s) to the DB");
+
// Attempt to parse each newly-fetched message.
// Failure to parse isn't critical, as we still keep the raw message.
- foreach(var message in messages) {
+ foreach(var message in toAdd) {
message.ParsedMessage = parser.TryParse(message.Message);
if(message.ParsedMessage is not null)
message.ParsedMessage.GpsPosition =
@@ -57,6 +114,39 @@ public class PagerFetchService : IHostedService {
db.SaveChanges();
}
- public void Dispose() =>
- fetchTimer.Dispose();
+ // If we were disconnected, or have just started up, attempt to
+ // fetch a bulk of messages in case we missed any. We store the
+ // time of the last successful fetch to ensure we don't fetch
+ // unless we've been disconnected for a significant amount of
+ // time (or have just started up).
+ public async void OnConnect(object? sender, EventArgs e) {
+ var fetchUntil = LastFetch ?? LastPageTimestamp ?? DateTime.MinValue;
+
+ logger.LogInformation($"Fetching messages up to {fetchUntil}");
+
+ try {
+ var messages = await pagerProvider.FetchAsync(cts.Token, fetchUntil);
+ logger.LogInformation($"Fetched {messages.Count()} message(s)");
+ AddMessages(messages);
+ LastFetch = DateTime.Now;
+ } catch {
+ logger.LogError("Failed to fetch messages");
+ }
+ }
+
+ public void PagerMessageReceived(object sender, PagerMessageEventArgs e) {
+ logger.LogInformation($"PagerMessage: {e.Message.Message}");
+ AddMessages(new[] { e.Message });
+ }
+
+ private DateTime? LastPageTimestamp {
+ get {
+ using var scope = serviceProvider.CreateScope();
+ using var db = scope.ServiceProvider.GetRequiredService<PagerContext>();
+
+ return db.PagerMessages
+ .OrderByDescending(m => m.Timestamp)
+ .FirstOrDefault()?.Timestamp;
+ }
+ }
}
diff --git a/PagerParser.csproj b/PagerParser.csproj
index 0652e9b..8a16fe6 100644
--- a/PagerParser.csproj
+++ b/PagerParser.csproj
@@ -4,11 +4,13 @@
<TargetFramework>net7.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
+ <Version>0.3</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="CoordinateSharp" Version="2.18.1.1" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="7.0.4" />
+ <PackageReference Include="SocketIOClient" Version="3.0.8" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.5.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.8" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Proxies" Version="7.0.8" />
diff --git a/Sites/PagerMon.cs b/Sites/PagerMon.cs
index d27d3fe..4d19354 100644
--- a/Sites/PagerMon.cs
+++ b/Sites/PagerMon.cs
@@ -1,36 +1,117 @@
-namespace PagerParser.Sites;
+using SocketIOClient;
+using System;
+using System.Text.Json;
+
+namespace PagerParser.PagerProviders;
// Interface class for fetching pager messages from PagerMon
// sites such as Jobyyy over HTTP.
-public class PagerMon {
- public record PagerMonMessage {
+[PagerProvider]
+public class PagerMon : IPagerProvider {
+ private record PagerMonMessage {
public long Timestamp { get; set; }
public string Message { get; set; }
+
+ public PagerMessage PagerMessage => new PagerMessage() {
+ Timestamp = DateTimeOffset.FromUnixTimeSeconds(Timestamp).UtcDateTime,
+ Message = Message
+ };
}
- public record PagerMonQueryResponse {
+ private record PagerMonQueryResponse {
public PagerMonMessage[] Messages { get; set; }
}
- private const string QueryUrl =
- "https://jobyyy.net/api/messageSearch?alias=6&limit=100";
+ public event EventHandler OnConnect;
+ public event PagerMessageHandler OnPagerMessage;
+
+ private const string QueryBaseUrl =
+ "https://jobyyy.net/api/messages";
private HttpClient httpClient = new();
+ private SocketIO socketIOClient;
+
+ public PagerMon() {
+ socketIOClient = new SocketIO("https://jobyyy.net/", new SocketIOOptions() {
+ Transport = SocketIOClient.Transport.TransportProtocol.WebSocket,
+ AutoUpgrade = false,
+ EIO = EngineIO.V3
+ });
+
+ socketIOClient.OnConnected += (sender, e) =>
+ OnConnect?.Invoke(this, e);
+
+ socketIOClient.On("messagePost", PagerMessageReceived);
+ }
+
+ public void Connect() =>
+ socketIOClient.ConnectAsync().GetAwaiter().GetResult();
+
+ public void Disconnect() =>
+ socketIOClient.DisconnectAsync().GetAwaiter().GetResult();
+
public PagerMessage[] Fetch() {
List<PagerMessage> messages = new();
var result = httpClient
- .GetFromJsonAsync<PagerMonQueryResponse>(QueryUrl)
+ .GetFromJsonAsync<PagerMonQueryResponse>($"{QueryBaseUrl}?limit=120")
.GetAwaiter()
.GetResult();
if(result is null)
return Array.Empty<PagerMessage>();
- return result.Messages.Select(m => new PagerMessage() {
- Timestamp = DateTimeOffset.FromUnixTimeSeconds(m.Timestamp).UtcDateTime,
- Message = m.Message
- }).ToArray();
+ return result.Messages
+ .Select(m => m.PagerMessage)
+ .ToArray();
+ }
+
+ public async Task<PagerMessage[]> FetchAsync(CancellationToken ct, DateTime until) {
+ List<PagerMessage> pagerMessages = new();
+
+ int page = 1;
+
+ while(!ct.IsCancellationRequested) {
+ PagerMonQueryResponse? result;
+ try {
+ result = await httpClient
+ .GetFromJsonAsync<PagerMonQueryResponse>($"{QueryBaseUrl}?limit=120&page={page}");
+ } catch {
+ break;
+ }
+
+ if(result is null || result.Messages.Count() == 0)
+ break;
+
+ var messages = result.Messages
+ .Select(m => m.PagerMessage)
+ .OrderBy(m => m.Timestamp)
+ .ToArray();
+
+ pagerMessages.AddRange(messages);
+
+ if(messages[0].Timestamp <= until)
+ break;
+
+ page++;
+ }
+
+ return pagerMessages.ToArray();
+ }
+
+ private void PagerMessageReceived(SocketIOResponse resp) {
+ var message = JsonSerializer.Deserialize<PagerMonMessage>(
+ resp.GetValue(),
+ new JsonSerializerOptions() {
+ PropertyNameCaseInsensitive = true
+ });
+
+ if(message is null)
+ return;
+
+ OnPagerMessage?.Invoke(this, new() {
+ Message = message.PagerMessage
+ });
}
} \ No newline at end of file
diff --git a/appsettings.Development.json b/appsettings.Development.json
new file mode 100644
index 0000000..063ef36
--- /dev/null
+++ b/appsettings.Development.json
@@ -0,0 +1,10 @@
+{
+ "PagerParser": {
+ "ReparseAllOnStartup": true
+ },
+ "Logging": {
+ "LogLevel": {
+ "Default": "Debug"
+ }
+ }
+} \ No newline at end of file
diff --git a/appsettings.json b/appsettings.json
index 624334b..d7eb9a7 100644
--- a/appsettings.json
+++ b/appsettings.json
@@ -1,7 +1,13 @@
{
"PagerParser": {
"ReparseAllOnStartup": false,
- "ReparseFailedOnStartup": true
+ "ReparseFailedOnStartup": true,
+ "MessageInclude": [
+ // Regex patterns to include
+ ],
+ "MessageExclude": [
+ // Regex patterns to exclude
+ ]
},
"ConnectionStrings": {
"DefaultConnection": "Host=127.0.0.1;Database=PagerParser;Username=pagerparser;Password=password"