diff --git a/KLHZ.Trader.Core/Common/Extentions/SemaphoreSlimExtention.cs b/KLHZ.Trader.Core/Common/Extentions/SemaphoreSlimExtention.cs new file mode 100644 index 0000000..075a5f3 --- /dev/null +++ b/KLHZ.Trader.Core/Common/Extentions/SemaphoreSlimExtention.cs @@ -0,0 +1,11 @@ +namespace KLHZ.Trader.Core.Common.Extentions +{ + internal static class SemaphoreSlimExtention + { + public static async Task WaitAsync2(this SemaphoreSlim semaphore, TimeSpan timeSpan) + { + var cts = new CancellationTokenSource(timeSpan); + await semaphore.WaitAsync(cts.Token); + } + } +} diff --git a/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs index f898042..de797ca 100644 --- a/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs +++ b/KLHZ.Trader.Core/DataLayer/Entities/Trades/Trade.cs @@ -45,5 +45,8 @@ namespace KLHZ.Trader.Core.DataLayer.Entities.Trades [Column("asset_type")] public AssetType Asset { get; set; } + + [Column("asset_id")] + public Guid AssetId { get; set; } } } diff --git a/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs b/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs new file mode 100644 index 0000000..f6cad7c --- /dev/null +++ b/KLHZ.Trader.Core/Exchange/Interfaces/IManagedAccount.cs @@ -0,0 +1,17 @@ +using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; +using System.Collections.Immutable; + +namespace KLHZ.Trader.Core.Exchange.Interfaces +{ + public interface IManagedAccount + { + public decimal Balance { get; } + public decimal Total { get; } + bool Initialized { get; } + string AccountId { get; } + Task Init(string accountId); + ImmutableDictionary Assets { get; } + public Task OpenPosition(string figi, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1); + public Task ClosePosition(string figi); + } +} diff --git a/KLHZ.Trader.Core/Exchange/Models/AssetsAccounting/Asset.cs b/KLHZ.Trader.Core/Exchange/Models/AssetsAccounting/Asset.cs index 19638fb..0eab692 100644 --- a/KLHZ.Trader.Core/Exchange/Models/AssetsAccounting/Asset.cs +++ b/KLHZ.Trader.Core/Exchange/Models/AssetsAccounting/Asset.cs @@ -2,6 +2,7 @@ { public class Asset : LockableExchangeObject { + public Guid AssetId { get; init; } = Guid.NewGuid(); public long? TradeId { get; init; } public decimal BlockedItems { get; init; } public AssetType Type { get; init; } diff --git a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs index 4820f53..fc4f4d8 100644 --- a/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs +++ b/KLHZ.Trader.Core/Exchange/Services/ExchangeDataReader.cs @@ -234,7 +234,6 @@ namespace KLHZ.Trader.Core.Exchange.Services } } } - public Task StopAsync(CancellationToken cancellationToken) { _cts.Cancel(); diff --git a/KLHZ.Trader.Core/Exchange/Services/ManagedAccount2.cs b/KLHZ.Trader.Core/Exchange/Services/ManagedAccount2.cs new file mode 100644 index 0000000..b369dfc --- /dev/null +++ b/KLHZ.Trader.Core/Exchange/Services/ManagedAccount2.cs @@ -0,0 +1,301 @@ +using Grpc.Core; +using KLHZ.Trader.Core.Common.Extentions; +using KLHZ.Trader.Core.DataLayer; +using KLHZ.Trader.Core.Exchange.Extentions; +using KLHZ.Trader.Core.Exchange.Interfaces; +using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; +using KLHZ.Trader.Core.Exchange.Models.Configs; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Collections.Concurrent; +using System.Collections.Immutable; +using Tinkoff.InvestApi; +using Tinkoff.InvestApi.V1; +using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset; + +namespace KLHZ.Trader.Core.Exchange.Services +{ + public class ManagedAccount2 : IManagedAccount + { + public string AccountId { get; private set; } = string.Empty; + public bool Initialized { get; private set; } = false; + + public decimal Balance + { + get + { + lock (_locker) + return _balance; + } + private set + { + lock (_locker) + _balance = value; + } + } + public decimal Total + { + get + { + lock (_locker) + return _total; + } + private set + { + lock (_locker) + _total = value; + } + } + + public ImmutableDictionary Assets => GetAssets(); + + + private readonly InvestApiClient _investApiClient; + private readonly IDbContextFactory _dbContextFactory; + private readonly ILogger _logger; + private readonly IOptions _options; + + + private readonly Dictionary _assets = new(); + private readonly ConcurrentDictionary _usedOrderIds = new(); + + private readonly object _locker = new(); + private decimal _balance = 0; + private decimal _total = 0; + private readonly TimeSpan _defaultLockTimeSpan = TimeSpan.FromSeconds(30); + + private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1); + private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1); + public ManagedAccount2(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) + { + _investApiClient = investApiClient; + _dbContextFactory = dbContextFactory; + _options = options; + _logger = logger; + } + + public async Task Init(string accountId) + { + try + { + await _initSemaphore.WaitAsync2(TimeSpan.FromMilliseconds(100)); + AccountId = accountId; + _semaphore.Release(); + await LoadPortfolio(); + _ = CyclingOperations(); + Initialized = true; + } + catch (TaskCanceledException) + { + + } + catch (Exception ex) + { + _initSemaphore.Release(); + } + } + + private async Task LoadPortfolio() + { + try + { + await _semaphore.WaitAsync2(_defaultLockTimeSpan); + await LoadPortfolioNolock(); + } + catch (TaskCanceledException) { } + catch (Exception ex) + { + _logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", AccountId); + } + + _semaphore.Release(); + } + + private ImmutableDictionary GetAssets() + { + var res = ImmutableDictionary.Empty; + try + { + _semaphore.WaitAsync2(TimeSpan.FromMilliseconds(100)).Wait(); + res = _assets.ToImmutableDictionary(); + } + catch (TaskCanceledException) { } + catch (Exception ex) + { + + } + _semaphore.Release(); + return res; + } + + private async Task LoadPortfolioNolock() + { + var portfolio = await _investApiClient.Operations.GetPortfolioAsync(new PortfolioRequest() + { + AccountId = AccountId, + }); + + using var context = await _dbContextFactory.CreateDbContextAsync(); + context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + + var trades = await context.Trades + .Where(t => t.AccountId == AccountId && t.ArchiveStatus == 0) + .ToListAsync(); + + var oldAssets = _assets.ToDictionary(); + _assets.Clear(); + foreach (var position in portfolio.Positions) + { + oldAssets.TryGetValue(position.Figi, out var oldAsset); + var newAssetId = oldAsset?.AssetId ?? Guid.NewGuid(); + var trade = trades.FirstOrDefault(t => t.Figi == position.Figi && t.AssetId == newAssetId); + var asset = new Asset() + { + AssetId = newAssetId, + TradeId = trade?.Id, + AccountId = AccountId, + Figi = position.Figi, + Ticker = position.Ticker, + BoughtAt = trade?.BoughtAt ?? DateTime.UtcNow, + BoughtPrice = trade?.Price ?? position.AveragePositionPrice, + Type = position.InstrumentType.ParseInstrumentType(), + Position = position.Quantity > 0 ? PositionType.Long : PositionType.Short, + BlockedItems = position.BlockedLots, + Count = position.Quantity, + }; + _assets[asset.Figi] = asset; + } + + Total = portfolio.TotalAmountPortfolio; + Balance = portfolio.TotalAmountCurrencies; + } + + public async Task OpenPosition(string figi, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1) + { + try + { + await _semaphore.WaitAsync2(_defaultLockTimeSpan); + if (!_assets.ContainsKey(figi) && _options.Value.TradingInstrumentsFigis.Contains(figi)) + { + var openingDirection = positionType == PositionType.Short ? OrderDirection.Sell : OrderDirection.Buy; + var stopOrdersDirection = positionType == PositionType.Short ? StopOrderDirection.Buy : StopOrderDirection.Sell; + var req = new PostOrderRequest() + { + AccountId = AccountId, + InstrumentId = figi, + Direction = openingDirection, + OrderType = OrderType.Market, + Quantity = count, + ConfirmMarginTrade = true, + }; + + var res = await _investApiClient.Orders.PostOrderAsync(req); + + _usedOrderIds.TryAdd(res.OrderId, DateTime.UtcNow); + var executedPrice = res.ExecutedOrderPrice / 10; + var slReq = new PostStopOrderRequest() + { + AccountId = AccountId, + ConfirmMarginTrade = false, + InstrumentId = figi, + Direction = stopOrdersDirection, + PriceType = PriceType.Point, + Quantity = count, + StopOrderType = StopOrderType.StopLoss, + StopPrice = positionType == PositionType.Long ? executedPrice - stopLossShift : executedPrice + stopLossShift, + ExchangeOrderType = ExchangeOrderType.Market, + ExpirationType = StopOrderExpirationType.GoodTillCancel, + }; + var slOrderRes = await _investApiClient.StopOrders.PostStopOrderAsync(slReq); + + var tpReq = new PostStopOrderRequest() + { + AccountId = AccountId, + ConfirmMarginTrade = false, + InstrumentId = figi, + Direction = stopOrdersDirection, + PriceType = PriceType.Point, + Quantity = count, + StopOrderType = StopOrderType.TakeProfit, + StopPrice = positionType == PositionType.Long ? executedPrice + takeProfitShift : executedPrice - takeProfitShift, + ExchangeOrderType = ExchangeOrderType.Market, + ExpirationType = StopOrderExpirationType.GoodTillCancel, + }; + var tpOrderRes = await _investApiClient.StopOrders.PostStopOrderAsync(tpReq); + await LoadPortfolioNolock(); + } + } + catch (TaskCanceledException) { } + catch (Exception ex) + { + _logger.LogError(ex, "Ошибка при открытии позиции."); + } + + _semaphore.Release(); + } + + public async Task ClosePosition(string figi) + { + try + { + await _semaphore.WaitAsync2(_defaultLockTimeSpan); + if (_assets.TryGetValue(figi, out var asset)) + { + var closingDirection = asset.Count > 0 ? OrderDirection.Sell : OrderDirection.Buy; + var req = new PostOrderRequest() + { + AccountId = AccountId, + InstrumentId = figi, + Direction = closingDirection, + OrderType = OrderType.Market, + Quantity = (long)System.Math.Abs(asset.Count), + ConfirmMarginTrade = true, + }; + + var res = await _investApiClient.Orders.PostOrderAsync(req); + await LoadPortfolioNolock(); + } + } + catch (TaskCanceledException) { } + catch (Exception ex) + { + _logger.LogError(ex, "Ошибка при закрытии позиции."); + } + + _semaphore.Release(); + } + + private async Task SubscribeTrades() + { + var req = new TradesStreamRequest(); + req.Accounts.Add(AccountId); + + using var stream = _investApiClient.OrdersStream.TradesStream(req); + await foreach (var response in stream.ResponseStream.ReadAllAsync()) + { + if (response.OrderTrades?.Trades != null) + { + if (_usedOrderIds.TryAdd(response.OrderTrades.OrderId, DateTime.UtcNow)) + { + await LoadPortfolio(); + } + } + } + } + + private async Task CyclingOperations() + { + Task? tradesLoadingTas = null; + while (true) + { + if (tradesLoadingTas == null || tradesLoadingTas.Status != TaskStatus.Running) + { + tradesLoadingTas = SubscribeTrades(); + } + await Task.Delay(10000); + await LoadPortfolio(); + } + } + } +} diff --git a/KLHZ.Trader.Core/Exchange/Services/PortfolioDataProvider.cs b/KLHZ.Trader.Core/Exchange/Services/PortfolioDataProvider.cs new file mode 100644 index 0000000..d372dda --- /dev/null +++ b/KLHZ.Trader.Core/Exchange/Services/PortfolioDataProvider.cs @@ -0,0 +1,32 @@ +using KLHZ.Trader.Core.Exchange.Interfaces; +using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace KLHZ.Trader.Core.Exchange.Services +{ + public class PortfolioDataProvider : IHostedService + { + private readonly IServiceProvider _services; + public PortfolioDataProvider(IServiceProvider services) + { + _services = services; + + + + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + var acc = _services.GetKeyedService(1); + await acc.Init("2274189208"); + await acc.OpenPosition("FUTIMOEXF000", PositionType.Short, 2.5m, 4m); + await acc.ClosePosition("FUTIMOEXF000"); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs b/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs index 846e626..8e67afb 100644 --- a/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs +++ b/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs @@ -32,7 +32,7 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly InvestApiClient _investApiClient; private readonly IDbContextFactory _dbContextFactory; - private readonly ILogger _logger; + private readonly ILogger _logger; private readonly string[] _managedAccountsNamePatterns = []; private readonly string[] _instrumentsFigis = []; @@ -46,7 +46,7 @@ namespace KLHZ.Trader.Core.Exchange.Services private readonly SemaphoreSlim _syncSemaphore = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1); - public TraderDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) + public TraderDataProvider(InvestApiClient investApiClient, IOptions options, IDbContextFactory dbContextFactory, ILogger logger) { _investApiClient = investApiClient; _dbContextFactory = dbContextFactory; @@ -256,7 +256,7 @@ namespace KLHZ.Trader.Core.Exchange.Services return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown; } - internal async Task SyncPortfolio(string accountId) + private async Task SyncPortfolio(string accountId) { if (Accounts.TryGetValue(accountId, out var account)) { @@ -264,7 +264,7 @@ namespace KLHZ.Trader.Core.Exchange.Services } } - internal async Task SyncPortfolio(ManagedAccount account) + private async Task SyncPortfolio(ManagedAccount account) { try { diff --git a/KLHZ.Trader.Service/Program.cs b/KLHZ.Trader.Service/Program.cs index 11e938d..36a4b37 100644 --- a/KLHZ.Trader.Service/Program.cs +++ b/KLHZ.Trader.Service/Program.cs @@ -1,7 +1,7 @@ using KLHZ.Trader.Core.Common.Messaging.Services; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer; -using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting; +using KLHZ.Trader.Core.Exchange.Interfaces; using KLHZ.Trader.Core.Exchange.Models.Configs; using KLHZ.Trader.Core.Exchange.Services; using KLHZ.Trader.Core.TG; @@ -46,7 +46,9 @@ builder.Services.AddHostedService(); builder.Services.AddHostedService(); builder.Services.AddHostedService(); builder.Services.AddHostedService(); -//builder.Services.AddHostedService(); + + +builder.Services.AddHostedService(); //builder.Services.AddHostedService(); @@ -57,7 +59,7 @@ builder.Services.AddSingleton(); for (int i = 0; i < 10; i++) { - builder.Services.AddKeyedSingleton(i); + builder.Services.AddKeyedSingleton(i); } builder.Services.Configure(builder.Configuration.GetSection(nameof(TgBotConfig)));