420 lines
22 KiB
C#
420 lines
22 KiB
C#
using KLHZ.Trader.Core.Common;
|
||
using KLHZ.Trader.Core.Contracts.Common.Enums;
|
||
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
|
||
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
|
||
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Enums;
|
||
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
|
||
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
|
||
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
|
||
using KLHZ.Trader.Core.Exchange.Interfaces;
|
||
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
|
||
using KLHZ.Trader.Core.Exchange.Models.Configs;
|
||
using KLHZ.Trader.Core.Exchange.Models.Trading;
|
||
using KLHZ.Trader.Core.Exchange.Utils;
|
||
using KLHZ.Trader.Core.Math.Declisions.Utils;
|
||
using Microsoft.EntityFrameworkCore;
|
||
using Microsoft.Extensions.Hosting;
|
||
using Microsoft.Extensions.Logging;
|
||
using Microsoft.Extensions.Options;
|
||
using System.Collections.Concurrent;
|
||
using System.Collections.Immutable;
|
||
using System.Security.Cryptography;
|
||
using System.Threading.Channels;
|
||
using Tinkoff.InvestApi;
|
||
|
||
namespace KLHZ.Trader.Core.Exchange.Services
|
||
{
|
||
public class Trader : IHostedService
|
||
{
|
||
private readonly IDataBus _dataBus;
|
||
private readonly TraderDataProvider _tradeDataProvider;
|
||
private readonly PortfolioWrapper _portfolioWrapper;
|
||
private readonly ExchangeConfig _exchangeConfig;
|
||
private readonly ILogger<Trader> _logger;
|
||
|
||
private readonly ConcurrentDictionary<string, TradingMode> TradingModes = new();
|
||
|
||
private readonly ConcurrentDictionary<string, decimal> DPirsonValues = new();
|
||
private readonly ConcurrentDictionary<string, decimal> DPricesValues = new();
|
||
|
||
private readonly Channel<ITradeDataItem> _pricesChannel = Channel.CreateUnbounded<ITradeDataItem>();
|
||
private readonly Channel<ITradeCommand> _commands = Channel.CreateUnbounded<ITradeCommand>();
|
||
private readonly Channel<IOrderbook> _orderbooks = Channel.CreateUnbounded<IOrderbook>();
|
||
public Trader(
|
||
ILogger<Trader> logger,
|
||
IOptions<ExchangeConfig> options,
|
||
IDataBus dataBus,
|
||
PortfolioWrapper portfolioWrapper,
|
||
TraderDataProvider tradeDataProvider,
|
||
InvestApiClient investApiClient)
|
||
{
|
||
_portfolioWrapper = portfolioWrapper;
|
||
_tradeDataProvider = tradeDataProvider;
|
||
_logger = logger;
|
||
_dataBus = dataBus;
|
||
_exchangeConfig = options.Value;
|
||
foreach (var f in _exchangeConfig.TradingInstrumentsFigis)
|
||
{
|
||
TradingModes[f] = TradingMode.None;
|
||
}
|
||
}
|
||
|
||
public Task StartAsync(CancellationToken cancellationToken)
|
||
{
|
||
_dataBus.AddChannel(nameof(Trader), _pricesChannel);
|
||
_dataBus.AddChannel(nameof(Trader), _orderbooks);
|
||
_dataBus.AddChannel(nameof(Trader), _commands);
|
||
_ = ProcessPrices();
|
||
_ = ProcessOrderbooks();
|
||
_ = ProcessCommands();
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
private async Task ProcessCommands()
|
||
{
|
||
while (await _commands.Reader.WaitToReadAsync())
|
||
{
|
||
var command = await _commands.Reader.ReadAsync();
|
||
try
|
||
{
|
||
if (command.CommandType == TradeCommandType.OpenLong
|
||
|| command.CommandType == TradeCommandType.OpenShort)
|
||
{
|
||
var fakeMessage = new TradeDataItem() { Figi = command.Figi, Ticker = "", Count = command.Count, Direction = 1, IsHistoricalData = false, Time = DateTime.UtcNow, Price = command.RecomendPrice ?? 0m };
|
||
var positionType = command.CommandType == TradeCommandType.OpenLong ? PositionType.Long : PositionType.Short;
|
||
var stops = GetStops(fakeMessage, positionType);
|
||
var accounts = _portfolioWrapper.Accounts
|
||
.Where(a => !a.Value.Assets.ContainsKey(command.Figi))
|
||
.Take(1)
|
||
.Select(a => a.Value)
|
||
.ToArray();
|
||
await OpenPositions(accounts, fakeMessage, positionType, stops.stopLoss, stops.takeProfit, System.Math.Abs(command.Count));
|
||
}
|
||
else
|
||
{
|
||
var fakeMessage = new TradeDataItem() { Figi = command.Figi, Ticker = "", Count = command.Count, Direction = 1, IsHistoricalData = false, Time = DateTime.UtcNow, Price = command.RecomendPrice ?? 0m };
|
||
var assetsForClose = _portfolioWrapper.Accounts
|
||
.SelectMany(a => a.Value.Assets.Values)
|
||
.Where(a => a.Figi == fakeMessage.Figi)
|
||
.ToArray();
|
||
await ClosePositions(assetsForClose, fakeMessage, false);
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex, "Ошибка при выполнении команды.");
|
||
}
|
||
}
|
||
}
|
||
|
||
private async Task ProcessOrderbooks()
|
||
{
|
||
while (await _orderbooks.Reader.WaitToReadAsync())
|
||
{
|
||
var message = await _orderbooks.Reader.ReadAsync();
|
||
await _tradeDataProvider.AddOrderbook(message);
|
||
}
|
||
}
|
||
|
||
private async Task ProcessPrices()
|
||
{
|
||
var pricesCache1 = new Dictionary<string, List<ITradeDataItem>>();
|
||
var pricesCache2 = new Dictionary<string, List<ITradeDataItem>>();
|
||
var timesCache = new Dictionary<string, DateTime>();
|
||
while (await _pricesChannel.Reader.WaitToReadAsync())
|
||
{
|
||
var message = await _pricesChannel.Reader.ReadAsync();
|
||
if (!message.IsHistoricalData && DateTime.UtcNow - message.Time > TimeSpan.FromMinutes(1))
|
||
{
|
||
continue;
|
||
}
|
||
var changeMods = TraderUtils.GetInitDict(0);
|
||
try
|
||
{
|
||
message = TraderUtils.FilterHighFreqValues(message, message.Direction == 1 ? pricesCache1 : pricesCache2);
|
||
|
||
#region Добавление данных в кеши.
|
||
if (message.Figi == "BBG004730N88" || message.Figi == "FUTIMOEXF000")
|
||
{
|
||
await _tradeDataProvider.AddData(message);
|
||
}
|
||
#endregion
|
||
if (_exchangeConfig.TradingInstrumentsFigis.Contains(message.Figi) && message.Direction == 1)
|
||
{
|
||
var _15minCacheSize = TimeSpan.FromSeconds(400);
|
||
var smallWindow = TimeSpan.FromSeconds(180);
|
||
var bigWindow = TimeSpan.FromSeconds(360);
|
||
var meanWindow = TimeSpan.FromSeconds(360);
|
||
var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
|
||
|
||
var buys = await _tradeDataProvider.GetDataForTimeWindow(message.Figi, _15minCacheSize, selector: (i) => i.Direction == 1);
|
||
var trades = await _tradeDataProvider.GetDataForTimeWindow(message.Figi, _15minCacheSize);
|
||
if (trades.TryCalcTimeWindowsDiff(bigWindow, smallWindow, v => v.Count, false, out var tradesDiff)
|
||
&& buys.TryCalcTimeDiff(bigWindow, smallWindow, v => v.Price, true, out var pricesDiff))
|
||
{
|
||
await _tradeDataProvider.LogPrice(message, "privcesDiff", pricesDiff);
|
||
await _tradeDataProvider.LogPrice(message, "tradevolume_diff", tradesDiff);
|
||
await _tradeDataProvider.AddData(message.Figi, "5min_diff", new Contracts.Declisions.Dtos.CachedValue()
|
||
{
|
||
Time = message.Time,
|
||
Value2 = tradesDiff,
|
||
Value = pricesDiff,
|
||
Figi = message.Figi,
|
||
Ticker = message.Ticker,
|
||
});
|
||
|
||
if (DPricesValues.TryGetValue(message.Figi, out var olddPrice))
|
||
{
|
||
if (olddPrice < 0m && pricesDiff >0)
|
||
{
|
||
//await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_long_in", message.Price);
|
||
}
|
||
if (olddPrice > 0m && pricesDiff < 0m)
|
||
{
|
||
//await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_short_in", message.Price);
|
||
}
|
||
}
|
||
DPricesValues[message.Figi] = pricesDiff;
|
||
var diffs = await _tradeDataProvider.GetDataForTimeWindow(message.Figi, _15minCacheSize, "5min_diff");
|
||
if (diffs.TryCalcPirsonCorrelation(meanWindow, out var pirson))
|
||
{
|
||
var res = pirson;
|
||
await _tradeDataProvider.LogPrice(message, "diffs_pirson", (decimal)pirson);
|
||
await _tradeDataProvider.AddData(message.Figi, "diffs_pirson", new Contracts.Declisions.Dtos.CachedValue()
|
||
{
|
||
Time = message.Time,
|
||
Value = (decimal)pirson,
|
||
Figi = message.Figi,
|
||
Ticker = message.Ticker,
|
||
});
|
||
if (DPirsonValues.TryGetValue(message.Figi, out var olddpirs))
|
||
{
|
||
if (olddpirs < -0.3m && res > -0.3m && pricesDiff>0 && (tradesDiff > 0))
|
||
{
|
||
await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_long_in", message.Price);
|
||
}
|
||
if (olddpirs > 0.7m && res < 0.7m)
|
||
{
|
||
// await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_long_out", message.Price);
|
||
}
|
||
if (olddpirs > 0.3m && res < 0.3m && pricesDiff < 0 && (tradesDiff > 0))
|
||
{
|
||
await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_short_in", message.Price);
|
||
}
|
||
if (olddpirs < -0.7m && res > -0.7m)
|
||
{
|
||
// await _tradeDataProvider.LogPrice(message, "diffs_pirson_diff_point_short_out", message.Price);
|
||
}
|
||
}
|
||
DPirsonValues[message.Figi] = res;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogError(ex, "Ошибка при боработке новой цены.");
|
||
}
|
||
|
||
}
|
||
}
|
||
|
||
private async Task ClosePositions(Asset[] assets, ITradeDataItem message, bool withProfitOnly = true)
|
||
{
|
||
var loggedDeclisions = 0;
|
||
var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
|
||
var assetsForClose = new List<Asset>();
|
||
var price = message.Price;
|
||
if (price == 0)
|
||
{
|
||
price = await _tradeDataProvider.GetLastPrice(message.Figi);
|
||
}
|
||
price = System.Math.Round(price, 2);
|
||
var messages = new List<string>();
|
||
foreach (var asset in assets)
|
||
{
|
||
Asset? assetForClose = null;
|
||
string? mess = null;
|
||
if (withProfitOnly)
|
||
{
|
||
var profit = 0m;
|
||
|
||
if (assetType == AssetType.Futures)
|
||
{
|
||
if (_tradeDataProvider.Orderbooks.TryGetValue(message.Figi, out var orderbook))
|
||
{
|
||
if (asset.Count < 0 && orderbook.Asks.Length > 0)
|
||
{
|
||
price = orderbook.Asks[0].Price;
|
||
}
|
||
else if (orderbook.Bids.Length > 0)
|
||
{
|
||
price = orderbook.Bids[0].Price;
|
||
}
|
||
}
|
||
|
||
profit = TradingCalculator.CaclProfit(asset.BoughtPrice, price,
|
||
GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
|
||
}
|
||
if (profit > 0)
|
||
{
|
||
profit = System.Math.Round(profit, 2);
|
||
assetForClose = asset;
|
||
mess = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{price}, профит {profit}";
|
||
if (loggedDeclisions == 0)
|
||
{
|
||
loggedDeclisions++;
|
||
await _tradeDataProvider.LogDeclision(asset.Count < 0 ? DeclisionTradeAction.CloseShortReal : DeclisionTradeAction.CloseLongReal, message, profit);
|
||
}
|
||
}
|
||
}
|
||
else
|
||
{
|
||
mess = $"Закрываю позицию {asset.Figi} ({(asset.Count > 0 ? "лонг" : "шорт")}) на счёте {_portfolioWrapper.Accounts[asset.AccountId].AccountName}. Количество {(long)asset.Count}, цена ~{price}";
|
||
assetForClose = asset;
|
||
}
|
||
|
||
if (assetForClose != null && mess != null)
|
||
{
|
||
await _portfolioWrapper.Accounts[asset.AccountId].ClosePosition(message.Figi);
|
||
await _dataBus.Broadcast(new MessageForAdmin() { Text = mess });
|
||
}
|
||
}
|
||
}
|
||
|
||
private async Task OpenPositions(IManagedAccount[] accounts, ITradeDataItem message, PositionType positionType, decimal stopLossShift, decimal takeProfitShift, long count = 1)
|
||
{
|
||
var loggedDeclisions = 0;
|
||
var sign = positionType == PositionType.Long ? 1 : 1;
|
||
foreach (var acc in accounts)
|
||
{
|
||
if (TraderUtils.IsOperationAllowed(acc, message.Price, 1, _exchangeConfig.AccountCashPartFutures, _exchangeConfig.AccountCashPart))
|
||
{
|
||
await acc.OpenPosition(message.Figi, positionType, stopLossShift, takeProfitShift, count);
|
||
await _dataBus.Broadcast(new MessageForAdmin()
|
||
{
|
||
Text = $"Открываю позицию {message.Figi} ({(positionType == PositionType.Long ? "лонг" : "шорт")}) " +
|
||
$"на счёте {acc.AccountName}. Количество {(positionType == PositionType.Long ? "" : "-")}{count}, " +
|
||
$"цена ~{System.Math.Round(message.Price, 2)}. Стоп лосс: {(positionType == PositionType.Long ? "-" : "+")}{stopLossShift}. " +
|
||
$"Тейк профит: {(positionType == PositionType.Long ? "+" : "-")}{takeProfitShift}"
|
||
});
|
||
}
|
||
|
||
if (loggedDeclisions == 0)
|
||
{
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenLongReal, message);
|
||
loggedDeclisions++;
|
||
}
|
||
}
|
||
}
|
||
|
||
private async Task ExecuteDeclisions(ITradeDataItem message, ImmutableDictionary<TradingEvent, decimal> result)
|
||
{
|
||
var state = ExchangeScheduler.GetCurrentState();
|
||
if (result[TradingEvent.UptrendStart] >= Constants.UppingCoefficient
|
||
&& state == ExchangeState.Open
|
||
)
|
||
{
|
||
var stops = GetStops(message, PositionType.Long);
|
||
if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
|
||
{
|
||
var accounts = _portfolioWrapper.Accounts
|
||
.Where(a => !a.Value.Assets.ContainsKey(message.Figi))
|
||
.Take(1)
|
||
.Select(a => a.Value)
|
||
.ToArray();
|
||
await OpenPositions(accounts, message, PositionType.Long, stops.stopLoss, stops.takeProfit, 1);
|
||
}
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenLong, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsLong, message.Price + stops.takeProfit, message.Time.AddMilliseconds(-RandomNumberGenerator.GetInt32(300, 1000)), message);
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsLong, message.Price - stops.stopLoss, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(300, 1000)), message);
|
||
}
|
||
if (result[TradingEvent.DowntrendStart] >= Constants.UppingCoefficient
|
||
&& state == ExchangeState.Open
|
||
)
|
||
{
|
||
var stops = GetStops(message, PositionType.Short);
|
||
if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
|
||
{
|
||
var accounts = _portfolioWrapper.Accounts
|
||
.Where(a => !a.Value.Assets.ContainsKey(message.Figi))
|
||
.Take(1)
|
||
.Select(a => a.Value)
|
||
.ToArray();
|
||
await OpenPositions(accounts, message, PositionType.Short, stops.stopLoss, stops.takeProfit, 1);
|
||
}
|
||
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.OpenShort, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsShort, message.Price - stops.takeProfit, message.Time.AddMilliseconds(-RandomNumberGenerator.GetInt32(300, 1000)), message);
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.ResetStopsShort, message.Price + stops.stopLoss, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(300, 1000)), message);
|
||
}
|
||
if (result[TradingEvent.UptrendEnd] >= Constants.UppingCoefficient * 10)
|
||
{
|
||
if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
|
||
{
|
||
var assetsForClose = _portfolioWrapper.Accounts
|
||
.SelectMany(a => a.Value.Assets.Values)
|
||
.Where(a => a.Figi == message.Figi && a.Count > 0)
|
||
.ToArray();
|
||
await ClosePositions(assetsForClose, message);
|
||
}
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.CloseLong, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
|
||
|
||
}
|
||
|
||
if (result[TradingEvent.DowntrendEnd] >= Constants.UppingCoefficient * 10)
|
||
{
|
||
if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
|
||
{
|
||
var assetsForClose = _portfolioWrapper.Accounts
|
||
.SelectMany(a => a.Value.Assets.Values)
|
||
.Where(a => a.Figi == message.Figi && a.Count < 0)
|
||
.ToArray();
|
||
await ClosePositions(assetsForClose, message);
|
||
}
|
||
await _tradeDataProvider.LogDeclision(DeclisionTradeAction.CloseShort, message.Price, message.Time.AddMilliseconds(RandomNumberGenerator.GetInt32(-100, 100)), message);
|
||
|
||
}
|
||
}
|
||
|
||
public Task StopAsync(CancellationToken cancellationToken)
|
||
{
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
private decimal GetComission(AssetType assetType)
|
||
{
|
||
if (assetType == AssetType.Common)
|
||
{
|
||
return _exchangeConfig.ShareComission;
|
||
}
|
||
else if (assetType == AssetType.Futures)
|
||
{
|
||
return _exchangeConfig.FutureComission;
|
||
}
|
||
else
|
||
{
|
||
return 0;
|
||
}
|
||
}
|
||
|
||
private decimal GetLeverage(string figi, bool isShort)
|
||
{
|
||
var res = 1m;
|
||
var leverage = _exchangeConfig.InstrumentsSettings.FirstOrDefault(l => l.Figi == figi);
|
||
if (leverage != null)
|
||
{
|
||
res = isShort ? leverage.ShortLeverage : leverage.LongLeverage;
|
||
}
|
||
return res;
|
||
}
|
||
|
||
private (decimal stopLoss, decimal takeProfit) GetStops(ITradeDataItem message, PositionType type)
|
||
{
|
||
decimal stopLossShift = 2m;
|
||
decimal takeProfitShift = 6;
|
||
return (stopLossShift, takeProfitShift);
|
||
}
|
||
}
|
||
}
|