import crypto from 'crypto'
import path from 'path'
import { getDb } from './db'
import { getAiMaxRetries } from './app-settings'
import {
  tagSingleItem,
  generateItemDescription,
  extractItemText,
  translateItemText,
} from './ai-tagger'

export type AiJobType = 'tag' | 'describe' | 'extract' | 'translate'
export type AiJobStatus = 'queued' | 'running' | 'completed' | 'failed'

/** An AI job as exposed to callers (camelCase view of an `ai_jobs` row). */
export interface AiJob {
  id: string
  itemKey: string
  libraryId: string
  jobType: AiJobType
  status: AiJobStatus
  error: string | null
  attempt: number
  maxRetries: number
  createdAt: number
  startedAt: number | null
  completedAt: number | null
  itemTitle: string | null
}

/** Raw shape of a row in the `ai_jobs` table. */
interface AiJobRow {
  id: string
  item_key: string
  library_id: string
  job_type: string
  status: string
  error: string | null
  attempt: number
  max_retries: number
  created_at: number
  started_at: number | null
  completed_at: number | null
  item_title: string | null
}

/** How long the idle processor sleeps before re-checking the queue on its own. */
const IDLE_POLL_MS = 60_000

/**
 * In-memory copy of the source language requested for translate jobs, keyed
 * by job ID. The `error` column only carries this value until the first
 * attempt starts, so this cache is what keeps it available for retries.
 * It does not survive a process restart.
 */
const sourceLanguages = new Map<string, string>()

/**
 * Returns the source language stashed in the `error` column, or null.
 *
 * `enqueueJob` stores the language there for translate jobs. The column holds
 * that metadata only while the job is queued and has never been attempted;
 * in every other state it holds a real error message (or NULL).
 */
function pendingSourceLanguage(row: AiJobRow): string | null {
  const carriesMetadata =
    row.job_type === 'translate' && row.status === 'queued' && row.attempt === 0
  return carriesMetadata ? row.error : null
}

/** Drops cached source languages whose job row no longer exists. */
function pruneSourceLanguages(): void {
  if (sourceLanguages.size === 0) return
  const exists = getDb().prepare('SELECT 1 FROM ai_jobs WHERE id = ?')
  for (const id of Array.from(sourceLanguages.keys())) {
    if (!exists.get(id)) sourceLanguages.delete(id)
  }
}

/** Escapes `%`, `_` and `\` so a string matches literally in `LIKE ... ESCAPE '\'`. */
function escapeLikePattern(value: string): string {
  return value.replace(/[\\%_]/g, '\\$&')
}

/**
 * Converts a database row into the public `AiJob` shape.
 *
 * For a never-attempted queued translate job the `error` column holds the
 * requested source language rather than an error, so it is reported as null.
 */
function rowToJob(row: AiJobRow): AiJob {
  return {
    id: row.id,
    itemKey: row.item_key,
    libraryId: row.library_id,
    jobType: row.job_type as AiJobType,
    status: row.status as AiJobStatus,
    error: pendingSourceLanguage(row) === null ? row.error : null,
    attempt: row.attempt,
    maxRetries: row.max_retries,
    createdAt: row.created_at,
    startedAt: row.started_at,
    completedAt: row.completed_at,
    itemTitle: row.item_title,
  }
}

/**
 * Look up the title of a media item for display purposes.
 *
 * @returns The stored title, or null when the item is missing or untitled.
 */
function resolveItemTitle(itemKey: string): string | null {
  const db = getDb()
  const row = db
    .prepare('SELECT title FROM media_items WHERE item_key = ?')
    .get(itemKey) as { title: string | null } | undefined
  return row?.title ?? null
}

// ─── Enqueue ─────────────────────────────────────────────────────────────────

/**
 * Enqueue an AI job. Deduplicates: if a queued/running job with the same
 * item_key + job_type already exists, returns its ID instead.
 *
 * @param itemKey - Key of the media item to process.
 * @param jobType - Which AI operation to run.
 * @param libraryId - Library the item belongs to.
 * @param sourceLanguage - Optional source language; only used for translate jobs.
 * @returns The ID of the new job, or of the existing queued/running job.
 */
export function enqueueJob(
  itemKey: string,
  jobType: AiJobType,
  libraryId: string,
  sourceLanguage?: string,
): string {
  const db = getDb()

  // Deduplication: check for an existing queued/running job.
  const existing = db
    .prepare(
      `SELECT id FROM ai_jobs WHERE item_key = ? AND job_type = ? AND status IN ('queued', 'running')`
    )
    .get(itemKey, jobType) as { id: string } | undefined
  if (existing) return existing.id

  const id = crypto.randomUUID()
  const maxRetries = getAiMaxRetries()
  const title = resolveItemTitle(itemKey)

  // The table has no metadata column, so the source language of a translate
  // job is parked in `error` (NULL at creation) until the first attempt starts.
  const metadata = jobType === 'translate' && sourceLanguage ? sourceLanguage : null

  db.prepare(
    `INSERT INTO ai_jobs (id, item_key, library_id, job_type, status, error, attempt, max_retries, created_at, item_title) VALUES (?, ?, ?, ?, 'queued', ?, 0, ?, ?, ?)`
  ).run(id, itemKey, libraryId, jobType, metadata, maxRetries, Date.now(), title)

  // Must be cached before waking: the processor may pick the job up synchronously.
  if (metadata !== null) sourceLanguages.set(id, metadata)

  wakeProcessor()
  return id
}

/**
 * Enqueue jobs for all media items in a directory (for bulk operations).
 *
 * @param libraryId - Library whose items are scanned.
 * @param dirPath - Directory inside the library; an empty string selects the whole library.
 * @param jobType - Which AI operation to enqueue for each item.
 * @param itemTypeFilter - Item type to select; defaults to `'mixed_file'`.
 * @param extFilter - When given, only files whose lower-cased extension
 *   (as returned by `path.extname`, including the dot) is in the set are enqueued.
 * @returns One job ID per selected item. Because `enqueueJob` deduplicates,
 *   some IDs may belong to jobs that were already queued or running.
 */
export function enqueueBulkJobs(
  libraryId: string,
  dirPath: string,
  jobType: AiJobType,
  itemTypeFilter?: string,
  extFilter?: ReadonlySet<string>,
): string[] {
  const db = getDb()

  const prefix = dirPath
    ? `${libraryId}:mixed_file:${encodeURIComponent(dirPath + '/')}`
    : `${libraryId}:mixed_file:`

  // The prefix contains `%` (from URI encoding) and `_` (in "mixed_file"),
  // which LIKE treats as wildcards. Escape them so only real children of the
  // directory match, not siblings such as "ab/" when asking for "a/".
  const items = db
    .prepare(
      "SELECT item_key, item_type, file_path FROM media_items WHERE item_key LIKE ? ESCAPE '\\' AND item_type = ?"
    )
    .all(`${escapeLikePattern(prefix)}%`, itemTypeFilter ?? 'mixed_file') as Array<{
    item_key: string
    item_type: string
    file_path: string | null
  }>

  const jobIds: string[] = []
  for (const item of items) {
    if (!item.file_path) continue
    if (extFilter && !extFilter.has(path.extname(item.file_path).toLowerCase())) continue
    jobIds.push(enqueueJob(item.item_key, jobType, libraryId))
  }
  return jobIds
}

// ─── Query ───────────────────────────────────────────────────────────────────

/** Returns all running and queued jobs: running first, then queued, oldest first. */
export function getJobQueue(): AiJob[] {
  const db = getDb()
  const rows = db
    .prepare(
      `SELECT * FROM ai_jobs WHERE status IN ('running', 'queued') ORDER BY CASE status WHEN 'running' THEN 0 ELSE 1 END, created_at ASC`
    )
    .all() as AiJobRow[]
  return rows.map(rowToJob)
}

/**
 * Returns completed and failed jobs, most recently finished first.
 *
 * @param limit - Maximum number of jobs to return (default 50).
 */
export function getJobHistory(limit = 50): AiJob[] {
  const db = getDb()
  const rows = db
    .prepare(
      `SELECT * FROM ai_jobs WHERE status IN ('completed', 'failed') ORDER BY completed_at DESC LIMIT ?`
    )
    .all(limit) as AiJobRow[]
  return rows.map(rowToJob)
}

/** Returns the job with the given ID, or null if there is none. */
export function getJob(jobId: string): AiJob | null {
  const db = getDb()
  const row = db
    .prepare('SELECT * FROM ai_jobs WHERE id = ?')
    .get(jobId) as AiJobRow | undefined
  return row ? rowToJob(row) : null
}

// ─── Actions ─────────────────────────────────────────────────────────────────

/**
 * Puts a failed job back in the queue with its attempt counter reset.
 *
 * @returns true if the job existed and was in the 'failed' state.
 */
export function retryJob(jobId: string): boolean {
  const db = getDb()
  const result = db
    .prepare(
      `UPDATE ai_jobs SET status = 'queued', error = NULL, attempt = 0, started_at = NULL, completed_at = NULL WHERE id = ? AND status = 'failed'`
    )
    .run(jobId)
  if (result.changes > 0) {
    wakeProcessor()
    return true
  }
  return false
}

/**
 * Deletes a job that is still queued. Running and finished jobs are left alone.
 *
 * @returns true if a queued job was deleted.
 */
export function cancelJob(jobId: string): boolean {
  const db = getDb()
  const result = db
    .prepare("DELETE FROM ai_jobs WHERE id = ? AND status = 'queued'")
    .run(jobId)
  if (result.changes === 0) return false
  sourceLanguages.delete(jobId)
  return true
}

/**
 * Deletes every queued job.
 *
 * @returns The number of jobs deleted.
 */
export function cancelAllQueued(): number {
  const db = getDb()
  const result = db
    .prepare("DELETE FROM ai_jobs WHERE status = 'queued'")
    .run()
  pruneSourceLanguages()
  return result.changes
}

/**
 * Deletes every completed and failed job.
 *
 * @returns The number of jobs deleted.
 */
export function clearJobHistory(): number {
  const db = getDb()
  const result = db
    .prepare("DELETE FROM ai_jobs WHERE status IN ('completed', 'failed')")
    .run()
  pruneSourceLanguages()
  return result.changes
}

// ─── Processor ───────────────────────────────────────────────────────────────

/** True while the processing loop is alive (working or idle-waiting). */
let processorRunning = false

/** Resolves the processor's current idle wait; null when it is not waiting. */
let processorWake: (() => void) | null = null

/** Wakes the processor if it is idle, or starts it if it is not running. */
function wakeProcessor(): void {
  if (processorWake) {
    processorWake()
  } else if (!processorRunning) {
    // Fire-and-forget: runProcessor handles its own errors.
    void runProcessor()
  }
}

/**
 * Sleeps until `wakeProcessor` is called or `timeoutMs` elapses, whichever
 * comes first. Both paths go through the same `finish` callback, which clears
 * the timer, so a timer left over from one wait can never cancel the wake
 * hook of a later wait.
 */
function waitForWake(timeoutMs: number): Promise<void> {
  return new Promise<void>((resolve) => {
    const finish = (): void => {
      clearTimeout(timer)
      processorWake = null
      resolve()
    }
    const timer = setTimeout(finish, timeoutMs)
    processorWake = finish
  })
}

/**
 * Runs the AI operation for one job row.
 *
 * @throws If the operation fails or the job type is not recognised.
 */
async function runJob(row: AiJobRow, sourceLanguage: string | null): Promise<void> {
  switch (row.job_type) {
    case 'tag':
      await tagSingleItem(row.item_key)
      return
    case 'describe':
      await generateItemDescription(row.item_key)
      return
    case 'extract':
      await extractItemText(row.item_key)
      return
    case 'translate':
      await translateItemText(row.item_key, sourceLanguage || undefined)
      return
    default:
      // Fail loudly instead of recording an unknown job as completed.
      throw new Error(`Unknown AI job type: ${row.job_type}`)
  }
}

/**
 * Takes the oldest queued job, runs it, and records the outcome.
 *
 * On failure the job is re-queued until `max_retries` attempts have been
 * made, after which it is marked 'failed'.
 *
 * @returns false when the queue was empty, true when a job was processed
 *   (whether it succeeded or not).
 */
async function processNextJob(): Promise<boolean> {
  const db = getDb()
  const row = db
    .prepare(
      `SELECT * FROM ai_jobs WHERE status = 'queued' ORDER BY created_at ASC LIMIT 1`
    )
    .get() as AiJobRow | undefined
  if (!row) return false

  // Prefer the cached language. Only fall back to the `error` column while it
  // still holds enqueue metadata; on a retry it holds the last error message.
  const sourceLanguage = sourceLanguages.get(row.id) ?? pendingSourceLanguage(row)
  if (sourceLanguage !== null) sourceLanguages.set(row.id, sourceLanguage)

  db.prepare(
    "UPDATE ai_jobs SET status = 'running', started_at = ?, error = NULL WHERE id = ?"
  ).run(Date.now(), row.id)

  try {
    await runJob(row, sourceLanguage)
    db.prepare(
      "UPDATE ai_jobs SET status = 'completed', completed_at = ? WHERE id = ?"
    ).run(Date.now(), row.id)
    sourceLanguages.delete(row.id)
  } catch (err) {
    const errorMessage = err instanceof Error ? err.message : String(err)
    const attempt = row.attempt + 1

    if (attempt < row.max_retries) {
      // Re-queue for retry
      db.prepare(
        "UPDATE ai_jobs SET status = 'queued', attempt = ?, error = ?, started_at = NULL WHERE id = ?"
      ).run(attempt, errorMessage, row.id)
    } else {
      // Final failure. The cached language is kept so retryJob can reuse it.
      db.prepare(
        "UPDATE ai_jobs SET status = 'failed', attempt = ?, error = ?, completed_at = ? WHERE id = ?"
      ).run(attempt, errorMessage, Date.now(), row.id)
    }

    console.warn(
      `[ai-jobs] Job ${row.id} (${row.job_type} for "${row.item_key}") failed (attempt ${attempt}/${row.max_retries}):`,
      errorMessage
    )
  }

  return true
}

/**
 * The processing loop: handles jobs one at a time and idles when the queue is
 * empty. Only one loop runs at a time; a second call returns immediately.
 */
async function runProcessor(): Promise<void> {
  if (processorRunning) return
  processorRunning = true
  console.log('[ai-jobs] Processor started')

  try {
    while (true) {
      const hadWork = await processNextJob()
      if (!hadWork) {
        // Wait for a wake signal, or re-check after the timeout for safety.
        await waitForWake(IDLE_POLL_MS)
      }
    }
  } catch (err) {
    console.error('[ai-jobs] Processor crashed:', err)
  } finally {
    processorRunning = false
    console.log('[ai-jobs] Processor stopped')
  }
}

/**
 * Initialize the job processor. Called on server startup.
 * Resets any jobs stuck in 'running' state (from a previous crash) back to 'queued'.
 */
export function initJobProcessor(): void {
  const db = getDb()
  const result = db
    .prepare("UPDATE ai_jobs SET status = 'queued', started_at = NULL WHERE status = 'running'")
    .run()
  if (result.changes > 0) {
    console.log(`[ai-jobs] Reset ${result.changes} stuck running job(s) to queued`)
  }

  // Check if there are any queued jobs and start the processor
  const pending = db
    .prepare("SELECT COUNT(*) as count FROM ai_jobs WHERE status = 'queued'")
    .get() as { count: number }
  if (pending.count > 0) {
    void runProcessor()
  }
}