# Source code for app.models.processing_job

"""
ProcessingJob model — async parsing lifecycle tracker.
One per upload (or more if allow_reprocess=true).
"""
import uuid
from datetime import datetime
from typing import TYPE_CHECKING, Any

from sqlalchemy import (
    Boolean, DateTime, Enum as SAEnum, ForeignKey,
    Integer, SmallInteger, String, Text, func, JSON,
)
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship

from app.models.base import Base, UUIDPrimaryKeyMixin
from app.models.custom_types import IntegerArray
from app.models.enums import JobStatus

if TYPE_CHECKING:
    from app.models.document import Document
    from app.models.bank_statement import BankStatement
    from app.models.invoice import Invoice


class ProcessingJob(UUIDPrimaryKeyMixin, Base):
    """Tracks a single parsing run of a document (table ``processing_jobs``).

    Every row references exactly one ``Document`` through ``document_id``
    (non-nullable foreign key, ``ON DELETE CASCADE``) and stores the run's
    status, start/finish timestamps, error and warning payloads, retry
    counters, parser metadata and PDF/OCR details.

    The ``bank_statement`` and ``invoice`` relationships are scalar
    (``uselist=False``) and may be ``None``.

    The ``id`` attribute used by ``__repr__`` is presumably supplied by
    ``UUIDPrimaryKeyMixin`` -- that mixin is defined outside this module.
    """

    __tablename__ = "processing_jobs"

    # Owning document; the FK is declared with ON DELETE CASCADE.
    document_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("documents.id", ondelete="CASCADE"),
        nullable=False,
    )

    # Lifecycle state; a new job starts as PENDING (Python-side default).
    status: Mapped[JobStatus] = mapped_column(
        SAEnum(JobStatus, name="job_status_enum"),
        nullable=False,
        default=JobStatus.PENDING,
    )

    # Both timestamps are nullable and have no default, so the code driving
    # the job must set them explicitly.
    started_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    completed_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    # ── Error tracking ──────────────────────────────────────────────────────
    # NOTE(review): the JSON columns below are plain ``JSON`` (no mutable
    # wrapper is applied in this class), so in-place edits such as
    # ``job.warnings.append(...)`` may not be detected as changes -- confirm
    # that callers reassign the whole value.
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)
    error_detail: Mapped[dict[str, Any] | None] = mapped_column(
        JSON,
        nullable=True,
        comment="Structured: {stage, field, raw_value, traceback}",
    )

    # ── Retry tracking ──────────────────────────────────────────────────────
    retry_count: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
    max_retries: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=3)

    # ── Non-fatal warnings ──────────────────────────────────────────────────
    warnings: Mapped[list[dict[str, Any]] | None] = mapped_column(
        JSON,
        nullable=True,
        comment="Array: [{field, message}] — non-fatal issues during parsing",
    )

    # ── Parser metadata ─────────────────────────────────────────────────────
    parser_version: Mapped[str | None] = mapped_column(String(50), nullable=True)
    is_reprocess: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=False,
        comment="TRUE when triggered by allow_reprocess=true",
    )

    # ── PDF-specific fields (from edge case analysis) ───────────────────────
    pdf_password_used: Mapped[bool] = mapped_column(
        Boolean, nullable=False, default=False
    )
    pdf_encryption_type: Mapped[str | None] = mapped_column(String(50), nullable=True)
    ocr_used: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    ocr_engine: Mapped[str | None] = mapped_column(String(50), nullable=True)
    # No explicit column type is passed here; it is derived from the
    # ``float`` annotation.
    ocr_confidence_avg: Mapped[float | None] = mapped_column(nullable=True)
    scanned_pages: Mapped[list[int] | None] = mapped_column(
        IntegerArray,
        nullable=True,
        comment="Page numbers (1-indexed) that required OCR",
    )

    # Audit timestamps: both are filled by the database via ``func.now()`` on
    # insert; ``updated_at`` is additionally refreshed through ``onupdate``.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
    )

    # ── Relationships ───────────────────────────────────────────────────────
    document: Mapped["Document"] = relationship(
        "Document", back_populates="processing_jobs"
    )
    bank_statement: Mapped["BankStatement | None"] = relationship(
        "BankStatement", back_populates="processing_job", uselist=False
    )
    invoice: Mapped["Invoice | None"] = relationship(
        "Invoice", back_populates="processing_job", uselist=False
    )

    def __repr__(self) -> str:
        """Return a short debug string with the job id, status and document id."""
        return f"<ProcessingJob id={self.id} status={self.status} doc={self.document_id}>"