klhztrader/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs

194 lines
7.8 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using Grpc.Core;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.DataLayer.Entities.Trades;
using KLHZ.Trader.Core.Exchange.Extentions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class ExchangeDataReader : IHostedService
{
private readonly InvestApiClient _investApiClient;
private readonly string[] _instrumentsFigis = [];
private readonly string[] _managedAccountNamePatterns;
private readonly ILogger<ExchangeDataReader> _logger;
private readonly ConcurrentDictionary<string, string> _tickersCache = new();
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
private readonly CancellationTokenSource _cts = new();
private readonly IDataBus _eventBus;
private readonly bool _exchangeDataRecievingEnabled;
public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus,
IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory,
ILogger<ExchangeDataReader> logger)
{
_exchangeDataRecievingEnabled = options.Value.ExchangeDataRecievingEnabled;
_eventBus = eventBus;
_dbContextFactory = dbContextFactory;
_investApiClient = investApiClient;
_instrumentsFigis = options.Value.AllowedInstrumentsFigis.ToArray();
_logger = logger;
_managedAccountNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Инициализация приемника данных с биржи");
var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
await InitCache();
_ = CycleSubscribtion(accounts);
}
private async Task InitCache()
{
var shares = await _investApiClient.Instruments.SharesAsync();
foreach (var share in shares.Instruments)
{
//if (_instrumentsFigis.Contains(share.Figi))
{
_tickersCache.TryAdd(share.Figi, share.Ticker);
}
}
var futures = await _investApiClient.Instruments.FuturesAsync();
foreach (var future in futures.Instruments)
{
//if (_instrumentsFigis.Contains(future.Figi))
{
_tickersCache.TryAdd(future.Figi, future.Ticker);
}
}
}
private async Task CycleSubscribtion(string[] accounts)
{
while (true)
{
try
{
if (_exchangeDataRecievingEnabled)
{
await SubscribePrices();
}
await Task.Delay(1000);
//await SubscribeCandles();
}
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
}
}
}
private async Task SubscribePrices()
{
using var stream = _investApiClient.MarketDataStream.MarketDataStream();
var request = new SubscribeLastPriceRequest
{
SubscriptionAction = SubscriptionAction.Subscribe
};
var tradesRequest = new SubscribeTradesRequest
{
SubscriptionAction = SubscriptionAction.Subscribe
};
foreach (var f in _instrumentsFigis)
{
request.Instruments.Add(
new LastPriceInstrument()
{
InstrumentId = f
});
tradesRequest.Instruments.Add(
new TradeInstrument()
{
InstrumentId = f
});
}
await stream.RequestStream.WriteAsync(new MarketDataRequest
{
SubscribeLastPriceRequest = request,
});
await stream.RequestStream.WriteAsync(new MarketDataRequest
{
SubscribeTradesRequest = tradesRequest,
});
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var pricesBuffer = new List<PriceChange>();
var tradesBuffer = new List<InstrumentTrade>();
var lastWritePrices = DateTime.UtcNow;
var lastWriteTrades = DateTime.UtcNow;
await foreach (var response in stream.ResponseStream.ReadAllAsync())
{
if (response.LastPrice != null)
{
var message = new PriceChange()
{
Figi = response.LastPrice.Figi,
Ticker = GetTickerByFigi(response.LastPrice.Figi),
Time = response.LastPrice.Time.ToDateTime().ToUniversalTime(),
Value = response.LastPrice.Price,
IsHistoricalData = false,
};
await _eventBus.BroadcastNewPrice(message);
pricesBuffer.Add(message);
}
if (response.Trade != null)
{
var trade = new KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade()
{
Figi = response.Trade.Figi,
BoughtAt = response.Trade.Time.ToDateTime().ToUniversalTime(),
Ticker = GetTickerByFigi(response.Trade.Figi),
Price = response.Trade.Price,
Count = response.Trade.Quantity,
Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell ? KLHZ.Trader.Core.DataLayer.Entities.Trades.TradeDirection.Sell : KLHZ.Trader.Core.DataLayer.Entities.Trades.TradeDirection.Buy,
};
tradesBuffer.Add(trade);
}
//if (pricesBuffer.Count > 200 || (DateTime.UtcNow - lastWritePrices).TotalSeconds > 10)
{
lastWritePrices = DateTime.UtcNow;
await context.PriceChanges.AddRangeAsync(pricesBuffer);
pricesBuffer.Clear();
await context.SaveChangesAsync();
}
//if (tradesBuffer.Count > 200 || (DateTime.UtcNow - lastWriteTrades).TotalSeconds > 10)
{
lastWriteTrades = DateTime.UtcNow;
await context.InstrumentTrades.AddRangeAsync(tradesBuffer);
tradesBuffer.Clear();
await context.SaveChangesAsync();
}
}
}
private string GetTickerByFigi(string figi)
{
return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
return Task.CompletedTask;
}
}
}