using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Math.Declisions.Dtos.FFT;
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Keeps per-instrument price history units, FFT results and ticker / asset-type
    /// lookups in memory, and persists processed prices and declisions to the database
    /// either immediately or through a background writer fed by a channel.
    /// </summary>
    /// <remarks>
    /// NOTE(review): several generic type arguments appear to be missing in this copy of the
    /// file (for example on <c>ConcurrentDictionary</c>, <c>Channel</c>, <c>IOptions</c>,
    /// <c>IDbContextFactory</c>, <c>Array.Empty()</c> and <c>new List()</c>). They are
    /// reproduced exactly as found rather than guessed - confirm against the repository.
    /// </remarks>
    public class TraderDataProvider
    {
        /// <summary>Number of buffered records after which the background writer flushes regardless of queue state.</summary>
        private const int FlushThreshold = 50000;

        /// <summary>Maximum time <see cref="Init"/> waits for a concurrently running initialization.</summary>
        private static readonly TimeSpan InitLockTimeout = TimeSpan.FromSeconds(3);

        private readonly ConcurrentDictionary _historyCash = new();
        private readonly InvestApiClient _investApiClient;
        private readonly IDbContextFactory _dbContextFactory;
        private readonly ILogger _logger;
        private readonly string[] _instrumentsFigis = [];
        private readonly ConcurrentDictionary _fftResults = new();
        private readonly ConcurrentDictionary _tickersCache = new();
        private readonly ConcurrentDictionary _assetTypesCache = new();
        private readonly bool _isDataRecievingAllowed = false;
        private readonly Channel _forSave = Channel.CreateUnbounded();
        private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);

        // Set (under _initSemaphore) as soon as the first Init call starts its work,
        // so the initialization body runs at most once per instance.
        private bool _initAttempted;

        /// <summary>
        /// Creates the provider and captures the configured instrument FIGIs and the
        /// "data receiving enabled" switch from <paramref name="options"/>.
        /// </summary>
        public TraderDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger)
        {
            _investApiClient = investApiClient;
            _dbContextFactory = dbContextFactory;
            _logger = logger;
            _instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray();
            _isDataRecievingAllowed = options.Value.ExchangeDataRecievingEnabled;
        }

        /// <summary>
        /// Returns the FFT result stored under <paramref name="figi"/>, or
        /// <c>FFTAnalyzeResult.Empty</c> when nothing is stored under that key.
        /// </summary>
        /// <remarks>Results are stored by <see cref="SetFFtResult"/> under <c>result.Key</c>.</remarks>
        public ValueTask GetFFtResult(string figi)
        {
            return _fftResults.TryGetValue(figi, out var stored)
                ? ValueTask.FromResult(stored)
                : ValueTask.FromResult(FFTAnalyzeResult.Empty);
        }

        /// <summary>Stores <paramref name="result"/> under its own <c>Key</c>, replacing any previous value.</summary>
        public ValueTask SetFFtResult(FFTAnalyzeResult result)
        {
            _fftResults[result.Key] = result;
            return ValueTask.CompletedTask;
        }

        /// <summary>
        /// Returns cached timestamps and prices of <paramref name="figi"/> for the requested
        /// <paramref name="timeSpan"/>, together with the unit's "full interval exists" flag.
        /// Returns empty arrays and <c>false</c> when the instrument has no history unit.
        /// </summary>
        public async ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(string figi, TimeSpan timeSpan)
        {
            if (!_historyCash.TryGetValue(figi, out var unit))
            {
                return (Array.Empty(), Array.Empty(), false);
            }

            var res = await unit.GetData(timeSpan);
            return (res.timestamps, res.prices, res.isFullIntervalExists);
        }

        /// <summary>
        /// Returns cached timestamps and prices of <paramref name="figi"/>; <paramref name="length"/>
        /// is forwarded to the history unit unchanged. Returns empty arrays when the instrument
        /// has no history unit.
        /// </summary>
        public async ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(string figi, int? length = null)
        {
            if (!_historyCash.TryGetValue(figi, out var unit))
            {
                return (Array.Empty(), Array.Empty());
            }

            var res = await unit.GetData(length);
            return (res.timestamps, res.prices);
        }

        /// <summary>
        /// Appends a price message to the history unit of its instrument, creating the unit on first use.
        /// </summary>
        /// <param name="message">Incoming price; messages whose <c>Direction</c> is not 1 are ignored.</param>
        /// <param name="clearingInterval">
        /// When set and the gap between the message time and the last cached value exceeds it,
        /// the instrument's unit is replaced by a fresh one before the message is appended.
        /// </param>
        public async ValueTask AddData(INewPrice message, TimeSpan? clearingInterval = null)
        {
            if (message.Direction != 1)
            {
                return;
            }

            if (_historyCash.TryGetValue(message.Figi, out var unit))
            {
                if (clearingInterval.HasValue)
                {
                    var lasts = await unit.GetLastValues();
                    if (message.Time - lasts.time > clearingInterval.Value)
                    {
                        unit = new PriceHistoryCacheUnit2(message.Figi);
                        _historyCash[message.Figi] = unit;
                    }
                }

                await unit.AddData(message);
                return;
            }

            unit = new PriceHistoryCacheUnit2(message.Figi, message);
            if (!_historyCash.TryAdd(message.Figi, unit)
                && _historyCash.TryGetValue(message.Figi, out var existing))
            {
                // Another caller registered a unit for this instrument first; append to that
                // unit instead of silently dropping the message with the discarded one.
                await existing.AddData(message);
            }
        }

        /// <summary>Adds <paramref name="data"/> under <paramref name="key"/> to the instrument's 1-minute window cache.</summary>
        public async ValueTask AddDataTo1MinuteWindowCache(string figi, string key, CachedValue data)
        {
            await AddDataToWindowCache(figi, key, data, TimeWindowCacheType._1_Minute);
        }

        /// <summary>Adds <paramref name="data"/> under <paramref name="key"/> to the instrument's 5-minute window cache.</summary>
        public async ValueTask AddDataTo5MinuteWindowCache(string figi, string key, CachedValue data)
        {
            await AddDataToWindowCache(figi, key, data, TimeWindowCacheType._5_Minutes);
        }

        /// <summary>
        /// Reads the values stored under <paramref name="key"/> in the instrument's 1-minute window cache;
        /// returns an empty array when the instrument has no history unit.
        /// </summary>
        public ValueTask GetDataFrom1MinuteWindowCache(string figi, string key)
        {
            if (_historyCash.TryGetValue(figi, out var cacheItem))
            {
                return cacheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._1_Minute);
            }

            return ValueTask.FromResult(Array.Empty());
        }

        /// <summary>
        /// Reads the values stored under <paramref name="key"/> in the instrument's 5-minute window cache;
        /// returns an empty array when the instrument has no history unit.
        /// </summary>
        public ValueTask GetDataFrom5MinuteWindowCache(string figi, string key)
        {
            if (_historyCash.TryGetValue(figi, out var cacheItem))
            {
                return cacheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._5_Minutes);
            }

            return ValueTask.FromResult(Array.Empty());
        }

        /// <summary>Passes <paramref name="orderbook"/> to the history unit of its instrument, creating the unit on first use.</summary>
        public async ValueTask AddOrderbook(IOrderbook orderbook)
        {
            // GetOrAdd returns the unit that is actually registered, so the orderbook is never
            // written to a throw-away instance when two callers create the unit concurrently.
            var unit = _historyCash.GetOrAdd(orderbook.Figi, f => new PriceHistoryCacheUnit2(f));
            await unit.AddOrderbook(orderbook);
        }

        /// <summary>
        /// One-time start-up: fills the ticker / asset-type lookups for the configured instruments,
        /// optionally restores the last 1.5 hours of prices from the database, and starts the
        /// background writer.
        /// </summary>
        /// <remarks>
        /// The body runs at most once per instance, whether or not it succeeds; later calls return
        /// without doing anything. Failures are logged and not rethrown. A call that cannot obtain
        /// the initialization lock within three seconds logs a warning and returns.
        /// </remarks>
        public async Task Init()
        {
            if (!await _initSemaphore.WaitAsync(InitLockTimeout))
            {
                _logger.LogWarning("TraderDataProvider.Init skipped: another initialization is still in progress.");
                return;
            }

            try
            {
                if (_initAttempted)
                {
                    return;
                }

                _initAttempted = true;

                await LoadInstrumentLookups();

                if (_isDataRecievingAllowed)
                {
                    await RestoreRecentPrices();
                }

                // Fire-and-forget: the writer loop lives until the channel is completed.
                _ = WritePricesTask();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "TraderDataProvider initialization failed.");
            }
            finally
            {
                _initSemaphore.Release();
            }
        }

        /// <summary>Returns the cached ticker for <paramref name="figi"/>, or an empty string when unknown.</summary>
        public string GetTickerByFigi(string figi)
        {
            return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
        }

        /// <summary>Returns the cached asset type for <paramref name="figi"/>, or <c>AssetType.Unknown</c> when unknown.</summary>
        public AssetType GetAssetTypeByFigi(string figi)
        {
            return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown;
        }

        /// <summary>
        /// Persists a processed price: directly to the database when <paramref name="saveImmediately"/>
        /// is true, otherwise by queueing it for the background writer.
        /// </summary>
        internal async Task LogPrice(ProcessedPrice price, bool saveImmediately)
        {
            if (!saveImmediately)
            {
                await _forSave.Writer.WriteAsync(price);
                return;
            }

            using var context = await _dbContextFactory.CreateDbContextAsync();
            context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
            await context.ProcessedPrices.AddRangeAsync(price);
            await context.SaveChangesAsync();
        }

        /// <summary>
        /// Persists a declision: directly to the database when <paramref name="saveImmediately"/>
        /// is true, otherwise by queueing it for the background writer.
        /// </summary>
        internal async Task LogDeclision(Declision declision, bool saveImmediately)
        {
            if (!saveImmediately)
            {
                await _forSave.Writer.WriteAsync(declision);
                return;
            }

            using var context = await _dbContextFactory.CreateDbContextAsync();
            context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
            await context.Declisions.AddRangeAsync(declision);
            await context.SaveChangesAsync();
        }

        /// <summary>Shared implementation of the 1-minute / 5-minute window-cache writers.</summary>
        private async ValueTask AddDataToWindowCache(string figi, string key, CachedValue data, TimeWindowCacheType cacheType)
        {
            var unit = _historyCash.GetOrAdd(figi, f => new PriceHistoryCacheUnit2(f));
            await unit.AddDataToTimeWindowCache(key, data, cacheType);
        }

        /// <summary>Fills the ticker and asset-type lookups for configured shares and futures.</summary>
        private async Task LoadInstrumentLookups()
        {
            // Hash set instead of scanning the configured array once per exchange instrument.
            var configuredFigis = new HashSet<string>(_instrumentsFigis);

            var shares = await _investApiClient.Instruments.SharesAsync();
            foreach (var share in shares.Instruments)
            {
                if (configuredFigis.Contains(share.Figi))
                {
                    _tickersCache.TryAdd(share.Figi, share.Ticker);
                    _assetTypesCache.TryAdd(share.Figi, AssetType.Common);
                }
            }

            var futures = await _investApiClient.Instruments.FuturesAsync();
            foreach (var future in futures.Instruments)
            {
                if (configuredFigis.Contains(future.Figi))
                {
                    _tickersCache.TryAdd(future.Figi, future.Ticker);
                    _assetTypesCache.TryAdd(future.Figi, AssetType.Futures);
                }
            }
        }

        /// <summary>
        /// Replays price changes of the configured instruments from the last 1.5 hours into the
        /// history cache, and for prices younger than five minutes also fills the 1-minute
        /// window cache with the computed areas relation.
        /// </summary>
        private async Task RestoreRecentPrices()
        {
            var time = DateTime.UtcNow.AddHours(-1.5);
            using var context = await _dbContextFactory.CreateDbContextAsync();
            context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

            var data = await context.PriceChanges
                .Where(c => _instrumentsFigis.Contains(c.Figi) && c.Time >= time)
                .OrderBy(c => c.Time)
                .Select(c => new NewPriceMessage()
                {
                    Figi = c.Figi,
                    Ticker = c.Ticker,
                    Time = c.Time,
                    Value = c.Value,
                    IsHistoricalData = true,
                    Direction = c.Direction,
                    Count = c.Count,
                })
                .ToArrayAsync();

            foreach (var price in data)
            {
                await AddData(price);

                if ((DateTime.UtcNow - price.Time).TotalMinutes >= 5)
                {
                    continue;
                }

                // The cached series is only needed for recent prices, so it is read here
                // instead of once per historical row.
                var cachedData = await GetData(price.Figi);
                if (ShapeAreaCalculator.TryGetAreasRelation(cachedData.timestamps, cachedData.prices, price.Value, Constants.AreasRelationWindow, out var rel))
                {
                    await AddDataTo1MinuteWindowCache(price.Figi, Constants._1minCacheKey, new CachedValue()
                    {
                        Time = price.Time,
                        Value = (decimal)rel
                    });
                }
            }
        }

        /// <summary>
        /// Background loop: drains the save queue into two buffers and writes them to the database
        /// when the queue becomes empty or more than <see cref="FlushThreshold"/> records are buffered.
        /// </summary>
        /// <remarks>
        /// Buffers are cleared only after a successful save, so a failed batch stays buffered and is
        /// attempted again on the next flush. Failures are logged and the loop keeps running.
        /// </remarks>
        private async Task WritePricesTask()
        {
            var pendingPrices = new List();
            var pendingDeclisions = new List();

            while (await _forSave.Reader.WaitToReadAsync())
            {
                try
                {
                    var obj = await _forSave.Reader.ReadAsync();
                    if (obj is ProcessedPrice price)
                    {
                        pendingPrices.Add(price);
                    }
                    if (obj is Declision dec)
                    {
                        pendingDeclisions.Add(dec);
                    }

                    var shouldFlush = (pendingPrices.Count + pendingDeclisions.Count) > FlushThreshold
                        || _forSave.Reader.Count == 0;
                    if (!shouldFlush)
                    {
                        continue;
                    }

                    using var context = await _dbContextFactory.CreateDbContextAsync();
                    context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                    if (pendingPrices.Count > 0)
                    {
                        await context.ProcessedPrices.AddRangeAsync(pendingPrices);
                    }
                    if (pendingDeclisions.Count > 0)
                    {
                        await context.Declisions.AddRangeAsync(pendingDeclisions);
                    }
                    await context.SaveChangesAsync();

                    pendingPrices.Clear();
                    pendingDeclisions.Clear();
                }
                catch (Exception ex)
                {
                    _logger.LogError(
                        ex,
                        "Failed to persist buffered records ({PriceCount} prices, {DeclisionCount} declisions); they stay buffered for the next flush.",
                        pendingPrices.Count,
                        pendingDeclisions.Count);
                }
            }
        }
    }
}