using Google.Protobuf.WellKnownTypes;
using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Dtos.FFT.Enums;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Threading.Channels;
using Telegram.Bot.Types;
using Tinkoff.InvestApi;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that reads price messages from the data bus, maintains derived
    /// metrics (FFT position, moving averages, areas relation, Sber buy/sell balance)
    /// and broadcasts <see cref="TradeCommand"/>s for the instrument it trades.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file were lost in the reviewed copy.
    /// Most were re-derived from usage; the ones that could not be derived are marked with
    /// a NOTE(review) next to them and must be confirmed against the project.
    /// </remarks>
    public class Trader : IHostedService
    {
        // FIGI whose trade flow feeds the buy/sell balance (the original locals call it "sber").
        private const string SberFigi = "BBG004730N88";

        // The only FIGI for which trading decisions are evaluated.
        private const string ImoexFuturesFigi = "FUTIMOEXF000";

        private readonly IDataBus _dataBus;
        private readonly TraderDataProvider _tradeDataProvider;
        private readonly ILogger _logger;

        // Per-FIGI expiry time; while an entry exists no new long is opened for that FIGI.
        private readonly ConcurrentDictionary<string, DateTime> LongOpeningStops = new();

        // Written when a long is closed and expired in ProcessStops; not consulted as a gate in this file.
        private readonly ConcurrentDictionary<string, DateTime> LongClosingStops = new();

        // Per-FIGI expiry time; while an entry exists short-closing decisions are skipped.
        private readonly ConcurrentDictionary<string, DateTime> ShortClosingStops = new();

        // NOTE(review): value type name reconstructed (element type of options.InstrumentsSettings) - confirm.
        private readonly ConcurrentDictionary<string, InstrumentSettings> Leverages = new();

        private readonly decimal _futureComission;
        private readonly decimal _shareComission;
        private readonly decimal _accountCashPart;
        private readonly decimal _accountCashPartFutures;
        private readonly string[] _tradingInstrumentsFigis = [];

        private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();

        // NOTE(review): element type reconstructed - confirm. This unbounded channel is registered
        // on the bus but never read in this file, so it can grow without limit.
        private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateUnbounded<IOrderbook>();

        /// <summary>
        /// Copies trading settings from <paramref name="options"/> and indexes the
        /// per-instrument settings by FIGI.
        /// </summary>
        /// <param name="logger">Logger used for trade and error messages.</param>
        /// <param name="options">Exchange configuration (commissions, cash parts, traded FIGIs, leverages).</param>
        /// <param name="dataBus">Bus used to receive prices and to broadcast trade commands.</param>
        /// <param name="tradeDataProvider">Provider of cached prices, accounts and logging sinks.</param>
        /// <param name="investApiClient">Not used by this class; kept so the constructor signature stays unchanged.</param>
        public Trader(
            ILogger<Trader> logger,
            // NOTE(review): options type name reconstructed - confirm.
            IOptions<ExchangeConfig> options,
            IDataBus dataBus,
            TraderDataProvider tradeDataProvider,
            InvestApiClient investApiClient)
        {
            _tradeDataProvider = tradeDataProvider;
            _logger = logger;
            _dataBus = dataBus;
            _futureComission = options.Value.FutureComission;
            _shareComission = options.Value.ShareComission;
            _accountCashPart = options.Value.AccountCashPart;
            _accountCashPartFutures = options.Value.AccountCashPartFutures;

            // A missing config section must not turn into a NullReferenceException on every price.
            _tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis ?? [];

            foreach (var lev in options.Value.InstrumentsSettings)
            {
                Leverages.TryAdd(lev.Figi, lev);
            }
        }

        /// <summary>
        /// Initializes the data provider, registers this service's channels on the bus
        /// and starts the background price-processing loop.
        /// </summary>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _tradeDataProvider.Init();
            _dataBus.AddChannel(nameof(Trader), _pricesChannel);
            _dataBus.AddChannel(nameof(Trader), _ordersbookChannel);

            // Fire-and-forget: the loop logs its own failures (see ProcessPrices).
            _ = ProcessPrices();
        }

        /// <summary>
        /// Requests cached prices for the message's FIGI, trying 1.5 h, then 1 h, then
        /// 0.75 h, and returns the first result whose full interval exists (or the last attempt).
        /// </summary>
        public async ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(INewPrice message)
        {
            var data2 = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromHours(1.5));
            if (!data2.isFullIntervalExists)
            {
                data2 = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromHours(1));
            }

            if (!data2.isFullIntervalExists)
            {
                data2 = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromHours(0.75));
            }

            return data2;
        }

        /// <summary>
        /// Returns the position reported by <c>FFT.Check</c> for the stored FFT result.
        /// When the stored result is empty or older than 90 seconds it is recomputed
        /// (if a full interval of data exists) and <see cref="ValueAmplitudePosition.None"/> is returned.
        /// </summary>
        private async ValueTask<ValueAmplitudePosition> CheckPosition((DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists) data, INewPrice message)
        {
            var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
            var fft = await _tradeDataProvider.GetFFtResult(message.Figi);

            if (fft.IsEmpty || (currentTime - fft.LastTime).TotalSeconds > 90)
            {
                if (data.isFullIntervalExists)
                {
                    var interpolatedData = SignalProcessing.InterpolateData(data.timestamps, data.prices, TimeSpan.FromSeconds(5));
                    fft = FFT.Analyze(interpolatedData.timestamps, interpolatedData.values, message.Figi, TimeSpan.FromMinutes(3), TimeSpan.FromMinutes(40));
                    await _tradeDataProvider.SetFFtResult(fft);
                }

                return ValueAmplitudePosition.None;
            }

            var position = FFT.Check(fft, message.Time);
            if (position == ValueAmplitudePosition.UpperThen30Decil)
            {
                await LogPrice(message, "upper30percent", message.Value);
            }

            if (position == ValueAmplitudePosition.LowerThenMediana)
            {
                await LogPrice(message, "lower30percent", message.Value);
            }

            return position;
        }

        /// <summary>
        /// Background loop: reads price messages until the channel completes, averages
        /// historical messages into 0.5-second buckets, tracks the Sber buy/sell flow
        /// and runs the trading logic for configured instruments.
        /// </summary>
        private async Task ProcessPrices()
        {
            var pricesCache = new Dictionary<string, List<INewPrice>>();
            try
            {
                while (await _pricesChannel.Reader.WaitToReadAsync())
                {
                    var message = await _pricesChannel.Reader.ReadAsync();
                    if (message.IsHistoricalData)
                    {
                        await _tradeDataProvider.AddData(message, TimeSpan.FromHours(6));
                        if (!TryAggregateHistoricalPrice(pricesCache, message, out var aggregated))
                        {
                            continue;
                        }

                        message = aggregated;
                    }

                    await TrackSberOrderFlow(message);
                    await ProcessTradingInstrument(message);
                }
            }
            catch (Exception ex)
            {
                // The loop runs fire-and-forget; without this an unexpected failure would
                // stop price processing without leaving any trace.
                _logger.LogCritical(ex, "Цикл обработки цен аварийно остановлен");
            }
        }

        /// <summary>
        /// Buffers a historical message per FIGI. While the buffer spans less than 0.5 s
        /// returns false; otherwise produces one message carrying the average buffered
        /// price and clears the buffer.
        /// </summary>
        private static bool TryAggregateHistoricalPrice(Dictionary<string, List<INewPrice>> pricesCache, INewPrice message, out INewPrice aggregated)
        {
            if (!pricesCache.TryGetValue(message.Figi, out var bucket))
            {
                bucket = new List<INewPrice>();
                pricesCache[message.Figi] = bucket;
            }

            // Each message is buffered exactly once so that the average is not skewed.
            bucket.Add(message);
            if ((bucket[^1].Time - bucket[0].Time).TotalSeconds < 0.5)
            {
                aggregated = message;
                return false;
            }

            aggregated = new PriceChange()
            {
                Figi = message.Figi,
                Ticker = message.Ticker,
                Count = message.Count,
                Direction = message.Direction,
                IsHistoricalData = message.IsHistoricalData,
                Time = message.Time,
                Value = bucket.Sum(l => l.Value) / bucket.Count
            };
            bucket.Clear();
            return true;
        }

        /// <summary>
        /// For Sber messages: stores the trade volume in the buy (direction 1) or sell
        /// (direction 2) window cache and logs the current sells/buys balance when it is defined.
        /// </summary>
        private async Task TrackSberOrderFlow(INewPrice message)
        {
            if (message.Figi != SberFigi)
            {
                return;
            }

            if (message.Direction == 1)
            {
                await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._1minBuyCacheKey, new Contracts.Declisions.Dtos.CachedValue()
                {
                    Time = message.Time,
                    Value = (decimal)message.Count
                });
            }

            if (message.Direction == 2)
            {
                await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._1minSellCacheKey, new Contracts.Declisions.Dtos.CachedValue()
                {
                    Time = message.Time,
                    Value = (decimal)message.Count
                });
            }

            var balance = await GetSberSellsBuysBalance();
            if (balance.HasValue)
            {
                await LogPrice(message, "sellsbuysbalance", balance.Value);
            }
        }

        /// <summary>
        /// Computes <c>(sells / (sells + buys) - 0.5) * 2</c> over the cached Sber volumes.
        /// Returns null when there is no cached volume, so callers never divide by zero.
        /// </summary>
        private async Task<decimal?> GetSberSellsBuysBalance()
        {
            var sberSells = await _tradeDataProvider.GetDataFrom5MinuteWindowCache(SberFigi, Constants._1minSellCacheKey);
            var sberBuys = await _tradeDataProvider.GetDataFrom5MinuteWindowCache(SberFigi, Constants._1minBuyCacheKey);
            var sells = sberSells.Sum(s => s.Value);
            var buys = sberBuys.Sum(s => s.Value);
            var total = sells + buys;
            if (total == 0)
            {
                return null;
            }

            return (sells / total - 0.5m) * 2;
        }

        /// <summary>
        /// Runs stop expiry, stop-loss, clearing handling and the decision logic for a
        /// configured instrument. Failures are logged and do not stop the price loop.
        /// </summary>
        private async Task ProcessTradingInstrument(INewPrice message)
        {
            if (!_tradingInstrumentsFigis.Contains(message.Figi))
            {
                return;
            }

            var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
            try
            {
                if (message.Figi == ImoexFuturesFigi)
                {
                    ProcessStops(message, currentTime);
                    var windowMaxSize = 2000;
                    await SellAssetsIfNeed(message);
                    var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize);
                    var state = ExchangeScheduler.GetCurrentState(message.Time);
                    await ProcessClearing(data, state, message);
                    await ProcessNewPriceIMOEXF2(data, state, message, windowMaxSize);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
            }
        }

        /// <summary>
        /// Stop-loss: for every held position in the message's FIGI that is older than
        /// 4 minutes and whose calculated profit is below -66, broadcasts a market order
        /// that closes the position and logs the decision.
        /// </summary>
        private async Task SellAssetsIfNeed(INewPrice message)
        {
            if (!BotModeSwitcher.CanSell())
            {
                _logger.LogWarning("Сброс активов недоступен, т.к. отключены продажи.");
                return;
            }

            var accounts = _tradeDataProvider.Accounts.Values.ToArray();
            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
            foreach (var acc in accounts)
            {
                var assets = acc.Assets.Values.Where(a => a.Figi == message.Figi).ToArray();
                foreach (var asset in assets)
                {
                    var isShort = asset.Count < 0;
                    var profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                        GetComission(assetType), GetLeverage(message.Figi, isShort), isShort);

                    if (message.Time - asset.BoughtAt > TimeSpan.FromMinutes(4) && profit < -66m)
                    {
                        var command = new TradeCommand()
                        {
                            AccountId = asset.AccountId,
                            Figi = message.Figi,
                            CommandType = isShort
                                ? Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy
                                : Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                            Count = System.Math.Abs((long)asset.Count),
                            RecomendPrice = null,
                            EnableMargin = false,
                        };
                        await _dataBus.Broadcast(command);

                        _logger.LogWarning("Сброс актива {figi}! id команды {commandId} Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
                            message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

                        // Record the decision under the side that is actually being closed.
                        await LogDeclision(isShort ? DeclisionTradeAction.CloseShort : DeclisionTradeAction.CloseLong, message, profit);
                        await LogDeclision(isShort ? DeclisionTradeAction.CloseShortReal : DeclisionTradeAction.CloseLongReal, message, profit);
                    }
                }
            }
        }

        /// <summary>
        /// Runs the moving-average check used for long decisions (windows 30/180) and
        /// logs both window averages when the big-window average is non-zero.
        /// </summary>
        private async Task<TradingEvent> CheckByWindowAverageMean((DateTime[] timestamps, decimal[] prices) data, INewPrice message, int windowMaxSize)
        {
            var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices,
                windowMaxSize, 30, 180, TimeSpan.FromSeconds(20), -1m, 2m);
            if (resultMoveAvFull.bigWindowAv != 0)
            {
                await LogPrice(message, Constants.BigWindowCrossingAverageProcessor, resultMoveAvFull.bigWindowAv);
                await LogPrice(message, Constants.SmallWindowCrossingAverageProcessor, resultMoveAvFull.smallWindowAv);
            }

            return resultMoveAvFull.events;
        }

        /// <summary>
        /// Runs the moving-average check used for short decisions (windows 30/240); nothing is logged.
        /// </summary>
        private Task<TradingEvent> CheckByWindowAverageMeanForShotrs((DateTime[] timestamps, decimal[] prices) data, INewPrice message, int windowMaxSize)
        {
            var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices,
                windowMaxSize, 30, 240, TimeSpan.FromSeconds(20), -1m, 1m);
            return Task.FromResult(resultMoveAvFull.events);
        }

        /// <summary>
        /// Checks local trends; when an uptrend start is detected the result contains both
        /// <see cref="TradingEvent.UptrendStart"/> and <see cref="TradingEvent.DowntrendEnd"/>.
        /// </summary>
        private Task<TradingEvent> CheckByLocalTrends((DateTime[] timestamps, decimal[] prices) data, INewPrice message, int windowMaxSize)
        {
            var res = TradingEvent.None;
            if (LocalTrends.TryGetLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(20), 1, out var resLocalTrends)
                && (resLocalTrends & TradingEvent.UptrendStart) == TradingEvent.UptrendStart)
            {
                res = TradingEvent.UptrendStart | TradingEvent.DowntrendEnd;
            }

            return Task.FromResult(res);
        }

        /// <summary>
        /// Stores the current areas relation in the one-minute cache and returns the
        /// average of the cached values when it is positive; otherwise null.
        /// </summary>
        private async Task<decimal?> GetAreasRelation((DateTime[] timestamps, decimal[] prices) data, INewPrice message)
        {
            if (!ShapeAreaCalculator.TryGetAreasRelation(data.timestamps, data.prices, message.Value, Constants.AreasRelationWindow, out var rel))
            {
                return null;
            }

            await _tradeDataProvider.AddDataTo1MinuteWindowCache(message.Figi, Constants._1minCacheKey, new Contracts.Declisions.Dtos.CachedValue()
            {
                Time = message.Time,
                Value = (decimal)rel
            });

            var areas = await _tradeDataProvider.GetDataFrom1MinuteWindowCache(message.Figi, Constants._1minCacheKey);
            if (areas.Length == 0)
            {
                // Nothing cached (e.g. the value just added was already evicted): no average to report.
                return null;
            }

            var areasRel = (decimal)areas.Sum(a => a.Value) / areas.Length;
            await LogPrice(message, Constants.AreasRelationProcessor, areasRel);
            return areasRel > 0 ? areasRel : null;
        }

        /// <summary>
        /// Loads data via <see cref="GetData"/> and evaluates the FFT position for the message.
        /// </summary>
        private async Task<ValueAmplitudePosition> CheckPosition(INewPrice message)
        {
            var data2 = await GetData(message);
            return await CheckPosition(data2, message);
        }

        /// <summary>
        /// Returns the trend difference over the last hour, or null when the full hour
        /// is not available or the calculation fails.
        /// </summary>
        // NOTE(review): result type reconstructed as decimal? - confirm against LocalTrends.TryCalcTrendDiff.
        private async Task<decimal?> CalcTrendDiff(INewPrice message)
        {
            var data = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromHours(1));
            if (data.isFullIntervalExists && LocalTrends.TryCalcTrendDiff(data.timestamps, data.prices, out var res))
            {
                return res;
            }

            return null;
        }

        /// <summary>
        /// Evaluates all indicators for the new price and, depending on the detected
        /// events, opens/closes long and short positions. Does nothing when fewer than
        /// five data points are available.
        /// </summary>
        private async Task ProcessNewPriceIMOEXF2((DateTime[] timestamps, decimal[] prices) data, ExchangeState state, INewPrice message, int windowMaxSize)
        {
            if (data.timestamps.Length <= 4)
            {
                return;
            }

            // With no cached Sber volume the balance is undefined; treat it as neutral (0)
            // instead of failing with DivideByZeroException and skipping every decision.
            var dsell = (await GetSberSellsBuysBalance()) ?? 0m;

            var mavTask = CheckByWindowAverageMean(data, message, windowMaxSize);
            var mavTaskShorts = CheckByWindowAverageMeanForShotrs(data, message, windowMaxSize);
            var ltTask = CheckByLocalTrends(data, message, windowMaxSize);
            var areasTask = GetAreasRelation(data, message);
            var positionTask = CheckPosition(message);
            var trendTask = CalcTrendDiff(message);

            await Task.WhenAll(mavTask, ltTask, areasTask, positionTask, trendTask, mavTaskShorts);

            // All tasks are complete here, so these awaits return immediately.
            var mavEvents = await mavTask;
            var localTrendEvents = await ltTask;
            var shortEvents = await mavTaskShorts;
            var areasRelation = await areasTask;
            var position = await positionTask;
            var trendDiff = await trendTask;

            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
            var res = mavEvents | localTrendEvents;

            // Comparisons on null values are false, which matches the original HasValue checks.
            if (HasEvent(res, TradingEvent.UptrendStart)
                && !LongOpeningStops.ContainsKey(message.Figi)
                && trendDiff > -5
                && state == ExchangeState.Open
                && areasRelation >= 20 && areasRelation < 75
                && position == ValueAmplitudePosition.LowerThenMediana)
            {
                await OpenLongs(message);
            }

            if (HasEvent(res, TradingEvent.UptrendEnd) && dsell < 0.1m)
            {
                await CloseLongs(message, assetType);
            }

            if (HasEvent(shortEvents, TradingEvent.UptrendEnd) && trendDiff < -4)
            {
                await OpenShorts(message);
            }

            if (HasEvent(res, TradingEvent.DowntrendEnd) && !ShortClosingStops.ContainsKey(message.Figi))
            {
                await CloseShorts(message, assetType);
            }
        }

        /// <summary>Returns true when every bit of <paramref name="flag"/> is set in <paramref name="events"/>.</summary>
        private static bool HasEvent(TradingEvent events, TradingEvent flag)
        {
            return (events & flag) == flag;
        }

        /// <summary>
        /// For live data, buys one unit on roughly half (random draw) of the accounts that
        /// do not hold the instrument and are allowed to buy; always logs the OpenLong decision.
        /// </summary>
        private async Task OpenLongs(INewPrice message)
        {
            if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
            {
                var accounts = _tradeDataProvider.Accounts
                    .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
                    .ToArray();
                var loggedDeclisions = 0;
                foreach (var acc in accounts)
                {
                    if (!IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart))
                    {
                        continue;
                    }

                    if (RandomNumberGenerator.GetInt32(100) <= 50)
                    {
                        continue;
                    }

                    var command = new TradeCommand()
                    {
                        AccountId = acc.Value.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
                        Count = 1,
                        RecomendPrice = null,
                    };
                    await _dataBus.Broadcast(command);

                    _logger.LogWarning("Покупка актива {figi}! id команды {commandId}. Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
                        message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

                    // Only the first real order is logged and arms the one-minute opening stop.
                    if (loggedDeclisions == 0)
                    {
                        await LogDeclision(DeclisionTradeAction.OpenLongReal, message);
                        LongOpeningStops[message.Figi] = message.Time.AddMinutes(1);
                        loggedDeclisions++;
                    }
                }
            }

            await LogDeclision(DeclisionTradeAction.OpenLong, message);
        }

        /// <summary>
        /// For live data, sells every profitable long position in the instrument;
        /// always logs the CloseLong decision.
        /// </summary>
        private async Task CloseLongs(INewPrice message, AssetType assetType)
        {
            if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
            {
                var loggedDeclisions = 0;
                var assetsForClose = _tradeDataProvider.Accounts
                    .SelectMany(a => a.Value.Assets.Values)
                    .Where(a => a.Figi == message.Figi && a.Count > 0)
                    .ToArray();
                foreach (var asset in assetsForClose)
                {
                    var profit = 0m;
                    if (assetType == AssetType.Common && asset.Count > 0)
                    {
                        profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                            GetComission(assetType), 1, false);
                    }

                    if (assetType == AssetType.Futures)
                    {
                        profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                            GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
                    }

                    if (profit <= 0)
                    {
                        continue;
                    }

                    LongClosingStops[message.Figi] = message.Time.AddSeconds(30);
                    var command = new TradeCommand()
                    {
                        AccountId = asset.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                        Count = (long)asset.Count,
                        RecomendPrice = null,
                        EnableMargin = false,
                    };
                    await _dataBus.Broadcast(command);

                    _logger.LogWarning("Продажа актива {figi}! id команды {commandId}. Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
                        message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

                    if (loggedDeclisions == 0)
                    {
                        loggedDeclisions++;
                        await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
                    }
                }
            }

            await LogDeclision(DeclisionTradeAction.CloseLong, message);
        }

        /// <summary>
        /// For live data, sells one unit with margin enabled on roughly half (random draw)
        /// of the accounts that do not hold the instrument; always logs the OpenShort decision.
        /// </summary>
        private async Task OpenShorts(INewPrice message)
        {
            if (!message.IsHistoricalData)
            {
                var accounts = _tradeDataProvider.Accounts
                    .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
                    .ToArray();
                var loggedDeclisions = 0;
                foreach (var acc in accounts)
                {
                    if (!BotModeSwitcher.CanSell())
                    {
                        continue;
                    }

                    if (RandomNumberGenerator.GetInt32(100) <= 50)
                    {
                        continue;
                    }

                    var command = new TradeCommand()
                    {
                        AccountId = acc.Value.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                        Count = 1,
                        RecomendPrice = null,
                        EnableMargin = true
                    };
                    await _dataBus.Broadcast(command);

                    _logger.LogWarning("Открытие шорта {figi}! id команды {commandId}. Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
                        message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

                    if (loggedDeclisions == 0)
                    {
                        await LogDeclision(DeclisionTradeAction.OpenShortReal, message);
                        loggedDeclisions++;
                    }
                }
            }

            await LogDeclision(DeclisionTradeAction.OpenShort, message);
        }

        /// <summary>
        /// For live data, buys back every profitable short futures position in the
        /// instrument; always logs the CloseShort decision.
        /// </summary>
        private async Task CloseShorts(INewPrice message, AssetType assetType)
        {
            if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
            {
                var loggedDeclisions = 0;
                var assetsForClose = _tradeDataProvider.Accounts
                    .SelectMany(a => a.Value.Assets.Values)
                    .Where(a => a.Figi == message.Figi && a.Count < 0)
                    .ToArray();
                foreach (var asset in assetsForClose)
                {
                    var profit = 0m;
                    if (assetType == AssetType.Futures)
                    {
                        profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                            GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
                    }

                    if (profit <= 0)
                    {
                        continue;
                    }

                    var command = new TradeCommand()
                    {
                        AccountId = asset.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
                        Count = System.Math.Abs((long)asset.Count),
                        RecomendPrice = null,
                        EnableMargin = false,
                    };
                    await _dataBus.Broadcast(command);

                    _logger.LogWarning("Продажа актива {figi}! id команды {commandId}. Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
                        message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

                    if (loggedDeclisions == 0)
                    {
                        loggedDeclisions++;
                        await LogDeclision(DeclisionTradeAction.CloseShortReal, message, profit);
                    }
                }
            }

            // NOTE(review): the stop is armed only for historical data, unlike the long-opening
            // stop which is armed on live orders - confirm this is intended.
            if (message.IsHistoricalData)
            {
                ShortClosingStops[message.Figi] = message.Time.AddSeconds(30);
            }

            await LogDeclision(DeclisionTradeAction.CloseShort, message);
        }

        /// <summary>
        /// During clearing, for live data, when the last two cached timestamps are more
        /// than 3 minutes apart, updates the futures price using the second-to-last cached price.
        /// </summary>
        private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices) data, ExchangeState state, INewPrice message)
        {
            if (state == ExchangeState.ClearingTime
                && !message.IsHistoricalData
                && data.timestamps.Length > 1
                && (data.timestamps[^1] - data.timestamps[^2]) > TimeSpan.FromMinutes(3))
            {
                await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[^2]);
            }
        }

        /// <summary>Removes the stops of the message's FIGI whose expiry time has passed.</summary>
        private void ProcessStops(INewPrice message, DateTime currentTime)
        {
            RemoveExpiredStop(LongOpeningStops, message.Figi, currentTime);
            RemoveExpiredStop(ShortClosingStops, message.Figi, currentTime);
            RemoveExpiredStop(LongClosingStops, message.Figi, currentTime);
        }

        /// <summary>Removes the entry for <paramref name="figi"/> when its expiry is earlier than <paramref name="currentTime"/>.</summary>
        private static void RemoveExpiredStop(ConcurrentDictionary<string, DateTime> stops, string figi, DateTime currentTime)
        {
            if (stops.TryGetValue(figi, out var expiresAt) && expiresAt < currentTime)
            {
                stops.TryRemove(figi, out _);
            }
        }

        /// <summary>Writes a processed-price record for the given processor name.</summary>
        private async Task LogPrice(INewPrice message, string processor, decimal value)
        {
            await _tradeDataProvider.LogPrice(new ProcessedPrice()
            {
                Figi = message.Figi,
                Ticker = message.Ticker,
                Processor = processor,
                Time = message.Time,
                Value = value,
            }, false);
        }

        /// <summary>
        /// Writes a decision record; the time is the message time for historical data
        /// and the current UTC time otherwise.
        /// </summary>
        private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
        {
            await _tradeDataProvider.LogDeclision(new Declision()
            {
                AccountId = string.Empty,
                Figi = message.Figi,
                Ticker = message.Ticker,
                Value = profit,
                Price = message.Value,
                Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
                Action = action,
            }, false);
        }

        /// <summary>No shutdown work is performed; the price loop is not cancelled here.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        /// <summary>Returns the configured commission for shares or futures, and 0 for any other asset type.</summary>
        private decimal GetComission(AssetType assetType)
        {
            return assetType switch
            {
                AssetType.Common => _shareComission,
                AssetType.Futures => _futureComission,
                _ => 0,
            };
        }

        /// <summary>Returns the configured long or short leverage for the FIGI, or 1 when none is configured.</summary>
        private decimal GetLeverage(string figi, bool isShort)
        {
            if (!Leverages.TryGetValue(figi, out var leverage))
            {
                return 1m;
            }

            return isShort ? leverage.ShortLeverage : leverage.LongLeverage;
        }

        /// <summary>
        /// Returns true when purchases are enabled and, after paying
        /// <paramref name="boutPrice"/> * <paramref name="count"/>, the remaining balance
        /// is at least the required share of the account total. The futures threshold is
        /// used when the account already holds a futures asset.
        /// </summary>
        internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count, decimal accountCashPartFutures, decimal accountCashPart)
        {
            if (!BotModeSwitcher.CanPurchase())
            {
                return false;
            }

            var total = account.Total;
            if (total <= 0)
            {
                // No equity to measure the cash share against; also avoids dividing by zero.
                return false;
            }

            var holdsFutures = account.Assets.Values.Any(v => v.Type == AssetType.Futures);
            var requiredCashPart = holdsFutures ? accountCashPartFutures : accountCashPart;
            return (account.Balance - boutPrice * count) / total >= requiredCashPart;
        }
    }
}