klhztrader/KLHZ.Trader.Core/Exchange/Services/Trader.cs

406 lines
20 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using KLHZ.Trader.Core.Common;
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
using KLHZ.Trader.Core.Exchange.Models.Configs;
using KLHZ.Trader.Core.Exchange.Models.Trading;
using KLHZ.Trader.Core.Exchange.Utils;
using KLHZ.Trader.Core.Math.Declisions.Utils;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Security.Cryptography;
using System.Threading.Channels;
using Tinkoff.InvestApi;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
namespace KLHZ.Trader.Core.Exchange.Services
{
public class Trader : IHostedService
{
// Bus used to register this service's channels and to broadcast trade commands.
private readonly IDataBus _dataBus;
// Provider used for price windows, accounts/assets lookup and for persisting processed prices and decisions.
private readonly TraderDataProvider _tradeDataProvider;
private readonly ILogger<Trader> _logger;
// Temporary bans on opening new positions. Entries are written with the instrument FIGI as key
// and the moment the ban ends as value.
private readonly ConcurrentDictionary<string, DateTime> OpeningStops = new();
// Temporary bans on sending another close command. Entries are written with the key
// "FIGI + account id" and the moment the ban ends as value.
private readonly ConcurrentDictionary<string, DateTime> ClosingStops = new();
// Per-instrument settings keyed by FIGI; GetLeverage reads the long/short leverage from here.
private readonly ConcurrentDictionary<string, InstrumentSettings> Leverages = new();
// Commission values copied from ExchangeConfig; returned by GetComission depending on the asset type.
private readonly decimal _futureComission;
private readonly decimal _shareComission;
// Thresholds passed to IsBuyAllowed: the share (balance after purchase / account total) below which
// a purchase is rejected. The "Futures" variant is applied when the account already holds a futures asset.
private readonly decimal _accountCashPart;
private readonly decimal _accountCashPartFutures;
// FIGIs for which incoming prices are processed by the trading logic; overwritten from ExchangeConfig in the constructor.
private readonly string[] _tradingInstrumentsFigis = [];
// Incoming prices; registered on the data bus in StartAsync and drained by ProcessPrices.
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
// Incoming orderbooks; registered on the data bus in StartAsync.
// NOTE(review): no reader of this unbounded channel is visible in this file - confirm it is consumed somewhere.
private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateUnbounded<IOrderbook>();
/// <summary>
/// Creates the trader service and copies the trading parameters out of <see cref="ExchangeConfig"/>.
/// </summary>
/// <param name="logger">Logger used for trade and error messages.</param>
/// <param name="options">Exchange configuration: commissions, cash-part thresholds, traded FIGIs and instrument settings.</param>
/// <param name="dataBus">Bus used to subscribe to data channels and to broadcast trade commands.</param>
/// <param name="tradeDataProvider">Provider of prices, accounts and logging storage.</param>
/// <param name="investApiClient">Accepted for dependency injection; not used by this constructor.</param>
public Trader(
    ILogger<Trader> logger,
    IOptions<ExchangeConfig> options,
    IDataBus dataBus,
    TraderDataProvider tradeDataProvider,
    InvestApiClient investApiClient)
{
    var config = options.Value;

    _logger = logger;
    _dataBus = dataBus;
    _tradeDataProvider = tradeDataProvider;

    _futureComission = config.FutureComission;
    _shareComission = config.ShareComission;
    _accountCashPart = config.AccountCashPart;
    _accountCashPartFutures = config.AccountCashPartFutures;
    _tradingInstrumentsFigis = config.TradingInstrumentsFigis;

    foreach (var instrumentSettings in config.InstrumentsSettings)
    {
        // TryAdd keeps the first entry if the same FIGI is configured more than once.
        Leverages.TryAdd(instrumentSettings.Figi, instrumentSettings);
    }
}
/// <summary>
/// Initializes the trade data provider, registers the price and orderbook channels on the data bus
/// and starts the price-processing loop.
/// </summary>
/// <param name="cancellationToken">Not observed by this method.</param>
public async Task StartAsync(CancellationToken cancellationToken)
{
    const string subscriberKey = nameof(Trader);

    await _tradeDataProvider.Init();

    _dataBus.AddChannel(subscriberKey, _pricesChannel);
    _dataBus.AddChannel(subscriberKey, _ordersbookChannel);

    // The loop runs for as long as the price channel is open, so it is started but not awaited.
    var processingLoop = ProcessPrices();
    _ = processingLoop;
}
/// <summary>
/// Drains the price channel until it is completed. Historical prices are stored in the data provider;
/// prices of traded instruments (currently only FIGI "FUTIMOEXF000") are run through the stop,
/// sell, clearing and trend-processing steps.
/// </summary>
/// <remarks>
/// The whole iteration is wrapped in try/catch: the task returned by this method is not awaited by the
/// caller, so an exception escaping the loop body would silently stop all further price processing.
/// </remarks>
private async Task ProcessPrices()
{
    const int windowMaxSize = 1000;

    await foreach (var message in _pricesChannel.Reader.ReadAllAsync())
    {
        try
        {
            if (message.IsHistoricalData)
            {
                await _tradeDataProvider.AddData(message, TimeSpan.FromHours(6));
            }

            if (!_tradingInstrumentsFigis.Contains(message.Figi) || message.Figi != "FUTIMOEXF000")
            {
                continue;
            }

            // Historical messages are replayed against their own timestamps, live ones against the wall clock.
            var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;

            ProcessStops(message, currentTime);
            await SellAssetsIfNeed(message);

            var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize);
            var state = ExchangeScheduler.GetCurrentState(message.Time);

            await ProcessClearing(data, state, message);
            await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
        }
    }
}
/// <summary>
/// Force-closes positions in the instrument of <paramref name="message"/> on every account when either
/// exit rule fires: held longer than 4 minutes with profit below -66, or held longer than 4 hours with
/// profit above 100 (profit as returned by <c>TradingCalculator.CaclProfit</c>).
/// </summary>
/// <remarks>
/// A position is skipped while a closing stop for "FIGI + account id" is present. After a command is sent
/// a 30-second closing stop is set; the loss rule additionally blocks opening for 10 minutes.
/// The command count is always the absolute position size, so short positions (negative count) are
/// closed with a positive quantity under both rules.
/// </remarks>
/// <param name="message">The new price that triggered the check.</param>
private async Task SellAssetsIfNeed(INewPrice message)
{
    if (!BotModeSwitcher.CanSell())
    {
        return;
    }

    var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
    var comission = GetComission(assetType);

    // Snapshots are taken so that concurrent changes of accounts/assets do not affect the iteration.
    foreach (var account in _tradeDataProvider.Accounts.Values.ToArray())
    {
        foreach (var asset in account.Assets.Values.Where(a => a.Figi == message.Figi).ToArray())
        {
            var isShort = asset.Count < 0;
            var profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                comission, GetLeverage(message.Figi, isShort), isShort);

            var holdingTime = message.Time - asset.BoughtAt;
            var isLossExit = holdingTime > TimeSpan.FromMinutes(4) && profit < -66m;
            var isProfitExit = holdingTime > TimeSpan.FromHours(4) && profit > 100;
            if (!isLossExit && !isProfitExit)
            {
                continue;
            }

            var stoppingKey = message.Figi + asset.AccountId;
            if (ClosingStops.ContainsKey(stoppingKey))
            {
                continue;
            }

            var command = new TradeCommand()
            {
                AccountId = asset.AccountId,
                Figi = message.Figi,
                // A short position is closed by buying, a long one by selling.
                CommandType = isShort
                    ? Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy
                    : Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                Count = System.Math.Abs((long)asset.Count),
                RecomendPrice = null,
                EnableMargin = false,
            };
            await _dataBus.Broadcast(command);
            _logger.LogWarning("Сброс актива {figi}! id команды {commandId} Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
                message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

            if (isLossExit)
            {
                OpeningStops[message.Figi] = DateTime.UtcNow.AddMinutes(10);
            }
            ClosingStops[stoppingKey] = DateTime.UtcNow.AddSeconds(30);

            // NOTE(review): the decision is logged as CloseLong/CloseLongReal even when a short position is closed.
            await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
            await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
        }
    }
}
/// <summary>
/// Evaluates the moving-average and areas-relation indicators for a new price and, depending on the
/// detected trading events, opens long positions (uptrend start) or closes profitable ones (uptrend end).
/// </summary>
/// <param name="data">Price window: timestamps and prices (indexed in parallel by this method).</param>
/// <param name="state">Exchange state for the message time; positions are opened only when it is <c>Open</c>.</param>
/// <param name="message">The new price being processed.</param>
/// <param name="windowMaxSize">Window size forwarded to the moving-average check.</param>
private async Task ProcessNewPriceIMOEXF((DateTime[] timestamps, decimal[] prices) data,
    ExchangeState state,
    INewPrice message, int windowMaxSize)
{
    var movingAverage = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices,
        windowMaxSize, 30, 180, TimeSpan.FromSeconds(20), -1m, 2m);
    var events = TradingEvent.None | movingAverage.events;

    if (movingAverage.bigWindowAv != 0)
    {
        await LogPrice(message, Constants.BigWindowCrossingAverageProcessor, movingAverage.bigWindowAv);
        await LogPrice(message, Constants.SmallWindowCrossingAverageProcessor, movingAverage.smallWindowAv);
    }

    var areasRel = await RefreshAreasRelation(data, message);

    var isOpenSignal = events.HasFlag(TradingEvent.UptrendStart)
        && !OpeningStops.ContainsKey(message.Figi)
        && state == ExchangeState.Open
        && data.timestamps.Length > 1
        // The two latest points must be less than a minute apart.
        && data.timestamps[^1] - data.timestamps[^2] < TimeSpan.FromMinutes(1)
        && areasRel >= 20 && areasRel < 75;
    if (isOpenSignal)
    {
        if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
        {
            await OpenLongOnFreeAccounts(message);
        }
        await LogDeclision(DeclisionTradeAction.OpenLong, message);
    }

    if (events.HasFlag(TradingEvent.UptrendEnd))
    {
        if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
        {
            await CloseProfitableLongs(message);
        }
        await LogDeclision(DeclisionTradeAction.CloseLong, message);
    }
}

// Pushes the current areas relation into the 1-minute cache and returns the cache average,
// or -1 when the relation cannot be computed or the cache window is empty.
private async Task<decimal> RefreshAreasRelation((DateTime[] timestamps, decimal[] prices) data, INewPrice message)
{
    const decimal notAvailable = -1m;

    if (!ShapeAreaCalculator.TryGetAreasRelation(data.timestamps, data.prices, message.Value, Constants.AreasRelationWindow, out var rel))
    {
        return notAvailable;
    }

    await _tradeDataProvider.AddDataTo1MinuteWindowCache(message.Figi, Constants._1minCacheKey, new Contracts.Declisions.Dtos.CachedValue()
    {
        Time = message.Time,
        Value = (decimal)rel
    });

    var areas = await _tradeDataProvider.GetDataFrom1MinuteWindowCache(message.Figi, Constants._1minCacheKey);
    if (areas.Length == 0)
    {
        // Nothing to average; dividing by the length would throw DivideByZeroException.
        return notAvailable;
    }

    var areasRel = (decimal)areas.Sum(a => a.Value) / areas.Length;
    await LogPrice(message, Constants.AreasRelationProcessor, areasRel);
    return areasRel;
}

// Sends a market buy of one unit for accounts that do not hold the instrument yet and pass the cash check.
private async Task OpenLongOnFreeAccounts(INewPrice message)
{
    var freeAccounts = _tradeDataProvider.Accounts
        .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
        .ToArray();

    var realDecisionLogged = false;
    foreach (var account in freeAccounts)
    {
        if (!IsBuyAllowed(account.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart))
        {
            continue;
        }

        // Random gate: GetInt32(100) yields 0..99 and only values above 50 pass.
        if (RandomNumberGenerator.GetInt32(100) <= 50)
        {
            continue;
        }

        var command = new TradeCommand()
        {
            AccountId = account.Value.AccountId,
            Figi = message.Figi,
            CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
            Count = 1,
            RecomendPrice = null,
        };
        await _dataBus.Broadcast(command);
        _logger.LogWarning("Покупка актива {figi}! id команды {commandId}. Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
            message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

        // The real decision and the one-minute opening stop are recorded once per signal, not per account.
        if (!realDecisionLogged)
        {
            await LogDeclision(DeclisionTradeAction.OpenLongReal, message);
            OpeningStops[message.Figi] = DateTime.UtcNow.AddMinutes(1);
            realDecisionLogged = true;
        }
    }
}

// Sends a market sell for every long position in the instrument that is currently profitable
// and has no active closing stop.
private async Task CloseProfitableLongs(INewPrice message)
{
    var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
    var longPositions = _tradeDataProvider.Accounts
        .SelectMany(a => a.Value.Assets.Values)
        .Where(a => a.Figi == message.Figi && a.Count > 0)
        .ToArray();

    var realDecisionLogged = false;
    foreach (var asset in longPositions)
    {
        var profit = 0m;
        if (assetType == AssetType.Common && asset.Count > 0)
        {
            profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                GetComission(assetType), 1, false);
        }
        if (assetType == AssetType.Futures)
        {
            profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
        }

        var stoppingKey = message.Figi + asset.AccountId;
        if (profit <= 0 || ClosingStops.ContainsKey(stoppingKey))
        {
            continue;
        }

        // The stop is set before broadcasting so that a repeated signal cannot issue a second command.
        ClosingStops[stoppingKey] = DateTime.UtcNow.AddSeconds(30);
        var command = new TradeCommand()
        {
            AccountId = asset.AccountId,
            Figi = message.Figi,
            CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
            Count = (long)asset.Count,
            RecomendPrice = null,
            EnableMargin = false,
        };
        await _dataBus.Broadcast(command);
        _logger.LogWarning("Продажа актива {figi}! id команды {commandId}. Направление сделки: {dir}; Количество активов: {count}; Разрешена ли маржиналка: {margin}",
            message.Figi, command.CommandId, command.CommandType, command.Count, command.EnableMargin);

        if (!realDecisionLogged)
        {
            realDecisionLogged = true;
            await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
        }
    }
}
/// <summary>
/// During clearing time, for live (non-historical) prices only: when the two latest timestamps in the
/// window are more than 3 minutes apart, asks the data provider to update the futures price using the
/// second-to-last price of the window.
/// </summary>
/// <param name="data">Price window: timestamps and prices.</param>
/// <param name="state">Exchange state for the message time.</param>
/// <param name="message">The new price being processed.</param>
private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices) data, ExchangeState state, INewPrice message)
{
    if (state != ExchangeState.ClearingTime || message.IsHistoricalData)
    {
        return;
    }

    var timestamps = data.timestamps;
    if (timestamps.Length <= 1)
    {
        return;
    }

    var gapBetweenLastTicks = timestamps[^1] - timestamps[^2];
    if (gapBetweenLastTicks > TimeSpan.FromMinutes(3))
    {
        await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[^2]);
    }
}
/// <summary>
/// Removes expired opening and closing stops for the instrument of <paramref name="message"/>.
/// </summary>
/// <remarks>
/// Opening stops are keyed by FIGI. Closing stops are stored under "FIGI + account id", so they are
/// located by FIGI prefix; an exact lookup by FIGI alone would never match them and the stops
/// would never expire.
/// </remarks>
/// <param name="message">The price message whose FIGI selects the stops to check.</param>
/// <param name="currentTime">Moment against which the stop expiry times are compared.</param>
private void ProcessStops(INewPrice message, DateTime currentTime)
{
    if (OpeningStops.TryGetValue(message.Figi, out var openingStopUntil) && openingStopUntil < currentTime)
    {
        OpeningStops.TryRemove(message.Figi, out _);
    }

    // Enumerating a ConcurrentDictionary while removing from it is safe.
    foreach (var closingStop in ClosingStops)
    {
        if (closingStop.Value < currentTime
            && closingStop.Key.StartsWith(message.Figi, StringComparison.Ordinal))
        {
            // Removes the entry only if it still holds the expired value seen above.
            ClosingStops.TryRemove(closingStop);
        }
    }
}
/// <summary>
/// Stores a processed (indicator) value for the instrument and time of <paramref name="message"/>
/// through the data provider.
/// </summary>
/// <param name="message">Price message supplying FIGI, ticker and time.</param>
/// <param name="processor">Name of the processor that produced the value.</param>
/// <param name="value">The value to store.</param>
private async Task LogPrice(INewPrice message, string processor, decimal value)
{
    var processedPrice = new ProcessedPrice()
    {
        Figi = message.Figi,
        Ticker = message.Ticker,
        Processor = processor,
        Time = message.Time,
        Value = value,
    };

    await _tradeDataProvider.LogPrice(processedPrice, false);
}
/// <summary>
/// Stores a trading decision for the instrument of <paramref name="message"/> through the data provider.
/// The account id is always written as an empty string.
/// </summary>
/// <param name="action">Decision being recorded.</param>
/// <param name="message">Price message supplying FIGI, ticker and price.</param>
/// <param name="profit">Optional profit stored as the decision value.</param>
private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
{
    // Historical messages keep their own timestamp; live ones are stamped with the current UTC time.
    var decisionTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;

    var declision = new Declision()
    {
        AccountId = string.Empty,
        Figi = message.Figi,
        Ticker = message.Ticker,
        Value = profit,
        Price = message.Value,
        Time = decisionTime,
        Action = action,
    };

    await _tradeDataProvider.LogDeclision(declision, false);
}
/// <summary>
/// Host shutdown hook; performs no work and returns an already completed task.
/// </summary>
/// <param name="cancellationToken">Not observed by this method.</param>
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
/// <summary>
/// Returns the configured commission for the given asset type.
/// </summary>
/// <param name="assetType">Type of the asset being traded.</param>
/// <returns>The share commission for <c>Common</c>, the futures commission for <c>Futures</c>, otherwise 0.</returns>
private decimal GetComission(AssetType assetType) => assetType switch
{
    AssetType.Common => _shareComission,
    AssetType.Futures => _futureComission,
    _ => 0m,
};
/// <summary>
/// Returns the configured leverage for an instrument and position direction.
/// </summary>
/// <param name="figi">FIGI of the instrument.</param>
/// <param name="isShort"><c>true</c> to get the short leverage, <c>false</c> for the long one.</param>
/// <returns>The configured leverage, or 1 when the instrument has no settings.</returns>
private decimal GetLeverage(string figi, bool isShort)
{
    if (!Leverages.TryGetValue(figi, out var settings))
    {
        return 1m;
    }

    return isShort ? settings.ShortLeverage : settings.LongLeverage;
}
/// <summary>
/// Checks whether an account may buy <paramref name="count"/> units at <paramref name="boutPrice"/>:
/// purchases must be enabled and the share of the balance left after the purchase, relative to the
/// account total, must not fall below the applicable threshold.
/// </summary>
/// <param name="account">Account that would make the purchase.</param>
/// <param name="boutPrice">Price of one unit.</param>
/// <param name="count">Number of units to buy.</param>
/// <param name="accountCashPartFutures">Threshold used when the account already holds a futures asset.</param>
/// <param name="accountCashPart">Threshold used otherwise.</param>
/// <returns>
/// <c>true</c> when the purchase is allowed; <c>false</c> when purchases are disabled, the account total
/// is zero, or the remaining cash share would be below the threshold.
/// </returns>
internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count,
    decimal accountCashPartFutures, decimal accountCashPart)
{
    if (!BotModeSwitcher.CanPurchase())
    {
        return false;
    }

    var total = account.Total;
    if (total == 0)
    {
        // An account with a zero total cannot afford anything; also avoids DivideByZeroException below.
        return false;
    }

    var holdsFutures = account.Assets.Values.Any(v => v.Type == AssetType.Futures);
    var requiredCashPart = holdsFutures ? accountCashPartFutures : accountCashPart;
    var cashPartAfterPurchase = (account.Balance - boutPrice * count) / total;

    return cashPartAfterPurchase >= requiredCashPart;
}
}
}