фиксация

revision
vlad zverzhkhovskiy 2025-08-07 10:03:02 +03:00
parent c21e3ed952
commit 1f987c6982
24 changed files with 174 additions and 317 deletions

View File

@ -1,4 +1,5 @@
using Microsoft.AspNetCore.Mvc;
using Sphagnum.Client;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
@ -8,10 +9,10 @@ namespace Sphagnum.DebugClient.Controllers
[Route("[controller]/[action]")]
public class TestController : ControllerBase
{
private readonly IMessagingClient _connection;
private readonly ClientDefault _connection;
private static readonly Task? rec;
public TestController(IMessagingClient connection)
public TestController(ClientDefault connection)
{
_connection = connection;
}
@ -34,9 +35,9 @@ namespace Sphagnum.DebugClient.Controllers
payload1[i] = 1;
payload2[i] = 2;
}
var t1 = _connection.Publish(new OutgoingMessage("test", payload1)).AsTask();
var t2 = _connection.Publish(new OutgoingMessage("test", payload2)).AsTask();
await Task.WhenAll(t1, t2);
var t1 = _connection.Publish(new Common.Messaging.Contracts.Messages.Message("test", RoutingKey.Empty, payload1)).AsTask();
//var t2 = _connection.Publish(new Message("test", RoutingKey.Empty, payload2)).AsTask();
await Task.WhenAll(t1);
}
}
}

View File

@ -1,6 +1,5 @@
using Sphagnum.Client;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Common.Infrastructure.Services;
var builder = WebApplication.CreateBuilder(args);
@ -9,13 +8,13 @@ builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddSingleton(new ConnectionFactory()
{
UserRights = UserRights.All,
//UserRights = UserRights.All,
Login = "root",
Password = "root",
Hostname = "test_server",
Port = 8081,
});
builder.Services.AddSingleton<IMessagingClient, ClientDefault>();
builder.Services.AddSingleton<ClientDefault>();
var app = builder.Build();
app.UseSwagger();

View File

@ -0,0 +1,40 @@

using Sphagnum.Client;
namespace Sphagnum.DebugClient.Services
{
public class TestService : IHostedService
{
private readonly CancellationTokenSource _cts;
private readonly ClientDefault _clientDefault;
private readonly Task _consumingTask;
public TestService(ClientDefault clientDefault)
{
_cts = new CancellationTokenSource();
_clientDefault = clientDefault;
_consumingTask = Consuming();
}
private async Task Consuming()
{
while (!_cts.IsCancellationRequested)
{
var message = await _clientDefault.Consume(_cts.Token);
await _clientDefault.Ack(message.MessageId);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
return Task.CompletedTask;
}
}
}

View File

@ -1,10 +1,17 @@
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Server;
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Server.Broker.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddSingleton<ConnectionFactory>();
builder.Services.AddHostedService<BrokerHost>();
builder.Services.AddSingleton(new ConnectionFactory()
{
Port = 8081,
});
builder.Services.AddSingleton<MessagesProcessor>();
builder.Services.AddSingleton<ConnectionsManager>();
builder.Services.AddSingleton<ConnectionsManager>();
builder.Services.AddHostedService<ConnectionsReciever>();
var app = builder.Build();
app.MapControllers();

View File

@ -19,6 +19,7 @@ namespace Sphagnum.Client
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
public ClientDefault(ConnectionFactory factory)
{
Task.Delay(10000).Wait();
_connection = factory.CreateConnection().Result;
_recievingTask = RecivingTask();
}
@ -47,9 +48,9 @@ namespace Sphagnum.Client
// throw new Exception("Auth failed!");
//}
public ValueTask Ack(Guid messageId)
public async ValueTask Ack(Guid messageId)
{
throw new NotImplementedException();
await Task.Delay(1000);
}
public ValueTask Nack(Guid messageId)
@ -59,9 +60,9 @@ namespace Sphagnum.Client
public async ValueTask<Guid> Publish(Message message)
{
var bytes = MessageParserold.PackMessage(message);
var bytes = MessageParser.PackMessage(message);
await _connection.SendAsync(bytes.AsMemory(), System.Net.Sockets.SocketFlags.None);
return MessageParserold.GetMessageId(bytes);
return MessageParser.GetMessageId(bytes);
}
public async ValueTask<Message> Consume(CancellationToken cancellationToken)

View File

@ -11,6 +11,8 @@ namespace Sphagnum.Common.Infrastructure.Contracts
/// </summary>
internal interface IConnection : IDisposable
{
public CancellationTokenSource CancellationTokenSource { get; }
public Guid ConnectionId { get; }
Task ConnectAsync(string host, int port);
Task<IConnection> AcceptAsync();
//todo прописать бросаемые исключения
@ -25,5 +27,7 @@ namespace Sphagnum.Common.Infrastructure.Contracts
//todo прописать бросаемые исключения
void Close();
bool Connected { get; }
public event Action<Guid> ConnectionClosed;
}
}

View File

@ -1,5 +1,4 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Old.Contracts.Login;
using System.Net.Sockets;
using System.Threading.Tasks;
@ -11,7 +10,7 @@ namespace Sphagnum.Common.Infrastructure.Services
public string Hostname { get; set; } = string.Empty;
public string Login { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
public UserRights UserRights { get; set; }
internal virtual async Task<IConnection> CreateConnection(bool connected = true)
{
var conn = new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));

View File

@ -9,6 +9,12 @@ namespace Sphagnum.Common.Infrastructure.Services
{
internal class SocketConnection : IConnection
{
public Guid ConnectionId { get; private set; } = Guid.NewGuid();
public bool Connected => _socket.Connected;
public CancellationTokenSource CancellationTokenSource { get; private set; } = new CancellationTokenSource();
public event Action<Guid>? ConnectionClosed;
private readonly Socket _socket;
public SocketConnection(Socket socket)
@ -16,8 +22,6 @@ namespace Sphagnum.Common.Infrastructure.Services
_socket = socket;
}
public bool Connected => _socket.Connected;
public async Task<IConnection> AcceptAsync()
{
var socket = await _socket.AcceptAsync();
@ -31,6 +35,7 @@ namespace Sphagnum.Common.Infrastructure.Services
public void Close()
{
ConnectionClosed?.Invoke(ConnectionId);
_socket.Close();
}
@ -41,6 +46,7 @@ namespace Sphagnum.Common.Infrastructure.Services
public void Dispose()
{
ConnectionClosed?.Invoke(ConnectionId);
_socket.Dispose();
}

View File

@ -1,8 +1,6 @@
using Sphagnum.Common.Infrastructure.Contracts;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

View File

@ -5,8 +5,4 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Folder Include="Old\Utils\Enums\" />
</ItemGroup>
</Project>

View File

@ -1,61 +0,0 @@
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Common.Old.Services;
using Sphagnum.Server.Cluster.Contracts;
using Sphagnum.Server.Cluster.Services;
using Sphagnum.Server.DataProcessing.Contracts;
using Sphagnum.Server.DataProcessing.Services;
using Sphagnum.Server.Storage.Messages.Contracts;
using Sphagnum.Server.Storage.Messages.Services;
using Sphagnum.Server.Storage.Users.Contracts;
using Sphagnum.Server.Storage.Users.Services;
namespace Sphagnum.Server.Broker.Services
{
internal class BrokerDefaultBase(ConnectionFactory connectionFactory, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
{
private readonly SphagnumConnectionOld _connection;
private readonly CancellationTokenSource _cts = new();
private Task? _acceptationTask;
private readonly IAuthInfoStorage _authInfoStorage = new AuthInfoStorageBase();
private readonly IMessagesStorage _messagesStorage = messagesStorage;
private readonly IDistributor _distributor = distributor;
private readonly IDataProcessor _dataProcessor = dataProcessor;
private readonly ConnectionFactory _connectionFactory = connectionFactory;
public Task StartAsync(int port)
{
_connection?.Bind(port);
_connection?.Listen(1000); //todo разобраться что делает этот параметр.
//_acceptationTask = AcceptationWorker(_cts.Token);
return Task.CompletedTask;
}
public Task StopAsync()
{
_cts.Cancel();
return Task.CompletedTask;
}
internal static BrokerDefaultBase Create(ConnectionFactory connectionFactory)
{
return new BrokerDefaultBase(
connectionFactory,
new MessagesStorageDefault(),
new DistributorDefault(),
new DataProcessorDefault()
);
}
private async Task AcceptationWorker(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var acceptedSocket = await _connection.AcceptAsync();
}
}
}
}

View File

@ -1,26 +0,0 @@
using Microsoft.Extensions.Hosting;
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Server.Broker.Services;
namespace Sphagnum.Server
{
public class BrokerHost : IHostedService
{
private readonly BrokerDefaultBase _broker;
public BrokerHost(ConnectionFactory connectionFactory)
{
_broker = BrokerDefaultBase.Create(connectionFactory);
}
public Task StartAsync(CancellationToken cancellationToken)
{
return _broker.StartAsync(8081);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _broker.StopAsync();
}
}
}

View File

@ -1,6 +1,22 @@
namespace Sphagnum.Server.Broker.Services
using Sphagnum.Common.Infrastructure.Contracts;
using System.Collections.Concurrent;
namespace Sphagnum.Server.Broker.Services
{
internal class ConnectionsManager
public class ConnectionsManager
{
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
public ConnectionsManager()
{
}
internal void AddConnection(IConnection connection)
{
connection.ConnectionClosed += (id) =>
{
Connections.TryRemove(id, out var conn);
};
}
}
}

View File

@ -0,0 +1,56 @@
using Microsoft.Extensions.Hosting;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using System.Net;
namespace Sphagnum.Server.Broker.Services
{
public class ConnectionsReciever : IHostedService
{
private readonly CancellationTokenSource _cts = new();
private Task _acceptationTask;
private readonly IConnection _connection;
private readonly ConnectionsManager _manager;
private readonly MessagesProcessor _processor;
public ConnectionsReciever(ConnectionFactory connectionFactory, ConnectionsManager manager, MessagesProcessor processor)
{
_manager = manager;
_connection = connectionFactory.CreateConnection(false).Result;
_connection.Listen(connectionFactory.Port);
_acceptationTask = AcceptationWorker();
_processor = processor;
}
public Task StopAsync()
{
_cts.Cancel();
return Task.CompletedTask;
}
private async Task AcceptationWorker()
{
while (!_cts.IsCancellationRequested)
{
var acceptedSocket = await _connection.AcceptAsync();
_manager.AddConnection(acceptedSocket);
_processor.AddConnection(acceptedSocket);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
_connection?.Listen(1000); //todo разобраться что делает этот параметр.
//_acceptationTask = AcceptationWorker(_cts.Token);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_connection.Close();
_cts.Cancel();
return Task.CompletedTask;
}
}
}

View File

@ -1,68 +1,35 @@
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using Sphagnum.Common.Messaging.Utils;
using Sphagnum.Common.Old.Contracts;
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Common.Old.Exceptions;
using Sphagnum.Server.Cluster.Contracts;
using Sphagnum.Server.DataProcessing.Contracts;
using Sphagnum.Server.Storage.Messages.Contracts;
using Sphagnum.Server.Storage.Users.Contracts;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Sphagnum.Common.Messaging.Extensions;
namespace Sphagnum.Server.Broker.Services
{
internal class MessagesProcessor
public class MessagesProcessor
{
private bool AuthOk = true;
private readonly IAuthInfoStorage _authInfoStorage;
private readonly IMessagesStorage _messagesStorage;
private readonly IDistributor _distributor;
private readonly IDataProcessor _dataProcessor;
public MessagesProcessor(IAuthInfoStorage authInfoStorage, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
internal void AddConnection(IConnection connection)
{
_authInfoStorage = authInfoStorage;
_messagesStorage = messagesStorage;
_distributor = distributor;
_dataProcessor = dataProcessor;
connection.ConnectionClosed += (id) =>
{
Connections.TryRemove(id, out var conn);
};
ProcessMessages(connection);
}
internal async Task ProcessMessage(byte[] message)
{
if (AuthOk)
internal static async Task ProcessMessages(IConnection connection)
{
while (!connection.CancellationTokenSource.IsCancellationRequested)
{
await _messagesStorage.LogMessage(message);
await _distributor.DistributeData(message);
await _dataProcessor.PutMessage(message);
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
var mess = MessageParser.UnpackMessage(data);
}
else if (await CheckRights(message))
{
AuthOk = true;
await _messagesStorage.LogMessage(message);
await _distributor.DistributeData(message);
await _dataProcessor.PutMessage(message);
}
else
{
throw new AuthException();
}
}
private async ValueTask<bool> CheckRights(byte[] buffer)
{
var messageType = MessageParserold.GetMessageType(buffer);
if (messageType == MessageType.Auth)
{
var payloadStart = MessageParserold.GetPayloadStart(buffer);
var rights = (UserRights)BitConverter.ToInt16(buffer.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + payloadStart, 2));
var isRecievingAllowed = await _authInfoStorage.CheckRights(
buffer.AsSpan(payloadStart, Constants.HashedUserDataSizeInfBytes),
buffer.AsSpan(payloadStart + Constants.HashedUserDataSizeInfBytes, Constants.HashedUserDataSizeInfBytes),
rights,
new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
);
return isRecievingAllowed;
}
return false;
}
}
}

View File

@ -1,7 +0,0 @@
namespace Sphagnum.Server.Cluster.Contracts
{
public interface IDistributor
{
ValueTask DistributeData(ReadOnlySpan<byte> data);
}
}

View File

@ -1,12 +0,0 @@
using Sphagnum.Server.Cluster.Contracts;
namespace Sphagnum.Server.Cluster.Services
{
internal class DistributorDefault : IDistributor
{
public ValueTask DistributeData(ReadOnlySpan<byte> data)
{
return ValueTask.CompletedTask;
}
}
}

View File

@ -1,9 +0,0 @@
namespace Sphagnum.Server.DataProcessing.Contracts
{
internal interface IDataProcessor
{
public bool RegisterCoprocessor(string key, Func<byte[], Task> func);
public Func<byte[], Task>? UnregisterCoprocessor(string key);
public ValueTask PutMessage(ReadOnlySpan<byte> message);
}
}

View File

@ -1,29 +0,0 @@
using Sphagnum.Server.DataProcessing.Contracts;
using System.Collections.Concurrent;
namespace Sphagnum.Server.DataProcessing.Services
{
internal class DataProcessorDefault : IDataProcessor
{
private readonly ConcurrentDictionary<string, Func<byte[], Task>> _processors = new();
public ValueTask PutMessage(ReadOnlySpan<byte> message)
{
return ValueTask.CompletedTask;
}
public bool RegisterCoprocessor(string key, Func<byte[], Task> func)
{
return _processors.TryAdd(key, func);
}
public Func<byte[], Task>? UnregisterCoprocessor(string key)
{
if (_processors.TryRemove(key, out Func<byte[], Task>? res))
{
return res;
}
return null;
}
}
}

View File

@ -1,7 +0,0 @@
namespace Sphagnum.Server.Storage.Messages.Contracts
{
internal interface IMessagesStorage
{
ValueTask LogMessage(ReadOnlyMemory<byte> message);
}
}

View File

@ -1,12 +0,0 @@
using Sphagnum.Server.Storage.Messages.Contracts;
namespace Sphagnum.Server.Storage.Messages.Services
{
internal class MessagesStorageDefault : IMessagesStorage
{
public ValueTask LogMessage(ReadOnlyMemory<byte> message)
{
return ValueTask.CompletedTask;
}
}
}

View File

@ -1,13 +0,0 @@
using Sphagnum.Common.Old.Contracts.Login;
namespace Sphagnum.Server.Storage.Users.Contracts
{
internal interface IAuthInfoStorage
{
public ValueTask<bool> CheckRights(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights, CancellationToken token = default);
public ValueTask AddUser(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights);
public ValueTask SetRights(Span<byte> hashedUsername, UserRights userRights);
}
}

View File

@ -1,30 +0,0 @@
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Common.Old.Utils;
using Sphagnum.Server.Storage.Users.Contracts;
using System.Numerics;
namespace Sphagnum.Server.Storage.Users.Services
{
internal class AuthInfoStorageBase : IAuthInfoStorage
{
private readonly Vector<byte> RootUserLogin = new(HashCalculator.Calc("root"));
private readonly Vector<byte> RootUserPassword = new(HashCalculator.Calc("root"));
private readonly UserRights RootUserRights = UserRights.All;
public ValueTask AddUser(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights)
{
return ValueTask.CompletedTask;
}
public ValueTask<bool> CheckRights(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights, CancellationToken token = default)
{
var username = new Vector<byte>(hashedUsername);
var pwd = new Vector<byte>(hashedPassword);
return ValueTask.FromResult(username == RootUserLogin && pwd == RootUserPassword && (userRights & RootUserRights) == userRights);
}
public ValueTask SetRights(Span<byte> hashedUsername, UserRights userRights)
{
return ValueTask.CompletedTask;
}
}
}

View File

@ -17,33 +17,6 @@ namespace Sphagnum.Common.UnitTests.Comparers
return res;
}
public static bool Compare(OutgoingMessage message1, OutgoingMessage message2)
{
var res = true;
res &= message1.Exchange == message2.Exchange;
res &= message1.RoutingKey.Part1 == message2.RoutingKey.Part1;
res &= message1.RoutingKey.Part2 == message2.RoutingKey.Part2;
res &= message1.RoutingKey.Part3 == message2.RoutingKey.Part3;
res &= message1.Payload.Length == message2.Payload.Length;
res &= ComparePayloads(message1, message2);
return res;
}
public static bool Compare(IncommingMessage message1, IncommingMessage message2)
{
var res = true;
res &= message1.MessageId == message2.MessageId;
res &= ComparePayloads(message1.Payload.ToArray(), message2.Payload.ToArray());
return res;
}
public static bool ComparePayloads(OutgoingMessage message1, OutgoingMessage message2)
{
var payload1 = message1.Payload.ToArray();
var payload2 = message2.Payload.ToArray();
return ComparePayloads(payload1, payload2);
}
public static bool ComparePayloads(Message message1, Message message2)
{
var payload1 = message1.Payload.ToArray();