diff --git a/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs b/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs index 8c72095..ed53e7c 100644 --- a/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs +++ b/KLHZ.Trader.Core.Contracts/Messaging/Interfaces/IDataBus.cs @@ -9,17 +9,7 @@ namespace KLHZ.Trader.Core.Contracts.Messaging.Interfaces public bool AddChannel(string key, Channel channel); public bool AddChannel(string key, Channel channel); public Task Broadcast(INewPrice newPriceMessage); - - - - - - public bool AddChannel(string key, Channel channel); - public bool AddChannel(string key, Channel channel); - public bool AddChannel(string key, Channel channel); - - public Task Broadcast(ITradeCommand command); - public Task Broadcast(IProcessedPrice command); public Task Broadcast(IOrderbook orderbook); + public Task Broadcast(IMessage message); } } diff --git a/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs b/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs index cdfc995..63bf7c2 100644 --- a/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs +++ b/KLHZ.Trader.Core/Common/Messaging/Services/DataBus.cs @@ -9,15 +9,7 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services { private readonly ConcurrentDictionary> _orderbooksChannels = new(); private readonly ConcurrentDictionary> _messagesChannels = new(); - private readonly ConcurrentDictionary> _candlesChannels = new(); private readonly ConcurrentDictionary> _priceChannels = new(); - private readonly ConcurrentDictionary> _processedPricesChannels = new(); - private readonly ConcurrentDictionary> _commandChannels = new(); - - public bool AddChannel(string key, Channel channel) - { - return _processedPricesChannels.TryAdd(key, channel); - } public bool AddChannel(string key, Channel channel) { @@ -29,16 +21,6 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services return _priceChannels.TryAdd(key, channel); } - public bool AddChannel(string key, Channel channel) - { - return _candlesChannels.TryAdd(key, channel); - } - - public bool AddChannel(string key, Channel channel) - { - return _commandChannels.TryAdd(key, channel); - } - public bool AddChannel(string key, Channel channel) { return _orderbooksChannels.TryAdd(key, channel); @@ -52,30 +34,6 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services } } - public async Task Broadcast(IProcessedPrice mess) - { - foreach (var channel in _processedPricesChannels.Values) - { - await channel.Writer.WriteAsync(mess); - } - } - - public async Task Broadcast(INewCandle newPriceMessage) - { - foreach (var channel in _candlesChannels.Values) - { - await channel.Writer.WriteAsync(newPriceMessage); - } - } - - public async Task Broadcast(ITradeCommand command) - { - foreach (var channel in _commandChannels.Values) - { - await channel.Writer.WriteAsync(command); - } - } - public async Task Broadcast(IOrderbook orderbook) { foreach (var channel in _orderbooksChannels.Values) @@ -83,5 +41,13 @@ namespace KLHZ.Trader.Core.Common.Messaging.Services await channel.Writer.WriteAsync(orderbook); } } + + public async Task Broadcast(IMessage message) + { + foreach (var channel in _messagesChannels.Values) + { + await channel.Writer.WriteAsync(message); + } + } } } diff --git a/KLHZ.Trader.Core/Exchange/Extentions/InvestApiClientExtentions.cs b/KLHZ.Trader.Core/Exchange/Extentions/InvestApiClientExtentions.cs index 7ea622a..8eb2997 100644 --- a/KLHZ.Trader.Core/Exchange/Extentions/InvestApiClientExtentions.cs +++ b/KLHZ.Trader.Core/Exchange/Extentions/InvestApiClientExtentions.cs @@ -5,21 +5,20 @@ namespace KLHZ.Trader.Core.Exchange.Extentions { internal static class InvestApiClientExtentions { - public static async Task GetAccounts(this InvestApiClient client, params string[] managedAccountNamePatterns) + public static async Task> GetAccounts(this InvestApiClient client, params string[] managedAccountNamePatterns) { var accounts = await client.Users.GetAccountsAsync(); - var accsIds = new HashSet(); + var accs = new Dictionary(); foreach (var pattern in managedAccountNamePatterns) { var aids = accounts.Accounts - .Where(a => a.Name.ToLower().Contains(pattern) && a.AccessLevel == AccessLevel.AccountAccessLevelFullAccess) - .Select(a => a.Id); + .Where(a => a.Name.ToLower().Contains(pattern) && a.AccessLevel == AccessLevel.AccountAccessLevelFullAccess); foreach (var a in aids) { - accsIds.Add(a); + accs.Add(a.Id, a.Name); } } - return accsIds.ToArray(); + return accs; } } } diff --git a/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs b/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs index 414fa3f..526efbf 100644 --- a/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs +++ b/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs @@ -9,10 +9,12 @@ namespace KLHZ.Trader.Core.Exchange.Interfaces public decimal Total { get; } bool Initialized { get; } string AccountId { get; } - Task Init(string accountId); + string? AccountName{ get; } + Task Init(string accountId, string? accountName = null); Task LoadPortfolio(); ImmutableDictionary Assets { get; } public Task OpenPosition(string figi, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1); public Task ClosePosition(string figi); + public Task ResetStops(string figi, decimal stopLossShift, decimal takeProfitShift); } } diff --git a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs index f6997f7..3f52bbd 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs @@ -52,9 +52,9 @@ namespace KLHZ.Trader.Core.Exchange.Services var accounts = await _investApiClient.GetAccounts(_managedAccountNamePatterns); foreach (var acc in accounts) { - await _portfolioWrapper.AddAccount(acc); + await _portfolioWrapper.AddAccount(acc.Key,acc.Value); } - _ = CycleSubscribtion(accounts); + _ = CycleSubscribtion(accounts.Keys.ToArray()); } private async Task CycleSubscribtion(string[] accounts) diff --git a/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs b/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs index f011ef2..dd89097 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ManagedAccount.cs @@ -18,6 +18,7 @@ namespace KLHZ.Trader.Core.Exchange.Services public class ManagedAccount : IManagedAccount { public string AccountId { get; private set; } = string.Empty; + public string AccountName { get; private set; } = string.Empty; public bool Initialized { get; private set; } = false; public decimal Balance @@ -74,11 +75,12 @@ namespace KLHZ.Trader.Core.Exchange.Services _logger = logger; } - public async Task Init(string accountId) + public async Task Init(string accountId, string? accountName = null) { try { await _initSemaphore.WaitAsync2(TimeSpan.FromMilliseconds(100)); + AccountName = accountName ?? AccountId; AccountId = accountId; _semaphore.Release(); await LoadPortfolio(); @@ -234,6 +236,74 @@ namespace KLHZ.Trader.Core.Exchange.Services _semaphore.Release(); } + public async Task ResetStops(string figi, decimal stopLossShift, decimal takeProfitShift) + { + try + { + await _semaphore.WaitAsync2(_defaultLockTimeSpan); + if (_assets.TryGetValue(figi, out var asset) && _options.Value.TradingInstrumentsFigis.Contains(figi)) + { + var stopsReq = new GetStopOrdersRequest() { AccountId = asset.AccountId }; + var stopOrders = await _investApiClient.StopOrders.GetStopOrdersAsync(stopsReq); + if (stopOrders.StopOrders != null) + { + foreach (var stopOrder in stopOrders.StopOrders) + { + try + { + await _investApiClient.StopOrders.CancelStopOrderAsync(new CancelStopOrderRequest() { AccountId = asset.AccountId, StopOrderId = stopOrder.StopOrderId }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Ошибка при закрытии стопов для позиции."); + } + } + } + + var stopOrdersDirection = asset.Count < 0 ? StopOrderDirection.Buy : StopOrderDirection.Sell; + + var executedPrice = asset.BoughtPrice; + var slReq = new PostStopOrderRequest() + { + AccountId = AccountId, + ConfirmMarginTrade = false, + InstrumentId = figi, + Direction = stopOrdersDirection, + PriceType = PriceType.Point, + Quantity = (long)asset.Count, + StopOrderType = StopOrderType.StopLoss, + StopPrice = asset.Count > 0 ? executedPrice - stopLossShift : executedPrice + stopLossShift, + ExchangeOrderType = ExchangeOrderType.Market, + ExpirationType = StopOrderExpirationType.GoodTillCancel, + }; + var slOrderRes = await _investApiClient.StopOrders.PostStopOrderAsync(slReq); + + var tpReq = new PostStopOrderRequest() + { + AccountId = AccountId, + ConfirmMarginTrade = false, + InstrumentId = figi, + Direction = stopOrdersDirection, + PriceType = PriceType.Point, + Quantity = (long)asset.Count, + StopOrderType = StopOrderType.TakeProfit, + StopPrice = asset.Count > 0 ? executedPrice + takeProfitShift : executedPrice - takeProfitShift, + ExchangeOrderType = ExchangeOrderType.Market, + ExpirationType = StopOrderExpirationType.GoodTillCancel, + }; + var tpOrderRes = await _investApiClient.StopOrders.PostStopOrderAsync(tpReq); + await LoadPortfolioNolock(); + } + } + catch (TaskCanceledException) { } + catch (Exception ex) + { + _logger.LogError(ex, "Ошибка при переопределении стопов."); + } + + _semaphore.Release(); + } + public async Task ClosePosition(string figi) { try diff --git a/KLHZ.Trader.Core/Exchange/Services/PortfolioWrapper.cs b/KLHZ.Trader.Core/Exchange/Services/PortfolioWrapper.cs index 395caed..e1995e2 100644 --- a/KLHZ.Trader.Core/Exchange/Services/PortfolioWrapper.cs +++ b/KLHZ.Trader.Core/Exchange/Services/PortfolioWrapper.cs @@ -14,7 +14,7 @@ namespace KLHZ.Trader.Core.Exchange.Services _services = services; } - public async Task AddAccount(string accountId) + public async Task AddAccount(string accountId, string? accountName = null) { for (int i = 0; i < 10; i++) { diff --git a/KLHZ.Trader.Core/Exchange/Services/Trader.cs b/KLHZ.Trader.Core/Exchange/Services/Trader.cs index ca55ee6..8672b3a 100644 --- a/KLHZ.Trader.Core/Exchange/Services/Trader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/Trader.cs @@ -1,5 +1,6 @@ using KLHZ.Trader.Core.Common; using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums; +using KLHZ.Trader.Core.Contracts.Messaging.Dtos; using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer.Entities.Declisions; @@ -100,7 +101,7 @@ namespace KLHZ.Trader.Core.Exchange.Services return data2; } - private async ValueTask CheckPosition((DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists) data, INewPrice message) + private async ValueTask CheckHarmonicPosition((DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists) data, INewPrice message) { var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow; var position = ValueAmplitudePosition.None; @@ -279,37 +280,68 @@ namespace KLHZ.Trader.Core.Exchange.Services } await LogDeclision(DeclisionTradeAction.CloseShort, message); } - } + if (newMod == TradingMode.Growing && newMod != oldMod) + { + var stops = GetStops(message, PositionType.Long); + if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase()) + { + var accounts = _portfolioWrapper.Accounts + .Where(a => !a.Value.Assets.ContainsKey(message.Figi)) + .Take(1) + .Select(a => a.Value) + .ToArray(); + await OpenPositions(accounts, message, PositionType.Long, stops.stopLoss, stops.takeProfit, 1); + } + await LogDeclision(DeclisionTradeAction.OpenLong, message); + await LogDeclision(DeclisionTradeAction.OpenLong, message.Value + stops.takeProfit, message.Time.AddMilliseconds(-100), message); + await LogDeclision(DeclisionTradeAction.OpenLong, message.Value - stops.stopLoss, message.Time.AddMilliseconds(100), message); + } + if (newMod == TradingMode.Dropping && newMod != oldMod) + { + var stops = GetStops(message, PositionType.Short); + if (!message.IsHistoricalData && BotModeSwitcher.CanSell()) + { + var accounts = _portfolioWrapper.Accounts + .Where(a => !a.Value.Assets.ContainsKey(message.Figi)) + .Take(1) + .Select(a => a.Value) + .ToArray(); + await OpenPositions(accounts, message, PositionType.Short, stops.stopLoss, stops.takeProfit, 1); + } + await LogDeclision(DeclisionTradeAction.OpenShort, message); + await LogDeclision(DeclisionTradeAction.OpenShort, message.Value - stops.takeProfit, message.Time.AddMilliseconds(-100), message); + await LogDeclision(DeclisionTradeAction.OpenShort, message.Value + stops.stopLoss, message.Time.AddMilliseconds(100), message); + } + TradingModes[message.Figi] = newMod; + if (oldMod != newMod) + { + var accountForStopsChanging = _portfolioWrapper.Accounts + .Where(a => a.Value.Assets.ContainsKey(message.Figi)) + .ToArray(); - if (newMod == TradingMode.Growing && newMod != oldMod) - { - var stops = GetStops(message, PositionType.Long); - if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase()) - { - var accounts = _portfolioWrapper.Accounts - .Where(a => !a.Value.Assets.ContainsKey(message.Figi)) - .Take(1) - .Select(a => a.Value) - .ToArray(); - await OpenPositions(accounts, message, PositionType.Long, stops.stopLoss, stops.takeProfit, 1); + foreach (var account in accountForStopsChanging) + { + if (account.Value.Assets.TryGetValue(message.Figi, out var asset)) + { + var stops = GetStops(message, asset.Count > 0 ? PositionType.Long : PositionType.Short); + if (!message.IsHistoricalData) + { + //await account.Value.ResetStops(message.Figi, stops.stopLoss, stops.takeProfit); + //if (asset.Count < 0) + //{ + // await LogDeclision(DeclisionTradeAction.OpenShort, asset.BoughtPrice - stops.takeProfit, message.Time.AddMilliseconds(-100), message); + // await LogDeclision(DeclisionTradeAction.OpenShort, asset.BoughtPrice + stops.stopLoss, message.Time.AddMilliseconds(100), message); + //} + //else + //{ + // await LogDeclision(DeclisionTradeAction.OpenLong, asset.BoughtPrice + stops.takeProfit, message.Time.AddMilliseconds(-100), message); + // await LogDeclision(DeclisionTradeAction.OpenLong, asset.BoughtPrice - stops.stopLoss, message.Time.AddMilliseconds(100), message); + //} + } + } + } } - await LogDeclision(DeclisionTradeAction.OpenLong, message); } - if (newMod == TradingMode.Dropping && newMod != oldMod) - { - var stops = GetStops(message, PositionType.Short); - if (!message.IsHistoricalData && BotModeSwitcher.CanSell()) - { - var accounts = _portfolioWrapper.Accounts - .Where(a => !a.Value.Assets.ContainsKey(message.Figi)) - .Take(1) - .Select(a => a.Value) - .ToArray(); - await OpenPositions(accounts, message, PositionType.Short, stops.stopLoss, stops.takeProfit, 1); - } - await LogDeclision(DeclisionTradeAction.OpenShort, message); - } - TradingModes[message.Figi] = newMod; } } else @@ -394,7 +426,7 @@ INewPrice message, int windowMaxSize, decimal uptrendStartingDetectionMeanfullSt private async Task CheckPosition(INewPrice message) { var data2 = await GetData(message); - var position = await CheckPosition(data2, message); + var position = await CheckHarmonicPosition(data2, message); return position; } @@ -418,6 +450,10 @@ INewPrice message, int windowMaxSize, decimal uptrendStartingDetectionMeanfullSt if (profit > 0) { assetsForClose.Add(asset); + await _dataBus.Broadcast(new MessageForAdmin() + { + Text = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{message.Value}, профит {profit}" + }); if (loggedDeclisions == 0) { loggedDeclisions++; @@ -430,6 +466,7 @@ INewPrice message, int windowMaxSize, decimal uptrendStartingDetectionMeanfullSt assetsForClose.Add(asset); } } + var tasks = assetsForClose.Select(asset => _portfolioWrapper.Accounts[asset.AccountId].ClosePosition(message.Figi)); await Task.WhenAll(tasks); } @@ -437,11 +474,19 @@ INewPrice message, int windowMaxSize, decimal uptrendStartingDetectionMeanfullSt private async Task OpenPositions(IManagedAccount[] accounts, INewPrice message, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1) { var loggedDeclisions = 0; + var sign = positionType == PositionType.Long ? 1 : 1; foreach (var acc in accounts) { if (IsOperationAllowed(acc, message.Value, 1, _accountCashPartFutures, _accountCashPart)) { await acc.OpenPosition(message.Figi, positionType, stopLossShift, takeProfitShift, count); + await _dataBus.Broadcast(new MessageForAdmin() + { + Text = $"Открываю позицию {message.Figi} ({(positionType == PositionType.Long ? "лонг" : "шорт")}) " + + $"на счёте {acc.AccountName}. Количество {(positionType == PositionType.Long ? "" : "-")}{count}, " + + $"цена ~{message.Value}. Стоп лосс: {(positionType == PositionType.Long ? "-" : "+")}{stopLossShift}. " + + $"Тейк профит: {(positionType == PositionType.Long ? "+" : "-")}{takeProfitShift}" + }); } if (loggedDeclisions == 0) diff --git a/KLHZ.Trader.Service/Program.cs b/KLHZ.Trader.Service/Program.cs index 5279144..119fcae 100644 --- a/KLHZ.Trader.Service/Program.cs +++ b/KLHZ.Trader.Service/Program.cs @@ -27,7 +27,7 @@ builder.Host.ConfigureSerilog( builder.Configuration.GetSection("LokiUrl").Value, serviceName: "klhz.trader", excludeEFLogs: false, - minLevel: Serilog.Events.LogEventLevel.Warning, + minLevel: Serilog.Events.LogEventLevel.Information, excludeMetricsScrapingLogs: true, EFMinLogLevel: Serilog.Events.LogEventLevel.Warning );