fixes
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
from app.config import settings
|
||||
@@ -6,6 +7,17 @@ engine = create_async_engine(settings.database_url, echo=False)
|
||||
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
|
||||
# Enable WAL mode so concurrent reads don't block on an open write transaction.
|
||||
# Also set a generous busy timeout so transient lock contention retries instead
|
||||
# of immediately raising OperationalError.
|
||||
@event.listens_for(engine.sync_engine, "connect")
|
||||
def _set_sqlite_pragmas(dbapi_conn, _record):
|
||||
cursor = dbapi_conn.cursor()
|
||||
cursor.execute("PRAGMA journal_mode=WAL")
|
||||
cursor.execute("PRAGMA busy_timeout=10000") # 10 s
|
||||
cursor.close()
|
||||
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
@@ -1,29 +1,52 @@
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from alembic.config import Config
|
||||
from alembic import command
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app.database import engine
|
||||
# uvicorn's dictConfig only configures uvicorn.* loggers; the root logger
|
||||
# ends up with no handler, so app.* records are silently discarded.
|
||||
# Give the app namespace its own StreamHandler to guarantee output.
|
||||
_app_logger = logging.getLogger("app")
|
||||
_app_logger.setLevel(logging.INFO)
|
||||
if not _app_logger.handlers:
|
||||
_h = logging.StreamHandler()
|
||||
_h.setFormatter(logging.Formatter("%(levelname)-8s [%(name)s] %(message)s"))
|
||||
_app_logger.addHandler(_h)
|
||||
_app_logger.propagate = False
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
from app.database import engine, Base
|
||||
from app.routers import libraries, media, tags, search
|
||||
from app.services import watcher as watcher_service
|
||||
|
||||
|
||||
def run_migrations():
|
||||
alembic_cfg = Config("/backend/alembic.ini")
|
||||
command.upgrade(alembic_cfg, "head")
|
||||
import app.models # noqa: F401 — registers models with Base.metadata
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
run_migrations()
|
||||
log.info("Creating database tables...")
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
log.info("Starting library watchers...")
|
||||
await watcher_service.start_all()
|
||||
|
||||
log.info("Startup complete.")
|
||||
yield
|
||||
|
||||
await watcher_service.stop_all()
|
||||
|
||||
|
||||
app = FastAPI(title="MediaLore", lifespan=lifespan)
|
||||
|
||||
|
||||
@app.exception_handler(Exception)
|
||||
async def unhandled_exception_handler(request: Request, exc: Exception):
|
||||
log.exception("Unhandled error on %s %s", request.method, request.url)
|
||||
return JSONResponse(status_code=500, content={"detail": str(exc)})
|
||||
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"],
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select
|
||||
|
||||
@@ -19,7 +18,11 @@ async def list_libraries(db: AsyncSession = Depends(get_db)):
|
||||
|
||||
|
||||
@router.post("", response_model=LibraryOut, status_code=201)
|
||||
async def create_library(body: LibraryCreate, db: AsyncSession = Depends(get_db)):
|
||||
async def create_library(
|
||||
body: LibraryCreate,
|
||||
background_tasks: BackgroundTasks,
|
||||
db: AsyncSession = Depends(get_db),
|
||||
):
|
||||
path = Path(body.path)
|
||||
if not path.is_dir():
|
||||
raise HTTPException(400, f"Path does not exist or is not a directory: {body.path}")
|
||||
@@ -30,19 +33,18 @@ async def create_library(body: LibraryCreate, db: AsyncSession = Depends(get_db)
|
||||
|
||||
lib = Library(name=body.name, path=str(path))
|
||||
db.add(lib)
|
||||
await db.flush()
|
||||
await db.refresh(lib)
|
||||
lib_id = lib.id
|
||||
lib_path = lib.path
|
||||
await db.commit()
|
||||
await db.refresh(lib)
|
||||
|
||||
await scanner.scan_library(lib_id, lib_path, db)
|
||||
watcher_service.start_watcher(lib_id, lib_path)
|
||||
# Scan runs in the background so the HTTP response returns immediately
|
||||
background_tasks.add_task(scanner.scan_library_background, lib.id, lib.path)
|
||||
watcher_service.start_watcher(lib.id, lib.path)
|
||||
return lib
|
||||
|
||||
async with db.begin():
|
||||
pass
|
||||
result = await db.execute(select(Library).where(Library.id == lib_id))
|
||||
return result.scalars().first()
|
||||
|
||||
@router.get("/{library_id}/scan-status")
|
||||
async def get_scan_status(library_id: int):
|
||||
return {"scanning": scanner.is_scanning(library_id)}
|
||||
|
||||
|
||||
@router.delete("/{library_id}", status_code=204)
|
||||
@@ -72,8 +74,6 @@ async def browse_library(library_id: int, path: str = "", db: AsyncSession = Dep
|
||||
if not target.is_dir():
|
||||
raise HTTPException(404, "Directory not found")
|
||||
|
||||
# Load all media items in this directory (non-recursive)
|
||||
rel_prefix = path.strip("/")
|
||||
items_result = await db.execute(
|
||||
select(MediaItem).where(
|
||||
MediaItem.library_id == library_id,
|
||||
@@ -84,19 +84,32 @@ async def browse_library(library_id: int, path: str = "", db: AsyncSession = Dep
|
||||
for item in items_result.scalars().all():
|
||||
db_items[item.rel_path] = item
|
||||
|
||||
from app.services.scanner import classify
|
||||
|
||||
entries: list[BrowseEntry] = []
|
||||
|
||||
for entry in sorted(target.iterdir(), key=lambda e: (e.is_file(), e.name.lower())):
|
||||
rel_entry = str(entry.relative_to(root))
|
||||
if entry.is_dir():
|
||||
entries.append(BrowseEntry(name=entry.name, type="dir", rel_path=rel_entry))
|
||||
elif entry.is_file() and rel_entry in db_items:
|
||||
item = db_items[rel_entry]
|
||||
entries.append(BrowseEntry(
|
||||
name=entry.name,
|
||||
type=item.media_type,
|
||||
rel_path=rel_entry,
|
||||
media_item_id=item.id,
|
||||
))
|
||||
elif entry.is_file():
|
||||
db_item = db_items.get(rel_entry)
|
||||
if db_item:
|
||||
entries.append(BrowseEntry(
|
||||
name=entry.name,
|
||||
type=db_item.media_type,
|
||||
rel_path=rel_entry,
|
||||
media_item_id=db_item.id,
|
||||
))
|
||||
else:
|
||||
# File exists on disk but scan hasn't indexed it yet; show by extension
|
||||
media_type = classify(entry)
|
||||
if media_type:
|
||||
entries.append(BrowseEntry(
|
||||
name=entry.name,
|
||||
type=media_type,
|
||||
rel_path=rel_entry,
|
||||
media_item_id=None,
|
||||
))
|
||||
|
||||
return BrowseResult(path=path, entries=entries)
|
||||
|
||||
@@ -1,9 +1,22 @@
|
||||
import asyncio
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy import select
|
||||
from app.models import MediaItem
|
||||
from app.database import SessionLocal
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_scanning: set[int] = set()
|
||||
|
||||
|
||||
def is_scanning(library_id: int) -> bool:
|
||||
return library_id in _scanning
|
||||
|
||||
|
||||
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff", ".avif", ".heic"}
|
||||
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".mov", ".avi", ".webm", ".m4v", ".flv", ".wmv", ".ts"}
|
||||
@@ -26,58 +39,88 @@ def hash_file(path: Path) -> str:
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
async def scan_library_background(library_id: int, library_path: str) -> None:
|
||||
"""Run a full library scan in a fresh session. Safe to call as a background task."""
|
||||
_scanning.add(library_id)
|
||||
try:
|
||||
async with SessionLocal() as db:
|
||||
await _do_scan(library_id, library_path, db)
|
||||
except Exception:
|
||||
log.exception("Scan failed for library %d at %s", library_id, library_path)
|
||||
finally:
|
||||
_scanning.discard(library_id)
|
||||
|
||||
|
||||
async def scan_library(library_id: int, library_path: str, db: AsyncSession) -> None:
|
||||
await _do_scan(library_id, library_path, db)
|
||||
|
||||
|
||||
async def _do_scan(library_id: int, library_path: str, db: AsyncSession) -> None:
|
||||
root = Path(library_path)
|
||||
log.info("Starting scan for library %d at %s", library_id, library_path)
|
||||
|
||||
existing = await db.execute(
|
||||
select(MediaItem).where(MediaItem.library_id == library_id)
|
||||
)
|
||||
db_items = {item.rel_path: item for item in existing.scalars().all()}
|
||||
|
||||
seen_paths: set[str] = set()
|
||||
loop = asyncio.get_running_loop()
|
||||
total_dirs = 0
|
||||
|
||||
for file_path in root.rglob("*"):
|
||||
if not file_path.is_file():
|
||||
continue
|
||||
media_type = classify(file_path)
|
||||
if not media_type:
|
||||
continue
|
||||
for dirpath, dirnames, filenames in os.walk(library_path):
|
||||
dirnames[:] = sorted(d for d in dirnames if not d.startswith("."))
|
||||
dir = Path(dirpath)
|
||||
rel_dir = str(dir.relative_to(root)) if dir != root else "."
|
||||
found_in_dir = 0
|
||||
|
||||
rel = str(file_path.relative_to(root))
|
||||
seen_paths.add(rel)
|
||||
for filename in sorted(f for f in filenames if not f.startswith(".")):
|
||||
file_path = dir / filename
|
||||
media_type = classify(file_path)
|
||||
if not media_type:
|
||||
continue
|
||||
|
||||
if rel in db_items:
|
||||
item = db_items[rel]
|
||||
if item.missing:
|
||||
item.missing = False
|
||||
item.updated_at = datetime.utcnow()
|
||||
else:
|
||||
file_hash = hash_file(file_path)
|
||||
# Check if this hash matches an orphaned (missing) item — file was moved while offline
|
||||
moved = await _find_by_hash(library_id, file_hash, db)
|
||||
if moved:
|
||||
moved.rel_path = rel
|
||||
moved.filename = file_path.name
|
||||
moved.missing = False
|
||||
moved.updated_at = datetime.utcnow()
|
||||
rel = str(file_path.relative_to(root))
|
||||
seen_paths.add(rel)
|
||||
found_in_dir += 1
|
||||
|
||||
if rel in db_items:
|
||||
item = db_items[rel]
|
||||
if item.missing:
|
||||
item.missing = False
|
||||
item.updated_at = datetime.utcnow()
|
||||
else:
|
||||
item = MediaItem(
|
||||
library_id=library_id,
|
||||
rel_path=rel,
|
||||
filename=file_path.name,
|
||||
file_hash=file_hash,
|
||||
media_type=media_type,
|
||||
size_bytes=file_path.stat().st_size,
|
||||
missing=False,
|
||||
)
|
||||
db.add(item)
|
||||
file_hash = await loop.run_in_executor(None, hash_file, file_path)
|
||||
moved = await _find_by_hash(library_id, file_hash, db)
|
||||
if moved:
|
||||
moved.rel_path = rel
|
||||
moved.filename = file_path.name
|
||||
moved.missing = False
|
||||
moved.updated_at = datetime.utcnow()
|
||||
else:
|
||||
db.add(MediaItem(
|
||||
library_id=library_id,
|
||||
rel_path=rel,
|
||||
filename=file_path.name,
|
||||
file_hash=file_hash,
|
||||
media_type=media_type,
|
||||
size_bytes=file_path.stat().st_size,
|
||||
missing=False,
|
||||
))
|
||||
|
||||
log.info("Scanned directory %s — %d media file(s) found", rel_dir, found_in_dir)
|
||||
total_dirs += 1
|
||||
|
||||
# Mark items no longer on disk as missing
|
||||
for rel_path, item in db_items.items():
|
||||
if rel_path not in seen_paths and not item.missing:
|
||||
item.missing = True
|
||||
item.updated_at = datetime.utcnow()
|
||||
|
||||
await db.commit()
|
||||
log.info(
|
||||
"Scan complete for library %d — %d director%s, %d media file(s) indexed",
|
||||
library_id, total_dirs, "y" if total_dirs == 1 else "ies", len(seen_paths),
|
||||
)
|
||||
|
||||
|
||||
async def _find_by_hash(library_id: int, file_hash: str, db: AsyncSession) -> MediaItem | None:
|
||||
|
||||
@@ -111,12 +111,15 @@ class LibraryEventHandler(FileSystemEventHandler):
|
||||
def start_watcher(library_id: int, library_path: str):
|
||||
if library_id in _observers:
|
||||
return
|
||||
handler = LibraryEventHandler(library_id, library_path)
|
||||
observer = Observer()
|
||||
observer.schedule(handler, library_path, recursive=True)
|
||||
observer.start()
|
||||
_observers[library_id] = observer
|
||||
log.info("Started watcher for library %d at %s", library_id, library_path)
|
||||
try:
|
||||
handler = LibraryEventHandler(library_id, library_path)
|
||||
observer = Observer()
|
||||
observer.schedule(handler, library_path, recursive=True)
|
||||
observer.start()
|
||||
_observers[library_id] = observer
|
||||
log.info("Started watcher for library %d at %s", library_id, library_path)
|
||||
except Exception:
|
||||
log.exception("Failed to start watcher for library %d at %s", library_id, library_path)
|
||||
|
||||
|
||||
def stop_watcher(library_id: int):
|
||||
|
||||
Reference in New Issue
Block a user