diff --git a/KLHZ.Trader.Core.Math/Declisions/Utils/Statistics.cs b/KLHZ.Trader.Core.Math/Declisions/Utils/Statistics.cs index 16b3860..85cd275 100644 --- a/KLHZ.Trader.Core.Math/Declisions/Utils/Statistics.cs +++ b/KLHZ.Trader.Core.Math/Declisions/Utils/Statistics.cs @@ -1,4 +1,5 @@ -using System; +using KLHZ.Trader.Core.Contracts.Declisions.Dtos; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -8,6 +9,12 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils { public static class Statistics { + + public static decimal Mean(this CachedValue[] values) + { + return values.Sum(x => x.Value)/ values.Length; + } + private static (decimal mean, decimal std) CaclSigma(decimal[] values) { var mean = values.Sum() / values.Length; diff --git a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs index d72102e..c0a9bb2 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs @@ -227,7 +227,8 @@ namespace KLHZ.Trader.Core.Exchange.Services AsksCount = asks.Length, BidsCount = asks.Length, }; - await _tradeDataProvider.AddOrderbook(message); + + await _eventBus.Broadcast(message); } if (orderbookItemsBuffer.Count + pricesBuffer.Count > 100 || (DateTime.UtcNow - lastWrite).TotalSeconds > 5) diff --git a/KLHZ.Trader.Core/Exchange/Services/Trader.cs b/KLHZ.Trader.Core/Exchange/Services/Trader.cs index fa4fcab..5c535ca 100644 --- a/KLHZ.Trader.Core/Exchange/Services/Trader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/Trader.cs @@ -38,6 +38,7 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly ConcurrentDictionary LastTradingEvents = new(); private readonly ConcurrentDictionary TradingModes = new(); + private readonly ConcurrentDictionary OrderBooks = new(); private readonly ConcurrentDictionary LongOpeningStops = new(); private readonly ConcurrentDictionary ShortOpeningStops = new(); @@ -55,6 +56,7 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly Channel _pricesChannel = Channel.CreateUnbounded(); private readonly Channel _commands = Channel.CreateUnbounded(); + private readonly Channel _orderbooks = Channel.CreateUnbounded(); public Trader( ILogger logger, IOptions options, @@ -86,9 +88,10 @@ namespace KLHZ.Trader.Core.Exchange.Services public Task StartAsync(CancellationToken cancellationToken) { _dataBus.AddChannel(nameof(Trader), _pricesChannel); - //_dataBus.AddChannel(nameof(Trader), _ordersbookChannel); + _dataBus.AddChannel(nameof(Trader), _orderbooks); _dataBus.AddChannel(nameof(Trader), _commands); _ = ProcessPrices(); + _ = ProcessOrderbooks(); _ = ProcessCommands(); return Task.CompletedTask; } @@ -246,6 +249,43 @@ namespace KLHZ.Trader.Core.Exchange.Services } } + private async Task ProcessOrderbooks() + { + while (await _orderbooks.Reader.WaitToReadAsync()) + { + var message = await _orderbooks.Reader.ReadAsync(); + await _tradeDataProvider.AddOrderbook(message); + + //if (OrderBooks.TryGetValue(message.Figi, out var oldMessage)) + //{ + // if (message.Bids.Length > 0 && oldMessage.Bids.Length>0) + // { + // var val = message.Bids[0].Count - oldMessage.Bids[0].Count; + // await _tradeDataProvider.AddDataTo20SecondsWindowCache(message.Figi, "dbids0", new Contracts.Declisions.Dtos.CachedValue() + // { + // Time = message.Time, + // Value = val + // }); + // if (val!=0) + // await LogPrice(message.Figi, message.Ticker, message.Time, val, "bids0"); + // } + // if (message.Asks.Length > 0 && oldMessage.Asks.Length > 0) + // { + // var val = message.Asks[0].Count - oldMessage.Asks[0].Count; + // await _tradeDataProvider.AddDataTo20SecondsWindowCache(message.Figi, "dasks0", new Contracts.Declisions.Dtos.CachedValue() + // { + // Time = message.Time, + // Value = val + // }); + // if (val != 0) + // await LogPrice(message.Figi, message.Ticker, message.Time, val, "asks0"); + // } + + //} + OrderBooks[message.Figi] = message; + } + } + private async Task ProcessPrices() { var pricesCache1 = new Dictionary>(); @@ -600,9 +640,16 @@ namespace KLHZ.Trader.Core.Exchange.Services if (changemodes.Length > 1) { - await LogPrice(message, "changemode", changemodes[changemodes.Length - 1].Value - changemodes[changemodes.Length - 2].Value); + await LogPrice(message, "changemode", changeModIndicator); } + //var sells20 = await _tradeDataProvider.GetDataFrom20SecondsWindowCache(message.Figi, "2"); + //var buys20 = await _tradeDataProvider.GetDataFrom20SecondsWindowCache(message.Figi, "1"); + + //if (sells20.Length>0) + // await LogPrice(message.Figi, message.Ticker, message.Time, sells20.Sum(v => v.Value), "sells20"); + //if (buys20.Length > 0) + // await LogPrice(message.Figi, message.Ticker, message.Time, buys20.Sum(v => v.Value), "buys20"); //if (changemodes[changemodes.Length-1].Time - changemodes[0].Time > TimeSpan.FromMinutes(10)) //{ // var diffs = SignalProcessing.CalcDiffs(changemodes.Select(c => c.Value).ToArray()); @@ -851,7 +898,7 @@ INewPrice message, int windowMaxSize, decimal? uptrendStartingDetectionMeanfullS // result = MergeResultsMax(result, getLocalTrendsModsTask.Result); //result = MergeResultsMult(result, getFFTModsTask.Result); //result = MergeResultsMult(result, getSellsDiffsModsTask.Result); - //result = MergeResultsMult(result, getTradingModeModsTask.Result); + result = MergeResultsMult(result, getTradingModeModsTask.Result); result = MergeResultsMult(result, getSpeedResultantModsTask.Result); if (result[TradingEvent.UptrendStart] >= Constants.UppingCoefficient @@ -970,6 +1017,18 @@ INewPrice message, int windowMaxSize, decimal? uptrendStartingDetectionMeanfullS }, false); } + private async Task LogPrice(string figi, string ticker,DateTime time, decimal value, string processor) + { + await _tradeDataProvider.LogPrice(new ProcessedPrice() + { + Figi = figi, + Ticker = ticker, + Processor = processor, + Time = time, + Value = value, + }, false); + } + private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null) { await _tradeDataProvider.LogDeclision(new Declision() @@ -1110,23 +1169,23 @@ INewPrice message, int windowMaxSize, decimal? uptrendStartingDetectionMeanfullS { case TradingMode.Stable: { - return (0.5m, -0.5m); + return (-0.5m, 0.5m); } case TradingMode.Dropping: { - return (-1.5m, null); ; + return (-1.5m, 0.5m); ; } case TradingMode.SlowDropping: { - return (-0.5m, null); + return (-1m, 0.5m); } case TradingMode.SlowGrowing: { - return (null, 0.5m); + return (-0.5m, 1m); } case TradingMode.Growing: { - return (null, 1.5m); + return (-0.5m, 1.5m); } default: { diff --git a/KLHZ.Trader.Service/Controllers/PlayController.cs b/KLHZ.Trader.Service/Controllers/PlayController.cs index cb659d4..92ba368 100644 --- a/KLHZ.Trader.Service/Controllers/PlayController.cs +++ b/KLHZ.Trader.Service/Controllers/PlayController.cs @@ -42,7 +42,7 @@ namespace KLHZ.Trader.Service.Controllers context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; - while (time1 < DateTime.UtcNow.Date) + while (time1 < DateTime.UtcNow) { var data = new List(); var data2 = new List(); @@ -197,13 +197,27 @@ namespace KLHZ.Trader.Service.Controllers } } } + else + { + var ob = data[i].Orderbook; + if (ob != null) + { + var d = new TimeSeriesData() + { + Figi = ob.Figi, + Orderbook = ob, + Time = ob.Time, + }; + data2.Add(d); + } + } } foreach (var mess in data2) { if (mess.Orderbook != null) { - await _traderDataProvider.AddOrderbook(mess.Orderbook); + await _dataBus.Broadcast(mess.Orderbook); } if (mess.NewPrice != null) {