diff --git a/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs b/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs index 074a7c3..a457116 100644 --- a/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs +++ b/KLHZ.Trader.Core.Contracts/Declisions/Interfaces/IPriceHistoryCacheUnit.cs @@ -7,7 +7,6 @@ namespace KLHZ.Trader.Core.Contracts.Declisions.Interfaces public string Figi { get; } public int Length { get; } public ValueTask AddData(INewPrice priceChange); - public ValueTask<(DateTime[] timestamps, float[] prices)> GetData(); } } diff --git a/KLHZ.Trader.Core.Math/Declisions/Dtos/Services/KalmanPredictor.cs b/KLHZ.Trader.Core.Math/Declisions/Dtos/Services/KalmanPredictor.cs deleted file mode 100644 index 0b449cc..0000000 --- a/KLHZ.Trader.Core.Math/Declisions/Dtos/Services/KalmanPredictor.cs +++ /dev/null @@ -1,191 +0,0 @@ -using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Intarfaces; -using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; -using MathNet.Filtering.Kalman; -using MathNet.Numerics.LinearAlgebra; -using Microsoft.Extensions.Hosting; -using System.Threading.Channels; - -namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services -{ - public class KalmanPredictor : IHostedService - { - private readonly double r = 1000.0; // Measurement covariance - private readonly PriceHistoryCacheUnit _cache = new PriceHistoryCacheUnit(""); - private readonly Channel _messages = Channel.CreateUnbounded(); - private readonly IDataBus _dataBus; - - private DiscreteKalmanFilter? _dkf; - - public KalmanPredictor(IDataBus bus) - { - _dataBus = bus; - bus.AddChannel(nameof(KalmanPredictor), _messages); - _ = ProcessMessages(); - } - - public Task StartAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - - private async Task Init() - { - - } - - private async Task ProcessCalman(INewPrice message) - { - var H = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, 0d, 1d }); // Measurement matrix - var length = _cache.Length; - - if (length > 2 && _dkf != null) - { - var data = await _cache.GetData(); - - var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; - if (dt < 0.0000001 || dt > 1800) return; - var F = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, dt, 1 }); // State transition matrix - var R = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); - _dkf.Predict(F); - - var state = _dkf.State; - var value = state[0, 0] + dt * state[1, 0]; - if (!double.IsNaN(value)) - { - await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() - { - Figi = message.Figi, - Processor = nameof(KalmanPredictor) + "_predict", - Ticker = message.Ticker, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = (decimal)value, - }); - } - - - var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; - var k = dprice / dt; - var b = data.prices[data.prices.Length - 2]; - var z = Matrix.Build.Dense(2, 1, new[] { b, k }); - - _dkf.Update(z, H, R); - - var state2 = _dkf.State; - var value2 = state2[0, 0] + dt * state2[1, 0]; - if (!double.IsNaN(value)) - { - await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() - { - Figi = message.Figi, - Processor = nameof(KalmanPredictor) + "_state", - Ticker = message.Ticker, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = (decimal)value, - }); - } - } - else if (length >= 2) - { - var data = await _cache.GetData(); - var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; - var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; - if (dt < 0.0000001) return; - var k = dprice / dt; - var b = data.prices[data.prices.Length - 2]; - - - Matrix x0 = Matrix.Build.Dense(2, 1, new[] { b, k }); - Matrix P0 = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); - - _dkf = new DiscreteKalmanFilter(x0, P0); - } - } - - private async Task ProcessMovAv(INewPrice message, int window) - { - var data = await _cache.GetData(); - if (data.prices.Length < window) return; - var sum = 0f; - var count = 0; - for (int i = 1; (i <= window); i++) - { - sum += data.prices[data.prices.Length - i]; - count++; - } - - await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() - { - Figi = message.Figi, - Processor = nameof(KalmanPredictor) + "_mov_av_window" + window.ToString(), - Ticker = message.Ticker, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = (decimal)(sum / count), - }); - } - - private async Task ProcessTimeWindow(INewPrice message, int window) - { - var data = await _cache.GetData(); - var sum = data.prices[data.prices.Length - 1]; - var count = 1; - var startTime = data.timestamps[data.prices.Length - 1]; - for (int i = 2; (i < data.prices.Length && (startTime - data.timestamps[data.prices.Length - i]) < TimeSpan.FromSeconds(window)); i++) - { - sum += data.prices[data.prices.Length - i]; - count++; - } - - await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() - { - Figi = message.Figi, - Processor = nameof(KalmanPredictor) + "_timeWindow" + window.ToString(), - Ticker = message.Ticker, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = (decimal)(sum / count), - }); - - var diffValue = System.Math.Abs((decimal)(sum / count) - message.Value); - await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() - { - Figi = message.Figi, - Processor = nameof(KalmanPredictor) + "_diff" + window.ToString(), - Ticker = message.Ticker, - IsHistoricalData = message.IsHistoricalData, - Time = message.Time, - Value = diffValue > 3 ? diffValue : 0, - }); - } - - private async Task ProcessMessages() - { - - - while (await _messages.Reader.WaitToReadAsync()) - { - var message = await _messages.Reader.ReadAsync(); - await _cache.AddData(message); - try - { - //await ProcessCalman(message); - await ProcessMovAv(message, 3); - await ProcessTimeWindow(message, 5); - await ProcessTimeWindow(message, 15); - await ProcessTimeWindow(message, 120); - } - catch (Exception ex) - { - - } - } - } - } -} diff --git a/KLHZ.Trader.Core.Math/Declisions/Services/KalmanPredictor.cs b/KLHZ.Trader.Core.Math/Declisions/Services/KalmanPredictor.cs new file mode 100644 index 0000000..9a6a521 --- /dev/null +++ b/KLHZ.Trader.Core.Math/Declisions/Services/KalmanPredictor.cs @@ -0,0 +1,319 @@ +using KLHZ.Trader.Core.Contracts.Declisions.Dtos; +using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Intarfaces; +using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; +using KLHZ.Trader.Core.Math.Declisions.Dtos; +using MathNet.Filtering.Kalman; +using MathNet.Numerics.LinearAlgebra; +using Microsoft.Extensions.Hosting; +using System.Threading.Channels; +using static System.Runtime.InteropServices.JavaScript.JSType; + +namespace KLHZ.Trader.Core.Math.Declisions.Services +{ + public class KalmanPredictor : IHostedService + { + private readonly double r = 1000.0; // Measurement covariance + private readonly PriceHistoryCacheUnit _cache = new PriceHistoryCacheUnit(""); + private readonly Channel _messages = Channel.CreateUnbounded(); + private readonly IDataBus _dataBus; + + private DiscreteKalmanFilter? _dkf; + + public KalmanPredictor(IDataBus bus) + { + _dataBus = bus; + bus.AddChannel(nameof(KalmanPredictor), _messages); + _ = ProcessMessages(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + private async Task Init() + { + + } + + private async Task ProcessCalman(INewPrice message) + { + var H = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, 0d, 1d }); // Measurement matrix + var length = _cache.Length; + + if (length > 2 && _dkf != null) + { + var data = await _cache.GetData(); + + var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; + if (dt < 0.0000001 || dt > 1800) return; + var F = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, dt, 1 }); // State transition matrix + var R = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); + _dkf.Predict(F); + + var state = _dkf.State; + var value = state[0, 0] + dt * state[1, 0]; + if (!double.IsNaN(value)) + { + await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() + { + Figi = message.Figi, + Processor = nameof(KalmanPredictor) + "_predict", + Ticker = message.Ticker, + IsHistoricalData = message.IsHistoricalData, + Time = message.Time, + Value = (decimal)value, + }); + } + + + var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; + var k = dprice / dt; + var b = data.prices[data.prices.Length - 2]; + var z = Matrix.Build.Dense(2, 1, new[] { b, k }); + + _dkf.Update(z, H, R); + + var state2 = _dkf.State; + var value2 = state2[0, 0] + dt * state2[1, 0]; + if (!double.IsNaN(value)) + { + await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() + { + Figi = message.Figi, + Processor = nameof(KalmanPredictor) + "_state", + Ticker = message.Ticker, + IsHistoricalData = message.IsHistoricalData, + Time = message.Time, + Value = (decimal)value, + }); + } + } + else if (length >= 2) + { + var data = await _cache.GetData(); + var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; + var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; + if (dt < 0.0000001) return; + var k = dprice / dt; + var b = data.prices[data.prices.Length - 2]; + + + Matrix x0 = Matrix.Build.Dense(2, 1, new[] { b, k }); + Matrix P0 = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); + + _dkf = new DiscreteKalmanFilter(x0, P0); + } + } + private async Task ProcessMovAv(INewPrice message, int window) + { + var data = await _cache.GetData(); + if (data.prices.Length < window) return; + var sum = 0f; + var count = 0; + for (int i = 1; i <= window; i++) + { + sum += data.prices[data.prices.Length - i]; + count++; + } + + await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() + { + Figi = message.Figi, + Processor = nameof(KalmanPredictor) + "_mov_av_window" + window.ToString(), + Ticker = message.Ticker, + IsHistoricalData = message.IsHistoricalData, + Time = message.Time, + Value = (decimal)(sum / count), + }); + } + + private static (DateTime time, float value) CalcTimeWindowAverageValue(DateTime[] timestamps, float[] values, int window, int shift = 0) + { + var sum = values[values.Length - 1 - shift]; + var count = 1; + var startTime = timestamps[timestamps.Length - 1 - shift]; + for (int i = 2; i < values.Length && startTime - timestamps[values.Length - i - shift] < TimeSpan.FromSeconds(window); i++) + { + sum += values[values.Length - i - shift]; + count++; + } + return (startTime, sum / count); + } + + private static TradingEventsDto CheckByWindowAverageMean(DateTime[] timestamps, float[] prices, int size, float meanfullStep = 3f) + { + var twav15s = new float[size]; + var twav120s = new float[size]; + var times = new DateTime[size]; + + for (int shift = 0; shift < size; shift++) + { + var twav15 = CalcTimeWindowAverageValue(timestamps, prices, 15, shift); + var twav120 = CalcTimeWindowAverageValue(timestamps, prices, 120, shift); + twav15s[size - 1 - shift] = twav15.value; + twav120s[size - 1 - shift] = twav120.value; + times[size - 1 - shift] = twav120.time; + + if (shift > 0) + { + var isCrossing = Utils.Lines.IsLinesCrossing( + times[size - 1 - shift], + times[size - 2 - shift], + twav15s[size - 1 - shift], + twav15s[size - 2 - shift], + twav120s[size - 1 - shift], + twav120s[size - 2 - shift]); + if (shift == 1 && !isCrossing) //если нет пересечения скользящих средний с окном 120 и 15 секунд между + //текущей и предыдущей точкой - можно не продолжать выполнение. + { + break; + } + if (shift > 1 && isCrossing) + { + // если фильтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга и закрытия шорта + if (twav120s[size - 1] <= twav15s[size - 1] && twav120s[size - 2] > twav15s[size - 2] ) + { + if (twav15s[size - 1 - shift] - twav15s[size - 1] >= meanfullStep) + { + return new TradingEventsDto(false, true); + } + } + + // если фильтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга и возможно открытия шорта + if (twav15s[size - 1] <= twav120s[size - 1] && twav15s[size - 2] > twav120s[size - 2]) + { + if (twav15s[size - 1 - shift] - twav15s[size - 1] <= - meanfullStep) + { + return new TradingEventsDto(true, false); + } + } + } + } + } + + return new TradingEventsDto(false, false); + } + + private async Task ProcessTimeWindow(INewPrice message, int window) + { + var data = await _cache.GetData(); + var sum = data.prices[data.prices.Length - 1]; + var count = 1; + var startTime = data.timestamps[data.prices.Length - 1]; + for (int i = 2; i < data.prices.Length && startTime - data.timestamps[data.prices.Length - i] < TimeSpan.FromSeconds(window); i++) + { + sum += data.prices[data.prices.Length - i]; + count++; + } + + await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() + { + Figi = message.Figi, + Processor = nameof(KalmanPredictor) + "_timeWindow" + window.ToString(), + Ticker = message.Ticker, + IsHistoricalData = message.IsHistoricalData, + Time = message.Time, + Value = (decimal)(sum / count), + }); + + var diffValue = System.Math.Abs((decimal)(sum / count) - message.Value); + await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() + { + Figi = message.Figi, + Processor = nameof(KalmanPredictor) + "_diff" + window.ToString(), + Ticker = message.Ticker, + IsHistoricalData = message.IsHistoricalData, + Time = message.Time, + Value = diffValue > 3 ? diffValue : 0, + }); + } + + private async Task ProcessMessages() + { + while (await _messages.Reader.WaitToReadAsync()) + { + var message = await _messages.Reader.ReadAsync(); + await _cache.AddData(message); + try + { + var data = await _cache.GetData(); + + + var size = 50; + var twav15s = new float[size]; + var twav120s = new float[size]; + var times = new DateTime[size]; + + for (int shift = 0; shift < size; shift++) + { + var twav15 = CalcTimeWindowAverageValue(data.timestamps, data.prices, 15, shift); + var twav120 = CalcTimeWindowAverageValue(data.timestamps, data.prices, 120, shift); + twav15s[size - 1-shift] = twav15.value; + twav120s[size - 1 - shift] = twav120.value; + times[size - 1 - shift] = twav120.time; + + if (shift>0) + { + var isCrossing = Utils.Lines.IsLinesCrossing( + times[size - 1 - shift], + times[size - 2 - shift], + twav15s[size - 1 - shift], + twav15s[size - 2 - shift], + twav120s[size - 1 - shift], + twav120s[size - 2 - shift]); + if (shift == 1 && !isCrossing) //если нет пересечения скользящих средний с окном 120 и 15 секунд между + //текущей и предыдущей точкой - можно не продолжать выполнение. + { + break; + } + if (shift>1 && isCrossing) + { + // если фильтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга + if (twav120s[size - 1] <= twav15s[size - 1] && twav120s[size - 2] > twav15s[size - 2]) + { + + } + + // если фильтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга и открытия шорта + if (twav15s[size - 1] <= twav120s[size - 1] && twav15s[size - 2] > twav120s[size - 2]) + { + + } + } + } + } + + + //if (isCrossing && twav120_2 <= twav15_2)// если филтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга + //{ + // for (int i=shift;i<111; i++) + // { + + // } + //} + + //if (isCrossing && twav15_2 <= twav120_2)// если филтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга + //{ + + //} + + //await ProcessCalman(message); + await ProcessMovAv(message, 3); + await ProcessTimeWindow(message, 5); + await ProcessTimeWindow(message, 15); + await ProcessTimeWindow(message, 120); + } + catch (Exception ex) + { + + } + } + } + } +} diff --git a/KLHZ.Trader.Core.Math/Declisions/Dtos/Services/PriceHistoryCacheUnit.cs b/KLHZ.Trader.Core.Math/Declisions/Services/PriceHistoryCacheUnit.cs similarity index 82% rename from KLHZ.Trader.Core.Math/Declisions/Dtos/Services/PriceHistoryCacheUnit.cs rename to KLHZ.Trader.Core.Math/Declisions/Services/PriceHistoryCacheUnit.cs index fe2a76f..cd7dd69 100644 --- a/KLHZ.Trader.Core.Math/Declisions/Dtos/Services/PriceHistoryCacheUnit.cs +++ b/KLHZ.Trader.Core.Math/Declisions/Services/PriceHistoryCacheUnit.cs @@ -1,11 +1,11 @@ using KLHZ.Trader.Core.Contracts.Declisions.Interfaces; using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Intarfaces; -namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services +namespace KLHZ.Trader.Core.Math.Declisions.Services { public class PriceHistoryCacheUnit : IPriceHistoryCacheUnit { - public const int ArrayMaxLength = 500; + public const int CacheMaxLength = 500; public string Figi { get; init; } @@ -21,8 +21,8 @@ namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services } private readonly object _locker = new(); - private readonly float[] Prices = new float[ArrayMaxLength]; - private readonly DateTime[] Timestamps = new DateTime[ArrayMaxLength]; + private readonly float[] Prices = new float[CacheMaxLength]; + private readonly DateTime[] Timestamps = new DateTime[CacheMaxLength]; private int _length = 0; @@ -36,7 +36,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services Prices[Prices.Length - 1] = (float)priceChange.Value; Timestamps[Timestamps.Length - 1] = priceChange.Time; - if (_length < ArrayMaxLength) + if (_length < CacheMaxLength) { _length++; } @@ -68,7 +68,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services var selectedPriceChanges = priceChanges .OrderBy(pc => pc.Time) - .Skip(priceChanges.Length - ArrayMaxLength) + .Skip(priceChanges.Length - CacheMaxLength) .ToArray(); var prices = selectedPriceChanges .Select(pc => (float)pc.Value) @@ -80,7 +80,7 @@ namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services Array.Copy(prices, 0, Prices, Prices.Length - prices.Length, prices.Length); Array.Copy(times, 0, Timestamps, Timestamps.Length - times.Length, times.Length); - _length = times.Length > ArrayMaxLength ? ArrayMaxLength : times.Length; + _length = times.Length > CacheMaxLength ? CacheMaxLength : times.Length; } } } diff --git a/KLHZ.Trader.Core.Math/Declisions/Services/PriceHistoryCacheUnit2.cs b/KLHZ.Trader.Core.Math/Declisions/Services/PriceHistoryCacheUnit2.cs new file mode 100644 index 0000000..be6251b --- /dev/null +++ b/KLHZ.Trader.Core.Math/Declisions/Services/PriceHistoryCacheUnit2.cs @@ -0,0 +1,94 @@ +using KLHZ.Trader.Core.Contracts.Declisions.Interfaces; +using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Intarfaces; +using System.Reflection; + +namespace KLHZ.Trader.Core.Math.Declisions.Services +{ + public class PriceHistoryCacheUnit2 : IPriceHistoryCacheUnit + { + public const int CacheMaxLength = 500; + private const int _arrayMaxLength = 1500; + + public string Figi { get; init; } + + public int Length + { + get + { + lock (_locker) + { + return _length; + } + } + } + + private readonly object _locker = new(); + private readonly float[] Prices = new float[_arrayMaxLength]; + private readonly DateTime[] Timestamps = new DateTime[_arrayMaxLength]; + + private int _length = 0; + private int _pointer = -1; + + public ValueTask AddData(INewPrice priceChange) + { + lock (_locker) + { + _pointer++; + Prices[_pointer] = (float)priceChange.Value; + Timestamps[_pointer] = priceChange.Time; + if (_length < CacheMaxLength) + { + _length++; + } + + if (_pointer == _arrayMaxLength - 1) + { + Array.Copy(Prices, Prices.Length - CacheMaxLength, Prices, 0, CacheMaxLength); + Array.Copy(Timestamps, Timestamps.Length - CacheMaxLength, Timestamps, 0, CacheMaxLength); + _pointer = CacheMaxLength - 1; + } + } + return ValueTask.CompletedTask; + } + + public ValueTask<(DateTime[] timestamps, float[] prices)> GetData() + { + lock (_locker) + { + if (_pointer < 0) + { + return ValueTask.FromResult((Array.Empty(), Array.Empty ())); + } + else + { + var prices = new float[_length]; + var timestamps = new DateTime[_length]; + Array.Copy(Prices,1+ _pointer - _length, prices, 0, prices.Length); + Array.Copy(Timestamps,1+ _pointer - _length, timestamps, 0, timestamps.Length); + return ValueTask.FromResult((timestamps, prices)); + } + } + } + + public PriceHistoryCacheUnit2(string figi, params INewPrice[] priceChanges) + { + Figi = figi; + + + if (priceChanges.Length == 0) + { + return; + } + + var selectedPriceChanges = priceChanges + .OrderBy(pc => pc.Time) + .Skip(priceChanges.Length - CacheMaxLength) + .ToArray(); + + foreach ( var pc in selectedPriceChanges) + { + AddData(pc); + } + } + } +} diff --git a/KLHZ.Trader.Core.Math/KLHZ.Trader.Core.Math.csproj b/KLHZ.Trader.Core.Math/KLHZ.Trader.Core.Math.csproj index 8e5eb63..615143f 100644 --- a/KLHZ.Trader.Core.Math/KLHZ.Trader.Core.Math.csproj +++ b/KLHZ.Trader.Core.Math/KLHZ.Trader.Core.Math.csproj @@ -8,6 +8,7 @@ + diff --git a/KLHZ.Trader.Core.Math/Utils/Lines.cs b/KLHZ.Trader.Core.Math/Utils/Lines.cs new file mode 100644 index 0000000..0395a64 --- /dev/null +++ b/KLHZ.Trader.Core.Math/Utils/Lines.cs @@ -0,0 +1,34 @@ +namespace KLHZ.Trader.Core.Math.Utils +{ + public static class Lines + { + public static (float x, float y) LinesCrossing(float k1, float b1, float k2, float b2) + { + var x = (b2 - b1) / (k1 - k2); + var y = k1 * x + b1; + + return (x, y); + } + + public static bool IsLinesCrossing(DateTime time1, DateTime time2, float val1_1, float val1_2, float val2_1, float val2_2) + { + var dtime = (float)(time2 - time1).TotalSeconds; + + var dval1 = (val1_2 - val1_1); + var k1 = dval1 / dtime; + var b1 = val1_1; + + var dval2 = val2_2 - val2_1; + var k2 = dval2 / dtime; + var b2 = val2_1; + + if (k1 != k2) + { + var cross = LinesCrossing(k1, b1, k2, b2); + var crossingTimestamp = time1.AddSeconds(cross.x); + return crossingTimestamp >= time1 && crossingTimestamp <= time2; + } + return false; + } + } +} diff --git a/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs b/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs new file mode 100644 index 0000000..a55ea65 --- /dev/null +++ b/KLHZ.Trader.Core.Tests/HistoryCacheUnit2Tests.cs @@ -0,0 +1,223 @@ +using KLHZ.Trader.Core.DataLayer.Entities.Prices; +using KLHZ.Trader.Core.Math.Declisions.Services; + +namespace KLHZ.Trader.Core.Tests +{ + public class HistoryCacheUnit2Tests + { + private static PriceChange[] GetHistory(int count, string figi) + { + var res = new PriceChange[count]; + if (count != 0) + { + var startDt = DateTime.UtcNow.AddSeconds(-count); + for (int i = 0; i < count; i++) + { + startDt = startDt.AddSeconds(i); + res[i] = new PriceChange() + { + Figi = figi, + Ticker = figi + "_ticker", + Id = i, + Time = startDt, + Value = (decimal)(i + 0.5) + }; + } + } + return res; + } + + [Test] + public void Test1() + { + var count = 0; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count); + Assert.That(data.timestamps.Length == count); + } + + [Test] + public void Test2() + { + var count = 1; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count); + Assert.That(data.timestamps.Length == count); + for (var i = 0; i < count; i++) + { + Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); + } + } + + [Test] + public void Test3() + { + var count = 20; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count); + Assert.That(data.timestamps.Length == count); + + for (var i = 0; i < count; i++) + { + Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); + } + } + + [Test] + public void Test4() + { + var count = PriceHistoryCacheUnit.CacheMaxLength; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count); + Assert.That(data.timestamps.Length == count); + + for (var i = 0; i < count; i++) + { + Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); + } + } + + [Test] + public void Test5() + { + var shift = 7; + var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count - shift); + Assert.That(data.timestamps.Length == count - shift); + + for (var i = 0; i < count; i++) + { + var k = i + shift; + if (k < hist.Length) + { + Assert.That((float)hist[k].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[k].Time, Is.EqualTo(data.timestamps[i])); + } + } + } + + [Test] + public void Test6() + { + var shift = 10; + var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count - shift); + Assert.That(data.timestamps.Length == count - shift); + + for (var i = 0; i < count; i++) + { + var k = i + shift; + if (k < hist.Length) + { + Assert.That((float)hist[k].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[k].Time, Is.EqualTo(data.timestamps[i])); + } + } + } + + [Test] + public void Test7() + { + var shift = 334; + var count = PriceHistoryCacheUnit.CacheMaxLength + shift; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count - shift); + Assert.That(data.timestamps.Length == count - shift); + + for (var i = 0; i < count; i++) + { + var k = i + shift; + if (k < hist.Length) + { + Assert.That((float)hist[k].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[k].Time, Is.EqualTo(data.timestamps[i])); + } + } + } + + + [Test] + public void Test8() + { + var count = PriceHistoryCacheUnit.CacheMaxLength; + var figi = "figi"; + var hist = GetHistory(count, figi); + var cacheUnit = new PriceHistoryCacheUnit2("", hist); + var data = cacheUnit.GetData().Result; + + Assert.That(data.prices.Length == count); + Assert.That(data.timestamps.Length == count); + + for (var i = 0; i < count; i++) + { + Assert.That((float)hist[i].Value, Is.EqualTo(data.prices[i])); + Assert.That(hist[i].Time, Is.EqualTo(data.timestamps[i])); + } + + var newData1 = new PriceChange() { Figi = figi, Ticker = figi, Value = 100500, Time = DateTime.UtcNow }; + + cacheUnit.AddData(newData1); + + var data2 = cacheUnit.GetData().Result; + Assert.IsTrue(data2.prices[data2.prices.Length - 1] == (float)newData1.Value); + Assert.IsTrue(data2.timestamps[data2.timestamps.Length - 1] == newData1.Time); + + var newData2 = new PriceChange() { Figi = figi, Ticker = figi, Value = 100501, Time = DateTime.UtcNow }; + + cacheUnit.AddData(newData2); + + var data3 = cacheUnit.GetData().Result; + Assert.IsTrue(data3.prices[data3.prices.Length - 1] == (float)newData2.Value); + Assert.IsTrue(data3.timestamps[data3.timestamps.Length - 1] == newData2.Time); + } + + [Test] + public void Test9() + { + var cacheUnit = new PriceHistoryCacheUnit2(""); + for(int i= 0; i < 5*PriceHistoryCacheUnit2.CacheMaxLength; i++) + { + cacheUnit.AddData(new PriceChange() { Figi = "", Ticker = "", Value = i, Time = DateTime.UtcNow }); + if (i >= 500) + { + var data = cacheUnit.GetData().Result; + Assert.IsTrue(data.prices.Length == PriceHistoryCacheUnit2.CacheMaxLength); + Assert.IsTrue(data.prices.Last() == i); + } + } + } + } +} \ No newline at end of file diff --git a/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs b/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs index d9b3290..9783514 100644 --- a/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs +++ b/KLHZ.Trader.Core.Tests/HistoryCacheUnitTests.cs @@ -1,8 +1,9 @@ using KLHZ.Trader.Core.DataLayer.Entities.Prices; +using KLHZ.Trader.Core.Math.Declisions.Services; namespace KLHZ.Trader.Core.Tests { - public class Tests + public class HistoryCacheUnitTests { private static PriceChange[] GetHistory(int count, string figi) { @@ -79,7 +80,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void Test4() { - var count = PriceHistoryCacheUnit.ArrayMaxLength; + var count = PriceHistoryCacheUnit.CacheMaxLength; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit("", hist); @@ -99,7 +100,7 @@ namespace KLHZ.Trader.Core.Tests public void Test5() { var shift = 7; - var count = PriceHistoryCacheUnit.ArrayMaxLength + shift; + var count = PriceHistoryCacheUnit.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit("", hist); @@ -123,7 +124,7 @@ namespace KLHZ.Trader.Core.Tests public void Test6() { var shift = 10; - var count = PriceHistoryCacheUnit.ArrayMaxLength + shift; + var count = PriceHistoryCacheUnit.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit("", hist); @@ -147,7 +148,7 @@ namespace KLHZ.Trader.Core.Tests public void Test7() { var shift = 334; - var count = PriceHistoryCacheUnit.ArrayMaxLength + shift; + var count = PriceHistoryCacheUnit.CacheMaxLength + shift; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit("", hist); @@ -171,7 +172,7 @@ namespace KLHZ.Trader.Core.Tests [Test] public void Test8() { - var count = PriceHistoryCacheUnit.ArrayMaxLength; + var count = PriceHistoryCacheUnit.CacheMaxLength; var figi = "figi"; var hist = GetHistory(count, figi); var cacheUnit = new PriceHistoryCacheUnit("", hist); diff --git a/KLHZ.Trader.Core.Tests/HistoryProcessingInstrumentsTests.cs b/KLHZ.Trader.Core.Tests/HistoryProcessingInstrumentsTests.cs index e500f58..58e40a7 100644 --- a/KLHZ.Trader.Core.Tests/HistoryProcessingInstrumentsTests.cs +++ b/KLHZ.Trader.Core.Tests/HistoryProcessingInstrumentsTests.cs @@ -1,5 +1,6 @@ using KLHZ.Trader.Core.DataLayer.Entities.Prices; using KLHZ.Trader.Core.Declisions.Utils; +using KLHZ.Trader.Core.Math.Declisions.Services; namespace KLHZ.Trader.Core.Tests { diff --git a/KLHZ.Trader.Core.Tests/KLHZ.Trader.Core.Tests.csproj b/KLHZ.Trader.Core.Tests/KLHZ.Trader.Core.Tests.csproj index 498f3ad..161b44d 100644 --- a/KLHZ.Trader.Core.Tests/KLHZ.Trader.Core.Tests.csproj +++ b/KLHZ.Trader.Core.Tests/KLHZ.Trader.Core.Tests.csproj @@ -18,6 +18,7 @@ + diff --git a/KLHZ.Trader.Core.Tests/LinesProcessingTest.cs b/KLHZ.Trader.Core.Tests/LinesProcessingTest.cs new file mode 100644 index 0000000..22776da --- /dev/null +++ b/KLHZ.Trader.Core.Tests/LinesProcessingTest.cs @@ -0,0 +1,35 @@ +namespace KLHZ.Trader.Core.Tests +{ + public class LinesProcessingTest + { + [Test] + public void Test1() + { + var time2 = DateTime.UtcNow; + var time1 = time2.AddSeconds(-20); + + var val1_1 = 0.5f; + var val1_2 = -0.5f; + + var val2_1 = -0.5f; + var val2_2 = 0.5f; + + Assert.IsTrue(KLHZ.Trader.Core.Math.Utils.Lines.IsLinesCrossing(time1, time2, val1_1, val1_2, val2_1, val2_2)); + } + + [Test] + public void Test2() + { + var time2 = DateTime.UtcNow; + var time1 = time2.AddSeconds(-20); + + var val1_1 = 0.5f; + var val1_2 = -0.5f; + + var val2_1 = 0.5f; + var val2_2 = -0.5f; + + Assert.IsFalse(KLHZ.Trader.Core.Math.Utils.Lines.IsLinesCrossing(time1, time2, val1_1, val1_2, val2_1, val2_2)); + } + } +} \ No newline at end of file diff --git a/KLHZ.Trader.Core/Declisions/Services/ProcessedPricesLogger.cs b/KLHZ.Trader.Core/Declisions/Services/ProcessedPricesLogger.cs index 0512ce5..2d3f667 100644 --- a/KLHZ.Trader.Core/Declisions/Services/ProcessedPricesLogger.cs +++ b/KLHZ.Trader.Core/Declisions/Services/ProcessedPricesLogger.cs @@ -26,8 +26,6 @@ namespace KLHZ.Trader.Core.Declisions.Services private async Task ProcessMessages() { - using var context = await _dbContextFactory.CreateDbContextAsync(); - context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; var buffer = new List(); var lastWrite = DateTime.UtcNow; while (await _channel.Reader.WaitToReadAsync()) @@ -48,10 +46,12 @@ namespace KLHZ.Trader.Core.Declisions.Services if (buffer.Count > 10000 || (DateTime.UtcNow - lastWrite) > TimeSpan.FromSeconds(5) || _channel.Reader.Count == 0) { - await context.AddRangeAsync(buffer); - await context.SaveChangesAsync(); - buffer.Clear(); lastWrite = DateTime.UtcNow; + using var context = await _dbContextFactory.CreateDbContextAsync(); + context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; + await context.AddRangeAsync(buffer); + buffer.Clear(); + await context.SaveChangesAsync(); } } catch (Exception ex) diff --git a/KLHZ.Trader.Core/Declisions/Services/Trader.cs b/KLHZ.Trader.Core/Declisions/Services/Trader.cs index 2b019ff..bb86654 100644 --- a/KLHZ.Trader.Core/Declisions/Services/Trader.cs +++ b/KLHZ.Trader.Core/Declisions/Services/Trader.cs @@ -9,7 +9,7 @@ using KLHZ.Trader.Core.Exchange; using KLHZ.Trader.Core.Exchange.Extentions; using KLHZ.Trader.Core.Exchange.Models; using KLHZ.Trader.Core.Exchange.Services; -using KLHZ.Trader.Core.Math.Declisions.Dtos.Services; +using KLHZ.Trader.Core.Math.Declisions.Services; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -100,7 +100,7 @@ namespace KLHZ.Trader.Core.Declisions.Services } else { - data = new PriceHistoryCacheUnit(message.Figi, message); + data = new PriceHistoryCacheUnit2(message.Figi, message); _historyCash.TryAdd(message.Figi, data); } var result = await _tradingEventsDetector.Detect(data); diff --git a/KLHZ.Trader.Core/Declisions/Utils/HistoryProcessingInstruments.cs b/KLHZ.Trader.Core/Declisions/Utils/HistoryProcessingInstruments.cs index a231c77..01a9d98 100644 --- a/KLHZ.Trader.Core/Declisions/Utils/HistoryProcessingInstruments.cs +++ b/KLHZ.Trader.Core/Declisions/Utils/HistoryProcessingInstruments.cs @@ -1,6 +1,6 @@ using KLHZ.Trader.Core.Contracts.Declisions.Interfaces; using KLHZ.Trader.Core.Declisions.Dtos; -using KLHZ.Trader.Core.Math.Declisions.Dtos.Services; +using KLHZ.Trader.Core.Math.Declisions.Services; namespace KLHZ.Trader.Core.Declisions.Utils { diff --git a/KLHZ.Trader.sln b/KLHZ.Trader.sln index b247f42..8551b6e 100644 --- a/KLHZ.Trader.sln +++ b/KLHZ.Trader.sln @@ -26,7 +26,8 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postrgres", "postrgres", "{174A800A-6040-40CF-B331-8603E097CBAC}" ProjectSection(SolutionItems) = preProject KLHZ.Trader.Infrastructure\postgres\init.sql = KLHZ.Trader.Infrastructure\postgres\init.sql - migration1.sql = migration1.sql + KLHZ.Trader.Infrastructure\postgres\migration1.sql = KLHZ.Trader.Infrastructure\postgres\migration1.sql + KLHZ.Trader.Infrastructure\postgres\migration2.sql = KLHZ.Trader.Infrastructure\postgres\migration2.sql EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "loki", "loki", "{63D21DAF-FDF0-4F2D-A671-E9E59BB0CA5B}"