промежуточная фиксация рефакторинга

revision
vlad zverzhkhovskiy 2025-08-06 17:34:54 +03:00
parent 78f9f6c398
commit c21e3ed952
16 changed files with 42 additions and 650 deletions

View File

@ -2,6 +2,7 @@
using Sphagnum.Common.Infrastructure.Services;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using Sphagnum.Common.Messaging.Extensions;
using Sphagnum.Common.Messaging.Utils;
using System;
using System.Threading;
@ -10,20 +11,29 @@ using System.Threading.Tasks;
namespace Sphagnum.Client
{
public sealed class ClientDefault : IMessagingClient, IDisposable
public sealed class ClientDefault : IDisposable
{
private readonly Task _recievingTask;
private readonly IConnection _connection;
private readonly Channel<byte[]> _commonMessagesChannel = Channel.CreateUnbounded<byte[]>();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
public ClientDefault(ConnectionFactory factory)
{
_connection = factory.CreateConnection().Result;
_recievingTask = RecivingTask();
}
//private async Task<byte[]> ReceiveAsync()
//{
// return await _commonMessagesChannel.Reader.ReadAsync(_cts.Token);
//}
private async Task RecivingTask()
{
while (!_cts.IsCancellationRequested)
{
var data = await _connection.ReceiveAsync(_cts.Token);
if (MessageParser.GetMessageType(data) == MessageType.Common)
{
await _commonMessagesChannel.Writer.WriteAsync(data);
}
}
}
//private async Task Auth()
//{
@ -47,17 +57,17 @@ namespace Sphagnum.Client
throw new NotImplementedException();
}
public async ValueTask<Guid> Publish(OutgoingMessage message)
public async ValueTask<Guid> Publish(Message message)
{
var bytes = MessageParserold.PackMessage(message);
await _connection.SendAsync(bytes.AsMemory(), System.Net.Sockets.SocketFlags.None);
return MessageParserold.GetMessageId(bytes);
}
public async ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken)
public async ValueTask<Message> Consume(CancellationToken cancellationToken)
{
var result = await _commonMessagesChannel.Reader.ReadAsync(cancellationToken);
return MessageParserold.UnpackIncomingMessage(result);
return MessageParser.UnpackMessage(result);
}
public ValueTask Reject(Guid messageId)

View File

@ -1,16 +0,0 @@
using Sphagnum.Common.Messaging.Contracts.Messages;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Messaging.Contracts
{
public interface IMessagingClient
{
public ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken);
public ValueTask<Guid> Publish(OutgoingMessage message);
public ValueTask Ack(Guid messageId);
public ValueTask Nack(Guid messageId);
public ValueTask Reject(Guid messageId);
}
}

View File

@ -1,17 +0,0 @@
using System;
namespace Sphagnum.Common.Messaging.Contracts.Messages
{
public readonly struct IncommingMessage
{
public readonly Guid MessageId;
public readonly ReadOnlyMemory<byte> Payload;
public IncommingMessage(Guid messageId, ReadOnlyMemory<byte> payload)
{
MessageId = messageId;
Payload = payload;
}
}
}

View File

@ -1,27 +0,0 @@
using System;
namespace Sphagnum.Common.Messaging.Contracts.Messages
{
public readonly struct OutgoingMessage
{
public readonly string Exchange;
public readonly RoutingKey RoutingKey;
public readonly ReadOnlyMemory<byte> Payload;
public OutgoingMessage(string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
{
Exchange = exchange;
RoutingKey = routingKey;
Payload = payload;
}
public OutgoingMessage(string exchange, ReadOnlyMemory<byte> payload)
{
Exchange = exchange;
RoutingKey = new RoutingKey();
Payload = payload;
}
}
}

View File

@ -0,0 +1,23 @@
using Sphagnum.Common.Infrastructure.Contracts;
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Messaging.Extensions
{
internal static class IConntcrionExtensions
{
public static async ValueTask<byte[]> ReceiveAsync(this IConnection connection, CancellationToken cancellationToken = default)
{
var lengthBuffer = new byte[4];
await connection.ReceiveAsync(lengthBuffer, SocketFlags.Peek, cancellationToken);
var length = BitConverter.ToInt32(lengthBuffer, 0);
var result = new byte[length];
await connection.ReceiveAsync(result, SocketFlags.None, cancellationToken);
return result;
}
}
}

View File

@ -1,39 +0,0 @@
using Sphagnum.Common.Infrastructure.Contracts;
using System;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Messaging.Services
{
internal class SphagnumConnection
{
private readonly IConnection _connection;
public bool Connected => throw new NotImplementedException();
public void Close()
{
throw new NotImplementedException();
}
public Task ConnectAsync(string host, int port)
{
throw new NotImplementedException();
}
public void Dispose()
{
throw new NotImplementedException();
}
public ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
}

View File

@ -137,7 +137,7 @@ namespace Sphagnum.Common.Messaging.Utils
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static MessageType GetMessageType(Span<byte> bytes)
{
return (MessageType)bytes[4];
return bytes.Length < 5 ? MessageType.Unknown : (MessageType)bytes[4];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]

View File

@ -1,304 +0,0 @@
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Contracts.Messages;
using Sphagnum.Common.Old.Contracts.Messaging.Messages;
using System;
using System.Runtime.CompilerServices;
using System.Text;
namespace Sphagnum.Common.Messaging.Utils
{
/// <summary>
/// Порядок передачи:
/// 1. MessageSize, 4 байта
/// 2. MessageType, 1 байт
/// 3. MessageFlags, 2 байта
/// 4. Id сообщения, если есть, 16 байт
/// 5. ExchangeNameLength, если есть, 1 байт
/// 6. ExchangeName, если есть, ExchangeNameLength байт, Utf8
/// 7. RoutingKey, если есть, 3 байта
/// 8. PayloadSize, если есть - 4 байта
/// 9. Payload, если есть, PayloadSize байт
/// </summary>
internal static class MessageParserold
{
public static OutgoingMessage UnpackOutgoingMessage(byte[] bytes)
{
if ((MessageType)bytes[4] != MessageType.Common)
{
throw new ArgumentException("Uncorrect message type! 1 (MessageType.Common) expected!");
}
var exchangeName = GetExchangeName(bytes);
var routingKey = GetRoutingKey(bytes);
var payload = GetPayload(bytes);
return new OutgoingMessage(exchangeName, routingKey, payload);
}
public static IncommingMessage UnpackIncomingMessage(byte[] bytes)
{
if ((MessageType)bytes[4] != MessageType.Common)
{
throw new ArgumentException("Uncorrect message type! 1 (MessageType.Common) expected!");
}
var id = GetMessageId(bytes);
var payload = GetPayload(bytes);
return new IncommingMessage(id, payload);
}
public static byte[] PackMessage(IncommingMessage message)
{
var result = new byte[27 + message.Payload.Length];
result[4] = (byte)MessageType.Common;
var flags = MessageFlags.HasPayload;
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
message.MessageId.TryWriteBytes(result.AsSpan(7));
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
message.Payload.CopyTo(result.AsMemory(27));
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
public static byte[] PackMessage(Message message)
{
var result = new byte[27 + message.Payload.Length];
result[4] = (byte)MessageType.Common;
var flags = MessageFlags.HasPayload;
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
message.MessageId.TryWriteBytes(result.AsSpan(7));
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
message.Payload.CopyTo(result.AsMemory(27));
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
public static byte[] PackReplyMessage(MessageType messageType)
{
var res = new byte[23];
res[4] = (byte)messageType;
res[0] = 23;
return res;
}
public static byte[] PackReplyMessage(MessageType messageType, Guid parentMessageId)
{
var res = new byte[23];
res[4] = (byte)messageType;
res[0] = 23;
parentMessageId.TryWriteBytes(res.AsSpan(7));
return res;
}
public static byte[] PackMessage(AuthMessage message)
{
var result = new byte[27 + message.Payload.Length];
result[4] = (byte)MessageType.Auth;
var flags = MessageFlags.HasPayload;
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
message.MessageId.TryWriteBytes(result.AsSpan(7));
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
message.Payload.CopyTo(result.AsMemory(27));
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
public static byte[] PackMessage(OutgoingMessage message)
{
if (string.IsNullOrEmpty(message.Exchange) || string.IsNullOrWhiteSpace(message.Exchange))
{
throw new ArgumentException("Bad exchange name!");
}
else if (Encoding.UTF8.GetByteCount(message.Exchange) > 255)
{
throw new ArgumentException("Exchange name in UTF8 encoding must allocate < 256 bytes!");
}
var flags = MessageFlags.HasExchange;
int count = 23;
if (message.Payload.Length > 0)
{
flags |= MessageFlags.HasPayload;
count += message.Payload.Length;
count += 4;
}
if (!message.RoutingKey.IsEmpry)
{
flags |= MessageFlags.HasRoutingKey;
count += 3;
}
var exchangeNameBytes = Encoding.UTF8.GetBytes(message.Exchange);// todo перевести на более оптимальный метод, не аллоцирующий лишнего.
count += exchangeNameBytes.Length;
count++;
return Pack(message, Guid.NewGuid(), flags, count);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static Guid GetMessageId(byte[] bytes)
{
var slice = bytes.AsSpan(7, 16);
return new Guid(slice);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static Guid GetMessageId(Span<byte> bytes)
{
var slice = bytes.Slice(7, 16);
return new Guid(slice);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static Guid GetMessageId(ReadOnlyMemory<byte> bytes)
{
var slice = bytes.Slice(7, 16);
return new Guid(slice.Span);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void CopyId(ReadOnlySpan<byte> bytes, byte[] buffer)
{
var slice = bytes.Slice(7, 16);
slice.CopyTo(buffer.AsSpan(7, 16));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static MessageType GetMessageType(Span<byte> bytes)
{
return (MessageType)bytes[4];
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static byte[] GetPayload(Span<byte> bytes)
{
var result = Array.Empty<byte>();
if (HasPayload(bytes))
{
var shift = 23;
if (HasExchange(bytes))//todo проверить бенчмарком, как работает инлайн
{
shift += bytes[23];
shift++;
}
if (HasKey(bytes))
{
shift += 3;
}
var payloadSize = BitConverter.ToInt32(bytes[shift..]);
if (payloadSize > 0)
{
result = bytes.Slice(shift + 4, payloadSize).ToArray();
}
}
return result;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static int GetPayloadStart(Span<byte> bytes)
{
if (HasPayload(bytes))
{
var shift = 27;
if (HasExchange(bytes))//todo проверить бенчмарком, как работает инлайн
{
shift += bytes[23];
shift += 1;
}
if (HasKey(bytes))
{
shift += 3;
}
return shift;
}
return -1;
}
internal static byte[] Pack(OutgoingMessage message, Guid id, MessageFlags flags, int count)
{
var result = new byte[count];
result[4] = (byte)MessageType.Common;
var shift = 5;
BitConverter.TryWriteBytes(result.AsSpan(shift), (ushort)flags);//2. flags
shift += 2;
id.TryWriteBytes(result.AsSpan(shift));//3. id
shift += 16;
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
{
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)message.Exchange.Length);//4. ExchangeNameLength
shift += 1;
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
shift += exchangeBytes.Length;
}
if ((flags & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey)//6. RoutingKey
{
result[shift] = message.RoutingKey.Part1;
shift++;
result[shift] = message.RoutingKey.Part2;
shift++;
result[shift] = message.RoutingKey.Part3;
shift++;
}
if ((flags & MessageFlags.HasPayload) == MessageFlags.HasPayload)
{
BitConverter.TryWriteBytes(result.AsSpan(shift), message.Payload.Length);//7. PayloadSize
shift += 4;
message.Payload.CopyTo(result.AsMemory(shift));//8. Payload
}
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
return result;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool HasKey(Span<byte> bytes)
{
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
return ((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool HasPayload(Span<byte> bytes)
{
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
return ((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static bool HasExchange(Span<byte> bytes)
{
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
return ((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static string GetExchangeName(Span<byte> bytes)
{
var hasExchange = HasExchange(bytes);
if (!hasExchange)
{
throw new ArgumentException("bytes must contains exchange name!");
}
return Encoding.UTF8.GetString(bytes.Slice(24, bytes[23]));
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static RoutingKey GetRoutingKey(Span<byte> bytes)
{
var length = bytes[23];
RoutingKey key;
if (HasKey(bytes))
{
var routingKeyShift = 23 + length + 1;
var routingKeyPart1 = bytes[routingKeyShift];
var routingKeyPart2 = bytes[routingKeyShift + 1];
var routingKeyPart3 = bytes[routingKeyShift + 2];
key = new RoutingKey(routingKeyPart1, routingKeyPart2, routingKeyPart3);
}
else
{
key = new RoutingKey();
}
return key;
}
}
}

View File

@ -1,8 +0,0 @@
namespace Sphagnum.Common.Old.Contracts
{
internal static class Constants
{
public const int HashedUserDataSizeInfBytes = 32;
public const int PayloadRecieverBufferSize = 8192;
}
}

View File

@ -1,23 +0,0 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Infrastructure.Services;
using System.Net.Sockets;
using System.Threading.Tasks;
namespace Sphagnum.Common.Old.Contracts.Login
{
public class ConnectionFactory
{
public int Port { get; set; }
public string Hostname { get; set; } = string.Empty;
public string Login { get; set; } = string.Empty;
public string Password { get; set; } = string.Empty;
public UserRights UserRights { get; set; }
internal virtual async Task<IConnection> CreateDefaultConnected()
{
var conn = new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
await conn.ConnectAsync(Hostname, Port);
return conn;
}
}
}

View File

@ -1,19 +0,0 @@
using System;
namespace Sphagnum.Common.Old.Contracts.Login
{
[Flags]
public enum UserRights : ushort
{
None = 0,
MessagesConsuming = 1,
MessagesPublishing = 2,
TopicCreating = 4,
TopicDeleting = 8,
TopicBinding = 16,
ExchangeCreating = 32,
ExchangeDeleting = 64,
All = MessagesConsuming | MessagesPublishing | TopicCreating | TopicDeleting | TopicBinding | ExchangeCreating | ExchangeDeleting,
}
}

View File

@ -1,22 +0,0 @@
using Sphagnum.Common.Old.Contracts.Login;
using Sphagnum.Common.Old.Utils;
using System;
namespace Sphagnum.Common.Old.Contracts.Messaging.Messages
{
internal readonly ref struct AuthMessage
{
public readonly ReadOnlyMemory<byte> Payload;
public readonly Guid MessageId;
public AuthMessage(string login, string pwd, UserRights userRights)
{
MessageId = Guid.NewGuid();
var data = new byte[Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + 2];
HashCalculator.Calc(login).CopyTo(data, 0);
HashCalculator.Calc(pwd).CopyTo(data, Constants.HashedUserDataSizeInfBytes);
BitConverter.TryWriteBytes(data.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes), (ushort)userRights);
Payload = data;
}
}
}

View File

@ -1,9 +0,0 @@
using System;
namespace Sphagnum.Common.Old.Contracts.Messaging.Messages
{
internal readonly ref struct AuthResultMessage
{
public readonly ReadOnlyMemory<byte> Payload;
}
}

View File

@ -1,8 +0,0 @@
using System;
namespace Sphagnum.Common.Old.Exceptions
{
public class AuthException : Exception
{
}
}

View File

@ -1,135 +0,0 @@
using Sphagnum.Common.Infrastructure.Contracts;
using Sphagnum.Common.Messaging.Contracts;
using Sphagnum.Common.Messaging.Utils;
using Sphagnum.Common.Old.Exceptions;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
namespace Sphagnum.Common.Old.Services
{
internal class SphagnumConnectionOld
{
protected readonly IConnection _connection;
private readonly Func<Func<byte[], Task>> _messagesProcessorFactory;
private readonly Func<byte[], Task> _messagesProcessor;
public SphagnumConnectionOld(Func<IConnection> connectionsFactory, Func<Func<byte[], Task>> messagesProcessorFactory)
{
_connection = connectionsFactory(); // new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
_messagesProcessorFactory = messagesProcessorFactory;
_messagesProcessor = _messagesProcessorFactory();
RecievingTask();
}
private SphagnumConnectionOld(IConnection socket, Func<Func<byte[], Task>> messagesProcessorFactory)
{
_connection = socket;
_messagesProcessorFactory = messagesProcessorFactory;
_messagesProcessor = _messagesProcessorFactory();
}
public bool Connected => _connection.Connected;
public Task ConnectAsync(string host, int port)
{
return _connection.ConnectAsync(host, port);
}
public async virtual Task<SphagnumConnectionOld> AcceptAsync()
{
var socket = await _connection.AcceptAsync();
return new SphagnumConnectionOld(socket, _messagesProcessorFactory);
}
public void Bind(int port)
{
_connection.Bind(new IPEndPoint(IPAddress.Any, port));
}
public void Close()
{
_connection.Close();
}
public void Dispose()
{
_connection.Dispose();
}
public void Listen(int backlog)
{
_connection.Listen(backlog);
}
public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
int res;
//if (buffer.Span[4] != (byte)MessageType.MessageAccepted)
//{
// var channel = _pool.Get();
// sendingItems.TryAdd(MessageParser.GetMessageId(buffer), channel);
// res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
// var message = await channel.Reader.WaitToReadAsync(cancellationToken);// todo обработка сообщения
// _pool.Return(channel);
// return res;
//}
//else
//{
// res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
//}
return await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken); ;
}
private async ValueTask<byte[]> ReceiveAsync(CancellationToken cancellationToken = default)
{
var lengthBuffer = new byte[4];
await _connection.ReceiveAsync(lengthBuffer, SocketFlags.Peek, cancellationToken);
var length = BitConverter.ToInt32(lengthBuffer, 0);
var result = new byte[length];
await _connection.ReceiveAsync(result, SocketFlags.None, cancellationToken);
return result;
}
private async Task RecievingTask(CancellationToken token = default)
{
var successRespBuffer = new byte[23];
successRespBuffer[4] = (byte)MessageType.MessageAccepted;
successRespBuffer[0] = 23;
while (Connected)
{
try
{
var message = await ReceiveAsync(token);
if (message[4] != (byte)MessageType.MessageAccepted)
{
try
{
await _messagesProcessor(message);
MessageParserold.CopyId(message, successRespBuffer);
await SendAsync(successRespBuffer, token);
}
catch (AuthException ex)
{
await SendAsync(successRespBuffer, token);
await Task.Delay(1000);
_connection.Close();
}
catch (Exception ex)
{
}
}
}
catch (Exception ex)
{
}
}
_connection.Close();
_connection.Dispose();
}
}
}

View File

@ -1,14 +0,0 @@
using System.Security.Cryptography;
using System.Text;
namespace Sphagnum.Common.Old.Utils
{
internal static class HashCalculator
{
private readonly static SHA256 _hash = SHA256.Create();
public static byte[] Calc(string text)
{
return _hash.ComputeHash(Encoding.UTF8.GetBytes(text));
}
}
}