From 889934c51bc1d83dbd0bbe4e391417b9b6dea7ae Mon Sep 17 00:00:00 2001 From: vlad zverzhkhovskiy Date: Fri, 15 Aug 2025 09:22:38 +0300 Subject: [PATCH] =?UTF-8?q?=D1=84=D0=B8=D0=BA=D1=81=D0=B0=D1=86=D1=82?= =?UTF-8?q?=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- development/Sphagnum.DebugService/Program.cs | 1 - development/SphagnumDebug.sln | 12 ++-- src/Sphagnum.Client/ClientDefault.cs | 14 +---- src/Sphagnum.Common/AssemblySettings.cs | 2 + .../Extensions/IConnectionExtensions.cs | 2 +- .../Messaging/Utils/MessageParser.cs | 1 + src/Sphagnum.Server/AssemblySettings.cs | 2 +- .../Broker/Services/BrokerService.cs | 26 +++++++++ .../Broker/Services/ConnectionsManager.cs | 20 ++----- .../Broker/Services/ConnectionsReciever.cs | 56 ------------------- .../Broker/Services/MessagesBus.cs | 12 ++++ .../Broker/Services/MessagesProcessor.cs | 15 +++-- .../Broker/Services/Reciever.cs | 54 ++++++++++++++++++ .../Scheme/Contracts/IExchange.cs | 9 +++ src/Sphagnum.Server/Scheme/SchemaManager.cs | 7 +++ src/Sphagnum.Server/Sphagnum.Server.csproj | 4 ++ .../Storage/Contracts/StorageConfig.cs | 8 +++ .../Wal/Interfaces/IWalCursorManager.cs | 8 +++ .../Wal/Interfaces/IWalStreamSource.cs | 7 +++ .../Contracts/Wal/Interfaces/IWalWriter.cs | 7 +++ .../Storage/Contracts/Wal/WalCursor.cs | 14 +++++ .../Storage/Services/Wal/WalCursorManager.cs | 47 ++++++++++++++++ .../Storage/Services/Wal/WalStreamSource.cs | 13 +++++ .../Storage/Services/Wal/WalWriterDefault.cs | 54 ++++++++++++++++++ .../AssemblySettings.cs | 3 + .../Services/TestConnection.cs | 6 +- .../Services/TestConnectionFactory.cs | 19 +++++++ .../Sphagnum.Common.UnitTests.csproj | 1 + .../TestConnectionTests.cs | 13 +++-- .../Sphagnum.FuncTests.csproj | 7 ++- tests/Sphagnum.FuncTests/UnitTest1.cs | 48 ++++++++++++++++ 31 files changed, 383 insertions(+), 109 deletions(-) rename src/Sphagnum.Common/{Messaging => Infrastructure}/Extensions/IConnectionExtensions.cs (90%) create mode 100644 src/Sphagnum.Server/Broker/Services/BrokerService.cs delete mode 100644 src/Sphagnum.Server/Broker/Services/ConnectionsReciever.cs create mode 100644 src/Sphagnum.Server/Broker/Services/MessagesBus.cs create mode 100644 src/Sphagnum.Server/Broker/Services/Reciever.cs create mode 100644 src/Sphagnum.Server/Scheme/Contracts/IExchange.cs create mode 100644 src/Sphagnum.Server/Scheme/SchemaManager.cs create mode 100644 src/Sphagnum.Server/Storage/Contracts/StorageConfig.cs create mode 100644 src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalCursorManager.cs create mode 100644 src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalStreamSource.cs create mode 100644 src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalWriter.cs create mode 100644 src/Sphagnum.Server/Storage/Contracts/Wal/WalCursor.cs create mode 100644 src/Sphagnum.Server/Storage/Services/Wal/WalCursorManager.cs create mode 100644 src/Sphagnum.Server/Storage/Services/Wal/WalStreamSource.cs create mode 100644 src/Sphagnum.Server/Storage/Services/Wal/WalWriterDefault.cs create mode 100644 tests/Sphagnum.Common.UnitTests/AssemblySettings.cs rename {Sphagnum.Server.Tests => tests/Sphagnum.Common.UnitTests}/Services/TestConnection.cs (91%) create mode 100644 tests/Sphagnum.Common.UnitTests/Services/TestConnectionFactory.cs rename {Sphagnum.Server.Tests => tests/Sphagnum.Common.UnitTests}/TestConnectionTests.cs (92%) rename Sphagnum.Server.Tests/Sphagnum.Server.Tests.csproj => tests/Sphagnum.FuncTests/Sphagnum.FuncTests.csproj (68%) create mode 100644 tests/Sphagnum.FuncTests/UnitTest1.cs diff --git a/development/Sphagnum.DebugService/Program.cs b/development/Sphagnum.DebugService/Program.cs index d8d5456..e1f1024 100644 --- a/development/Sphagnum.DebugService/Program.cs +++ b/development/Sphagnum.DebugService/Program.cs @@ -9,7 +9,6 @@ builder.Services.AddSingleton(new ConnectionFactory() }); builder.Services.AddSingleton(); builder.Services.AddSingleton(); -builder.Services.AddSingleton(); builder.Services.AddHostedService(); diff --git a/development/SphagnumDebug.sln b/development/SphagnumDebug.sln index e4d4586..a1f20b4 100644 --- a/development/SphagnumDebug.sln +++ b/development/SphagnumDebug.sln @@ -23,7 +23,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sphagnum.Common.UnitTests", EndProject Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose", "docker-compose.dcproj", "{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sphagnum.Server.Tests", "..\Sphagnum.Server.Tests\Sphagnum.Server.Tests.csproj", "{134442E7-CA62-4BE4-B7EB-63847385A98C}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sphagnum.FuncTests", "..\tests\Sphagnum.FuncTests\Sphagnum.FuncTests.csproj", "{C160C372-5E5E-5442-462B-A5B5C36BE2D0}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -59,10 +59,10 @@ Global {BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Debug|Any CPU.Build.0 = Debug|Any CPU {BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.ActiveCfg = Release|Any CPU {BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.Build.0 = Release|Any CPU - {134442E7-CA62-4BE4-B7EB-63847385A98C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {134442E7-CA62-4BE4-B7EB-63847385A98C}.Debug|Any CPU.Build.0 = Debug|Any CPU - {134442E7-CA62-4BE4-B7EB-63847385A98C}.Release|Any CPU.ActiveCfg = Release|Any CPU - {134442E7-CA62-4BE4-B7EB-63847385A98C}.Release|Any CPU.Build.0 = Release|Any CPU + {C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -74,7 +74,7 @@ Global {C344FDF3-3D4D-4BC6-B257-4AD907BE1BE4} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62} {77F76F27-D883-4392-90D5-8F441043F468} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62} {25865911-1F2D-4083-A99C-C65E13F05C14} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750} - {134442E7-CA62-4BE4-B7EB-63847385A98C} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750} + {C160C372-5E5E-5442-462B-A5B5C36BE2D0} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {42DA8F91-1667-49BC-971D-D83C0870393A} diff --git a/src/Sphagnum.Client/ClientDefault.cs b/src/Sphagnum.Client/ClientDefault.cs index 7940d66..ecd1f49 100644 --- a/src/Sphagnum.Client/ClientDefault.cs +++ b/src/Sphagnum.Client/ClientDefault.cs @@ -1,8 +1,8 @@ using Sphagnum.Common.Infrastructure.Contracts; +using Sphagnum.Common.Infrastructure.Extensions; using Sphagnum.Common.Infrastructure.Services; using Sphagnum.Common.Messaging.Contracts; using Sphagnum.Common.Messaging.Contracts.Messages; -using Sphagnum.Common.Messaging.Extensions; using Sphagnum.Common.Messaging.Utils; using System; using System.Threading; @@ -36,18 +36,6 @@ namespace Sphagnum.Client } } - //private async Task Auth() - //{ - // await _connection.SendAsync(MessageParser.PackMessage(new AuthMessage(_connectionFactory.Login, _connectionFactory.Password, _connectionFactory.UserRights))); - // var response = await ReceiveAsync(); - // var messageType = MessageParser.GetMessageType(response); - // if (messageType == MessageType.AuthSuccessfull) - // { - // return; - // } - // throw new Exception("Auth failed!"); - //} - public async ValueTask Ack(Guid messageId) { await Task.Delay(1000); diff --git a/src/Sphagnum.Common/AssemblySettings.cs b/src/Sphagnum.Common/AssemblySettings.cs index 130e54c..6fcb377 100644 --- a/src/Sphagnum.Common/AssemblySettings.cs +++ b/src/Sphagnum.Common/AssemblySettings.cs @@ -3,5 +3,7 @@ //Тесты [assembly: InternalsVisibleTo("Sphagnum.Common.UnitTests")] [assembly: InternalsVisibleTo("Sphagnum.Server.Tests")] +[assembly: InternalsVisibleTo("Sphagnum.FuncTests")] + [assembly: InternalsVisibleTo("Sphagnum.Client")] [assembly: InternalsVisibleTo("Sphagnum.Server")] \ No newline at end of file diff --git a/src/Sphagnum.Common/Messaging/Extensions/IConnectionExtensions.cs b/src/Sphagnum.Common/Infrastructure/Extensions/IConnectionExtensions.cs similarity index 90% rename from src/Sphagnum.Common/Messaging/Extensions/IConnectionExtensions.cs rename to src/Sphagnum.Common/Infrastructure/Extensions/IConnectionExtensions.cs index 4a1d058..75e00b4 100644 --- a/src/Sphagnum.Common/Messaging/Extensions/IConnectionExtensions.cs +++ b/src/Sphagnum.Common/Infrastructure/Extensions/IConnectionExtensions.cs @@ -4,7 +4,7 @@ using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; -namespace Sphagnum.Common.Messaging.Extensions +namespace Sphagnum.Common.Infrastructure.Extensions { internal static class IConnectionExtensions { diff --git a/src/Sphagnum.Common/Messaging/Utils/MessageParser.cs b/src/Sphagnum.Common/Messaging/Utils/MessageParser.cs index b775a35..6f0027c 100644 --- a/src/Sphagnum.Common/Messaging/Utils/MessageParser.cs +++ b/src/Sphagnum.Common/Messaging/Utils/MessageParser.cs @@ -32,6 +32,7 @@ namespace Sphagnum.Common.Messaging.Utils var id = GetMessageId(bytes); return new Message(id, exchangeName, routingKey, payload); } + public static byte[] PackMessage(Message message) { if (string.IsNullOrEmpty(message.Exchange) || string.IsNullOrWhiteSpace(message.Exchange)) diff --git a/src/Sphagnum.Server/AssemblySettings.cs b/src/Sphagnum.Server/AssemblySettings.cs index a455e20..2ca5276 100644 --- a/src/Sphagnum.Server/AssemblySettings.cs +++ b/src/Sphagnum.Server/AssemblySettings.cs @@ -1,3 +1,3 @@ using System.Runtime.CompilerServices; -[assembly: InternalsVisibleTo("Sphagnum.Server.FuncTests")] \ No newline at end of file +[assembly: InternalsVisibleTo("Sphagnum.FuncTests")] \ No newline at end of file diff --git a/src/Sphagnum.Server/Broker/Services/BrokerService.cs b/src/Sphagnum.Server/Broker/Services/BrokerService.cs new file mode 100644 index 0000000..6d97058 --- /dev/null +++ b/src/Sphagnum.Server/Broker/Services/BrokerService.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.Hosting; +using Sphagnum.Common.Infrastructure.Contracts; +using Sphagnum.Common.Infrastructure.Services; +using System.Net; + +namespace Sphagnum.Server.Broker.Services +{ + public class BrokerService : IHostedService + { + private readonly ConnectionsManager _manager; + public BrokerService(ConnectionsManager manager) + { + _manager = manager; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/src/Sphagnum.Server/Broker/Services/ConnectionsManager.cs b/src/Sphagnum.Server/Broker/Services/ConnectionsManager.cs index 3dc64f1..5162df7 100644 --- a/src/Sphagnum.Server/Broker/Services/ConnectionsManager.cs +++ b/src/Sphagnum.Server/Broker/Services/ConnectionsManager.cs @@ -1,22 +1,12 @@ -using Sphagnum.Common.Infrastructure.Contracts; -using System.Collections.Concurrent; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; namespace Sphagnum.Server.Broker.Services { public class ConnectionsManager { - private readonly ConcurrentDictionary Connections = new(); - public ConnectionsManager() - { - - } - - internal void AddConnection(IConnection connection) - { - connection.ConnectionClosed += (id) => - { - Connections.TryRemove(id, out var conn); - }; - } } } diff --git a/src/Sphagnum.Server/Broker/Services/ConnectionsReciever.cs b/src/Sphagnum.Server/Broker/Services/ConnectionsReciever.cs deleted file mode 100644 index 80b74fd..0000000 --- a/src/Sphagnum.Server/Broker/Services/ConnectionsReciever.cs +++ /dev/null @@ -1,56 +0,0 @@ -using Microsoft.Extensions.Hosting; -using Sphagnum.Common.Infrastructure.Contracts; -using Sphagnum.Common.Infrastructure.Services; -using System.Net; - -namespace Sphagnum.Server.Broker.Services -{ - public class ConnectionsReciever : IHostedService - { - private readonly int _port; - private readonly CancellationTokenSource _cts = new(); - private readonly IConnection _connection; - private readonly ConnectionsManager _manager; - private readonly MessagesProcessor _processor; - public ConnectionsReciever(ConnectionFactory connectionFactory, ConnectionsManager manager, MessagesProcessor processor) - { - _port = connectionFactory.Port; - _manager = manager; - _connection = connectionFactory.CreateConnection(false).Result; - _processor = processor; - } - - - public Task StopAsync() - { - _cts.Cancel(); - return Task.CompletedTask; - } - - private async Task AcceptationWorker() - { - - while (!_cts.IsCancellationRequested) - { - var acceptedSocket = await _connection.AcceptAsync(); - _manager.AddConnection(acceptedSocket); - _processor.AddConnection(acceptedSocket); - } - } - - public Task StartAsync(CancellationToken cancellationToken) - { - _connection.Bind(new IPEndPoint(IPAddress.Any, _port)); - _connection?.Listen(1000); //todo разобраться что делает этот параметр. - var _ = AcceptationWorker(); - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - _connection.Close(); - _cts.Cancel(); - return Task.CompletedTask; - } - } -} diff --git a/src/Sphagnum.Server/Broker/Services/MessagesBus.cs b/src/Sphagnum.Server/Broker/Services/MessagesBus.cs new file mode 100644 index 0000000..d0d5543 --- /dev/null +++ b/src/Sphagnum.Server/Broker/Services/MessagesBus.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Sphagnum.Server.Broker.Services +{ + internal class MessagesBus + { + } +} diff --git a/src/Sphagnum.Server/Broker/Services/MessagesProcessor.cs b/src/Sphagnum.Server/Broker/Services/MessagesProcessor.cs index 8a05ea1..a41bb99 100644 --- a/src/Sphagnum.Server/Broker/Services/MessagesProcessor.cs +++ b/src/Sphagnum.Server/Broker/Services/MessagesProcessor.cs @@ -1,13 +1,20 @@ using Sphagnum.Common.Infrastructure.Contracts; -using Sphagnum.Common.Messaging.Extensions; -using Sphagnum.Common.Messaging.Utils; +using Sphagnum.Common.Infrastructure.Extensions; +using Sphagnum.Server.Storage.Contracts.Wal.Interfaces; using System.Collections.Concurrent; namespace Sphagnum.Server.Broker.Services { public class MessagesProcessor { + private readonly IWalWriter _walWriter; private readonly ConcurrentDictionary Connections = new(); + + public MessagesProcessor(IWalWriter walWriter) + { + _walWriter = walWriter; + } + internal void AddConnection(IConnection connection) { connection.ConnectionClosed += (id) => @@ -17,12 +24,12 @@ namespace Sphagnum.Server.Broker.Services var _ = ProcessMessages(connection); } - internal static async Task ProcessMessages(IConnection connection) + internal async Task ProcessMessages(IConnection connection) { while (!connection.CancellationTokenSource.IsCancellationRequested) { var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token); - var mess = MessageParser.UnpackMessage(data); + await _walWriter.WriteData(data); } } } diff --git a/src/Sphagnum.Server/Broker/Services/Reciever.cs b/src/Sphagnum.Server/Broker/Services/Reciever.cs new file mode 100644 index 0000000..2b11164 --- /dev/null +++ b/src/Sphagnum.Server/Broker/Services/Reciever.cs @@ -0,0 +1,54 @@ +using Sphagnum.Common.Infrastructure.Contracts; +using Sphagnum.Common.Infrastructure.Services; +using Sphagnum.Server.Storage.Contracts.Wal.Interfaces; +using System.Collections.Concurrent; +using System.Net; +using Sphagnum.Common.Infrastructure.Extensions; + +namespace Sphagnum.Server.Broker.Services +{ + public class Reciever + { + public readonly CancellationTokenSource Cts = new(); + + private readonly IConnection _listeningConnection; + private readonly IWalWriter _walWriter; + private readonly ConcurrentDictionary Connections = new(); + public Reciever(ConnectionFactory connectionFactory, IWalWriter walWriter) + { + _listeningConnection = connectionFactory.CreateConnection(false).Result; + _listeningConnection.Bind(new IPEndPoint(IPAddress.Any, connectionFactory.Port)); + _listeningConnection.Listen(1000); //todo разобраться что делает этот параметр. + _walWriter = walWriter; + var _ = AcceptationWorker(); + } + + internal void AddConnection(IConnection connection) + { + Connections[connection.ConnectionId] = connection; + connection.ConnectionClosed += (id) => + { + Connections.TryRemove(id, out var conn); + }; + ProcessMessages(connection); + } + + private async Task AcceptationWorker() + { + while (!Cts.IsCancellationRequested) + { + var acceptedSocket = await _listeningConnection.AcceptAsync(); + AddConnection(acceptedSocket); + } + } + + internal async Task ProcessMessages(IConnection connection) + { + while (!connection.CancellationTokenSource.IsCancellationRequested) + { + var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token); + await _walWriter.WriteData(data); + } + } + } +} diff --git a/src/Sphagnum.Server/Scheme/Contracts/IExchange.cs b/src/Sphagnum.Server/Scheme/Contracts/IExchange.cs new file mode 100644 index 0000000..9103000 --- /dev/null +++ b/src/Sphagnum.Server/Scheme/Contracts/IExchange.cs @@ -0,0 +1,9 @@ +namespace Sphagnum.Server.Scheme.Contracts +{ + public interface IExchange + { + public string Name { get; init; } + + public Task SendMessage(); + } +} diff --git a/src/Sphagnum.Server/Scheme/SchemaManager.cs b/src/Sphagnum.Server/Scheme/SchemaManager.cs new file mode 100644 index 0000000..87888c3 --- /dev/null +++ b/src/Sphagnum.Server/Scheme/SchemaManager.cs @@ -0,0 +1,7 @@ +namespace Sphagnum.Server.Storage +{ + internal class SchemaManager + { + + } +} diff --git a/src/Sphagnum.Server/Sphagnum.Server.csproj b/src/Sphagnum.Server/Sphagnum.Server.csproj index 802859c..f286b9d 100644 --- a/src/Sphagnum.Server/Sphagnum.Server.csproj +++ b/src/Sphagnum.Server/Sphagnum.Server.csproj @@ -14,4 +14,8 @@ + + + + diff --git a/src/Sphagnum.Server/Storage/Contracts/StorageConfig.cs b/src/Sphagnum.Server/Storage/Contracts/StorageConfig.cs new file mode 100644 index 0000000..4dd113c --- /dev/null +++ b/src/Sphagnum.Server/Storage/Contracts/StorageConfig.cs @@ -0,0 +1,8 @@ +namespace Sphagnum.Server.Storage.Contracts +{ + public class StorageConfig + { + public readonly string WalDirectory; + public readonly long WalPageMinSize = 1024 * 4; + } +} diff --git a/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalCursorManager.cs b/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalCursorManager.cs new file mode 100644 index 0000000..891aeb1 --- /dev/null +++ b/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalCursorManager.cs @@ -0,0 +1,8 @@ +namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces +{ + internal interface IWalCursorManager + { + ValueTask GetCursor(); + ValueTask SetCursor(WalCursor walCursor); + } +} \ No newline at end of file diff --git a/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalStreamSource.cs b/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalStreamSource.cs new file mode 100644 index 0000000..1dacc2d --- /dev/null +++ b/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalStreamSource.cs @@ -0,0 +1,7 @@ +namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces +{ + internal interface IWalStreamSource + { + public Stream GetStream(string pathToFile); + } +} diff --git a/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalWriter.cs b/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalWriter.cs new file mode 100644 index 0000000..7ffc95c --- /dev/null +++ b/src/Sphagnum.Server/Storage/Contracts/Wal/Interfaces/IWalWriter.cs @@ -0,0 +1,7 @@ +namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces +{ + public interface IWalWriter + { + public Task WriteData(byte[] data); + } +} diff --git a/src/Sphagnum.Server/Storage/Contracts/Wal/WalCursor.cs b/src/Sphagnum.Server/Storage/Contracts/Wal/WalCursor.cs new file mode 100644 index 0000000..96b3854 --- /dev/null +++ b/src/Sphagnum.Server/Storage/Contracts/Wal/WalCursor.cs @@ -0,0 +1,14 @@ +namespace Sphagnum.Server.Storage.Contracts.Wal +{ + public readonly struct WalCursor + { + public readonly long PageId; + public readonly long CurrentPosition; + + public WalCursor(long pageId, long currentPosition) + { + PageId = pageId; + CurrentPosition = currentPosition; + } + } +} diff --git a/src/Sphagnum.Server/Storage/Services/Wal/WalCursorManager.cs b/src/Sphagnum.Server/Storage/Services/Wal/WalCursorManager.cs new file mode 100644 index 0000000..937b4d5 --- /dev/null +++ b/src/Sphagnum.Server/Storage/Services/Wal/WalCursorManager.cs @@ -0,0 +1,47 @@ +using Sphagnum.Server.Storage.Contracts; +using Sphagnum.Server.Storage.Contracts.Wal; +using Sphagnum.Server.Storage.Contracts.Wal.Interfaces; + +namespace Sphagnum.Server.Storage.Services.Wal +{ + internal class WalCursorManager : IWalCursorManager + { + private readonly string _cursorPath; + private byte[]? data; + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1); + public WalCursorManager(StorageConfig storageConfig) + { + _cursorPath = Path.Combine(storageConfig.WalDirectory, "cursor"); + } + + public async ValueTask GetCursor() + { + await _semaphore.WaitAsync(); + try + { + if (data == null) + { + data = await File.ReadAllBytesAsync(_cursorPath); + } + var result = new WalCursor(BitConverter.ToInt64(data, 0), BitConverter.ToInt64(data, 8)); + return result; + } + finally { _semaphore.Release(); } + } + + public async ValueTask SetCursor(WalCursor walCursor) + { + try + { + if (data == null) + { + data = new byte[16]; + } + BitConverter.TryWriteBytes(data.AsSpan(0, 8), walCursor.PageId); + BitConverter.TryWriteBytes(data.AsSpan(8, 8), walCursor.CurrentPosition); + await File.WriteAllBytesAsync(_cursorPath, data); + } + finally { _semaphore.Release(); } + } + } +} diff --git a/src/Sphagnum.Server/Storage/Services/Wal/WalStreamSource.cs b/src/Sphagnum.Server/Storage/Services/Wal/WalStreamSource.cs new file mode 100644 index 0000000..314cce9 --- /dev/null +++ b/src/Sphagnum.Server/Storage/Services/Wal/WalStreamSource.cs @@ -0,0 +1,13 @@ +using Sphagnum.Server.Storage.Contracts.Wal.Interfaces; + +namespace Sphagnum.Server.Storage.Services.Wal +{ + internal class WalStreamSource : IWalStreamSource + { + public Stream GetStream(string pathToFile) + { + var stream = new FileStream(pathToFile, FileMode.OpenOrCreate, FileAccess.Write); + return stream; + } + } +} diff --git a/src/Sphagnum.Server/Storage/Services/Wal/WalWriterDefault.cs b/src/Sphagnum.Server/Storage/Services/Wal/WalWriterDefault.cs new file mode 100644 index 0000000..844b959 --- /dev/null +++ b/src/Sphagnum.Server/Storage/Services/Wal/WalWriterDefault.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.Logging; +using Sphagnum.Server.Storage.Contracts; +using Sphagnum.Server.Storage.Contracts.Wal.Interfaces; +using System.Globalization; + +namespace Sphagnum.Server.Storage.Services.Wal +{ + internal class WalWriterDefault : IWalWriter + { + private readonly SemaphoreSlim _semaphore = new(0, 1); + + private readonly string _walDirectory; + private readonly long _walPageSize; + + private string? _walPagePath; + private long CurrentPageSize = 0; + private long CurrentPage = 0; + + private readonly IWalStreamSource _streamSource; + private readonly ILogger _logger; + public WalWriterDefault(StorageConfig storageConfig, IWalStreamSource streamSource, ILogger logger) + { + _streamSource = streamSource; + _walDirectory = storageConfig.WalDirectory; + _walPageSize = storageConfig.WalPageMinSize; + _logger = logger; + } + + public async Task WriteData(byte[] data) + { + if (!string.IsNullOrWhiteSpace(_walPagePath)) + { + await _semaphore.WaitAsync(); + try + { + if (CurrentPageSize > _walPageSize) + { + CurrentPageSize = 0; + CurrentPage += 1; + _walPagePath = Path.Combine(_walDirectory, CurrentPage.ToString(CultureInfo.InvariantCulture)); + } + using var stream = _streamSource.GetStream(_walPagePath); + await stream.WriteAsync(data); + CurrentPageSize += data.LongLength; + } + catch (Exception ex) + { + _logger.LogError(ex, "Walwriting failed!"); + } + _semaphore.Release(); + } + } + } +} diff --git a/tests/Sphagnum.Common.UnitTests/AssemblySettings.cs b/tests/Sphagnum.Common.UnitTests/AssemblySettings.cs new file mode 100644 index 0000000..2ca5276 --- /dev/null +++ b/tests/Sphagnum.Common.UnitTests/AssemblySettings.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Sphagnum.FuncTests")] \ No newline at end of file diff --git a/Sphagnum.Server.Tests/Services/TestConnection.cs b/tests/Sphagnum.Common.UnitTests/Services/TestConnection.cs similarity index 91% rename from Sphagnum.Server.Tests/Services/TestConnection.cs rename to tests/Sphagnum.Common.UnitTests/Services/TestConnection.cs index 3acad27..241dad1 100644 --- a/Sphagnum.Server.Tests/Services/TestConnection.cs +++ b/tests/Sphagnum.Common.UnitTests/Services/TestConnection.cs @@ -3,7 +3,7 @@ using System.Net; using System.Net.Sockets; using System.Threading.Channels; -namespace Sphagnum.Server.Tests.Services +namespace Sphagnum.Common.UnitTests.Services { /// /// Класс имитирующий передачу данных через сокет. @@ -101,9 +101,9 @@ namespace Sphagnum.Server.Tests.Services return await _newConnectionsChannel.Reader.ReadAsync(); } - internal async Task AddInputConnection(Guid? connectionId = null) + internal async Task AddInputConnection(TestConnection connection) { - await _newConnectionsChannel.Writer.WriteAsync(connectionId.HasValue ? new TestConnection(connectionId.Value) : new TestConnection()); + await _newConnectionsChannel.Writer.WriteAsync(connection); } } } diff --git a/tests/Sphagnum.Common.UnitTests/Services/TestConnectionFactory.cs b/tests/Sphagnum.Common.UnitTests/Services/TestConnectionFactory.cs new file mode 100644 index 0000000..001703a --- /dev/null +++ b/tests/Sphagnum.Common.UnitTests/Services/TestConnectionFactory.cs @@ -0,0 +1,19 @@ +using Sphagnum.Common.Infrastructure.Contracts; +using Sphagnum.Common.Infrastructure.Services; + +namespace Sphagnum.Common.UnitTests.Services +{ + internal class TestConnectionFactory : ConnectionFactory + { + public IConnection? CurrentConnection { get; private set; } + internal override Task CreateConnection(bool connected = true) + { + return Task.FromResult(CurrentConnection ?? (IConnection)(new TestConnection())); + } + + public void SetCurrentConnection(IConnection? connection) + { + CurrentConnection = connection; + } + } +} diff --git a/tests/Sphagnum.Common.UnitTests/Sphagnum.Common.UnitTests.csproj b/tests/Sphagnum.Common.UnitTests/Sphagnum.Common.UnitTests.csproj index 738a1ed..ca48fa6 100644 --- a/tests/Sphagnum.Common.UnitTests/Sphagnum.Common.UnitTests.csproj +++ b/tests/Sphagnum.Common.UnitTests/Sphagnum.Common.UnitTests.csproj @@ -10,6 +10,7 @@ + diff --git a/Sphagnum.Server.Tests/TestConnectionTests.cs b/tests/Sphagnum.Common.UnitTests/TestConnectionTests.cs similarity index 92% rename from Sphagnum.Server.Tests/TestConnectionTests.cs rename to tests/Sphagnum.Common.UnitTests/TestConnectionTests.cs index 39057f2..fe36399 100644 --- a/Sphagnum.Server.Tests/TestConnectionTests.cs +++ b/tests/Sphagnum.Common.UnitTests/TestConnectionTests.cs @@ -1,9 +1,9 @@ using AutoFixture; -using Sphagnum.Common.Messaging.Extensions; -using Sphagnum.Server.Tests.Services; +using Sphagnum.Common.Infrastructure.Extensions; +using Sphagnum.Common.UnitTests.Services; using System.Security.Cryptography; -namespace Sphagnum.Server.Tests +namespace Sphagnum.Common.UnitTests { public class TestConnectionTests { @@ -12,10 +12,11 @@ namespace Sphagnum.Server.Tests public async Task IsNewConnectionCretesCorrect() { var connection = new TestConnection(); + var connection2 = new TestConnection(); var connectionTask = connection.AcceptAsync(); - var id = Guid.NewGuid(); - await Task.WhenAll(connection.AddInputConnection(id), connectionTask); - Assert.That(id, Is.EqualTo(connectionTask.Result.ConnectionId)); + + await Task.WhenAll(connection.AddInputConnection(connection2), connectionTask); + Assert.That(connection2.ConnectionId, Is.EqualTo(connectionTask.Result.ConnectionId)); } /// diff --git a/Sphagnum.Server.Tests/Sphagnum.Server.Tests.csproj b/tests/Sphagnum.FuncTests/Sphagnum.FuncTests.csproj similarity index 68% rename from Sphagnum.Server.Tests/Sphagnum.Server.Tests.csproj rename to tests/Sphagnum.FuncTests/Sphagnum.FuncTests.csproj index 82c544c..56b7ab3 100644 --- a/Sphagnum.Server.Tests/Sphagnum.Server.Tests.csproj +++ b/tests/Sphagnum.FuncTests/Sphagnum.FuncTests.csproj @@ -1,4 +1,4 @@ - + net8.0 @@ -10,7 +10,6 @@ - @@ -19,7 +18,9 @@ - + + + diff --git a/tests/Sphagnum.FuncTests/UnitTest1.cs b/tests/Sphagnum.FuncTests/UnitTest1.cs new file mode 100644 index 0000000..db78b0a --- /dev/null +++ b/tests/Sphagnum.FuncTests/UnitTest1.cs @@ -0,0 +1,48 @@ +using AutoFixture; +using Sphagnum.Client; +using Sphagnum.Common.Messaging.Contracts; +using Sphagnum.Common.UnitTests.Services; +using Sphagnum.Server.Broker.Services; + +namespace Sphagnum.FuncTests +{ + public class Tests + { + [SetUp] + public void Setup() + { + } + + [Test] + public async Task Test1() + { + var fixture = new Fixture(); + var factory = new TestConnectionFactory(); + + var manager = new ConnectionsManager(); + var processor = new MessagesProcessor(); + var mainConnection = await factory.CreateConnection(false) as TestConnection; + Assert.IsNotNull(mainConnection); + factory.SetCurrentConnection(mainConnection); + + + var server = new ConnectionsReciever(factory, manager, processor); + await server.StartAsync(CancellationToken.None); + + factory.SetCurrentConnection(null); + var connection1 = await factory.CreateConnection(true) as TestConnection; + Assert.IsNotNull(connection1); + + factory.SetCurrentConnection(connection1); + + var client = new ClientDefault(factory); + await mainConnection.AddInputConnection(connection1); + var data = fixture.CreateMany(11).ToArray(); + + await Task.Delay(100); + await client.Publish(new Common.Messaging.Contracts.Messages.Message("default", RoutingKey.Empty, data)); + Assert.Pass(); + await Task.Delay(10000); + } + } +} \ No newline at end of file