From 45b34b4509b16f27c4618c67164d6499e3dffe9f Mon Sep 17 00:00:00 2001 From: vlad zverzhkhovskiy Date: Tue, 2 Sep 2025 14:25:46 +0300 Subject: [PATCH] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20?= =?UTF-8?q?=D1=81=D0=B1=D0=BE=D1=80=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85?= =?UTF-8?q?=20=D0=BF=D0=BE=20=D1=81=D1=82=D0=B0=D0=BA=D0=B0=D0=BD=D0=B0?= =?UTF-8?q?=D0=BC.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Interfaces/IPriceHistoryCacheUnit.cs | 3 + .../Messaging/Dtos/Interfaces/IOrderbook.cs | 13 ++ .../Dtos/Interfaces/IOrderbookItem.cs | 8 ++ .../Messaging/Dtos/NewOrderbookMessage.cs | 15 +++ .../Messaging/Interfaces/IDataBus.cs | 10 +- .../Services/Cache/PriceHistoryCacheUnit.cs | 8 ++ .../Services/Cache/PriceHistoryCacheUnit2.cs | 37 ++++++ .../Common/Messaging/Services/DataBus.cs | 22 +++- .../Entities/Declisions/Declision.cs | 3 +- .../{ => Enums}/DeclisionTradeAction.cs | 2 +- .../Orders/Enums/OrderbookItemType.cs | 11 ++ .../Entities/Orders/OrderbookItem.cs | 32 +++++ .../Entities/Trades/{ => Enums}/AssetType.cs | 2 +- .../Trades/{ => Enums}/PositionType.cs | 2 +- .../Trades/{ => Enums}/TradeDirection.cs | 2 +- .../Entities/Trades/InstrumentTrade.cs | 3 +- .../DataLayer/Entities/Trades/Trade.cs | 3 +- KLHZ.Trader.Core/DataLayer/TraderDbContext.cs | 11 ++ .../Exchange/Services/ExchangeDataReader.cs | 124 ++++++++++++++---- .../Exchange/Services/ManagedAccount.cs | 6 +- KLHZ.Trader.Core/Exchange/Services/Trader.cs | 24 +++- .../TG/Services/BotMessagesHandler.cs | 6 +- .../postgres/migration3.sql | 12 ++ .../Controllers/PlayController.cs | 2 +- 24 files changed, 309 insertions(+), 52 deletions(-) create mode 100644 KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbook.cs create mode 100644 KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbookItem.cs create mode 100644 KLHZ.Trader.Core.Contracts/Messaging/Dtos/NewOrderbookMessage.cs rename KLHZ.Trader.Core/DataLayer/Entities/Declisions/{ => Enums}/DeclisionTradeAction.cs (69%) create mode 100644 KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs create mode 100644 KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs rename KLHZ.Trader.Core/DataLayer/Entities/Trades/{ => Enums}/AssetType.cs (58%) rename KLHZ.Trader.Core/DataLayer/Entities/Trades/{ => Enums}/PositionType.cs (58%) rename KLHZ.Trader.Core/DataLayer/Entities/Trades/{ => Enums}/TradeDirection.cs (58%) create mode 100644 KLHZ.Trader.Infrastructure/postgres/migration3.sql diff --git a/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs b/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs index b3a40a7..5ba349d 100644 --- a/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs +++ b/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs @@ -8,5 +8,8 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces public int Length { get; } public ValueTask AddData(INewPrice priceChange); public ValueTask<(DateTime[] timestamps, float[] prices)> GetData(); + public ValueTask AddOrderbook(IOrderbook orderbook); + public int AsksCount { get; } + public int BidsCount { get; } } } diff --git a/KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbook.cs b/KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbook.cs new file mode 100644 index 0000000..d83e9c0 --- /dev/null +++ b/KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbook.cs @@ -0,0 +1,13 @@ +namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces +{ + public interface IOrderbook + { + public string Ticker { get; } + public string Figi { get; } + public int AsksCount {get;} + public int BidsCount {get;} + public DateTime Time { get; } + public IOrderbookItem[] Asks { get; } + public IOrderbookItem[] Bids { get; } + } +} diff --git a/KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbookItem.cs b/KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbookItem.cs new file mode 100644 index 0000000..7b763ea --- /dev/null +++ b/KLHZ.Trader.Core.Contracts/Messaging/Dtos/Interfaces/IOrderbookItem.cs @@ -0,0 +1,8 @@ +namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces +{ + public interface IOrderbookItem + { + public decimal Count { get; } + public decimal Price { get; } + } +} diff --git a/KLHZ.Trader.Core.Contracts/Messaging/Dtos/NewOrderbookMessage.cs b/KLHZ.Trader.Core.Contracts/Messaging/Dtos/NewOrderbookMessage.cs new file mode 100644 index 0000000..e3fac91 --- /dev/null +++ b/KLHZ.Trader.Core.Contracts/Messaging/Dtos/NewOrderbookMessage.cs @@ -0,0 +1,15 @@ +using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; + +namespace KLHZ.Trader.Core.Contracts.Messaging.Dtos +{ + public class NewOrderbookMessage : IOrderbook + { + public required string Ticker { get; init; } + public required string Figi { get; init; } + public DateTime Time { get; init; } + public IOrderbookItem[] Asks { get; init; } = []; + public IOrderbookItem[] Bids { get; init; } = []; + public int AsksCount { get; init; } + public int BidsCount { get; init; } + } +} diff --git a/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs b/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs index 825192d..baf6f86 100644 --- a/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs +++ b/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs @@ -6,14 +6,16 @@ namespace KLHZ.Trader.Core.Contracts.Messaging.Interfaces { public interface IDataBus { + public bool AddChannel(string key, Channel channel); public bool AddChannel(string key, Channel channel); public bool AddChannel(string key, Channel channel); public bool AddChannel(string key, Channel channel); public bool AddChannel(string key, Channel channel); public bool AddChannel(string key, Channel channel); - public Task BroadcastNewPrice(INewPrice newPriceMessage); - public Task BroadcastCommand(TradeCommand command); - public Task BroadcastNewCandle(INewCandle command); - public Task BroadcastProcessedPrice(IProcessedPrice command); + public Task Broadcast(INewPrice newPriceMessage); + public Task Broadcast(TradeCommand command); + public Task Broadcast(INewCandle command); + public Task Broadcast(IProcessedPrice command); + public Task Broadcast(IOrderbook orderbook); } } diff --git a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs index 05ee3a6..c1c2550 100644 --- a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs +++ b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit.cs @@ -20,6 +20,9 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache } } + public int AsksCount => 1; + public int BidsCount => 1; + private readonly object _locker = new(); private readonly float[] Prices = new float[CacheMaxLength]; private readonly DateTime[] Timestamps = new DateTime[CacheMaxLength]; @@ -56,6 +59,11 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache } } + public ValueTask AddOrderbook(IOrderbook orderbook) + { + return ValueTask.CompletedTask; + } + public PriceHistoryCacheUnit(string figi, params INewPrice[] priceChanges) { Figi = figi; diff --git a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs index b2479cd..a48dc0c 100644 --- a/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs +++ b/KLHZ.Trader.Core.Math/Declisions/Services/Cache/PriceHistoryCacheUnit2.cs @@ -21,6 +21,28 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache } } + public int AsksCount + { + get + { + lock (_locker) + { + return _asksCount; + } + } + } + + public int BidsCount + { + get + { + lock (_locker) + { + return _bidsCount; + } + } + } + private readonly object _locker = new(); private readonly float[] Prices = new float[_arrayMaxLength]; private readonly DateTime[] Timestamps = new DateTime[_arrayMaxLength]; @@ -28,8 +50,12 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache private int _length = 0; private int _pointer = -1; + private int _asksCount = 1; + private int _bidsCount = 1; + public ValueTask AddData(INewPrice priceChange) { + if (priceChange.Figi != Figi) return ValueTask.CompletedTask; lock (_locker) { _pointer++; @@ -69,6 +95,17 @@ namespace KLHZ.Trader.Core.Math.Declisions.Services.Cache } } + public ValueTask AddOrderbook(IOrderbook orderbook) + { + if (orderbook.Figi != Figi) return ValueTask.CompletedTask; + lock (_locker) + { + _asksCount = orderbook.AsksCount; + _bidsCount = orderbook.BidsCount; + } + return ValueTask.CompletedTask; + } + public PriceHistoryCacheUnit2(string figi, params INewPrice[] priceChanges) { Figi = figi; diff --git a/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs b/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs index 198fa0d..49a009b 100644 --- a/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs +++ b/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs @@ -8,6 +8,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services { public class DataBus : IDataBus { + private readonly ConcurrentDictionary> _orderbooksChannels = new(); private readonly ConcurrentDictionary> _messagesChannels = new(); private readonly ConcurrentDictionary> _candlesChannels = new(); private readonly ConcurrentDictionary> _priceChannels = new(); @@ -39,7 +40,12 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services return _commandChannels.TryAdd(key, channel); } - public async Task BroadcastNewPrice(INewPrice newPriceMessage) + public bool AddChannel(string key, Channel channel) + { + return _orderbooksChannels.TryAdd(key, channel); + } + + public async Task Broadcast(INewPrice newPriceMessage) { foreach (var channel in _priceChannels.Values) { @@ -47,7 +53,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services } } - public async Task BroadcastProcessedPrice(IProcessedPrice mess) + public async Task Broadcast(IProcessedPrice mess) { foreach (var channel in _processedPricesChannels.Values) { @@ -55,7 +61,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services } } - public async Task BroadcastNewCandle(INewCandle newPriceMessage) + public async Task Broadcast(INewCandle newPriceMessage) { foreach (var channel in _candlesChannels.Values) { @@ -63,12 +69,20 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services } } - public async Task BroadcastCommand(TradeCommand command) + public async Task Broadcast(TradeCommand command) { foreach (var channel in _commandChannels.Values) { await channel.Writer.WriteAsync(command); } } + + public async Task Broadcast(IOrderbook orderbook) + { + foreach (var channel in _orderbooksChannels.Values) + { + await channel.Writer.WriteAsync(orderbook); + } + } } } diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Declisions/Declision.cs b/KLHZ.Trader.Core/DataLayer/Entities/Declisions/Declision.cs index 18d0531..a3db8c6 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Declisions/Declision.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Declisions/Declision.cs @@ -1,4 +1,5 @@ -using System.ComponentModel.DataAnnotations.Schema; +using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums; +using System.ComponentModel.DataAnnotations.Schema; namespace KLHZ.Trader.Core.DataLayer.Entities.Declisions { diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Declisions/DeclisionTradeAction.cs b/KLHZ.Trader.Core/DataLayer/Entities/Declisions/Enums/DeclisionTradeAction.cs similarity index 69% rename from KLHZ.Trader.Core/DataLayer/Entities/Declisions/DeclisionTradeAction.cs rename to KLHZ.Trader.Core/DataLayer/Entities/Declisions/Enums/DeclisionTradeAction.cs index 64350cf..66577f1 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Declisions/DeclisionTradeAction.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Declisions/Enums/DeclisionTradeAction.cs @@ -1,4 +1,4 @@ -namespace KLHZ.Trader.Core.DataLayer.Entities.Declisions +namespace KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums { public enum DeclisionTradeAction { diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs b/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs new file mode 100644 index 0000000..16ce2bf --- /dev/null +++ b/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs @@ -0,0 +1,11 @@ +namespace KLHZ.Trader.Core.DataLayer.Entities.Orders.Enums +{ + public enum OrderbookItemType + { + Unknown = 0, + Ask = 1, + Bid = 2, + AsksSummary = 3, + BidsSummary = 4 + } +} diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs b/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs new file mode 100644 index 0000000..b1b024f --- /dev/null +++ b/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs @@ -0,0 +1,32 @@ +using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; +using KLHZ.Trader.Core.DataLayer.Entities.Orders.Enums; +using System.ComponentModel.DataAnnotations.Schema; + +namespace KLHZ.Trader.Core.DataLayer.Entities.Orders +{ + [Table("orderbook_items")] + public class OrderbookItem : IOrderbookItem + { + [Column("id")] + public long Id { get; set; } + + [Column("time")] + public DateTime Time { get; set; } + + [Column("price")] + public decimal Price { get; set; } + + [Column("count")] + public decimal Count { get; set; } + + [Column("figi")] + public required string Figi { get; set; } + + [Column("ticker")] + public required string Ticker { get; set; } + + [Column("item_type")] + public OrderbookItemType ItemType { get; set; } + + } +} diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/AssetType.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/AssetType.cs similarity index 58% rename from KLHZ.Trader.Core/DataLayer/Entities/Trades/AssetType.cs rename to KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/AssetType.cs index 59a30f1..7f148c6 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/AssetType.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/AssetType.cs @@ -1,4 +1,4 @@ -namespace KLHZ.Trader.Core.DataLayer.Entities.Trades +namespace KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums { public enum AssetType { diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/PositionType.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/PositionType.cs similarity index 58% rename from KLHZ.Trader.Core/DataLayer/Entities/Trades/PositionType.cs rename to KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/PositionType.cs index 27c75c6..3b90439 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/PositionType.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/PositionType.cs @@ -1,4 +1,4 @@ -namespace KLHZ.Trader.Core.DataLayer.Entities.Trades +namespace KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums { public enum PositionType { diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/TradeDirection.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/TradeDirection.cs similarity index 58% rename from KLHZ.Trader.Core/DataLayer/Entities/Trades/TradeDirection.cs rename to KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/TradeDirection.cs index 62a87f9..2042e4a 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/TradeDirection.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Enums/TradeDirection.cs @@ -1,4 +1,4 @@ -namespace KLHZ.Trader.Core.DataLayer.Entities.Trades +namespace KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums { public enum TradeDirection { diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs index c0fdcb2..4293954 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs @@ -1,4 +1,5 @@ -using System.ComponentModel.DataAnnotations.Schema; +using KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums; +using System.ComponentModel.DataAnnotations.Schema; namespace KLHZ.Trader.Core.DataLayer.Entities.Trades { diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs index f85138e..f898042 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs @@ -1,4 +1,5 @@ -using System.ComponentModel.DataAnnotations.Schema; +using KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums; +using System.ComponentModel.DataAnnotations.Schema; namespace KLHZ.Trader.Core.DataLayer.Entities.Trades { diff --git a/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs b/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs index 651811d..b319583 100644 --- a/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs +++ b/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs @@ -1,4 +1,5 @@ using KLHZ.Trader.Core.DataLayer.Entities.Declisions; +using KLHZ.Trader.Core.DataLayer.Entities.Orders; using KLHZ.Trader.Core.DataLayer.Entities.Prices; using KLHZ.Trader.Core.DataLayer.Entities.Trades; using Microsoft.EntityFrameworkCore; @@ -14,6 +15,7 @@ namespace KLHZ.Trader.Core.DataLayer public DbSet PriceChanges { get; set; } public DbSet ProcessedPrices { get; set; } public DbSet Candles { get; set; } + public DbSet OrderbookItems { get; set; } public TraderDbContext(DbContextOptions options) : base(options) { @@ -60,6 +62,15 @@ namespace KLHZ.Trader.Core.DataLayer v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); }); + modelBuilder.Entity(entity => + { + entity.HasKey(e1 => e1.Id); + entity.Property(e => e.Time) + .HasConversion( + v => v.ToUniversalTime(), + v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); + }); + modelBuilder.Entity(entity => { entity.HasKey(e1 => e1.Id); diff --git a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs index 266eb93..c52246d 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs @@ -1,6 +1,8 @@ using Grpc.Core; +using KLHZ.Trader.Core.Contracts.Messaging.Dtos; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer; +using KLHZ.Trader.Core.DataLayer.Entities.Orders; using KLHZ.Trader.Core.DataLayer.Entities.Prices; using KLHZ.Trader.Core.DataLayer.Entities.Trades; using KLHZ.Trader.Core.Exchange.Extentions; @@ -51,7 +53,7 @@ namespace KLHZ.Trader.Core.Exchange.Services var shares = await _investApiClient.Instruments.SharesAsync(); foreach (var share in shares.Instruments) { - //if (_instrumentsFigis.Contains(share.Figi)) + if (_instrumentsFigis.Contains(share.Figi)) { _tickersCache.TryAdd(share.Figi, share.Ticker); } @@ -59,7 +61,7 @@ namespace KLHZ.Trader.Core.Exchange.Services var futures = await _investApiClient.Instruments.FuturesAsync(); foreach (var future in futures.Instruments) { - //if (_instrumentsFigis.Contains(future.Figi)) + if (_instrumentsFigis.Contains(future.Figi)) { _tickersCache.TryAdd(future.Figi, future.Ticker); } @@ -100,37 +102,49 @@ namespace KLHZ.Trader.Core.Exchange.Services SubscriptionAction = SubscriptionAction.Subscribe }; + var bookRequest = new SubscribeOrderBookRequest + { + SubscriptionAction = SubscriptionAction.Subscribe + }; + foreach (var f in _instrumentsFigis) { - request.Instruments.Add( - new LastPriceInstrument() - { - InstrumentId = f - }); + //request.Instruments.Add( + // new LastPriceInstrument() + // { + // InstrumentId = f + // }); - tradesRequest.Instruments.Add( - new TradeInstrument() + //tradesRequest.Instruments.Add( + // new TradeInstrument() + // { + // InstrumentId = f + // }); + + bookRequest.Instruments.Add( + new OrderBookInstrument() { - InstrumentId = f + InstrumentId = f, + Depth = 10 }); } await stream.RequestStream.WriteAsync(new MarketDataRequest { SubscribeLastPriceRequest = request, - }); - - await stream.RequestStream.WriteAsync(new MarketDataRequest - { SubscribeTradesRequest = tradesRequest, + SubscribeOrderBookRequest = bookRequest }); using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; var pricesBuffer = new List(); + var orderbookItemsBuffer = new List(); var tradesBuffer = new List(); - var lastWritePrices = DateTime.UtcNow; + var lastWriteOrderbooks = DateTime.UtcNow; var lastWriteTrades = DateTime.UtcNow; + var lastWritePrices = DateTime.UtcNow; + var lastWrite = DateTime.UtcNow; await foreach (var response in stream.ResponseStream.ReadAllAsync()) { if (response.LastPrice != null) @@ -143,7 +157,7 @@ namespace KLHZ.Trader.Core.Exchange.Services Value = response.LastPrice.Price, IsHistoricalData = false, }; - await _eventBus.BroadcastNewPrice(message); + await _eventBus.Broadcast(message); pricesBuffer.Add(message); } @@ -156,24 +170,78 @@ namespace KLHZ.Trader.Core.Exchange.Services Ticker = GetTickerByFigi(response.Trade.Figi), Price = response.Trade.Price, Count = response.Trade.Quantity, - Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell ? KLHZ.Trader.Core.DataLayer.Entities.Trades.TradeDirection.Sell : KLHZ.Trader.Core.DataLayer.Entities.Trades.TradeDirection.Buy, + Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell ? DataLayer.Entities.Trades.Enums.TradeDirection.Sell : DataLayer.Entities.Trades.Enums.TradeDirection.Buy, }; tradesBuffer.Add(trade); } - - - //if (pricesBuffer.Count > 200 || (DateTime.UtcNow - lastWritePrices).TotalSeconds > 10) + if (response.Orderbook != null) { - lastWritePrices = DateTime.UtcNow; - await context.PriceChanges.AddRangeAsync(pricesBuffer); - pricesBuffer.Clear(); - await context.SaveChangesAsync(); + var asks = response.Orderbook.Asks.Select(a => new OrderbookItem() + { + Figi = response.Orderbook.Figi, + Ticker = GetTickerByFigi(response.Orderbook.Figi), + ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Ask, + Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), + Price = a.Price, + Count = a.Quantity, + }).ToArray(); + orderbookItemsBuffer.AddRange(asks); + var bids = response.Orderbook.Bids.Select(a => new OrderbookItem() + { + Figi = response.Orderbook.Figi, + Ticker = GetTickerByFigi(response.Orderbook.Figi), + ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Bid, + Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), + Price = a.Price, + Count = a.Quantity, + }).ToArray(); + orderbookItemsBuffer.AddRange(bids); + orderbookItemsBuffer.Add(new OrderbookItem() + { + Figi = response.Orderbook.Figi, + Ticker = GetTickerByFigi(response.Orderbook.Figi), + Count = bids.Sum(a => (int)a.Count), + ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary, + Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), + }); + + var message = new NewOrderbookMessage() + { + Ticker = GetTickerByFigi(response.Orderbook.Figi), + Figi = response.Orderbook.Figi, + Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), + Asks = asks, + Bids = bids, + }; + orderbookItemsBuffer.Add(new OrderbookItem() + { + Figi = response.Orderbook.Figi, + Ticker = GetTickerByFigi(response.Orderbook.Figi), + Count = asks.Sum(a => (int)a.Count), + ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary, + Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), + }); + await _eventBus.Broadcast(message); } - //if (tradesBuffer.Count > 200 || (DateTime.UtcNow - lastWriteTrades).TotalSeconds > 10) + + if (orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count > 1000 || (DateTime.UtcNow - lastWrite).TotalSeconds > 10) { - lastWriteTrades = DateTime.UtcNow; - await context.InstrumentTrades.AddRangeAsync(tradesBuffer); - tradesBuffer.Clear(); + lastWrite = DateTime.UtcNow; + if (orderbookItemsBuffer.Count > 0) + { + await context.OrderbookItems.AddRangeAsync(orderbookItemsBuffer); + orderbookItemsBuffer.Clear(); + } + if (pricesBuffer.Count > 0) + { + await context.PriceChanges.AddRangeAsync(pricesBuffer); + pricesBuffer.Clear(); + } + if (tradesBuffer.Count > 0) + { + await context.InstrumentTrades.AddRangeAsync(tradesBuffer); + tradesBuffer.Clear(); + } await context.SaveChangesAsync(); } } diff --git a/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs b/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs index d6f8835..5afcdb8 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs @@ -236,9 +236,9 @@ namespace KLHZ.Trader.Core.Exchange.Services BoughtAt = DateTime.UtcNow, Count = res.LotsExecuted, Price = res.ExecutedOrderPrice, - Position = DataLayer.Entities.Trades.PositionType.Long, - Direction = DataLayer.Entities.Trades.TradeDirection.Buy, - Asset = DataLayer.Entities.Trades.AssetType.Common, + Position = DataLayer.Entities.Trades.Enums.PositionType.Long, + Direction = DataLayer.Entities.Trades.Enums.TradeDirection.Buy, + Asset = DataLayer.Entities.Trades.Enums.AssetType.Common, }; await context.Trades.AddAsync(newTrade); diff --git a/KLHZ.Trader.Core/Exchange/Services/Trader.cs b/KLHZ.Trader.Core/Exchange/Services/Trader.cs index b05104e..21898f2 100644 --- a/KLHZ.Trader.Core/Exchange/Services/Trader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/Trader.cs @@ -6,6 +6,7 @@ using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer; using KLHZ.Trader.Core.DataLayer.Entities.Declisions; +using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums; using KLHZ.Trader.Core.Exchange.Extentions; using KLHZ.Trader.Core.Exchange.Models; using KLHZ.Trader.Core.Math.Declisions.Services.Cache; @@ -42,8 +43,10 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly string[] _managedAccountsNamePatterns = []; private readonly Channel _pricesChannel = Channel.CreateUnbounded(); + private readonly Channel _ordersbookChannel = Channel.CreateUnbounded(); public Trader( + ILogger logger, ITradingEventsDetector tradingEventsDetector, BotModeSwitcher botModeSwitcher, IServiceProvider provider, @@ -52,6 +55,7 @@ namespace KLHZ.Trader.Core.Exchange.Services IDbContextFactory dbContextFactory, InvestApiClient investApiClient) { + _logger = logger; _tradingEventsDetector = tradingEventsDetector; _botModeSwitcher = botModeSwitcher; _dataBus = dataBus; @@ -87,10 +91,12 @@ namespace KLHZ.Trader.Core.Exchange.Services } _dataBus.AddChannel(nameof(Trader), _pricesChannel); - _ = ProcessMessages(); + _dataBus.AddChannel(nameof(Trader), _ordersbookChannel); + _ = ProcessPrices(); + _ = ProcessOrdersbooks(); } - private async Task ProcessMessages() + private async Task ProcessPrices() { while (await _pricesChannel.Reader.WaitToReadAsync()) { @@ -146,6 +152,20 @@ namespace KLHZ.Trader.Core.Exchange.Services } } + private async Task ProcessOrdersbooks() + { + while (await _ordersbookChannel.Reader.WaitToReadAsync()) + { + var message = await _ordersbookChannel.Reader.ReadAsync(); + if (!_historyCash.TryGetValue(message.Figi, out var data)) + { + data = new PriceHistoryCacheUnit2(message.Figi); + _historyCash.TryAdd(message.Figi, data); + } + await data.AddOrderbook(message); + } + } + public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; diff --git a/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs b/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs index f393ade..1cdab06 100644 --- a/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs +++ b/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs @@ -82,7 +82,7 @@ namespace KLHZ.Trader.Core.TG.Services RecomendPrice = null, Figi = "BBG004730N88", }; - await _eventBus.BroadcastCommand(command); + await _eventBus.Broadcast(command); break; } case "продать сбер": @@ -95,7 +95,7 @@ namespace KLHZ.Trader.Core.TG.Services Count = 1, LotsCount = 1, }; - await _eventBus.BroadcastCommand(command); + await _eventBus.Broadcast(command); break; } case "купить сбер": @@ -107,7 +107,7 @@ namespace KLHZ.Trader.Core.TG.Services Figi = "BBG004730N88", Count = 1 }; - await _eventBus.BroadcastCommand(command); + await _eventBus.Broadcast(command); break; } } diff --git a/KLHZ.Trader.Infrastructure/postgres/migration3.sql b/KLHZ.Trader.Infrastructure/postgres/migration3.sql new file mode 100644 index 0000000..52606f7 --- /dev/null +++ b/KLHZ.Trader.Infrastructure/postgres/migration3.sql @@ -0,0 +1,12 @@ +drop table if exists instrument_trades; +create table instrument_trades +( + trade_id bigserial, + bought_at timestamp default current_timestamp, + figi text not null, + ticker text not null, + price decimal not null, + count decimal not null, + direction int not null default 1, + primary key (trade_id) +); \ No newline at end of file diff --git a/KLHZ.Trader.Service/Controllers/PlayController.cs b/KLHZ.Trader.Service/Controllers/PlayController.cs index d28846e..7c7a670 100644 --- a/KLHZ.Trader.Service/Controllers/PlayController.cs +++ b/KLHZ.Trader.Service/Controllers/PlayController.cs @@ -41,7 +41,7 @@ namespace KLHZ.Trader.Service.Controllers foreach (var mess in data) { - await _dataBus.BroadcastNewPrice(mess); + await _dataBus.Broadcast(mess); } } catch (Exception ex)