From fea55594d049e075b017eb9168ff38d2a0af410a Mon Sep 17 00:00:00 2001 From: Garret Patti <42485635+garretpatti@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:29:09 -0400 Subject: [PATCH] add ai job queue --- src/app/api/ai-jobs/route.ts | 63 ++++ src/app/api/ai-settings/route.ts | 13 +- src/app/api/ai-tagging/describe-bulk/route.ts | 25 +- src/app/api/ai-tagging/describe/route.ts | 21 +- .../api/ai-tagging/extract-text-bulk/route.ts | 23 +- src/app/api/ai-tagging/extract-text/route.ts | 21 +- src/app/api/ai-tagging/route.ts | 21 +- src/app/api/ai-tagging/translate/route.ts | 18 +- src/app/manage/ai-tagging/page.tsx | 304 ++++++++++++++- src/components/DoomScrollView.tsx | 5 + src/components/ManageSubNav.tsx | 2 +- src/components/mixed/ImageLightbox.tsx | 11 +- src/components/tags/TagSelector.tsx | 5 + src/instrumentation.ts | 3 + src/lib/ai-jobs.ts | 351 ++++++++++++++++++ src/lib/ai-tagger.ts | 63 +--- src/lib/app-settings.ts | 12 + src/lib/db.ts | 24 ++ 18 files changed, 818 insertions(+), 167 deletions(-) create mode 100644 src/app/api/ai-jobs/route.ts create mode 100644 src/lib/ai-jobs.ts diff --git a/src/app/api/ai-jobs/route.ts b/src/app/api/ai-jobs/route.ts new file mode 100644 index 0000000..79c2810 --- /dev/null +++ b/src/app/api/ai-jobs/route.ts @@ -0,0 +1,63 @@ +import { NextRequest, NextResponse } from 'next/server' +import { requireAdmin } from '@/lib/auth' +import { getJobQueue, getJobHistory, retryJob, cancelJob, cancelAllQueued, clearJobHistory } from '@/lib/ai-jobs' + +export async function GET(request: NextRequest) { + const auth = await requireAdmin(request) + if (auth instanceof NextResponse) return auth + + const queue = getJobQueue() + const history = getJobHistory(50) + return NextResponse.json({ queue, history }) +} + +export async function POST(request: NextRequest) { + const auth = await requireAdmin(request) + if (auth instanceof NextResponse) return auth + + let body: { action?: string; jobId?: string } + try { + body = await request.json() + } catch { + return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 }) + } + + const { action, jobId } = body + + switch (action) { + case 'retry': { + if (!jobId || typeof jobId !== 'string') { + return NextResponse.json({ error: 'jobId is required' }, { status: 400 }) + } + const ok = retryJob(jobId) + if (!ok) { + return NextResponse.json({ error: 'Job not found or not in failed state' }, { status: 404 }) + } + return NextResponse.json({ ok: true }) + } + + case 'cancel': { + if (!jobId || typeof jobId !== 'string') { + return NextResponse.json({ error: 'jobId is required' }, { status: 400 }) + } + const ok = cancelJob(jobId) + if (!ok) { + return NextResponse.json({ error: 'Job not found or not in queued state' }, { status: 404 }) + } + return NextResponse.json({ ok: true }) + } + + case 'cancel-all': { + const cancelled = cancelAllQueued() + return NextResponse.json({ cancelled }) + } + + case 'clear-history': { + const cleared = clearJobHistory() + return NextResponse.json({ cleared }) + } + + default: + return NextResponse.json({ error: 'Unknown action' }, { status: 400 }) + } +} diff --git a/src/app/api/ai-settings/route.ts b/src/app/api/ai-settings/route.ts index 414ddbb..219e559 100644 --- a/src/app/api/ai-settings/route.ts +++ b/src/app/api/ai-settings/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse } from 'next/server' import { requireAdmin } from '@/lib/auth' -import { getAiConfig, updateAiConfig, getPreferredLanguage, setPreferredLanguage } from '@/lib/app-settings' +import { getAiConfig, updateAiConfig, getPreferredLanguage, setPreferredLanguage, getAiMaxRetries, setAiMaxRetries } from '@/lib/app-settings' export async function GET(request: NextRequest) { const auth = await requireAdmin(request) @@ -8,7 +8,8 @@ export async function GET(request: NextRequest) { const config = getAiConfig() const preferredLanguage = getPreferredLanguage() - return NextResponse.json({ ...config, preferredLanguage }) + const maxRetries = getAiMaxRetries() + return NextResponse.json({ ...config, preferredLanguage, maxRetries }) } export async function PUT(request: NextRequest) { @@ -28,6 +29,7 @@ export async function PUT(request: NextRequest) { promptTagger?: string promptExtract?: string promptTranslate?: string + maxRetries?: number } try { body = await request.json() @@ -39,6 +41,7 @@ export async function PUT(request: NextRequest) { endpoint, model, enabled, preferredLanguage, modelTagging, modelDescribe, modelExtract, modelTranslate, promptDescribe, promptTagger, promptExtract, promptTranslate, + maxRetries, } = body if (typeof endpoint !== 'string') { @@ -69,6 +72,10 @@ export async function PUT(request: NextRequest) { setPreferredLanguage(preferredLanguage.trim()) } + if (typeof maxRetries === 'number' && Number.isFinite(maxRetries)) { + setAiMaxRetries(maxRetries) + } + const config = getAiConfig() - return NextResponse.json({ ...config, preferredLanguage: getPreferredLanguage() }) + return NextResponse.json({ ...config, preferredLanguage: getPreferredLanguage(), maxRetries: getAiMaxRetries() }) } diff --git a/src/app/api/ai-tagging/describe-bulk/route.ts b/src/app/api/ai-tagging/describe-bulk/route.ts index fb9d05e..35fc2ba 100644 --- a/src/app/api/ai-tagging/describe-bulk/route.ts +++ b/src/app/api/ai-tagging/describe-bulk/route.ts @@ -1,6 +1,10 @@ import { NextRequest, NextResponse } from 'next/server' import { requireLibraryAccess } from '@/lib/auth' -import { describeDirectoryItems } from '@/lib/ai-tagger' +import { enqueueBulkJobs } from '@/lib/ai-jobs' + +const IMAGE_EXTENSIONS = new Set(['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.tiff', '.tif']) +const VIDEO_EXTENSIONS = new Set(['.mp4', '.mkv', '.avi', '.mov', '.wmv', '.m4v', '.webm', '.flv', '.ts', '.mpg', '.mpeg']) +const MEDIA_EXTENSIONS = new Set([...IMAGE_EXTENSIONS, ...VIDEO_EXTENSIONS]) export async function POST(request: NextRequest) { let body: { libraryId?: string; path?: string } @@ -18,21 +22,6 @@ export async function POST(request: NextRequest) { const auth = await requireLibraryAccess(request, libraryId) if (auth instanceof NextResponse) return auth - try { - const processed = await describeDirectoryItems(libraryId, dirPath ?? '') - return NextResponse.json({ processed }) - } catch (err) { - const error = err as Error & { code?: string } - if (error.code === 'NOT_CONFIGURED') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - if (error.code === 'NOT_FOUND') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - if (error.code === 'INVALID_TYPE') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - console.error('[ai-tagging/describe-bulk] Error:', error) - return NextResponse.json({ error: 'Failed to generate descriptions' }, { status: 502 }) - } + const jobIds = enqueueBulkJobs(libraryId, dirPath ?? '', 'describe', 'mixed_file', MEDIA_EXTENSIONS) + return NextResponse.json({ jobIds, queued: jobIds.length }, { status: 202 }) } diff --git a/src/app/api/ai-tagging/describe/route.ts b/src/app/api/ai-tagging/describe/route.ts index 5121c65..ba6e508 100644 --- a/src/app/api/ai-tagging/describe/route.ts +++ b/src/app/api/ai-tagging/describe/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse } from 'next/server' import { requireLibraryAccess } from '@/lib/auth' -import { generateItemDescription } from '@/lib/ai-tagger' +import { enqueueJob } from '@/lib/ai-jobs' export async function POST(request: NextRequest) { let body: { itemKey?: string } @@ -19,21 +19,6 @@ export async function POST(request: NextRequest) { const auth = await requireLibraryAccess(request, libraryId) if (auth instanceof NextResponse) return auth - try { - const description = await generateItemDescription(itemKey) - return NextResponse.json({ description }) - } catch (err) { - const error = err as Error & { code?: string } - if (error.code === 'NOT_CONFIGURED') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - if (error.code === 'NOT_FOUND') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - if (error.code === 'NO_IMAGE') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - console.error('[ai-tagging/describe] Error:', error) - return NextResponse.json({ error: 'Failed to generate description' }, { status: 502 }) - } + const jobId = enqueueJob(itemKey, 'describe', libraryId) + return NextResponse.json({ jobId }, { status: 202 }) } diff --git a/src/app/api/ai-tagging/extract-text-bulk/route.ts b/src/app/api/ai-tagging/extract-text-bulk/route.ts index 196ca19..3004abd 100644 --- a/src/app/api/ai-tagging/extract-text-bulk/route.ts +++ b/src/app/api/ai-tagging/extract-text-bulk/route.ts @@ -1,6 +1,8 @@ import { NextRequest, NextResponse } from 'next/server' import { requireLibraryAccess } from '@/lib/auth' -import { extractDirectoryText } from '@/lib/ai-tagger' +import { enqueueBulkJobs } from '@/lib/ai-jobs' + +const IMAGE_EXTENSIONS = new Set(['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.tiff', '.tif']) export async function POST(request: NextRequest) { let body: { libraryId?: string; path?: string } @@ -18,21 +20,6 @@ export async function POST(request: NextRequest) { const auth = await requireLibraryAccess(request, libraryId) if (auth instanceof NextResponse) return auth - try { - const processed = await extractDirectoryText(libraryId, dirPath ?? '') - return NextResponse.json({ processed }) - } catch (err) { - const error = err as Error & { code?: string } - if (error.code === 'NOT_CONFIGURED') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - if (error.code === 'NOT_FOUND') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - if (error.code === 'INVALID_TYPE') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - console.error('[ai-tagging/extract-text-bulk] Error:', error) - return NextResponse.json({ error: 'Failed to extract text' }, { status: 502 }) - } + const jobIds = enqueueBulkJobs(libraryId, dirPath ?? '', 'extract', 'mixed_file', IMAGE_EXTENSIONS) + return NextResponse.json({ jobIds, queued: jobIds.length }, { status: 202 }) } diff --git a/src/app/api/ai-tagging/extract-text/route.ts b/src/app/api/ai-tagging/extract-text/route.ts index 58de630..5b6ad22 100644 --- a/src/app/api/ai-tagging/extract-text/route.ts +++ b/src/app/api/ai-tagging/extract-text/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse } from 'next/server' import { requireLibraryAccess } from '@/lib/auth' -import { extractItemText } from '@/lib/ai-tagger' +import { enqueueJob } from '@/lib/ai-jobs' export async function POST(request: NextRequest) { let body: { itemKey?: string } @@ -19,21 +19,6 @@ export async function POST(request: NextRequest) { const auth = await requireLibraryAccess(request, libraryId) if (auth instanceof NextResponse) return auth - try { - const result = await extractItemText(itemKey) - return NextResponse.json(result) - } catch (err) { - const error = err as Error & { code?: string } - if (error.code === 'NOT_CONFIGURED') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - if (error.code === 'NOT_FOUND') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - if (error.code === 'NO_IMAGE' || error.code === 'INVALID_TYPE') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - console.error('[ai-tagging/extract-text] Error:', error) - return NextResponse.json({ error: 'Failed to extract text' }, { status: 502 }) - } + const jobId = enqueueJob(itemKey, 'extract', libraryId) + return NextResponse.json({ jobId }, { status: 202 }) } diff --git a/src/app/api/ai-tagging/route.ts b/src/app/api/ai-tagging/route.ts index f248a14..428b701 100644 --- a/src/app/api/ai-tagging/route.ts +++ b/src/app/api/ai-tagging/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse } from 'next/server' import { requireLibraryAccess } from '@/lib/auth' -import { tagSingleItem } from '@/lib/ai-tagger' +import { enqueueJob } from '@/lib/ai-jobs' export async function POST(request: NextRequest) { let body: { itemKey?: string } @@ -19,21 +19,6 @@ export async function POST(request: NextRequest) { const auth = await requireLibraryAccess(request, libraryId) if (auth instanceof NextResponse) return auth - try { - const tagIds = await tagSingleItem(itemKey) - return NextResponse.json({ tagIds }) - } catch (err) { - const error = err as Error & { code?: string } - if (error.code === 'NOT_CONFIGURED') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - if (error.code === 'NOT_FOUND') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - if (error.code === 'NO_IMAGE') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - console.error('[ai-tagging] Error tagging item:', error) - return NextResponse.json({ error: 'AI tagging failed' }, { status: 502 }) - } + const jobId = enqueueJob(itemKey, 'tag', libraryId) + return NextResponse.json({ jobId }, { status: 202 }) } diff --git a/src/app/api/ai-tagging/translate/route.ts b/src/app/api/ai-tagging/translate/route.ts index 89d041a..27ee8dd 100644 --- a/src/app/api/ai-tagging/translate/route.ts +++ b/src/app/api/ai-tagging/translate/route.ts @@ -1,6 +1,6 @@ import { NextRequest, NextResponse } from 'next/server' import { requireLibraryAccess } from '@/lib/auth' -import { translateItemText } from '@/lib/ai-tagger' +import { enqueueJob } from '@/lib/ai-jobs' export async function POST(request: NextRequest) { let body: { itemKey?: string; sourceLanguage?: string } @@ -19,18 +19,6 @@ export async function POST(request: NextRequest) { const auth = await requireLibraryAccess(request, libraryId) if (auth instanceof NextResponse) return auth - try { - const translatedText = await translateItemText(itemKey, sourceLanguage || undefined) - return NextResponse.json({ translatedText }) - } catch (err) { - const error = err as Error & { code?: string } - if (error.code === 'NOT_CONFIGURED') { - return NextResponse.json({ error: error.message }, { status: 400 }) - } - if (error.code === 'NOT_FOUND') { - return NextResponse.json({ error: error.message }, { status: 404 }) - } - console.error('[ai-tagging/translate] Error:', error) - return NextResponse.json({ error: 'Failed to translate text' }, { status: 502 }) - } + const jobId = enqueueJob(itemKey, 'translate', libraryId, sourceLanguage || undefined) + return NextResponse.json({ jobId }, { status: 202 }) } diff --git a/src/app/manage/ai-tagging/page.tsx b/src/app/manage/ai-tagging/page.tsx index 5807cd7..e45b65c 100644 --- a/src/app/manage/ai-tagging/page.tsx +++ b/src/app/manage/ai-tagging/page.tsx @@ -1,6 +1,6 @@ 'use client' -import { useEffect, useState, useCallback } from 'react' +import { useEffect, useState, useCallback, useRef } from 'react' interface AiSettings { endpoint: string @@ -15,6 +15,22 @@ interface AiSettings { promptTagger: string promptExtract: string promptTranslate: string + maxRetries: number +} + +interface AiJob { + id: string + itemKey: string + libraryId: string + jobType: string + status: string + error: string | null + attempt: number + maxRetries: number + createdAt: number + startedAt: number | null + completedAt: number | null + itemTitle: string | null } interface Library { @@ -33,11 +49,24 @@ interface LibraryOverride { promptTranslate: string } +function formatElapsed(startedAt: number): string { + const seconds = Math.floor((Date.now() - startedAt) / 1000) + if (seconds < 60) return `${seconds}s` + const m = Math.floor(seconds / 60) + const s = seconds % 60 + return `${m}m ${s}s` +} + +function formatDate(ts: number): string { + return new Date(ts).toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }) +} + export default function AiTaggingPage() { const [settings, setSettings] = useState({ endpoint: '', model: '', modelTagging: '', modelDescribe: '', modelExtract: '', modelTranslate: '', enabled: false, preferredLanguage: 'English', promptDescribe: '', promptTagger: '', promptExtract: '', promptTranslate: '', + maxRetries: 3, }) const [loading, setLoading] = useState(true) const [saving, setSaving] = useState(false) @@ -54,6 +83,11 @@ export default function AiTaggingPage() { const [librarySaving, setLibrarySaving] = useState>({}) const [librarySaveResult, setLibrarySaveResult] = useState>({}) + // Job queue state + const [jobQueue, setJobQueue] = useState([]) + const [jobHistory, setJobHistory] = useState([]) + const [historyExpanded, setHistoryExpanded] = useState(false) + const jobPollRef = useRef | null>(null) const fetchSettings = useCallback(async () => { try { const [settingsRes, librariesRes] = await Promise.all([ @@ -79,6 +113,77 @@ export default function AiTaggingPage() { fetchSettings() }, [fetchSettings]) + // ─── Job queue polling ─────────────────────────────────────────────────────── + + const fetchJobs = useCallback(async () => { + try { + const res = await fetch('/api/ai-jobs') + if (res.ok) { + const data: { queue: AiJob[]; history: AiJob[] } = await res.json() + setJobQueue(data.queue) + setJobHistory(data.history) + } + } catch { + // ignore + } + }, []) + + useEffect(() => { + fetchJobs() + }, [fetchJobs]) + + // Poll every 2s while there are active jobs + useEffect(() => { + const hasActive = jobQueue.length > 0 + if (hasActive) { + jobPollRef.current = setInterval(fetchJobs, 2000) + } else { + if (jobPollRef.current) { + clearInterval(jobPollRef.current) + jobPollRef.current = null + } + } + return () => { + if (jobPollRef.current) clearInterval(jobPollRef.current) + } + }, [jobQueue.length, fetchJobs]) + + const handleRetryJob = async (jobId: string) => { + await fetch('/api/ai-jobs', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: 'retry', jobId }), + }) + fetchJobs() + } + + const handleCancelJob = async (jobId: string) => { + await fetch('/api/ai-jobs', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: 'cancel', jobId }), + }) + fetchJobs() + } + + const handleCancelAll = async () => { + await fetch('/api/ai-jobs', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: 'cancel-all' }), + }) + fetchJobs() + } + + const handleClearHistory = async () => { + await fetch('/api/ai-jobs', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ action: 'clear-history' }), + }) + fetchJobs() + } + const fetchLibraryOverrides = useCallback(async (libraryId: string) => { try { const res = await fetch(`/api/ai-settings/library/${libraryId}`) @@ -201,12 +306,182 @@ export default function AiTaggingPage() { return (

- AI Tagging + AI Integrations

- Automatically tag media using a vision-capable LLM on your network. + Manage AI-powered tagging, descriptions, and text extraction.

+ {/* ─── Job Queue ─────────────────────────────────────────────────────── */} +
+ {(() => { + const running = jobQueue.filter((j) => j.status === 'running') + const queued = jobQueue.filter((j) => j.status === 'queued') + if (running.length === 0 && queued.length === 0) { + return ( +

+ No active jobs. +

+ ) + } + return ( +
+
+

+ {running.length > 0 && {running.length} running} + {running.length > 0 && queued.length > 0 && ', '} + {queued.length > 0 && {queued.length} queued} +

+ {queued.length > 0 && ( + + )} +
+
+ {running.map((job) => ( +
+ + Running + + + {job.itemTitle || job.itemKey} + + + {job.jobType} + + {job.startedAt && ( + + {formatElapsed(job.startedAt)} + + )} +
+ ))} + {queued.map((job) => ( +
+ + Queued + + + {job.itemTitle || job.itemKey} + + + {job.jobType} + + +
+ ))} +
+
+ ) + })()} +
+ + {/* ─── Job History ───────────────────────────────────────────────────── */} +
+ {jobHistory.length === 0 ? ( +

+ No recent jobs. +

+ ) : ( +
+ + {historyExpanded && ( + <> +
+ {jobHistory.map((job) => ( +
+ + {job.status === 'completed' ? 'Done' : 'Failed'} + +
+ + {job.itemTitle || job.itemKey} + + {job.status === 'failed' && job.error && ( + + {job.error} + + )} +
+ + {job.jobType} + + + {job.completedAt ? formatDate(job.completedAt) : ''} + + {job.status === 'failed' && ( + + )} +
+ ))} +
+
+ +
+ + )} +
+ )} +
+
{loading ? ( @@ -365,6 +640,29 @@ export default function AiTaggingPage() {

+ + + setSettings((s) => ({ ...s, maxRetries: Math.max(0, Math.min(10, parseInt(e.target.value) || 0)) })) + } + className="w-24 rounded-lg px-3 py-2 text-sm outline-none focus:ring-2" + style={{ + backgroundColor: 'var(--background)', + border: '1px solid var(--border)', + color: 'var(--text-primary)', + }} + onFocus={(e) => ((e.currentTarget as HTMLElement).style.borderColor = 'var(--accent)')} + onBlur={(e) => ((e.currentTarget as HTMLElement).style.borderColor = 'var(--border)')} + /> +

+ Number of times to automatically retry a failed AI job before marking it as failed (0–10). +

+
+ {saveError && (

({})) throw new Error((data as { error?: string }).error ?? 'Extraction failed') } + if (res.status === 202) { + setExtractError('Queued — check AI Integrations for progress') + setTimeout(() => setExtractError(null), 4000) + return + } const result = await res.json() setExtractedText(result.extractedText || null) setTranslatedText(result.translatedText || null) diff --git a/src/components/ManageSubNav.tsx b/src/components/ManageSubNav.tsx index a57531f..0195c31 100644 --- a/src/components/ManageSubNav.tsx +++ b/src/components/ManageSubNav.tsx @@ -8,7 +8,7 @@ const TABS = [ { href: '/manage/tags', label: 'Tags' }, { href: '/manage/users', label: 'Users' }, { href: '/manage/scanning', label: 'Scanning' }, - { href: '/manage/ai-tagging', label: 'AI Tagging' }, + { href: '/manage/ai-tagging', label: 'AI Integrations' }, ] export default function ManageSubNav() { diff --git a/src/components/mixed/ImageLightbox.tsx b/src/components/mixed/ImageLightbox.tsx index 50ef0fd..5c60986 100644 --- a/src/components/mixed/ImageLightbox.tsx +++ b/src/components/mixed/ImageLightbox.tsx @@ -269,6 +269,11 @@ export default function ImageLightbox({ url, name, onClose, onPrev, onNext, item const data = await res.json().catch(() => ({})) throw new Error((data as { error?: string }).error ?? 'Failed to extract text') } + if (res.status === 202) { + setExtractError('Queued — check AI Integrations for progress') + setTimeout(() => setExtractError(null), 4000) + return + } const result = await res.json() setExtractedText(result.extractedText || null) setEditedExtractedText(result.extractedText || '') @@ -385,8 +390,10 @@ export default function ImageLightbox({ url, name, onClose, onPrev, onNext, item const data = await res.json().catch(() => ({})) throw new Error((data as { error?: string }).error ?? 'Failed to translate') } - const result = await res.json() - setTranslatedText(result.translatedText || null) + if (res.status !== 202) { + const result = await res.json() + setTranslatedText(result.translatedText || null) + } } catch { // ignore } finally { diff --git a/src/components/tags/TagSelector.tsx b/src/components/tags/TagSelector.tsx index d46e00c..742cb16 100644 --- a/src/components/tags/TagSelector.tsx +++ b/src/components/tags/TagSelector.tsx @@ -192,6 +192,11 @@ export default function TagSelector({ itemKey, onTagsChanged, refreshKey }: Prop const data = await res.json().catch(() => ({})) throw new Error((data as { error?: string }).error ?? 'Failed to generate description') } + if (res.status === 202) { + setDescError('Queued — check AI Integrations for progress') + setTimeout(() => setDescError(null), 4000) + return + } const { description } = await res.json() setAiDescription(description) } catch (err) { diff --git a/src/instrumentation.ts b/src/instrumentation.ts index 483d983..8fbbc63 100644 --- a/src/instrumentation.ts +++ b/src/instrumentation.ts @@ -5,5 +5,8 @@ export async function register() { const { startScheduler } = await import('./lib/scheduler') startScheduler() + + const { initJobProcessor } = await import('./lib/ai-jobs') + initJobProcessor() } } diff --git a/src/lib/ai-jobs.ts b/src/lib/ai-jobs.ts new file mode 100644 index 0000000..70ca66e --- /dev/null +++ b/src/lib/ai-jobs.ts @@ -0,0 +1,351 @@ +import crypto from 'crypto' +import { getDb } from './db' +import { getAiMaxRetries } from './app-settings' +import { tagSingleItem, generateItemDescription, extractItemText, translateItemText } from './ai-tagger' + +export type AiJobType = 'tag' | 'describe' | 'extract' | 'translate' +export type AiJobStatus = 'queued' | 'running' | 'completed' | 'failed' + +export interface AiJob { + id: string + itemKey: string + libraryId: string + jobType: AiJobType + status: AiJobStatus + error: string | null + attempt: number + maxRetries: number + createdAt: number + startedAt: number | null + completedAt: number | null + itemTitle: string | null +} + +interface AiJobRow { + id: string + item_key: string + library_id: string + job_type: string + status: string + error: string | null + attempt: number + max_retries: number + created_at: number + started_at: number | null + completed_at: number | null + item_title: string | null +} + +function rowToJob(row: AiJobRow): AiJob { + return { + id: row.id, + itemKey: row.item_key, + libraryId: row.library_id, + jobType: row.job_type as AiJobType, + status: row.status as AiJobStatus, + error: row.error, + attempt: row.attempt, + maxRetries: row.max_retries, + createdAt: row.created_at, + startedAt: row.started_at, + completedAt: row.completed_at, + itemTitle: row.item_title, + } +} + +/** + * Look up the title of a media item for display purposes. + */ +function resolveItemTitle(itemKey: string): string | null { + const db = getDb() + const row = db + .prepare('SELECT title FROM media_items WHERE item_key = ?') + .get(itemKey) as { title: string | null } | undefined + return row?.title ?? null +} + +// ─── Enqueue ───────────────────────────────────────────────────────────────── + +/** + * Enqueue an AI job. Deduplicates: if a queued/running job with the same + * item_key + job_type already exists, returns its ID instead. + */ +export function enqueueJob( + itemKey: string, + jobType: AiJobType, + libraryId: string, + sourceLanguage?: string, +): string { + const db = getDb() + + // Deduplication: check for existing queued/running job + const existing = db + .prepare( + `SELECT id FROM ai_jobs + WHERE item_key = ? AND job_type = ? AND status IN ('queued', 'running')` + ) + .get(itemKey, jobType) as { id: string } | undefined + if (existing) return existing.id + + const id = crypto.randomUUID() + const maxRetries = getAiMaxRetries() + const title = resolveItemTitle(itemKey) + + // Store sourceLanguage in the error field temporarily for translate jobs + // (it's null at creation, so we repurpose it briefly — cleared when the job runs) + const metadata = jobType === 'translate' && sourceLanguage ? sourceLanguage : null + + db.prepare( + `INSERT INTO ai_jobs (id, item_key, library_id, job_type, status, error, attempt, max_retries, created_at, item_title) + VALUES (?, ?, ?, ?, 'queued', ?, 0, ?, ?, ?)` + ).run(id, itemKey, libraryId, jobType, metadata, maxRetries, Date.now(), title) + + // Wake the processor + wakeProcessor() + + return id +} + +/** + * Enqueue jobs for all media items in a directory (for bulk operations). + * Returns the list of job IDs created. + */ +export function enqueueBulkJobs( + libraryId: string, + dirPath: string, + jobType: AiJobType, + itemTypeFilter?: string, + extFilter?: Set, +): string[] { + const db = getDb() + const prefix = dirPath + ? `${libraryId}:mixed_file:${encodeURIComponent(dirPath + '/')}` + : `${libraryId}:mixed_file:` + + const items = db + .prepare('SELECT item_key, item_type, file_path FROM media_items WHERE item_key LIKE ? AND item_type = ?') + .all(`${prefix}%`, itemTypeFilter ?? 'mixed_file') as Array<{ item_key: string; item_type: string; file_path: string | null }> + + const path = require('path') + const jobIds: string[] = [] + + for (const item of items) { + if (!item.file_path) continue + if (extFilter) { + const ext = path.extname(item.file_path).toLowerCase() + if (!extFilter.has(ext)) continue + } + const jobId = enqueueJob(item.item_key, jobType, libraryId) + jobIds.push(jobId) + } + + return jobIds +} + +// ─── Query ─────────────────────────────────────────────────────────────────── + +export function getJobQueue(): AiJob[] { + const db = getDb() + const rows = db + .prepare( + `SELECT * FROM ai_jobs + WHERE status IN ('running', 'queued') + ORDER BY + CASE status WHEN 'running' THEN 0 ELSE 1 END, + created_at ASC` + ) + .all() as AiJobRow[] + return rows.map(rowToJob) +} + +export function getJobHistory(limit = 50): AiJob[] { + const db = getDb() + const rows = db + .prepare( + `SELECT * FROM ai_jobs + WHERE status IN ('completed', 'failed') + ORDER BY completed_at DESC + LIMIT ?` + ) + .all(limit) as AiJobRow[] + return rows.map(rowToJob) +} + +export function getJob(jobId: string): AiJob | null { + const db = getDb() + const row = db + .prepare('SELECT * FROM ai_jobs WHERE id = ?') + .get(jobId) as AiJobRow | undefined + return row ? rowToJob(row) : null +} + +// ─── Actions ───────────────────────────────────────────────────────────────── + +export function retryJob(jobId: string): boolean { + const db = getDb() + const result = db + .prepare( + `UPDATE ai_jobs SET status = 'queued', error = NULL, attempt = 0, started_at = NULL, completed_at = NULL + WHERE id = ? AND status = 'failed'` + ) + .run(jobId) + if (result.changes > 0) { + wakeProcessor() + return true + } + return false +} + +export function cancelJob(jobId: string): boolean { + const db = getDb() + const result = db + .prepare("DELETE FROM ai_jobs WHERE id = ? AND status = 'queued'") + .run(jobId) + return result.changes > 0 +} + +export function cancelAllQueued(): number { + const db = getDb() + const result = db + .prepare("DELETE FROM ai_jobs WHERE status = 'queued'") + .run() + return result.changes +} + +export function clearJobHistory(): number { + const db = getDb() + const result = db + .prepare("DELETE FROM ai_jobs WHERE status IN ('completed', 'failed')") + .run() + return result.changes +} + +// ─── Processor ─────────────────────────────────────────────────────────────── + +let processorRunning = false +let processorWake: (() => void) | null = null + +function wakeProcessor(): void { + if (processorWake) { + processorWake() + } else if (!processorRunning) { + runProcessor() + } +} + +async function processNextJob(): Promise { + const db = getDb() + + const row = db + .prepare( + `SELECT * FROM ai_jobs + WHERE status = 'queued' + ORDER BY created_at ASC + LIMIT 1` + ) + .get() as AiJobRow | undefined + + if (!row) return false + + const now = Date.now() + + // Extract sourceLanguage for translate jobs (stored in error field at enqueue) + const sourceLanguage = row.job_type === 'translate' ? row.error : null + + db.prepare( + "UPDATE ai_jobs SET status = 'running', started_at = ?, error = NULL WHERE id = ?" + ).run(now, row.id) + + try { + switch (row.job_type) { + case 'tag': + await tagSingleItem(row.item_key) + break + case 'describe': + await generateItemDescription(row.item_key) + break + case 'extract': + await extractItemText(row.item_key) + break + case 'translate': + await translateItemText(row.item_key, sourceLanguage || undefined) + break + } + + db.prepare( + "UPDATE ai_jobs SET status = 'completed', completed_at = ? WHERE id = ?" + ).run(Date.now(), row.id) + } catch (err) { + const errorMessage = err instanceof Error ? err.message : String(err) + const attempt = row.attempt + 1 + + if (attempt < row.max_retries) { + // Re-queue for retry + db.prepare( + "UPDATE ai_jobs SET status = 'queued', attempt = ?, error = ?, started_at = NULL WHERE id = ?" + ).run(attempt, errorMessage, row.id) + } else { + // Final failure + db.prepare( + "UPDATE ai_jobs SET status = 'failed', attempt = ?, error = ?, completed_at = ? WHERE id = ?" + ).run(attempt, errorMessage, Date.now(), row.id) + } + + console.warn( + `[ai-jobs] Job ${row.id} (${row.job_type} for "${row.item_key}") failed (attempt ${attempt}/${row.max_retries}):`, + errorMessage + ) + } + + return true +} + +async function runProcessor(): Promise { + if (processorRunning) return + processorRunning = true + console.log('[ai-jobs] Processor started') + + try { + while (true) { + const hadWork = await processNextJob() + if (!hadWork) { + // Wait for a wake signal or timeout after 60s (then check again for safety) + await new Promise((resolve) => { + processorWake = resolve + setTimeout(() => { + processorWake = null + resolve() + }, 60_000) + }) + processorWake = null + } + } + } catch (err) { + console.error('[ai-jobs] Processor crashed:', err) + } finally { + processorRunning = false + console.log('[ai-jobs] Processor stopped') + } +} + +/** + * Initialize the job processor. Called on server startup. + * Resets any jobs stuck in 'running' state (from a previous crash) back to 'queued'. + */ +export function initJobProcessor(): void { + const db = getDb() + const result = db + .prepare("UPDATE ai_jobs SET status = 'queued', started_at = NULL WHERE status = 'running'") + .run() + if (result.changes > 0) { + console.log(`[ai-jobs] Reset ${result.changes} stuck running job(s) to queued`) + } + + // Check if there are any queued jobs and start the processor + const pending = db + .prepare("SELECT COUNT(*) as count FROM ai_jobs WHERE status = 'queued'") + .get() as { count: number } + if (pending.count > 0) { + runProcessor() + } +} diff --git a/src/lib/ai-tagger.ts b/src/lib/ai-tagger.ts index 334a902..8adf44a 100644 --- a/src/lib/ai-tagger.ts +++ b/src/lib/ai-tagger.ts @@ -225,7 +225,7 @@ async function callVisionApi( /** * Run AI tagging for a single library. Called after the scanner finishes. - * Processes up to BATCH_LIMIT untagged items per invocation. + * Enqueues up to BATCH_LIMIT untagged items as jobs for the processor. */ export async function runAiTagging(library: Library, libraryRoot: string): Promise { const config = getEffectiveAiConfig(library.id) @@ -234,14 +234,10 @@ export async function runAiTagging(library: Library, libraryRoot: string): Promi const activeCategoryIds = new Set(getActiveCategoryIdsForLibrary(library.id)) const allTags = getTags() - const allCategories = getCategories() const tags = allTags.filter((t) => activeCategoryIds.has(t.categoryId)) - const categories = allCategories.filter((c) => activeCategoryIds.has(c.id)) if (tags.length === 0) return - const validTagIds = new Set(tags.map((t) => t.id)) - const db = getDb() const untaggedItems = db .prepare( @@ -254,18 +250,13 @@ export async function runAiTagging(library: Library, libraryRoot: string): Promi if (untaggedItems.length === 0) return - console.log(`[ai-tagger] Processing ${untaggedItems.length} items in library "${library.name}"`) + // Import enqueueJob lazily to avoid circular dependency + const { enqueueJob } = await import('./ai-jobs') - let tagged = 0 - let consecutiveFailures = 0 + let enqueued = 0 const markTagged = db.prepare('UPDATE media_items SET ai_tagged_at = ? WHERE item_key = ?') for (const item of untaggedItems) { - if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { - console.warn(`[ai-tagger] Aborting after ${MAX_CONSECUTIVE_FAILURES} consecutive failures`) - break - } - const resolvedMedia = resolveItemImage(libraryRoot, item) if (!resolvedMedia) { // No image or video available — mark as tagged so we don't retry every scan @@ -273,48 +264,14 @@ export async function runAiTagging(library: Library, libraryRoot: string): Promi continue } - try { - let base64Images: string[] - if (resolvedMedia.mediaType === 'video') { - const framePaths = await getVideoFramePaths(resolvedMedia.path, library.id, VIDEO_FRAME_PERCENTAGES) - base64Images = framePaths.map((p) => fs.readFileSync(p, 'base64')) - } else { - const thumbnailPath = await getAiImagePath(resolvedMedia.path, library.id) - base64Images = [fs.readFileSync(thumbnailPath, 'base64')] - } - - const { tags: currentItemTags } = getResolvedTagsForItem(item.item_key) - const aiFields = getAiFields(item.item_key) - const systemPrompt = buildTagPrompt(tags, categories, { - currentTags: currentItemTags, - mediaContext: resolvedMedia.mediaType, - aiDescription: aiFields.aiDescription, - extractedText: aiFields.extractedTextTranslated ?? aiFields.extractedText, - customInstruction: config.promptTagger || undefined, - }) - - const suggestedIds = await callVisionApi(config.endpoint, taggingModel, base64Images, systemPrompt) - - // Filter to valid tags only - const validIds = suggestedIds.filter((id) => validTagIds.has(id)) - for (const tagId of validIds) { - addTagToItem(item.item_key, tagId) - } - - markTagged.run(Date.now(), item.item_key) - tagged++ - consecutiveFailures = 0 - } catch (err) { - consecutiveFailures++ - console.warn( - `[ai-tagger] Failed to tag "${item.item_key}":`, - err instanceof Error ? err.message : err - ) - } + enqueueJob(item.item_key, 'tag', library.id) + // Mark as tagged immediately so subsequent scans don't re-enqueue + markTagged.run(Date.now(), item.item_key) + enqueued++ } - if (tagged > 0) { - console.log(`[ai-tagger] Tagged ${tagged}/${untaggedItems.length} items in library "${library.name}"`) + if (enqueued > 0) { + console.log(`[ai-tagger] Enqueued ${enqueued} tagging jobs for library "${library.name}"`) } } diff --git a/src/lib/app-settings.ts b/src/lib/app-settings.ts index 2630d99..d685c39 100644 --- a/src/lib/app-settings.ts +++ b/src/lib/app-settings.ts @@ -202,3 +202,15 @@ export function getEffectiveAiConfig(libraryId: string): AiConfig { promptTranslate: overrides.promptTranslate || global.promptTranslate, } } + +// ─── AI Max Retries ────────────────────────────────────────────────────────── + +export function getAiMaxRetries(): number { + const raw = getSetting('ai_max_retries') + const parsed = parseInt(raw ?? '3', 10) + return Number.isFinite(parsed) && parsed >= 0 ? parsed : 3 +} + +export function setAiMaxRetries(n: number): void { + setSetting('ai_max_retries', String(Math.max(0, Math.floor(n)))) +} diff --git a/src/lib/db.ts b/src/lib/db.ts index 91ceefa..058a56b 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -105,6 +105,7 @@ function initDb(db: Database.Database): void { migrateMediaItemsAiTagged(db) migrateMediaItemsAiFields(db) migrateLibraryAiSettings(db) + migrateAiJobs(db) seedAppSettings(db) } @@ -117,6 +118,7 @@ function seedAppSettings(db: Database.Database): void { ai_endpoint: '', ai_model: '', preferred_language: 'English', + ai_max_retries: '3', } const insert = db.prepare( 'INSERT OR IGNORE INTO app_settings (key, value) VALUES (?, ?)' @@ -298,3 +300,25 @@ function migrateLibrariesType(db: Database.Database): void { `) } } + +function migrateAiJobs(db: Database.Database): void { + db.exec(` + CREATE TABLE IF NOT EXISTS ai_jobs ( + id TEXT PRIMARY KEY, + item_key TEXT NOT NULL, + library_id TEXT NOT NULL, + job_type TEXT NOT NULL CHECK(job_type IN ('tag','describe','extract','translate')), + status TEXT NOT NULL DEFAULT 'queued' CHECK(status IN ('queued','running','completed','failed')), + error TEXT, + attempt INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 3, + created_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + item_title TEXT + ); + + CREATE INDEX IF NOT EXISTS ai_jobs_status ON ai_jobs(status); + CREATE INDEX IF NOT EXISTS ai_jobs_created_at ON ai_jobs(created_at); + `) +}