using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Intarfaces; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using MathNet.Filtering.Kalman; using MathNet.Numerics.LinearAlgebra; using Microsoft.Extensions.Hosting; using System.Threading.Channels; namespace KLHZ.Trader.Core.Math.Declisions.Dtos.Services { public class KalmanPredictor : IHostedService { private readonly double r = 1000.0; // Measurement covariance private readonly PriceHistoryCacheUnit _cache = new PriceHistoryCacheUnit(""); private readonly Channel _messages = Channel.CreateUnbounded(); private readonly IDataBus _dataBus; private DiscreteKalmanFilter? _dkf; public KalmanPredictor(IDataBus bus) { _dataBus = bus; bus.AddChannel(nameof(KalmanPredictor), _messages); _ = ProcessMessages(); } public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } private async Task Init() { } private async Task ProcessCalman(INewPrice message) { var H = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, 0d, 1d }); // Measurement matrix var length = _cache.Length; if (length > 2 && _dkf != null) { var data = await _cache.GetData(); var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; if (dt < 0.0000001 || dt > 1800) return; var F = Matrix.Build.Dense(2, 2, new[] { 1d, 0d, dt, 1 }); // State transition matrix var R = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); _dkf.Predict(F); var state = _dkf.State; var value = state[0, 0] + dt * state[1, 0]; if (!double.IsNaN(value)) { await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_predict", Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)value, }); } var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; var k = dprice / dt; var b = data.prices[data.prices.Length - 2]; var z = Matrix.Build.Dense(2, 1, new[] { b, k }); _dkf.Update(z, H, R); var state2 = _dkf.State; var value2 = state2[0, 0] + dt * state2[1, 0]; if (!double.IsNaN(value)) { await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_state", Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)value, }); } } else if (length >= 2) { var data = await _cache.GetData(); var dprice = data.prices[data.prices.Length - 1] - data.prices[data.prices.Length - 2]; var dt = (data.timestamps[data.prices.Length - 1] - data.timestamps[data.prices.Length - 2]).TotalSeconds; if (dt < 0.0000001) return; var k = dprice / dt; var b = data.prices[data.prices.Length - 2]; Matrix x0 = Matrix.Build.Dense(2, 1, new[] { b, k }); Matrix P0 = Matrix.Build.Dense(2, 2, new[] { r, r / dt, r / dt, 2 * r / (dt * dt) }); _dkf = new DiscreteKalmanFilter(x0, P0); } } private async Task ProcessMovAv(INewPrice message, int window) { var data = await _cache.GetData(); if (data.prices.Length < window) return; var sum = 0f; var count = 0; for (int i = 1; (i <= window); i++) { sum += data.prices[data.prices.Length - i]; count++; } await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_mov_av_window" + window.ToString(), Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)(sum / count), }); } private async Task ProcessTimeWindow(INewPrice message, int window) { var data = await _cache.GetData(); var sum = data.prices[data.prices.Length - 1]; var count = 1; var startTime = data.timestamps[data.prices.Length - 1]; for (int i = 2; (i < data.prices.Length && (startTime - data.timestamps[data.prices.Length - i]) < TimeSpan.FromSeconds(window)); i++) { sum += data.prices[data.prices.Length - i]; count++; } await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_timeWindow" + window.ToString(), Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = (decimal)(sum / count), }); var diffValue = System.Math.Abs((decimal)(sum / count) - message.Value); await _dataBus.BroadcastProcessedPrice(new ProcessedPrice() { Figi = message.Figi, Processor = nameof(KalmanPredictor) + "_diff" + window.ToString(), Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = diffValue > 3 ? diffValue : 0, }); } private async Task ProcessMessages() { while (await _messages.Reader.WaitToReadAsync()) { var message = await _messages.Reader.ReadAsync(); await _cache.AddData(message); try { //await ProcessCalman(message); await ProcessMovAv(message, 3); await ProcessTimeWindow(message, 5); await ProcessTimeWindow(message, 15); await ProcessTimeWindow(message, 120); } catch (Exception ex) { } } } } }