From 64ce547f1ba1286771d7124877c91c7bbb17ac35 Mon Sep 17 00:00:00 2001 From: vlad zverzhkhovskiy Date: Mon, 8 Sep 2025 13:48:54 +0300 Subject: [PATCH] =?UTF-8?q?=D0=BE=D0=B1=D0=BD=D0=BE=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D1=81=D1=82=D1=80=D0=B0=D1=82=D0=B5=D0=B3?= =?UTF-8?q?=D0=B8=D0=B8=20+=20=D1=80=D0=B5=D1=84=D0=B0=D0=BA=D1=82=D0=BE?= =?UTF-8?q?=D1=80=D0=B8=D0=BD=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Interfaces/IPriceHistoryCacheUnit.cs | 3 + .../Declisions/Dtos/ProcessedPrice.cs | 14 - .../Services/Cache/PriceHistoryCacheUnit.cs | 94 ---- .../Services/Cache/PriceHistoryCacheUnit2.cs | 40 ++ .../Declisions/Utils/MovingAverage.cs | 10 +- .../HistoryCacheUnit2Tests.cs | 61 ++- .../HistoryCacheUnitTests.cs | 207 ------- KLHZ.Trader.Core.Tests/TraderTests.cs | 13 +- .../Services/ProcessedPricesLogger.cs | 74 --- .../Exchange/Services/ExchangeDataReader.cs | 8 +- KLHZ.Trader.Core/Exchange/Services/Trader.cs | 512 +++++++++--------- ...eDataProvider.cs => TraderDataProvider.cs} | 235 +++++++- .../Services/TradingCommandsExecutor.cs | 4 +- .../Controllers/PlayController.cs | 4 +- KLHZ.Trader.Service/Program.cs | 2 +- 15 files changed, 576 insertions(+), 705 deletions(-) delete mode 100644 KLHZ.Trader.Core.Math/Declisions/Dtos/ProcessedPrice.cs delete mode 100644 KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs delete mode 100644 KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs delete mode 100644 KLHZ.Trader.Core/Common/Messaging/Services/ProcessedPricesLogger.cs rename KLHZ.Trader.Core/Exchange/Services/{TradeDataProvider.cs => TraderDataProvider.cs} (50%) diff --git a/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs b/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs index 2826943..c377411 100644 --- a/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs +++ b/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs @@ -8,6 +8,7 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces public int Length { get; } public ValueTask AddData(INewPrice priceChange); public ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(int? length = null); + public ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(TimeSpan period); public ValueTask AddOrderbook(IOrderbook orderbook); /// @@ -19,5 +20,7 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces /// Число заявок на покупку в стакане. /// public decimal BidsCount { get; } + + public ValueTask<(DateTime time, decimal price)> GetLastValues(); } } diff --git a/KLHZ.Trader.Core.Math/Declisions/Dtos/ProcessedPrice.cs b/KLHZ.Trader.Core.Math/Declisions/Dtos/ProcessedPrice.cs deleted file mode 100644 index a8d0d78..0000000 --- a/KLHZ.Trader.Core.Math/Declisions/Dtos/ProcessedPrice.cs +++ /dev/null @@ -1,14 +0,0 @@ -using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; - -namespace KLHZ.Trader.Core.Math.Declisions.Dtos -{ - public class ProcessedPrice : IProcessedPrice - { - public required string Processor { get; set; } - public bool IsHistoricalData { get; set; } - public decimal Value { get; set; } - public required string Figi { get; set; } - public required string Ticker { get; set; } - public DateTime Time { get; set; } - } -} diff --git a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs deleted file mode 100644 index de90df6..0000000 --- a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs +++ /dev/null @@ -1,94 +0,0 @@ -using KLHZ.Trader.Core.Contracts.Declisions.Interfaces; -using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; - -namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache -{ - public class PriceHistoryCacheUnit : IPriceHistoryCacheUnit - { - public const int CacheMaxLength = 500; - - public string Figi { get; init; } - - public int Length - { - get - { - lock (_locker) - { - return _length; - } - } - } - - public decimal AsksCount => 1; - public decimal BidsCount => 1; - - private readonly object _locker = new(); - private readonly decimal[] Prices = new decimal[CacheMaxLength]; - private readonly DateTime[] Timestamps = new DateTime[CacheMaxLength]; - - private int _length = 0; - - public ValueTask AddData(INewPrice priceChange) - { - lock (_locker) - { - Array.Copy(Prices, 1, Prices, 0, Prices.Length - 1); - Array.Copy(Timestamps, 1, Timestamps, 0, Timestamps.Length - 1); - - Prices[Prices.Length - 1] = priceChange.Value; - Timestamps[Timestamps.Length - 1] = priceChange.Time; - - if (_length < CacheMaxLength) - { - _length++; - } - } - return ValueTask.CompletedTask; - } - - public ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(int? length = null) - { - lock (_locker) - { - var prices = new decimal[_length]; - var timestamps = new DateTime[_length]; - Array.Copy(Prices, Prices.Length - _length, prices, 0, prices.Length); - Array.Copy(Timestamps, Prices.Length - _length, timestamps, 0, timestamps.Length); - return ValueTask.FromResult((timestamps, prices)); - } - } - - public ValueTask AddOrderbook(IOrderbook orderbook) - { - return ValueTask.CompletedTask; - } - - public PriceHistoryCacheUnit(string figi, params INewPrice[] priceChanges) - { - Figi = figi; - - - if (priceChanges.Length == 0) - { - return; - } - - var selectedPriceChanges = priceChanges - .OrderBy(pc => pc.Time) - .Skip(priceChanges.Length - CacheMaxLength) - .ToArray(); - var prices = selectedPriceChanges - .Select(pc => pc.Value) - .ToArray(); - var times = selectedPriceChanges - .Select(pc => pc.Time) - .ToArray(); - - Array.Copy(prices, 0, Prices, Prices.Length - prices.Length, prices.Length); - Array.Copy(times, 0, Timestamps, Timestamps.Length - times.Length, times.Length); - - _length = times.Length > CacheMaxLength ? CacheMaxLength : times.Length; - } - } -} diff --git a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs index dcad0f0..f1af6fe 100644 --- a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs +++ b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs @@ -108,6 +108,46 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache return ValueTask.CompletedTask; } + public ValueTask<(DateTime time, decimal price)> GetLastValues() + { + lock (_locker) + { + return ValueTask.FromResult((Timestamps[_pointer], Prices[_pointer])); + } + } + + public ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(TimeSpan period) + { + lock (_locker) + { + if (_pointer < 0) + { + return ValueTask.FromResult((Array.Empty(), Array.Empty(), false)); + } + else + { + var i = _pointer; + var lastTime = Timestamps[i]; + for (i = _pointer - 1; i >= 0; i--) + { + var currentTime = Timestamps[i]; + if (lastTime - currentTime >= period) + { + break; + } + } + + var dataLength = _pointer - i; + var prices = new decimal[dataLength]; + var timestamps = new DateTime[dataLength]; + var index = 1 + _pointer - dataLength; + Array.Copy(Prices, index, prices, 0, prices.Length); + Array.Copy(Timestamps, index, timestamps, 0, timestamps.Length); + return ValueTask.FromResult((timestamps, prices, i + 1 != 0)); + } + } + } + public PriceHistoryCacheUnit2(string figi, params INewPrice[] priceChanges) { Figi = figi; diff --git a/KLHZ.Trader.Core.Math/Declisions/Utils/MovingAverage.cs b/KLHZ.Trader.Core.Math/Declisions/Utils/MovingAverage.cs index 74b1351..ac509ea 100644 --- a/KLHZ.Trader.Core.Math/Declisions/Utils/MovingAverage.cs +++ b/KLHZ.Trader.Core.Math/Declisions/Utils/MovingAverage.cs @@ -20,7 +20,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils return (startTime, sum / count); } - public static (TradingEvent events, decimal bigWindowAv, decimal smallWindowAv) CheckByWindowAverageMean(DateTime[] timestamps, decimal[] prices, int size, int smallWindow, int bigWindow, decimal meanfullStep = 3m) + public static (TradingEvent events, decimal bigWindowAv, decimal smallWindowAv) CheckByWindowAverageMean(DateTime[] timestamps, decimal[] prices, int size, int smallWindow, int bigWindow, TimeSpan timeForUptreandStart, decimal meanfullStep = 3m) { var res = TradingEvent.None; var bigWindowAv = 0m; @@ -29,6 +29,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils try { var pricesForFinalComparison = new decimal[size]; + var timesForFinalComparison = new DateTime[size]; var twavss = new decimal[size]; var twavbs = new decimal[size]; var times = new DateTime[size]; @@ -42,6 +43,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils var twavs = CalcTimeWindowAverageValue(timestamps, prices, smallWindow, shift); var twavb = CalcTimeWindowAverageValue(timestamps, prices, bigWindow, shift); pricesForFinalComparison[i2] = prices[prices.Length - 1 - shift]; + timesForFinalComparison[i2] = timestamps[prices.Length - 1 - shift]; if (shift == 0) { bigWindowAv = twavb.value; @@ -75,7 +77,8 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils // если фильтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга и закрытия шорта if (twavbs[size - 1] <= twavss[size - 1] && twavbs[size - 2] > twavss[size - 2]) { - if (pricesForFinalComparison[i2 + 1] - pricesForFinalComparison[size - 1] >= meanfullStep) + if (pricesForFinalComparison[i2 + 1] - pricesForFinalComparison[size - 1] >= meanfullStep + && timesForFinalComparison[size - 1] - timesForFinalComparison[i2 + 1] >= timeForUptreandStart) { res |= TradingEvent.UptrendStart; } @@ -85,7 +88,8 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils // если фильтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга и возможно открытия шорта if (twavss[size - 1] <= twavbs[size - 1] && twavss[size - 2] > twavbs[size - 2]) { - if (pricesForFinalComparison[i2 + 1] - pricesForFinalComparison[size - 1] <= -meanfullStep) + if (pricesForFinalComparison[i2 + 1] - pricesForFinalComparison[size - 1] <= -meanfullStep + && timesForFinalComparison[size - 1] - timesForFinalComparison[i2 + 1] >= timeForUptreandStart) { res |= TradingEvent.UptrendEnd; } diff --git a/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs b/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs index 3049e0a..11e6709 100644 --- a/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs +++ b/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs @@ -13,7 +13,7 @@ namespace KLHZ.Trader.Core.Tests var startDt = DateTime.UtcNow.AddSeconds(-count); for (int i = 0; i < count; i++) { - startDt = startDt.AddSeconds(i); + startDt = startDt.AddSeconds(1); res[i] = new PriceChange() { Figi = figi, @@ -80,7 +80,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void Test4() { - var count = PriceHistoryCacheUnit.CacheMaxLength; + var count = PriceHistoryCacheUnit2.CacheMaxLength; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); @@ -100,7 +100,7 @@ namespace KLHZ.Trader.Core.Tests public void Test5() { var shift = 7; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var count = PriceHistoryCacheUnit2.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); @@ -124,7 +124,7 @@ namespace KLHZ.Trader.Core.Tests public void Test6() { var shift = 10; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var count = PriceHistoryCacheUnit2.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); @@ -148,7 +148,7 @@ namespace KLHZ.Trader.Core.Tests public void Test7() { var shift = 334; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var count = PriceHistoryCacheUnit2.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); @@ -172,7 +172,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void Test8() { - var count = PriceHistoryCacheUnit.CacheMaxLength; + var count = PriceHistoryCacheUnit2.CacheMaxLength; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); @@ -216,6 +216,9 @@ namespace KLHZ.Trader.Core.Tests var data = cacheUnit.GetData().Result; Assert.IsTrue(data.prices.Length == PriceHistoryCacheUnit2.CacheMaxLength); Assert.IsTrue(data.prices.Last() == i); + var lastValues = cacheUnit.GetLastValues().Result; + Assert.IsTrue(data.prices.Last() == lastValues.price); + Assert.IsTrue(data.timestamps.Last() == lastValues.time); } } } @@ -225,7 +228,7 @@ namespace KLHZ.Trader.Core.Tests { var length = 77; var shift = 334; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var count = PriceHistoryCacheUnit2.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); @@ -242,5 +245,49 @@ namespace KLHZ.Trader.Core.Tests Assert.That(hist[hist.Length - i].Time, Is.EqualTo(data.timestamps[data.prices.Length - i])); } } + + [Test] + public void Test11() + { + var length = 77; + var shift = 334; + var count = PriceHistoryCacheUnit2.CacheMaxLength + shift; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); + var data = cacheUnit.GetData(length).Result; + var span = TimeSpan.FromSeconds(100); + var dataForPeriod = cacheUnit.GetData(span).Result; + + Assert.That(dataForPeriod.isFullIntervalExists); + Assert.That(data.prices.Last() == dataForPeriod.prices.Last()); + Assert.That(data.timestamps.Last() == dataForPeriod.timestamps.Last()); + var dt = dataForPeriod.timestamps.Last() - dataForPeriod.timestamps.First(); + Assert.That(dt <= span); + Assert.That(data.prices.Length == length); + Assert.That(data.timestamps.Length == length); + + Assert.That(data.prices.Last() == hist.Last().Value); + Assert.That(data.timestamps.Last() == hist.Last().Time); + } + + [Test] + public void Test12() + { + var count = 30; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2(figi, hist); + var data = cacheUnit.GetData().Result; + var span = TimeSpan.FromSeconds(100); + var dataForPeriod = cacheUnit.GetData(span).Result; + + Assert.IsFalse(dataForPeriod.isFullIntervalExists); + Assert.That(data.prices.Last() == dataForPeriod.prices.Last()); + Assert.That(data.timestamps.Last() == dataForPeriod.timestamps.Last()); + + Assert.That(data.prices.First() == dataForPeriod.prices.First()); + Assert.That(data.timestamps.First() == dataForPeriod.timestamps.First()); + } } } \ No newline at end of file diff --git a/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs b/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs deleted file mode 100644 index 9406c0f..0000000 --- a/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs +++ /dev/null @@ -1,207 +0,0 @@ -using KLHZ.Trader.Core.DataLayer.Entities.Prices; -using KLHZ.Trader.Core.Math.Declisions.Services.Cache; - -namespace KLHZ.Trader.Core.Tests -{ - public class HistoryCacheUnitTests - { - private static PriceChange[] GetHistory(int count, string figi) - { - var res = new PriceChange[count]; - if (count != 0) - { - var startDt = DateTime.UtcNow.AddSeconds(-count); - for (int i = 0; i < count; i++) - { - startDt = startDt.AddSeconds(i); - res[i] = new PriceChange() - { - Figi = figi, - Ticker = figi + "_ticker", - Id = i, - Time = startDt, - Value = (decimal)(i + 0.5) - }; - } - } - return res; - } - - [Test] - public void Test1() - { - var count = 0; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count); - Assert.That(data.timestamps.Length == count); - } - - [Test] - public void Test2() - { - var count = 1; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count); - Assert.That(data.timestamps.Length == count); - for (var i = 0; i < count; i++) - { - Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); - } - } - - [Test] - public void Test3() - { - var count = 20; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count); - Assert.That(data.timestamps.Length == count); - - for (var i = 0; i < count; i++) - { - Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); - } - } - - [Test] - public void Test4() - { - var count = PriceHistoryCacheUnit.CacheMaxLength; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count); - Assert.That(data.timestamps.Length == count); - - for (var i = 0; i < count; i++) - { - Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); - } - } - - [Test] - public void Test5() - { - var shift = 7; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count - shift); - Assert.That(data.timestamps.Length == count - shift); - - for (var i = 0; i < count; i++) - { - var k = i + shift; - if (k < hist.Length) - { - Assert.That((float)hist[k].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[k].Time, Is.EqualTo(data.timestamps[i])); - } - } - } - - [Test] - public void Test6() - { - var shift = 10; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count - shift); - Assert.That(data.timestamps.Length == count - shift); - - for (var i = 0; i < count; i++) - { - var k = i + shift; - if (k < hist.Length) - { - Assert.That((float)hist[k].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[k].Time, Is.EqualTo(data.timestamps[i])); - } - } - } - - [Test] - public void Test7() - { - var shift = 334; - var count = PriceHistoryCacheUnit.CacheMaxLength + shift; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count - shift); - Assert.That(data.timestamps.Length == count - shift); - - for (var i = 0; i < count; i++) - { - var k = i + shift; - if (k < hist.Length) - { - Assert.That((float)hist[k].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[k].Time, Is.EqualTo(data.timestamps[i])); - } - } - } - - - [Test] - public void Test8() - { - var count = PriceHistoryCacheUnit.CacheMaxLength; - var figi = "figi"; - var hist = GetHistory(count, figi); - var cacheUnit = new PriceHistoryCacheUnit("", hist); - var data = cacheUnit.GetData().Result; - - Assert.That(data.prices.Length == count); - Assert.That(data.timestamps.Length == count); - - for (var i = 0; i < count; i++) - { - Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); - Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); - } - - var newData1 = new PriceChange() { Figi = figi, Ticker = figi, Value = 100500, Time = DateTime.UtcNow }; - - cacheUnit.AddData(newData1); - - var data2 = cacheUnit.GetData().Result; - Assert.IsTrue(data2.prices[data2.prices.Length - 1] == newData1.Value); - Assert.IsTrue(data2.timestamps[data2.timestamps.Length - 1] == newData1.Time); - - var newData2 = new PriceChange() { Figi = figi, Ticker = figi, Value = 100501, Time = DateTime.UtcNow }; - - cacheUnit.AddData(newData2); - - var data3 = cacheUnit.GetData().Result; - Assert.IsTrue(data3.prices[data3.prices.Length - 1] == newData2.Value); - Assert.IsTrue(data3.timestamps[data3.timestamps.Length - 1] == newData2.Time); - } - } -} \ No newline at end of file diff --git a/KLHZ.Trader.Core.Tests/TraderTests.cs b/KLHZ.Trader.Core.Tests/TraderTests.cs index df01c3e..458bb3d 100644 --- a/KLHZ.Trader.Core.Tests/TraderTests.cs +++ b/KLHZ.Trader.Core.Tests/TraderTests.cs @@ -1,4 +1,5 @@ -using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; +using KLHZ.Trader.Core.Common; +using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; namespace KLHZ.Trader.Core.Tests { @@ -7,7 +8,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void IsBuyAllowedTest1() { - + BotModeSwitcher.StartPurchase(); var account = new ManagedAccount("111"); account.Total = 10000; account.Balance = 9000; @@ -24,7 +25,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void IsBuyAllowedTest2() { - + BotModeSwitcher.StartPurchase(); var account = new ManagedAccount("111"); account.Total = 10000; account.Balance = 5000; @@ -41,7 +42,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void IsBuyAllowedTest3() { - + BotModeSwitcher.StartPurchase(); var account = new ManagedAccount("111"); account.Total = 10000; account.Balance = 5000; @@ -58,7 +59,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void IsBuyAllowedTest4() { - + BotModeSwitcher.StartPurchase(); var account = new ManagedAccount("111"); account.Total = 10000; account.Balance = 3000; @@ -75,7 +76,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void IsBuyAllowedTest5() { - + BotModeSwitcher.StartPurchase(); var account = new ManagedAccount("111"); account.Total = 10000; account.Balance = 5000; diff --git a/KLHZ.Trader.Core/Common/Messaging/Services/ProcessedPricesLogger.cs b/KLHZ.Trader.Core/Common/Messaging/Services/ProcessedPricesLogger.cs deleted file mode 100644 index 0a93705..0000000 --- a/KLHZ.Trader.Core/Common/Messaging/Services/ProcessedPricesLogger.cs +++ /dev/null @@ -1,74 +0,0 @@ -using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; -using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; -using KLHZ.Trader.Core.DataLayer; -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using System.Threading.Channels; - -namespace KLHZ.Trader.Core.Common.Messaging.Services -{ - public class ProcessedPricesLogger : IHostedService - { - private readonly ILogger _logger; - private readonly IDataBus _dataBus; - private readonly IDbContextFactory _dbContextFactory; - - private readonly Channel _channel = Channel.CreateUnbounded(); - public ProcessedPricesLogger(IDataBus dataBus, IDbContextFactory dbContextFactory, ILogger logger) - { - _dataBus = dataBus; - _dbContextFactory = dbContextFactory; - _logger = logger; - _dataBus.AddChannel(nameof(ProcessedPricesLogger), _channel); - _ = ProcessMessages(); - } - - private async Task ProcessMessages() - { - var buffer = new List(); - var lastWrite = DateTime.UtcNow; - while (await _channel.Reader.WaitToReadAsync()) - { - try - { - var message = await _channel.Reader.ReadAsync(); - - buffer.Add(new DataLayer.Entities.Prices.ProcessedPrice() - { - Figi = message.Figi, - Processor = message.Processor, - Ticker = message.Ticker, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = message.Value, - }); - - if (buffer.Count > 10000 || DateTime.UtcNow - lastWrite > TimeSpan.FromSeconds(5) || _channel.Reader.Count == 0) - { - lastWrite = DateTime.UtcNow; - using var context = await _dbContextFactory.CreateDbContextAsync(); - context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; - await context.AddRangeAsync(buffer); - buffer.Clear(); - await context.SaveChangesAsync(); - } - } - catch (Exception ex) - { - - } - } - } - - public Task StartAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - } -} diff --git a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs index 9c0e538..3183abf 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs @@ -11,7 +11,6 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System.Collections.Concurrent; using Tinkoff.InvestApi; using Tinkoff.InvestApi.V1; @@ -19,17 +18,16 @@ namespace KLHZ.Trader.Core.Exchange.Services { public class ExchangeDataReader : IHostedService { - private readonly TradeDataProvider _tradeDataProvider; + private readonly TraderDataProvider _tradeDataProvider; private readonly InvestApiClient _investApiClient; private readonly string[] _instrumentsFigis = []; private readonly string[] _managedAccountNamePatterns; private readonly ILogger _logger; - private readonly ConcurrentDictionary _tickersCache = new(); private readonly IDbContextFactory _dbContextFactory; private readonly CancellationTokenSource _cts = new(); private readonly IDataBus _eventBus; private readonly bool _exchangeDataRecievingEnabled; - public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus, TradeDataProvider tradeDataProvider, + public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus, TraderDataProvider tradeDataProvider, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) { @@ -45,6 +43,7 @@ namespace KLHZ.Trader.Core.Exchange.Services public async Task StartAsync(CancellationToken cancellationToken) { + await _tradeDataProvider.Init(); _logger.LogInformation("Инициализация приемника данных с биржи"); var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns); _ = CycleSubscribtion(accounts); @@ -61,7 +60,6 @@ namespace KLHZ.Trader.Core.Exchange.Services await SubscribePrices(); } await Task.Delay(1000); - //await SubscribeCandles(); } catch (Exception ex) { diff --git a/KLHZ.Trader.Core/Exchange/Services/Trader.cs b/KLHZ.Trader.Core/Exchange/Services/Trader.cs index 5156d83..afbf2f4 100644 --- a/KLHZ.Trader.Core/Exchange/Services/Trader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/Trader.cs @@ -1,10 +1,8 @@ using KLHZ.Trader.Core.Common; using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums; -using KLHZ.Trader.Core.Contracts.Declisions.Interfaces; using KLHZ.Trader.Core.Contracts.Messaging.Dtos; using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; -using KLHZ.Trader.Core.DataLayer; using KLHZ.Trader.Core.DataLayer.Entities.Declisions; using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums; using KLHZ.Trader.Core.DataLayer.Entities.Prices; @@ -12,13 +10,13 @@ using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; using KLHZ.Trader.Core.Exchange.Models.Configs; using KLHZ.Trader.Core.Exchange.Models.Trading; using KLHZ.Trader.Core.Exchange.Utils; -using KLHZ.Trader.Core.Math.Declisions.Services.Cache; using KLHZ.Trader.Core.Math.Declisions.Utils; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; +using System.Security.Cryptography; using System.Threading.Channels; using Tinkoff.InvestApi; using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType; @@ -28,15 +26,15 @@ namespace KLHZ.Trader.Core.Exchange.Services public class Trader : IHostedService { private readonly IDataBus _dataBus; - private readonly IDbContextFactory _dbContextFactory; - private readonly TradeDataProvider _tradeDataProvider; + private readonly TraderDataProvider _tradeDataProvider; private readonly ILogger _logger; - internal readonly ConcurrentDictionary DeferredLongOpens = new(); - internal readonly ConcurrentDictionary DeferredLongCloses = new(); + private readonly ConcurrentDictionary DeferredLongOpens = new(); + private readonly ConcurrentDictionary DeferredLongCloses = new(); private readonly ConcurrentDictionary OpeningStops = new(); private readonly ConcurrentDictionary Leverages = new(); - private readonly ConcurrentDictionary _historyCash = new(); + private readonly string _bigWindowProcessor = nameof(Trader) + "_big"; + private readonly string _smallWindowProcessor = nameof(Trader) + "_small"; private readonly double _buyStopLength; private readonly decimal _futureComission; @@ -47,19 +45,17 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly Channel _pricesChannel = Channel.CreateUnbounded(); private readonly Channel _ordersbookChannel = Channel.CreateUnbounded(); - private readonly CancellationTokenSource _cts = new(); + public Trader( ILogger logger, IOptions options, IDataBus dataBus, - IDbContextFactory dbContextFactory, - TradeDataProvider tradeDataProvider, + TraderDataProvider tradeDataProvider, InvestApiClient investApiClient) { _tradeDataProvider = tradeDataProvider; _logger = logger; _dataBus = dataBus; - _dbContextFactory = dbContextFactory; _futureComission = options.Value.FutureComission; _shareComission = options.Value.ShareComission; _accountCashPart = options.Value.AccountCashPart; @@ -84,242 +80,30 @@ namespace KLHZ.Trader.Core.Exchange.Services private async Task ProcessPrices() { - var declisionsForSave = new List(); - var processedPrices = new List(); while (await _pricesChannel.Reader.WaitToReadAsync()) { - - var bigWindowProcessor = nameof(Trader) + "_big"; - var smallWindowProcessor = nameof(Trader) + "_small"; var message = await _pricesChannel.Reader.ReadAsync(); if (_tradingInstrumentsFigis.Contains(message.Figi)) { var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow; - if (_historyCash.TryGetValue(message.Figi, out var unit)) - { - await unit.AddData(message); - } - else - { - unit = new PriceHistoryCacheUnit2(message.Figi, message); - _historyCash.TryAdd(message.Figi, unit); - } + try { + await _tradeDataProvider.AddData(message, TimeSpan.FromHours(7)); + await ProcessDeferredLongOpens(message, currentTime); + await ProcessDeferredLongCloses(message, currentTime); if (message.Figi == "FUTIMOEXF000") { - DeferredTrade? longOpen; - DeferredLongOpens.TryGetValue(message.Figi, out longOpen); - if (longOpen != null) - { - var t = currentTime; - if (longOpen.Time <= t - && t - longOpen.Time < TimeSpan.FromMinutes(3)) - { - DeferredLongOpens.TryRemove(message.Figi, out _); - if (message.Value - longOpen.Price < 1) - { - if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase()) - { - var accounts = _tradeDataProvider.Accounts - .Where(a => !a.Value.Assets.ContainsKey(message.Figi)) - .ToArray(); - foreach (var acc in accounts) - { - if (IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart)) - { - await _dataBus.Broadcast(new TradeCommand() - { - AccountId = acc.Value.AccountId, - Figi = message.Figi, - CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy, - Count = 1, - RecomendPrice = null, - }); - LogDeclision(declisionsForSave, DeclisionTradeAction.OpenLong, message); - } - } - } - else - { - LogDeclision(declisionsForSave, DeclisionTradeAction.OpenLong, message); - } - } - } - } - - DeferredTrade? longClose; - DeferredLongCloses.TryGetValue(message.Figi, out longClose); - if (longClose != null) - { - if (longClose.Time <= currentTime) - { - DeferredLongCloses.TryRemove(message.Figi, out _); - if (longClose.Price - message.Value < 1) - { - var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi); - if (!message.IsHistoricalData && BotModeSwitcher.CanSell()) - { - var assetsForClose = _tradeDataProvider.Accounts - .SelectMany(a => a.Value.Assets.Values) - .Where(a => a.Figi == message.Figi && a.Count > 0) - .ToArray(); - foreach (var asset in assetsForClose) - { - var profit = 0m; - - if (assetType == AssetType.Common && asset.Count > 0) - { - profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, - GetComission(assetType), 1, false); - } - if (assetType == AssetType.Futures) - { - profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, - GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0); - } - if (profit > 0) - { - await _dataBus.Broadcast(new TradeCommand() - { - AccountId = asset.AccountId, - Figi = message.Figi, - CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell, - Count = (long)asset.Count, - RecomendPrice = null, - }); - LogDeclision(declisionsForSave, DeclisionTradeAction.CloseLong, message); - } - } - } - else - { - LogDeclision(declisionsForSave, DeclisionTradeAction.CloseLong, message); - } - } - } - } - var windowMaxSize = 100; - var data = await unit.GetData(windowMaxSize); + + var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize); var state = ExchangeScheduler.GetCurrentState(message.Time); + await ProcessClearing(data, state, message); + await SellOldAssetsIfCan(message); - if (state == ExchangeState.ClearingTime - && !message.IsHistoricalData - && data.timestamps.Length > 1 - && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2]) > TimeSpan.FromMinutes(3)) - { - await UpdateFuturesPrice(message, data.prices[data.prices.Length - 2]); - } - - if (OpeningStops.TryGetValue(message.Figi, out var dt)) - { - if (dt < currentTime) - { - OpeningStops.TryRemove(message.Figi, out _); - } - } - - if ((unit.BidsCount / unit.AsksCount) < 0.5m || (unit.BidsCount / unit.AsksCount) > 2m) - { - var stopTo = currentTime.AddMinutes(3); - //OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo); - //LogDeclision(declisionsForSave, DeclisionTradeAction.StopBuyShortTime, message); - } - - var res = TradingEvent.None; - var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 45, 180, 2.5m); - var resultLongClose = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 15, 120, 2.5m).events; - - var uptrendStarts = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(120), TimeSpan.FromSeconds(20), 1.5m, 15); - //var uptrendStarts2 = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(3), 1.5m, 2); - - - res |= (uptrendStarts & TradingEvent.UptrendStart); - res |= resultLongClose; - res |= resultMoveAvFull.events; - - //res = TradingEvent.None; - - //if (!stopBuy &&RandomNumberGenerator.GetInt32(100) < 20) - //{ - // res |= TradingEvent.UptrendStart; - //} - //else if (!stopSell && (RandomNumberGenerator.GetInt32(100) < 20)) - //{ - // res |= TradingEvent.UptrendEnd; - //} - - - - if (resultMoveAvFull.bigWindowAv != 0) - { - LogPrice(processedPrices, message, bigWindowProcessor, resultMoveAvFull.bigWindowAv); - LogPrice(processedPrices, message, smallWindowProcessor, resultMoveAvFull.smallWindowAv); - } - if ((resultLongClose & TradingEvent.StopBuy) == TradingEvent.StopBuy) - { - var stopTo = (message.IsHistoricalData ? message.Time : DateTime.UtcNow).AddMinutes(_buyStopLength / 2); - OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo); - //LogDeclision(declisionsForSave, DeclisionTradeAction.StopBuy, message); - } - - if ((res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart - && !OpeningStops.TryGetValue(message.Figi, out _) - && state == ExchangeState.Open - && data.timestamps.Length > 1 - && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2] < TimeSpan.FromMinutes(1))) - { - var trade = new DeferredTrade() - { - Figi = message.Figi, - Price = message.Value, - Time = message.Time.AddSeconds(15) - }; - DeferredLongOpens[message.Figi] = trade; - } - - if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd) - { - var trade = new DeferredTrade() - { - Figi = message.Figi, - Price = message.Value, - Time = message.Time.AddSeconds(15) - }; - DeferredLongCloses[message.Figi] = trade; - } - - //if ((resultLongOpen.events & TradingEvent.ShortOpen) == TradingEvent.ShortOpen - // && !OpeningStops.TryGetValue(message.Figi, out _)) - //{ - // LogDeclision(declisionsForSave, DeclisionTradeAction.OpenShort, message); - //} - - //if ((resultLongOpen.events & TradingEvent.ShortClose) == TradingEvent.ShortClose) - //{ - // LogDeclision(declisionsForSave, DeclisionTradeAction.CloseShort, message); - //} - - if ((!message.IsHistoricalData && (processedPrices.Count > 0 || declisionsForSave.Count > 0)) - || (message.IsHistoricalData && ((processedPrices.Count + declisionsForSave.Count > 10000) || _pricesChannel.Reader.Count == 0))) - { - using var context = await _dbContextFactory.CreateDbContextAsync(); - context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; - - if (processedPrices.Count > 0) - { - await context.ProcessedPrices.AddRangeAsync(processedPrices); - processedPrices.Clear(); - } - if (declisionsForSave.Count > 0) - { - await context.Declisions.AddRangeAsync(declisionsForSave); - declisionsForSave.Clear(); - } - await context.SaveChangesAsync(); - } + ProcessOpeningStops(message, currentTime); + await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize); } } catch (Exception ex) @@ -330,60 +114,255 @@ namespace KLHZ.Trader.Core.Exchange.Services } } - private async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue) + private async Task SellOldAssetsIfCan(INewPrice message) { - using var context = await _dbContextFactory.CreateDbContextAsync(); - await context.Trades - .Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future) - .ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue)); - foreach (var account in _tradeDataProvider.Accounts.Values) + var accounts = _tradeDataProvider.Accounts.Values.ToArray(); + var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi); + foreach (var acc in accounts) { - await _tradeDataProvider.SyncPortfolio(account); + var assets = acc.Assets.Values.Where(a => a.Figi == message.Figi && DateTime.UtcNow - a.BoughtAt > TimeSpan.FromHours(4)).ToArray(); + foreach (var asset in assets) + { + var profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, + GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0); + if (profit > 0) + { + await _dataBus.Broadcast(new TradeCommand() + { + AccountId = asset.AccountId, + Figi = message.Figi, + CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell, + Count = (long)asset.Count, + RecomendPrice = null, + }); + await LogDeclision(DeclisionTradeAction.CloseLong, message, profit); + } + } } } - private static void LogPrice(List prices, INewPrice message, string processor, decimal value) + private async Task ProcessNewPriceIMOEXF((DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount) data, + ExchangeState state, + INewPrice message, int windowMaxSize) { - prices.Add(new ProcessedPrice() + var res = TradingEvent.None; + var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 45, 180, TimeSpan.FromSeconds(30), 1m); + //var resultLongClose = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 15, 120, 1.5m).events; + + //ar uptrendStarts = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(120), TimeSpan.FromSeconds(20), 1.5m, 15); + + + //res |= (uptrendStarts & TradingEvent.UptrendStart); + //res |= resultLongClose; + res |= resultMoveAvFull.events; + + if (resultMoveAvFull.bigWindowAv != 0) + { + await LogPrice(message, _bigWindowProcessor, resultMoveAvFull.bigWindowAv); + await LogPrice(message, _smallWindowProcessor, resultMoveAvFull.smallWindowAv); + } + if ((resultMoveAvFull.events & TradingEvent.StopBuy) == TradingEvent.StopBuy) + { + var stopTo = (message.IsHistoricalData ? message.Time : DateTime.UtcNow).AddMinutes(_buyStopLength / 2); + //OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo); + //await LogDeclision(DeclisionTradeAction.StopBuy, message); + } + + if ((res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart + && !OpeningStops.TryGetValue(message.Figi, out _) + && state == ExchangeState.Open + && data.timestamps.Length > 1 + && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2] < TimeSpan.FromMinutes(1))) + { + var fullData = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromMinutes(60)); + if (fullData.isFullIntervalExists) + { + var max = fullData.prices.Max(); + var min = fullData.prices.Min(); + + if (max - min < 15 && fullData.prices.Last() - fullData.prices.First() < 4) + { + var trade = new DeferredTrade() + { + Figi = message.Figi, + Price = message.Value, + Time = message.Time.AddSeconds(15) + }; + DeferredLongOpens[message.Figi] = trade; + } + } + } + + if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd) + { + var trade = new DeferredTrade() + { + Figi = message.Figi, + Price = message.Value, + Time = message.Time.AddSeconds(15) + }; + DeferredLongCloses[message.Figi] = trade; + } + } + + private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount) data, ExchangeState state, INewPrice message) + { + if (state == ExchangeState.ClearingTime + && !message.IsHistoricalData + && data.timestamps.Length > 1 + && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2]) > TimeSpan.FromMinutes(3)) + { + await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[data.prices.Length - 2]); + } + } + + private void ProcessOpeningStops(INewPrice message, DateTime currentTime) + { + if (OpeningStops.TryGetValue(message.Figi, out var dt)) + { + if (dt < currentTime) + { + OpeningStops.TryRemove(message.Figi, out _); + } + } + } + + private async Task ProcessDeferredLongOpens(INewPrice message, DateTime currentTime) + { + if (message.Figi == "FUTIMOEXF000") + { + DeferredTrade? longOpen; + DeferredLongOpens.TryGetValue(message.Figi, out longOpen); + if (longOpen != null) + { + var t = currentTime; + if (longOpen.Time <= t + && t - longOpen.Time < TimeSpan.FromMinutes(3)) + { + DeferredLongOpens.TryRemove(message.Figi, out _); + if (message.Value - longOpen.Price < 1) + { + if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase()) + { + var accounts = _tradeDataProvider.Accounts + .Where(a => !a.Value.Assets.ContainsKey(message.Figi)) + .ToArray(); + foreach (var acc in accounts) + { + if (IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart)) + { + if (RandomNumberGenerator.GetInt32(100) > 50) + { + await _dataBus.Broadcast(new TradeCommand() + { + AccountId = acc.Value.AccountId, + Figi = message.Figi, + CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy, + Count = 1, + RecomendPrice = null, + }); + } + + await LogDeclision(DeclisionTradeAction.OpenLong, message); + } + } + } + else + { + await LogDeclision(DeclisionTradeAction.OpenLong, message); + } + } + } + } + } + } + + private async Task ProcessDeferredLongCloses(INewPrice message, DateTime currentTime) + { + if (message.Figi == "FUTIMOEXF000") + { + DeferredTrade? longClose; + DeferredLongCloses.TryGetValue(message.Figi, out longClose); + if (longClose != null) + { + if (longClose.Time <= currentTime) + { + DeferredLongCloses.TryRemove(message.Figi, out _); + if (longClose.Price - message.Value < 1) + { + var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi); + if (!message.IsHistoricalData && BotModeSwitcher.CanSell()) + { + var assetsForClose = _tradeDataProvider.Accounts + .SelectMany(a => a.Value.Assets.Values) + .Where(a => a.Figi == message.Figi && a.Count > 0) + .ToArray(); + foreach (var asset in assetsForClose) + { + var profit = 0m; + + if (assetType == AssetType.Common && asset.Count > 0) + { + profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, + GetComission(assetType), 1, false); + } + if (assetType == AssetType.Futures) + { + profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, + GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0); + } + if (profit > 0) + { + await _dataBus.Broadcast(new TradeCommand() + { + AccountId = asset.AccountId, + Figi = message.Figi, + CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell, + Count = (long)asset.Count, + RecomendPrice = null, + }); + await LogDeclision(DeclisionTradeAction.CloseLong, message, profit); + } + } + } + else + { + await LogDeclision(DeclisionTradeAction.CloseLong, message); + } + } + } + } + } + } + + private async Task LogPrice(INewPrice message, string processor, decimal value) + { + await _tradeDataProvider.LogPrice(new ProcessedPrice() { Figi = message.Figi, Ticker = message.Ticker, Processor = processor, Time = message.Time, Value = value, - }); + }, !message.IsHistoricalData); } - private static void LogDeclision(List declisions, DeclisionTradeAction action, INewPrice message) + private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null) { - declisions.Add(new Declision() + await _tradeDataProvider.LogDeclision(new Declision() { AccountId = string.Empty, Figi = message.Figi, Ticker = message.Ticker, + Value = profit, Price = message.Value, Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow, Action = action, - }); - } - - private async Task ProcessOrdersbooks() - { - while (await _ordersbookChannel.Reader.WaitToReadAsync()) - { - var message = await _ordersbookChannel.Reader.ReadAsync(); - if (!_historyCash.TryGetValue(message.Figi, out var data)) - { - data = new PriceHistoryCacheUnit2(message.Figi); - _historyCash.TryAdd(message.Figi, data); - } - await data.AddOrderbook(message); - } + }, !message.IsHistoricalData); } public Task StopAsync(CancellationToken cancellationToken) { - _cts.Cancel(); return Task.CompletedTask; } @@ -433,5 +412,14 @@ namespace KLHZ.Trader.Core.Exchange.Services return true; } + + private async Task ProcessOrdersbooks() + { + while (await _ordersbookChannel.Reader.WaitToReadAsync()) + { + var message = await _ordersbookChannel.Reader.ReadAsync(); + await _tradeDataProvider.AddOrderbook(message); + } + } } } diff --git a/KLHZ.Trader.Core/Exchange/Services/TradeDataProvider.cs b/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs similarity index 50% rename from KLHZ.Trader.Core/Exchange/Services/TradeDataProvider.cs rename to KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs index e47fbd3..b383fd9 100644 --- a/KLHZ.Trader.Core/Exchange/Services/TradeDataProvider.cs +++ b/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs @@ -1,12 +1,18 @@ -using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; +using KLHZ.Trader.Core.Contracts.Declisions.Interfaces; +using KLHZ.Trader.Core.Contracts.Messaging.Dtos; +using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; using KLHZ.Trader.Core.DataLayer; +using KLHZ.Trader.Core.DataLayer.Entities.Declisions; +using KLHZ.Trader.Core.DataLayer.Entities.Prices; using KLHZ.Trader.Core.Exchange.Extentions; using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; using KLHZ.Trader.Core.Exchange.Models.Configs; +using KLHZ.Trader.Core.Math.Declisions.Services.Cache; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; +using System.Threading.Channels; using Tinkoff.InvestApi; using Tinkoff.InvestApi.V1; using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset; @@ -14,13 +20,13 @@ using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType; namespace KLHZ.Trader.Core.Exchange.Services { - //todo перенести сюда весь кэш и всю работу по сохранению данных. - public class TradeDataProvider + public class TraderDataProvider { + private readonly ConcurrentDictionary _historyCash = new(); + private readonly InvestApiClient _investApiClient; private readonly IDbContextFactory _dbContextFactory; private readonly ILogger _logger; - private readonly IDataBus _dataBus; private readonly string[] _managedAccountsNamePatterns = []; private readonly string[] _instrumentsFigis = []; @@ -28,15 +34,18 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly ConcurrentDictionary _tickersCache = new(); private readonly ConcurrentDictionary _assetTypesCache = new(); internal readonly ConcurrentDictionary Accounts = new(); - public TradeDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger, IDataBus dataBus) + private readonly bool _isDataRecievingAllowed = false; + private readonly Channel _forSave = Channel.CreateUnbounded(); + private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1); + + public TraderDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) { _investApiClient = investApiClient; _dbContextFactory = dbContextFactory; _logger = logger; - _dataBus = dataBus; _managedAccountsNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray(); _instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray(); - + _isDataRecievingAllowed = options.Value.ExchangeDataRecievingEnabled; foreach (var lev in options.Value.InstrumentsSettings) { @@ -44,35 +53,121 @@ namespace KLHZ.Trader.Core.Exchange.Services } } - public async Task Init() + public async ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(string figi, TimeSpan timeSpan) { - var shares = await _investApiClient.Instruments.SharesAsync(); - foreach (var share in shares.Instruments) + if (_historyCash.TryGetValue(figi, out var unit)) { - if (_instrumentsFigis.Contains(share.Figi)) - { - _tickersCache.TryAdd(share.Figi, share.Ticker); - _assetTypesCache.TryAdd(share.Figi, AssetType.Common); - } + var res = await unit.GetData(timeSpan); + return (res.timestamps, res.prices, res.isFullIntervalExists); } - var futures = await _investApiClient.Instruments.FuturesAsync(); - foreach (var future in futures.Instruments) + return (Array.Empty(), Array.Empty(), false); + } + + public async ValueTask<(DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount)> GetData(string figi, int? length = null) + { + if (_historyCash.TryGetValue(figi, out var unit)) { - if (_instrumentsFigis.Contains(future.Figi)) + var res = await unit.GetData(length); + return (res.timestamps, res.prices, unit.BidsCount, unit.AsksCount); + } + return (Array.Empty(), Array.Empty(), 1, 1); + } + + public async ValueTask AddData(INewPrice message, TimeSpan? clearingInterval = null) + { + if (_historyCash.TryGetValue(message.Figi, out var unit)) + { + if (clearingInterval.HasValue) { - _tickersCache.TryAdd(future.Figi, future.Ticker); - _assetTypesCache.TryAdd(future.Figi, AssetType.Futures); + var lasts = await unit.GetLastValues(); + if (message.Time - lasts.time > clearingInterval.Value) + { + unit = new PriceHistoryCacheUnit2(message.Figi); + _historyCash[message.Figi] = unit; + } } + await unit.AddData(message); + } + else + { + unit = new PriceHistoryCacheUnit2(message.Figi, message); + _historyCash.TryAdd(message.Figi, unit); + } + } + + public async ValueTask AddOrderbook(IOrderbook orderbook) + { + if (!_historyCash.TryGetValue(orderbook.Figi, out var unit)) + { + unit = new PriceHistoryCacheUnit2(orderbook.Figi); + _historyCash.TryAdd(orderbook.Figi, unit); } - var accounts = await _investApiClient.GetAccounts(_managedAccountsNamePatterns); - var accountsList = new List(); - int i = 0; - foreach (var accountId in accounts) + await unit.AddOrderbook(orderbook); + } + + public async Task Init() + { + await _initSemaphore.WaitAsync(TimeSpan.FromSeconds(15)); + try { - var acc = new ManagedAccount(accountId); - await SyncPortfolio(acc); - Accounts[accountId] = acc; + var shares = await _investApiClient.Instruments.SharesAsync(); + foreach (var share in shares.Instruments) + { + if (_instrumentsFigis.Contains(share.Figi)) + { + _tickersCache.TryAdd(share.Figi, share.Ticker); + _assetTypesCache.TryAdd(share.Figi, AssetType.Common); + } + } + var futures = await _investApiClient.Instruments.FuturesAsync(); + foreach (var future in futures.Instruments) + { + if (_instrumentsFigis.Contains(future.Figi)) + { + _tickersCache.TryAdd(future.Figi, future.Ticker); + _assetTypesCache.TryAdd(future.Figi, AssetType.Futures); + } + } + + var accounts = await _investApiClient.GetAccounts(_managedAccountsNamePatterns); + var accountsList = new List(); + foreach (var accountId in accounts) + { + var acc = new ManagedAccount(accountId); + await SyncPortfolio(acc); + Accounts[accountId] = acc; + } + + if (_isDataRecievingAllowed) + { + var time = DateTime.UtcNow.AddHours(-1.5); + using var context1 = await _dbContextFactory.CreateDbContextAsync(); + context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + var data = await context1.PriceChanges + .Where(c => _instrumentsFigis.Contains(c.Figi) && c.Time >= time) + .OrderBy(c => c.Time) + .Select(c => new NewPriceMessage() + { + Figi = c.Figi, + Ticker = c.Ticker, + Time = c.Time, + Value = c.Value, + IsHistoricalData = true + }) + .ToArrayAsync(); + + foreach (var price in data) + { + await AddData(price); + } + } + + _ = WritePricesTask(); + } + catch(Exception ex) + { + } } @@ -161,7 +256,20 @@ namespace KLHZ.Trader.Core.Exchange.Services } } - public async Task LogDeal(DealResult dealResult) + internal async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue) + { + using var context = await _dbContextFactory.CreateDbContextAsync(); + context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + await context.Trades + .Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future) + .ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue).SetProperty(tr => tr.BoughtAt, DateTime.UtcNow)); + foreach (var account in Accounts.Values) + { + await SyncPortfolio(account); + } + } + + internal async Task LogDeal(DealResult dealResult) { using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; @@ -237,5 +345,76 @@ namespace KLHZ.Trader.Core.Exchange.Services await SyncPortfolio(Accounts[dealResult.AccountId]); } + internal async Task LogPrice(ProcessedPrice price, bool saveImmediately) + { + if (saveImmediately) + { + using var context = await _dbContextFactory.CreateDbContextAsync(); + context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + await context.ProcessedPrices.AddRangeAsync(price); + await context.SaveChangesAsync(); + } + else + { + await _forSave.Writer.WriteAsync(price); + } + } + + internal async Task LogDeclision(Declision declision, bool saveImmediately) + { + if (saveImmediately) + { + using var context = await _dbContextFactory.CreateDbContextAsync(); + context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + await context.Declisions.AddRangeAsync(declision); + await context.SaveChangesAsync(); + } + else + { + await _forSave.Writer.WriteAsync(declision); + } + } + + private async Task WritePricesTask() + { + var buffer1 = new List(); + var buffer2 = new List(); + while (await _forSave.Reader.WaitToReadAsync()) + { + try + { + var obj = await _forSave.Reader.ReadAsync(); + if (obj is ProcessedPrice price) + { + buffer1.Add(price); + } + if (obj is Declision dec) + { + buffer2.Add(dec); + } + if ((buffer1.Count + buffer2.Count) > 10000 || _forSave.Reader.Count == 0) + { + using var context = await _dbContextFactory.CreateDbContextAsync(); + context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + if (buffer1.Count > 0) + { + await context.ProcessedPrices.AddRangeAsync(buffer1); + } + if (buffer2.Count > 0) + { + await context.Declisions.AddRangeAsync(buffer2); + } + + await context.SaveChangesAsync(); + buffer1.Clear(); + buffer2.Clear(); + } + } + catch (Exception ex) + { + + } + } + } } } diff --git a/KLHZ.Trader.Core/Exchange/Services/TradingCommandsExecutor.cs b/KLHZ.Trader.Core/Exchange/Services/TradingCommandsExecutor.cs index ba44046..e7febaf 100644 --- a/KLHZ.Trader.Core/Exchange/Services/TradingCommandsExecutor.cs +++ b/KLHZ.Trader.Core/Exchange/Services/TradingCommandsExecutor.cs @@ -11,13 +11,13 @@ namespace KLHZ.Trader.Core.Exchange.Services { public class TradingCommandsExecutor : IHostedService { - private readonly TradeDataProvider _tradeDataProvider; + private readonly TraderDataProvider _tradeDataProvider; private readonly InvestApiClient _investApiClient; private readonly IDataBus _dataBus; private readonly ILogger _logger; private readonly Channel _channel = Channel.CreateUnbounded(); - public TradingCommandsExecutor(InvestApiClient investApiClient, IDataBus dataBus, ILogger logger, TradeDataProvider tradeDataProvider) + public TradingCommandsExecutor(InvestApiClient investApiClient, IDataBus dataBus, ILogger logger, TraderDataProvider tradeDataProvider) { _investApiClient = investApiClient; _dataBus = dataBus; diff --git a/KLHZ.Trader.Service/Controllers/PlayController.cs b/KLHZ.Trader.Service/Controllers/PlayController.cs index 0bb45d5..239d46c 100644 --- a/KLHZ.Trader.Service/Controllers/PlayController.cs +++ b/KLHZ.Trader.Service/Controllers/PlayController.cs @@ -24,7 +24,7 @@ namespace KLHZ.Trader.Service.Controllers { try { - var time = new DateTime(2025, 9, 3, 13, 5, 0, DateTimeKind.Utc); + var time = DateTime.UtcNow.AddHours(-4); using var context1 = await _dbContextFactory.CreateDbContextAsync(); context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; var data = await context1.PriceChanges @@ -39,7 +39,7 @@ namespace KLHZ.Trader.Service.Controllers IsHistoricalData = true }) .ToArrayAsync(); - //data = data.Where(d=>d.Time> time).ToArray(); + data = data.Where(d=>d.Time> time).ToArray(); foreach (var mess in data) { await _dataBus.Broadcast(mess); diff --git a/KLHZ.Trader.Service/Program.cs b/KLHZ.Trader.Service/Program.cs index c8eddc8..aa447aa 100644 --- a/KLHZ.Trader.Service/Program.cs +++ b/KLHZ.Trader.Service/Program.cs @@ -51,7 +51,7 @@ builder.Services.AddHostedService(); builder.Services.AddSingleton(); -builder.Services.AddSingleton(); +builder.Services.AddSingleton(); builder.Services.AddSingleton(); for (int i = 0; i < 10; i++)