using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Extentions;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Math.Declisions.Dtos.FFT;
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
using Order = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Order;
using PositionType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.PositionType;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Keeps per-instrument price history caches, FFT results, ticker / asset-type lookups and the
    /// state of the managed broker accounts, and persists trades, processed prices and decisions
    /// through the EF Core context factory.
    /// </summary>
    /// <remarks>
    /// NOTE(review): in the reviewed text of this file the generic type arguments of several
    /// declarations (dictionaries, the channel, the injected services, <c>ValueTask</c> results,
    /// <c>Array.Empty</c>, <c>List</c>) are not visible. They are reproduced exactly as seen and
    /// must be confirmed against the compiled source.
    /// </remarks>
    public class TraderDataProvider
    {
        // Price history cache units keyed by FIGI.
        private readonly ConcurrentDictionary _historyCash = new();
        private readonly InvestApiClient _investApiClient;
        private readonly IDbContextFactory _dbContextFactory;
        private readonly ILogger _logger;
        private readonly string[] _managedAccountsNamePatterns = [];
        private readonly string[] _instrumentsFigis = [];
        // Latest FFT analysis result, keyed by FFTAnalyzeResult.Key.
        private readonly ConcurrentDictionary _fftResults = new();
        // Per-instrument settings from configuration, keyed by FIGI.
        private readonly ConcurrentDictionary _instrumentsSettings = new();
        // FIGI -> ticker, filled in Init for the configured instruments.
        private readonly ConcurrentDictionary _tickersCache = new();
        // FIGI -> asset type, filled in Init for the configured instruments.
        private readonly ConcurrentDictionary _assetTypesCache = new();
        // Managed accounts keyed by account id.
        internal readonly ConcurrentDictionary Accounts = new();
        private readonly bool _isDataRecievingAllowed = false;
        // Queue of ProcessedPrice / Declision objects awaiting batched persistence.
        private readonly Channel _forSave = Channel.CreateUnbounded();
        // Serializes Init and portfolio synchronization.
        private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);

        /// <summary>
        /// Stores the injected services and copies the account name patterns, instrument FIGIs,
        /// the data-receiving flag and the per-instrument settings out of <paramref name="options"/>.
        /// </summary>
        public TraderDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger)
        {
            _investApiClient = investApiClient;
            _dbContextFactory = dbContextFactory;
            _logger = logger;
            _managedAccountsNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
            _instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray();
            _isDataRecievingAllowed = options.Value.ExchangeDataRecievingEnabled;

            foreach (var lev in options.Value.InstrumentsSettings)
            {
                _instrumentsSettings.TryAdd(lev.Figi, lev);
            }
        }

        /// <summary>
        /// Returns the stored FFT result for <paramref name="figi"/>, or
        /// <see cref="FFTAnalyzeResult.Empty"/> when nothing is stored under that key.
        /// </summary>
        public ValueTask GetFFtResult(string figi)
        {
            if (_fftResults.TryGetValue(figi, out var res))
            {
                return ValueTask.FromResult(res);
            }
            return ValueTask.FromResult(FFTAnalyzeResult.Empty);
        }

        /// <summary>
        /// Stores <paramref name="result"/> under its <c>Key</c>, replacing any previous value.
        /// </summary>
        public ValueTask SetFFtResult(FFTAnalyzeResult result)
        {
            _fftResults[result.Key] = result;
            return ValueTask.CompletedTask;
        }

        /// <summary>
        /// Reads cached prices of <paramref name="figi"/> for the requested <paramref name="timeSpan"/>.
        /// Returns empty arrays and <c>false</c> when the instrument has no cache unit.
        /// </summary>
        public async ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(string figi, TimeSpan timeSpan)
        {
            if (_historyCash.TryGetValue(figi, out var unit))
            {
                var res = await unit.GetData(timeSpan);
                return (res.timestamps, res.prices, res.isFullIntervalExists);
            }
            return (Array.Empty(), Array.Empty(), false);
        }

        /// <summary>
        /// Reads cached prices of <paramref name="figi"/>, forwarding the optional
        /// <paramref name="length"/> to the cache unit. Returns empty arrays when the instrument
        /// has no cache unit.
        /// </summary>
        public async ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(string figi, int? length = null)
        {
            if (_historyCash.TryGetValue(figi, out var unit))
            {
                var res = await unit.GetData(length);
                return (res.timestamps, res.prices);
            }
            return (Array.Empty(), Array.Empty());
        }

        /// <summary>
        /// Adds a price message to the cache unit of its instrument, creating the unit on first use.
        /// </summary>
        /// <param name="message">The new price.</param>
        /// <param name="clearingInterval">
        /// When set and the gap between <c>message.Time</c> and the unit's last value exceeds it,
        /// the unit is replaced with a fresh one before the message is added.
        /// </param>
        public async ValueTask AddData(INewPrice message, TimeSpan? clearingInterval = null)
        {
            if (_historyCash.TryGetValue(message.Figi, out var unit))
            {
                if (clearingInterval.HasValue)
                {
                    var lasts = await unit.GetLastValues();
                    if (message.Time - lasts.time > clearingInterval.Value)
                    {
                        unit = new PriceHistoryCacheUnit2(message.Figi);
                        _historyCash[message.Figi] = unit;
                    }
                }
                await unit.AddData(message);
            }
            else
            {
                // The message is handed to the constructor instead of AddData
                // (presumably it seeds the new unit — confirm in PriceHistoryCacheUnit2).
                unit = new PriceHistoryCacheUnit2(message.Figi, message);
                _historyCash.TryAdd(message.Figi, unit);
            }
        }

        /// <summary>
        /// Adds <paramref name="data"/> under <paramref name="key"/> to the 1-minute window cache
        /// of <paramref name="figi"/>, creating the cache unit when it does not exist.
        /// </summary>
        public ValueTask AddDataTo1MinuteWindowCache(string figi, string key, CachedValue data)
        {
            return AddDataToWindowCache(figi, key, data, TimeWindowCacheType._1_Minute);
        }

        /// <summary>
        /// Adds <paramref name="data"/> under <paramref name="key"/> to the 5-minute window cache
        /// of <paramref name="figi"/>, creating the cache unit when it does not exist.
        /// </summary>
        public ValueTask AddDataTo5MinuteWindowCache(string figi, string key, CachedValue data)
        {
            return AddDataToWindowCache(figi, key, data, TimeWindowCacheType._5_Minutes);
        }

        /// <summary>
        /// Reads the values stored under <paramref name="key"/> in the 1-minute window cache of
        /// <paramref name="figi"/>; an empty array when the instrument has no cache unit.
        /// </summary>
        public ValueTask GetDataFrom1MinuteWindowCache(string figi, string key)
        {
            if (_historyCash.TryGetValue(figi, out var cahcheItem))
            {
                return cahcheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._1_Minute);
            }
            return ValueTask.FromResult(Array.Empty());
        }

        /// <summary>
        /// Reads the values stored under <paramref name="key"/> in the 5-minute window cache of
        /// <paramref name="figi"/>; an empty array when the instrument has no cache unit.
        /// </summary>
        public ValueTask GetDataFrom5MinuteWindowCache(string figi, string key)
        {
            if (_historyCash.TryGetValue(figi, out var cahcheItem))
            {
                return cahcheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._5_Minutes);
            }
            return ValueTask.FromResult(Array.Empty());
        }

        /// <summary>
        /// Passes <paramref name="orderbook"/> to the cache unit of its instrument, creating the
        /// unit when it does not exist.
        /// </summary>
        public async ValueTask AddOrderbook(IOrderbook orderbook)
        {
            // GetOrAdd guarantees the orderbook lands in the unit that is actually cached,
            // even when another caller registers a unit for the same FIGI at the same time.
            await GetOrCreateUnit(orderbook.Figi).AddOrderbook(orderbook);
        }

        /// <summary>
        /// Loads tickers and asset types for the configured instruments, discovers and synchronizes
        /// the managed accounts, optionally replays the last 1.5 hours of stored prices into the
        /// caches, and starts the background portfolio-sync and persistence loops.
        /// </summary>
        /// <remarks>
        /// Failures are logged and not rethrown, so callers never observe an exception from here.
        /// </remarks>
        public async Task Init()
        {
            var lockTaken = false;
            try
            {
                // As before, initialization proceeds even when the semaphore could not be taken
                // within the timeout; the flag only decides whether it is released afterwards.
                lockTaken = await _initSemaphore.WaitAsync(TimeSpan.FromSeconds(3));

                var shares = await _investApiClient.Instruments.SharesAsync();
                foreach (var share in shares.Instruments)
                {
                    if (_instrumentsFigis.Contains(share.Figi))
                    {
                        _tickersCache.TryAdd(share.Figi, share.Ticker);
                        _assetTypesCache.TryAdd(share.Figi, AssetType.Common);
                    }
                }

                var futures = await _investApiClient.Instruments.FuturesAsync();
                foreach (var future in futures.Instruments)
                {
                    if (_instrumentsFigis.Contains(future.Figi))
                    {
                        _tickersCache.TryAdd(future.Figi, future.Ticker);
                        _assetTypesCache.TryAdd(future.Figi, AssetType.Futures);
                    }
                }

                var accounts = await _investApiClient.GetAccounts(_managedAccountsNamePatterns);
                foreach (var accountId in accounts)
                {
                    var acc = new ManagedAccount(accountId);
                    // The semaphore is held by this method, so use the variant that does not
                    // wait on it again (waiting would only stall until the timeout expires).
                    await SyncPortfolioUnlocked(acc);
                    Accounts[accountId] = acc;
                }

                if (_isDataRecievingAllowed)
                {
                    var time = DateTime.UtcNow.AddHours(-1.5);
                    using var context1 = await _dbContextFactory.CreateDbContextAsync();
                    context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                    var data = await context1.PriceChanges
                        .Where(c => _instrumentsFigis.Contains(c.Figi) && c.Time >= time)
                        .OrderBy(c => c.Time)
                        .Select(c => new NewPriceMessage()
                        {
                            Figi = c.Figi,
                            Ticker = c.Ticker,
                            Time = c.Time,
                            Value = c.Value,
                            IsHistoricalData = true
                        })
                        .ToArrayAsync();

                    foreach (var price in data)
                    {
                        await AddData(price);
                        var cachedData = await GetData(price.Figi);
                        if (ShapeAreaCalculator.TryGetAreasRelation(cachedData.timestamps, cachedData.prices, price.Value, Constants.AreasRelationWindow, out var rel))
                        {
                            await AddDataTo1MinuteWindowCache(price.Figi, Constants._1minCacheKey, new CachedValue()
                            {
                                Time = price.Time,
                                Value = (decimal)rel
                            });
                        }
                    }
                }

                // Fire-and-forget background loops.
                _ = SyncPortfolioWorker();
                _ = WritePricesTask();
            }
            catch (Exception ex)
            {
                // Still best-effort (no rethrow), but the failure is no longer silent.
                _logger.LogError(ex, "Ошибка при инициализации провайдера данных");
            }
            finally
            {
                if (lockTaken)
                {
                    _initSemaphore.Release();
                }
            }
        }

        /// <summary>
        /// Returns the cached ticker for <paramref name="figi"/>, or an empty string when unknown.
        /// </summary>
        public string GetTickerByFigi(string figi)
        {
            return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
        }

        /// <summary>
        /// Returns the cached asset type for <paramref name="figi"/>, or
        /// <see cref="AssetType.Unknown"/> when unknown.
        /// </summary>
        public AssetType GetAssetTypeByFigi(string figi)
        {
            return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown;
        }

        /// <summary>
        /// Synchronizes the managed account registered under <paramref name="accountId"/>;
        /// does nothing when no such account is registered.
        /// </summary>
        internal async Task SyncPortfolio(string accountId)
        {
            if (Accounts.TryGetValue(accountId, out var account))
            {
                await SyncPortfolio(account);
            }
        }

        /// <summary>
        /// Synchronizes <paramref name="account"/> with the broker while holding the
        /// synchronization semaphore. Errors are logged, not thrown.
        /// </summary>
        internal async Task SyncPortfolio(ManagedAccount account)
        {
            var lockTaken = false;
            try
            {
                // As before, the sync still runs when the wait times out; but the semaphore is
                // released only if it was really acquired, so a timed-out caller can no longer
                // over-release it (SemaphoreFullException / an extra permit).
                lockTaken = await _initSemaphore.WaitAsync(TimeSpan.FromSeconds(5));
                await SyncPortfolioUnlocked(account);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", account.AccountId);
            }
            finally
            {
                if (lockTaken)
                {
                    _initSemaphore.Release();
                }
            }
        }

        /// <summary>
        /// Updates the price and purchase time of all active futures trades for the instrument of
        /// <paramref name="newPrice"/>, then re-synchronizes every managed account.
        /// </summary>
        internal async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue)
        {
            using var context = await _dbContextFactory.CreateDbContextAsync();
            context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
            await context.Trades
                .Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future)
                .ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue).SetProperty(tr => tr.BoughtAt, DateTime.UtcNow));

            foreach (var account in Accounts.Values)
            {
                await SyncPortfolio(account);
            }
        }

        /// <summary>
        /// Records an executed deal: creates the active trade row for the account / instrument
        /// when there is none, otherwise folds the deal into the existing row.
        /// </summary>
        internal async Task LogDeal(DealResult dealResult)
        {
            using var context = await _dbContextFactory.CreateDbContextAsync();
            context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

            var priceCoeff = 1m;
            // Sells are counted as negative quantities.
            var sign = dealResult.Direction == DealDirection.Sell ? -1m : 1;
            var dealCount = dealResult.Count * sign;

            if (_instrumentsSettings.TryGetValue(dealResult.Figi, out var se))
            {
                priceCoeff = se.PriceToRubConvertationCoefficient;
            }

            var trade = await context.Trades.FirstOrDefaultAsync(t => t.ArchiveStatus == 0 && t.Figi == dealResult.Figi && t.AccountId == dealResult.AccountId);
            if (trade == null)
            {
                var newTrade = new DataLayer.Entities.Trades.Trade()
                {
                    AccountId = dealResult.AccountId,
                    Figi = dealResult.Figi,
                    Ticker = GetTickerByFigi(dealResult.Figi),
                    BoughtAt = DateTime.UtcNow,
                    Count = dealCount,
                    Price = dealResult.Price * priceCoeff,
                    Position = dealCount >= 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short,
                    Direction = (DataLayer.Entities.Trades.Enums.TradeDirection)(int)dealResult.Direction,
                    Asset = (DataLayer.Entities.Trades.Enums.AssetType)(int)GetAssetTypeByFigi(dealResult.Figi)
                };

                await context.Trades.AddAsync(newTrade);
                await context.SaveChangesAsync();
            }
            else
            {
                var oldAmount = trade.Price * trade.Count;
                var newAmount = dealResult.Price * priceCoeff * dealCount;
                var oldCount = trade.Count;
                trade.Count = trade.Count + dealCount;

                // Nothing is saved when the position nets out to zero.
                // NOTE(review): the original comment said the price need not be recalculated when
                // the position shrinks, yet the averaging branch below runs for every same-sign
                // change — confirm the intent.
                if (trade.Count != 0)
                {
                    // Sign comparison instead of x / |x|: identical for non-zero counts, and it
                    // no longer divides by zero when the stored count was 0.
                    if (System.Math.Sign(trade.Count) != System.Math.Sign(oldCount))
                    {
                        // The position flipped side: restart from the deal price, converted with
                        // the same coefficient as every other price written by this method.
                        trade.Price = dealResult.Price * priceCoeff;
                        trade.Position = trade.Count < 0 ? DataLayer.Entities.Trades.Enums.PositionType.Short : DataLayer.Entities.Trades.Enums.PositionType.Long;
                    }
                    else
                    {
                        trade.Price = (oldAmount + newAmount) / trade.Count;
                    }

                    context.Trades.Update(trade);
                    await context.SaveChangesAsync();
                }
            }
        }

        /// <summary>
        /// Persists <paramref name="price"/> right away, or queues it for the background writer
        /// when <paramref name="saveImmediately"/> is <c>false</c>.
        /// </summary>
        internal async Task LogPrice(ProcessedPrice price, bool saveImmediately)
        {
            if (saveImmediately)
            {
                using var context = await _dbContextFactory.CreateDbContextAsync();
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                await context.ProcessedPrices.AddRangeAsync(price);
                await context.SaveChangesAsync();
            }
            else
            {
                await _forSave.Writer.WriteAsync(price);
            }
        }

        /// <summary>
        /// Persists <paramref name="declision"/> right away, or queues it for the background
        /// writer when <paramref name="saveImmediately"/> is <c>false</c>.
        /// </summary>
        internal async Task LogDeclision(Declision declision, bool saveImmediately)
        {
            if (saveImmediately)
            {
                using var context = await _dbContextFactory.CreateDbContextAsync();
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                await context.Declisions.AddRangeAsync(declision);
                await context.SaveChangesAsync();
            }
            else
            {
                await _forSave.Writer.WriteAsync(declision);
            }
        }

        /// <summary>
        /// Returns the assets with the given <paramref name="figi"/> across all managed accounts.
        /// </summary>
        public ValueTask GetAssetsByFigi(string figi)
        {
            var assets = Accounts.Values.SelectMany(a => a.Assets.Values.Where(aa => aa.Figi == figi)).ToArray();
            return ValueTask.FromResult(assets);
        }

        // Returns the cache unit registered for the FIGI, registering a new empty one if needed.
        private PriceHistoryCacheUnit2 GetOrCreateUnit(string figi)
        {
            return _historyCash.GetOrAdd(figi, key => new PriceHistoryCacheUnit2(key));
        }

        // Shared implementation of the 1-minute / 5-minute window cache writers.
        private async ValueTask AddDataToWindowCache(string figi, string key, CachedValue data, TimeWindowCacheType cacheType)
        {
            await GetOrCreateUnit(figi).AddDataToTimeWindowCache(key, data, cacheType);
        }

        // Synchronizes one account without touching the semaphore: refreshes assets, totals and
        // open orders from the broker, updates matching active trades and archives the rest.
        // Exceptions propagate to the caller.
        private async Task SyncPortfolioUnlocked(ManagedAccount account)
        {
            try
            {
                var portfolio = await _investApiClient.Operations.GetPortfolioAsync(new PortfolioRequest()
                {
                    AccountId = account.AccountId,
                });

                // Assets still listed here after the loop are gone from the portfolio.
                var oldAssets = account.Assets.Keys.ToHashSet();

                using var context = await _dbContextFactory.CreateDbContextAsync();
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

                var trades = await context.Trades
                    .Where(t => t.AccountId == account.AccountId && t.ArchiveStatus == 0)
                    .ToListAsync();

                foreach (var position in portfolio.Positions)
                {
                    decimal price = 0;
                    var trade = trades.FirstOrDefault(t => t.Figi == position.Figi);
                    if (trade != null)
                    {
                        trade.Count = position.Quantity;
                        trade.Position = position.Quantity > 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short;
                        // Matched trades leave the list; whatever remains is archived below.
                        trades.Remove(trade);
                        price = trade.Price;
                        context.Trades.Update(trade);
                        await context.SaveChangesAsync();
                    }
                    else
                    {
                        price = position.AveragePositionPrice;
                    }

#pragma warning disable CS0612 // Type or member is obsolete
                    var asset = new Models.AssetsAccounting.Asset()
                    {
                        TradeId = trade?.Id,
                        AccountId = account.AccountId,
                        Figi = position.Figi,
                        Ticker = position.Ticker,
                        BoughtAt = trade?.BoughtAt ?? DateTime.UtcNow,
                        BoughtPrice = price,
                        Type = position.InstrumentType.ParseInstrumentType(),
                        Position = position.Quantity > 0 ? PositionType.Long : PositionType.Short,
                        BlockedItems = position.BlockedLots,
                        Count = position.Quantity,
                        CountLots = position.QuantityLots,
                    };
#pragma warning restore CS0612 // Type or member is obsolete

                    account.Assets.AddOrUpdate(asset.Figi, asset, (k, v) => asset);
                    oldAssets.Remove(asset.Figi);
                }

                account.Total = portfolio.TotalAmountPortfolio;
                account.Balance = portfolio.TotalAmountCurrencies;

                foreach (var asset in oldAssets)
                {
                    account.Assets.TryRemove(asset, out _);
                }

                // Active trades without a matching portfolio position are archived.
                var ids = trades.Select(t => t.Id).ToArray();
                await context.Trades
                    .Where(t => ids.Contains(t.Id))
                    .ExecuteUpdateAsync(t => t.SetProperty(tr => tr.ArchiveStatus, 1));

                var orders = await _investApiClient.Orders.GetOrdersAsync(new GetOrdersRequest() { AccountId = account.AccountId });
                var actualOrders = orders.Orders.Select(o => new Order()
                {
                    AccountId = account.AccountId,
                    Figi = o.Figi,
                    OrderId = o.OrderId,
                    Ticker = GetTickerByFigi(o.Figi),
                    Count = o.LotsRequested,
                    ExpirationTime = DateTime.UtcNow.AddMinutes(10),
                    OpenDate = DateTime.UtcNow,
                    Price = o.AveragePositionPrice,
                    Direction = (DealDirection)(int)o.Direction
                }).ToArray();

                foreach (var order in actualOrders)
                {
                    account.Orders[order.OrderId] = order;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", account.AccountId);
            }
        }

        // Endless loop that re-synchronizes every managed account every 20 seconds.
        private async Task SyncPortfolioWorker()
        {
            while (true)
            {
                try
                {
                    await Task.Delay(20000);
                    foreach (var acc in Accounts)
                    {
                        await SyncPortfolio(acc.Value);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка при цикличном обновлении портфеля");
                }
            }
        }

        // Drains the save queue and writes processed prices and decisions in batches: a flush
        // happens when the queue is momentarily empty or more than 50000 items are buffered.
        private async Task WritePricesTask()
        {
            var buffer1 = new List();
            var buffer2 = new List();
            while (await _forSave.Reader.WaitToReadAsync())
            {
                try
                {
                    var obj = await _forSave.Reader.ReadAsync();
                    if (obj is ProcessedPrice price)
                    {
                        buffer1.Add(price);
                    }
                    if (obj is Declision dec)
                    {
                        buffer2.Add(dec);
                    }

                    if ((buffer1.Count + buffer2.Count) > 50000 || _forSave.Reader.Count == 0)
                    {
                        using var context = await _dbContextFactory.CreateDbContextAsync();
                        context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                        if (buffer1.Count > 0)
                        {
                            await context.ProcessedPrices.AddRangeAsync(buffer1);
                        }
                        if (buffer2.Count > 0)
                        {
                            await context.Declisions.AddRangeAsync(buffer2);
                        }
                        await context.SaveChangesAsync();
                        buffer1.Clear();
                        buffer2.Clear();
                    }
                }
                catch (Exception ex)
                {
                    // The buffers are intentionally left untouched, so the same items are
                    // retried on the next flush; the failure itself is now visible in the log.
                    _logger.LogError(ex, "Ошибка при сохранении цен и решений в базу данных");
                }
            }
        }
    }
}