klhztrader/KLHZ.Trader.Core/Exchange/Services/Trader.cs

420 lines
22 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Common.Enums;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.Exchange.Interfaces;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Security.Cryptography;
using System.Threading.Channels;
using Tinkoff.InvestApi;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class Trader : IHostedService
{
// Message bus: the inbound channels are registered on it and admin notifications are broadcast through it.
private readonly IDataBus _dataBus;
// Provider used to cache and query trade data and orderbooks, and to log prices and decisions.
private readonly TraderDataProvider _tradeDataProvider;
// Gives access to the accounts and the assets held on them.
private readonly PortfolioWrapper _portfolioWrapper;
// Exchange settings, taken from IOptions<ExchangeConfig>.Value in the constructor.
private readonly ExchangeConfig _exchangeConfig;
// Logger used for errors raised inside the processing loops.
private readonly ILogger<Trader> _logger;
// Trading mode per FIGI; the constructor seeds every configured trading instrument with TradingMode.None.
private readonly ConcurrentDictionary<string, TradingMode> TradingModes = new();
// Last "diffs_pirson" correlation value stored per FIGI by ProcessPrices.
private readonly ConcurrentDictionary<string, decimal> DPirsonValues = new();
// Last price difference stored per FIGI by ProcessPrices.
private readonly ConcurrentDictionary<string, decimal> DPricesValues = new();
// Unbounded inbound queue of price messages, drained by ProcessPrices.
private readonly Channel<ITradeDataItem> _pricesChannel = Channel.CreateUnbounded<ITradeDataItem>();
// Unbounded inbound queue of trade commands, drained by ProcessCommands.
private readonly Channel<ITradeCommand> _commands = Channel.CreateUnbounded<ITradeCommand>();
// Unbounded inbound queue of orderbook snapshots, drained by ProcessOrderbooks.
private readonly Channel<IOrderbook> _orderbooks = Channel.CreateUnbounded<IOrderbook>();
/// <summary>
/// Stores the injected services and marks every configured trading instrument
/// with <see cref="TradingMode.None"/>.
/// </summary>
/// <param name="logger">Logger for this service.</param>
/// <param name="options">Options wrapper; its <c>Value</c> is kept as the exchange configuration.</param>
/// <param name="dataBus">Message bus used for channels and admin notifications.</param>
/// <param name="portfolioWrapper">Access to accounts and their assets.</param>
/// <param name="tradeDataProvider">Trade data cache and decision log.</param>
/// <param name="investApiClient">Not used inside this constructor.</param>
public Trader(
    ILogger<Trader> logger,
    IOptions<ExchangeConfig> options,
    IDataBus dataBus,
    PortfolioWrapper portfolioWrapper,
    TraderDataProvider tradeDataProvider,
    InvestApiClient investApiClient)
{
    _logger = logger;
    _dataBus = dataBus;
    _tradeDataProvider = tradeDataProvider;
    _portfolioWrapper = portfolioWrapper;
    _exchangeConfig = options.Value;

    // Every configured instrument starts without an assigned trading mode.
    foreach (var figi in _exchangeConfig.TradingInstrumentsFigis)
    {
        TradingModes[figi] = TradingMode.None;
    }
}
/// <summary>
/// Registers the three inbound channels on the data bus under this service's name
/// and starts the processing loops without awaiting them.
/// </summary>
/// <param name="cancellationToken">Not used by this implementation.</param>
/// <returns>An already completed task.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
    const string channelOwner = nameof(Trader);

    _dataBus.AddChannel(channelOwner, _pricesChannel);
    _dataBus.AddChannel(channelOwner, _orderbooks);
    _dataBus.AddChannel(channelOwner, _commands);

    // The loops run in the background; their tasks are intentionally discarded.
    _ = ProcessPrices();
    _ = ProcessOrderbooks();
    _ = ProcessCommands();

    return Task.CompletedTask;
}
/// <summary>
/// Reads trade commands from the commands channel until it completes.
/// Open commands open a position on the first account that does not hold the instrument;
/// every other command closes all positions in the instrument regardless of profit.
/// </summary>
/// <remarks>
/// Exceptions raised while executing a single command are logged and the loop continues.
/// </remarks>
private async Task ProcessCommands()
{
    while (await _commands.Reader.WaitToReadAsync())
    {
        var command = await _commands.Reader.ReadAsync();
        try
        {
            var isOpenCommand = command.CommandType is TradeCommandType.OpenLong or TradeCommandType.OpenShort;

            // A synthetic trade item carries the command's instrument, count and recommended price
            // into the open/close helpers (price 0 when no recommendation was given).
            var fakeMessage = new TradeDataItem()
            {
                Figi = command.Figi,
                Ticker = "",
                Count = command.Count,
                Direction = 1,
                IsHistoricalData = false,
                Time = DateTime.UtcNow,
                Price = command.RecomendPrice ?? 0m
            };

            if (!isOpenCommand)
            {
                var assetsForClose = _portfolioWrapper.Accounts
                    .SelectMany(a => a.Value.Assets.Values.Where(asset => asset.Figi == fakeMessage.Figi))
                    .ToArray();
                await ClosePositions(assetsForClose, fakeMessage, false);
                continue;
            }

            var positionType = PositionType.Short;
            if (command.CommandType == TradeCommandType.OpenLong)
            {
                positionType = PositionType.Long;
            }

            var stops = GetStops(fakeMessage, positionType);

            // At most one account that does not already hold this instrument.
            var accounts = (from account in _portfolioWrapper.Accounts
                            where !account.Value.Assets.ContainsKey(command.Figi)
                            select account.Value)
                .Take(1)
                .ToArray();

            await OpenPositions(accounts, fakeMessage, positionType, stops.stopLoss, stops.takeProfit, System.Math.Abs(command.Count));
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при выполнении команды.");
        }
    }
}
/// <summary>
/// Reads orderbook messages from the orderbooks channel until it completes and
/// forwards each one to the trade data provider.
/// </summary>
/// <remarks>
/// The loop is started without being awaited, so an unhandled exception would end it silently.
/// A failure on a single orderbook is therefore logged and the loop continues, in the same way
/// as the command and price loops.
/// </remarks>
private async Task ProcessOrderbooks()
{
    while (await _orderbooks.Reader.WaitToReadAsync())
    {
        var message = await _orderbooks.Reader.ReadAsync();
        try
        {
            await _tradeDataProvider.AddOrderbook(message);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при обработке стакана.");
        }
    }
}
/// <summary>
/// Reads price messages from the prices channel until it completes. Selected instruments are
/// added to the provider's cache; for configured trading instruments the method computes and
/// logs the price and trade-volume differences, their correlation, and entry-point markers.
/// </summary>
/// <remarks>
/// Exceptions raised while handling a single message are logged and the loop continues.
/// </remarks>
private async Task ProcessPrices()
{
    while (await _pricesChannel.Reader.WaitToReadAsync())
    {
        var message = await _pricesChannel.Reader.ReadAsync();

        // Live messages older than one minute are dropped; historical data is always processed.
        if (!message.IsHistoricalData && DateTime.UtcNow - message.Time > TimeSpan.FromMinutes(1))
        {
            continue;
        }

        try
        {
            // Only these two instruments are added to the provider's cache.
            if (message.Figi == "BBG004730N88" || message.Figi == "FUTIMOEXF000")
            {
                await _tradeDataProvider.AddData(message);
            }

            // Indicators are calculated only for configured instruments and messages with Direction == 1.
            if (!_exchangeConfig.TradingInstrumentsFigis.Contains(message.Figi) || message.Direction != 1)
            {
                continue;
            }

            var cacheWindow = TimeSpan.FromSeconds(400);
            var smallWindow = TimeSpan.FromSeconds(180);
            var bigWindow = TimeSpan.FromSeconds(360);
            var meanWindow = TimeSpan.FromSeconds(360);

            var buys = await _tradeDataProvider.GetDataForTimeWindow(message.Figi, cacheWindow, selector: (i) => i.Direction == 1);
            var trades = await _tradeDataProvider.GetDataForTimeWindow(message.Figi, cacheWindow);

            if (trades.TryCalcTimeWindowsDiff(bigWindow, smallWindow, v => v.Count, false, out var tradesDiff)
                && buys.TryCalcTimeDiff(bigWindow, smallWindow, v => v.Price, true, out var pricesDiff))
            {
                await _tradeDataProvider.LogPrice(message, "privcesDiff", pricesDiff);
                await _tradeDataProvider.LogPrice(message, "tradevolume_diff", tradesDiff);

                // Both differences are cached together so the correlation below can be computed over them.
                await _tradeDataProvider.AddData(message.Figi, "5min_diff", new Contracts.Declisions.Dtos.CachedValue()
                {
                    Time = message.Time,
                    Value2 = tradesDiff,
                    Value = pricesDiff,
                    Figi = message.Figi,
                    Ticker = message.Ticker,
                });

                DPricesValues[message.Figi] = pricesDiff;

                var diffs = await _tradeDataProvider.GetDataForTimeWindow(message.Figi, cacheWindow, "5min_diff");
                if (diffs.TryCalcPirsonCorrelation(meanWindow, out var pirson))
                {
                    await _tradeDataProvider.LogPrice(message, "diffs_pirson", (decimal)pirson);
                    await _tradeDataProvider.AddData(message.Figi, "diffs_pirson", new Contracts.Declisions.Dtos.CachedValue()
                    {
                        Time = message.Time,
                        Value = (decimal)pirson,
                        Figi = message.Figi,
                        Ticker = message.Ticker,
                    });

                    if (DPirsonValues.TryGetValue(message.Figi, out var olddpirs))
                    {
                        // Correlation crossed -0.3 upwards while price and trade volume differences are positive.
                        if (olddpirs < -0.3m && pirson > -0.3m && pricesDiff > 0 && (tradesDiff > 0))
                        {
                            await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_long_in", message.Price);
                        }

                        // Correlation crossed 0.3 downwards while the price difference is negative
                        // and the trade volume difference is positive.
                        if (olddpirs > 0.3m && pirson < 0.3m && pricesDiff < 0 && (tradesDiff > 0))
                        {
                            await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_short_in", message.Price);
                        }
                    }

                    DPirsonValues[message.Figi] = pirson;
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при боработке новой цены.");
        }
    }
}
/// <summary>
/// Closes the given positions and notifies the admin about each closed one.
/// </summary>
/// <param name="assets">Positions that are candidates for closing.</param>
/// <param name="message">Trade item supplying the instrument and the reference price.</param>
/// <param name="withProfitOnly">
/// When <c>true</c>, a position is closed only if its calculated profit is positive;
/// when <c>false</c>, every given position is closed.
/// </param>
private async Task ClosePositions(Asset[] assets, ITradeDataItem message, bool withProfitOnly = true)
{
    var declisionLogged = false;
    var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);

    // Reference price: the message price, or the last known price when the message carries none.
    var basePrice = message.Price;
    if (basePrice == 0)
    {
        basePrice = await _tradeDataProvider.GetLastPrice(message.Figi);
    }
    basePrice = System.Math.Round(basePrice, 2);

    foreach (var asset in assets)
    {
        // Each asset starts from the reference price, so an orderbook quote picked for one
        // asset can never leak into the evaluation of the next one.
        var price = basePrice;
        string? mess = null;

        if (withProfitOnly)
        {
            var profit = 0m;

            // NOTE(review): profit is calculated only for futures, so with withProfitOnly == true
            // assets of any other type are never closed here — confirm this is intended.
            if (assetType == AssetType.Futures)
            {
                if (_tradeDataProvider.Orderbooks.TryGetValue(message.Figi, out var orderbook))
                {
                    // NOTE(review): a short position with no asks falls through to the best bid — confirm.
                    if (asset.Count < 0 && orderbook.Asks.Length > 0)
                    {
                        price = orderbook.Asks[0].Price;
                    }
                    else if (orderbook.Bids.Length > 0)
                    {
                        price = orderbook.Bids[0].Price;
                    }
                }

                profit = TradingCalculator.CaclProfit(asset.BoughtPrice, price,
                    GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
            }

            if (profit > 0)
            {
                profit = System.Math.Round(profit, 2);
                mess = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{price}, профит {profit}";

                // The decision is logged once per call, for the first profitable asset.
                if (!declisionLogged)
                {
                    declisionLogged = true;
                    await _tradeDataProvider.LogDeclision(asset.Count < 0 ? DeclisionTradeAction.CloseShortReal : DeclisionTradeAction.CloseLongReal, message, profit);
                }
            }
        }
        else
        {
            mess = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{price}";
        }

        // A notification text exists only for assets selected for closing.
        if (mess != null)
        {
            await _portfolioWrapper.Accounts[asset.AccountId].ClosePosition(message.Figi);
            await _dataBus.Broadcast(new MessageForAdmin() { Text = mess });
        }
    }
}
/// <summary>
/// Opens a position of the given type on each account for which the operation is allowed,
/// and notifies the admin about every opened position.
/// </summary>
/// <param name="accounts">Accounts to open the position on.</param>
/// <param name="message">Trade item supplying the instrument and the reference price.</param>
/// <param name="positionType">Long or short.</param>
/// <param name="stopLossShift">Stop-loss offset passed to the account.</param>
/// <param name="takeProfitShift">Take-profit offset passed to the account.</param>
/// <param name="count">Number of units to open.</param>
private async Task OpenPositions(IManagedAccount[] accounts, ITradeDataItem message, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1)
{
    var loggedDeclisions = 0;
    foreach (var acc in accounts)
    {
        // NOTE(review): the literal 1 is passed here rather than `count` — confirm against
        // TraderUtils.IsOperationAllowed whether this argument is meant to be the position size.
        if (TraderUtils.IsOperationAllowed(acc, message.Price, 1, _exchangeConfig.AccountCashPartFutures, _exchangeConfig.AccountCashPart))
        {
            await acc.OpenPosition(message.Figi, positionType, stopLossShift, takeProfitShift, count);
            await _dataBus.Broadcast(new MessageForAdmin()
            {
                Text = $"Открываю позицию {message.Figi} ({(positionType == PositionType.Long ? "лонг" : "шорт")}) " +
                $"на счёте {acc.AccountName}. Количество {(positionType == PositionType.Long ? "" : "-")}{count}, " +
                $"цена ~{System.Math.Round(message.Price, 2)}. Стоп лосс: {(positionType == PositionType.Long ? "-" : "+")}{stopLossShift}. " +
                $"Тейк профит: {(positionType == PositionType.Long ? "+" : "-")}{takeProfitShift}"
            });
        }

        // The decision is logged once per call, on the first account iterated.
        // NOTE(review): this runs even when the operation above was not allowed, and it always
        // logs OpenLongReal, including for short positions — confirm whether a short-specific
        // action should be used and whether the log belongs inside the branch above.
        if (loggedDeclisions == 0)
        {
            await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenLongReal, message);
            loggedDeclisions++;
        }
    }
}
/// <summary>
/// Acts on the calculated trading events for a message: opens long/short positions on trend
/// starts (only while the exchange is open), closes them on trend ends, and logs each decision.
/// </summary>
/// <param name="message">Trade item that triggered the evaluation.</param>
/// <param name="result">Weight per trading event; compared against <c>Constants.UppingCoefficient</c>.</param>
/// <remarks>
/// Real orders are placed only for non-historical messages and only when the bot mode permits it;
/// decisions are logged either way. Logged timestamps are shifted by a random number of milliseconds.
/// </remarks>
private async Task ExecuteDeclisions(ITradeDataItem message, ImmutableDictionary<TradingEvent, decimal> result)
{
    var state = ExchangeScheduler.GetCurrentState();

    // Uptrend start: open a long.
    if (result[TradingEvent.UptrendStart] >= Constants.UppingCoefficient && state == ExchangeState.Open)
    {
        var stops = GetStops(message, PositionType.Long);
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            // At most one account that does not already hold this instrument.
            var accounts = (from account in _portfolioWrapper.Accounts
                            where !account.Value.Assets.ContainsKey(message.Figi)
                            select account.Value)
                .Take(1)
                .ToArray();
            await OpenPositions(accounts, message, PositionType.Long, stops.stopLoss, stops.takeProfit, 1);
        }

        var openTime = message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenLong, message.Price, openTime, message);

        var takeProfitTime = message.Time.AddMilliseconds(-RandomNumberGenerator.GetInt32(300, 1000));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsLong, message.Price + stops.takeProfit, takeProfitTime, message);

        var stopLossTime = message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(300, 1000));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsLong, message.Price - stops.stopLoss, stopLossTime, message);
    }

    // Downtrend start: open a short.
    if (result[TradingEvent.DowntrendStart] >= Constants.UppingCoefficient && state == ExchangeState.Open)
    {
        var stops = GetStops(message, PositionType.Short);
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            var accounts = (from account in _portfolioWrapper.Accounts
                            where !account.Value.Assets.ContainsKey(message.Figi)
                            select account.Value)
                .Take(1)
                .ToArray();
            await OpenPositions(accounts, message, PositionType.Short, stops.stopLoss, stops.takeProfit, 1);
        }

        var openTime = message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenShort, message.Price, openTime, message);

        var takeProfitTime = message.Time.AddMilliseconds(-RandomNumberGenerator.GetInt32(300, 1000));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsShort, message.Price - stops.takeProfit, takeProfitTime, message);

        var stopLossTime = message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(300, 1000));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsShort, message.Price + stops.stopLoss, stopLossTime, message);
    }

    // Uptrend end: close long positions (profitable ones only, the ClosePositions default).
    if (result[TradingEvent.UptrendEnd] >= Constants.UppingCoefficient * 10)
    {
        if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
        {
            var longs = _portfolioWrapper.Accounts
                .SelectMany(a => a.Value.Assets.Values.Where(asset => asset.Figi == message.Figi && asset.Count > 0))
                .ToArray();
            await ClosePositions(longs, message);
        }

        var closeTime = message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.CloseLong, message.Price, closeTime, message);
    }

    // Downtrend end: close short positions (profitable ones only, the ClosePositions default).
    if (result[TradingEvent.DowntrendEnd] >= Constants.UppingCoefficient * 10)
    {
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            var shorts = _portfolioWrapper.Accounts
                .SelectMany(a => a.Value.Assets.Values.Where(asset => asset.Figi == message.Figi && asset.Count < 0))
                .ToArray();
            await ClosePositions(shorts, message);
        }

        var closeTime = message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100));
        await _tradeDataProvider.LogDeclision(DeclisionTradeAction.CloseShort, message.Price, closeTime, message);
    }
}
/// <summary>
/// Performs no work on shutdown.
/// </summary>
/// <param name="cancellationToken">Not used by this implementation.</param>
/// <returns>An already completed task.</returns>
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Returns the configured commission for the given asset type.
/// </summary>
/// <param name="assetType">Type of the traded asset.</param>
/// <returns>
/// The share commission for <c>Common</c>, the futures commission for <c>Futures</c>,
/// and zero for any other type.
/// </returns>
private decimal GetComission(AssetType assetType)
{
    return assetType switch
    {
        AssetType.Common => _exchangeConfig.ShareComission,
        AssetType.Futures => _exchangeConfig.FutureComission,
        _ => 0,
    };
}
/// <summary>
/// Looks up the configured leverage for an instrument.
/// </summary>
/// <param name="figi">Instrument identifier to look up in the instrument settings.</param>
/// <param name="isShort">Selects the short leverage when <c>true</c>, the long leverage otherwise.</param>
/// <returns>The configured leverage, or 1 when the instrument has no settings entry.</returns>
private decimal GetLeverage(string figi, bool isShort)
{
    var settings = _exchangeConfig.InstrumentsSettings.FirstOrDefault(l => l.Figi == figi);
    if (settings == null)
    {
        return 1m;
    }

    if (isShort)
    {
        return settings.ShortLeverage;
    }

    return settings.LongLeverage;
}
/// <summary>
/// Returns the stop-loss and take-profit offsets for a new position.
/// </summary>
/// <param name="message">Not used; the offsets are fixed.</param>
/// <param name="type">Not used; the offsets are fixed.</param>
/// <returns>A stop-loss offset of 2 and a take-profit offset of 6.</returns>
private (decimal stopLoss, decimal takeProfit) GetStops(ITradeDataItem message, PositionType type)
    => (stopLoss: 2m, takeProfit: 6m);
}
}