using Grpc.Core;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Orders;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Extentions;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that subscribes to the exchange's order-trades stream and market-data
    /// stream, broadcasts every received trade / order book on the data bus and persists
    /// them to the database in batches.
    /// </summary>
    public class ExchangeDataReader : IHostedService
    {
        /// <summary>Order book depth requested from the exchange for every instrument.</summary>
        private const int OrderBookSubscriptionDepth = 10;

        /// <summary>Number of best ask / bid levels that are broadcast and stored per order book update.</summary>
        private const int StoredOrderbookLevels = 4;

        /// <summary>Buffered item count above which the buffers are written to the database.</summary>
        private const int FlushItemsThreshold = 100;

        /// <summary>Maximum time between two database writes while messages keep arriving.</summary>
        private static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(5);

        /// <summary>Pause between the end (or failure) of a stream and the next subscription attempt.</summary>
        private static readonly TimeSpan ResubscribeDelay = TimeSpan.FromSeconds(1);

        private readonly PortfolioWrapper _portfolioWrapper;
        private readonly TraderDataProvider _tradeDataProvider;
        private readonly InvestApiClient _investApiClient;
        private readonly string[] _instrumentsFigis = [];
        private readonly string[] _managedAccountNamePatterns;
        private readonly ILogger _logger;
        private readonly IDbContextFactory _dbContextFactory;
        private readonly CancellationTokenSource _cts = new();
        private readonly IDataBus _eventBus;
        private readonly bool _exchangeDataRecievingEnabled;

        // Order ids already seen on the trades stream, with the time they were first seen.
        // NOTE(review): entries are never removed in this class, so the map grows for the
        // lifetime of the process — confirm this is acceptable.
        private readonly ConcurrentDictionary<string, DateTime> _usedOrderIds = new();

        // Background task running the subscription loops; awaited in StopAsync.
        private Task _subscriptionTask = Task.CompletedTask;

        // NOTE(review): the generic type arguments of IOptions, IDbContextFactory and ILogger
        // are not visible in the reviewed copy of this file; the declarations are left exactly
        // as found — confirm against the repository.
        public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus, TraderDataProvider tradeDataProvider, IOptions options, IDbContextFactory dbContextFactory, PortfolioWrapper portfolioWrapper, ILogger logger)
        {
            _exchangeDataRecievingEnabled = options.Value.ExchangeDataRecievingEnabled;
            _eventBus = eventBus;
            _dbContextFactory = dbContextFactory;
            _investApiClient = investApiClient;
            _instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray();
            _logger = logger;
            _managedAccountNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
            _tradeDataProvider = tradeDataProvider;
            _portfolioWrapper = portfolioWrapper;
        }

        /// <summary>
        /// Initializes the trade data provider, registers every managed account in the
        /// portfolio wrapper and starts the stream subscriptions in the background.
        /// </summary>
        /// <param name="cancellationToken">Start-up cancellation token supplied by the host.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await _tradeDataProvider.Init();
            _logger.LogInformation("Инициализация приемника данных с биржи");

            var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
            foreach (var acc in accounts)
            {
                await _portfolioWrapper.AddAccount(acc.Key, acc.Value);
            }

            // Not awaited: the loops run until StopAsync cancels them. The task is kept so
            // that StopAsync can wait for a clean shutdown.
            _subscriptionTask = CycleSubscribtion(accounts.Keys.ToArray());
        }

        /// <summary>
        /// Signals the subscription loops to stop and waits until they finish or the host's
        /// shutdown token fires, whichever comes first.
        /// </summary>
        /// <param name="cancellationToken">Token indicating that shutdown should no longer be graceful.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();

            // Same approach as BackgroundService.StopAsync: never block shutdown past the host's deadline.
            await Task.WhenAny(_subscriptionTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }

        /// <summary>
        /// Runs the trades stream and the market-data stream, each in its own resubscribe loop,
        /// so that a failure of one stream does not keep it down until the other one ends.
        /// </summary>
        /// <param name="accounts">Ids of the accounts whose order trades are tracked.</param>
        /// <returns>A task that completes once both loops have stopped.</returns>
        private Task CycleSubscribtion(string[] accounts)
        {
            // The flag is read once from configuration and never changes, so there is
            // nothing to poll for when receiving is disabled.
            if (!_exchangeDataRecievingEnabled)
            {
                return Task.CompletedTask;
            }

            var token = _cts.Token;
            return Task.WhenAll(
                RunWithResubscribeAsync(() => SubscribeTrades(accounts, token), token),
                RunWithResubscribeAsync(() => SubscribeExchangeData(token), token));
        }

        /// <summary>
        /// Repeatedly invokes <paramref name="subscribe"/> until cancellation is requested,
        /// logging failures and pausing between attempts.
        /// </summary>
        /// <param name="subscribe">Opens a stream and completes when the stream ends.</param>
        /// <param name="cancellationToken">Token that stops the loop.</param>
        private async Task RunWithResubscribeAsync(Func<Task> subscribe, CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    await subscribe();
                }
                catch (Exception) when (cancellationToken.IsCancellationRequested)
                {
                    // The stream was torn down because the service is stopping — not an error.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
                }

                // The delay is outside the try block so that a failing stream is retried
                // at a bounded rate instead of in a tight loop.
                try
                {
                    await Task.Delay(ResubscribeDelay, cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Listens to the order-trades stream of the given accounts and triggers a portfolio
        /// reload the first time an order id is seen.
        /// </summary>
        /// <param name="accounts">Ids of the accounts to subscribe to.</param>
        /// <param name="cancellationToken">Token that cancels the gRPC call and the read loop.</param>
        private async Task SubscribeTrades(string[] accounts, CancellationToken cancellationToken)
        {
            var request = new TradesStreamRequest();
            foreach (var account in accounts)
            {
                request.Accounts.Add(account);
            }

            using var stream = _investApiClient.OrdersStream.TradesStream(request, cancellationToken: cancellationToken);
            await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
            {
                if (response.OrderTrades?.Trades is null)
                {
                    continue;
                }

                if (_usedOrderIds.TryAdd(response.OrderTrades.OrderId, DateTime.UtcNow))
                {
                    // Fire-and-forget so the stream is not blocked while the portfolio reloads.
                    // NOTE(review): a failure inside LoadPortfolio is not observed here.
                    _ = _portfolioWrapper.Accounts[response.OrderTrades.AccountId].LoadPortfolio();
                }
            }
        }

        /// <summary>
        /// Subscribes to trades and order books of the configured instruments, broadcasts
        /// every update on the data bus and buffers it for batched persistence.
        /// </summary>
        /// <param name="cancellationToken">Token that cancels the gRPC call and the read loop.</param>
        private async Task SubscribeExchangeData(CancellationToken cancellationToken)
        {
            using var stream = _investApiClient.MarketDataStream.MarketDataStream(cancellationToken: cancellationToken);

            var tradesRequest = new SubscribeTradesRequest
            {
                SubscriptionAction = SubscriptionAction.Subscribe
            };
            var bookRequest = new SubscribeOrderBookRequest
            {
                SubscriptionAction = SubscriptionAction.Subscribe
            };

            foreach (var figi in _instrumentsFigis)
            {
                tradesRequest.Instruments.Add(new TradeInstrument { InstrumentId = figi });
                bookRequest.Instruments.Add(new OrderBookInstrument { InstrumentId = figi, Depth = OrderBookSubscriptionDepth });
            }

            await stream.RequestStream.WriteAsync(new MarketDataRequest { SubscribeTradesRequest = tradesRequest });
            await stream.RequestStream.WriteAsync(new MarketDataRequest { SubscribeOrderBookRequest = bookRequest });

            var pricesBuffer = new List<PriceChange>();
            var orderbookItemsBuffer = new List<OrderbookItem>();
            var lastWrite = DateTime.UtcNow;

            await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
            {
                if (response.Trade != null)
                {
                    var priceChange = new PriceChange()
                    {
                        Figi = response.Trade.Figi,
                        Ticker = _tradeDataProvider.GetTickerByFigi(response.Trade.Figi),
                        Time = response.Trade.Time.ToDateTime().ToUniversalTime(),
                        Price = response.Trade.Price,
                        IsHistoricalData = false,
                        Direction = (int)response.Trade.Direction,
                        Count = response.Trade.Quantity,
                    };
                    await _eventBus.Broadcast(priceChange);
                    pricesBuffer.Add(priceChange);
                }

                if (response.Orderbook != null)
                {
                    // Identical for every level of this order book, so computed once.
                    var figi = response.Orderbook.Figi;
                    var ticker = _tradeDataProvider.GetTickerByFigi(figi);
                    var time = response.Orderbook.Time.ToDateTime().ToUniversalTime();

                    var asks = response.Orderbook.Asks
                        .Take(StoredOrderbookLevels)
                        .Select(a => new OrderbookItem()
                        {
                            Count = a.Quantity,
                            Price = a.Price,
                            Figi = figi,
                            Ticker = ticker,
                            ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Ask,
                            Time = time,
                        })
                        .ToArray();

                    var bids = response.Orderbook.Bids
                        .Take(StoredOrderbookLevels)
                        .Select(b => new OrderbookItem()
                        {
                            Count = b.Quantity,
                            Price = b.Price,
                            Figi = figi,
                            Ticker = ticker,
                            ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Bid,
                            Time = time,
                        })
                        .ToArray();

                    orderbookItemsBuffer.AddRange(asks);
                    orderbookItemsBuffer.AddRange(bids);

                    var orderbookMessage = new NewOrderbookMessage()
                    {
                        Ticker = ticker,
                        Figi = figi,
                        Time = time,
                        Asks = asks,
                        Bids = bids,
                        AsksCount = asks.Length,
                        // Previously reported asks.Length here, which was wrong whenever the
                        // two sides of the book had a different number of levels.
                        BidsCount = bids.Length,
                    };
                    await _eventBus.Broadcast(orderbookMessage);
                }

                // The check only runs when a message arrives, so the interval is a lower
                // bound on the time between writes, not a timer.
                var bufferedCount = orderbookItemsBuffer.Count + pricesBuffer.Count;
                if (bufferedCount > FlushItemsThreshold || DateTime.UtcNow - lastWrite > FlushInterval)
                {
                    lastWrite = DateTime.UtcNow;
                    await FlushBuffersAsync(orderbookItemsBuffer, pricesBuffer);
                }
            }
        }

        /// <summary>
        /// Writes the buffered order book items and price changes to the database in a
        /// single <c>SaveChangesAsync</c> call and empties the buffers.
        /// </summary>
        /// <remarks>
        /// Errors are logged and not rethrown so that a database failure does not break the
        /// market-data stream. The database calls are intentionally not tied to the service's
        /// cancellation token, so a batch that is being written during shutdown is not aborted.
        /// NOTE(review): each buffer is cleared as soon as its items are attached to the
        /// context, so a batch whose save fails is dropped rather than retried — confirm
        /// that this best-effort behaviour is intended.
        /// </remarks>
        /// <param name="orderbookItems">Buffered order book levels; cleared by this method.</param>
        /// <param name="prices">Buffered trades; cleared by this method.</param>
        private async Task FlushBuffersAsync(List<OrderbookItem> orderbookItems, List<PriceChange> prices)
        {
            try
            {
                await using var context = await _dbContextFactory.CreateDbContextAsync();
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

                if (orderbookItems.Count > 0)
                {
                    await context.OrderbookItems.AddRangeAsync(orderbookItems);
                    orderbookItems.Clear();
                }

                if (prices.Count > 0)
                {
                    await context.PriceChanges.AddRangeAsync(prices);
                    prices.Clear();
                }

                await context.SaveChangesAsync();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Ошибка при сохранении данных биржи.");
            }
        }
    }
}