diff --git a/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs b/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs index 2b7d17c..2d00782 100644 --- a/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs +++ b/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs @@ -13,7 +13,6 @@ namespace KLHZ.Trader.Core.Contracts.Messaging.Interfaces public bool AddChannel(string key, Channel channel); public Task Broadcast(INewPrice newPriceMessage); public Task Broadcast(ITradeCommand command); - public Task Broadcast(INewCandle command); public Task Broadcast(IProcessedPrice command); public Task Broadcast(IOrderbook orderbook); } diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs b/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs index 32d52cc..4f05592 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Orders/Enums/OrderbookItemType.cs @@ -9,6 +9,6 @@ BidsSummary10 = 4, AsksSummary4 = 5, BidsSummary4 = 6, - BidsAsksSummary4 = 7, + BidsAsksSummary4_2min = 7, } } diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs b/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs index 88ac5ab..a69d3ec 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Orders/OrderbookItem.cs @@ -29,30 +29,4 @@ namespace KLHZ.Trader.Core.DataLayer.Entities.Orders public OrderbookItemType ItemType { get; set; } } - - [Table("orderbook_items_reserve")] - public class OrderbookItemr: IOrderbookItem - { - [Column("id")] - public long Id { get; set; } - - [Column("time")] - public DateTime Time { get; set; } - - [Column("price")] - public decimal Price { get; set; } - - [Column("count")] - public long Count { get; set; } - - [Column("figi")] - public required string Figi { get; set; } - - [Column("ticker")] - public required string Ticker { get; set; } - - [Column("item_type")] - public OrderbookItemType ItemType { get; set; } - - } } diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Prices/Candle.cs b/KLHZ.Trader.Core/DataLayer/Entities/Prices/Candle.cs deleted file mode 100644 index 670990f..0000000 --- a/KLHZ.Trader.Core/DataLayer/Entities/Prices/Candle.cs +++ /dev/null @@ -1,33 +0,0 @@ -using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; -using System.ComponentModel.DataAnnotations.Schema; - -namespace KLHZ.Trader.Core.DataLayer.Entities.Prices -{ - [Table("candles")] - public class Candle : INewCandle - { - [Column("time")] - public DateTime Time { get; set; } - - [Column("figi")] - public required string Figi { get; set; } - - [Column("open")] - public decimal Open { get; set; } - [Column("close")] - public decimal Close { get; set; } - [Column("volume")] - public decimal Volume { get; set; } - - [Column("high")] - public decimal High { get; set; } - [Column("low")] - public decimal Low { get; set; } - - [Column("ticker")] - public required string Ticker { get; set; } - - [NotMapped] - public bool IsHistoricalData { get; set; } - } -} diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs deleted file mode 100644 index 4293954..0000000 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/InstrumentTrade.cs +++ /dev/null @@ -1,34 +0,0 @@ -using KLHZ.Trader.Core.DataLayer.Entities.Trades.Enums; -using System.ComponentModel.DataAnnotations.Schema; - -namespace KLHZ.Trader.Core.DataLayer.Entities.Trades -{ - /// - /// Обезличенная сделка с биржи над инструментом. - /// - [Table("instrument_trades")] - public class InstrumentTrade - { - [Column("trade_id")] - public long Id { get; set; } - - [Column("bought_at")] - public DateTime BoughtAt { get; set; } - - [Column("figi")] - public required string Figi { get; set; } - - [Column("ticker")] - public required string Ticker { get; set; } - - [Column("price")] - - public decimal Price { get; set; } - - [Column("count")] - public decimal Count { get; set; } - - [Column("direction")] - public TradeDirection Direction { get; set; } - } -} diff --git a/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs b/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs index d3227b6..245beb5 100644 --- a/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs +++ b/KLHZ.Trader.Core/DataLayer/TraderDbContext.cs @@ -10,13 +10,10 @@ namespace KLHZ.Trader.Core.DataLayer public class TraderDbContext : DbContext { public DbSet Trades { get; set; } - public DbSet InstrumentTrades { get; set; } public DbSet Declisions { get; set; } public DbSet PriceChanges { get; set; } public DbSet ProcessedPrices { get; set; } - public DbSet Candles { get; set; } public DbSet OrderbookItems { get; set; } - public DbSet OrderbookItemsReserve { get; set; } public TraderDbContext(DbContextOptions options) : base(options) { @@ -35,15 +32,6 @@ namespace KLHZ.Trader.Core.DataLayer v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); }); - modelBuilder.Entity(entity => - { - entity.HasKey(e1 => e1.Id); - entity.Property(e => e.BoughtAt) - .HasConversion( - v => v.ToUniversalTime(), - v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); - }); - modelBuilder.Entity(entity => { entity.HasKey(e1 => e1.Id); @@ -72,15 +60,6 @@ namespace KLHZ.Trader.Core.DataLayer v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); }); - modelBuilder.Entity(entity => - { - entity.HasKey(e1 => e1.Id); - entity.Property(e => e.Time) - .HasConversion( - v => v.ToUniversalTime(), - v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); - }); - modelBuilder.Entity(entity => { entity.HasKey(e1 => e1.Id); @@ -90,16 +69,6 @@ namespace KLHZ.Trader.Core.DataLayer v => v.ToUniversalTime(), v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); }); - - modelBuilder.Entity(entity => - { - entity.HasKey(e1 => new { e1.Figi, e1.Time }); - entity.Ignore(e1 => e1.IsHistoricalData); - entity.Property(e => e.Time) - .HasConversion( - v => v.ToUniversalTime(), - v => DateTime.SpecifyKind(v, DateTimeKind.Utc)); - }); } } } diff --git a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs index 9ea8d21..2d961c7 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs @@ -4,13 +4,14 @@ using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer; using KLHZ.Trader.Core.DataLayer.Entities.Orders; using KLHZ.Trader.Core.DataLayer.Entities.Prices; -using KLHZ.Trader.Core.DataLayer.Entities.Trades; using KLHZ.Trader.Core.Exchange.Extentions; +using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; using KLHZ.Trader.Core.Exchange.Models.Configs; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using System.Collections.Concurrent; using Tinkoff.InvestApi; using Tinkoff.InvestApi.V1; @@ -27,8 +28,7 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly CancellationTokenSource _cts = new(); private readonly IDataBus _eventBus; private readonly bool _exchangeDataRecievingEnabled; - private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); - + private readonly ConcurrentDictionary _usedOrderIds = new(); public ExchangeDataReader(InvestApiClient investApiClient, IDataBus eventBus, TraderDataProvider tradeDataProvider, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) @@ -59,8 +59,8 @@ namespace KLHZ.Trader.Core.Exchange.Services { if (_exchangeDataRecievingEnabled) { - var t1 = SubscribePrices(); - var t2 = SubscribeMyTrades(); + var t1 = SubscribeTrades(); + var t2 = SubscribeExchangeData(); await Task.WhenAll(t1, t2); } await Task.Delay(1000); @@ -72,36 +72,43 @@ namespace KLHZ.Trader.Core.Exchange.Services } } - private async Task SubscribeMyTrades() + private async Task SubscribeTrades() { - var req = new PortfolioStreamRequest(); + var req = new TradesStreamRequest(); foreach (var a in _tradeDataProvider.Accounts) { req.Accounts.Add(a.Key); } - using var stream = _investApiClient.OperationsStream.PortfolioStream(req); + using var stream = _investApiClient.OrdersStream.TradesStream(req); await foreach (var response in stream.ResponseStream.ReadAllAsync()) { - if (response.Portfolio?.Positions != null) + if (response.OrderTrades?.Trades != null) { - if (_semaphoreSlim.CurrentCount == 1) + if (_usedOrderIds.TryAdd(response.OrderTrades.OrderId, DateTime.UtcNow)) { - try + var deals = response.OrderTrades.Trades + .Select(t => new DealResult() + { + AccountId = response.OrderTrades.AccountId, + Figi = response.OrderTrades.Figi, + Count = t.Quantity, + Direction = response.OrderTrades.Direction == OrderDirection.Sell ? DealDirection.Sell : DealDirection.Buy, + Price = t.Price, + Success = true + }) + .ToArray(); + foreach (var d in deals) { - await _tradeDataProvider.SyncPortfolio(response.Portfolio.AccountId); + await _tradeDataProvider.LogDeal(d); } - catch (Exception ex) - { - - } - _semaphoreSlim.Release(); + await _tradeDataProvider.SyncPortfolio(response.OrderTrades.AccountId); } } } } - private async Task SubscribePrices() + private async Task SubscribeExchangeData() { using var stream = _investApiClient.MarketDataStream.MarketDataStream(); @@ -142,7 +149,6 @@ namespace KLHZ.Trader.Core.Exchange.Services var pricesBuffer = new List(); var orderbookItemsBuffer = new List(); - var tradesBuffer = new List(); var lastWrite = DateTime.UtcNow; await foreach (var response in stream.ResponseStream.ReadAllAsync()) { @@ -155,79 +161,54 @@ namespace KLHZ.Trader.Core.Exchange.Services Time = response.Trade.Time.ToDateTime().ToUniversalTime(), Value = response.Trade.Price, IsHistoricalData = false, - Direction = (int)TradeDirection.Sell, + Direction = (int)response.Trade.Direction, Count = response.Trade.Quantity, }; + await _tradeDataProvider.AddData(message, TimeSpan.FromHours(7)); await _eventBus.Broadcast(message); pricesBuffer.Add(message); - - //var trade = new KLHZ.Trader.Core.DataLayer.Entities.Trades.InstrumentTrade() - //{ - // Figi = response.Trade.Figi, - // BoughtAt = response.Trade.Time.ToDateTime().ToUniversalTime(), - // Ticker = _tradeDataProvider.GetTickerByFigi(response.Trade.Figi), - // Price = response.Trade.Price, - // Count = response.Trade.Quantity, - // Direction = response.Trade.Direction == Tinkoff.InvestApi.V1.TradeDirection.Sell ? DataLayer.Entities.Trades.Enums.TradeDirection.Sell : DataLayer.Entities.Trades.Enums.TradeDirection.Buy, - //}; - //tradesBuffer.Add(trade); } if (response.Orderbook != null) { - var asksSummary10 = new OrderbookItem() + + var asks = response.Orderbook.Asks.Take(4).Select(a => new OrderbookItem() { + Count = a.Quantity, + Price = a.Price, Figi = response.Orderbook.Figi, Ticker = _tradeDataProvider.GetTickerByFigi(response.Orderbook.Figi), - Count = response.Orderbook.Asks.Sum(a => (int)a.Quantity), - ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary10, + ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Ask, Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), - }; + }).ToArray(); - var asksSummary4 = new OrderbookItem() + var bids = response.Orderbook.Bids.Take(4).Select(a => new OrderbookItem() { + Count = a.Quantity, + Price = a.Price, Figi = response.Orderbook.Figi, Ticker = _tradeDataProvider.GetTickerByFigi(response.Orderbook.Figi), - Count = response.Orderbook.Asks.Take(4).Sum(a => (int)a.Quantity), - ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary4, + ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.Bid, Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), - }; + }).ToArray(); - var bidsSummary10 = new OrderbookItem() - { - Figi = response.Orderbook.Figi, - Ticker = _tradeDataProvider.GetTickerByFigi(response.Orderbook.Figi), - Count = response.Orderbook.Bids.Sum(a => (int)a.Quantity), - ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary10, - Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), - }; - - var bidsSummary4 = new OrderbookItem() - { - Figi = response.Orderbook.Figi, - Ticker = _tradeDataProvider.GetTickerByFigi(response.Orderbook.Figi), - Count = response.Orderbook.Bids.Take(4).Sum(a => (int)a.Quantity), - ItemType = DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary4, - Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), - }; - - orderbookItemsBuffer.Add(asksSummary4); - orderbookItemsBuffer.Add(asksSummary10); - orderbookItemsBuffer.Add(bidsSummary4); - orderbookItemsBuffer.Add(bidsSummary10); + orderbookItemsBuffer.AddRange(asks); + orderbookItemsBuffer.AddRange(bids); var message = new NewOrderbookMessage() { Ticker = _tradeDataProvider.GetTickerByFigi(response.Orderbook.Figi), Figi = response.Orderbook.Figi, Time = response.Orderbook.Time.ToDateTime().ToUniversalTime(), - AsksCount = asksSummary10.Count, - BidsCount = bidsSummary10.Count, + Asks = asks, + Bids = bids, + AsksCount = asks.Length, + BidsCount = asks.Length, }; - await _eventBus.Broadcast(message); + await _tradeDataProvider.AddOrderbook(message); } - if (orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count > 100 || (DateTime.UtcNow - lastWrite).TotalSeconds > 5) + if (orderbookItemsBuffer.Count + pricesBuffer.Count > 100 || (DateTime.UtcNow - lastWrite).TotalSeconds > 5) { try { @@ -244,11 +225,6 @@ namespace KLHZ.Trader.Core.Exchange.Services await context.PriceChanges.AddRangeAsync(pricesBuffer); pricesBuffer.Clear(); } - if (tradesBuffer.Count > 0) - { - await context.InstrumentTrades.AddRangeAsync(tradesBuffer); - tradesBuffer.Clear(); - } await context.SaveChangesAsync(); } catch (Exception ex) diff --git a/KLHZ.Trader.Core/Exchange/Services/Trader.cs b/KLHZ.Trader.Core/Exchange/Services/Trader.cs index 5e7807e..bf0061a 100644 --- a/KLHZ.Trader.Core/Exchange/Services/Trader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/Trader.cs @@ -73,7 +73,6 @@ namespace KLHZ.Trader.Core.Exchange.Services _dataBus.AddChannel(nameof(Trader), _pricesChannel); _dataBus.AddChannel(nameof(Trader), _ordersbookChannel); _ = ProcessPrices(); - _ = ProcessOrdersbooks(); } private async Task ProcessPrices() @@ -88,7 +87,6 @@ namespace KLHZ.Trader.Core.Exchange.Services try { - await _tradeDataProvider.AddData(message, TimeSpan.FromHours(7)); //await ProcessDeferredLongOpens(message, currentTime); //await ProcessDeferredLongCloses(message, currentTime); if (message.Figi == "FUTIMOEXF000") @@ -163,8 +161,8 @@ namespace KLHZ.Trader.Core.Exchange.Services && (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2] < TimeSpan.FromMinutes(1)) ) { - //var fullData = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromMinutes(30)); - //if (fullData.isFullIntervalExists && fullData.prices.Last() - fullData.prices.First()>-8) + var fullData = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromMinutes(30)); + if (fullData.isFullIntervalExists && fullData.prices.Last() - fullData.prices.First() > -8) { if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase()) { @@ -346,14 +344,5 @@ namespace KLHZ.Trader.Core.Exchange.Services return true; } - - private async Task ProcessOrdersbooks() - { - while (await _ordersbookChannel.Reader.WaitToReadAsync()) - { - var message = await _ordersbookChannel.Reader.ReadAsync(); - await _tradeDataProvider.AddOrderbook(message); - } - } } } diff --git a/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs b/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs index c4a53f1..fd4e5d5 100644 --- a/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs +++ b/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs @@ -194,7 +194,6 @@ namespace KLHZ.Trader.Core.Exchange.Services { try { - //await _semaphoreSlim.WaitAsync(); var portfolio = await _investApiClient.Operations.GetPortfolioAsync(new PortfolioRequest() { AccountId = account.AccountId, @@ -240,24 +239,6 @@ namespace KLHZ.Trader.Core.Exchange.Services #pragma warning restore CS0612 // Тип или член устарел account.Assets.AddOrUpdate(asset.Figi, asset, (k, v) => asset); oldAssets.Remove(asset.Figi); - if (trade == null && position.InstrumentType != "currency") - { - trade = new DataLayer.Entities.Trades.Trade() - { - AccountId = account.AccountId, - Figi = position.Figi, - Ticker = position.Ticker, - ArchiveStatus = 0, - Asset = (DataLayer.Entities.Trades.Enums.AssetType)(int)GetAssetTypeByFigi(position.Figi), - BoughtAt = DateTime.UtcNow, - Count = position.Quantity, - Direction = position.Quantity > 0 ? DataLayer.Entities.Trades.Enums.TradeDirection.Buy : DataLayer.Entities.Trades.Enums.TradeDirection.Sell, - Position = position.Quantity > 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short, - Price = price - }; - await context.Trades.AddAsync(trade); - await context.SaveChangesAsync(); - } } account.Total = portfolio.TotalAmountPortfolio; @@ -277,10 +258,6 @@ namespace KLHZ.Trader.Core.Exchange.Services { _logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", account.AccountId); } - finally - { - //_semaphoreSlim.Release(); - } } internal async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue) @@ -301,6 +278,8 @@ namespace KLHZ.Trader.Core.Exchange.Services using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; var priceCoeff = 1m; + var sign = dealResult.Direction == DealDirection.Sell ? -1m : 1; + var dealCount = dealResult.Count * sign; if (_instrumentsSettings.TryGetValue(dealResult.Figi, out var se)) { priceCoeff = se.PriceToRubConvertationCoefficient; @@ -314,9 +293,9 @@ namespace KLHZ.Trader.Core.Exchange.Services Figi = dealResult.Figi, Ticker = GetTickerByFigi(dealResult.Figi), BoughtAt = DateTime.UtcNow, - Count = dealResult.Count, + Count = dealCount, Price = dealResult.Price * priceCoeff, - Position = dealResult.Count > 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short, + Position = dealCount >= 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short, Direction = (DataLayer.Entities.Trades.Enums.TradeDirection)(int)dealResult.Direction, Asset = (DataLayer.Entities.Trades.Enums.AssetType)(int)GetAssetTypeByFigi(dealResult.Figi) }; @@ -327,12 +306,23 @@ namespace KLHZ.Trader.Core.Exchange.Services else { var oldAmount = trade.Price * trade.Count; - var newAmount = dealResult.Price * priceCoeff * dealResult.Count; + var newAmount = dealResult.Price * priceCoeff * dealCount; + + var oldCount = trade.Count; - trade.Count = trade.Count + dealResult.Count; - if (trade.Count != 0 && System.Math.Abs(oldCount) < System.Math.Abs(trade.Count))// Если суммарное количество элементов позиции сокращается - пересчитывать цену не нужно. + + trade.Count = trade.Count + dealCount; + if (trade.Count != 0)// Если суммарное количество элементов позиции сокращается - пересчитывать цену не нужно. { - trade.Price = (oldAmount + newAmount) / trade.Count; + if (trade.Count / System.Math.Abs(trade.Count) != oldCount / System.Math.Abs(oldCount))//если сменился знак общего числа активов. + { + trade.Price = dealResult.Price; + trade.Position = trade.Count < 0 ? DataLayer.Entities.Trades.Enums.PositionType.Short : DataLayer.Entities.Trades.Enums.PositionType.Long; + } + else + { + trade.Price = (oldAmount + newAmount) / trade.Count; + } } if (Accounts.TryGetValue(dealResult.AccountId, out var account)) @@ -358,7 +348,7 @@ namespace KLHZ.Trader.Core.Exchange.Services BlockedItems = asset.BlockedItems, BoughtAt = DateTime.UtcNow, BoughtPrice = trade.Price, - Count = trade.Count, + Count = sign * trade.Count, Position = trade.Count > 0 ? PositionType.Long : PositionType.Short, Type = asset.Type, TradeId = asset.TradeId, @@ -369,7 +359,6 @@ namespace KLHZ.Trader.Core.Exchange.Services } } } - await SyncPortfolio(Accounts[dealResult.AccountId]); } internal async Task LogPrice(ProcessedPrice price, bool saveImmediately) diff --git a/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs b/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs index 9628b3c..3a7e2b8 100644 --- a/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs +++ b/KLHZ.Trader.Core/TG/Services/BotMessagesHandler.cs @@ -75,7 +75,7 @@ namespace KLHZ.Trader.Core.TG.Services await botClient.SendMessage(update.Message.Chat, "Покупки остановлены!"); break; } - case "продать IMOEXF": + case "скинуть IMOEXF": { var assets = await _traderDataProvider.GetAssetsByFigi("FUTIMOEXF000"); @@ -90,11 +90,25 @@ namespace KLHZ.Trader.Core.TG.Services RecomendPrice = null, Figi = asset.Figi, Count = (long)asset.Count, + EnableMargin = false, }; await _eventBus.Broadcast(command); } } + break; + } + case "продать IMOEXF": + { + var command = new TradeCommand() + { + AccountId = "2274189208", + CommandType = TradeCommandType.MarketSell, + RecomendPrice = null, + Figi = "FUTIMOEXF000", + Count = 1 + }; + await _eventBus.Broadcast(command); break; } case "купить IMOEXF": diff --git a/KLHZ.Trader.HistoryLoader/Controllers/LoaderController.cs b/KLHZ.Trader.HistoryLoader/Controllers/LoaderController.cs index 3f7d7b0..a393616 100644 --- a/KLHZ.Trader.HistoryLoader/Controllers/LoaderController.cs +++ b/KLHZ.Trader.HistoryLoader/Controllers/LoaderController.cs @@ -1,10 +1,7 @@ -using Google.Protobuf.WellKnownTypes; using KLHZ.Trader.Core.DataLayer; using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; using Tinkoff.InvestApi; -using Tinkoff.InvestApi.V1; -using Candle = KLHZ.Trader.Core.DataLayer.Entities.Prices.Candle; namespace KLHZ.Trader.HistoryLoader.Controllers { @@ -20,61 +17,5 @@ namespace KLHZ.Trader.HistoryLoader.Controllers _dbContextFactory = dbContextFactory; _investApiClient = client; } - [HttpGet] - public async Task Load(string figi) - { - using var context1 = await _dbContextFactory.CreateDbContextAsync(); - context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; - var existed = (await context1.Candles.Where(c => c.Figi == figi).Select(c => c.Time) - .ToArrayAsync()).Select(t => t.ToUniversalTime()) - .ToHashSet(); - var dt = DateTime.UtcNow; - var i = -400; - while (i < 0) - { - try - { - - var req = new GetCandlesRequest() - { - Interval = CandleInterval._5Sec, - From = Timestamp.FromDateTime(dt.AddHours(i)), - To = Timestamp.FromDateTime(dt.AddHours(i + 1)), - InstrumentId = figi, - }; - var candles = await _investApiClient.MarketData.GetCandlesAsync(req); - i++; - using var context = await _dbContextFactory.CreateDbContextAsync(); - context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; - var forAdd = new List(); - foreach (var c in candles.Candles) - { - var dt1 = c.Time.ToDateTime().ToUniversalTime(); - if (!existed.Contains(dt1)) - { - var ca = new Candle - { - Figi = figi, - Ticker = string.Empty, - Time = dt1, - Open = c.Open, - Close = c.Close, - Volume = c.Volume, - High = c.High, - Low = c.Low, - }; - forAdd.Add(ca); - existed.Add(dt1); - } - } - await context.Candles.AddRangeAsync(forAdd); - await context.SaveChangesAsync(); - } - catch (Exception ex) - { - - } - } - } } } diff --git a/KLHZ.Trader.Service/Controllers/PlayController.cs b/KLHZ.Trader.Service/Controllers/PlayController.cs index c99f958..fbe4dbc 100644 --- a/KLHZ.Trader.Service/Controllers/PlayController.cs +++ b/KLHZ.Trader.Service/Controllers/PlayController.cs @@ -1,6 +1,8 @@ using KLHZ.Trader.Core.Contracts.Messaging.Dtos; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer; +using KLHZ.Trader.Core.DataLayer.Entities.Orders; +using KLHZ.Trader.Core.DataLayer.Entities.Prices; using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; @@ -53,40 +55,138 @@ namespace KLHZ.Trader.Service.Controllers } [HttpGet] - public async Task LoadTradesToHistory() + public async Task CalcOrderbookMeanav(string figi) { try { + var t = DateTime.UtcNow.AddHours(-4); using var context1 = await _dbContextFactory.CreateDbContextAsync(); context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; - var data = await context1.OrderbookItemsReserve - //.Where(c => c.Figi == figi) - //.OrderBy(c => c.BoughtAt) + var data = await context1.OrderbookItems + .Where(i => i.Time > t && i.Figi == figi && (i.ItemType == Core.DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsSummary4 || i.ItemType == Core.DataLayer.Entities.Orders.Enums.OrderbookItemType.AsksSummary4)) + .OrderBy(i => i.Time) .ToArrayAsync(); - data = data.DistinctBy(d => new { d.Figi, d.Time, d.ItemType}).ToArray(); - //await context1.PriceChanges.Where(p => p.Figi == figi).ExecuteDeleteAsync(); - var data2 = data.Select(d => new Core.DataLayer.Entities.Orders.OrderbookItem() + var bids = new LinkedList(); + var asks = new LinkedList(); + var buffer = new List(); + var dt = TimeSpan.FromMinutes(1); + var q = data.ToLookup(d => d.Time); + foreach (var d in q) { - Figi = d.Figi, - Time = d.Time, - ItemType = d.ItemType, - Ticker = d.Ticker, - Count = d.Count, - Price = d.Price, - - }).ToArray(); - int count = 0; - foreach(var mess in data2) - { - count++; - await context1.OrderbookItems.AddAsync(mess); - if (count > 10000) + var pair = d.DistinctBy(www => www.ItemType).OrderBy(www => www.ItemType).ToArray(); + if (pair.Length == 2) { - await context1.SaveChangesAsync(); - count = 0; + bids.AddLast(pair[1]); + if (bids.Last().Time - bids.First().Time > dt) + { + bids.RemoveFirst(); + } + if (pair[0].Count != 0) + { + pair[1].Price = ((decimal)pair[1].Count) / ((decimal)pair[0].Count); + buffer.Add(new OrderbookItem() + { + Figi = pair[1].Figi, + Ticker = pair[1].Ticker, + Count = 1, + ItemType = Core.DataLayer.Entities.Orders.Enums.OrderbookItemType.BidsAsksSummary4_2min, + Price = bids.Sum(b => b.Price) / bids.Count, + Time = pair[1].Time, + }); + } + if (buffer.Count > 10000) + { + await context1.OrderbookItems.AddRangeAsync(buffer); + await context1.SaveChangesAsync(); + buffer.Clear(); + } + } + else + { + } } - await context1.SaveChangesAsync(); + if (buffer.Count > 0) + { + await context1.OrderbookItems.AddRangeAsync(buffer); + await context1.SaveChangesAsync(); + buffer.Clear(); + } + } + catch (Exception ex) + { + + } + } + + [HttpGet] + public async Task CalcTradesMeanav(string figi) + { + try + { + var t = DateTime.UtcNow.AddHours(-4); + using var context1 = await _dbContextFactory.CreateDbContextAsync(); + context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + var data = await context1.PriceChanges + .Where(i => i.Time > t && i.Figi == figi) + .OrderBy(i => i.Time) + .ToArrayAsync(); + var sells = new LinkedList(); + var buys = new LinkedList(); + var buffer = new List(); + var dt = TimeSpan.FromMinutes(1); + foreach (var d in data) + { + if (d.Direction == 1) + { + if (buys.Last().Time - buys.First().Time > dt) + { + buys.RemoveFirst(); + } + buys.AddLast(d); + } + if (d.Direction == 2) + { + sells.AddLast(d); + if (sells.Last().Time - sells.First().Time > dt) + { + sells.RemoveFirst(); + } + sells.AddLast(d); + } + + if (sells.Count > 0 && buys.Count > 0) + { + var meanS = ((decimal)sells.Sum(s => s.Count)) / sells.Count; + var meanB = ((decimal)buys.Sum(s => s.Count)) / buys.Count; + if (meanS != 0) + { + buffer.Add(new ProcessedPrice() + { + Figi = d.Figi, + Processor = "tradesbalance", + Ticker = d.Ticker, + Count = 1, + Direction = 0, + Time = d.Time, + Value = meanB / meanS + }); + } + } + + if (buffer.Count > 10000) + { + await context1.ProcessedPrices.AddRangeAsync(buffer); + await context1.SaveChangesAsync(); + buffer.Clear(); + } + } + if (buffer.Count > 0) + { + await context1.ProcessedPrices.AddRangeAsync(buffer); + await context1.SaveChangesAsync(); + buffer.Clear(); + } } catch (Exception ex) { diff --git a/KLHZ.Trader.Service/appsettings.json b/KLHZ.Trader.Service/appsettings.json index 48a94c5..e179099 100644 --- a/KLHZ.Trader.Service/appsettings.json +++ b/KLHZ.Trader.Service/appsettings.json @@ -24,7 +24,7 @@ "Figi": "FUTIMOEXF000", "LongLeverage": 10.3, "ShortLeverage": 7.9, - "PriceToRubConvertationCoefficient" : 0.1 + "PriceToRubConvertationCoefficient" : 1 } ] },