using Grpc.Core;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Orders;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.DataLayer.Entities.Trades;
using KLHZ.Trader.Core.Exchange.Extentions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;

namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that subscribes to the exchange market-data stream (last prices,
    /// trades and order books) for the configured FIGIs, broadcasts price and order-book
    /// messages on the data bus and persists the received data in batches.
    /// </summary>
    public class ExchangeDataReader : IHostedService
    {
        /// <summary>Order-book depth requested for every instrument.</summary>
        private const int OrderBookDepth = 10;

        /// <summary>Total number of buffered entities that forces a write to the database.</summary>
        private const int FlushThreshold = 1000;

        /// <summary>Maximum age of buffered data before it is written to the database.</summary>
        private static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(10);

        /// <summary>Pause between two iterations of the subscription loop.</summary>
        private static readonly TimeSpan ResubscribeDelay = TimeSpan.FromMilliseconds(1000);

        // NOTE(review): IOptions and IDbContextFactory appear without generic type
        // arguments in the reviewed text; they are kept exactly as shown — confirm the
        // actual type arguments against the project before merging.
        private readonly InvestApiClient _investApiClient;
        private readonly string[] _instrumentsFigis = [];
        private readonly string[] _managedAccountNamePatterns;
        private readonly ILogger _logger;
        private readonly ConcurrentDictionary<string, string> _tickersCache = new();
        private readonly IDbContextFactory _dbContextFactory;
        private readonly CancellationTokenSource _cts = new();
        private readonly IDataBus _eventBus;
        private readonly bool _exchangeDataRecievingEnabled;

        // Background subscription loop; kept so StopAsync can wait for it to finish.
        private Task _worker = Task.CompletedTask;

        public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus, IOptions options, IDbContextFactory dbContextFactory, ILogger logger)
        {
            _exchangeDataRecievingEnabled = options.Value.ExchangeDataRecievingEnabled;
            _eventBus = eventBus;
            _dbContextFactory = dbContextFactory;
            _investApiClient = investApiClient;
            _instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray();
            _logger = logger;
            _managedAccountNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
        }

        /// <summary>
        /// Loads the accounts and the FIGI-to-ticker cache, then starts the subscription
        /// loop in the background without waiting for it.
        /// </summary>
        /// <param name="cancellationToken">Start-up token supplied by the host (not forwarded to the background loop).</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Инициализация приемника данных с биржи");
            var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
            await InitCache();

            // The loop is stopped through _cts, which StopAsync cancels.
            _worker = RunSubscriptionLoopAsync(accounts, _cts.Token);
        }

        /// <summary>
        /// Signals the subscription loop to stop and waits until it has finished or the
        /// host's shutdown token fires, whichever comes first.
        /// </summary>
        /// <param name="cancellationToken">Token that limits how long the shutdown may take.</param>
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();

            // WhenAny never throws here: a cancelled delay simply ends the wait.
            await Task.WhenAny(_worker, Task.Delay(Timeout.Infinite, cancellationToken));
        }

        /// <summary>
        /// Fills the FIGI-to-ticker cache from the shares and futures lists, keeping only
        /// the instruments this reader is configured for.
        /// </summary>
        private async Task InitCache()
        {
            // Set lookup instead of scanning the FIGI array for every listed instrument.
            var trackedFigis = new HashSet<string>(_instrumentsFigis);

            var shares = await _investApiClient.Instruments.SharesAsync();
            foreach (var share in shares.Instruments)
            {
                if (trackedFigis.Contains(share.Figi))
                {
                    _tickersCache.TryAdd(share.Figi, share.Ticker);
                }
            }

            var futures = await _investApiClient.Instruments.FuturesAsync();
            foreach (var future in futures.Instruments)
            {
                if (trackedFigis.Contains(future.Figi))
                {
                    _tickersCache.TryAdd(future.Figi, future.Ticker);
                }
            }
        }

        /// <summary>
        /// Keeps the market-data subscription alive: re-subscribes whenever the stream
        /// ends or fails, until <paramref name="stoppingToken"/> is cancelled.
        /// </summary>
        /// <param name="accounts">Account identifiers loaded at start-up; not used by the current implementation.</param>
        /// <param name="stoppingToken">Token that ends the loop.</param>
        private async Task RunSubscriptionLoopAsync(string[] accounts, CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    if (_exchangeDataRecievingEnabled)
                    {
                        await SubscribePricesAsync(stoppingToken);
                    }
                }
                catch (Exception ex) when (stoppingToken.IsCancellationRequested
                    && (ex is OperationCanceledException || ex is RpcException { StatusCode: StatusCode.Cancelled }))
                {
                    // Expected during shutdown: the stream was cancelled on purpose.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
                }

                // The pause is outside the try block so that a failing subscription
                // cannot turn into a tight retry loop.
                try
                {
                    await Task.Delay(ResubscribeDelay, stoppingToken);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Opens the market-data stream, subscribes to last prices, trades and order books
        /// for all configured FIGIs and processes responses until the stream ends.
        /// Price changes and order-book summaries are broadcast on the data bus; prices,
        /// trades and order-book summaries are buffered and written to the database when
        /// the buffers exceed <see cref="FlushThreshold"/> or <see cref="FlushInterval"/> has passed.
        /// </summary>
        /// <param name="stoppingToken">Token that cancels the gRPC call and the database writes.</param>
        private async Task SubscribePricesAsync(CancellationToken stoppingToken)
        {
            using var stream = _investApiClient.MarketDataStream.MarketDataStream(cancellationToken: stoppingToken);

            var pricesRequest = new SubscribeLastPriceRequest { SubscriptionAction = SubscriptionAction.Subscribe };
            var tradesRequest = new SubscribeTradesRequest { SubscriptionAction = SubscriptionAction.Subscribe };
            var bookRequest = new SubscribeOrderBookRequest { SubscriptionAction = SubscriptionAction.Subscribe };

            foreach (var figi in _instrumentsFigis)
            {
                pricesRequest.Instruments.Add(new LastPriceInstrument() { InstrumentId = figi });
                tradesRequest.Instruments.Add(new TradeInstrument() { InstrumentId = figi });
                bookRequest.Instruments.Add(new OrderBookInstrument() { InstrumentId = figi, Depth = OrderBookDepth });
            }

            await stream.RequestStream.WriteAsync(new MarketDataRequest { SubscribeLastPriceRequest = pricesRequest });
            await stream.RequestStream.WriteAsync(new MarketDataRequest { SubscribeTradesRequest = tradesRequest });
            await stream.RequestStream.WriteAsync(new MarketDataRequest { SubscribeOrderBookRequest = bookRequest });

            var pricesBuffer = new List<PriceChange>();
            var orderbookItemsBuffer = new List<OrderbookItem>();
            var tradesBuffer = new List<KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade>();
            var lastWrite = DateTime.UtcNow;

            await foreach (var response in stream.ResponseStream.ReadAllAsync(stoppingToken))
            {
                if (response.LastPrice != null)
                {
                    var message = new PriceChange()
                    {
                        Figi = response.LastPrice.Figi,
                        Ticker = GetTickerByFigi(response.LastPrice.Figi),
                        Time = response.LastPrice.Time.ToDateTime().ToUniversalTime(),
                        Value = response.LastPrice.Price,
                        IsHistoricalData = false,
                    };
                    await _eventBus.Broadcast(message);
                    pricesBuffer.Add(message);
                }

                if (response.Trade != null)
                {
                    var trade = new KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade()
                    {
                        Figi = response.Trade.Figi,
                        BoughtAt = response.Trade.Time.ToDateTime().ToUniversalTime(),
                        Ticker = GetTickerByFigi(response.Trade.Figi),
                        Price = response.Trade.Price,
                        Count = response.Trade.Quantity,
                        // Anything that is not an explicit Sell is stored as Buy.
                        Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell
                            ? DataLayer.Entities.Trades.Enums.TradeDirection.Sell
                            : DataLayer.Entities.Trades.Enums.TradeDirection.Buy,
                    };
                    tradesBuffer.Add(trade);
                }

                if (response.Orderbook != null)
                {
                    var figi = response.Orderbook.Figi;
                    var ticker = GetTickerByFigi(figi);
                    var time = response.Orderbook.Time.ToDateTime().ToUniversalTime();

                    var asksSummary = new OrderbookItem()
                    {
                        Figi = figi,
                        Ticker = ticker,
                        Count = response.Orderbook.Asks.Sum(a => (int)a.Quantity),
                        ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary,
                        Time = time,
                    };
                    var bidsSummary = new OrderbookItem()
                    {
                        Figi = figi,
                        Ticker = ticker,
                        Count = response.Orderbook.Bids.Sum(a => (int)a.Quantity),
                        ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary,
                        Time = time,
                    };
                    orderbookItemsBuffer.Add(asksSummary);
                    orderbookItemsBuffer.Add(bidsSummary);

                    var message = new NewOrderbookMessage()
                    {
                        Ticker = ticker,
                        Figi = figi,
                        Time = time,
                        AsksCount = asksSummary.Count,
                        BidsCount = bidsSummary.Count,
                    };
                    await _eventBus.Broadcast(message);
                }

                var buffered = orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count;
                if (buffered > FlushThreshold || DateTime.UtcNow - lastWrite > FlushInterval)
                {
                    lastWrite = DateTime.UtcNow;
                    await FlushBuffersAsync(orderbookItemsBuffer, pricesBuffer, tradesBuffer, stoppingToken);
                }
            }

            // The server closed the stream: persist whatever is still buffered.
            await FlushBuffersAsync(orderbookItemsBuffer, pricesBuffer, tradesBuffer, stoppingToken);
        }

        /// <summary>
        /// Writes the buffered entities to the database and empties the buffers once the
        /// save has succeeded. A fresh context is created for every batch so that saved
        /// entities do not pile up in a long-lived change tracker.
        /// </summary>
        private async Task FlushBuffersAsync(
            List<OrderbookItem> orderbookItems,
            List<PriceChange> prices,
            List<KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade> trades,
            CancellationToken cancellationToken)
        {
            if (orderbookItems.Count == 0 && prices.Count == 0 && trades.Count == 0)
            {
                return;
            }

            await using var context = await _dbContextFactory.CreateDbContextAsync(cancellationToken);

            if (orderbookItems.Count > 0)
            {
                await context.OrderbookItems.AddRangeAsync(orderbookItems, cancellationToken);
            }
            if (prices.Count > 0)
            {
                await context.PriceChanges.AddRangeAsync(prices, cancellationToken);
            }
            if (trades.Count > 0)
            {
                await context.InstrumentTrades.AddRangeAsync(trades, cancellationToken);
            }

            await context.SaveChangesAsync(cancellationToken);

            orderbookItems.Clear();
            prices.Clear();
            trades.Clear();
        }

        /// <summary>
        /// Returns the cached ticker for <paramref name="figi"/>, or an empty string when
        /// the FIGI is not in the cache.
        /// </summary>
        private string GetTickerByFigi(string figi)
        {
            return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
        }
    }
}