// 365 lines, 16 KiB, C#
using KLHZ.Trader.Core.Common;
|
|
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
|
|
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
|
|
using KLHZ.Trader.Core.Exchange.Models.Configs;
|
|
using KLHZ.Trader.Core.Exchange.Models.Trading;
|
|
using KLHZ.Trader.Core.Exchange.Utils;
|
|
using KLHZ.Trader.Core.Math.Declisions.Utils;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using System.Collections.Concurrent;
|
|
using System.Security.Cryptography;
|
|
using System.Threading.Channels;
|
|
using Tinkoff.InvestApi;
|
|
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
|
|
|
|
namespace KLHZ.Trader.Core.Exchange.Services
|
|
{
|
|
public class Trader : IHostedService
|
|
{
|
|
// Collaborators injected through the constructor.
private readonly IDataBus _dataBus;
private readonly TraderDataProvider _tradeDataProvider;
private readonly ILogger<Trader> _logger;

// Per-FIGI moment until which opening new positions is suppressed.
private readonly ConcurrentDictionary<string, DateTime> OpeningStops = new ConcurrentDictionary<string, DateTime>();

// Per-FIGI instrument settings; consulted for the long/short leverage.
private readonly ConcurrentDictionary<string, InstrumentSettings> Leverages = new ConcurrentDictionary<string, InstrumentSettings>();

// Processor names attached to the logged moving-average values.
private readonly string _bigWindowProcessor = $"{nameof(Trader)}_big";
private readonly string _smallWindowProcessor = $"{nameof(Trader)}_small";

// Values copied from ExchangeConfig in the constructor.
private readonly double _buyStopLength;
private readonly decimal _futureComission;
private readonly decimal _shareComission;
private readonly decimal _accountCashPart;
private readonly decimal _accountCashPartFutures;
private readonly string[] _tradingInstrumentsFigis = [];

// Inbound queues registered on the data bus in StartAsync.
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateUnbounded<IOrderbook>();
|
|
|
|
/// <summary>
/// Creates the trader, stores its collaborators and copies the trading
/// parameters out of <see cref="ExchangeConfig"/>.
/// </summary>
/// <param name="logger">Logger used for processing errors.</param>
/// <param name="options">Exchange configuration (commissions, cash parts, traded FIGIs, instrument settings).</param>
/// <param name="dataBus">Bus that delivers market data and accepts trade commands.</param>
/// <param name="tradeDataProvider">Provider of price data, accounts and logging helpers.</param>
/// <param name="investApiClient">Not used by this constructor.</param>
public Trader(
    ILogger<Trader> logger,
    IOptions<ExchangeConfig> options,
    IDataBus dataBus,
    TraderDataProvider tradeDataProvider,
    InvestApiClient investApiClient)
{
    _tradeDataProvider = tradeDataProvider;
    _logger = logger;
    _dataBus = dataBus;

    var config = options.Value;

    _futureComission = config.FutureComission;
    _shareComission = config.ShareComission;
    _accountCashPart = config.AccountCashPart;
    _accountCashPartFutures = config.AccountCashPartFutures;
    _tradingInstrumentsFigis = config.TradingInstrumentsFigis;
    _buyStopLength = (double)config.StopBuyLengthMinuts;

    // Index the instrument settings by FIGI; a repeated FIGI keeps the first entry.
    foreach (var settings in config.InstrumentsSettings)
    {
        Leverages.TryAdd(settings.Figi, settings);
    }
}
|
|
|
|
/// <summary>
/// Initializes the data provider, registers both inbound channels on the
/// data bus and starts the two background processing loops.
/// </summary>
/// <param name="cancellationToken">Host start-up token; not observed by this method.</param>
public async Task StartAsync(CancellationToken cancellationToken)
{
    const string channelKey = nameof(Trader);

    await _tradeDataProvider.Init();

    _dataBus.AddChannel(channelKey, _pricesChannel);
    _dataBus.AddChannel(channelKey, _ordersbookChannel);

    // The loops are started without being awaited; their tasks are discarded.
    _ = ProcessPrices();
    _ = ProcessOrdersbooks();
}
|
|
|
|
/// <summary>
/// Background loop that drains the price channel. Prices of configured
/// instruments are stored in the data provider; prices of FUTIMOEXF000
/// additionally run the clearing, old-asset, opening-stop and strategy steps.
/// Any exception raised while handling a message is logged and the loop continues.
/// </summary>
private async Task ProcessPrices()
{
    var reader = _pricesChannel.Reader;

    while (await reader.WaitToReadAsync())
    {
        var message = await reader.ReadAsync();

        // Instruments outside the configured list are ignored.
        if (!_tradingInstrumentsFigis.Contains(message.Figi))
        {
            continue;
        }

        // Historical messages carry their own clock; live ones use the wall clock.
        var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;

        try
        {
            await _tradeDataProvider.AddData(message, TimeSpan.FromHours(7));

            // Deferred long open/close processing is currently disabled.

            if (message.Figi != "FUTIMOEXF000")
            {
                continue;
            }

            int windowMaxSize = 1000;
            var data = await _tradeDataProvider.GetData(message.Figi, windowMaxSize);
            var state = ExchangeScheduler.GetCurrentState(message.Time);

            await ProcessClearing(data, state, message);
            await SellOldAssetsIfCan(message);

            ProcessOpeningStops(message, currentTime);
            await ProcessNewPriceIMOEXF(data, state, message, windowMaxSize);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
        }
    }
}
|
|
|
|
/// <summary>
/// Closes positions in the message's instrument that were bought more than
/// four hours ago and are currently profitable at the message price.
/// Long positions are closed with a market sell, short positions
/// (negative <c>Count</c>) with a market buy.
/// </summary>
/// <param name="message">Price update whose FIGI and value are used for the check.</param>
private async Task SellOldAssetsIfCan(INewPrice message)
{
    // Replayed historical prices must never produce real orders; the other
    // order paths in this class apply the same guard.
    if (message.IsHistoricalData)
    {
        return;
    }

    // NOTE(review): unlike the trend-based close path, this one does not consult
    // BotModeSwitcher.CanSell() - confirm whether that is intentional.
    var accounts = _tradeDataProvider.Accounts.Values.ToArray();
    var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
    var maxAge = TimeSpan.FromHours(4);

    foreach (var acc in accounts)
    {
        var assets = acc.Assets.Values
            .Where(a => a.Figi == message.Figi && DateTime.UtcNow - a.BoughtAt > maxAge)
            .ToArray();

        foreach (var asset in assets)
        {
            var isShort = asset.Count < 0;
            var profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
                GetComission(assetType), GetLeverage(message.Figi, isShort), isShort);

            if (profit <= 0)
            {
                continue;
            }

            // The direction is carried by CommandType, so the quantity is sent
            // as a positive number even for short positions.
            var quantity = (long)asset.Count;
            if (isShort)
            {
                quantity = -quantity;
            }

            await _dataBus.Broadcast(new TradeCommand()
            {
                AccountId = asset.AccountId,
                Figi = message.Figi,
                CommandType = isShort
                    ? Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy
                    : Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
                Count = quantity,
                RecomendPrice = null,
            });

            // NOTE(review): CloseLong is also logged when a short position is closed.
            await LogDeclision(DeclisionTradeAction.CloseLong, message, profit);
        }
    }
}
|
|
|
|
/// <summary>
/// Strategy step for a new price: evaluates the moving-average check over the
/// supplied window, logs both window averages when the big one is non-zero,
/// and reacts to the reported uptrend start / uptrend end events.
/// </summary>
/// <param name="data">Recent timestamps and prices (plus bid/ask counts, unused here).</param>
/// <param name="state">Exchange state for the message time.</param>
/// <param name="message">The price update being processed.</param>
/// <param name="windowMaxSize">Window size forwarded to the moving-average check.</param>
private async Task ProcessNewPriceIMOEXF((DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount) data,
    ExchangeState state,
    INewPrice message, int windowMaxSize)
{
    var movingAverage = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices,
        windowMaxSize, 25, 120, TimeSpan.FromSeconds(20), 1m, 1.5m);

    TradingEvent events = TradingEvent.None;
    events |= movingAverage.events;

    if (movingAverage.bigWindowAv != 0)
    {
        await LogPrice(message, _bigWindowProcessor, movingAverage.bigWindowAv);
        await LogPrice(message, _smallWindowProcessor, movingAverage.smallWindowAv);
    }

    // The two latest points must be less than a minute apart.
    var lastPointsAreClose = data.timestamps.Length > 1
        && data.timestamps[^1] - data.timestamps[^2] < TimeSpan.FromMinutes(1);

    var openSignal = events.HasFlag(TradingEvent.UptrendStart)
        && !OpeningStops.ContainsKey(message.Figi)
        && state == ExchangeState.Open
        && lastPointsAreClose;

    if (openSignal)
    {
        var fullData = await _tradeDataProvider.GetData(message.Figi, TimeSpan.FromMinutes(30));
        if (fullData.isFullIntervalExists)
        {
            var spread = fullData.prices.Max() - fullData.prices.Min();
            var drift = fullData.prices.Last() - fullData.prices.First();

            // Only act when the last 30 minutes stayed within a narrow band.
            if (spread < 15 && -4 < drift && drift < 4)
            {
                if (!message.IsHistoricalData && BotModeSwitcher.CanPurchase())
                {
                    await OpenLongOnEligibleAccounts(message);
                }

                await LogDeclision(DeclisionTradeAction.OpenLong, message);
            }
        }
    }

    if (events.HasFlag(TradingEvent.UptrendEnd))
    {
        var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);

        if (!message.IsHistoricalData && BotModeSwitcher.CanSell())
        {
            await CloseProfitableLongs(message, assetType);
        }

        await LogDeclision(DeclisionTradeAction.CloseLong, message);
    }
}

/// <summary>
/// Broadcasts a one-unit market buy for accounts that do not hold the instrument,
/// pass <see cref="IsBuyAllowed"/> and win the random draw. The first broadcast
/// also logs the real-open decision and sets a one-minute opening stop.
/// </summary>
private async Task OpenLongOnEligibleAccounts(INewPrice message)
{
    var candidates = _tradeDataProvider.Accounts
        .Where(a => !a.Value.Assets.ContainsKey(message.Figi))
        .ToArray();
    var decisionLogged = false;

    foreach (var candidate in candidates)
    {
        var account = candidate.Value;

        if (!IsBuyAllowed(account, message.Value, 1, _accountCashPartFutures, _accountCashPart))
        {
            continue;
        }

        // Random draw: values 51..99 out of 0..99 proceed.
        if (RandomNumberGenerator.GetInt32(100) <= 50)
        {
            continue;
        }

        await _dataBus.Broadcast(new TradeCommand
        {
            AccountId = account.AccountId,
            Figi = message.Figi,
            CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
            Count = 1,
            RecomendPrice = null,
        });

        if (!decisionLogged)
        {
            await LogDeclision(DeclisionTradeAction.OpenLongReal, message);
            OpeningStops[message.Figi] = DateTime.UtcNow.AddMinutes(1);
            decisionLogged = true;
        }
    }
}

/// <summary>
/// Broadcasts a market sell for every long position in the instrument whose
/// calculated profit at the message price is positive. The real-close decision
/// is logged once, for the first position sold.
/// </summary>
private async Task CloseProfitableLongs(INewPrice message, AssetType assetType)
{
    var longPositions = _tradeDataProvider.Accounts
        .SelectMany(a => a.Value.Assets.Values)
        .Where(a => a.Figi == message.Figi && a.Count > 0)
        .ToArray();
    var decisionLogged = false;

    foreach (var position in longPositions)
    {
        decimal profit;
        if (assetType == AssetType.Futures)
        {
            profit = TradingCalculator.CaclProfit(position.BoughtPrice, message.Value,
                GetComission(assetType), GetLeverage(message.Figi, position.Count < 0), position.Count < 0);
        }
        else if (assetType == AssetType.Common && position.Count > 0)
        {
            profit = TradingCalculator.CaclProfit(position.BoughtPrice, message.Value,
                GetComission(assetType), 1, false);
        }
        else
        {
            profit = 0m;
        }

        if (profit <= 0)
        {
            continue;
        }

        await _dataBus.Broadcast(new TradeCommand
        {
            AccountId = position.AccountId,
            Figi = message.Figi,
            CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
            Count = (long)position.Count,
            RecomendPrice = null,
        });

        if (!decisionLogged)
        {
            decisionLogged = true;
            await LogDeclision(DeclisionTradeAction.CloseLongReal, message, profit);
        }
    }
}
|
|
|
|
/// <summary>
/// During clearing time, for live data only: when the two latest timestamps are
/// more than three minutes apart, asks the data provider to update the futures
/// price using the second-to-last price of the window.
/// </summary>
/// <param name="data">Recent timestamps and prices for the instrument.</param>
/// <param name="state">Exchange state for the message time.</param>
/// <param name="message">The price update being processed.</param>
private async Task ProcessClearing((DateTime[] timestamps, decimal[] prices, decimal bidsCount, decimal asksCount) data, ExchangeState state, INewPrice message)
{
    if (state != ExchangeState.ClearingTime || message.IsHistoricalData || data.timestamps.Length <= 1)
    {
        return;
    }

    var gap = data.timestamps[^1] - data.timestamps[^2];
    if (gap > TimeSpan.FromMinutes(3))
    {
        await _tradeDataProvider.UpdateFuturesPrice(message, data.prices[^2]);
    }
}
|
|
|
|
/// <summary>
/// Removes the opening stop of the message's instrument once its expiry
/// moment lies before <paramref name="currentTime"/>.
/// </summary>
/// <param name="message">Price update identifying the instrument.</param>
/// <param name="currentTime">Moment the stored expiry is compared against.</param>
private void ProcessOpeningStops(INewPrice message, DateTime currentTime)
{
    var figi = message.Figi;

    if (OpeningStops.TryGetValue(figi, out var expiresAt) && expiresAt < currentTime)
    {
        OpeningStops.TryRemove(figi, out _);
    }
}
|
|
|
|
/// <summary>
/// Records a processed price value for the message's instrument through the data provider.
/// </summary>
/// <param name="message">Source price update (supplies FIGI, ticker and time).</param>
/// <param name="processor">Name of the processor that produced the value.</param>
/// <param name="value">The processed value to record.</param>
private async Task LogPrice(INewPrice message, string processor, decimal value)
{
    var record = new ProcessedPrice
    {
        Figi = message.Figi,
        Ticker = message.Ticker,
        Processor = processor,
        Time = message.Time,
        Value = value,
    };

    await _tradeDataProvider.LogPrice(record, false);
}
|
|
|
|
/// <summary>
/// Records a trading decision for the message's instrument through the data provider.
/// The account id is always stored as an empty string.
/// </summary>
/// <param name="action">Decision kind being recorded.</param>
/// <param name="message">Price update the decision was made on.</param>
/// <param name="profit">Optional profit stored in the record's <c>Value</c>.</param>
private async Task LogDeclision(DeclisionTradeAction action, INewPrice message, decimal? profit = null)
{
    // Historical messages are stamped with their own time, live ones with the current UTC time.
    var decisionTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;

    var record = new Declision
    {
        AccountId = string.Empty,
        Figi = message.Figi,
        Ticker = message.Ticker,
        Value = profit,
        Price = message.Value,
        Time = decisionTime,
        Action = action,
    };

    await _tradeDataProvider.LogDeclision(record, false);
}
|
|
|
|
/// <summary>
/// Host shutdown hook; performs no work and completes immediately.
/// </summary>
/// <param name="cancellationToken">Host shutdown token; not observed.</param>
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
|
|
|
|
/// <summary>
/// Returns the configured commission for the asset type: the share commission for
/// common assets, the futures commission for futures, and zero for anything else.
/// </summary>
/// <param name="assetType">Asset type to look up.</param>
private decimal GetComission(AssetType assetType) => assetType switch
{
    AssetType.Common => _shareComission,
    AssetType.Futures => _futureComission,
    _ => 0m,
};
|
|
|
|
/// <summary>
/// Returns the configured leverage for an instrument, or 1 when the instrument
/// has no settings entry.
/// </summary>
/// <param name="figi">Instrument identifier.</param>
/// <param name="isShort"><c>true</c> selects the short leverage, <c>false</c> the long one.</param>
private decimal GetLeverage(string figi, bool isShort)
{
    if (!Leverages.TryGetValue(figi, out var settings))
    {
        return 1m;
    }

    return isShort ? settings.ShortLeverage : settings.LongLeverage;
}
|
|
|
|
/// <summary>
/// Decides whether an account may buy <paramref name="count"/> units at
/// <paramref name="boutPrice"/>: purchasing must be enabled and the share of cash
/// left after the purchase, relative to the account total, must not fall below
/// the required cash part.
/// </summary>
/// <param name="account">Account whose balance, total and assets are inspected.</param>
/// <param name="boutPrice">Price of one unit.</param>
/// <param name="count">Number of units to buy.</param>
/// <param name="accountCashPartFutures">Required cash part when the account already holds a futures asset.</param>
/// <param name="accountCashPart">Required cash part otherwise.</param>
/// <returns><c>true</c> when the purchase is allowed; otherwise <c>false</c>.</returns>
internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count,
    decimal accountCashPartFutures, decimal accountCashPart)
{
    if (!BotModeSwitcher.CanPurchase())
    {
        return false;
    }

    // A zero total would make the decimal division below throw, and a negative one
    // would flip the sign of the ratio; in both cases the account cannot afford a buy.
    var total = account.Total;
    if (total <= 0)
    {
        return false;
    }

    var holdsFutures = account.Assets.Values.Any(v => v.Type == AssetType.Futures);
    var requiredCashPart = holdsFutures ? accountCashPartFutures : accountCashPart;

    var cashPartAfterPurchase = (account.Balance - boutPrice * count) / total;
    return cashPartAfterPurchase >= requiredCashPart;
}
|
|
|
|
/// <summary>
/// Background loop that drains the order-book channel and forwards every
/// order book to the data provider. A failure while storing one order book is
/// logged and does not stop the loop.
/// </summary>
private async Task ProcessOrdersbooks()
{
    var reader = _ordersbookChannel.Reader;

    while (await reader.WaitToReadAsync())
    {
        var message = await reader.ReadAsync();

        try
        {
            await _tradeDataProvider.AddOrderbook(message);
        }
        catch (Exception ex)
        {
            // This task is started fire-and-forget, so an exception escaping here
            // would end order-book processing without any trace.
            _logger.LogError(ex, "Error while processing order book");
        }
    }
}
|
|
}
|
|
}
|