using MareSynchronosShared.Metrics; using MareSynchronosShared.Services; using MareSynchronosShared.Utils.Configuration; using MareSynchronosStaticFilesServer.Utils; using System.Collections.Concurrent; using System.Timers; namespace MareSynchronosStaticFilesServer.Services; public class RequestQueueService : IHostedService { private readonly IClientReadyMessageService _clientReadyMessageService; private readonly CachedFileProvider _cachedFileProvider; private readonly ILogger _logger; private readonly MareMetrics _metrics; private readonly ConcurrentQueue _queue = new(); private readonly ConcurrentQueue _priorityQueue = new(); private readonly int _queueExpirationSeconds; private readonly SemaphoreSlim _queueProcessingSemaphore = new(1); private readonly UserQueueEntry[] _userQueueRequests; private int _queueLimitForReset; private readonly int _queueReleaseSeconds; private System.Timers.Timer _queueTimer; public RequestQueueService(MareMetrics metrics, IConfigurationService configurationService, ILogger logger, IClientReadyMessageService hubContext, CachedFileProvider cachedFileProvider) { _userQueueRequests = new UserQueueEntry[configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueSize), 50)]; _queueExpirationSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadTimeoutSeconds), 5); _queueLimitForReset = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueClearLimit), 15000); _queueReleaseSeconds = configurationService.GetValueOrDefault(nameof(StaticFilesServerConfiguration.DownloadQueueReleaseSeconds), 15); _metrics = metrics; _logger = logger; _clientReadyMessageService = hubContext; _cachedFileProvider = cachedFileProvider; } public void ActivateRequest(Guid request) { _logger.LogDebug("Activating request {guid}", request); var req = _userQueueRequests.First(f => f != null && f.UserRequest.RequestId == request); req.MarkActive(); } public async Task EnqueueUser(UserRequest request, bool isPriority, CancellationToken token) { while (_queueProcessingSemaphore.CurrentCount == 0) { await Task.Delay(50, token).ConfigureAwait(false); } _logger.LogDebug("Enqueueing req {guid} from {user} for {file}", request.RequestId, request.User, string.Join(", ", request.FileIds)); GetQueue(isPriority).Enqueue(request); } public void FinishRequest(Guid request) { var req = _userQueueRequests.FirstOrDefault(f => f != null && f.UserRequest.RequestId == request); if (req != null) { var idx = Array.IndexOf(_userQueueRequests, req); _logger.LogDebug("Finishing Request {guid}, clearing slot {idx}", request, idx); _userQueueRequests[idx] = null; } else { _logger.LogDebug("Request {guid} already cleared", request); } } public bool IsActiveProcessing(Guid request, string user, out UserRequest userRequest) { var userQueueRequest = _userQueueRequests.FirstOrDefault(u => u != null && u.UserRequest.RequestId == request && string.Equals(u.UserRequest.User, user, StringComparison.Ordinal)); userRequest = userQueueRequest?.UserRequest ?? null; return userQueueRequest != null && userRequest != null && userQueueRequest.ExpirationDate > DateTime.UtcNow; } public void RemoveFromQueue(Guid requestId, string user, bool isPriority) { var existingRequest = GetQueue(isPriority).FirstOrDefault(f => f.RequestId == requestId && string.Equals(f.User, user, StringComparison.Ordinal)); if (existingRequest == null) { var activeSlot = _userQueueRequests.FirstOrDefault(r => r != null && string.Equals(r.UserRequest.User, user, StringComparison.Ordinal) && r.UserRequest.RequestId == requestId); if (activeSlot != null) { var idx = Array.IndexOf(_userQueueRequests, activeSlot); if (idx >= 0) { _userQueueRequests[idx] = null; } } } else { existingRequest.IsCancelled = true; } } public Task StartAsync(CancellationToken cancellationToken) { _queueTimer = new System.Timers.Timer(500); _queueTimer.Elapsed += ProcessQueue; _queueTimer.AutoReset = true; _queueTimer.Start(); return Task.CompletedTask; } private ConcurrentQueue GetQueue(bool isPriority) => isPriority ? _priorityQueue : _queue; public bool StillEnqueued(Guid request, string user, bool isPriority) { return GetQueue(isPriority).Any(c => c.RequestId == request && string.Equals(c.User, user, StringComparison.Ordinal)); } public Task StopAsync(CancellationToken cancellationToken) { _queueTimer.Stop(); return Task.CompletedTask; } private void DequeueIntoSlot(UserRequest userRequest, int slot) { _logger.LogDebug("Dequeueing {req} into {i}: {user} with {file}", userRequest.RequestId, slot, userRequest.User, string.Join(", ", userRequest.FileIds)); _userQueueRequests[slot] = new(userRequest, DateTime.UtcNow.AddSeconds(_queueExpirationSeconds)); _clientReadyMessageService.SendDownloadReady(userRequest.User, userRequest.RequestId); } private void ProcessQueue(object src, ElapsedEventArgs e) { if (_queueProcessingSemaphore.CurrentCount == 0) return; _queueProcessingSemaphore.Wait(); try { if (_queue.Count(c => !c.IsCancelled) > _queueLimitForReset) { _queue.Clear(); return; } for (int i = 0; i < _userQueueRequests.Length; i++) { try { if (_userQueueRequests[i] != null && (((!_userQueueRequests[i].IsActive && _userQueueRequests[i].ExpirationDate < DateTime.UtcNow)) || (_userQueueRequests[i].IsActive && _userQueueRequests[i].ActivationDate < DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(_queueReleaseSeconds)))) ) { _logger.LogDebug("Expiring request {guid} slot {slot}", _userQueueRequests[i].UserRequest.RequestId, i); _userQueueRequests[i] = null; } if (_userQueueRequests[i] != null) continue; while (true) { if (!_priorityQueue.All(u => _cachedFileProvider.AnyFilesDownloading(u.FileIds)) && _priorityQueue.TryDequeue(out var prioRequest)) { if (prioRequest.IsCancelled) { continue; } if (_cachedFileProvider.AnyFilesDownloading(prioRequest.FileIds)) { _priorityQueue.Enqueue(prioRequest); continue; } DequeueIntoSlot(prioRequest, i); break; } if (!_queue.All(u => _cachedFileProvider.AnyFilesDownloading(u.FileIds)) && _queue.TryDequeue(out var request)) { if (request.IsCancelled) { continue; } if (_cachedFileProvider.AnyFilesDownloading(request.FileIds)) { _queue.Enqueue(request); continue; } DequeueIntoSlot(request, i); break; } break; } } catch (Exception ex) { _logger.LogWarning(ex, "Error during inside queue processing"); } } } catch (Exception ex) { _logger.LogError(ex, "Error during Queue processing"); } finally { _queueProcessingSemaphore.Release(); } _metrics.SetGaugeTo(MetricsAPI.GaugeQueueFree, _userQueueRequests.Count(c => c == null)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueActive, _userQueueRequests.Count(c => c != null && c.IsActive)); _metrics.SetGaugeTo(MetricsAPI.GaugeQueueInactive, _userQueueRequests.Count(c => c != null && !c.IsActive)); _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueue, _queue.Count(q => !q.IsCancelled)); _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadQueueCancelled, _queue.Count(q => q.IsCancelled)); _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueue, _priorityQueue.Count(q => !q.IsCancelled)); _metrics.SetGaugeTo(MetricsAPI.GaugeDownloadPriorityQueueCancelled, _priorityQueue.Count(q => q.IsCancelled)); } }