using Grpc.Core;
using KLHZ.Trader.Core.Common.Messaging.Contracts;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Extentions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
using Candle = KLHZ.Trader.Core.DataLayer.Entities.Prices.Candle;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that subscribes to the exchange market-data stream for the configured
    /// instruments, broadcasts every received last price on the data bus and stores it in the database.
    /// </summary>
    /// <remarks>
    /// The subscription runs in a background loop that re-subscribes whenever the stream ends or fails.
    /// The loop is stopped by <see cref="StopAsync"/>.
    /// NOTE(review): the type arguments of <c>IOptions</c>, <c>IDbContextFactory</c> and <c>ILogger</c>
    /// are not visible in the reviewed text — confirm the closed generic types against the project.
    /// </remarks>
    public class ExchangeDataReader : IHostedService
    {
        /// <summary>Pause between two subscription attempts (after a normal stream end or a failure).</summary>
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(1);

        private readonly InvestApiClient _investApiClient;
        private readonly string[] _instrumentsFigis;
        private readonly string[] _managedAccountNamePatterns;
        private readonly ILogger _logger;

        // FIGI -> ticker lookup filled once by InitCache.
        private readonly ConcurrentDictionary<string, string> _tickersCache = new();
        private readonly IDbContextFactory _dbContextFactory;

        // Cancelled by StopAsync; its token is observed by the subscription loop and the streams.
        private readonly CancellationTokenSource _cts = new();
        private readonly IDataBus _eventBus;

        // The running subscription loop; kept so StopAsync can wait for it to finish.
        private Task _worker = Task.CompletedTask;

        /// <summary>
        /// Creates the reader and copies the allowed instrument FIGIs and the managed account name
        /// patterns out of <paramref name="options"/>.
        /// </summary>
        public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus, IOptions options, IDbContextFactory dbContextFactory, ILogger logger)
        {
            _eventBus = eventBus;
            _dbContextFactory = dbContextFactory;
            _investApiClient = investApiClient;
            _instrumentsFigis = options.Value.AllowedInstrumentsFigis.ToArray();
            _logger = logger;
            _managedAccountNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
        }

        /// <summary>
        /// Resolves the managed accounts, fills the FIGI-to-ticker cache and starts the background
        /// subscription loop without waiting for it to complete.
        /// </summary>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Инициализация приемника данных с биржи");
            var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
            await InitCache();

            // Not awaited on purpose: the loop lives until StopAsync cancels it.
            _worker = CycleSubscribtion(accounts);
        }

        /// <summary>
        /// Requests cancellation of the subscription loop and waits until it has exited or until the
        /// host's shutdown token <paramref name="cancellationToken"/> fires, whichever comes first.
        /// </summary>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();

            // Task.Delay completes (as cancelled) when the host gives up; WhenAny never throws for it.
            var hostGaveUp = Task.Delay(Timeout.Infinite, cancellationToken);
            await Task.WhenAny(_worker, hostGaveUp);
        }

        /// <summary>
        /// Loads all shares and futures from the instruments service and remembers the ticker of each FIGI.
        /// Every returned instrument is cached, not only the configured ones.
        /// </summary>
        private async Task InitCache()
        {
            var shares = await _investApiClient.Instruments.SharesAsync();
            foreach (var share in shares.Instruments)
            {
                _tickersCache.TryAdd(share.Figi, share.Ticker);
            }

            var futures = await _investApiClient.Instruments.FuturesAsync();
            foreach (var future in futures.Instruments)
            {
                _tickersCache.TryAdd(future.Figi, future.Ticker);
            }
        }

        /// <summary>
        /// Keeps the price subscription alive: subscribes, and when the stream ends or fails, waits
        /// <see cref="ReconnectDelay"/> and subscribes again, until the service is stopped.
        /// </summary>
        /// <param name="accounts">Managed accounts resolved at startup. Not used by the current implementation.</param>
        private async Task CycleSubscribtion(string[] accounts)
        {
            var token = _cts.Token;
            while (!token.IsCancellationRequested)
            {
                try
                {
                    // Candle streaming (SubscribeCandles) is currently not part of the loop.
                    await SubscribePrices(token);
                }
                catch (Exception) when (token.IsCancellationRequested)
                {
                    // Shutdown: cancellation may surface as OperationCanceledException or as an RpcException.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
                }

                try
                {
                    // Runs after failures as well, so a broken connection is not retried in a tight loop.
                    await Task.Delay(ReconnectDelay, token);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Opens a market-data stream, subscribes to last prices of the configured instruments and,
        /// for every received price, broadcasts it on the data bus and saves it to the database.
        /// Returns when the server closes the stream; throws when the stream fails or is cancelled.
        /// </summary>
        private async Task SubscribePrices(CancellationToken token)
        {
            using var stream = _investApiClient.MarketDataStream.MarketDataStream(cancellationToken: token);

            var request = new SubscribeLastPriceRequest
            {
                SubscriptionAction = SubscriptionAction.Subscribe
            };
            foreach (var figi in _instrumentsFigis)
            {
                request.Instruments.Add(new LastPriceInstrument { InstrumentId = figi });
            }

            await stream.RequestStream.WriteAsync(new MarketDataRequest
            {
                SubscribeLastPriceRequest = request,
            });

            await foreach (var response in stream.ResponseStream.ReadAllAsync(token))
            {
                // The stream also carries non-price messages; only last-price payloads are handled here.
                if (response.LastPrice is not { } lastPrice)
                {
                    continue;
                }

                var message = new PriceChange
                {
                    Figi = lastPrice.Figi,
                    Ticker = GetTickerByFigi(lastPrice.Figi),
                    Time = lastPrice.Time.ToDateTime().ToUniversalTime(),
                    Value = lastPrice.Price,
                    IsHistoricalData = false,
                };
                await _eventBus.BroadcastNewPrice(message);

                await using var context = await _dbContextFactory.CreateDbContextAsync(token);
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                await context.PriceChanges.AddAsync(message, token);
                await context.SaveChangesAsync(token);
            }
        }

        /// <summary>
        /// Opens a market-data stream, subscribes to one-minute exchange candles of the configured
        /// instruments and, for every received candle, broadcasts it on the data bus and saves it
        /// to the database.
        /// </summary>
        private async Task SubscribeCandles(CancellationToken token)
        {
            using var stream = _investApiClient.MarketDataStream.MarketDataStream(cancellationToken: token);

            var request = new SubscribeCandlesRequest
            {
                SubscriptionAction = SubscriptionAction.Subscribe,
                CandleSourceType = GetCandlesRequest.Types.CandleSource.Exchange
            };
            foreach (var figi in _instrumentsFigis)
            {
                request.Instruments.Add(new CandleInstrument
                {
                    InstrumentId = figi,
                    Interval = SubscriptionInterval.OneMinute
                });
            }

            await stream.RequestStream.WriteAsync(new MarketDataRequest
            {
                SubscribeCandlesRequest = request,
            });

            await foreach (var response in stream.ResponseStream.ReadAllAsync(token))
            {
                if (response.Candle is not { } candle)
                {
                    continue;
                }

                var message = new Candle
                {
                    Figi = candle.Figi,
                    // The ticker is resolved from the candle's own FIGI: LastPrice is not set on candle messages.
                    Ticker = GetTickerByFigi(candle.Figi),
                    Time = candle.Time.ToDateTime().ToUniversalTime(),
                    Close = candle.Close,
                    Open = candle.Open,
                    Low = candle.Low,
                    High = candle.High,
                    Volume = candle.Volume,
                    IsHistoricalData = false,
                };
                await _eventBus.BroadcastNewCandle(message);

                await using var context = await _dbContextFactory.CreateDbContextAsync(token);
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                await context.Candles.AddAsync(message, token);
                await context.SaveChangesAsync(token);
            }
        }

        /// <summary>
        /// Returns the cached ticker for <paramref name="figi"/>, or an empty string when the FIGI is unknown.
        /// </summary>
        private string GetTickerByFigi(string figi)
        {
            return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
        }
    }
}