фиксация перед заменой float на decimal

main
vlad zverzhkhovskiy 2025-09-02 17:46:30 +03:00
parent 270e807591
commit ff1090c819
4 changed files with 150 additions and 122 deletions

View File

@ -9,7 +9,15 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces
public ValueTask AddData(INewPrice priceChange); public ValueTask AddData(INewPrice priceChange);
public ValueTask<(DateTime[] timestamps, float[] prices)> GetData(); public ValueTask<(DateTime[] timestamps, float[] prices)> GetData();
public ValueTask AddOrderbook(IOrderbook orderbook); public ValueTask AddOrderbook(IOrderbook orderbook);
/// <summary>
/// Число заявок на продаже в стакане.
/// </summary>
public decimal AsksCount { get; } public decimal AsksCount { get; }
/// <summary>
/// Число заявок на покупку в стакане.
/// </summary>
public decimal BidsCount { get; } public decimal BidsCount { get; }
} }
} }

View File

@ -38,13 +38,15 @@ namespace KLHZ.Trader.Core.Math.Declisions.Utils
} }
if (shift > 0) if (shift > 0)
{ {
var i1 = size - 1 - shift;
var i2 = size - 2 - shift;
var isCrossing = Lines.IsLinesCrossing( var isCrossing = Lines.IsLinesCrossing(
times[size - 1 - shift], times[i1],
times[size - 2 - shift], times[i2],
twav15s[size - 1 - shift], twav15s[i1],
twav15s[size - 2 - shift], twav15s[i2],
twav120s[size - 1 - shift], twav120s[i1],
twav120s[size - 2 - shift]); twav120s[i2]);
if (shift == 1 && !isCrossing) //если нет пересечения скользящих средний с окном 120 и 15 секунд между if (shift == 1 && !isCrossing) //если нет пересечения скользящих средний с окном 120 и 15 секунд между
//текущей и предыдущей точкой - можно не продолжать выполнение. //текущей и предыдущей точкой - можно не продолжать выполнение.
{ {

View File

@ -142,14 +142,10 @@ namespace KLHZ.Trader.Core.Exchange.Services
{ {
SubscribeOrderBookRequest = bookRequest SubscribeOrderBookRequest = bookRequest
}); });
using var context = await _dbContextFactory.CreateDbContextAsync();
context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
var pricesBuffer = new List<PriceChange>(); var pricesBuffer = new List<PriceChange>();
var orderbookItemsBuffer = new List<OrderbookItem>(); var orderbookItemsBuffer = new List<OrderbookItem>();
var tradesBuffer = new List<InstrumentTrade>(); var tradesBuffer = new List<InstrumentTrade>();
var lastWriteOrderbooks = DateTime.UtcNow;
var lastWriteTrades = DateTime.UtcNow;
var lastWritePrices = DateTime.UtcNow;
var lastWrite = DateTime.UtcNow; var lastWrite = DateTime.UtcNow;
await foreach (var response in stream.ResponseStream.ReadAllAsync()) await foreach (var response in stream.ResponseStream.ReadAllAsync())
{ {
@ -214,25 +210,34 @@ namespace KLHZ.Trader.Core.Exchange.Services
await _eventBus.Broadcast(message); await _eventBus.Broadcast(message);
} }
if (orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count > 1000 || (DateTime.UtcNow - lastWrite).TotalSeconds > 10) if (orderbookItemsBuffer.Count + pricesBuffer.Count + tradesBuffer.Count > 0 || (DateTime.UtcNow - lastWrite).TotalSeconds > 10)
{ {
lastWrite = DateTime.UtcNow; try
if (orderbookItemsBuffer.Count > 0)
{ {
await context.OrderbookItems.AddRangeAsync(orderbookItemsBuffer); using var context = await _dbContextFactory.CreateDbContextAsync();
orderbookItemsBuffer.Clear(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
lastWrite = DateTime.UtcNow;
if (orderbookItemsBuffer.Count > 0)
{
await context.OrderbookItems.AddRangeAsync(orderbookItemsBuffer);
orderbookItemsBuffer.Clear();
}
if (pricesBuffer.Count > 0)
{
await context.PriceChanges.AddRangeAsync(pricesBuffer);
pricesBuffer.Clear();
}
if (tradesBuffer.Count > 0)
{
await context.InstrumentTrades.AddRangeAsync(tradesBuffer);
tradesBuffer.Clear();
}
await context.SaveChangesAsync();
} }
if (pricesBuffer.Count > 0) catch (Exception ex)
{ {
await context.PriceChanges.AddRangeAsync(pricesBuffer); _logger.LogError(ex, "Ошибка при сохранении данных биржи.");
pricesBuffer.Clear();
} }
if (tradesBuffer.Count > 0)
{
await context.InstrumentTrades.AddRangeAsync(tradesBuffer);
tradesBuffer.Clear();
}
await context.SaveChangesAsync();
} }
} }
} }

View File

@ -98,7 +98,6 @@ namespace KLHZ.Trader.Core.Exchange.Services
_dataBus.AddChannel(nameof(Trader), _ordersbookChannel); _dataBus.AddChannel(nameof(Trader), _ordersbookChannel);
_ = ProcessPrices(); _ = ProcessPrices();
_ = ProcessOrdersbooks(); _ = ProcessOrdersbooks();
_ = BackgroundWorker();
} }
private async Task InitStops() private async Task InitStops()
@ -119,86 +118,121 @@ namespace KLHZ.Trader.Core.Exchange.Services
while (await _pricesChannel.Reader.WaitToReadAsync()) while (await _pricesChannel.Reader.WaitToReadAsync())
{ {
var message = await _pricesChannel.Reader.ReadAsync(); var message = await _pricesChannel.Reader.ReadAsync();
//if (_tradingInstrumentsFigis.Contains(message.Figi)) if (_tradingInstrumentsFigis.Contains(message.Figi))
//{ {
// if (_historyCash.TryGetValue(message.Figi, out var unit)) if (_historyCash.TryGetValue(message.Figi, out var unit))
// { {
// await unit.AddData(message); await unit.AddData(message);
// } }
// else else
// { {
// unit = new PriceHistoryCacheUnit2(message.Figi, message); unit = new PriceHistoryCacheUnit2(message.Figi, message);
// _historyCash.TryAdd(message.Figi, unit); _historyCash.TryAdd(message.Figi, unit);
// } }
// var data = await unit.GetData(); try
// var declisionsForSave = new List<Declision>(); {
// if (message.Figi == "FUTIMOEXF000") var data = await unit.GetData();
// { var declisionsForSave = new List<Declision>();
// var result = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, 100, 3f); if (message.Figi == "FUTIMOEXF000")
// if ((result & TradingEvent.StopBuy) == TradingEvent.StopBuy) {
// { var result = MovingAverage.CheckByWindowAverageMean(data.timestamps, data.prices, 100, 3f);
// var stopTo = DateTime.UtcNow.AddMinutes(_buyStopLength); if ((result & TradingEvent.StopBuy) == TradingEvent.StopBuy)
// BuyStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo); {
// declisionsForSave.Add(new Declision() var stopTo = DateTime.UtcNow.AddMinutes(_buyStopLength);
// { BuyStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
// AccountId = string.Empty, declisionsForSave.Add(new Declision()
// Figi = message.Figi, {
// Ticker = message.Ticker, AccountId = string.Empty,
// Price = message.Value, Figi = message.Figi,
// Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow, Ticker = message.Ticker,
// Action = DeclisionTradeAction.StopBuy, Price = message.Value,
// }); Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
// } Action = DeclisionTradeAction.StopBuy,
});
}
// if ((result & TradingEvent.LongOpen) == TradingEvent.LongOpen if ((result & TradingEvent.LongOpen) == TradingEvent.LongOpen
// && !BuyStops.TryGetValue(message.Figi, out _)) && ((unit.BidsCount / unit.AsksCount) > 0.5m))
// { {
// var stopTo = DateTime.UtcNow.AddMinutes(_buyStopLength); if (BuyStops.TryGetValue(message.Figi, out var dt))
// BuyStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo); {
// declisionsForSave.Add(new Declision() if (dt > DateTime.UtcNow)
// { {
// AccountId = string.Empty, continue;
// Figi = message.Figi, }
// Ticker = message.Ticker, else
// Price = message.Value, {
// Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow, BuyStops.TryRemove(message.Figi, out _);
// Action = DeclisionTradeAction.OpenLong, }
// }); }
// } var stopTo = DateTime.UtcNow.AddMinutes(_buyStopLength);
BuyStops.AddOrUpdate(message.Figi, stopTo, (k, v) => stopTo);
declisionsForSave.Add(new Declision()
{
AccountId = string.Empty,
Figi = message.Figi,
Ticker = message.Ticker,
Price = message.Value,
Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
Action = DeclisionTradeAction.OpenLong,
});
}
// if ((result & TradingEvent.LongClose) == TradingEvent.LongClose) if ((result & TradingEvent.LongClose) == TradingEvent.LongClose)
// { {
// declisionsForSave.Add(new Declision() declisionsForSave.Add(new Declision()
// { {
// AccountId = string.Empty, AccountId = string.Empty,
// Figi = message.Figi, Figi = message.Figi,
// Ticker = message.Ticker, Ticker = message.Ticker,
// Price = message.Value, Price = message.Value,
// Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow, Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
// Action = DeclisionTradeAction.CloseLong, Action = DeclisionTradeAction.CloseLong,
// }); });
// } }
// if ((result & TradingEvent.ShortOpen) == TradingEvent.ShortOpen && (unit.AsksCount/ unit.BidsCount>2 )) if ((result & TradingEvent.ShortOpen) == TradingEvent.ShortOpen && (unit.BidsCount / unit.AsksCount < 2))
// { {
// declisionsForSave.Add(new Declision() declisionsForSave.Add(new Declision()
// { {
// AccountId = string.Empty, AccountId = string.Empty,
// Figi = message.Figi, Figi = message.Figi,
// Ticker = message.Ticker, Ticker = message.Ticker,
// Price = message.Value, Price = message.Value,
// Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow, Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
// Action = DeclisionTradeAction.OpenShort, Action = DeclisionTradeAction.OpenShort,
// }); });
// } }
// using var context = await _dbContextFactory.CreateDbContextAsync(); if ((result & TradingEvent.ShortClose) == TradingEvent.ShortClose)
// context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; {
declisionsForSave.Add(new Declision()
{
AccountId = string.Empty,
Figi = message.Figi,
Ticker = message.Ticker,
Price = message.Value,
Time = message.IsHistoricalData ? message.Time : DateTime.UtcNow,
Action = DeclisionTradeAction.CloseShort,
});
}
// await context.AddRangeAsync(declisionsForSave); if (declisionsForSave.Count > 0)
// await context.SaveChangesAsync(); {
// } using var context = await _dbContextFactory.CreateDbContextAsync();
//} context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking;
await context.AddRangeAsync(declisionsForSave);
await context.SaveChangesAsync();
declisionsForSave.Clear();
}
}
}
catch (Exception ex)
{
}
}
} }
} }
@ -216,27 +250,6 @@ namespace KLHZ.Trader.Core.Exchange.Services
} }
} }
private async Task BackgroundWorker()
{
var keysForRemove = new List<string>();
while (!_cts.IsCancellationRequested)
{
var time = DateTime.UtcNow;
foreach (var kvp in BuyStops)
{
if (kvp.Value > time)
{
keysForRemove.Add(kvp.Key);
}
}
foreach (var key in keysForRemove)
{
BuyStops.TryRemove(key, out _);
}
await Task.Delay(10000);
}
}
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
_cts.Cancel(); _cts.Cancel();