using KLHZ.Trader.Core.Contracts.Messaging.Dtos.Interfaces; using KLHZ.Trader.Core.Contracts.Messaging.Interfaces; using KLHZ.Trader.Core.DataLayer; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading.Channels; namespace KLHZ.Trader.Core.Common.Messaging.Services { public class ProcessedPricesLogger : IHostedService { private readonly ILogger _logger; private readonly IDataBus _dataBus; private readonly IDbContextFactory _dbContextFactory; private readonly Channel _channel = Channel.CreateUnbounded(); public ProcessedPricesLogger(IDataBus dataBus, IDbContextFactory dbContextFactory, ILogger logger) { _dataBus = dataBus; _dbContextFactory = dbContextFactory; _logger = logger; _dataBus.AddChannel(nameof(ProcessedPricesLogger), _channel); _ = ProcessMessages(); } private async Task ProcessMessages() { var buffer = new List(); var lastWrite = DateTime.UtcNow; while (await _channel.Reader.WaitToReadAsync()) { try { var message = await _channel.Reader.ReadAsync(); buffer.Add(new DataLayer.Entities.Prices.ProcessedPrice() { Figi = message.Figi, Processor = message.Processor, Ticker = message.Ticker, IsHistoricalData = message.IsHistoricalData, Time = message.Time, Value = message.Value, }); if (buffer.Count > 10000 || DateTime.UtcNow - lastWrite > TimeSpan.FromSeconds(5) || _channel.Reader.Count == 0) { lastWrite = DateTime.UtcNow; using var context = await _dbContextFactory.CreateDbContextAsync(); context.ChangeTracker.QueryTrackingBehavior = QueryTrackingBehavior.NoTracking; await context.AddRangeAsync(buffer); buffer.Clear(); await context.SaveChangesAsync(); } } catch (Exception ex) { } } } public Task StartAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }