using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that reads price messages from the data bus, logs derived indicators
    /// and broadcasts <see cref="TradeCommand"/> messages to open and close positions.
    /// Only the instrument with FIGI <c>FUTIMOEXF000</c> is processed, and only when it is
    /// listed in the configured trading instruments.
    /// </summary>
    public class Trader : IHostedService
    {
        /// <summary>FIGI of the only instrument this service handles.</summary>
        private const string ImoexFuturesFigi = "FUTIMOEXF000";

        /// <summary>Maximum number of cached points requested from the data provider per price message.</summary>
        private const int WindowMaxSize = 1000;

        /// <summary>A position is force-closed when its profit drops below this value (after the minimal holding time).</summary>
        private const decimal StopLossProfit = -66m;

        /// <summary>A position is closed when its profit exceeds this value (after the long holding time).</summary>
        private const decimal LongHoldTakeProfit = 100m;

        private static readonly TimeSpan StopLossMinHoldingTime = TimeSpan.FromMinutes(4);
        private static readonly TimeSpan TakeProfitMinHoldingTime = TimeSpan.FromHours(4);
        private static readonly TimeSpan AreasAveragingWindow = TimeSpan.FromMinutes(1);

        private readonly IDataBus _dataBus;
        private readonly TraderDataProvider _tradeDataProvider;
        private readonly ILogger _logger;

        // NOTE(review): the generic type arguments of the declarations below (and of the ILogger/IOptions
        // constructor parameters) were not visible in the reviewed text. string/DateTime and INewPrice
        // follow from how the values are used in this class; IOrderbook, InstrumentSettings and
        // ExchangeConfig are presumed names - confirm against the project before merging.

        /// <summary>Per-FIGI moment until which opening new positions is suppressed.</summary>
        private readonly ConcurrentDictionary<string, DateTime> _openingStops = new();

        /// <summary>Per-FIGI instrument settings; read for the long/short leverage.</summary>
        private readonly ConcurrentDictionary<string, InstrumentSettings> _leverages = new();

        private readonly string _bigWindowProcessor = nameof(Trader) + "_big";
        private readonly string _smallWindowProcessor = nameof(Trader) + "_small";
        private readonly double _buyStopLength;
        private readonly decimal _futureComission;
        private readonly decimal _shareComission;
        private readonly decimal _accountCashPart;
        private readonly decimal _accountCashPartFutures;
        private readonly string[] _tradingInstrumentsFigis = [];

        private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();

        // Nothing in this class reads the order book channel. Keeping it unbounded would let every
        // message written to it accumulate forever, so it keeps only the most recent message.
        private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateBounded<IOrderbook>(
            new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.DropOldest });

        private readonly CancellationTokenSource _stoppingCts = new();
        private Task _processingTask = Task.CompletedTask;

        /// <summary>
        /// Creates the service and copies the trading settings out of <paramref name="options"/>.
        /// </summary>
        /// <param name="logger">Logger used for processing errors.</param>
        /// <param name="options">Commissions, cash-part limits, traded FIGIs and per-instrument settings.</param>
        /// <param name="dataBus">Bus the channels are registered on and trade commands are broadcast to.</param>
        /// <param name="tradeDataProvider">Source of cached prices and managed accounts; sink for logged prices and decisions.</param>
        /// <param name="investApiClient">Not used by this class; kept so the constructor signature stays unchanged.</param>
        public Trader(
            ILogger<Trader> logger,
            IOptions<ExchangeConfig> options,
            IDataBus dataBus,
            TraderDataProvider tradeDataProvider,
            InvestApiClient investApiClient)
        {
            _tradeDataProvider = tradeDataProvider;
            _logger = logger;
            _dataBus = dataBus;
            _futureComission = options.Value.FutureComission;
            _shareComission = options.Value.ShareComission;
            _accountCashPart = options.Value.AccountCashPart;
            _accountCashPartFutures = options.Value.AccountCashPartFutures;
            _tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis;
            _buyStopLength = (double)options.Value.StopBuyLengthMinuts;
            foreach (var lev in options.Value.InstrumentsSettings)
            {
                _leverages.TryAdd(lev.Figi, lev);
            }
        }

        /// <summary>
        /// Initializes the data provider, registers both channels on the data bus and starts
        /// the background price-processing loop.
        /// </summary>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _tradeDataProvider.Init();
            _dataBus.AddChannel(nameof(Trader), _pricesChannel);
            _dataBus.AddChannel(nameof(Trader), _ordersbookChannel);

            // The task is kept (instead of being discarded) so StopAsync can wait for it.
            _processingTask = ProcessPrices(_stoppingCts.Token);
        }

        /// <summary>
        /// Signals the processing loop to stop and waits until it finishes or
        /// <paramref name="cancellationToken"/> is cancelled, whichever happens first.
        /// </summary>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _stoppingCts.Cancel();
            await Task.WhenAny(_processingTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }

        /// <summary>
        /// Reads price messages until the channel completes or <paramref name="stoppingToken"/> is cancelled.
        /// A failure while handling one message is logged and does not stop the loop.
        /// </summary>
        private async Task ProcessPrices(CancellationToken stoppingToken)
        {
            var areasBuffer = new LinkedList<(DateTime time, double val)>();
            try
            {
                await foreach (var message in _pricesChannel.Reader.ReadAllAsync(stoppingToken))
                {
                    try
                    {
                        await ProcessPrice(message, areasBuffer);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Regular shutdown requested through StopAsync.
            }
        }

        /// <summary>
        /// Handles a single price message: stores historical data in the provider cache and, for the
        /// traded IMOEXF future, runs the stop checks, clearing handling and the trading logic.
        /// </summary>
        private async Task ProcessPrice(INewPrice message, LinkedList<(DateTime time, double val)> areasBuffer)
        {
            if (message.IsHistoricalData)
            {
                await _tradeDataProvider.AddData(message, TimeSpan.FromHours(6));
            }

            if (!_tradingInstrumentsFigis.Contains(message.Figi) || message.Figi != ImoexFuturesFigi)
            {
                return;
            }

            var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;

            await SellAssetsIfNeed(message);

            var data = await _tradeDataProvider.GetData(message.Figi, WindowMaxSize);
            if (data.timestamps.Length <= 1)
            {
                // The cache holds at most one point, so previously averaged values are discarded.
                areasBuffer.Clear();
            }

            var state = ExchangeScheduler.GetCurrentState(message.Time);
            await ProcessClearing(data, state, message);
            ProcessOpeningStops(message, currentTime);
            await ProcessNewPriceIMOEXF(data, state, message, WindowMaxSize, areasBuffer);
        }

        /// <summary>
        /// Closes positions in the message's instrument that either lost more than
        /// <see cref="StopLossProfit"/> after <see cref="StopLossMinHoldingTime"/> or gained more than
        /// <see cref="LongHoldTakeProfit"/> after <see cref="TakeProfitMinHoldingTime"/>.
        /// A loss-driven close also suppresses new openings for the instrument for 10 minutes.
        /// </summary>
        private async Task SellAssetsIfNeed(INewPrice message)
        {
            var accounts = _tradeDataProvider.Accounts.Values.ToArray();
            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
            foreach (var acc in accounts)
            {
                var assets = acc.Assets.Values.Where(a => a.Figi == message.Figi).ToArray();
                foreach (var asset in assets)
                {
                    var isShort = asset.Count < 0;
                    var profit = TradingCalculator.CaclProfit(
                        asset.BoughtPrice,
                        message.Value,
                        GetComission(assetType),
                        GetLeverage(message.Figi, isShort),
                        isShort);
                    var holdingTime = message.Time - asset.BoughtAt;

                    // The direction is carried by the command type, so the order size is sent as an
                    // absolute value; a short position has a negative Count.
                    var signedCount = (long)asset.Count;
                    var orderCount = signedCount < 0 ? -signedCount : signedCount;

                    TradeCommand BuildCloseCommand() => new TradeCommand()
                    {
                        AccountId = asset.AccountId,
                        Figi = message.Figi,
                        CommandType = isShort
                            ? Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy
                            : Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                        Count = orderCount,
                        RecomendPrice = null,
                        EnableMargin = false,
                    };

                    if (holdingTime > StopLossMinHoldingTime && profit < StopLossProfit)
                    {
                        await _dataBus.Broadcast(BuildCloseCommand());
                        _openingStops[message.Figi] = DateTime.UtcNow.AddMinutes(10);
                        await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
                    }
                    else if (holdingTime > TakeProfitMinHoldingTime && profit > LongHoldTakeProfit)
                    {
                        await _dataBus.Broadcast(BuildCloseCommand());
                        await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
                    }
                }
            }
        }

        /// <summary>
        /// Runs the moving-average check on the cached window, logs both averages and the averaged
        /// areas relation, then opens a long on an uptrend start and closes profitable longs on an
        /// uptrend end.
        /// </summary>
        /// <param name="data">Cached timestamps and prices of the instrument.</param>
        /// <param name="state">Exchange state computed for the message time.</param>
        /// <param name="message">The price message being processed.</param>
        /// <param name="windowMaxSize">Window size passed to the moving-average check.</param>
        /// <param name="areasBuffer">Recent areas-relation values averaged over <see cref="AreasAveragingWindow"/>.</param>
        private async Task ProcessNewPriceIMOEXF(
            (DateTime[] timestamps, decimal[] prices) data,
            ExchangeState state,
            INewPrice message,
            int windowMaxSize,
            LinkedList<(DateTime time, double val)> areasBuffer)
        {
            var movingAverage = MovingAverage.CheckByWindowAverageMean(
                data.timestamps, data.prices, windowMaxSize, 30, 180, TimeSpan.FromSeconds(20), -1m, 2m);
            var events = movingAverage.events;

            if (movingAverage.bigWindowAv != 0)
            {
                await LogPrice(message, _bigWindowProcessor, movingAverage.bigWindowAv);
                await LogPrice(message, _smallWindowProcessor, movingAverage.smallWindowAv);
            }

            var areasRel = await UpdateAreasRelation(data, message, areasBuffer);

            // A long is considered only while the exchange is open, no opening stop is active and the
            // two latest cached points are less than a minute apart.
            if (HasEvent(events, TradingEvent.UptrendStart)
                && !_openingStops.ContainsKey(message.Figi)
                && state == ExchangeState.Open
                && data.timestamps.Length > 1
                && data.timestamps[^1] - data.timestamps[^2] < TimeSpan.FromMinutes(1)
                && areasRel >= 20
                && areasRel < 75)
            {
                await OpenLong(message);
            }

            if (HasEvent(events, TradingEvent.UptrendEnd))
            {
                await CloseProfitableLongs(message);
            }
        }

        /// <summary>Returns true when every bit of <paramref name="flag"/> is set in <paramref name="events"/>.</summary>
        private static bool HasEvent(TradingEvent events, TradingEvent flag) => (events & flag) == flag;

        /// <summary>
        /// Appends the current areas relation to <paramref name="areasBuffer"/>, drops entries older than
        /// <see cref="AreasAveragingWindow"/>, logs the buffer average and returns it.
        /// Returns <c>-1</c> when the relation could not be calculated for this message.
        /// </summary>
        private async Task<decimal> UpdateAreasRelation(
            (DateTime[] timestamps, decimal[] prices) data,
            INewPrice message,
            LinkedList<(DateTime time, double val)> areasBuffer)
        {
            if (!ShapeAreaCalculator.TryGetAreasRelation(
                    data.timestamps, data.prices, message.Value, TimeSpan.FromMinutes(15), out var rel))
            {
                return -1m;
            }

            areasBuffer.AddLast((message.Time, rel));

            // Trim in a loop: after a gap between messages more than one entry can be out of the window.
            while (areasBuffer.Last != null
                && areasBuffer.First != null
                && areasBuffer.Last.Value.time - areasBuffer.First.Value.time > AreasAveragingWindow)
            {
                areasBuffer.RemoveFirst();
            }

            var areasRel = (decimal)areasBuffer.Sum(a => a.val) / areasBuffer.Count;
            await LogPrice(message, "balancescalc30min", areasRel);
            return areasRel;
        }

        /// <summary>
        /// Logs an <c>OpenLong</c> decision and, for live data when purchases are enabled, broadcasts a
        /// market buy of one unit for accounts that do not hold the instrument and pass
        /// <see cref="IsBuyAllowed"/>.
        /// </summary>
        private async Task OpenLong(INewPrice message)
        {
            if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
            {
                var accounts = _tradeDataProvider.Accounts
                    .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
                    .ToArray();
                var realDecisionLogged = false;
                foreach (var acc in accounts)
                {
                    if (!IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart))
                    {
                        continue;
                    }

                    // Random gate: GetInt32(100) yields 0..99, so an eligible account buys for 51..99.
                    if (RandomNumberGenerator.GetInt32(100) <= 50)
                    {
                        continue;
                    }

                    await _dataBus.Broadcast(new TradeCommand()
                    {
                        AccountId = acc.Value.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
                        Count = 1,
                        RecomendPrice = null,
                    });

                    if (!realDecisionLogged)
                    {
                        await LogDeclision(DeclisionTradeAction.OpenLongReal, message);
                        _openingStops[message.Figi] = DateTime.UtcNow.AddMinutes(1);
                        realDecisionLogged = true;
                    }
                }
            }

            await LogDeclision(DeclisionTradeAction.OpenLong, message);
        }

        /// <summary>
        /// Logs a <c>CloseLong</c> decision and, for live data when selling is enabled, broadcasts a
        /// market sell for every long position in the instrument whose calculated profit is positive.
        /// </summary>
        private async Task CloseProfitableLongs(INewPrice message)
        {
            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
            if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
            {
                var assetsForClose = _tradeDataProvider.Accounts
                    .SelectMany(a => a.Value.Assets.Values)
                    .Where(a => a.Figi == message.Figi && a.Count > 0)
                    .ToArray();

                var realDecisionLogged = false;
                foreach (var asset in assetsForClose)
                {
                    var profit = 0m;
                    if (assetType == AssetType.Common && asset.Count > 0)
                    {
                        profit = TradingCalculator.CaclProfit(
                            asset.BoughtPrice, message.Value, GetComission(assetType), 1, false);
                    }
                    else if (assetType == AssetType.Futures)
                    {
                        profit = TradingCalculator.CaclProfit(
                            asset.BoughtPrice,
                            message.Value,
                            GetComission(assetType),
                            GetLeverage(message.Figi, asset.Count < 0),
                            asset.Count < 0);
                    }

                    if (profit <= 0)
                    {
                        continue;
                    }

                    await _dataBus.Broadcast(new TradeCommand()
                    {
                        AccountId = asset.AccountId,
                        Figi = message.Figi,
                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                        Count = (long)asset.Count,
                        RecomendPrice = null,
                        EnableMargin = false,
                    });

                    if (!realDecisionLogged)
                    {
                        realDecisionLogged = true;
                        await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
                    }
                }
            }

            await LogDeclision(DeclisionTradeAction.CloseLong, message);
        }

        /// <summary>
        /// During clearing time, for live data, passes the previous cached price to
        /// <c>UpdateFuturesPrice</c> when the two latest cached points are more than 3 minutes apart.
        /// </summary>
        private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices) data, ExchangeState state, INewPrice message)
        {
            if (state != ExchangeState.ClearingTime || message.IsHistoricalData || data.timestamps.Length <= 1)
            {
                return;
            }

            if (data.timestamps[^1] - data.timestamps[^2] > TimeSpan.FromMinutes(3))
            {
                await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[^2]);
            }
        }

        /// <summary>Removes the opening stop of the message's instrument once it is earlier than <paramref name="currentTime"/>.</summary>
        private void ProcessOpeningStops(INewPrice message, DateTime currentTime)
        {
            if (_openingStops.TryGetValue(message.Figi, out var stopUntil) && stopUntil < currentTime)
            {
                _openingStops.TryRemove(message.Figi, out _);
            }
        }

        /// <summary>Logs <paramref name="value"/> as a processed price of the message's instrument under <paramref name="processor"/>.</summary>
        private async Task LogPrice(INewPrice message, string processor, decimal value)
        {
            await _tradeDataProvider.LogPrice(new ProcessedPrice()
            {
                Figi = message.Figi,
                Ticker = message.Ticker,
                Processor = processor,
                Time = message.Time,
                Value = value,
            }, false);
        }

        /// <summary>
        /// Logs a trading decision. Historical messages keep their own time, live messages are
        /// stamped with the current UTC time.
        /// </summary>
        private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
        {
            await _tradeDataProvider.LogDeclision(new Declision()
            {
                AccountId = string.Empty,
                Figi = message.Figi,
                Ticker = message.Ticker,
                Value = profit,
                Price = message.Value,
                Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
                Action = action,
            }, false);
        }

        /// <summary>Returns the configured commission for shares or futures, and 0 for any other asset type.</summary>
        private decimal GetComission(AssetType assetType)
        {
            if (assetType == AssetType.Common)
            {
                return _shareComission;
            }

            if (assetType == AssetType.Futures)
            {
                return _futureComission;
            }

            return 0;
        }

        /// <summary>Returns the configured long or short leverage of the instrument, or 1 when it has no settings.</summary>
        private decimal GetLeverage(string figi, bool isShort)
        {
            var res = 1m;
            if (_leverages.TryGetValue(figi, out var leverage))
            {
                res = isShort ? leverage.ShortLeverage : leverage.LongLeverage;
            }

            return res;
        }

        /// <summary>
        /// Checks whether the account may spend <paramref name="boutPrice"/> * <paramref name="count"/>:
        /// the share of cash left after the purchase, relative to the account total, must not fall below
        /// the required part.
        /// </summary>
        /// <param name="account">Account the purchase is planned for.</param>
        /// <param name="boutPrice">Price of one unit.</param>
        /// <param name="count">Number of units to buy.</param>
        /// <param name="accountCashPartFutures">Required cash part when the account already holds a futures asset.</param>
        /// <param name="accountCashPart">Required cash part otherwise.</param>
        /// <returns>
        /// <c>false</c> when purchases are disabled, when the account total is not positive, or when the
        /// remaining cash part would be below the required one; otherwise <c>true</c>.
        /// </returns>
        internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count, decimal accountCashPartFutures, decimal accountCashPart)
        {
            if (!BotModeSwitcher.CanPurchase())
            {
                return false;
            }

            var balance = account.Balance;
            var total = account.Total;

            // A zero total would throw DivideByZeroException below and a negative one would flip the
            // sign of the ratio, so neither can justify a purchase.
            if (total <= 0)
            {
                return false;
            }

            var holdsFutures = account.Assets.Values.Any(v => v.Type == AssetType.Futures);
            var requiredCashPart = holdsFutures ? accountCashPartFutures : accountCashPart;
            return (balance - boutPrice * count) / total >= requiredCashPart;
        }
    }
}