using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that reads price and order book messages from channels registered
    /// on the data bus, feeds them into <see cref="TraderDataProvider"/>, and broadcasts
    /// <see cref="TradeCommand"/> messages (market buy / market sell) based on moving-average events.
    /// </summary>
    public class Trader : IHostedService
    {
        /// <summary>FIGI of the only instrument for which trading decisions are evaluated.</summary>
        private const string ImoexfFigi = "FUTIMOEXF000";

        private readonly IDataBus _dataBus;
        private readonly TraderDataProvider _tradeDataProvider;
        private readonly ILogger _logger;

        // NOTE(review): the generic type arguments of the dictionary, channel, logger and options
        // declarations are not visible in the reviewed text; the declarations are reproduced exactly
        // as found — confirm them against the build before merging.
        private readonly ConcurrentDictionary DeferredLongOpens = new();
        private readonly ConcurrentDictionary DeferredLongCloses = new();
        private readonly ConcurrentDictionary OpeningStops = new();
        private readonly ConcurrentDictionary Leverages = new();

        private readonly string _bigWindowProcessor = nameof(Trader) + "_big";
        private readonly string _smallWindowProcessor = nameof(Trader) + "_small";

        private readonly double _buyStopLength;
        private readonly decimal _futureComission;
        private readonly decimal _shareComission;
        private readonly decimal _accountCashPart;
        private readonly decimal _accountCashPartFutures;
        private readonly string[] _tradingInstrumentsFigis = [];

        private readonly Channel _pricesChannel = Channel.CreateUnbounded();
        private readonly Channel _ordersbookChannel = Channel.CreateUnbounded();

        /// <summary>
        /// Copies commissions, cash-part limits, the traded FIGI list, the stop-buy length and
        /// the per-instrument leverage settings out of <paramref name="options"/>.
        /// </summary>
        /// <param name="logger">Logger used for processing errors.</param>
        /// <param name="options">Exchange settings; only the values read below are used.</param>
        /// <param name="dataBus">Bus on which channels are registered and trade commands are broadcast.</param>
        /// <param name="tradeDataProvider">Provider of price windows, accounts and persistence helpers.</param>
        /// <param name="investApiClient">Not used by this class; kept so the constructor signature stays unchanged.</param>
        public Trader(
            ILogger logger,
            IOptions options,
            IDataBus dataBus,
            TraderDataProvider tradeDataProvider,
            InvestApiClient investApiClient)
        {
            _tradeDataProvider = tradeDataProvider;
            _logger = logger;
            _dataBus = dataBus;
            _futureComission = options.Value.FutureComission;
            _shareComission = options.Value.ShareComission;
            _accountCashPart = options.Value.AccountCashPart;
            _accountCashPartFutures = options.Value.AccountCashPartFutures;
            _tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis;
            _buyStopLength = (double)options.Value.StopBuyLengthMinuts;

            foreach (var lev in options.Value.InstrumentsSettings)
            {
                Leverages.TryAdd(lev.Figi, lev);
            }
        }

        /// <summary>
        /// Initializes the data provider, registers both channels on the data bus and starts
        /// the two processing loops without awaiting them.
        /// </summary>
        /// <param name="cancellationToken">Not observed by this implementation.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _tradeDataProvider.Init();
            _dataBus.AddChannel(nameof(Trader), _pricesChannel);
            _dataBus.AddChannel(nameof(Trader), _ordersbookChannel);

            // Fire-and-forget: both loops run until their channel is completed.
            _ = ProcessPrices();
            _ = ProcessOrdersbooks();
        }

        /// <summary>
        /// Price loop. Messages whose FIGI is not in the configured trading list are skipped;
        /// for the rest the price is stored, deferred trades are evaluated and, for the IMOEXF
        /// future, the full decision pipeline is run. Any exception is logged and the loop continues.
        /// </summary>
        private async Task ProcessPrices()
        {
            while (await _pricesChannel.Reader.WaitToReadAsync())
            {
                var message = await _pricesChannel.Reader.ReadAsync();
                if (!_tradingInstrumentsFigis.Contains(message.Figi))
                {
                    continue;
                }

                // Historical messages carry their own clock; live messages use the wall clock.
                var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
                try
                {
                    await _tradeDataProvider.AddData(message, TimeSpan.FromHours(7));
                    await ProcessDeferredLongOpens(message, currentTime);
                    await ProcessDeferredLongCloses(message, currentTime);

                    if (message.Figi == ImoexfFigi)
                    {
                        var windowMaxSize = 1000;
                        var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize);
                        var state = ExchangeScheduler.GetCurrentState(message.Time);
                        await ProcessClearing(data, state, message);
                        await SellOldAssetsIfCan(message);
                        ProcessOpeningStops(message, currentTime);
                        await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
                }
            }
        }

        /// <summary>
        /// Sells positions in the message's instrument that were bought more than four hours ago
        /// (measured against the wall clock) and are currently profitable.
        /// </summary>
        /// <remarks>
        /// Nothing is sold for historical messages or while <see cref="BotModeSwitcher.CanSell"/>
        /// is false, matching the guard used by the other sell paths in this class: a replayed
        /// price must not trigger a real market order.
        /// </remarks>
        private async Task SellOldAssetsIfCan(INewPrice message)
        {
            if (message.IsHistoricalData || !BotModeSwitcher.CanSell())
            {
                return;
            }

            var accounts = _tradeDataProvider.Accounts.Values.ToArray();
            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
            var maxAge = TimeSpan.FromHours(4);

            foreach (var acc in accounts)
            {
                var oldAssets = acc.Assets.Values
                    .Where(a => a.Figi == message.Figi && DateTime.UtcNow - a.BoughtAt > maxAge)
                    .ToArray();

                foreach (var asset in oldAssets)
                {
                    var isShort = asset.Count < 0;
                    var profit = TradingCalculator.CaclProfit(
                        asset.BoughtPrice,
                        message.Value,
                        GetComission(assetType),
                        GetLeverage(message.Figi, isShort),
                        isShort);

                    if (profit > 0)
                    {
                        // NOTE(review): for a short position Count is negative and is passed as-is
                        // to a MarketSell command — confirm the command consumer expects that.
                        await _dataBus.Broadcast(new TradeCommand()
                        {
                            AccountId = asset.AccountId,
                            Figi = message.Figi,
                            CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                            Count = (long)asset.Count,
                            RecomendPrice = null,
                        });
                        await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
                    }
                }
            }
        }

        /// <summary>
        /// Evaluates the moving-average check on the supplied window, logs both window averages,
        /// opens longs on an uptrend start and closes profitable longs on an uptrend end.
        /// </summary>
        /// <param name="data">Window of timestamps and prices obtained from the data provider.</param>
        /// <param name="state">Exchange state computed for the message time.</param>
        /// <param name="message">The price message being processed.</param>
        /// <param name="windowMaxSize">Window size passed through to the moving-average check.</param>
        private async Task ProcessNewPriceIMOEXF((DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount) data, ExchangeState state, INewPrice message, int windowMaxSize)
        {
            var res = TradingEvent.None;
            var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 45, 180, TimeSpan.FromSeconds(30), 1m);

            // Disabled alternative signals, kept for reference:
            //var resultLongClose = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 15, 120, 1.5m).events;
            //var uptrendStarts = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(120), TimeSpan.FromSeconds(20), 1.5m, 15);
            //res |= (uptrendStarts & TradingEvent.UptrendStart);
            //res |= resultLongClose;
            res |= resultMoveAvFull.events;

            if (resultMoveAvFull.bigWindowAv != 0)
            {
                await LogPrice(message, _bigWindowProcessor, resultMoveAvFull.bigWindowAv);
                await LogPrice(message, _smallWindowProcessor, resultMoveAvFull.smallWindowAv);
            }

            if ((resultMoveAvFull.events & TradingEvent.StopBuy) == TradingEvent.StopBuy)
            {
                // Stop-buy handling is currently disabled: the deadline is computed but not stored.
                var stopTo = (message.IsHistoricalData ? message.Time : DateTime.UtcNow).AddMinutes(_buyStopLength / 2);
                //OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
                //await LogDeclision(DeclisionTradeAction.StopBuy, message);
            }

            var uptrendStarted = (res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart;
            if (uptrendStarted
                && !OpeningStops.TryGetValue(message.Figi, out _)
                && state == ExchangeState.Open
                && data.timestamps.Length > 1
                // The two most recent points must be less than a minute apart.
                && (data.timestamps[^1] - data.timestamps[^2] < TimeSpan.FromMinutes(1)))
            {
                var fullData = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromMinutes(60));
                if (fullData.isFullIntervalExists)
                {
                    var max = fullData.prices.Max();
                    var min = fullData.prices.Min();

                    // Only enter when the last hour's range is below 15 and the net rise is below 4.
                    if (max - min < 15 && fullData.prices.Last() - fullData.prices.First() < 4)
                    {
                        await OpenLongOnEligibleAccounts(message);
                    }
                }
            }

            if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd)
            {
                await CloseProfitableLongs(message);
            }
        }

        /// <summary>
        /// During clearing, when a live message arrives more than three minutes after the previous
        /// data point, passes the second-to-last price of the window to
        /// <c>TraderDataProvider.UpdateFuturesPrice</c>.
        /// </summary>
        private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount) data, ExchangeState state, INewPrice message)
        {
            if (state != ExchangeState.ClearingTime || message.IsHistoricalData || data.timestamps.Length <= 1)
            {
                return;
            }

            var gap = data.timestamps[^1] - data.timestamps[^2];
            if (gap > TimeSpan.FromMinutes(3))
            {
                await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[data.prices.Length - 2]);
            }
        }

        /// <summary>
        /// Removes the opening stop for the message's instrument once its deadline is earlier
        /// than <paramref name="currentTime"/>.
        /// </summary>
        private void ProcessOpeningStops(INewPrice message, DateTime currentTime)
        {
            if (OpeningStops.TryGetValue(message.Figi, out var dt) && dt < currentTime)
            {
                OpeningStops.TryRemove(message.Figi, out _);
            }
        }

        /// <summary>
        /// Executes a deferred long open for the IMOEXF future when its time has come and it is
        /// less than three minutes overdue, provided the price rose by less than 1 since deferral.
        /// Entries that are three or more minutes overdue are left in the dictionary untouched.
        /// </summary>
        private async Task ProcessDeferredLongOpens(INewPrice message, DateTime currentTime)
        {
            if (message.Figi != ImoexfFigi)
            {
                return;
            }

            DeferredTrade? longOpen;
            DeferredLongOpens.TryGetValue(message.Figi, out longOpen);
            if (longOpen is null)
            {
                return;
            }

            var isDue = longOpen.Time <= currentTime && currentTime - longOpen.Time < TimeSpan.FromMinutes(3);
            if (!isDue)
            {
                return;
            }

            DeferredLongOpens.TryRemove(message.Figi, out _);
            if (message.Value - longOpen.Price < 1)
            {
                await OpenLongOnEligibleAccounts(message);
            }
        }

        /// <summary>
        /// Executes a deferred long close for the IMOEXF future once its time has come, provided
        /// the price dropped by less than 1 since deferral.
        /// </summary>
        private async Task ProcessDeferredLongCloses(INewPrice message, DateTime currentTime)
        {
            if (message.Figi != ImoexfFigi)
            {
                return;
            }

            DeferredTrade? longClose;
            DeferredLongCloses.TryGetValue(message.Figi, out longClose);
            if (longClose is null || longClose.Time > currentTime)
            {
                return;
            }

            DeferredLongCloses.TryRemove(message.Figi, out _);
            if (longClose.Price - message.Value < 1)
            {
                await CloseProfitableLongs(message);
            }
        }

        /// <summary>
        /// Shared open-long routine. For live messages with purchasing enabled, walks the accounts
        /// that hold no position in the instrument and, where <see cref="IsBuyAllowed"/> passes,
        /// logs an OpenLong decision and possibly broadcasts a one-lot market buy. Otherwise only
        /// a single OpenLong decision is logged.
        /// </summary>
        private async Task OpenLongOnEligibleAccounts(INewPrice message)
        {
            if (message.IsHistoricalData || !BotModeSwitcher.CanPurchase())
            {
                await LogDeclision(DeclisionTradeAction.OpenLong, message);
                return;
            }

            var accounts = _tradeDataProvider.Accounts
                .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
                .ToArray();

            foreach (var acc in accounts)
            {
                if (!IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart))
                {
                    continue;
                }

                // The order is sent only when a random draw from [0, 100) exceeds 50;
                // the decision itself is logged either way.
                if (RandomNumberGenerator.GetInt32(100) > 50)
                {
                    await _dataBus.Broadcast(new TradeCommand()
                    {
                        AccountId = acc.Value.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
                        Count = 1,
                        RecomendPrice = null,
                    });
                }

                await LogDeclision(DeclisionTradeAction.OpenLong, message);
            }
        }

        /// <summary>
        /// Shared close-long routine. For live messages with selling enabled, broadcasts a market
        /// sell and logs a CloseLong decision for every long position in the instrument whose
        /// calculated profit is positive. Otherwise only a single CloseLong decision is logged.
        /// </summary>
        private async Task CloseProfitableLongs(INewPrice message)
        {
            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);

            if (message.IsHistoricalData || !BotModeSwitcher.CanSell())
            {
                await LogDeclision(DeclisionTradeAction.CloseLong, message);
                return;
            }

            var assetsForClose = _tradeDataProvider.Accounts
                .SelectMany(a => a.Value.Assets.Values)
                .Where(a => a.Figi == message.Figi && a.Count > 0)
                .ToArray();

            foreach (var asset in assetsForClose)
            {
                // Asset types other than Common and Futures keep a zero profit and are never sold here.
                var profit = 0m;
                if (assetType == AssetType.Common && asset.Count > 0)
                {
                    profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, GetComission(assetType), 1, false);
                }

                if (assetType == AssetType.Futures)
                {
                    profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
                }

                if (profit > 0)
                {
                    await _dataBus.Broadcast(new TradeCommand()
                    {
                        AccountId = asset.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                        Count = (long)asset.Count,
                        RecomendPrice = null,
                    });
                    await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
                }
            }
        }

        /// <summary>
        /// Records a processed (averaged) value for the message's instrument under the given
        /// processor name, stamped with the message time.
        /// </summary>
        private async Task LogPrice(INewPrice message, string processor, decimal value)
        {
            var processed = new ProcessedPrice()
            {
                Figi = message.Figi,
                Ticker = message.Ticker,
                Processor = processor,
                Time = message.Time,
                Value = value,
            };

            // NOTE(review): the second argument is true for live data only; presumably a
            // "persist immediately" flag — confirm against TraderDataProvider.LogPrice.
            await _tradeDataProvider.LogPrice(processed, !message.IsHistoricalData);
        }

        /// <summary>
        /// Records a trading decision for the message's instrument. The account id is always
        /// empty; the time is the message time for historical data and the wall clock otherwise.
        /// </summary>
        /// <param name="action">Decision being recorded.</param>
        /// <param name="message">Price message that triggered the decision.</param>
        /// <param name="profit">Optional profit stored in the decision's <c>Value</c>.</param>
        private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
        {
            var declision = new Declision()
            {
                AccountId = string.Empty,
                Figi = message.Figi,
                Ticker = message.Ticker,
                Value = profit,
                Price = message.Value,
                Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
                Action = action,
            };

            await _tradeDataProvider.LogDeclision(declision, !message.IsHistoricalData);
        }

        /// <summary>Completes immediately; the processing loops are not stopped here.</summary>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Returns the configured commission for shares or futures, and 0 for any other asset type.
        /// </summary>
        private decimal GetComission(AssetType assetType)
        {
            if (assetType == AssetType.Common)
            {
                return _shareComission;
            }

            if (assetType == AssetType.Futures)
            {
                return _futureComission;
            }

            return 0;
        }

        /// <summary>
        /// Returns the configured short or long leverage for the instrument, or 1 when the
        /// instrument has no leverage settings.
        /// </summary>
        private decimal GetLeverage(string figi, bool isShort)
        {
            if (!Leverages.TryGetValue(figi, out var leverage))
            {
                return 1m;
            }

            return isShort ? leverage.ShortLeverage : leverage.LongLeverage;
        }

        /// <summary>
        /// Decides whether an account may spend <paramref name="boutPrice"/> × <paramref name="count"/>:
        /// the share of the account total left as cash after the purchase must not fall below the
        /// applicable limit.
        /// </summary>
        /// <param name="account">Account whose balance, total and assets are inspected.</param>
        /// <param name="boutPrice">Price of one unit.</param>
        /// <param name="count">Number of units to buy.</param>
        /// <param name="accountCashPartFutures">Minimum cash share when the account already holds a futures asset.</param>
        /// <param name="accountCashPart">Minimum cash share otherwise.</param>
        /// <returns>
        /// False when purchasing is disabled, when the account total is zero, or when the
        /// remaining cash share would be below the limit; true otherwise.
        /// </returns>
        internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count, decimal accountCashPartFutures, decimal accountCashPart)
        {
            if (!BotModeSwitcher.CanPurchase())
            {
                return false;
            }

            var balance = account.Balance;
            var total = account.Total;

            // A zero total would make the share below divide by zero; such an account cannot buy.
            if (total == 0)
            {
                return false;
            }

            var holdsFutures = account.Assets.Values.Any(v => v.Type == AssetType.Futures);
            var requiredCashPart = holdsFutures ? accountCashPartFutures : accountCashPart;

            return (balance - boutPrice * count) / total >= requiredCashPart;
        }

        /// <summary>
        /// Order book loop: forwards every message from the order book channel to the data provider.
        /// A failure on one message is logged and does not stop the loop, because the loop is
        /// started fire-and-forget and would otherwise end silently.
        /// </summary>
        private async Task ProcessOrdersbooks()
        {
            while (await _ordersbookChannel.Reader.WaitToReadAsync())
            {
                var message = await _ordersbookChannel.Reader.ReadAsync();
                try
                {
                    await _tradeDataProvider.AddOrderbook(message);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка при обработке стакана заявок");
                }
            }
        }
    }
}