Compare commits

...

7 Commits

Author SHA1 Message Date
vlad zverzhkhovskiy 889934c51b фиксацтя
checks / check and test (push) Has been cancelled Details
2025-08-15 09:22:38 +03:00
vlad zverzhkhovskiy 7ddb102efe Тестовое соединения и тесты к нему
checks / check and test (push) Has been cancelled Details
2025-08-07 12:48:10 +03:00
vlad zverzhkhovskiy 0c59ba8d64 Корректная работа с сокетом 2025-08-07 10:14:18 +03:00
vlad zverzhkhovskiy 1f987c6982 фиксация 2025-08-07 10:03:02 +03:00
vlad zverzhkhovskiy c21e3ed952 промежуточная фиксация рефакторинга 2025-08-06 17:34:54 +03:00
vlad zverzhkhovskiy 78f9f6c398 промежуточная фиксация рефакторинга 2025-08-06 17:14:23 +03:00
vlad zverzhkhovskiy fba480eaae промежуточная фиксация рефакторинга 2025-08-06 17:06:21 +03:00
76 changed files with 891 additions and 1253 deletions

View File

@ -1,6 +1,6 @@
using Microsoft.AspNetCore.Mvc;
using Sphagnum.Common.Contracts.Messaging;
using Sphagnum.Common.Contracts.Messaging.Messages;
using Sphagnum.Client;
using Sphagnum.Common.Messaging.Contracts;
namespace Sphagnum.DebugClient.Controllers
{
@ -8,16 +8,15 @@ namespace Sphagnum.DebugClient.Controllers
[Route("[controller]/[action]")]
public class TestController : ControllerBase
{
private readonly IMessagingClient _connection;
private static readonly Task? rec;
private readonly ClientDefault _connection;
public TestController(IMessagingClient connection)
public TestController(ClientDefault connection)
{
_connection = connection;
}
[HttpGet]
public string test()
public string Test()
{
return "Ok!";
}
@ -34,9 +33,9 @@ namespace Sphagnum.DebugClient.Controllers
payload1[i] = 1;
payload2[i] = 2;
}
var t1 = _connection.Publish(new OutgoingMessage("test", payload1)).AsTask();
var t2 = _connection.Publish(new OutgoingMessage("test", payload2)).AsTask();
await Task.WhenAll(t1, t2);
var t1 = _connection.Publish(new Common.Messaging.Contracts.Messages.Message("test", RoutingKey.Empty, payload1)).AsTask();
//var t2 = _connection.Publish(new Message("test", RoutingKey.Empty, payload2)).AsTask();
await Task.WhenAll(t1);
}
}
}

View File

@ -1,6 +1,5 @@
using Sphagnum.Client;
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Contracts.Messaging;
using Sphagnum.Common.Infrastructure.Services;
var builder = WebApplication.CreateBuilder(args);
@ -9,13 +8,13 @@ builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddSingleton(new ConnectionFactory()
{
UserRights = UserRights.All,
//UserRights = UserRights.All,
Login = "root",
Password = "root",
Hostname = "test_server",
Port = 8081,
});
builder.Services.AddSingleton<IMessagingClient, ClientDefault>();
builder.Services.AddSingleton<ClientDefault>();
var app = builder.Build();
app.UseSwagger();

View File

@ -0,0 +1,40 @@

using Sphagnum.Client;
namespace Sphagnum.DebugClient.Services
{
public class TestService : IHostedService
{
private readonly CancellationTokenSource _cts;
private readonly ClientDefault _clientDefault;
private readonly Task _consumingTask;
public TestService(ClientDefault clientDefault)
{
_cts = new CancellationTokenSource();
_clientDefault = clientDefault;
_consumingTask = Consuming();
}
private async Task Consuming()
{
while (!_cts.IsCancellationRequested)
{
var message = await _clientDefault.Consume(_cts.Token);
await _clientDefault.Ack(message.MessageId);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
return Task.CompletedTask;
}
}
}

View File

@ -1,10 +1,16 @@
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Server;
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Server.Broker.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddSingleton<ConnectionFactory>();
builder.Services.AddHostedService<BrokerHost>();
builder.Services.AddSingleton(new ConnectionFactory()
{
Port = 8081,
});
builder.Services.AddSingleton<MessagesProcessor>();
builder.Services.AddSingleton<ConnectionsManager>();
builder.Services.AddHostedService<ConnectionsReciever>();
var app = builder.Build();
app.MapControllers();

View File

@ -23,6 +23,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sphagnum.Common.UnitTests",
EndProject
Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose", "docker-compose.dcproj", "{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sphagnum.FuncTests", "..\tests\Sphagnum.FuncTests\Sphagnum.FuncTests.csproj", "{C160C372-5E5E-5442-462B-A5B5C36BE2D0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -57,6 +59,10 @@ Global
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.Build.0 = Release|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -68,6 +74,7 @@ Global
{C344FDF3-3D4D-4BC6-B257-4AD907BE1BE4} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62}
{77F76F27-D883-4392-90D5-8F441043F468} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62}
{25865911-1F2D-4083-A99C-C65E13F05C14} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
{C160C372-5E5E-5442-462B-A5B5C36BE2D0} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {42DA8F91-1667-49BC-971D-D83C0870393A}

View File

@ -1,13 +0,0 @@
namespace Sphagnum.Client
{
//internal class ClientConnection : SocketConnection
//{
// public ClientConnection() : base( async (dd) =>
// {
// })
// {
// }
//}
}

View File

@ -1,8 +1,9 @@
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Contracts.Messaging;
using Sphagnum.Common.Contracts.Messaging.Messages;
using Sphagnum.Common.Services;
using Sphagnum.Common.Utils;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Extensions;
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using Sphagnum.Common.Messaging.Utils;
using System;
using System.Threading;
using System.Threading.Channels;
@ -10,42 +11,34 @@ using System.Threading.Tasks;
namespace Sphagnum.Client
{
public class ClientDefault : IMessagingClient, IDisposable
public sealed class ClientDefault : IDisposable
{
private readonly SphagnumConnection _connection;
private readonly ConnectionFactory _connectionFactory;
private readonly Task _recievingTask;
private readonly IConnection _connection;
private readonly Channel<byte[]> _commonMessagesChannel = Channel.CreateUnbounded<byte[]>();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
public ClientDefault(ConnectionFactory factory)
{
_connectionFactory = factory;
_connection = factory.CreateDefaultConnected(() => async (mess) =>
{
await _commonMessagesChannel.Writer.WriteAsync(mess);
}).Result;
Auth().Wait();
Task.Delay(10000).Wait();
_connection = factory.CreateConnection().Result;
_recievingTask = RecivingTask();
}
private async Task<byte[]> ReceiveAsync()
private async Task RecivingTask()
{
return await _commonMessagesChannel.Reader.ReadAsync(_cts.Token);
}
private async Task Auth()
{
await _connection.SendAsync(MessageParser.PackMessage(new AuthMessage(_connectionFactory.Login, _connectionFactory.Password, _connectionFactory.UserRights)));
var response = await ReceiveAsync();
var messageType = MessageParser.GetMessageType(response);
if (messageType == Common.Utils.Models.MessageType.AuthSuccessfull)
while (!_cts.IsCancellationRequested)
{
return;
var data = await _connection.ReceiveAsync(_cts.Token);
if (MessageParser.GetMessageType(data) == MessageType.Common)
{
await _commonMessagesChannel.Writer.WriteAsync(data);
}
}
throw new Exception("Auth failed!");
}
public ValueTask Ack(Guid messageId)
public async ValueTask Ack(Guid messageId)
{
throw new NotImplementedException();
await Task.Delay(1000);
}
public ValueTask Nack(Guid messageId)
@ -53,17 +46,17 @@ namespace Sphagnum.Client
throw new NotImplementedException();
}
public async ValueTask<Guid> Publish(OutgoingMessage message)
public async ValueTask<Guid> Publish(Message message)
{
var bytes = MessageParser.PackMessage(message);
await _connection.SendAsync(bytes);
await _connection.SendAsync(bytes.AsMemory(), System.Net.Sockets.SocketFlags.None);
return MessageParser.GetMessageId(bytes);
}
public async ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken)
public async ValueTask<Message> Consume(CancellationToken cancellationToken)
{
var result = await _commonMessagesChannel.Reader.ReadAsync(cancellationToken);
return MessageParser.UnpackIncomingMessage(result);
return MessageParser.UnpackMessage(result);
}
public ValueTask Reject(Guid messageId)

View File

@ -2,6 +2,8 @@
//Тесты
[assembly: InternalsVisibleTo("Sphagnum.Common.UnitTests")]
[assembly: InternalsVisibleTo("Sphagnum.Server.Tests")]
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]
[assembly: InternalsVisibleTo("Sphagnum.Client")]
[assembly: InternalsVisibleTo("Sphagnum.Server")]

View File

@ -1,14 +0,0 @@
namespace Sphagnum.Common.Contracts.Administration.Enums
{
public enum ExchangeType : byte
{
/// <summary>
/// Раздает сообщения во все топики с подходящим ключём маршрутизации.
/// </summary>
Broadcast,
/// <summary>
/// Отправляет сообщение в одну из очередей с подходящим ключём маршрутизации.
/// </summary>
Topic,
}
}

View File

@ -1,8 +0,0 @@
namespace Sphagnum.Common.Contracts.Administration.Enums
{
public enum TopicType
{
Queue,
Stack,
}
}

View File

@ -1,15 +0,0 @@
using Sphagnum.Common.Contracts.Administration.Requests;
using System.Threading.Tasks;
namespace Sphagnum.Common.Contracts.Administration
{
public interface IAdministrationClient
{
public ValueTask CreateExchange(ExchangeCreationRequest exchangeCreationRequest);
public ValueTask CreateTopic(TopicCreationRequest topicCreationRequest);
public ValueTask DeleteTopic(string topicName);
public ValueTask DeleteExchange(string exchangeName);
public ValueTask BindTopic(TopicBindingRequest topicCreationRequest);
public ValueTask UnbindTopic(TopicBindingRequest topicCreationRequest);
}
}

View File

@ -1,10 +0,0 @@
using Sphagnum.Common.Contracts.Administration.Enums;
namespace Sphagnum.Common.Contracts.Administration.Requests
{
public readonly ref struct ExchangeCreationRequest
{
public readonly string ExchangeName;
public readonly ExchangeType ExchangeType;
}
}

View File

@ -1,11 +0,0 @@
using Sphagnum.Common.Contracts.Messaging;
namespace Sphagnum.Common.Contracts.Administration.Requests
{
public readonly ref struct TopicBindingRequest
{
public readonly string TopicName;
public readonly string ExchangeName;
public readonly RoutingKey RoutingKey;
}
}

View File

@ -1,10 +0,0 @@
using Sphagnum.Common.Contracts.Administration.Enums;
namespace Sphagnum.Common.Contracts.Administration.Requests
{
public readonly ref struct TopicCreationRequest
{
public readonly string TopicName;
public readonly TopicType TopicType;
}
}

View File

@ -1,8 +0,0 @@
namespace Sphagnum.Common
{
internal static class Constants
{
public const int HashedUserDataSizeInfBytes = 32;
public const int PayloadRecieverBufferSize = 8192;
}
}

View File

@ -1,8 +0,0 @@
using Sphagnum.Common.Contracts.Messaging;
namespace Sphagnum.Common.Contracts
{
public interface ISphagnumClient : IMessagingClient
{
}
}

View File

@ -1,28 +0,0 @@
using Sphagnum.Common.Services;
using System;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Sphagnum.Common.Contracts.Login
{
public class ConnectionFactory
{
public int Port { get; set; }
public string Hostname { get; set; } = string.Empty;
public string Login { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
public UserRights UserRights { get; set; }
internal virtual async Task<SphagnumConnection> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
{
var conn = new SphagnumConnection(() => new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp)), messagesProcessorFactory);
await conn.ConnectAsync(Hostname, Port);
return conn;
}
internal virtual SphagnumConnection CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
{
return new SphagnumConnection(() => new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp)), messagesProcessorFactory);
}
}
}

View File

@ -1,13 +0,0 @@
namespace Sphagnum.Common.Contracts.Login
{
public class UserData
{
public string Username { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
public int Port { get; set; }
public string Hostname { get; set; } = string.Empty;
public UserRights UserRights { get; set; } =
UserRights.MessagesConsuming |
UserRights.MessagesPublishing;
}
}

View File

@ -1,19 +0,0 @@
using System;
namespace Sphagnum.Common.Contracts.Login
{
[Flags]
public enum UserRights : ushort
{
None = 0,
MessagesConsuming = 1,
MessagesPublishing = 2,
TopicCreating = 4,
TopicDeleting = 8,
TopicBinding = 16,
ExchangeCreating = 32,
ExchangeDeleting = 64,
All = MessagesConsuming | MessagesPublishing | TopicCreating | TopicDeleting | TopicBinding | ExchangeCreating | ExchangeDeleting,
}
}

View File

@ -1,16 +0,0 @@
using Sphagnum.Common.Contracts.Messaging.Messages;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Contracts.Messaging
{
public interface IMessagingClient
{
public ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken);
public ValueTask<Guid> Publish(OutgoingMessage message);
public ValueTask Ack(Guid messageId);
public ValueTask Nack(Guid messageId);
public ValueTask Reject(Guid messageId);
}
}

View File

@ -1,22 +0,0 @@
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Utils;
using System;
namespace Sphagnum.Common.Contracts.Messaging.Messages
{
internal readonly ref struct AuthMessage
{
public readonly ReadOnlyMemory<byte> Payload;
public readonly Guid MessageId;
public AuthMessage(string login, string pwd, UserRights userRights)
{
MessageId = Guid.NewGuid();
var data = new byte[Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + 2];
HashCalculator.Calc(login).CopyTo(data, 0);
HashCalculator.Calc(pwd).CopyTo(data, Constants.HashedUserDataSizeInfBytes);
BitConverter.TryWriteBytes(data.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes), (ushort)userRights);
Payload = data;
}
}
}

View File

@ -1,9 +0,0 @@
using System;
namespace Sphagnum.Common.Contracts.Messaging.Messages
{
internal readonly ref struct AuthResultMessage
{
public readonly ReadOnlyMemory<byte> Payload;
}
}

View File

@ -1,17 +0,0 @@
using System;
namespace Sphagnum.Common.Contracts.Messaging.Messages
{
public readonly struct IncommingMessage
{
public readonly Guid MessageId;
public readonly ReadOnlyMemory<byte> Payload;
public IncommingMessage(Guid messageId, ReadOnlyMemory<byte> payload)
{
MessageId = messageId;
Payload = payload;
}
}
}

View File

@ -1,27 +0,0 @@
using System;
namespace Sphagnum.Common.Contracts.Messaging.Messages
{
public readonly struct OutgoingMessage
{
public readonly string Exchange;
public readonly RoutingKey RoutingKey;
public readonly ReadOnlyMemory<byte> Payload;
public OutgoingMessage(string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
{
Exchange = exchange;
RoutingKey = routingKey;
Payload = payload;
}
public OutgoingMessage(string exchange, ReadOnlyMemory<byte> payload)
{
Exchange = exchange;
RoutingKey = new RoutingKey();
Payload = payload;
}
}
}

View File

@ -1,8 +0,0 @@
using System;
namespace Sphagnum.Common.Exceptions
{
public class AuthException : Exception
{
}
}

View File

@ -4,10 +4,15 @@ using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Contracts.Infrastructure
namespace Sphagnum.Common.Infrastructure.Contracts
{
/// <summary>
/// Абстракция над подключением (сокет или Channel в случае работы клиента и сервра внутри одного процесса).
/// </summary>
internal interface IConnection : IDisposable
{
public CancellationTokenSource CancellationTokenSource { get; }
public Guid ConnectionId { get; }
Task ConnectAsync(string host, int port);
Task<IConnection> AcceptAsync();
//todo прописать бросаемые исключения
@ -22,5 +27,7 @@ namespace Sphagnum.Common.Contracts.Infrastructure
//todo прописать бросаемые исключения
void Close();
bool Connected { get; }
public event Action<Guid>? ConnectionClosed;
}
}

View File

@ -0,0 +1,21 @@
using Sphagnum.Common.Infrastructure.Contracts;
using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Infrastructure.Extensions
{
internal static class IConnectionExtensions
{
public static async ValueTask<byte[]> ReceiveAsync(this IConnection connection, CancellationToken cancellationToken = default)
{
var lengthBuffer = new byte[5];
await connection.ReceiveAsync(lengthBuffer, SocketFlags.Peek, cancellationToken);
var length = BitConverter.ToInt32(lengthBuffer, 0);
var result = new byte[length];
await connection.ReceiveAsync(result, SocketFlags.None, cancellationToken);
return result;
}
}
}

View File

@ -0,0 +1,25 @@
using Sphagnum.Common.Infrastructure.Contracts;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Sphagnum.Common.Infrastructure.Services
{
public class ConnectionFactory
{
public int Port { get; set; }
public string Hostname { get; set; } = string.Empty;
public string Login { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
internal virtual async Task<IConnection> CreateConnection(bool connected = true)
{
var conn = new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
if (connected)
{
await conn.ConnectAsync(Hostname, Port);
}
return conn;
}
}
}

View File

@ -1,14 +1,20 @@
using Sphagnum.Common.Contracts.Infrastructure;
using Sphagnum.Common.Infrastructure.Contracts;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Services
namespace Sphagnum.Common.Infrastructure.Services
{
internal class SocketConnection : IConnection
{
public Guid ConnectionId { get; private set; } = Guid.NewGuid();
public bool Connected => _socket.Connected;
public CancellationTokenSource CancellationTokenSource { get; private set; } = new CancellationTokenSource();
public event Action<Guid>? ConnectionClosed;
private readonly Socket _socket;
public SocketConnection(Socket socket)
@ -16,8 +22,6 @@ namespace Sphagnum.Common.Services
_socket = socket;
}
public bool Connected => _socket.Connected;
public async Task<IConnection> AcceptAsync()
{
var socket = await _socket.AcceptAsync();
@ -31,6 +35,7 @@ namespace Sphagnum.Common.Services
public void Close()
{
ConnectionClosed?.Invoke(ConnectionId);
_socket.Close();
}
@ -41,6 +46,7 @@ namespace Sphagnum.Common.Services
public void Dispose()
{
ConnectionClosed?.Invoke(ConnectionId);
_socket.Dispose();
}

View File

@ -1,6 +1,6 @@
using System;
namespace Sphagnum.Common.Utils.Models
namespace Sphagnum.Common.Messaging.Contracts
{
[Flags]
internal enum MessageFlags : ushort

View File

@ -1,4 +1,4 @@
namespace Sphagnum.Common.Utils.Models
namespace Sphagnum.Common.Messaging.Contracts
{
internal enum MessageType : byte
{

View File

@ -0,0 +1,32 @@
using System;
namespace Sphagnum.Common.Messaging.Contracts.Messages
{
public readonly struct Message
{
public readonly Guid MessageId;
public readonly ReadOnlyMemory<byte> Payload;
public readonly string Exchange;
public readonly RoutingKey RoutingKey;
public Message(string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
{
Exchange = exchange;
RoutingKey = routingKey;
Payload = payload;
RoutingKey = routingKey;
MessageId = Guid.NewGuid();
}
internal Message(Guid messageId, string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
{
Exchange = exchange;
RoutingKey = routingKey;
Payload = payload;
MessageId = messageId;
}
}
}

View File

@ -1,4 +1,4 @@
namespace Sphagnum.Common.Contracts.Messaging
namespace Sphagnum.Common.Messaging.Contracts
{
public readonly struct RoutingKey
{
@ -14,5 +14,7 @@
Part2 = part2;
Part3 = part3;
}
public readonly static RoutingKey Empty = new RoutingKey();
}
}

View File

@ -1,11 +1,10 @@
using Sphagnum.Common.Contracts.Messaging;
using Sphagnum.Common.Contracts.Messaging.Messages;
using Sphagnum.Common.Utils.Models;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using System;
using System.Runtime.CompilerServices;
using System.Text;
namespace Sphagnum.Common.Utils
namespace Sphagnum.Common.Messaging.Utils
{
/// <summary>
/// Порядок передачи:
@ -21,7 +20,7 @@ namespace Sphagnum.Common.Utils
/// </summary>
internal static class MessageParser
{
public static OutgoingMessage UnpackOutgoingMessage(byte[] bytes)
public static Message UnpackMessage(byte[] bytes)
{
if ((MessageType)bytes[4] != MessageType.Common)
{
@ -30,64 +29,11 @@ namespace Sphagnum.Common.Utils
var exchangeName = GetExchangeName(bytes);
var routingKey = GetRoutingKey(bytes);
var payload = GetPayload(bytes);
return new OutgoingMessage(exchangeName, routingKey, payload);
}
public static IncommingMessage UnpackIncomingMessage(byte[] bytes)
{
if ((MessageType)bytes[4] != MessageType.Common)
{
throw new ArgumentException("Uncorrect message type! 1 (MessageType.Common) expected!");
}
var id = GetMessageId(bytes);
var payload = GetPayload(bytes);
return new IncommingMessage(id, payload);
return new Message(id, exchangeName, routingKey, payload);
}
public static byte[] PackMessage(IncommingMessage message)
{
var result = new byte[27 + message.Payload.Length];
result[4] = (byte)MessageType.Common;
var flags = MessageFlags.HasPayload;
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
message.MessageId.TryWriteBytes(result.AsSpan(7));
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
message.Payload.CopyTo(result.AsMemory(27));
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
public static byte[] PackReplyMessage(MessageType messageType)
{
var res = new byte[23];
res[4] = (byte)messageType;
res[0] = 23;
return res;
}
public static byte[] PackReplyMessage(MessageType messageType, Guid parentMessageId)
{
var res = new byte[23];
res[4] = (byte)messageType;
res[0] = 23;
parentMessageId.TryWriteBytes(res.AsSpan(7));
return res;
}
public static byte[] PackMessage(AuthMessage message)
{
var result = new byte[27 + message.Payload.Length];
result[4] = (byte)MessageType.Auth;
var flags = MessageFlags.HasPayload;
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
message.MessageId.TryWriteBytes(result.AsSpan(7));
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
message.Payload.CopyTo(result.AsMemory(27));
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
public static byte[] PackMessage(OutgoingMessage message)
public static byte[] PackMessage(Message message)
{
if (string.IsNullOrEmpty(message.Exchange) || string.IsNullOrWhiteSpace(message.Exchange))
{
@ -115,7 +61,7 @@ namespace Sphagnum.Common.Utils
var exchangeNameBytes = Encoding.UTF8.GetBytes(message.Exchange);// todo перевести на более оптимальный метод, не аллоцирующий лишнего.
count += exchangeNameBytes.Length;
count++;
return Pack(message, Guid.NewGuid(), flags, count);
return Pack(message, flags, count);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@ -126,30 +72,40 @@ namespace Sphagnum.Common.Utils
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static Guid GetMessageId(Span<byte> bytes)
internal static byte[] Pack(Message message, MessageFlags flags, int count)
{
var slice = bytes.Slice(7, 16);
return new Guid(slice);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static Guid GetMessageId(ReadOnlyMemory<byte> bytes)
{
var slice = bytes.Slice(7, 16);
return new Guid(slice.Span);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void CopyId(ReadOnlySpan<byte> bytes, byte[] buffer)
{
var slice = bytes.Slice(7, 16);
slice.CopyTo(buffer.AsSpan(7, 16));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static MessageType GetMessageType(Span<byte> bytes)
{
return (MessageType)bytes[4];
var result = new byte[count];
result[4] = (byte)MessageType.Common;
var shift = 5;
BitConverter.TryWriteBytes(result.AsSpan(shift), (ushort)flags);//2. flags
shift += 2;
message.MessageId.TryWriteBytes(result.AsSpan(shift));//3. id
shift += 16;
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
{
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)message.Exchange.Length);//4. ExchangeNameLength
shift += 1;
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
shift += exchangeBytes.Length;
}
if ((flags & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey)//6. RoutingKey
{
result[shift] = message.RoutingKey.Part1;
shift++;
result[shift] = message.RoutingKey.Part2;
shift++;
result[shift] = message.RoutingKey.Part3;
shift++;
}
if ((flags & MessageFlags.HasPayload) == MessageFlags.HasPayload)
{
BitConverter.TryWriteBytes(result.AsSpan(shift), message.Payload.Length);//7. PayloadSize
shift += 4;
message.Payload.CopyTo(result.AsMemory(shift));//8. Payload
}
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
@ -180,80 +136,30 @@ namespace Sphagnum.Common.Utils
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int GetPayloadStart(Span<byte> bytes)
internal static MessageType GetMessageType(Span<byte> bytes)
{
if (HasPayload(bytes))
{
var shift = 27;
if (HasExchange(bytes))//todo проверить бенчмарком, как работает инлайн
{
shift += bytes[23];
shift += 1;
}
if (HasKey(bytes))
{
shift += 3;
}
return shift;
}
return -1;
}
internal static byte[] Pack(OutgoingMessage message, Guid id, MessageFlags flags, int count)
{
var result = new byte[count];
result[4] = (byte)MessageType.Common;
var shift = 5;
BitConverter.TryWriteBytes(result.AsSpan(shift), (ushort)flags);//2. flags
shift += 2;
id.TryWriteBytes(result.AsSpan(shift));//3. id
shift += 16;
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
{
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)(message.Exchange.Length));//4. ExchangeNameLength
shift += 1;
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
shift += exchangeBytes.Length;
}
if ((flags & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey)//6. RoutingKey
{
result[shift] = message.RoutingKey.Part1;
shift++;
result[shift] = message.RoutingKey.Part2;
shift++;
result[shift] = message.RoutingKey.Part3;
shift++;
}
if ((flags & MessageFlags.HasPayload) == MessageFlags.HasPayload)
{
BitConverter.TryWriteBytes(result.AsSpan(shift), message.Payload.Length);//7. PayloadSize
shift += 4;
message.Payload.CopyTo(result.AsMemory(shift));//8. Payload
}
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
return bytes.Length < 5 ? MessageType.Unknown : (MessageType)bytes[4];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool HasKey(Span<byte> bytes)
{
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
return (((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey);
return ((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool HasPayload(Span<byte> bytes)
{
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
return (((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload);
return ((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool HasExchange(Span<byte> bytes)
{
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
return (((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange);
return ((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]

View File

@ -1,36 +0,0 @@
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace Sphagnum.Common.Services
{
internal class ChannelsPool
{
private readonly int _poolSize;
public ChannelsPool(int poolSize)
{
_poolSize = poolSize;
}
private readonly ConcurrentQueue<Channel<byte[]>> channels = new ConcurrentQueue<Channel<byte[]>>();
public Channel<byte[]> Get()
{
if (channels.TryDequeue(out var channel))
{
return channel;
}
else
{
return Channel.CreateBounded<byte[]>(1);
}
}
public void Return(Channel<byte[]> channel)
{
if (channels.Count < _poolSize)
{
channels.Enqueue(channel);
}
}
}
}

View File

@ -1,143 +0,0 @@
using Sphagnum.Common.Contracts.Infrastructure;
using Sphagnum.Common.Exceptions;
using Sphagnum.Common.Utils;
using Sphagnum.Common.Utils.Models;
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace Sphagnum.Common.Services
{
internal class SphagnumConnection
{
protected readonly IConnection _connection;
private readonly ChannelsPool _pool = new ChannelsPool(100);
private readonly ConcurrentDictionary<Guid, Channel<byte[]>> sendingItems = new ConcurrentDictionary<Guid, Channel<byte[]>>();
private readonly Func<Func<byte[], Task>> _messagesProcessorFactory;
private readonly Func<byte[], Task> _messagesProcessor;
public SphagnumConnection(Func<IConnection> connectionsFactory, Func<Func<byte[], Task>> messagesProcessorFactory)
{
_connection = connectionsFactory(); // new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
_messagesProcessorFactory = messagesProcessorFactory;
_messagesProcessor = _messagesProcessorFactory();
RecievingTask();
}
private SphagnumConnection(IConnection socket, Func<Func<byte[], Task>> messagesProcessorFactory)
{
_connection = socket;
_messagesProcessorFactory = messagesProcessorFactory;
_messagesProcessor = _messagesProcessorFactory();
}
public bool Connected => _connection.Connected;
public Task ConnectAsync(string host, int port)
{
return _connection.ConnectAsync(host, port);
}
public async virtual Task<SphagnumConnection> AcceptAsync()
{
var socket = await _connection.AcceptAsync();
return new SphagnumConnection(socket, _messagesProcessorFactory);
}
public void Bind(int port)
{
_connection.Bind(new IPEndPoint(IPAddress.Any, port));
}
public void Close()
{
_connection.Close();
}
public void Dispose()
{
_connection.Dispose();
}
public void Listen(int backlog)
{
_connection.Listen(backlog);
}
public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
int res;
if (buffer.Span[4] != (byte)MessageType.MessageAccepted)
{
var channel = _pool.Get();
sendingItems.TryAdd(MessageParser.GetMessageId(buffer), channel);
res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
var message = await channel.Reader.WaitToReadAsync(cancellationToken);// todo обработка сообщения
_pool.Return(channel);
return res;
}
else
{
res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
}
return res;
}
private async ValueTask<byte[]> ReceiveAsync(CancellationToken cancellationToken = default)
{
var lengthBuffer = new byte[4];
await _connection.ReceiveAsync(lengthBuffer, SocketFlags.Peek, cancellationToken);
var length = BitConverter.ToInt32(lengthBuffer, 0);
var result = new byte[length];
await _connection.ReceiveAsync(result, SocketFlags.None, cancellationToken);
return result;
}
private async Task RecievingTask(CancellationToken token = default)
{
var successRespBuffer = new byte[23];
successRespBuffer[4] = (byte)MessageType.MessageAccepted;
successRespBuffer[0] = 23;
while (Connected)
{
try
{
var message = await ReceiveAsync(token);
if (message[4] != (byte)MessageType.MessageAccepted)
{
try
{
await _messagesProcessor(message);
MessageParser.CopyId(message, successRespBuffer);
await SendAsync(successRespBuffer, token);
}
catch (AuthException ex)
{
await SendAsync(successRespBuffer, token);
await Task.Delay(1000);
_connection.Close();
}
catch (Exception ex)
{
}
}
else if (sendingItems.TryRemove(MessageParser.GetMessageId(message), out var channel))
{
await channel.Writer.WriteAsync(message);
}
}
catch (Exception ex)
{
}
}
_connection.Close();
_connection.Dispose();
}
}
}

View File

@ -5,8 +5,4 @@
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
</ItemGroup>
</Project>

View File

@ -1,14 +0,0 @@
using System.Security.Cryptography;
using System.Text;
namespace Sphagnum.Common.Utils
{
internal static class HashCalculator
{
private readonly static SHA256 _hash = SHA256.Create();
public static byte[] Calc(string text)
{
return _hash.ComputeHash(Encoding.UTF8.GetBytes(text));
}
}
}

View File

@ -1,3 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Sphagnum.Server.FuncTests")]
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]

View File

@ -1,66 +0,0 @@
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Services;
using Sphagnum.Server.Cluster.Contracts;
using Sphagnum.Server.Cluster.Services;
using Sphagnum.Server.DataProcessing.Contracts;
using Sphagnum.Server.DataProcessing.Services;
using Sphagnum.Server.Storage.Messages.Contracts;
using Sphagnum.Server.Storage.Messages.Services;
using Sphagnum.Server.Storage.Users.Contracts;
using Sphagnum.Server.Storage.Users.Services;
namespace Sphagnum.Server.Broker.Services
{
internal class BrokerDefaultBase(ConnectionFactory connectionFactory, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
{
private readonly SphagnumConnection _connection;
private readonly CancellationTokenSource _cts = new();
private Task? _acceptationTask;
private readonly IAuthInfoStorage _authInfoStorage = new AuthInfoStorageBase();
private readonly IMessagesStorage _messagesStorage = messagesStorage;
private readonly IDistributor _distributor = distributor;
private readonly IDataProcessor _dataProcessor = dataProcessor;
private readonly ConnectionFactory _connectionFactory = connectionFactory;
public Task StartAsync(int port)
{
_connectionFactory.CreateDefault(() =>
{
var processor = new MessagesProcessor(_authInfoStorage, _messagesStorage, _distributor, _dataProcessor);
return processor.ProcessMessage;
});
_connection?.Bind(port);
_connection?.Listen(1000); //todo разобраться что делает этот параметр.
//_acceptationTask = AcceptationWorker(_cts.Token);
return Task.CompletedTask;
}
public Task StopAsync()
{
_cts.Cancel();
return Task.CompletedTask;
}
internal static BrokerDefaultBase Create(ConnectionFactory connectionFactory)
{
return new BrokerDefaultBase(
connectionFactory,
new MessagesStorageDefault(),
new DistributorDefault(),
new DataProcessorDefault()
);
}
private async Task AcceptationWorker(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var acceptedSocket = await _connection.AcceptAsync();
}
}
}
}

View File

@ -1,26 +0,0 @@
using Microsoft.Extensions.Hosting;
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Server.Broker.Services;
namespace Sphagnum.Server
{
public class BrokerHost : IHostedService
{
private readonly BrokerDefaultBase _broker;
public BrokerHost(ConnectionFactory connectionFactory)
{
_broker = BrokerDefaultBase.Create(connectionFactory);
}
public Task StartAsync(CancellationToken cancellationToken)
{
return _broker.StartAsync(8081);
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _broker.StopAsync();
}
}
}

View File

@ -0,0 +1,26 @@
using Microsoft.Extensions.Hosting;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using System.Net;
namespace Sphagnum.Server.Broker.Services
{
public class BrokerService : IHostedService
{
private readonly ConnectionsManager _manager;
public BrokerService(ConnectionsManager manager)
{
_manager = manager;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Sphagnum.Server.Broker.Services
{
public class ConnectionsManager
{
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Sphagnum.Server.Broker.Services
{
internal class MessagesBus
{
}
}

View File

@ -1,68 +1,36 @@
using Sphagnum.Common;
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Exceptions;
using Sphagnum.Common.Utils;
using Sphagnum.Common.Utils.Models;
using Sphagnum.Server.Cluster.Contracts;
using Sphagnum.Server.DataProcessing.Contracts;
using Sphagnum.Server.Storage.Messages.Contracts;
using Sphagnum.Server.Storage.Users.Contracts;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Extensions;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
using System.Collections.Concurrent;
namespace Sphagnum.Server.Broker.Services
{
internal class MessagesProcessor
public class MessagesProcessor
{
private bool AuthOk = true;
private readonly IWalWriter _walWriter;
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
private readonly IAuthInfoStorage _authInfoStorage;
private readonly IMessagesStorage _messagesStorage;
private readonly IDistributor _distributor;
private readonly IDataProcessor _dataProcessor;
public MessagesProcessor(IAuthInfoStorage authInfoStorage, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
public MessagesProcessor(IWalWriter walWriter)
{
_authInfoStorage = authInfoStorage;
_messagesStorage = messagesStorage;
_distributor = distributor;
_dataProcessor = dataProcessor;
_walWriter = walWriter;
}
internal async Task ProcessMessage(byte[] message)
internal void AddConnection(IConnection connection)
{
if (AuthOk)
connection.ConnectionClosed += (id) =>
{
await _messagesStorage.LogMessage(message);
await _distributor.DistributeData(message);
await _dataProcessor.PutMessage(message);
}
else if (await CheckRights(message))
{
AuthOk = true;
await _messagesStorage.LogMessage(message);
await _distributor.DistributeData(message);
await _dataProcessor.PutMessage(message);
}
else
{
throw new AuthException();
}
Connections.TryRemove(id, out var conn);
};
var _ = ProcessMessages(connection);
}
private async ValueTask<bool> CheckRights(byte[] buffer)
internal async Task ProcessMessages(IConnection connection)
{
var messageType = MessageParser.GetMessageType(buffer);
if (messageType == MessageType.Auth)
while (!connection.CancellationTokenSource.IsCancellationRequested)
{
var payloadStart = MessageParser.GetPayloadStart(buffer);
var rights = (UserRights)BitConverter.ToInt16(buffer.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + payloadStart, 2));
var isRecievingAllowed = await _authInfoStorage.CheckRights(
buffer.AsSpan(payloadStart, Constants.HashedUserDataSizeInfBytes),
buffer.AsSpan(payloadStart + Constants.HashedUserDataSizeInfBytes, Constants.HashedUserDataSizeInfBytes),
rights,
new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
);
return isRecievingAllowed;
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
await _walWriter.WriteData(data);
}
return false;
}
}
}

View File

@ -0,0 +1,54 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
using System.Collections.Concurrent;
using System.Net;
using Sphagnum.Common.Infrastructure.Extensions;
namespace Sphagnum.Server.Broker.Services
{
public class Reciever
{
public readonly CancellationTokenSource Cts = new();
private readonly IConnection _listeningConnection;
private readonly IWalWriter _walWriter;
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
public Reciever(ConnectionFactory connectionFactory, IWalWriter walWriter)
{
_listeningConnection = connectionFactory.CreateConnection(false).Result;
_listeningConnection.Bind(new IPEndPoint(IPAddress.Any, connectionFactory.Port));
_listeningConnection.Listen(1000); //todo разобраться что делает этот параметр.
_walWriter = walWriter;
var _ = AcceptationWorker();
}
internal void AddConnection(IConnection connection)
{
Connections[connection.ConnectionId] = connection;
connection.ConnectionClosed += (id) =>
{
Connections.TryRemove(id, out var conn);
};
ProcessMessages(connection);
}
private async Task AcceptationWorker()
{
while (!Cts.IsCancellationRequested)
{
var acceptedSocket = await _listeningConnection.AcceptAsync();
AddConnection(acceptedSocket);
}
}
internal async Task ProcessMessages(IConnection connection)
{
while (!connection.CancellationTokenSource.IsCancellationRequested)
{
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
await _walWriter.WriteData(data);
}
}
}
}

View File

@ -1,7 +0,0 @@
namespace Sphagnum.Server.Cluster.Contracts
{
public interface IDistributor
{
ValueTask DistributeData(ReadOnlySpan<byte> data);
}
}

View File

@ -1,12 +0,0 @@
using Sphagnum.Server.Cluster.Contracts;
namespace Sphagnum.Server.Cluster.Services
{
internal class DistributorDefault : IDistributor
{
public ValueTask DistributeData(ReadOnlySpan<byte> data)
{
return ValueTask.CompletedTask;
}
}
}

View File

@ -1,9 +0,0 @@
namespace Sphagnum.Server.DataProcessing.Contracts
{
internal interface IDataProcessor
{
public bool RegisterCoprocessor(string key, Func<byte[], Task> func);
public Func<byte[], Task>? UnregisterCoprocessor(string key);
public ValueTask PutMessage(ReadOnlySpan<byte> message);
}
}

View File

@ -1,29 +0,0 @@
using Sphagnum.Server.DataProcessing.Contracts;
using System.Collections.Concurrent;
namespace Sphagnum.Server.DataProcessing.Services
{
internal class DataProcessorDefault : IDataProcessor
{
private readonly ConcurrentDictionary<string, Func<byte[], Task>> _processors = new();
public ValueTask PutMessage(ReadOnlySpan<byte> message)
{
return ValueTask.CompletedTask;
}
public bool RegisterCoprocessor(string key, Func<byte[], Task> func)
{
return _processors.TryAdd(key, func);
}
public Func<byte[], Task>? UnregisterCoprocessor(string key)
{
if (_processors.TryRemove(key, out Func<byte[], Task>? res))
{
return res;
}
return null;
}
}
}

View File

@ -0,0 +1,9 @@
namespace Sphagnum.Server.Scheme.Contracts
{
public interface IExchange
{
public string Name { get; init; }
public Task SendMessage();
}
}

View File

@ -0,0 +1,7 @@
namespace Sphagnum.Server.Storage
{
internal class SchemaManager
{
}
}

View File

@ -14,4 +14,8 @@
<ProjectReference Include="..\Sphagnum.Common\Sphagnum.Common.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Storage\Utils\" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,8 @@
namespace Sphagnum.Server.Storage.Contracts
{
public class StorageConfig
{
public readonly string WalDirectory;
public readonly long WalPageMinSize = 1024 * 4;
}
}

View File

@ -0,0 +1,8 @@
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
{
internal interface IWalCursorManager
{
ValueTask<WalCursor> GetCursor();
ValueTask SetCursor(WalCursor walCursor);
}
}

View File

@ -0,0 +1,7 @@
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
{
internal interface IWalStreamSource
{
public Stream GetStream(string pathToFile);
}
}

View File

@ -0,0 +1,7 @@
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
{
public interface IWalWriter
{
public Task WriteData(byte[] data);
}
}

View File

@ -0,0 +1,14 @@
namespace Sphagnum.Server.Storage.Contracts.Wal
{
public readonly struct WalCursor
{
public readonly long PageId;
public readonly long CurrentPosition;
public WalCursor(long pageId, long currentPosition)
{
PageId = pageId;
CurrentPosition = currentPosition;
}
}
}

View File

@ -1,7 +0,0 @@
namespace Sphagnum.Server.Storage.Messages.Contracts
{
internal interface IMessagesStorage
{
ValueTask LogMessage(ReadOnlyMemory<byte> message);
}
}

View File

@ -1,12 +0,0 @@
using Sphagnum.Server.Storage.Messages.Contracts;
namespace Sphagnum.Server.Storage.Messages.Services
{
internal class MessagesStorageDefault : IMessagesStorage
{
public ValueTask LogMessage(ReadOnlyMemory<byte> message)
{
return ValueTask.CompletedTask;
}
}
}

View File

@ -0,0 +1,47 @@
using Sphagnum.Server.Storage.Contracts;
using Sphagnum.Server.Storage.Contracts.Wal;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
namespace Sphagnum.Server.Storage.Services.Wal
{
internal class WalCursorManager : IWalCursorManager
{
private readonly string _cursorPath;
private byte[]? data;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);
public WalCursorManager(StorageConfig storageConfig)
{
_cursorPath = Path.Combine(storageConfig.WalDirectory, "cursor");
}
public async ValueTask<WalCursor> GetCursor()
{
await _semaphore.WaitAsync();
try
{
if (data == null)
{
data = await File.ReadAllBytesAsync(_cursorPath);
}
var result = new WalCursor(BitConverter.ToInt64(data, 0), BitConverter.ToInt64(data, 8));
return result;
}
finally { _semaphore.Release(); }
}
public async ValueTask SetCursor(WalCursor walCursor)
{
try
{
if (data == null)
{
data = new byte[16];
}
BitConverter.TryWriteBytes(data.AsSpan(0, 8), walCursor.PageId);
BitConverter.TryWriteBytes(data.AsSpan(8, 8), walCursor.CurrentPosition);
await File.WriteAllBytesAsync(_cursorPath, data);
}
finally { _semaphore.Release(); }
}
}
}

View File

@ -0,0 +1,13 @@
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
namespace Sphagnum.Server.Storage.Services.Wal
{
internal class WalStreamSource : IWalStreamSource
{
public Stream GetStream(string pathToFile)
{
var stream = new FileStream(pathToFile, FileMode.OpenOrCreate, FileAccess.Write);
return stream;
}
}
}

View File

@ -0,0 +1,54 @@
using Microsoft.Extensions.Logging;
using Sphagnum.Server.Storage.Contracts;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
using System.Globalization;
namespace Sphagnum.Server.Storage.Services.Wal
{
internal class WalWriterDefault : IWalWriter
{
private readonly SemaphoreSlim _semaphore = new(0, 1);
private readonly string _walDirectory;
private readonly long _walPageSize;
private string? _walPagePath;
private long CurrentPageSize = 0;
private long CurrentPage = 0;
private readonly IWalStreamSource _streamSource;
private readonly ILogger<WalWriterDefault> _logger;
public WalWriterDefault(StorageConfig storageConfig, IWalStreamSource streamSource, ILogger<WalWriterDefault> logger)
{
_streamSource = streamSource;
_walDirectory = storageConfig.WalDirectory;
_walPageSize = storageConfig.WalPageMinSize;
_logger = logger;
}
public async Task WriteData(byte[] data)
{
if (!string.IsNullOrWhiteSpace(_walPagePath))
{
await _semaphore.WaitAsync();
try
{
if (CurrentPageSize > _walPageSize)
{
CurrentPageSize = 0;
CurrentPage += 1;
_walPagePath = Path.Combine(_walDirectory, CurrentPage.ToString(CultureInfo.InvariantCulture));
}
using var stream = _streamSource.GetStream(_walPagePath);
await stream.WriteAsync(data);
CurrentPageSize += data.LongLength;
}
catch (Exception ex)
{
_logger.LogError(ex, "Walwriting failed!");
}
_semaphore.Release();
}
}
}
}

View File

@ -1,13 +0,0 @@
using Sphagnum.Common.Contracts.Login;
namespace Sphagnum.Server.Storage.Users.Contracts
{
internal interface IAuthInfoStorage
{
public ValueTask<bool> CheckRights(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights, CancellationToken token = default);
public ValueTask AddUser(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights);
public ValueTask SetRights(Span<byte> hashedUsername, UserRights userRights);
}
}

View File

@ -1,30 +0,0 @@
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Utils;
using Sphagnum.Server.Storage.Users.Contracts;
using System.Numerics;
namespace Sphagnum.Server.Storage.Users.Services
{
internal class AuthInfoStorageBase : IAuthInfoStorage
{
private readonly Vector<byte> RootUserLogin = new(HashCalculator.Calc("root"));
private readonly Vector<byte> RootUserPassword = new(HashCalculator.Calc("root"));
private readonly UserRights RootUserRights = UserRights.All;
public ValueTask AddUser(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights)
{
return ValueTask.CompletedTask;
}
public ValueTask<bool> CheckRights(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights, CancellationToken token = default)
{
var username = new Vector<byte>(hashedUsername);
var pwd = new Vector<byte>(hashedPassword);
return ValueTask.FromResult(username == RootUserLogin && pwd == RootUserPassword && (userRights & RootUserRights) == userRights);
}
public ValueTask SetRights(Span<byte> hashedUsername, UserRights userRights)
{
return ValueTask.CompletedTask;
}
}
}

View File

@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]

View File

@ -1,10 +1,10 @@
using Sphagnum.Common.Contracts.Messaging.Messages;
using Sphagnum.Common.Messaging.Contracts.Messages;
namespace Sphagnum.Common.UnitTests.Comparers
{
internal static class MessagesComparer
{
public static bool Compare(OutgoingMessage message1, OutgoingMessage message2)
public static bool Compare(Message message1, Message message2)
{
var res = true;
res &= message1.Exchange == message2.Exchange;
@ -12,25 +12,19 @@ namespace Sphagnum.Common.UnitTests.Comparers
res &= message1.RoutingKey.Part2 == message2.RoutingKey.Part2;
res &= message1.RoutingKey.Part3 == message2.RoutingKey.Part3;
res &= message1.Payload.Length == message2.Payload.Length;
res &= message1.MessageId == message2.MessageId;
res &= ComparePayloads(message1, message2);
return res;
}
public static bool Compare(IncommingMessage message1, IncommingMessage message2)
{
var res = true;
res &= message1.MessageId == message2.MessageId;
res &= ComparePayloads(message1.Payload.ToArray(), message2.Payload.ToArray());
return res;
}
public static bool ComparePayloads(OutgoingMessage message1, OutgoingMessage message2)
public static bool ComparePayloads(Message message1, Message message2)
{
var payload1 = message1.Payload.ToArray();
var payload2 = message2.Payload.ToArray();
return ComparePayloads(payload1, payload2);
}
public static bool ComparePayloads(byte[] payload1, byte[] payload2)
{
var res = true;

View File

@ -1,17 +0,0 @@
namespace Sphagnum.Common.UnitTests
{
public class ConnectionTests
{
[SetUp]
public void Setup()
{
}
[Test]
public void Test1()
{
Assert.Pass();
}
}
}

View File

@ -1,23 +1,24 @@
using Sphagnum.Common.Contracts.Messaging;
using Sphagnum.Common.Contracts.Messaging.Messages;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using System.Security.Cryptography;
namespace Sphagnum.Common.UnitTests.DataGenerators
{
internal static class MessagesGenerator
{
public static OutgoingMessage GetRandomOutgoingMessage(bool emptyKey = false, bool emptyPayload = false)
public static Message GetRandomMessage(Guid? messageId = null, bool emptyKey = false, bool emptyPayload = false)
{
var exchangeName = RandomNumberGenerator.GetInt32(10000000).ToString();
var payload = !emptyPayload ? RandomNumberGenerator.GetBytes(RandomNumberGenerator.GetInt32(0, 1000)) : [];
var routingKeysBytes = RandomNumberGenerator.GetBytes(3);
return new OutgoingMessage(exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
}
public static IncommingMessage GetRandoIncommingMessage(bool emptyPayload = false)
{
var payload = !emptyPayload ? RandomNumberGenerator.GetBytes(RandomNumberGenerator.GetInt32(0, 1000)) : [];
return new IncommingMessage(Guid.NewGuid(), payload);
if (messageId.HasValue)
{
return new Message(messageId.Value, exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
}
else
{
return new Message(exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
}
}
}
}

View File

@ -1,6 +1,6 @@
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Utils;
using Sphagnum.Common.UnitTests.DataGenerators;
using Sphagnum.Common.Utils;
using Sphagnum.Common.Utils.Models;
namespace Sphagnum.Common.UnitTests
{
@ -12,11 +12,17 @@ namespace Sphagnum.Common.UnitTests
var count = 0;
while (count < 100)
{
var message = MessagesGenerator.GetRandoIncommingMessage();
var message = MessagesGenerator.GetRandomMessage();
var bytes = MessageParser.PackMessage(message);
var message2 = MessageParser.UnpackIncomingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var message2 = MessageParser.UnpackMessage(bytes);
var bytes2 = MessageParser.PackMessage(message2);
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
Assert.Multiple(() =>
{
Assert.That(f1, Is.EqualTo(f2));
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
});
count++;
}
}
@ -27,31 +33,17 @@ namespace Sphagnum.Common.UnitTests
var count = 0;
while (count < 100)
{
var message = MessagesGenerator.GetRandoIncommingMessage(true);
var message = MessagesGenerator.GetRandomMessage(null, false, true);
var bytes = MessageParser.PackMessage(message);
var message2 = MessageParser.UnpackIncomingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
Assert.IsTrue((MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2)) == MessageFlags.HasPayload);
count++;
}
}
[Test]
public void PackUnpackOutgoingMessageGetMessageId_WithRoutingKeyAndPayload()
{
var count = 0;
while (count < 100)
{
var id = Guid.NewGuid();
var message = MessagesGenerator.GetRandomOutgoingMessage();
var bytesForFlags = MessageParser.PackMessage(message);
var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
var bytes = MessageParser.Pack(message, id, flags, bytesForFlags.Length);
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var id2 = MessageParser.GetMessageId(bytes);
Assert.That(id, Is.EqualTo(id2));
var message2 = MessageParser.UnpackMessage(bytes);
var bytes2 = MessageParser.PackMessage(message2);
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
Assert.Multiple(() =>
{
Assert.That(f1, Is.EqualTo(f2));
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
});
count++;
}
}
@ -63,95 +55,17 @@ namespace Sphagnum.Common.UnitTests
while (count < 100)
{
var id = Guid.NewGuid();
var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
var bytesForFlags = MessageParser.PackMessage(message);
var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
var bytes = MessageParser.Pack(message, id, flags, bytesForFlags.Length);
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var id2 = MessageParser.GetMessageId(bytes);
Assert.That(id, Is.EqualTo(id2));
count++;
}
}
[Test]
public void PackUnpackOutgoingMessage_WithRoutingKeyAndPayload()
{
var count = 0;
while (count < 100)
{
var message = MessagesGenerator.GetRandomOutgoingMessage();
var message = MessagesGenerator.GetRandomMessage(null, true, true);
var bytes = MessageParser.PackMessage(message);
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var message2 = MessageParser.UnpackMessage(bytes);
var bytes2 = MessageParser.PackMessage(message2);
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
var bytes3 = MessageParser.PackMessage(message2);
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
count++;
}
}
[Test]
public void PackUnpackOutgoingMessage_WithRoutingKeyAndEmptyPayload()
{
var count = 0;
while (count < 100)
{
var message = MessagesGenerator.GetRandomOutgoingMessage(false, true);
var bytes = MessageParser.PackMessage(message);
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var bytes2 = MessageParser.PackMessage(message2);
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
var bytes3 = MessageParser.PackMessage(message2);
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
count++;
}
}
[Test]
public void PackUnpackOutgoingMessage_WithEmptyRoutingKey()
{
var count = 0;
while (count < 100)
{
var message = MessagesGenerator.GetRandomOutgoingMessage(true);
var bytes = MessageParser.PackMessage(message);
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var bytes2 = MessageParser.PackMessage(message2);
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
var bytes3 = MessageParser.PackMessage(message2);
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
count++;
}
}
[Test]
public void PackUnpackOutgoingMessage_WithEmptyRoutingKeyAndEmptyPayload()
{
var count = 0;
while (count < 100)
{
var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
var bytes = MessageParser.PackMessage(message);
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
var bytes2 = MessageParser.PackMessage(message2);
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
var bytes3 = MessageParser.PackMessage(message2);
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
Assert.Multiple(() =>
{
Assert.That(f1, Is.EqualTo(f2));
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
});
count++;
}
}

View File

@ -1,23 +1,33 @@
using Sphagnum.Common.Contracts.Infrastructure;
using System.Collections.Concurrent;
using Sphagnum.Common.Infrastructure.Contracts;
using System.Net;
using System.Net.Sockets;
using System.Threading.Channels;
namespace Sphagnum.Common.UnitTests.Services
{
/// <summary>
/// Класс имитирующий передачу данных через сокет.
/// </summary>
internal class TestConnection : IConnection
{
private readonly ConcurrentQueue<byte[]> _queue = new();
public bool Connected => true;
public CancellationTokenSource CancellationTokenSource { get; set; } = new CancellationTokenSource();
public Guid ConnectionId { get; private set; }
public IConnection Accept()
public bool Connected { get; set; }
public event Action<Guid>? ConnectionClosed;
public Channel<byte[]> _channel = Channel.CreateUnbounded<byte[]>();
public Channel<TestConnection> _newConnectionsChannel = Channel.CreateUnbounded<TestConnection>();
public TestConnection(Guid id)
{
return new TestConnection();
ConnectionId = id;
}
public Task<IConnection> AcceptAsync()
public TestConnection()
{
return Task.FromResult<IConnection>(new TestConnection());
ConnectionId = Guid.NewGuid();
}
public void Bind(EndPoint endPoint)
@ -26,17 +36,21 @@ namespace Sphagnum.Common.UnitTests.Services
public void Close()
{
Connected = false;
CancellationTokenSource.Cancel();
ConnectionClosed?.Invoke(ConnectionId);
}
public Task ConnectAsync(string host, int port)
{
Connected = true;
return Task.CompletedTask;
}
public void Dispose()
{
CancellationTokenSource.Cancel();
ConnectionClosed?.Invoke(ConnectionId);
}
public void Listen(int backlog)
@ -46,37 +60,50 @@ namespace Sphagnum.Common.UnitTests.Services
public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
{
var res = new byte[buffer.Length];
await Receive(res, socketFlags, cancellationToken);
res.CopyTo(buffer);
return res.Length;
var canRead = await _channel.Reader.WaitToReadAsync(cancellationToken);
if (canRead)
{
if (socketFlags == SocketFlags.Peek)
{
if (_channel.Reader.TryPeek(out var data))
{
int i;
for (i = 0; i < buffer.Length && i < data.Length; i++)
{
buffer.Span[i] = data[i];
}
return i;
}
}
else
{
var data = await _channel.Reader.ReadAsync(cancellationToken);
for (var i = 0; i < buffer.Length && i < data.Length; i++)
{
buffer.Span[i] = data[i];
}
return data.Length;
}
}
return 0;
}
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
{
_queue.Enqueue(buffer.Span.ToArray());
return ValueTask.FromResult(buffer.Length);
var data = new byte[buffer.Length];
buffer.CopyTo(data);
await _channel.Writer.WriteAsync(data, cancellationToken);
return data.Length;
}
private async ValueTask<int> Receive(byte[] buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default, int counter = 0)
public async Task<IConnection> AcceptAsync()
{
if (counter > 200)
{
throw new TimeoutException();
}
return await _newConnectionsChannel.Reader.ReadAsync();
}
if (socketFlags == SocketFlags.Peek ? _queue.TryPeek(out byte[]? result) : _queue.TryDequeue(out result))
{
result.CopyTo(buffer, 0);
return result.Length;
}
else
{
await Task.Delay(100, cancellationToken);
counter++;
await Receive(buffer, socketFlags, cancellationToken, counter);
}
throw new TimeoutException();
internal async Task AddInputConnection(TestConnection connection)
{
await _newConnectionsChannel.Writer.WriteAsync(connection);
}
}
}

View File

@ -1,18 +1,19 @@
using Sphagnum.Common.Contracts.Login;
using Sphagnum.Common.Services;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
namespace Sphagnum.Common.UnitTests.Services
{
internal class TestConnectionFactory : ConnectionFactory
{
internal override SphagnumConnection CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
public IConnection? CurrentConnection { get; private set; }
internal override Task<IConnection> CreateConnection(bool connected = true)
{
return new SphagnumConnection(() => new TestConnection(), messagesProcessorFactory);
return Task.FromResult(CurrentConnection ?? (IConnection)(new TestConnection()));
}
internal override Task<SphagnumConnection> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
public void SetCurrentConnection(IConnection? connection)
{
return Task.FromResult(new SphagnumConnection(() => new TestConnection(), messagesProcessorFactory));
CurrentConnection = connection;
}
}
}

View File

@ -10,6 +10,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AutoFixture" Version="4.18.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />

View File

@ -1,158 +1,163 @@
//using Sphagnum.Common.UnitTests.Services;
//using System;
//using System.Collections.Generic;
//using System.Linq;
//using System.Text;
//using System.Threading.Tasks;
using AutoFixture;
using Sphagnum.Common.Infrastructure.Extensions;
using Sphagnum.Common.UnitTests.Services;
using System.Security.Cryptography;
//namespace Sphagnum.Common.UnitTests
//{
// public class TestConnectionTests
// {
// [Test]
// public async Task SendRecieve8BytesWithBufferSize10()
// {
// var connection = new TestConnection
// {
// BufferSize = 10
// };
// var data = new byte[8];
// Array.Fill<byte>(data, 1);
// _ = await connection.SendAsync(data);
// var forResult = new List<byte>();
// for (int i = 0; i < 1; i++)
// {
// var buffer = new byte[connection.BufferSize];
// _ = await connection.ReceiveAsync(buffer);
// for (int j = 0; j < buffer.Length; j++)
// {
// var el = buffer[j];
// if (el == 1)
// {
// forResult.Add(el);
// }
// }
// Array.Clear(buffer);
// }
namespace Sphagnum.Common.UnitTests
{
public class TestConnectionTests
{
// Assert.IsTrue(data.Length == forResult.Count);
// }
[Test]
public async Task IsNewConnectionCretesCorrect()
{
var connection = new TestConnection();
var connection2 = new TestConnection();
var connectionTask = connection.AcceptAsync();
// [Test]
// public async Task SendRecieve10BytesWithBufferSize10()
// {
// var connection = new TestConnection
// {
// BufferSize = 10
// };
// var data = new byte[10];
// Array.Fill<byte>(data, 1);
// _ = await connection.SendAsync(data);
// var forResult = new List<byte>();
// for (int i = 0; i < 1; i++)
// {
// var buffer = new byte[connection.BufferSize];
// _ = await connection.ReceiveAsync(buffer);
// for (int j = 0; j < buffer.Length; j++)
// {
// var el = buffer[j];
// if (el == 1)
// {
// forResult.Add(el);
// }
// }
// Array.Clear(buffer);
// }
await Task.WhenAll(connection.AddInputConnection(connection2), connectionTask);
Assert.That(connection2.ConnectionId, Is.EqualTo(connectionTask.Result.ConnectionId));
}
// Assert.IsTrue(data.Length == forResult.Count);
// }
/// <summary>
/// Ïðîâåðÿåò ñöåíàðèé, êîãäà íåñêîëüêî ðàç ïîäðÿä ÷èòàþòñÿ äàííûå
/// ñ ôëàãîì peek - òî åñòü ãëÿíóòü ÷òî ëåæèò íà ïîâåðõíîñòè, íå âû÷èòûâàÿ.
/// </summary>
/// <returns></returns>
[Test]
public async Task SendRecieveData_WithPeekSocketFlag_MultipleReading()
{
var connection = new TestConnection();
var buffer1 = new byte[5];
var recievingTask = connection.ReceiveAsync(buffer1, System.Net.Sockets.SocketFlags.Peek);
var fix = new Fixture();
var data = fix.CreateMany<byte>(11).ToArray();
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
for (int i = 0; i < 5; i++)
{
Assert.That(buffer1[i] == data[i]);
}
Assert.That(recievingTask.Result == 5);
var buffer2 = new byte[5];
var count = await connection.ReceiveAsync(buffer2, System.Net.Sockets.SocketFlags.Peek);
Assert.That(recievingTask.Result == count);
for (int i = 0; i < 5; i++)
{
Assert.That(buffer2[i] == data[i]);
}
// [Test]
// public async Task SendRecieve11BytesWithBufferSize10()
// {
// var connection = new TestConnection
// {
// BufferSize = 10
// };
// var data = new byte[11];
// Array.Fill<byte>(data, 1);
// _ = await connection.SendAsync(data);
// var forResult = new List<byte>();
// for (int i = 0; i < 2; i++)
// {
// var buffer = new byte[connection.BufferSize];
// _ = await connection.ReceiveAsync(buffer);
// for (int j = 0; j < buffer.Length; j++)
// {
// var el = buffer[j];
// if (el==1)
// {
// forResult.Add(el);
// }
// }
// Array.Clear(buffer);
// }
var buffer3 = new byte[5];
var count3 = await connection.ReceiveAsync(buffer3, System.Net.Sockets.SocketFlags.Peek);
Assert.That(recievingTask.Result == count3);
for (int i = 0; i < 5; i++)
{
Assert.That(buffer3[i] == data[i]);
}
}
// Assert.IsTrue(data.Length == forResult.Count);
// }
/// <summary>
/// Ïðîâåðÿåò ñöåíàðèé, êîãäà íåñêîëüêî ðàç ïîäðÿä ÷èòàþòñÿ äàííûå
/// ñ ôëàãîì peek - òî åñòü ãëÿíóòü ÷òî ëåæèò íà ïîâåðõíîñòè, íå âû÷èòûâàÿ. Ïðè ýòîì, â ïðîöåññå ðàáîòû ïðîäîëæàþò ïèñàòüñÿ íîâûå äàííûå.
/// </summary>
/// <returns></returns>
[Test]
public async Task SendRecieveData_WithPeekSocketFlag_MultipleReadingWithWriting()
{
var connection = new TestConnection();
var buffer1 = new byte[5];
var recievingTask = connection.ReceiveAsync(buffer1, System.Net.Sockets.SocketFlags.Peek);
var fix = new Fixture();
var data = fix.CreateMany<byte>(RandomNumberGenerator.GetInt32(5, 100)).ToArray();
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
for (int i = 0; i < 5; i++)
{
Assert.That(buffer1[i] == data[i]);
}
Assert.That(recievingTask.Result == 5);
var buffer2 = new byte[5];
var count = await connection.ReceiveAsync(buffer2, System.Net.Sockets.SocketFlags.Peek);
Assert.That(recievingTask.Result == count);
for (int i = 0; i < 5; i++)
{
Assert.That(buffer2[i] == data[i]);
}
// [Test]
// public async Task SendRecieve31BytesWithBufferSize10()
// {
// var connection = new TestConnection
// {
// BufferSize = 10
// };
// var data = new byte[31];
// Array.Fill<byte>(data, 1);
// _ = await connection.SendAsync(data);
// var forResult = new List<byte>();
// for (int i = 0; i < 4; i++)
// {
// var buffer = new byte[connection.BufferSize];
// _ = await connection.ReceiveAsync(buffer);
// for (int j = 0; j < buffer.Length; j++)
// {
// var el = buffer[j];
// if (el == 1)
// {
// forResult.Add(el);
// }
// }
// Array.Clear(buffer);
// }
for (int c = 0; c < 20; c++)
{
var data2 = fix.CreateMany<byte>(RandomNumberGenerator.GetInt32(5, 100)).ToArray();
var sendingCount = await connection.SendAsync(data2, System.Net.Sockets.SocketFlags.None);
var buffer3 = new byte[5];
var count3 = await connection.ReceiveAsync(buffer3, System.Net.Sockets.SocketFlags.Peek);
Assert.That(recievingTask.Result == count3);
for (int i = 0; i < 5; i++)
{
Assert.That(buffer3[i] == data[i]);
}
}
}
// Assert.IsTrue(data.Length == forResult.Count);
// }
/// <summary>
/// Ïðîâåðÿåò ñöåíàðèé, ÷òåíèÿ-îòïðàâëåíèÿ äàííûõ.
/// </summary>
/// <returns></returns>
[Test]
public async Task SendRecieveData_WithNoneSocketFlag()
{
var fix = new Fixture();
// [Test]
// public async Task SendRecieve30BytesWithBufferSize10()
// {
// var connection = new TestConnection
// {
// BufferSize = 10
// };
// var data = new byte[30];
// Array.Fill<byte>(data, 1);
// _ = await connection.SendAsync(data);
// var forResult = new List<byte>();
// for (int i = 0; i < 3; i++)
// {
// var buffer = new byte[connection.BufferSize];
// _ = await connection.ReceiveAsync(buffer);
// for (int j = 0; j < buffer.Length; j++)
// {
// var el = buffer[j];
// if (el == 1)
// {
// forResult.Add(el);
// }
// }
// Array.Clear(buffer);
// }
var connection = new TestConnection();
// Assert.IsTrue(data.Length == forResult.Count);
// }
// }
for (int c = 0; c < 20; c++)
{
int dataSize = RandomNumberGenerator.GetInt32(5, 100);
var buffer1 = new byte[dataSize];
var recievingTask = connection.ReceiveAsync(buffer1, System.Net.Sockets.SocketFlags.None);
var data = fix.CreateMany<byte>(dataSize).ToArray();
//}
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
for (int i = 0; i < dataSize; i++)
{
Assert.That(buffer1[i] == data[i]);
}
Assert.IsTrue(recievingTask.Result == sendingTask.Result);
}
}
/// <summary>
/// Ïðîâåðÿåò ñöåíàðèé, ÷òåíèÿ-îòïðàâëåíèÿ äàííûõ c ïîìîùüþ ìåòîäà ðàñøèðåíèÿ.
/// </summary>
/// <returns></returns>
[Test]
public async Task SendRecieveData_WithExtensionMethod()
{
var fix = new Fixture();
var connection = new TestConnection();
for (int c = 0; c < 20; c++)
{
int dataSize = RandomNumberGenerator.GetInt32(5, 100);
var recievingTask = connection.ReceiveAsync(CancellationToken.None);
var payload = fix.CreateMany<byte>(dataSize).ToArray();
var length = BitConverter.GetBytes(dataSize);
var data = new byte[dataSize + 4];
length.CopyTo(data, 0);
payload.CopyTo(data, 4);
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
for (int i = 0; i < dataSize; i++)
{
Assert.That(recievingTask.Result[i] == data[i]);
}
}
}
}
}

View File

@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit.Analyzers" Version="3.9.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Sphagnum.Client\Sphagnum.Client.csproj" />
<ProjectReference Include="..\..\src\Sphagnum.Server\Sphagnum.Server.csproj" />
<ProjectReference Include="..\Sphagnum.Common.UnitTests\Sphagnum.Common.UnitTests.csproj" />
</ItemGroup>
<ItemGroup>
<Using Include="NUnit.Framework" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,48 @@
using AutoFixture;
using Sphagnum.Client;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.UnitTests.Services;
using Sphagnum.Server.Broker.Services;
namespace Sphagnum.FuncTests
{
public class Tests
{
[SetUp]
public void Setup()
{
}
[Test]
public async Task Test1()
{
var fixture = new Fixture();
var factory = new TestConnectionFactory();
var manager = new ConnectionsManager();
var processor = new MessagesProcessor();
var mainConnection = await factory.CreateConnection(false) as TestConnection;
Assert.IsNotNull(mainConnection);
factory.SetCurrentConnection(mainConnection);
var server = new ConnectionsReciever(factory, manager, processor);
await server.StartAsync(CancellationToken.None);
factory.SetCurrentConnection(null);
var connection1 = await factory.CreateConnection(true) as TestConnection;
Assert.IsNotNull(connection1);
factory.SetCurrentConnection(connection1);
var client = new ClientDefault(factory);
await mainConnection.AddInputConnection(connection1);
var data = fixture.CreateMany<byte>(11).ToArray();
await Task.Delay(100);
await client.Publish(new Common.Messaging.Contracts.Messages.Message("default", RoutingKey.Empty, data));
Assert.Pass();
await Task.Delay(10000);
}
}
}