рефакторинг временнОго кеша

dev
vlad zverzhkhovskiy 2025-09-12 11:50:26 +03:00
parent fcf686a9d5
commit c9bad810ac
10 changed files with 186 additions and 46 deletions

View File

@ -0,0 +1,8 @@
namespace KLHZ.Trader.Core.Contracts.Declisions.Dtos
{
public class CachedValue
{
public DateTime Time { get; init; }
public decimal Value { get; init; }
}
}

View File

@ -0,0 +1,10 @@
namespace KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums
{
public enum TimeWindowCacheType
{
None = 0,
_1_Minute = 1,
_2_Minutes = 2,
_15_Minutes = 15,
}
}

View File

@ -1,4 +1,6 @@
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces
{
@ -11,6 +13,10 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces
public ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(TimeSpan period);
public ValueTask AddOrderbook(IOrderbook orderbook);
public ValueTask AddDataToTimeWindowCache(string key, CachedValue data, TimeWindowCacheType timeWindowCacheType);
public ValueTask<CachedValue[]> GetDataFromTimeWindowCache(string key, TimeWindowCacheType timeWindowCacheType);
/// <summary>
/// Число заявок на продаже в стакане.
/// </summary>

View File

@ -1,14 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace KLHZ.Trader.Core.Math.Declisions.Dtos
{
public class CachedValue
{
public DateTime Time { get; init; }
public decimal Value { get; init; }
}
}

View File

@ -0,0 +1,61 @@
using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
namespace KLHZ.Trader.Core.Math.Declisions.Dtos
{
internal class TimeWindowCacheItem
{
private readonly object _locker = new();
private readonly LinkedList<CachedValue> _cachedValues = new();
public readonly TimeSpan WindowSize;
public readonly string Key;
public TimeWindowCacheItem(string key, TimeWindowCacheType window)
{
Key = key;
WindowSize = GetTimeSpan(window);
}
public ValueTask AddData(CachedValue cachedValue)
{
lock (_locker)
{
_cachedValues.AddLast(cachedValue);
if (_cachedValues.Last != null && _cachedValues.First != null
&& _cachedValues.Last.Value.Time - _cachedValues.First.Value.Time > WindowSize)
{
_cachedValues.RemoveFirst();
}
}
return ValueTask.CompletedTask;
}
public ValueTask<CachedValue[]> GetValues()
{
lock (_locker)
{
return ValueTask.FromResult(_cachedValues.ToArray());
}
}
private static TimeSpan GetTimeSpan(TimeWindowCacheType type)
{
switch (type)
{
case TimeWindowCacheType._2_Minutes:
{
return TimeSpan.FromMinutes(2);
}
case TimeWindowCacheType._15_Minutes:
{
return TimeSpan.FromMinutes(15);
}
default:
{
return TimeSpan.FromMinutes(1);
}
}
}
}
}

View File

@ -1,4 +1,6 @@
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Math.Declisions.Dtos;
using System.Collections.Concurrent;
@ -48,7 +50,9 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
private readonly object _locker = new();
private readonly decimal[] Prices = new decimal[_arrayMaxLength];
private readonly DateTime[] Timestamps = new DateTime[_arrayMaxLength];
private readonly ConcurrentDictionary<string, LinkedList<CachedValue>> TimeWindows = new();
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _1_minTimeWindows = new();
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _2_minTimeWindows = new();
private readonly ConcurrentDictionary<string, TimeWindowCacheItem> _15_minTimeWindows = new();
private int _length = 0;
private int _pointer = -1;
@ -56,11 +60,46 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
private long _asksCount = 1;
private long _bidsCount = 1;
public ValueTask AddDataToTimeWindowCache(string key, CachedValue data, TimeSpan window)
public ValueTask AddDataToTimeWindowCache(string key, CachedValue data, TimeWindowCacheType timeWindowCacheType)
{
var dict = GetDict(timeWindowCacheType);
if (!dict.TryGetValue(key, out var cahcheItem))
{
dict.TryAdd(key, new TimeWindowCacheItem(key, timeWindowCacheType));
}
dict[key].AddData(data);
return ValueTask.CompletedTask;
}
public ValueTask<CachedValue[]> GetDataFromTimeWindowCache(string key, TimeWindowCacheType timeWindowCacheType)
{
var dict = GetDict(timeWindowCacheType);
if (dict.TryGetValue(key, out var cahcheItem))
{
return cahcheItem.GetValues();
}
return ValueTask.FromResult(Array.Empty<CachedValue>());
}
private ConcurrentDictionary<string, TimeWindowCacheItem> GetDict(TimeWindowCacheType timeWindowCacheType)
{
switch (timeWindowCacheType)
{
case TimeWindowCacheType._2_Minutes:
{
return _2_minTimeWindows;
}
case TimeWindowCacheType._15_Minutes:
{
return _15_minTimeWindows;
}
default:
{
return _1_minTimeWindows; ;
}
}
}
public ValueTask AddData(INewPrice priceChange)
{
if (priceChange.Figi != Figi) return ValueTask.CompletedTask;

View File

@ -0,0 +1,11 @@
namespace KLHZ.Trader.Core.Exchange
{
internal static class Constants
{
internal const string _1minCacheKey = "1min";
internal const string BigWindowCrossingAverageProcessor = "Trader_big";
internal const string SmallWindowCrossingAverageProcessor = "Trader_small";
internal const string AreasRelationProcessor = "balancescalc30min";
internal readonly static TimeSpan AreasRelationWindow = TimeSpan.FromMinutes(15);
}
}

View File

@ -31,9 +31,6 @@ namespace KLHZ.Trader.Core.Exchange.Services
private readonly ConcurrentDictionary<string, DateTime> OpeningStops = new();
private readonly ConcurrentDictionary<string, InstrumentSettings> Leverages = new();
private readonly string _bigWindowProcessor = nameof(Trader) + "_big";
private readonly string _smallWindowProcessor = nameof(Trader) + "_small";
private readonly decimal _futureComission;
private readonly decimal _shareComission;
private readonly decimal _accountCashPart;
@ -75,7 +72,6 @@ namespace KLHZ.Trader.Core.Exchange.Services
private async Task ProcessPrices()
{
var buffer = new LinkedList<(DateTime, double)>();
while (await _pricesChannel.Reader.WaitToReadAsync())
{
var message = await _pricesChannel.Reader.ReadAsync();
@ -94,15 +90,11 @@ namespace KLHZ.Trader.Core.Exchange.Services
var windowMaxSize = 1000;
await SellAssetsIfNeed(message);
var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize);
if (data.timestamps.Length <= 1)
{
buffer.Clear();
}
var state = ExchangeScheduler.GetCurrentState(message.Time);
await ProcessClearing(data, state, message);
ProcessOpeningStops(message, currentTime);
await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize, buffer);
await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize);
}
}
catch (Exception ex)
@ -129,7 +121,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
var profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
if (message.Time - asset.BoughtAt > TimeSpan.FromMinutes(4) && profit<-66m)
if (message.Time - asset.BoughtAt > TimeSpan.FromMinutes(4) && profit < -66m)
{
await _dataBus.Broadcast(new TradeCommand()
{
@ -143,9 +135,10 @@ namespace KLHZ.Trader.Core.Exchange.Services
});
OpeningStops[message.Figi] = DateTime.UtcNow.AddMinutes(10);
await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
}
if (message.Time - asset.BoughtAt > TimeSpan.FromHours(4) && profit> 100)
if (message.Time - asset.BoughtAt > TimeSpan.FromHours(4) && profit > 100)
{
await _dataBus.Broadcast(new TradeCommand()
{
@ -158,6 +151,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
EnableMargin = false,
});
await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
}
}
}
@ -165,7 +159,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
private async Task ProcessNewPriceIMOEXF((DateTime[] timestamps, decimal[] prices) data,
ExchangeState state,
INewPrice message, int windowMaxSize, LinkedList<(DateTime time, double val)> areasBuffer)
INewPrice message, int windowMaxSize)
{
var res = TradingEvent.None;
var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices,
@ -175,21 +169,22 @@ namespace KLHZ.Trader.Core.Exchange.Services
if (resultMoveAvFull.bigWindowAv != 0)
{
await LogPrice(message, _bigWindowProcessor, resultMoveAvFull.bigWindowAv);
await LogPrice(message, _smallWindowProcessor, resultMoveAvFull.smallWindowAv);
await LogPrice(message, Constants.BigWindowCrossingAverageProcessor, resultMoveAvFull.bigWindowAv);
await LogPrice(message, Constants.SmallWindowCrossingAverageProcessor, resultMoveAvFull.smallWindowAv);
}
var areasRel = -1m;
if (ShapeAreaCalculator.TryGetAreasRelation(data.timestamps, data.prices, message.Value, TimeSpan.FromMinutes(15), out var rel))
if (ShapeAreaCalculator.TryGetAreasRelation(data.timestamps, data.prices, message.Value, Constants.AreasRelationWindow, out var rel))
{
areasBuffer.AddLast((message.Time, rel));
if (areasBuffer.Last != null && areasBuffer.First != null
&& areasBuffer.Last.Value.time - areasBuffer.First.Value.time > TimeSpan.FromMinutes(1))
await _tradeDataProvider.AddDataTo1MinuteWindowCache(message.Figi, Constants._1minCacheKey, new Contracts.Declisions.Dtos.CachedValue()
{
areasBuffer.RemoveFirst();
}
areasRel = (decimal)areasBuffer.Sum(a => a.val) / areasBuffer.Count;
await LogPrice(message, "balancescalc30min", areasRel);
Time = message.Time,
Value = (decimal)rel
});
var areas = await _tradeDataProvider.GetDataFrom1MinuteWindowCache(message.Figi, Constants._1minCacheKey);
areasRel = (decimal)areas.Sum(a => a.Value) / areas.Length;
await LogPrice(message, Constants.AreasRelationProcessor, areasRel);
}
if ((res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart
&& !OpeningStops.TryGetValue(message.Figi, out _)
@ -232,10 +227,6 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
await LogDeclision(DeclisionTradeAction.OpenLong, message);
}
//else if (areasRel >=75)
//{
// await LogDeclision(DeclisionTradeAction.OpenShort, message);
//}
}
if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd)

View File

@ -1,4 +1,6 @@
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.DataLayer;
@ -8,6 +10,7 @@ using KLHZ.Trader.Core.Exchange.Extentions;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@ -96,6 +99,25 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
public async ValueTask AddDataTo1MinuteWindowCache(string figi, string key, CachedValue data)
{
if (!_historyCash.TryGetValue(figi, out var unit))
{
unit = new PriceHistoryCacheUnit2(figi);
_historyCash.TryAdd(figi, unit);
}
await _historyCash[figi].AddDataToTimeWindowCache(key, data, TimeWindowCacheType._1_Minute);
}
public ValueTask<CachedValue[]> GetDataFrom1MinuteWindowCache(string figi, string key)
{
if (_historyCash.TryGetValue(figi, out var cahcheItem))
{
return cahcheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._1_Minute);
}
return ValueTask.FromResult(Array.Empty<CachedValue>());
}
public async ValueTask AddOrderbook(IOrderbook orderbook)
{
if (!_historyCash.TryGetValue(orderbook.Figi, out var unit))
@ -161,6 +183,12 @@ namespace KLHZ.Trader.Core.Exchange.Services
foreach (var price in data)
{
await AddData(price);
var cachedData = await GetData(price.Figi);
if (ShapeAreaCalculator.TryGetAreasRelation(cachedData.timestamps, cachedData.prices, price.Value, Constants.AreasRelationWindow, out var rel))
{
await AddDataTo1MinuteWindowCache(price.Figi, Constants._1minCacheKey, new CachedValue() { Time = price.Time, Value = (decimal)rel });
}
}
}

View File

@ -30,7 +30,7 @@ namespace KLHZ.Trader.Service.Controllers
try
{
var time1 = DateTime.UtcNow.AddDays(-30);
var time2 = DateTime.UtcNow.AddMinutes(30);
var time2 = DateTime.UtcNow.AddMinutes(18);
using var context1 = await _dbContextFactory.CreateDbContextAsync();
context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var data = await context1.PriceChanges