using KLHZ.Trader.Core.Contracts.Declisions.Dtos; using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Intarfaces; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.Math.Declisions.Dtos; using MathNet.Filtering.Kalman; using MathNet.Numerics.LinearAlgebra; using Microsoft.Extensions.Hosting; using System.Threading.Channels; using static System.Runtime.InteropServices.JavaScript.JSType; namespace KLHZ.Trader.Core.Math.Declisions.Services { public class KalmanPredictor : IHostedService { private readonly double r = 1000.0; // Measurement covariance private readonly PriceHistoryCacheUnit _cache = new PriceHistoryCacheUnit(""); private readonly Channel _messages = Channel.CreateUnbounded(); private readonly IDataBus _dataBus; private DiscreteKalmanFilter? _dkf; public KalmanPredictor(IDataBus bus) { _dataBus = bus; bus.AddChannel(nameof(KalmanPredictor), _messages); _ = ProcessMessages(); } public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } private async Task Init() { } private async Task ProcessCalman(INewPrice message) { var H = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, 0d, 1d }); // Measurement matrix var length = _cache.Length; if (length > 2 && _dkf != null) { var data = await _cache.GetData(); var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; if (dt < 0.0000001 || dt > 1800) return; var F = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, dt, 1 }); // State transition matrix var R = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); _dkf.Predict(F); var state = _dkf.State; var value = state[0, 0] + dt * state[1, 0]; if (!double.IsNaN(value)) { await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_predict", Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)value, }); } var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; var k = dprice / dt; var b = data.prices[data.prices.Length - 2]; var z = Matrix.Build.Dense(2, 1, new[] { b, k }); _dkf.Update(z, H, R); var state2 = _dkf.State; var value2 = state2[0, 0] + dt * state2[1, 0]; if (!double.IsNaN(value)) { await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_state", Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)value, }); } } else if (length >= 2) { var data = await _cache.GetData(); var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; if (dt < 0.0000001) return; var k = dprice / dt; var b = data.prices[data.prices.Length - 2]; Matrix x0 = Matrix.Build.Dense(2, 1, new[] { b, k }); Matrix P0 = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); _dkf = new DiscreteKalmanFilter(x0, P0); } } private async Task ProcessMovAv(INewPrice message, int window) { var data = await _cache.GetData(); if (data.prices.Length < window) return; var sum = 0f; var count = 0; for (int i = 1; i <= window; i++) { sum += data.prices[data.prices.Length - i]; count++; } await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_mov_av_window" + window.ToString(), Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)(sum / count), }); } private static (DateTime time, float value) CalcTimeWindowAverageValue(DateTime[] timestamps, float[] values, int window, int shift = 0) { var sum = values[values.Length - 1 - shift]; var count = 1; var startTime = timestamps[timestamps.Length - 1 - shift]; for (int i = 2; i < values.Length && startTime - timestamps[values.Length - i - shift] < TimeSpan.FromSeconds(window); i++) { sum += values[values.Length - i - shift]; count++; } return (startTime, sum / count); } private static TradingEventsDto CheckByWindowAverageMean(DateTime[] timestamps, float[] prices, int size, float meanfullStep = 3f) { var twav15s = new float[size]; var twav120s = new float[size]; var times = new DateTime[size]; for (int shift = 0; shift < size; shift++) { var twav15 = CalcTimeWindowAverageValue(timestamps, prices, 15, shift); var twav120 = CalcTimeWindowAverageValue(timestamps, prices, 120, shift); twav15s[size - 1 - shift] = twav15.value; twav120s[size - 1 - shift] = twav120.value; times[size - 1 - shift] = twav120.time; if (shift > 0) { var isCrossing = Utils.Lines.IsLinesCrossing( times[size - 1 - shift], times[size - 2 - shift], twav15s[size - 1 - shift], twav15s[size - 2 - shift], twav120s[size - 1 - shift], twav120s[size - 2 - shift]); if (shift == 1 && !isCrossing) //если нет пересечения скользящих средний с окном 120 и 15 секунд между //текущей и предыдущей точкой - можно не продолжать выполнение. { break; } if (shift > 1 && isCrossing) { // если фильтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга и закрытия шорта if (twav120s[size - 1] <= twav15s[size - 1] && twav120s[size - 2] > twav15s[size - 2] ) { if (twav15s[size - 1 - shift] - twav15s[size - 1] >= meanfullStep) { return new TradingEventsDto(false, true); } } // если фильтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга и возможно открытия шорта if (twav15s[size - 1] <= twav120s[size - 1] && twav15s[size - 2] > twav120s[size - 2]) { if (twav15s[size - 1 - shift] - twav15s[size - 1] <= - meanfullStep) { return new TradingEventsDto(true, false); } } } } } return new TradingEventsDto(false, false); } private async Task ProcessTimeWindow(INewPrice message, int window) { var data = await _cache.GetData(); var sum = data.prices[data.prices.Length - 1]; var count = 1; var startTime = data.timestamps[data.prices.Length - 1]; for (int i = 2; i < data.prices.Length && startTime - data.timestamps[data.prices.Length - i] < TimeSpan.FromSeconds(window); i++) { sum += data.prices[data.prices.Length - i]; count++; } await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_timeWindow" + window.ToString(), Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)(sum / count), }); var diffValue = System.Math.Abs((decimal)(sum / count) - message.Value); await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_diff" + window.ToString(), Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = diffValue > 3 ? diffValue : 0, }); } private async Task ProcessMessages() { while (await _messages.Reader.WaitToReadAsync()) { var message = await _messages.Reader.ReadAsync(); await _cache.AddData(message); try { var data = await _cache.GetData(); var size = 50; var twav15s = new float[size]; var twav120s = new float[size]; var times = new DateTime[size]; for (int shift = 0; shift < size; shift++) { var twav15 = CalcTimeWindowAverageValue(data.timestamps, data.prices, 15, shift); var twav120 = CalcTimeWindowAverageValue(data.timestamps, data.prices, 120, shift); twav15s[size - 1-shift] = twav15.value; twav120s[size - 1 - shift] = twav120.value; times[size - 1 - shift] = twav120.time; if (shift>0) { var isCrossing = Utils.Lines.IsLinesCrossing( times[size - 1 - shift], times[size - 2 - shift], twav15s[size - 1 - shift], twav15s[size - 2 - shift], twav120s[size - 1 - shift], twav120s[size - 2 - shift]); if (shift == 1 && !isCrossing) //если нет пересечения скользящих средний с окном 120 и 15 секунд между //текущей и предыдущей точкой - можно не продолжать выполнение. { break; } if (shift>1 && isCrossing) { // если фильтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга if (twav120s[size - 1] <= twav15s[size - 1] && twav120s[size - 2] > twav15s[size - 2]) { } // если фильтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга и открытия шорта if (twav15s[size - 1] <= twav120s[size - 1] && twav15s[size - 2] > twav120s[size - 2]) { } } } } //if (isCrossing && twav120_2 <= twav15_2)// если филтрация окном 120 наползает на окно 15 сверху, потенциальное время открытия лонга //{ // for (int i=shift;i<111; i++) // { // } //} //if (isCrossing && twav15_2 <= twav120_2)// если филтрация окном 15 наползает на окно 120 сверху, потенциальное время закрытия лонга //{ //} //await ProcessCalman(message); await ProcessMovAv(message, 3); await ProcessTimeWindow(message, 5); await ProcessTimeWindow(message, 15); await ProcessTimeWindow(message, 120); } catch (Exception ex) { } } } } }