промежуточная фиксация рефакторинга
parent
1ba051bb10
commit
fba480eaae
|
@ -1,6 +1,6 @@
|
|||
using Microsoft.AspNetCore.Mvc;
|
||||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
|
||||
namespace Sphagnum.DebugClient.Controllers
|
||||
{
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
using Sphagnum.Client;
|
||||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using Sphagnum.Server;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
namespace Sphagnum.Client
|
||||
{
|
||||
//internal class ClientConnection : SocketConnection
|
||||
//{
|
||||
// public ClientConnection() : base( async (dd) =>
|
||||
// {
|
||||
|
||||
// })
|
||||
// {
|
||||
|
||||
// }
|
||||
//}
|
||||
}
|
|
@ -1,8 +1,7 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Services;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using Sphagnum.Common.Messaging.Utils;
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
|
@ -10,38 +9,32 @@ using System.Threading.Tasks;
|
|||
|
||||
namespace Sphagnum.Client
|
||||
{
|
||||
public class ClientDefault : IMessagingClient, IDisposable
|
||||
public sealed class ClientDefault : IMessagingClient, IDisposable
|
||||
{
|
||||
private readonly SphagnumConnection _connection;
|
||||
private readonly ConnectionFactory _connectionFactory;
|
||||
private readonly IConnection _connection;
|
||||
private readonly Channel<byte[]> _commonMessagesChannel = Channel.CreateUnbounded<byte[]>();
|
||||
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
|
||||
public ClientDefault(ConnectionFactory factory)
|
||||
public ClientDefault(IConnectionFactory factory)
|
||||
{
|
||||
_connectionFactory = factory;
|
||||
_connection = factory.CreateDefaultConnected(() => async (mess) =>
|
||||
{
|
||||
await _commonMessagesChannel.Writer.WriteAsync(mess);
|
||||
}).Result;
|
||||
Auth().Wait();
|
||||
_connection = factory.CreateConnection().Result;
|
||||
}
|
||||
|
||||
private async Task<byte[]> ReceiveAsync()
|
||||
{
|
||||
return await _commonMessagesChannel.Reader.ReadAsync(_cts.Token);
|
||||
}
|
||||
//private async Task<byte[]> ReceiveAsync()
|
||||
//{
|
||||
// return await _commonMessagesChannel.Reader.ReadAsync(_cts.Token);
|
||||
//}
|
||||
|
||||
private async Task Auth()
|
||||
{
|
||||
await _connection.SendAsync(MessageParser.PackMessage(new AuthMessage(_connectionFactory.Login, _connectionFactory.Password, _connectionFactory.UserRights)));
|
||||
var response = await ReceiveAsync();
|
||||
var messageType = MessageParser.GetMessageType(response);
|
||||
if (messageType == Common.Utils.Models.MessageType.AuthSuccessfull)
|
||||
{
|
||||
return;
|
||||
}
|
||||
throw new Exception("Auth failed!");
|
||||
}
|
||||
//private async Task Auth()
|
||||
//{
|
||||
// await _connection.SendAsync(MessageParser.PackMessage(new AuthMessage(_connectionFactory.Login, _connectionFactory.Password, _connectionFactory.UserRights)));
|
||||
// var response = await ReceiveAsync();
|
||||
// var messageType = MessageParser.GetMessageType(response);
|
||||
// if (messageType == MessageType.AuthSuccessfull)
|
||||
// {
|
||||
// return;
|
||||
// }
|
||||
// throw new Exception("Auth failed!");
|
||||
//}
|
||||
|
||||
public ValueTask Ack(Guid messageId)
|
||||
{
|
||||
|
@ -55,15 +48,15 @@ namespace Sphagnum.Client
|
|||
|
||||
public async ValueTask<Guid> Publish(OutgoingMessage message)
|
||||
{
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
await _connection.SendAsync(bytes);
|
||||
return MessageParser.GetMessageId(bytes);
|
||||
var bytes = MessageParserold.PackMessage(message);
|
||||
await _connection.SendAsync(bytes.AsMemory(), System.Net.Sockets.SocketFlags.None);
|
||||
return MessageParserold.GetMessageId(bytes);
|
||||
}
|
||||
|
||||
public async ValueTask<IncommingMessage> Consume(CancellationToken cancellationToken)
|
||||
{
|
||||
var result = await _commonMessagesChannel.Reader.ReadAsync(cancellationToken);
|
||||
return MessageParser.UnpackIncomingMessage(result);
|
||||
return MessageParserold.UnpackIncomingMessage(result);
|
||||
}
|
||||
|
||||
public ValueTask Reject(Guid messageId)
|
||||
|
|
|
@ -1,14 +0,0 @@
|
|||
namespace Sphagnum.Common.Contracts.Administration.Enums
|
||||
{
|
||||
public enum ExchangeType : byte
|
||||
{
|
||||
/// <summary>
|
||||
/// Раздает сообщения во все топики с подходящим ключём маршрутизации.
|
||||
/// </summary>
|
||||
Broadcast,
|
||||
/// <summary>
|
||||
/// Отправляет сообщение в одну из очередей с подходящим ключём маршрутизации.
|
||||
/// </summary>
|
||||
Topic,
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
namespace Sphagnum.Common.Contracts.Administration.Enums
|
||||
{
|
||||
public enum TopicType
|
||||
{
|
||||
Queue,
|
||||
Stack,
|
||||
}
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Administration.Requests;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration
|
||||
{
|
||||
public interface IAdministrationClient
|
||||
{
|
||||
public ValueTask CreateExchange(ExchangeCreationRequest exchangeCreationRequest);
|
||||
public ValueTask CreateTopic(TopicCreationRequest topicCreationRequest);
|
||||
public ValueTask DeleteTopic(string topicName);
|
||||
public ValueTask DeleteExchange(string exchangeName);
|
||||
public ValueTask BindTopic(TopicBindingRequest topicCreationRequest);
|
||||
public ValueTask UnbindTopic(TopicBindingRequest topicCreationRequest);
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Administration.Enums;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration.Requests
|
||||
{
|
||||
public readonly ref struct ExchangeCreationRequest
|
||||
{
|
||||
public readonly string ExchangeName;
|
||||
public readonly ExchangeType ExchangeType;
|
||||
}
|
||||
}
|
|
@ -1,11 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration.Requests
|
||||
{
|
||||
public readonly ref struct TopicBindingRequest
|
||||
{
|
||||
public readonly string TopicName;
|
||||
public readonly string ExchangeName;
|
||||
public readonly RoutingKey RoutingKey;
|
||||
}
|
||||
}
|
|
@ -1,10 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Administration.Enums;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Administration.Requests
|
||||
{
|
||||
public readonly ref struct TopicCreationRequest
|
||||
{
|
||||
public readonly string TopicName;
|
||||
public readonly TopicType TopicType;
|
||||
}
|
||||
}
|
|
@ -1,8 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
|
||||
namespace Sphagnum.Common.Contracts
|
||||
{
|
||||
public interface ISphagnumClient : IMessagingClient
|
||||
{
|
||||
}
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
using Sphagnum.Common.Services;
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Login
|
||||
{
|
||||
public class ConnectionFactory
|
||||
{
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public string Login { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
public UserRights UserRights { get; set; }
|
||||
|
||||
internal virtual async Task<SphagnumConnection> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
var conn = new SphagnumConnection(() => new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp)), messagesProcessorFactory);
|
||||
await conn.ConnectAsync(Hostname, Port);
|
||||
return conn;
|
||||
}
|
||||
|
||||
internal virtual SphagnumConnection CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
return new SphagnumConnection(() => new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp)), messagesProcessorFactory);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,13 +0,0 @@
|
|||
namespace Sphagnum.Common.Contracts.Login
|
||||
{
|
||||
public class UserData
|
||||
{
|
||||
public string Username { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public UserRights UserRights { get; set; } =
|
||||
UserRights.MessagesConsuming |
|
||||
UserRights.MessagesPublishing;
|
||||
}
|
||||
}
|
|
@ -4,8 +4,11 @@ using System.Net.Sockets;
|
|||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Infrastructure
|
||||
namespace Sphagnum.Common.Infrastructure.Contracts
|
||||
{
|
||||
/// <summary>
|
||||
/// Абстракция над подключением (сокет или Channel в случае работы клиента и сервра внутри одного процесса).
|
||||
/// </summary>
|
||||
internal interface IConnection : IDisposable
|
||||
{
|
||||
Task ConnectAsync(string host, int port);
|
|
@ -0,0 +1,15 @@
|
|||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Infrastructure.Contracts
|
||||
{
|
||||
public interface IConnectionFactory
|
||||
{
|
||||
public int Port { get; }
|
||||
public string Hostname { get; }
|
||||
public string Login { get; }
|
||||
public string Password { get; }
|
||||
public UserRights UserRights { get; set; }
|
||||
internal Task<IConnection> CreateConnection(bool connected = true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Infrastructure.Services
|
||||
{
|
||||
public class ConnectionFactory : IConnectionFactory
|
||||
{
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public string Login { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
public UserRights UserRights { get; set; }
|
||||
async Task<IConnection> IConnectionFactory.CreateConnection(bool connected = true)
|
||||
{
|
||||
var conn = new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
|
||||
if (connected)
|
||||
{
|
||||
await conn.ConnectAsync(Hostname, Port);
|
||||
}
|
||||
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,11 +1,11 @@
|
|||
using Sphagnum.Common.Contracts.Infrastructure;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Services
|
||||
namespace Sphagnum.Common.Infrastructure.Services
|
||||
{
|
||||
internal class SocketConnection : IConnection
|
||||
{
|
|
@ -1,9 +1,9 @@
|
|||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
public interface IMessagingClient
|
||||
{
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Utils.Models
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
[Flags]
|
||||
internal enum MessageFlags : ushort
|
|
@ -1,4 +1,4 @@
|
|||
namespace Sphagnum.Common.Utils.Models
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
internal enum MessageType : byte
|
||||
{
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
namespace Sphagnum.Common.Messaging.Contracts.Messages
|
||||
{
|
||||
public readonly struct IncommingMessage
|
||||
{
|
|
@ -0,0 +1,32 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Messaging.Contracts.Messages
|
||||
{
|
||||
public readonly struct Message
|
||||
{
|
||||
public readonly Guid MessageId;
|
||||
|
||||
public readonly ReadOnlyMemory<byte> Payload;
|
||||
|
||||
public readonly string Exchange;
|
||||
|
||||
public readonly RoutingKey RoutingKey;
|
||||
|
||||
public Message(string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Exchange = exchange;
|
||||
RoutingKey = routingKey;
|
||||
Payload = payload;
|
||||
RoutingKey = routingKey;
|
||||
MessageId = Guid.NewGuid();
|
||||
}
|
||||
|
||||
internal Message(Guid messageId, string exchange, RoutingKey routingKey, ReadOnlyMemory<byte> payload)
|
||||
{
|
||||
Exchange = exchange;
|
||||
RoutingKey = routingKey;
|
||||
Payload = payload;
|
||||
MessageId = messageId;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
namespace Sphagnum.Common.Messaging.Contracts.Messages
|
||||
{
|
||||
public readonly struct OutgoingMessage
|
||||
{
|
|
@ -1,4 +1,4 @@
|
|||
namespace Sphagnum.Common.Contracts.Messaging
|
||||
namespace Sphagnum.Common.Messaging.Contracts
|
||||
{
|
||||
public readonly struct RoutingKey
|
||||
{
|
||||
|
@ -14,5 +14,7 @@
|
|||
Part2 = part2;
|
||||
Part3 = part3;
|
||||
}
|
||||
|
||||
public readonly static RoutingKey Empty = new RoutingKey();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Messaging.Services
|
||||
{
|
||||
internal class SphagnumConnection
|
||||
{
|
||||
private readonly IConnection _connection;
|
||||
public bool Connected => throw new NotImplementedException();
|
||||
|
||||
public void Close()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public Task ConnectAsync(string host, int port)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
|
||||
public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,196 @@
|
|||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using System;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
|
||||
namespace Sphagnum.Common.Messaging.Utils
|
||||
{
|
||||
/// <summary>
|
||||
/// Порядок передачи:
|
||||
/// 1. MessageSize, 4 байта
|
||||
/// 2. MessageType, 1 байт
|
||||
/// 3. MessageFlags, 2 байта
|
||||
/// 4. Id сообщения, если есть, 16 байт
|
||||
/// 5. ExchangeNameLength, если есть, 1 байт
|
||||
/// 6. ExchangeName, если есть, ExchangeNameLength байт, Utf8
|
||||
/// 7. RoutingKey, если есть, 3 байта
|
||||
/// 8. PayloadSize, если есть - 4 байта
|
||||
/// 9. Payload, если есть, PayloadSize байт
|
||||
/// </summary>
|
||||
internal static class MessageParser
|
||||
{
|
||||
public static Message UnpackMessage(byte[] bytes)
|
||||
{
|
||||
if ((MessageType)bytes[4] != MessageType.Common)
|
||||
{
|
||||
throw new ArgumentException("Uncorrect message type! 1 (MessageType.Common) expected!");
|
||||
}
|
||||
var exchangeName = GetExchangeName(bytes);
|
||||
var routingKey = GetRoutingKey(bytes);
|
||||
var payload = GetPayload(bytes);
|
||||
var id = GetMessageId(bytes);
|
||||
return new Message(id, exchangeName, routingKey, payload);
|
||||
}
|
||||
public static byte[] PackMessage(Message message)
|
||||
{
|
||||
if (string.IsNullOrEmpty(message.Exchange) || string.IsNullOrWhiteSpace(message.Exchange))
|
||||
{
|
||||
throw new ArgumentException("Bad exchange name!");
|
||||
}
|
||||
else if (Encoding.UTF8.GetByteCount(message.Exchange) > 255)
|
||||
{
|
||||
throw new ArgumentException("Exchange name in UTF8 encoding must allocate < 256 bytes!");
|
||||
}
|
||||
|
||||
var flags = MessageFlags.HasExchange;
|
||||
int count = 23;
|
||||
if (message.Payload.Length > 0)
|
||||
{
|
||||
flags |= MessageFlags.HasPayload;
|
||||
count += message.Payload.Length;
|
||||
count += 4;
|
||||
}
|
||||
if (!message.RoutingKey.IsEmpry)
|
||||
{
|
||||
flags |= MessageFlags.HasRoutingKey;
|
||||
count += 3;
|
||||
}
|
||||
|
||||
var exchangeNameBytes = Encoding.UTF8.GetBytes(message.Exchange);// todo перевести на более оптимальный метод, не аллоцирующий лишнего.
|
||||
count += exchangeNameBytes.Length;
|
||||
count++;
|
||||
return Pack(message, flags, count);
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static Guid GetMessageId(byte[] bytes)
|
||||
{
|
||||
var slice = bytes.AsSpan(7, 16);
|
||||
return new Guid(slice);
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static byte[] Pack(Message message, MessageFlags flags, int count)
|
||||
{
|
||||
var result = new byte[count];
|
||||
result[4] = (byte)MessageType.Common;
|
||||
var shift = 5;
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (ushort)flags);//2. flags
|
||||
shift += 2;
|
||||
message.MessageId.TryWriteBytes(result.AsSpan(shift));//3. id
|
||||
shift += 16;
|
||||
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
|
||||
{
|
||||
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)message.Exchange.Length);//4. ExchangeNameLength
|
||||
shift += 1;
|
||||
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
|
||||
shift += exchangeBytes.Length;
|
||||
}
|
||||
if ((flags & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey)//6. RoutingKey
|
||||
{
|
||||
result[shift] = message.RoutingKey.Part1;
|
||||
shift++;
|
||||
result[shift] = message.RoutingKey.Part2;
|
||||
shift++;
|
||||
result[shift] = message.RoutingKey.Part3;
|
||||
shift++;
|
||||
}
|
||||
if ((flags & MessageFlags.HasPayload) == MessageFlags.HasPayload)
|
||||
{
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), message.Payload.Length);//7. PayloadSize
|
||||
shift += 4;
|
||||
message.Payload.CopyTo(result.AsMemory(shift));//8. Payload
|
||||
}
|
||||
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
|
||||
return result;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static byte[] GetPayload(Span<byte> bytes)
|
||||
{
|
||||
var result = Array.Empty<byte>();
|
||||
if (HasPayload(bytes))
|
||||
{
|
||||
var shift = 23;
|
||||
if (HasExchange(bytes))//todo проверить бенчмарком, как работает инлайн
|
||||
{
|
||||
shift += bytes[23];
|
||||
shift++;
|
||||
}
|
||||
if (HasKey(bytes))
|
||||
{
|
||||
shift += 3;
|
||||
}
|
||||
|
||||
var payloadSize = BitConverter.ToInt32(bytes[shift..]);
|
||||
if (payloadSize > 0)
|
||||
{
|
||||
result = bytes.Slice(shift + 4, payloadSize).ToArray();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static MessageType GetMessageType(Span<byte> bytes)
|
||||
{
|
||||
return (MessageType)bytes[4];
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasKey(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return ((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasPayload(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return ((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasExchange(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return ((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static string GetExchangeName(Span<byte> bytes)
|
||||
{
|
||||
var hasExchange = HasExchange(bytes);
|
||||
if (!hasExchange)
|
||||
{
|
||||
throw new ArgumentException("bytes must contains exchange name!");
|
||||
}
|
||||
return Encoding.UTF8.GetString(bytes.Slice(24, bytes[23]));
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static RoutingKey GetRoutingKey(Span<byte> bytes)
|
||||
{
|
||||
var length = bytes[23];
|
||||
RoutingKey key;
|
||||
if (HasKey(bytes))
|
||||
{
|
||||
var routingKeyShift = 23 + length + 1;
|
||||
|
||||
var routingKeyPart1 = bytes[routingKeyShift];
|
||||
var routingKeyPart2 = bytes[routingKeyShift + 1];
|
||||
var routingKeyPart3 = bytes[routingKeyShift + 2];
|
||||
key = new RoutingKey(routingKeyPart1, routingKeyPart2, routingKeyPart3);
|
||||
}
|
||||
else
|
||||
{
|
||||
key = new RoutingKey();
|
||||
}
|
||||
return key;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,11 +1,11 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using Sphagnum.Common.Old.Contracts.Messaging.Messages;
|
||||
using System;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
|
||||
namespace Sphagnum.Common.Utils
|
||||
namespace Sphagnum.Common.Messaging.Utils
|
||||
{
|
||||
/// <summary>
|
||||
/// Порядок передачи:
|
||||
|
@ -19,7 +19,7 @@ namespace Sphagnum.Common.Utils
|
|||
/// 8. PayloadSize, если есть - 4 байта
|
||||
/// 9. Payload, если есть, PayloadSize байт
|
||||
/// </summary>
|
||||
internal static class MessageParser
|
||||
internal static class MessageParserold
|
||||
{
|
||||
public static OutgoingMessage UnpackOutgoingMessage(byte[] bytes)
|
||||
{
|
||||
|
@ -57,6 +57,19 @@ namespace Sphagnum.Common.Utils
|
|||
return result;
|
||||
}
|
||||
|
||||
public static byte[] PackMessage(Message message)
|
||||
{
|
||||
var result = new byte[27 + message.Payload.Length];
|
||||
result[4] = (byte)MessageType.Common;
|
||||
var flags = MessageFlags.HasPayload;
|
||||
BitConverter.TryWriteBytes(result.AsSpan(5, 2), (ushort)flags);
|
||||
message.MessageId.TryWriteBytes(result.AsSpan(7));
|
||||
BitConverter.TryWriteBytes(result.AsSpan(23), message.Payload.Length);
|
||||
message.Payload.CopyTo(result.AsMemory(27));
|
||||
BitConverter.TryWriteBytes(result.AsSpan(0, 4), result.Length);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static byte[] PackReplyMessage(MessageType messageType)
|
||||
{
|
||||
var res = new byte[23];
|
||||
|
@ -211,7 +224,7 @@ namespace Sphagnum.Common.Utils
|
|||
if ((flags & MessageFlags.HasExchange) == MessageFlags.HasExchange)
|
||||
{
|
||||
var exchangeBytes = Encoding.UTF8.GetBytes(message.Exchange);
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)(message.Exchange.Length));//4. ExchangeNameLength
|
||||
BitConverter.TryWriteBytes(result.AsSpan(shift), (byte)message.Exchange.Length);//4. ExchangeNameLength
|
||||
shift += 1;
|
||||
exchangeBytes.CopyTo(result.AsSpan(shift));//5. ExchangeName
|
||||
shift += exchangeBytes.Length;
|
||||
|
@ -239,21 +252,21 @@ namespace Sphagnum.Common.Utils
|
|||
private static bool HasKey(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return (((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey);
|
||||
return ((MessageFlags)value & MessageFlags.HasRoutingKey) == MessageFlags.HasRoutingKey;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasPayload(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return (((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload);
|
||||
return ((MessageFlags)value & MessageFlags.HasPayload) == MessageFlags.HasPayload;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
private static bool HasExchange(Span<byte> bytes)
|
||||
{
|
||||
var value = BitConverter.ToUInt16(bytes.Slice(5, 2));
|
||||
return (((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange);
|
||||
return ((MessageFlags)value & MessageFlags.HasExchange) == MessageFlags.HasExchange;
|
||||
}
|
||||
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
|
@ -1,4 +1,4 @@
|
|||
namespace Sphagnum.Common
|
||||
namespace Sphagnum.Common.Old.Contracts
|
||||
{
|
||||
internal static class Constants
|
||||
{
|
|
@ -0,0 +1,23 @@
|
|||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Infrastructure.Services;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Old.Contracts.Login
|
||||
{
|
||||
public class ConnectionFactory
|
||||
{
|
||||
public int Port { get; set; }
|
||||
public string Hostname { get; set; } = string.Empty;
|
||||
public string Login { get; set; } = string.Empty;
|
||||
public string Password { get; set; } = string.Empty;
|
||||
public UserRights UserRights { get; set; }
|
||||
|
||||
internal virtual async Task<IConnection> CreateDefaultConnected()
|
||||
{
|
||||
var conn = new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
|
||||
await conn.ConnectAsync(Hostname, Port);
|
||||
return conn;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Login
|
||||
namespace Sphagnum.Common.Old.Contracts.Login
|
||||
{
|
||||
[Flags]
|
||||
public enum UserRights : ushort
|
|
@ -1,8 +1,8 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Utils;
|
||||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
namespace Sphagnum.Common.Old.Contracts.Messaging.Messages
|
||||
{
|
||||
internal readonly ref struct AuthMessage
|
||||
{
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Contracts.Messaging.Messages
|
||||
namespace Sphagnum.Common.Old.Contracts.Messaging.Messages
|
||||
{
|
||||
internal readonly ref struct AuthResultMessage
|
||||
{
|
|
@ -1,6 +1,6 @@
|
|||
using System;
|
||||
|
||||
namespace Sphagnum.Common.Exceptions
|
||||
namespace Sphagnum.Common.Old.Exceptions
|
||||
{
|
||||
public class AuthException : Exception
|
||||
{
|
|
@ -1,26 +1,22 @@
|
|||
using Sphagnum.Common.Contracts.Infrastructure;
|
||||
using Sphagnum.Common.Exceptions;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Utils;
|
||||
using Sphagnum.Common.Old.Exceptions;
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Sphagnum.Common.Services
|
||||
namespace Sphagnum.Common.Old.Services
|
||||
{
|
||||
internal class SphagnumConnection
|
||||
internal class SphagnumConnectionOld
|
||||
{
|
||||
protected readonly IConnection _connection;
|
||||
private readonly ChannelsPool _pool = new ChannelsPool(100);
|
||||
private readonly ConcurrentDictionary<Guid, Channel<byte[]>> sendingItems = new ConcurrentDictionary<Guid, Channel<byte[]>>();
|
||||
private readonly Func<Func<byte[], Task>> _messagesProcessorFactory;
|
||||
private readonly Func<byte[], Task> _messagesProcessor;
|
||||
|
||||
public SphagnumConnection(Func<IConnection> connectionsFactory, Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
public SphagnumConnectionOld(Func<IConnection> connectionsFactory, Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
_connection = connectionsFactory(); // new SocketConnection(new Socket(SocketType.Stream, ProtocolType.Tcp));
|
||||
_messagesProcessorFactory = messagesProcessorFactory;
|
||||
|
@ -28,7 +24,7 @@ namespace Sphagnum.Common.Services
|
|||
RecievingTask();
|
||||
}
|
||||
|
||||
private SphagnumConnection(IConnection socket, Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
private SphagnumConnectionOld(IConnection socket, Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
_connection = socket;
|
||||
_messagesProcessorFactory = messagesProcessorFactory;
|
||||
|
@ -42,10 +38,10 @@ namespace Sphagnum.Common.Services
|
|||
return _connection.ConnectAsync(host, port);
|
||||
}
|
||||
|
||||
public async virtual Task<SphagnumConnection> AcceptAsync()
|
||||
public async virtual Task<SphagnumConnectionOld> AcceptAsync()
|
||||
{
|
||||
var socket = await _connection.AcceptAsync();
|
||||
return new SphagnumConnection(socket, _messagesProcessorFactory);
|
||||
return new SphagnumConnectionOld(socket, _messagesProcessorFactory);
|
||||
}
|
||||
|
||||
public void Bind(int port)
|
||||
|
@ -71,20 +67,20 @@ namespace Sphagnum.Common.Services
|
|||
public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
int res;
|
||||
if (buffer.Span[4] != (byte)MessageType.MessageAccepted)
|
||||
{
|
||||
var channel = _pool.Get();
|
||||
sendingItems.TryAdd(MessageParser.GetMessageId(buffer), channel);
|
||||
res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
|
||||
var message = await channel.Reader.WaitToReadAsync(cancellationToken);// todo обработка сообщения
|
||||
_pool.Return(channel);
|
||||
return res;
|
||||
}
|
||||
else
|
||||
{
|
||||
res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
|
||||
}
|
||||
return res;
|
||||
//if (buffer.Span[4] != (byte)MessageType.MessageAccepted)
|
||||
//{
|
||||
// var channel = _pool.Get();
|
||||
// sendingItems.TryAdd(MessageParser.GetMessageId(buffer), channel);
|
||||
// res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
|
||||
// var message = await channel.Reader.WaitToReadAsync(cancellationToken);// todo обработка сообщения
|
||||
// _pool.Return(channel);
|
||||
// return res;
|
||||
//}
|
||||
//else
|
||||
//{
|
||||
// res = await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken);
|
||||
//}
|
||||
return await _connection.SendAsync(buffer, SocketFlags.None, cancellationToken); ;
|
||||
}
|
||||
|
||||
private async ValueTask<byte[]> ReceiveAsync(CancellationToken cancellationToken = default)
|
||||
|
@ -112,7 +108,7 @@ namespace Sphagnum.Common.Services
|
|||
try
|
||||
{
|
||||
await _messagesProcessor(message);
|
||||
MessageParser.CopyId(message, successRespBuffer);
|
||||
MessageParserold.CopyId(message, successRespBuffer);
|
||||
await SendAsync(successRespBuffer, token);
|
||||
}
|
||||
catch (AuthException ex)
|
||||
|
@ -126,10 +122,6 @@ namespace Sphagnum.Common.Services
|
|||
|
||||
}
|
||||
}
|
||||
else if (sendingItems.TryRemove(MessageParser.GetMessageId(message), out var channel))
|
||||
{
|
||||
await channel.Writer.WriteAsync(message);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
|
@ -1,7 +1,7 @@
|
|||
using System.Security.Cryptography;
|
||||
using System.Text;
|
||||
|
||||
namespace Sphagnum.Common.Utils
|
||||
namespace Sphagnum.Common.Old.Utils
|
||||
{
|
||||
internal static class HashCalculator
|
||||
{
|
|
@ -1,36 +0,0 @@
|
|||
using System.Collections.Concurrent;
|
||||
using System.Threading.Channels;
|
||||
|
||||
namespace Sphagnum.Common.Services
|
||||
{
|
||||
internal class ChannelsPool
|
||||
{
|
||||
private readonly int _poolSize;
|
||||
public ChannelsPool(int poolSize)
|
||||
{
|
||||
_poolSize = poolSize;
|
||||
}
|
||||
|
||||
private readonly ConcurrentQueue<Channel<byte[]>> channels = new ConcurrentQueue<Channel<byte[]>>();
|
||||
|
||||
public Channel<byte[]> Get()
|
||||
{
|
||||
if (channels.TryDequeue(out var channel))
|
||||
{
|
||||
return channel;
|
||||
}
|
||||
else
|
||||
{
|
||||
return Channel.CreateBounded<byte[]>(1);
|
||||
}
|
||||
}
|
||||
|
||||
public void Return(Channel<byte[]> channel)
|
||||
{
|
||||
if (channels.Count < _poolSize)
|
||||
{
|
||||
channels.Enqueue(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -6,7 +6,7 @@
|
|||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
|
||||
<Folder Include="Old\Utils\Enums\" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Services;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Services;
|
||||
using Sphagnum.Server.Cluster.Contracts;
|
||||
using Sphagnum.Server.Cluster.Services;
|
||||
using Sphagnum.Server.DataProcessing.Contracts;
|
||||
|
@ -13,7 +13,7 @@ namespace Sphagnum.Server.Broker.Services
|
|||
{
|
||||
internal class BrokerDefaultBase(ConnectionFactory connectionFactory, IMessagesStorage messagesStorage, IDistributor distributor, IDataProcessor dataProcessor)
|
||||
{
|
||||
private readonly SphagnumConnection _connection;
|
||||
private readonly SphagnumConnectionOld _connection;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
private Task? _acceptationTask;
|
||||
|
||||
|
@ -25,11 +25,6 @@ namespace Sphagnum.Server.Broker.Services
|
|||
|
||||
public Task StartAsync(int port)
|
||||
{
|
||||
_connectionFactory.CreateDefault(() =>
|
||||
{
|
||||
var processor = new MessagesProcessor(_authInfoStorage, _messagesStorage, _distributor, _dataProcessor);
|
||||
return processor.ProcessMessage;
|
||||
});
|
||||
_connection?.Bind(port);
|
||||
_connection?.Listen(1000); //todo разобраться что делает этот параметр.
|
||||
//_acceptationTask = AcceptationWorker(_cts.Token);
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
using Microsoft.Extensions.Hosting;
|
||||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using Sphagnum.Server.Broker.Services;
|
||||
|
||||
namespace Sphagnum.Server
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
namespace Sphagnum.Server.Broker.Services
|
||||
{
|
||||
internal class ConnectionsManager
|
||||
{
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
using Sphagnum.Common;
|
||||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Exceptions;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Utils;
|
||||
using Sphagnum.Common.Old.Contracts;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Exceptions;
|
||||
using Sphagnum.Server.Cluster.Contracts;
|
||||
using Sphagnum.Server.DataProcessing.Contracts;
|
||||
using Sphagnum.Server.Storage.Messages.Contracts;
|
||||
|
@ -49,10 +49,10 @@ namespace Sphagnum.Server.Broker.Services
|
|||
|
||||
private async ValueTask<bool> CheckRights(byte[] buffer)
|
||||
{
|
||||
var messageType = MessageParser.GetMessageType(buffer);
|
||||
var messageType = MessageParserold.GetMessageType(buffer);
|
||||
if (messageType == MessageType.Auth)
|
||||
{
|
||||
var payloadStart = MessageParser.GetPayloadStart(buffer);
|
||||
var payloadStart = MessageParserold.GetPayloadStart(buffer);
|
||||
var rights = (UserRights)BitConverter.ToInt16(buffer.AsSpan(Constants.HashedUserDataSizeInfBytes + Constants.HashedUserDataSizeInfBytes + payloadStart, 2));
|
||||
var isRecievingAllowed = await _authInfoStorage.CheckRights(
|
||||
buffer.AsSpan(payloadStart, Constants.HashedUserDataSizeInfBytes),
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
|
||||
namespace Sphagnum.Server.Storage.Users.Contracts
|
||||
{
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Old.Contracts.Login;
|
||||
using Sphagnum.Common.Old.Utils;
|
||||
using Sphagnum.Server.Storage.Users.Contracts;
|
||||
using System.Numerics;
|
||||
|
||||
|
|
|
@ -1,9 +1,22 @@
|
|||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.Comparers
|
||||
{
|
||||
internal static class MessagesComparer
|
||||
{
|
||||
public static bool Compare(Message message1, Message message2)
|
||||
{
|
||||
var res = true;
|
||||
res &= message1.Exchange == message2.Exchange;
|
||||
res &= message1.RoutingKey.Part1 == message2.RoutingKey.Part1;
|
||||
res &= message1.RoutingKey.Part2 == message2.RoutingKey.Part2;
|
||||
res &= message1.RoutingKey.Part3 == message2.RoutingKey.Part3;
|
||||
res &= message1.Payload.Length == message2.Payload.Length;
|
||||
res &= message1.MessageId == message2.MessageId;
|
||||
res &= ComparePayloads(message1, message2);
|
||||
return res;
|
||||
}
|
||||
|
||||
public static bool Compare(OutgoingMessage message1, OutgoingMessage message2)
|
||||
{
|
||||
var res = true;
|
||||
|
@ -31,6 +44,14 @@ namespace Sphagnum.Common.UnitTests.Comparers
|
|||
return ComparePayloads(payload1, payload2);
|
||||
}
|
||||
|
||||
public static bool ComparePayloads(Message message1, Message message2)
|
||||
{
|
||||
var payload1 = message1.Payload.ToArray();
|
||||
var payload2 = message2.Payload.ToArray();
|
||||
return ComparePayloads(payload1, payload2);
|
||||
}
|
||||
|
||||
|
||||
public static bool ComparePayloads(byte[] payload1, byte[] payload2)
|
||||
{
|
||||
var res = true;
|
||||
|
|
|
@ -1,23 +1,24 @@
|
|||
using Sphagnum.Common.Contracts.Messaging;
|
||||
using Sphagnum.Common.Contracts.Messaging.Messages;
|
||||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Contracts.Messages;
|
||||
using System.Security.Cryptography;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.DataGenerators
|
||||
{
|
||||
internal static class MessagesGenerator
|
||||
{
|
||||
public static OutgoingMessage GetRandomOutgoingMessage(bool emptyKey = false, bool emptyPayload = false)
|
||||
public static Message GetRandomMessage(Guid? messageId = null, bool emptyKey = false, bool emptyPayload = false)
|
||||
{
|
||||
var exchangeName = RandomNumberGenerator.GetInt32(10000000).ToString();
|
||||
var payload = !emptyPayload ? RandomNumberGenerator.GetBytes(RandomNumberGenerator.GetInt32(0, 1000)) : [];
|
||||
var routingKeysBytes = RandomNumberGenerator.GetBytes(3);
|
||||
return new OutgoingMessage(exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
|
||||
}
|
||||
|
||||
public static IncommingMessage GetRandoIncommingMessage(bool emptyPayload = false)
|
||||
{
|
||||
var payload = !emptyPayload ? RandomNumberGenerator.GetBytes(RandomNumberGenerator.GetInt32(0, 1000)) : [];
|
||||
return new IncommingMessage(Guid.NewGuid(), payload);
|
||||
if (messageId.HasValue)
|
||||
{
|
||||
return new Message(messageId.Value, exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
return new Message(exchangeName, !emptyKey ? new RoutingKey(routingKeysBytes[0], routingKeysBytes[1], routingKeysBytes[2]) : new RoutingKey(0, 0, 0), payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
using Sphagnum.Common.Messaging.Contracts;
|
||||
using Sphagnum.Common.Messaging.Utils;
|
||||
using Sphagnum.Common.UnitTests.DataGenerators;
|
||||
using Sphagnum.Common.Utils;
|
||||
using Sphagnum.Common.Utils.Models;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests
|
||||
{
|
||||
|
@ -12,9 +12,12 @@ namespace Sphagnum.Common.UnitTests
|
|||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandoIncommingMessage();
|
||||
var message = MessagesGenerator.GetRandomMessage();
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackIncomingMessage(bytes);
|
||||
var message2 = MessageParser.UnpackMessage(bytes);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
|
||||
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
|
||||
count++;
|
||||
|
@ -27,31 +30,15 @@ namespace Sphagnum.Common.UnitTests
|
|||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandoIncommingMessage(true);
|
||||
var message = MessagesGenerator.GetRandomMessage(null, false, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackIncomingMessage(bytes);
|
||||
var message2 = MessageParser.UnpackMessage(bytes);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
|
||||
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
|
||||
Assert.IsTrue(f1 == f2);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
Assert.IsTrue((MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2)) == MessageFlags.HasPayload);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessageGetMessageId_WithRoutingKeyAndPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var id = Guid.NewGuid();
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage();
|
||||
var bytesForFlags = MessageParser.PackMessage(message);
|
||||
var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
|
||||
var bytes = MessageParser.Pack(message, id, flags, bytesForFlags.Length);
|
||||
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var id2 = MessageParser.GetMessageId(bytes);
|
||||
Assert.That(id, Is.EqualTo(id2));
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
@ -63,95 +50,14 @@ namespace Sphagnum.Common.UnitTests
|
|||
while (count < 100)
|
||||
{
|
||||
var id = Guid.NewGuid();
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
|
||||
var bytesForFlags = MessageParser.PackMessage(message);
|
||||
var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
|
||||
var bytes = MessageParser.Pack(message, id, flags, bytesForFlags.Length);
|
||||
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var id2 = MessageParser.GetMessageId(bytes);
|
||||
Assert.That(id, Is.EqualTo(id2));
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithRoutingKeyAndPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage();
|
||||
var message = MessagesGenerator.GetRandomMessage(null, true, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var message2 = MessageParser.UnpackMessage(bytes);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithRoutingKeyAndEmptyPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(false, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
var f1 = (MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2));
|
||||
var f2 = (MessageFlags)BitConverter.ToUInt16(bytes2.AsSpan(5, 2));
|
||||
Assert.IsTrue(f1 == f2);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithEmptyRoutingKey()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
||||
[Test]
|
||||
public void PackUnpackOutgoingMessage_WithEmptyRoutingKeyAndEmptyPayload()
|
||||
{
|
||||
var count = 0;
|
||||
while (count < 100)
|
||||
{
|
||||
var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
|
||||
var bytes = MessageParser.PackMessage(message);
|
||||
var message2 = MessageParser.UnpackOutgoingMessage(bytes);
|
||||
Assert.That(Comparers.MessagesComparer.Compare(message, message2), Is.True);
|
||||
var bytes2 = MessageParser.PackMessage(message2);
|
||||
var message3 = MessageParser.UnpackOutgoingMessage(bytes2);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
var bytes3 = MessageParser.PackMessage(message2);
|
||||
var message4 = MessageParser.UnpackOutgoingMessage(bytes3);
|
||||
Assert.That(Comparers.MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
count++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
namespace Sphagnum.Common.UnitTests
|
||||
namespace Sphagnum.Common.UnitTests.Old
|
||||
{
|
||||
public class ConnectionTests
|
||||
{
|
|
@ -0,0 +1,160 @@
|
|||
//using Sphagnum.Common.Messaging.Utils;
|
||||
//using Sphagnum.Common.Old.Utils.Enums;
|
||||
//using Sphagnum.Common.UnitTests.Comparers;
|
||||
//using Sphagnum.Common.UnitTests.DataGenerators;
|
||||
|
||||
//namespace Sphagnum.Common.UnitTests.Old
|
||||
//{
|
||||
// public class MessageParserTests
|
||||
// {
|
||||
// [Test]
|
||||
// public void PackUnpackIncomingMessage_WithPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var message = MessagesGenerator.GetRandomIncommingMessage();
|
||||
// var bytes = MessageParserold.PackMessage(message);
|
||||
// var message2 = MessageParserold.UnpackIncomingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackIncomingMessage_WithEmptyPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var message = MessagesGenerator.GetRandoIncommingMessage(true);
|
||||
// var bytes = MessageParserold.PackMessage(message);
|
||||
// var message2 = MessageParserold.UnpackIncomingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// Assert.IsTrue((MessageFlags)BitConverter.ToUInt16(bytes.AsSpan(5, 2)) == MessageFlags.HasPayload);
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackOutgoingMessageGetMessageId_WithRoutingKeyAndPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var id = Guid.NewGuid();
|
||||
// var message = MessagesGenerator.GetRandomOutgoingMessage();
|
||||
// var bytesForFlags = MessageParserold.PackMessage(message);
|
||||
// var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
|
||||
// var bytes = MessageParserold.Pack(message, id, flags, bytesForFlags.Length);
|
||||
|
||||
// var message2 = MessageParserold.UnpackOutgoingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// var id2 = MessageParserold.GetMessageId(bytes);
|
||||
// Assert.That(id, Is.EqualTo(id2));
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackOutgoingMessageGetMessageId_WithEmptyRoutingKeyAndEmptyPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var id = Guid.NewGuid();
|
||||
// var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
|
||||
// var bytesForFlags = MessageParserold.PackMessage(message);
|
||||
// var flags = (MessageFlags)BitConverter.ToUInt16(bytesForFlags.AsSpan(5, 2));
|
||||
// var bytes = MessageParserold.Pack(message, id, flags, bytesForFlags.Length);
|
||||
|
||||
// var message2 = MessageParserold.UnpackOutgoingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// var id2 = MessageParserold.GetMessageId(bytes);
|
||||
// Assert.That(id, Is.EqualTo(id2));
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackOutgoingMessage_WithRoutingKeyAndPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var message = MessagesGenerator.GetRandomOutgoingMessage();
|
||||
// var bytes = MessageParserold.PackMessage(message);
|
||||
// var message2 = MessageParserold.UnpackOutgoingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// var bytes2 = MessageParserold.PackMessage(message2);
|
||||
// var message3 = MessageParserold.UnpackOutgoingMessage(bytes2);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
// var bytes3 = MessageParserold.PackMessage(message2);
|
||||
// var message4 = MessageParserold.UnpackOutgoingMessage(bytes3);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackOutgoingMessage_WithRoutingKeyAndEmptyPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var message = MessagesGenerator.GetRandomOutgoingMessage(false, true);
|
||||
// var bytes = MessageParserold.PackMessage(message);
|
||||
// var message2 = MessageParserold.UnpackOutgoingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// var bytes2 = MessageParserold.PackMessage(message2);
|
||||
// var message3 = MessageParserold.UnpackOutgoingMessage(bytes2);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
// var bytes3 = MessageParserold.PackMessage(message2);
|
||||
// var message4 = MessageParserold.UnpackOutgoingMessage(bytes3);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackOutgoingMessage_WithEmptyRoutingKey()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var message = MessagesGenerator.GetRandomOutgoingMessage(true);
|
||||
// var bytes = MessageParserold.PackMessage(message);
|
||||
// var message2 = MessageParserold.UnpackOutgoingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// var bytes2 = MessageParserold.PackMessage(message2);
|
||||
// var message3 = MessageParserold.UnpackOutgoingMessage(bytes2);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
// var bytes3 = MessageParserold.PackMessage(message2);
|
||||
// var message4 = MessageParserold.UnpackOutgoingMessage(bytes3);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
|
||||
// [Test]
|
||||
// public void PackUnpackOutgoingMessage_WithEmptyRoutingKeyAndEmptyPayload()
|
||||
// {
|
||||
// var count = 0;
|
||||
// while (count < 100)
|
||||
// {
|
||||
// var message = MessagesGenerator.GetRandomOutgoingMessage(true, true);
|
||||
// var bytes = MessageParserold.PackMessage(message);
|
||||
// var message2 = MessageParserold.UnpackOutgoingMessage(bytes);
|
||||
// Assert.That(MessagesComparer.Compare(message, message2), Is.True);
|
||||
// var bytes2 = MessageParserold.PackMessage(message2);
|
||||
// var message3 = MessageParserold.UnpackOutgoingMessage(bytes2);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message2, message3), Is.True);
|
||||
// var bytes3 = MessageParserold.PackMessage(message2);
|
||||
// var message4 = MessageParserold.UnpackOutgoingMessage(bytes3);
|
||||
// Assert.That(MessagesComparer.ComparePayloads(message3, message4), Is.True);
|
||||
// count++;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
|
@ -1,9 +1,9 @@
|
|||
using Sphagnum.Common.Contracts.Infrastructure;
|
||||
using Sphagnum.Common.Infrastructure.Contracts;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.Services
|
||||
namespace Sphagnum.Common.UnitTests.Old.Services
|
||||
{
|
||||
internal class TestConnection : IConnection
|
||||
{
|
|
@ -0,0 +1,18 @@
|
|||
//using Sphagnum.Common.Old.Contracts.Login;
|
||||
//using Sphagnum.Common.Old.Services;
|
||||
|
||||
//namespace Sphagnum.Common.UnitTests.Old.Services
|
||||
//{
|
||||
// internal class TestConnectionFactory : ConnectionFactory
|
||||
// {
|
||||
// internal override SphagnumConnectionOld CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
// {
|
||||
// return new SphagnumConnectionOld(() => new TestConnection(), messagesProcessorFactory);
|
||||
// }
|
||||
|
||||
// internal override Task<SphagnumConnectionOld> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
// {
|
||||
// return Task.FromResult(new SphagnumConnectionOld(() => new TestConnection(), messagesProcessorFactory));
|
||||
// }
|
||||
// }
|
||||
//}
|
|
@ -1,18 +0,0 @@
|
|||
using Sphagnum.Common.Contracts.Login;
|
||||
using Sphagnum.Common.Services;
|
||||
|
||||
namespace Sphagnum.Common.UnitTests.Services
|
||||
{
|
||||
internal class TestConnectionFactory : ConnectionFactory
|
||||
{
|
||||
internal override SphagnumConnection CreateDefault(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
return new SphagnumConnection(() => new TestConnection(), messagesProcessorFactory);
|
||||
}
|
||||
|
||||
internal override Task<SphagnumConnection> CreateDefaultConnected(Func<Func<byte[], Task>> messagesProcessorFactory)
|
||||
{
|
||||
return Task.FromResult(new SphagnumConnection(() => new TestConnection(), messagesProcessorFactory));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue