klhztrader/KLHZ.Trader.Core/Exchange/Services/Trader.cs

388 lines
19 KiB
C#

using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class Trader : IHostedService
{
private readonly IDataBus _dataBus;
private readonly BotModeSwitcher _botModeSwitcher;
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
private readonly TradeDataProvider _tradeDataProvider;
private readonly ILogger<Trader> _logger;
private readonly ConcurrentDictionary<string, DeferredTrade> DeferredLongOpens = new();
private readonly ConcurrentDictionary<string, DeferredTrade> DeferredLongCloses = new();
private readonly ConcurrentDictionary<string, DateTime> OpeningStops = new();
private readonly ConcurrentDictionary<string, InstrumentSettings> Leverages = new();
private readonly ConcurrentDictionary<string, IPriceHistoryCacheUnit> _historyCash = new();
private readonly double _buyStopLength;
private readonly decimal _futureComission;
private readonly decimal _shareComission;
private readonly decimal _accountCashPart;
private readonly decimal _accountCashPartFutures;
private readonly decimal _defaultBuyPartOfAccount;
private readonly string[] _tradingInstrumentsFigis = [];
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateUnbounded<IOrderbook>();
private readonly CancellationTokenSource _cts = new();
public Trader(
ILogger<Trader> logger,
BotModeSwitcher botModeSwitcher,
IServiceProvider provider,
IOptions<ExchangeConfig> options,
IDataBus dataBus,
IDbContextFactory<TraderDbContext> dbContextFactory,
TradeDataProvider tradeDataProvider,
InvestApiClient investApiClient)
{
_tradeDataProvider = tradeDataProvider;
_logger = logger;
_botModeSwitcher = botModeSwitcher;
_dataBus = dataBus;
_dbContextFactory = dbContextFactory;
_futureComission = options.Value.FutureComission;
_shareComission = options.Value.ShareComission;
_accountCashPart = options.Value.AccountCashPart;
_accountCashPartFutures = options.Value.AccountCashPartFutures;
_defaultBuyPartOfAccount = options.Value.DefaultBuyPartOfAccount;
_tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis;
_buyStopLength = (double)options.Value.StopBuyLengthMinuts;
foreach (var lev in options.Value.InstrumentsSettings)
{
Leverages.TryAdd(lev.Figi, lev);
}
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _tradeDataProvider.Init();
_dataBus.AddChannel(nameof(Trader), _pricesChannel);
_dataBus.AddChannel(nameof(Trader), _ordersbookChannel);
_ = ProcessPrices();
_ = ProcessOrdersbooks();
}
private async Task ProcessPrices()
{
var declisionsForSave = new List<Declision>();
var processedPrices = new List<ProcessedPrice>();
while (await _pricesChannel.Reader.WaitToReadAsync())
{
var bigWindowProcessor = nameof(Trader) + "_big";
var smallWindowProcessor = nameof(Trader) + "_small";
var message = await _pricesChannel.Reader.ReadAsync();
if (_tradingInstrumentsFigis.Contains(message.Figi))
{
var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
if (_historyCash.TryGetValue(message.Figi, out var unit))
{
await unit.AddData(message);
}
else
{
unit = new PriceHistoryCacheUnit2(message.Figi, message);
_historyCash.TryAdd(message.Figi, unit);
}
try
{
if (message.Figi == "FUTIMOEXF000")
{
DeferredTrade? longOpen;
DeferredLongOpens.TryGetValue(message.Figi, out longOpen);
if (longOpen != null)
{
var t = currentTime;
if (longOpen.Time <= t
&& t - longOpen.Time < TimeSpan.FromMinutes(3))
{
DeferredLongOpens.TryRemove(message.Figi, out _);
if (message.Value - longOpen.Price < 1)
{
LogDeclision(declisionsForSave, DeclisionTradeAction.OpenLong, message);
}
}
}
DeferredTrade? longClose;
DeferredLongCloses.TryGetValue(message.Figi, out longClose);
if (longClose != null)
{
if (longClose.Time <= currentTime)
{
DeferredLongCloses.TryRemove(message.Figi, out _);
if (longClose.Price - message.Value < 1)
{
LogDeclision(declisionsForSave, DeclisionTradeAction.CloseLong, message);
}
}
}
var windowMaxSize = 100;
var data = await unit.GetData(windowMaxSize);
var state = ExchangeScheduler.GetCurrentState(message.Time);
if (state == ExchangeState.ClearingTime
&& data.timestamps.Length > 1
&& (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2]) > TimeSpan.FromMinutes(3))
{
await UpdateFuturesPrice(message, data.prices[data.prices.Length - 2]);
}
if (OpeningStops.TryGetValue(message.Figi, out var dt))
{
if (dt < currentTime)
{
OpeningStops.TryRemove(message.Figi, out _);
}
}
if ((unit.BidsCount / unit.AsksCount) < 0.5m || (unit.BidsCount / unit.AsksCount) > 2m)
{
var stopTo = currentTime.AddMinutes(3);
//OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
//LogDeclision(declisionsForSave, DeclisionTradeAction.StopBuyShortTime, message);
}
var res = TradingEvent.None;
var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 45, 180, 2.5m);
var resultLongClose = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 15, 120, 2.5m).events;
var uptrendStarts = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(120), TimeSpan.FromSeconds(15), 2m, 10);
//var uptrendStarts2 = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(3), 1.5m, 2);
res |= (uptrendStarts & TradingEvent.UptrendStart);
//res |= (uptrendStarts2 & TradingEvent.UptrendStart);
//res |= downtrendEnds;
res |= resultLongClose;
res |= resultMoveAvFull.events;
if (resultMoveAvFull.bigWindowAv != 0)
{
LogPrice(processedPrices, message, bigWindowProcessor, resultMoveAvFull.bigWindowAv);
LogPrice(processedPrices, message, smallWindowProcessor, resultMoveAvFull.smallWindowAv);
}
if ((resultLongClose & TradingEvent.StopBuy) == TradingEvent.StopBuy)
{
var stopTo = (message.IsHistoricalData ? message.Time : DateTime.UtcNow).AddMinutes(_buyStopLength);
OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
//LogDeclision(declisionsForSave, DeclisionTradeAction.StopBuy, message);
}
if ((res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart
&& !OpeningStops.TryGetValue(message.Figi, out _)
&& state == ExchangeState.Open
&& data.timestamps.Length > 1
&& (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2] < TimeSpan.FromMinutes(1)))
{
var trade = new DeferredTrade()
{
Figi = message.Figi,
Price = message.Value,
Time = message.Time.AddSeconds(15)
};
DeferredLongOpens[message.Figi] = trade;
}
if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd)
{
var trade = new DeferredTrade()
{
Figi = message.Figi,
Price = message.Value,
Time = message.Time.AddSeconds(15)
};
DeferredLongCloses[message.Figi] = trade;
}
//if ((resultLongOpen.events & TradingEvent.ShortOpen) == TradingEvent.ShortOpen
// && !OpeningStops.TryGetValue(message.Figi, out _))
//{
// LogDeclision(declisionsForSave, DeclisionTradeAction.OpenShort, message);
//}
//if ((resultLongOpen.events & TradingEvent.ShortClose) == TradingEvent.ShortClose)
//{
// LogDeclision(declisionsForSave, DeclisionTradeAction.CloseShort, message);
//}
if ((!message.IsHistoricalData && (processedPrices.Count > 0 || declisionsForSave.Count > 0))
|| (message.IsHistoricalData && ((processedPrices.Count + declisionsForSave.Count > 10000) || _pricesChannel.Reader.Count == 0)))
{
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
if (processedPrices.Count > 0)
{
await context.ProcessedPrices.AddRangeAsync(processedPrices);
processedPrices.Clear();
}
if (declisionsForSave.Count > 0)
{
await context.Declisions.AddRangeAsync(declisionsForSave);
declisionsForSave.Clear();
}
await context.SaveChangesAsync();
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
}
}
}
}
private async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue)
{
using var context = await _dbContextFactory.CreateDbContextAsync();
await context.Trades
.Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future)
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue));
foreach (var account in _tradeDataProvider.Accounts.Values)
{
await _tradeDataProvider.SyncPortfolio(account);
}
}
private static void LogPrice(List<ProcessedPrice> prices, INewPrice message, string processor, decimal value)
{
prices.Add(new ProcessedPrice()
{
Figi = message.Figi,
Ticker = message.Ticker,
Processor = processor,
Time = message.Time,
Value = value,
});
}
private static void LogDeclision(List<Declision> declisions, DeclisionTradeAction action, INewPrice message)
{
declisions.Add(new Declision()
{
AccountId = string.Empty,
Figi = message.Figi,
Ticker = message.Ticker,
Price = message.Value,
Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
Action = action,
});
}
private async Task ProcessOrdersbooks()
{
while (await _ordersbookChannel.Reader.WaitToReadAsync())
{
var message = await _ordersbookChannel.Reader.ReadAsync();
if (!_historyCash.TryGetValue(message.Figi, out var data))
{
data = new PriceHistoryCacheUnit2(message.Figi);
_historyCash.TryAdd(message.Figi, data);
}
await data.AddOrderbook(message);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
return Task.CompletedTask;
}
private decimal GetComission(AssetType assetType)
{
if (assetType == AssetType.Common)
{
return _shareComission;
}
else if (assetType == AssetType.Futures)
{
return _futureComission;
}
else
{
return 0;
}
}
private decimal CalcProfit(string accountId, string figi, decimal closePrice)
{
if (_tradeDataProvider.Accounts.TryGetValue(accountId, out var account))
{
if (account.Assets.TryGetValue(figi, out var asset))
{
var leverageValue = 1m;
var isShort = asset.Position == PositionType.Short;
if (Leverages.TryGetValue(figi, out var leverage))
{
if (asset.Type == AssetType.Futures && !isShort)
{
leverageValue = leverage.LongLeverage;
}
else if (isShort)
{
leverageValue = leverage.ShortLeverage;
}
}
return TradingCalculator.CaclProfit(asset.BoughtPrice, closePrice, GetComission(asset.Type), leverageValue, isShort);
}
}
return 0;
}
private decimal GetCount(string accountId, decimal boutPrice)
{
var balance = _tradeDataProvider.Accounts[accountId].Balance;
return System.Math.Floor(balance * _defaultBuyPartOfAccount / boutPrice);
}
private bool IsBuyAllowed(string accountId, decimal boutPrice, decimal count, bool needBigCash)
{
if (!_botModeSwitcher.CanPurchase()) return false;
var balance = _tradeDataProvider.Accounts[accountId].Balance;
var total = _tradeDataProvider.Accounts[accountId].Total;
var futures = _tradeDataProvider.Accounts[accountId].Assets.Values.FirstOrDefault(v => v.Type == AssetType.Futures);
if (futures != null || needBigCash)
{
if ((balance - boutPrice * count) / total < _accountCashPartFutures) return false;
}
else
{
if ((balance - boutPrice * count) / total < _accountCashPart) return false;
}
return true;
}
}
}