using MareSynchronos.API.Data;
using MareSynchronos.API.Dto.Files;
using MareSynchronos.API.Routes;
using MareSynchronos.FileCache;
using MareSynchronos.MareConfiguration;
using MareSynchronos.Services.Mediator;
using MareSynchronos.Services.ServerConfiguration;
using MareSynchronos.UI;
using MareSynchronos.WebAPI.Files.Models;
using Microsoft.Extensions.Logging;
using System.Net.Http.Headers;
using System.Net.Http.Json;

namespace MareSynchronos.WebAPI.Files;

// NOTE(review): generic type arguments appear to have been stripped from this copy of the file
// (e.g. "Dictionary _verifiedUploadedHashes", "Task> UploadFiles(List ...)", "ReadFromJsonAsync>(...)",
// "Progress? prog", "Mediator.Subscribe(this, ...)"). The code below is kept token-for-token as found;
// restore the type arguments from the upstream file before compiling.

/// <summary>
/// Uploads locally cached files to the file service through the <c>FileTransferOrchestrator</c>.
/// Tracks the transfers of the current character-data upload in <c>CurrentUploads</c> and remembers
/// when each file hash was last uploaded/verified so that hashes verified within the last 10 minutes
/// are not sent for verification again.
/// </summary>
public sealed class FileUploadManager : DisposableMediatorSubscriberBase
{
    private readonly FileCacheManager _fileDbManager;

    // Assigned in the constructor; not read anywhere in this class as shown.
    private readonly MareConfigService _mareConfigService;

    private readonly FileTransferOrchestrator _orchestrator;
    private readonly ServerConfigurationManager _serverManager;

    // File hash -> UTC time the hash was last uploaded or otherwise treated as verified.
    // Keys are compared ordinally. Cleared by Reset().
    private readonly Dictionary _verifiedUploadedHashes = new(StringComparer.Ordinal);

    // Token source for the current character-data upload; set to null by CancelUpload()/Reset()
    // and recreated at the start of UploadFiles(CharacterData, ...).
    private CancellationTokenSource? _uploadCancellationTokenSource = new();

    /// <summary>
    /// Stores the injected services and subscribes to a mediator message whose handler calls
    /// <c>Reset()</c>.
    /// </summary>
    // NOTE(review): the message type argument of Subscribe is not visible in this copy — confirm upstream.
    public FileUploadManager(ILogger logger, MareMediator mediator, MareConfigService mareConfigService, FileTransferOrchestrator orchestrator, FileCacheManager fileDbManager, ServerConfigurationManager serverManager) : base(logger, mediator)
    {
        _mareConfigService = mareConfigService;
        _orchestrator = orchestrator;
        _fileDbManager = fileDbManager;
        _serverManager = serverManager;
        Mediator.Subscribe(this, (msg) => { Reset(); });
    }

    /// <summary>
    /// Transfers belonging to the character-data upload currently in progress. Filled and cleared by
    /// <c>UploadUnverifiedFiles</c>; also cleared by <c>CancelUpload</c> and <c>Reset</c>.
    /// </summary>
    public List CurrentUploads { get; } = [];

    /// <summary>True while <c>CurrentUploads</c> contains at least one entry.</summary>
    public bool IsUploading => CurrentUploads.Count > 0;

    /// <summary>
    /// Cancels the current upload if any transfers are tracked: cancels and disposes the token source,
    /// sets it to null and clears <c>CurrentUploads</c>.
    /// </summary>
    /// <returns><c>true</c> if there were tracked uploads to cancel; otherwise <c>false</c>.</returns>
    public bool CancelUpload()
    {
        // When nothing is tracked the token source is left untouched (neither cancelled nor disposed).
        if (CurrentUploads.Any())
        {
            Logger.LogDebug("Cancelling current upload");
            _uploadCancellationTokenSource?.Cancel();
            _uploadCancellationTokenSource?.Dispose();
            _uploadCancellationTokenSource = null;
            CurrentUploads.Clear();
            return true;
        }

        return false;
    }

    /// <summary>
    /// Sends a POST request to the "delete all files" route of the files CDN.
    /// </summary>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    public async Task DeleteAllFiles()
    {
        if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

        await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesDeleteAllFullPath(_orchestrator.FilesCdnUri!)).ConfigureAwait(false);
    }

    /// <summary>
    /// Uploads the given file hashes, reporting human-readable status text through
    /// <paramref name="progress"/>.
    /// </summary>
    /// <param name="hashesToUpload">Hashes of the files to upload.</param>
    /// <param name="progress">Receives status messages ("Starting upload ...", "Uploading file i/n ...").</param>
    /// <param name="ct">Optional cancellation token; <c>CancellationToken.None</c> is used when null.</param>
    /// <returns>
    /// The hashes that have no local cache entry (nothing is uploaded in that case); otherwise the
    /// hashes the server reported as forbidden (nothing is uploaded in that case either); otherwise
    /// an empty list once all uploads have been awaited.
    /// </returns>
    public async Task> UploadFiles(List hashesToUpload, IProgress progress, CancellationToken? ct = null)
    {
        Logger.LogDebug("Trying to upload files");
        var filesPresentLocally = hashesToUpload.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);
        var locallyMissingFiles = hashesToUpload.Except(filesPresentLocally, StringComparer.Ordinal).ToList();

        // Bail out before contacting the server if any requested file is not in the local cache.
        if (locallyMissingFiles.Any())
        {
            return locallyMissingFiles;
        }

        progress.Report($"Starting upload for {filesPresentLocally.Count} files");

        // Ask the server which of the hashes it wants; no UIDs are passed on this path.
        var filesToUpload = await FilesSend([.. filesPresentLocally], [], ct ?? CancellationToken.None).ConfigureAwait(false);
        if (filesToUpload.Exists(f => f.IsForbidden))
        {
            return [.. filesToUpload.Where(f => f.IsForbidden).Select(f => f.Hash)];
        }

        Task uploadTask = Task.CompletedTask;
        int i = 1;
        foreach (var file in filesToUpload)
        {
            progress.Report($"Uploading file {i++}/{filesToUpload.Count}. Please wait until the upload is completed.");

            // NOTE(review): the {hash} placeholder receives the whole `file` object here, while the
            // other log calls pass a hash string — looks like `file.Hash` was intended; confirm.
            Logger.LogDebug("[{hash}] Compressing", file);
            var data = await _fileDbManager.GetCompressedFileData(file.Hash, ct ?? CancellationToken.None).ConfigureAwait(false);
            Logger.LogDebug("[{hash}] Starting upload for {filePath}", data.Item1, _fileDbManager.GetFileCacheByHash(data.Item1)!.ResolvedFilepath);

            // The previous upload is awaited only after the next file has been compressed, so
            // compression of file N+1 overlaps with the upload of file N.
            await uploadTask.ConfigureAwait(false);
            uploadTask = UploadFile(data.Item2, file.Hash, false, ct ?? CancellationToken.None);

            // If cancellation was requested this throws out of the method; the upload task started
            // on the line above is not awaited on that path.
            (ct ?? CancellationToken.None).ThrowIfCancellationRequested();
        }

        // Wait for the last upload started by the loop.
        await uploadTask.ConfigureAwait(false);

        return [];
    }

    /// <summary>
    /// Uploads the files referenced by <paramref name="data"/> that have not been verified recently,
    /// then removes every file replacement whose hash matches a forbidden transfer.
    /// </summary>
    /// <param name="data">Character data whose file replacements are to be uploaded; modified in place.</param>
    /// <param name="visiblePlayers">Players whose <c>UID</c> values are sent along with the verification request.</param>
    /// <returns>The same <paramref name="data"/> instance after forbidden replacements were removed.</returns>
    public async Task UploadFiles(CharacterData data, List visiblePlayers)
    {
        // Abort whatever upload is tracked, then start over with a fresh token source.
        CancelUpload();

        _uploadCancellationTokenSource = new CancellationTokenSource();
        var uploadToken = _uploadCancellationTokenSource.Token;
        Logger.LogDebug("Sending Character data {hash} to service {url}", data.DataHash.Value, _serverManager.CurrentRealApiUrl);

        HashSet unverifiedUploads = GetUnverifiedFiles(data);
        if (unverifiedUploads.Any())
        {
            await UploadUnverifiedFiles(unverifiedUploads, visiblePlayers, uploadToken).ConfigureAwait(false);
            Logger.LogInformation("Upload complete for {hash}", data.DataHash.Value);
        }

        // Strip replacements the orchestrator lists as forbidden. Only the per-key lists are mutated,
        // not the dictionary being enumerated. The hash comparison here ignores case, unlike the
        // ordinal comparisons used elsewhere in this class.
        foreach (var kvp in data.FileReplacements)
        {
            data.FileReplacements[kvp.Key].RemoveAll(i => _orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, i.Hash, StringComparison.OrdinalIgnoreCase)));
        }

        return data;
    }

    /// <summary>
    /// Disposes the base class and then resets upload state (cancels the token source, clears
    /// tracked uploads and the verified-hash cache).
    /// </summary>
    // Reset() runs regardless of the value of `disposing`.
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        Reset();
    }

    /// <summary>
    /// Posts the given hashes and UIDs to the "files send" route and returns the deserialized JSON
    /// response, or an empty list when the response body deserializes to null.
    /// </summary>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    private async Task> FilesSend(List hashes, List uids, CancellationToken ct)
    {
        if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
        FilesSendDto filesSendDto = new()
        {
            FileHashes = hashes,
            UIDs = uids
        };
        var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesFilesSendFullPath(_orchestrator.FilesCdnUri!), filesSendDto, ct).ConfigureAwait(false);
        return await response.Content.ReadFromJsonAsync>(cancellationToken: ct).ConfigureAwait(false) ?? [];
    }

    /// <summary>
    /// Collects the distinct hashes of all file replacements in <paramref name="data"/> that have an
    /// empty <c>FileSwapPath</c> and were either never verified or last verified more than 10 minutes ago.
    /// </summary>
    private HashSet GetUnverifiedFiles(CharacterData data)
    {
        HashSet unverifiedUploadHashes = new(StringComparer.Ordinal);
        foreach (var item in data.FileReplacements.SelectMany(c => c.Value.Where(f => string.IsNullOrEmpty(f.FileSwapPath)).Select(v => v.Hash).Distinct(StringComparer.Ordinal)).Distinct(StringComparer.Ordinal).ToList())
        {
            // Hashes never seen before are treated as verified at DateTime.MinValue, i.e. always stale.
            if (!_verifiedUploadedHashes.TryGetValue(item, out var verifiedTime))
            {
                verifiedTime = DateTime.MinValue;
            }

            if (verifiedTime < DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(10)))
            {
                Logger.LogTrace("Verifying {item}, last verified: {date}", item, verifiedTime);
                unverifiedUploadHashes.Add(item);
            }
        }

        return unverifiedUploadHashes;
    }

    /// <summary>
    /// Unconditionally cancels and disposes the token source, clears the tracked uploads and forgets
    /// all verified hashes. Unlike <c>CancelUpload</c>, this also clears the verified-hash cache and
    /// does not depend on whether uploads are tracked.
    /// </summary>
    private void Reset()
    {
        _uploadCancellationTokenSource?.Cancel();
        _uploadCancellationTokenSource?.Dispose();
        _uploadCancellationTokenSource = null;
        CurrentUploads.Clear();
        _verifiedUploadedHashes.Clear();
    }

    /// <summary>
    /// Uploads one already-compressed file and, if the upload call returns without throwing, records
    /// the hash as verified at the current UTC time.
    /// </summary>
    /// <param name="compressedFile">Compressed file contents to send.</param>
    /// <param name="fileHash">Hash identifying the file; used in the upload route and log messages.</param>
    /// <param name="postProgress">Whether upload progress is written back to the matching <c>CurrentUploads</c> entry.</param>
    /// <param name="uploadToken">Cancellation token; if already cancelled the method returns without uploading.</param>
    /// <exception cref="InvalidOperationException">The orchestrator is not initialized.</exception>
    private async Task UploadFile(byte[] compressedFile, string fileHash, bool postProgress, CancellationToken uploadToken)
    {
        if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");

        Logger.LogInformation("[{hash}] Uploading {size}", fileHash, UiSharedService.ByteToString(compressedFile.Length));

        if (uploadToken.IsCancellationRequested) return;

        try
        {
            await UploadFileStream(compressedFile, fileHash, munged: false, postProgress, uploadToken).ConfigureAwait(false);
            // NOTE(review): the hash is marked verified without inspecting the response status in this
            // class; whether SendRequestStreamAsync throws on a non-success status is not visible here — confirm.
            _verifiedUploadedHashes[fileHash] = DateTime.UtcNow;
        }
        catch (Exception ex)
        {
            // NOTE(review): every exception type is caught and logged as "cancelled", so genuine
            // upload failures are reported as cancellations and never reach the caller.
            Logger.LogWarning(ex, "[{hash}] File upload cancelled", fileHash);
        }
    }

    /// <summary>
    /// Streams <paramref name="compressedFile"/> to the upload route for <paramref name="fileHash"/>
    /// as <c>application/octet-stream</c> and logs the response status code.
    /// </summary>
    /// <exception cref="InvalidOperationException"><paramref name="munged"/> is true.</exception>
    private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, bool postProgress, CancellationToken uploadToken)
    {
        // Munged uploads are rejected up front, which makes the `else` branch further down unreachable.
        if (munged)
            throw new InvalidOperationException();

        using var ms = new MemoryStream(compressedFile);

        // Progress callback: copy the uploaded amount onto the single CurrentUploads entry with this
        // hash. Single() throws when there is no (or more than one) matching entry — e.g. after the
        // list was cleared — and that is logged instead of propagated.
        Progress? prog = !postProgress ? null : new((prog) =>
        {
            try
            {
                CurrentUploads.Single(f => string.Equals(f.Hash, fileHash, StringComparison.Ordinal)).Transferred = prog.Uploaded;
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "[{hash}] Could not set upload progress", fileHash);
            }
        });

        // NOTE(review): neither streamContent nor the response below is disposed in this method.
        var streamContent = new ProgressableStreamContent(ms, prog);
        streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
        HttpResponseMessage response;
        if (!munged)
            response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
        else
            response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadMunged(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
        Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);
    }

    /// <summary>
    /// Verifies the given hashes with the server and uploads the ones it asks for, tracking them in
    /// <c>CurrentUploads</c> for the duration of the call.
    /// </summary>
    /// <param name="unverifiedUploadHashes">Candidate hashes; those without a local cache entry are dropped first.</param>
    /// <param name="visiblePlayers">Players whose <c>UID</c> values are sent with the verification request.</param>
    /// <param name="uploadToken">Cancellation token checked after each upload is started.</param>
    // NOTE(review): CurrentUploads.Clear() at the end is not in a finally block, so it is skipped when
    // an exception (including cancellation) propagates out of this method.
    private async Task UploadUnverifiedFiles(HashSet unverifiedUploadHashes, List visiblePlayers, CancellationToken uploadToken)
    {
        // Keep only hashes that exist in the local file cache.
        unverifiedUploadHashes = unverifiedUploadHashes.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);

        Logger.LogDebug("Verifying {count} files", unverifiedUploadHashes.Count);
        var filesToUpload = await FilesSend([.. unverifiedUploadHashes], visiblePlayers.Select(p => p.UID).ToList(), uploadToken).ConfigureAwait(false);

        // Track every non-forbidden file once, initially sized by its on-disk length.
        foreach (var file in filesToUpload.Where(f => !f.IsForbidden).DistinctBy(f => f.Hash))
        {
            try
            {
                CurrentUploads.Add(new UploadFileTransfer(file)
                {
                    Total = new FileInfo(_fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath).Length,
                });
            }
            catch (Exception ex)
            {
                Logger.LogWarning(ex, "Tried to request file {hash} but file was not present", file.Hash);
            }
        }

        // Record forbidden files on the orchestrator (once per hash) and mark them verified so they
        // are not re-sent for verification until the 10 minute window has passed.
        foreach (var file in filesToUpload.Where(c => c.IsForbidden))
        {
            if (_orchestrator.ForbiddenTransfers.TrueForAll(f => !string.Equals(f.Hash, file.Hash, StringComparison.Ordinal)))
            {
                _orchestrator.ForbiddenTransfers.Add(new UploadFileTransfer(file)
                {
                    LocalFile = _fileDbManager.GetFileCacheByHash(file.Hash)?.ResolvedFilepath ?? string.Empty,
                });
            }

            _verifiedUploadedHashes[file.Hash] = DateTime.UtcNow;
        }

        // Sum of on-disk sizes, captured before Total is overwritten with compressed sizes below.
        var totalSize = CurrentUploads.Sum(c => c.Total);

        Logger.LogDebug("Compressing and uploading files");
        Task uploadTask = Task.CompletedTask;

        // Iterates over a snapshot (ToList) of the transferable entries.
        foreach (var file in CurrentUploads.Where(f => f.CanBeTransferred && !f.IsTransferred).ToList())
        {
            // NOTE(review): the {hash} placeholder receives the whole `file` object here rather than
            // `file.Hash` as in the next log call — confirm intent.
            Logger.LogDebug("[{hash}] Compressing", file);
            var data = await _fileDbManager.GetCompressedFileData(file.Hash, uploadToken).ConfigureAwait(false);

            // Replace the on-disk size with the compressed size now that it is known.
            CurrentUploads.Single(e => string.Equals(e.Hash, file.Hash, StringComparison.Ordinal)).Total = data.Item2.Length;
            Logger.LogDebug("[{hash}] Starting upload for {filePath}", file.Hash, _fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath);

            // Await the previous upload only now, so compressing this file overlapped with it.
            await uploadTask.ConfigureAwait(false);
            uploadTask = UploadFile(data.Item2, file.Hash, true, uploadToken);
            uploadToken.ThrowIfCancellationRequested();
        }

        if (CurrentUploads.Any())
        {
            // Wait for the last upload, then log original vs. compressed totals and persist the cache CSV.
            await uploadTask.ConfigureAwait(false);

            var compressedSize = CurrentUploads.Sum(c => c.Total);
            Logger.LogDebug("Upload complete, compressed {size} to {compressed}", UiSharedService.ByteToString(totalSize), UiSharedService.ByteToString(compressedSize));

            _fileDbManager.WriteOutFullCsv();
        }

        // Hashes that were sent for verification but never became a tracked upload are marked
        // verified as well (presumably because the server did not request them — confirm).
        foreach (var file in unverifiedUploadHashes.Where(c => !CurrentUploads.Exists(u => string.Equals(u.Hash, c, StringComparison.Ordinal))))
        {
            _verifiedUploadedHashes[file] = DateTime.UtcNow;
        }

        CurrentUploads.Clear();
    }
}