фиксацтя
checks / check and test (push) Has been cancelled Details

revision
vlad zverzhkhovskiy 2025-08-15 09:22:38 +03:00
parent 7ddb102efe
commit 889934c51b
31 changed files with 383 additions and 109 deletions

View File

@ -9,7 +9,6 @@ builder.Services.AddSingleton(new ConnectionFactory()
});
builder.Services.AddSingleton<MessagesProcessor>();
builder.Services.AddSingleton<ConnectionsManager>();
builder.Services.AddSingleton<ConnectionsManager>();
builder.Services.AddHostedService<ConnectionsReciever>();

View File

@ -23,7 +23,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sphagnum.Common.UnitTests",
EndProject
Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose", "docker-compose.dcproj", "{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sphagnum.Server.Tests", "..\Sphagnum.Server.Tests\Sphagnum.Server.Tests.csproj", "{134442E7-CA62-4BE4-B7EB-63847385A98C}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sphagnum.FuncTests", "..\tests\Sphagnum.FuncTests\Sphagnum.FuncTests.csproj", "{C160C372-5E5E-5442-462B-A5B5C36BE2D0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -59,10 +59,10 @@ Global
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BF7E9B18-1C0F-4AA6-B4BD-F38617B72A1B}.Release|Any CPU.Build.0 = Release|Any CPU
{134442E7-CA62-4BE4-B7EB-63847385A98C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{134442E7-CA62-4BE4-B7EB-63847385A98C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{134442E7-CA62-4BE4-B7EB-63847385A98C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{134442E7-CA62-4BE4-B7EB-63847385A98C}.Release|Any CPU.Build.0 = Release|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C160C372-5E5E-5442-462B-A5B5C36BE2D0}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -74,7 +74,7 @@ Global
{C344FDF3-3D4D-4BC6-B257-4AD907BE1BE4} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62}
{77F76F27-D883-4392-90D5-8F441043F468} = {71CDAFF8-7C3D-4915-8FAA-5C263857AD62}
{25865911-1F2D-4083-A99C-C65E13F05C14} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
{134442E7-CA62-4BE4-B7EB-63847385A98C} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
{C160C372-5E5E-5442-462B-A5B5C36BE2D0} = {8E76791F-BA23-44AD-BACB-E14DD5FCE750}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {42DA8F91-1667-49BC-971D-D83C0870393A}

View File

@ -1,8 +1,8 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Extensions;
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using Sphagnum.Common.Messaging.Extensions;
using Sphagnum.Common.Messaging.Utils;
using System;
using System.Threading;
@ -36,18 +36,6 @@ namespace Sphagnum.Client
}
}
//private async Task Auth()
//{
// await _connection.SendAsync(MessageParser.PackMessage(new AuthMessage(_connectionFactory.Login, _connectionFactory.Password, _connectionFactory.UserRights)));
// var response = await ReceiveAsync();
// var messageType = MessageParser.GetMessageType(response);
// if (messageType == MessageType.AuthSuccessfull)
// {
// return;
// }
// throw new Exception("Auth failed!");
//}
public async ValueTask Ack(Guid messageId)
{
await Task.Delay(1000);

View File

@ -3,5 +3,7 @@
//Тесты
[assembly: InternalsVisibleTo("Sphagnum.Common.UnitTests")]
[assembly: InternalsVisibleTo("Sphagnum.Server.Tests")]
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]
[assembly: InternalsVisibleTo("Sphagnum.Client")]
[assembly: InternalsVisibleTo("Sphagnum.Server")]

View File

@ -4,7 +4,7 @@ using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Messaging.Extensions
namespace Sphagnum.Common.Infrastructure.Extensions
{
internal static class IConnectionExtensions
{

View File

@ -32,6 +32,7 @@ namespace Sphagnum.Common.Messaging.Utils
var id = GetMessageId(bytes);
return new Message(id, exchangeName, routingKey, payload);
}
public static byte[] PackMessage(Message message)
{
if (string.IsNullOrEmpty(message.Exchange) || string.IsNullOrWhiteSpace(message.Exchange))

View File

@ -1,3 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Sphagnum.Server.FuncTests")]
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]

View File

@ -0,0 +1,26 @@
using Microsoft.Extensions.Hosting;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using System.Net;
namespace Sphagnum.Server.Broker.Services
{
public class BrokerService : IHostedService
{
private readonly ConnectionsManager _manager;
public BrokerService(ConnectionsManager manager)
{
_manager = manager;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

View File

@ -1,22 +1,12 @@
using Sphagnum.Common.Infrastructure.Contracts;
using System.Collections.Concurrent;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Sphagnum.Server.Broker.Services
{
public class ConnectionsManager
{
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
public ConnectionsManager()
{
}
internal void AddConnection(IConnection connection)
{
connection.ConnectionClosed += (id) =>
{
Connections.TryRemove(id, out var conn);
};
}
}
}

View File

@ -1,56 +0,0 @@
using Microsoft.Extensions.Hosting;
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using System.Net;
namespace Sphagnum.Server.Broker.Services
{
public class ConnectionsReciever : IHostedService
{
private readonly int _port;
private readonly CancellationTokenSource _cts = new();
private readonly IConnection _connection;
private readonly ConnectionsManager _manager;
private readonly MessagesProcessor _processor;
public ConnectionsReciever(ConnectionFactory connectionFactory, ConnectionsManager manager, MessagesProcessor processor)
{
_port = connectionFactory.Port;
_manager = manager;
_connection = connectionFactory.CreateConnection(false).Result;
_processor = processor;
}
public Task StopAsync()
{
_cts.Cancel();
return Task.CompletedTask;
}
private async Task AcceptationWorker()
{
while (!_cts.IsCancellationRequested)
{
var acceptedSocket = await _connection.AcceptAsync();
_manager.AddConnection(acceptedSocket);
_processor.AddConnection(acceptedSocket);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
_connection.Bind(new IPEndPoint(IPAddress.Any, _port));
_connection?.Listen(1000); //todo разобраться что делает этот параметр.
var _ = AcceptationWorker();
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_connection.Close();
_cts.Cancel();
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Sphagnum.Server.Broker.Services
{
internal class MessagesBus
{
}
}

View File

@ -1,13 +1,20 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Messaging.Extensions;
using Sphagnum.Common.Messaging.Utils;
using Sphagnum.Common.Infrastructure.Extensions;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
using System.Collections.Concurrent;
namespace Sphagnum.Server.Broker.Services
{
public class MessagesProcessor
{
private readonly IWalWriter _walWriter;
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
public MessagesProcessor(IWalWriter walWriter)
{
_walWriter = walWriter;
}
internal void AddConnection(IConnection connection)
{
connection.ConnectionClosed += (id) =>
@ -17,12 +24,12 @@ namespace Sphagnum.Server.Broker.Services
var _ = ProcessMessages(connection);
}
internal static async Task ProcessMessages(IConnection connection)
internal async Task ProcessMessages(IConnection connection)
{
while (!connection.CancellationTokenSource.IsCancellationRequested)
{
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
var mess = MessageParser.UnpackMessage(data);
await _walWriter.WriteData(data);
}
}
}

View File

@ -0,0 +1,54 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
using System.Collections.Concurrent;
using System.Net;
using Sphagnum.Common.Infrastructure.Extensions;
namespace Sphagnum.Server.Broker.Services
{
public class Reciever
{
public readonly CancellationTokenSource Cts = new();
private readonly IConnection _listeningConnection;
private readonly IWalWriter _walWriter;
private readonly ConcurrentDictionary<Guid, IConnection> Connections = new();
public Reciever(ConnectionFactory connectionFactory, IWalWriter walWriter)
{
_listeningConnection = connectionFactory.CreateConnection(false).Result;
_listeningConnection.Bind(new IPEndPoint(IPAddress.Any, connectionFactory.Port));
_listeningConnection.Listen(1000); //todo разобраться что делает этот параметр.
_walWriter = walWriter;
var _ = AcceptationWorker();
}
internal void AddConnection(IConnection connection)
{
Connections[connection.ConnectionId] = connection;
connection.ConnectionClosed += (id) =>
{
Connections.TryRemove(id, out var conn);
};
ProcessMessages(connection);
}
private async Task AcceptationWorker()
{
while (!Cts.IsCancellationRequested)
{
var acceptedSocket = await _listeningConnection.AcceptAsync();
AddConnection(acceptedSocket);
}
}
internal async Task ProcessMessages(IConnection connection)
{
while (!connection.CancellationTokenSource.IsCancellationRequested)
{
var data = await connection.ReceiveAsync(connection.CancellationTokenSource.Token);
await _walWriter.WriteData(data);
}
}
}
}

View File

@ -0,0 +1,9 @@
namespace Sphagnum.Server.Scheme.Contracts
{
public interface IExchange
{
public string Name { get; init; }
public Task SendMessage();
}
}

View File

@ -0,0 +1,7 @@
namespace Sphagnum.Server.Storage
{
internal class SchemaManager
{
}
}

View File

@ -14,4 +14,8 @@
<ProjectReference Include="..\Sphagnum.Common\Sphagnum.Common.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Storage\Utils\" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,8 @@
namespace Sphagnum.Server.Storage.Contracts
{
public class StorageConfig
{
public readonly string WalDirectory;
public readonly long WalPageMinSize = 1024 * 4;
}
}

View File

@ -0,0 +1,8 @@
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
{
internal interface IWalCursorManager
{
ValueTask<WalCursor> GetCursor();
ValueTask SetCursor(WalCursor walCursor);
}
}

View File

@ -0,0 +1,7 @@
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
{
internal interface IWalStreamSource
{
public Stream GetStream(string pathToFile);
}
}

View File

@ -0,0 +1,7 @@
namespace Sphagnum.Server.Storage.Contracts.Wal.Interfaces
{
public interface IWalWriter
{
public Task WriteData(byte[] data);
}
}

View File

@ -0,0 +1,14 @@
namespace Sphagnum.Server.Storage.Contracts.Wal
{
public readonly struct WalCursor
{
public readonly long PageId;
public readonly long CurrentPosition;
public WalCursor(long pageId, long currentPosition)
{
PageId = pageId;
CurrentPosition = currentPosition;
}
}
}

View File

@ -0,0 +1,47 @@
using Sphagnum.Server.Storage.Contracts;
using Sphagnum.Server.Storage.Contracts.Wal;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
namespace Sphagnum.Server.Storage.Services.Wal
{
internal class WalCursorManager : IWalCursorManager
{
private readonly string _cursorPath;
private byte[]? data;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);
public WalCursorManager(StorageConfig storageConfig)
{
_cursorPath = Path.Combine(storageConfig.WalDirectory, "cursor");
}
public async ValueTask<WalCursor> GetCursor()
{
await _semaphore.WaitAsync();
try
{
if (data == null)
{
data = await File.ReadAllBytesAsync(_cursorPath);
}
var result = new WalCursor(BitConverter.ToInt64(data, 0), BitConverter.ToInt64(data, 8));
return result;
}
finally { _semaphore.Release(); }
}
public async ValueTask SetCursor(WalCursor walCursor)
{
try
{
if (data == null)
{
data = new byte[16];
}
BitConverter.TryWriteBytes(data.AsSpan(0, 8), walCursor.PageId);
BitConverter.TryWriteBytes(data.AsSpan(8, 8), walCursor.CurrentPosition);
await File.WriteAllBytesAsync(_cursorPath, data);
}
finally { _semaphore.Release(); }
}
}
}

View File

@ -0,0 +1,13 @@
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
namespace Sphagnum.Server.Storage.Services.Wal
{
internal class WalStreamSource : IWalStreamSource
{
public Stream GetStream(string pathToFile)
{
var stream = new FileStream(pathToFile, FileMode.OpenOrCreate, FileAccess.Write);
return stream;
}
}
}

View File

@ -0,0 +1,54 @@
using Microsoft.Extensions.Logging;
using Sphagnum.Server.Storage.Contracts;
using Sphagnum.Server.Storage.Contracts.Wal.Interfaces;
using System.Globalization;
namespace Sphagnum.Server.Storage.Services.Wal
{
internal class WalWriterDefault : IWalWriter
{
private readonly SemaphoreSlim _semaphore = new(0, 1);
private readonly string _walDirectory;
private readonly long _walPageSize;
private string? _walPagePath;
private long CurrentPageSize = 0;
private long CurrentPage = 0;
private readonly IWalStreamSource _streamSource;
private readonly ILogger<WalWriterDefault> _logger;
public WalWriterDefault(StorageConfig storageConfig, IWalStreamSource streamSource, ILogger<WalWriterDefault> logger)
{
_streamSource = streamSource;
_walDirectory = storageConfig.WalDirectory;
_walPageSize = storageConfig.WalPageMinSize;
_logger = logger;
}
public async Task WriteData(byte[] data)
{
if (!string.IsNullOrWhiteSpace(_walPagePath))
{
await _semaphore.WaitAsync();
try
{
if (CurrentPageSize > _walPageSize)
{
CurrentPageSize = 0;
CurrentPage += 1;
_walPagePath = Path.Combine(_walDirectory, CurrentPage.ToString(CultureInfo.InvariantCulture));
}
using var stream = _streamSource.GetStream(_walPagePath);
await stream.WriteAsync(data);
CurrentPageSize += data.LongLength;
}
catch (Exception ex)
{
_logger.LogError(ex, "Walwriting failed!");
}
_semaphore.Release();
}
}
}
}

View File

@ -0,0 +1,3 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Sphagnum.FuncTests")]

View File

@ -3,7 +3,7 @@ using System.Net;
using System.Net.Sockets;
using System.Threading.Channels;
namespace Sphagnum.Server.Tests.Services
namespace Sphagnum.Common.UnitTests.Services
{
/// <summary>
/// Класс имитирующий передачу данных через сокет.
@ -101,9 +101,9 @@ namespace Sphagnum.Server.Tests.Services
return await _newConnectionsChannel.Reader.ReadAsync();
}
internal async Task AddInputConnection(Guid? connectionId = null)
internal async Task AddInputConnection(TestConnection connection)
{
await _newConnectionsChannel.Writer.WriteAsync(connectionId.HasValue ? new TestConnection(connectionId.Value) : new TestConnection());
await _newConnectionsChannel.Writer.WriteAsync(connection);
}
}
}

View File

@ -0,0 +1,19 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
namespace Sphagnum.Common.UnitTests.Services
{
internal class TestConnectionFactory : ConnectionFactory
{
public IConnection? CurrentConnection { get; private set; }
internal override Task<IConnection> CreateConnection(bool connected = true)
{
return Task.FromResult(CurrentConnection ?? (IConnection)(new TestConnection()));
}
public void SetCurrentConnection(IConnection? connection)
{
CurrentConnection = connection;
}
}
}

View File

@ -10,6 +10,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AutoFixture" Version="4.18.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.2.1" />

View File

@ -1,9 +1,9 @@
using AutoFixture;
using Sphagnum.Common.Messaging.Extensions;
using Sphagnum.Server.Tests.Services;
using Sphagnum.Common.Infrastructure.Extensions;
using Sphagnum.Common.UnitTests.Services;
using System.Security.Cryptography;
namespace Sphagnum.Server.Tests
namespace Sphagnum.Common.UnitTests
{
public class TestConnectionTests
{
@ -12,10 +12,11 @@ namespace Sphagnum.Server.Tests
public async Task IsNewConnectionCretesCorrect()
{
var connection = new TestConnection();
var connection2 = new TestConnection();
var connectionTask = connection.AcceptAsync();
var id = Guid.NewGuid();
await Task.WhenAll(connection.AddInputConnection(id), connectionTask);
Assert.That(id, Is.EqualTo(connectionTask.Result.ConnectionId));
await Task.WhenAll(connection.AddInputConnection(connection2), connectionTask);
Assert.That(connection2.ConnectionId, Is.EqualTo(connectionTask.Result.ConnectionId));
}
/// <summary>

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
@ -10,7 +10,6 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AutoFixture" Version="4.18.1" />
<PackageReference Include="coverlet.collector" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.8.0" />
<PackageReference Include="NUnit" Version="3.14.0" />
@ -19,7 +18,9 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\src\Sphagnum.Server\Sphagnum.Server.csproj" />
<ProjectReference Include="..\..\src\Sphagnum.Client\Sphagnum.Client.csproj" />
<ProjectReference Include="..\..\src\Sphagnum.Server\Sphagnum.Server.csproj" />
<ProjectReference Include="..\Sphagnum.Common.UnitTests\Sphagnum.Common.UnitTests.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,48 @@
using AutoFixture;
using Sphagnum.Client;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.UnitTests.Services;
using Sphagnum.Server.Broker.Services;
namespace Sphagnum.FuncTests
{
public class Tests
{
[SetUp]
public void Setup()
{
}
[Test]
public async Task Test1()
{
var fixture = new Fixture();
var factory = new TestConnectionFactory();
var manager = new ConnectionsManager();
var processor = new MessagesProcessor();
var mainConnection = await factory.CreateConnection(false) as TestConnection;
Assert.IsNotNull(mainConnection);
factory.SetCurrentConnection(mainConnection);
var server = new ConnectionsReciever(factory, manager, processor);
await server.StartAsync(CancellationToken.None);
factory.SetCurrentConnection(null);
var connection1 = await factory.CreateConnection(true) as TestConnection;
Assert.IsNotNull(connection1);
factory.SetCurrentConnection(connection1);
var client = new ClientDefault(factory);
await mainConnection.AddInputConnection(connection1);
var data = fixture.CreateMany<byte>(11).ToArray();
await Task.Delay(100);
await client.Publish(new Common.Messaging.Contracts.Messages.Message("default", RoutingKey.Empty, data));
Assert.Pass();
await Task.Delay(10000);
}
}
}