klhztrader/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs

190 lines
7.4 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using Grpc.Core;
using KLHZ.Trader.Core.Common.Messaging.Contracts;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Extentions;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
using Candle = KLHZ.Trader.Core.DataLayer.Entities.Prices.Candle;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class ExchangeDataReader : IHostedService
{
private readonly InvestApiClient _investApiClient;
private readonly string[] _instrumentsFigis = [];
private readonly string[] _managedAccountNamePatterns;
private readonly ILogger<ExchangeDataReader> _logger;
private readonly ConcurrentDictionary<string, string> _tickersCache = new();
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
private readonly CancellationTokenSource _cts = new();
private readonly IDataBus _eventBus;
public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus,
IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory,
ILogger<ExchangeDataReader> logger)
{
_eventBus = eventBus;
_dbContextFactory = dbContextFactory;
_investApiClient = investApiClient;
_instrumentsFigis = options.Value.AllowedInstrumentsFigis.ToArray();
_logger = logger;
_managedAccountNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Инициализация приемника данных с биржи");
var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
await InitCache();
_ = CycleSubscribtion(accounts);
}
private async Task InitCache()
{
var shares = await _investApiClient.Instruments.SharesAsync();
foreach (var share in shares.Instruments)
{
//if (_instrumentsFigis.Contains(share.Figi))
{
_tickersCache.TryAdd(share.Figi, share.Ticker);
}
}
var futures = await _investApiClient.Instruments.FuturesAsync();
foreach (var future in futures.Instruments)
{
//if (_instrumentsFigis.Contains(future.Figi))
{
_tickersCache.TryAdd(future.Figi, future.Ticker);
}
}
}
private async Task CycleSubscribtion(string[] accounts)
{
while (true)
{
try
{
//await SubscribePrices();
await Task.Delay(1000);
//await SubscribeCandles();
}
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
}
}
}
private async Task SubscribePrices()
{
using var stream = _investApiClient.MarketDataStream.MarketDataStream();
var request = new SubscribeLastPriceRequest
{
SubscriptionAction = SubscriptionAction.Subscribe
};
foreach (var f in _instrumentsFigis)
{
request.Instruments.Add(
new LastPriceInstrument()
{
InstrumentId = f
});
}
await stream.RequestStream.WriteAsync(new MarketDataRequest
{
SubscribeLastPriceRequest = request,
});
await foreach (var response in stream.ResponseStream.ReadAllAsync())
{
if (response.LastPrice != null)
{
var message = new PriceChange()
{
Figi = response.LastPrice.Figi,
Ticker = GetTickerByFigi(response.LastPrice.Figi),
Time = response.LastPrice.Time.ToDateTime().ToUniversalTime(),
Value = response.LastPrice.Price,
IsHistoricalData = false,
};
await _eventBus.BroadcastNewPrice(message);
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
await context.PriceChanges.AddAsync(message);
await context.SaveChangesAsync();
}
}
}
private async Task SubscribeCandles()
{
using var stream = _investApiClient.MarketDataStream.MarketDataStream();
var request = new SubscribeCandlesRequest
{
SubscriptionAction = SubscriptionAction.Subscribe,
CandleSourceType = GetCandlesRequest.Types.CandleSource.Exchange
};
foreach (var f in _instrumentsFigis)
{
request.Instruments.Add(
new CandleInstrument()
{
InstrumentId = f,
Interval = SubscriptionInterval.OneMinute
});
}
await stream.RequestStream.WriteAsync(new MarketDataRequest
{
SubscribeCandlesRequest = request,
});
await foreach (var response in stream.ResponseStream.ReadAllAsync())
{
if (response.Candle != null)
{
var message = new Candle()
{
Figi = response.Candle.Figi,
Ticker = GetTickerByFigi(response.LastPrice.Figi),
Time = response.Candle.Time.ToDateTime().ToUniversalTime(),
Close = response.Candle.Close,
Open = response.Candle.Open,
Low = response.Candle.Low,
High = response.Candle.High,
Volume = response.Candle.Volume,
IsHistoricalData = false,
};
await _eventBus.BroadcastNewCandle(message);
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
await context.Candles.AddAsync(message);
await context.SaveChangesAsync();
}
}
}
private string GetTickerByFigi(string figi)
{
return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
return Task.CompletedTask;
}
}
}