using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that reads price messages from the data bus, evaluates
    /// moving-average and area-relation signals for the configured instruments and
    /// broadcasts <see cref="TradeCommand"/> messages to open or close positions.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments on several declarations below
    /// (ConcurrentDictionary, Channel, ILogger, IOptions) look like they were lost
    /// somewhere upstream of this file; they are left untouched here - confirm
    /// against the original source.
    /// </remarks>
    public class Trader : IHostedService
    {
        private readonly IDataBus _dataBus;
        private readonly TraderDataProvider _tradeDataProvider;
        private readonly ILogger _logger;

        // Keyed by figi; value is the moment until which opening new positions is suppressed.
        private readonly ConcurrentDictionary OpeningStops = new();

        // Keyed by figi + account id; value is the moment until which closing is suppressed.
        private readonly ConcurrentDictionary ClosingStops = new();

        // Keyed by figi; per-instrument settings used to look up long/short leverage.
        private readonly ConcurrentDictionary Leverages = new();

        private readonly decimal _futureComission;
        private readonly decimal _shareComission;
        private readonly decimal _accountCashPart;
        private readonly decimal _accountCashPartFutures;
        private readonly string[] _tradingInstrumentsFigis = [];

        private readonly Channel _pricesChannel = Channel.CreateUnbounded();

        // NOTE(review): this channel is registered on the bus in StartAsync but nothing in
        // this class reads from it; if the bus writes to it, the unbounded queue only grows.
        private readonly Channel _ordersbookChannel = Channel.CreateUnbounded();

        /// <summary>
        /// Creates the trader and copies commissions, cash-part limits, the traded figi list
        /// and per-instrument settings out of <paramref name="options"/>.
        /// </summary>
        /// <param name="logger">Logger used for processing errors.</param>
        /// <param name="options">Exchange configuration.</param>
        /// <param name="dataBus">Bus used to receive prices and broadcast trade commands.</param>
        /// <param name="tradeDataProvider">Provider of accounts, price windows and logging sinks.</param>
        /// <param name="investApiClient">Not used by this class; kept for the existing constructor contract.</param>
        public Trader(
            ILogger logger,
            IOptions options,
            IDataBus dataBus,
            TraderDataProvider tradeDataProvider,
            InvestApiClient investApiClient)
        {
            _tradeDataProvider = tradeDataProvider;
            _logger = logger;
            _dataBus = dataBus;
            _futureComission = options.Value.FutureComission;
            _shareComission = options.Value.ShareComission;
            _accountCashPart = options.Value.AccountCashPart;
            _accountCashPartFutures = options.Value.AccountCashPartFutures;
            _tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis;

            foreach (var lev in options.Value.InstrumentsSettings)
            {
                Leverages.TryAdd(lev.Figi, lev);
            }
        }

        /// <summary>
        /// Initializes the data provider, registers both channels on the data bus and
        /// starts the price-processing loop without awaiting it.
        /// </summary>
        /// <param name="cancellationToken">Not observed by the current implementation.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _tradeDataProvider.Init();
            _dataBus.AddChannel(nameof(Trader), _pricesChannel);
            _dataBus.AddChannel(nameof(Trader), _ordersbookChannel);

            // Fire-and-forget: the loop runs until the channel is completed.
            _ = ProcessPrices();
        }

        /// <summary>
        /// Reads price messages until the prices channel completes. Historical messages are
        /// added to the data provider; messages for configured instruments go through the
        /// stop / sell / clearing / signal pipeline (currently only for figi "FUTIMOEXF000").
        /// </summary>
        private async Task ProcessPrices()
        {
            while (await _pricesChannel.Reader.WaitToReadAsync())
            {
                var message = await _pricesChannel.Reader.ReadAsync();

                // NOTE(review): AddData sits outside the try block below, so an exception here
                // ends the whole loop, and because the task is discarded it goes unobserved.
                if (message.IsHistoricalData)
                {
                    await _tradeDataProvider.AddData(message, TimeSpan.FromHours(6));
                }

                if (_tradingInstrumentsFigis.Contains(message.Figi))
                {
                    // Historical messages are evaluated against their own timestamp.
                    var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
                    try
                    {
                        if (message.Figi == "FUTIMOEXF000")
                        {
                            ProcessStops(message, currentTime);
                            var windowMaxSize = 1000;
                            await SellAssetsIfNeed(message);
                            var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize);
                            var state = ExchangeScheduler.GetCurrentState(message.Time);
                            await ProcessClearing(data, state, message);
                            await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize);
                        }
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
                    }
                }
            }
        }

        /// <summary>
        /// Closes positions in the message's instrument that hit one of two time/profit rules:
        /// held longer than 4 minutes with profit below -66, or held longer than 4 hours with
        /// profit above 100. Does nothing when <c>BotModeSwitcher.CanSell()</c> is false.
        /// </summary>
        /// <param name="message">Incoming price used to evaluate the current profit.</param>
        private async Task SellAssetsIfNeed(INewPrice message)
        {
            if (!BotModeSwitcher.CanSell())
            {
                return;
            }

            var accounts = _tradeDataProvider.Accounts.Values.ToArray();
            var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);

            foreach (var acc in accounts)
            {
                var assets = acc.Assets.Values.Where(a => a.Figi == message.Figi).ToArray();
                foreach (var asset in assets)
                {
                    var isShort = asset.Count < 0;
                    var profit = TradingCalculator.CaclProfit(
                        asset.BoughtPrice,
                        message.Value,
                        GetComission(assetType),
                        GetLeverage(message.Figi, isShort),
                        isShort);
                    var stoppingKey = message.Figi + asset.AccountId;

                    // Loss cut-off: also blocks new openings for the instrument for 10 minutes.
                    if (message.Time - asset.BoughtAt > TimeSpan.FromMinutes(4)
                        && profit < -66m
                        && !ClosingStops.ContainsKey(stoppingKey))
                    {
                        await _dataBus.Broadcast(new TradeCommand()
                        {
                            AccountId = asset.AccountId,
                            Figi = message.Figi,
                            CommandType = isShort
                                ? Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy
                                : Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                            Count = System.Math.Abs((long)asset.Count),
                            RecomendPrice = null,
                            EnableMargin = false,
                        });
                        OpeningStops[message.Figi] = DateTime.UtcNow.AddMinutes(10);
                        ClosingStops[stoppingKey] = DateTime.UtcNow.AddSeconds(30);
                        await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
                        await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
                    }

                    // Long-held profitable position.
                    if (message.Time - asset.BoughtAt > TimeSpan.FromHours(4)
                        && profit > 100
                        && !ClosingStops.ContainsKey(stoppingKey))
                    {
                        await _dataBus.Broadcast(new TradeCommand()
                        {
                            AccountId = asset.AccountId,
                            Figi = message.Figi,
                            CommandType = isShort
                                ? Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy
                                : Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                            // Absolute value, matching the branch above: a short position has a
                            // negative Count and the direction is already carried by CommandType.
                            Count = System.Math.Abs((long)asset.Count),
                            RecomendPrice = null,
                            EnableMargin = false,
                        });
                        ClosingStops[stoppingKey] = DateTime.UtcNow.AddSeconds(30);
                        await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
                        await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
                    }
                }
            }
        }

        /// <summary>
        /// Evaluates the moving-average crossing and area-relation indicators on the supplied
        /// window, logs the intermediate values, and broadcasts buy commands on an uptrend
        /// start or sell commands on an uptrend end.
        /// </summary>
        /// <param name="data">Timestamps and prices of the current window.</param>
        /// <param name="state">Exchange state for the message time.</param>
        /// <param name="message">Incoming price.</param>
        /// <param name="windowMaxSize">Maximum window size passed to the moving-average check.</param>
        private async Task ProcessNewPriceIMOEXF((DateTime[] timestamps, decimal[] prices) data, ExchangeState state, INewPrice message, int windowMaxSize)
        {
            var res = TradingEvent.None;
            var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 30, 180, TimeSpan.FromSeconds(20), -1m, 2m);
            res |= resultMoveAvFull.events;

            if (resultMoveAvFull.bigWindowAv != 0)
            {
                await LogPrice(message, Constants.BigWindowCrossingAverageProcessor, resultMoveAvFull.bigWindowAv);
                await LogPrice(message, Constants.SmallWindowCrossingAverageProcessor, resultMoveAvFull.smallWindowAv);
            }

            // -1 means "no area relation available"; it fails the [20, 75) check below.
            var areasRel = -1m;
            if (ShapeAreaCalculator.TryGetAreasRelation(data.timestamps, data.prices, message.Value, Constants.AreasRelationWindow, out var rel))
            {
                await _tradeDataProvider.AddDataTo1MinuteWindowCache(message.Figi, Constants._1minCacheKey, new Contracts.Declisions.Dtos.CachedValue()
                {
                    Time = message.Time,
                    Value = (decimal)rel
                });
                var areas = await _tradeDataProvider.GetDataFrom1MinuteWindowCache(message.Figi, Constants._1minCacheKey);

                // Average of the cached values, including the one just added.
                areasRel = (decimal)areas.Sum(a => a.Value) / areas.Length;
                await LogPrice(message, Constants.AreasRelationProcessor, areasRel);
            }

            // Open only when no opening stop is set, the exchange is open and the last two
            // points of the window are less than a minute apart.
            if ((res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart
                && !OpeningStops.TryGetValue(message.Figi, out _)
                && state == ExchangeState.Open
                && data.timestamps.Length > 1
                && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2] < TimeSpan.FromMinutes(1))
                )
            {
                if (areasRel >= 20 && areasRel < 75)
                {
                    if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
                    {
                        // Only accounts that do not already hold this instrument.
                        var accounts = _tradeDataProvider.Accounts
                            .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
                            .ToArray();
                        var loggedDeclisions = 0;
                        foreach (var acc in accounts)
                        {
                            if (IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart))
                            {
                                // Random gate: each eligible account is skipped roughly half the time.
                                if (RandomNumberGenerator.GetInt32(100) > 50)
                                {
                                    await _dataBus.Broadcast(new TradeCommand()
                                    {
                                        AccountId = acc.Value.AccountId,
                                        Figi = message.Figi,
                                        CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
                                        Count = 1,
                                        RecomendPrice = null,
                                    });

                                    // The real decision is logged once per price, not once per account.
                                    if (loggedDeclisions == 0)
                                    {
                                        await LogDeclision(DeclisionTradeAction.OpenLongReal, message);
                                        OpeningStops[message.Figi] = DateTime.UtcNow.AddMinutes(1);
                                        loggedDeclisions++;
                                    }
                                }
                            }
                        }
                    }

                    await LogDeclision(DeclisionTradeAction.OpenLong, message);
                }
            }

            if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd)
            {
                var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
                var loggedDeclisions = 0;
                if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
                {
                    // Only long positions (Count > 0) are closed here.
                    var assetsForClose = _tradeDataProvider.Accounts
                        .SelectMany(a => a.Value.Assets.Values)
                        .Where(a => a.Figi == message.Figi && a.Count > 0)
                        .ToArray();
                    foreach (var asset in assetsForClose)
                    {
                        var profit = 0m;
                        if (assetType == AssetType.Common && asset.Count > 0)
                        {
                            profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, GetComission(assetType), 1, false);
                        }
                        if (assetType == AssetType.Futures)
                        {
                            profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value, GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
                        }

                        var stoppingKey = message.Figi + asset.AccountId;
                        if (profit > 0 && !ClosingStops.ContainsKey(stoppingKey))
                        {
                            ClosingStops[stoppingKey] = DateTime.UtcNow.AddSeconds(30);
                            await _dataBus.Broadcast(new TradeCommand()
                            {
                                AccountId = asset.AccountId,
                                Figi = message.Figi,
                                CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                                Count = (long)asset.Count,
                                RecomendPrice = null,
                                EnableMargin = false,
                            });
                            if (loggedDeclisions == 0)
                            {
                                loggedDeclisions++;
                                await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
                            }
                        }
                    }
                }

                await LogDeclision(DeclisionTradeAction.CloseLong, message);
            }
        }

        /// <summary>
        /// During clearing time, when a live price arrives more than 3 minutes after the
        /// previous point in the window, asks the data provider to update the futures price
        /// using the second-to-last price of the window.
        /// </summary>
        private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices) data, ExchangeState state, INewPrice message)
        {
            if (state == ExchangeState.ClearingTime
                && !message.IsHistoricalData
                && data.timestamps.Length > 1
                && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2]) > TimeSpan.FromMinutes(3))
            {
                await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[data.prices.Length - 2]);
            }
        }

        /// <summary>
        /// Removes opening and closing stops for the message's instrument whose expiry is
        /// earlier than <paramref name="currentTime"/>.
        /// </summary>
        /// <param name="message">Incoming price; its figi selects the stops to check.</param>
        /// <param name="currentTime">Moment the expiry timestamps are compared against.</param>
        private void ProcessStops(INewPrice message, DateTime currentTime)
        {
            if (OpeningStops.TryGetValue(message.Figi, out var dt))
            {
                if (dt < currentTime)
                {
                    OpeningStops.TryRemove(message.Figi, out _);
                }
            }

            // Closing stops are stored under figi + account id, so a lookup by the bare figi
            // never finds them and they would stay in place forever. Sweep every expired
            // entry whose key begins with this figi instead. Removing entries while
            // enumerating is allowed for ConcurrentDictionary.
            foreach (var stop in ClosingStops)
            {
                if (stop.Value < currentTime && stop.Key.StartsWith(message.Figi, StringComparison.Ordinal))
                {
                    ClosingStops.TryRemove(stop.Key, out _);
                }
            }
        }

        /// <summary>
        /// Sends a processed indicator value for the message's instrument to the data provider.
        /// </summary>
        /// <param name="message">Price message supplying figi, ticker and time.</param>
        /// <param name="processor">Name of the processor that produced the value.</param>
        /// <param name="value">Value to record.</param>
        private async Task LogPrice(INewPrice message, string processor, decimal value)
        {
            await _tradeDataProvider.LogPrice(new ProcessedPrice()
            {
                Figi = message.Figi,
                Ticker = message.Ticker,
                Processor = processor,
                Time = message.Time,
                Value = value,
            }, false);
        }

        /// <summary>
        /// Sends a trading decision record to the data provider. The account id is always
        /// written as an empty string; historical messages keep their own time, live ones
        /// are stamped with the current UTC time.
        /// </summary>
        /// <param name="action">Decision being recorded.</param>
        /// <param name="message">Price message the decision was made on.</param>
        /// <param name="profit">Optional profit value stored with the decision.</param>
        private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
        {
            await _tradeDataProvider.LogDeclision(new Declision()
            {
                AccountId = string.Empty,
                Figi = message.Figi,
                Ticker = message.Ticker,
                Value = profit,
                Price = message.Value,
                Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
                Action = action,
            }, false);
        }

        /// <summary>
        /// Completes immediately.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the processing loop started in <see cref="StartAsync"/> is not
        /// signalled here; it only ends when the prices channel completes.
        /// </remarks>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Returns the configured commission for the asset type, or 0 for any type other
        /// than common shares and futures.
        /// </summary>
        private decimal GetComission(AssetType assetType)
        {
            if (assetType == AssetType.Common)
            {
                return _shareComission;
            }
            else if (assetType == AssetType.Futures)
            {
                return _futureComission;
            }
            else
            {
                return 0;
            }
        }

        /// <summary>
        /// Returns the configured short or long leverage for the instrument, or 1 when the
        /// instrument has no settings entry.
        /// </summary>
        private decimal GetLeverage(string figi, bool isShort)
        {
            var res = 1m;
            if (Leverages.TryGetValue(figi, out var leverage))
            {
                res = isShort ? leverage.ShortLeverage : leverage.LongLeverage;
            }
            return res;
        }

        /// <summary>
        /// Checks whether a purchase would leave the account with at least the required
        /// share of cash: (balance - price * count) / total must not fall below the limit.
        /// </summary>
        /// <param name="account">Account to check.</param>
        /// <param name="boutPrice">Price of one unit being bought.</param>
        /// <param name="count">Number of units being bought.</param>
        /// <param name="accountCashPartFutures">Limit applied when the account already holds a futures asset.</param>
        /// <param name="accountCashPart">Limit applied otherwise.</param>
        /// <returns>
        /// <c>true</c> when the purchase is allowed; <c>false</c> when purchasing is switched
        /// off, the account total is zero, or the remaining cash share is below the limit.
        /// </returns>
        internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count, decimal accountCashPartFutures, decimal accountCashPart)
        {
            if (!BotModeSwitcher.CanPurchase())
            {
                return false;
            }

            var balance = account.Balance;
            var total = account.Total;

            // The cash share is undefined for a zero total; dividing would throw.
            if (total == 0)
            {
                return false;
            }

            var holdsFutures = account.Assets.Values.Any(v => v.Type == AssetType.Futures);
            var requiredCashPart = holdsFutures ? accountCashPartFutures : accountCashPart;

            return (balance - boutPrice * count) / total >= requiredCashPart;
        }
    }
}