194 lines
7.7 KiB
C#
194 lines
7.7 KiB
C#
using Grpc.Core;
|
||
using KLHZ.Trader.Core.Common.Messaging.Contracts;
|
||
using KLHZ.Trader.Core.DataLayer;
|
||
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
|
||
using KLHZ.Trader.Core.Exchange.Extentions;
|
||
using Microsoft.EntityFrameworkCore;
|
||
using Microsoft.Extensions.Hosting;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
using System.Collections.Concurrent;
|
||
using Tinkoff.InvestApi;
|
||
using Tinkoff.InvestApi.V1;
|
||
using Candle = KLHZ.Trader.Core.DataLayer.Entities.Prices.Candle;
|
||
|
||
namespace KLHZ.Trader.Core.Exchange.Services
{
    /// <summary>
    /// Hosted service that opens the exchange market-data stream for the configured
    /// instruments, broadcasts every received update on the data bus and stores it
    /// in the database.
    /// </summary>
    public class ExchangeDataReader : IHostedService
    {
        /// <summary>Pause between two consecutive subscription attempts.</summary>
        private static readonly TimeSpan ResubscribeDelay = TimeSpan.FromMilliseconds(1000);

        private readonly InvestApiClient _investApiClient;
        private readonly string[] _instrumentsFigis = [];
        private readonly string[] _managedAccountNamePatterns;
        private readonly ILogger<ExchangeDataReader> _logger;

        // FIGI -> ticker lookup, filled once by InitCache().
        private readonly ConcurrentDictionary<string, string> _tickersCache = new();
        private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;

        // Cancelled by StopAsync(); observed by the subscription loop and the stream readers.
        private readonly CancellationTokenSource _cts = new();
        private readonly IDataBus _eventBus;
        private readonly bool _exchangeDataRecievingEnabled;

        /// <summary>
        /// Creates the reader and copies the relevant settings out of <paramref name="options"/>.
        /// </summary>
        /// <param name="investApiClient">Client used for instrument lookups and market-data streaming.</param>
        /// <param name="eventBus">Bus on which received prices and candles are broadcast.</param>
        /// <param name="options">Exchange configuration (allowed FIGIs, account name patterns, receive switch).</param>
        /// <param name="dbContextFactory">Factory for the short-lived contexts used to persist each update.</param>
        /// <param name="logger">Logger for lifecycle and error messages.</param>
        public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus,
            IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory,
            ILogger<ExchangeDataReader> logger)
        {
            _exchangeDataRecievingEnabled = options.Value.ExchangeDataRecievingEnabled;
            _eventBus = eventBus;
            _dbContextFactory = dbContextFactory;
            _investApiClient = investApiClient;
            _instrumentsFigis = options.Value.AllowedInstrumentsFigis.ToArray();
            _logger = logger;
            _managedAccountNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
        }

        /// <summary>
        /// Resolves the managed accounts, fills the ticker cache and launches the
        /// subscription loop in the background (the loop is not awaited).
        /// </summary>
        /// <param name="cancellationToken">Host start-up token; it is not forwarded to the start-up calls.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Инициализация приемника данных с биржи");
            var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns);
            await InitCache();

            // Fire-and-forget: the loop handles its own exceptions and ends when _cts is cancelled.
            _ = CycleSubscribtion(accounts);
        }

        /// <summary>
        /// Loads all shares and futures from the API and remembers their FIGI -> ticker mapping.
        /// Every returned instrument is cached; no filtering by the allowed FIGI list is applied.
        /// </summary>
        private async Task InitCache()
        {
            var shares = await _investApiClient.Instruments.SharesAsync();
            foreach (var share in shares.Instruments)
            {
                _tickersCache.TryAdd(share.Figi, share.Ticker);
            }

            var futures = await _investApiClient.Instruments.FuturesAsync();
            foreach (var future in futures.Instruments)
            {
                _tickersCache.TryAdd(future.Figi, future.Ticker);
            }
        }

        /// <summary>
        /// Keeps the price subscription alive: (re)subscribes, and after the stream ends or
        /// fails waits <see cref="ResubscribeDelay"/> before trying again. Returns once
        /// <see cref="StopAsync"/> has cancelled the internal token.
        /// </summary>
        /// <param name="accounts">Managed account identifiers; not used by the loop at the moment.</param>
        private async Task CycleSubscribtion(string[] accounts)
        {
            var stopping = _cts.Token;

            while (!stopping.IsCancellationRequested)
            {
                try
                {
                    if (_exchangeDataRecievingEnabled)
                    {
                        await SubscribePrices();
                    }

                    // Candle streaming is switched off for now: SubscribeCandles() is not called.
                }
                catch (OperationCanceledException) when (stopping.IsCancellationRequested)
                {
                    // Stream was aborted by StopAsync - a normal shutdown, not an error.
                    break;
                }
                catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && stopping.IsCancellationRequested)
                {
                    // gRPC reports a client-side cancellation as RpcException(Cancelled).
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Ошибка в одном из стримов получения данных от биржи.");
                }

                // The pause lives outside the try block so that it also applies after a failure;
                // otherwise a persistent error would turn this loop into a tight retry spin.
                try
                {
                    await Task.Delay(ResubscribeDelay, stopping);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Subscribes to last-price updates for the configured FIGIs and, for every update,
        /// broadcasts a <see cref="PriceChange"/> and saves it. Completes when the server
        /// closes the stream; throws when the stream fails or is cancelled.
        /// </summary>
        private async Task SubscribePrices()
        {
            using var stream = _investApiClient.MarketDataStream.MarketDataStream();

            var request = new SubscribeLastPriceRequest
            {
                SubscriptionAction = SubscriptionAction.Subscribe
            };

            foreach (var figi in _instrumentsFigis)
            {
                request.Instruments.Add(new LastPriceInstrument { InstrumentId = figi });
            }

            await stream.RequestStream.WriteAsync(new MarketDataRequest
            {
                SubscribeLastPriceRequest = request,
            });

            // Only the read is cancellable: an update that has already arrived is still
            // broadcast and persisted in full before the loop notices the shutdown.
            await foreach (var response in stream.ResponseStream.ReadAllAsync(_cts.Token))
            {
                var lastPrice = response.LastPrice;
                if (lastPrice is null)
                {
                    // Not a last-price payload (e.g. subscription status or ping).
                    continue;
                }

                var message = new PriceChange()
                {
                    Figi = lastPrice.Figi,
                    Ticker = GetTickerByFigi(lastPrice.Figi),
                    Time = lastPrice.Time.ToDateTime().ToUniversalTime(),
                    Value = lastPrice.Price,
                    IsHistoricalData = false,
                };

                await _eventBus.BroadcastNewPrice(message);

                await using var context = await _dbContextFactory.CreateDbContextAsync();
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                await context.PriceChanges.AddAsync(message);
                await context.SaveChangesAsync();
            }
        }

        /// <summary>
        /// Subscribes to one-minute exchange candles for the configured FIGIs and, for every
        /// candle, broadcasts a <see cref="Candle"/> and saves it. Completes when the server
        /// closes the stream; throws when the stream fails or is cancelled.
        /// </summary>
        private async Task SubscribeCandles()
        {
            using var stream = _investApiClient.MarketDataStream.MarketDataStream();

            var request = new SubscribeCandlesRequest
            {
                SubscriptionAction = SubscriptionAction.Subscribe,
                CandleSourceType = GetCandlesRequest.Types.CandleSource.Exchange
            };

            foreach (var figi in _instrumentsFigis)
            {
                request.Instruments.Add(new CandleInstrument
                {
                    InstrumentId = figi,
                    Interval = SubscriptionInterval.OneMinute
                });
            }

            await stream.RequestStream.WriteAsync(new MarketDataRequest
            {
                SubscribeCandlesRequest = request,
            });

            await foreach (var response in stream.ResponseStream.ReadAllAsync(_cts.Token))
            {
                var candle = response.Candle;
                if (candle is null)
                {
                    // Not a candle payload.
                    continue;
                }

                var message = new Candle()
                {
                    Figi = candle.Figi,
                    // The ticker must come from the candle itself: LastPrice is not set on a
                    // candle message, so reading response.LastPrice.Figi here would throw.
                    Ticker = GetTickerByFigi(candle.Figi),
                    Time = candle.Time.ToDateTime().ToUniversalTime(),
                    Close = candle.Close,
                    Open = candle.Open,
                    Low = candle.Low,
                    High = candle.High,
                    Volume = candle.Volume,
                    IsHistoricalData = false,
                };

                await _eventBus.BroadcastNewCandle(message);

                await using var context = await _dbContextFactory.CreateDbContextAsync();
                context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
                await context.Candles.AddAsync(message);
                await context.SaveChangesAsync();
            }
        }

        /// <summary>
        /// Looks up the ticker cached for <paramref name="figi"/>.
        /// </summary>
        /// <param name="figi">Instrument FIGI.</param>
        /// <returns>The cached ticker, or an empty string when the FIGI is not in the cache.</returns>
        private string GetTickerByFigi(string figi)
        {
            return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
        }

        /// <summary>
        /// Signals the background subscription loop to stop. Does not wait for it to finish.
        /// </summary>
        /// <param name="cancellationToken">Host shutdown token; not used.</param>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();
            return Task.CompletedTask;
        }
    }
}
|