using Dalamud.Utility;
using K4os.Compression.LZ4.Streams;
using MareSynchronos.API.Data;
using MareSynchronos.API.Dto.Files;
using MareSynchronos.API.Routes;
using MareSynchronos.FileCache;
using MareSynchronos.PlayerData.Handlers;
using MareSynchronos.Services.Mediator;
using MareSynchronos.Utils;
using MareSynchronos.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
using System.Globalization;
using System.Net;
using System.Net.Http.Json;
using System.Security.Cryptography;
using System.Text;

namespace MareSynchronos.WebAPI.Files;

/// <summary>
/// Requests files from the file servers, downloads them as "block" files (one per host:port group),
/// and extracts the LZ4-compressed entries of each block file into the local file cache.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments in this file were missing in the reviewed text and have been
/// reconstructed from usage. Please confirm in particular: <c>Subscribe&lt;DownloadLimitChangedMessage&gt;</c>,
/// <c>List&lt;FileTransfer&gt;</c> for <see cref="ForbiddenTransfers"/>, <c>List&lt;FileReplacementData&gt;</c>,
/// <c>List&lt;DownloadFileDto&gt;</c>, <c>IProgress&lt;long&gt;</c> and <c>ILogger&lt;FileDownloadManager&gt;</c>.
/// </remarks>
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
{
    private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
    private readonly FileCompactor _fileCompactor;
    private readonly FileCacheManager _fileDbManager;
    private readonly FileTransferOrchestrator _orchestrator;
    private readonly List<ThrottledStream> _activeDownloadStreams;

    // Download groups run in parallel and the mediator callback enumerates the list,
    // so every access to _activeDownloadStreams goes through this lock.
    private readonly object _activeDownloadStreamsLock = new();

    /// <summary>
    /// Creates the manager and subscribes to download-limit changes so that the bandwidth limit of
    /// all currently active download streams is updated when the limit changes.
    /// </summary>
    public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
        FileTransferOrchestrator orchestrator,
        FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
    {
        _downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
        _orchestrator = orchestrator;
        _fileDbManager = fileCacheManager;
        _fileCompactor = fileCompactor;
        _activeDownloadStreams = [];

        Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
        {
            lock (_activeDownloadStreamsLock)
            {
                if (_activeDownloadStreams.Count == 0)
                {
                    return;
                }

                var newLimit = _orchestrator.DownloadLimitPerSlot();
                Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
                foreach (var stream in _activeDownloadStreams)
                {
                    stream.BandwidthLimit = newLimit;
                }
            }
        });
    }

    /// <summary>Transfers selected by the last <see cref="InitiateDownloadList"/> call.</summary>
    public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = [];

    /// <summary>Forbidden transfers tracked by the orchestrator.</summary>
    public List<FileTransfer> ForbiddenTransfers => _orchestrator.ForbiddenTransfers;

    /// <summary>
    /// True while <see cref="CurrentDownloads"/> contains at least one transfer.
    /// </summary>
    /// <remarks>
    /// The previous implementation returned the negated value (true when nothing was downloading).
    /// NOTE(review): verify that no caller relied on the inverted result.
    /// </remarks>
    public bool IsDownloading => CurrentDownloads.Count > 0;

    /// <summary>Clears the current download list and all per-group download status entries.</summary>
    public void ClearDownload()
    {
        CurrentDownloads.Clear();
        _downloadStatus.Clear();
    }

    /// <summary>
    /// Downloads and extracts all <see cref="CurrentDownloads"/> for the given game object.
    /// Scanning is halted for the duration; failures are logged and swallowed (best effort), and
    /// <c>DownloadFinishedMessage</c> / <c>ResumeScanMessage</c> are always published afterwards.
    /// </summary>
    /// <param name="gameObject">Handler the download belongs to; passed on in the published messages.</param>
    /// <param name="fileReplacementDto">File replacements used to resolve the file extension per hash.</param>
    /// <param name="ct">Cancels the download.</param>
    public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
    {
        Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
        try
        {
            await DownloadFilesInternal(gameObject, fileReplacementDto, ct).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            Logger.LogDebug("Download for {id} was cancelled", gameObject);
            ClearDownload();
        }
        catch (Exception ex)
        {
            // Deliberately not rethrown (callers never saw exceptions from this method), but no longer silent.
            Logger.LogWarning(ex, "Download for {id} failed", gameObject);
            ClearDownload();
        }
        finally
        {
            Mediator.Publish(new DownloadFinishedMessage(gameObject));
            Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
        }
    }

    /// <summary>
    /// Queries the file sizes for the distinct hashes of <paramref name="fileReplacement"/>, records
    /// forbidden files in the orchestrator and stores the transferable ones in <see cref="CurrentDownloads"/>.
    /// </summary>
    /// <returns>The new <see cref="CurrentDownloads"/> list.</returns>
    public async Task<List<DownloadFileTransfer>> InitiateDownloadList(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
    {
        Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);

        List<DownloadFileDto> downloadFileInfoFromService =
        [
            .. await FilesGetSizes(fileReplacement.Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(), ct).ConfigureAwait(false),
        ];

        Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));

        foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
        {
            if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
            {
                _orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
            }
        }

        CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d))
            .Where(d => d.CanBeTransferred).ToList();

        return CurrentDownloads;
    }

    /// <summary>Clears download state and disposes any download streams that are still active.</summary>
    protected override void Dispose(bool disposing)
    {
        ClearDownload();

        ThrottledStream[] streams;
        lock (_activeDownloadStreamsLock)
        {
            streams = [.. _activeDownloadStreams];
        }

        foreach (var stream in streams)
        {
            try
            {
                stream.Dispose();
            }
            catch
            {
                // best effort: a stream failing to dispose must not prevent disposing the rest
            }
        }

        base.Dispose(disposing);
    }

    /// <summary>Converts the result of <see cref="Stream.ReadByte"/> to a byte.</summary>
    /// <exception cref="EndOfStreamException">The value is -1 (end of stream).</exception>
    private static byte ConvertReadByte(int byteOrEof)
    {
        if (byteOrEof == -1)
        {
            throw new EndOfStreamException();
        }

        return (byte)byteOrEof;
    }

    /// <summary>
    /// Reads one entry header of the form <c>#HASH:LENGTH#</c> from the current stream position.
    /// A missing length is treated as 0.
    /// </summary>
    /// <exception cref="InvalidDataException">The first character is not <c>#</c>.</exception>
    /// <exception cref="EndOfStreamException">The stream ended before the closing <c>#</c>.</exception>
    private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
    {
        var separator = (char)ConvertReadByte(fileBlockStream.ReadByte());
        if (separator != '#')
        {
            throw new InvalidDataException("Data is invalid, first char is not #");
        }

        StringBuilder hashName = new();
        StringBuilder fileLength = new();
        bool readHash = false;
        while (true)
        {
            // ConvertReadByte already throws EndOfStreamException on -1.
            var readChar = (char)ConvertReadByte(fileBlockStream.ReadByte());
            if (readChar == ':')
            {
                readHash = true;
                continue;
            }

            if (readChar == '#')
            {
                break;
            }

            (readHash ? fileLength : hashName).Append(readChar);
        }

        // The length is machine-written data, so parse it culture-independently.
        long fileLengthBytes = fileLength.Length == 0
            ? 0
            : long.Parse(fileLength.ToString(), CultureInfo.InvariantCulture);

        return (hashName.ToString(), fileLengthBytes);
    }

    /// <summary>
    /// Waits until the server reports the request as ready, then streams the response body for
    /// <paramref name="requestId"/> into <paramref name="tempPath"/> through a bandwidth-limited stream.
    /// </summary>
    /// <param name="downloadGroup">Key into the download status dictionary for this group.</param>
    /// <param name="requestId">Request id returned by the enqueue call.</param>
    /// <param name="fileTransfer">Transfers of this group; the first entry supplies the download URI.</param>
    /// <param name="tempPath">Destination path of the block file.</param>
    /// <param name="progress">Receives the number of bytes read per chunk.</param>
    /// <param name="ct">Cancels waiting and downloading.</param>
    /// <exception cref="InvalidDataException">The server answered 404 or 401.</exception>
    /// <exception cref="HttpRequestException">Any other HTTP failure.</exception>
    private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
    {
        Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));

        await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);

        _downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;

        var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);

        Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);

        HttpResponseMessage? pendingResponse = null;
        HttpResponseMessage response;
        try
        {
            pendingResponse = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
            pendingResponse.EnsureSuccessStatusCode();
            response = pendingResponse;
        }
        catch (HttpRequestException ex)
        {
            pendingResponse?.Dispose();
            Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
            if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
            {
                throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
            }

            // Previously swallowed: execution continued with a null response (NullReferenceException)
            // or wrote the error body of a failed request into the block file.
            throw;
        }

        using (response)
        {
            ThrottledStream? stream = null;
            try
            {
                var fileStream = File.Create(tempPath);
                await using (fileStream.ConfigureAwait(false))
                {
                    var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196;
                    var buffer = new byte[bufferSize];

                    var limit = _orchestrator.DownloadLimitPerSlot();
                    Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath);
                    stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
                    lock (_activeDownloadStreamsLock)
                    {
                        _activeDownloadStreams.Add(stream);
                    }

                    int bytesRead;
                    while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
                    {
                        ct.ThrowIfCancellationRequested();

                        await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);

                        progress.Report(bytesRead);
                    }

                    Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
                }
            }
            catch (OperationCanceledException)
            {
                // Keep the partial block file: the caller extracts whatever was downloaded so far.
                throw;
            }
            catch (Exception)
            {
                try
                {
                    if (!string.IsNullOrEmpty(tempPath))
                    {
                        File.Delete(tempPath);
                    }
                }
                catch
                {
                    // ignore if file deletion fails
                }

                throw;
            }
            finally
            {
                if (stream != null)
                {
                    lock (_activeDownloadStreamsLock)
                    {
                        _activeDownloadStreams.Remove(stream);
                    }

                    await stream.DisposeAsync().ConfigureAwait(false);
                }
            }
        }
    }

    /// <summary>
    /// Groups <see cref="CurrentDownloads"/> by host and port, downloads one block file per group in
    /// parallel and extracts every entry of each block file into the file cache.
    /// </summary>
    private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
    {
        // Materialised once: the lazy GroupBy result was previously re-enumerated several times.
        var downloadGroups = CurrentDownloads
            .GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal)
            .ToList();

        foreach (var downloadGroup in downloadGroups)
        {
            _downloadStatus[downloadGroup.Key] = new FileDownloadStatus()
            {
                DownloadStatus = DownloadStatus.Initializing,
                TotalBytes = downloadGroup.Sum(c => c.Total),
                TotalFiles = 1,
                TransferredBytes = 0,
                TransferredFiles = 0
            };
        }

        Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));

        // MaxDegreeOfParallelism must be -1 or positive; with no groups there is nothing to run.
        if (downloadGroups.Count > 0)
        {
            await Parallel.ForEachAsync(downloadGroups, new ParallelOptions()
            {
                MaxDegreeOfParallelism = downloadGroups.Count,
                CancellationToken = ct,
            },
            async (fileGroup, token) =>
            {
                var transfers = fileGroup.ToList();

                // let server predownload files
                string requestIdText;
                using (var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(transfers[0].DownloadUri),
                    transfers.Select(c => c.Hash), token).ConfigureAwait(false))
                {
                    requestIdText = await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false);
                }

                Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", transfers.Count, transfers[0].DownloadUri, requestIdText);

                Guid requestId = Guid.Parse(requestIdText.Trim('"'));

                Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, transfers.Count, transfers[0].DownloadUri);

                var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
                FileInfo fi = new(blockFile);
                try
                {
                    _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
                    await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
                    _downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
                    Progress<long> progress = new((bytesDownloaded) =>
                    {
                        try
                        {
                            if (!_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value))
                            {
                                return;
                            }

                            value.TransferredBytes += bytesDownloaded;
                        }
                        catch (Exception ex)
                        {
                            Logger.LogWarning(ex, "Could not set download progress");
                        }
                    });
                    await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, transfers, blockFile, progress, token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    // Fall through: whatever made it into the block file is still extracted below.
                    Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler);
                }
                catch (Exception ex)
                {
                    _orchestrator.ReleaseDownloadSlot();
                    File.Delete(blockFile);
                    Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
                    ClearDownload();
                    return;
                }

                FileStream? fileBlockStream = null;
                var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);
                var tasks = new List<Task>();
                try
                {
                    if (_downloadStatus.TryGetValue(fileGroup.Key, out var status))
                    {
                        status.TransferredFiles = 1;
                        status.DownloadStatus = DownloadStatus.Decompressing;
                    }

                    fileBlockStream = File.OpenRead(blockFile);
                    while (fileBlockStream.Position < fileBlockStream.Length)
                    {
                        (string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
                        var chunkPosition = fileBlockStream.Position;

                        // Skip over the payload; each extraction task reads it through its own stream.
                        fileBlockStream.Position += fileLengthBytes;

                        // Throttle the number of extraction tasks that are in flight.
                        while (tasks.Count > threadCount && tasks.Count(t => !t.IsCompleted) > 4)
                        {
                            await Task.Delay(10, CancellationToken.None).ConfigureAwait(false);
                        }

                        var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
                        var tmpPath = _fileDbManager.GetCacheFilePath(Guid.NewGuid().ToString(), "tmp");
                        var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);

                        Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath);

                        tasks.Add(Task.Run(
                            () => ExtractFileFromBlock(fi.Name, blockFile, chunkPosition, fileLengthBytes, fileHash, filePath, tmpPath, fileReplacement),
                            CancellationToken.None));
                    }
                }
                catch (EndOfStreamException)
                {
                    Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name);
                }
                catch (Exception ex)
                {
                    Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name);
                }
                finally
                {
                    // Wait asynchronously (no thread-pool blocking) and never let a failed task
                    // prevent the download slot from being released.
                    try
                    {
                        await Task.WhenAll(tasks).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        Logger.LogWarning(ex, "{dlName}: Error while waiting for decompression tasks", fi.Name);
                    }

                    _orchestrator.ReleaseDownloadSlot();
                    if (fileBlockStream != null)
                    {
                        await fileBlockStream.DisposeAsync().ConfigureAwait(false);
                    }

                    File.Delete(blockFile);
                }
            }).ConfigureAwait(false);
        }

        Logger.LogDebug("Download end: {id}", gameObjectHandler);

        ClearDownload();
    }

    /// <summary>
    /// Decompresses one LZ4 frame located at <paramref name="chunkPosition"/> in the block file into a
    /// temporary file, verifies its SHA1 against <paramref name="fileHash"/> and, on success, moves it to
    /// <paramref name="filePath"/> and registers it in the file cache. Failures are logged, not thrown;
    /// the temporary file is always removed.
    /// </summary>
    private void ExtractFileFromBlock(string blockFileName, string blockFile, long chunkPosition, long fileLengthBytes,
        string fileHash, string filePath, string tmpPath, List<FileReplacementData> fileReplacement)
    {
        try
        {
            using var tmpFileStream = new HashingStream(new FileStream(tmpPath, new FileStreamOptions()
            {
                Mode = FileMode.CreateNew,
                Access = FileAccess.Write,
                Share = FileShare.None
            }), SHA1.Create());

            using var fileChunkStream = new FileStream(blockFile, new FileStreamOptions()
            {
                BufferSize = 80000,
                Mode = FileMode.Open,
                Access = FileAccess.Read
            });
            fileChunkStream.Position = chunkPosition;

            using var innerFileStream = new LimitedStream(fileChunkStream, fileLengthBytes);
            using var decoder = LZ4Frame.Decode(innerFileStream);
            long startPos = fileChunkStream.Position;
            decoder.AsStream().CopyTo(tmpFileStream);
            long readBytes = fileChunkStream.Position - startPos;

            // Fewer bytes than announced in the header means the block file is truncated.
            if (readBytes != fileLengthBytes)
            {
                throw new EndOfStreamException();
            }

            string calculatedHash = BitConverter.ToString(tmpFileStream.Finish()).Replace("-", "", StringComparison.Ordinal);

            if (!calculatedHash.Equals(fileHash, StringComparison.Ordinal))
            {
                Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", calculatedHash, fileHash);
                return;
            }

            // Release the temp file before it is moved to its final location.
            tmpFileStream.Close();
            _fileCompactor.RenameAndCompact(filePath, tmpPath);
            PersistFileToStorage(fileHash, filePath, fileLengthBytes);
        }
        catch (EndOfStreamException)
        {
            Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", blockFileName, fileHash);
        }
        catch (Exception e)
        {
            Logger.LogWarning(e, "{dlName}: Error during decompression of {hash}", blockFileName, fileHash);

            foreach (var fr in fileReplacement)
            {
                Logger.LogWarning(" - {h}: {x}", fr.Hash, fr.GamePaths[0]);
            }
        }
        finally
        {
            if (File.Exists(tmpPath))
            {
                File.Delete(tmpPath);
            }
        }
    }

    /// <summary>Requests size information for the given hashes from the files CDN.</summary>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
    {
        if (!_orchestrator.IsInitialized)
        {
            throw new InvalidOperationException("FileTransferManager is not initialized");
        }

        var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
        return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
    }

    /// <summary>
    /// Creates a cache entry for the extracted file. If the entry's hash differs from
    /// <paramref name="fileHash"/> the entry is removed again; otherwise its compressed size is recorded.
    /// Errors are logged and swallowed.
    /// </summary>
    private void PersistFileToStorage(string fileHash, string filePath, long? compressedSize = null)
    {
        try
        {
            var entry = _fileDbManager.CreateCacheEntry(filePath, fileHash);
            if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
            {
                _fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
                entry = null;
            }

            if (entry != null)
            {
                entry.CompressedSize = compressedSize;
            }
        }
        catch (Exception ex)
        {
            Logger.LogWarning(ex, "Error creating cache entry");
        }
    }

    /// <summary>
    /// Polls every 250 ms until the orchestrator reports the request as ready. Every 5 seconds without
    /// readiness the server queue is re-checked. On cancellation a cancel request is sent to the server
    /// (at most once), and the download request is always cleared from the orchestrator on exit.
    /// </summary>
    private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
    {
        bool alreadyCancelled = false;
        CancellationTokenSource localTimeoutCts = new(TimeSpan.FromSeconds(5));
        CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
        try
        {
            while (!_orchestrator.IsDownloadReady(requestId))
            {
                try
                {
                    await Task.Delay(250, composite.Token).ConfigureAwait(false);
                }
                catch (TaskCanceledException)
                {
                    if (downloadCt.IsCancellationRequested)
                    {
                        throw;
                    }

                    // Only the local 5 second timeout fired: ask the server for the queue state and re-arm.
                    using (var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
                        downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false))
                    {
                        req.EnsureSuccessStatusCode();
                    }

                    composite.Dispose();
                    localTimeoutCts.Dispose();
                    localTimeoutCts = new(TimeSpan.FromSeconds(5));
                    composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
                }
            }

            Logger.LogDebug("Download {requestId} ready", requestId);
        }
        catch (TaskCanceledException)
        {
            try
            {
                await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
                alreadyCancelled = true;
            }
            catch
            {
                // ignore whatever happens here
            }

            throw;
        }
        finally
        {
            // Disposed on every path; previously leaked whenever an exception left the loop.
            composite.Dispose();
            localTimeoutCts.Dispose();

            if (downloadCt.IsCancellationRequested && !alreadyCancelled)
            {
                try
                {
                    await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
                }
                catch
                {
                    // ignore whatever happens here
                }
            }

            _orchestrator.ClearDownloadRequest(requestId);
        }
    }
}