mirror of
https://github.com/rishikanthc/Scriberr.git
synced 2026-06-29 15:26:02 +00:00
The jobScanner was running every 10 seconds and re-enqueueing jobs that were already in the queue but hadn't started processing yet. This caused completed files to be re-transcribed when auto-transcribe was enabled. Changes: - Removed jobScanner goroutine (10-second polling loop) - Removed scanPendingJobs function - Added recoverPendingJobs that runs ONCE at startup to recover any pending jobs left from previous server runs - Jobs are now only enqueued when explicitly requested: - Upload with auto-transcribe enabled - Manual transcription start - Server restart recovery (one-time)
478 lines
14 KiB
Go
478 lines
14 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"scriberr/internal/database"
|
|
"scriberr/internal/models"
|
|
"scriberr/pkg/logger"
|
|
)
|
|
|
|
// RunningJob tracks both context cancellation and OS process
|
|
type RunningJob struct {
|
|
Cancel context.CancelFunc
|
|
Process *exec.Cmd
|
|
}
|
|
|
|
// TaskQueue manages transcription job processing
|
|
type TaskQueue struct {
|
|
minWorkers int
|
|
maxWorkers int
|
|
currentWorkers int64 // Use atomic for thread-safe access
|
|
jobChannel chan string
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
wg sync.WaitGroup
|
|
processor JobProcessor
|
|
runningJobs map[string]*RunningJob
|
|
jobsMutex sync.RWMutex
|
|
workerMutex sync.Mutex
|
|
autoScale bool
|
|
lastScaleTime time.Time
|
|
}
|
|
|
|
// JobProcessor defines the interface for processing jobs
|
|
type JobProcessor interface {
|
|
ProcessJob(ctx context.Context, jobID string) error
|
|
ProcessJobWithProcess(ctx context.Context, jobID string, registerProcess func(*exec.Cmd)) error
|
|
}
|
|
|
|
// MultiTrackJobProcessor extends JobProcessor with multi-track specific methods
|
|
type MultiTrackJobProcessor interface {
|
|
JobProcessor
|
|
TerminateMultiTrackJob(jobID string) error
|
|
IsMultiTrackJob(jobID string) bool
|
|
}
|
|
|
|
// getOptimalWorkerCount calculates optimal worker count based on system resources
|
|
func getOptimalWorkerCount() (min, max int) {
|
|
numCPU := runtime.NumCPU()
|
|
|
|
// Check for environment variable override
|
|
if workerStr := os.Getenv("QUEUE_WORKERS"); workerStr != "" {
|
|
if workers, err := strconv.Atoi(workerStr); err == nil && workers > 0 {
|
|
return workers, workers // Fixed worker count
|
|
}
|
|
}
|
|
|
|
// For transcription workloads, we typically want fewer workers than CPUs
|
|
// since each job is CPU and I/O intensive
|
|
if numCPU <= 2 {
|
|
return 1, 2
|
|
} else if numCPU <= 4 {
|
|
return 1, 3
|
|
} else if numCPU <= 8 {
|
|
return 2, 4
|
|
} else {
|
|
return 2, 6 // Cap at 6 for very high CPU systems
|
|
}
|
|
}
|
|
|
|
// NewTaskQueue creates a new task queue with auto-scaling capabilities
|
|
func NewTaskQueue(legacyWorkers int, processor JobProcessor) *TaskQueue {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
// Calculate optimal worker counts, fallback to legacy parameter
|
|
min, max := getOptimalWorkerCount()
|
|
if legacyWorkers > 0 {
|
|
min = legacyWorkers
|
|
max = legacyWorkers
|
|
}
|
|
|
|
// Check if auto-scaling should be enabled
|
|
autoScale := os.Getenv("QUEUE_AUTO_SCALE") != "false"
|
|
if min == max {
|
|
autoScale = false // Disable auto-scaling if min == max
|
|
}
|
|
|
|
return &TaskQueue{
|
|
minWorkers: min,
|
|
maxWorkers: max,
|
|
currentWorkers: int64(min),
|
|
jobChannel: make(chan string, 200), // Increased buffer for better throughput
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
processor: processor,
|
|
runningJobs: make(map[string]*RunningJob),
|
|
autoScale: autoScale,
|
|
lastScaleTime: time.Now(),
|
|
}
|
|
}
|
|
|
|
// Start starts the task queue workers
|
|
func (tq *TaskQueue) Start() {
|
|
workers := int(atomic.LoadInt64(&tq.currentWorkers))
|
|
logger.Debug("Starting task queue",
|
|
"workers", workers,
|
|
"min_workers", tq.minWorkers,
|
|
"max_workers", tq.maxWorkers,
|
|
"auto_scale", tq.autoScale)
|
|
|
|
// Reset any zombie jobs from previous runs synchronously before starting workers
|
|
tq.ResetZombieJobs()
|
|
|
|
// One-time recovery: enqueue any pending jobs left from previous server run
|
|
// This is NOT a polling mechanism - it only runs once at startup
|
|
tq.recoverPendingJobs()
|
|
|
|
// Start initial workers
|
|
for i := 0; i < workers; i++ {
|
|
tq.wg.Add(1)
|
|
go tq.worker(i)
|
|
}
|
|
|
|
// Start auto-scaling monitor if enabled
|
|
if tq.autoScale {
|
|
tq.wg.Add(1)
|
|
go tq.autoScaler()
|
|
}
|
|
}
|
|
|
|
// Stop stops the task queue
|
|
func (tq *TaskQueue) Stop() {
|
|
logger.Debug("Stopping task queue")
|
|
logger.Debug("Stopping task queue")
|
|
tq.cancel()
|
|
// Do not close jobChannel here as it causes panics in EnqueueJob
|
|
// The channel will be garbage collected when the queue is no longer referenced
|
|
tq.wg.Wait()
|
|
logger.Debug("Task queue stopped")
|
|
}
|
|
|
|
// EnqueueJob adds a job to the queue
|
|
func (tq *TaskQueue) EnqueueJob(jobID string) error {
|
|
// Check if queue is already shut down
|
|
select {
|
|
case <-tq.ctx.Done():
|
|
return fmt.Errorf("queue is shutting down")
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case tq.jobChannel <- jobID:
|
|
return nil
|
|
case <-tq.ctx.Done():
|
|
return fmt.Errorf("queue is shutting down")
|
|
default:
|
|
return fmt.Errorf("queue is full")
|
|
}
|
|
}
|
|
|
|
// worker processes jobs from the channel
|
|
func (tq *TaskQueue) worker(id int) {
|
|
defer tq.wg.Done()
|
|
|
|
logger.Debug("Worker started", "worker_id", id)
|
|
|
|
for {
|
|
select {
|
|
case jobID, ok := <-tq.jobChannel:
|
|
if !ok {
|
|
logger.Debug("Worker stopped", "worker_id", id)
|
|
return
|
|
}
|
|
|
|
logger.WorkerOperation(id, jobID, "start")
|
|
|
|
// Update job status to processing
|
|
if err := tq.updateJobStatus(jobID, models.StatusProcessing); err != nil {
|
|
logger.Error("Failed to update job status", "worker_id", id, "job_id", jobID, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Create context for this job and track it
|
|
jobCtx, jobCancel := context.WithCancel(tq.ctx)
|
|
runningJob := &RunningJob{
|
|
Cancel: jobCancel,
|
|
Process: nil, // Will be set by registerProcess callback
|
|
}
|
|
|
|
tq.jobsMutex.Lock()
|
|
tq.runningJobs[jobID] = runningJob
|
|
tq.jobsMutex.Unlock()
|
|
|
|
// Register process callback
|
|
registerProcess := func(cmd *exec.Cmd) {
|
|
tq.jobsMutex.Lock()
|
|
if job, exists := tq.runningJobs[jobID]; exists {
|
|
job.Process = cmd
|
|
}
|
|
tq.jobsMutex.Unlock()
|
|
}
|
|
|
|
// Process the job with process registration
|
|
err := tq.processor.ProcessJobWithProcess(jobCtx, jobID, registerProcess)
|
|
|
|
// Remove job from running jobs
|
|
tq.jobsMutex.Lock()
|
|
delete(tq.runningJobs, jobID)
|
|
tq.jobsMutex.Unlock()
|
|
|
|
// Handle result
|
|
if err != nil {
|
|
if jobCtx.Err() == context.Canceled {
|
|
logger.Info("Job cancelled", "worker_id", id, "job_id", jobID)
|
|
tq.updateJobStatus(jobID, models.StatusFailed)
|
|
tq.updateJobError(jobID, "Job was cancelled by user")
|
|
} else {
|
|
logger.Error("Job processing failed", "worker_id", id, "job_id", jobID, "error", err)
|
|
tq.updateJobStatus(jobID, models.StatusFailed)
|
|
tq.updateJobError(jobID, err.Error())
|
|
}
|
|
} else {
|
|
logger.Debug("Job processed successfully", "worker_id", id, "job_id", jobID)
|
|
tq.updateJobStatus(jobID, models.StatusCompleted)
|
|
}
|
|
|
|
case <-tq.ctx.Done():
|
|
logger.Debug("Worker stopped", "worker_id", id, "reason", "context_cancelled")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// KillJob aggressively terminates a running job
|
|
func (tq *TaskQueue) KillJob(jobID string) error {
|
|
tq.jobsMutex.Lock()
|
|
defer tq.jobsMutex.Unlock()
|
|
|
|
runningJob, exists := tq.runningJobs[jobID]
|
|
if !exists {
|
|
// If job is not in memory but exists in DB as processing, it's a zombie
|
|
// We should still mark it as failed in DB
|
|
logger.Warn("Job not found in running jobs map, checking DB status", "job_id", jobID)
|
|
|
|
var job models.TranscriptionJob
|
|
if err := database.DB.Where("id = ?", jobID).First(&job).Error; err != nil {
|
|
return fmt.Errorf("job %s not found: %v", jobID, err)
|
|
}
|
|
|
|
if job.Status == models.StatusProcessing {
|
|
logger.Info("Found zombie job in DB, marking as failed", "job_id", jobID)
|
|
tq.updateJobStatus(jobID, models.StatusFailed)
|
|
tq.updateJobError(jobID, "Job was forcefully terminated by user (zombie process)")
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("job %s is not currently running", jobID)
|
|
}
|
|
|
|
logger.Info("Killing job", "job_id", jobID)
|
|
|
|
// Check if this is a multi-track job and handle accordingly
|
|
if mtProcessor, ok := tq.processor.(MultiTrackJobProcessor); ok && mtProcessor.IsMultiTrackJob(jobID) {
|
|
logger.Debug("Terminating multi-track job", "job_id", jobID)
|
|
|
|
// Terminate all individual track jobs
|
|
if err := mtProcessor.TerminateMultiTrackJob(jobID); err != nil {
|
|
logger.Error("Failed to terminate multi-track job", "job_id", jobID, "error", err)
|
|
}
|
|
}
|
|
|
|
// First, try to kill the OS process group (or process on non-Unix)
|
|
if runningJob.Process != nil && runningJob.Process.Process != nil {
|
|
logger.Debug("Terminating process tree", "pid", runningJob.Process.Process.Pid, "job_id", jobID)
|
|
if err := killProcessTree(runningJob.Process.Process); err != nil {
|
|
log.Printf("Failed to terminate process tree for job %s: %v, trying direct kill()", jobID, err)
|
|
_ = runningJob.Process.Process.Kill()
|
|
}
|
|
}
|
|
|
|
// Also cancel the context for cleanup
|
|
runningJob.Cancel()
|
|
|
|
// Immediately update job status without waiting for process to finish
|
|
go func() {
|
|
tq.updateJobStatus(jobID, models.StatusFailed)
|
|
tq.updateJobError(jobID, "Job was forcefully terminated by user")
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// IsJobRunning checks if a job is currently being processed
|
|
func (tq *TaskQueue) IsJobRunning(jobID string) bool {
|
|
tq.jobsMutex.RLock()
|
|
defer tq.jobsMutex.RUnlock()
|
|
|
|
_, exists := tq.runningJobs[jobID]
|
|
return exists
|
|
}
|
|
|
|
// updateJobStatus updates the status of a job
|
|
func (tq *TaskQueue) updateJobStatus(jobID string, status models.JobStatus) error {
|
|
return database.DB.Model(&models.TranscriptionJob{}).
|
|
Where("id = ?", jobID).
|
|
Update("status", status).Error
|
|
}
|
|
|
|
// updateJobError updates the error message of a job
|
|
func (tq *TaskQueue) updateJobError(jobID string, errorMsg string) error {
|
|
return database.DB.Model(&models.TranscriptionJob{}).
|
|
Where("id = ?", jobID).
|
|
Update("error_message", errorMsg).Error
|
|
}
|
|
|
|
// GetJobStatus gets the status of a job
|
|
func (tq *TaskQueue) GetJobStatus(jobID string) (*models.TranscriptionJob, error) {
|
|
var job models.TranscriptionJob
|
|
err := database.DB.Where("id = ?", jobID).First(&job).Error
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &job, nil
|
|
}
|
|
|
|
// autoScaler monitors queue load and adjusts worker count
|
|
func (tq *TaskQueue) autoScaler() {
|
|
defer tq.wg.Done()
|
|
|
|
ticker := time.NewTicker(30 * time.Second) // Check every 30 seconds
|
|
defer ticker.Stop()
|
|
|
|
log.Println("Auto-scaler started")
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
tq.checkAndScale()
|
|
case <-tq.ctx.Done():
|
|
log.Println("Auto-scaler stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkAndScale evaluates current load and adjusts worker count
|
|
func (tq *TaskQueue) checkAndScale() {
|
|
// Prevent too frequent scaling
|
|
if time.Since(tq.lastScaleTime) < 1*time.Minute {
|
|
return
|
|
}
|
|
|
|
queueSize := len(tq.jobChannel)
|
|
currentWorkers := int(atomic.LoadInt64(&tq.currentWorkers))
|
|
|
|
tq.jobsMutex.RLock()
|
|
runningJobsCount := len(tq.runningJobs)
|
|
tq.jobsMutex.RUnlock()
|
|
|
|
// Scale up if queue is building up and we have capacity
|
|
if queueSize > 10 && currentWorkers < tq.maxWorkers {
|
|
newWorkerCount := currentWorkers + 1
|
|
log.Printf("Scaling up workers: %d -> %d (queue size: %d)", currentWorkers, newWorkerCount, queueSize)
|
|
|
|
atomic.StoreInt64(&tq.currentWorkers, int64(newWorkerCount))
|
|
tq.wg.Add(1)
|
|
go tq.worker(newWorkerCount - 1)
|
|
tq.lastScaleTime = time.Now()
|
|
|
|
// Scale down if queue is empty and minimal jobs running
|
|
} else if queueSize == 0 && runningJobsCount <= 1 && currentWorkers > tq.minWorkers {
|
|
newWorkerCount := currentWorkers - 1
|
|
log.Printf("Scaling down workers: %d -> %d (queue size: %d, running: %d)",
|
|
currentWorkers, newWorkerCount, queueSize, runningJobsCount)
|
|
|
|
atomic.StoreInt64(&tq.currentWorkers, int64(newWorkerCount))
|
|
tq.lastScaleTime = time.Now()
|
|
|
|
// Note: We don't actively stop workers here. They will naturally exit
|
|
// when no more jobs are available and the queue empties.
|
|
}
|
|
}
|
|
|
|
// GetQueueStats returns queue statistics
|
|
func (tq *TaskQueue) GetQueueStats() map[string]interface{} {
|
|
var pendingCount, processingCount, completedCount, failedCount int64
|
|
|
|
database.DB.Model(&models.TranscriptionJob{}).Where("status = ?", models.StatusPending).Count(&pendingCount)
|
|
database.DB.Model(&models.TranscriptionJob{}).Where("status = ?", models.StatusProcessing).Count(&processingCount)
|
|
database.DB.Model(&models.TranscriptionJob{}).Where("status = ?", models.StatusCompleted).Count(&completedCount)
|
|
database.DB.Model(&models.TranscriptionJob{}).Where("status = ?", models.StatusFailed).Count(&failedCount)
|
|
|
|
tq.jobsMutex.RLock()
|
|
runningJobsCount := len(tq.runningJobs)
|
|
tq.jobsMutex.RUnlock()
|
|
|
|
return map[string]interface{}{
|
|
"queue_size": len(tq.jobChannel),
|
|
"queue_capacity": cap(tq.jobChannel),
|
|
"current_workers": int(atomic.LoadInt64(&tq.currentWorkers)),
|
|
"min_workers": tq.minWorkers,
|
|
"max_workers": tq.maxWorkers,
|
|
"auto_scale": tq.autoScale,
|
|
"running_jobs": runningJobsCount,
|
|
"pending_jobs": pendingCount,
|
|
"processing_jobs": processingCount,
|
|
"completed_jobs": completedCount,
|
|
"failed_jobs": failedCount,
|
|
}
|
|
}
|
|
|
|
// ResetZombieJobs finds jobs stuck in processing state from previous runs and marks them as failed
|
|
func (tq *TaskQueue) ResetZombieJobs() {
|
|
var zombieJobs []models.TranscriptionJob
|
|
|
|
// Find all jobs with status "processing"
|
|
if err := database.DB.Where("status = ?", models.StatusProcessing).Find(&zombieJobs).Error; err != nil {
|
|
logger.Error("Failed to scan for zombie jobs", "error", err)
|
|
return
|
|
}
|
|
|
|
if len(zombieJobs) == 0 {
|
|
return
|
|
}
|
|
|
|
logger.Info("Found zombie jobs from previous run", "count", len(zombieJobs))
|
|
|
|
for _, job := range zombieJobs {
|
|
logger.Info("Resetting zombie job", "job_id", job.ID)
|
|
|
|
// Mark as failed
|
|
if err := tq.updateJobStatus(job.ID, models.StatusFailed); err != nil {
|
|
logger.Error("Failed to update zombie job status", "job_id", job.ID, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Update error message
|
|
if err := tq.updateJobError(job.ID, "Job interrupted by server restart"); err != nil {
|
|
logger.Error("Failed to update zombie job error message", "job_id", job.ID, "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// recoverPendingJobs enqueues pending jobs from previous server runs
|
|
// This runs ONCE at startup, not repeatedly like the old scanner
|
|
func (tq *TaskQueue) recoverPendingJobs() {
|
|
var pendingJobs []models.TranscriptionJob
|
|
|
|
if err := database.DB.Where("status = ?", models.StatusPending).Find(&pendingJobs).Error; err != nil {
|
|
logger.Error("Failed to scan for pending jobs during startup recovery", "error", err)
|
|
return
|
|
}
|
|
|
|
if len(pendingJobs) == 0 {
|
|
return
|
|
}
|
|
|
|
logger.Info("Recovering pending jobs from previous server run", "count", len(pendingJobs))
|
|
|
|
for _, job := range pendingJobs {
|
|
select {
|
|
case tq.jobChannel <- job.ID:
|
|
logger.Debug("Recovered pending job", "job_id", job.ID)
|
|
default:
|
|
logger.Warn("Queue full during startup recovery, job will remain pending", "job_id", job.ID)
|
|
}
|
|
}
|
|
}
|