refactor (networking)

This commit is contained in:
Kevin Kai Berthold 2023-09-21 22:10:55 +02:00
parent febc4d9488
commit 450a6f2796
153 changed files with 7834 additions and 8004 deletions

View file

@ -1,77 +1,77 @@
using Insight.Agent.Interfaces;
using Insight.Agent.Messages;
using Insight.Domain.Interfaces;
using Insight.Domain.Messages;
using Insight.Domain.Messages.Agent;
using Insight.Infrastructure;
using Insight.Infrastructure.Entities;
using MongoDB.Bson;
using MongoDB.Driver;
namespace Insight.Server.Network.Handlers.Agent
namespace Insight.Server.Network.Handlers.Agent;
public class SessionHandler : IMessageHandler<AgentSession>
{
public class SessionHandler : IAgentMessageHandler<AgentSession>
private readonly IMongoDatabase _database;
public SessionHandler(IMongoDatabase database)
{
private readonly IMongoDatabase _database;
_database = database;
}
public SessionHandler(IMongoDatabase database)
public async ValueTask HandleAsync<TMessage>(AgentSession sender, TMessage message, CancellationToken cancellationToken) where TMessage : IMessage
{
if (message is SessionList sessions)
{
_database = database;
}
public async ValueTask HandleAsync<TMessage>(AgentSession sender, TMessage message, CancellationToken cancellationToken) where TMessage : IAgentMessage
{
if (message is SessionList sessions)
{
await OnSessionsAsync(sender, sessions, cancellationToken);
}
}
private async ValueTask OnSessionsAsync(AgentSession session, List<Session> sessions, CancellationToken cancellationToken)
{
var agentEntity = await _database.Agent().Find(Builders<AgentEntity>.Filter.Eq(p => p.Id, session.Id)).FirstOrDefaultAsync(cancellationToken);
if (agentEntity is null) return;
var hostEntity = await _database.Host().Find(Builders<HostEntity>.Filter.Eq(p => p.Agent, agentEntity.Id)).FirstOrDefaultAsync(cancellationToken);
if (hostEntity is null) return;
var batch = ObjectId.GenerateNewId().ToString();
var date = DateTime.Now;
var bulk = new List<WriteModel<HostSessionEntity>>();
if (sessions is not null && sessions.Any())
{
foreach (var sess in sessions)
{
var filterDefinition = Builders<HostSessionEntity>.Filter.And(new List<FilterDefinition<HostSessionEntity>>
{
Builders<HostSessionEntity>.Filter.Eq(x => x.Host, hostEntity.Id),
Builders<HostSessionEntity>.Filter.Eq(x => x.Sid, sess.Sid)
});
var updateDefinition = Builders<HostSessionEntity>.Update
.SetOnInsert(p => p.Insert, date)
.SetOnInsert(p => p.Host, hostEntity.Id)
.SetOnInsert(p => p.Sid, sess.Sid)
.Set(p => p.Update, date)
.Set(p => p.Batch, batch)
.Set(p => p.User, sess.User)
.Set(p => p.Remote, sess.Remote)
.Set(p => p.Type, sess.Type)
.Set(p => p.State, sess.Status);
bulk.Add(new UpdateOneModel<HostSessionEntity>(filterDefinition, updateDefinition)
{
IsUpsert = true
});
}
}
bulk.Add(new DeleteManyModel<HostSessionEntity>(Builders<HostSessionEntity>.Filter.And(new List<FilterDefinition<HostSessionEntity>>
{
Builders<HostSessionEntity>.Filter.Eq(x => x.Host, hostEntity.Id),
Builders<HostSessionEntity>.Filter.Ne(x => x.Batch, batch)
})));
var result = await _database.HostSession().BulkWriteAsync(bulk, cancellationToken: cancellationToken);
await OnSessionsAsync(sender, sessions, cancellationToken);
}
}
private async ValueTask OnSessionsAsync(AgentSession session, List<Session> sessions, CancellationToken cancellationToken)
{
var agentEntity = await _database.Agent().Find(Builders<AgentEntity>.Filter.Eq(p => p.Id, session.Id)).FirstOrDefaultAsync(cancellationToken);
if (agentEntity is null) return;
var hostEntity = await _database.Host().Find(Builders<HostEntity>.Filter.Eq(p => p.Agent, agentEntity.Id)).FirstOrDefaultAsync(cancellationToken);
if (hostEntity is null) return;
var batch = ObjectId.GenerateNewId().ToString();
var date = DateTime.Now;
var bulk = new List<WriteModel<HostSessionEntity>>();
if (sessions is not null && sessions.Any())
{
foreach (var sess in sessions)
{
var filterDefinition = Builders<HostSessionEntity>.Filter.And(new List<FilterDefinition<HostSessionEntity>>
{
Builders<HostSessionEntity>.Filter.Eq(x => x.Host, hostEntity.Id),
Builders<HostSessionEntity>.Filter.Eq(x => x.Sid, sess.Sid)
});
var updateDefinition = Builders<HostSessionEntity>.Update
.SetOnInsert(p => p.Insert, date)
.SetOnInsert(p => p.Host, hostEntity.Id)
.SetOnInsert(p => p.Sid, sess.Sid)
.Set(p => p.Update, date)
.Set(p => p.Batch, batch)
.Set(p => p.User, sess.User)
.Set(p => p.Remote, sess.Remote)
.Set(p => p.Type, sess.Type)
.Set(p => p.State, sess.Status);
bulk.Add(new UpdateOneModel<HostSessionEntity>(filterDefinition, updateDefinition)
{
IsUpsert = true
});
}
}
bulk.Add(new DeleteManyModel<HostSessionEntity>(Builders<HostSessionEntity>.Filter.And(new List<FilterDefinition<HostSessionEntity>>
{
Builders<HostSessionEntity>.Filter.Eq(x => x.Host, hostEntity.Id),
Builders<HostSessionEntity>.Filter.Ne(x => x.Batch, batch)
})));
var result = await _database.HostSession().BulkWriteAsync(bulk, cancellationToken: cancellationToken);
}
}