klhztrader/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs

257 lines
11 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using Grpc.Core;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Orders;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.DataLayer.Entities.Trades;
using KLHZ.Trader.Core.Exchange.Extentions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
namespace KLHZ.Trader.Core.Exchange.Services
{
/// <summary>
/// Hosted service that opens the exchange market-data stream for the configured
/// instruments, broadcasts last-price and order-book updates on the data bus and
/// stores prices, trades and order-book summaries in the database.
/// </summary>
public class ExchangeDataReader : IHostedService
{
    /// <summary>Pause between two consecutive attempts to open the market-data stream.</summary>
    private static readonly TimeSpan ResubscribeDelay = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Maximum age of buffered rows before they are written regardless of their number.
    /// Only evaluated when a stream message arrives.
    /// </summary>
    private static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(10);

    /// <summary>
    /// Buffered rows are written once their total number exceeds this value.
    /// With 0 every message that produced a row is persisted immediately;
    /// raise it to batch database writes.
    /// </summary>
    private const int FlushThreshold = 0;

    /// <summary>Order-book depth requested for every instrument.</summary>
    private const int OrderBookDepth = 10;

    private readonly InvestApiClient _investApiClient;
    private readonly string[] _instrumentsFigis;
    private readonly string[] _managedAccountNamePatterns;
    private readonly ILogger<ExchangeDataReader> _logger;
    private readonly ConcurrentDictionary<string, string> _tickersCache = new();
    private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
    private readonly CancellationTokenSource _cts = new();
    private readonly IDataBus _eventBus;
    private readonly bool _exchangeDataRecievingEnabled;

    /// <summary>
    /// Creates the reader and takes a snapshot of the relevant configuration values.
    /// </summary>
    /// <param name="investApiClient">Exchange API client used for instruments and streaming.</param>
    /// <param name="eventBus">Bus on which price and order-book messages are broadcast.</param>
    /// <param name="options">Exchange configuration (instruments, account patterns, enable flag).</param>
    /// <param name="dbContextFactory">Factory for short-lived database contexts.</param>
    /// <param name="logger">Logger of this service.</param>
    public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus,
        IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory,
        ILogger<ExchangeDataReader> logger)
    {
        var config = options.Value;
        _exchangeDataRecievingEnabled = config.ExchangeDataRecievingEnabled;
        _eventBus = eventBus;
        _dbContextFactory = dbContextFactory;
        _investApiClient = investApiClient;
        _instrumentsFigis = config.DataRecievingInstrumentsFigis.ToArray();
        _logger = logger;
        _managedAccountNamePatterns = config.ManagingAccountNamePatterns.ToArray();
    }

    /// <summary>
    /// Loads the accounts and the FIGI-to-ticker cache, then starts the subscription
    /// loop in the background without awaiting it.
    /// </summary>
    /// <param name="cancellationToken">Start-up token supplied by the host (not used by the background loop).</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Инициализация приемника данных с биржи");
        var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
        await InitCache();

        // Fire and forget: the loop handles its own exceptions and ends when _cts is cancelled.
        _ = CycleSubscribtion(accounts, _cts.Token);
    }

    /// <summary>
    /// Fills the ticker cache with the shares and futures whose FIGI is configured.
    /// </summary>
    private async Task InitCache()
    {
        // Set lookup instead of scanning the array for every instrument returned by the exchange.
        var wantedFigis = new HashSet<string>(_instrumentsFigis);

        var shares = await _investApiClient.Instruments.SharesAsync();
        foreach (var share in shares.Instruments)
        {
            if (wantedFigis.Contains(share.Figi))
            {
                _tickersCache.TryAdd(share.Figi, share.Ticker);
            }
        }

        var futures = await _investApiClient.Instruments.FuturesAsync();
        foreach (var future in futures.Instruments)
        {
            if (wantedFigis.Contains(future.Figi))
            {
                _tickersCache.TryAdd(future.Figi, future.Ticker);
            }
        }
    }

    /// <summary>
    /// Keeps the price subscription alive: re-opens the stream whenever it ends or fails,
    /// waiting <see cref="ResubscribeDelay"/> between attempts, until cancellation is requested.
    /// </summary>
    /// <param name="accounts">Managed account ids; currently not used by the subscription.</param>
    /// <param name="cancellationToken">Token that stops the loop.</param>
    private async Task CycleSubscribtion(string[] accounts, CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                if (_exchangeDataRecievingEnabled)
                {
                    await SubscribePrices(cancellationToken);
                }
            }
            catch (Exception ex)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    // The stream was torn down by our own shutdown; nothing to report.
                    break;
                }

                _logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
            }

            try
            {
                // Deliberately outside the try/catch above so that a failing stream
                // is also throttled instead of being re-opened in a tight loop.
                await Task.Delay(ResubscribeDelay, cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Opens one market-data stream, subscribes to last prices, trades and order books of
    /// all configured instruments and processes responses until the stream ends.
    /// </summary>
    /// <param name="cancellationToken">Token that cancels the streaming call.</param>
    private async Task SubscribePrices(CancellationToken cancellationToken)
    {
        // The token is attached to the call itself, so cancelling it aborts pending reads.
        using var stream = _investApiClient.MarketDataStream.MarketDataStream(cancellationToken: cancellationToken);

        var request = new SubscribeLastPriceRequest
        {
            SubscriptionAction = SubscriptionAction.Subscribe
        };
        var tradesRequest = new SubscribeTradesRequest
        {
            SubscriptionAction = SubscriptionAction.Subscribe
        };
        var bookRequest = new SubscribeOrderBookRequest
        {
            SubscriptionAction = SubscriptionAction.Subscribe
        };

        foreach (var figi in _instrumentsFigis)
        {
            request.Instruments.Add(new LastPriceInstrument { InstrumentId = figi });
            tradesRequest.Instruments.Add(new TradeInstrument { InstrumentId = figi });
            bookRequest.Instruments.Add(new OrderBookInstrument { InstrumentId = figi, Depth = OrderBookDepth });
        }

        await stream.RequestStream.WriteAsync(new MarketDataRequest
        {
            SubscribeLastPriceRequest = request,
        });
        await stream.RequestStream.WriteAsync(new MarketDataRequest
        {
            SubscribeTradesRequest = tradesRequest,
        });
        await stream.RequestStream.WriteAsync(new MarketDataRequest
        {
            SubscribeOrderBookRequest = bookRequest
        });

        var pricesBuffer = new List<PriceChange>();
        var orderbookItemsBuffer = new List<OrderbookItem>();
        var tradesBuffer = new List<InstrumentTrade>();
        var lastWrite = DateTime.UtcNow;

        await foreach (var response in stream.ResponseStream.ReadAllAsync())
        {
            if (response.LastPrice is { } lastPrice)
            {
                var message = new PriceChange()
                {
                    Figi = lastPrice.Figi,
                    Ticker = GetTickerByFigi(lastPrice.Figi),
                    Time = lastPrice.Time.ToDateTime().ToUniversalTime(),
                    Value = lastPrice.Price,
                    IsHistoricalData = false,
                };
                await _eventBus.Broadcast(message);
                pricesBuffer.Add(message);
            }

            if (response.Trade is { } exchangeTrade)
            {
                tradesBuffer.Add(new InstrumentTrade()
                {
                    Figi = exchangeTrade.Figi,
                    BoughtAt = exchangeTrade.Time.ToDateTime().ToUniversalTime(),
                    Ticker = GetTickerByFigi(exchangeTrade.Figi),
                    Price = exchangeTrade.Price,
                    Count = exchangeTrade.Quantity,
                    // Everything that is not an explicit sell is stored as a buy.
                    Direction = exchangeTrade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell
                        ? DataLayer.Entities.Trades.Enums.TradeDirection.Sell
                        : DataLayer.Entities.Trades.Enums.TradeDirection.Buy,
                });
            }

            if (response.Orderbook is { } orderbook)
            {
                // Shared by both summaries and the bus message.
                var ticker = GetTickerByFigi(orderbook.Figi);
                var time = orderbook.Time.ToDateTime().ToUniversalTime();

                var asksSummary = new OrderbookItem()
                {
                    Figi = orderbook.Figi,
                    Ticker = ticker,
                    Count = orderbook.Asks.Sum(a => (int)a.Quantity),
                    ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary,
                    Time = time,
                };
                var bidsSummary = new OrderbookItem()
                {
                    Figi = orderbook.Figi,
                    Ticker = ticker,
                    Count = orderbook.Bids.Sum(a => (int)a.Quantity),
                    ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary,
                    Time = time,
                };
                orderbookItemsBuffer.Add(asksSummary);
                orderbookItemsBuffer.Add(bidsSummary);

                var message = new NewOrderbookMessage()
                {
                    Ticker = ticker,
                    Figi = orderbook.Figi,
                    Time = time,
                    AsksCount = asksSummary.Count,
                    BidsCount = bidsSummary.Count,
                };
                await _eventBus.Broadcast(message);
            }

            var buffered = orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count;
            var intervalElapsed = DateTime.UtcNow - lastWrite > FlushInterval;

            // Never flush empty buffers: that would only create a context and save nothing.
            if (buffered > FlushThreshold || (buffered > 0 && intervalElapsed))
            {
                await FlushBuffers(orderbookItemsBuffer, pricesBuffer, tradesBuffer);
                lastWrite = DateTime.UtcNow;
            }
        }
    }

    /// <summary>
    /// Writes the buffered rows to the database and empties the buffers.
    /// Failures are logged and not propagated, so the stream keeps being processed.
    /// </summary>
    /// <param name="orderbookItems">Buffered order-book summaries.</param>
    /// <param name="prices">Buffered price changes.</param>
    /// <param name="trades">Buffered trades.</param>
    private async Task FlushBuffers(List<OrderbookItem> orderbookItems, List<PriceChange> prices, List<InstrumentTrade> trades)
    {
        try
        {
            await using var context = await _dbContextFactory.CreateDbContextAsync();
            context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;

            await context.OrderbookItems.AddRangeAsync(orderbookItems);
            await context.PriceChanges.AddRangeAsync(prices);
            await context.InstrumentTrades.AddRangeAsync(trades);

            // Buffers are emptied as soon as the rows are staged in the context:
            // persistence is best-effort, so a failed save drops this batch instead
            // of letting the buffers grow while the database is unavailable.
            orderbookItems.Clear();
            prices.Clear();
            trades.Clear();

            await context.SaveChangesAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при сохранении данных биржи.");
        }
    }

    /// <summary>
    /// Returns the cached ticker for <paramref name="figi"/>, or an empty string when the FIGI is not cached.
    /// </summary>
    private string GetTickerByFigi(string figi)
        => _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;

    /// <summary>
    /// Requests cancellation of the background subscription loop.
    /// </summary>
    /// <param name="cancellationToken">Shutdown token supplied by the host (unused).</param>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cts.Cancel();
        return Task.CompletedTask;
    }
}
}