Compare commits
7 Commits
Author | SHA1 | Date |
---|---|---|
|
889934c51b | |
|
7ddb102efe | |
|
0c59ba8d64 | |
|
1f987c6982 | |
|
c21e3ed952 | |
|
78f9f6c398 | |
|
fba480eaae |
|
@ -1,6 +1,6 @@
|
|||
using Microsoft.AspNetCore.Mvc;
|
||||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Client;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
|
||||
namespace Sphagnum.DebugClient.Controllers
|
||||
{
|
||||
|
@ -8,16 +8,15 @@ namespace Sphagnum.DebugClient.Controllers
|
|||
[Route("[controller]/[action]")]
|
||||
public class TestController : ControllerBase
|
||||
{
|
||||
private readonly IMessagingClient _connection;
|
||||
private static readonly Task? rec;
|
||||
private readonly ClientDefault _connection;
|
||||
|
||||
public TestController(IMessagingClient connection)
|
||||
public TestController(ClientDefault connection)
|
||||
{
|
||||
_connection = connection;
|
||||
}
|
||||
|
||||
[HttpGet]
|
||||
public string test()
|
||||
public string Test()
|
||||
{
|
||||
return "Ok!";
|
||||
}
|
||||
|
@ -34,9 +33,9 @@ namespace Sphagnum.DebugClient.Controllers
|
|||
payload1[i] = 1;
|
||||
payload2[i] = 2;
|
||||
}
|
||||
var t1 = _connection.Publish(new OutgoingMessage("test", payload1)).AsTask();
|
||||
var t2 = _connection.Publish(new OutgoingMessage("test", payload2)).AsTask();
|
||||
await Task.WhenAll(t1, t2);
|
||||
var t1 = _connection.Publish(new Common.Messaging.Contracts.Messages.Message("test", RoutingKey.Empty, payload1)).AsTask();
|
||||
//var t2 = _connection.Publish(new Message("test", RoutingKey.Empty, payload2)).AsTask();
|
||||
await Task.WhenAll(t1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
using Sphagnum.Client;
|
||||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
|
@ -9,13 +8,13 @@ builder.Services.AddEndpointsApiExplorer();
|
|||
builder.Services.AddSwaggerGen();
|
||||
builder.Services.AddSingleton(new ConnectionFactory()
|
||||
{
|
||||
UserRights = UserRights.All,
|
||||
//UserRights = UserRights.All,
|
||||
Login = "root",
|
||||
Password = "root",
|
||||
Hostname = "test_server",
|
||||
Port = 8081,
|
||||
});
|
||||
builder.Services.AddSingleton<IMessagingClient, ClientDefault>();
|
||||
builder.Services.AddSingleton<ClientDefault>();
|
||||
var app = builder.Build();
|
||||
|
||||
app.UseSwagger();
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
|
||||
using Sphagnum.Client;
|
||||
|
||||
namespace Sphagnum.DebugClient.Services
|
||||
{
|
||||
public class TestService : IHostedService
|
||||
{
|
||||
private readonly CancellationTokenSource _cts;
|
||||
private readonly ClientDefault _clientDefault;
|
||||
private readonly Task _consumingTask;
|
||||
public TestService(ClientDefault clientDefault)
|
||||
{
|
||||
_cts = new CancellationTokenSource();
|
||||
_clientDefault = clientDefault;
|
||||
_consumingTask = Consuming();
|
||||
}
|
||||
|
||||
private async Task Consuming()
|
||||
{
|
||||
while (!_cts.IsCancellationRequested)
|
||||
{
|
||||
var message = await _clientDefault.Consume(_cts.Token);
|
||||
|
||||
|
||||
await _clientDefault.Ack(message.MessageId);
|
||||
}
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_cts.Cancel();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,10 +1,16 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Server;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
using Sphagnum.Server.Broker.Services;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
builder.Services.AddControllers();
|
||||
builder.Services.AddSingleton<ConnectionFactory>();
|
||||
builder.Services.AddHostedService<BrokerHost>();
|
||||
builder.Services.AddSingleton(new ConnectionFactory()
|
||||
{
|
||||
Port = 8081,
|
||||
});
|
||||
builder.Services.AddSingleton<MessagesProcessor>();
|
||||
builder.Services.AddSingleton<ConnectionsManager>();
|
||||
builder.Services.AddHostedService<ConnectionsReciever>();
|
||||
|
||||
|
||||
var app = builder.Build();
|
||||
app.MapControllers();
|
||||
|
|
|
@ -23,6 +23,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sphagnum.Common.UnitTests",
|
|||
EndProject
|
||||
Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose", "docker-compose.dcproj", "{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}"
|
||||
EndProject
|
||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sphagnum.FuncTests", "..\tests\Sphagnum.FuncTests\Sphagnum.FuncTests.csproj", "{C160C372-5E5E-5442-462B-A5B5C36BE2D0}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
|
@ -57,6 +59,10 @@ Global
|
|||
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
|
@ -68,6 +74,7 @@ Global
|
|||
{C344FDF3-3D4D-4BC6-B257-4AD907BE1BE4} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62}
|
||||
{77F76F27-D883-4392-90D5-8F441043F468} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62}
|
||||
{25865911-1F2D-4083-A99C-C65E13F05C14} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
|
||||
{C160C372-5E5E-5442-462B-A5B5C36BE2D0} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
|
||||
EndGlobalSection
|
||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||
SolutionGuid = {42DA8F91-1667-49BC-971D-D83C0870393A}
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
namespace Sphagnum.Client
|
||||
{
|
||||
//internal class ClientConnection : SocketConnection
|
||||
//{
|
||||
// public ClientConnection() : base( async (dd) =>
|
||||
// {
|
||||
|
||||
// })
|
||||
// {
|
||||
|
||||
// }
|
||||
//}
|
||||
}
|
|
@ -1,8 +1,9 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Services;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Extensions;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using Sphagnum.Common.Messaging.Utils;
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
|
@ -10,42 +11,34 @@ using System.Threading.Tasks;
|
|||
|
||||
namespace Sphagnum.Client
|
||||
{
|
||||
public class ClientDefault : IMessagingClient, IDisposable
|
||||
public sealed class ClientDefault : IDisposable
|
||||
{
|
||||
private readonly SphagnumConnection _connection;
|
||||
private readonly ConnectionFactory _connectionFactory;
|
||||
private readonly Task _recievingTask;
|
||||
private readonly IConnection _connection;
|
||||
private readonly Channel<byte[]> _commonMessagesChannel = Channel.CreateUnbounded<byte[]>();
|
||||
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
|
||||
public ClientDefault(ConnectionFactory factory)
|
||||
{
|
||||
_connectionFactory = factory;
|
||||
_connection = factory.CreateDefaultConnected(() => async (mess) =>
|
||||
{
|
||||
await _commonMessagesChannel.Writer.WriteAsync(mess);
|
||||
}).Result;
|
||||
Auth().Wait();
|
||||
Task.Delay(10000).Wait();
|
||||
_connection = factory.CreateConnection().Result;
|
||||
_recievingTask = RecivingTask();
|
||||
}
|
||||
|
||||
private async Task<byte[]> ReceiveAsync()
|
||||
private async Task RecivingTask()
|
||||
{
|
||||
return await _commonMessagesChannel.Reader.ReadAsync(_cts.Token);
|
||||
}
|
||||
|
||||
private async Task Auth()
|
||||
{
|
||||
await _connection.SendAsync(MessageParser.PackMessage(new AuthMessage(_connectionFactory.Login, _connectionFactory.Password, _connectionFactory.UserRights)));
|
||||
var response = await ReceiveAsync();
|
||||
var messageType = MessageParser.GetMessageType(response);
|
||||
if (messageType == Common.Utils.Models.MessageType.AuthSuccessfull)
|
||||
while (!_cts.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
var data = await _connection.ReceiveAsync(_cts.Token);
|
||||
if (MessageParser.GetMessageType(data) == MessageType.Common)
|
||||
{
|
||||
await _commonMessagesChannel.Writer.WriteAsync(data);
|
||||
}
|
||||
}
|
||||
throw new Exception("Auth failed!");
|
||||
}
|
||||
|
||||
public ValueTask Ack(Guid messageId)
|
||||
public async ValueTask Ack(Guid messageId)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
await Task.Delay(1000);
|
||||
}
|
||||
|
||||
public ValueTask Nack(Guid messageId)
|
||||
|
@ -53,17 +46,17 @@ namespace Sphagnum.Client
|
|||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public async ValueTask<Guid> Publish(OutgoingMessage message)
|
||||
public async ValueTask<Guid> Publish(Message message)
|
||||
{
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
await _connection.SendAsync(bytes);
|
||||
await _connection.SendAsync(bytes.AsMemory(), System.Net.Sockets.SocketFlags.None);
|
||||
return MessageParser.GetMessageId(bytes);
|
||||
}
|
||||
|
||||
public async ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken)
|
||||
public async ValueTask<Message> Consume(CancellationToken cancellationToken)
|
||||
{
|
||||
var result = await _commonMessagesChannel.Reader.ReadAsync(cancellationToken);
|
||||
return MessageParser.UnpackIncomingMessage(result);
|
||||
return MessageParser.UnpackMessage(result);
|
||||
}
|
||||
|
||||
public ValueTask Reject(Guid messageId)
|
||||
|
|
|
@ -2,6 +2,8 @@
|
|||
|
||||
//Тесты
|
||||
[assembly: InternalsVisibleTo("Sphagnum.Common.UnitTests")]
|
||||
[assembly: InternalsVisibleTo("Sphagnum.Server.Tests")]
|
||||
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]
|
||||
|
||||
[assembly: InternalsVisibleTo("Sphagnum.Client")]
|
||||
[assembly: InternalsVisibleTo("Sphagnum.Server")]
|
|
@ -1,14 +0,0 @@
|
|||
namespace Sphagnum.Common.Contracts.Administration.Enums
|
||||
{
|
||||
public enum ExchangeType : byte
|
||||
{
|
||||
/// <summary>
|
||||
/// Раздает сообщения во все топики с подходящим ключём маршрутизации.
|
||||
/// </summary>
|
||||
Broadcast,
|
||||
/// <summary>
|
||||
/// Отправляет сообщение в одну из очередей с подходящим ключём маршрутизации.
|
||||
/// </summary>
|
||||
Topic,
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
namespace Sphagnum.Common.Contracts.Administration.Enums
|
||||
{
|
||||
public enum TopicType
|
||||
{
|
||||
Queue,
|
||||
Stack,
|
||||
}
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Administration.Requests;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration
|
||||
{
|
||||
public interface IAdministrationClient
|
||||
{
|
||||
public ValueTask CreateExchange(ExchangeCreationRequest exchangeCreationRequest);
|
||||
public ValueTask CreateTopic(TopicCreationRequest topicCreationRequest);
|
||||
public ValueTask DeleteTopic(string topicName);
|
||||
public ValueTask DeleteExchange(string exchangeName);
|
||||
public ValueTask BindTopic(TopicBindingRequest topicCreationRequest);
|
||||
public ValueTask UnbindTopic(TopicBindingRequest topicCreationRequest);
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Administration.Enums;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration.Requests
|
||||
{
|
||||
public readonly ref struct ExchangeCreationRequest
|
||||
{
|
||||
public readonly string ExchangeName;
|
||||
public readonly ExchangeType ExchangeType;
|
||||
}
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration.Requests
|
||||
{
|
||||
public readonly ref struct TopicBindingRequest
|
||||
{
|
||||
public readonly string TopicName;
|
||||
public readonly string ExchangeName;
|
||||
public readonly RoutingKey RoutingKey;
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Administration.Enums;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration.Requests
|
||||
{
|
||||
public readonly ref struct TopicCreationRequest
|
||||
{
|
||||
public readonly string TopicName;
|
||||
public readonly TopicType TopicType;
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
namespace Sphagnum.Common
|
||||
{
|
||||
internal static class Constants
|
||||
{
|
||||
public const int HashedUserDataSizeInfBytes = 32;
|
||||
public const int PayloadRecieverBufferSize = 8192;
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
|
||||
namespace Sphagnum.Common.Contracts
|
||||
{
|
||||
public interface ISphagnumClient : IMessagingClient
|
||||
{
|
||||
}
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
using Sphagnum.Common.Services;
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Login
|
||||
{
|
||||
public class ConnectionFactory
|
||||
{
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public string Login { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
public UserRights UserRights { get; set; }
|
||||
|
||||
internal virtual async Task<SphagnumConnection> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
var conn = new SphagnumConnection(() => new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp)), messagesProcessorFactory);
|
||||
await conn.ConnectAsync(Hostname, Port);
|
||||
return conn;
|
||||
}
|
||||
|
||||
internal virtual SphagnumConnection CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
return new SphagnumConnection(() => new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp)), messagesProcessorFactory);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,13 +0,0 @@
|
|||
namespace Sphagnum.Common.Contracts.Login
|
||||
{
|
||||
public class UserData
|
||||
{
|
||||
public string Username { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public UserRights UserRights { get; set; } =
|
||||
UserRights.MessagesConsuming |
|
||||
UserRights.MessagesPublishing;
|
||||
}
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Login
|
||||
{
|
||||
[Flags]
|
||||
public enum UserRights : ushort
|
||||
{
|
||||
None = 0,
|
||||
MessagesConsuming = 1,
|
||||
MessagesPublishing = 2,
|
||||
TopicCreating = 4,
|
||||
TopicDeleting = 8,
|
||||
TopicBinding = 16,
|
||||
ExchangeCreating = 32,
|
||||
ExchangeDeleting = 64,
|
||||
|
||||
All = MessagesConsuming | MessagesPublishing | TopicCreating | TopicDeleting | TopicBinding | ExchangeCreating | ExchangeDeleting,
|
||||
}
|
||||
}
|
|
@ -1,16 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging
|
||||
{
|
||||
public interface IMessagingClient
|
||||
{
|
||||
public ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken);
|
||||
public ValueTask<Guid> Publish(OutgoingMessage message);
|
||||
public ValueTask Ack(Guid messageId);
|
||||
public ValueTask Nack(Guid messageId);
|
||||
public ValueTask Reject(Guid messageId);
|
||||
}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Utils;
|
||||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
{
|
||||
internal readonly ref struct AuthMessage
|
||||
{
|
||||
public readonly ReadOnlyMemory<byte> Payload;
|
||||
public readonly Guid MessageId;
|
||||
|
||||
public AuthMessage(string login, string pwd, UserRights userRights)
|
||||
{
|
||||
MessageId = Guid.NewGuid();
|
||||
var data = new byte[Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + 2];
|
||||
HashCalculator.Calc(login).CopyTo(data, 0);
|
||||
HashCalculator.Calc(pwd).CopyTo(data, Constants.HashedUserDataSizeInfBytes);
|
||||
BitConverter.TryWriteBytes(data.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes), (ushort)userRights);
|
||||
Payload = data;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
{
|
||||
internal readonly ref struct AuthResultMessage
|
||||
{
|
||||
public readonly ReadOnlyMemory<byte> Payload;
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
{
|
||||
public readonly struct IncommingMessage
|
||||
{
|
||||
public readonly Guid MessageId;
|
||||
|
||||
public readonly ReadOnlyMemory<byte> Payload;
|
||||
|
||||
public IncommingMessage(Guid messageId, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
MessageId = messageId;
|
||||
Payload = payload;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
{
|
||||
public readonly struct OutgoingMessage
|
||||
{
|
||||
public readonly string Exchange;
|
||||
|
||||
public readonly RoutingKey RoutingKey;
|
||||
|
||||
public readonly ReadOnlyMemory<byte> Payload;
|
||||
|
||||
public OutgoingMessage(string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Exchange = exchange;
|
||||
RoutingKey = routingKey;
|
||||
Payload = payload;
|
||||
}
|
||||
|
||||
public OutgoingMessage(string exchange, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Exchange = exchange;
|
||||
RoutingKey = new RoutingKey();
|
||||
Payload = payload;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Exceptions
|
||||
{
|
||||
public class AuthException : Exception
|
||||
{
|
||||
}
|
||||
}
|
|
@ -4,10 +4,15 @@ using System.Net.Sockets;
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Infrastructure
|
||||
namespace Sphagnum.Common.Infrastructure.Contracts
|
||||
{
|
||||
/// <summary>
|
||||
/// Абстракция над подключением (сокет или Channel в случае работы клиента и сервра внутри одного процесса).
|
||||
/// </summary>
|
||||
internal interface IConnection : IDisposable
|
||||
{
|
||||
public CancellationTokenSource CancellationTokenSource { get; }
|
||||
public Guid ConnectionId { get; }
|
||||
Task ConnectAsync(string host, int port);
|
||||
Task<IConnection> AcceptAsync();
|
||||
//todo прописать бросаемые исключения
|
||||
|
@ -22,5 +27,7 @@ namespace Sphagnum.Common.Contracts.Infrastructure
|
|||
//todo прописать бросаемые исключения
|
||||
void Close();
|
||||
bool Connected { get; }
|
||||
|
||||
public event Action<Guid>? ConnectionClosed;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Infrastructure.Extensions
|
||||
{
|
||||
internal static class IConnectionExtensions
|
||||
{
|
||||
public static async ValueTask<byte[]> ReceiveAsync(this IConnection connection, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var lengthBuffer = new byte[5];
|
||||
await connection.ReceiveAsync(lengthBuffer, SocketFlags.Peek, cancellationToken);
|
||||
var length = BitConverter.ToInt32(lengthBuffer, 0);
|
||||
var result = new byte[length];
|
||||
await connection.ReceiveAsync(result, SocketFlags.None, cancellationToken);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Infrastructure.Services
|
||||
{
|
||||
public class ConnectionFactory
|
||||
{
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public string Login { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
|
||||
internal virtual async Task<IConnection> CreateConnection(bool connected = true)
|
||||
{
|
||||
var conn = new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
|
||||
if (connected)
|
||||
{
|
||||
await conn.ConnectAsync(Hostname, Port);
|
||||
}
|
||||
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,14 +1,20 @@
|
|||
using Sphagnum.Common.Contracts.Infrastructure;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Services
|
||||
namespace Sphagnum.Common.Infrastructure.Services
|
||||
{
|
||||
internal class SocketConnection : IConnection
|
||||
{
|
||||
public Guid ConnectionId { get; private set; } = Guid.NewGuid();
|
||||
public bool Connected => _socket.Connected;
|
||||
public CancellationTokenSource CancellationTokenSource { get; private set; } = new CancellationTokenSource();
|
||||
|
||||
public event Action<Guid>? ConnectionClosed;
|
||||
|
||||
private readonly Socket _socket;
|
||||
|
||||
public SocketConnection(Socket socket)
|
||||
|
@ -16,8 +22,6 @@ namespace Sphagnum.Common.Services
|
|||
_socket = socket;
|
||||
}
|
||||
|
||||
public bool Connected => _socket.Connected;
|
||||
|
||||
public async Task<IConnection> AcceptAsync()
|
||||
{
|
||||
var socket = await _socket.AcceptAsync();
|
||||
|
@ -31,6 +35,7 @@ namespace Sphagnum.Common.Services
|
|||
|
||||
public void Close()
|
||||
{
|
||||
ConnectionClosed?.Invoke(ConnectionId);
|
||||
_socket.Close();
|
||||
}
|
||||
|
||||
|
@ -41,6 +46,7 @@ namespace Sphagnum.Common.Services
|
|||
|
||||
public void Dispose()
|
||||
{
|
||||
ConnectionClosed?.Invoke(ConnectionId);
|
||||
_socket.Dispose();
|
||||
}
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Utils.Models
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
[Flags]
|
||||
internal enum MessageFlags : ushort
|
|
@ -1,4 +1,4 @@
|
|||
namespace Sphagnum.Common.Utils.Models
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
internal enum MessageType : byte
|
||||
{
|
|
@ -0,0 +1,32 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Messaging.Contracts.Messages
|
||||
{
|
||||
public readonly struct Message
|
||||
{
|
||||
public readonly Guid MessageId;
|
||||
|
||||
public readonly ReadOnlyMemory<byte> Payload;
|
||||
|
||||
public readonly string Exchange;
|
||||
|
||||
public readonly RoutingKey RoutingKey;
|
||||
|
||||
public Message(string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Exchange = exchange;
|
||||
RoutingKey = routingKey;
|
||||
Payload = payload;
|
||||
RoutingKey = routingKey;
|
||||
MessageId = Guid.NewGuid();
|
||||
}
|
||||
|
||||
internal Message(Guid messageId, string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Exchange = exchange;
|
||||
RoutingKey = routingKey;
|
||||
Payload = payload;
|
||||
MessageId = messageId;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
namespace Sphagnum.Common.Contracts.Messaging
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
public readonly struct RoutingKey
|
||||
{
|
||||
|
@ -14,5 +14,7 @@
|
|||
Part2 = part2;
|
||||
Part3 = part3;
|
||||
}
|
||||
|
||||
public readonly static RoutingKey Empty = new RoutingKey();
|
||||
}
|
||||
}
|
|
@ -1,11 +1,10 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using System;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
|
||||
namespace Sphagnum.Common.Utils
|
||||
namespace Sphagnum.Common.Messaging.Utils
|
||||
{
|
||||
/// <summary>
|
||||
/// Порядок передачи:
|
||||
|
@ -21,7 +20,7 @@ namespace Sphagnum.Common.Utils
|
|||
/// </summary>
|
||||
internal static class MessageParser
|
||||
{
|
||||
public static OutgoingMessage UnpackOutgoingMessage(byte[] bytes)
|
||||
public static Message UnpackMessage(byte[] bytes)
|
||||
{
|
||||
if ((MessageType)bytes[4] != MessageType.Common)
|
||||
{
|
||||
|
@ -30,64 +29,11 @@ namespace Sphagnum.Common.Utils
|
|||
var exchangeName = GetExchangeName(bytes);
|
||||
var routingKey = GetRoutingKey(bytes);
|
||||
var payload = GetPayload(bytes);
|
||||
return new OutgoingMessage(exchangeName, routingKey, payload);
|
||||
}
|
||||
|
||||
public static IncommingMessage UnpackIncomingMessage(byte[] bytes)
|
||||
{
|
||||
if ((MessageType)bytes[4] != MessageType.Common)
|
||||
{
|
||||
throw new ArgumentException("Uncorrect message type! 1 (MessageType.Common) expected!");
|
||||
}
|
||||
var id = GetMessageId(bytes);
|
||||
var payload = GetPayload(bytes);
|
||||
return new IncommingMessage(id, payload);
|
||||
return new Message(id, exchangeName, routingKey, payload);
|
||||
}
|
||||
|
||||
public static byte[] PackMessage(IncommingMessage message)
|
||||
{
|
||||
var result = new byte[27 + message.Payload.Length];
|
||||
result[4] = (byte)MessageType.Common;
|
||||
var flags = MessageFlags.HasPayload;
|
||||
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
|
||||
message.MessageId.TryWriteBytes(result.AsSpan(7));
|
||||
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
|
||||
message.Payload.CopyTo(result.AsMemory(27));
|
||||
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static byte[] PackReplyMessage(MessageType messageType)
|
||||
{
|
||||
var res = new byte[23];
|
||||
res[4] = (byte)messageType;
|
||||
res[0] = 23;
|
||||
return res;
|
||||
}
|
||||
|
||||
public static byte[] PackReplyMessage(MessageType messageType, Guid parentMessageId)
|
||||
{
|
||||
var res = new byte[23];
|
||||
res[4] = (byte)messageType;
|
||||
res[0] = 23;
|
||||
parentMessageId.TryWriteBytes(res.AsSpan(7));
|
||||
return res;
|
||||
}
|
||||
|
||||
public static byte[] PackMessage(AuthMessage message)
|
||||
{
|
||||
var result = new byte[27 + message.Payload.Length];
|
||||
result[4] = (byte)MessageType.Auth;
|
||||
var flags = MessageFlags.HasPayload;
|
||||
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
|
||||
message.MessageId.TryWriteBytes(result.AsSpan(7));
|
||||
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
|
||||
message.Payload.CopyTo(result.AsMemory(27));
|
||||
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static byte[] PackMessage(OutgoingMessage message)
|
||||
public static byte[] PackMessage(Message message)
|
||||
{
|
||||
if (string.IsNullOrEmpty(message.Exchange) || string.IsNullOrWhiteSpace(message.Exchange))
|
||||
{
|
||||
|
@ -115,7 +61,7 @@ namespace Sphagnum.Common.Utils
|
|||
var exchangeNameBytes = Encoding.UTF8.GetBytes(message.Exchange);// todo перевести на более оптимальный метод, не аллоцирующий лишнего.
|
||||
count += exchangeNameBytes.Length;
|
||||
count++;
|
||||
return Pack(message, Guid.NewGuid(), flags, count);
|
||||
return Pack(message, flags, count);
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
|
@ -126,30 +72,40 @@ namespace Sphagnum.Common.Utils
|
|||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static Guid GetMessageId(Span<byte> bytes)
|
||||
internal static byte[] Pack(Message message, MessageFlags flags, int count)
|
||||
{
|
||||
var slice = bytes.Slice(7, 16);
|
||||
return new Guid(slice);
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static Guid GetMessageId(ReadOnlyMemory<byte> bytes)
|
||||
{
|
||||
var slice = bytes.Slice(7, 16);
|
||||
return new Guid(slice.Span);
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static void CopyId(ReadOnlySpan<byte> bytes, byte[] buffer)
|
||||
{
|
||||
var slice = bytes.Slice(7, 16);
|
||||
slice.CopyTo(buffer.AsSpan(7, 16));
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static MessageType GetMessageType(Span<byte> bytes)
|
||||
{
|
||||
return (MessageType)bytes[4];
|
||||
var result = new byte[count];
|
||||
result[4] = (byte)MessageType.Common;
|
||||
var shift = 5;
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (ushort)flags);//2. flags
|
||||
shift += 2;
|
||||
message.MessageId.TryWriteBytes(result.AsSpan(shift));//3. id
|
||||
shift += 16;
|
||||
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
|
||||
{
|
||||
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)message.Exchange.Length);//4. ExchangeNameLength
|
||||
shift += 1;
|
||||
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
|
||||
shift += exchangeBytes.Length;
|
||||
}
|
||||
if ((flags & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey)//6. RoutingKey
|
||||
{
|
||||
result[shift] = message.RoutingKey.Part1;
|
||||
shift++;
|
||||
result[shift] = message.RoutingKey.Part2;
|
||||
shift++;
|
||||
result[shift] = message.RoutingKey.Part3;
|
||||
shift++;
|
||||
}
|
||||
if ((flags & MessageFlags.HasPayload) == MessageFlags.HasPayload)
|
||||
{
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), message.Payload.Length);//7. PayloadSize
|
||||
shift += 4;
|
||||
message.Payload.CopyTo(result.AsMemory(shift));//8. Payload
|
||||
}
|
||||
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
|
||||
return result;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
|
@ -180,80 +136,30 @@ namespace Sphagnum.Common.Utils
|
|||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static int GetPayloadStart(Span<byte> bytes)
|
||||
internal static MessageType GetMessageType(Span<byte> bytes)
|
||||
{
|
||||
if (HasPayload(bytes))
|
||||
{
|
||||
var shift = 27;
|
||||
if (HasExchange(bytes))//todo проверить бенчмарком, как работает инлайн
|
||||
{
|
||||
shift += bytes[23];
|
||||
shift += 1;
|
||||
}
|
||||
if (HasKey(bytes))
|
||||
{
|
||||
shift += 3;
|
||||
}
|
||||
return shift;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
internal static byte[] Pack(OutgoingMessage message, Guid id, MessageFlags flags, int count)
|
||||
{
|
||||
var result = new byte[count];
|
||||
result[4] = (byte)MessageType.Common;
|
||||
var shift = 5;
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (ushort)flags);//2. flags
|
||||
shift += 2;
|
||||
id.TryWriteBytes(result.AsSpan(shift));//3. id
|
||||
shift += 16;
|
||||
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
|
||||
{
|
||||
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)(message.Exchange.Length));//4. ExchangeNameLength
|
||||
shift += 1;
|
||||
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
|
||||
shift += exchangeBytes.Length;
|
||||
}
|
||||
if ((flags & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey)//6. RoutingKey
|
||||
{
|
||||
result[shift] = message.RoutingKey.Part1;
|
||||
shift++;
|
||||
result[shift] = message.RoutingKey.Part2;
|
||||
shift++;
|
||||
result[shift] = message.RoutingKey.Part3;
|
||||
shift++;
|
||||
}
|
||||
if ((flags & MessageFlags.HasPayload) == MessageFlags.HasPayload)
|
||||
{
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), message.Payload.Length);//7. PayloadSize
|
||||
shift += 4;
|
||||
message.Payload.CopyTo(result.AsMemory(shift));//8. Payload
|
||||
}
|
||||
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
|
||||
return result;
|
||||
return bytes.Length < 5 ? MessageType.Unknown : (MessageType)bytes[4];
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasKey(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return (((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey);
|
||||
return ((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasPayload(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return (((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload);
|
||||
return ((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasExchange(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return (((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange);
|
||||
return ((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
@ -1,36 +0,0 @@
|
|||
using System.Collections.Concurrent;
|
||||
using System.Threading.Channels;
|
||||
|
||||
namespace Sphagnum.Common.Services
|
||||
{
|
||||
internal class ChannelsPool
|
||||
{
|
||||
private readonly int _poolSize;
|
||||
public ChannelsPool(int poolSize)
|
||||
{
|
||||
_poolSize = poolSize;
|
||||
}
|
||||
|
||||
private readonly ConcurrentQueue<Channel<byte[]>> channels = new ConcurrentQueue<Channel<byte[]>>();
|
||||
|
||||
public Channel<byte[]> Get()
|
||||
{
|
||||
if (channels.TryDequeue(out var channel))
|
||||
{
|
||||
return channel;
|
||||
}
|
||||
else
|
||||
{
|
||||
return Channel.CreateBounded<byte[]>(1);
|
||||
}
|
||||
}
|
||||
|
||||
public void Return(Channel<byte[]> channel)
|
||||
{
|
||||
if (channels.Count < _poolSize)
|
||||
{
|
||||
channels.Enqueue(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,143 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Infrastructure;
|
||||
using Sphagnum.Common.Exceptions;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Services
|
||||
{
|
||||
internal class SphagnumConnection
|
||||
{
|
||||
protected readonly IConnection _connection;
|
||||
private readonly ChannelsPool _pool = new ChannelsPool(100);
|
||||
private readonly ConcurrentDictionary<Guid, Channel<byte[]>> sendingItems = new ConcurrentDictionary<Guid, Channel<byte[]>>();
|
||||
private readonly Func<Func<byte[], Task>> _messagesProcessorFactory;
|
||||
private readonly Func<byte[], Task> _messagesProcessor;
|
||||
|
||||
public SphagnumConnection(Func<IConnection> connectionsFactory, Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
_connection = connectionsFactory(); // new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
|
||||
_messagesProcessorFactory = messagesProcessorFactory;
|
||||
_messagesProcessor = _messagesProcessorFactory();
|
||||
RecievingTask();
|
||||
}
|
||||
|
||||
private SphagnumConnection(IConnection socket, Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
_connection = socket;
|
||||
_messagesProcessorFactory = messagesProcessorFactory;
|
||||
_messagesProcessor = _messagesProcessorFactory();
|
||||
}
|
||||
|
||||
public bool Connected => _connection.Connected;
|
||||
|
||||
public Task ConnectAsync(string host, int port)
|
||||
{
|
||||
return _connection.ConnectAsync(host, port);
|
||||
}
|
||||
|
||||
public async virtual Task<SphagnumConnection> AcceptAsync()
|
||||
{
|
||||
var socket = await _connection.AcceptAsync();
|
||||
return new SphagnumConnection(socket, _messagesProcessorFactory);
|
||||
}
|
||||
|
||||
public void Bind(int port)
|
||||
{
|
||||
_connection.Bind(new IPEndPoint(IPAddress.Any, port));
|
||||
}
|
||||
|
||||
public void Close()
|
||||
{
|
||||
_connection.Close();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
_connection.Dispose();
|
||||
}
|
||||
|
||||
public void Listen(int backlog)
|
||||
{
|
||||
_connection.Listen(backlog);
|
||||
}
|
||||
|
||||
public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
int res;
|
||||
if (buffer.Span[4] != (byte)MessageType.MessageAccepted)
|
||||
{
|
||||
var channel = _pool.Get();
|
||||
sendingItems.TryAdd(MessageParser.GetMessageId(buffer), channel);
|
||||
res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
|
||||
var message = await channel.Reader.WaitToReadAsync(cancellationToken);// todo обработка сообщения
|
||||
_pool.Return(channel);
|
||||
return res;
|
||||
}
|
||||
else
|
||||
{
|
||||
res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
private async ValueTask<byte[]> ReceiveAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var lengthBuffer = new byte[4];
|
||||
await _connection.ReceiveAsync(lengthBuffer, SocketFlags.Peek, cancellationToken);
|
||||
var length = BitConverter.ToInt32(lengthBuffer, 0);
|
||||
var result = new byte[length];
|
||||
await _connection.ReceiveAsync(result, SocketFlags.None, cancellationToken);
|
||||
return result;
|
||||
}
|
||||
|
||||
private async Task RecievingTask(CancellationToken token = default)
|
||||
{
|
||||
var successRespBuffer = new byte[23];
|
||||
successRespBuffer[4] = (byte)MessageType.MessageAccepted;
|
||||
successRespBuffer[0] = 23;
|
||||
while (Connected)
|
||||
{
|
||||
try
|
||||
{
|
||||
var message = await ReceiveAsync(token);
|
||||
if (message[4] != (byte)MessageType.MessageAccepted)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _messagesProcessor(message);
|
||||
MessageParser.CopyId(message, successRespBuffer);
|
||||
await SendAsync(successRespBuffer, token);
|
||||
}
|
||||
catch (AuthException ex)
|
||||
{
|
||||
await SendAsync(successRespBuffer, token);
|
||||
await Task.Delay(1000);
|
||||
_connection.Close();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
else if (sendingItems.TryRemove(MessageParser.GetMessageId(message), out var channel))
|
||||
{
|
||||
await channel.Writer.WriteAsync(message);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
_connection.Close();
|
||||
_connection.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,8 +5,4 @@
|
|||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace Sphagnum.Common.Utils
|
||||
{
|
||||
internal static class HashCalculator
|
||||
{
|
||||
private readonly static SHA256 _hash = SHA256.Create();
|
||||
public static byte[] Calc(string text)
|
||||
{
|
||||
return _hash.ComputeHash(Encoding.UTF8.GetBytes(text));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,3 +1,3 @@
|
|||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("Sphagnum.Server.FuncTests")]
|
||||
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]
|
|
@ -1,66 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Services;
|
||||
using Sphagnum.Server.Cluster.Contracts;
|
||||
using Sphagnum.Server.Cluster.Services;
|
||||
using Sphagnum.Server.DataProcessing.Contracts;
|
||||
using Sphagnum.Server.DataProcessing.Services;
|
||||
using Sphagnum.Server.Storage.Messages.Contracts;
|
||||
using Sphagnum.Server.Storage.Messages.Services;
|
||||
using Sphagnum.Server.Storage.Users.Contracts;
|
||||
using Sphagnum.Server.Storage.Users.Services;
|
||||
|
||||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
internal class BrokerDefaultBase(ConnectionFactory connectionFactory, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
|
||||
{
|
||||
private readonly SphagnumConnection _connection;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
private Task? _acceptationTask;
|
||||
|
||||
private readonly IAuthInfoStorage _authInfoStorage = new AuthInfoStorageBase();
|
||||
private readonly IMessagesStorage _messagesStorage = messagesStorage;
|
||||
private readonly IDistributor _distributor = distributor;
|
||||
private readonly IDataProcessor _dataProcessor = dataProcessor;
|
||||
private readonly ConnectionFactory _connectionFactory = connectionFactory;
|
||||
|
||||
public Task StartAsync(int port)
|
||||
{
|
||||
_connectionFactory.CreateDefault(() =>
|
||||
{
|
||||
var processor = new MessagesProcessor(_authInfoStorage, _messagesStorage, _distributor, _dataProcessor);
|
||||
return processor.ProcessMessage;
|
||||
});
|
||||
_connection?.Bind(port);
|
||||
_connection?.Listen(1000); //todo разобраться что делает этот параметр.
|
||||
//_acceptationTask = AcceptationWorker(_cts.Token);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
|
||||
public Task StopAsync()
|
||||
{
|
||||
_cts.Cancel();
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
internal static BrokerDefaultBase Create(ConnectionFactory connectionFactory)
|
||||
{
|
||||
return new BrokerDefaultBase(
|
||||
connectionFactory,
|
||||
new MessagesStorageDefault(),
|
||||
new DistributorDefault(),
|
||||
new DataProcessorDefault()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
private async Task AcceptationWorker(CancellationToken token)
|
||||
{
|
||||
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
var acceptedSocket = await _connection.AcceptAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
using Microsoft.Extensions.Hosting;
|
||||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Server.Broker.Services;
|
||||
|
||||
namespace Sphagnum.Server
|
||||
{
|
||||
public class BrokerHost : IHostedService
|
||||
{
|
||||
private readonly BrokerDefaultBase _broker;
|
||||
|
||||
public BrokerHost(ConnectionFactory connectionFactory)
|
||||
{
|
||||
_broker = BrokerDefaultBase.Create(connectionFactory);
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return _broker.StartAsync(8081);
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return _broker.StopAsync();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
using Microsoft.Extensions.Hosting;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
using System.Net;
|
||||
|
||||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
public class BrokerService : IHostedService
|
||||
{
|
||||
private readonly ConnectionsManager _manager;
|
||||
public BrokerService(ConnectionsManager manager)
|
||||
{
|
||||
_manager = manager;
|
||||
}
|
||||
|
||||
public Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
public class ConnectionsManager
|
||||
{
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
internal class MessagesBus
|
||||
{
|
||||
}
|
||||
}
|
|
@ -1,68 +1,36 @@
|
|||
using Sphagnum.Common;
|
||||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Exceptions;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
using Sphagnum.Server.Cluster.Contracts;
|
||||
using Sphagnum.Server.DataProcessing.Contracts;
|
||||
using Sphagnum.Server.Storage.Messages.Contracts;
|
||||
using Sphagnum.Server.Storage.Users.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Extensions;
|
||||
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
internal class MessagesProcessor
|
||||
public class MessagesProcessor
|
||||
{
|
||||
private bool AuthOk = true;
|
||||
private readonly IWalWriter _walWriter;
|
||||
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
|
||||
|
||||
private readonly IAuthInfoStorage _authInfoStorage;
|
||||
private readonly IMessagesStorage _messagesStorage;
|
||||
private readonly IDistributor _distributor;
|
||||
private readonly IDataProcessor _dataProcessor;
|
||||
public MessagesProcessor(IAuthInfoStorage authInfoStorage, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
|
||||
public MessagesProcessor(IWalWriter walWriter)
|
||||
{
|
||||
_authInfoStorage = authInfoStorage;
|
||||
_messagesStorage = messagesStorage;
|
||||
_distributor = distributor;
|
||||
_dataProcessor = dataProcessor;
|
||||
_walWriter = walWriter;
|
||||
}
|
||||
|
||||
internal async Task ProcessMessage(byte[] message)
|
||||
internal void AddConnection(IConnection connection)
|
||||
{
|
||||
if (AuthOk)
|
||||
connection.ConnectionClosed += (id) =>
|
||||
{
|
||||
await _messagesStorage.LogMessage(message);
|
||||
await _distributor.DistributeData(message);
|
||||
await _dataProcessor.PutMessage(message);
|
||||
}
|
||||
else if (await CheckRights(message))
|
||||
{
|
||||
AuthOk = true;
|
||||
await _messagesStorage.LogMessage(message);
|
||||
await _distributor.DistributeData(message);
|
||||
await _dataProcessor.PutMessage(message);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new AuthException();
|
||||
}
|
||||
Connections.TryRemove(id, out var conn);
|
||||
};
|
||||
var _ = ProcessMessages(connection);
|
||||
}
|
||||
|
||||
private async ValueTask<bool> CheckRights(byte[] buffer)
|
||||
internal async Task ProcessMessages(IConnection connection)
|
||||
{
|
||||
var messageType = MessageParser.GetMessageType(buffer);
|
||||
if (messageType == MessageType.Auth)
|
||||
while (!connection.CancellationTokenSource.IsCancellationRequested)
|
||||
{
|
||||
var payloadStart = MessageParser.GetPayloadStart(buffer);
|
||||
var rights = (UserRights)BitConverter.ToInt16(buffer.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + payloadStart, 2));
|
||||
var isRecievingAllowed = await _authInfoStorage.CheckRights(
|
||||
buffer.AsSpan(payloadStart, Constants.HashedUserDataSizeInfBytes),
|
||||
buffer.AsSpan(payloadStart + Constants.HashedUserDataSizeInfBytes, Constants.HashedUserDataSizeInfBytes),
|
||||
rights,
|
||||
new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
|
||||
);
|
||||
return isRecievingAllowed;
|
||||
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
|
||||
await _walWriter.WriteData(data);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using Sphagnum.Common.Infrastructure.Extensions;
|
||||
|
||||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
public class Reciever
|
||||
{
|
||||
public readonly CancellationTokenSource Cts = new();
|
||||
|
||||
private readonly IConnection _listeningConnection;
|
||||
private readonly IWalWriter _walWriter;
|
||||
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
|
||||
public Reciever(ConnectionFactory connectionFactory, IWalWriter walWriter)
|
||||
{
|
||||
_listeningConnection = connectionFactory.CreateConnection(false).Result;
|
||||
_listeningConnection.Bind(new IPEndPoint(IPAddress.Any, connectionFactory.Port));
|
||||
_listeningConnection.Listen(1000); //todo разобраться что делает этот параметр.
|
||||
_walWriter = walWriter;
|
||||
var _ = AcceptationWorker();
|
||||
}
|
||||
|
||||
internal void AddConnection(IConnection connection)
|
||||
{
|
||||
Connections[connection.ConnectionId] = connection;
|
||||
connection.ConnectionClosed += (id) =>
|
||||
{
|
||||
Connections.TryRemove(id, out var conn);
|
||||
};
|
||||
ProcessMessages(connection);
|
||||
}
|
||||
|
||||
private async Task AcceptationWorker()
|
||||
{
|
||||
while (!Cts.IsCancellationRequested)
|
||||
{
|
||||
var acceptedSocket = await _listeningConnection.AcceptAsync();
|
||||
AddConnection(acceptedSocket);
|
||||
}
|
||||
}
|
||||
|
||||
internal async Task ProcessMessages(IConnection connection)
|
||||
{
|
||||
while (!connection.CancellationTokenSource.IsCancellationRequested)
|
||||
{
|
||||
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
|
||||
await _walWriter.WriteData(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
namespace Sphagnum.Server.Cluster.Contracts
|
||||
{
|
||||
public interface IDistributor
|
||||
{
|
||||
ValueTask DistributeData(ReadOnlySpan<byte> data);
|
||||
}
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
using Sphagnum.Server.Cluster.Contracts;
|
||||
|
||||
namespace Sphagnum.Server.Cluster.Services
|
||||
{
|
||||
internal class DistributorDefault : IDistributor
|
||||
{
|
||||
public ValueTask DistributeData(ReadOnlySpan<byte> data)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
namespace Sphagnum.Server.DataProcessing.Contracts
|
||||
{
|
||||
internal interface IDataProcessor
|
||||
{
|
||||
public bool RegisterCoprocessor(string key, Func<byte[], Task> func);
|
||||
public Func<byte[], Task>? UnregisterCoprocessor(string key);
|
||||
public ValueTask PutMessage(ReadOnlySpan<byte> message);
|
||||
}
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
using Sphagnum.Server.DataProcessing.Contracts;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace Sphagnum.Server.DataProcessing.Services
|
||||
{
|
||||
internal class DataProcessorDefault : IDataProcessor
|
||||
{
|
||||
private readonly ConcurrentDictionary<string, Func<byte[], Task>> _processors = new();
|
||||
|
||||
public ValueTask PutMessage(ReadOnlySpan<byte> message)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
public bool RegisterCoprocessor(string key, Func<byte[], Task> func)
|
||||
{
|
||||
return _processors.TryAdd(key, func);
|
||||
}
|
||||
|
||||
public Func<byte[], Task>? UnregisterCoprocessor(string key)
|
||||
{
|
||||
if (_processors.TryRemove(key, out Func<byte[], Task>? res))
|
||||
{
|
||||
return res;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
namespace Sphagnum.Server.Scheme.Contracts
|
||||
{
|
||||
public interface IExchange
|
||||
{
|
||||
public string Name { get; init; }
|
||||
|
||||
public Task SendMessage();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
namespace Sphagnum.Server.Storage
|
||||
{
|
||||
internal class SchemaManager
|
||||
{
|
||||
|
||||
}
|
||||
}
|
|
@ -14,4 +14,8 @@
|
|||
<ProjectReference Include="..\Sphagnum.Common\Sphagnum.Common.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Folder Include="Storage\Utils\" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
namespace Sphagnum.Server.Storage.Contracts
|
||||
{
|
||||
public class StorageConfig
|
||||
{
|
||||
public readonly string WalDirectory;
|
||||
public readonly long WalPageMinSize = 1024 * 4;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
|
||||
{
|
||||
internal interface IWalCursorManager
|
||||
{
|
||||
ValueTask<WalCursor> GetCursor();
|
||||
ValueTask SetCursor(WalCursor walCursor);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
|
||||
{
|
||||
internal interface IWalStreamSource
|
||||
{
|
||||
public Stream GetStream(string pathToFile);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
|
||||
{
|
||||
public interface IWalWriter
|
||||
{
|
||||
public Task WriteData(byte[] data);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
namespace Sphagnum.Server.Storage.Contracts.Wal
|
||||
{
|
||||
public readonly struct WalCursor
|
||||
{
|
||||
public readonly long PageId;
|
||||
public readonly long CurrentPosition;
|
||||
|
||||
public WalCursor(long pageId, long currentPosition)
|
||||
{
|
||||
PageId = pageId;
|
||||
CurrentPosition = currentPosition;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,7 +0,0 @@
|
|||
namespace Sphagnum.Server.Storage.Messages.Contracts
|
||||
{
|
||||
internal interface IMessagesStorage
|
||||
{
|
||||
ValueTask LogMessage(ReadOnlyMemory<byte> message);
|
||||
}
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
using Sphagnum.Server.Storage.Messages.Contracts;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Messages.Services
|
||||
{
|
||||
internal class MessagesStorageDefault : IMessagesStorage
|
||||
{
|
||||
public ValueTask LogMessage(ReadOnlyMemory<byte> message)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
using Sphagnum.Server.Storage.Contracts;
|
||||
using Sphagnum.Server.Storage.Contracts.Wal;
|
||||
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Services.Wal
|
||||
{
|
||||
internal class WalCursorManager : IWalCursorManager
|
||||
{
|
||||
private readonly string _cursorPath;
|
||||
private byte[]? data;
|
||||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);
|
||||
public WalCursorManager(StorageConfig storageConfig)
|
||||
{
|
||||
_cursorPath = Path.Combine(storageConfig.WalDirectory, "cursor");
|
||||
}
|
||||
|
||||
public async ValueTask<WalCursor> GetCursor()
|
||||
{
|
||||
await _semaphore.WaitAsync();
|
||||
try
|
||||
{
|
||||
if (data == null)
|
||||
{
|
||||
data = await File.ReadAllBytesAsync(_cursorPath);
|
||||
}
|
||||
var result = new WalCursor(BitConverter.ToInt64(data, 0), BitConverter.ToInt64(data, 8));
|
||||
return result;
|
||||
}
|
||||
finally { _semaphore.Release(); }
|
||||
}
|
||||
|
||||
public async ValueTask SetCursor(WalCursor walCursor)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (data == null)
|
||||
{
|
||||
data = new byte[16];
|
||||
}
|
||||
BitConverter.TryWriteBytes(data.AsSpan(0, 8), walCursor.PageId);
|
||||
BitConverter.TryWriteBytes(data.AsSpan(8, 8), walCursor.CurrentPosition);
|
||||
await File.WriteAllBytesAsync(_cursorPath, data);
|
||||
}
|
||||
finally { _semaphore.Release(); }
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Services.Wal
|
||||
{
|
||||
internal class WalStreamSource : IWalStreamSource
|
||||
{
|
||||
public Stream GetStream(string pathToFile)
|
||||
{
|
||||
var stream = new FileStream(pathToFile, FileMode.OpenOrCreate, FileAccess.Write);
|
||||
return stream;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
using Microsoft.Extensions.Logging;
|
||||
using Sphagnum.Server.Storage.Contracts;
|
||||
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
|
||||
using System.Globalization;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Services.Wal
|
||||
{
|
||||
internal class WalWriterDefault : IWalWriter
|
||||
{
|
||||
private readonly SemaphoreSlim _semaphore = new(0, 1);
|
||||
|
||||
private readonly string _walDirectory;
|
||||
private readonly long _walPageSize;
|
||||
|
||||
private string? _walPagePath;
|
||||
private long CurrentPageSize = 0;
|
||||
private long CurrentPage = 0;
|
||||
|
||||
private readonly IWalStreamSource _streamSource;
|
||||
private readonly ILogger<WalWriterDefault> _logger;
|
||||
public WalWriterDefault(StorageConfig storageConfig, IWalStreamSource streamSource, ILogger<WalWriterDefault> logger)
|
||||
{
|
||||
_streamSource = streamSource;
|
||||
_walDirectory = storageConfig.WalDirectory;
|
||||
_walPageSize = storageConfig.WalPageMinSize;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task WriteData(byte[] data)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(_walPagePath))
|
||||
{
|
||||
await _semaphore.WaitAsync();
|
||||
try
|
||||
{
|
||||
if (CurrentPageSize > _walPageSize)
|
||||
{
|
||||
CurrentPageSize = 0;
|
||||
CurrentPage += 1;
|
||||
_walPagePath = Path.Combine(_walDirectory, CurrentPage.ToString(CultureInfo.InvariantCulture));
|
||||
}
|
||||
using var stream = _streamSource.GetStream(_walPagePath);
|
||||
await stream.WriteAsync(data);
|
||||
CurrentPageSize += data.LongLength;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Walwriting failed!");
|
||||
}
|
||||
_semaphore.Release();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,13 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Users.Contracts
|
||||
{
|
||||
internal interface IAuthInfoStorage
|
||||
{
|
||||
public ValueTask<bool> CheckRights(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights, CancellationToken token = default);
|
||||
|
||||
public ValueTask AddUser(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights);
|
||||
|
||||
public ValueTask SetRights(Span<byte> hashedUsername, UserRights userRights);
|
||||
}
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Server.Storage.Users.Contracts;
|
||||
using System.Numerics;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Users.Services
|
||||
{
|
||||
internal class AuthInfoStorageBase : IAuthInfoStorage
|
||||
{
|
||||
private readonly Vector<byte> RootUserLogin = new(HashCalculator.Calc("root"));
|
||||
private readonly Vector<byte> RootUserPassword = new(HashCalculator.Calc("root"));
|
||||
private readonly UserRights RootUserRights = UserRights.All;
|
||||
public ValueTask AddUser(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
public ValueTask<bool> CheckRights(Span<byte> hashedUsername, Span<byte> hashedPassword, UserRights userRights, CancellationToken token = default)
|
||||
{
|
||||
var username = new Vector<byte>(hashedUsername);
|
||||
var pwd = new Vector<byte>(hashedPassword);
|
||||
return ValueTask.FromResult(username == RootUserLogin && pwd == RootUserPassword && (userRights & RootUserRights) == userRights);
|
||||
}
|
||||
|
||||
public ValueTask SetRights(Span<byte> hashedUsername, UserRights userRights)
|
||||
{
|
||||
return ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
using System.Runtime.CompilerServices;
|
||||
|
||||
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]
|
|
@ -1,10 +1,10 @@
|
|||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.Comparers
|
||||
{
|
||||
internal static class MessagesComparer
|
||||
{
|
||||
public static bool Compare(OutgoingMessage message1, OutgoingMessage message2)
|
||||
public static bool Compare(Message message1, Message message2)
|
||||
{
|
||||
var res = true;
|
||||
res &= message1.Exchange == message2.Exchange;
|
||||
|
@ -12,25 +12,19 @@ namespace Sphagnum.Common.UnitTests.Comparers
|
|||
res &= message1.RoutingKey.Part2 == message2.RoutingKey.Part2;
|
||||
res &= message1.RoutingKey.Part3 == message2.RoutingKey.Part3;
|
||||
res &= message1.Payload.Length == message2.Payload.Length;
|
||||
res &= message1.MessageId == message2.MessageId;
|
||||
res &= ComparePayloads(message1, message2);
|
||||
return res;
|
||||
}
|
||||
|
||||
public static bool Compare(IncommingMessage message1, IncommingMessage message2)
|
||||
{
|
||||
var res = true;
|
||||
res &= message1.MessageId == message2.MessageId;
|
||||
res &= ComparePayloads(message1.Payload.ToArray(), message2.Payload.ToArray());
|
||||
return res;
|
||||
}
|
||||
|
||||
public static bool ComparePayloads(OutgoingMessage message1, OutgoingMessage message2)
|
||||
public static bool ComparePayloads(Message message1, Message message2)
|
||||
{
|
||||
var payload1 = message1.Payload.ToArray();
|
||||
var payload2 = message2.Payload.ToArray();
|
||||
return ComparePayloads(payload1, payload2);
|
||||
}
|
||||
|
||||
|
||||
public static bool ComparePayloads(byte[] payload1, byte[] payload2)
|
||||
{
|
||||
var res = true;
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
namespace Sphagnum.Common.UnitTests
|
||||
{
|
||||
public class ConnectionTests
|
||||
{
|
||||
[SetUp]
|
||||
public void Setup()
|
||||
{
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void Test1()
|
||||
{
|
||||
Assert.Pass();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,23 +1,24 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using System.Security.Cryptography;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.DataGenerators
|
||||
{
|
||||
internal static class MessagesGenerator
|
||||
{
|
||||
public static OutgoingMessage GetRandomOutgoingMessage(bool emptyKey = false, bool emptyPayload = false)
|
||||
public static Message GetRandomMessage(Guid? messageId = null, bool emptyKey = false, bool emptyPayload = false)
|
||||
{
|
||||
var exchangeName = RandomNumberGenerator.GetInt32(10000000).ToString();
|
||||
var payload = !emptyPayload ? RandomNumberGenerator.GetBytes(RandomNumberGenerator.GetInt32(0, 1000)) : [];
|
||||
var routingKeysBytes = RandomNumberGenerator.GetBytes(3);
|
||||
return new OutgoingMessage(exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
|
||||
}
|
||||
|
||||
public static IncommingMessage GetRandoIncommingMessage(bool emptyPayload = false)
|
||||
{
|
||||
var payload = !emptyPayload ? RandomNumberGenerator.GetBytes(RandomNumberGenerator.GetInt32(0, 1000)) : [];
|
||||
return new IncommingMessage(Guid.NewGuid(), payload);
|
||||
if (messageId.HasValue)
|
||||
{
|
||||
return new Message(messageId.Value, exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
return new Message(exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Utils;
|
||||
using Sphagnum.Common.UnitTests.DataGenerators;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests
|
||||
{
|
||||
|
@ -12,11 +12,17 @@ namespace Sphagnum.Common.UnitTests
|
|||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandoIncommingMessage();
|
||||
var message = MessagesGenerator.GetRandomMessage();
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackIncomingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
|
||||
var message2 = MessageParser.UnpackMessage(bytes);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
|
||||
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(f1, Is.EqualTo(f2));
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
});
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
@ -27,31 +33,17 @@ namespace Sphagnum.Common.UnitTests
|
|||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandoIncommingMessage(true);
|
||||
var message = MessagesGenerator.GetRandomMessage(null, false, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackIncomingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
Assert.IsTrue((MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2)) == MessageFlags.HasPayload);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessageGetMessageId_WithRoutingKeyAndPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var id = Guid.NewGuid();
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage();
|
||||
var bytesForFlags = MessageParser.PackMessage(message);
|
||||
var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
|
||||
var bytes = MessageParser.Pack(message, id, flags, bytesForFlags.Length);
|
||||
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var id2 = MessageParser.GetMessageId(bytes);
|
||||
Assert.That(id, Is.EqualTo(id2));
|
||||
var message2 = MessageParser.UnpackMessage(bytes);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
|
||||
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(f1, Is.EqualTo(f2));
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
});
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
@ -63,95 +55,17 @@ namespace Sphagnum.Common.UnitTests
|
|||
while (count < 100)
|
||||
{
|
||||
var id = Guid.NewGuid();
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
|
||||
var bytesForFlags = MessageParser.PackMessage(message);
|
||||
var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
|
||||
var bytes = MessageParser.Pack(message, id, flags, bytesForFlags.Length);
|
||||
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var id2 = MessageParser.GetMessageId(bytes);
|
||||
Assert.That(id, Is.EqualTo(id2));
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithRoutingKeyAndPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage();
|
||||
var message = MessagesGenerator.GetRandomMessage(null, true, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var message2 = MessageParser.UnpackMessage(bytes);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithRoutingKeyAndEmptyPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(false, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithEmptyRoutingKey()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithEmptyRoutingKeyAndEmptyPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
|
||||
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
|
||||
Assert.Multiple(() =>
|
||||
{
|
||||
Assert.That(f1, Is.EqualTo(f2));
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
});
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,23 +1,33 @@
|
|||
using Sphagnum.Common.Contracts.Infrastructure;
|
||||
using System.Collections.Concurrent;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Channels;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.Services
|
||||
{
|
||||
/// <summary>
|
||||
/// Класс имитирующий передачу данных через сокет.
|
||||
/// </summary>
|
||||
internal class TestConnection : IConnection
|
||||
{
|
||||
private readonly ConcurrentQueue<byte[]> _queue = new();
|
||||
public bool Connected => true;
|
||||
public CancellationTokenSource CancellationTokenSource { get; set; } = new CancellationTokenSource();
|
||||
public Guid ConnectionId { get; private set; }
|
||||
|
||||
public IConnection Accept()
|
||||
public bool Connected { get; set; }
|
||||
|
||||
public event Action<Guid>? ConnectionClosed;
|
||||
|
||||
public Channel<byte[]> _channel = Channel.CreateUnbounded<byte[]>();
|
||||
public Channel<TestConnection> _newConnectionsChannel = Channel.CreateUnbounded<TestConnection>();
|
||||
|
||||
public TestConnection(Guid id)
|
||||
{
|
||||
return new TestConnection();
|
||||
ConnectionId = id;
|
||||
}
|
||||
|
||||
public Task<IConnection> AcceptAsync()
|
||||
public TestConnection()
|
||||
{
|
||||
return Task.FromResult<IConnection>(new TestConnection());
|
||||
ConnectionId = Guid.NewGuid();
|
||||
}
|
||||
|
||||
public void Bind(EndPoint endPoint)
|
||||
|
@ -26,17 +36,21 @@ namespace Sphagnum.Common.UnitTests.Services
|
|||
|
||||
public void Close()
|
||||
{
|
||||
|
||||
Connected = false;
|
||||
CancellationTokenSource.Cancel();
|
||||
ConnectionClosed?.Invoke(ConnectionId);
|
||||
}
|
||||
|
||||
public Task ConnectAsync(string host, int port)
|
||||
{
|
||||
Connected = true;
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
|
||||
CancellationTokenSource.Cancel();
|
||||
ConnectionClosed?.Invoke(ConnectionId);
|
||||
}
|
||||
|
||||
public void Listen(int backlog)
|
||||
|
@ -46,37 +60,50 @@ namespace Sphagnum.Common.UnitTests.Services
|
|||
|
||||
public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var res = new byte[buffer.Length];
|
||||
await Receive(res, socketFlags, cancellationToken);
|
||||
res.CopyTo(buffer);
|
||||
return res.Length;
|
||||
var canRead = await _channel.Reader.WaitToReadAsync(cancellationToken);
|
||||
if (canRead)
|
||||
{
|
||||
if (socketFlags == SocketFlags.Peek)
|
||||
{
|
||||
if (_channel.Reader.TryPeek(out var data))
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < buffer.Length && i < data.Length; i++)
|
||||
{
|
||||
buffer.Span[i] = data[i];
|
||||
}
|
||||
return i;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var data = await _channel.Reader.ReadAsync(cancellationToken);
|
||||
for (var i = 0; i < buffer.Length && i < data.Length; i++)
|
||||
{
|
||||
buffer.Span[i] = data[i];
|
||||
}
|
||||
return data.Length;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
|
||||
public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
|
||||
{
|
||||
_queue.Enqueue(buffer.Span.ToArray());
|
||||
return ValueTask.FromResult(buffer.Length);
|
||||
var data = new byte[buffer.Length];
|
||||
buffer.CopyTo(data);
|
||||
await _channel.Writer.WriteAsync(data, cancellationToken);
|
||||
return data.Length;
|
||||
}
|
||||
|
||||
private async ValueTask<int> Receive(byte[] buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default, int counter = 0)
|
||||
public async Task<IConnection> AcceptAsync()
|
||||
{
|
||||
if (counter > 200)
|
||||
{
|
||||
throw new TimeoutException();
|
||||
}
|
||||
return await _newConnectionsChannel.Reader.ReadAsync();
|
||||
}
|
||||
|
||||
if (socketFlags == SocketFlags.Peek ? _queue.TryPeek(out byte[]? result) : _queue.TryDequeue(out result))
|
||||
{
|
||||
result.CopyTo(buffer, 0);
|
||||
return result.Length;
|
||||
}
|
||||
else
|
||||
{
|
||||
await Task.Delay(100, cancellationToken);
|
||||
counter++;
|
||||
await Receive(buffer, socketFlags, cancellationToken, counter);
|
||||
}
|
||||
throw new TimeoutException();
|
||||
internal async Task AddInputConnection(TestConnection connection)
|
||||
{
|
||||
await _newConnectionsChannel.Writer.WriteAsync(connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,18 +1,19 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Services;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.Services
|
||||
{
|
||||
internal class TestConnectionFactory : ConnectionFactory
|
||||
{
|
||||
internal override SphagnumConnection CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
public IConnection? CurrentConnection { get; private set; }
|
||||
internal override Task<IConnection> CreateConnection(bool connected = true)
|
||||
{
|
||||
return new SphagnumConnection(() => new TestConnection(), messagesProcessorFactory);
|
||||
return Task.FromResult(CurrentConnection ?? (IConnection)(new TestConnection()));
|
||||
}
|
||||
|
||||
internal override Task<SphagnumConnection> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
public void SetCurrentConnection(IConnection? connection)
|
||||
{
|
||||
return Task.FromResult(new SphagnumConnection(() => new TestConnection(), messagesProcessorFactory));
|
||||
CurrentConnection = connection;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="AutoFixture" Version="4.18.1" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
|
||||
<PackageReference Include="NUnit" Version="3.13.3" />
|
||||
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />
|
||||
|
|
|
@ -1,158 +1,163 @@
|
|||
//using Sphagnum.Common.UnitTests.Services;
|
||||
//using System;
|
||||
//using System.Collections.Generic;
|
||||
//using System.Linq;
|
||||
//using System.Text;
|
||||
//using System.Threading.Tasks;
|
||||
using AutoFixture;
|
||||
using Sphagnum.Common.Infrastructure.Extensions;
|
||||
using Sphagnum.Common.UnitTests.Services;
|
||||
using System.Security.Cryptography;
|
||||
|
||||
//namespace Sphagnum.Common.UnitTests
|
||||
//{
|
||||
// public class TestConnectionTests
|
||||
// {
|
||||
// [Test]
|
||||
// public async Task SendRecieve8BytesWithBufferSize10()
|
||||
// {
|
||||
// var connection = new TestConnection
|
||||
// {
|
||||
// BufferSize = 10
|
||||
// };
|
||||
// var data = new byte[8];
|
||||
// Array.Fill<byte>(data, 1);
|
||||
// _ = await connection.SendAsync(data);
|
||||
// var forResult = new List<byte>();
|
||||
// for (int i = 0; i < 1; i++)
|
||||
// {
|
||||
// var buffer = new byte[connection.BufferSize];
|
||||
// _ = await connection.ReceiveAsync(buffer);
|
||||
// for (int j = 0; j < buffer.Length; j++)
|
||||
// {
|
||||
// var el = buffer[j];
|
||||
// if (el == 1)
|
||||
// {
|
||||
// forResult.Add(el);
|
||||
// }
|
||||
// }
|
||||
// Array.Clear(buffer);
|
||||
// }
|
||||
namespace Sphagnum.Common.UnitTests
|
||||
{
|
||||
public class TestConnectionTests
|
||||
{
|
||||
|
||||
// Assert.IsTrue(data.Length == forResult.Count);
|
||||
// }
|
||||
[Test]
|
||||
public async Task IsNewConnectionCretesCorrect()
|
||||
{
|
||||
var connection = new TestConnection();
|
||||
var connection2 = new TestConnection();
|
||||
var connectionTask = connection.AcceptAsync();
|
||||
|
||||
// [Test]
|
||||
// public async Task SendRecieve10BytesWithBufferSize10()
|
||||
// {
|
||||
// var connection = new TestConnection
|
||||
// {
|
||||
// BufferSize = 10
|
||||
// };
|
||||
// var data = new byte[10];
|
||||
// Array.Fill<byte>(data, 1);
|
||||
// _ = await connection.SendAsync(data);
|
||||
// var forResult = new List<byte>();
|
||||
// for (int i = 0; i < 1; i++)
|
||||
// {
|
||||
// var buffer = new byte[connection.BufferSize];
|
||||
// _ = await connection.ReceiveAsync(buffer);
|
||||
// for (int j = 0; j < buffer.Length; j++)
|
||||
// {
|
||||
// var el = buffer[j];
|
||||
// if (el == 1)
|
||||
// {
|
||||
// forResult.Add(el);
|
||||
// }
|
||||
// }
|
||||
// Array.Clear(buffer);
|
||||
// }
|
||||
await Task.WhenAll(connection.AddInputConnection(connection2), connectionTask);
|
||||
Assert.That(connection2.ConnectionId, Is.EqualTo(connectionTask.Result.ConnectionId));
|
||||
}
|
||||
|
||||
// Assert.IsTrue(data.Length == forResult.Count);
|
||||
// }
|
||||
/// <summary>
|
||||
/// Ïðîâåðÿåò ñöåíàðèé, êîãäà íåñêîëüêî ðàç ïîäðÿä ÷èòàþòñÿ äàííûå
|
||||
/// ñ ôëàãîì peek - òî åñòü ãëÿíóòü ÷òî ëåæèò íà ïîâåðõíîñòè, íå âû÷èòûâàÿ.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
[Test]
|
||||
public async Task SendRecieveData_WithPeekSocketFlag_MultipleReading()
|
||||
{
|
||||
var connection = new TestConnection();
|
||||
var buffer1 = new byte[5];
|
||||
var recievingTask = connection.ReceiveAsync(buffer1, System.Net.Sockets.SocketFlags.Peek);
|
||||
var fix = new Fixture();
|
||||
var data = fix.CreateMany<byte>(11).ToArray();
|
||||
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
|
||||
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
Assert.That(buffer1[i] == data[i]);
|
||||
}
|
||||
Assert.That(recievingTask.Result == 5);
|
||||
var buffer2 = new byte[5];
|
||||
var count = await connection.ReceiveAsync(buffer2, System.Net.Sockets.SocketFlags.Peek);
|
||||
Assert.That(recievingTask.Result == count);
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
Assert.That(buffer2[i] == data[i]);
|
||||
}
|
||||
|
||||
// [Test]
|
||||
// public async Task SendRecieve11BytesWithBufferSize10()
|
||||
// {
|
||||
// var connection = new TestConnection
|
||||
// {
|
||||
// BufferSize = 10
|
||||
// };
|
||||
// var data = new byte[11];
|
||||
// Array.Fill<byte>(data, 1);
|
||||
// _ = await connection.SendAsync(data);
|
||||
// var forResult = new List<byte>();
|
||||
// for (int i = 0; i < 2; i++)
|
||||
// {
|
||||
// var buffer = new byte[connection.BufferSize];
|
||||
// _ = await connection.ReceiveAsync(buffer);
|
||||
// for (int j = 0; j < buffer.Length; j++)
|
||||
// {
|
||||
// var el = buffer[j];
|
||||
// if (el==1)
|
||||
// {
|
||||
// forResult.Add(el);
|
||||
// }
|
||||
// }
|
||||
// Array.Clear(buffer);
|
||||
// }
|
||||
var buffer3 = new byte[5];
|
||||
var count3 = await connection.ReceiveAsync(buffer3, System.Net.Sockets.SocketFlags.Peek);
|
||||
Assert.That(recievingTask.Result == count3);
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
Assert.That(buffer3[i] == data[i]);
|
||||
}
|
||||
}
|
||||
|
||||
// Assert.IsTrue(data.Length == forResult.Count);
|
||||
// }
|
||||
/// <summary>
|
||||
/// Ïðîâåðÿåò ñöåíàðèé, êîãäà íåñêîëüêî ðàç ïîäðÿä ÷èòàþòñÿ äàííûå
|
||||
/// ñ ôëàãîì peek - òî åñòü ãëÿíóòü ÷òî ëåæèò íà ïîâåðõíîñòè, íå âû÷èòûâàÿ. Ïðè ýòîì, â ïðîöåññå ðàáîòû ïðîäîëæàþò ïèñàòüñÿ íîâûå äàííûå.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
[Test]
|
||||
public async Task SendRecieveData_WithPeekSocketFlag_MultipleReadingWithWriting()
|
||||
{
|
||||
var connection = new TestConnection();
|
||||
var buffer1 = new byte[5];
|
||||
var recievingTask = connection.ReceiveAsync(buffer1, System.Net.Sockets.SocketFlags.Peek);
|
||||
var fix = new Fixture();
|
||||
var data = fix.CreateMany<byte>(RandomNumberGenerator.GetInt32(5, 100)).ToArray();
|
||||
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
|
||||
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
Assert.That(buffer1[i] == data[i]);
|
||||
}
|
||||
Assert.That(recievingTask.Result == 5);
|
||||
var buffer2 = new byte[5];
|
||||
var count = await connection.ReceiveAsync(buffer2, System.Net.Sockets.SocketFlags.Peek);
|
||||
Assert.That(recievingTask.Result == count);
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
Assert.That(buffer2[i] == data[i]);
|
||||
}
|
||||
|
||||
// [Test]
|
||||
// public async Task SendRecieve31BytesWithBufferSize10()
|
||||
// {
|
||||
// var connection = new TestConnection
|
||||
// {
|
||||
// BufferSize = 10
|
||||
// };
|
||||
// var data = new byte[31];
|
||||
// Array.Fill<byte>(data, 1);
|
||||
// _ = await connection.SendAsync(data);
|
||||
// var forResult = new List<byte>();
|
||||
// for (int i = 0; i < 4; i++)
|
||||
// {
|
||||
// var buffer = new byte[connection.BufferSize];
|
||||
// _ = await connection.ReceiveAsync(buffer);
|
||||
// for (int j = 0; j < buffer.Length; j++)
|
||||
// {
|
||||
// var el = buffer[j];
|
||||
// if (el == 1)
|
||||
// {
|
||||
// forResult.Add(el);
|
||||
// }
|
||||
// }
|
||||
// Array.Clear(buffer);
|
||||
// }
|
||||
for (int c = 0; c < 20; c++)
|
||||
{
|
||||
var data2 = fix.CreateMany<byte>(RandomNumberGenerator.GetInt32(5, 100)).ToArray();
|
||||
var sendingCount = await connection.SendAsync(data2, System.Net.Sockets.SocketFlags.None);
|
||||
var buffer3 = new byte[5];
|
||||
var count3 = await connection.ReceiveAsync(buffer3, System.Net.Sockets.SocketFlags.Peek);
|
||||
Assert.That(recievingTask.Result == count3);
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
Assert.That(buffer3[i] == data[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Assert.IsTrue(data.Length == forResult.Count);
|
||||
// }
|
||||
/// <summary>
|
||||
/// Ïðîâåðÿåò ñöåíàðèé, ÷òåíèÿ-îòïðàâëåíèÿ äàííûõ.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
[Test]
|
||||
public async Task SendRecieveData_WithNoneSocketFlag()
|
||||
{
|
||||
var fix = new Fixture();
|
||||
|
||||
// [Test]
|
||||
// public async Task SendRecieve30BytesWithBufferSize10()
|
||||
// {
|
||||
// var connection = new TestConnection
|
||||
// {
|
||||
// BufferSize = 10
|
||||
// };
|
||||
// var data = new byte[30];
|
||||
// Array.Fill<byte>(data, 1);
|
||||
// _ = await connection.SendAsync(data);
|
||||
// var forResult = new List<byte>();
|
||||
// for (int i = 0; i < 3; i++)
|
||||
// {
|
||||
// var buffer = new byte[connection.BufferSize];
|
||||
// _ = await connection.ReceiveAsync(buffer);
|
||||
// for (int j = 0; j < buffer.Length; j++)
|
||||
// {
|
||||
// var el = buffer[j];
|
||||
// if (el == 1)
|
||||
// {
|
||||
// forResult.Add(el);
|
||||
// }
|
||||
// }
|
||||
// Array.Clear(buffer);
|
||||
// }
|
||||
var connection = new TestConnection();
|
||||
|
||||
// Assert.IsTrue(data.Length == forResult.Count);
|
||||
// }
|
||||
// }
|
||||
for (int c = 0; c < 20; c++)
|
||||
{
|
||||
int dataSize = RandomNumberGenerator.GetInt32(5, 100);
|
||||
var buffer1 = new byte[dataSize];
|
||||
var recievingTask = connection.ReceiveAsync(buffer1, System.Net.Sockets.SocketFlags.None);
|
||||
var data = fix.CreateMany<byte>(dataSize).ToArray();
|
||||
|
||||
//}
|
||||
|
||||
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
|
||||
|
||||
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
|
||||
for (int i = 0; i < dataSize; i++)
|
||||
{
|
||||
Assert.That(buffer1[i] == data[i]);
|
||||
}
|
||||
Assert.IsTrue(recievingTask.Result == sendingTask.Result);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Ïðîâåðÿåò ñöåíàðèé, ÷òåíèÿ-îòïðàâëåíèÿ äàííûõ c ïîìîùüþ ìåòîäà ðàñøèðåíèÿ.
|
||||
/// </summary>
|
||||
/// <returns></returns>
|
||||
[Test]
|
||||
public async Task SendRecieveData_WithExtensionMethod()
|
||||
{
|
||||
var fix = new Fixture();
|
||||
|
||||
var connection = new TestConnection();
|
||||
|
||||
for (int c = 0; c < 20; c++)
|
||||
{
|
||||
int dataSize = RandomNumberGenerator.GetInt32(5, 100);
|
||||
|
||||
var recievingTask = connection.ReceiveAsync(CancellationToken.None);
|
||||
var payload = fix.CreateMany<byte>(dataSize).ToArray();
|
||||
var length = BitConverter.GetBytes(dataSize);
|
||||
var data = new byte[dataSize + 4];
|
||||
length.CopyTo(data, 0);
|
||||
payload.CopyTo(data, 4);
|
||||
|
||||
var sendingTask = connection.SendAsync(data, System.Net.Sockets.SocketFlags.None);
|
||||
|
||||
await Task.WhenAll(sendingTask.AsTask(), recievingTask.AsTask());
|
||||
for (int i = 0; i < dataSize; i++)
|
||||
{
|
||||
Assert.That(recievingTask.Result[i] == data[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
|
||||
<IsPackable>false</IsPackable>
|
||||
<IsTestProject>true</IsTestProject>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="coverlet.collector" Version="6.0.0" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
|
||||
<PackageReference Include="NUnit" Version="3.14.0" />
|
||||
<PackageReference Include="NUnit.Analyzers" Version="3.9.0" />
|
||||
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\..\src\Sphagnum.Client\Sphagnum.Client.csproj" />
|
||||
<ProjectReference Include="..\..\src\Sphagnum.Server\Sphagnum.Server.csproj" />
|
||||
<ProjectReference Include="..\Sphagnum.Common.UnitTests\Sphagnum.Common.UnitTests.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Using Include="NUnit.Framework" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
|
@ -0,0 +1,48 @@
|
|||
using AutoFixture;
|
||||
using Sphagnum.Client;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.UnitTests.Services;
|
||||
using Sphagnum.Server.Broker.Services;
|
||||
|
||||
namespace Sphagnum.FuncTests
|
||||
{
|
||||
public class Tests
|
||||
{
|
||||
[SetUp]
|
||||
public void Setup()
|
||||
{
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task Test1()
|
||||
{
|
||||
var fixture = new Fixture();
|
||||
var factory = new TestConnectionFactory();
|
||||
|
||||
var manager = new ConnectionsManager();
|
||||
var processor = new MessagesProcessor();
|
||||
var mainConnection = await factory.CreateConnection(false) as TestConnection;
|
||||
Assert.IsNotNull(mainConnection);
|
||||
factory.SetCurrentConnection(mainConnection);
|
||||
|
||||
|
||||
var server = new ConnectionsReciever(factory, manager, processor);
|
||||
await server.StartAsync(CancellationToken.None);
|
||||
|
||||
factory.SetCurrentConnection(null);
|
||||
var connection1 = await factory.CreateConnection(true) as TestConnection;
|
||||
Assert.IsNotNull(connection1);
|
||||
|
||||
factory.SetCurrentConnection(connection1);
|
||||
|
||||
var client = new ClientDefault(factory);
|
||||
await mainConnection.AddInputConnection(connection1);
|
||||
var data = fixture.CreateMany<byte>(11).ToArray();
|
||||
|
||||
await Task.Delay(100);
|
||||
await client.Publish(new Common.Messaging.Contracts.Messages.Message("default", RoutingKey.Empty, data));
|
||||
Assert.Pass();
|
||||
await Task.Delay(10000);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue