forked from Eauldane/SnowcloakClient
Initial
This commit is contained in:
510
MareSynchronos/WebAPI/Files/FileDownloadManager.cs
Normal file
510
MareSynchronos/WebAPI/Files/FileDownloadManager.cs
Normal file
@@ -0,0 +1,510 @@
|
||||
using Dalamud.Utility;
|
||||
using K4os.Compression.LZ4.Streams;
|
||||
using MareSynchronos.API.Data;
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronos.FileCache;
|
||||
using MareSynchronos.PlayerData.Handlers;
|
||||
using MareSynchronos.Services.Mediator;
|
||||
using MareSynchronos.Utils;
|
||||
using MareSynchronos.WebAPI.Files.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.Net;
|
||||
using System.Net.Http.Json;
|
||||
using System.Security.Cryptography;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files;
|
||||
|
||||
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
|
||||
private readonly FileCompactor _fileCompactor;
|
||||
private readonly FileCacheManager _fileDbManager;
|
||||
private readonly FileTransferOrchestrator _orchestrator;
|
||||
private readonly List<ThrottledStream> _activeDownloadStreams;
|
||||
|
||||
public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
|
||||
FileTransferOrchestrator orchestrator,
|
||||
FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
|
||||
{
|
||||
_downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
|
||||
_orchestrator = orchestrator;
|
||||
_fileDbManager = fileCacheManager;
|
||||
_fileCompactor = fileCompactor;
|
||||
_activeDownloadStreams = [];
|
||||
|
||||
Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
|
||||
{
|
||||
if (!_activeDownloadStreams.Any()) return;
|
||||
var newLimit = _orchestrator.DownloadLimitPerSlot();
|
||||
Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
|
||||
foreach (var stream in _activeDownloadStreams)
|
||||
{
|
||||
stream.BandwidthLimit = newLimit;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = [];
|
||||
|
||||
public List<FileTransfer> ForbiddenTransfers => _orchestrator.ForbiddenTransfers;
|
||||
|
||||
public bool IsDownloading => !CurrentDownloads.Any();
|
||||
|
||||
public void ClearDownload()
|
||||
{
|
||||
CurrentDownloads.Clear();
|
||||
_downloadStatus.Clear();
|
||||
}
|
||||
|
||||
public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
|
||||
{
|
||||
Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
|
||||
try
|
||||
{
|
||||
await DownloadFilesInternal(gameObject, fileReplacementDto, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
ClearDownload();
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mediator.Publish(new DownloadFinishedMessage(gameObject));
|
||||
Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
|
||||
}
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
ClearDownload();
|
||||
foreach (var stream in _activeDownloadStreams.ToList())
|
||||
{
|
||||
try
|
||||
{
|
||||
stream.Dispose();
|
||||
}
|
||||
catch
|
||||
{
|
||||
// do nothing
|
||||
//
|
||||
}
|
||||
}
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
private static byte ConvertReadByte(int byteOrEof)
|
||||
{
|
||||
if (byteOrEof == -1)
|
||||
{
|
||||
throw new EndOfStreamException();
|
||||
}
|
||||
|
||||
return (byte)byteOrEof;
|
||||
}
|
||||
|
||||
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
|
||||
{
|
||||
List<char> hashName = [];
|
||||
List<char> fileLength = [];
|
||||
var separator = (char)ConvertReadByte(fileBlockStream.ReadByte());
|
||||
if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");
|
||||
|
||||
bool readHash = false;
|
||||
while (true)
|
||||
{
|
||||
int readByte = fileBlockStream.ReadByte();
|
||||
if (readByte == -1)
|
||||
throw new EndOfStreamException();
|
||||
|
||||
var readChar = (char)ConvertReadByte(readByte);
|
||||
if (readChar == ':')
|
||||
{
|
||||
readHash = true;
|
||||
continue;
|
||||
}
|
||||
if (readChar == '#') break;
|
||||
if (!readHash) hashName.Add(readChar);
|
||||
else fileLength.Add(readChar);
|
||||
}
|
||||
if (fileLength.Count == 0)
|
||||
fileLength.Add('0');
|
||||
return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
|
||||
}
|
||||
|
||||
private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
|
||||
{
|
||||
Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));
|
||||
|
||||
await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);
|
||||
|
||||
_downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;
|
||||
|
||||
HttpResponseMessage response = null!;
|
||||
var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);
|
||||
|
||||
Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);
|
||||
try
|
||||
{
|
||||
response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
|
||||
response.EnsureSuccessStatusCode();
|
||||
}
|
||||
catch (HttpRequestException ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
|
||||
if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
|
||||
{
|
||||
throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
|
||||
}
|
||||
}
|
||||
|
||||
ThrottledStream? stream = null;
|
||||
try
|
||||
{
|
||||
var fileStream = File.Create(tempPath);
|
||||
await using (fileStream.ConfigureAwait(false))
|
||||
{
|
||||
var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196;
|
||||
var buffer = new byte[bufferSize];
|
||||
|
||||
var bytesRead = 0;
|
||||
var limit = _orchestrator.DownloadLimitPerSlot();
|
||||
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath);
|
||||
stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
|
||||
_activeDownloadStreams.Add(stream);
|
||||
while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
|
||||
await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);
|
||||
|
||||
progress.Report(bytesRead);
|
||||
}
|
||||
|
||||
Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!tempPath.IsNullOrEmpty())
|
||||
File.Delete(tempPath);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore if file deletion fails
|
||||
}
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (stream != null)
|
||||
{
|
||||
_activeDownloadStreams.Remove(stream);
|
||||
await stream.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
|
||||
{
|
||||
Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);
|
||||
|
||||
List<DownloadFileDto> downloadFileInfoFromService =
|
||||
[
|
||||
.. await FilesGetSizes(fileReplacement.Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(), ct).ConfigureAwait(false),
|
||||
];
|
||||
|
||||
Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
|
||||
|
||||
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
|
||||
{
|
||||
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
||||
{
|
||||
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
|
||||
}
|
||||
}
|
||||
|
||||
CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d))
|
||||
.Where(d => d.CanBeTransferred).ToList();
|
||||
|
||||
return CurrentDownloads;
|
||||
}
|
||||
|
||||
private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
|
||||
{
|
||||
var downloadGroups = CurrentDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal);
|
||||
|
||||
foreach (var downloadGroup in downloadGroups)
|
||||
{
|
||||
_downloadStatus[downloadGroup.Key] = new FileDownloadStatus()
|
||||
{
|
||||
DownloadStatus = DownloadStatus.Initializing,
|
||||
TotalBytes = downloadGroup.Sum(c => c.Total),
|
||||
TotalFiles = 1,
|
||||
TransferredBytes = 0,
|
||||
TransferredFiles = 0
|
||||
};
|
||||
}
|
||||
|
||||
Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
|
||||
|
||||
await Parallel.ForEachAsync(downloadGroups, new ParallelOptions()
|
||||
{
|
||||
MaxDegreeOfParallelism = downloadGroups.Count(),
|
||||
CancellationToken = ct,
|
||||
},
|
||||
async (fileGroup, token) =>
|
||||
{
|
||||
// let server predownload files
|
||||
var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
|
||||
fileGroup.Select(c => c.Hash), token).ConfigureAwait(false);
|
||||
Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri,
|
||||
await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false));
|
||||
|
||||
Guid requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"'));
|
||||
|
||||
Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
|
||||
|
||||
var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
|
||||
FileInfo fi = new(blockFile);
|
||||
try
|
||||
{
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
|
||||
await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
|
||||
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
|
||||
Progress<long> progress = new((bytesDownloaded) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value)) return;
|
||||
value.TransferredBytes += bytesDownloaded;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Could not set download progress");
|
||||
}
|
||||
});
|
||||
await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_orchestrator.ReleaseDownloadSlot();
|
||||
File.Delete(blockFile);
|
||||
Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
|
||||
ClearDownload();
|
||||
return;
|
||||
}
|
||||
|
||||
FileStream? fileBlockStream = null;
|
||||
var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);
|
||||
var tasks = new List<Task>();
|
||||
try
|
||||
{
|
||||
if (_downloadStatus.TryGetValue(fileGroup.Key, out var status))
|
||||
{
|
||||
status.TransferredFiles = 1;
|
||||
status.DownloadStatus = DownloadStatus.Decompressing;
|
||||
}
|
||||
fileBlockStream = File.OpenRead(blockFile);
|
||||
while (fileBlockStream.Position < fileBlockStream.Length)
|
||||
{
|
||||
(string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
|
||||
var chunkPosition = fileBlockStream.Position;
|
||||
fileBlockStream.Position += fileLengthBytes;
|
||||
|
||||
while (tasks.Count > threadCount && tasks.Where(t => !t.IsCompleted).Count() > 4)
|
||||
await Task.Delay(10, CancellationToken.None).ConfigureAwait(false);
|
||||
|
||||
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
|
||||
var tmpPath = _fileDbManager.GetCacheFilePath(Guid.NewGuid().ToString(), "tmp");
|
||||
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
|
||||
|
||||
Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath);
|
||||
|
||||
tasks.Add(Task.Run(() => {
|
||||
try
|
||||
{
|
||||
using var tmpFileStream = new HashingStream(new FileStream(tmpPath, new FileStreamOptions()
|
||||
{
|
||||
Mode = FileMode.CreateNew,
|
||||
Access = FileAccess.Write,
|
||||
Share = FileShare.None
|
||||
}), SHA1.Create());
|
||||
|
||||
using var fileChunkStream = new FileStream(blockFile, new FileStreamOptions()
|
||||
{
|
||||
BufferSize = 80000,
|
||||
Mode = FileMode.Open,
|
||||
Access = FileAccess.Read
|
||||
});
|
||||
fileChunkStream.Position = chunkPosition;
|
||||
|
||||
using var innerFileStream = new LimitedStream(fileChunkStream, fileLengthBytes);
|
||||
using var decoder = LZ4Frame.Decode(innerFileStream);
|
||||
long startPos = fileChunkStream.Position;
|
||||
decoder.AsStream().CopyTo(tmpFileStream);
|
||||
long readBytes = fileChunkStream.Position - startPos;
|
||||
|
||||
if (readBytes != fileLengthBytes)
|
||||
{
|
||||
throw new EndOfStreamException();
|
||||
}
|
||||
|
||||
string calculatedHash = BitConverter.ToString(tmpFileStream.Finish()).Replace("-", "", StringComparison.Ordinal);
|
||||
|
||||
if (!calculatedHash.Equals(fileHash, StringComparison.Ordinal))
|
||||
{
|
||||
Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", calculatedHash, fileHash);
|
||||
return;
|
||||
}
|
||||
|
||||
tmpFileStream.Close();
|
||||
_fileCompactor.RenameAndCompact(filePath, tmpPath);
|
||||
PersistFileToStorage(fileHash, filePath, fileLengthBytes);
|
||||
}
|
||||
catch (EndOfStreamException)
|
||||
{
|
||||
Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", fi.Name, fileHash);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.LogWarning(e, "{dlName}: Error during decompression of {hash}", fi.Name, fileHash);
|
||||
|
||||
foreach (var fr in fileReplacement)
|
||||
Logger.LogWarning(" - {h}: {x}", fr.Hash, fr.GamePaths[0]);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (File.Exists(tmpPath))
|
||||
File.Delete(tmpPath);
|
||||
}
|
||||
}, CancellationToken.None));
|
||||
}
|
||||
|
||||
Task.WaitAll([..tasks], CancellationToken.None);
|
||||
}
|
||||
catch (EndOfStreamException)
|
||||
{
|
||||
Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name);
|
||||
}
|
||||
finally
|
||||
{
|
||||
Task.WaitAll([..tasks], CancellationToken.None);
|
||||
_orchestrator.ReleaseDownloadSlot();
|
||||
if (fileBlockStream != null)
|
||||
await fileBlockStream.DisposeAsync().ConfigureAwait(false);
|
||||
File.Delete(blockFile);
|
||||
}
|
||||
}).ConfigureAwait(false);
|
||||
|
||||
Logger.LogDebug("Download end: {id}", gameObjectHandler);
|
||||
|
||||
ClearDownload();
|
||||
}
|
||||
|
||||
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
|
||||
{
|
||||
if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
|
||||
var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
|
||||
return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
|
||||
}
|
||||
|
||||
private void PersistFileToStorage(string fileHash, string filePath, long? compressedSize = null)
|
||||
{
|
||||
try
|
||||
{
|
||||
var entry = _fileDbManager.CreateCacheEntry(filePath, fileHash);
|
||||
if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
_fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
|
||||
entry = null;
|
||||
}
|
||||
if (entry != null)
|
||||
entry.CompressedSize = compressedSize;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Error creating cache entry");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
|
||||
{
|
||||
bool alreadyCancelled = false;
|
||||
try
|
||||
{
|
||||
CancellationTokenSource localTimeoutCts = new();
|
||||
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
|
||||
CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
|
||||
|
||||
while (!_orchestrator.IsDownloadReady(requestId))
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(250, composite.Token).ConfigureAwait(false);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
if (downloadCt.IsCancellationRequested) throw;
|
||||
|
||||
var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
|
||||
downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
|
||||
req.EnsureSuccessStatusCode();
|
||||
localTimeoutCts.Dispose();
|
||||
composite.Dispose();
|
||||
localTimeoutCts = new();
|
||||
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
|
||||
composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
|
||||
}
|
||||
}
|
||||
|
||||
localTimeoutCts.Dispose();
|
||||
composite.Dispose();
|
||||
|
||||
Logger.LogDebug("Download {requestId} ready", requestId);
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
|
||||
alreadyCancelled = true;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore whatever happens here
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (downloadCt.IsCancellationRequested && !alreadyCancelled)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignore whatever happens here
|
||||
}
|
||||
}
|
||||
_orchestrator.ClearDownloadRequest(requestId);
|
||||
}
|
||||
}
|
||||
}
|
177
MareSynchronos/WebAPI/Files/FileTransferOrchestrator.cs
Normal file
177
MareSynchronos/WebAPI/Files/FileTransferOrchestrator.cs
Normal file
@@ -0,0 +1,177 @@
|
||||
using MareSynchronos.MareConfiguration;
|
||||
using MareSynchronos.Services.Mediator;
|
||||
using MareSynchronos.WebAPI.Files.Models;
|
||||
using MareSynchronos.WebAPI.SignalR;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Http.Json;
|
||||
using System.Reflection;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files;
|
||||
|
||||
public class FileTransferOrchestrator : DisposableMediatorSubscriberBase
|
||||
{
|
||||
private readonly ConcurrentDictionary<Guid, bool> _downloadReady = new();
|
||||
private readonly HttpClient _httpClient;
|
||||
private readonly MareConfigService _mareConfig;
|
||||
private readonly Lock _semaphoreModificationLock = new();
|
||||
private readonly TokenProvider _tokenProvider;
|
||||
private int _availableDownloadSlots;
|
||||
private SemaphoreSlim _downloadSemaphore;
|
||||
private int CurrentlyUsedDownloadSlots => _availableDownloadSlots - _downloadSemaphore.CurrentCount;
|
||||
|
||||
public FileTransferOrchestrator(ILogger<FileTransferOrchestrator> logger, MareConfigService mareConfig,
|
||||
MareMediator mediator, TokenProvider tokenProvider) : base(logger, mediator)
|
||||
{
|
||||
_mareConfig = mareConfig;
|
||||
_tokenProvider = tokenProvider;
|
||||
_httpClient = new()
|
||||
{
|
||||
Timeout = TimeSpan.FromSeconds(3000)
|
||||
};
|
||||
var ver = Assembly.GetExecutingAssembly().GetName().Version;
|
||||
_httpClient.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("MareSynchronos", ver!.Major + "." + ver!.Minor + "." + ver!.Build));
|
||||
|
||||
_availableDownloadSlots = mareConfig.Current.ParallelDownloads;
|
||||
_downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots);
|
||||
|
||||
Mediator.Subscribe<ConnectedMessage>(this, (msg) =>
|
||||
{
|
||||
FilesCdnUri = msg.Connection.ServerInfo.FileServerAddress;
|
||||
});
|
||||
|
||||
Mediator.Subscribe<DisconnectedMessage>(this, (msg) =>
|
||||
{
|
||||
FilesCdnUri = null;
|
||||
});
|
||||
Mediator.Subscribe<DownloadReadyMessage>(this, (msg) =>
|
||||
{
|
||||
_downloadReady[msg.RequestId] = true;
|
||||
});
|
||||
}
|
||||
|
||||
public Uri? FilesCdnUri { private set; get; }
|
||||
public List<FileTransfer> ForbiddenTransfers { get; } = [];
|
||||
public bool IsInitialized => FilesCdnUri != null;
|
||||
|
||||
public void ClearDownloadRequest(Guid guid)
|
||||
{
|
||||
_downloadReady.Remove(guid, out _);
|
||||
}
|
||||
|
||||
public bool IsDownloadReady(Guid guid)
|
||||
{
|
||||
if (_downloadReady.TryGetValue(guid, out bool isReady) && isReady)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void ReleaseDownloadSlot()
|
||||
{
|
||||
try
|
||||
{
|
||||
_downloadSemaphore.Release();
|
||||
Mediator.Publish(new DownloadLimitChangedMessage());
|
||||
}
|
||||
catch (SemaphoreFullException)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<HttpResponseMessage> SendRequestAsync(HttpMethod method, Uri uri,
|
||||
CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead)
|
||||
{
|
||||
using var requestMessage = new HttpRequestMessage(method, uri);
|
||||
return await SendRequestInternalAsync(requestMessage, ct, httpCompletionOption).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<HttpResponseMessage> SendRequestAsync<T>(HttpMethod method, Uri uri, T content, CancellationToken ct) where T : class
|
||||
{
|
||||
using var requestMessage = new HttpRequestMessage(method, uri);
|
||||
if (content is not ByteArrayContent)
|
||||
requestMessage.Content = JsonContent.Create(content);
|
||||
else
|
||||
requestMessage.Content = content as ByteArrayContent;
|
||||
return await SendRequestInternalAsync(requestMessage, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<HttpResponseMessage> SendRequestStreamAsync(HttpMethod method, Uri uri, ProgressableStreamContent content, CancellationToken ct)
|
||||
{
|
||||
using var requestMessage = new HttpRequestMessage(method, uri);
|
||||
requestMessage.Content = content;
|
||||
return await SendRequestInternalAsync(requestMessage, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task WaitForDownloadSlotAsync(CancellationToken token)
|
||||
{
|
||||
lock (_semaphoreModificationLock)
|
||||
{
|
||||
if (_availableDownloadSlots != _mareConfig.Current.ParallelDownloads && _availableDownloadSlots == _downloadSemaphore.CurrentCount)
|
||||
{
|
||||
_availableDownloadSlots = _mareConfig.Current.ParallelDownloads;
|
||||
_downloadSemaphore = new(_availableDownloadSlots, _availableDownloadSlots);
|
||||
}
|
||||
}
|
||||
|
||||
await _downloadSemaphore.WaitAsync(token).ConfigureAwait(false);
|
||||
Mediator.Publish(new DownloadLimitChangedMessage());
|
||||
}
|
||||
|
||||
public long DownloadLimitPerSlot()
|
||||
{
|
||||
var limit = _mareConfig.Current.DownloadSpeedLimitInBytes;
|
||||
if (limit <= 0) return 0;
|
||||
limit = _mareConfig.Current.DownloadSpeedType switch
|
||||
{
|
||||
MareConfiguration.Models.DownloadSpeeds.Bps => limit,
|
||||
MareConfiguration.Models.DownloadSpeeds.KBps => limit * 1024,
|
||||
MareConfiguration.Models.DownloadSpeeds.MBps => limit * 1024 * 1024,
|
||||
_ => limit,
|
||||
};
|
||||
var currentUsedDlSlots = CurrentlyUsedDownloadSlots;
|
||||
var avaialble = _availableDownloadSlots;
|
||||
var currentCount = _downloadSemaphore.CurrentCount;
|
||||
var dividedLimit = limit / (currentUsedDlSlots == 0 ? 1 : currentUsedDlSlots);
|
||||
if (dividedLimit < 0)
|
||||
{
|
||||
Logger.LogWarning("Calculated Bandwidth Limit is negative, returning Infinity: {value}, CurrentlyUsedDownloadSlots is {currentSlots}, " +
|
||||
"DownloadSpeedLimit is {limit}, available slots: {avail}, current count: {count}", dividedLimit, currentUsedDlSlots, limit, avaialble, currentCount);
|
||||
return long.MaxValue;
|
||||
}
|
||||
return Math.Clamp(dividedLimit, 1, long.MaxValue);
|
||||
}
|
||||
|
||||
private async Task<HttpResponseMessage> SendRequestInternalAsync(HttpRequestMessage requestMessage,
|
||||
CancellationToken? ct = null, HttpCompletionOption httpCompletionOption = HttpCompletionOption.ResponseContentRead)
|
||||
{
|
||||
var token = await _tokenProvider.GetToken().ConfigureAwait(false);
|
||||
requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
|
||||
|
||||
if (requestMessage.Content != null && requestMessage.Content is not StreamContent && requestMessage.Content is not ByteArrayContent)
|
||||
{
|
||||
var content = await ((JsonContent)requestMessage.Content).ReadAsStringAsync().ConfigureAwait(false);
|
||||
Logger.LogDebug("Sending {method} to {uri} (Content: {content})", requestMessage.Method, requestMessage.RequestUri, content);
|
||||
}
|
||||
else
|
||||
{
|
||||
Logger.LogDebug("Sending {method} to {uri}", requestMessage.Method, requestMessage.RequestUri);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
if (ct != null)
|
||||
return await _httpClient.SendAsync(requestMessage, httpCompletionOption, ct.Value).ConfigureAwait(false);
|
||||
return await _httpClient.SendAsync(requestMessage, httpCompletionOption).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Error during SendRequestInternal for {uri}", requestMessage.RequestUri);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
289
MareSynchronos/WebAPI/Files/FileUploadManager.cs
Normal file
289
MareSynchronos/WebAPI/Files/FileUploadManager.cs
Normal file
@@ -0,0 +1,289 @@
|
||||
using MareSynchronos.API.Data;
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
using MareSynchronos.API.Routes;
|
||||
using MareSynchronos.FileCache;
|
||||
using MareSynchronos.MareConfiguration;
|
||||
using MareSynchronos.Services.Mediator;
|
||||
using MareSynchronos.Services.ServerConfiguration;
|
||||
using MareSynchronos.UI;
|
||||
using MareSynchronos.WebAPI.Files.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Http.Json;
|
||||
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files;
|
||||
|
||||
public sealed class FileUploadManager : DisposableMediatorSubscriberBase
|
||||
{
|
||||
private readonly FileCacheManager _fileDbManager;
|
||||
private readonly MareConfigService _mareConfigService;
|
||||
private readonly FileTransferOrchestrator _orchestrator;
|
||||
private readonly ServerConfigurationManager _serverManager;
|
||||
private readonly Dictionary<string, DateTime> _verifiedUploadedHashes = new(StringComparer.Ordinal);
|
||||
private CancellationTokenSource? _uploadCancellationTokenSource = new();
|
||||
|
||||
public FileUploadManager(ILogger<FileUploadManager> logger, MareMediator mediator,
|
||||
MareConfigService mareConfigService,
|
||||
FileTransferOrchestrator orchestrator,
|
||||
FileCacheManager fileDbManager,
|
||||
ServerConfigurationManager serverManager) : base(logger, mediator)
|
||||
{
|
||||
_mareConfigService = mareConfigService;
|
||||
_orchestrator = orchestrator;
|
||||
_fileDbManager = fileDbManager;
|
||||
_serverManager = serverManager;
|
||||
|
||||
Mediator.Subscribe<DisconnectedMessage>(this, (msg) =>
|
||||
{
|
||||
Reset();
|
||||
});
|
||||
}
|
||||
|
||||
public List<FileTransfer> CurrentUploads { get; } = [];
|
||||
public bool IsUploading => CurrentUploads.Count > 0;
|
||||
|
||||
public bool CancelUpload()
|
||||
{
|
||||
if (CurrentUploads.Any())
|
||||
{
|
||||
Logger.LogDebug("Cancelling current upload");
|
||||
_uploadCancellationTokenSource?.Cancel();
|
||||
_uploadCancellationTokenSource?.Dispose();
|
||||
_uploadCancellationTokenSource = null;
|
||||
CurrentUploads.Clear();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public async Task DeleteAllFiles()
|
||||
{
|
||||
if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
|
||||
|
||||
await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesDeleteAllFullPath(_orchestrator.FilesCdnUri!)).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<List<string>> UploadFiles(List<string> hashesToUpload, IProgress<string> progress, CancellationToken? ct = null)
|
||||
{
|
||||
Logger.LogDebug("Trying to upload files");
|
||||
var filesPresentLocally = hashesToUpload.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);
|
||||
var locallyMissingFiles = hashesToUpload.Except(filesPresentLocally, StringComparer.Ordinal).ToList();
|
||||
if (locallyMissingFiles.Any())
|
||||
{
|
||||
return locallyMissingFiles;
|
||||
}
|
||||
|
||||
progress.Report($"Starting upload for {filesPresentLocally.Count} files");
|
||||
|
||||
var filesToUpload = await FilesSend([.. filesPresentLocally], [], ct ?? CancellationToken.None).ConfigureAwait(false);
|
||||
|
||||
if (filesToUpload.Exists(f => f.IsForbidden))
|
||||
{
|
||||
return [.. filesToUpload.Where(f => f.IsForbidden).Select(f => f.Hash)];
|
||||
}
|
||||
|
||||
Task uploadTask = Task.CompletedTask;
|
||||
int i = 1;
|
||||
foreach (var file in filesToUpload)
|
||||
{
|
||||
progress.Report($"Uploading file {i++}/{filesToUpload.Count}. Please wait until the upload is completed.");
|
||||
Logger.LogDebug("[{hash}] Compressing", file);
|
||||
var data = await _fileDbManager.GetCompressedFileData(file.Hash, ct ?? CancellationToken.None).ConfigureAwait(false);
|
||||
Logger.LogDebug("[{hash}] Starting upload for {filePath}", data.Item1, _fileDbManager.GetFileCacheByHash(data.Item1)!.ResolvedFilepath);
|
||||
await uploadTask.ConfigureAwait(false);
|
||||
uploadTask = UploadFile(data.Item2, file.Hash, false, ct ?? CancellationToken.None);
|
||||
(ct ?? CancellationToken.None).ThrowIfCancellationRequested();
|
||||
}
|
||||
|
||||
await uploadTask.ConfigureAwait(false);
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
public async Task<CharacterData> UploadFiles(CharacterData data, List<UserData> visiblePlayers)
|
||||
{
|
||||
CancelUpload();
|
||||
|
||||
_uploadCancellationTokenSource = new CancellationTokenSource();
|
||||
var uploadToken = _uploadCancellationTokenSource.Token;
|
||||
Logger.LogDebug("Sending Character data {hash} to service {url}", data.DataHash.Value, _serverManager.CurrentRealApiUrl);
|
||||
|
||||
HashSet<string> unverifiedUploads = GetUnverifiedFiles(data);
|
||||
if (unverifiedUploads.Any())
|
||||
{
|
||||
await UploadUnverifiedFiles(unverifiedUploads, visiblePlayers, uploadToken).ConfigureAwait(false);
|
||||
Logger.LogInformation("Upload complete for {hash}", data.DataHash.Value);
|
||||
}
|
||||
|
||||
foreach (var kvp in data.FileReplacements)
|
||||
{
|
||||
data.FileReplacements[kvp.Key].RemoveAll(i => _orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, i.Hash, StringComparison.OrdinalIgnoreCase)));
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
base.Dispose(disposing);
|
||||
Reset();
|
||||
}
|
||||
|
||||
private async Task<List<UploadFileDto>> FilesSend(List<string> hashes, List<string> uids, CancellationToken ct)
|
||||
{
|
||||
if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
|
||||
FilesSendDto filesSendDto = new()
|
||||
{
|
||||
FileHashes = hashes,
|
||||
UIDs = uids
|
||||
};
|
||||
var response = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.ServerFilesFilesSendFullPath(_orchestrator.FilesCdnUri!), filesSendDto, ct).ConfigureAwait(false);
|
||||
return await response.Content.ReadFromJsonAsync<List<UploadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
|
||||
}
|
||||
|
||||
private HashSet<string> GetUnverifiedFiles(CharacterData data)
|
||||
{
|
||||
HashSet<string> unverifiedUploadHashes = new(StringComparer.Ordinal);
|
||||
foreach (var item in data.FileReplacements.SelectMany(c => c.Value.Where(f => string.IsNullOrEmpty(f.FileSwapPath)).Select(v => v.Hash).Distinct(StringComparer.Ordinal)).Distinct(StringComparer.Ordinal).ToList())
|
||||
{
|
||||
if (!_verifiedUploadedHashes.TryGetValue(item, out var verifiedTime))
|
||||
{
|
||||
verifiedTime = DateTime.MinValue;
|
||||
}
|
||||
|
||||
if (verifiedTime < DateTime.UtcNow.Subtract(TimeSpan.FromMinutes(10)))
|
||||
{
|
||||
Logger.LogTrace("Verifying {item}, last verified: {date}", item, verifiedTime);
|
||||
unverifiedUploadHashes.Add(item);
|
||||
}
|
||||
}
|
||||
|
||||
return unverifiedUploadHashes;
|
||||
}
|
||||
|
||||
private void Reset()
|
||||
{
|
||||
_uploadCancellationTokenSource?.Cancel();
|
||||
_uploadCancellationTokenSource?.Dispose();
|
||||
_uploadCancellationTokenSource = null;
|
||||
CurrentUploads.Clear();
|
||||
_verifiedUploadedHashes.Clear();
|
||||
}
|
||||
|
||||
private async Task UploadFile(byte[] compressedFile, string fileHash, bool postProgress, CancellationToken uploadToken)
|
||||
{
|
||||
if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
|
||||
|
||||
Logger.LogInformation("[{hash}] Uploading {size}", fileHash, UiSharedService.ByteToString(compressedFile.Length));
|
||||
|
||||
if (uploadToken.IsCancellationRequested) return;
|
||||
|
||||
try
|
||||
{
|
||||
await UploadFileStream(compressedFile, fileHash, munged: false, postProgress, uploadToken).ConfigureAwait(false);
|
||||
_verifiedUploadedHashes[fileHash] = DateTime.UtcNow;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "[{hash}] File upload cancelled", fileHash);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UploadFileStream(byte[] compressedFile, string fileHash, bool munged, bool postProgress, CancellationToken uploadToken)
|
||||
{
|
||||
if (munged)
|
||||
throw new InvalidOperationException();
|
||||
|
||||
using var ms = new MemoryStream(compressedFile);
|
||||
|
||||
Progress<UploadProgress>? prog = !postProgress ? null : new((prog) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
CurrentUploads.Single(f => string.Equals(f.Hash, fileHash, StringComparison.Ordinal)).Transferred = prog.Uploaded;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "[{hash}] Could not set upload progress", fileHash);
|
||||
}
|
||||
});
|
||||
|
||||
var streamContent = new ProgressableStreamContent(ms, prog);
|
||||
streamContent.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");
|
||||
HttpResponseMessage response;
|
||||
if (!munged)
|
||||
response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadFullPath(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
|
||||
else
|
||||
response = await _orchestrator.SendRequestStreamAsync(HttpMethod.Post, MareFiles.ServerFilesUploadMunged(_orchestrator.FilesCdnUri!, fileHash), streamContent, uploadToken).ConfigureAwait(false);
|
||||
Logger.LogDebug("[{hash}] Upload Status: {status}", fileHash, response.StatusCode);
|
||||
}
|
||||
|
||||
private async Task UploadUnverifiedFiles(HashSet<string> unverifiedUploadHashes, List<UserData> visiblePlayers, CancellationToken uploadToken)
|
||||
{
|
||||
unverifiedUploadHashes = unverifiedUploadHashes.Where(h => _fileDbManager.GetFileCacheByHash(h) != null).ToHashSet(StringComparer.Ordinal);
|
||||
|
||||
Logger.LogDebug("Verifying {count} files", unverifiedUploadHashes.Count);
|
||||
var filesToUpload = await FilesSend([.. unverifiedUploadHashes], visiblePlayers.Select(p => p.UID).ToList(), uploadToken).ConfigureAwait(false);
|
||||
|
||||
foreach (var file in filesToUpload.Where(f => !f.IsForbidden).DistinctBy(f => f.Hash))
|
||||
{
|
||||
try
|
||||
{
|
||||
CurrentUploads.Add(new UploadFileTransfer(file)
|
||||
{
|
||||
Total = new FileInfo(_fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath).Length,
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Logger.LogWarning(ex, "Tried to request file {hash} but file was not present", file.Hash);
|
||||
}
|
||||
}
|
||||
|
||||
foreach (var file in filesToUpload.Where(c => c.IsForbidden))
|
||||
{
|
||||
if (_orchestrator.ForbiddenTransfers.TrueForAll(f => !string.Equals(f.Hash, file.Hash, StringComparison.Ordinal)))
|
||||
{
|
||||
_orchestrator.ForbiddenTransfers.Add(new UploadFileTransfer(file)
|
||||
{
|
||||
LocalFile = _fileDbManager.GetFileCacheByHash(file.Hash)?.ResolvedFilepath ?? string.Empty,
|
||||
});
|
||||
}
|
||||
|
||||
_verifiedUploadedHashes[file.Hash] = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
var totalSize = CurrentUploads.Sum(c => c.Total);
|
||||
Logger.LogDebug("Compressing and uploading files");
|
||||
Task uploadTask = Task.CompletedTask;
|
||||
foreach (var file in CurrentUploads.Where(f => f.CanBeTransferred && !f.IsTransferred).ToList())
|
||||
{
|
||||
Logger.LogDebug("[{hash}] Compressing", file);
|
||||
var data = await _fileDbManager.GetCompressedFileData(file.Hash, uploadToken).ConfigureAwait(false);
|
||||
CurrentUploads.Single(e => string.Equals(e.Hash, file.Hash, StringComparison.Ordinal)).Total = data.Item2.Length;
|
||||
Logger.LogDebug("[{hash}] Starting upload for {filePath}", file.Hash, _fileDbManager.GetFileCacheByHash(file.Hash)!.ResolvedFilepath);
|
||||
await uploadTask.ConfigureAwait(false);
|
||||
uploadTask = UploadFile(data.Item2, file.Hash, true, uploadToken);
|
||||
uploadToken.ThrowIfCancellationRequested();
|
||||
}
|
||||
|
||||
if (CurrentUploads.Any())
|
||||
{
|
||||
await uploadTask.ConfigureAwait(false);
|
||||
|
||||
var compressedSize = CurrentUploads.Sum(c => c.Total);
|
||||
Logger.LogDebug("Upload complete, compressed {size} to {compressed}", UiSharedService.ByteToString(totalSize), UiSharedService.ByteToString(compressedSize));
|
||||
|
||||
_fileDbManager.WriteOutFullCsv();
|
||||
}
|
||||
|
||||
foreach (var file in unverifiedUploadHashes.Where(c => !CurrentUploads.Exists(u => string.Equals(u.Hash, c, StringComparison.Ordinal))))
|
||||
{
|
||||
_verifiedUploadedHashes[file] = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
CurrentUploads.Clear();
|
||||
}
|
||||
}
|
24
MareSynchronos/WebAPI/Files/Models/DownloadFileTransfer.cs
Normal file
24
MareSynchronos/WebAPI/Files/Models/DownloadFileTransfer.cs
Normal file
@@ -0,0 +1,24 @@
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public class DownloadFileTransfer : FileTransfer
|
||||
{
|
||||
public DownloadFileTransfer(DownloadFileDto dto) : base(dto)
|
||||
{
|
||||
}
|
||||
|
||||
public override bool CanBeTransferred => Dto.FileExists && !Dto.IsForbidden && Dto.Size > 0;
|
||||
public Uri DownloadUri => new(Dto.Url);
|
||||
public override long Total
|
||||
{
|
||||
set
|
||||
{
|
||||
// nothing to set
|
||||
}
|
||||
get => Dto.Size;
|
||||
}
|
||||
|
||||
public long TotalRaw => 0; // XXX
|
||||
private DownloadFileDto Dto => (DownloadFileDto)TransferDto;
|
||||
}
|
10
MareSynchronos/WebAPI/Files/Models/DownloadStatus.cs
Normal file
10
MareSynchronos/WebAPI/Files/Models/DownloadStatus.cs
Normal file
@@ -0,0 +1,10 @@
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public enum DownloadStatus
|
||||
{
|
||||
Initializing,
|
||||
WaitingForSlot,
|
||||
WaitingForQueue,
|
||||
Downloading,
|
||||
Decompressing
|
||||
}
|
10
MareSynchronos/WebAPI/Files/Models/FileDownloadStatus.cs
Normal file
10
MareSynchronos/WebAPI/Files/Models/FileDownloadStatus.cs
Normal file
@@ -0,0 +1,10 @@
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public class FileDownloadStatus
|
||||
{
|
||||
public DownloadStatus DownloadStatus { get; set; }
|
||||
public long TotalBytes { get; set; }
|
||||
public int TotalFiles { get; set; }
|
||||
public long TransferredBytes { get; set; }
|
||||
public int TransferredFiles { get; set; }
|
||||
}
|
27
MareSynchronos/WebAPI/Files/Models/FileTransfer.cs
Normal file
27
MareSynchronos/WebAPI/Files/Models/FileTransfer.cs
Normal file
@@ -0,0 +1,27 @@
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public abstract class FileTransfer
|
||||
{
|
||||
protected readonly ITransferFileDto TransferDto;
|
||||
|
||||
protected FileTransfer(ITransferFileDto transferDto)
|
||||
{
|
||||
TransferDto = transferDto;
|
||||
}
|
||||
|
||||
public virtual bool CanBeTransferred => !TransferDto.IsForbidden && (TransferDto is not DownloadFileDto dto || dto.FileExists);
|
||||
public string ForbiddenBy => TransferDto.ForbiddenBy;
|
||||
public string Hash => TransferDto.Hash;
|
||||
public bool IsForbidden => TransferDto.IsForbidden;
|
||||
public bool IsInTransfer => Transferred != Total && Transferred > 0;
|
||||
public bool IsTransferred => Transferred == Total;
|
||||
public abstract long Total { get; set; }
|
||||
public long Transferred { get; set; } = 0;
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return Hash;
|
||||
}
|
||||
}
|
@@ -0,0 +1,93 @@
|
||||
using System.Net;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public class ProgressableStreamContent : StreamContent
|
||||
{
|
||||
private const int _defaultBufferSize = 4096;
|
||||
private readonly int _bufferSize;
|
||||
private readonly IProgress<UploadProgress>? _progress;
|
||||
private readonly Stream _streamToWrite;
|
||||
private bool _contentConsumed;
|
||||
|
||||
public ProgressableStreamContent(Stream streamToWrite, IProgress<UploadProgress>? downloader)
|
||||
: this(streamToWrite, _defaultBufferSize, downloader)
|
||||
{
|
||||
}
|
||||
|
||||
public ProgressableStreamContent(Stream streamToWrite, int bufferSize, IProgress<UploadProgress>? progress)
|
||||
: base(streamToWrite, bufferSize)
|
||||
{
|
||||
if (streamToWrite == null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(streamToWrite));
|
||||
}
|
||||
|
||||
if (bufferSize <= 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(bufferSize));
|
||||
}
|
||||
|
||||
_streamToWrite = streamToWrite;
|
||||
_bufferSize = bufferSize;
|
||||
_progress = progress;
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
{
|
||||
_streamToWrite.Dispose();
|
||||
}
|
||||
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
|
||||
{
|
||||
PrepareContent();
|
||||
|
||||
var buffer = new byte[_bufferSize];
|
||||
var size = _streamToWrite.Length;
|
||||
var uploaded = 0;
|
||||
|
||||
using (_streamToWrite)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
var length = await _streamToWrite.ReadAsync(buffer).ConfigureAwait(false);
|
||||
if (length <= 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
uploaded += length;
|
||||
_progress?.Report(new UploadProgress(uploaded, size));
|
||||
await stream.WriteAsync(buffer.AsMemory(0, length)).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected override bool TryComputeLength(out long length)
|
||||
{
|
||||
length = _streamToWrite.Length;
|
||||
return true;
|
||||
}
|
||||
|
||||
private void PrepareContent()
|
||||
{
|
||||
if (_contentConsumed)
|
||||
{
|
||||
if (_streamToWrite.CanSeek)
|
||||
{
|
||||
_streamToWrite.Position = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
throw new InvalidOperationException("The stream has already been read.");
|
||||
}
|
||||
}
|
||||
|
||||
_contentConsumed = true;
|
||||
}
|
||||
}
|
13
MareSynchronos/WebAPI/Files/Models/UploadFileTransfer.cs
Normal file
13
MareSynchronos/WebAPI/Files/Models/UploadFileTransfer.cs
Normal file
@@ -0,0 +1,13 @@
|
||||
using MareSynchronos.API.Dto.Files;
|
||||
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public class UploadFileTransfer : FileTransfer
|
||||
{
|
||||
public UploadFileTransfer(UploadFileDto dto) : base(dto)
|
||||
{
|
||||
}
|
||||
|
||||
public string LocalFile { get; set; } = string.Empty;
|
||||
public override long Total { get; set; }
|
||||
}
|
3
MareSynchronos/WebAPI/Files/Models/UploadProgress.cs
Normal file
3
MareSynchronos/WebAPI/Files/Models/UploadProgress.cs
Normal file
@@ -0,0 +1,3 @@
|
||||
namespace MareSynchronos.WebAPI.Files.Models;
|
||||
|
||||
public record UploadProgress(long Uploaded, long Size);
|
231
MareSynchronos/WebAPI/Files/ThrottledStream.cs
Normal file
231
MareSynchronos/WebAPI/Files/ThrottledStream.cs
Normal file
@@ -0,0 +1,231 @@
|
||||
namespace MareSynchronos.WebAPI.Files
|
||||
{
|
||||
/// <summary>
|
||||
/// Class for streaming data with throttling support.
|
||||
/// Borrowed from https://github.com/bezzad/Downloader
|
||||
/// </summary>
|
||||
internal class ThrottledStream : Stream
|
||||
{
|
||||
public static long Infinite => long.MaxValue;
|
||||
private readonly Stream _baseStream;
|
||||
private long _bandwidthLimit;
|
||||
private readonly Bandwidth _bandwidth = new();
|
||||
private CancellationTokenSource _bandwidthChangeTokenSource = new();
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="T:ThrottledStream" /> class.
|
||||
/// </summary>
|
||||
/// <param name="baseStream">The base stream.</param>
|
||||
/// <param name="bandwidthLimit">The maximum bytes per second that can be transferred through the base stream.</param>
|
||||
/// <exception cref="ArgumentNullException">Thrown when <see cref="baseStream" /> is a null reference.</exception>
|
||||
/// <exception cref="ArgumentOutOfRangeException">Thrown when <see cref="BandwidthLimit" /> is a negative value.</exception>
|
||||
public ThrottledStream(Stream baseStream, long bandwidthLimit)
|
||||
{
|
||||
if (bandwidthLimit < 0)
|
||||
{
|
||||
throw new ArgumentOutOfRangeException(nameof(bandwidthLimit),
|
||||
bandwidthLimit, "The maximum number of bytes per second can't be negative.");
|
||||
}
|
||||
|
||||
_baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
|
||||
BandwidthLimit = bandwidthLimit;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bandwidth Limit (in B/s)
|
||||
/// </summary>
|
||||
/// <value>The maximum bytes per second.</value>
|
||||
public long BandwidthLimit
|
||||
{
|
||||
get => _bandwidthLimit;
|
||||
set
|
||||
{
|
||||
if (_bandwidthLimit == value) return;
|
||||
_bandwidthLimit = value <= 0 ? Infinite : value;
|
||||
_bandwidth.BandwidthLimit = _bandwidthLimit;
|
||||
_bandwidthChangeTokenSource.Cancel();
|
||||
_bandwidthChangeTokenSource.Dispose();
|
||||
_bandwidthChangeTokenSource = new();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanRead => _baseStream.CanRead;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanSeek => _baseStream.CanSeek;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override bool CanWrite => _baseStream.CanWrite;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Length => _baseStream.Length;
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Position
|
||||
{
|
||||
get => _baseStream.Position;
|
||||
set => _baseStream.Position = value;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Flush()
|
||||
{
|
||||
_baseStream.Flush();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override long Seek(long offset, SeekOrigin origin)
|
||||
{
|
||||
return _baseStream.Seek(offset, origin);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void SetLength(long value)
|
||||
{
|
||||
_baseStream.SetLength(value);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
Throttle(count).Wait();
|
||||
return _baseStream.Read(buffer, offset, count);
|
||||
}
|
||||
|
||||
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
await Throttle(count, cancellationToken).ConfigureAwait(false);
|
||||
#pragma warning disable CA1835
|
||||
return await _baseStream.ReadAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
|
||||
#pragma warning restore CA1835
|
||||
}
|
||||
|
||||
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await Throttle(buffer.Length, cancellationToken).ConfigureAwait(false);
|
||||
return await _baseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
Throttle(count).Wait();
|
||||
_baseStream.Write(buffer, offset, count);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
await Throttle(count, cancellationToken).ConfigureAwait(false);
|
||||
#pragma warning disable CA1835
|
||||
await _baseStream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(false);
|
||||
#pragma warning restore CA1835
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await Throttle(buffer.Length, cancellationToken).ConfigureAwait(false);
|
||||
await _baseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public override void Close()
|
||||
{
|
||||
_baseStream.Close();
|
||||
base.Close();
|
||||
}
|
||||
|
||||
private async Task Throttle(int transmissionVolume, CancellationToken token = default)
|
||||
{
|
||||
// Make sure the buffer isn't empty.
|
||||
if (BandwidthLimit > 0 && transmissionVolume > 0)
|
||||
{
|
||||
// Calculate the time to sleep.
|
||||
_bandwidth.CalculateSpeed(transmissionVolume);
|
||||
await Sleep(_bandwidth.PopSpeedRetrieveTime(), token).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task Sleep(int time, CancellationToken token = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (time > 0)
|
||||
{
|
||||
var bandWidthtoken = _bandwidthChangeTokenSource.Token;
|
||||
var linked = CancellationTokenSource.CreateLinkedTokenSource(token, bandWidthtoken).Token;
|
||||
await Task.Delay(time, linked).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public override string ToString()
|
||||
{
|
||||
return _baseStream?.ToString() ?? string.Empty;
|
||||
}
|
||||
|
||||
private sealed class Bandwidth
|
||||
{
|
||||
private long _count;
|
||||
private int _lastSecondCheckpoint;
|
||||
private long _lastTransferredBytesCount;
|
||||
private int _speedRetrieveTime;
|
||||
public double Speed { get; private set; }
|
||||
public double AverageSpeed { get; private set; }
|
||||
public long BandwidthLimit { get; set; }
|
||||
|
||||
public Bandwidth()
|
||||
{
|
||||
BandwidthLimit = long.MaxValue;
|
||||
Reset();
|
||||
}
|
||||
|
||||
public void CalculateSpeed(long receivedBytesCount)
|
||||
{
|
||||
int elapsedTime = Environment.TickCount - _lastSecondCheckpoint + 1;
|
||||
receivedBytesCount = Interlocked.Add(ref _lastTransferredBytesCount, receivedBytesCount);
|
||||
double momentSpeed = receivedBytesCount * 1000 / (double)elapsedTime; // B/s
|
||||
|
||||
if (1000 < elapsedTime)
|
||||
{
|
||||
Speed = momentSpeed;
|
||||
AverageSpeed = ((AverageSpeed * _count) + Speed) / (_count + 1);
|
||||
_count++;
|
||||
SecondCheckpoint();
|
||||
}
|
||||
|
||||
if (momentSpeed >= BandwidthLimit)
|
||||
{
|
||||
var expectedTime = receivedBytesCount * 1000 / BandwidthLimit;
|
||||
Interlocked.Add(ref _speedRetrieveTime, (int)expectedTime - elapsedTime);
|
||||
}
|
||||
}
|
||||
|
||||
public int PopSpeedRetrieveTime()
|
||||
{
|
||||
return Interlocked.Exchange(ref _speedRetrieveTime, 0);
|
||||
}
|
||||
|
||||
public void Reset()
|
||||
{
|
||||
SecondCheckpoint();
|
||||
_count = 0;
|
||||
Speed = 0;
|
||||
AverageSpeed = 0;
|
||||
}
|
||||
|
||||
private void SecondCheckpoint()
|
||||
{
|
||||
Interlocked.Exchange(ref _lastSecondCheckpoint, Environment.TickCount);
|
||||
Interlocked.Exchange(ref _lastTransferredBytesCount, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user