добавил сбор данных по стаканам.

main
vlad zverzhkhovskiy 2025-09-02 14:25:46 +03:00
parent d634cfac73
commit 45b34b4509
24 changed files with 309 additions and 52 deletions

View File

@ -8,5 +8,8 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces
public int Length { get; }
public ValueTask AddData(INewPrice priceChange);
public ValueTask<(DateTime[] timestamps, float[] prices)> GetData();
public ValueTask AddOrderbook(IOrderbook orderbook);
public int AsksCount { get; }
public int BidsCount { get; }
}
}

View File

@ -0,0 +1,13 @@
namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces
{
public interface IOrderbook
{
public string Ticker { get; }
public string Figi { get; }
public int AsksCount {get;}
public int BidsCount {get;}
public DateTime Time { get; }
public IOrderbookItem[] Asks { get; }
public IOrderbookItem[] Bids { get; }
}
}

View File

@ -0,0 +1,8 @@
namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces
{
public interface IOrderbookItem
{
public decimal Count { get; }
public decimal Price { get; }
}
}

View File

@ -0,0 +1,15 @@
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos
{
public class NewOrderbookMessage : IOrderbook
{
public required string Ticker { get; init; }
public required string Figi { get; init; }
public DateTime Time { get; init; }
public IOrderbookItem[] Asks { get; init; } = [];
public IOrderbookItem[] Bids { get; init; } = [];
public int AsksCount { get; init; }
public int BidsCount { get; init; }
}
}

View File

@ -6,14 +6,16 @@ namespace KLHZ.Trader.Core.Contracts.Messaging.Interfaces
{
public interface IDataBus
{
public bool AddChannel(string key, Channel<IOrderbook> channel);
public bool AddChannel(string key, Channel<IProcessedPrice> channel);
public bool AddChannel(string key, Channel<INewPrice> channel);
public bool AddChannel(string key, Channel<TradeCommand> channel);
public bool AddChannel(string key, Channel<IMessage> channel);
public bool AddChannel(string key, Channel<INewCandle> channel);
public Task BroadcastNewPrice(INewPrice newPriceMessage);
public Task BroadcastCommand(TradeCommand command);
public Task BroadcastNewCandle(INewCandle command);
public Task BroadcastProcessedPrice(IProcessedPrice command);
public Task Broadcast(INewPrice newPriceMessage);
public Task Broadcast(TradeCommand command);
public Task Broadcast(INewCandle command);
public Task Broadcast(IProcessedPrice command);
public Task Broadcast(IOrderbook orderbook);
}
}

View File

@ -20,6 +20,9 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
}
}
public int AsksCount => 1;
public int BidsCount => 1;
private readonly object _locker = new();
private readonly float[] Prices = new float[CacheMaxLength];
private readonly DateTime[] Timestamps = new DateTime[CacheMaxLength];
@ -56,6 +59,11 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
}
}
public ValueTask AddOrderbook(IOrderbook orderbook)
{
return ValueTask.CompletedTask;
}
public PriceHistoryCacheUnit(string figi, params INewPrice[] priceChanges)
{
Figi = figi;

View File

@ -21,6 +21,28 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
}
}
public int AsksCount
{
get
{
lock (_locker)
{
return _asksCount;
}
}
}
public int BidsCount
{
get
{
lock (_locker)
{
return _bidsCount;
}
}
}
private readonly object _locker = new();
private readonly float[] Prices = new float[_arrayMaxLength];
private readonly DateTime[] Timestamps = new DateTime[_arrayMaxLength];
@ -28,8 +50,12 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
private int _length = 0;
private int _pointer = -1;
private int _asksCount = 1;
private int _bidsCount = 1;
public ValueTask AddData(INewPrice priceChange)
{
if (priceChange.Figi != Figi) return ValueTask.CompletedTask;
lock (_locker)
{
_pointer++;
@ -69,6 +95,17 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache
}
}
public ValueTask AddOrderbook(IOrderbook orderbook)
{
if (orderbook.Figi != Figi) return ValueTask.CompletedTask;
lock (_locker)
{
_asksCount = orderbook.AsksCount;
_bidsCount = orderbook.BidsCount;
}
return ValueTask.CompletedTask;
}
public PriceHistoryCacheUnit2(string figi, params INewPrice[] priceChanges)
{
Figi = figi;

View File

@ -8,6 +8,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
{
public class DataBus : IDataBus
{
private readonly ConcurrentDictionary<string, Channel<IOrderbook>> _orderbooksChannels = new();
private readonly ConcurrentDictionary<string, Channel<IMessage>> _messagesChannels = new();
private readonly ConcurrentDictionary<string, Channel<INewCandle>> _candlesChannels = new();
private readonly ConcurrentDictionary<string, Channel<INewPrice>> _priceChannels = new();
@ -39,7 +40,12 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
return _commandChannels.TryAdd(key, channel);
}
public async Task BroadcastNewPrice(INewPrice newPriceMessage)
public bool AddChannel(string key, Channel<IOrderbook> channel)
{
return _orderbooksChannels.TryAdd(key, channel);
}
public async Task Broadcast(INewPrice newPriceMessage)
{
foreach (var channel in _priceChannels.Values)
{
@ -47,7 +53,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
}
}
public async Task BroadcastProcessedPrice(IProcessedPrice mess)
public async Task Broadcast(IProcessedPrice mess)
{
foreach (var channel in _processedPricesChannels.Values)
{
@ -55,7 +61,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
}
}
public async Task BroadcastNewCandle(INewCandle newPriceMessage)
public async Task Broadcast(INewCandle newPriceMessage)
{
foreach (var channel in _candlesChannels.Values)
{
@ -63,12 +69,20 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services
}
}
public async Task BroadcastCommand(TradeCommand command)
public async Task Broadcast(TradeCommand command)
{
foreach (var channel in _commandChannels.Values)
{
await channel.Writer.WriteAsync(command);
}
}
public async Task Broadcast(IOrderbook orderbook)
{
foreach (var channel in _orderbooksChannels.Values)
{
await channel.Writer.WriteAsync(orderbook);
}
}
}
}

View File

@ -1,4 +1,5 @@
using System.ComponentModel.DataAnnotations.Schema;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using System.ComponentModel.DataAnnotations.Schema;
namespace KLHZ.Trader.Core.DataLayer.Entities.Declisions
{

View File

@ -1,4 +1,4 @@
namespace KLHZ.Trader.Core.DataLayer.Entities.Declisions
namespace KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums
{
public enum DeclisionTradeAction
{

View File

@ -0,0 +1,11 @@
namespace KLHZ.Trader.Core.DataLayer.Entities.Orders.Enums
{
public enum OrderbookItemType
{
Unknown = 0,
Ask = 1,
Bid = 2,
AsksSummary = 3,
BidsSummary = 4
}
}

View File

@ -0,0 +1,32 @@
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Orders.Enums;
using System.ComponentModel.DataAnnotations.Schema;
namespace KLHZ.Trader.Core.DataLayer.Entities.Orders
{
[Table("orderbook_items")]
public class OrderbookItem : IOrderbookItem
{
[Column("id")]
public long Id { get; set; }
[Column("time")]
public DateTime Time { get; set; }
[Column("price")]
public decimal Price { get; set; }
[Column("count")]
public decimal Count { get; set; }
[Column("figi")]
public required string Figi { get; set; }
[Column("ticker")]
public required string Ticker { get; set; }
[Column("item_type")]
public OrderbookItemType ItemType { get; set; }
}
}

View File

@ -1,4 +1,4 @@
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums
{
public enum AssetType
{

View File

@ -1,4 +1,4 @@
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums
{
public enum PositionType
{

View File

@ -1,4 +1,4 @@
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums
{
public enum TradeDirection
{

View File

@ -1,4 +1,5 @@
using System.ComponentModel.DataAnnotations.Schema;
using KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums;
using System.ComponentModel.DataAnnotations.Schema;
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades
{

View File

@ -1,4 +1,5 @@
using System.ComponentModel.DataAnnotations.Schema;
using KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums;
using System.ComponentModel.DataAnnotations.Schema;
namespace KLHZ.Trader.Core.DataLayer.Entities.Trades
{

View File

@ -1,4 +1,5 @@
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Orders;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.DataLayer.Entities.Trades;
using Microsoft.EntityFrameworkCore;
@ -14,6 +15,7 @@ namespace KLHZ.Trader.Core.DataLayer
public DbSet<PriceChange> PriceChanges { get; set; }
public DbSet<ProcessedPrice> ProcessedPrices { get; set; }
public DbSet<Candle> Candles { get; set; }
public DbSet<OrderbookItem> OrderbookItems { get; set; }
public TraderDbContext(DbContextOptions<TraderDbContext> options)
: base(options)
{
@ -60,6 +62,15 @@ namespace KLHZ.Trader.Core.DataLayer
v => DateTime.SpecifyKind(v, DateTimeKind.Utc));
});
modelBuilder.Entity<OrderbookItem>(entity =>
{
entity.HasKey(e1 => e1.Id);
entity.Property(e => e.Time)
.HasConversion(
v => v.ToUniversalTime(),
v => DateTime.SpecifyKind(v, DateTimeKind.Utc));
});
modelBuilder.Entity<ProcessedPrice>(entity =>
{
entity.HasKey(e1 => e1.Id);

View File

@ -1,6 +1,8 @@
using Grpc.Core;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Orders;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.DataLayer.Entities.Trades;
using KLHZ.Trader.Core.Exchange.Extentions;
@ -51,7 +53,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
var shares = await _investApiClient.Instruments.SharesAsync();
foreach (var share in shares.Instruments)
{
//if (_instrumentsFigis.Contains(share.Figi))
if (_instrumentsFigis.Contains(share.Figi))
{
_tickersCache.TryAdd(share.Figi, share.Ticker);
}
@ -59,7 +61,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
var futures = await _investApiClient.Instruments.FuturesAsync();
foreach (var future in futures.Instruments)
{
//if (_instrumentsFigis.Contains(future.Figi))
if (_instrumentsFigis.Contains(future.Figi))
{
_tickersCache.TryAdd(future.Figi, future.Ticker);
}
@ -100,37 +102,49 @@ namespace KLHZ.Trader.Core.Exchange.Services
SubscriptionAction = SubscriptionAction.Subscribe
};
var bookRequest = new SubscribeOrderBookRequest
{
SubscriptionAction = SubscriptionAction.Subscribe
};
foreach (var f in _instrumentsFigis)
{
request.Instruments.Add(
new LastPriceInstrument()
{
InstrumentId = f
});
//request.Instruments.Add(
// new LastPriceInstrument()
// {
// InstrumentId = f
// });
tradesRequest.Instruments.Add(
new TradeInstrument()
//tradesRequest.Instruments.Add(
// new TradeInstrument()
// {
// InstrumentId = f
// });
bookRequest.Instruments.Add(
new OrderBookInstrument()
{
InstrumentId = f
InstrumentId = f,
Depth = 10
});
}
await stream.RequestStream.WriteAsync(new MarketDataRequest
{
SubscribeLastPriceRequest = request,
});
await stream.RequestStream.WriteAsync(new MarketDataRequest
{
SubscribeTradesRequest = tradesRequest,
SubscribeOrderBookRequest = bookRequest
});
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var pricesBuffer = new List<PriceChange>();
var orderbookItemsBuffer = new List<OrderbookItem>();
var tradesBuffer = new List<InstrumentTrade>();
var lastWritePrices = DateTime.UtcNow;
var lastWriteOrderbooks = DateTime.UtcNow;
var lastWriteTrades = DateTime.UtcNow;
var lastWritePrices = DateTime.UtcNow;
var lastWrite = DateTime.UtcNow;
await foreach (var response in stream.ResponseStream.ReadAllAsync())
{
if (response.LastPrice != null)
@ -143,7 +157,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
Value = response.LastPrice.Price,
IsHistoricalData = false,
};
await _eventBus.BroadcastNewPrice(message);
await _eventBus.Broadcast(message);
pricesBuffer.Add(message);
}
@ -156,24 +170,78 @@ namespace KLHZ.Trader.Core.Exchange.Services
Ticker = GetTickerByFigi(response.Trade.Figi),
Price = response.Trade.Price,
Count = response.Trade.Quantity,
Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell ? KLHZ.Trader.Core.DataLayer.Entities.Trades.TradeDirection.Sell : KLHZ.Trader.Core.DataLayer.Entities.Trades.TradeDirection.Buy,
Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell ? DataLayer.Entities.Trades.Enums.TradeDirection.Sell : DataLayer.Entities.Trades.Enums.TradeDirection.Buy,
};
tradesBuffer.Add(trade);
}
//if (pricesBuffer.Count > 200 || (DateTime.UtcNow - lastWritePrices).TotalSeconds > 10)
if (response.Orderbook != null)
{
lastWritePrices = DateTime.UtcNow;
await context.PriceChanges.AddRangeAsync(pricesBuffer);
pricesBuffer.Clear();
await context.SaveChangesAsync();
var asks = response.Orderbook.Asks.Select(a => new OrderbookItem()
{
Figi = response.Orderbook.Figi,
Ticker = GetTickerByFigi(response.Orderbook.Figi),
ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Ask,
Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(),
Price = a.Price,
Count = a.Quantity,
}).ToArray();
orderbookItemsBuffer.AddRange(asks);
var bids = response.Orderbook.Bids.Select(a => new OrderbookItem()
{
Figi = response.Orderbook.Figi,
Ticker = GetTickerByFigi(response.Orderbook.Figi),
ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Bid,
Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(),
Price = a.Price,
Count = a.Quantity,
}).ToArray();
orderbookItemsBuffer.AddRange(bids);
orderbookItemsBuffer.Add(new OrderbookItem()
{
Figi = response.Orderbook.Figi,
Ticker = GetTickerByFigi(response.Orderbook.Figi),
Count = bids.Sum(a => (int)a.Count),
ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary,
Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(),
});
var message = new NewOrderbookMessage()
{
Ticker = GetTickerByFigi(response.Orderbook.Figi),
Figi = response.Orderbook.Figi,
Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(),
Asks = asks,
Bids = bids,
};
orderbookItemsBuffer.Add(new OrderbookItem()
{
Figi = response.Orderbook.Figi,
Ticker = GetTickerByFigi(response.Orderbook.Figi),
Count = asks.Sum(a => (int)a.Count),
ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary,
Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(),
});
await _eventBus.Broadcast(message);
}
//if (tradesBuffer.Count > 200 || (DateTime.UtcNow - lastWriteTrades).TotalSeconds > 10)
if (orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count > 1000 || (DateTime.UtcNow - lastWrite).TotalSeconds > 10)
{
lastWriteTrades = DateTime.UtcNow;
await context.InstrumentTrades.AddRangeAsync(tradesBuffer);
tradesBuffer.Clear();
lastWrite = DateTime.UtcNow;
if (orderbookItemsBuffer.Count > 0)
{
await context.OrderbookItems.AddRangeAsync(orderbookItemsBuffer);
orderbookItemsBuffer.Clear();
}
if (pricesBuffer.Count > 0)
{
await context.PriceChanges.AddRangeAsync(pricesBuffer);
pricesBuffer.Clear();
}
if (tradesBuffer.Count > 0)
{
await context.InstrumentTrades.AddRangeAsync(tradesBuffer);
tradesBuffer.Clear();
}
await context.SaveChangesAsync();
}
}

View File

@ -236,9 +236,9 @@ namespace KLHZ.Trader.Core.Exchange.Services
BoughtAt = DateTime.UtcNow,
Count = res.LotsExecuted,
Price = res.ExecutedOrderPrice,
Position = DataLayer.Entities.Trades.PositionType.Long,
Direction = DataLayer.Entities.Trades.TradeDirection.Buy,
Asset = DataLayer.Entities.Trades.AssetType.Common,
Position = DataLayer.Entities.Trades.Enums.PositionType.Long,
Direction = DataLayer.Entities.Trades.Enums.TradeDirection.Buy,
Asset = DataLayer.Entities.Trades.Enums.AssetType.Common,
};
await context.Trades.AddAsync(newTrade);

View File

@ -6,6 +6,7 @@ using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.Exchange.Extentions;
using KLHZ.Trader.Core.Exchange.Models;
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
@ -42,8 +43,10 @@ namespace KLHZ.Trader.Core.Exchange.Services
private readonly string[] _managedAccountsNamePatterns = [];
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateUnbounded<IOrderbook>();
public Trader(
ILogger<Trader> logger,
ITradingEventsDetector tradingEventsDetector,
BotModeSwitcher botModeSwitcher,
IServiceProvider provider,
@ -52,6 +55,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
IDbContextFactory<TraderDbContext> dbContextFactory,
InvestApiClient investApiClient)
{
_logger = logger;
_tradingEventsDetector = tradingEventsDetector;
_botModeSwitcher = botModeSwitcher;
_dataBus = dataBus;
@ -87,10 +91,12 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
_dataBus.AddChannel(nameof(Trader), _pricesChannel);
_ = ProcessMessages();
_dataBus.AddChannel(nameof(Trader), _ordersbookChannel);
_ = ProcessPrices();
_ = ProcessOrdersbooks();
}
private async Task ProcessMessages()
private async Task ProcessPrices()
{
while (await _pricesChannel.Reader.WaitToReadAsync())
{
@ -146,6 +152,20 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
private async Task ProcessOrdersbooks()
{
while (await _ordersbookChannel.Reader.WaitToReadAsync())
{
var message = await _ordersbookChannel.Reader.ReadAsync();
if (!_historyCash.TryGetValue(message.Figi, out var data))
{
data = new PriceHistoryCacheUnit2(message.Figi);
_historyCash.TryAdd(message.Figi, data);
}
await data.AddOrderbook(message);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;

View File

@ -82,7 +82,7 @@ namespace KLHZ.Trader.Core.TG.Services
RecomendPrice = null,
Figi = "BBG004730N88",
};
await _eventBus.BroadcastCommand(command);
await _eventBus.Broadcast(command);
break;
}
case "продать сбер":
@ -95,7 +95,7 @@ namespace KLHZ.Trader.Core.TG.Services
Count = 1,
LotsCount = 1,
};
await _eventBus.BroadcastCommand(command);
await _eventBus.Broadcast(command);
break;
}
case "купить сбер":
@ -107,7 +107,7 @@ namespace KLHZ.Trader.Core.TG.Services
Figi = "BBG004730N88",
Count = 1
};
await _eventBus.BroadcastCommand(command);
await _eventBus.Broadcast(command);
break;
}
}

View File

@ -0,0 +1,12 @@
drop table if exists instrument_trades;
create table instrument_trades
(
trade_id bigserial,
bought_at timestamp default current_timestamp,
figi text not null,
ticker text not null,
price decimal not null,
count decimal not null,
direction int not null default 1,
primary key (trade_id)
);

View File

@ -41,7 +41,7 @@ namespace KLHZ.Trader.Service.Controllers
foreach (var mess in data)
{
await _dataBus.BroadcastNewPrice(mess);
await _dataBus.Broadcast(mess);
}
}
catch (Exception ex)