Смена источника данных
test / deploy_trader_prod (push) Successful in 1m41s Details

dev
vlad zverzhkhovskiy 2025-09-08 14:51:28 +03:00
parent 092920e2f1
commit f7e6b8080b
1 changed files with 33 additions and 21 deletions

View File

@ -72,10 +72,10 @@ namespace KLHZ.Trader.Core.Exchange.Services
{ {
using var stream = _investApiClient.MarketDataStream.MarketDataStream(); using var stream = _investApiClient.MarketDataStream.MarketDataStream();
var request = new SubscribeLastPriceRequest //var request = new SubscribeLastPriceRequest
{ //{
SubscriptionAction = SubscriptionAction.Subscribe // SubscriptionAction = SubscriptionAction.Subscribe
}; //};
var tradesRequest = new SubscribeTradesRequest var tradesRequest = new SubscribeTradesRequest
{ {
@ -89,11 +89,11 @@ namespace KLHZ.Trader.Core.Exchange.Services
foreach (var f in _instrumentsFigis) foreach (var f in _instrumentsFigis)
{ {
request.Instruments.Add( //request.Instruments.Add(
new LastPriceInstrument() // new LastPriceInstrument()
{ // {
InstrumentId = f // InstrumentId = f
}); // });
tradesRequest.Instruments.Add( tradesRequest.Instruments.Add(
new TradeInstrument() new TradeInstrument()
@ -109,10 +109,10 @@ namespace KLHZ.Trader.Core.Exchange.Services
}); });
} }
await stream.RequestStream.WriteAsync(new MarketDataRequest //await stream.RequestStream.WriteAsync(new MarketDataRequest
{ //{
SubscribeLastPriceRequest = request, // SubscribeLastPriceRequest = request,
}); //});
await stream.RequestStream.WriteAsync(new MarketDataRequest await stream.RequestStream.WriteAsync(new MarketDataRequest
{ {
@ -129,22 +129,34 @@ namespace KLHZ.Trader.Core.Exchange.Services
var lastWrite = DateTime.UtcNow; var lastWrite = DateTime.UtcNow;
await foreach (var response in stream.ResponseStream.ReadAllAsync()) await foreach (var response in stream.ResponseStream.ReadAllAsync())
{ {
if (response.LastPrice != null) //if (response.LastPrice != null)
//{
// var message = new PriceChange()
// {
// Figi = response.LastPrice.Figi,
// Ticker = _tradeDataProvider.GetTickerByFigi(response.LastPrice.Figi),
// Time = response.LastPrice.Time.ToDateTime().ToUniversalTime(),
// Value = response.LastPrice.Price,
// IsHistoricalData = false,
// };
// await _eventBus.Broadcast(message);
// pricesBuffer.Add(message);
//}
if (response.Trade != null)
{ {
var message = new PriceChange() var message = new PriceChange()
{ {
Figi = response.LastPrice.Figi, Figi = response.Trade.Figi,
Ticker = _tradeDataProvider.GetTickerByFigi(response.LastPrice.Figi), Ticker = _tradeDataProvider.GetTickerByFigi(response.Trade.Figi),
Time = response.LastPrice.Time.ToDateTime().ToUniversalTime(), Time = response.Trade.Time.ToDateTime().ToUniversalTime(),
Value = response.LastPrice.Price, Value = response.Trade.Price,
IsHistoricalData = false, IsHistoricalData = false,
}; };
await _eventBus.Broadcast(message); await _eventBus.Broadcast(message);
pricesBuffer.Add(message); pricesBuffer.Add(message);
}
if (response.Trade != null)
{
var trade = new KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade() var trade = new KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade()
{ {
Figi = response.Trade.Figi, Figi = response.Trade.Figi,