528 lines
23 KiB
C#
528 lines
23 KiB
C#
using KLHZ.Trader.Core.Contracts.Declisions.Dtos;
|
|
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
|
|
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
|
|
using KLHZ.Trader.Core.DataLayer;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
|
|
using KLHZ.Trader.Core.Exchange.Extentions;
|
|
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
|
|
using KLHZ.Trader.Core.Exchange.Models.Configs;
|
|
using KLHZ.Trader.Core.Math.Declisions.Dtos.FFT;
|
|
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
|
|
using KLHZ.Trader.Core.Math.Declisions.Utils;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using System.Collections.Concurrent;
|
|
using System.Threading.Channels;
|
|
using Tinkoff.InvestApi;
|
|
using Tinkoff.InvestApi.V1;
|
|
using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset;
|
|
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
|
|
using Order = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Order;
|
|
using PositionType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.PositionType;
|
|
|
|
namespace KLHZ.Trader.Core.Exchange.Services
|
|
{
|
|
public class TraderDataProvider
|
|
{
|
|
private readonly ConcurrentDictionary<string, IPriceHistoryCacheUnit> _historyCash = new();
|
|
|
|
private readonly InvestApiClient _investApiClient;
|
|
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
|
|
private readonly ILogger<ManagedAccount> _logger;
|
|
private readonly string[] _managedAccountsNamePatterns = [];
|
|
private readonly string[] _instrumentsFigis = [];
|
|
|
|
private readonly ConcurrentDictionary<string, FFTAnalyzeResult> _fftResults = new();
|
|
private readonly ConcurrentDictionary<string, InstrumentSettings> _instrumentsSettings = new();
|
|
private readonly ConcurrentDictionary<string, string> _tickersCache = new();
|
|
private readonly ConcurrentDictionary<string, AssetType> _assetTypesCache = new();
|
|
internal readonly ConcurrentDictionary<string, ManagedAccount> Accounts = new();
|
|
private readonly bool _isDataRecievingAllowed = false;
|
|
private readonly Channel<object> _forSave = Channel.CreateUnbounded<object>();
|
|
private readonly SemaphoreSlim _syncSemaphore = new SemaphoreSlim(1, 1);
|
|
private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);
|
|
|
|
public TraderDataProvider(InvestApiClient investApiClient, IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory, ILogger<ManagedAccount> logger)
|
|
{
|
|
_investApiClient = investApiClient;
|
|
_dbContextFactory = dbContextFactory;
|
|
_logger = logger;
|
|
_managedAccountsNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
|
|
_instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray();
|
|
_isDataRecievingAllowed = options.Value.ExchangeDataRecievingEnabled;
|
|
|
|
foreach (var lev in options.Value.InstrumentsSettings)
|
|
{
|
|
_instrumentsSettings.TryAdd(lev.Figi, lev);
|
|
}
|
|
}
|
|
|
|
public ValueTask<FFTAnalyzeResult> GetFFtResult(string figi)
|
|
{
|
|
if (_fftResults.TryGetValue(figi, out var res))
|
|
{
|
|
return ValueTask.FromResult(res);
|
|
}
|
|
return ValueTask.FromResult<FFTAnalyzeResult>(FFTAnalyzeResult.Empty);
|
|
}
|
|
|
|
public ValueTask SetFFtResult(FFTAnalyzeResult result)
|
|
{
|
|
_fftResults[result.Key] = result;
|
|
return ValueTask.CompletedTask;
|
|
}
|
|
|
|
public async ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(string figi, TimeSpan timeSpan)
|
|
{
|
|
if (_historyCash.TryGetValue(figi, out var unit))
|
|
{
|
|
var res = await unit.GetData(timeSpan);
|
|
return (res.timestamps, res.prices, res.isFullIntervalExists);
|
|
}
|
|
return (Array.Empty<DateTime>(), Array.Empty<decimal>(), false);
|
|
}
|
|
|
|
public async ValueTask<(DateTime[] timestamps, decimal[] prices)> GetData(string figi, int? length = null)
|
|
{
|
|
if (_historyCash.TryGetValue(figi, out var unit))
|
|
{
|
|
var res = await unit.GetData(length);
|
|
return (res.timestamps, res.prices);
|
|
}
|
|
return (Array.Empty<DateTime>(), Array.Empty<decimal>());
|
|
}
|
|
|
|
public async ValueTask AddData(INewPrice message, TimeSpan? clearingInterval = null)
|
|
{
|
|
if (_historyCash.TryGetValue(message.Figi, out var unit))
|
|
{
|
|
if (clearingInterval.HasValue)
|
|
{
|
|
var lasts = await unit.GetLastValues();
|
|
if (message.Time - lasts.time > clearingInterval.Value)
|
|
{
|
|
unit = new PriceHistoryCacheUnit2(message.Figi);
|
|
_historyCash[message.Figi] = unit;
|
|
}
|
|
}
|
|
await unit.AddData(message);
|
|
}
|
|
else
|
|
{
|
|
unit = new PriceHistoryCacheUnit2(message.Figi, message);
|
|
_historyCash.TryAdd(message.Figi, unit);
|
|
}
|
|
}
|
|
|
|
public async ValueTask AddDataTo1MinuteWindowCache(string figi, string key, CachedValue data)
|
|
{
|
|
if (!_historyCash.TryGetValue(figi, out var unit))
|
|
{
|
|
unit = new PriceHistoryCacheUnit2(figi);
|
|
_historyCash.TryAdd(figi, unit);
|
|
}
|
|
await _historyCash[figi].AddDataToTimeWindowCache(key, data, TimeWindowCacheType._1_Minute);
|
|
}
|
|
|
|
public async ValueTask AddDataTo5MinuteWindowCache(string figi, string key, CachedValue data)
|
|
{
|
|
if (!_historyCash.TryGetValue(figi, out var unit))
|
|
{
|
|
unit = new PriceHistoryCacheUnit2(figi);
|
|
_historyCash.TryAdd(figi, unit);
|
|
}
|
|
await _historyCash[figi].AddDataToTimeWindowCache(key, data, TimeWindowCacheType._5_Minutes);
|
|
}
|
|
|
|
public ValueTask<CachedValue[]> GetDataFrom1MinuteWindowCache(string figi, string key)
|
|
{
|
|
if (_historyCash.TryGetValue(figi, out var cahcheItem))
|
|
{
|
|
return cahcheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._1_Minute);
|
|
}
|
|
return ValueTask.FromResult(Array.Empty<CachedValue>());
|
|
}
|
|
|
|
public ValueTask<CachedValue[]> GetDataFrom5MinuteWindowCache(string figi, string key)
|
|
{
|
|
if (_historyCash.TryGetValue(figi, out var cahcheItem))
|
|
{
|
|
return cahcheItem.GetDataFromTimeWindowCache(key, TimeWindowCacheType._5_Minutes);
|
|
}
|
|
return ValueTask.FromResult(Array.Empty<CachedValue>());
|
|
}
|
|
|
|
public async ValueTask AddOrderbook(IOrderbook orderbook)
|
|
{
|
|
if (!_historyCash.TryGetValue(orderbook.Figi, out var unit))
|
|
{
|
|
unit = new PriceHistoryCacheUnit2(orderbook.Figi);
|
|
_historyCash.TryAdd(orderbook.Figi, unit);
|
|
}
|
|
|
|
await unit.AddOrderbook(orderbook);
|
|
}
|
|
|
|
public async Task Init()
|
|
{
|
|
try
|
|
{
|
|
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(3));
|
|
await _initSemaphore.WaitAsync(cts.Token);
|
|
var shares = await _investApiClient.Instruments.SharesAsync();
|
|
foreach (var share in shares.Instruments)
|
|
{
|
|
if (_instrumentsFigis.Contains(share.Figi))
|
|
{
|
|
_tickersCache.TryAdd(share.Figi, share.Ticker);
|
|
_assetTypesCache.TryAdd(share.Figi, AssetType.Common);
|
|
}
|
|
}
|
|
var futures = await _investApiClient.Instruments.FuturesAsync();
|
|
foreach (var future in futures.Instruments)
|
|
{
|
|
if (_instrumentsFigis.Contains(future.Figi))
|
|
{
|
|
_tickersCache.TryAdd(future.Figi, future.Ticker);
|
|
_assetTypesCache.TryAdd(future.Figi, AssetType.Futures);
|
|
}
|
|
}
|
|
|
|
var accounts = await _investApiClient.GetAccounts(_managedAccountsNamePatterns);
|
|
var accountsList = new List<ManagedAccount>();
|
|
foreach (var accountId in accounts)
|
|
{
|
|
var acc = new ManagedAccount(accountId);
|
|
await SyncPortfolio(acc);
|
|
Accounts[accountId] = acc;
|
|
}
|
|
|
|
if (_isDataRecievingAllowed)
|
|
{
|
|
var time = DateTime.UtcNow.AddHours(-1.5);
|
|
using var context1 = await _dbContextFactory.CreateDbContextAsync();
|
|
context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
var data = await context1.PriceChanges
|
|
.Where(c => _instrumentsFigis.Contains(c.Figi) && c.Time >= time)
|
|
.OrderBy(c => c.Time)
|
|
.Select(c => new NewPriceMessage()
|
|
{
|
|
Figi = c.Figi,
|
|
Ticker = c.Ticker,
|
|
Time = c.Time,
|
|
Value = c.Value,
|
|
IsHistoricalData = true
|
|
})
|
|
.ToArrayAsync();
|
|
|
|
foreach (var price in data)
|
|
{
|
|
await AddData(price);
|
|
|
|
var cachedData = await GetData(price.Figi);
|
|
if ((DateTime.UtcNow - price.Time).TotalMinutes < 5)
|
|
{
|
|
if (ShapeAreaCalculator.TryGetAreasRelation(cachedData.timestamps, cachedData.prices, price.Value, Constants.AreasRelationWindow, out var rel))
|
|
{
|
|
await AddDataTo1MinuteWindowCache(price.Figi, Constants._1minCacheKey, new CachedValue() { Time = price.Time, Value = (decimal)rel });
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
_ = SyncPortfolioWorker();
|
|
_ = WritePricesTask();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
|
|
}
|
|
}
|
|
|
|
public string GetTickerByFigi(string figi)
|
|
{
|
|
return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
|
|
}
|
|
|
|
public AssetType GetAssetTypeByFigi(string figi)
|
|
{
|
|
return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown;
|
|
}
|
|
|
|
internal async Task SyncPortfolio(string accountId)
|
|
{
|
|
if (Accounts.TryGetValue(accountId, out var account))
|
|
{
|
|
await SyncPortfolio(account);
|
|
}
|
|
}
|
|
|
|
internal async Task SyncPortfolio(ManagedAccount account)
|
|
{
|
|
try
|
|
{
|
|
await _syncSemaphore.WaitAsync();
|
|
var portfolio = await _investApiClient.Operations.GetPortfolioAsync(new PortfolioRequest()
|
|
{
|
|
AccountId = account.AccountId,
|
|
});
|
|
|
|
var oldAssets = account.Assets.Keys.ToHashSet();
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
|
|
var trades = await context.Trades
|
|
.Where(t => t.AccountId == account.AccountId && t.ArchiveStatus == 0)
|
|
.ToListAsync();
|
|
foreach (var position in portfolio.Positions)
|
|
{
|
|
decimal price = 0;
|
|
var trade = trades.FirstOrDefault(t => t.Figi == position.Figi);
|
|
|
|
if (trade != null)
|
|
{
|
|
trade.Count = position.Quantity;
|
|
trade.Position = position.Quantity > 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short;
|
|
trades.Remove(trade);
|
|
price = trade.Price;
|
|
context.Trades.Update(trade);
|
|
await context.SaveChangesAsync();
|
|
}
|
|
else
|
|
{
|
|
price = position.AveragePositionPrice;
|
|
|
|
}
|
|
|
|
#pragma warning disable CS0612 // Тип или член устарел
|
|
var asset = new Models.AssetsAccounting.Asset()
|
|
{
|
|
TradeId = trade?.Id,
|
|
AccountId = account.AccountId,
|
|
Figi = position.Figi,
|
|
Ticker = position.Ticker,
|
|
BoughtAt = trade?.BoughtAt ?? DateTime.UtcNow,
|
|
BoughtPrice = price,
|
|
Type = position.InstrumentType.ParseInstrumentType(),
|
|
Position = position.Quantity > 0 ? PositionType.Long : PositionType.Short,
|
|
BlockedItems = position.BlockedLots,
|
|
Count = position.Quantity,
|
|
CountLots = position.QuantityLots,
|
|
};
|
|
#pragma warning restore CS0612 // Тип или член устарел
|
|
account.Assets.AddOrUpdate(asset.Figi, asset, (k, v) => asset);
|
|
oldAssets.Remove(asset.Figi);
|
|
}
|
|
|
|
account.Total = portfolio.TotalAmountPortfolio;
|
|
account.Balance = portfolio.TotalAmountCurrencies;
|
|
|
|
foreach (var asset in oldAssets)
|
|
{
|
|
account.Assets.TryRemove(asset, out _);
|
|
}
|
|
|
|
var ids = trades.Select(t => t.Id).ToArray();
|
|
await context.Trades
|
|
.Where(t => ids.Contains(t.Id))
|
|
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.ArchiveStatus, 1));
|
|
|
|
var orders = await _investApiClient.Orders.GetOrdersAsync(new GetOrdersRequest() { AccountId = account.AccountId });
|
|
var actualOrders = orders.Orders.Select(o => new Order()
|
|
{
|
|
AccountId = account.AccountId,
|
|
Figi = o.Figi,
|
|
OrderId = o.OrderId,
|
|
Ticker = GetTickerByFigi(o.Figi),
|
|
Count = o.LotsRequested,
|
|
ExpirationTime = DateTime.UtcNow.AddMinutes(10),
|
|
OpenDate = DateTime.UtcNow,
|
|
Price = o.AveragePositionPrice,
|
|
Direction = (DealDirection)(int)o.Direction
|
|
}).ToArray();
|
|
|
|
foreach (var order in actualOrders)
|
|
{
|
|
account.Orders[order.OrderId] = order;
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", account.AccountId);
|
|
}
|
|
|
|
_syncSemaphore.Release();
|
|
}
|
|
|
|
internal async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue)
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
await context.Trades
|
|
.Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future)
|
|
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue).SetProperty(tr => tr.BoughtAt, DateTime.UtcNow));
|
|
foreach (var account in Accounts.Values)
|
|
{
|
|
await SyncPortfolio(account);
|
|
}
|
|
}
|
|
|
|
internal async Task LogDeal(DealResult dealResult)
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
var priceCoeff = 1m;
|
|
var sign = dealResult.Direction == DealDirection.Sell ? -1m : 1;
|
|
var dealCount = dealResult.Count * sign;
|
|
if (_instrumentsSettings.TryGetValue(dealResult.Figi, out var se))
|
|
{
|
|
priceCoeff = se.PriceToRubConvertationCoefficient;
|
|
}
|
|
var trade = await context.Trades.FirstOrDefaultAsync(t => t.ArchiveStatus == 0 && t.Figi == dealResult.Figi && t.AccountId == dealResult.AccountId);
|
|
if (trade == null)
|
|
{
|
|
var newTrade = new DataLayer.Entities.Trades.Trade()
|
|
{
|
|
AccountId = dealResult.AccountId,
|
|
Figi = dealResult.Figi,
|
|
Ticker = GetTickerByFigi(dealResult.Figi),
|
|
BoughtAt = DateTime.UtcNow,
|
|
Count = dealCount,
|
|
Price = dealResult.Price * priceCoeff,
|
|
Position = dealCount >= 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short,
|
|
Direction = (DataLayer.Entities.Trades.Enums.TradeDirection)(int)dealResult.Direction,
|
|
Asset = (DataLayer.Entities.Trades.Enums.AssetType)(int)GetAssetTypeByFigi(dealResult.Figi)
|
|
};
|
|
|
|
await context.Trades.AddAsync(newTrade);
|
|
await context.SaveChangesAsync();
|
|
}
|
|
else
|
|
{
|
|
var oldAmount = trade.Price * trade.Count;
|
|
var newAmount = dealResult.Price * priceCoeff * dealCount;
|
|
|
|
|
|
var oldCount = trade.Count;
|
|
|
|
trade.Count = trade.Count + dealCount;
|
|
if (trade.Count != 0)// Если суммарное количество элементов позиции сокращается - пересчитывать цену не нужно.
|
|
{
|
|
if (trade.Count / System.Math.Abs(trade.Count) != oldCount / System.Math.Abs(oldCount))//если сменился знак общего числа активов.
|
|
{
|
|
trade.Price = dealResult.Price;
|
|
trade.Position = trade.Count < 0 ? DataLayer.Entities.Trades.Enums.PositionType.Short : DataLayer.Entities.Trades.Enums.PositionType.Long;
|
|
}
|
|
else
|
|
{
|
|
trade.Price = (oldAmount + newAmount) / trade.Count;
|
|
}
|
|
context.Trades.Update(trade);
|
|
await context.SaveChangesAsync();
|
|
}
|
|
}
|
|
}
|
|
|
|
internal async Task LogPrice(ProcessedPrice price, bool saveImmediately)
|
|
{
|
|
if (saveImmediately)
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
await context.ProcessedPrices.AddRangeAsync(price);
|
|
await context.SaveChangesAsync();
|
|
}
|
|
else
|
|
{
|
|
await _forSave.Writer.WriteAsync(price);
|
|
}
|
|
}
|
|
|
|
internal async Task LogDeclision(Declision declision, bool saveImmediately)
|
|
{
|
|
if (saveImmediately)
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
await context.Declisions.AddRangeAsync(declision);
|
|
await context.SaveChangesAsync();
|
|
}
|
|
else
|
|
{
|
|
await _forSave.Writer.WriteAsync(declision);
|
|
}
|
|
}
|
|
|
|
private async Task SyncPortfolioWorker()
|
|
{
|
|
while (true)
|
|
{
|
|
try
|
|
{
|
|
await Task.Delay(20000);
|
|
foreach (var acc in Accounts)
|
|
{
|
|
await SyncPortfolio(acc.Value);
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Ошибка при цикличном обновлении портфеля");
|
|
}
|
|
}
|
|
}
|
|
|
|
private async Task WritePricesTask()
|
|
{
|
|
var buffer1 = new List<ProcessedPrice>();
|
|
var buffer2 = new List<Declision>();
|
|
while (await _forSave.Reader.WaitToReadAsync())
|
|
{
|
|
try
|
|
{
|
|
var obj = await _forSave.Reader.ReadAsync();
|
|
if (obj is ProcessedPrice price)
|
|
{
|
|
buffer1.Add(price);
|
|
}
|
|
if (obj is Declision dec)
|
|
{
|
|
buffer2.Add(dec);
|
|
}
|
|
if ((buffer1.Count + buffer2.Count) > 50000 || _forSave.Reader.Count == 0)
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
if (buffer1.Count > 0)
|
|
{
|
|
await context.ProcessedPrices.AddRangeAsync(buffer1);
|
|
}
|
|
if (buffer2.Count > 0)
|
|
{
|
|
await context.Declisions.AddRangeAsync(buffer2);
|
|
}
|
|
|
|
await context.SaveChangesAsync();
|
|
buffer1.Clear();
|
|
buffer2.Clear();
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
public ValueTask<Asset[]> GetAssetsByFigi(string figi)
|
|
{
|
|
var assets = Accounts.Values.SelectMany(a => a.Assets.Values.Where(aa => aa.Figi == figi)).ToArray();
|
|
return ValueTask.FromResult(assets);
|
|
}
|
|
}
|
|
}
|