425 lines
22 KiB
C#
425 lines
22 KiB
C#
using KLHZ.Trader.Core.Common;
|
|
using KLHZ.Trader.Core.Contracts.Declisions.Dtos.Enums;
|
|
using KLHZ.Trader.Core.Contracts.Declisions.Interfaces;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Dtos;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces;
|
|
using KLHZ.Trader.Core.Contracts.Messaging.Interfaces;
|
|
using KLHZ.Trader.Core.DataLayer;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Declisions;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Declisions.Enums;
|
|
using KLHZ.Trader.Core.DataLayer.Entities.Prices;
|
|
using KLHZ.Trader.Core.Exchange.Models.AssetsAccounting;
|
|
using KLHZ.Trader.Core.Exchange.Models.Configs;
|
|
using KLHZ.Trader.Core.Exchange.Models.Trading;
|
|
using KLHZ.Trader.Core.Exchange.Utils;
|
|
using KLHZ.Trader.Core.Math.Declisions.Services.Cache;
|
|
using KLHZ.Trader.Core.Math.Declisions.Utils;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Hosting;
|
|
using Microsoft.Extensions.Logging;
|
|
using Microsoft.Extensions.Options;
|
|
using System.Collections.Concurrent;
|
|
using System.Threading.Channels;
|
|
using Tinkoff.InvestApi;
|
|
using AssetType = KLHZ.Trader.Core.Exchange.Models.AssetsAccounting.AssetType;
|
|
|
|
namespace KLHZ.Trader.Core.Exchange.Services
|
|
{
|
|
public class Trader : IHostedService
|
|
{
|
|
private readonly IDataBus _dataBus;
|
|
private readonly IDbContextFactory<TraderDbContext> _dbContextFactory;
|
|
private readonly TradeDataProvider _tradeDataProvider;
|
|
private readonly ILogger<Trader> _logger;
|
|
internal readonly ConcurrentDictionary<string, DeferredTrade> DeferredLongOpens = new();
|
|
internal readonly ConcurrentDictionary<string, DeferredTrade> DeferredLongCloses = new();
|
|
private readonly ConcurrentDictionary<string, DateTime> OpeningStops = new();
|
|
private readonly ConcurrentDictionary<string, InstrumentSettings> Leverages = new();
|
|
private readonly ConcurrentDictionary<string, IPriceHistoryCacheUnit> _historyCash = new();
|
|
|
|
|
|
private readonly double _buyStopLength;
|
|
private readonly decimal _futureComission;
|
|
private readonly decimal _shareComission;
|
|
private readonly decimal _accountCashPart;
|
|
private readonly decimal _accountCashPartFutures;
|
|
private readonly string[] _tradingInstrumentsFigis = [];
|
|
|
|
private readonly Channel<INewPrice> _pricesChannel = Channel.CreateUnbounded<INewPrice>();
|
|
private readonly Channel<IOrderbook> _ordersbookChannel = Channel.CreateUnbounded<IOrderbook>();
|
|
private readonly CancellationTokenSource _cts = new();
|
|
public Trader(
|
|
ILogger<Trader> logger,
|
|
IOptions<ExchangeConfig> options,
|
|
IDataBus dataBus,
|
|
IDbContextFactory<TraderDbContext> dbContextFactory,
|
|
TradeDataProvider tradeDataProvider,
|
|
InvestApiClient investApiClient)
|
|
{
|
|
_tradeDataProvider = tradeDataProvider;
|
|
_logger = logger;
|
|
_dataBus = dataBus;
|
|
_dbContextFactory = dbContextFactory;
|
|
_futureComission = options.Value.FutureComission;
|
|
_shareComission = options.Value.ShareComission;
|
|
_accountCashPart = options.Value.AccountCashPart;
|
|
_accountCashPartFutures = options.Value.AccountCashPartFutures;
|
|
_tradingInstrumentsFigis = options.Value.TradingInstrumentsFigis;
|
|
_buyStopLength = (double)options.Value.StopBuyLengthMinuts;
|
|
|
|
foreach (var lev in options.Value.InstrumentsSettings)
|
|
{
|
|
Leverages.TryAdd(lev.Figi, lev);
|
|
}
|
|
}
|
|
|
|
public async Task StartAsync(CancellationToken cancellationToken)
|
|
{
|
|
await _tradeDataProvider.Init();
|
|
_dataBus.AddChannel(nameof(Trader), _pricesChannel);
|
|
_dataBus.AddChannel(nameof(Trader), _ordersbookChannel);
|
|
_ = ProcessPrices();
|
|
_ = ProcessOrdersbooks();
|
|
}
|
|
|
|
private async Task ProcessPrices()
|
|
{
|
|
var declisionsForSave = new List<Declision>();
|
|
var processedPrices = new List<ProcessedPrice>();
|
|
while (await _pricesChannel.Reader.WaitToReadAsync())
|
|
{
|
|
|
|
var bigWindowProcessor = nameof(Trader) + "_big";
|
|
var smallWindowProcessor = nameof(Trader) + "_small";
|
|
var message = await _pricesChannel.Reader.ReadAsync();
|
|
|
|
if (_tradingInstrumentsFigis.Contains(message.Figi))
|
|
{
|
|
var currentTime = message.IsHistoricalData ? message.Time : DateTime.UtcNow;
|
|
if (_historyCash.TryGetValue(message.Figi, out var unit))
|
|
{
|
|
await unit.AddData(message);
|
|
}
|
|
else
|
|
{
|
|
unit = new PriceHistoryCacheUnit2(message.Figi, message);
|
|
_historyCash.TryAdd(message.Figi, unit);
|
|
}
|
|
try
|
|
{
|
|
if (message.Figi == "FUTIMOEXF000")
|
|
{
|
|
DeferredTrade? longOpen;
|
|
DeferredLongOpens.TryGetValue(message.Figi, out longOpen);
|
|
if (longOpen != null)
|
|
{
|
|
var t = currentTime;
|
|
if (longOpen.Time <= t
|
|
&& t - longOpen.Time < TimeSpan.FromMinutes(3))
|
|
{
|
|
DeferredLongOpens.TryRemove(message.Figi, out _);
|
|
if (message.Value - longOpen.Price < 1)
|
|
{
|
|
if (!message.IsHistoricalData)
|
|
{
|
|
var accounts = _tradeDataProvider.Accounts
|
|
.Where(a => !a.Value.Assets.ContainsKey(message.Figi))
|
|
.ToArray();
|
|
foreach (var acc in accounts)
|
|
{
|
|
if (IsBuyAllowed(acc.Value, message.Value, 1, _accountCashPartFutures, _accountCashPart))
|
|
{
|
|
await _dataBus.Broadcast(new TradeCommand()
|
|
{
|
|
AccountId = acc.Value.AccountId,
|
|
Figi = message.Figi,
|
|
CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketBuy,
|
|
Count = 1,
|
|
RecomendPrice = null,
|
|
});
|
|
LogDeclision(declisionsForSave, DeclisionTradeAction.OpenLong, message);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
LogDeclision(declisionsForSave, DeclisionTradeAction.OpenLong, message);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
DeferredTrade? longClose;
|
|
DeferredLongCloses.TryGetValue(message.Figi, out longClose);
|
|
if (longClose != null)
|
|
{
|
|
if (longClose.Time <= currentTime)
|
|
{
|
|
DeferredLongCloses.TryRemove(message.Figi, out _);
|
|
if (longClose.Price - message.Value < 1)
|
|
{
|
|
if (!message.IsHistoricalData)
|
|
{
|
|
var assetsForClose = _tradeDataProvider.Accounts
|
|
.SelectMany(a => a.Value.Assets.Values)
|
|
.Where(a => a.Figi == message.Figi && a.Count > 0)
|
|
.ToArray();
|
|
foreach (var asset in assetsForClose)
|
|
{
|
|
var profit = 0m;
|
|
var assetType = _tradeDataProvider.GetAssetTypeByFigi(message.Figi);
|
|
if (assetType == AssetType.Common && asset.Count > 0)
|
|
{
|
|
profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
|
|
GetComission(assetType), 1, false);
|
|
}
|
|
if (assetType == AssetType.Futures)
|
|
{
|
|
profit = TradingCalculator.CaclProfit(asset.BoughtPrice, message.Value,
|
|
GetComission(assetType), GetLeverage(message.Figi, asset.Count < 0), asset.Count < 0);
|
|
}
|
|
if (profit > 0)
|
|
{
|
|
await _dataBus.Broadcast(new TradeCommand()
|
|
{
|
|
AccountId = asset.AccountId,
|
|
Figi = message.Figi,
|
|
CommandType = Contracts.Messaging.Dtos.Enums.TradeCommandType.MarketSell,
|
|
Count = (long)asset.Count,
|
|
RecomendPrice = null,
|
|
});
|
|
LogDeclision(declisionsForSave, DeclisionTradeAction.CloseLong, message);
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
LogDeclision(declisionsForSave, DeclisionTradeAction.CloseLong, message);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var windowMaxSize = 100;
|
|
var data = await unit.GetData(windowMaxSize);
|
|
var state = ExchangeScheduler.GetCurrentState(message.Time);
|
|
|
|
if (state == ExchangeState.ClearingTime
|
|
&& !message.IsHistoricalData
|
|
&& data.timestamps.Length > 1
|
|
&& (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2]) > TimeSpan.FromMinutes(3))
|
|
{
|
|
await UpdateFuturesPrice(message, data.prices[data.prices.Length - 2]);
|
|
}
|
|
|
|
if (OpeningStops.TryGetValue(message.Figi, out var dt))
|
|
{
|
|
if (dt < currentTime)
|
|
{
|
|
OpeningStops.TryRemove(message.Figi, out _);
|
|
}
|
|
}
|
|
|
|
if ((unit.BidsCount / unit.AsksCount) < 0.5m || (unit.BidsCount / unit.AsksCount) > 2m)
|
|
{
|
|
var stopTo = currentTime.AddMinutes(3);
|
|
//OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
|
|
//LogDeclision(declisionsForSave, DeclisionTradeAction.StopBuyShortTime, message);
|
|
}
|
|
|
|
var res = TradingEvent.None;
|
|
var resultMoveAvFull = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 45, 180, 2.5m);
|
|
var resultLongClose = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, windowMaxSize, 15, 120, 2.5m).events;
|
|
|
|
var uptrendStarts = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(120), TimeSpan.FromSeconds(20), 1.5m, 15);
|
|
//var uptrendStarts2 = LocalTrends.CheckByLocalTrends(data.timestamps, data.prices, TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(3), 1.5m, 2);
|
|
|
|
|
|
res |= (uptrendStarts & TradingEvent.UptrendStart);
|
|
//res |= (uptrendStarts2 & TradingEvent.UptrendStart);
|
|
//res |= downtrendEnds;
|
|
res |= resultLongClose;
|
|
res |= resultMoveAvFull.events;
|
|
if (resultMoveAvFull.bigWindowAv != 0)
|
|
{
|
|
LogPrice(processedPrices, message, bigWindowProcessor, resultMoveAvFull.bigWindowAv);
|
|
LogPrice(processedPrices, message, smallWindowProcessor, resultMoveAvFull.smallWindowAv);
|
|
}
|
|
if ((resultLongClose & TradingEvent.StopBuy) == TradingEvent.StopBuy)
|
|
{
|
|
var stopTo = (message.IsHistoricalData ? message.Time : DateTime.UtcNow).AddMinutes(_buyStopLength/2);
|
|
OpeningStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
|
|
//LogDeclision(declisionsForSave, DeclisionTradeAction.StopBuy, message);
|
|
}
|
|
|
|
if ((res & TradingEvent.UptrendStart) == TradingEvent.UptrendStart
|
|
&& !OpeningStops.TryGetValue(message.Figi, out _)
|
|
&& state == ExchangeState.Open
|
|
&& data.timestamps.Length > 1
|
|
&& (data.timestamps[data.timestamps.Length - 1] - data.timestamps[data.timestamps.Length - 2] < TimeSpan.FromMinutes(1)))
|
|
{
|
|
var trade = new DeferredTrade()
|
|
{
|
|
Figi = message.Figi,
|
|
Price = message.Value,
|
|
Time = message.Time.AddSeconds(15)
|
|
};
|
|
DeferredLongOpens[message.Figi] = trade;
|
|
}
|
|
|
|
if ((res & TradingEvent.UptrendEnd) == TradingEvent.UptrendEnd)
|
|
{
|
|
var trade = new DeferredTrade()
|
|
{
|
|
Figi = message.Figi,
|
|
Price = message.Value,
|
|
Time = message.Time.AddSeconds(15)
|
|
};
|
|
DeferredLongCloses[message.Figi] = trade;
|
|
}
|
|
|
|
//if ((resultLongOpen.events & TradingEvent.ShortOpen) == TradingEvent.ShortOpen
|
|
// && !OpeningStops.TryGetValue(message.Figi, out _))
|
|
//{
|
|
// LogDeclision(declisionsForSave, DeclisionTradeAction.OpenShort, message);
|
|
//}
|
|
|
|
//if ((resultLongOpen.events & TradingEvent.ShortClose) == TradingEvent.ShortClose)
|
|
//{
|
|
// LogDeclision(declisionsForSave, DeclisionTradeAction.CloseShort, message);
|
|
//}
|
|
|
|
if ((!message.IsHistoricalData && (processedPrices.Count > 0 || declisionsForSave.Count > 0))
|
|
|| (message.IsHistoricalData && ((processedPrices.Count + declisionsForSave.Count > 10000) || _pricesChannel.Reader.Count == 0)))
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
|
|
|
|
if (processedPrices.Count > 0)
|
|
{
|
|
await context.ProcessedPrices.AddRangeAsync(processedPrices);
|
|
processedPrices.Clear();
|
|
}
|
|
if (declisionsForSave.Count > 0)
|
|
{
|
|
await context.Declisions.AddRangeAsync(declisionsForSave);
|
|
declisionsForSave.Clear();
|
|
}
|
|
await context.SaveChangesAsync();
|
|
}
|
|
}
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Ошибка при боработке новой цены IMOEXF");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private async Task UpdateFuturesPrice(INewPrice newPrice, decimal newPriceValue)
|
|
{
|
|
using var context = await _dbContextFactory.CreateDbContextAsync();
|
|
await context.Trades
|
|
.Where(t => t.Figi == newPrice.Figi && t.ArchiveStatus == 0 && t.Asset == DataLayer.Entities.Trades.Enums.AssetType.Future)
|
|
.ExecuteUpdateAsync(t => t.SetProperty(tr => tr.Price, newPriceValue));
|
|
foreach (var account in _tradeDataProvider.Accounts.Values)
|
|
{
|
|
await _tradeDataProvider.SyncPortfolio(account);
|
|
}
|
|
}
|
|
|
|
private static void LogPrice(List<ProcessedPrice> prices, INewPrice message, string processor, decimal value)
|
|
{
|
|
prices.Add(new ProcessedPrice()
|
|
{
|
|
Figi = message.Figi,
|
|
Ticker = message.Ticker,
|
|
Processor = processor,
|
|
Time = message.Time,
|
|
Value = value,
|
|
});
|
|
}
|
|
|
|
private static void LogDeclision(List<Declision> declisions, DeclisionTradeAction action, INewPrice message)
|
|
{
|
|
declisions.Add(new Declision()
|
|
{
|
|
AccountId = string.Empty,
|
|
Figi = message.Figi,
|
|
Ticker = message.Ticker,
|
|
Price = message.Value,
|
|
Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
|
|
Action = action,
|
|
});
|
|
}
|
|
|
|
private async Task ProcessOrdersbooks()
|
|
{
|
|
while (await _ordersbookChannel.Reader.WaitToReadAsync())
|
|
{
|
|
var message = await _ordersbookChannel.Reader.ReadAsync();
|
|
if (!_historyCash.TryGetValue(message.Figi, out var data))
|
|
{
|
|
data = new PriceHistoryCacheUnit2(message.Figi);
|
|
_historyCash.TryAdd(message.Figi, data);
|
|
}
|
|
await data.AddOrderbook(message);
|
|
}
|
|
}
|
|
|
|
public Task StopAsync(CancellationToken cancellationToken)
|
|
{
|
|
_cts.Cancel();
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
private decimal GetComission(AssetType assetType)
|
|
{
|
|
if (assetType == AssetType.Common)
|
|
{
|
|
return _shareComission;
|
|
}
|
|
else if (assetType == AssetType.Futures)
|
|
{
|
|
return _futureComission;
|
|
}
|
|
else
|
|
{
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
private decimal GetLeverage(string figi, bool isShort)
|
|
{
|
|
var res = 1m;
|
|
if (Leverages.TryGetValue(figi, out var leverage))
|
|
{
|
|
res = isShort ? leverage.ShortLeverage : leverage.LongLeverage;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
internal static bool IsBuyAllowed(ManagedAccount account, decimal boutPrice, decimal count,
|
|
decimal accountCashPartFutures, decimal accountCashPart)
|
|
{
|
|
if (!BotModeSwitcher.CanPurchase()) return false;
|
|
|
|
var balance = account.Balance;
|
|
var total = account.Total;
|
|
|
|
var futures = account.Assets.Values.FirstOrDefault(v => v.Type == AssetType.Futures);
|
|
if (futures != null)
|
|
{
|
|
if ((balance - boutPrice * count) / total < accountCashPartFutures) return false;
|
|
}
|
|
else
|
|
{
|
|
if ((balance - boutPrice * count) / total < accountCashPart) return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
}
|
|
}
|