фиксация рабочего нового счёта

dev
vlad zverzhkhovskiy 2025-09-19 14:27:16 +03:00
parent 8842080089
commit b6d7cd88c3
9 changed files with 374 additions and 8 deletions

View File

@ -0,0 +1,11 @@
namespace KLHZ.Trader.Core.Common.Extentions
{
internal static class SemaphoreSlimExtention
{
public static async Task WaitAsync2(this SemaphoreSlim semaphore, TimeSpan timeSpan)
{
var cts = new CancellationTokenSource(timeSpan);
await semaphore.WaitAsync(cts.Token);
}
}
}

View File

@ -45,5 +45,8 @@ namespace KLHZ.Trader.Core.DataLayer.Entities.Trades
[Column("asset_type")]
public AssetType Asset { get; set; }
[Column("asset_id")]
public Guid AssetId { get; set; }
}
}

View File

@ -0,0 +1,17 @@
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using System.Collections.Immutable;
namespace KLHZ.Trader.Core.Exchange.Interfaces
{
public interface IManagedAccount
{
public decimal Balance { get; }
public decimal Total { get; }
bool Initialized { get; }
string AccountId { get; }
Task Init(string accountId);
ImmutableDictionary<string, Asset> Assets { get; }
public Task OpenPosition(string figi, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1);
public Task ClosePosition(string figi);
}
}

View File

@ -2,6 +2,7 @@
{
public class Asset : LockableExchangeObject
{
public Guid AssetId { get; init; } = Guid.NewGuid();
public long? TradeId { get; init; }
public decimal BlockedItems { get; init; }
public AssetType Type { get; init; }

View File

@ -234,7 +234,6 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();

View File

@ -0,0 +1,301 @@
using Grpc.Core;
using KLHZ.Trader.Core.Common.Extentions;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.Exchange.Extentions;
using KLHZ.Trader.Core.Exchange.Interfaces;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class ManagedAccount2 : IManagedAccount
{
public string AccountId { get; private set; } = string.Empty;
public bool Initialized { get; private set; } = false;
public decimal Balance
{
get
{
lock (_locker)
return _balance;
}
private set
{
lock (_locker)
_balance = value;
}
}
public decimal Total
{
get
{
lock (_locker)
return _total;
}
private set
{
lock (_locker)
_total = value;
}
}
public ImmutableDictionary<string, Asset> Assets => GetAssets();
private readonly InvestApiClient _investApiClient;
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
private readonly ILogger<TraderDataProvider> _logger;
private readonly IOptions<ExchangeConfig> _options;
private readonly Dictionary<string, Asset> _assets = new();
private readonly ConcurrentDictionary<string, DateTime> _usedOrderIds = new();
private readonly object _locker = new();
private decimal _balance = 0;
private decimal _total = 0;
private readonly TimeSpan _defaultLockTimeSpan = TimeSpan.FromSeconds(30);
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0, 1);
private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);
public ManagedAccount2(InvestApiClient investApiClient, IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory, ILogger<TraderDataProvider> logger)
{
_investApiClient = investApiClient;
_dbContextFactory = dbContextFactory;
_options = options;
_logger = logger;
}
public async Task Init(string accountId)
{
try
{
await _initSemaphore.WaitAsync2(TimeSpan.FromMilliseconds(100));
AccountId = accountId;
_semaphore.Release();
await LoadPortfolio();
_ = CyclingOperations();
Initialized = true;
}
catch (TaskCanceledException)
{
}
catch (Exception ex)
{
_initSemaphore.Release();
}
}
private async Task LoadPortfolio()
{
try
{
await _semaphore.WaitAsync2(_defaultLockTimeSpan);
await LoadPortfolioNolock();
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", AccountId);
}
_semaphore.Release();
}
private ImmutableDictionary<string, Asset> GetAssets()
{
var res = ImmutableDictionary<string, Asset>.Empty;
try
{
_semaphore.WaitAsync2(TimeSpan.FromMilliseconds(100)).Wait();
res = _assets.ToImmutableDictionary();
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
}
_semaphore.Release();
return res;
}
private async Task LoadPortfolioNolock()
{
var portfolio = await _investApiClient.Operations.GetPortfolioAsync(new PortfolioRequest()
{
AccountId = AccountId,
});
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var trades = await context.Trades
.Where(t => t.AccountId == AccountId && t.ArchiveStatus == 0)
.ToListAsync();
var oldAssets = _assets.ToDictionary();
_assets.Clear();
foreach (var position in portfolio.Positions)
{
oldAssets.TryGetValue(position.Figi, out var oldAsset);
var newAssetId = oldAsset?.AssetId ?? Guid.NewGuid();
var trade = trades.FirstOrDefault(t => t.Figi == position.Figi && t.AssetId == newAssetId);
var asset = new Asset()
{
AssetId = newAssetId,
TradeId = trade?.Id,
AccountId = AccountId,
Figi = position.Figi,
Ticker = position.Ticker,
BoughtAt = trade?.BoughtAt ?? DateTime.UtcNow,
BoughtPrice = trade?.Price ?? position.AveragePositionPrice,
Type = position.InstrumentType.ParseInstrumentType(),
Position = position.Quantity > 0 ? PositionType.Long : PositionType.Short,
BlockedItems = position.BlockedLots,
Count = position.Quantity,
};
_assets[asset.Figi] = asset;
}
Total = portfolio.TotalAmountPortfolio;
Balance = portfolio.TotalAmountCurrencies;
}
public async Task OpenPosition(string figi, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1)
{
try
{
await _semaphore.WaitAsync2(_defaultLockTimeSpan);
if (!_assets.ContainsKey(figi) && _options.Value.TradingInstrumentsFigis.Contains(figi))
{
var openingDirection = positionType == PositionType.Short ? OrderDirection.Sell : OrderDirection.Buy;
var stopOrdersDirection = positionType == PositionType.Short ? StopOrderDirection.Buy : StopOrderDirection.Sell;
var req = new PostOrderRequest()
{
AccountId = AccountId,
InstrumentId = figi,
Direction = openingDirection,
OrderType = OrderType.Market,
Quantity = count,
ConfirmMarginTrade = true,
};
var res = await _investApiClient.Orders.PostOrderAsync(req);
_usedOrderIds.TryAdd(res.OrderId, DateTime.UtcNow);
var executedPrice = res.ExecutedOrderPrice / 10;
var slReq = new PostStopOrderRequest()
{
AccountId = AccountId,
ConfirmMarginTrade = false,
InstrumentId = figi,
Direction = stopOrdersDirection,
PriceType = PriceType.Point,
Quantity = count,
StopOrderType = StopOrderType.StopLoss,
StopPrice = positionType == PositionType.Long ? executedPrice - stopLossShift : executedPrice + stopLossShift,
ExchangeOrderType = ExchangeOrderType.Market,
ExpirationType = StopOrderExpirationType.GoodTillCancel,
};
var slOrderRes = await _investApiClient.StopOrders.PostStopOrderAsync(slReq);
var tpReq = new PostStopOrderRequest()
{
AccountId = AccountId,
ConfirmMarginTrade = false,
InstrumentId = figi,
Direction = stopOrdersDirection,
PriceType = PriceType.Point,
Quantity = count,
StopOrderType = StopOrderType.TakeProfit,
StopPrice = positionType == PositionType.Long ? executedPrice + takeProfitShift : executedPrice - takeProfitShift,
ExchangeOrderType = ExchangeOrderType.Market,
ExpirationType = StopOrderExpirationType.GoodTillCancel,
};
var tpOrderRes = await _investApiClient.StopOrders.PostStopOrderAsync(tpReq);
await LoadPortfolioNolock();
}
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка при открытии позиции.");
}
_semaphore.Release();
}
public async Task ClosePosition(string figi)
{
try
{
await _semaphore.WaitAsync2(_defaultLockTimeSpan);
if (_assets.TryGetValue(figi, out var asset))
{
var closingDirection = asset.Count > 0 ? OrderDirection.Sell : OrderDirection.Buy;
var req = new PostOrderRequest()
{
AccountId = AccountId,
InstrumentId = figi,
Direction = closingDirection,
OrderType = OrderType.Market,
Quantity = (long)System.Math.Abs(asset.Count),
ConfirmMarginTrade = true,
};
var res = await _investApiClient.Orders.PostOrderAsync(req);
await LoadPortfolioNolock();
}
}
catch (TaskCanceledException) { }
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка при закрытии позиции.");
}
_semaphore.Release();
}
private async Task SubscribeTrades()
{
var req = new TradesStreamRequest();
req.Accounts.Add(AccountId);
using var stream = _investApiClient.OrdersStream.TradesStream(req);
await foreach (var response in stream.ResponseStream.ReadAllAsync())
{
if (response.OrderTrades?.Trades != null)
{
if (_usedOrderIds.TryAdd(response.OrderTrades.OrderId, DateTime.UtcNow))
{
await LoadPortfolio();
}
}
}
}
private async Task CyclingOperations()
{
Task? tradesLoadingTas = null;
while (true)
{
if (tradesLoadingTas == null || tradesLoadingTas.Status != TaskStatus.Running)
{
tradesLoadingTas = SubscribeTrades();
}
await Task.Delay(10000);
await LoadPortfolio();
}
}
}
}

View File

@ -0,0 +1,32 @@
using KLHZ.Trader.Core.Exchange.Interfaces;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class PortfolioDataProvider : IHostedService
{
private readonly IServiceProvider _services;
public PortfolioDataProvider(IServiceProvider services)
{
_services = services;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
var acc = _services.GetKeyedService<IManagedAccount>(1);
await acc.Init("2274189208");
await acc.OpenPosition("FUTIMOEXF000", PositionType.Short, 2.5m, 4m);
await acc.ClosePosition("FUTIMOEXF000");
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

View File

@ -32,7 +32,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
private readonly InvestApiClient _investApiClient;
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
private readonly ILogger<ManagedAccount> _logger;
private readonly ILogger<TraderDataProvider> _logger;
private readonly string[] _managedAccountsNamePatterns = [];
private readonly string[] _instrumentsFigis = [];
@ -46,7 +46,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
private readonly SemaphoreSlim _syncSemaphore = new SemaphoreSlim(1, 1);
private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);
public TraderDataProvider(InvestApiClient investApiClient, IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory, ILogger<ManagedAccount> logger)
public TraderDataProvider(InvestApiClient investApiClient, IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory, ILogger<TraderDataProvider> logger)
{
_investApiClient = investApiClient;
_dbContextFactory = dbContextFactory;
@ -256,7 +256,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown;
}
internal async Task SyncPortfolio(string accountId)
private async Task SyncPortfolio(string accountId)
{
if (Accounts.TryGetValue(accountId, out var account))
{
@ -264,7 +264,7 @@ namespace KLHZ.Trader.Core.Exchange.Services
}
}
internal async Task SyncPortfolio(ManagedAccount account)
private async Task SyncPortfolio(ManagedAccount account)
{
try
{

View File

@ -1,7 +1,7 @@
using KLHZ.Trader.Core.Common.Messaging.Services;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Interfaces;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Services;
using KLHZ.Trader.Core.TG;
@ -46,7 +46,9 @@ builder.Services.AddHostedService<BotStarter>();
builder.Services.AddHostedService<ExchangeDataReader>();
builder.Services.AddHostedService<Trader>();
builder.Services.AddHostedService<TradingCommandsExecutor>();
//builder.Services.AddHostedService<ProcessedPricesLogger>();
builder.Services.AddHostedService<PortfolioDataProvider>();
//builder.Services.AddHostedService<KalmanPredictor>();
@ -57,7 +59,7 @@ builder.Services.AddSingleton<IDataBus, DataBus>();
for (int i = 0; i < 10; i++)
{
builder.Services.AddKeyedSingleton<ManagedAccount>(i);
builder.Services.AddKeyedSingleton<IManagedAccount, ManagedAccount2>(i);
}
builder.Services.Configure<TgBotConfig>(builder.Configuration.GetSection(nameof(TgBotConfig)));