klhztrader/KLHZ.Trader.Core/Exchange/Services/TraderDataProvider.cs

421 lines
18 KiB
C#

using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Extentions;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using Tinkoff.InvestApi.V1;
using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class TraderDataProvider
{
private readonly ConcurrentDictionary<string, IPriceHistoryCacheUnit> _historyCash = new();
private readonly InvestApiClient _investApiClient;
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
private readonly ILogger<ManagedAccount> _logger;
private readonly string[] _managedAccountsNamePatterns = [];
private readonly string[] _instrumentsFigis = [];
private readonly ConcurrentDictionary<string, InstrumentSettings> _instrumentsSettings = new();
private readonly ConcurrentDictionary<string, string> _tickersCache = new();
private readonly ConcurrentDictionary<string, AssetType> _assetTypesCache = new();
internal readonly ConcurrentDictionary<string, ManagedAccount> Accounts = new();
private readonly bool _isDataRecievingAllowed = false;
private readonly Channel<object> _forSave = Channel.CreateUnbounded<object>();
private readonly SemaphoreSlim _initSemaphore = new SemaphoreSlim(1, 1);
public TraderDataProvider(InvestApiClient investApiClient, IOptions<ExchangeConfig> options, IDbContextFactory<TraderDbContext> dbContextFactory, ILogger<ManagedAccount> logger)
{
_investApiClient = investApiClient;
_dbContextFactory = dbContextFactory;
_logger = logger;
_managedAccountsNamePatterns = options.Value.ManagingAccountNamePatterns.ToArray();
_instrumentsFigis = options.Value.DataRecievingInstrumentsFigis.ToArray();
_isDataRecievingAllowed = options.Value.ExchangeDataRecievingEnabled;
foreach (var lev in options.Value.InstrumentsSettings)
{
_instrumentsSettings.TryAdd(lev.Figi, lev);
}
}
public async ValueTask<(DateTime[] timestamps, decimal[] prices, bool isFullIntervalExists)> GetData(string figi, TimeSpan timeSpan)
{
if (_historyCash.TryGetValue(figi, out var unit))
{
var res = await unit.GetData(timeSpan);
return (res.timestamps, res.prices, res.isFullIntervalExists);
}
return (Array.Empty<DateTime>(), Array.Empty<decimal>(), false);
}
public async ValueTask<(DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount)> GetData(string figi, int? length = null)
{
if (_historyCash.TryGetValue(figi, out var unit))
{
var res = await unit.GetData(length);
return (res.timestamps, res.prices, unit.BidsCount, unit.AsksCount);
}
return (Array.Empty<DateTime>(), Array.Empty<decimal>(), 1, 1);
}
public async ValueTask AddData(INewPrice message, TimeSpan? clearingInterval = null)
{
if (_historyCash.TryGetValue(message.Figi, out var unit))
{
if (clearingInterval.HasValue)
{
var lasts = await unit.GetLastValues();
if (message.Time - lasts.time > clearingInterval.Value)
{
unit = new PriceHistoryCacheUnit2(message.Figi);
_historyCash[message.Figi] = unit;
}
}
await unit.AddData(message);
}
else
{
unit = new PriceHistoryCacheUnit2(message.Figi, message);
_historyCash.TryAdd(message.Figi, unit);
}
}
public async ValueTask AddOrderbook(IOrderbook orderbook)
{
if (!_historyCash.TryGetValue(orderbook.Figi, out var unit))
{
unit = new PriceHistoryCacheUnit2(orderbook.Figi);
_historyCash.TryAdd(orderbook.Figi, unit);
}
await unit.AddOrderbook(orderbook);
}
public async Task Init()
{
await _initSemaphore.WaitAsync(TimeSpan.FromSeconds(3));
try
{
var shares = await _investApiClient.Instruments.SharesAsync();
foreach (var share in shares.Instruments)
{
if (_instrumentsFigis.Contains(share.Figi))
{
_tickersCache.TryAdd(share.Figi, share.Ticker);
_assetTypesCache.TryAdd(share.Figi, AssetType.Common);
}
}
var futures = await _investApiClient.Instruments.FuturesAsync();
foreach (var future in futures.Instruments)
{
if (_instrumentsFigis.Contains(future.Figi))
{
_tickersCache.TryAdd(future.Figi, future.Ticker);
_assetTypesCache.TryAdd(future.Figi, AssetType.Futures);
}
}
var accounts = await _investApiClient.GetAccounts(_managedAccountsNamePatterns);
var accountsList = new List<ManagedAccount>();
foreach (var accountId in accounts)
{
var acc = new ManagedAccount(accountId);
await SyncPortfolio(acc);
Accounts[accountId] = acc;
}
if (_isDataRecievingAllowed)
{
var time = DateTime.UtcNow.AddHours(-1.5);
using var context1 = await _dbContextFactory.CreateDbContextAsync();
context1.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var data = await context1.PriceChanges
.Where(c => _instrumentsFigis.Contains(c.Figi) && c.Time >= time)
.OrderBy(c => c.Time)
.Select(c => new NewPriceMessage()
{
Figi = c.Figi,
Ticker = c.Ticker,
Time = c.Time,
Value = c.Value,
IsHistoricalData = true
})
.ToArrayAsync();
foreach (var price in data)
{
await AddData(price);
}
}
_ = WritePricesTask();
}
catch(Exception ex)
{
}
}
public string GetTickerByFigi(string figi)
{
return _tickersCache.TryGetValue(figi, out var ticker) ? ticker : string.Empty;
}
public AssetType GetAssetTypeByFigi(string figi)
{
return _assetTypesCache.TryGetValue(figi, out var t) ? t : AssetType.Unknown;
}
internal async Task SyncPortfolio(ManagedAccount account)
{
try
{
//await _semaphoreSlim.WaitAsync();
var portfolio = await _investApiClient.Operations.GetPortfolioAsync(new PortfolioRequest()
{
AccountId = account.AccountId,
});
var oldAssets = account.Assets.Keys.ToHashSet();
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var trades = await context.Trades
.Where(t => t.AccountId == account.AccountId && t.ArchiveStatus == 0)
.ToListAsync();
foreach (var position in portfolio.Positions)
{
decimal price = 0;
var trade = trades.FirstOrDefault(t => t.Figi == position.Figi);
if (trade != null)
{
trades.Remove(trade);
price = trade.Price;
}
else
{
price = position.AveragePositionPrice;
}
#pragma warning disable CS0612 // Тип или член устарел
var asset = new Models.AssetsAccounting.Asset()
{
TradeId = trade?.Id,
AccountId = account.AccountId,
Figi = position.Figi,
Ticker = position.Ticker,
BoughtAt = trade?.BoughtAt ?? DateTime.UtcNow,
BoughtPrice = price,
Type = position.InstrumentType.ParseInstrumentType(),
Position = position.Quantity > 0 ? PositionType.Long : PositionType.Short,
BlockedItems = position.BlockedLots,
Count = position.Quantity,
CountLots = position.QuantityLots,
};
#pragma warning restore CS0612 // Тип или член устарел
account.Assets.AddOrUpdate(asset.Figi, asset, (k, v) => asset);
oldAssets.Remove(asset.Figi);
}
account.Total = portfolio.TotalAmountPortfolio;
account.Balance = portfolio.TotalAmountCurrencies;
foreach (var asset in oldAssets)
{
account.Assets.TryRemove(asset, out _);
}
var ids = trades.Select(t => t.Id).ToArray();
await context.Trades
.Where(t => ids.Contains(t.Id))
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.ArchiveStatus, 1));
}
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка при синхранизации портфеля счёта {accountId}", account.AccountId);
}
finally
{
//_semaphoreSlim.Release();
}
}
internal async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue)
{
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
await context.Trades
.Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future)
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue).SetProperty(tr => tr.BoughtAt, DateTime.UtcNow));
foreach (var account in Accounts.Values)
{
await SyncPortfolio(account);
}
}
internal async Task LogDeal(DealResult dealResult)
{
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var priceCoeff = 1m;
if (_instrumentsSettings.TryGetValue(dealResult.Figi, out var se))
{
priceCoeff = se.PriceToRubConvertationCoefficient;
}
var trade = await context.Trades.FirstOrDefaultAsync(t => t.ArchiveStatus == 0 && t.Figi == dealResult.Figi && t.AccountId == dealResult.AccountId);
if (trade == null)
{
var newTrade = new DataLayer.Entities.Trades.Trade()
{
AccountId = dealResult.AccountId,
Figi = dealResult.Figi,
Ticker = GetTickerByFigi(dealResult.Figi),
BoughtAt = DateTime.UtcNow,
Count = dealResult.Count,
Price = dealResult.Price * priceCoeff,
Position = dealResult.Count > 0 ? DataLayer.Entities.Trades.Enums.PositionType.Long : DataLayer.Entities.Trades.Enums.PositionType.Short,
Direction = (DataLayer.Entities.Trades.Enums.TradeDirection)(int)dealResult.Direction,
Asset = (DataLayer.Entities.Trades.Enums.AssetType)(int)GetAssetTypeByFigi(dealResult.Figi)
};
await context.Trades.AddAsync(newTrade);
await context.SaveChangesAsync();
}
else
{
var oldAmount = trade.Price * trade.Count;
var newAmount = dealResult.Price * priceCoeff * dealResult.Count;
var oldCount = trade.Count;
trade.Count = trade.Count + dealResult.Count;
if (trade.Count != 0 && System.Math.Abs(oldCount) < System.Math.Abs(trade.Count))// Если суммарное количество элементов позиции сокращается - пересчитывать цену не нужно.
{
trade.Price = (oldAmount + newAmount) / trade.Count;
}
if (Accounts.TryGetValue(dealResult.AccountId, out var account))
{
if (account.Assets.TryGetValue(dealResult.Figi, out var asset))
{
if (trade.Count == 0)
{
await context.Trades.Where(t => t.Id == trade.Id && t.ArchiveStatus == 0)
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.ArchiveStatus, 1));
account.Assets.TryRemove(dealResult.Figi, out _);
return;
}
else
{
context.Trades.Update(trade);
await context.SaveChangesAsync();
var newAsset = new Asset()
{
AccountId = asset.AccountId,
Figi = asset.Figi,
Ticker = asset.Ticker,
BlockedItems = asset.BlockedItems,
BoughtAt = DateTime.UtcNow,
BoughtPrice = trade.Price,
Count = trade.Count,
Position = trade.Count > 0 ? PositionType.Long : PositionType.Short,
Type = asset.Type,
TradeId = asset.TradeId,
};
account.Assets[dealResult.Figi] = newAsset;
return;
}
}
}
}
await SyncPortfolio(Accounts[dealResult.AccountId]);
}
internal async Task LogPrice(ProcessedPrice price, bool saveImmediately)
{
if (saveImmediately)
{
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
await context.ProcessedPrices.AddRangeAsync(price);
await context.SaveChangesAsync();
}
else
{
await _forSave.Writer.WriteAsync(price);
}
}
internal async Task LogDeclision(Declision declision, bool saveImmediately)
{
if (saveImmediately)
{
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
await context.Declisions.AddRangeAsync(declision);
await context.SaveChangesAsync();
}
else
{
await _forSave.Writer.WriteAsync(declision);
}
}
private async Task WritePricesTask()
{
var buffer1 = new List<ProcessedPrice>();
var buffer2 = new List<Declision>();
while (await _forSave.Reader.WaitToReadAsync())
{
try
{
var obj = await _forSave.Reader.ReadAsync();
if (obj is ProcessedPrice price)
{
buffer1.Add(price);
}
if (obj is Declision dec)
{
buffer2.Add(dec);
}
if ((buffer1.Count + buffer2.Count) > 50000 || _forSave.Reader.Count == 0)
{
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
if (buffer1.Count > 0)
{
await context.ProcessedPrices.AddRangeAsync(buffer1);
}
if (buffer2.Count > 0)
{
await context.Declisions.AddRangeAsync(buffer2);
}
await context.SaveChangesAsync();
buffer1.Clear();
buffer2.Clear();
}
}
catch (Exception ex)
{
}
}
}
}
}