using KLHZ.Trader.Core.Contracts.Messaging.Dtos; using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; using KLHZ.Trader.Core.DataLayer; using KLHZ.Trader.Core.DataLayer.Entities.Declisions; using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums; using KLHZ.Trader.Core.DataLayer.Entities.Prices; using KLHZ.Trader.Core.Exchange.Models.Configs; using KLHZ.Trader.Core.Math.Declisions.Services.Cache; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Collections.Concurrent; using System.Threading.Channels; using Tinkoff.InvestApi; using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType; namespace KLHZ.Trader.Core.Exchange.Services { public class TraderDataProvider { private readonly ConcurrentDictionary _historyCash3 = new(); private readonly InvestApiClient _investApiClient; private readonly IDbContextFactory _dbContextFactory; private readonly ILogger _logger; private readonly string[] _instrumentsFigis = []; private readonly string[] _tradingInstrumentsFigis = []; public readonly ConcurrentDictionary Orderbooks = new(); private readonly ConcurrentDictionary _tickersCache = new(); private readonly ConcurrentDictionary _assetTypesCache = new(); private readonly bool _isDataRecievingAllowed = false; private readonly Channel _forSave = Channel.CreateUnbounded(); private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1); public TraderDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) { _investApiClient = investApiClient; _dbContextFactory = dbContextFactory; _logger = logger; _instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray(); _tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis.ToArray(); _isDataRecievingAllowed = options.Value.ExchangeDataRecievingEnabled; } public async ValueTask GetLastPrice(string figi) { var res = 0m; if (_historyCash3.TryGetValue(figi, out var unit)) { res = (await unit.GetLastValues()).price; } return res; } public async ValueTask AddData(ITradeDataItem message) { if (message.Direction != 1) return; if (_historyCash3.TryGetValue(message.Figi, out var unit)) { await unit.AddData(message); } else { unit = new PriceHistoryCacheUnit3(message.Figi, message); _historyCash3.TryAdd(message.Figi, unit); } } public async ValueTask AddData(string figi, string key, ITradeDataItem data) { if (!_historyCash3.TryGetValue(figi, out var item)) { item = new PriceHistoryCacheUnit3(figi); _historyCash3.TryAdd(figi, item); } await _historyCash3[figi].AddData(data, key); } public ValueTask GetDataForTimeWindow(string figi, TimeSpan time, string? key = null, Func? selector = null) { if (_historyCash3.TryGetValue(figi, out var cahcheItem)) { return cahcheItem.GetData(time, key: key, selector); } return ValueTask.FromResult(Array.Empty()); } public async ValueTask AddOrderbook(IOrderbook orderbook) { if (!_historyCash3.TryGetValue(orderbook.Figi, out var unit)) { unit = new PriceHistoryCacheUnit3(orderbook.Figi); _historyCash3.TryAdd(orderbook.Figi, unit); } Orderbooks[orderbook.Figi] = orderbook; await unit.AddOrderbook(orderbook); } public async Task Init() { try { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3)); await _initSemaphore.WaitAsync(cts.Token); var shares = await _investApiClient.Instruments.SharesAsync(); foreach (var share in shares.Instruments) { if (_instrumentsFigis.Contains(share.Figi)) { _tickersCache.TryAdd(share.Figi, share.Ticker); _assetTypesCache.TryAdd(share.Figi, AssetType.Common); } } var futures = await _investApiClient.Instruments.FuturesAsync(); foreach (var future in futures.Instruments) { if (_instrumentsFigis.Contains(future.Figi)) { _tickersCache.TryAdd(future.Figi, future.Ticker); _assetTypesCache.TryAdd(future.Figi, AssetType.Futures); } } if (_isDataRecievingAllowed) { var time = DateTime.UtcNow.AddHours(-20); using var context1 = await _dbContextFactory.CreateDbContextAsync(); context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; var data = await context1.PriceChanges .Where(c => _tradingInstrumentsFigis.Contains(c.Figi) && c.Time >= time) .OrderBy(c => c.Time) .Select(c => new TradeDataItem() { Figi = c.Figi, Ticker = c.Ticker, Time = c.Time, Price = c.Price, IsHistoricalData = true, Direction = c.Direction, Count = c.Count, }) .ToArrayAsync(); foreach (var price in data) { await AddData(price); } } _ = WritePricesTask(); } catch (Exception ex) { } } public string GetTickerByFigi(string figi) { return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty; } public AssetType GetAssetTypeByFigi(string figi) { return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown; } internal async Task LogPrice(ProcessedPrice price, bool saveImmediately) { if (saveImmediately) { using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; await context.ProcessedPrices.AddRangeAsync(price); await context.SaveChangesAsync(); } else { await _forSave.Writer.WriteAsync(price); } } internal async Task LogDeclision(Declision declision, bool saveImmediately) { if (saveImmediately) { using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; await context.Declisions.AddRangeAsync(declision); await context.SaveChangesAsync(); } else { await _forSave.Writer.WriteAsync(declision); } } internal async Task LogPrice(ITradeDataItem message, string processor, decimal value) { await LogPrice(new ProcessedPrice() { Figi = message.Figi, Ticker = message.Ticker, Processor = processor, Time = message.Time, Price = value, }, false); } internal async Task LogPrice(string figi, string ticker, DateTime time, decimal value, string processor) { await LogPrice(new ProcessedPrice() { Figi = figi, Ticker = ticker, Processor = processor, Time = time, Price = value, }, false); } internal async Task LogDeclision(DeclisionTradeAction action, ITradeDataItem message, decimal? profit = null) { await LogDeclision(new Declision() { AccountId = string.Empty, Figi = message.Figi, Ticker = message.Ticker, Value = profit, Price = message.Price, Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow, Action = action, }, false); } internal async Task LogDeclision(DeclisionTradeAction action, decimal price, DateTime time, ITradeDataItem message) { await LogDeclision(new Declision() { AccountId = string.Empty, Figi = message.Figi, Ticker = message.Ticker, Value = price, Price = price, Time = time, Action = action, }, false); } private async Task WritePricesTask() { var buffer1 = new List(); var buffer2 = new List(); while (await _forSave.Reader.WaitToReadAsync()) { try { var obj = await _forSave.Reader.ReadAsync(); if (obj is ProcessedPrice price) { buffer1.Add(price); } if (obj is Declision dec) { buffer2.Add(dec); } if ((buffer1.Count + buffer2.Count) > 50000 || _forSave.Reader.Count == 0) { using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; if (buffer1.Count > 0) { await context.ProcessedPrices.AddRangeAsync(buffer1); } if (buffer2.Count > 0) { await context.Declisions.AddRangeAsync(buffer2); } await context.SaveChangesAsync(); buffer1.Clear(); buffer2.Clear(); } } catch (Exception ex) { } } } } }