klhztrader/KLHZ.Trader.Core/Exchange/Services/Trader.cs

460 lines
24 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.Exchange.Interfaces;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Security.Cryptography;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using Asset = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.Asset;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
using PositionType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.PositionType;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class Trader : IHostedService
{
// Message bus: the inbound channels are registered on it and admin notifications are broadcast through it.
private readonly IDataBus _dataBus;
// Source of window caches, order books, asset types and last prices; also writes price/decision logs.
private readonly TraderDataProvider _tradeDataProvider;
// Gives access to the managed accounts and the assets they hold.
private readonly PortfolioWrapper _portfolioWrapper;
// Exchange settings, taken from IOptions<ExchangeConfig>.Value in the constructor.
private readonly ExchangeConfig _exchangeConfig;
private readonly ILogger<Trader> _logger;
// Trading mode per FIGI; the constructor seeds every configured trading instrument with TradingMode.None.
private readonly ConcurrentDictionary<string, TradingMode> TradingModes = new();
// Per FIGI: the latest windows diff of the "diffs_pirson" series, written by ProcessPrices.
private readonly ConcurrentDictionary<string, decimal> DPirsonValues = new();
// Inbound queue of price messages, drained by ProcessPrices.
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
// Inbound queue of trade commands, drained by ProcessCommands.
private readonly Channel<ITradeCommand> _commands = Channel.CreateUnbounded<ITradeCommand>();
// Inbound queue of order books, drained by ProcessOrderbooks.
private readonly Channel<IOrderbook> _orderbooks = Channel.CreateUnbounded<IOrderbook>();
/// <summary>
/// Stores the injected dependencies and registers every configured trading
/// instrument in <c>TradingModes</c> with <see cref="TradingMode.None"/>.
/// </summary>
/// <param name="logger">Logger used by the processing loops.</param>
/// <param name="options">Exchange configuration wrapper; its <c>Value</c> is kept.</param>
/// <param name="dataBus">Bus used for channel registration and admin messages.</param>
/// <param name="portfolioWrapper">Access to the managed accounts.</param>
/// <param name="tradeDataProvider">Caches, order books and logging.</param>
/// <param name="investApiClient">Not used by the constructor body.</param>
public Trader(
    ILogger<Trader> logger,
    IOptions<ExchangeConfig> options,
    IDataBus dataBus,
    PortfolioWrapper portfolioWrapper,
    TraderDataProvider tradeDataProvider,
    InvestApiClient investApiClient)
{
    _logger = logger;
    _dataBus = dataBus;
    _tradeDataProvider = tradeDataProvider;
    _portfolioWrapper = portfolioWrapper;
    _exchangeConfig = options.Value;

    // Every tradable instrument starts without an active trading mode.
    foreach (var figi in _exchangeConfig.TradingInstrumentsFigis)
    {
        TradingModes[figi] = TradingMode.None;
    }
}
/// <summary>
/// Registers the three inbound channels on the data bus under the name
/// <c>Trader</c> and starts the three processing loops without awaiting them.
/// </summary>
/// <param name="cancellationToken">Not used by this method.</param>
/// <returns>An already completed task.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    const string subscriber = nameof(Trader);

    _dataBus.AddChannel(subscriber, _pricesChannel);
    _dataBus.AddChannel(subscriber, _orderbooks);
    _dataBus.AddChannel(subscriber, _commands);

    // The loops run for as long as their channels stay open; their tasks are intentionally discarded.
    _ = ProcessPrices();
    _ = ProcessOrderbooks();
    _ = ProcessCommands();

    return Task.CompletedTask;
}
/// <summary>
/// Reads trade commands from the <c>_commands</c> channel until it is completed.
/// OpenLong/OpenShort commands open a position on the first account that does not
/// hold the instrument yet; every other command closes all positions of the
/// instrument regardless of profit. A failure of a single command is logged and
/// the loop goes on.
/// </summary>
private async Task ProcessCommands()
{
    var reader = _commands.Reader;
    while (await reader.WaitToReadAsync())
    {
        var command = await reader.ReadAsync();
        try
        {
            var commandType = command.CommandType;
            var opensPosition = commandType == TradeCommandType.OpenLong
                || commandType == TradeCommandType.OpenShort;

            // The trading helpers work on price messages, so the command is wrapped into a synthetic one.
            var fakeMessage = new NewPriceMessage()
            {
                Figi = command.Figi,
                Ticker = "",
                Count = command.Count,
                Direction = 1,
                IsHistoricalData = false,
                Time = DateTime.UtcNow,
                Price = command.RecomendPrice ?? 0m,
            };

            if (!opensPosition)
            {
                var assetsForClose = _portfolioWrapper.Accounts
                    .SelectMany(a => a.Value.Assets.Values)
                    .Where(a => a.Figi == fakeMessage.Figi)
                    .ToArray();
                await ClosePositions(assetsForClose, fakeMessage, false);
                continue;
            }

            var positionType = commandType == TradeCommandType.OpenLong
                ? PositionType.Long
                : PositionType.Short;
            var (stopLoss, takeProfit) = GetStops(fakeMessage, positionType);

            // At most one account, and only one that has no asset with this FIGI.
            var accounts = _portfolioWrapper.Accounts
                .Select(a => a.Value)
                .Where(account => !account.Assets.ContainsKey(command.Figi))
                .Take(1)
                .ToArray();

            await OpenPositions(accounts, fakeMessage, positionType, stopLoss, takeProfit, System.Math.Abs(command.Count));
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при выполнении команды.");
        }
    }
}
/// <summary>
/// Reads order books from the <c>_orderbooks</c> channel until it is completed
/// and hands each one to the trade data provider.
/// </summary>
/// <remarks>
/// <c>StartAsync</c> discards the task returned by this method, so an exception
/// escaping the loop would stop order-book processing without leaving any trace.
/// A failure for a single order book is therefore logged and the loop continues,
/// which matches how the command and price loops handle errors.
/// </remarks>
private async Task ProcessOrderbooks()
{
    while (await _orderbooks.Reader.WaitToReadAsync())
    {
        var message = await _orderbooks.Reader.ReadAsync();
        try
        {
            await _tradeDataProvider.AddOrderbook(message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при обработке стакана.");
        }
    }
}
/// <summary>
/// Reads price messages from the <c>_pricesChannel</c> channel until it is completed.
/// Each accepted message is passed through the high-frequency filter, stored in
/// the window caches and used to log the derived diff/correlation series.
/// A failure for a single message is logged and the loop goes on.
/// </summary>
private async Task ProcessPrices()
{
    // Filter state is kept separately for direction 1 and for every other direction.
    var pricesCache1 = new Dictionary<string, List<INewPrice>>();
    var pricesCache2 = new Dictionary<string, List<INewPrice>>();

    while (await _pricesChannel.Reader.WaitToReadAsync())
    {
        var message = await _pricesChannel.Reader.ReadAsync();

        // Live (non-historical) messages older than one minute are dropped.
        if (!message.IsHistoricalData && DateTime.UtcNow - message.Time > TimeSpan.FromMinutes(1))
        {
            continue;
        }

        // NOTE(review): the result is not used in this method; the call is kept because
        // GetInitDict is defined elsewhere and its side effects cannot be confirmed from here.
        _ = TraderUtils.GetInitDict(0);

        try
        {
            message = TraderUtils.FilterHighFreqValues(message, message.Direction == 1 ? pricesCache1 : pricesCache2);
            await AddPriceToWindowCaches(message);
            await LogDiffIndicators(message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при боработке новой цены.");
        }
    }
}

/// <summary>
/// Stores the time, count and price of a message in the 20-second, 1-, 5- and
/// 15-minute window caches. Only the FIGIs "BBG004730N88" and "FUTIMOEXF000"
/// are cached; direction 1 goes to the buy keys, direction 2 to the sell keys.
/// </summary>
private async Task AddPriceToWindowCaches(INewPrice message)
{
    if (message.Figi != "BBG004730N88" && message.Figi != "FUTIMOEXF000")
    {
        return;
    }

    // A fresh instance per cache, so the caches never share one object.
    Contracts.Declisions.Dtos.CachedValue NewCachedValue() => new Contracts.Declisions.Dtos.CachedValue()
    {
        Time = message.Time,
        Count = message.Count,
        Price = message.Price,
    };

    if (message.Direction == 1)
    {
        await _tradeDataProvider.AddDataTo20SecondsWindowCache(message.Figi, "1", NewCachedValue());
        await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._5minBuyCacheKey, NewCachedValue());
        await _tradeDataProvider.AddDataTo15MinuteWindowCache(message.Figi, Constants._15minBuyCacheKey, NewCachedValue());
        await _tradeDataProvider.AddDataTo1MinuteWindowCache(message.Figi, Constants._1minBuyCacheKey, NewCachedValue());
    }

    if (message.Direction == 2)
    {
        await _tradeDataProvider.AddDataTo5MinuteWindowCache(message.Figi, Constants._5minSellCacheKey, NewCachedValue());
        await _tradeDataProvider.AddDataTo15MinuteWindowCache(message.Figi, Constants._15minSellCacheKey, NewCachedValue());
        await _tradeDataProvider.AddDataTo1MinuteWindowCache(message.Figi, Constants._1minSellCacheKey, NewCachedValue());
        await _tradeDataProvider.AddDataTo20SecondsWindowCache(message.Figi, "2", NewCachedValue());
    }
}

/// <summary>
/// For direction-1 messages of a configured trading instrument: computes the
/// 240 s / 120 s window diffs of trade counts and buy prices from the 15-minute
/// cache, then the diffs of those diffs, their correlation and the diff of the
/// correlation series, logging every step. Also logs the price when the
/// correlation diff rises from below 0.5 to above 0.5. Stops at the first
/// step that cannot be calculated.
/// </summary>
private async Task LogDiffIndicators(INewPrice message)
{
    if (!_exchangeConfig.TradingInstrumentsFigis.Contains(message.Figi) || message.Direction != 1)
    {
        return;
    }

    var smallWindow = TimeSpan.FromSeconds(120);
    var bigWindow = TimeSpan.FromSeconds(240);
    var meanWindow = TimeSpan.FromSeconds(240);

    var buys = await _tradeDataProvider.GetDataFrom15MinuteWindowCache(message.Figi, Constants._15minBuyCacheKey);
    var sells = await _tradeDataProvider.GetDataFrom15MinuteWindowCache(message.Figi, Constants._15minSellCacheKey);

    // Buys and sells merged into one series ordered by time.
    var trades = buys.ToList();
    trades.AddRange(sells);
    var orderedTrades = trades.OrderBy(t => t.Time).ToArray();

    if (!(orderedTrades.TryCalcTimeWindowsDiff(bigWindow, smallWindow, v => v.Count, false, out var tradesDiff)
        && buys.TryCalcTimeWindowsDiff(bigWindow, smallWindow, v => v.Price, true, out var pricesDiff)))
    {
        return;
    }

    await _tradeDataProvider.LogPrice(message, "privcesDiff", pricesDiff);
    await _tradeDataProvider.LogPrice(message, "tradevolume_diff", tradesDiff);
    await _tradeDataProvider.AddDataTo15MinuteWindowCache(message.Figi, "5min_diff", new Contracts.Declisions.Dtos.CachedValue()
    {
        Time = message.Time,
        Value2 = tradesDiff,
        Value = pricesDiff,
    });

    var diffs = await _tradeDataProvider.GetDataFrom15MinuteWindowCache(message.Figi, "5min_diff");
    if (!(diffs.TryCalcTimeWindowsDiff(bigWindow, smallWindow, (c) => c.Value, true, out var resdp)
        && diffs.TryCalcTimeWindowsDiff(bigWindow, smallWindow, (c) => c.Value2, true, out var resv)))
    {
        return;
    }

    await _tradeDataProvider.LogPrice(message, "privcesDiffDiff", (decimal)resdp);
    await _tradeDataProvider.LogPrice(message, "tradevolume_diff_diff", (decimal)resv);

    if (!diffs.TryCalcPirsonCorrelation(meanWindow, out var pirson))
    {
        return;
    }

    await _tradeDataProvider.LogPrice(message, "diffs_pirson", (decimal)pirson);
    await _tradeDataProvider.AddDataTo15MinuteWindowCache(message.Figi, "diffs_pirson", new Contracts.Declisions.Dtos.CachedValue()
    {
        Time = message.Time,
        Value = (decimal)pirson,
    });

    var pirsonSeries = await _tradeDataProvider.GetDataFrom15MinuteWindowCache(message.Figi, "diffs_pirson");
    if (!pirsonSeries.TryCalcTimeWindowsDiff(bigWindow, smallWindow, (c) => c.Value, true, out var res))
    {
        return;
    }

    // Upward crossing of 0.5 relative to the previously stored value for this FIGI.
    if (DPirsonValues.TryGetValue(message.Figi, out var previous) && previous < 0.5m && res > 0.5m)
    {
        await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point0.5", message.Price);
    }

    DPirsonValues[message.Figi] = res;
}
/// <summary>
/// Closes the given assets on their accounts and notifies the admin about each close.
/// </summary>
/// <param name="assets">Assets to consider for closing.</param>
/// <param name="message">
/// Price message that triggered the close. A zero price is replaced by the last
/// known price of the instrument; the base price is rounded to two decimals.
/// </param>
/// <param name="withProfitOnly">
/// When <c>true</c>, an asset is closed only if its calculated profit is positive,
/// and the first such close is written to the decision log. When <c>false</c>,
/// every asset is closed.
/// </param>
private async Task ClosePositions(Asset[] assets, INewPrice message, bool withProfitOnly = true)
{
    var decisionLogged = false;
    var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);

    var basePrice = message.Price;
    if (basePrice == 0)
    {
        basePrice = await _tradeDataProvider.GetLastPrice(message.Figi);
    }
    basePrice = System.Math.Round(basePrice, 2);

    foreach (var asset in assets)
    {
        var isShort = asset.Count < 0;

        // Each asset starts from the base price, so an order-book price picked
        // for one asset can no longer leak into the next one.
        var closePrice = basePrice;
        string text;

        if (withProfitOnly)
        {
            // NOTE(review): profit is calculated for futures only, so in this mode
            // assets of any other type are never closed — confirm this is intended.
            var profit = 0m;
            if (assetType == AssetType.Futures)
            {
                if (_tradeDataProvider.Orderbooks.TryGetValue(message.Figi, out var orderbook))
                {
                    if (isShort)
                    {
                        // A short is priced by the best ask; with no asks the base price
                        // is kept instead of falling through to the bid side.
                        if (orderbook.Asks.Length > 0)
                        {
                            closePrice = orderbook.Asks[0].Price;
                        }
                    }
                    else if (orderbook.Bids.Length > 0)
                    {
                        closePrice = orderbook.Bids[0].Price;
                    }
                }

                profit = TradingCalculator.CaclProfit(asset.BoughtPrice, closePrice,
                    GetComission(assetType), GetLeverage(message.Figi, isShort), isShort);
            }

            if (profit <= 0)
            {
                continue;
            }

            profit = System.Math.Round(profit, 2);
            text = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{closePrice}, профит {profit}";

            // Only the first profitable close of the batch goes to the decision log.
            if (!decisionLogged)
            {
                decisionLogged = true;
                await _tradeDataProvider.LogDeclision(isShort ? DeclisionTradeAction.CloseShortReal : DeclisionTradeAction.CloseLongReal, message, profit);
            }
        }
        else
        {
            text = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{closePrice}";
        }

        await _portfolioWrapper.Accounts[asset.AccountId].ClosePosition(message.Figi);
        await _dataBus.Broadcast(new MessageForAdmin() { Text = text });
    }
}
/// <summary>
/// Opens a position on every given account for which the operation is allowed
/// and notifies the admin about each opened position. The decision is written
/// to the decision log once per call, as soon as the first account is visited.
/// </summary>
/// <param name="accounts">Accounts to open the position on.</param>
/// <param name="message">Price message that triggered the open.</param>
/// <param name="positionType">Long or short.</param>
/// <param name="stopLossShift">Stop-loss shift passed to the account.</param>
/// <param name="takeProfitShift">Take-profit shift passed to the account.</param>
/// <param name="count">Number of units to open.</param>
private async Task OpenPositions(IManagedAccount[] accounts, INewPrice message, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1)
{
    var isLong = positionType == PositionType.Long;

    // Shorts were previously logged as OpenLongReal as well.
    // Assumes DeclisionTradeAction.OpenShortReal exists next to CloseShortReal — TODO confirm.
    var openAction = isLong ? DeclisionTradeAction.OpenLongReal : DeclisionTradeAction.OpenShortReal;
    var decisionLogged = false;

    foreach (var acc in accounts)
    {
        // NOTE(review): the check receives 1 rather than `count`; verify against IsOperationAllowed.
        if (TraderUtils.IsOperationAllowed(acc, message.Price, 1, _exchangeConfig.AccountCashPartFutures, _exchangeConfig.AccountCashPart))
        {
            await acc.OpenPosition(message.Figi, positionType, stopLossShift, takeProfitShift, count);
            await _dataBus.Broadcast(new MessageForAdmin()
            {
                Text = $"Открываю позицию {message.Figi} ({(isLong ? "лонг" : "шорт")}) " +
                $"на счёте {acc.AccountName}. Количество {(isLong ? "" : "-")}{count}, " +
                $"цена ~{System.Math.Round(message.Price, 2)}. Стоп лосс: {(isLong ? "-" : "+")}{stopLossShift}. " +
                $"Тейк профит: {(isLong ? "+" : "-")}{takeProfitShift}"
            });
        }

        // Logged whether or not the operation was allowed, as before.
        if (!decisionLogged)
        {
            await _tradeDataProvider.LogDeclision(openAction, message);
            decisionLogged = true;
        }
    }
}
/// <summary>
/// Turns trading-event scores into actions. Trend starts (score at or above
/// <c>Constants.UppingCoefficient</c> while the exchange is open) open a long or
/// short position on the first account without the instrument. Trend ends (score
/// at or above ten times the coefficient) close the matching positions. Real
/// orders are placed only for non-historical messages when the bot mode allows
/// it; the decisions themselves are always logged.
/// </summary>
/// <param name="message">Price message the scores were calculated for.</param>
/// <param name="result">Score per trading event; all four events are read by key.</param>
private async Task ExecuteDeclisions(INewPrice message, ImmutableDictionary<TradingEvent, decimal> result)
{
    var exchangeState = ExchangeScheduler.GetCurrentState();

    // Uptrend start: open long.
    if (result[TradingEvent.UptrendStart] >= Constants.UppingCoefficient && exchangeState == ExchangeState.Open)
    {
        var (stopLoss, takeProfit) = GetStops(message, PositionType.Long);
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            var freeAccounts = _portfolioWrapper.Accounts
                .Select(a => a.Value)
                .Where(account => !account.Assets.ContainsKey(message.Figi))
                .Take(1)
                .ToArray();
            await OpenPositions(freeAccounts, message, PositionType.Long, stopLoss, takeProfit, 1);
        }

        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenLong, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsLong, message.Price + takeProfit, message.Time.AddMilliseconds(-RandomNumberGenerator.GetInt32(300, 1000)), message);
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsLong, message.Price - stopLoss, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(300, 1000)), message);
    }

    // Downtrend start: open short.
    if (result[TradingEvent.DowntrendStart] >= Constants.UppingCoefficient && exchangeState == ExchangeState.Open)
    {
        var (stopLoss, takeProfit) = GetStops(message, PositionType.Short);
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            var freeAccounts = _portfolioWrapper.Accounts
                .Select(a => a.Value)
                .Where(account => !account.Assets.ContainsKey(message.Figi))
                .Take(1)
                .ToArray();
            await OpenPositions(freeAccounts, message, PositionType.Short, stopLoss, takeProfit, 1);
        }

        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenShort, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsShort, message.Price - takeProfit, message.Time.AddMilliseconds(-RandomNumberGenerator.GetInt32(300, 1000)), message);
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsShort, message.Price + stopLoss, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(300, 1000)), message);
    }

    // Uptrend end: close longs (profitable ones only, per the ClosePositions default).
    if (result[TradingEvent.UptrendEnd] >= Constants.UppingCoefficient * 10)
    {
        if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
        {
            var longs = _portfolioWrapper.Accounts
                .SelectMany(a => a.Value.Assets.Values)
                .Where(asset => asset.Figi == message.Figi && asset.Count > 0)
                .ToArray();
            await ClosePositions(longs, message);
        }

        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.CloseLong, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
    }

    // Downtrend end: close shorts; gated by CanPurchase rather than CanSell.
    if (result[TradingEvent.DowntrendEnd] >= Constants.UppingCoefficient * 10)
    {
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            var shorts = _portfolioWrapper.Accounts
                .SelectMany(a => a.Value.Assets.Values)
                .Where(asset => asset.Figi == message.Figi && asset.Count < 0)
                .ToArray();
            await ClosePositions(shorts, message);
        }

        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.CloseShort, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
    }
}
/// <summary>
/// Does nothing on shutdown and returns an already completed task.
/// </summary>
/// <param name="cancellationToken">Not used by this method.</param>
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Returns the configured commission for the asset type: the share commission
/// for <c>Common</c>, the futures commission for <c>Futures</c>, zero otherwise.
/// </summary>
private decimal GetComission(AssetType assetType) => assetType switch
{
    AssetType.Common => _exchangeConfig.ShareComission,
    AssetType.Futures => _exchangeConfig.FutureComission,
    _ => 0,
};
/// <summary>
/// Returns the configured leverage of an instrument for the given side,
/// or 1 when the instrument has no entry in <c>InstrumentsSettings</c>.
/// </summary>
/// <param name="figi">Instrument identifier to look up.</param>
/// <param name="isShort"><c>true</c> for the short leverage, <c>false</c> for the long one.</param>
private decimal GetLeverage(string figi, bool isShort)
{
    var settings = _exchangeConfig.InstrumentsSettings.FirstOrDefault(s => s.Figi == figi);
    if (settings == null)
    {
        return 1m;
    }

    if (isShort)
    {
        return settings.ShortLeverage;
    }

    return settings.LongLeverage;
}
/// <summary>
/// Returns the stop-loss and take-profit shifts for a new position.
/// The values are fixed (2 and 6); neither argument is consulted.
/// </summary>
private (decimal stopLoss, decimal takeProfit) GetStops(INewPrice message, PositionType type)
{
    const decimal fixedStopLoss = 2m;
    const decimal fixedTakeProfit = 6m;
    return (stopLoss: fixedStopLoss, takeProfit: fixedTakeProfit);
}
}
}