Унификация используемых моделей

dev
vlad zverzhkhovskiy 2025-10-07 13:26:28 +03:00
parent e2726586e5
commit 41d33356dd
17 changed files with 272 additions and 32 deletions

View File

@ -2,7 +2,7 @@
namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces
{
internal interface ICachedValue : INewPrice
internal interface ICachedValue : ITradeDataItem
{
public decimal Value { get; }
public decimal Value2 { get; }

View File

@ -8,7 +8,7 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces
{
public string Figi { get; }
public int Length { get; }
public ValueTask AddData(INewPrice priceChange);
public ValueTask AddData(ITradeDataItem priceChange);
public ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(int? length = null);
public ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(TimeSpan period);
public ValueTask AddOrderbook(IOrderbook orderbook);

View File

@ -1,6 +1,6 @@
namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces
{
public interface INewPrice
public interface ITradeDataItem
{
public bool IsHistoricalData { get; }
public decimal Price { get; }
@ -9,5 +9,7 @@
public DateTime Time { get; }
public long Count { get; }
public int Direction { get; }
public decimal Value { get; }
public decimal Value2 { get; }
}
}

View File

@ -1,6 +1,6 @@
namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces
{
public interface IProcessedPrice : INewPrice
public interface IProcessedPrice : ITradeDataItem
{
public string Processor { get; set; }
}

View File

@ -2,7 +2,7 @@
namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos
{
public class NewPriceMessage : INewPrice
public class NewTrade : ITradeDataItem
{
public decimal Price { get; set; }
public required string Figi { get; set; }
@ -11,5 +11,7 @@ namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos
public bool IsHistoricalData { get; set; }
public long Count { get; set; }
public int Direction { get; set; }
public decimal Value { get; init; }
public decimal Value2 { get; init; }
}
}

View File

@ -6,10 +6,10 @@ namespace KLHZ.Trader.Core.Contracts.Messaging.Interfaces
public interface IDataBus
{
public bool AddChannel(string key, Channel<IOrderbook> channel);
public bool AddChannel(string key, Channel<INewPrice> channel);
public bool AddChannel(string key, Channel<ITradeDataItem> channel);
public bool AddChannel(string key, Channel<IMessage> channel);
public bool AddChannel(string key, Channel<ITradeCommand> channel);
public Task Broadcast(INewPrice newPriceMessage);
public Task Broadcast(ITradeDataItem newPriceMessage);
public Task Broadcast(IOrderbook orderbook);
public Task Broadcast(IMessage message);
public Task Broadcast(ITradeCommand message);

View File

@ -22,7 +22,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Dtos
lock (_locker)
{
_cachedValues.AddLast(cachedValue);
if (_cachedValues.Last != null && _cachedValues.First != null
while (_cachedValues.Last != null && _cachedValues.First != null
&& _cachedValues.Last.Value.Time - _cachedValues.First.Value.Time > WindowSize)
{
_cachedValues.RemoveFirst();

View File

@ -105,7 +105,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
}
}
public ValueTask AddData(INewPrice priceChange)
public ValueTask AddData(ITradeDataItem priceChange)
{
if (priceChange.Figi != Figi) return ValueTask.CompletedTask;
lock (_locker)
@ -200,7 +200,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
}
}
public PriceHistoryCacheUnit2(string figi, params INewPrice[] priceChanges)
public PriceHistoryCacheUnit2(string figi, params ITradeDataItem[] priceChanges)
{
Figi = figi;

View File

@ -0,0 +1,224 @@
using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Math.Declisions.Dtos;
using System.Collections.Concurrent;
namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
{
public class PriceHistoryCacheUnit3 : IPriceHistoryCacheUnit
{
public const int CacheMaxLength = 30000;
private const int _arrayMaxLength = 60000;
public string Figi { get; init; }
public int Length
{
get
{
lock (_locker)
{
return _length;
}
}
}
public decimal AsksCount
{
get
{
lock (_locker)
{
return _asksCount;
}
}
}
public decimal BidsCount
{
get
{
lock (_locker)
{
return _bidsCount;
}
}
}
private readonly object _locker = new();
private readonly decimal[] Prices = new decimal[_arrayMaxLength];
private readonly DateTime[] Timestamps = new DateTime[_arrayMaxLength];
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _20_secTimeWindows = new();
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _1_minTimeWindows = new();
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _5_minTimeWindows = new();
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _15_minTimeWindows = new();
private int _length = 0;
private int _pointer = -1;
private long _asksCount = 1;
private long _bidsCount = 1;
public ValueTask AddDataToTimeWindowCache(string key, CachedValue data, TimeWindowCacheType timeWindowCacheType)
{
var dict = GetDict(timeWindowCacheType);
if (!dict.TryGetValue(key, out var cahcheItem))
{
dict.TryAdd(key, new TimeWindowCacheItem(key, timeWindowCacheType));
}
dict[key].AddData(data);
return ValueTask.CompletedTask;
}
public ValueTask<CachedValue[]> GetDataFromTimeWindowCache(string key, TimeWindowCacheType timeWindowCacheType)
{
var dict = GetDict(timeWindowCacheType);
if (dict.TryGetValue(key, out var cahcheItem))
{
return cahcheItem.GetValues();
}
return ValueTask.FromResult(Array.Empty<CachedValue>());
}
private ConcurrentDictionary<string, TimeWindowCacheItem> GetDict(TimeWindowCacheType timeWindowCacheType)
{
switch (timeWindowCacheType)
{
case TimeWindowCacheType._5_Minutes:
{
return _5_minTimeWindows;
}
case TimeWindowCacheType._15_Minutes:
{
return _15_minTimeWindows;
}
case TimeWindowCacheType._20_Seconds:
{
return _20_secTimeWindows;
}
default:
{
return _1_minTimeWindows; ;
}
}
}
public ValueTask AddData(ITradeDataItem priceChange)
{
if (priceChange.Figi != Figi) return ValueTask.CompletedTask;
lock (_locker)
{
_pointer++;
Prices[_pointer] = priceChange.Price;
Timestamps[_pointer] = priceChange.Time;
if (_length < CacheMaxLength)
{
_length++;
}
if (_pointer == _arrayMaxLength - 1)
{
Array.Copy(Prices, Prices.Length - CacheMaxLength, Prices, 0, CacheMaxLength);
Array.Copy(Timestamps, Timestamps.Length - CacheMaxLength, Timestamps, 0, CacheMaxLength);
_pointer = CacheMaxLength - 1;
}
}
return ValueTask.CompletedTask;
}
public ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(int? length = null)
{
lock (_locker)
{
if (_pointer < 0)
{
return ValueTask.FromResult((Array.Empty<DateTime>(), Array.Empty<decimal>()));
}
else
{
var dataLength = length.HasValue ? System.Math.Min(length.Value, _length) : _length;
var prices = new decimal[dataLength];
var timestamps = new DateTime[dataLength];
var index = 1 + _pointer - dataLength;
Array.Copy(Prices, index, prices, 0, prices.Length);
Array.Copy(Timestamps, index, timestamps, 0, timestamps.Length);
return ValueTask.FromResult((timestamps, prices));
}
}
}
public ValueTask AddOrderbook(IOrderbook orderbook)
{
if (orderbook.Figi != Figi) return ValueTask.CompletedTask;
lock (_locker)
{
_asksCount = orderbook.AsksCount;
_bidsCount = orderbook.BidsCount;
}
return ValueTask.CompletedTask;
}
public ValueTask<(DateTime time, decimal price)> GetLastValues()
{
lock (_locker)
{
return _pointer >= 0 ? ValueTask.FromResult((Timestamps[_pointer], Prices[_pointer])) : ValueTask.FromResult((DateTime.UtcNow, 0m));
}
}
public ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(TimeSpan period)
{
lock (_locker)
{
if (_pointer < 0)
{
return ValueTask.FromResult((Array.Empty<DateTime>(), Array.Empty<decimal>(), false));
}
else
{
var i = _pointer;
var lastTime = Timestamps[i];
for (i = _pointer - 1; i >= 0; i--)
{
var currentTime = Timestamps[i];
if (lastTime - currentTime >= period)
{
break;
}
}
var dataLength = _pointer - i;
var prices = new decimal[dataLength];
var timestamps = new DateTime[dataLength];
var index = 1 + _pointer - dataLength;
Array.Copy(Prices, index, prices, 0, prices.Length);
Array.Copy(Timestamps, index, timestamps, 0, timestamps.Length);
return ValueTask.FromResult((timestamps, prices, i + 1 != 0));
}
}
}
public PriceHistoryCacheUnit3(string figi, params ITradeDataItem[] priceChanges)
{
Figi = figi;
if (priceChanges.Length == 0)
{
return;
}
var selectedPriceChanges = priceChanges
.OrderBy(pc => pc.Time)
.Skip(priceChanges.Length - CacheMaxLength)
.ToArray();
foreach (var pc in selectedPriceChanges)
{
AddData(pc).AsTask().Wait();
}
}
}
}

View File

@ -10,7 +10,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
private readonly ConcurrentDictionary<string, Channel<IOrderbook>> _orderbooksChannels = new();
private readonly ConcurrentDictionary<string, Channel<IMessage>> _messagesChannels = new();
private readonly ConcurrentDictionary<string, Channel<ITradeCommand>> _commandsChannel = new();
private readonly ConcurrentDictionary<string, Channel<INewPrice>> _priceChannels = new();
private readonly ConcurrentDictionary<string, Channel<ITradeDataItem>> _priceChannels = new();
public bool AddChannel(string key, Channel<IMessage> channel)
{
@ -22,7 +22,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
return _commandsChannel.TryAdd(key, channel);
}
public bool AddChannel(string key, Channel<INewPrice> channel)
public bool AddChannel(string key, Channel<ITradeDataItem> channel)
{
return _priceChannels.TryAdd(key, channel);
}
@ -32,7 +32,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
return _orderbooksChannels.TryAdd(key, channel);
}
public async Task Broadcast(INewPrice newPriceMessage)
public async Task Broadcast(ITradeDataItem newPriceMessage)
{
foreach (var channel in _priceChannels.Values)
{

View File

@ -4,7 +4,7 @@ using System.ComponentModel.DataAnnotations.Schema;
namespace KLHZ.Trader.Core.DataLayer.Entities.Prices
{
[Table("price_changes")]
public class PriceChange : INewPrice
public class PriceChange : ITradeDataItem
{
[Column("id")]
public long Id { get; set; }
@ -28,5 +28,11 @@ namespace KLHZ.Trader.Core.DataLayer.Entities.Prices
[Column("direction")]
public int Direction { get; set; }
[NotMapped]
public decimal Value { get; set; }
[NotMapped]
public decimal Value2 { get; set; }
}
}

View File

@ -31,5 +31,11 @@ namespace KLHZ.Trader.Core.DataLayer.Entities.Prices
[NotMapped]
public int Direction { get; set; }
[NotMapped]
public decimal Value { get; set; }
[NotMapped]
public decimal Value2 { get; set; }
}
}

View File

@ -37,7 +37,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
private readonly ConcurrentDictionary<string, decimal> DPirsonValues = new();
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
private readonly Channel<ITradeDataItem> _pricesChannel = Channel.CreateUnbounded<ITradeDataItem>();
private readonly Channel<ITradeCommand> _commands = Channel.CreateUnbounded<ITradeCommand>();
private readonly Channel<IOrderbook> _orderbooks = Channel.CreateUnbounded<IOrderbook>();
public Trader(
@ -80,7 +80,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
if (command.CommandType == TradeCommandType.OpenLong
|| command.CommandType == TradeCommandType.OpenShort)
{
var fakeMessage = new NewPriceMessage() { Figi = command.Figi, Ticker = "", Count = command.Count, Direction = 1, IsHistoricalData = false, Time = DateTime.UtcNow, Price = command.RecomendPrice ?? 0m };
var fakeMessage = new NewTrade() { Figi = command.Figi, Ticker = "", Count = command.Count, Direction = 1, IsHistoricalData = false, Time = DateTime.UtcNow, Price = command.RecomendPrice ?? 0m };
var positionType = command.CommandType == TradeCommandType.OpenLong ? PositionType.Long : PositionType.Short;
var stops = GetStops(fakeMessage, positionType);
var accounts = _portfolioWrapper.Accounts
@ -92,7 +92,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
else
{
var fakeMessage = new NewPriceMessage() { Figi = command.Figi, Ticker = "", Count = command.Count, Direction = 1, IsHistoricalData = false, Time = DateTime.UtcNow, Price = command.RecomendPrice ?? 0m };
var fakeMessage = new NewTrade() { Figi = command.Figi, Ticker = "", Count = command.Count, Direction = 1, IsHistoricalData = false, Time = DateTime.UtcNow, Price = command.RecomendPrice ?? 0m };
var assetsForClose = _portfolioWrapper.Accounts
.SelectMany(a => a.Value.Assets.Values)
.Where(a => a.Figi == fakeMessage.Figi)
@ -118,8 +118,8 @@ namespace KLHZ.Trader.Core.Exchange.Services
private async Task ProcessPrices()
{
var pricesCache1 = new Dictionary<string, List<INewPrice>>();
var pricesCache2 = new Dictionary<string, List<INewPrice>>();
var pricesCache1 = new Dictionary<string, List<ITradeDataItem>>();
var pricesCache2 = new Dictionary<string, List<ITradeDataItem>>();
var timesCache = new Dictionary<string, DateTime>();
while (await _pricesChannel.Reader.WaitToReadAsync())
{
@ -259,7 +259,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
private async Task ClosePositions(Asset[] assets, INewPrice message, bool withProfitOnly = true)
private async Task ClosePositions(Asset[] assets, ITradeDataItem message, bool withProfitOnly = true)
{
var loggedDeclisions = 0;
var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
@ -322,7 +322,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
private async Task OpenPositions(IManagedAccount[] accounts, INewPrice message, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1)
private async Task OpenPositions(IManagedAccount[] accounts, ITradeDataItem message, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1)
{
var loggedDeclisions = 0;
var sign = positionType == PositionType.Long ? 1 : 1;
@ -348,7 +348,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
private async Task ExecuteDeclisions(INewPrice message, ImmutableDictionary<TradingEvent, decimal> result)
private async Task ExecuteDeclisions(ITradeDataItem message, ImmutableDictionary<TradingEvent, decimal> result)
{
var state = ExchangeScheduler.GetCurrentState();
if (result[TradingEvent.UptrendStart] >= Constants.UppingCoefficient
@ -449,7 +449,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
return res;
}
private (decimal stopLoss, decimal takeProfit) GetStops(INewPrice message, PositionType type)
private (decimal stopLoss, decimal takeProfit) GetStops(ITradeDataItem message, PositionType type)
{
decimal stopLossShift = 2m;
decimal takeProfitShift = 6;

View File

@ -93,7 +93,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
return (Array.Empty<DateTime>(), Array.Empty<decimal>());
}
public async ValueTask AddData(INewPrice message, TimeSpan? clearingInterval = null)
public async ValueTask AddData(ITradeDataItem message, TimeSpan? clearingInterval = null)
{
if (message.Direction != 1) return;
if (_historyCash.TryGetValue(message.Figi, out var unit))
@ -236,7 +236,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
var data = await context1.PriceChanges
.Where(c => _instrumentsFigis.Contains(c.Figi) && c.Time >= time)
.OrderBy(c => c.Time)
.Select(c => new NewPriceMessage()
.Select(c => new NewTrade()
{
Figi = c.Figi,
Ticker = c.Ticker,
@ -299,7 +299,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
internal async Task LogPrice(INewPrice message, string processor, decimal value)
internal async Task LogPrice(ITradeDataItem message, string processor, decimal value)
{
await LogPrice(new ProcessedPrice()
{
@ -323,7 +323,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}, false);
}
internal async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
internal async Task LogDeclision(DeclisionTradeAction action, ITradeDataItem message, decimal? profit = null)
{
await LogDeclision(new Declision()
{
@ -337,7 +337,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}, false);
}
internal async Task LogDeclision(DeclisionTradeAction action, decimal price, DateTime time, INewPrice message)
internal async Task LogDeclision(DeclisionTradeAction action, decimal price, DateTime time, ITradeDataItem message)
{
await LogDeclision(new Declision()
{

View File

@ -58,11 +58,11 @@ namespace KLHZ.Trader.Core.Exchange.Utils
return true;
}
internal static INewPrice FilterHighFreqValues(INewPrice message, Dictionary<string, List<INewPrice>> pricesCache1)
internal static ITradeDataItem FilterHighFreqValues(ITradeDataItem message, Dictionary<string, List<ITradeDataItem>> pricesCache1)
{
if (!pricesCache1.TryGetValue(message.Figi, out var list))
{
list = new List<INewPrice>();
list = new List<ITradeDataItem>();
pricesCache1[message.Figi] = list;
}
list.Add(message);

View File

@ -49,7 +49,7 @@ namespace KLHZ.Trader.Service.Controllers
var prices = await context1.PriceChanges
.Where(c => (c.Figi == figi1 || c.Figi == figi2) && c.Time >= time1 && c.Time < time2)
.OrderBy(c => c.Time)
.Select(c => new NewPriceMessage()
.Select(c => new NewTrade()
{
Figi = c.Figi,
Ticker = c.Ticker,

View File

@ -6,7 +6,7 @@ namespace KLHZ.Trader.Service.Models
{
public required string Figi { get; set; }
public DateTime Time { get; set; }
public INewPrice? NewPrice { get; set; }
public ITradeDataItem? NewPrice { get; set; }
public IOrderbook? Orderbook { get; set; }
}
}