Merge branch 'feature/JobSystem' into develop
This commit is contained in:
1
.gitattributes
vendored
Normal file
1
.gitattributes
vendored
Normal file
@@ -0,0 +1 @@
|
||||
.jpg filter=lfs diff=lfs merge=lfs -text
|
||||
10
.idea/.idea.MilkyShots/.idea/swagger-settings.xml
generated
Normal file
10
.idea/.idea.MilkyShots/.idea/swagger-settings.xml
generated
Normal file
@@ -0,0 +1,10 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="SwaggerSettings">
|
||||
<remoteSpecificationUrls>
|
||||
<UrlInfo>
|
||||
<option name="urlString" value="http://192.168.51.101:5162/swagger/v1/swagger.json" />
|
||||
</UrlInfo>
|
||||
</remoteSpecificationUrls>
|
||||
</component>
|
||||
</project>
|
||||
@@ -1,5 +1,5 @@
|
||||
<component name="ProjectRunConfigurationManager">
|
||||
<configuration default="false" name="Compose Deployment" type="docker-deploy" factoryName="docker-compose.yml" server-name="Docker-PM">
|
||||
<configuration default="false" name="Compose Deployment (PM)" type="docker-deploy" factoryName="docker-compose.yml" server-name="Docker-PM">
|
||||
<deployment type="docker-compose.yml">
|
||||
<settings>
|
||||
<option name="containerName" value="" />
|
||||
10
Lactose/Jobs/EJobStatus.cs
Normal file
10
Lactose/Jobs/EJobStatus.cs
Normal file
@@ -0,0 +1,10 @@
|
||||
namespace Lactose.Jobs;
|
||||
|
||||
public enum EJobStatus {
|
||||
Queued, // Not yet started
|
||||
Running, // Actively executing
|
||||
Waiting, // Waiting for sub jobs or paused
|
||||
Completed, // Successfully finished
|
||||
Failed, // Finished with an error
|
||||
Canceled // Canceled by user or system
|
||||
}
|
||||
160
Lactose/Jobs/FileSystemCrawlJob.cs
Normal file
160
Lactose/Jobs/FileSystemCrawlJob.cs
Normal file
@@ -0,0 +1,160 @@
|
||||
using Butter;
|
||||
using Butter.Types;
|
||||
using Lactose.Models;
|
||||
using Lactose.Repositories;
|
||||
using System.Data;
|
||||
using static System.String;
|
||||
|
||||
namespace Lactose.Jobs;
|
||||
|
||||
public class FileSystemCrawlJob : Job {
|
||||
public override string Name { get; } = "Filesystem Crawl Job";
|
||||
public override JobStatus Status { get; }
|
||||
IAssetRepository assetRepository;
|
||||
string workingPath;
|
||||
ILogger<FileSystemCrawlJob> logger;
|
||||
JobManager jobManager;
|
||||
List<Job> childJobs = [];
|
||||
|
||||
public FileSystemCrawlJob(
|
||||
string workingPath,
|
||||
ILogger<FileSystemCrawlJob> logger,
|
||||
IAssetRepository assetRepository,
|
||||
JobManager jobManager
|
||||
) {
|
||||
Status = new(this);
|
||||
this.logger = logger;
|
||||
this.assetRepository = assetRepository;
|
||||
this.workingPath = workingPath;
|
||||
this.jobManager = jobManager;
|
||||
}
|
||||
|
||||
///<inheritdoc />
|
||||
protected override void TaskJob(CancellationToken token) {
|
||||
logger.LogInformation($"Started crawling directory {workingPath}");
|
||||
|
||||
DirectoryInfo dir = new(workingPath);
|
||||
|
||||
if (!dir.Exists) {
|
||||
logger.LogWarning("Directory {Dir} does not exist. Skipping crawl.", workingPath);
|
||||
return;
|
||||
}
|
||||
|
||||
int steps,
|
||||
totalSteps;
|
||||
|
||||
var files = dir.GetFiles();
|
||||
logger.LogDebug($"Found {files.Length} files.");
|
||||
|
||||
var folders = dir.GetDirectories();
|
||||
logger.LogDebug($"Found {folders.Length} folders.");
|
||||
|
||||
totalSteps = files.Length + folders.Length;
|
||||
steps = 0;
|
||||
|
||||
foreach (var folder in folders) {
|
||||
if (token.IsCancellationRequested) {
|
||||
Status.Cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
steps++;
|
||||
var job = jobManager.CreateJob<FileSystemCrawlJob>(folder.FullName);
|
||||
// When the job is done, remove it from the child jobs list
|
||||
job.Done += (o, _) => childJobs.Remove((o as Job)!);
|
||||
childJobs.Add(job);
|
||||
jobManager.EnqueueJob(job);
|
||||
logger.LogDebug($"Enqueued crawl job for folder {folder.FullName}");
|
||||
Status.UpdateProgress(Math.Clamp(steps/(float)totalSteps, 0, 1), $"Processing folder {folder.FullName}");
|
||||
}
|
||||
|
||||
foreach (var file in files) {
|
||||
if (token.IsCancellationRequested) {
|
||||
Status.Cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
steps++;
|
||||
Status.UpdateProgress(Math.Clamp(steps/(float)totalSteps, 0, 1), $"Processing file {file.FullName}");
|
||||
|
||||
if (assetRepository.FindByPath(file.FullName) != null) continue;
|
||||
|
||||
var asset = AssetFromPath(file.FullName);
|
||||
if (asset == null) continue;
|
||||
|
||||
try {
|
||||
assetRepository.Insert(asset);
|
||||
assetRepository.Save();
|
||||
logger.LogInformation("Inserted asset {Asset} into the database.", asset.OriginalPath);
|
||||
} catch (DuplicateNameException) {
|
||||
logger.LogWarning("Asset {Asset} already exists in the database. Skipping.", asset.OriginalPath);
|
||||
} catch (Exception e) {
|
||||
logger.LogError(e, "Error inserting asset {Asset} into the database. {e}", asset.OriginalPath, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all child jobs to complete
|
||||
Status.UpdateProgress(Math.Clamp(steps/(float)totalSteps, 0, 1), $"Waiting for {childJobs.Count} child jobs to complete.");
|
||||
Status.Status = EJobStatus.Waiting;
|
||||
|
||||
while (childJobs.Count > 0) {
|
||||
if (token.IsCancellationRequested) {
|
||||
// Cancel all child jobs
|
||||
childJobs.ForEach(j => j.Cancel());
|
||||
// Cancel this job as well
|
||||
Status.Cancel();
|
||||
return;
|
||||
}
|
||||
}
|
||||
Status.Complete("Crawl completed successfully.");
|
||||
}
|
||||
|
||||
Asset? AssetFromPath(string filePath) {
|
||||
logger.LogTrace("Loading asset from file path {filePath}", filePath);
|
||||
// Get the file info
|
||||
var finfo = new FileInfo(filePath);
|
||||
|
||||
// Determine the type of the file
|
||||
string ext = finfo.Extension.ToLower();
|
||||
EAssetType? type;
|
||||
|
||||
// ReSharper disable ArrangeObjectCreationWhenTypeNotEvident
|
||||
var mimeImg = MimeTypes.Image.FirstOrDefault(mime => mime.Extensions.Contains(ext), new(Empty, []));
|
||||
var mimeVid = MimeTypes.Video.FirstOrDefault(mime => mime.Extensions.Contains(ext), new(Empty, []));
|
||||
|
||||
if (mimeImg.MimeType != Empty) type = EAssetType.Image;
|
||||
else if (mimeVid.MimeType != Empty) type = EAssetType.Video;
|
||||
else return null;
|
||||
|
||||
// Create a new asset
|
||||
var asset = new Asset() {
|
||||
//TODO: folder ID is missing here
|
||||
OriginalPath = finfo.FullName,
|
||||
OriginalFilename = finfo.Name,
|
||||
Type = type.Value,
|
||||
IsPubliclyShared = false,
|
||||
CreatedAt = finfo.CreationTime,
|
||||
UpdatedAt = finfo.LastWriteTime,
|
||||
MimeType = type.Value switch {
|
||||
EAssetType.Image => mimeImg.MimeType,
|
||||
EAssetType.Video => mimeVid.MimeType,
|
||||
_ => throw new ArgumentOutOfRangeException()
|
||||
},
|
||||
FileSize = finfo.Length,
|
||||
Hash = []
|
||||
};
|
||||
|
||||
logger.LogDebug(
|
||||
$"""
|
||||
Asset created from path: {asset.OriginalPath}
|
||||
Data:
|
||||
- Filename: {asset.OriginalFilename}
|
||||
- Type: {asset.Type}
|
||||
- MimeType: {asset.MimeType}
|
||||
- CreatedAt: {asset.CreatedAt}
|
||||
- UpdatedAt: {asset.UpdatedAt}
|
||||
- FileSize: {asset.FileSize} bytes
|
||||
""");
|
||||
return asset;
|
||||
}
|
||||
}
|
||||
88
Lactose/Jobs/Job.cs
Normal file
88
Lactose/Jobs/Job.cs
Normal file
@@ -0,0 +1,88 @@
|
||||
namespace Lactose.Jobs;
|
||||
|
||||
/// <summary>
|
||||
/// Abstract class representing a job that can be executed asynchronously.
|
||||
/// Provides events for job lifecycle (Started, ProgressChanged, Completed, Canceled, Failed).
|
||||
/// Inherit from this class and implement the TaskJob method to define the job's functionality.
|
||||
/// </summary>
|
||||
public abstract class Job {
|
||||
public virtual Guid Id { get; } = Guid.NewGuid();
|
||||
public virtual string Name { get; } = "Unnamed Job";
|
||||
public virtual JobStatus Status { get; }
|
||||
Task? task;
|
||||
CancellationTokenSource? cts;
|
||||
|
||||
protected Job() => Status = new JobStatus(this);
|
||||
|
||||
/// <summary>
|
||||
/// Sets up the task and the cancellation token, then starts the task (triggering the Started event)
|
||||
/// Also sets up a continuation to handle completion, failure, or cancellation of the task.
|
||||
/// </summary>
|
||||
public virtual void Start() {
|
||||
Status.Start();
|
||||
cts = new CancellationTokenSource();
|
||||
task = Task.Run(() => TaskJob(cts.Token));
|
||||
|
||||
task.ContinueWith(t => {
|
||||
if (t.IsCanceled) {
|
||||
Status.Cancel("Job was canceled");
|
||||
Canceled?.Invoke(this, Status);
|
||||
} else if (t.IsFaulted) {
|
||||
Status.Fail(t.Exception?.Message ?? "Job failed with an unknown error");
|
||||
Failed?.Invoke(this, Status);
|
||||
} else {
|
||||
Status.Complete("Job completed successfully");
|
||||
Completed?.Invoke(this, Status);
|
||||
}
|
||||
Done?.Invoke(this, Status);
|
||||
}
|
||||
);
|
||||
|
||||
Started?.Invoke(this, EventArgs.Empty);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Override this method to implement the job's functionality. The provided CancellationToken can be used to check for cancellation requests.
|
||||
/// Invoke ProgressChanged event to report progress updates.
|
||||
/// </summary>
|
||||
/// <param name="token">Cancellation toke to retrieve when a task cancellation has been requested</param>
|
||||
protected abstract void TaskJob(CancellationToken token);
|
||||
|
||||
/// <summary>
|
||||
/// Cancels the job if it's running. Canceled event will be called by the callbacks on the task.
|
||||
/// </summary>
|
||||
public virtual void Cancel() => cts?.Cancel();
|
||||
|
||||
/// <summary>
|
||||
/// Raised when the job starts running.
|
||||
/// </summary>
|
||||
public virtual event EventHandler? Started;
|
||||
/// <summary>
|
||||
/// Raised when the job makes progress. The JobStatus parameter contains the current progress (0 to 1).
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? ProgressChanged;
|
||||
/// <summary>
|
||||
/// Raised when the job transitions from running to waiting (e.g., waiting for resources or sub jobs).
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? StartedWaiting;
|
||||
/// <summary>
|
||||
/// Raised when the job transitions from waiting to running.
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? FinishedWaiting;
|
||||
/// <summary>
|
||||
/// Raised when the job completes successfully.
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? Completed;
|
||||
/// <summary>
|
||||
/// Raised when the job is canceled by the user or system.
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? Canceled;
|
||||
/// <summary>
|
||||
/// Raised when the job fails due to an unhandled exception.
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? Failed;
|
||||
/// <summary>
|
||||
/// Raised when the job is done, regardless of the final status (Completed, Canceled, or Failed).
|
||||
/// </summary>
|
||||
public virtual event EventHandler<JobStatus>? Done;
|
||||
}
|
||||
118
Lactose/Jobs/JobManager.cs
Normal file
118
Lactose/Jobs/JobManager.cs
Normal file
@@ -0,0 +1,118 @@
|
||||
using Lactose.Utils;
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace Lactose.Jobs;
|
||||
|
||||
public class JobManager(IServiceProvider serviceProvider, ILogger<JobManager> logger) : BackgroundService {
|
||||
readonly ConcurrentDictionary<Guid, Job> jobs = new();
|
||||
|
||||
/// <summary>
|
||||
/// Gets all jobs currently managed.
|
||||
/// </summary>
|
||||
public IEnumerable<Job> Jobs => jobs.Values;
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the maximum number of jobs that can run concurrently.
|
||||
/// </summary>
|
||||
public int MaxConcurrentJobs { get; set; } = 8;
|
||||
|
||||
/// <summary>
|
||||
/// Adds a job to the manager.
|
||||
/// </summary>
|
||||
/// <param name="job">The job to enqueue.</param>
|
||||
/// <exception cref="InvalidOperationException"></exception>
|
||||
public void EnqueueJob(Job job) {
|
||||
var added = jobs.TryAdd(job.Id, job);
|
||||
if (!added) throw new InvalidOperationException($"Job with ID {job.Id} can't be added");
|
||||
logger.LogInformation($"Job with ID {job.Id} has been added to the job manager.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tries to get a job by its ID.
|
||||
/// </summary>
|
||||
/// <param name="id">ID of the job to retrieve.</param>
|
||||
/// <returns> Returns null if the job can't be found. Otherwise a <see cref="Job"/></returns>
|
||||
public Job? GetJob(Guid id) {
|
||||
jobs.TryGetValue(id, out var job);
|
||||
return job;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tries to remove a job from the manager. If the job is running, it will be canceled first and removed once the cancellation is complete.
|
||||
/// </summary>
|
||||
/// <param name="id">ID of the job that has to be removed</param>
|
||||
/// <exception cref="InvalidOperationException">Thrown when a job with the provided ID can't be found.</exception>
|
||||
public void RemoveJob(Guid id) {
|
||||
if (!jobs.ContainsKey(id)) throw new InvalidOperationException($"Job with ID {id} doesn't exist");
|
||||
|
||||
switch (jobs[id].Status.Status) {
|
||||
case EJobStatus.Running:
|
||||
case EJobStatus.Waiting:
|
||||
// Schedule job removal from the dictionary once it's canceled
|
||||
jobs[id].Canceled += (sender, status) => jobs.Remove(status.Id, out _);
|
||||
//request cancellation
|
||||
jobs[id].Cancel();
|
||||
logger.LogInformation($"Job with ID {id} has been canceled and will be removed from the job manager once the cancellation is complete.");
|
||||
break;
|
||||
// for any other cases it's safe to remove the job straight away
|
||||
case EJobStatus.Queued:
|
||||
case EJobStatus.Completed:
|
||||
case EJobStatus.Canceled:
|
||||
case EJobStatus.Failed:
|
||||
logger.LogInformation($"Job with ID {id} has been removed from the job manager.");
|
||||
jobs.Remove(id, out _);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Injects required dependencies and creates a new instance of the specified job type.
|
||||
/// </summary>
|
||||
public Job CreateJob<T>() where T : Job {
|
||||
var job = ActivatorUtilities.CreateInstance<T>(serviceProvider);
|
||||
logger.LogInformation($"Job with ID {job.Id} has been created.");
|
||||
return job;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Injects required dependencies and creates a new instance of the specified job type, passing the provided arguments to its constructor.
|
||||
/// </summary>
|
||||
public Job CreateJob<T>(params object[] args) where T : Job {
|
||||
var scope = serviceProvider.CreateScope();
|
||||
var job = ActivatorUtilities.CreateInstance<T>(scope.ServiceProvider, args);
|
||||
logger.LogInformation($"Job with ID {job.Id} has been created.");
|
||||
return job;
|
||||
}
|
||||
|
||||
///<inheritdoc />
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken) {
|
||||
logger.LogInformation("Preparing JobManager...");
|
||||
// Obtain the application lifetime service to hook into application start events
|
||||
var lifetime = serviceProvider.GetRequiredService<IHostApplicationLifetime>();
|
||||
|
||||
// Create a new task that will manage job execution
|
||||
var service = new Task(() => {
|
||||
while (!stoppingToken.IsCancellationRequested) {
|
||||
// Skip adding more jobs if we reached the max concurrent jobs limit
|
||||
int activeRunningJobs = jobs.Count(job => job.Value.Status.Status == EJobStatus.Running);
|
||||
if (activeRunningJobs >= MaxConcurrentJobs) continue;
|
||||
|
||||
//Take jobs from the jobs list that are queued and start them until we reach the max concurrent jobs limit
|
||||
jobs.Where(job => job.Value.Status.Status == EJobStatus.Queued)
|
||||
.Select(job => job.Value)
|
||||
.Take(MaxConcurrentJobs - activeRunningJobs)
|
||||
.ForEach(job => {
|
||||
job.Done += (sender, status) => jobs.Remove(status.Id, out _);
|
||||
job.Start();
|
||||
logger.LogInformation($"Job {job.Id} : {job.Name} has been started.");
|
||||
}
|
||||
);
|
||||
}
|
||||
},
|
||||
stoppingToken
|
||||
);
|
||||
// Defer the job management task to when application starts
|
||||
lifetime.ApplicationStarted.Register(() => service.Start());
|
||||
return service;
|
||||
}
|
||||
}
|
||||
48
Lactose/Jobs/JobStatus.cs
Normal file
48
Lactose/Jobs/JobStatus.cs
Normal file
@@ -0,0 +1,48 @@
|
||||
namespace Lactose.Jobs;
|
||||
|
||||
public class JobStatus(Job job) {
|
||||
public Guid Id => job.Id;
|
||||
public string Name => job.Name;
|
||||
public EJobStatus Status { get; set; } = EJobStatus.Queued;
|
||||
public DateTime Created { get; private set; } = DateTime.UtcNow;
|
||||
public DateTime? Started { get; private set; }
|
||||
public DateTime? Finished { get; private set; }
|
||||
public string? Message { get; private set; }
|
||||
public float Progress { get; private set; } = 0;
|
||||
|
||||
public override string ToString() => $"{Name} - {Status} - {Progress * 100}%";
|
||||
|
||||
public void Start(string message = "Job started") {
|
||||
Status = EJobStatus.Running;
|
||||
Started = DateTime.UtcNow;
|
||||
Progress = 0;
|
||||
Message = message;
|
||||
}
|
||||
|
||||
public void UpdateProgress(float progress, string? message = null) {
|
||||
if (progress < 0 || progress > 1)
|
||||
throw new ArgumentOutOfRangeException(nameof(progress), "Progress must be between 0 and 1");
|
||||
|
||||
Progress = progress;
|
||||
if (message != null) Message = message;
|
||||
}
|
||||
|
||||
public void Complete(string? message = null) {
|
||||
Status = EJobStatus.Completed;
|
||||
Finished = DateTime.UtcNow;
|
||||
Progress = 1;
|
||||
if (message != null) Message = message;
|
||||
}
|
||||
|
||||
public void Fail(string? message = null) {
|
||||
Status = EJobStatus.Failed;
|
||||
Finished = DateTime.UtcNow;
|
||||
if (message != null) Message = message;
|
||||
}
|
||||
|
||||
public void Cancel(string? message = null) {
|
||||
Status = EJobStatus.Canceled;
|
||||
Finished = DateTime.UtcNow;
|
||||
if (message != null) Message = message;
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
using Lactose.Authorization;
|
||||
using Lactose.Configuration;
|
||||
using Lactose.Context;
|
||||
using Lactose.Jobs;
|
||||
using Lactose.Models;
|
||||
using Lactose.Repositories;
|
||||
using Lactose.Services;
|
||||
@@ -74,7 +75,7 @@ builder.Services.AddTransient<IAlbumRepository, AlbumRepository>();
|
||||
builder.Services.AddTransient<IMediaRepository, MediaRepository>();
|
||||
builder.Services.AddTransient<ITokenService, TokenService>();
|
||||
builder.Services.AddTransient<LactoseAuthService>();
|
||||
builder.Services.AddSingleton<FileSystemScannerService>();
|
||||
builder.Services.AddSingleton<JobManager>();
|
||||
builder.Services.AddEndpointsApiExplorer();
|
||||
builder.Services.AddControllers();
|
||||
|
||||
@@ -150,9 +151,9 @@ builder.Services
|
||||
};
|
||||
});
|
||||
|
||||
//Add the background scan service
|
||||
builder.Services.AddHostedService<FileSystemScannerService>();
|
||||
|
||||
//Add the job manager service
|
||||
builder.Services.AddHostedService<JobManager>(p => p.GetRequiredService<JobManager>());
|
||||
builder.Services.AddHostedService<JobScheduler>();
|
||||
WebApplication app = builder.Build();
|
||||
|
||||
using (var scope = app.Services.CreateScope()) {
|
||||
|
||||
@@ -4,11 +4,16 @@ using Lactose.Services;
|
||||
|
||||
namespace Lactose.Repositories;
|
||||
|
||||
public class FolderRepository(LactoseDbContext context, FileSystemScannerService scanner) : IFolderRepository, IAsyncDisposable {
|
||||
public class FolderRepository(LactoseDbContext context) : IFolderRepository, IAsyncDisposable {
|
||||
public static event EventHandler<Folder>? FolderAdded;
|
||||
static void OnFolderAdded(Folder e) => FolderAdded?.Invoke(null, e);
|
||||
public static event EventHandler<Folder>? FolderRemoved;
|
||||
static void OnFolderRemoved(Folder e) => FolderRemoved?.Invoke(null, e);
|
||||
|
||||
public void Create(Folder folder) {
|
||||
context.Folders.Add(folder);
|
||||
context.SaveChanges();
|
||||
if (folder.Active) scanner.AddFolder(folder);
|
||||
if (folder.Active) OnFolderAdded(folder);
|
||||
}
|
||||
|
||||
public void Update(Guid id, Folder folder) {
|
||||
@@ -26,14 +31,12 @@ public class FolderRepository(LactoseDbContext context, FileSystemScannerService
|
||||
context.Folders.Update(origFolder);
|
||||
context.SaveChanges();
|
||||
|
||||
if (hasChanged && wasActive) {
|
||||
scanner.RemoveFolder(oldPath);
|
||||
}
|
||||
if (hasChanged && wasActive) OnFolderRemoved(oldPath);
|
||||
|
||||
if (hasChanged && folder.Active) {}
|
||||
|
||||
if (folder.Active) scanner.AddFolder(origFolder);
|
||||
else scanner.RemoveFolder(origFolder);
|
||||
if (folder.Active) OnFolderAdded(origFolder);
|
||||
else OnFolderRemoved(origFolder);
|
||||
}
|
||||
|
||||
public void Delete(Guid id) {
|
||||
@@ -41,7 +44,7 @@ public class FolderRepository(LactoseDbContext context, FileSystemScannerService
|
||||
|
||||
if (folder != null) {
|
||||
context.Folders.Remove(folder);
|
||||
scanner.RemoveFolder(folder);
|
||||
OnFolderRemoved(folder);
|
||||
}
|
||||
context.SaveChanges();
|
||||
}
|
||||
|
||||
@@ -58,9 +58,14 @@ public class SettingsRepository(LactoseDbContext context, IServiceProvider servi
|
||||
context.SaveChanges();
|
||||
}
|
||||
|
||||
public static event EventHandler<Setting>? SettingChanged;
|
||||
static void OnSettingChanged(Setting e) => SettingChanged?.Invoke(null, e);
|
||||
|
||||
|
||||
//TODO: May need to be change on a Event based system, because adding here all the Setting Actions could be too much ramification
|
||||
void SettingChange(Setting setting) {
|
||||
switch (setting.Name) {
|
||||
OnSettingChanged(setting);
|
||||
/*switch (setting.Name) {
|
||||
case var name when name == Settings.FolderScanEnabled.AsString():
|
||||
using (var scannerService = serviceProvider.GetService<FileSystemScannerService>()) {
|
||||
if (setting.Value == "false") scannerService?.Disable();
|
||||
@@ -72,7 +77,7 @@ public class SettingsRepository(LactoseDbContext context, IServiceProvider servi
|
||||
if (int.TryParse(setting.Value, out var interval)) scannerService?.SetTimerInterval(TimeSpan.FromMinutes(interval));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
public void Dispose() => context.Dispose();
|
||||
|
||||
@@ -1,241 +0,0 @@
|
||||
using Butter;
|
||||
using Butter.Types;
|
||||
using Butter.Dtos.Settings;
|
||||
using Butter.Settings;
|
||||
using Lactose.Context;
|
||||
using Lactose.Models;
|
||||
using Lactose.Repositories;
|
||||
using Lactose.Utils;
|
||||
using System.Data;
|
||||
using static System.String;
|
||||
|
||||
namespace Lactose.Services;
|
||||
|
||||
public class FileSystemScannerService(ILogger<FileSystemScannerService> logger, IServiceProvider serviceProvider)
|
||||
: BackgroundService {
|
||||
public bool IsScanning { get; private set; } = false;
|
||||
public bool IsEnabled { get; private set; } = false;
|
||||
public bool IsInitialized { get; private set; } = false;
|
||||
readonly PeriodicTimer scanTimer = new(TimeSpan.FromMinutes(1));
|
||||
readonly Dictionary<string, CancellationTokenSource> folders = new();
|
||||
readonly List<Task> tasks = [];
|
||||
|
||||
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken) {
|
||||
var service = new Task(async void () => await TaskWatcher(stoppingToken));
|
||||
var lifetime = serviceProvider.GetRequiredService<IHostApplicationLifetime>();
|
||||
|
||||
lifetime.ApplicationStarted.Register(() => service.Start());
|
||||
return service;
|
||||
}
|
||||
|
||||
async Task TaskWatcher(CancellationToken stoppingToken) {
|
||||
// Resolve the repositories from the service provider
|
||||
var scope = serviceProvider.CreateScope();
|
||||
var settingsRepository = scope.ServiceProvider.GetRequiredService<ISettingsRepository>();
|
||||
IsEnabled = settingsRepository.Get(Settings.FolderScanEnabled.AsString())?.Value != "false";
|
||||
|
||||
stoppingToken.Register(() => {
|
||||
logger.LogInformation("Filesystem Scan Service stopping...");
|
||||
|
||||
folders.Values.Where(tokenSource => !tokenSource.IsCancellationRequested)
|
||||
.ForEach(tokenSource => {
|
||||
tokenSource.Cancel();
|
||||
tokenSource.Dispose();
|
||||
}
|
||||
);
|
||||
|
||||
folders.Clear();
|
||||
}
|
||||
);
|
||||
|
||||
var folderRepository = scope.ServiceProvider.GetRequiredService<IFolderRepository>();
|
||||
folderRepository.GetAllUntracked().Where(f => f.Active).Select(f => f.BasePath).ForEach(AddFolder);
|
||||
|
||||
logger.LogInformation("Filesystem Scan Service starting...");
|
||||
|
||||
var value = settingsRepository.Get(Settings.FolderScanInterval.AsString())?.Value;
|
||||
|
||||
if (value != null && !int.TryParse(value, out var interval)) {
|
||||
logger.LogWarning("Invalid scan interval setting. Defaulting to 30 minutes.");
|
||||
interval = 30; // Default to 30 minutes if parsing fails
|
||||
} else interval = int.Parse(value!);
|
||||
|
||||
SetTimerInterval(TimeSpan.FromMinutes(interval));
|
||||
|
||||
IsInitialized = true;
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested) {
|
||||
|
||||
// Wait for all tasks to complete before starting a new scan
|
||||
if (IsScanning) {
|
||||
logger.LogInformation("Waiting for {count} tasks to complete...", tasks.Count);
|
||||
WaitTasks(stoppingToken);
|
||||
}
|
||||
|
||||
if (IsEnabled && folders.Count != 0 && !IsScanning) {
|
||||
logger.LogInformation("Found {count} folders to scan.", folders.Count);
|
||||
logger.LogInformation("Starting Tasks for scanning folders...");
|
||||
|
||||
// Launch a folder scan for each folder, passing the stopping token to each task
|
||||
folders.Where(entry => !entry.Value.IsCancellationRequested)
|
||||
.Select(entry => entry.Key)
|
||||
.ToList()
|
||||
.ForEach(path => { folders[path] = new CancellationTokenSource(); });
|
||||
|
||||
folders.ForEach((entry)=>
|
||||
tasks.Add(Task.Run(() => ScanFolder(entry.Key, entry.Value.Token), entry.Value.Token))
|
||||
);
|
||||
|
||||
IsScanning = true;
|
||||
await scanTimer.WaitForNextTickAsync(stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void WaitTasks(CancellationToken stoppingToken) {
|
||||
while (IsScanning) {
|
||||
try {
|
||||
Task.WaitAll(tasks.ToArray(), stoppingToken);
|
||||
tasks.Clear();
|
||||
IsScanning = false;
|
||||
} catch (OperationCanceledException e) {
|
||||
logger.LogInformation("Scanning Service was cancelled.");
|
||||
IsScanning = false;
|
||||
IsEnabled = false;
|
||||
} catch (AggregateException e) {
|
||||
if (!e.InnerExceptions.All(ie => ie is OperationCanceledException)) throw;
|
||||
|
||||
logger.LogDebug("A Folder Has been removed. Continuing to scan.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ScanFolder(string path, CancellationToken stoppingToken) {
|
||||
|
||||
if (stoppingToken.IsCancellationRequested) { stoppingToken.ThrowIfCancellationRequested(); };
|
||||
|
||||
List<string> filePaths = [];
|
||||
List<string> folderPaths = [];
|
||||
|
||||
// Get all files and folders in the folder
|
||||
try {
|
||||
filePaths.AddRange(Directory.GetFiles(path));
|
||||
folderPaths.AddRange(Directory.GetDirectories(path));
|
||||
} catch (Exception e) { logger.LogError(e, "Error scanning folder {folder}", path); }
|
||||
|
||||
// Launch a folder scan for each subfolder
|
||||
folderPaths.ForEach(folderPath => {
|
||||
logger.LogDebug("Adding subfolder {folder} to scan queue.", folderPath);
|
||||
tasks.Add(Task.Run(() => ScanFolder(folderPath, stoppingToken), stoppingToken));
|
||||
});
|
||||
var scope = serviceProvider.CreateScope();
|
||||
var assetRepository = scope.ServiceProvider.GetRequiredService<IAssetRepository>();
|
||||
|
||||
// Process all files
|
||||
filePaths.ForEach(filePath => {
|
||||
// Check if the file is already in the database
|
||||
if (assetRepository.FindByPath(filePath) != null) return;
|
||||
|
||||
var asset = AssetFromPath(filePath);
|
||||
|
||||
if (asset == null) return;
|
||||
// Add the asset to the database
|
||||
try {
|
||||
assetRepository.Insert(asset);
|
||||
assetRepository.Save();
|
||||
} catch (DuplicateNameException e) {
|
||||
logger.LogError(e, $"Duplicate asset name \"{asset.OriginalFilename}\", skipped.");
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Asset? AssetFromPath(string filePath) {
|
||||
logger.LogTrace("Loading asset from file path {filePath}", filePath);
|
||||
// Get the file info
|
||||
var finfo = new FileInfo(filePath);
|
||||
|
||||
// Determine the type of the file
|
||||
string ext = finfo.Extension.ToLower();
|
||||
EAssetType? type;
|
||||
|
||||
// ReSharper disable ArrangeObjectCreationWhenTypeNotEvident
|
||||
var mimeImg = MimeTypes.Image.FirstOrDefault(mime => mime.Extensions.Contains(ext), new(Empty, []));
|
||||
var mimeVid = MimeTypes.Video.FirstOrDefault(mime => mime.Extensions.Contains(ext), new(Empty, []));
|
||||
|
||||
if (mimeImg.MimeType != Empty) type = EAssetType.Image;
|
||||
else if (mimeVid.MimeType != Empty) type = EAssetType.Video;
|
||||
else return null;
|
||||
|
||||
// Create a new asset
|
||||
var asset = new Asset() {
|
||||
//TODO: folder ID is missing here
|
||||
OriginalPath = finfo.FullName,
|
||||
OriginalFilename = finfo.Name,
|
||||
Type = type.Value,
|
||||
IsPubliclyShared = false,
|
||||
CreatedAt = finfo.CreationTime,
|
||||
UpdatedAt = finfo.LastWriteTime,
|
||||
MimeType = type.Value switch {
|
||||
EAssetType.Image => mimeImg.MimeType,
|
||||
EAssetType.Video => mimeVid.MimeType,
|
||||
_ => throw new ArgumentOutOfRangeException()
|
||||
},
|
||||
FileSize = finfo.Length,
|
||||
Hash = []
|
||||
};
|
||||
|
||||
logger.LogDebug(
|
||||
$"""
|
||||
Asset created from path: {asset.OriginalPath}
|
||||
Data:
|
||||
- Filename: {asset.OriginalFilename}
|
||||
- Type: {asset.Type}
|
||||
- MimeType: {asset.MimeType}
|
||||
- CreatedAt: {asset.CreatedAt}
|
||||
- UpdatedAt: {asset.UpdatedAt}
|
||||
- FileSize: {asset.FileSize} bytes
|
||||
""");
|
||||
return asset;
|
||||
}
|
||||
|
||||
public void SetTimerInterval(TimeSpan interval) {
|
||||
scanTimer.Period = interval;
|
||||
logger.LogInformation($"Service will scan every {scanTimer.Period.TotalMinutes} minutes.");
|
||||
}
|
||||
|
||||
public void AddFolder(string folder) {
|
||||
if (folders.ContainsKey(folder)) {
|
||||
logger.LogWarning("Folder {folder} already exists", folder);
|
||||
return;
|
||||
}
|
||||
|
||||
folders.Add(folder, new CancellationTokenSource());
|
||||
logger.LogInformation("Added folder {folder}", folder);
|
||||
|
||||
}
|
||||
|
||||
public void AddFolder(Folder folder) => AddFolder(folder.BasePath);
|
||||
|
||||
public void RemoveFolder(string folder) {
|
||||
if (!folders.TryGetValue(folder, out CancellationTokenSource? cancellationToken)) {
|
||||
logger.LogWarning("Folder {folder} does not exist", folder);
|
||||
return;
|
||||
}
|
||||
|
||||
cancellationToken.Cancel();
|
||||
cancellationToken.Dispose();
|
||||
folders.Remove(folder);
|
||||
logger.LogInformation("Removed folder {folder}", folder);
|
||||
}
|
||||
|
||||
public void RemoveFolder(Folder folder) => RemoveFolder(folder.BasePath);
|
||||
|
||||
public void Enable() {
|
||||
IsEnabled = true;
|
||||
logger.LogInformation("Service is now enabled.");
|
||||
}
|
||||
|
||||
public void Disable() => IsEnabled = false;
|
||||
}
|
||||
92
Lactose/Services/JobScheduler.cs
Normal file
92
Lactose/Services/JobScheduler.cs
Normal file
@@ -0,0 +1,92 @@
|
||||
using Butter.Settings;
|
||||
using Lactose.Jobs;
|
||||
using Lactose.Models;
|
||||
using Lactose.Repositories;
|
||||
|
||||
namespace Lactose.Services;
|
||||
|
||||
public class JobScheduler(IServiceProvider serviceProvider, ILogger<JobScheduler> logger) : BackgroundService {
|
||||
bool folderScanEnabled = false;
|
||||
Timer? fileSystemCrawl;
|
||||
List<Folder>? folders;
|
||||
JobManager? jobManager;
|
||||
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken) {
|
||||
var lifetime = serviceProvider.GetRequiredService<IHostApplicationLifetime>();
|
||||
|
||||
var service = new Task(() => {
|
||||
logger?.LogInformation("Preparing JobScheduler...");
|
||||
jobManager = serviceProvider.GetRequiredService<JobManager>();
|
||||
logger?.LogInformation("JobScheduler started!");
|
||||
// fetch settings on startup
|
||||
FetchSettings();
|
||||
// listen for settings changes
|
||||
SettingsRepository.SettingChanged += OnSettingsChanged;
|
||||
// keep the task alive until the service is stopping
|
||||
stoppingToken.WaitHandle.WaitOne();
|
||||
}, stoppingToken
|
||||
);
|
||||
|
||||
lifetime.ApplicationStarted.Register(() => service.Start());
|
||||
return service;
|
||||
}
|
||||
|
||||
void QueueFileSystemCrawl(object? state) {
|
||||
// don't queue if there's already a crawl job in progress
|
||||
if(jobManager.Jobs.Any(j => j is FileSystemCrawlJob)) return;
|
||||
// queue a crawl job for each active folder
|
||||
folders?.ForEach(f => jobManager.EnqueueJob(jobManager.CreateJob<FileSystemCrawlJob>(f.BasePath)));
|
||||
logger?.LogInformation("Queued file system crawl jobs.");
|
||||
}
|
||||
|
||||
void FetchSettings() {
|
||||
logger?.LogInformation("Fetching settings...");
|
||||
var scope = serviceProvider.CreateScope();
|
||||
var settingsRepository = scope.ServiceProvider.GetRequiredService<ISettingsRepository>();
|
||||
var prevFolderScanEnabled = folderScanEnabled;
|
||||
// check if folder scanning is enabled
|
||||
folderScanEnabled = bool.Parse(settingsRepository.Get(Settings.FolderScanEnabled.AsString())?.Value ?? "false");
|
||||
|
||||
if (folderScanEnabled && !prevFolderScanEnabled) {
|
||||
FolderRepository.FolderAdded += OnFolderAdded;
|
||||
FolderRepository.FolderRemoved += OnFolderRemoved;
|
||||
// defaults to 30 minutes
|
||||
int.TryParse(settingsRepository.Get(Settings.FolderScanInterval.AsString())?.Value ?? "30", out var interval);
|
||||
fileSystemCrawl = new Timer(QueueFileSystemCrawl, null, TimeSpan.Zero, TimeSpan.FromMinutes(interval));
|
||||
logger?.LogInformation($"Periodic folder scanning is now enabled every {interval} minutes.");
|
||||
|
||||
var folderRepository = scope.ServiceProvider.GetRequiredService<IFolderRepository>();
|
||||
folders = folderRepository.GetAll().Where(f => f.Active).ToList();
|
||||
} else if(!folderScanEnabled && prevFolderScanEnabled) {
|
||||
FolderRepository.FolderAdded -= OnFolderAdded;
|
||||
FolderRepository.FolderRemoved -= OnFolderRemoved;
|
||||
fileSystemCrawl?.Dispose();
|
||||
folders?.Clear();
|
||||
logger?.LogInformation("Periodic folder scanning is now disabled.");
|
||||
} else if (folderScanEnabled && prevFolderScanEnabled) {
|
||||
// update interval
|
||||
int.TryParse(settingsRepository.Get(Settings.FolderScanInterval.AsString())?.Value ?? "30", out var interval);
|
||||
fileSystemCrawl?.Change(TimeSpan.Zero, TimeSpan.FromMinutes(interval));
|
||||
logger?.LogInformation($"Periodic folder scanning interval updated to {interval} minutes.");
|
||||
}
|
||||
}
|
||||
|
||||
void OnFolderAdded(object? sender, Folder e) {
|
||||
folders ??= new List<Folder>();
|
||||
if (folders.All(f => f.Id != e.Id)) folders.Add(e);
|
||||
logger?.LogInformation($"Folder added: {e.Id}");
|
||||
}
|
||||
|
||||
void OnFolderRemoved(object? sender, Folder e) {
|
||||
folders?.RemoveAll(f => f.Id == e.Id);
|
||||
logger?.LogInformation($"Folder removed: {e.Id}");
|
||||
}
|
||||
|
||||
void OnSettingsChanged(object? sender, Setting e) {
|
||||
switch (e) {
|
||||
case var s when s.Name == Settings.FolderScanEnabled.AsString() || s.Name == Settings.FolderScanInterval.AsString():
|
||||
FetchSettings();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,9 @@
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Debug",
|
||||
"Microsoft.AspNetCore": "Debug"
|
||||
"Microsoft.AspNetCore": "Information",
|
||||
"Microsoft.EntityFrameworkCore": "Warning",
|
||||
"Lactose.Services" : "Trace"
|
||||
}
|
||||
},
|
||||
"SignKey": {
|
||||
|
||||
@@ -2,8 +2,10 @@
|
||||
"Logging": {
|
||||
"LogLevel": {
|
||||
"Default": "Debug",
|
||||
"System": "Information",
|
||||
"Microsoft": "Information"
|
||||
"System": "Warning",
|
||||
"Microsoft.AspNetCore": "Warning",
|
||||
"Microsoft.EntityFrameworkCore": "Warning",
|
||||
"Lactose.Services" : "Trace"
|
||||
}
|
||||
},
|
||||
"DatabaseAddress": {
|
||||
|
||||
Reference in New Issue
Block a user