forked from Eauldane/SnowcloakClient
510 lines
21 KiB
C#
510 lines
21 KiB
C#
using Dalamud.Utility;
|
|
using K4os.Compression.LZ4.Streams;
|
|
using MareSynchronos.API.Data;
|
|
using MareSynchronos.API.Dto.Files;
|
|
using MareSynchronos.API.Routes;
|
|
using MareSynchronos.FileCache;
|
|
using MareSynchronos.PlayerData.Handlers;
|
|
using MareSynchronos.Services.Mediator;
|
|
using MareSynchronos.Utils;
|
|
using MareSynchronos.WebAPI.Files.Models;
|
|
using Microsoft.Extensions.Logging;
|
|
using System.Net;
|
|
using System.Net.Http.Json;
|
|
using System.Security.Cryptography;
|
|
|
|
namespace MareSynchronos.WebAPI.Files;
|
|
|
|
public partial class FileDownloadManager : DisposableMediatorSubscriberBase
|
|
{
|
|
private readonly Dictionary<string, FileDownloadStatus> _downloadStatus;
|
|
private readonly FileCompactor _fileCompactor;
|
|
private readonly FileCacheManager _fileDbManager;
|
|
private readonly FileTransferOrchestrator _orchestrator;
|
|
private readonly List<ThrottledStream> _activeDownloadStreams;
|
|
|
|
public FileDownloadManager(ILogger<FileDownloadManager> logger, MareMediator mediator,
|
|
FileTransferOrchestrator orchestrator,
|
|
FileCacheManager fileCacheManager, FileCompactor fileCompactor) : base(logger, mediator)
|
|
{
|
|
_downloadStatus = new Dictionary<string, FileDownloadStatus>(StringComparer.Ordinal);
|
|
_orchestrator = orchestrator;
|
|
_fileDbManager = fileCacheManager;
|
|
_fileCompactor = fileCompactor;
|
|
_activeDownloadStreams = [];
|
|
|
|
Mediator.Subscribe<DownloadLimitChangedMessage>(this, (msg) =>
|
|
{
|
|
if (!_activeDownloadStreams.Any()) return;
|
|
var newLimit = _orchestrator.DownloadLimitPerSlot();
|
|
Logger.LogTrace("Setting new Download Speed Limit to {newLimit}", newLimit);
|
|
foreach (var stream in _activeDownloadStreams)
|
|
{
|
|
stream.BandwidthLimit = newLimit;
|
|
}
|
|
});
|
|
}
|
|
|
|
public List<DownloadFileTransfer> CurrentDownloads { get; private set; } = [];
|
|
|
|
public List<FileTransfer> ForbiddenTransfers => _orchestrator.ForbiddenTransfers;
|
|
|
|
public bool IsDownloading => !CurrentDownloads.Any();
|
|
|
|
public void ClearDownload()
|
|
{
|
|
CurrentDownloads.Clear();
|
|
_downloadStatus.Clear();
|
|
}
|
|
|
|
public async Task DownloadFiles(GameObjectHandler gameObject, List<FileReplacementData> fileReplacementDto, CancellationToken ct)
|
|
{
|
|
Mediator.Publish(new HaltScanMessage(nameof(DownloadFiles)));
|
|
try
|
|
{
|
|
await DownloadFilesInternal(gameObject, fileReplacementDto, ct).ConfigureAwait(false);
|
|
}
|
|
catch
|
|
{
|
|
ClearDownload();
|
|
}
|
|
finally
|
|
{
|
|
Mediator.Publish(new DownloadFinishedMessage(gameObject));
|
|
Mediator.Publish(new ResumeScanMessage(nameof(DownloadFiles)));
|
|
}
|
|
}
|
|
|
|
protected override void Dispose(bool disposing)
|
|
{
|
|
ClearDownload();
|
|
foreach (var stream in _activeDownloadStreams.ToList())
|
|
{
|
|
try
|
|
{
|
|
stream.Dispose();
|
|
}
|
|
catch
|
|
{
|
|
// do nothing
|
|
//
|
|
}
|
|
}
|
|
base.Dispose(disposing);
|
|
}
|
|
|
|
private static byte ConvertReadByte(int byteOrEof)
|
|
{
|
|
if (byteOrEof == -1)
|
|
{
|
|
throw new EndOfStreamException();
|
|
}
|
|
|
|
return (byte)byteOrEof;
|
|
}
|
|
|
|
private static (string fileHash, long fileLengthBytes) ReadBlockFileHeader(FileStream fileBlockStream)
|
|
{
|
|
List<char> hashName = [];
|
|
List<char> fileLength = [];
|
|
var separator = (char)ConvertReadByte(fileBlockStream.ReadByte());
|
|
if (separator != '#') throw new InvalidDataException("Data is invalid, first char is not #");
|
|
|
|
bool readHash = false;
|
|
while (true)
|
|
{
|
|
int readByte = fileBlockStream.ReadByte();
|
|
if (readByte == -1)
|
|
throw new EndOfStreamException();
|
|
|
|
var readChar = (char)ConvertReadByte(readByte);
|
|
if (readChar == ':')
|
|
{
|
|
readHash = true;
|
|
continue;
|
|
}
|
|
if (readChar == '#') break;
|
|
if (!readHash) hashName.Add(readChar);
|
|
else fileLength.Add(readChar);
|
|
}
|
|
if (fileLength.Count == 0)
|
|
fileLength.Add('0');
|
|
return (string.Join("", hashName), long.Parse(string.Join("", fileLength)));
|
|
}
|
|
|
|
private async Task DownloadAndMungeFileHttpClient(string downloadGroup, Guid requestId, List<DownloadFileTransfer> fileTransfer, string tempPath, IProgress<long> progress, CancellationToken ct)
|
|
{
|
|
Logger.LogDebug("GUID {requestId} on server {uri} for files {files}", requestId, fileTransfer[0].DownloadUri, string.Join(", ", fileTransfer.Select(c => c.Hash).ToList()));
|
|
|
|
await WaitForDownloadReady(fileTransfer, requestId, ct).ConfigureAwait(false);
|
|
|
|
_downloadStatus[downloadGroup].DownloadStatus = DownloadStatus.Downloading;
|
|
|
|
HttpResponseMessage response = null!;
|
|
var requestUrl = MareFiles.CacheGetFullPath(fileTransfer[0].DownloadUri, requestId);
|
|
|
|
Logger.LogDebug("Downloading {requestUrl} for request {id}", requestUrl, requestId);
|
|
try
|
|
{
|
|
response = await _orchestrator.SendRequestAsync(HttpMethod.Get, requestUrl, ct, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
|
|
response.EnsureSuccessStatusCode();
|
|
}
|
|
catch (HttpRequestException ex)
|
|
{
|
|
Logger.LogWarning(ex, "Error during download of {requestUrl}, HttpStatusCode: {code}", requestUrl, ex.StatusCode);
|
|
if (ex.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.Unauthorized)
|
|
{
|
|
throw new InvalidDataException($"Http error {ex.StatusCode} (cancelled: {ct.IsCancellationRequested}): {requestUrl}", ex);
|
|
}
|
|
}
|
|
|
|
ThrottledStream? stream = null;
|
|
try
|
|
{
|
|
var fileStream = File.Create(tempPath);
|
|
await using (fileStream.ConfigureAwait(false))
|
|
{
|
|
var bufferSize = response.Content.Headers.ContentLength > 1024 * 1024 ? 65536 : 8196;
|
|
var buffer = new byte[bufferSize];
|
|
|
|
var bytesRead = 0;
|
|
var limit = _orchestrator.DownloadLimitPerSlot();
|
|
Logger.LogTrace("Starting Download of {id} with a speed limit of {limit} to {tempPath}", requestId, limit, tempPath);
|
|
stream = new ThrottledStream(await response.Content.ReadAsStreamAsync(ct).ConfigureAwait(false), limit);
|
|
_activeDownloadStreams.Add(stream);
|
|
while ((bytesRead = await stream.ReadAsync(buffer, ct).ConfigureAwait(false)) > 0)
|
|
{
|
|
ct.ThrowIfCancellationRequested();
|
|
|
|
await fileStream.WriteAsync(buffer.AsMemory(0, bytesRead), ct).ConfigureAwait(false);
|
|
|
|
progress.Report(bytesRead);
|
|
}
|
|
|
|
Logger.LogDebug("{requestUrl} downloaded to {tempPath}", requestUrl, tempPath);
|
|
}
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
throw;
|
|
}
|
|
catch (Exception)
|
|
{
|
|
try
|
|
{
|
|
if (!tempPath.IsNullOrEmpty())
|
|
File.Delete(tempPath);
|
|
}
|
|
catch
|
|
{
|
|
// ignore if file deletion fails
|
|
}
|
|
throw;
|
|
}
|
|
finally
|
|
{
|
|
if (stream != null)
|
|
{
|
|
_activeDownloadStreams.Remove(stream);
|
|
await stream.DisposeAsync().ConfigureAwait(false);
|
|
}
|
|
}
|
|
}
|
|
|
|
public async Task<List<DownloadFileTransfer>> InitiateDownloadList(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
|
|
{
|
|
Logger.LogDebug("Download start: {id}", gameObjectHandler.Name);
|
|
|
|
List<DownloadFileDto> downloadFileInfoFromService =
|
|
[
|
|
.. await FilesGetSizes(fileReplacement.Select(f => f.Hash).Distinct(StringComparer.Ordinal).ToList(), ct).ConfigureAwait(false),
|
|
];
|
|
|
|
Logger.LogDebug("Files with size 0 or less: {files}", string.Join(", ", downloadFileInfoFromService.Where(f => f.Size <= 0).Select(f => f.Hash)));
|
|
|
|
foreach (var dto in downloadFileInfoFromService.Where(c => c.IsForbidden))
|
|
{
|
|
if (!_orchestrator.ForbiddenTransfers.Exists(f => string.Equals(f.Hash, dto.Hash, StringComparison.Ordinal)))
|
|
{
|
|
_orchestrator.ForbiddenTransfers.Add(new DownloadFileTransfer(dto));
|
|
}
|
|
}
|
|
|
|
CurrentDownloads = downloadFileInfoFromService.Distinct().Select(d => new DownloadFileTransfer(d))
|
|
.Where(d => d.CanBeTransferred).ToList();
|
|
|
|
return CurrentDownloads;
|
|
}
|
|
|
|
private async Task DownloadFilesInternal(GameObjectHandler gameObjectHandler, List<FileReplacementData> fileReplacement, CancellationToken ct)
|
|
{
|
|
var downloadGroups = CurrentDownloads.GroupBy(f => f.DownloadUri.Host + ":" + f.DownloadUri.Port, StringComparer.Ordinal);
|
|
|
|
foreach (var downloadGroup in downloadGroups)
|
|
{
|
|
_downloadStatus[downloadGroup.Key] = new FileDownloadStatus()
|
|
{
|
|
DownloadStatus = DownloadStatus.Initializing,
|
|
TotalBytes = downloadGroup.Sum(c => c.Total),
|
|
TotalFiles = 1,
|
|
TransferredBytes = 0,
|
|
TransferredFiles = 0
|
|
};
|
|
}
|
|
|
|
Mediator.Publish(new DownloadStartedMessage(gameObjectHandler, _downloadStatus));
|
|
|
|
await Parallel.ForEachAsync(downloadGroups, new ParallelOptions()
|
|
{
|
|
MaxDegreeOfParallelism = downloadGroups.Count(),
|
|
CancellationToken = ct,
|
|
},
|
|
async (fileGroup, token) =>
|
|
{
|
|
// let server predownload files
|
|
var requestIdResponse = await _orchestrator.SendRequestAsync(HttpMethod.Post, MareFiles.RequestEnqueueFullPath(fileGroup.First().DownloadUri),
|
|
fileGroup.Select(c => c.Hash), token).ConfigureAwait(false);
|
|
Logger.LogDebug("Sent request for {n} files on server {uri} with result {result}", fileGroup.Count(), fileGroup.First().DownloadUri,
|
|
await requestIdResponse.Content.ReadAsStringAsync(token).ConfigureAwait(false));
|
|
|
|
Guid requestId = Guid.Parse((await requestIdResponse.Content.ReadAsStringAsync().ConfigureAwait(false)).Trim('"'));
|
|
|
|
Logger.LogDebug("GUID {requestId} for {n} files on server {uri}", requestId, fileGroup.Count(), fileGroup.First().DownloadUri);
|
|
|
|
var blockFile = _fileDbManager.GetCacheFilePath(requestId.ToString("N"), "blk");
|
|
FileInfo fi = new(blockFile);
|
|
try
|
|
{
|
|
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForSlot;
|
|
await _orchestrator.WaitForDownloadSlotAsync(token).ConfigureAwait(false);
|
|
_downloadStatus[fileGroup.Key].DownloadStatus = DownloadStatus.WaitingForQueue;
|
|
Progress<long> progress = new((bytesDownloaded) =>
|
|
{
|
|
try
|
|
{
|
|
if (!_downloadStatus.TryGetValue(fileGroup.Key, out FileDownloadStatus? value)) return;
|
|
value.TransferredBytes += bytesDownloaded;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.LogWarning(ex, "Could not set download progress");
|
|
}
|
|
});
|
|
await DownloadAndMungeFileHttpClient(fileGroup.Key, requestId, [.. fileGroup], blockFile, progress, token).ConfigureAwait(false);
|
|
}
|
|
catch (OperationCanceledException)
|
|
{
|
|
Logger.LogDebug("{dlName}: Detected cancellation of download, partially extracting files for {id}", fi.Name, gameObjectHandler);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_orchestrator.ReleaseDownloadSlot();
|
|
File.Delete(blockFile);
|
|
Logger.LogError(ex, "{dlName}: Error during download of {id}", fi.Name, requestId);
|
|
ClearDownload();
|
|
return;
|
|
}
|
|
|
|
FileStream? fileBlockStream = null;
|
|
var threadCount = Math.Clamp((int)(Environment.ProcessorCount / 2.0f), 2, 8);
|
|
var tasks = new List<Task>();
|
|
try
|
|
{
|
|
if (_downloadStatus.TryGetValue(fileGroup.Key, out var status))
|
|
{
|
|
status.TransferredFiles = 1;
|
|
status.DownloadStatus = DownloadStatus.Decompressing;
|
|
}
|
|
fileBlockStream = File.OpenRead(blockFile);
|
|
while (fileBlockStream.Position < fileBlockStream.Length)
|
|
{
|
|
(string fileHash, long fileLengthBytes) = ReadBlockFileHeader(fileBlockStream);
|
|
var chunkPosition = fileBlockStream.Position;
|
|
fileBlockStream.Position += fileLengthBytes;
|
|
|
|
while (tasks.Count > threadCount && tasks.Where(t => !t.IsCompleted).Count() > 4)
|
|
await Task.Delay(10, CancellationToken.None).ConfigureAwait(false);
|
|
|
|
var fileExtension = fileReplacement.First(f => string.Equals(f.Hash, fileHash, StringComparison.OrdinalIgnoreCase)).GamePaths[0].Split(".")[^1];
|
|
var tmpPath = _fileDbManager.GetCacheFilePath(Guid.NewGuid().ToString(), "tmp");
|
|
var filePath = _fileDbManager.GetCacheFilePath(fileHash, fileExtension);
|
|
|
|
Logger.LogDebug("{dlName}: Decompressing {file}:{le} => {dest}", fi.Name, fileHash, fileLengthBytes, filePath);
|
|
|
|
tasks.Add(Task.Run(() => {
|
|
try
|
|
{
|
|
using var tmpFileStream = new HashingStream(new FileStream(tmpPath, new FileStreamOptions()
|
|
{
|
|
Mode = FileMode.CreateNew,
|
|
Access = FileAccess.Write,
|
|
Share = FileShare.None
|
|
}), SHA1.Create());
|
|
|
|
using var fileChunkStream = new FileStream(blockFile, new FileStreamOptions()
|
|
{
|
|
BufferSize = 80000,
|
|
Mode = FileMode.Open,
|
|
Access = FileAccess.Read
|
|
});
|
|
fileChunkStream.Position = chunkPosition;
|
|
|
|
using var innerFileStream = new LimitedStream(fileChunkStream, fileLengthBytes);
|
|
using var decoder = LZ4Frame.Decode(innerFileStream);
|
|
long startPos = fileChunkStream.Position;
|
|
decoder.AsStream().CopyTo(tmpFileStream);
|
|
long readBytes = fileChunkStream.Position - startPos;
|
|
|
|
if (readBytes != fileLengthBytes)
|
|
{
|
|
throw new EndOfStreamException();
|
|
}
|
|
|
|
string calculatedHash = BitConverter.ToString(tmpFileStream.Finish()).Replace("-", "", StringComparison.Ordinal);
|
|
|
|
if (!calculatedHash.Equals(fileHash, StringComparison.Ordinal))
|
|
{
|
|
Logger.LogError("Hash mismatch after extracting, got {hash}, expected {expectedHash}, deleting file", calculatedHash, fileHash);
|
|
return;
|
|
}
|
|
|
|
tmpFileStream.Close();
|
|
_fileCompactor.RenameAndCompact(filePath, tmpPath);
|
|
PersistFileToStorage(fileHash, filePath, fileLengthBytes);
|
|
}
|
|
catch (EndOfStreamException)
|
|
{
|
|
Logger.LogWarning("{dlName}: Failure to extract file {fileHash}, stream ended prematurely", fi.Name, fileHash);
|
|
}
|
|
catch (Exception e)
|
|
{
|
|
Logger.LogWarning(e, "{dlName}: Error during decompression of {hash}", fi.Name, fileHash);
|
|
|
|
foreach (var fr in fileReplacement)
|
|
Logger.LogWarning(" - {h}: {x}", fr.Hash, fr.GamePaths[0]);
|
|
}
|
|
finally
|
|
{
|
|
if (File.Exists(tmpPath))
|
|
File.Delete(tmpPath);
|
|
}
|
|
}, CancellationToken.None));
|
|
}
|
|
|
|
Task.WaitAll([..tasks], CancellationToken.None);
|
|
}
|
|
catch (EndOfStreamException)
|
|
{
|
|
Logger.LogDebug("{dlName}: Failure to extract file header data, stream ended", fi.Name);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.LogError(ex, "{dlName}: Error during block file read", fi.Name);
|
|
}
|
|
finally
|
|
{
|
|
Task.WaitAll([..tasks], CancellationToken.None);
|
|
_orchestrator.ReleaseDownloadSlot();
|
|
if (fileBlockStream != null)
|
|
await fileBlockStream.DisposeAsync().ConfigureAwait(false);
|
|
File.Delete(blockFile);
|
|
}
|
|
}).ConfigureAwait(false);
|
|
|
|
Logger.LogDebug("Download end: {id}", gameObjectHandler);
|
|
|
|
ClearDownload();
|
|
}
|
|
|
|
private async Task<List<DownloadFileDto>> FilesGetSizes(List<string> hashes, CancellationToken ct)
|
|
{
|
|
if (!_orchestrator.IsInitialized) throw new InvalidOperationException("FileTransferManager is not initialized");
|
|
var response = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.ServerFilesGetSizesFullPath(_orchestrator.FilesCdnUri!), hashes, ct).ConfigureAwait(false);
|
|
return await response.Content.ReadFromJsonAsync<List<DownloadFileDto>>(cancellationToken: ct).ConfigureAwait(false) ?? [];
|
|
}
|
|
|
|
private void PersistFileToStorage(string fileHash, string filePath, long? compressedSize = null)
|
|
{
|
|
try
|
|
{
|
|
var entry = _fileDbManager.CreateCacheEntry(filePath, fileHash);
|
|
if (entry != null && !string.Equals(entry.Hash, fileHash, StringComparison.OrdinalIgnoreCase))
|
|
{
|
|
_fileDbManager.RemoveHashedFile(entry.Hash, entry.PrefixedFilePath);
|
|
entry = null;
|
|
}
|
|
if (entry != null)
|
|
entry.CompressedSize = compressedSize;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Logger.LogWarning(ex, "Error creating cache entry");
|
|
}
|
|
}
|
|
|
|
private async Task WaitForDownloadReady(List<DownloadFileTransfer> downloadFileTransfer, Guid requestId, CancellationToken downloadCt)
|
|
{
|
|
bool alreadyCancelled = false;
|
|
try
|
|
{
|
|
CancellationTokenSource localTimeoutCts = new();
|
|
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
|
|
CancellationTokenSource composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
|
|
|
|
while (!_orchestrator.IsDownloadReady(requestId))
|
|
{
|
|
try
|
|
{
|
|
await Task.Delay(250, composite.Token).ConfigureAwait(false);
|
|
}
|
|
catch (TaskCanceledException)
|
|
{
|
|
if (downloadCt.IsCancellationRequested) throw;
|
|
|
|
var req = await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCheckQueueFullPath(downloadFileTransfer[0].DownloadUri, requestId),
|
|
downloadFileTransfer.Select(c => c.Hash).ToList(), downloadCt).ConfigureAwait(false);
|
|
req.EnsureSuccessStatusCode();
|
|
localTimeoutCts.Dispose();
|
|
composite.Dispose();
|
|
localTimeoutCts = new();
|
|
localTimeoutCts.CancelAfter(TimeSpan.FromSeconds(5));
|
|
composite = CancellationTokenSource.CreateLinkedTokenSource(downloadCt, localTimeoutCts.Token);
|
|
}
|
|
}
|
|
|
|
localTimeoutCts.Dispose();
|
|
composite.Dispose();
|
|
|
|
Logger.LogDebug("Download {requestId} ready", requestId);
|
|
}
|
|
catch (TaskCanceledException)
|
|
{
|
|
try
|
|
{
|
|
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
|
|
alreadyCancelled = true;
|
|
}
|
|
catch
|
|
{
|
|
// ignore whatever happens here
|
|
}
|
|
|
|
throw;
|
|
}
|
|
finally
|
|
{
|
|
if (downloadCt.IsCancellationRequested && !alreadyCancelled)
|
|
{
|
|
try
|
|
{
|
|
await _orchestrator.SendRequestAsync(HttpMethod.Get, MareFiles.RequestCancelFullPath(downloadFileTransfer[0].DownloadUri, requestId)).ConfigureAwait(false);
|
|
}
|
|
catch
|
|
{
|
|
// ignore whatever happens here
|
|
}
|
|
}
|
|
_orchestrator.ClearDownloadRequest(requestId);
|
|
}
|
|
}
|
|
} |