diff --git a/KLHZ.Trader.Core/Exchange/Services/Trader.cs b/KLHZ.Trader.Core/Exchange/Services/Trader.cs index 9b27f9e..fa4dfb1 100644 --- a/KLHZ.Trader.Core/Exchange/Services/Trader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/Trader.cs @@ -140,127 +140,135 @@ namespace KLHZ.Trader.Core.Exchange.Services { var message = await _pricesChannel.Reader.ReadAsync(); - #region Ускорение обработки исторических данных при отладке - if (message.IsHistoricalData) + try { - await _tradeDataProvider.AddData(message, TimeSpan.FromHours(6)); - if (!pricesCache.TryGetValue(message.Figi, out var list)) - { - list = new List(); - pricesCache[message.Figi] = list; - } - list.Add(message); - - if ((list.Last().Time - list.First().Time).TotalSeconds < 0.5) + #region Ускорение обработки исторических данных при отладке + if (message.IsHistoricalData) { + await _tradeDataProvider.AddData(message, TimeSpan.FromHours(6)); + if (!pricesCache.TryGetValue(message.Figi, out var list)) + { + list = new List(); + pricesCache[message.Figi] = list; + } list.Add(message); - continue; - } - else - { - message = new PriceChange() - { - Figi = message.Figi, - Ticker = message.Ticker, - Count = message.Count, - Direction = message.Direction, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = list.Sum(l => l.Value) / list.Count - }; - list.Clear(); - } - try - { - if (timesCache.TryGetValue(message.Figi, out var dt)) + if ((list.Last().Time - list.First().Time).TotalSeconds < 0.5) { - if ((message.Time - dt).TotalSeconds > 120) + list.Add(message); + continue; + } + else + { + message = new PriceChange() + { + Figi = message.Figi, + Ticker = message.Ticker, + Count = message.Count, + Direction = message.Direction, + IsHistoricalData = message.IsHistoricalData, + Time = message.Time, + Value = list.Sum(l => l.Value) / list.Count + }; + list.Clear(); + } + + try + { + if (timesCache.TryGetValue(message.Figi, out var dt)) + { + if ((message.Time - dt).TotalSeconds > 120) + { + timesCache[message.Figi] = message.Time; + + TradingModes[message.Figi] = await CalcTradingMode(message); + } + } + else { timesCache[message.Figi] = message.Time; - - TradingModes[message.Figi] = await CalcTradingMode(message); } } - else + catch (Exception ex) { - timesCache[message.Figi] = message.Time; + } } - catch(Exception ex) + var mode = (decimal)TradingModes[message.Figi]; + await LogPrice(message, "trading_mode", mode); + //continue; + #endregion + if (message.Figi == "BBG004730N88") { + if (message.Direction == 1) + { + await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._1minBuyCacheKey, new Contracts.Declisions.Dtos.CachedValue() + { + Time = message.Time, + Value = (decimal)message.Count + }); + } + if (message.Direction == 2) + { + await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._1minSellCacheKey, new Contracts.Declisions.Dtos.CachedValue() + { + Time = message.Time, + Value = (decimal)message.Count + }); + } + var sberSells = await _tradeDataProvider.GetDataFrom5MinuteWindowCache("BBG004730N88", Constants._1minSellCacheKey); + var sberBuys = await _tradeDataProvider.GetDataFrom5MinuteWindowCache("BBG004730N88", Constants._1minBuyCacheKey); + var sells = sberSells.Sum(s => s.Value); + var buys = sberBuys.Sum(s => s.Value); + var su = sells + buys; + if (su != 0) + { + await LogPrice(message, "sellsbuysbalance", (sells / su - 0.5m) * 2); + } + } + + if (_tradingInstrumentsFigis.Contains(message.Figi)) + { + var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow; + + try + { + ProcessStops(message, currentTime); + var windowMaxSize = 2000; + await SellAssetsIfNeed(message); + var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize); + var state = ExchangeScheduler.GetCurrentState(message.Time); + await ProcessClearing(data, state, message); + + if (TradingModes[message.Figi] == TradingMode.Stable) + { + await ProcessNewPriceIMOEXF_Stable(data, state, message, windowMaxSize); + } + else if (TradingModes[message.Figi] == TradingMode.SlowDropping) + { + await ProcessNewPriceIMOEXF_Dropping(data, state, message, windowMaxSize, 3); + } + else if (TradingModes[message.Figi] == TradingMode.Dropping) + { + await ProcessNewPriceIMOEXF_Dropping(data, state, message, windowMaxSize, 6); + } + else + { + await ProcessNewPriceIMOEXF2(data, state, message, windowMaxSize); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF"); + } } } - - await LogPrice(message, "trading_mode", (int)TradingModes[message.Figi]); - //continue; - #endregion - if (message.Figi == "BBG004730N88") + catch(Exception e) { - if (message.Direction == 1) - { - await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._1minBuyCacheKey, new Contracts.Declisions.Dtos.CachedValue() - { - Time = message.Time, - Value = (decimal)message.Count - }); - } - if (message.Direction == 2) - { - await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._1minSellCacheKey, new Contracts.Declisions.Dtos.CachedValue() - { - Time = message.Time, - Value = (decimal)message.Count - }); - } - var sberSells = await _tradeDataProvider.GetDataFrom5MinuteWindowCache("BBG004730N88", Constants._1minSellCacheKey); - var sberBuys = await _tradeDataProvider.GetDataFrom5MinuteWindowCache("BBG004730N88", Constants._1minBuyCacheKey); - var sells = sberSells.Sum(s => s.Value); - var buys = sberBuys.Sum(s => s.Value); - var su = sells + buys; - if (su != 0) - { - await LogPrice(message, "sellsbuysbalance", (sells / su - 0.5m) * 2); - } } - if (_tradingInstrumentsFigis.Contains(message.Figi)) - { - var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow; - - try - { - ProcessStops(message, currentTime); - var windowMaxSize = 2000; - await SellAssetsIfNeed(message); - var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize); - var state = ExchangeScheduler.GetCurrentState(message.Time); - await ProcessClearing(data, state, message); - - if (TradingModes[message.Figi] == TradingMode.Stable) - { - await ProcessNewPriceIMOEXF_Stable(data, state, message, windowMaxSize); - } - else if (TradingModes[message.Figi] == TradingMode.SlowDropping) - { - await ProcessNewPriceIMOEXF_Dropping(data,state,message,windowMaxSize,3); - } - else if (TradingModes[message.Figi] == TradingMode.Dropping) - { - await ProcessNewPriceIMOEXF_Dropping(data, state, message, windowMaxSize, 6); - } - else - { - await ProcessNewPriceIMOEXF2(data, state, message, windowMaxSize); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF"); - } - } } } @@ -711,23 +719,26 @@ namespace KLHZ.Trader.Core.Exchange.Services await LogDeclision(DeclisionTradeAction.OpenLong, message); } - foreach (var acc in _tradeDataProvider.Accounts) + if (!message.IsHistoricalData) { - if (acc.Value.Assets.TryGetValue(message.Figi, out var asset)) + foreach (var acc in _tradeDataProvider.Accounts) { - var order = acc.Value.Orders.Values.FirstOrDefault(o => o.Figi == message.Figi && o.Direction == DealDirection.Sell); - if (order == null && asset.Count>0) + if (acc.Value.Assets.TryGetValue(message.Figi, out var asset)) { - var command = new TradeCommand() + var order = acc.Value.Orders.Values.FirstOrDefault(o => o.Figi == message.Figi && o.Direction == DealDirection.Sell); + if (order == null && asset.Count > 0) { - AccountId = asset.AccountId, - Figi = message.Figi, - CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.LimitSell, - Count = (long)asset.Count, - RecomendPrice = asset.BoughtPrice + 3, - EnableMargin = false, - }; - await _dataBus.Broadcast(command); + var command = new TradeCommand() + { + AccountId = asset.AccountId, + Figi = message.Figi, + CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.LimitSell, + Count = (long)asset.Count, + RecomendPrice = asset.BoughtPrice + 3, + EnableMargin = false, + }; + await _dataBus.Broadcast(command); + } } } } @@ -845,23 +856,27 @@ namespace KLHZ.Trader.Core.Exchange.Services await LogDeclision(DeclisionTradeAction.CloseShort, message); } } - foreach (var acc in _tradeDataProvider.Accounts) + + if (!message.IsHistoricalData) { - if (acc.Value.Assets.TryGetValue(message.Figi, out var asset)) + foreach (var acc in _tradeDataProvider.Accounts) { - var order = acc.Value.Orders.Values.FirstOrDefault(o => o.Figi == message.Figi && o.Direction == DealDirection.Buy); - if (order == null && asset.Count<0) + if (acc.Value.Assets.TryGetValue(message.Figi, out var asset)) { - var command = new TradeCommand() + var order = acc.Value.Orders.Values.FirstOrDefault(o => o.Figi == message.Figi && o.Direction == DealDirection.Buy); + if (order == null && asset.Count < 0) { - AccountId = asset.AccountId, - Figi = message.Figi, - CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.LimitBuy, - Count = System.Math.Abs((long)asset.Count), - RecomendPrice = asset.BoughtPrice - step, - EnableMargin = false, - }; - await _dataBus.Broadcast(command); + var command = new TradeCommand() + { + AccountId = asset.AccountId, + Figi = message.Figi, + CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.LimitBuy, + Count = System.Math.Abs((long)asset.Count), + RecomendPrice = asset.BoughtPrice - step, + EnableMargin = false, + }; + await _dataBus.Broadcast(command); + } } } }