from __future__ import annotations

from datetime import datetime, timezone
from difflib import SequenceMatcher
import hashlib
import io
from pathlib import Path
import re
import sqlite3
from statistics import median
import unicodedata
import uuid
from zoneinfo import ZoneInfo

import fitz
from fastapi import HTTPException, UploadFile, status
import httpx
from PIL import Image
import pytesseract

from app.core.config import get_settings
from app.services.google_document_ai import process_document_with_document_ai
from app.services.google_workspace_drive import upload_binary_file_to_drive
from app.services.google_workspace_session import get_active_google_workspace_connection
from app.services.menu_asset_service import (
    extract_menu_asset_text,
    normalize_asset_name,
    normalize_text_block,
    resolve_asset_kind,
    resolve_asset_mime_type,
    truncate_text,
)
from app.services.tenant_store import FiscalDocumentRecord, SessionIdentity, get_tenant_store


# Two-digit day and month followed by a 2-4 digit year, separated by ".", "/" or "-".
_DATE_PATTERN = re.compile(r"\b(\d{2})[./-](\d{2})[./-](\d{2,4})\b")
# Amount with a comma decimal separator and exactly two decimals, optionally
# with dot-separated thousands groups (e.g. "1.234,56" or "12,50").
_AMOUNT_PATTERN = re.compile(r"(?<!\d)(\d{1,3}(?:\.\d{3})*,\d{2}|\d+,\d{2})(?!\d)")
# Digits followed by "." or "," and exactly two decimals; the match may not be
# preceded by a digit, "/", "." or "-", nor followed by a digit or "/".
_DECIMAL_NUMBER_PATTERN = re.compile(r"(?<![\d/.-])(\d+(?:[.,]\d{2}))(?![\d/])")
# Company-form abbreviations (s.r.l., s.n.c., s.a.s., s.p.a.) with optional dots and spaces.
_LEGAL_ENTITY_PATTERN = re.compile(r"\b(s\.?\s*r\.?\s*l\.?|s\.?\s*n\.?\s*c\.?|s\.?\s*a\.?\s*s\.?|s\.?\s*p\.?\s*a\.?)\b", re.IGNORECASE)
# Lower-case codes accepted as-is by _normalize_stacked_delivery_lot_code.
_STACKED_DELIVERY_LOT_CODES = {"ct", "cs", "fd", "bt", "pz", "bm", "kg", "lt"}
# Upper-case unit codes; the regex in _split_laconi_unit_token lists the same codes.
_LACONI_STRUCTURED_UNIT_CODES = {"FS", "CT", "BM", "BT", "CS", "FD", "PZ", "KG", "LT", "NR"}
# Time zone used to read the current year in _is_plausible_fiscal_document_date.
_ITALIAN_TIMEZONE = ZoneInfo("Europe/Rome")
# Lower-case substrings that make _is_noise_delivery_item reject a description
# (table headers, packaging captions and signature/legal footer phrases).
_DELIVERY_ITEM_NOISE_TOKENS = (
    "descrizione prodotto",
    "descrizione merce",
    "quantita",
    "bottiglie",
    "categoria",
    "cod. ean",
    "confezione",
    "in cartoni",
    "in casse",
    "giorno chiusura",
    "diritto fisso",
    "firma del destinatario",
    "dichiaro di aver ricevuto",
    "ai sensi dell'art.",
)
# NOTE(review): presumably tokens ignored when comparing product names; the
# consumer is outside this section of the file - confirm before editing.
_MATCH_TOKEN_STOPWORDS = {
    "di",
    "del",
    "della",
    "delle",
    "dei",
    "and",
    "the",
    "al",
    "lt18",
    "kg10",
    "liit",
    "paper",
    "super",
}
# NOTE(review): presumably tokens (colours, variants, packaging) that tell
# otherwise similar products apart during matching; the consumer is outside
# this section of the file - confirm before editing.
_DISTINCTIVE_PRODUCT_TOKENS = {
    "anejo",
    "bianco",
    "blanco",
    "black",
    "blu",
    "blue",
    "dry",
    "gas",
    "gold",
    "gialla",
    "giallo",
    "green",
    "mini",
    "nat",
    "naturale",
    "plata",
    "pet",
    "reposado",
    "rose",
    "rosso",
    "sapphire",
    "silver",
    "verde",
    "vetro",
    "white",
}
# NOTE(review): the next four constants are consumed outside this section of
# the file; by name they configure order/document reconciliation - confirm.
_ORDER_RECONCILABLE_DOCUMENT_TYPES = {"delivery_note", "instant_invoice", "invoice"}
_MIN_FISCAL_ORDER_MATCH_SCORE = 0.35
_COMMON_PACK_SIZES = {2, 3, 4, 6, 12, 24}
_GENERIC_ORDER_SUPPLIER_NAMES = {
    "cash",
    "diretto",
    "diretta",
    "fornitore diretto",
    "fornitore generico",
    "generico",
    "varie",
    "vari",
}
# Each set groups lower-case supplier names; presumably names in the same set
# are treated as the same supplier - verify against the caller. Spelling
# variants are kept as written because they are runtime match keys.
_SUPPLIER_ALIAS_GROUPS = (
    {"el original", "25 holding"},
    {"eleven trade", "roby marton"},
    {"montenegro", "marcello reduzzi", "marcello reduzzi montenegro", "marecello reduzzi montenegro"},
)
# Two/three-letter prefixes that let _is_noise_delivery_item keep a description
# with no longer word, provided the row also carries a number of 6+ digits.
_COMPACT_FISCAL_PRODUCT_PREFIXES = {"as", "bv", "dp", "es", "kg", "mc", "mp", "ru", "rur", "tz", "vo"}
# NOTE(review): lower-case unit codes used outside this section of the file.
_ELEVEN_UNIT_CODES = {"bt", "ct", "pz", "nr", "kg", "lt", "l", "cf", "crt"}
# Translation table for str.translate: each key is a non-Latin letter (Greek or
# Cyrillic look-alike) and each value is the ASCII letter it is replaced with.
# Applied by _normalize_ocr_latin_confusables.
_OCR_LATIN_CONFUSABLES = str.maketrans(
    {
        "Α": "A",
        "А": "A",
        "Β": "B",
        "В": "B",
        "Ϲ": "C",
        "С": "C",
        "Ε": "E",
        "Е": "E",
        "Ζ": "Z",
        "Η": "H",
        "Н": "H",
        "Ι": "I",
        "І": "I",
        "Κ": "K",
        "К": "K",
        "Μ": "M",
        "М": "M",
        "Ν": "N",
        "Ο": "O",
        "О": "O",
        "Ρ": "P",
        "Р": "P",
        "Τ": "T",
        "Т": "T",
        "Υ": "Y",
        "У": "Y",
        "Χ": "X",
        "Х": "X",
        "α": "a",
        "а": "a",
        "β": "b",
        "в": "b",
        "ϲ": "c",
        "с": "c",
        "ε": "e",
        "е": "e",
        "ι": "i",
        "і": "i",
        "κ": "k",
        "м": "m",
        "ν": "n",
        "ο": "o",
        "о": "o",
        "ρ": "p",
        "р": "p",
        "τ": "t",
        "т": "t",
        "χ": "x",
        "х": "x",
    }
)


def _normalize_ocr_latin_confusables(value: str | None) -> str:
    """Normalize ``value`` and replace look-alike letters via the confusables table.

    ``None`` is treated as an empty string.
    """
    cleaned = normalize_text_block(value or "")
    return cleaned.translate(_OCR_LATIN_CONFUSABLES)


def _has_legal_reference_context(value: str | None) -> bool:
    """Return True when the case-folded text contains a law/decree citation marker."""
    haystack = normalize_text_block(value or "").casefold()
    for marker in ("d.p.r", "dpr", "decreto", "art.", "articolo", "legge"):
        if marker in haystack:
            return True
    return False


def _parse_euro_amount(value: str) -> float | None:
    """Parse a textual amount into a non-negative, finite float.

    Text containing a comma is read Italian-style: dots are dropped as
    thousands separators and the comma becomes the decimal point. Text
    without a comma is handed to ``float`` unchanged.

    Returns:
        The parsed amount, or ``None`` when the text is empty, cannot be
        parsed, is negative, or is not a finite number.
    """
    raw_value = normalize_text_block(value)
    if not raw_value:
        return None
    if "," in raw_value:
        normalized = raw_value.replace(".", "").replace(",", ".").strip()
    else:
        normalized = raw_value.strip()
    try:
        parsed = float(normalized)
    except ValueError:
        return None
    # float() also accepts words such as "inf", "infinity" and "nan", which OCR
    # text can contain. The chained comparison rejects negatives, NaN (every
    # comparison with NaN is false) and positive infinity in one step.
    return parsed if 0 <= parsed < float("inf") else None


def _parse_iso_or_italian_date(value: str | None) -> str | None:
    """Parse an ISO or day-first date string and return it in ISO format.

    The formats are tried in order; the first that matches the whole string
    wins. Returns ``None`` for empty input or when no format matches.
    """
    candidate = normalize_text_block(value or "")
    if not candidate:
        return None
    accepted_formats = ("%Y-%m-%d", "%d/%m/%Y", "%d-%m-%Y", "%d/%m/%y", "%d-%m-%y")
    for date_format in accepted_formats:
        try:
            moment = datetime.strptime(candidate, date_format)
        except ValueError:
            continue
        else:
            return moment.date().isoformat()
    return None


def _is_plausible_fiscal_document_date(value: str | None) -> bool:
    """Return True when ``value`` parses to a date between 2000 and next year.

    "Next year" is taken from the current year in the Italian time zone.
    """
    iso_value = _parse_iso_or_italian_date(value)
    if not iso_value:
        return False
    try:
        document_year = datetime.fromisoformat(iso_value).date().year
    except ValueError:
        return False
    latest_allowed_year = datetime.now(_ITALIAN_TIMEZONE).year + 1
    return 2000 <= document_year <= latest_allowed_year


def _parse_plausible_italian_document_date(value: str | None, *, context: str = "") -> str | None:
    """Extract a plausible document date from ``value`` as an ISO string.

    Returns ``None`` when the value is empty, when the value or its
    ``context`` looks like a legal citation, when no date is found, or when
    the date found is outside the plausible range.
    """
    candidate = normalize_text_block(value or "")
    if not candidate or _has_legal_reference_context(f"{context} {candidate}"):
        return None
    iso_value = _parse_italian_date_match(candidate)
    if not iso_value:
        return None
    return iso_value if _is_plausible_fiscal_document_date(iso_value) else None


def _parse_italian_date_match(value: str | None) -> str | None:
    """Find the first day-month-year date in ``value`` and return it as ISO text.

    Years below 100 are shifted into the 2000s. Returns ``None`` when nothing
    matches or the matched numbers do not form a valid calendar date.
    """
    candidate = normalize_text_block(value or "")
    found = _DATE_PATTERN.search(candidate) if candidate else None
    if found is None:
        return None
    day_text, month_text, year_text = found.groups()
    try:
        year_number = int(year_text)
        full_year = year_number + 2000 if year_number < 100 else year_number
        return datetime(full_year, int(month_text), int(day_text)).date().isoformat()
    except ValueError:
        return None


def _extract_amounts_from_line(line: str) -> list[float]:
    """Return every amount matched by ``_AMOUNT_PATTERN`` in ``line``, in order.

    Matches that fail to parse are skipped.
    """
    return [
        amount
        for token in _AMOUNT_PATTERN.findall(line)
        if (amount := _parse_euro_amount(token)) is not None
    ]


def _coerce_positive_document_float(value: object | None) -> float | None:
    if value is None:
        return None
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        parsed = float(value)
    else:
        parsed = _parse_euro_amount(str(value))
        if parsed is None:
            return None
    return parsed if parsed > 0 else None


def _amounts_without_trailing_vat_rate(raw_row_text: str) -> list[float]:
    """Return the row's amounts, dropping the last one when it is a VAT rate.

    The last amount is dropped only when the row has at least four amounts and
    ends with 4/04/10/22 followed by ",00" or ".00" (optionally preceded by a
    "0,00" column).
    """
    row_amounts = _extract_amounts_from_line(raw_row_text)
    row_text = normalize_text_block(raw_row_text)
    if len(row_amounts) < 4:
        return row_amounts
    ends_with_vat_rate = re.search(r"(?:^|[\s|])(?:0[,.]00\s+)?(?:4|04|10|22)[,.]00\s*$", row_text)
    return row_amounts[:-1] if ends_with_vat_rate else row_amounts


def _infer_document_line_pricing_from_raw_row(item: dict[str, object]) -> tuple[float | None, float | None]:
    """Infer ``(unit_price, line_total)`` from the item's ``raw_row_text``.

    The line total is the last positive amount on the row (after removing a
    trailing VAT rate). The unit price is the first candidate amount that is
    neither one of the item's known quantities nor equal to the line total.
    For pipe-separated rows the candidates are the amounts in the cells after
    the quantity cell; otherwise they are all positive amounts but the last.

    Args:
        item: Parsed document line; reads ``raw_row_text`` and the quantity
            fields ``pack_count``, ``quantity``, ``gross_quantity``,
            ``tare_quantity`` and ``net_quantity``.

    Returns:
        ``(unit_price, line_total)``; both ``None`` when the row has no text or
        no positive amount. ``unit_price`` may be ``None`` on its own.
    """
    raw_row_text = normalize_text_block(str(item.get("raw_row_text") or ""))
    if not raw_row_text:
        return (None, None)

    amounts = _amounts_without_trailing_vat_rate(raw_row_text)
    positive_amounts = [amount for amount in amounts if amount > 0]
    if not positive_amounts:
        return (None, None)

    line_total = positive_amounts[-1]
    quantity_values = [
        value
        for value in (
            _coerce_positive_document_float(item.get("pack_count")),
            _coerce_positive_document_float(item.get("quantity")),
            _coerce_positive_document_float(item.get("gross_quantity")),
            _coerce_positive_document_float(item.get("tare_quantity")),
            _coerce_positive_document_float(item.get("net_quantity")),
        )
        if value is not None
    ]
    unit_price_amounts: list[float] | None = None
    if "|" in raw_row_text and quantity_values:
        cells = [normalize_text_block(cell) for cell in re.split(r"\s*\|\s*", raw_row_text) if normalize_text_block(cell)]
        quantity_cell_index = None
        for cell_index, cell in enumerate(cells):
            # A quantity cell may carry a trailing "x" (e.g. "12 x"); strip it before parsing.
            quantity_candidate = _parse_euro_amount(re.sub(r"\s*[xX]\s*$", "", cell).strip())
            if quantity_candidate is None:
                continue
            if any(abs(quantity_candidate - quantity) <= 0.001 for quantity in quantity_values):
                quantity_cell_index = cell_index
                break
        if quantity_cell_index is not None and quantity_cell_index + 1 < len(cells):
            # Keep only positive amounts, as the non-table path does: a "0,00"
            # cell (e.g. an empty discount column) must never be picked as the
            # unit price.
            unit_price_amounts = [
                amount
                for amount in _amounts_without_trailing_vat_rate(" | ".join(cells[quantity_cell_index + 1 :]))
                if amount > 0
            ]

    unit_candidates = unit_price_amounts if unit_price_amounts is not None else positive_amounts[:-1]
    unit_price = next(
        (
            amount
            for amount in unit_candidates
            if not any(abs(amount - quantity) <= 0.001 for quantity in quantity_values)
            and abs(amount - line_total) > 0.001
        ),
        None,
    )
    # With a single amount on the row there is nothing to tell price and total
    # apart, so that amount is used for both.
    if unit_price is None and len(positive_amounts) == 1:
        unit_price = positive_amounts[0]
    return (unit_price, line_total)


def _document_line_pricing(item: dict[str, object]) -> tuple[float | None, float | None]:
    """Return ``(unit_price, line_total)`` for a document line.

    Positive values already stored on the item win; missing ones are filled
    from the values inferred out of the raw row text.
    """
    stored_unit_price = _coerce_positive_document_float(item.get("unit_price"))
    stored_line_total = _coerce_positive_document_float(item.get("line_total"))
    fallback_unit_price, fallback_line_total = _infer_document_line_pricing_from_raw_row(item)
    return (
        fallback_unit_price if stored_unit_price is None else stored_unit_price,
        fallback_line_total if stored_line_total is None else stored_line_total,
    )


def _enrich_document_item_pricing(item: dict[str, object]) -> dict[str, object]:
    """Return a copy of ``item`` with missing prices filled in.

    ``unit_price`` and ``line_total`` are written (rounded to 6 decimals) only
    when the item holds no positive value for them and one could be resolved.
    The input dictionary is not modified.
    """
    result = dict(item)
    resolved_unit_price, resolved_line_total = _document_line_pricing(result)
    for key, resolved in (("unit_price", resolved_unit_price), ("line_total", resolved_line_total)):
        if resolved is not None and _coerce_positive_document_float(result.get(key)) is None:
            result[key] = round(resolved, 6)
    return result


def _document_vat_rate_from_code(value: object | None) -> float | None:
    if value is None:
        return None
    match = re.search(r"\b(4|04|10|22)\b", str(value))
    if not match:
        return None
    try:
        return float(int(match.group(1)))
    except ValueError:
        return None


def _looks_like_document_number(value: str | None) -> bool:
    """Return True when ``value`` looks like a document number.

    After trimming surrounding punctuation the candidate must have at least
    three characters, contain no "@", not read as a date, consist only of
    letters, digits, ".", "/" and "-", and include at least one digit.
    """
    candidate = normalize_text_block(value or "").strip(" .:-/#")
    if len(candidate) < 3 or "@" in candidate:
        return False
    if _parse_italian_date_match(candidate):
        return False
    if re.fullmatch(r"[A-Z0-9./-]{3,}", candidate, re.IGNORECASE) is None:
        return False
    return re.search(r"\d", candidate) is not None


def _is_noise_delivery_item(description: str | None, raw_row_text: str | None = None) -> bool:
    """Return True when a parsed row is table noise rather than a product line.

    A row is noise when its description is empty, when it has no word of three
    or more letters (unless it mentions "co2" or is a compact product code
    backed by a number of 6+ digits on the row), or when the description or
    row contains a known header/footer phrase.
    """
    description_text = normalize_text_block(description or "").casefold()
    row_text = normalize_text_block(raw_row_text or "").casefold()
    if not description_text:
        return True

    prefix = re.match(r"^([a-z]{2,3})\b", description_text)
    is_compact_product = (
        prefix is not None
        and prefix.group(1) in _COMPACT_FISCAL_PRODUCT_PREFIXES
        and re.search(r"\b\d{6,}\b", row_text) is not None
    )
    if not is_compact_product:
        has_word = re.search(r"[a-zà-ÿ]{3,}", description_text) or re.search(r"\bco2\b", description_text)
        if not has_word:
            return True

    if any(token in description_text for token in _DELIVERY_ITEM_NOISE_TOKENS):
        return True
    footer_phrases = ("dichiaro di aver ricevuto", "ai sensi dell'art.", "firma del destinatario")
    return any(token in row_text for token in footer_phrases)


def _clean_delivery_items(items: list[dict[str, object]]) -> list[dict[str, object]]:
    """Return the items that are dictionaries and are not classified as noise.

    The surviving items are the same objects, in their original order.
    """
    return [
        entry
        for entry in items
        if isinstance(entry, dict)
        and not _is_noise_delivery_item(
            str(entry.get("description") or ""),
            str(entry.get("raw_row_text") or ""),
        )
    ]


def _looks_like_delivery_description_line(value: str | None) -> bool:
    """Return True when the line could be a product description.

    The line needs a word of three or more letters (or "co2") and must not
    contain any of the document summary/footer phrases.
    """
    text = normalize_text_block(value or "")
    if not text:
        return False
    has_word = re.search(r"[A-Za-zÀ-ÿ]{3,}", text) or re.search(r"\bco2\b", text, re.IGNORECASE)
    if not has_word:
        return False
    folded = text.casefold()
    summary_phrases = ("imposta di bollo", "riepiloghi iva", "totale documento", "totale a pagare", "annotazioni")
    return not any(phrase in folded for phrase in summary_phrases)


def _looks_like_stacked_delivery_code(token: str | None) -> bool:
    """Return True when ``token`` has the shape of a product code.

    A code is at most 12 characters of "$", A-Z and 0-9, contains a digit, is
    not a bare number of up to three digits, and is not one of the excluded
    unit or marker tokens.
    """
    code = normalize_text_block(token or "").upper()
    excluded = {"C.I.", "C.I", "CI", "SS", "FS", "BT", "CT", "FD", "CS", "BM", "OCT", "0"}
    if not code or code in excluded:
        return False
    if re.fullmatch(r"[$A-Z0-9]+", code) is None:
        return False
    if re.search(r"\d", code) is None:
        return False
    if re.fullmatch(r"\d{1,3}", code) is not None:
        return False
    return len(code) <= 12


def _normalize_stacked_delivery_lot_code(value: str | None) -> str | None:
    """Map a raw lot/unit token to its upper-case code, or ``None`` if unknown.

    Known misreads are resolved through an alias table first (some map to
    ``None``), then exact known codes are returned, and finally a token ending
    in a known code yields that suffix.
    """
    code = normalize_text_block(value or "").upper()
    if not code:
        return None
    misread_aliases = {
        "OCT": "CT",
        "0CT": "CT",
        "6CS": "CS",
        "Θ": None,
        "ও": None,
    }
    try:
        return misread_aliases[code]
    except KeyError:
        pass
    if code.casefold() in _STACKED_DELIVERY_LOT_CODES:
        return code
    tail = re.search(r"(CT|CS|FD|BT|BM|KG|LT|PZ|FS)$", code)
    return tail.group(1) if tail else None


def _split_laconi_structured_parts(line: str) -> list[str]:
    """Split a pipe-separated row into normalized, non-empty cells."""
    return [
        cleaned
        for cell in re.split(r"\s*\|\s*", line)
        if (cleaned := _normalize_ocr_latin_confusables(cell))
    ]


def _looks_like_laconi_structured_code(value: str | None) -> bool:
    """Return True when ``value``, after confusable normalization, looks like a product code."""
    candidate = _normalize_ocr_latin_confusables(value)
    return _looks_like_stacked_delivery_code(candidate)


def _normalize_laconi_structured_parts(parts: list[str]) -> list[str]:
    """Split a leading "CODE description" cell into separate code and description cells.

    The input list is returned unchanged when it is empty, when its first cell
    is already a code, or when the first cell does not split into a code
    followed by text holding a word of three or more letters.
    """
    if not parts:
        return parts
    if _looks_like_laconi_structured_code(parts[0]):
        return parts
    leading_cell = _normalize_ocr_latin_confusables(parts[0])
    split = re.match(r"^(\$?[A-Z0-9]{2,12})\s+(.+)$", leading_cell, re.IGNORECASE)
    if split is None:
        return parts
    product_code = split.group(1).upper()
    remainder = _normalize_ocr_latin_confusables(split.group(2))
    if _looks_like_laconi_structured_code(product_code) and re.search(r"[A-Za-zÀ-ÿ]{3,}", remainder):
        return [product_code, remainder, *parts[1:]]
    return parts


def _split_laconi_unit_token(value: str | None) -> tuple[str | None, str]:
    """Split a leading unit code off ``value``.

    Returns ``(unit_code, rest)`` with the code upper-cased, ``(None, text)``
    when the text does not start with a unit code, and ``(None, "")`` for
    empty input.
    """
    text = _normalize_ocr_latin_confusables(value)
    if not text:
        return (None, "")
    found = re.match(r"^(FS|CT|BM|BT|CS|FD|PZ|KG|LT|NR)\b\s*(.*)$", text, re.IGNORECASE)
    if found is None:
        return (None, text)
    unit_text, remainder = found.groups()
    return (unit_text.upper(), normalize_text_block(remainder))


def _parse_laconi_quantity_expression(value: str | None) -> tuple[float | None, float | None]:
    """Parse a "<count> x <multiplier>" expression.

    Returns ``(pack_count, unit_multiplier)``. The multiplier is ``None`` when
    nothing follows the "x"; both are ``None`` when no such expression is
    found.
    """
    text = _normalize_ocr_latin_confusables(value)
    found = re.search(r"(\d+(?:[.,]\d+)?)\s*[xX]\s*(\d+(?:[.,]\d+)?)?", text) if text else None
    if found is None:
        return (None, None)
    count_text, multiplier_text = found.groups()
    count = _parse_euro_amount(count_text)
    multiplier = _parse_euro_amount(multiplier_text) if multiplier_text else None
    return (count, multiplier)


def _laconi_decimal_places(value: str) -> int:
    """Count the characters after the last decimal separator.

    A comma takes precedence over a dot; returns 0 when neither is present.
    """
    text = _normalize_ocr_latin_confusables(value)
    for separator in (",", "."):
        if separator in text:
            return len(text.rsplit(separator, 1)[-1])
    return 0


def _looks_like_laconi_detail_start(value: str | None) -> bool:
    """Return True when a cell starts the unit/quantity/price part of a row.

    That is the case when the cell begins with a unit code, is a
    "<count> x [<multiplier>]" expression, or holds decimal tokens and no word
    of three or more letters.
    """
    text = _normalize_ocr_latin_confusables(value)
    if not text:
        return False
    unit_code, remainder = _split_laconi_unit_token(text)
    if unit_code is not None:
        return True
    if re.fullmatch(r"\d+(?:[.,]\d+)?\s*[xX]\s*(?:\d+(?:[.,]\d+)?)?", text):
        return True
    has_decimals = bool(_extract_decimal_tokens(remainder or text))
    return has_decimals and not re.search(r"[A-Za-zÀ-ÿ]{3,}", text)


def _parse_laconi_structured_item_parts(parts: list[str], line_index: int) -> dict[str, object] | None:
    """Build a delivery item from the cells of one structured row.

    The first cell must be a product code. The cells that follow form the
    description until the first cell that looks like a detail cell (unit code,
    quantity expression or decimals); the remaining cells supply unit code,
    pack count, multiplier and the decimal tokens used for prices.

    Args:
        parts: Cells of the row, product code first.
        line_index: Value stored as the item's ``line_index``.

    Returns:
        The item dictionary, or ``None`` when the row has fewer than two
        cells, does not start with a code, or has an empty/noise description.
    """
    if len(parts) < 2 or not _looks_like_laconi_structured_code(parts[0]):
        return None

    description_parts: list[str] = []
    detail_start = len(parts)
    for index, part in enumerate(parts[1:], start=1):
        if _looks_like_laconi_detail_start(part):
            detail_start = index
            break
        description_parts.append(part)

    description = _normalize_ocr_latin_confusables(" ".join(description_parts))
    if not description or _is_noise_delivery_item(description):
        return None

    detail_parts = parts[detail_start:]
    unit_code: str | None = None
    pack_count: float | None = None
    unit_multiplier: float | None = None
    # True after a "<count> x" with nothing behind the "x": the multiplier is
    # then expected as a bare integer in a following cell.
    pending_multiplier = False
    decimal_tokens: list[str] = []

    for raw_part in detail_parts:
        part = _normalize_ocr_latin_confusables(raw_part)
        if not part:
            continue
        current_unit_code, unit_rest = _split_laconi_unit_token(part)
        if current_unit_code is not None:
            unit_code = current_unit_code
            part = unit_rest
            if not part:
                continue

        current_pack_count, current_multiplier = _parse_laconi_quantity_expression(part)
        if current_pack_count is not None:
            pack_count = current_pack_count
            unit_multiplier = current_multiplier
            pending_multiplier = current_multiplier is None
            continue

        if pending_multiplier and re.fullmatch(r"\d+", part):
            parsed_multiplier = _parse_euro_amount(part)
            if parsed_multiplier is not None:
                unit_multiplier = parsed_multiplier
                pending_multiplier = False
                continue

        decimal_tokens.extend(_extract_decimal_tokens(part))

    # Keep each token next to its own parsed value. Zipping the full token
    # list against a separately filtered value list would shift every pair
    # after the first unparseable token.
    parsed_pairs = [
        (token, parsed)
        for token in decimal_tokens
        if (parsed := _parse_euro_amount(token)) is not None
    ]
    parsed_decimals = [parsed for _, parsed in parsed_pairs]
    # A value written with three or more decimals is preferred as unit price.
    high_precision_prices = [
        parsed
        for token, parsed in parsed_pairs
        if _laconi_decimal_places(token) >= 3
    ]
    unit_price = high_precision_prices[0] if high_precision_prices else (parsed_decimals[0] if len(parsed_decimals) >= 2 else None)
    line_total = parsed_decimals[-1] if parsed_decimals else None

    gross_quantity = None
    if pack_count is not None and unit_multiplier is not None:
        gross_quantity = pack_count * unit_multiplier
    elif pack_count is not None:
        gross_quantity = pack_count

    return {
        "line_index": line_index,
        "product_code": parts[0],
        "iso_code": None,
        "description": description,
        "category_code": None,
        "unit_code": unit_code,
        "pack_count": pack_count,
        "quantity": pack_count,
        "gross_quantity": gross_quantity,
        "tare_quantity": None,
        "net_quantity": pack_count,
        "unit_price": unit_price,
        "line_total": line_total,
        "vat_code": None,
        "raw_row_text": normalize_text_block(" | ".join(parts)),
    }


def _laconi_structured_row_needs_continuation(parts: list[str]) -> bool:
    """Return True when a row looks incomplete and the next line may belong to it.

    A parseable row needs continuation when it has no line total, or when its
    gross quantity is above 1 and the total is implausibly small compared with
    the unit price and quantity.
    """
    candidate = _parse_laconi_structured_item_parts(parts, -1)
    if candidate is None:
        return False
    total = _coerce_positive_document_float(candidate.get("line_total"))
    price = _coerce_positive_document_float(candidate.get("unit_price"))
    gross = _coerce_positive_document_float(candidate.get("gross_quantity"))
    if total is None:
        return True
    if gross is None or gross <= 1:
        return False
    if price is not None:
        expected = price * gross
        too_small = total < max(price * 1.5, expected * 0.2)
        if expected > 0 and too_small:
            return True
    return total <= 20 and gross >= 10


def _extract_laconi_delivery_note_items_from_preview_text(text: str) -> list[dict[str, object]]:
    """Extract delivery items from text holding one or more structured item tables.

    A table starts at a header line that, once passed through
    ``_normalize_match_text``, contains "c art", "descrizione", "quantita" and
    "prezzo". Each table body is scanned line by line; rows are rebuilt from
    pipe-separated cells, merged with neighbouring lines when split, and
    de-duplicated on their case-folded text.

    Falls back to ``_extract_simple_delivery_note_items_from_text`` when no
    header is found or when no item survives parsing and cleaning.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    if not lines:
        return []

    # Locate every table header; each one opens a section.
    header_indices: list[int] = []
    for index, line in enumerate(lines):
        normalized = _normalize_match_text(line)
        if "c art" in normalized and "descrizione" in normalized and "quantita" in normalized and "prezzo" in normalized:
            header_indices.append(index)
    if not header_indices:
        return _extract_simple_delivery_note_items_from_text(text)

    # A line containing any of these (case-folded) ends the current section.
    stop_tokens = (
        "totale merce",
        "totale documento",
        "totale a pagare",
        "imposta di bollo",
        "riepiloghi iva",
        "riepiloghiiva",
        "data inizio trasporto",
        "causale del trasporto",
        "firma del destinatario",
    )
    structured_rows: list[list[str]] = []
    seen_raw_rows: set[str] = set()
    for section_index, header_index in enumerate(header_indices):
        # A section runs from its header to the next header (or the end of the text).
        next_header_index = header_indices[section_index + 1] if section_index + 1 < len(header_indices) else len(lines)
        body = lines[header_index + 1 : next_header_index]
        i = 0
        while i < len(body):
            line = body[i]
            normalized = line.casefold()
            if any(token in normalized for token in stop_tokens):
                break
            parts = _normalize_laconi_structured_parts(_split_laconi_structured_parts(line))
            if not parts:
                i += 1
                continue
            if not _looks_like_laconi_structured_code(parts[0]):
                # The code may sit in one of the next few cells; drop the cells
                # before it when a detail cell follows the code.
                for code_index in range(1, min(len(parts) - 1, 4)):
                    if (
                        _looks_like_laconi_structured_code(parts[code_index])
                        and any(_looks_like_laconi_detail_start(part) for part in parts[code_index + 1 :])
                    ):
                        parts = parts[code_index:]
                        break

            # Case 1: the row starts with a product code.
            if _looks_like_laconi_structured_code(parts[0]):
                if _laconi_structured_row_needs_continuation(parts):
                    # Absorb following lines that carry only numeric detail
                    # until the row looks complete or another row begins.
                    lookahead_index = i + 1
                    while lookahead_index < len(body):
                        next_line = body[lookahead_index]
                        next_normalized = next_line.casefold()
                        if any(token in next_normalized for token in stop_tokens):
                            break
                        next_parts = _normalize_laconi_structured_parts(_split_laconi_structured_parts(next_line))
                        if not next_parts:
                            lookahead_index += 1
                            continue
                        next_text = normalize_text_block(" ".join(next_parts))
                        # Two or more words: treated as a new description, not a continuation.
                        if len(re.findall(r"[A-Za-zÀ-ÿ]{3,}", next_text)) >= 2:
                            break
                        if _looks_like_laconi_structured_code(next_parts[0]):
                            break
                        if any(
                            _looks_like_laconi_structured_code(next_parts[code_index])
                            for code_index in range(1, min(len(next_parts), 4))
                        ):
                            break
                        if not any(_extract_decimal_tokens(part) for part in next_parts):
                            break
                        # Cells already present in the row are not appended again.
                        for continuation_part in next_parts:
                            if continuation_part not in parts:
                                parts.append(continuation_part)
                        # The absorbed line is consumed: the outer loop resumes after it.
                        i = lookahead_index
                        if not _laconi_structured_row_needs_continuation(parts):
                            break
                        lookahead_index += 1

                raw_row = normalize_text_block(" | ".join(parts)).casefold()
                if raw_row not in seen_raw_rows:
                    structured_rows.append(parts)
                    seen_raw_rows.add(raw_row)
                i += 1
                continue

            next_parts = (
                _normalize_laconi_structured_parts(_split_laconi_structured_parts(body[i + 1]))
                if i + 1 < len(body)
                else []
            )
            # Case 2: a numeric-only line precedes the coded row it belongs to;
            # its cells are appended after the cells of the coded row.
            if (
                next_parts
                and _looks_like_laconi_structured_code(next_parts[0])
                and any(_split_laconi_unit_token(part)[0] is not None for part in next_parts[1:5])
                and any(_extract_decimal_tokens(part) for part in parts)
                and len(re.findall(r"[A-Za-zÀ-ÿ]{3,}", normalize_text_block(" ".join(parts)))) < 2
            ):
                merged = [next_parts[0], *next_parts[1:], *parts]
                raw_row = normalize_text_block(" | ".join(merged)).casefold()
                if raw_row not in seen_raw_rows:
                    structured_rows.append(merged)
                    seen_raw_rows.add(raw_row)
                i += 2
                continue

            # Case 3: the next line is a code directly followed by a unit cell;
            # the first cell of this line is placed right after that code and
            # the other cells holding decimals are appended at the end.
            if (
                len(parts) >= 2
                and next_parts
                and _looks_like_laconi_structured_code(next_parts[0])
                and len(next_parts) >= 2
                and _split_laconi_unit_token(next_parts[1])[0] is not None
            ):
                merged = [next_parts[0], parts[0], *next_parts[1:]]
                for trailing_part in parts[1:]:
                    if _extract_decimal_tokens(trailing_part) and trailing_part not in merged:
                        merged.append(trailing_part)
                raw_row = normalize_text_block(" | ".join(merged)).casefold()
                if raw_row not in seen_raw_rows:
                    structured_rows.append(merged)
                    seen_raw_rows.add(raw_row)
                i += 2
                continue

            # No rule matched: skip the line.
            i += 1

    items: list[dict[str, object]] = []
    for row_parts in structured_rows:
        # line_index counts only the rows that parsed successfully.
        parsed = _parse_laconi_structured_item_parts(_normalize_laconi_structured_parts(row_parts), len(items))
        if parsed is not None:
            items.append(parsed)
    cleaned_items = _clean_delivery_items(items)
    if cleaned_items:
        return cleaned_items
    return _extract_simple_delivery_note_items_from_text(text)


def _is_quantity_header_cell(normalized_cell: str) -> bool:
    compact = re.sub(r"\s+", "", normalized_cell)
    return (
        "quantita" in normalized_cell
        or normalized_cell in {"qta", "qt"}
        or compact.startswith("qta")
    )


def _parse_delivery_quantity_line(value: str | None) -> float | None:
    """Parse a line that holds only a quantity and an optional unit word.

    The line must start with a number; whatever follows must, after
    lower-casing and collapsing punctuation to spaces, be empty or one of the
    accepted unit words. Returns the positive quantity, otherwise ``None``.
    """
    text = normalize_text_block(value or "")
    if not text:
        return None
    found = re.match(r"^(\d+(?:[.,]\d+)?)\s*(.*)$", text, re.IGNORECASE)
    if found is None:
        return None
    number_text, remainder = found.groups()

    unit_text = normalize_text_block(remainder).casefold().strip(" .")
    unit_text = re.sub(r"[^a-z0-9]+", " ", unit_text).strip()
    accepted_units = {
        "",
        "n d",
        "nd",
        "nr",
        "pz",
        "ct",
        "bt",
        "cs",
        "cartone",
        "cartoni",
        "bottiglia",
        "bottiglie",
        "lattina",
        "lattine",
        "colli",
        "kg",
        "lt",
        "ml",
    }
    if unit_text not in accepted_units:
        return None

    amount = _parse_euro_amount(number_text)
    if amount is not None and amount > 0:
        return amount
    return None


def _extract_description_quantity_delivery_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract items from text where descriptions and quantities sit on separate lines.

    The table starts after the first line containing "descrizione" that has
    "quantit" on it or on one of the next three lines. Description lines and
    quantity lines are then paired in order of appearance, whichever kind
    comes first, until a stop phrase is met. Lines left unpaired are dropped.
    """
    rows = [stripped for raw in normalize_text_block(text).splitlines() if (stripped := raw.strip())]
    if not rows:
        return []

    header_at = next(
        (
            position
            for position, row in enumerate(rows)
            if "descrizione" in row.casefold()
            and "quantit" in " ".join(rows[position : position + 4]).casefold()
        ),
        None,
    )
    if header_at is None:
        return []

    table_end_markers = (
        "peso",
        "colli",
        "bancali",
        "causale trasporto",
        "causale del trasporto",
        "porto",
        "vettore",
        "firma incaricato",
        "firma destinatario",
        "firma del destinatario",
        "note",
        "totale documento",
        "totale merce",
    )
    header_markers = ("descrizione", "quantit")
    waiting_description: str | None = None
    description_backlog: list[str] = []
    quantity_backlog: list[float] = []
    collected: list[dict[str, object]] = []

    def record(description: str, quantity: float) -> None:
        # Store one paired item unless the quantity is not positive or the
        # description is classified as noise.
        label = normalize_text_block(description)
        if quantity <= 0:
            return
        if _is_noise_delivery_item(label, label):
            return
        collected.append(
            {
                "line_index": len(collected),
                "product_code": None,
                "iso_code": None,
                "description": label,
                "category_code": None,
                "unit_code": None,
                "pack_count": quantity,
                "quantity": quantity,
                "gross_quantity": quantity,
                "tare_quantity": None,
                "net_quantity": quantity,
                "unit_price": None,
                "line_total": None,
                "vat_code": None,
                "raw_row_text": f"{label} | {format(quantity, 'g')}",
            }
        )

    for row in rows[header_at + 1 :]:
        current = normalize_text_block(row)
        folded = current.casefold()
        if any(marker in folded for marker in table_end_markers):
            break
        # Short lines that repeat a header word are skipped.
        if len(current) <= 24 and any(marker in folded for marker in header_markers):
            continue

        amount = _parse_delivery_quantity_line(current)
        if amount is None:
            # Description line: pair it with a waiting quantity, or queue it.
            if not _looks_like_delivery_description_line(current):
                continue
            if quantity_backlog:
                record(current, quantity_backlog.pop(0))
            elif waiting_description is None and not description_backlog:
                waiting_description = current
            else:
                if waiting_description is not None:
                    description_backlog.append(waiting_description)
                    waiting_description = None
                description_backlog.append(current)
            continue

        # Quantity line: pair it with the oldest waiting description, or queue it.
        if waiting_description:
            record(waiting_description, amount)
            waiting_description = None
        elif description_backlog:
            record(description_backlog.pop(0), amount)
        else:
            quantity_backlog.append(amount)

    return _clean_delivery_items(collected)


def _extract_simple_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract delivery-note items from pipe-delimited text with a description/quantity header.

    The first line that has at least two ``|``-separated cells, one containing
    "descrizione" and one accepted by ``_is_quantity_header_cell``, is used as
    the table header. Every following line is parsed as an item row until one
    of the stop tokens is found. When no header exists, or when no row survives
    ``_clean_delivery_items``, the result of
    ``_extract_description_quantity_delivery_items_from_text`` is returned
    instead.

    Args:
        text: Document text, one table row per line with cells separated by ``|``.

    Returns:
        Item dictionaries in which the parsed quantity is copied to
        ``pack_count``, ``quantity``, ``gross_quantity`` and ``net_quantity``,
        while price, total and VAT fields are left as ``None``.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    if not lines:
        return []

    # Locate the header row: it must name both a description and a quantity column.
    header_index: int | None = None
    header_cells: list[str] = []
    for index, line in enumerate(lines):
        cells = [normalize_text_block(part) for part in re.split(r"\s*\|\s*", line) if normalize_text_block(part)]
        normalized_cells = [_normalize_match_text(cell) for cell in cells]
        if (
            len(cells) >= 2
            and any("descrizione" in cell for cell in normalized_cells)
            and any(_is_quantity_header_cell(cell) for cell in normalized_cells)
        ):
            header_index = index
            header_cells = cells
            break
    if header_index is None:
        return _extract_description_quantity_delivery_items_from_text(text)

    # Column positions are taken from the header; each lookup has a positional default
    # (description -> 1, quantity -> last header cell, code -> 0, unit -> not present).
    normalized_header = [_normalize_match_text(cell) for cell in header_cells]
    description_index = next((index for index, cell in enumerate(normalized_header) if "descrizione" in cell), 1)
    quantity_index = next(
        (
            index
            for index, cell in enumerate(normalized_header)
            if _is_quantity_header_cell(cell)
        ),
        len(header_cells) - 1,
    )
    code_index = next((index for index, cell in enumerate(normalized_header) if "codice" in cell or cell == "cod"), 0)
    unit_index = next((index for index, cell in enumerate(normalized_header) if cell in {"um", "u m", "unita"}), None)
    # Any line containing one of these substrings ends the item table.
    stop_tokens = (
        "causale del trasporto",
        "data e firma",
        "firma del destinatario",
        "numero di colli",
        "peso",
        "totale documento",
        "totale merce",
        "trasporto a mezzo",
    )

    items: list[dict[str, object]] = []
    for line in lines[header_index + 1 :]:
        normalized = normalize_text_block(line).casefold()
        if any(token in normalized for token in stop_tokens):
            break
        if "|" not in line:
            continue

        # Empty cells are dropped here, so a row with a blank column has its later cells shifted left.
        cells = [normalize_text_block(part) for part in re.split(r"\s*\|\s*", line) if normalize_text_block(part)]
        if len(cells) < 3:
            continue

        product_code = cells[code_index] if code_index < len(cells) and code_index != description_index else None
        description = cells[description_index] if description_index < len(cells) else ""
        quantity: float | None = None
        # Handle rows where the code and the description ended up in the same first cell
        # ("<code> <description>") and the quantity is in the second cell.
        merged_description_match = re.match(r"^([A-Z0-9]{4,})\s+(.+)$", cells[0], re.IGNORECASE)
        if merged_description_match and len(cells) > 1:
            merged_quantity = _parse_euro_amount(cells[1])
            merged_description = normalize_text_block(merged_description_match.group(2))
            if merged_quantity is not None and re.search(r"[A-Za-zÀ-ÿ]{2,}", merged_description):
                product_code = merged_description_match.group(1)
                description = merged_description
                quantity = merged_quantity
                # NOTE(review): this value is always overwritten by the `else` branch below
                # (quantity is no longer None there); harmless today because the description
                # is non-empty in this path, but the assignment has no effect.
                quantity_cell_index = 1

        quantity_cell = cells[quantity_index] if quantity is None and quantity_index < len(cells) else ""
        if quantity is None:
            quantity = _parse_euro_amount(quantity_cell) if quantity_cell else None
        if quantity is None:
            # The header-designated cell was not numeric: fall back to the last numeric cell of the row.
            numeric_cells: list[tuple[int, float]] = []
            for cell_index, cell in enumerate(cells):
                parsed = _parse_euro_amount(cell)
                if parsed is not None:
                    numeric_cells.append((cell_index, parsed))
            if not numeric_cells:
                continue
            quantity_cell_index, quantity = numeric_cells[-1]
        else:
            quantity_cell_index = quantity_index

        if quantity is None or quantity <= 0:
            continue

        # No description cell at the expected index: join everything between the first cell and the quantity.
        if not description and quantity_cell_index > 1:
            description = " ".join(cells[1:quantity_cell_index])
        if _is_noise_delivery_item(description, line):
            continue

        # Only accept the unit cell when it is not itself a number.
        unit_code = None
        if unit_index is not None and unit_index < len(cells):
            candidate_unit = cells[unit_index]
            if _parse_euro_amount(candidate_unit) is None:
                unit_code = candidate_unit

        items.append(
            {
                "line_index": len(items),
                "product_code": product_code,
                "iso_code": None,
                "description": description,
                "category_code": None,
                "unit_code": unit_code,
                "pack_count": quantity,
                "quantity": quantity,
                "gross_quantity": quantity,
                "tare_quantity": None,
                "net_quantity": quantity,
                "unit_price": None,
                "line_total": None,
                "vat_code": None,
                "raw_row_text": normalize_text_block(line),
            }
        )
    cleaned_items = _clean_delivery_items(items)
    if cleaned_items:
        return cleaned_items
    # Nothing usable came out of the table parse: retry with the header-less extractor.
    return _extract_description_quantity_delivery_items_from_text(text)


def _extract_decimal_tokens(value: str) -> list[str]:
    return re.findall(r"\d{1,3}(?:\.\d{3})+,\d{2,4}|\d+(?:[.,]\d{2,4})", value)


def _document_ai_entity_text(entity: dict[str, object]) -> str:
    """Return the normalized text of an entity dictionary.

    The ``content`` of a dict-valued ``textAnchor`` wins when it is non-empty
    after normalization; otherwise the normalized ``mentionText`` is returned
    (which may itself be empty).
    """
    raw_anchor = entity.get("textAnchor")
    anchor = raw_anchor if isinstance(raw_anchor, dict) else {}
    anchored_text = normalize_text_block(str(anchor.get("content") or ""))
    if anchored_text:
        return anchored_text
    return normalize_text_block(str(entity.get("mentionText") or ""))


def _document_ai_entity_normalized_text(entity: dict[str, object]) -> str:
    """Render an entity's ``normalizedValue`` as a plain string.

    Sources are tried in this order and the first usable one wins:

    1. ``normalizedValue.text`` after normalization;
    2. ``normalizedValue.dateValue`` as an ISO date, when year, month and day
       are all non-zero and form a valid calendar date;
    3. ``normalizedValue.moneyValue`` as ``units + nanos / 1e9`` formatted with
       two decimals, when the amount is non-zero.

    Returns an empty string when none of them yields a value.
    """
    raw_value = entity.get("normalizedValue")
    normalized_value = raw_value if isinstance(raw_value, dict) else {}

    explicit_text = normalize_text_block(str(normalized_value.get("text") or ""))
    if explicit_text:
        return explicit_text

    raw_date = normalized_value.get("dateValue")
    date_fields = raw_date if isinstance(raw_date, dict) else {}
    year, month, day = (int(date_fields.get(part) or 0) for part in ("year", "month", "day"))
    if year and month and day:
        try:
            return datetime(year, month, day).date().isoformat()
        except ValueError:
            # Not a real calendar date: fall through to the money value.
            pass

    raw_money = normalized_value.get("moneyValue")
    money_fields = raw_money if isinstance(raw_money, dict) else {}
    whole_units = int(money_fields.get("units") or 0)
    nano_units = int(money_fields.get("nanos") or 0)
    amount = whole_units + (nano_units / 1_000_000_000)
    return f"{amount:.2f}" if amount else ""


def _document_ai_anchor_text(anchor: dict[str, object], document_text: str) -> str:
    if not isinstance(anchor, dict):
        return ""
    segments = anchor.get("textSegments") if isinstance(anchor.get("textSegments"), list) else []
    if not isinstance(segments, list):
        return ""
    parts: list[str] = []
    for segment in segments:
        if not isinstance(segment, dict):
            continue
        start = int(segment.get("startIndex") or 0)
        end = int(segment.get("endIndex") or 0)
        if end <= start:
            continue
        parts.append(document_text[start:end])
    return normalize_text_block(" ".join(parts))


def _normalized_poly_bounds(layout: dict[str, object]) -> tuple[float, float, float, float]:
    poly = layout.get("boundingPoly") if isinstance(layout.get("boundingPoly"), dict) else {}
    vertices = []
    if isinstance(poly, dict):
        vertices = poly.get("normalizedVertices") if isinstance(poly.get("normalizedVertices"), list) else []
        if not vertices:
            vertices = poly.get("vertices") if isinstance(poly.get("vertices"), list) else []
    xs = [float(vertex.get("x") or 0.0) for vertex in vertices if isinstance(vertex, dict)]
    ys = [float(vertex.get("y") or 0.0) for vertex in vertices if isinstance(vertex, dict)]
    if not xs or not ys:
        return (0.0, 0.0, 0.0, 0.0)
    return (min(xs), min(ys), max(xs), max(ys))


def _extract_google_ocr_rows(document: dict[str, object]) -> list[dict[str, object]]:
    document_text = str(document.get("text") or "")
    pages = document.get("pages") if isinstance(document.get("pages"), list) else []
    if not document_text or not isinstance(pages, list):
        return []

    segments: list[dict[str, object]] = []
    for page_index, page in enumerate(pages):
        if not isinstance(page, dict):
            continue
        for line in page.get("lines") if isinstance(page.get("lines"), list) else []:
            if not isinstance(line, dict):
                continue
            layout = line.get("layout") if isinstance(line.get("layout"), dict) else {}
            if not isinstance(layout, dict):
                continue
            text = _document_ai_anchor_text(layout.get("textAnchor") if isinstance(layout.get("textAnchor"), dict) else {}, document_text)
            if not text:
                continue
            x0, y0, x1, y1 = _normalized_poly_bounds(layout)
            segments.append(
                {
                    "page_index": page_index,
                    "text": text,
                    "x0": x0,
                    "y0": y0,
                    "x1": x1,
                    "y1": y1,
                }
            )

    if not segments:
        return []

    heights = [max(0.0, float(segment["y1"]) - float(segment["y0"])) for segment in segments]
    row_threshold = max(0.0045, min(0.012, (median(heights) if heights else 0.008) * 0.75))

    rows: list[dict[str, object]] = []
    for segment in sorted(segments, key=lambda item: (int(item["page_index"]), float(item["y0"]), float(item["x0"]))):
        if (
            not rows
            or int(segment["page_index"]) != int(rows[-1]["page_index"])
            or abs(float(segment["y0"]) - float(rows[-1]["y0"])) > row_threshold
        ):
            rows.append(
                {
                    "page_index": int(segment["page_index"]),
                    "y0": float(segment["y0"]),
                    "segments": [segment],
                }
            )
            continue
        rows[-1]["segments"].append(segment)
        rows[-1]["y0"] = min(float(rows[-1]["y0"]), float(segment["y0"]))

    for row in rows:
        row_segments = sorted(
            [segment for segment in row.get("segments", []) if isinstance(segment, dict)],
            key=lambda item: float(item["x0"]),
        )
        row["segments"] = row_segments
        row["text"] = " | ".join(str(segment["text"]) for segment in row_segments)
    return rows


def _build_google_ocr_preview_text(document: dict[str, object]) -> str:
    """Return the document's OCR rows as newline-separated normalized text.

    Rows whose text is empty after normalization are left out; a document
    without rows produces an empty string.
    """
    preview_lines: list[str] = []
    for row in _extract_google_ocr_rows(document):
        cleaned = normalize_text_block(str(row.get("text") or ""))
        if cleaned:
            preview_lines.append(cleaned)
    return "\n".join(preview_lines)


def _extract_delivery_header_supplier_name(text: str) -> str | None:
    """Find the supplier name printed just above a delivery-note title line.

    Only the first 32 non-empty lines are searched for a title marker. For each
    marker line, up to eight preceding lines are inspected from the closest one
    upwards; the first that contains none of the ignored tokens, matches
    ``_LEGAL_ENTITY_PATTERN`` and is accepted by ``_is_company_like_line`` is
    returned, truncated to 120 characters. Returns ``None`` when nothing fits.
    """
    stripped_lines = (raw_line.strip() for raw_line in normalize_text_block(text).splitlines())
    lines = [line for line in stripped_lines if line]

    ignored_tokens = ("p.iva", "telefono", "piazza", "via ", "viale", "loc.", "capitale sociale", "registro delle imprese")
    markers = ("documento di trasporto", "d.d.t", "ddt")
    for position, line in enumerate(lines[:32]):
        lowered_line = line.casefold()
        if all(marker not in lowered_line for marker in markers):
            continue
        lines_above = lines[max(0, position - 8) : position]
        for candidate in lines_above[::-1]:
            lowered_candidate = candidate.casefold()
            if any(token in lowered_candidate for token in ignored_tokens):
                continue
            if _LEGAL_ENTITY_PATTERN.search(candidate) and _is_company_like_line(candidate):
                return candidate[:120]
    return None


def _extract_supplier_name_from_ocr_document(document: dict[str, object]) -> str | None:
    """Pick the supplier name out of an OCR document dictionary.

    The plain-text header heuristic is tried first. If it finds nothing, the
    OCR rows are walked in order until one starts at or below ``y0 = 0.34``;
    inside those rows, segments whose left edge is at most ``0.48`` are checked
    and the first one matching ``_LEGAL_ENTITY_PATTERN`` and accepted by
    ``_is_company_like_line`` is returned, truncated to 120 characters.
    """
    from_header = _extract_delivery_header_supplier_name(str(document.get("text") or ""))
    if from_header:
        return from_header

    header_boundary = 0.34
    for row in _extract_google_ocr_rows(document):
        if float(row.get("y0") or 0.0) >= header_boundary:
            break
        candidates = [
            normalize_text_block(str(segment.get("text") or ""))
            for segment in row.get("segments", [])
            if isinstance(segment, dict) and float(segment.get("x0") or 0.0) <= 0.48
        ]
        supplier = next(
            (
                candidate
                for candidate in candidates
                if _LEGAL_ENTITY_PATTERN.search(candidate) and _is_company_like_line(candidate)
            ),
            None,
        )
        if supplier is not None:
            return supplier[:120]
    return None


def _looks_like_delivery_note_item_row(values: list[str]) -> bool:
    if len(values) < 9:
        return False
    description = values[2] if len(values) > 2 else ""
    if not re.search(r"[A-Za-zÀ-ÿ]", description):
        return False
    if not re.search(r"\d", values[-1] if values else "") and not re.search(r"\d", values[-2] if len(values) > 1 else ""):
        return False
    return True


def _split_total_and_vat_cell(value: str | None) -> tuple[float | None, str | None]:
    """Split a cell that may hold a line total followed by a VAT code.

    When the normalized cell is exactly "<number> <code>" (the code being an
    optional letter plus one to three digits) both parts are returned. Any
    other non-empty cell is parsed as an amount with no VAT code. An empty or
    missing cell gives ``(None, None)``.
    """
    cell = normalize_text_block(value or "")
    if not cell:
        return (None, None)
    combined = re.fullmatch(r"(\d+(?:[.,]\d{2,4})?)\s+([A-Za-z]?\d{1,3})", cell)
    if combined is None:
        return (_parse_euro_amount(cell), None)
    amount_text, vat_text = combined.groups()
    return (_parse_euro_amount(amount_text), vat_text)


def _parse_delivery_note_table_row(values: list[str], line_index: int) -> dict[str, object] | None:
    """Turn one table row (list of cells) into a delivery-note item dictionary.

    The first six cells are read as product code, ISO code, description,
    category, unit and pack count. The remaining cells end either with
    ``price, "total vat"`` (total and VAT merged in one cell) or with
    ``price, total, vat``; whatever precedes them is treated as measurements.
    With three or more measurements they are read as gross, tare, net;
    otherwise the first is gross and the last is net.

    Returns ``None`` when the row does not look like an item row.
    """
    if not _looks_like_delivery_note_item_row(values):
        return None

    leading, trailing = values[:6], values[6:]
    if len(leading) != 6 or len(trailing) < 3:
        return None
    product_code, iso_code, description, category_code, unit_code, pack_cell = leading

    merged_total, merged_vat = _split_total_and_vat_cell(trailing[-1])
    if merged_total is not None and merged_vat is not None:
        # Last cell already holds "total vat": only one more cell (the price) precedes it.
        *measurement_cells, price_cell, _ = trailing
        unit_price = _parse_euro_amount(price_cell)
        line_total = merged_total
        vat_code = merged_vat
    else:
        *measurement_cells, price_cell, total_cell, vat_code = trailing
        unit_price = _parse_euro_amount(price_cell)
        line_total = _parse_euro_amount(total_cell)

    measurements = [
        amount
        for amount in map(_parse_euro_amount, measurement_cells)
        if amount is not None
    ]
    pack_count = _parse_euro_amount(pack_cell)
    gross_quantity = measurements[0] if measurements else None
    if len(measurements) >= 3:
        tare_quantity = measurements[1]
        net_quantity = measurements[2]
    else:
        tare_quantity = None
        net_quantity = measurements[-1] if measurements else None
    # Preferred quantity: net, then gross, then pack count.
    quantity = next(
        (candidate for candidate in (net_quantity, gross_quantity, pack_count) if candidate is not None),
        None,
    )

    return {
        "line_index": line_index,
        "product_code": product_code or None,
        "iso_code": iso_code or None,
        "description": description,
        "category_code": category_code or None,
        "unit_code": unit_code or None,
        "pack_count": pack_count,
        "quantity": quantity,
        "gross_quantity": gross_quantity,
        "tare_quantity": tare_quantity,
        "net_quantity": net_quantity,
        "unit_price": unit_price,
        "line_total": line_total,
        "vat_code": vat_code or None,
        "raw_row_text": normalize_text_block(" | ".join(values)),
    }


# Two-letter codes accepted as an item's ``category_code`` by the Cavallaro row parsers.
_CAVALLARO_CATEGORY_CODES = {"BU", "CL", "CS", "CT", "LE", "MA", "MZ", "PD", "PL", "PO", "SA", "SF", "VS"}
# Two-letter codes accepted as an item's ``unit_code``; note that "CS" and "CT" are also category codes.
_CAVALLARO_UNIT_CODES = {"CS", "CT", "KG", "NR", "PZ"}


def _cavallaro_amount_parts(cells: list[str]) -> tuple[list[float], str | None]:
    """Split amount cells into parsed numbers and an optional trailing VAT code.

    Blank cells are dropped. If the last remaining cell is exactly two digits
    it is taken as the VAT code and excluded from the numbers. Cells that do
    not parse as an amount are skipped.
    """
    remaining = [text for text in map(normalize_text_block, cells) if text]
    vat_code = None
    if remaining and re.fullmatch(r"\d{2}", remaining[-1]):
        vat_code = remaining.pop()
    amounts = [amount for amount in map(_parse_euro_amount, remaining) if amount is not None]
    return amounts, vat_code


def _parse_cavallaro_amount_line(cells: list[str]) -> dict[str, object] | None:
    """Interpret a run of amount cells as measurements, unit price and line total.

    The last number is the line total, the one before it (if any) the unit
    price, and everything earlier is returned as measurements. Returns ``None``
    when the cells contain no parsable number.
    """
    amounts, vat_code = _cavallaro_amount_parts(cells)
    if not amounts:
        return None
    return {
        "measurements": amounts[:-2],
        "unit_price": amounts[-2] if len(amounts) >= 2 else None,
        "line_total": amounts[-1],
        "vat_code": vat_code,
    }


def _parse_cavallaro_lot_line(cells: list[str]) -> dict[str, object] | None:
    if not cells:
        return None
    tokens = [normalize_text_block(cell).upper() for cell in cells if normalize_text_block(cell)]
    if not tokens:
        return None
    category_code = None
    unit_code = None
    pack_count = None
    rest: list[str] = []
    if tokens[0] in _CAVALLARO_CATEGORY_CODES and len(tokens) >= 2 and tokens[1] in _CAVALLARO_UNIT_CODES:
        category_code = tokens[0]
        unit_code = tokens[1]
        if len(cells) >= 3:
            pack_count = _parse_euro_amount(cells[2])
        rest = cells[3:]
    elif tokens[0] in _CAVALLARO_UNIT_CODES:
        unit_code = tokens[0]
        if len(cells) >= 2:
            pack_count = _parse_euro_amount(cells[1])
        rest = cells[2:]
    elif tokens[0] in _CAVALLARO_CATEGORY_CODES:
        category_code = tokens[0]
        rest = cells[1:]
    else:
        return None

    rest_values = [
        parsed
        for parsed in (_parse_euro_amount(cell) for cell in rest)
        if parsed is not None
    ]
    measurements: list[float] = []
    unit_price = None
    if len(rest_values) >= 3:
        measurements = rest_values[:-1]
        unit_price = rest_values[-1]
    elif rest_values:
        measurements = rest_values
    return {
        "category_code": category_code,
        "unit_code": unit_code,
        "pack_count": pack_count,
        "measurements": measurements,
        "unit_price": unit_price,
        "line_total": None,
        "vat_code": None,
    }


def _merge_cavallaro_lot_data(
    base: dict[str, object] | None,
    override: dict[str, object] | None,
) -> dict[str, object]:
    merged: dict[str, object] = {}
    for source in (base or {}, override or {}):
        for key, value in source.items():
            if key == "measurements":
                if value:
                    merged[key] = list(value) if isinstance(value, list) else value
                continue
            if value is not None:
                merged[key] = value
    return merged


def _looks_like_cavallaro_product_line(cells: list[str]) -> bool:
    if len(cells) < 2:
        return False
    if not re.fullmatch(r"\d{3,6}", normalize_text_block(cells[0])):
        return False
    description_index = 2 if len(cells) >= 3 and re.fullmatch(r"[A-Z]{2}", normalize_text_block(cells[1]).upper()) else 1
    if len(cells) <= description_index:
        return False
    return bool(re.search(r"[A-Za-zÀ-ÿ]{3,}", normalize_text_block(cells[description_index])))


def _parse_cavallaro_product_line(cells: list[str]) -> tuple[dict[str, object], dict[str, object] | None, dict[str, object] | None] | None:
    """Split a Cavallaro product line into product, lot data and trailing amounts.

    Returns ``None`` when the cells do not look like a product line. Otherwise
    returns ``(product, lot_data, trailing_amount)``: the cells after the
    description are first parsed as lot data, and only if that fails are they
    parsed as an amount line.
    """
    if not _looks_like_cavallaro_product_line(cells):
        return None

    second_cell = normalize_text_block(cells[1]).upper()
    has_iso_cell = len(cells) >= 3 and re.fullmatch(r"[A-Z]{2}", second_cell) is not None
    description_index = 2 if has_iso_cell else 1
    product = {
        "product_code": normalize_text_block(cells[0]),
        "iso_code": second_cell if has_iso_cell else None,
        "description": normalize_text_block(cells[description_index]),
    }

    remainder = cells[description_index + 1 :]
    lot_data = _parse_cavallaro_lot_line(remainder)
    trailing_amount = _parse_cavallaro_amount_line(remainder) if lot_data is None else None
    return (product, lot_data, trailing_amount)


def _parse_cavallaro_wrapped_amount_cells(cells: list[str]) -> dict[str, object] | None:
    """Parse amount cells that wrapped onto their own line.

    Uses the same splitting as ``_cavallaro_amount_parts``: blank cells are
    dropped, a trailing two-digit cell is the VAT code, the rest are parsed as
    amounts. Returns ``{"values": [...], "vat_code": ...}``, or ``None`` when
    neither a number nor a VAT code was found.
    """
    values, vat_code = _cavallaro_amount_parts(cells)
    if not values and vat_code is None:
        return None
    return {"values": values, "vat_code": vat_code}


def _parse_cavallaro_continuation_line(cells: list[str]) -> dict[str, object] | None:
    if not cells:
        return None
    first_cell = normalize_text_block(cells[0])
    if not re.fullmatch(r"\d{3,6}", first_cell):
        return None
    product_code = first_cell
    iso_code = None
    amount_start_index = 1
    if len(cells) >= 2 and re.fullmatch(r"[A-Z]{2}", normalize_text_block(cells[1]).upper()):
        iso_code = normalize_text_block(cells[1]).upper()
        amount_start_index = 2
    amount_data = _parse_cavallaro_wrapped_amount_cells(cells[amount_start_index:])
    return {
        "product_code": product_code,
        "iso_code": iso_code,
        "amount_data": amount_data,
    }


def _find_cavallaro_lot_start(cells: list[str]) -> int | None:
    upper_cells = [normalize_text_block(cell).upper() for cell in cells]
    for index in range(0, len(upper_cells) - 1):
        if upper_cells[index] in _CAVALLARO_CATEGORY_CODES and upper_cells[index + 1] in _CAVALLARO_UNIT_CODES:
            return index
    for index, value in enumerate(upper_cells):
        if value in _CAVALLARO_UNIT_CODES:
            return index
    return None


def _parse_cavallaro_inline_item_line(cells: list[str], line_index: int) -> dict[str, object] | None:
    """Parse a Cavallaro row that holds description, lot codes and numbers on one line.

    Layout, left to right: optional product code (3-6 digits), optional
    two-letter ISO code, description cells, optional category code, unit code,
    pack count, further numbers and an optional trailing two-digit VAT code.

    The numbers after the pack count are read positionally:

    * unit ``KG``: gross, tare, net, unit price, line total (as many as present);
    * other units: one number is both gross and net; two add the unit price;
      three add the line total; four or more are gross, net, unit price, total.

    "CS" and "CT" are both category and unit codes. The cell at the lot start
    is treated as a category only when the next cell is a unit code; otherwise
    it is the unit itself, so a row such as ``... | CT | 5 | ...`` is not
    rejected for lacking a category.

    Args:
        cells: The row's cells.
        line_index: Value stored as the item's ``line_index``.

    Returns:
        The item dictionary, or ``None`` when the row has no lot codes, no
        description, no number after the unit, or is flagged by
        ``_is_noise_delivery_item``.
    """
    lot_start_index = _find_cavallaro_lot_start(cells)
    if lot_start_index is None:
        return None

    prefix = [text for text in (normalize_text_block(cell) for cell in cells[:lot_start_index]) if text]
    if not prefix:
        return None

    product_code = None
    iso_code = None
    if re.fullmatch(r"\d{3,6}", prefix[0]):
        product_code = prefix[0]
        if len(prefix) >= 2 and re.fullmatch(r"[A-Z]{2}", prefix[1].upper()):
            iso_code = prefix[1].upper()
            description_parts = prefix[2:]
        else:
            description_parts = prefix[1:]
    elif re.fullmatch(r"[A-Z]{2}", prefix[0].upper()) and len(prefix) >= 2:
        iso_code = prefix[0].upper()
        description_parts = prefix[1:]
    else:
        description_parts = prefix

    description = normalize_text_block(" ".join(description_parts))
    if not description:
        return None

    first_code = normalize_text_block(cells[lot_start_index]).upper()
    next_index = lot_start_index + 1
    next_code = normalize_text_block(cells[next_index]).upper() if next_index < len(cells) else ""
    if first_code in _CAVALLARO_CATEGORY_CODES and next_code in _CAVALLARO_UNIT_CODES:
        category_code = first_code
        unit_code = next_code
        detail_start_index = next_index + 1
    else:
        # No unit follows, so the cell itself must be the unit (covers bare "CS"/"CT").
        category_code = None
        unit_code = first_code
        detail_start_index = next_index
    if unit_code not in _CAVALLARO_UNIT_CODES:
        return None

    detail_cells = [text for text in (normalize_text_block(cell) for cell in cells[detail_start_index:]) if text]
    vat_code = None
    if detail_cells and re.fullmatch(r"\d{2}", detail_cells[-1]):
        vat_code = detail_cells[-1]
        detail_cells = detail_cells[:-1]
    numbers = [
        parsed
        for parsed in (_parse_euro_amount(cell) for cell in detail_cells)
        if parsed is not None
    ]
    if not numbers:
        return None

    pack_count = numbers[0]
    detail_values = numbers[1:]
    gross_quantity = None
    tare_quantity = None
    net_quantity = None
    unit_price = None
    line_total = None

    if unit_code == "KG":
        # Weighed goods: columns are filled strictly left to right.
        padded = (detail_values + [None] * 5)[:5]
        gross_quantity, tare_quantity, net_quantity, unit_price, line_total = padded
    elif detail_values:
        gross_quantity = detail_values[0]
        net_quantity = detail_values[0]
        if len(detail_values) >= 4:
            net_quantity = detail_values[1]
            unit_price = detail_values[2]
            line_total = detail_values[3]
        elif len(detail_values) == 3:
            unit_price = detail_values[1]
            line_total = detail_values[2]
        elif len(detail_values) == 2:
            unit_price = detail_values[1]

    # Preferred quantity: net, then gross, then pack count.
    quantity = net_quantity if net_quantity is not None else (gross_quantity if gross_quantity is not None else pack_count)
    item = {
        "line_index": line_index,
        "product_code": product_code,
        "iso_code": iso_code,
        "description": description,
        "category_code": category_code,
        "unit_code": unit_code,
        "pack_count": pack_count,
        "quantity": quantity,
        "gross_quantity": gross_quantity,
        "tare_quantity": tare_quantity,
        "net_quantity": net_quantity,
        "unit_price": unit_price,
        "line_total": line_total,
        "vat_code": vat_code,
        "raw_row_text": normalize_text_block(" | ".join(cells)),
    }
    if _is_noise_delivery_item(str(item.get("description") or ""), str(item.get("raw_row_text") or "")):
        return None
    return item


def _apply_cavallaro_amount_data(item: dict[str, object], amount_data: dict[str, object] | None) -> None:
    if not amount_data:
        return
    values = [float(value) for value in amount_data.get("values") or [] if isinstance(value, (int, float))]
    vat_code = amount_data.get("vat_code")
    unit_code = normalize_text_block(str(item.get("unit_code") or "")).upper()
    if unit_code == "KG" and len(values) >= 3:
        if item.get("net_quantity") is None:
            item["net_quantity"] = values[-3]
            item["quantity"] = values[-3]
        if item.get("unit_price") is None:
            item["unit_price"] = values[-2]
        if item.get("line_total") is None:
            item["line_total"] = values[-1]
    elif len(values) >= 2:
        if item.get("unit_price") is None:
            item["unit_price"] = values[-2]
        if item.get("line_total") is None:
            item["line_total"] = values[-1]
    elif len(values) == 1 and item.get("line_total") is None:
        item["line_total"] = values[0]
    if vat_code and item.get("vat_code") is None:
        item["vat_code"] = vat_code


def _extract_cavallaro_split_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract items from Cavallaro delivery-note text whose rows are split across lines.

    Parsing starts after a header line (one containing "descrizione merce", or
    both "categ." and "prezzo") and ends at the first stop token met inside the
    table. Each table line is tried, in order, as:

    1. an inline item line, which becomes a new item and receives any amount
       data still pending from earlier lines;
    2. a continuation line (product code, optional ISO code, amounts), which
       completes the previous item's codes and raw text and stores its amounts
       as pending;
    3. an amount-only line, whose amounts are stored as pending.

    Args:
        text: Document text with cells separated by ``|``.

    Returns:
        The collected items after ``_clean_delivery_items``.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    if not lines:
        return []

    items: list[dict[str, object]] = []
    # Amounts read from a continuation/amount-only line, waiting for the next inline item.
    pending_amount_data: dict[str, object] | None = None
    in_table = False
    stop_tokens = (
        "causale del trasporto",
        "firma del conducente",
        "firma del destinatario",
        "totale imponibile",
        "totale documento",
    )
    for line in lines:
        normalized_line = line.casefold()
        if any(token in normalized_line for token in stop_tokens):
            # A stop token only ends parsing once the table has started; before that it is skipped.
            if in_table:
                break
            continue
        if "descrizione merce" in normalized_line or ("categ." in normalized_line and "prezzo" in normalized_line):
            in_table = True
            continue
        if not in_table:
            continue

        cells = [normalize_text_block(part) for part in re.split(r"\s*\|\s*", line) if normalize_text_block(part)]
        if not cells:
            continue

        item = _parse_cavallaro_inline_item_line(cells, len(items))
        if item is not None:
            _apply_cavallaro_amount_data(item, pending_amount_data)
            pending_amount_data = None
            items.append(item)
            continue

        continuation = _parse_cavallaro_continuation_line(cells)
        if continuation is not None:
            if items:
                # The codes on a continuation line complete the most recent item;
                # existing codes on that item are kept.
                last_item = items[-1]
                if not last_item.get("product_code"):
                    last_item["product_code"] = continuation.get("product_code")
                if not last_item.get("iso_code") and continuation.get("iso_code"):
                    last_item["iso_code"] = continuation.get("iso_code")
                last_item["raw_row_text"] = normalize_text_block(
                    f"{last_item.get('raw_row_text') or ''} | {' | '.join(cells)}"
                )
            # NOTE(review): the continuation's amounts are not applied to last_item; they are
            # held for the next inline item (and replace any earlier pending data) — confirm
            # this matches the document layout.
            pending_amount_data = continuation.get("amount_data") if isinstance(continuation.get("amount_data"), dict) else None
            continue

        amount_data = _parse_cavallaro_wrapped_amount_cells(cells)
        if amount_data is not None:
            pending_amount_data = amount_data

    return _clean_delivery_items(items)


def _build_cavallaro_wrapped_item(
    *,
    product: dict[str, object],
    lot_data: dict[str, object] | None,
    amount_data: dict[str, object] | None,
    raw_parts: list[str],
    line_index: int,
) -> dict[str, object] | None:
    """Assemble one item from separately parsed product, lot and amount data.

    Measurements are chosen as follows: three or more amount measurements are
    used as they are; two or more lot measurements plus exactly one amount
    measurement give ``[lot[0], lot[1], amount[0]]``; otherwise the amount
    measurements win when present, else the lot measurements. Three or more
    measurements are read as gross, tare, net; fewer give gross (first) and
    net (last). Unit price, line total and VAT code prefer the amount data and
    fall back to the lot data.

    Returns ``None`` when ``_is_noise_delivery_item`` flags the result.
    """
    lot = lot_data or {}
    amount = amount_data or {}

    def numeric_list(source: dict[str, object]) -> list[float]:
        return [float(entry) for entry in (source.get("measurements") or []) if isinstance(entry, (int, float))]

    def amount_then_lot(field: str) -> object:
        from_amount = amount.get(field)
        return from_amount if from_amount is not None else lot.get(field)

    lot_measurements = numeric_list(lot)
    amount_measurements = numeric_list(amount)
    if len(amount_measurements) >= 3:
        measurements = amount_measurements
    elif len(lot_measurements) >= 2 and len(amount_measurements) == 1:
        measurements = [*lot_measurements[:2], amount_measurements[0]]
    else:
        measurements = amount_measurements or lot_measurements

    pack_count = lot.get("pack_count")
    gross_quantity = measurements[0] if measurements else None
    if len(measurements) >= 3:
        tare_quantity = measurements[1]
        net_quantity = measurements[2]
    else:
        tare_quantity = None
        net_quantity = measurements[-1] if measurements else None
    # Preferred quantity: net, then gross, then pack count.
    quantity = next(
        (candidate for candidate in (net_quantity, gross_quantity, pack_count) if candidate is not None),
        None,
    )

    description = product.get("description")
    raw_row_text = normalize_text_block(" | ".join(raw_parts))
    item = {
        "line_index": line_index,
        "product_code": product.get("product_code"),
        "iso_code": product.get("iso_code"),
        "description": description,
        "category_code": lot.get("category_code"),
        "unit_code": lot.get("unit_code"),
        "pack_count": pack_count,
        "quantity": quantity,
        "gross_quantity": gross_quantity,
        "tare_quantity": tare_quantity,
        "net_quantity": net_quantity,
        "unit_price": amount_then_lot("unit_price"),
        "line_total": amount_then_lot("line_total"),
        "vat_code": amount_then_lot("vat_code"),
        "raw_row_text": raw_row_text,
    }
    if _is_noise_delivery_item(str(description or ""), str(raw_row_text or "")):
        return None
    return item


def _extract_cavallaro_wrapped_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract delivery-note items from text whose table rows wrap across several lines.

    The text is scanned line by line. Lot and amount fragments that appear on
    their own lines are held as "pending" and attached to the next line that
    parses as a product. Scanning stops at the first footer marker met after
    the table has started.

    Args:
        text: Document text; cells within a line are split on ``|``.

    Returns:
        The items collected during the scan, passed through ``_clean_delivery_items``.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    items: list[dict[str, object]] = []
    # Fragments seen before their product line; consumed by the next product.
    pending_amount: dict[str, object] | None = None
    pending_lot: dict[str, object] | None = None
    in_table = False
    for line in lines:
        normalized_line = line.casefold()
        # Footer markers: ignored before the table starts, terminate the scan once inside it.
        if any(token in normalized_line for token in ("causale del trasporto", "firma del conducente", "totale imponibile", "totale documento")):
            if in_table:
                break
            continue
        cells = [
            normalize_text_block(part)
            for part in re.split(r"\s*\|\s*", line)
            if normalize_text_block(part)
        ]
        if not cells:
            continue
        normalized_cells = [cell.casefold() for cell in cells]
        # A header line (by phrase or by its "codice"/"c.iso" cells) opens the table.
        if "descrizione merce" in normalized_line or ("codice" in normalized_cells and "c.iso" in normalized_cells):
            in_table = True
        if "codice" in normalized_cells and "c.iso" in normalized_cells:
            # Cells after "c.iso" on the header line are parsed as an amount fragment
            # for the first product (the result may be None).
            ciso_index = normalized_cells.index("c.iso")
            pending_amount = _parse_cavallaro_amount_line(cells[ciso_index + 1 :])
            continue
        # Outside the table only lines that look like a product line are considered.
        if not in_table and not _looks_like_cavallaro_product_line(cells):
            continue

        product_parts = _parse_cavallaro_product_line(cells)
        if product_parts is not None:
            product, product_lot, trailing_amount = product_parts
            lot_data = _merge_cavallaro_lot_data(pending_lot, product_lot)
            item = _build_cavallaro_wrapped_item(
                product=product,
                lot_data=lot_data,
                amount_data=pending_amount,
                raw_parts=[line],
                line_index=len(items),
            )
            if item is not None:
                items.append(item)
            # The amount trailing this product line is carried forward as the
            # pending amount for the next product; the pending lot is consumed.
            pending_amount = trailing_amount
            pending_lot = None
            in_table = True
            continue

        lot_line = _parse_cavallaro_lot_line(cells)
        if lot_line is not None:
            # Successive lot fragments are merged until a product line consumes them.
            pending_lot = _merge_cavallaro_lot_data(pending_lot, lot_line)
            in_table = True
            continue

        amount_line = _parse_cavallaro_amount_line(cells)
        if amount_line is not None:
            # A standalone amount line replaces any previously pending amount.
            pending_amount = amount_line
            in_table = True

    return _clean_delivery_items(items)


def _normalize_match_text(value: str | None) -> str:
    """Return a canonical, ASCII-folded form of *value* for fuzzy matching.

    The input is passed through ``_normalize_ocr_latin_confusables``, stripped
    of combining marks (NFKD), case-folded, and then rewritten by an ordered
    list of regex rules that expand abbreviations, canonicalize size notations,
    drop legal-entity suffixes and collapse every non-alphanumeric run into a
    single space. An empty/falsy input yields ``""``.
    """
    text = _normalize_ocr_latin_confusables(value)
    if not text:
        return ""
    decomposed = unicodedata.normalize("NFKD", text)
    text = "".join(symbol for symbol in decomposed if not unicodedata.combining(symbol)).casefold()

    # Order matters: later rules see the output of earlier ones.
    rewrite_rules = (
        (r"\bkg\s+grande\s+cuvee\b", " krug grande cuvee "),
        (r"\bkg\b", " krug "),
        (r"\bas\b", " armand de brignac "),
        (r"\bbv\b", " belvedere "),
        (r"\bdp\b", " dom peri "),
        (r"\bro\s*(\d{2})\b", r" rose 20\1 "),
        (r"\bmgm\b|\bmagnum\b", " magnum 1 5l "),
        (r"\bmc\b", " moet "),
        (r"\bmp\b", " minuty "),
        (r"\brur\b", " ruinart "),
        (r"\bru\b", " ruinart "),
        (r"\bvo\b", " volcan "),
        (r"\bes\b", " whispering angel "),
        (r"\btz\b", " terrazas "),
        (r"\bbomsapp\b|\bbomsap\b", " bombay sapphire "),
        (r"\bvodka\s*10\b|\bvodka10\b", " ten "),
        (r"\bsaphire\b", " sapphire "),
        (r"\bbacb\b", " bacardi carta bianca "),
        (r"\bsat1796\b", " santa teresa "),
        (r"\bpaxocaf\b", " patron xo cafe "),
        (r"\bparepal\b", " patron el alto "),
        (r"\bgreyori\b", " grey goose "),
        (r"\bred\s*bull\b|\bredbull\b", " redbull "),
        (r"\bamaretto\s+di\s+saronno\b|\bdi\s+saronno\b", " disaronno "),
        (r"\bderby\s*blue\b|\bderbyblue\b", " succhi derby "),
        (r"\bseed\s*lip\b|\bseedlip\b", " seedlip "),
        # Size notations are rewritten to space-separated litre tokens.
        (r"\b1\s*/\s*1\b", " 1l "),
        (r"\b1\s*/\s*2\b", " 0 5l "),
        (r"\blt\s*1[,.]\s*5\b|\blt1[,.]\s*5\b", " 1 5l "),
        (r"\blt\s*1\b|\blt1\b", " 1l "),
        (r"\bcl\s*18\b", " 0 18l "),
        (r"\bcl\s*20\b", " 0 2l "),
        (r"\bcl\s*25\b", " 0 25l "),
        (r"\bcl\s*34\b", " 0 34l "),
        (r"\bcl\s*70\b", " 0 7l "),
        (r"\bml\s*200\b|\b200\s*ml\b", " 0 2l "),
        (r"\bml\s*750\b|\b750\s*ml\b", " 0 75l "),
        (r"\bnat\b", " naturale "),
        (r"\b\d+\s*x\s*70\b", " 0 7l "),
        (r"\b\d+\s*x\s*100\b", " 1l "),
        (r"\b\d+\s*x\s*150\b", " 1 5l "),
        (r"\b\d+\s*x\s*175\b", " 1 75l "),
        (r"\bschw\.?\b", " schweppes "),
        (r"\bpre[\s.\-]*mix\b|\bpremix\b", " fusto "),
        (r"\blimone\b", " lemon "),
        (r"\bbobb?in[ae]\b", " bobina "),
        (r"\bneutr[oi]\b", " secco "),
        # NOTE(review): the dotted alternatives end in `\.` followed by `\b`, so they
        # only match when a word character directly follows the final dot.
        (r"\b(srl|s\.r\.l\.|snc|s\.n\.c\.|sas|s\.a\.s\.|spa|s\.p\.a\.)\b", " "),
        # From here on the text contains only [a-z0-9] runs separated by spaces.
        (r"[^a-z0-9]+", " "),
        (r"\bmoet imperial magnum(?: nh)?\b", " moet magnum 1 5l "),
        (r"\bmoet imperial(?: nh)?\b", " moet 0 7l "),
        (r"\bminuty prestige rose 2025 1 5\b", " minuty magnum 1 5l "),
        (r"\bminuty prestige 2025 75cl c6\b", " minuty 0 7l "),
        (r"\b(?:lt18|kg10|liit|paper|super)\b", " "),
    )
    for pattern, replacement in rewrite_rules:
        text = re.sub(pattern, replacement, text)
    return re.sub(r"\s{2,}", " ", text).strip()


def _token_set(value: str | None) -> set[str]:
    """Return the distinct match tokens of *value*.

    Tokens come from ``_normalize_match_text``; single-character tokens,
    stopwords and purely numeric tokens are discarded.
    """
    return {
        word
        for word in _normalize_match_text(value).split(" ")
        if len(word) > 1 and word not in _MATCH_TOKEN_STOPWORDS and not word.isdigit()
    }


def _product_volume_tokens(normalized_value: str) -> set[str]:
    """Return the canonical volume labels (e.g. ``"0.75l"``) found in *normalized_value*.

    Each label is detected either in its space-separated litre form
    (``"0 75l"``) or in its centilitre form (``"75cl"``).
    """
    volume_patterns = (
        ("0.18l", r"\b0\s*18l\b|\b18cl\b"),
        ("0.2l", r"\b0\s*2l\b|\b20cl\b"),
        ("0.25l", r"\b0\s*25l\b|\b25cl\b"),
        ("0.33l", r"\b0\s*33l\b|\b33cl\b"),
        ("0.5l", r"\b0\s*5l\b|\b50cl\b"),
        ("0.7l", r"\b0\s*7l\b|\b70cl\b"),
        ("0.75l", r"\b0\s*75l\b|\b75cl\b"),
        ("1l", r"\b1l\b|\b100cl\b"),
        ("1.5l", r"\b1\s*5l\b|\b150cl\b"),
        ("1.75l", r"\b1\s*75l\b|\b175cl\b"),
        ("3l", r"\b3l\b|\b300cl\b"),
    )
    detected: set[str] = set()
    for volume_label, volume_regex in volume_patterns:
        if re.search(volume_regex, normalized_value):
            detected.add(volume_label)
    return detected


def _is_volume_fragment_token(token: str) -> bool:
    """Return True when *token* is one or two digit groups followed by ``l`` (e.g. ``"5l"``, ``"0 75l"``)."""
    volume_shape = r"\d+(?:\s*\d+)?l"
    return re.fullmatch(volume_shape, token) is not None


def _volume_token_sets_are_compatible(left_tokens: set[str], right_tokens: set[str]) -> bool:
    """Return True when two volume-label sets do not contradict each other.

    Sets are compatible when either is empty, when they share a label, or when
    both contain only the 0.7l / 0.75l sizes, which are treated as interchangeable.
    """
    if not (left_tokens and right_tokens):
        return True
    if not left_tokens.isdisjoint(right_tokens):
        return True
    interchangeable_sizes = {"0.7l", "0.75l"}
    return left_tokens.issubset(interchangeable_sizes) and right_tokens.issubset(interchangeable_sizes)


def _name_similarity(left: str | None, right: str | None) -> float:
    """Score how alike two product/supplier names are, in the range 0.0-1.0.

    Identical normalized names score 1.0 and containment scores 0.92. Otherwise
    the score is the better of token Jaccard overlap and character sequence
    ratio, capped at 0.39 when the names share no non-volume token, name
    different distinctive products, or carry incompatible volumes.
    """
    first = _normalize_match_text(left)
    second = _normalize_match_text(right)
    if not (first and second):
        return 0.0
    if first == second:
        return 1.0
    if first in second or second in first:
        return 0.92

    first_tokens = _token_set(first)
    second_tokens = _token_set(second)
    shared_tokens = first_tokens & second_tokens
    both_have_tokens = bool(first_tokens) and bool(second_tokens)

    jaccard = 0.0
    if both_have_tokens:
        jaccard = len(shared_tokens) / max(len(first_tokens | second_tokens), 1)
    ratio = SequenceMatcher(None, first, second).ratio()

    # Sharing nothing but volume fragments is not evidence of the same product.
    if both_have_tokens and all(_is_volume_fragment_token(token) for token in shared_tokens):
        return min(ratio * 0.45, 0.39)

    best = max(jaccard, ratio)
    first_distinctive = first_tokens & _DISTINCTIVE_PRODUCT_TOKENS
    second_distinctive = second_tokens & _DISTINCTIVE_PRODUCT_TOKENS
    if first_distinctive and second_distinctive and first_distinctive.isdisjoint(second_distinctive):
        return min(best * 0.55, 0.39)

    volumes_agree = _volume_token_sets_are_compatible(
        _product_volume_tokens(first),
        _product_volume_tokens(second),
    )
    if not volumes_agree:
        return min(best * 0.55, 0.39)
    return best


def _product_family_similarity_for_variant_mismatch(left: str | None, right: str | None) -> float:
    """Score how likely two names are the same product in a different size.

    Returns 0.0 unless the two names carry incompatible volume labels. In that
    case the names are compared on their non-volume tokens only: the score is
    the better of overlap relative to the smaller token set and the sequence
    ratio of the sorted token strings.
    """
    first = _normalize_match_text(left)
    second = _normalize_match_text(right)
    if not (first and second):
        return 0.0
    # Only a genuine volume conflict qualifies as a "variant mismatch".
    if _volume_token_sets_are_compatible(_product_volume_tokens(first), _product_volume_tokens(second)):
        return 0.0

    first_words = {word for word in _token_set(first) if not _is_volume_fragment_token(word)}
    second_words = {word for word in _token_set(second) if not _is_volume_fragment_token(word)}
    if not (first_words and second_words):
        return 0.0

    first_distinctive = first_words & _DISTINCTIVE_PRODUCT_TOKENS
    second_distinctive = second_words & _DISTINCTIVE_PRODUCT_TOKENS
    if first_distinctive and second_distinctive and first_distinctive.isdisjoint(second_distinctive):
        return 0.0

    common_words = first_words & second_words
    if not common_words:
        return 0.0
    smaller_size = min(len(first_words), len(second_words))
    coverage = len(common_words) / max(smaller_size, 1)
    ratio = SequenceMatcher(None, " ".join(sorted(first_words)), " ".join(sorted(second_words))).ratio()
    return max(coverage, ratio)


def _normalize_product_code_for_match(value: object | None) -> str:
    """Return *value* as a case-folded code containing only ``a-z``, ``0-9`` and ``$``."""
    folded_code = _normalize_ocr_latin_confusables(str(value or "")).casefold()
    # Separators and punctuation are dropped so "AB-12" and "ab 12" compare equal.
    return re.sub(r"[^a-z0-9$]+", "", folded_code)


def _product_code_match_score(left: object | None, right: object | None) -> float:
    """Return 1.0 when both normalized product codes are non-empty and equal, else 0.0."""
    first_code = _normalize_product_code_for_match(left)
    second_code = _normalize_product_code_for_match(right)
    codes_match = bool(first_code) and bool(second_code) and first_code == second_code
    return 1.0 if codes_match else 0.0


def _supplier_similarity(left: str | None, right: str | None) -> float:
    """Score two supplier names, treating known aliases as a 0.92 match.

    When both normalized names belong to the same group in
    ``_SUPPLIER_ALIAS_GROUPS`` the score is 0.92; otherwise it falls back to
    ``_name_similarity``.
    """
    first = _normalize_match_text(left)
    second = _normalize_match_text(right)
    same_alias_group = any(first in group and second in group for group in _SUPPLIER_ALIAS_GROUPS)
    if same_alias_group:
        return 0.92
    return _name_similarity(left, right)


def _extract_known_supplier_name_from_text(text: str | None) -> str | None:
    """Return the canonical name of a known supplier mentioned in *text*, or None.

    Detection is substring-based on the normalized text; the checks run in a
    fixed order and the first supplier whose markers are present wins.
    """
    haystack = _normalize_match_text(text)

    def mentions(*needles: str) -> bool:
        return any(needle in haystack for needle in needles)

    if mentions("bibite laconi", "bibitelaconi", "bibitelaconisrl"):
        return "BIBITE LACONI S.R.L."
    if mentions("tonino cavallaro", "toninocavallaro") or (
        "cavallaro" in haystack and "via seychelles" in haystack
    ):
        return "TONINO CAVALLARO & C. S.n.c."
    if mentions("moet hennessy", "moethennessy it"):
        return "MOET"
    if mentions("martini rossi", "gruppo bacardi martini"):
        return "MARTINI"
    # "montenegro" alone is not enough; it must appear with one of these markers.
    if "montenegro" in haystack and mentions(
        "vecchia romagna",
        "brandy 1820",
        "ruzzeddu marcello",
        "gruppo montenegro",
    ):
        return "MARCELLO REDUZZI - MONTENEGRO"
    return None


def _is_generic_order_supplier(value: str | None) -> bool:
    """Return True when the normalized supplier name is listed in ``_GENERIC_ORDER_SUPPLIER_NAMES``."""
    supplier_key = _normalize_match_text(value)
    return supplier_key in _GENERIC_ORDER_SUPPLIER_NAMES


def _best_document_name_similarity_for_order_item(
    document_items: list[dict[str, object]],
    order_item: dict[str, object],
) -> float:
    """Return the highest name similarity between *order_item* and any document line.

    Each dict entry's ``description`` is compared against the order item's
    ``product_name``; non-dict entries are skipped. Returns 0.0 when there is
    nothing to compare.
    """
    wanted_name = str(order_item.get("product_name") or "")
    top_score: float | None = None
    for entry in document_items:
        if not isinstance(entry, dict):
            continue
        score = _name_similarity(str(entry.get("description") or ""), wanted_name)
        if top_score is None or score > top_score:
            top_score = score
    return 0.0 if top_score is None else top_score


def _is_integer_like(value: float | None, *, tolerance: float = 0.12) -> bool:
    """Return True when *value* lies within *tolerance* of its nearest integer.

    ``None`` is never integer-like.
    """
    return value is not None and abs(value - round(value)) <= tolerance


def _delivery_quantity_candidates(doc_item: dict[str, object]) -> list[tuple[str, float]]:
    """Collect the distinct, usable quantity readings of a document line.

    The keys ``pack_count``, ``quantity``, ``net_quantity`` and
    ``gross_quantity`` are read in that order. A reading is kept when it
    converts to a finite, non-negative float and no earlier reading has
    (within 0.0001) the same value.

    Args:
        doc_item: Parsed document line.

    Returns:
        ``(source_key, value)`` pairs in the key order above.
    """
    import math

    candidates: list[tuple[str, float]] = []
    for key in ("pack_count", "quantity", "net_quantity", "gross_quantity"):
        raw_value = doc_item.get(key)
        if raw_value is None:
            continue
        try:
            value = float(raw_value)
        except (TypeError, ValueError):
            continue
        # float() accepts "nan"/"inf"; such values pass a `< 0` check but make
        # the later round()-based storage rounding raise, so drop them here.
        if not math.isfinite(value) or value < 0:
            continue
        # Keep only the first source for each distinct value.
        if any(abs(value - existing) <= 0.0001 for _, existing in candidates):
            continue
        candidates.append((key, value))
    return candidates


def _round_quantity_for_storage(value: float | int | None) -> float | None:
    """Return *value* as a float rounded for persistence, or None for None.

    Values within 1e-6 of a whole number snap to that whole number; everything
    else is rounded to six decimal places.
    """
    if value is None:
        return None
    as_float = float(value)
    nearest_whole = round(as_float)
    if abs(as_float - nearest_whole) > 0.000001:
        return round(as_float, 6)
    return float(int(nearest_whole))


def _quantities_match(left: float | int | None, right: float | int | None) -> bool:
    """Return True when both quantities are present and differ by at most 0.001."""
    if None in (left, right):
        return False
    difference = float(left) - float(right)
    return abs(difference) <= 0.001


def _is_pack_lot_code(value: object | None) -> bool:
    """Return True when *value* names a multi-unit lot (carton, case or bundle code)."""
    pack_codes = {"ct", "cartone", "cartoni", "cassa", "casse", "fd"}
    lot_code = normalize_text_block(str(value or "")).casefold()
    return lot_code in pack_codes


def _is_single_unit_lot_code(value: object | None) -> bool:
    """Return True when *value* names a single-unit lot (bottle or piece code)."""
    single_unit_codes = {"bt", "bottiglia", "bottiglie", "bm", "nr", "pz", "pezzi", "pezzo"}
    lot_code = _normalize_ocr_latin_confusables(str(value or "")).casefold()
    return lot_code in single_unit_codes


def _single_unit_lot_code_from_document(value: object | None) -> str | None:
    """Map a document unit code to the canonical single-unit lot code.

    Bottle codes map to ``"bt"``, piece/number codes map to ``"pz"``; anything
    else yields None.
    """
    canonical_by_code = {
        "bt": "bt",
        "bottiglia": "bt",
        "bottiglie": "bt",
        "bm": "pz",
        "nr": "pz",
        "pz": "pz",
        "pezzi": "pz",
        "pezzo": "pz",
    }
    unit_code = _normalize_ocr_latin_confusables(str(value or "")).casefold()
    return canonical_by_code.get(unit_code)


def _is_cavallaro_supplier_name(value: object | None) -> bool:
    """Return True when the normalized supplier name contains ``"cavallaro"``."""
    supplier_key = _normalize_match_text(str(value or ""))
    return "cavallaro" in supplier_key


def _document_has_weighed_produce_quantities(doc_item: dict[str, object]) -> bool:
    """Return True when a document line describes produce sold by weighed net quantity.

    The line must use a kilogram unit code, carry a positive tare, and carry a
    positive net quantity or quantity.
    """
    weight_unit_codes = {"kg", "kgs", "kilo", "kilogrammo", "kilogrammi"}
    if _normalized_document_unit_code(doc_item.get("unit_code")) not in weight_unit_codes:
        return False
    # Cavallaro also uses KG for packaged purees; only rows with a tare are weighed produce.
    has_tare = _coerce_positive_document_float(doc_item.get("tare_quantity")) is not None
    if not has_tare:
        return False
    return any(
        _coerce_positive_document_float(doc_item.get(field)) is not None
        for field in ("net_quantity", "quantity")
    )


def _weighed_cavallaro_delivery_fulfills_single_unit_order(
    doc_item: dict[str, object],
    *,
    supplier_name: str | None,
    order_supplier_name: object | None,
    order_lot_code: object | None,
    name_score: float,
) -> bool:
    """Decide whether a weighed Cavallaro line satisfies an order placed in single units.

    All of the following must hold: the name score is at least 0.55, either
    supplier name is Cavallaro, the order lot code is a single-unit code, and
    the document line carries weighed-produce quantities.
    """
    if name_score < 0.55:
        return False
    supplier_is_cavallaro = _is_cavallaro_supplier_name(supplier_name) or _is_cavallaro_supplier_name(
        order_supplier_name
    )
    return (
        supplier_is_cavallaro
        and _is_single_unit_lot_code(order_lot_code)
        and _document_has_weighed_produce_quantities(doc_item)
    )


def _document_weighed_quantity(doc_item: dict[str, object]) -> float | None:
    """Return the first positive value among net, plain and gross quantity, or None."""
    positive_readings = (
        reading
        for reading in (
            _coerce_positive_document_float(doc_item.get(field))
            for field in ("net_quantity", "quantity", "gross_quantity")
        )
        if reading is not None
    )
    # Generators are lazy, so fields after the first hit are never evaluated.
    return next(positive_readings, None)


def _best_comparable_delivery_quantity(
    doc_item: dict[str, object],
    ordered_quantity: float,
    order_lot_code: str | None = None,
    units_per_pack: float | None = None,
) -> tuple[float | None, float | None]:
    """Pick the document quantity that best compares with the ordered quantity.

    Every quantity reading of the document line is a candidate. For pack-type
    orders each reading is additionally converted to packs, using
    ``units_per_pack`` when it is greater than 1 or, failing that, a pack size
    inferred from the ordered quantity when it is a common pack size. When the
    document counts single units for a pack order with a known pack size, only
    the converted readings compete. Candidates are ranked by distance from the
    ordered quantity, adjusted by a penalty for fractional values and a bias
    that depends on which field the reading came from.

    Returns:
        ``(comparable_quantity, raw_document_value)`` for the winning
        candidate, or ``(None, None)`` when the line has no usable reading.
    """
    raw_candidates = _delivery_quantity_candidates(doc_item)
    if not raw_candidates:
        return (None, None)

    lot_code = normalize_text_block(order_lot_code or "").casefold()
    document_in_single_units = _is_single_unit_lot_code(doc_item.get("unit_code"))
    order_is_pack = _is_pack_lot_code(lot_code)
    known_pack_size = units_per_pack if units_per_pack is not None and units_per_pack > 1 else None

    # (source label, quantity comparable with the order, raw document value)
    contenders: list[tuple[str, float, float]] = []
    if not (order_is_pack and document_in_single_units and known_pack_size is not None):
        contenders.extend((label, amount, amount) for label, amount in raw_candidates)

    if order_is_pack:
        for label, amount in raw_candidates:
            divisor = known_pack_size
            may_infer = label == "gross_quantity" or document_in_single_units
            if divisor is None and may_infer and ordered_quantity > 0:
                implied_size = amount / float(ordered_quantity)
                implied_rounded = int(round(implied_size))
                if _is_integer_like(implied_size) and implied_rounded in _COMMON_PACK_SIZES:
                    divisor = float(implied_rounded)
            if divisor is not None and divisor > 1:
                contenders.append((f"{label}_converted_pack", amount / divisor, amount))

    order_is_single = not order_is_pack and _is_single_unit_lot_code(lot_code)
    pack_bias = {"pack_count": -0.35, "quantity": 0.15, "net_quantity": 0.15}
    single_bias = {"quantity": -0.15, "net_quantity": -0.15, "gross_quantity": -0.15, "pack_count": 0.25}

    winner_rank: tuple[float, float] | None = None
    winner_quantity: float | None = None
    winner_raw: float | None = None
    for label, comparable, original in contenders:
        stored = _round_quantity_for_storage(comparable)
        if stored is None:
            continue
        gap = abs(comparable - float(ordered_quantity))
        fractional_penalty = 0.0 if _is_integer_like(comparable) else 0.25
        if order_is_pack:
            bias = -0.45 if label.endswith("_converted_pack") else pack_bias.get(label, 0.0)
        elif order_is_single:
            bias = single_bias.get(label, 0.0)
        else:
            bias = 0.0
        rank = (gap + fractional_penalty + bias, abs(stored - ordered_quantity))
        # Strict comparison: on a tie the earlier contender stays the winner.
        if winner_rank is None or rank < winner_rank:
            winner_rank = rank
            winner_quantity = stored
            winner_raw = original
    return (winner_quantity, winner_raw)


def _open_tenant_order_connection(database_path: str) -> sqlite3.Connection:
    """Open a SQLite connection to *database_path* whose rows are ``sqlite3.Row`` objects.

    The caller owns the returned connection and is responsible for closing it.
    """
    tenant_connection = sqlite3.connect(database_path)
    # sqlite3.Row lets callers read columns by name as well as by index.
    tenant_connection.row_factory = sqlite3.Row
    return tenant_connection


def _sqlite_table_columns(connection: sqlite3.Connection, table_name: str) -> set[str]:
    """Return the column names of *table_name* as reported by ``PRAGMA table_info``.

    Rows are read by the ``"name"`` key, so the connection must use a row
    factory that supports name lookup. *table_name* is interpolated into the
    statement verbatim.
    """
    column_rows = connection.execute(f"PRAGMA table_info({table_name})").fetchall()
    column_names: set[str] = set()
    for column_row in column_rows:
        column_names.add(str(column_row["name"]))
    return column_names


def _document_price_for_matched_order_line(line: dict[str, object]) -> float | None:
    """Derive the per-order-unit price for a matched line, rounded to 6 decimals.

    Precedence:
      1. With an applied quantity and a line total: the document unit price if
         it reproduces the total within 0.02, else total / applied quantity.
      2. For weight-based updates: the document unit price.
      3. Line total / comparable delivered quantity.
      4. The document unit price, rescaled by delivered / comparable quantity
         when those two differ by more than 0.001.
    Returns None when none of these can be computed.
    """

    def positive(key: str) -> float | None:
        return _coerce_positive_document_float(line.get(key))

    comparable = positive("comparable_delivered_quantity")
    delivered = positive("delivered_quantity")
    total = positive("document_line_total")
    price = positive("document_unit_price")
    applied = positive("applied_quantity")
    weight_based = _document_weight_quantity_for_order_update(line) is not None

    if applied is not None and total is not None:
        price_reproduces_total = (
            price is not None and abs(round(price * applied, 2) - round(total, 2)) <= 0.02
        )
        if price_reproduces_total:
            return round(price, 6)
        return round(total / applied, 6)
    if weight_based and price is not None:
        return round(price, 6)
    if total is not None and comparable is not None:
        return round(total / comparable, 6)
    if price is None:
        return None
    needs_rescaling = (
        comparable is not None
        and delivered is not None
        and abs(delivered - comparable) > 0.001
    )
    if needs_rescaling:
        # The document price is per document unit; convert it to the order's unit.
        return round(price * delivered / comparable, 6)
    return round(price, 6)


def _normalized_document_unit_code(value: object | None) -> str:
    """Return the unit code with OCR confusables normalized and case folded (``""`` for falsy input)."""
    unit_text = str(value or "")
    return _normalize_ocr_latin_confusables(unit_text).casefold()


def _document_weight_quantity_for_order_update(line: dict[str, object]) -> float | None:
    """Return the weight to store on the order when the matched line is weighed produce.

    Requires a kilogram document unit code and a positive document tare; the
    first positive value among net, plain, delivered and gross quantity is
    returned. Otherwise None.
    """
    weight_unit_codes = {"kg", "kgs", "kilo", "kilogrammo", "kilogrammi"}
    unit_code = _normalized_document_unit_code(line.get("document_unit_code"))
    if unit_code not in weight_unit_codes:
        return None
    # Cavallaro uses KG both for weighed produce and for 1kg packaged purees.
    # Only rows with an actual tare represent produce paid by net weight.
    has_tare = _coerce_positive_document_float(line.get("document_tare_quantity")) is not None
    if not has_tare:
        return None
    quantity_fields = ("document_net_quantity", "document_quantity", "delivered_quantity", "document_gross_quantity")
    positive_readings = (
        reading
        for reading in (_coerce_positive_document_float(line.get(field)) for field in quantity_fields)
        if reading is not None
    )
    return next(positive_readings, None)


def _document_single_unit_update_for_pack_order(line: dict[str, object]) -> tuple[str | None, float | None]:
    """Return the single-unit lot code and quantity to apply to an order placed in packs.

    Applies only when the document unit code is a single-unit code and the
    order lot code is a pack code. The quantity is the first positive reading
    among the document/delivered quantity fields, rounded for storage.
    Returns ``(None, None)`` when the conversion does not apply.
    """
    nothing = (None, None)
    document_lot_code = _single_unit_lot_code_from_document(line.get("document_unit_code"))
    if document_lot_code is None or not _is_pack_lot_code(line.get("order_lot_code")):
        return nothing
    quantity_fields = (
        "document_quantity",
        "document_net_quantity",
        "delivered_quantity",
        "document_gross_quantity",
        "document_pack_count",
    )
    for field in quantity_fields:
        reading = _coerce_positive_document_float(line.get(field))
        if reading is None:
            continue
        return (document_lot_code, _round_quantity_for_storage(reading))
    return nothing


def _order_update_lot_and_quantity(line: dict[str, object]) -> tuple[str | None, float | None]:
    """Return the ``(lot_code, quantity)`` to write back to the order for a matched line.

    Weighed produce becomes ``("kg", weight)``. A pack order delivered in single
    units becomes ``(single_unit_code, quantity)``. Otherwise the lot code is
    left unchanged (None) and the comparable delivered quantity, if positive,
    is used.
    """
    weighed = _document_weight_quantity_for_order_update(line)
    if weighed is not None:
        return ("kg", _round_quantity_for_storage(weighed))

    unit_code, unit_quantity = _document_single_unit_update_for_pack_order(line)
    if unit_code is not None and unit_quantity is not None:
        return (unit_code, unit_quantity)

    comparable = _coerce_positive_document_float(line.get("comparable_delivered_quantity"))
    if comparable is None:
        return (None, None)
    return (None, _round_quantity_for_storage(comparable))


def _load_order_batches_for_fiscal_matching(
    session: SessionIdentity,
    *,
    document_date: str | None,
) -> list[dict[str, object]]:
    """Load order batches from the tenant database, grouped for fiscal matching.

    Order rows are merged into logical batches keyed by (local day of
    confirmation, normalized supplier, fiscal-match state). Within a batch,
    rows with the same normalized product name, lot code and supplier are
    merged into one item whose quantity and estimated total are summed.

    Args:
        session: Tenant session; only ``database_path`` is read here.
        document_date: When given, only batches confirmed within 21 days
            before or after this date are loaded.

    Returns:
        One dict per logical batch with ``batch_id`` (the smallest merged id),
        ``batch_ids``, ``confirmed_at``, ``staff``, ``total_estimated_amount``
        and ``items``.
    """
    connection = _open_tenant_order_connection(session.database_path)
    try:
        # `with connection` only scopes the transaction (commit/rollback); it
        # does not close the connection, hence the explicit close() below.
        with connection:
            _ensure_order_batch_fiscal_match_columns(connection)
            parameters: list[object] = []
            query = """
            SELECT
                b.id AS batch_id,
                b.confirmed_at AS confirmed_at,
                b.staff AS staff,
                b.total_estimated_amount AS total_estimated_amount,
                b.fiscal_document_id AS fiscal_document_id,
                i.id AS item_id,
                i.product_id AS product_id,
                i.product_name AS product_name,
                i.lot_code AS lot_code,
                i.supplier_name AS supplier_name,
                i.quantity AS quantity,
                i.final_price_vat_snapshot AS final_price_vat_snapshot,
                i.estimated_line_total AS estimated_line_total,
                i.units_per_pack AS units_per_pack,
                i.liters_per_unit AS liters_per_unit,
                p.product_code AS product_code
            FROM ordini_batches b
            JOIN ordini_items i ON i.batch_id = b.id
            LEFT JOIN ordini_products p ON p.id = i.product_id
        """
            if document_date:
                query += """
            WHERE datetime(b.confirmed_at) >= datetime(?, '-21 days')
              AND datetime(b.confirmed_at) <= datetime(?, '+21 days')
            """
                parameters.extend([document_date, document_date])
            query += " ORDER BY datetime(b.confirmed_at) DESC, b.id DESC, i.id ASC"
            rows = connection.execute(query, parameters).fetchall()
    finally:
        connection.close()

    grouped: dict[tuple[str, str, str], dict[str, object]] = {}
    for row in rows:
        batch_id = int(row["batch_id"])
        supplier_name = str(row["supplier_name"] or "")
        supplier_key = _normalize_match_text(supplier_name)
        confirmed_at = str(row["confirmed_at"] or "")
        try:
            # Naive timestamps are taken as UTC, then converted to the Italian
            # calendar day so same-day orders group together.
            confirmed_at_value = datetime.fromisoformat(confirmed_at.replace("Z", "+00:00"))
            if confirmed_at_value.tzinfo is None:
                confirmed_at_value = confirmed_at_value.replace(tzinfo=timezone.utc)
            supplier_day = confirmed_at_value.astimezone(_ITALIAN_TIMEZONE).date().isoformat()
        except ValueError:
            # Unparseable timestamp: fall back to its first 10 characters.
            supplier_day = confirmed_at[:10]
        fiscal_document_id = str(row["fiscal_document_id"] or "").strip()
        fiscal_group = f"matched:{fiscal_document_id}" if fiscal_document_id else "unmatched"
        group_key = (supplier_day, supplier_key, fiscal_group)
        batch = grouped.setdefault(
            group_key,
            {
                "batch_id": batch_id,
                "batch_ids": [],
                "confirmed_at": confirmed_at,
                "staff": row["staff"],
                "total_estimated_amount": None,
                "items": [],
                "_items_by_identity": {},
            },
        )
        batch_ids = batch["batch_ids"]
        if isinstance(batch_ids, list) and batch_id not in batch_ids:
            batch_ids.append(batch_id)
        # The smallest batch id represents the merged group.
        if batch_id < int(batch["batch_id"]):
            batch["batch_id"] = batch_id
            batch["confirmed_at"] = confirmed_at
            batch["staff"] = row["staff"]

        product_name = str(row["product_name"] or "")
        lot_code = str(row["lot_code"] or "")
        item_identity = (_normalize_match_text(product_name), _normalize_match_text(lot_code), supplier_key)
        item_id = int(row["item_id"])
        items_by_identity = batch["_items_by_identity"]
        if not isinstance(items_by_identity, dict):
            continue
        item = items_by_identity.get(item_identity)
        if item is None:
            item = {
                "item_id": item_id,
                "item_ids": [item_id],
                "product_id": row["product_id"],
                "product_name": product_name,
                "lot_code": lot_code,
                "supplier_name": supplier_name,
                "quantity": float(row["quantity"] or 0),
                "final_price_vat_snapshot": float(row["final_price_vat_snapshot"]) if row["final_price_vat_snapshot"] is not None else None,
                "estimated_line_total": float(row["estimated_line_total"]) if row["estimated_line_total"] is not None else None,
                "units_per_pack": float(row["units_per_pack"]) if row["units_per_pack"] is not None else None,
                "liters_per_unit": float(row["liters_per_unit"]) if row["liters_per_unit"] is not None else None,
                "product_code": str(row["product_code"] or "").strip(),
            }
            items_by_identity[item_identity] = item
            batch["items"].append(item)
            continue
        # Same product/lot/supplier seen again: accumulate into the existing item.
        item["item_ids"].append(item_id)
        item["quantity"] = float(item.get("quantity") or 0) + float(row["quantity"] or 0)
        if not str(item.get("product_code") or "").strip() and str(row["product_code"] or "").strip():
            item["product_code"] = str(row["product_code"] or "").strip()
        if row["estimated_line_total"] is not None:
            item["estimated_line_total"] = float(item.get("estimated_line_total") or 0) + float(row["estimated_line_total"])

    results: list[dict[str, object]] = []
    for batch in grouped.values():
        # Drop the internal lookup index before handing batches to callers.
        batch.pop("_items_by_identity", None)
        items = batch.get("items") if isinstance(batch.get("items"), list) else []
        line_totals = [
            float(item["estimated_line_total"])
            for item in items
            if isinstance(item, dict) and item.get("estimated_line_total") is not None
        ]
        # The batch total is recomputed from the merged items, not read from the batch row.
        batch["total_estimated_amount"] = round(sum(line_totals), 2) if line_totals else None
        results.append(batch)
    return results


def _build_delivery_note_order_match(
    session: SessionIdentity,
    *,
    supplier_name: str | None,
    document_date: str | None,
    document_items: list[dict[str, object]],
) -> dict[str, object]:
    """Find the order batch that best matches the lines of a delivery document.

    Candidate batches come from ``_load_order_batches_for_fiscal_matching``.
    For every batch the order items are first filtered by supplier (or, for
    generic suppliers, by product-name similarity); each document line is then
    greedily paired with the best still-unused order item. The batch with the
    highest ``(score, line_match_count, exact_line_count)`` tuple wins.

    Args:
        session: Tenant session forwarded to the batch loader.
        supplier_name: Supplier read from the document, if any.
        document_date: ISO date string of the document; an unparseable or
            missing value only disables the date component of the score.
        document_items: Extracted document lines as plain dicts.

    Returns:
        A dict whose ``status`` is ``"no_document_lines"`` (empty input),
        ``"no_candidate"`` (no batch reached ``_MIN_FISCAL_ORDER_MATCH_SCORE``
        or matched at least one line) or ``"matched"``. A matched result also
        carries the batch identifiers, the counters and the per-line details
        under ``lines`` (statuses: ``exact``, ``partial``, ``over_delivered``,
        ``extra``, ``missing``).
    """
    if not document_items:
        return {
            "status": "no_document_lines",
            "score": 0.0,
            "line_match_count": 0,
            "exact_line_count": 0,
            "missing_line_count": 0,
            "extra_line_count": 0,
            "can_apply_storno": False,
            "lines": [],
        }

    candidate_batches = _load_order_batches_for_fiscal_matching(
        session,
        document_date=document_date,
    )
    best_result: dict[str, object] | None = None

    # Parse the document date once; a malformed value is tolerated and simply
    # leaves date_score at 0.0 for every batch.
    try:
        document_date_value = datetime.fromisoformat(document_date) if document_date else None
    except ValueError:
        document_date_value = None

    for batch in candidate_batches:
        batch_items = [item for item in batch.get("items", []) if isinstance(item, dict)]
        if not batch_items:
            continue
        # (order item, supplier similarity, product-name fallback score).
        # The fallback is only computed for order items whose supplier is
        # flagged by _is_generic_order_supplier; otherwise it stays 0.0.
        supplier_scored_items: list[tuple[dict[str, object], float, float]] = []
        for item in batch_items:
            order_supplier_name = str(item.get("supplier_name") or "")
            supplier_match_score = _supplier_similarity(supplier_name, order_supplier_name)
            generic_product_score = (
                _best_document_name_similarity_for_order_item(document_items, item)
                if _is_generic_order_supplier(order_supplier_name)
                else 0.0
            )
            supplier_scored_items.append((item, supplier_match_score, generic_product_score))

        # Keep only order items plausibly belonging to this document's supplier.
        relevant_items = [
            item
            for item, supplier_match_score, generic_product_score in supplier_scored_items
            if supplier_match_score >= 0.45 or generic_product_score >= 0.4
        ]
        # Batch-level supplier score: best per-item score, with the
        # product-name fallback capped at 0.75.
        supplier_score = max(
            (
                max(supplier_match_score, min(generic_product_score, 0.75))
                for _, supplier_match_score, generic_product_score in supplier_scored_items
            ),
            default=0.0,
        )
        if not relevant_items:
            continue

        # Greedy pairing in document order: an order item claimed by one
        # document line is not offered to later lines.
        used_item_ids: set[int] = set()
        matched_lines: list[dict[str, object]] = []
        line_match_count = 0
        exact_line_count = 0
        for document_item in document_items:
            document_unit_price, document_line_total = _document_line_pricing(document_item)
            best_line_match: dict[str, object] | None = None
            for order_item in relevant_items:
                # NOTE(review): order items lacking an "item_id" key all collapse
                # to id 0 here and would then share one "used" slot - confirm the
                # batch loader always sets it.
                item_id = int(order_item.get("item_id") or 0)
                if item_id in used_item_ids:
                    continue
                name_score = _name_similarity(
                    str(document_item.get("description") or ""),
                    str(order_item.get("product_name") or ""),
                )
                code_score = _product_code_match_score(document_item.get("product_code"), order_item.get("product_code"))
                family_variant_score = 0.0
                is_variant_mismatch = False
                # Only when neither the code nor the name matched is the
                # family-level similarity consulted.
                if code_score <= 0 and name_score < 0.4:
                    family_variant_score = _product_family_similarity_for_variant_mismatch(
                        str(document_item.get("description") or ""),
                        str(order_item.get("product_name") or ""),
                    )
                    is_variant_mismatch = family_variant_score >= 0.75
                name_score = max(name_score, code_score)
                if is_variant_mismatch:
                    # A family-level hit contributes at most 0.62.
                    name_score = max(name_score, min(family_variant_score, 0.62))
                if name_score < 0.4:
                    continue
                ordered_quantity = float(order_item.get("quantity") or 0)
                comparable_quantity, delivered_raw_quantity = _best_comparable_delivery_quantity(
                    document_item,
                    ordered_quantity,
                    str(order_item.get("lot_code") or ""),
                    float(order_item["units_per_pack"]) if order_item.get("units_per_pack") is not None else None,
                )
                # Special case decided by the helper: the delivery is treated as
                # fulfilling the ordered quantity, while the raw delivered
                # quantity is taken from the document's weighed quantity.
                if _weighed_cavallaro_delivery_fulfills_single_unit_order(
                    document_item,
                    supplier_name=supplier_name,
                    order_supplier_name=order_item.get("supplier_name"),
                    order_lot_code=order_item.get("lot_code"),
                    name_score=name_score,
                ):
                    comparable_quantity = ordered_quantity
                    delivered_raw_quantity = _document_weighed_quantity(document_item)
                # Quantity score = 1 - relative deviation from the ordered
                # quantity, clamped at 0; the denominator is floored at 1.0.
                quantity_score = 0.0
                if comparable_quantity is not None:
                    quantity_score = max(
                        0.0,
                        1.0 - (abs(float(comparable_quantity) - float(ordered_quantity)) / max(float(ordered_quantity), 1.0)),
                    )
                # Line confidence: 70% name/code similarity, 30% quantity agreement.
                confidence = round((name_score * 0.7) + (quantity_score * 0.3), 4)
                if best_line_match is None or confidence > float(best_line_match["confidence"]):
                    missing_quantity = None
                    extra_quantity = None
                    # Without a comparable quantity the line stays "partial"
                    # and both deltas remain None.
                    status = "partial"
                    if comparable_quantity is not None:
                        missing_quantity = max(ordered_quantity - comparable_quantity, 0)
                        extra_quantity = max(comparable_quantity - ordered_quantity, 0)
                        if _quantities_match(comparable_quantity, ordered_quantity):
                            status = "exact"
                        elif comparable_quantity < ordered_quantity:
                            status = "partial"
                        else:
                            status = "over_delivered"
                    best_line_match = {
                        "status": status,
                        "confidence": confidence,
                        "order_item_id": item_id,
                        "order_item_ids": list(order_item.get("item_ids") or [item_id]),
                        "order_product_name": order_item.get("product_name"),
                        "order_lot_code": order_item.get("lot_code"),
                        "order_supplier_name": order_item.get("supplier_name"),
                        "order_product_code": order_item.get("product_code"),
                        "ordered_quantity": ordered_quantity,
                        "delivered_quantity": delivered_raw_quantity,
                        "comparable_delivered_quantity": comparable_quantity,
                        "missing_quantity": missing_quantity,
                        "extra_quantity": extra_quantity,
                        "document_line_index": document_item.get("line_index"),
                        "document_description": document_item.get("description"),
                        "document_raw_row_text": document_item.get("raw_row_text"),
                        "document_product_code": document_item.get("product_code"),
                        "document_unit_code": document_item.get("unit_code"),
                        "document_pack_count": document_item.get("pack_count"),
                        "document_quantity": document_item.get("quantity"),
                        "document_gross_quantity": document_item.get("gross_quantity"),
                        "document_tare_quantity": document_item.get("tare_quantity"),
                        "document_net_quantity": document_item.get("net_quantity"),
                        "document_unit_price": document_unit_price,
                        "document_line_total": document_line_total,
                        "document_vat_code": document_item.get("vat_code"),
                    }
            if best_line_match is None:
                # No unused order item reached the 0.4 similarity floor: record
                # the document line as an "extra" (delivered but not ordered).
                # Note this entry has no "order_item_ids" key.
                matched_lines.append(
                    {
                        "status": "extra",
                        "confidence": 0.0,
                        "order_item_id": None,
                        "order_product_name": None,
                        "order_lot_code": None,
                        "order_supplier_name": supplier_name,
                        "order_product_code": None,
                        "ordered_quantity": None,
                        "delivered_quantity": document_item.get("quantity"),
                        "comparable_delivered_quantity": None,
                        "missing_quantity": None,
                        "extra_quantity": None,
                        "document_line_index": document_item.get("line_index"),
                        "document_description": document_item.get("description"),
                        "document_raw_row_text": document_item.get("raw_row_text"),
                        "document_product_code": document_item.get("product_code"),
                        "document_unit_code": document_item.get("unit_code"),
                        "document_pack_count": document_item.get("pack_count"),
                        "document_quantity": document_item.get("quantity"),
                        "document_gross_quantity": document_item.get("gross_quantity"),
                        "document_tare_quantity": document_item.get("tare_quantity"),
                        "document_net_quantity": document_item.get("net_quantity"),
                        "document_unit_price": document_unit_price,
                        "document_line_total": document_line_total,
                        "document_vat_code": document_item.get("vat_code"),
                    }
                )
                continue
            used_item_ids.add(int(best_line_match["order_item_id"]))
            matched_lines.append(best_line_match)
            line_match_count += 1
            if best_line_match["status"] == "exact":
                exact_line_count += 1

        # Relevant order items that no document line claimed are reported as
        # "missing" (ordered but not delivered); these entries carry only the
        # first three document_* keys, all None.
        for order_item in relevant_items:
            item_id = int(order_item.get("item_id") or 0)
            if item_id in used_item_ids:
                continue
            matched_lines.append(
                {
                    "status": "missing",
                    "confidence": 0.0,
                    "order_item_id": item_id,
                    "order_item_ids": list(order_item.get("item_ids") or [item_id]),
                    "order_product_name": order_item.get("product_name"),
                    "order_lot_code": order_item.get("lot_code"),
                    "order_supplier_name": order_item.get("supplier_name"),
                    "order_product_code": order_item.get("product_code"),
                    "ordered_quantity": float(order_item.get("quantity") or 0),
                    "delivered_quantity": 0.0,
                    "comparable_delivered_quantity": 0,
                    "missing_quantity": float(order_item.get("quantity") or 0),
                    "extra_quantity": 0,
                    "document_line_index": None,
                    "document_description": None,
                    "document_raw_row_text": None,
                }
            )

        # "partial" lines count as missing too: part of the order was not delivered.
        missing_line_count = sum(1 for line in matched_lines if line["status"] in {"partial", "missing"})
        extra_line_count = sum(1 for line in matched_lines if line["status"] == "extra")
        # Coverage is measured against the larger of the two sides.
        compared_line_count = max(len(document_items), len(relevant_items), 1)
        coverage_score = line_match_count / compared_line_count
        # Extra and missing lines contribute 0.0 confidence to the average.
        avg_confidence = sum(float(line.get("confidence") or 0.0) for line in matched_lines) / max(len(matched_lines), 1)
        date_score = 0.0
        if document_date_value is not None and batch.get("confirmed_at"):
            try:
                # confirmed_at may use a space separator; normalise it to "T".
                batch_date_value = datetime.fromisoformat(str(batch["confirmed_at"]).replace(" ", "T"))
                day_distance = abs((document_date_value.date() - batch_date_value.date()).days)
                # Linear decay: 1.0 on the same day, 0.0 from 14 days apart.
                date_score = max(0.0, 1.0 - (day_distance / 14.0))
            except ValueError:
                date_score = 0.0

        # Weighted batch score: coverage 45%, confidence 35%, supplier 15%, date 5%.
        batch_score = round((coverage_score * 0.45) + (avg_confidence * 0.35) + (supplier_score * 0.15) + (date_score * 0.05), 4)
        # NOTE(review): this predicate only rejects a line that has an order item
        # AND a comparable_delivered_quantity that is neither None nor int/float;
        # confirm this is the intended gate for applying the storno.
        can_apply_storno = all(
            line["order_item_id"] is None
            or line.get("comparable_delivered_quantity") is None
            or isinstance(line.get("comparable_delivered_quantity"), (int, float))
            for line in matched_lines
        )
        current_result = {
            "status": "matched",
            "matched_batch_id": int(batch["batch_id"]),
            "matched_batch_ids": list(batch.get("batch_ids") or [int(batch["batch_id"])]),
            "matched_batch_confirmed_at": str(batch.get("confirmed_at") or ""),
            "matched_batch_staff": str(batch.get("staff") or ""),
            # Supplier name of the relevant item with the highest supplier score
            # (same formula as supplier_score above, recomputed per item).
            "matched_supplier_name": str(
                max(
                    relevant_items,
                    key=lambda item: max(
                        _supplier_similarity(supplier_name, str(item.get("supplier_name") or "")),
                        min(_best_document_name_similarity_for_order_item(document_items, item), 0.75)
                        if _is_generic_order_supplier(str(item.get("supplier_name") or ""))
                        else 0.0,
                    ),
                ).get("supplier_name")
                or ""
            ),
            "score": batch_score,
            "line_match_count": line_match_count,
            "exact_line_count": exact_line_count,
            "missing_line_count": missing_line_count,
            "extra_line_count": extra_line_count,
            "can_apply_storno": can_apply_storno,
            "lines": matched_lines,
        }
        # Tuple comparison: score first, then matched lines, then exact lines;
        # on a full tie the earlier batch is kept.
        if best_result is None or (
            float(current_result["score"]),
            int(current_result["line_match_count"]),
            int(current_result["exact_line_count"]),
        ) > (
            float(best_result["score"]),
            int(best_result["line_match_count"]),
            int(best_result["exact_line_count"]),
        ):
            best_result = current_result

    if best_result is None:
        return {
            "status": "no_candidate",
            "score": 0.0,
            "line_match_count": 0,
            "exact_line_count": 0,
            "missing_line_count": 0,
            "extra_line_count": len(document_items),
            "can_apply_storno": False,
            "lines": [],
        }
    # A best batch below the minimum score, or without a single matched line,
    # is reported exactly like "no batch at all".
    if float(best_result["score"]) < _MIN_FISCAL_ORDER_MATCH_SCORE or int(best_result["line_match_count"]) <= 0:
        return {
            "status": "no_candidate",
            "score": 0.0,
            "line_match_count": 0,
            "exact_line_count": 0,
            "missing_line_count": 0,
            "extra_line_count": len(document_items),
            "can_apply_storno": False,
            "lines": [],
        }
    return best_result


def build_fiscal_document_order_match(
    session: SessionIdentity,
    *,
    supplier_name: str | None,
    document_date: str | None,
    document_type: str,
    document_items: list[dict[str, object]],
) -> dict[str, object] | None:
    """Run the order matcher for document types that can be reconciled.

    Returns ``None`` when ``document_type`` is not listed in
    ``_ORDER_RECONCILABLE_DOCUMENT_TYPES``; otherwise returns whatever
    ``_build_delivery_note_order_match`` produces for the same arguments.
    """
    is_reconcilable = document_type in _ORDER_RECONCILABLE_DOCUMENT_TYPES
    if is_reconcilable:
        matcher_arguments = {
            "supplier_name": supplier_name,
            "document_date": document_date,
            "document_items": document_items,
        }
        return _build_delivery_note_order_match(session, **matcher_arguments)
    return None


def _fiscal_document_items_for_order_match(items: list[object]) -> list[dict[str, object]]:
    return [
        {
            "line_index": getattr(item, "line_index", index),
            "description": getattr(item, "description", None),
            "raw_row_text": getattr(item, "raw_row_text", None),
            "pack_count": getattr(item, "pack_count", None),
            "quantity": getattr(item, "quantity", None),
            "gross_quantity": getattr(item, "gross_quantity", None),
            "tare_quantity": getattr(item, "tare_quantity", None),
            "net_quantity": getattr(item, "net_quantity", None),
            "unit_code": getattr(item, "unit_code", None),
            "category_code": getattr(item, "category_code", None),
            "product_code": getattr(item, "product_code", None),
            "unit_price": getattr(item, "unit_price", None),
            "line_total": getattr(item, "line_total", None),
            "vat_code": getattr(item, "vat_code", None),
        }
        for index, item in enumerate(items)
    ]


def _fiscal_order_match_has_discrepancies(order_match: dict[str, object] | None) -> bool:
    if not isinstance(order_match, dict) or order_match.get("status") != "matched":
        return False
    if int(order_match.get("missing_line_count") or 0) > 0 or int(order_match.get("extra_line_count") or 0) > 0:
        return True
    lines = order_match.get("lines") if isinstance(order_match.get("lines"), list) else []
    return any(isinstance(line, dict) and line.get("status") not in {None, "exact"} for line in lines)


def _fiscal_order_match_has_document_prices(order_match: dict[str, object] | None) -> bool:
    if not isinstance(order_match, dict):
        return False
    lines = order_match.get("lines") if isinstance(order_match.get("lines"), list) else []
    return any(
        isinstance(line, dict)
        and (
            _coerce_positive_document_float(line.get("document_unit_price")) is not None
            or _coerce_positive_document_float(line.get("document_line_total")) is not None
        )
        for line in lines
    )


def _fiscal_order_match_is_exact_with_prices(order_match: dict[str, object] | None) -> bool:
    if not isinstance(order_match, dict) or order_match.get("status") != "matched":
        return False
    if not bool(order_match.get("can_apply_storno")):
        return False
    line_match_count = int(order_match.get("line_match_count") or 0)
    if line_match_count <= 0 or int(order_match.get("exact_line_count") or 0) != line_match_count:
        return False
    if int(order_match.get("missing_line_count") or 0) > 0 or int(order_match.get("extra_line_count") or 0) > 0:
        return False
    lines = order_match.get("lines") if isinstance(order_match.get("lines"), list) else []
    if not lines:
        return False
    has_unmatched_or_non_exact_line = any(
        not isinstance(line, dict) or line.get("status") != "exact" or line.get("order_item_id") is None
        for line in lines
    )
    return not has_unmatched_or_non_exact_line and _fiscal_order_match_has_document_prices(order_match)


def _utc_iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def _ensure_order_batch_fiscal_match_columns(connection: sqlite3.Connection) -> None:
    """Add the fiscal-document link columns to ``ordini_batches`` when absent.

    The existing columns are read once through ``_sqlite_table_columns``;
    each of the four ``TEXT`` columns not found there is added with an
    ``ALTER TABLE`` statement. Nothing is committed here.
    """
    existing_columns = _sqlite_table_columns(connection, "ordini_batches")
    required_columns = (
        ("fiscal_document_id", "TEXT"),
        ("fiscal_document_name", "TEXT"),
        ("fiscal_document_type", "TEXT"),
        ("fiscal_document_matched_at", "TEXT"),
    )
    for column_name, column_type in required_columns:
        if column_name not in existing_columns:
            connection.execute(f"ALTER TABLE ordini_batches ADD COLUMN {column_name} {column_type}")


def _matched_order_batch_ids(order_match: dict[str, object]) -> list[int]:
    raw_batch_ids = order_match.get("matched_batch_ids")
    values = [order_match.get("matched_batch_id")]
    if isinstance(raw_batch_ids, list):
        values.extend(raw_batch_ids)
    batch_ids: list[int] = []
    for value in values:
        if not isinstance(value, int) or value <= 0 or value in batch_ids:
            continue
        batch_ids.append(value)
    return batch_ids


def mark_order_batch_fiscal_document_match(
    session: SessionIdentity,
    *,
    document_id: str,
    order_match: dict[str, object],
) -> None:
    """Stamp the matched order batches with a reference to the fiscal document.

    Does nothing when the match references no valid batch id or when the
    tenant store has no document for ``document_id``. Otherwise writes the
    document id, display name, type and the current UTC timestamp on every
    matched ``ordini_batches`` row and commits.
    """
    batch_ids = _matched_order_batch_ids(order_match)
    if not batch_ids:
        return

    fiscal_document = get_tenant_store().get_fiscal_document(session.tenant_id, document_id)
    if fiscal_document is None:
        return

    with _open_tenant_order_connection(session.database_path) as connection:
        # The link columns may not exist yet on this tenant database.
        _ensure_order_batch_fiscal_match_columns(connection)
        placeholders = ", ".join(["?"] * len(batch_ids))
        statement = f"""
            UPDATE ordini_batches
            SET fiscal_document_id = ?,
                fiscal_document_name = ?,
                fiscal_document_type = ?,
                fiscal_document_matched_at = ?
            WHERE id IN ({placeholders})
            """
        parameters = (
            fiscal_document.id,
            fiscal_document.display_name,
            fiscal_document.document_type,
            _utc_iso_now(),
            *batch_ids,
        )
        connection.execute(statement, parameters)
        connection.commit()


def apply_delivery_note_storno_to_matched_order(
    session: SessionIdentity,
    *,
    document_id: str,
    order_match: dict[str, object],
) -> dict[str, object]:
    """Rewrite the matched order so it reflects what the document delivered.

    For every matched line with a comparable delivered quantity, the primary
    order item gets the new quantity / lot code (and, when the document has a
    price, the new price and total); the other item ids linked to the same
    line are deleted, as are items whose new quantity is not positive. Batch
    totals are then recomputed, the batches are stamped with the fiscal
    document reference, secondary batches left without items are removed, and
    the fiscal document is marked as reviewed.

    Args:
        session: Tenant session (database path and tenant id).
        document_id: Identifier of the fiscal document being applied.
        order_match: Match result as produced by the order matcher.

    Returns:
        ``{"ok": True, "matched_batch_id": ..., "matched_batch_ids": [...]}``.

    Raises:
        HTTPException: 400 when the match has no batch id, when
            ``can_apply_storno`` is falsy, or when no line yields an
            adjustment; 404 when none of the matched batches exists.
    """
    matched_batch_ids = _matched_order_batch_ids(order_match)
    if not matched_batch_ids:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Nessun ordine compatibile trovato per questo documento.")
    # The first id is the primary batch: it is never deleted further down.
    matched_batch_id = matched_batch_ids[0]
    if not order_match.get("can_apply_storno"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Questo documento richiede ancora una verifica manuale prima dell'aggiornamento ordine.")

    # May be None; the batch update below then falls back to document_id and
    # leaves name/type NULL.
    document = get_tenant_store().get_fiscal_document(session.tenant_id, document_id)
    lines = order_match.get("lines") if isinstance(order_match.get("lines"), list) else []
    # primary order item id -> line data plus the computed next lot/quantity.
    adjustments: dict[int, dict[str, object]] = {}
    # secondary (linked) order item id -> primary order item id.
    linked_item_primaries: dict[int, int] = {}
    for line in lines:
        if not isinstance(line, dict):
            continue
        order_item_id = line.get("order_item_id")
        # Lines without an order item (e.g. "extra" lines) have nothing to update.
        if not isinstance(order_item_id, int):
            continue
        comparable_quantity = line.get("comparable_delivered_quantity")
        if comparable_quantity is None:
            continue
        next_lot_code, next_quantity = _order_update_lot_and_quantity(line)
        if next_quantity is None:
            continue
        raw_linked_item_ids = line.get("order_item_ids")
        linked_item_ids = [
            item_id
            for item_id in (raw_linked_item_ids if isinstance(raw_linked_item_ids, list) else [])
            if isinstance(item_id, int) and item_id > 0
        ]
        # Make sure the primary id is always part of the linked list.
        if order_item_id not in linked_item_ids:
            linked_item_ids.insert(0, order_item_id)
        adjustments[order_item_id] = {
            **line,
            "linked_order_item_ids": linked_item_ids,
            "next_lot_code": next_lot_code,
            # Negative quantities are clamped to zero (zero means "delete" below).
            "next_quantity": max(float(next_quantity), 0.0),
            "applied_quantity": max(float(next_quantity), 0.0),
        }
        for linked_item_id in linked_item_ids:
            if linked_item_id != order_item_id:
                linked_item_primaries[linked_item_id] = order_item_id

    if not adjustments:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Non ho trovato righe ordine aggiornabili da questo documento.")

    with _open_tenant_order_connection(session.database_path) as connection:
        _ensure_order_batch_fiscal_match_columns(connection)
        placeholders = ", ".join("?" for _ in matched_batch_ids)
        # Existence check: at least one of the matched batches must still be there.
        row = connection.execute(
            f"SELECT id FROM ordini_batches WHERE id IN ({placeholders}) LIMIT 1",
            tuple(matched_batch_ids),
        ).fetchone()
        if row is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Ordine storico associato non trovato.")

        item_rows = connection.execute(
            f"""
            SELECT id, batch_id, product_id, quantity, lot_code, final_price_vat_snapshot, estimated_line_total, supplier_name
            FROM ordini_items
            WHERE batch_id IN ({placeholders})
            ORDER BY id ASC
            """,
            tuple(matched_batch_ids),
        ).fetchall()
        # Used to update only the product columns that exist in this database.
        product_columns = _sqlite_table_columns(connection, "ordini_products")

        for item_row in item_rows:
            item_id = int(item_row["id"])
            # Secondary rows linked to a primary are removed; the primary row
            # receives the adjusted quantity.
            if item_id in linked_item_primaries:
                connection.execute("DELETE FROM ordini_items WHERE id = ?", (item_id,))
                continue
            if item_id not in adjustments:
                continue
            adjustment = adjustments[item_id]
            next_quantity = float(adjustment["next_quantity"])
            next_lot_code = str(adjustment.get("next_lot_code") or item_row["lot_code"] or "").strip()
            # A lot code that normalises to "kg" switches the row to weight pricing.
            is_weight_update = _normalized_document_unit_code(next_lot_code) == "kg"
            # units_per_pack is cleared for weight updates and when a pack lot
            # becomes a single-unit lot.
            clear_units_per_pack = is_weight_update or (
                _is_pack_lot_code(item_row["lot_code"])
                and _is_single_unit_lot_code(next_lot_code)
            )
            document_price = _document_price_for_matched_order_line(adjustment)
            # Prefer the document price; otherwise keep the stored snapshot (may be None).
            final_price = (
                document_price
                if document_price is not None
                else float(item_row["final_price_vat_snapshot"])
                if item_row["final_price_vat_snapshot"] is not None
                else None
            )
            document_line_total = _coerce_positive_document_float(adjustment.get("document_line_total"))
            # Prefer the document's own line total; otherwise quantity * price.
            next_total = (
                round(document_line_total, 2)
                if document_line_total is not None
                else round(next_quantity * final_price, 2)
                if final_price is not None
                else None
            )
            # Nothing delivered for this item: drop the order row.
            if next_quantity <= 0:
                connection.execute("DELETE FROM ordini_items WHERE id = ?", (item_id,))
                continue
            if document_price is not None:
                vat_rate = _document_vat_rate_from_code(adjustment.get("document_vat_code"))
                connection.execute(
                    """
                    UPDATE ordini_items
                    SET quantity = ?,
                        lot_code = ?,
                        final_price_vat_snapshot = ?,
                        estimated_line_total = ?,
                        units_per_pack = CASE WHEN ? THEN NULL ELSE units_per_pack END,
                        liters_per_unit = CASE WHEN ? THEN NULL ELSE liters_per_unit END
                    WHERE id = ?
                    """,
                    (next_quantity, next_lot_code or item_row["lot_code"], document_price, next_total, int(clear_units_per_pack), int(is_weight_update), item_id),
                )
                # Propagate the document price / VAT rate to the product record.
                product_id = item_row["product_id"]
                if product_id is not None:
                    product_assignments: list[str] = []
                    product_values: list[object] = []
                    if is_weight_update and "unit_price_per_kg" in product_columns:
                        product_assignments.append("unit_price_per_kg = ?")
                        product_values.append(document_price)
                    # The product's final price is left untouched when the
                    # pack size was cleared for this row.
                    elif not clear_units_per_pack and "final_price_vat" in product_columns:
                        product_assignments.append("final_price_vat = ?")
                        product_values.append(document_price)
                    if vat_rate is not None and "vat_rate" in product_columns:
                        product_assignments.append("vat_rate = ?")
                        product_values.append(vat_rate)
                    if "updated_at" in product_columns:
                        product_assignments.append("updated_at = CURRENT_TIMESTAMP")
                    if product_assignments:
                        connection.execute(
                            f"UPDATE ordini_products SET {', '.join(product_assignments)} WHERE id = ?",
                            tuple(product_values + [int(product_id)]),
                        )
                continue
            # No document price: update quantity, lot and total but keep the
            # stored price snapshot as it is.
            connection.execute(
                """
                UPDATE ordini_items
                SET quantity = ?,
                    lot_code = ?,
                    estimated_line_total = ?,
                    units_per_pack = CASE WHEN ? THEN NULL ELSE units_per_pack END,
                    liters_per_unit = CASE WHEN ? THEN NULL ELSE liters_per_unit END
                WHERE id = ?
                """,
                (next_quantity, next_lot_code or item_row["lot_code"], next_total, int(clear_units_per_pack), int(is_weight_update), item_id),
            )

        # Recompute each batch total from its remaining items and stamp the
        # batch with the fiscal document reference.
        for batch_id in matched_batch_ids:
            batch_total = connection.execute(
                "SELECT SUM(estimated_line_total) FROM ordini_items WHERE batch_id = ?",
                (batch_id,),
            ).fetchone()[0]
            connection.execute(
                """
                UPDATE ordini_batches
                SET total_estimated_amount = ?,
                    fiscal_document_id = ?,
                    fiscal_document_name = ?,
                    fiscal_document_type = ?,
                    fiscal_document_matched_at = ?
                WHERE id = ?
                """,
                (
                    float(batch_total) if batch_total is not None else None,
                    document.id if document is not None else document_id,
                    document.display_name if document is not None else None,
                    document.document_type if document is not None else None,
                    _utc_iso_now(),
                    batch_id,
                ),
            )
        # Secondary batches that ended up with no items are removed; the
        # primary batch is always kept.
        for batch_id in matched_batch_ids:
            if batch_id == matched_batch_id:
                continue
            connection.execute(
                """
                DELETE FROM ordini_batches
                WHERE id = ?
                  AND NOT EXISTS (SELECT 1 FROM ordini_items WHERE batch_id = ?)
                """,
                (batch_id, batch_id),
            )
        connection.commit()

    # Only after the order database is committed is the fiscal document updated.
    store = get_tenant_store()
    store.update_fiscal_document(
        session.tenant_id,
        document_id,
        matching_status="ready_for_match",
        review_status="reviewed",
    )
    return {"ok": True, "matched_batch_id": matched_batch_id, "matched_batch_ids": matched_batch_ids}


def _extract_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Parse delivery-note item rows out of free text.

    The table start is located through the "descrizione prodotto" /
    "descrizione merce" header. Below it, a product row is a line opening
    with one to three numeric codes of six or more digits; the lines that
    follow, up to the next product row or a stop token, belong to the same
    item. The decimal numbers found in that block are read positionally as
    unit price, pack count and gross quantity, with the last one used as
    line total. Falls back to ``_extract_simple_delivery_note_items_from_text``
    when no header is found or when cleaning leaves no items.
    """
    rows = [
        stripped
        for stripped in (raw.strip() for raw in normalize_text_block(text).splitlines())
        if stripped
    ]
    if not rows:
        return []

    header_labels = ("descrizione prodotto", "descrizione merce")
    table_start: int | None = None
    for position, row in enumerate(rows):
        lowered = row.casefold()
        if any(label in lowered for label in header_labels):
            # Header on this very line: start up to four lines earlier.
            table_start = max(position - 4, 0)
            break
        window = " ".join(rows[position : position + 5]).casefold()
        header_nearby = any(label in window for label in header_labels)
        if header_nearby and ("codice" in window or "prodotto" in window):
            table_start = position
            break
    if table_start is None:
        return _extract_simple_delivery_note_items_from_text(text)

    stop_tokens = ("firma del destinatario", "causale del trasporto", "chiusura", "totale colli", "peso netto")
    product_row = re.compile(r"^(?:\d{6,}\s+){1,3}(.+)$")

    def hits_stop_token(candidate: str) -> bool:
        folded = candidate.casefold()
        return any(token in folded for token in stop_tokens)

    body = rows[table_start + 1 :]
    items: list[dict[str, object]] = []
    cursor = 0
    while cursor < len(body):
        current = body[cursor]
        if hits_stop_token(current):
            break
        found = product_row.match(current)
        if found is None:
            cursor += 1
            continue

        description = normalize_text_block(found.group(1))
        if "@" in description or not re.search(r"[A-Za-zÀ-ÿ]{3,}", description):
            cursor += 1
            continue

        # Extend the block over the continuation lines of this product row.
        block_end = cursor + 1
        while block_end < len(body):
            follower = body[block_end]
            if hits_stop_token(follower) or product_row.match(follower):
                break
            block_end += 1

        block_text = " | ".join(body[cursor:block_end])
        parsed_numbers = [
            _parse_euro_amount(number.group(1))
            for number in _DECIMAL_NUMBER_PATTERN.finditer(block_text)
        ]
        amounts = [value for value in parsed_numbers if value is not None]
        amount_count = len(amounts)
        unit_price = amounts[0] if amount_count >= 1 else None
        pack_count = amounts[1] if amount_count >= 2 else None
        gross_quantity = amounts[2] if amount_count >= 3 else None
        quantity = pack_count if pack_count is not None else gross_quantity
        line_total = amounts[-1] if amount_count >= 2 else unit_price
        upper_tokens = re.findall(r"\b[A-Z]{2,}\b", block_text)

        items.append(
            {
                # Items are numbered in the order they are appended.
                "line_index": len(items),
                "product_code": None,
                "iso_code": None,
                "description": description,
                "category_code": upper_tokens[0] if upper_tokens else None,
                "unit_code": None,
                "pack_count": pack_count,
                "quantity": quantity,
                "gross_quantity": gross_quantity,
                "tare_quantity": None,
                "net_quantity": quantity,
                "unit_price": unit_price,
                "line_total": line_total,
                "vat_code": upper_tokens[1] if len(upper_tokens) >= 2 else None,
                "raw_row_text": normalize_text_block(block_text),
            }
        )
        cursor = block_end

    return _clean_delivery_items(items) or _extract_simple_delivery_note_items_from_text(text)


def _extract_martini_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract delivery-note item rows from Martini document text.

    Three strategies are tried in order; the first one yielding items wins:

    1. Pipe-delimited table rows shaped like
       ``code | quantity+unit | description | unit price | line total``.
    2. ``_extract_martini_structured_delivery_note_items`` on the same lines.
    3. A fallback that treats every line starting with a 10-14 digit code as an
       item and reads the following lines as stacked numeric values.

    Args:
        text: Raw document text; it is normalized and split into non-empty lines.

    Returns:
        A list of item dicts (possibly empty). Strategies 1 and 3 pass their
        rows through ``_clean_delivery_items`` here; strategy 2 does so itself.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    if not lines:
        return []

    # Strategy 1: rows already laid out as a pipe-delimited table.
    table_items: list[dict[str, object]] = []
    for line in lines:
        cells = [normalize_text_block(part) for part in re.split(r"\s*\|\s*", line) if normalize_text_block(part)]
        # A table row needs at least five cells and a short (1-4 digit) numeric first cell.
        if len(cells) < 5 or not re.fullmatch(r"\d{1,4}", cells[0]):
            continue
        # Second cell must be "<number><unit>", e.g. a quantity followed by a 1-4 letter unit.
        quantity_match = re.fullmatch(r"(\d+(?:[.,]\d+)?)\s*([A-Za-z]{1,4})", cells[1], re.IGNORECASE)
        if not quantity_match:
            continue
        quantity = _parse_euro_amount(quantity_match.group(1))
        if quantity is None or quantity <= 0:
            continue
        description = normalize_text_block(cells[2])
        if _is_noise_delivery_item(description, line):
            continue
        unit_code = quantity_match.group(2).upper()
        # The unit "SC" is stored as "CT".
        if unit_code == "SC":
            unit_code = "CT"
        table_items.append(
            {
                "line_index": len(table_items),
                "product_code": cells[0],
                "iso_code": None,
                "description": description,
                "category_code": None,
                "unit_code": unit_code,
                "pack_count": quantity,
                "quantity": quantity,
                "gross_quantity": quantity,
                "tare_quantity": None,
                "net_quantity": quantity,
                "unit_price": _parse_euro_amount(cells[3]),
                "line_total": _parse_euro_amount(cells[4]),
                "vat_code": None,
                "raw_row_text": normalize_text_block(line),
            }
        )
    cleaned_table_items = _clean_delivery_items(table_items)
    if cleaned_table_items:
        return cleaned_table_items

    # Strategy 2: product lines with quantity cells on the same or neighbouring lines.
    structured_items = _extract_martini_structured_delivery_note_items(lines)
    if structured_items:
        return structured_items

    # Strategy 3: "<10-14 digit code> <description>" lines followed by stacked values.
    items: list[dict[str, object]] = []
    for index, line in enumerate(lines):
        description_match = re.match(r"^(\d{10,14})\s+(.+)$", line)
        if not description_match:
            continue
        description = normalize_text_block(description_match.group(2))
        if _is_noise_delivery_item(description):
            continue

        # A standalone 6-12 digit line right above the item is taken as its product code.
        product_code = None
        if index > 0 and re.fullmatch(r"\d{6,12}", lines[index - 1]):
            product_code = lines[index - 1]

        # The item's block runs until the next line that starts another item.
        block: list[str] = [line]
        j = index + 1
        while j < len(lines):
            candidate = lines[j]
            if re.match(r"^\d{10,14}\s+.+$", candidate):
                break
            block.append(candidate)
            j += 1

        # Pack size is the number before an "x" in the description (e.g. "6 x 70").
        pack_size_match = re.search(r"\b(\d+)\s*[xX]\s*\d", description)
        pack_size = int(pack_size_match.group(1)) if pack_size_match else None

        integer_values: list[int] = []
        decimal_values: list[float] = []
        compact_row_parts: list[str] = [line]
        for part in block[1:]:
            stripped = part.strip()
            if re.fullmatch(r"\d+", stripped):
                value = int(stripped)
                # Integers above 2000 are ignored as quantity candidates.
                if value > 2000:
                    continue
                # An integer equal to the pack size is not counted as a quantity.
                if pack_size is not None and value == pack_size:
                    continue
                integer_values.append(value)
                compact_row_parts.append(stripped)
                continue
            if re.fullmatch(r"\d+[.,]\d{2}", stripped):
                parsed = _parse_euro_amount(stripped)
                if parsed is not None:
                    decimal_values.append(parsed)
                    compact_row_parts.append(stripped)
                continue
            # A few known label lines are kept in the raw row text; all other text is dropped.
            if stripped.casefold() in {"in casse", "bottiglie peso", "categoria", "lettere (*)", "vodka", "aez"}:
                compact_row_parts.append(stripped)

        # First retained integer is the pack count, second the gross quantity;
        # the last two-decimal value is used as the line total.
        pack_count = integer_values[0] if integer_values else None
        gross_quantity = integer_values[1] if len(integer_values) >= 2 else None
        quantity = pack_count
        line_total = decimal_values[-1] if decimal_values else None

        items.append(
            {
                "line_index": len(items),
                "product_code": product_code,
                "iso_code": description_match.group(1),
                "description": description,
                "category_code": None,
                "unit_code": "CT",
                "pack_count": float(pack_count) if pack_count is not None else None,
                "quantity": float(quantity) if quantity is not None else None,
                "gross_quantity": float(gross_quantity) if gross_quantity is not None else None,
                "tare_quantity": None,
                "net_quantity": float(quantity) if quantity is not None else None,
                "unit_price": None,
                "line_total": line_total,
                "vat_code": None,
                "raw_row_text": normalize_text_block(" | ".join(compact_row_parts)),
            }
        )
    return _clean_delivery_items(items)


def _split_martini_line_cells(line: str) -> list[str]:
    """Split a pipe-delimited line into normalized cells, dropping empty ones."""
    cells: list[str] = []
    for raw_part in re.split(r"\s*\|\s*", line):
        cleaned = normalize_text_block(raw_part)
        if cleaned:
            cells.append(cleaned)
    return cells


def _parse_martini_product_line(line: str) -> dict[str, object] | None:
    """Parse a Martini product line into its codes, description and trailing cells.

    The first pipe-delimited cell must look like
    ``[<6-12 digit code> ]<10-14 digit code> <description>``. Returns ``None``
    when the line has no cells, does not match that shape, or the description
    is classified as noise by ``_is_noise_delivery_item``.
    """
    cells = _split_martini_line_cells(line)
    if not cells:
        return None

    head_match = re.match(r"^(?:(\d{6,12})\s+)?(\d{10,14})\s+(.+)$", cells[0])
    if head_match is None:
        return None

    internal_code = head_match.group(1)
    ean_code = head_match.group(2)
    description = normalize_text_block(head_match.group(3))
    if _is_noise_delivery_item(description, line):
        return None

    parsed: dict[str, object] = {
        "internal_code": internal_code,
        "product_code": ean_code,
        "description": description,
        "quantity_cells": cells[1:],
    }
    return parsed


def _martini_pack_size_from_description(description: str | None) -> float | None:
    """Return the first ``N x ...`` multiplier (1-48) found in a description.

    The description is first passed through ``_normalize_ocr_latin_confusables``.
    Returns ``None`` when no candidate in the accepted range is present.
    """
    pack_pattern = r"\b(\d{1,2})\s*[xX]\s*(?:\d{2,3}|[A-Za-z]{1,4})"
    text = _normalize_ocr_latin_confusables(description or "")
    for count_token in (found.group(1) for found in re.finditer(pack_pattern, text)):
        amount = _parse_euro_amount(count_token)
        if amount is None:
            continue
        if not (1 <= amount <= 48):
            continue
        return float(amount)
    return None


def _martini_numeric_cell_values(cells: list[str]) -> list[float]:
    """Collect the usable numbers found inside a list of table cells.

    Cells that are empty or consist solely of six or more digits are ignored.
    Within the remaining cells every numeric token is parsed and kept when it
    is positive and not greater than 5000.
    """
    collected: list[float] = []
    for raw_cell in cells:
        text = normalize_text_block(raw_cell)
        if not text or re.fullmatch(r"\d{6,}", text):
            continue
        for number_token in re.findall(r"\d+(?:[.,]\d+)?", text):
            amount = _parse_euro_amount(number_token)
            rejected = amount is None or amount <= 0 or amount > 5000
            if not rejected:
                collected.append(float(amount))
    return collected


def _martini_integer_like_values(values: list[float]) -> list[int]:
    """Return the values that are whole numbers in ``1..2000``, as ints, in order."""
    whole_numbers = (
        int(round(candidate))
        for candidate in values
        if _is_integer_like(candidate, tolerance=0.001)
    )
    return [number for number in whole_numbers if 0 < number <= 2000]


def _pick_martini_quantities(
    values: list[float],
    *,
    pack_size: float | None,
    prefer_single_as_cartons: bool,
) -> tuple[float | None, float | None, float | None]:
    """Derive ``(pack_size, cartons, bottles)`` from the numbers found on a row.

    Only the integer-like values (see ``_martini_integer_like_values``) are
    considered. The heuristics below are tried in order and the first match
    is returned.

    Args:
        values: Numeric values read from the row's cells, in reading order.
        pack_size: Pack size taken from the description, if known.
        prefer_single_as_cartons: When exactly one integer is available, treat
            it as the carton count regardless of its magnitude.

    Returns:
        A ``(pack_size, cartons, bottles)`` tuple; any element may be ``None``.
        ``(None, None, None)`` is returned when no integer-like value exists.
    """
    integers = _martini_integer_like_values(values)
    if not integers:
        return (None, None, None)

    if pack_size is not None and pack_size > 0:
        pack_integer = int(round(pack_size))
        # The pack size itself appears among the values (not in last position):
        # the next integer is the carton count and the one after it, if any,
        # the bottle count; otherwise bottles = pack_size * cartons.
        for index, value in enumerate(integers[:-1]):
            if value != pack_integer:
                continue
            cartons = float(integers[index + 1])
            bottles = float(integers[index + 2]) if index + 2 < len(integers) else round(pack_size * cartons, 6)
            return (pack_size, cartons, bottles)

        if len(integers) >= 2:
            first, second = integers[0], integers[1]
            # First integer's digits begin with the pack size and the second is
            # small (<= 20): the second is taken as the carton count.
            # NOTE(review): presumably covers a pack size fused with other digits — confirm.
            if str(first).startswith(str(pack_integer)) and second <= 20:
                cartons = float(second)
                return (pack_size, cartons, round(pack_size * cartons, 6))
            # second == pack_size * first: first is cartons, second is bottles.
            if abs(second - (pack_size * first)) <= 0.001:
                cartons = float(first)
                return (pack_size, cartons, float(second))
            # First is a multiple of the pack size larger than it: it is the
            # bottle count and the carton count is derived from it.
            if first > pack_size and first % pack_integer == 0:
                bottles = float(first)
                return (pack_size, round(bottles / pack_size, 6), bottles)

        if len(integers) == 1:
            only_value = float(integers[0])
            if prefer_single_as_cartons or only_value <= pack_size:
                return (pack_size, only_value, round(pack_size * only_value, 6))
            # Larger than a pack and an exact multiple: read it as bottles.
            if integers[0] % pack_integer == 0:
                return (pack_size, round(only_value / pack_size, 6), only_value)
            return (pack_size, only_value, round(pack_size * only_value, 6))

    # Reached when no pack size is known, or when two or more integers matched
    # none of the rules above: the first two integers are returned as
    # (pack_size, cartons) and the bottle count is left unset.
    if len(integers) >= 2:
        return (float(integers[0]), float(integers[1]), None)
    return (None, float(integers[0]), None)


def _martini_line_is_table_stop(line: str) -> bool:
    """Tell whether a line contains one of the markers that end the item table."""
    stop_markers = (
        "dichiaro di aver ricevuto",
        "firma del destinatario",
        "giorno di chiusura",
        "modalita idonee",
        "ai sensi dell'art",
        "totale",
    )
    lowered = normalize_text_block(line).casefold()
    for marker in stop_markers:
        if marker in lowered:
            return True
    return False


def _extract_martini_structured_delivery_note_items(lines: list[str]) -> list[dict[str, object]]:
    """Extract Martini items from product lines plus nearby quantity lines.

    Lines are scanned top to bottom until a table-stop marker is hit. Each
    product line (see ``_parse_martini_product_line``) becomes an item when a
    carton count can be derived, first from the cells on the product line
    itself and otherwise from numeric lines collected before it ("pending")
    and after it ("continuation").

    Args:
        lines: Non-empty, stripped document lines.

    Returns:
        The extracted items after ``_clean_delivery_items``.
    """
    items: list[dict[str, object]] = []
    index = 0
    # Numeric cells/lines seen since the last product line that have not yet
    # been attached to any item.
    pending_quantity_cells: list[str] = []
    pending_quantity_lines: list[str] = []
    while index < len(lines):
        line = lines[index]
        if _martini_line_is_table_stop(line):
            break
        product = _parse_martini_product_line(line)
        if product is None:
            # Not a product line: remember it if it carries any usable number.
            cells = _split_martini_line_cells(line)
            if cells and _martini_numeric_cell_values(cells):
                pending_quantity_cells.extend(cells)
                pending_quantity_lines.append(line)
            index += 1
            continue

        description = str(product["description"])
        product_quantity_values = _martini_numeric_cell_values(list(product.get("quantity_cells") or []))
        pack_size = _martini_pack_size_from_description(description)
        # First attempt: quantities written on the product line itself.
        parsed_pack_size, pack_count, gross_quantity = _pick_martini_quantities(
            product_quantity_values,
            pack_size=pack_size,
            prefer_single_as_cartons=True,
        )
        block_lines = [line]
        lookahead = index + 1
        if pack_count is None:
            # Second attempt: gather the lines up to the next product line or
            # stop marker and combine them with the pending cells.
            continuation_cells: list[str] = []
            while lookahead < len(lines):
                next_line = lines[lookahead]
                if _martini_line_is_table_stop(next_line) or _parse_martini_product_line(next_line) is not None:
                    break
                block_lines.append(next_line)
                continuation_cells.extend(_split_martini_line_cells(next_line))
                lookahead += 1
            continuation_quantity_values = _martini_numeric_cell_values(pending_quantity_cells + continuation_cells)
            parsed_pack_size, pack_count, gross_quantity = _pick_martini_quantities(
                continuation_quantity_values,
                pack_size=pack_size,
                prefer_single_as_cartons=False,
            )
            if pack_count is not None and pending_quantity_lines:
                block_lines = [*pending_quantity_lines, *block_lines]
        # Derive the gross quantity when only pack size and carton count are known.
        if gross_quantity is None and parsed_pack_size is not None and pack_count is not None:
            gross_quantity = round(parsed_pack_size * pack_count, 6)
        if pack_count is None:
            # No quantity found: drop the product and the lines examined for it.
            pending_quantity_cells = []
            pending_quantity_lines = []
            index = max(lookahead, index + 1)
            continue

        items.append(
            {
                "line_index": len(items),
                "product_code": product.get("product_code"),
                "iso_code": product.get("internal_code"),
                "description": description,
                "category_code": None,
                "unit_code": "CT",
                "pack_count": float(pack_count),
                "quantity": float(pack_count),
                "gross_quantity": float(gross_quantity) if gross_quantity is not None else None,
                "tare_quantity": None,
                "net_quantity": float(pack_count),
                "unit_price": None,
                "line_total": None,
                "vat_code": None,
                "raw_row_text": normalize_text_block(" | ".join(block_lines)),
            }
        )
        pending_quantity_cells = []
        pending_quantity_lines = []
        # Jump past the continuation lines only when the product line had no
        # numeric cells of its own; otherwise advance a single line.
        # NOTE(review): if the product line had numeric cells but no carton
        # count could be derived from them, continuation lines were consumed
        # above yet are revisited from here (and may become "pending" for the
        # next product) — confirm this is intended.
        index = max(lookahead, index + 1) if product_quantity_values == [] else index + 1

    return _clean_delivery_items(items)


def _extract_montenegro_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract delivery-note items from Montenegro-style table text.

    The table header is located by finding "articolo", "lotto", "descrizione"
    and "quantita" within a four-line window. After the header, each
    pipe-delimited row whose first cell is a 5+ digit code is paired with the
    description found on the following line; rows without such a description
    or without any parsable quantity are skipped.

    Args:
        text: Raw document text.

    Returns:
        The extracted items after ``_clean_delivery_items``; ``[]`` when the
        text is empty or no header is found.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    if not lines:
        return []

    # The header keywords may be spread over up to four consecutive lines.
    header_index: int | None = None
    for index, line in enumerate(lines):
        nearby = _normalize_match_text(" ".join(lines[index : min(index + 4, len(lines))]))
        if "articolo" in nearby and "lotto" in nearby and "descrizione" in nearby and "quantita" in nearby:
            header_index = index
            break
    if header_index is None:
        return []

    # Any of these phrases marks the end of the item table.
    stop_tokens = (
        "causale del trasporto",
        "dichiaro di aver ricevuto",
        "firma del destinatario",
        "in caso di reclamo",
        "peso netto",
        "totale colli",
        "trasporto a mezzo",
    )
    items: list[dict[str, object]] = []
    i = header_index + 1
    while i < len(lines):
        line = lines[i]
        normalized = line.casefold()
        if any(token in normalized for token in stop_tokens):
            break

        # An item row has at least five cells and starts with a 5+ digit code.
        cells = [normalize_text_block(part) for part in re.split(r"\s*\|\s*", line) if normalize_text_block(part)]
        if len(cells) < 5 or not re.fullmatch(r"\d{5,}", cells[0]):
            i += 1
            continue

        # The description is expected on the next line: no pipes, not itself a
        # code line, and accepted by _looks_like_delivery_description_line.
        description = ""
        consumed = 1
        if i + 1 < len(lines):
            next_line = normalize_text_block(lines[i + 1])
            next_cells = [normalize_text_block(part) for part in re.split(r"\s*\|\s*", next_line) if normalize_text_block(part)]
            if (
                "|" not in next_line
                and not (next_cells and re.fullmatch(r"\d{5,}", next_cells[0]))
                and _looks_like_delivery_description_line(next_line)
            ):
                description = next_line
                consumed = 2

        if not description:
            i += consumed
            continue

        # Quantities come from the third cell onwards; pure 10+ digit cells are skipped.
        quantity_parts: list[tuple[str, float]] = []
        for cell in cells[2:]:
            if re.fullmatch(r"\d{10,}", cell):
                continue
            quantity = _parse_delivery_quantity_line(cell)
            if quantity is not None:
                quantity_parts.append((cell, quantity))

        if not quantity_parts:
            i += consumed
            continue

        # Positional meaning: 1st = pack count, 2nd = units per pack, 3rd = gross quantity.
        pack_count = quantity_parts[0][1]
        unit_multiplier = quantity_parts[1][1] if len(quantity_parts) >= 2 else None
        gross_quantity = quantity_parts[2][1] if len(quantity_parts) >= 3 else None
        if gross_quantity is None and pack_count is not None and unit_multiplier is not None:
            gross_quantity = pack_count * unit_multiplier

        unit_code = _normalize_stacked_delivery_lot_code(quantity_parts[0][0])
        if unit_code is None:
            # Replace two look-alike characters with Latin "C"/"T" before
            # searching for a unit code (presumably Cyrillic OCR confusables — confirm).
            latinized_quantity_cell = quantity_parts[0][0].upper().replace("С", "C").replace("Т", "T")
            unit_match = re.search(r"\b(CT|CS|BT|PZ|KG|LT)\b", latinized_quantity_cell)
            unit_code = unit_match.group(1) if unit_match else None
        items.append(
            {
                "line_index": len(items),
                "product_code": cells[0],
                "iso_code": None,
                "description": description,
                "category_code": None,
                "unit_code": unit_code,
                "pack_count": pack_count,
                "quantity": pack_count,
                "gross_quantity": gross_quantity,
                "tare_quantity": None,
                "net_quantity": pack_count,
                "unit_price": None,
                "line_total": None,
                "vat_code": None,
                "raw_row_text": normalize_text_block(" | ".join([line, description])),
            }
        )
        i += consumed

    return _clean_delivery_items(items)


def _looks_like_stacked_delivery_item_start(line: str, next_line: str | None = None) -> bool:
    """Decide whether ``line`` opens a new item in a stacked delivery layout.

    A line qualifies when it is ``<code> <description>`` on one line, or when
    it is a bare code and ``next_line`` looks like a description. Empty lines
    and lines containing table-header words never qualify.
    """
    cleaned = normalize_text_block(line)
    lowered = cleaned.casefold()
    if not lowered:
        return False

    header_words = ("descrizione", "quantita", "prezzo", "totale", "imposta", "cliente", "destinatario")
    for word in header_words:
        if word in lowered:
            return False

    # Code and description on the same line.
    split_match = re.match(r"^(\S+)\s+(.+)$", cleaned)
    if split_match is not None:
        code_part, text_part = split_match.groups()
        if _looks_like_stacked_delivery_code(code_part) and _looks_like_delivery_description_line(text_part):
            return True

    # Code alone, description on the following line.
    return bool(
        _looks_like_stacked_delivery_code(line)
        and next_line
        and _looks_like_delivery_description_line(next_line)
    )


def _extract_stacked_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract items from a layout where each item's fields are stacked on separate lines.

    After locating the table header, every line recognised by
    ``_looks_like_stacked_delivery_item_start`` opens an item block that runs
    until the next item start or a stop phrase. The lines inside the block are
    then classified one by one (unit/lot code, ``N x M`` quantity, unit price,
    ``total vat`` pair, ...).

    Args:
        text: Raw document text.

    Returns:
        The extracted items after ``_clean_delivery_items``; ``[]`` when the
        text is empty or no header is found.
    """
    lines = [line.strip() for line in normalize_text_block(text).splitlines() if line.strip()]
    if not lines:
        return []

    # Header: either a "c art" line with the other column names within the next
    # 20 lines, or a single line holding "descrizione", "quantita" and "prezzo".
    header_index: int | None = None
    for index, line in enumerate(lines):
        normalized = _normalize_match_text(line)
        nearby = _normalize_match_text(" ".join(lines[index : min(index + 20, len(lines))]))
        if (
            "c art" in normalized
            and "descrizione" in nearby
            and "quantita" in nearby
            and "prezzo" in nearby
        ) or ("descrizione" in normalized and "quantita" in normalized and "prezzo" in normalized):
            header_index = index
            break
    if header_index is None:
        return []

    # Any of these phrases ends the item table.
    stop_tokens = (
        "totale merce",
        "totale documento",
        "totale a pagare",
        "imposta di bollo",
        "riepiloghiiva",
        "data inizio trasporto",
        "causale del trasporto",
        "firma del destinatario",
    )
    body = lines[header_index + 1 :]
    items: list[dict[str, object]] = []
    i = 0
    while i < len(body):
        line = body[i]
        next_line = body[i + 1] if i + 1 < len(body) else None
        normalized = line.casefold()
        if any(token in normalized for token in stop_tokens):
            break
        if not _looks_like_stacked_delivery_item_start(line, next_line):
            i += 1
            continue

        # The item starts either as "<code> <description>" on one line, or as a
        # code line followed by a description line (two lines consumed).
        product_code: str | None = None
        description: str | None = None
        consumed = 1
        inline_match = re.match(r"^(\S+)\s+(.+)$", normalize_text_block(line))
        if inline_match and _looks_like_stacked_delivery_code(inline_match.group(1)) and _looks_like_delivery_description_line(inline_match.group(2)):
            product_code = inline_match.group(1)
            description = normalize_text_block(inline_match.group(2))
        else:
            product_code = normalize_text_block(line)
            description = normalize_text_block(next_line or "")
            consumed = 2

        # Collect the rest of the block up to the next item start or stop phrase.
        block_lines = [line]
        if consumed == 2 and next_line:
            block_lines.append(next_line)
        j = i + consumed
        while j < len(body):
            candidate = body[j]
            candidate_next = body[j + 1] if j + 1 < len(body) else None
            candidate_normalized = candidate.casefold()
            if any(token in candidate_normalized for token in stop_tokens):
                break
            if _looks_like_stacked_delivery_item_start(candidate, candidate_next):
                break
            block_lines.append(candidate)
            j += 1

        lot_code = None
        pack_count = None
        unit_multiplier = None
        unit_price = None
        line_total = None
        vat_code = None
        # Classify each block line; the first matching rule wins for that line.
        for block_line in block_lines[1:]:
            stripped = normalize_text_block(block_line)
            # Unit/lot code line (later stored as the item's unit_code).
            normalized_lot_code = _normalize_stacked_delivery_lot_code(stripped)
            if normalized_lot_code is not None:
                lot_code = normalized_lot_code
                continue
            # "N x M": pack count and units per pack on one line.
            qty_pair_match = re.fullmatch(r"(\d+)\s*[xX]\s*(\d+)", stripped, re.IGNORECASE)
            if qty_pair_match:
                try:
                    pack_count = float(qty_pair_match.group(1))
                    unit_multiplier = float(qty_pair_match.group(2))
                except ValueError:
                    pass
                continue
            # "N x": pack count only; the multiplier may follow on its own line.
            if re.fullmatch(r"\d+\s*[xX]", stripped):
                try:
                    pack_count = float(re.sub(r"\s*[xX]$", "", stripped))
                except ValueError:
                    pass
                continue
            # Bare integer after a pack count: the pending multiplier.
            if re.fullmatch(r"\d+", stripped) and pack_count is not None and unit_multiplier is None:
                try:
                    unit_multiplier = float(stripped)
                except ValueError:
                    pass
                continue
            # A number with 3-4 decimals is taken as the unit price (first one only).
            if re.fullmatch(r"\d+[.,]\d{3,4}", stripped) and unit_price is None:
                unit_price = _parse_euro_amount(stripped)
                continue
            # "<amount> <2 digits>": line total followed by the VAT code.
            total_match = re.fullmatch(r"([0-9][0-9.,]*)\s+(\d{2})", stripped)
            if total_match:
                line_total = _parse_euro_amount(total_match.group(1))
                vat_code = total_match.group(2)
                continue
            # Otherwise look for a unit price among the decimal tokens of the
            # line: prefer a token with 3+ decimals, else the first token when
            # the line holds at least two.
            decimal_tokens = _extract_decimal_tokens(stripped)
            if unit_price is None and decimal_tokens:
                high_precision_tokens = [token for token in decimal_tokens if len(token.split(",")[-1] if "," in token else token.split(".")[-1]) >= 3]
                candidate_token = high_precision_tokens[0] if high_precision_tokens else (decimal_tokens[0] if len(decimal_tokens) >= 2 else None)
                if candidate_token is not None:
                    parsed_candidate = _parse_euro_amount(candidate_token)
                    if parsed_candidate is not None:
                        unit_price = parsed_candidate

        # Gross quantity is packs * units per pack, or just the pack count
        # when no multiplier was found.
        gross_quantity = None
        if pack_count is not None and unit_multiplier is not None:
            gross_quantity = pack_count * unit_multiplier
        elif pack_count is not None:
            gross_quantity = pack_count

        items.append(
            {
                "line_index": len(items),
                "product_code": product_code,
                "iso_code": None,
                "description": description,
                "category_code": None,
                "unit_code": lot_code,
                "pack_count": pack_count,
                "quantity": pack_count,
                "gross_quantity": gross_quantity,
                "tare_quantity": None,
                "net_quantity": pack_count,
                "unit_price": unit_price,
                "line_total": line_total,
                "vat_code": vat_code,
                "raw_row_text": normalize_text_block(" | ".join(block_lines)),
            }
        )
        # Continue with the line that ended this block.
        i = j
    return _clean_delivery_items(items)


def _looks_like_eleven_unit_code(value: str | None) -> bool:
    """Check whether a cell, lower-cased and stripped of dots, is a known unit code."""
    cleaned = normalize_text_block(value or "")
    dotless = cleaned.casefold().replace(".", "")
    return dotless in _ELEVEN_UNIT_CODES


def _parse_eleven_quantity_cell(value: str | None) -> float | None:
    """Parse the first number in a quantity cell.

    Returns ``None`` for empty cells, cells containing ``%`` and cells
    without any number.
    """
    cell_text = normalize_text_block(value or "")
    if "%" in cell_text or not cell_text:
        return None
    number = re.search(r"\d+(?:[.,]\d+)?", cell_text)
    return _parse_euro_amount(number.group(0)) if number else None


def _extract_eleven_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract delivery-note items from Eleven-style pipe-delimited table text.

    Scanning starts after the header line containing "articolo",
    "descrizione" and "quantit", and ends at the first line containing one of
    the stop phrases. Only lines with a ``|`` and at least three non-empty
    cells are considered; VAT summary rows are skipped.

    Three row layouts are recognised, checked in this order:

    * ``code | description | unit | quantity [| unit price ...]``
    * ``description | unit | quantity [| unit price ...]``
    * ``code | description | quantity [| unit price ...]`` (no unit column)

    Args:
        text: Raw document text; ``None`` or empty yields no items.

    Returns:
        The extracted items after ``_clean_delivery_items``.
    """
    lines = [normalize_text_block(line) for line in (text or "").splitlines() if normalize_text_block(line)]
    items: list[dict[str, object]] = []
    in_items_table = False
    # Any of these phrases ends the item table.
    stop_tokens = (
        "scadenze",
        "controllate la merce",
        "totale merce",
        "totale netto",
        "riepilogo",
        "spese bolli",
        "pagamento",
        "banca d'appoggio",
        "vettore",
        "trasporto a mezzo",
    )

    for line in lines:
        normalized_line = line.casefold()
        if not in_items_table:
            # Skip everything up to and including the table header.
            if "articolo" in normalized_line and "descrizione" in normalized_line and "quantit" in normalized_line:
                in_items_table = True
            continue
        if any(token in normalized_line for token in stop_tokens):
            break
        if "|" not in line:
            continue
        # VAT summary rows are not items.
        if normalized_line.startswith(("iva |", "iva ")) or "totale imponibile" in normalized_line:
            continue
        if re.match(r"^\d{2}\s*\|\s*iva\b", normalized_line):
            continue

        cells = [normalize_text_block(cell) for cell in line.split("|") if normalize_text_block(cell)]
        if len(cells) < 3:
            continue

        product_code: str | None = None
        description: str | None = None
        unit_code: str | None = None
        quantity: float | None = None
        unit_price: float | None = None

        if len(cells) >= 4 and _looks_like_eleven_unit_code(cells[2]):
            # code | description | unit | quantity [| unit price]
            quantity = _parse_eleven_quantity_cell(cells[3])
            if quantity is None:
                continue
            product_code = cells[0]
            description = cells[1]
            unit_code = cells[2].upper()
            if len(cells) >= 5:
                unit_price = _parse_euro_amount(cells[4])
        elif len(cells) >= 3 and _looks_like_eleven_unit_code(cells[1]):
            # description | unit | quantity [| unit price] — no product code.
            quantity = _parse_eleven_quantity_cell(cells[2])
            if quantity is None:
                continue
            description = cells[0]
            unit_code = cells[1].upper()
            if len(cells) >= 4:
                unit_price = _parse_euro_amount(cells[3])
        else:
            # code | description | quantity [| unit price] — no unit column.
            quantity = _parse_eleven_quantity_cell(cells[2])
            if quantity is None:
                continue
            product_code = cells[0]
            description = cells[1]
            if len(cells) >= 4:
                amount_tokens = _extract_amounts_from_line(cells[3])
                unit_price = amount_tokens[0] if amount_tokens else _parse_euro_amount(cells[3])

        raw_row_text = normalize_text_block(line)
        if _is_noise_delivery_item(description, raw_row_text):
            continue
        amount_tokens = _extract_amounts_from_line(raw_row_text)
        # Fall back to the first amount on the row, but only when at least two
        # amounts are present (a lone amount is used as the line total below).
        if unit_price is None and len(amount_tokens) >= 2:
            unit_price = amount_tokens[0]
        # A VAT code is a standalone two-digit token ending the row. The
        # lookbehind rejects the cents of a trailing amount: without it a row
        # ending in "4,50" would report "50" as its VAT code.
        vat_match = re.search(r"(?<![.,])\b(\d{2})\s*$", raw_row_text)
        items.append(
            {
                "line_index": len(items),
                "product_code": product_code,
                "iso_code": None,
                "description": description,
                "category_code": None,
                "unit_code": unit_code,
                "pack_count": quantity,
                "quantity": quantity,
                "gross_quantity": quantity,
                "tare_quantity": None,
                "net_quantity": quantity,
                "unit_price": unit_price,
                "line_total": amount_tokens[-1] if amount_tokens else None,
                "vat_code": vat_match.group(1) if vat_match else None,
                "raw_row_text": raw_row_text,
            }
        )
    return _clean_delivery_items(items)


def _parse_moet_line_quantity_and_unit_price(cells: list[str]) -> tuple[float | None, float | None]:
    """Read ``(quantity, unit_price)`` from the detail cells of a Moët row.

    When the first cell holds both values separated by whitespace
    (``"<quantity> <amount>"``) they are split from it; otherwise the first
    cell is the quantity and the second cell, if present, the unit price.
    """
    if not cells:
        return (None, None)

    leading = normalize_text_block(cells[0])
    fused = re.fullmatch(
        r"(\d+(?:[.,]\d+)?)\s+(\d{1,3}(?:\.\d{3})*,\d{2}|\d+,\d{2})",
        leading,
    )
    if fused is not None:
        quantity_token, price_token = fused.groups()
        return (_parse_euro_amount(quantity_token), _parse_euro_amount(price_token))

    quantity = _parse_euro_amount(leading)
    unit_price = None
    if len(cells) >= 2:
        unit_price = _parse_euro_amount(normalize_text_block(cells[1]))
    return (quantity, unit_price)


def _parse_moet_line_total(cells: list[str]) -> float | None:
    """Find the line total by scanning the detail cells from right to left.

    A cell ending in ``<amount> [0,00] 22,00`` contributes that amount. A cell
    that is only ``0,00 22,00`` or that ends with ``-`` is skipped. Any other
    cell is parsed as a whole. The first strictly positive amount is returned;
    ``None`` when there is none.
    """
    for raw_cell in cells[::-1]:
        text = normalize_text_block(raw_cell)
        if not text:
            continue

        trailing_vat = re.search(
            r"(\d{1,3}(?:\.\d{3})*,\d{2}|\d+,\d{2})\s+(?:0[,.]00\s+)?22[,.]00\s*$",
            text,
        )
        if trailing_vat is not None:
            candidate = _parse_euro_amount(trailing_vat.group(1))
        elif re.fullmatch(r"0[,.]00\s+22[,.]00", text) or text.endswith("-"):
            continue
        else:
            candidate = _parse_euro_amount(text)

        if candidate and candidate > 0:
            return candidate
    return None


def _parse_moet_invoice_item_line(cells: list[str], line_index: int) -> dict[str, object] | None:
    """Build an item dict from the cells of one Moët invoice row.

    The row must have at least three non-empty cells and begin with a 6+ digit
    product code, either alone in the first cell or followed by the
    description in that same cell. Returns ``None`` when the row does not fit,
    has no quantity, or is classified as noise.
    """
    cleaned: list[str] = []
    for cell in cells:
        cell_text = normalize_text_block(cell)
        if cell_text:
            cleaned.append(cell_text)
    if len(cleaned) < 3:
        return None

    first_cell = cleaned[0]
    product_code: str | None
    description: str | None
    detail_cells: list[str]
    if re.fullmatch(r"\d{6,}", first_cell):
        # Code in its own cell, description in the next one.
        product_code = first_cell
        description = cleaned[1]
        detail_cells = cleaned[2:]
    else:
        # Code and description share the first cell.
        inline = re.match(r"^(\d{6,})\s+(.+)$", first_cell)
        if inline is None:
            return None
        product_code = inline.group(1)
        description = normalize_text_block(inline.group(2))
        detail_cells = cleaned[1:]

    if not product_code or not description or not detail_cells:
        return None

    quantity, unit_price = _parse_moet_line_quantity_and_unit_price(detail_cells)
    line_total = _parse_moet_line_total(detail_cells)
    if quantity is None:
        return None

    raw_row_text = normalize_text_block(" | ".join(cleaned))
    if _is_noise_delivery_item(str(description or ""), str(raw_row_text or "")):
        return None

    return {
        "line_index": line_index,
        "product_code": product_code,
        "iso_code": None,
        "description": description,
        "category_code": None,
        "unit_code": "bt",
        "pack_count": None,
        "quantity": quantity,
        "gross_quantity": quantity,
        "tare_quantity": None,
        "net_quantity": quantity,
        "unit_price": unit_price,
        "line_total": line_total,
        "vat_code": "22",
        "raw_row_text": raw_row_text,
    }


def _extract_moet_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract items from Moët-style document text.

    Rows are read after a header line containing "codice", "descrizione" and
    "q.ta"/"q.tà", until a stop phrase appears. Only lines starting with a 6+
    digit code followed by whitespace or a pipe are parsed as rows.
    """
    stripped_lines: list[str] = []
    for raw_line in normalize_text_block(text).splitlines():
        candidate = raw_line.strip()
        if candidate:
            stripped_lines.append(candidate)
    if not stripped_lines:
        return []

    table_end_markers = (
        "annotazioni",
        "contributo ambientale",
        "imponibile",
        "totale fattura",
        "mh portfolio",
        "la merce viaggia",
    )
    collected: list[dict[str, object]] = []
    inside_table = False
    for current in stripped_lines:
        lowered = current.casefold()

        has_quantity_header = "q.ta" in lowered or "q.tà" in lowered
        if "codice" in lowered and "descrizione" in lowered and has_quantity_header:
            inside_table = True
            continue
        if not inside_table:
            continue
        if any(marker in lowered for marker in table_end_markers):
            break
        if re.match(r"^\d{6,}(?:\s|\|)", normalize_text_block(current)) is None:
            continue

        row_cells: list[str] = []
        for piece in re.split(r"\s*\|\s*", current):
            cleaned_piece = normalize_text_block(piece)
            if cleaned_piece:
                row_cells.append(cleaned_piece)

        parsed_row = _parse_moet_invoice_item_line(row_cells, len(collected))
        if parsed_row is not None:
            collected.append(parsed_row)

    return _clean_delivery_items(collected)


def _extract_supplier_specific_delivery_note_items(
    supplier_name: str | None,
    *,
    preview_text: str,
    source_text: str,
) -> list[dict[str, object]]:
    """Run the supplier-specific item extractor that matches *supplier_name*.

    The supplier name is normalized and case-folded, then matched by substring
    against the known supplier keywords in a fixed order. The matching
    extractor is tried on ``preview_text`` first; when that yields nothing the
    fallback extractor runs on the fallback text. Returns an empty list when
    both texts are empty or no supplier keyword matches.
    """
    supplier_key = normalize_text_block(supplier_name or "").casefold()
    if not preview_text and not source_text:
        return []

    # (keywords, extractor for the preview text, fallback extractor, fallback text).
    # Order matters: the first route whose keyword occurs in the supplier wins.
    routes = (
        (
            ("eleven",),
            _extract_eleven_delivery_note_items_from_text,
            _extract_eleven_delivery_note_items_from_text,
            source_text,
        ),
        (
            ("martini",),
            _extract_martini_delivery_note_items_from_text,
            _extract_martini_delivery_note_items_from_text,
            source_text,
        ),
        (
            ("montenegro", "reduzzi"),
            _extract_montenegro_delivery_note_items_from_text,
            _extract_montenegro_delivery_note_items_from_text,
            source_text,
        ),
        (
            ("moet", "hennessy"),
            _extract_moet_delivery_note_items_from_text,
            _extract_moet_delivery_note_items_from_text,
            source_text,
        ),
        (
            # Laconi is the only supplier whose fallback uses a different
            # extractor, fed with the source text or, failing that, the preview.
            ("laconi",),
            _extract_laconi_delivery_note_items_from_preview_text,
            _extract_stacked_delivery_note_items_from_text,
            source_text or preview_text,
        ),
        (
            ("cavallaro",),
            _extract_cavallaro_delivery_note_items_from_text,
            _extract_cavallaro_delivery_note_items_from_text,
            source_text,
        ),
    )
    for keywords, from_preview, from_fallback, fallback_text in routes:
        if any(keyword in supplier_key for keyword in keywords):
            return from_preview(preview_text) or from_fallback(fallback_text)
    return []


def _extract_cavallaro_delivery_note_items_from_text(text: str) -> list[dict[str, object]]:
    """Extract delivery-note items from Cavallaro text using three strategies.

    Tried in order, returning the first non-empty result:
    1. ``_extract_cavallaro_split_delivery_note_items_from_text``;
    2. every non-blank line split on pipes and parsed as a table row, then
       cleaned with ``_clean_delivery_items``;
    3. ``_extract_cavallaro_wrapped_delivery_note_items_from_text``.
    """
    split_layout_items = _extract_cavallaro_split_delivery_note_items_from_text(text)
    if split_layout_items:
        return split_layout_items

    cell_separator = re.compile(r"\s*\|\s*")
    table_items: list[dict[str, object]] = []
    for raw_line in normalize_text_block(text).splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        cells = [cell for cell in map(normalize_text_block, cell_separator.split(stripped)) if cell]
        row_item = _parse_delivery_note_table_row(cells, len(table_items))
        if row_item is not None:
            table_items.append(row_item)

    return _clean_delivery_items(table_items) or _extract_cavallaro_wrapped_delivery_note_items_from_text(text)


def _extract_delivery_note_items_from_ocr_document(document: dict[str, object]) -> list[dict[str, object]]:
    """Extract delivery-note items from the row structure of an OCR document.

    The rows returned by ``_extract_google_ocr_rows`` are scanned for a header
    row containing both "descrizione merce" and "codice". Rows after it, up to
    the first footer marker, are split on pipes and parsed as table rows.
    Whenever the rows, the header or the parsed items are missing, the plain
    ``document["text"]`` is parsed with ``_extract_delivery_note_items_from_text``
    instead.
    """
    plain_text = str(document.get("text") or "")

    def text_fallback() -> list[dict[str, object]]:
        return _extract_delivery_note_items_from_text(plain_text)

    def row_text(row: dict[str, object]) -> str:
        return str(row.get("text") or "")

    ocr_rows = _extract_google_ocr_rows(document)
    if not ocr_rows:
        return text_fallback()

    header_position: int | None = None
    for position, row in enumerate(ocr_rows):
        lowered = normalize_text_block(row_text(row)).casefold()
        if "descrizione merce" in lowered and "codice" in lowered:
            header_position = position
            break
    if header_position is None:
        return text_fallback()

    footer_markers = (
        "causale del trasporto",
        "peso netto",
        "totale colli",
        "firma",
        "trasporto a mezzo",
    )
    cell_separator = re.compile(r"\s*\|\s*")
    collected: list[dict[str, object]] = []
    # The line index is the row's offset from the header, counting skipped rows too.
    for offset, row in enumerate(ocr_rows[header_position + 1 :]):
        raw = row_text(row)
        lowered = normalize_text_block(raw).casefold()
        if any(marker in lowered for marker in footer_markers):
            break
        cells = [cell for cell in map(normalize_text_block, cell_separator.split(raw)) if cell]
        if not _looks_like_delivery_note_item_row(cells):
            continue
        item = _parse_delivery_note_table_row(cells, offset)
        if item is None:
            continue
        item["raw_row_text"] = normalize_text_block(raw)
        collected.append(item)

    return _clean_delivery_items(collected) or text_fallback()


def _extract_invoice_fields_from_document_ai(document: dict[str, object]) -> dict[str, object]:
    """Map Document AI invoice entities onto the fiscal-document fields.

    Args:
        document: Document AI document payload; only its ``"entities"`` list
            is read. Anything that is not a list yields an empty result.

    Returns:
        A dict with any of ``supplier_name``, ``document_date``,
        ``document_number`` and ``total_amount`` that could be extracted, plus
        ``document_type="invoice"`` when at least one field was found.

    Entity order in the payload is not relied upon for dates and amounts:
    ``invoice_date`` always beats ``due_date``, and ``total_amount`` /
    ``amount_due`` always beat ``net_amount``. A lower-priority entity is only
    used when no higher-priority one parses. Among entities of equal priority
    the first one that parses wins, as it does for supplier and number.
    """
    entities = document.get("entities")
    if not isinstance(entities, list):
        return {}

    supplier_types = {"supplier_name", "supplier", "vendor_name"}
    number_types = {"invoice_id", "invoice_number"}
    # Lower rank = more trustworthy source for the field.
    date_ranks = {"invoice_date": 0, "due_date": 1}
    amount_ranks = {"total_amount": 0, "amount_due": 0, "net_amount": 1}

    extracted: dict[str, object] = {}
    best_date_rank: int | None = None
    best_amount_rank: int | None = None
    for raw_entity in entities:
        if not isinstance(raw_entity, dict):
            continue
        entity_type = normalize_text_block(str(raw_entity.get("type") or "")).casefold()
        if not entity_type:
            continue
        value = _document_ai_entity_normalized_text(raw_entity) or _document_ai_entity_text(raw_entity)
        if not value:
            continue

        if entity_type in supplier_types:
            if not extracted.get("supplier_name"):
                extracted["supplier_name"] = value
        elif entity_type in number_types:
            if not extracted.get("document_number"):
                extracted["document_number"] = value
        elif entity_type in date_ranks:
            rank = date_ranks[entity_type]
            # Only strictly better sources may replace an already parsed date.
            if best_date_rank is None or rank < best_date_rank:
                parsed_date = _parse_iso_or_italian_date(value)
                if parsed_date:
                    extracted["document_date"] = parsed_date
                    best_date_rank = rank
        elif entity_type in amount_ranks:
            rank = amount_ranks[entity_type]
            if best_amount_rank is None or rank < best_amount_rank:
                parsed_amount = _parse_euro_amount(value)
                if parsed_amount is not None:
                    extracted["total_amount"] = parsed_amount
                    best_amount_rank = rank

    if extracted:
        extracted["document_type"] = "invoice"
    return extracted


def _maybe_process_invoice_with_document_ai(
    *,
    raw_bytes: bytes,
    mime_type: str,
    display_name: str,
    ocr_text: str,
) -> dict[str, object] | None:
    """Send the file to the invoice processor when it looks like an invoice.

    Returns ``None`` when no invoice processor id is configured, when the
    display name plus OCR text is not classified as ``invoice`` or
    ``instant_invoice``, or when the Document AI call raises.
    """
    configured_id = get_settings().google_document_ai_invoice_processor_id
    processor_id = (configured_id or "").strip()
    if not processor_id:
        return None

    detected_type = _classify_document_type(f"{display_name}\n{ocr_text}", display_name)
    if detected_type not in {"invoice", "instant_invoice"}:
        return None

    try:
        result = process_document_with_document_ai(
            raw_bytes=raw_bytes,
            mime_type=mime_type,
            processor_id=invoice_processor_id_or(processor_id),
        ) if False else process_document_with_document_ai(
            raw_bytes=raw_bytes,
            mime_type=mime_type,
            processor_id=processor_id,
        )
    except Exception:
        # Best effort: a failing invoice processor must not block the analysis.
        result = None
    return result


def _extract_document_date(text: str, filename: str) -> str | None:
    """Find the document date in OCR text, falling back to the filename.

    Strategies, in order; the first hit is returned:
    1. line scan: a "data ..." label (date on the same line or within the
       next seven lines), a "numero ... data" / "n." header line (date within
       the next seven lines), or any dated line whose surrounding context
       (this line and the six before it) mentions a document keyword and is
       not a legal reference;
    2. whole-text patterns ("boll... <number> <date>", "data: <date>",
       "pagina <n> <number> <date>") - only the first match of each is tried;
    3. "del <date>" occurrences not preceded by a legal reference;
    4. a ``DDMMYYYY`` sequence (year 20xx) in the filename stem.

    Dates are parsed by ``_parse_plausible_italian_document_date``; the
    filename date is returned as ``date.isoformat()``. Returns ``None`` when
    nothing plausible is found.
    """
    normalized = normalize_text_block(text)
    lines = [stripped for stripped in (raw.strip() for raw in normalized.splitlines()) if stripped]
    context_keywords = ("documento", "fattura", "ddt", "bolla", "numero", "data")

    def first_date_below(position: int) -> str | None:
        # Look at up to seven lines following the given one.
        for following in lines[position + 1 : position + 8]:
            found = _parse_plausible_italian_document_date(following)
            if found:
                return found
        return None

    for position, current in enumerate(lines):
        lowered = current.casefold()

        if re.search(r"\bdata(?:\s+documento|\s+fattura|\s+ddt|\s+bolla)?\b", current, re.IGNORECASE):
            labelled = _parse_plausible_italian_document_date(current) or first_date_below(position)
            if labelled:
                return labelled

        is_header_row = ("numero" in lowered and "data" in lowered) or re.fullmatch(r"n\.?", current, re.IGNORECASE)
        if is_header_row:
            below_header = first_date_below(position)
            if below_header:
                return below_header

        same_line_date = _parse_plausible_italian_document_date(current)
        if not same_line_date:
            continue
        window = " ".join(lines[max(0, position - 6) : position + 1])
        if _has_legal_reference_context(window):
            continue
        if any(keyword in window.casefold() for keyword in context_keywords):
            return same_line_date

    whole_text_patterns = (
        r"\b(?:[A-Z0-9]{1,4}[-./])?boll[a-z.]*\s+[A-Z0-9./-]*\d[A-Z0-9./-]*\s+(\d{2}[/-]\d{2}[/-]\d{2,4})",
        r"\bdata(?:\s+documento|\s+fattura|\s+ddt|\s+bolla)?\s*[:.]?\s*(\d{2}[/-]\d{2}[/-]\d{2,4})",
        r"\bpagina\s+\d+\s+\d{1,12}\s+(\d{2}[/-]\d{2}[/-]\d{2,4})\b",
    )
    for pattern in whole_text_patterns:
        hit = re.search(pattern, normalized, re.IGNORECASE)
        if hit is None:
            continue
        surrounding = normalized[max(0, hit.start() - 80) : hit.end() + 80]
        candidate = _parse_plausible_italian_document_date(hit.group(1), context=surrounding)
        if candidate:
            return candidate

    for hit in re.finditer(r"\bdel\s+(\d{2}[/-]\d{2}[/-]\d{2,4})", normalized, re.IGNORECASE):
        preceding = normalized[max(0, hit.start() - 48) : hit.start()]
        if _has_legal_reference_context(preceding):
            continue
        candidate = _parse_plausible_italian_document_date(hit.group(1), context=preceding)
        if candidate:
            return candidate

    stem_hit = re.search(r"(\d{2})(\d{2})(20\d{2})", Path(filename).stem)
    if stem_hit is None:
        return None
    try:
        from_filename = datetime.strptime("".join(stem_hit.groups()), "%d%m%Y").date().isoformat()
        if _is_plausible_fiscal_document_date(from_filename):
            return from_filename
        return None
    except ValueError:
        return None


def _extract_document_number(text: str, document_type: str, filename: str) -> str | None:
    """Find the document number in OCR text, falling back to the filename.

    Strategies, in order; the first candidate accepted by
    ``_looks_like_document_number`` is returned:
    1. whole-text regexes: the ones specific to *document_type* (invoice /
       instant invoice, or delivery note) followed by the generic ones; only
       the first match of each pattern is tried;
    2. line scan: a lone "n." line (number within the next four lines), a
       "<number> <date>" line with a document keyword in its context (this
       line and the six before it), or a "numero ..." label (number within
       the next seven lines).

    When the text yields nothing, the first underscore/whitespace separated
    token of the filename stem is returned (capped at 80 characters) without
    further validation, or ``None`` if the stem is empty.
    """
    normalized = normalize_text_block(text)
    trim_chars = " .:-/#"

    invoice_patterns = (
        r"\bn\.?\s*ordine\s*/\s*data\b[\s\S]{0,180}?\|\s*([A-Z0-9]{4,})[./-]\d{2}[./-]\d{2,4}",
        r"\bn\.?\s*ordine\s*/\s*data\s*[:#]?\s*([A-Z0-9]{4,})[./-]\d{2}[./-]\d{2,4}",
        r"\bfattura(?:\s+\w+)?\s*(?:n(?:umero)?\.?|nr\.?)?\s*[:#]?\s*([A-Z0-9][A-Z0-9./-]{3,})",
        r"\bnumero\s+fattura\s*[:#]?\s*([A-Z0-9][A-Z0-9./-]{3,})",
    )
    delivery_patterns = (
        r"\bconsegna\s+imm\.?\s+[A-Z]{2,5}\s+([A-Z0-9][A-Z0-9./-]{2,})",
        r"\b(?:[A-Z0-9]{1,4}[-./])?boll[a-z.]*\s+([A-Z0-9./-]*\d[A-Z0-9./-]*)\s+(?:\d{2}[/-]\d{2}[/-]\d{4})",
        r"\b(?:[A-Z0-9]{1,4}[-./])?boll[a-z.]*\s+([A-Z0-9./-]*\d[A-Z0-9./-]*)",
        r"\b(?:ddt|d\.d\.t\.|bolla|documento di trasporto)\s*(?:n(?:umero)?\.?|nr\.?)?\s*[:#]?\s*([A-Z0-9][A-Z0-9./-]{2,})",
    )
    generic_patterns = (
        r"\bnumero\s+documento\s*[:#]?\s*([A-Z0-9][A-Z0-9./-]{2,})",
        r"\bn(?:umero)?\.?\s*documento\s*[:#]?\s*([A-Z0-9][A-Z0-9./-]{2,})",
        r"\bpagina\s+\d+\s+(\d{1,12})\s+\d{2}[/-]\d{2}[/-]\d{4}\b",
    )
    patterns_by_type = {
        "invoice": invoice_patterns,
        "instant_invoice": invoice_patterns,
        "delivery_note": delivery_patterns,
    }

    for pattern in (*patterns_by_type.get(document_type, ()), *generic_patterns):
        hit = re.search(pattern, normalized, re.IGNORECASE)
        if hit is None:
            continue
        candidate = hit.group(1).strip(trim_chars)
        if _looks_like_document_number(candidate):
            return candidate

    lines = [stripped for stripped in (raw.strip() for raw in normalized.splitlines()) if stripped]

    def first_number_below(position: int, reach: int) -> str | None:
        # Scan the lines after ``position`` up to (excluding) ``position + reach``.
        for following in lines[position + 1 : position + reach]:
            trimmed = following.strip(trim_chars)
            if _looks_like_document_number(trimmed):
                return trimmed
        return None

    context_keywords = ("documento", "fattura", "ddt", "bolla", "numero")
    for position, current in enumerate(lines):
        if re.fullmatch(r"n\.?", current, re.IGNORECASE):
            below_marker = first_number_below(position, 5)
            if below_marker is not None:
                return below_marker

        number_with_date = re.search(r"\b([A-Z0-9./-]{3,})\s+(\d{2}[/-]\d{2}[/-]\d{2,4})\b", current, re.IGNORECASE)
        if number_with_date:
            window = " ".join(lines[max(0, position - 6) : position + 1])
            if any(keyword in window.casefold() for keyword in context_keywords):
                candidate = number_with_date.group(1).strip(trim_chars)
                if _looks_like_document_number(candidate):
                    return candidate

        if re.search(r"\bnumero(?:\s+documento|\s+fattura)?\b", current, re.IGNORECASE):
            below_label = first_number_below(position, 8)
            if below_label is not None:
                return below_label

    # Last resort: the leading token of the filename stem, taken as-is.
    stem_tokens = [token for token in re.split(r"[_\s]+", Path(filename).stem.strip()) if token]
    return stem_tokens[0][:80] if stem_tokens else None


def _is_company_like_line(line: str) -> bool:
    normalized_line = line.casefold()
    ignored_tokens = {
        "ocr strutturato",
        "ocr ricostruito",
        "fattura",
        "ddt",
        "bolla",
        "boll ",
        "boll.",
        "documento di trasporto",
        "cliente",
        "destinatario",
        "destinazione",
        "totale",
        "imponibile",
        "iva",
        "pagamento",
        "codice",
        "descrizione",
        "valuta",
        "rif.",
        "iban",
        "banca",
        "bank",
        "deutsche",
        "intesa",
        "san paolo",
        "agente",
        "orario consegna",
    }
    if any(token in normalized_line for token in ignored_tokens):
        return False
    word_count = len(re.findall(r"[A-Za-zÀ-ÿ]{2,}", line))
    if word_count < 2:
        return False
    if _LEGAL_ENTITY_PATTERN.search(line):
        return True
    if "%" in line or re.search(r"\b\d{3,}\b", line):
        return False
    digit_ratio = sum(character.isdigit() for character in line) / max(len(line), 1)
    if digit_ratio > 0.2:
        return False
    if len(line) > 80:
        return False
    return True


def _extract_supplier_name(text: str) -> str | None:
    """Pick the most likely supplier name from the top of the document text.

    Every line is split on " | " so table cells are judged individually.
    Three passes are made, returning the first hit truncated to 120 chars:
    1. within the first 40 candidates, one carrying a legal-entity suffix
       that is also company-like;
    2. for each of the first 30 candidates mentioning "cliente", the first
       company-like candidate among the ten preceding it;
    3. the first company-like candidate among the first 18.
    """
    candidates = [
        piece.strip()
        for raw_line in normalize_text_block(text).splitlines()
        for piece in re.split(r"\s+\|\s+", raw_line)
        if piece.strip()
    ]

    with_legal_form = next(
        (
            entry
            for entry in candidates[:40]
            if _LEGAL_ENTITY_PATTERN.search(entry) and _is_company_like_line(entry)
        ),
        None,
    )
    if with_legal_form is not None:
        return with_legal_form[:120]

    for position, entry in enumerate(candidates[:30]):
        if "cliente" in entry.casefold():
            # The issuer is usually printed above the customer block.
            for earlier in candidates[max(0, position - 10) : position]:
                if _is_company_like_line(earlier):
                    return earlier[:120]

    first_company_like = next((entry for entry in candidates[:18] if _is_company_like_line(entry)), None)
    return None if first_company_like is None else first_company_like[:120]


def _extract_pdf_header_text(path: Path) -> str:
    try:
        document = fitz.open(str(path))
    except Exception:
        return ""

    try:
        if document.page_count <= 0:
            return ""
        page = document.load_page(0)
        rect = page.rect
        clip = fitz.Rect(
            rect.x0,
            rect.y0,
            rect.x0 + rect.width * 0.45,
            rect.y0 + rect.height * 0.22,
        )
        pixmap = page.get_pixmap(matrix=fitz.Matrix(3, 3), clip=clip, alpha=False)
        with Image.open(io.BytesIO(pixmap.tobytes("png"))) as image:
            extracted = pytesseract.image_to_string(image, lang="ita+eng", config="--psm 6")
        return normalize_text_block(extracted)
    except Exception:
        return ""
    finally:
        document.close()


def _extract_image_header_text(path: Path) -> str:
    try:
        with Image.open(path) as image:
            width, height = image.size
            crop = image.crop((0, 0, max(1, int(width * 0.55)), max(1, int(height * 0.26))))
            extracted = pytesseract.image_to_string(crop.convert("L"), lang="ita+eng", config="--psm 6")
        return normalize_text_block(extracted)
    except Exception:
        return ""


def _extract_visual_header_text(path: Path, kind: str) -> str:
    if kind == "pdf":
        return _extract_pdf_header_text(path)
    if kind == "image":
        return _extract_image_header_text(path)
    return ""


def _sanitize_extracted_text_for_fiscal_parsing(value: str) -> str:
    """Strip OCR layout scaffolding so only the document content remains.

    Removes blank lines, the two OCR banner lines and "card x=" lines; for
    "row y=...:" lines keeps only the part after the first colon; drops
    "[x=<n> <TAG>]" markers and leading "TITLE:" / "DETAIL:" prefixes;
    collapses runs of whitespace and trims spaces and pipes from both ends.
    """
    banner_lines = {"ocr strutturato con coordinate:", "ocr ricostruito per card:"}
    position_marker = re.compile(r"\[x=\d+\s+[A-Z]+\]\s*")
    section_prefix = re.compile(r"^(?:TITLE|DETAIL):\s*", re.IGNORECASE)
    whitespace_run = re.compile(r"\s{2,}")

    kept: list[str] = []
    for raw_line in normalize_text_block(value).splitlines():
        content = raw_line.strip()
        if not content:
            continue
        lowered = content.casefold()
        if lowered in banner_lines or lowered.startswith("card x="):
            continue
        if lowered.startswith("row y=") and ":" in content:
            # Keep only the payload that follows the "row y=...:" prefix.
            content = content.partition(":")[2].strip()
        content = position_marker.sub("", content)
        content = section_prefix.sub("", content)
        content = whitespace_run.sub(" ", content).strip(" |")
        if content:
            kept.append(content)
    return normalize_text_block("\n".join(kept))


def _extract_amount_from_line(line: str) -> float | None:
    """Return the last amount found on *line*, or ``None`` when there is none."""
    found = _extract_amounts_from_line(line)
    return found[-1] if found else None


def _extract_amount_near_label(lines: list[str], *labels: str) -> float | None:
    """Return the largest amount printed at or shortly after a labelled line.

    *labels* are regular expressions matched case-insensitively. Lines are
    scanned from the bottom up; for the first labelled line that has any
    amount in the 25-line window starting at it, the maximum of those amounts
    is returned. Amounts come from ``_extract_amounts_from_line`` plus a
    looser pattern that also accepts a single decimal digit.
    """
    label_patterns = [re.compile(label, re.IGNORECASE) for label in labels]
    loose_amount = re.compile(r"(?<!\d)(\d{1,3}(?:\.\d{3})*,\d{1,2}|\d+,\d{1,2})(?!\d)")

    for start in reversed(range(len(lines))):
        labelled_line = lines[start]
        if not any(pattern.search(labelled_line) for pattern in label_patterns):
            continue
        found: list[float] = []
        for nearby in lines[start : start + 25]:
            found.extend(_extract_amounts_from_line(nearby))
            found.extend(
                amount
                for amount in map(_parse_euro_amount, loose_amount.findall(nearby))
                if amount is not None
            )
        if found:
            return max(found)
    return None


def _extract_total_amount(text: str) -> float | None:
    """Extract the document total from OCR text.

    Label groups are tried in order of preference - "totale da pagare", then
    "totale documento" / "totale fattura" / "importo totale", then a bare
    "totale" line - using ``_extract_amount_near_label``. If none yields an
    amount, the first match of each inline "totale ... <amount>" pattern on
    the whole text is parsed. Returns ``None`` when nothing is found.
    """
    normalized = normalize_text_block(text)
    lines = [stripped for stripped in (raw.strip() for raw in normalized.splitlines()) if stripped]

    label_groups = (
        (r"\btotale\s+da\s+pagare\b",),
        (
            r"\btotale\s+documento\b",
            r"\btotale\s+fattura\b",
            r"\bimporto\s+totale\b",
        ),
        (r"^totale\s*(?:\|\s*[0-9\.,]+)?$",),
    )
    for labels in label_groups:
        labelled_amount = _extract_amount_near_label(lines, *labels)
        if labelled_amount is not None:
            return labelled_amount

    inline_patterns = (
        r"\b(?:totale(?:\s+documento|\s+fattura|\s+da\s+pagare)?|importo\s+totale)\s*[:€]?\s*([0-9\.,]+)",
        r"\b(?:totale\s+eur|totale\s+euro)\s*[:€]?\s*([0-9\.,]+)",
    )
    for pattern in inline_patterns:
        hit = re.search(pattern, normalized, re.IGNORECASE)
        if hit:
            inline_amount = _parse_euro_amount(hit.group(1))
            if inline_amount is not None:
                return inline_amount
    return None


def _classify_document_type(text: str, filename: str) -> str:
    """Classify a fiscal document from its text and filename.

    Returns one of ``"instant_invoice"``, ``"invoice"``, ``"delivery_note"``
    or ``"unknown"``. Precedence:
    1. an explicit "tipo documento" field naming DDT/bolla or fattura;
    2. instant/accompanying/pro-forma invoice wording;
    3. delivery-note wording (it wins even when "fattura" also appears);
    4. invoice wording ("fattura" or "invoice").

    The filename (base name only) is appended to the text before matching and
    is additionally the only place where a bare "bolla" counts as delivery
    wording.
    """
    filename_normalized = Path(filename).name.casefold()
    normalized = f"{normalize_text_block(text)}\n{filename_normalized}".casefold()
    has_instant_invoice = any(
        token in normalized
        for token in (
            "fattura accompagnatoria",
            "fattura accomp",
            "fatt. accomp",
            "fattura immediata",
            "fattura immed",
            "fatt. immed",
            "fatt. imm",
            "fattura differita immediata",
            "fattura proforma",
            "fattura pro forma",
            "proforma",
        )
    )
    # Every longer invoice phrase contains "fattura", so two tokens suffice.
    has_invoice = "fattura" in normalized or "invoice" in normalized
    has_delivery = (
        re.search(r"\b(?:documento di trasporto|d\.?\s*d\.?\s*t\.?|ddt)\b", normalized) is not None
        or re.search(r"\bbolla\b", filename_normalized) is not None
        or re.search(r"\bbolla\s+(?:di\s+consegna|accompagnatoria|merce|ddt)\b", normalized) is not None
        or "consegna imm" in normalized
        or "data uscita merci" in normalized
    )
    # Collapse whitespace, pipes and colons so "Tipo documento: | DDT" matches.
    compact = re.sub(r"[\s|:]+", " ", normalized)
    if re.search(r"\btipo documento\s+(?:d\.?\s*d\.?\s*t\.?|ddt|bolla)\b", compact):
        return "delivery_note"
    if re.search(r"\btipo documento\s+fattura\b", compact):
        return "instant_invoice" if has_instant_invoice else "invoice"
    if has_instant_invoice:
        return "instant_invoice"
    if has_delivery:
        return "delivery_note"
    if has_invoice:
        return "invoice"
    return "unknown"


def _build_summary(
    *,
    document_type: str,
    document_number: str | None,
    document_date: str | None,
    supplier_name: str | None,
    total_amount: float | None,
    currency: str,
) -> str:
    type_label = {
        "invoice": "Fattura",
        "instant_invoice": "Fattura immediata",
        "delivery_note": "Bolla/DDT",
        "unknown": "Documento fiscale da classificare",
    }.get(document_type, "Documento fiscale")
    parts = [type_label]
    if document_number:
        parts.append(f"n. {document_number}")
    if document_date:
        parts.append(f"del {document_date}")
    if supplier_name:
        parts.append(f"emittente {supplier_name}")
    if total_amount is not None:
        rendered = f"{total_amount:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
        parts.append(f"totale {rendered} {currency}")
    return " | ".join(parts)


def _sanitize_fiscal_filename_part(value: str | None, *, fallback: str) -> str:
    """Make *value* safe to use inside a file or folder name.

    Characters reserved by common file systems are replaced with spaces,
    whitespace runs are collapsed, the result is capped at 90 characters and
    trimmed of spaces, dots, underscores and dashes. *fallback* is returned
    when nothing is left.
    """
    trim_chars = " ._-"
    without_reserved = re.sub(r"[\\/:*?\"<>|]+", " ", normalize_text_block(value or ""))
    collapsed = re.sub(r"\s{2,}", " ", without_reserved).strip(trim_chars)
    shortened = collapsed[:90].strip(trim_chars)
    return shortened if shortened else fallback


def _safe_fiscal_document_date(value: str | None, *, fallback: str | None = None) -> str:
    """Return a usable date string for a fiscal document.

    *value* is used when it parses and is plausible; otherwise *fallback* is
    tried under the same conditions. When neither qualifies, today's date in
    UTC is returned in ISO format.
    """
    primary = _parse_iso_or_italian_date(value)
    if primary and _is_plausible_fiscal_document_date(primary):
        return primary

    secondary = _parse_iso_or_italian_date(fallback) if fallback else None
    if secondary and _is_plausible_fiscal_document_date(secondary):
        return secondary

    today_utc = datetime.now(timezone.utc).date()
    return today_utc.isoformat()


def _fiscal_document_type_filename_label(document_type: str | None) -> str:
    """Return the Italian filename label for a document type.

    Both invoice types map to "Fattura", delivery notes to "Bolla", and
    anything else to "Documento fiscale".
    """
    labels_by_type = {
        "invoice": "Fattura",
        "instant_invoice": "Fattura",
        "delivery_note": "Bolla",
    }
    type_key = normalize_text_block(document_type or "").casefold()
    return labels_by_type.get(type_key, "Documento fiscale")


def build_fiscal_document_display_name(
    *,
    document_type: str | None,
    supplier_name: str | None,
    document_date: str | None,
    original_name: str | None,
    fallback_date: str | None = None,
) -> str:
    """Compose the display name "<label> <supplier> <date><extension>".

    The extension is the lower-cased suffix of the normalized original name,
    or ".pdf" when it has none. The composed name goes through
    ``normalize_asset_name``, with "<label> <date><extension>" as its fallback.
    """
    type_label = _fiscal_document_type_filename_label(document_type)
    supplier_part = _sanitize_fiscal_filename_part(supplier_name, fallback="Fornitore sconosciuto")
    date_part = _safe_fiscal_document_date(document_date, fallback=fallback_date)
    source_name = normalize_asset_name(original_name or "", fallback="documento-fiscale")
    extension = Path(source_name).suffix.lower() or ".pdf"
    return normalize_asset_name(
        f"{type_label} {supplier_part} {date_part}{extension}",
        fallback=f"{type_label} {date_part}{extension}",
    )


def _preferred_fiscal_document_display_name(record: FiscalDocumentRecord) -> str:
    """Compute the display name a stored fiscal document should have.

    The original name (or, failing that, the current display name) supplies
    the extension; the first ten characters of ``created_at`` serve as the
    fallback date.
    """
    name_for_extension = record.original_name or record.display_name
    creation_day = (record.created_at or "")[:10]
    return build_fiscal_document_display_name(
        document_type=record.document_type,
        supplier_name=record.supplier_name,
        document_date=record.document_date,
        original_name=name_for_extension,
        fallback_date=creation_day,
    )


def ensure_fiscal_document_display_name(record: FiscalDocumentRecord) -> FiscalDocumentRecord:
    """Rename the stored document when its display name is not the preferred one.

    Returns *record* unchanged when the name already matches; otherwise
    returns whatever the tenant store's ``update_fiscal_document`` returns.
    """
    expected_name = _preferred_fiscal_document_display_name(record)
    if record.display_name != expected_name:
        store = get_tenant_store()
        return store.update_fiscal_document(
            record.tenant_id,
            record.id,
            display_name=expected_name,
        )
    return record


def _drive_fiscal_folder_name(
    document_type: str | None = None,
    document_date: str | None = None,
    supplier_name: str | None = None,
) -> str:
    """Build the Drive folder path for a fiscal document.

    The root is "Documenti fiscali <year>", with the year taken from the safe
    document date. Known document types add "/<type folder>/<supplier>";
    any other type is filed directly in the root.
    """
    type_folders = {
        "instant_invoice": "Fatture Immediate",
        "invoice": "Fatture",
        "delivery_note": "Bolle",
    }
    year_root = f"Documenti fiscali {_safe_fiscal_document_date(document_date)[:4]}"
    type_key = normalize_text_block(document_type or "").casefold()
    supplier_folder = _sanitize_fiscal_filename_part(supplier_name, fallback="Fornitore sconosciuto")
    type_folder = type_folders.get(type_key)
    if type_folder is None:
        return year_root
    return f"{year_root}/{type_folder}/{supplier_folder}"


async def upload_fiscal_document_to_drive(
    session: SessionIdentity,
    record: FiscalDocumentRecord,
    *,
    raw_bytes: bytes | None = None,
) -> dict[str, str] | None:
    """Upload a fiscal document to Google Drive, once.

    When the record already has a Drive file id and URL they are returned
    without uploading. Otherwise the display name is brought up to date, the
    active Google Workspace connection is fetched and the file content
    (``raw_bytes`` or, if ``None``, the bytes at ``record.storage_path``) is
    uploaded into the folder given by ``_drive_fiscal_folder_name``. On
    success the Drive references and upload time are saved on the record.

    Returns the upload result, or ``None`` when the connection, the file read
    or the upload fails, or when there is no content to upload.
    """
    already_uploaded = bool(record.drive_file_id and record.drive_web_url)
    if already_uploaded:
        return {
            "file_id": record.drive_file_id,
            "web_url": record.drive_web_url,
        }

    record = ensure_fiscal_document_display_name(record)
    try:
        connection = await get_active_google_workspace_connection(session)
    except Exception:
        return None

    payload = raw_bytes
    if payload is None:
        try:
            payload = Path(record.storage_path).read_bytes()
        except OSError:
            return None
    if not payload:
        return None

    settings = get_settings()
    try:
        async with httpx.AsyncClient(timeout=settings.google_workspace_request_timeout_seconds) as client:
            upload_result = await upload_binary_file_to_drive(
                client,
                access_token=connection.access_token,
                filename=record.display_name,
                mime_type=record.mime_type,
                raw_bytes=payload,
                folder_reference=_drive_fiscal_folder_name(
                    record.document_type,
                    record.document_date,
                    record.supplier_name,
                ),
            )
    except Exception:
        # Best effort: a failed upload leaves the record without Drive data.
        return None

    uploaded_at = datetime.now(timezone.utc).isoformat()
    get_tenant_store().update_fiscal_document(
        record.tenant_id,
        record.id,
        display_name=record.display_name,
        drive_file_id=upload_result.get("file_id"),
        drive_web_url=upload_result.get("web_url"),
        drive_uploaded_at=uploaded_at,
    )
    return upload_result


def _extract_fiscal_document_text(storage_path: Path, *, kind: str, display_name: str) -> str:
    """Extract the text of a fiscal document, preferring Document AI.

    For ``"pdf"`` and ``"image"`` assets the file is sent to Document AI and
    its normalized text is returned when non-empty. In every other case
    (other kinds, unreadable or empty file, failed call, empty text) the
    result of ``extract_menu_asset_text`` is returned.
    """
    mime_type = resolve_asset_mime_type(display_name, None)

    def document_ai_text() -> str:
        if kind not in {"pdf", "image"}:
            return ""
        try:
            payload = storage_path.read_bytes()
        except OSError:
            return ""
        if not payload:
            return ""
        try:
            response = process_document_with_document_ai(
                raw_bytes=payload,
                mime_type=mime_type,
            )
        except Exception:
            return ""
        if response is None:
            return ""
        return normalize_text_block(str(response.get("text") or ""))

    return document_ai_text() or extract_menu_asset_text(storage_path, kind)


def _build_google_first_context(storage_path: Path, *, kind: str, display_name: str) -> dict[str, object]:
    """Collect the Document AI results used by the fiscal analysis.

    For readable, non-empty ``"pdf"`` / ``"image"`` files the document is run
    through Document AI OCR and, when applicable, through the invoice
    processor. The text falls back to ``extract_menu_asset_text`` when OCR
    gave none.

    Returns a dict with ``extracted_text``, ``ocr_document`` and
    ``invoice_document``; each document entry is the ``"document"`` value of
    the corresponding result, or ``{}`` when that result is not a dict.
    """
    mime_type = resolve_asset_mime_type(display_name, None)

    payload = b""
    if kind in {"pdf", "image"}:
        try:
            payload = storage_path.read_bytes()
        except OSError:
            payload = b""

    ocr_response: dict[str, object] | None = None
    invoice_response: dict[str, object] | None = None
    text = ""
    if payload:
        try:
            ocr_response = process_document_with_document_ai(
                raw_bytes=payload,
                mime_type=mime_type,
            )
        except Exception:
            ocr_response = None
        text = normalize_text_block(str((ocr_response or {}).get("text") or ""))
        invoice_response = _maybe_process_invoice_with_document_ai(
            raw_bytes=payload,
            mime_type=mime_type,
            display_name=display_name,
            ocr_text=text,
        )

    if not text:
        text = extract_menu_asset_text(storage_path, kind)

    def document_of(response: dict[str, object] | None) -> object:
        return response.get("document") if isinstance(response, dict) else {}

    return {
        "extracted_text": text,
        "ocr_document": document_of(ocr_response),
        "invoice_document": document_of(invoice_response),
    }


def _analyze_fiscal_document_file(storage_path: Path, *, kind: str, display_name: str) -> dict[str, object]:
    """Extract the fiscal metadata and line items of a stored document.

    The text and Document AI payloads come from ``_build_google_first_context``.
    Each field is then resolved from the Document AI invoice fields and/or the
    text-based extractors, with the precedence noted inline.

    Returns a dict with the keys ``extracted_text``, ``document_type``,
    ``document_number``, ``document_date``, ``supplier_name``,
    ``total_amount``, ``summary_text``, ``preview_text`` and ``parsed_items``.
    """
    settings = get_settings()
    google_context = _build_google_first_context(storage_path, kind=kind, display_name=display_name)

    # Normalise once so every later use can rely on a dict.
    raw_ocr_document = google_context.get("ocr_document")
    ocr_document = raw_ocr_document if isinstance(raw_ocr_document, dict) else {}

    extracted_text = normalize_text_block(str(google_context.get("extracted_text") or ""))
    cleaned_text = truncate_text(extracted_text, settings.menu_asset_extracted_text_max_chars)
    parsing_text = _sanitize_extracted_text_for_fiscal_parsing(cleaned_text)
    preview_text = truncate_text(
        _build_google_ocr_preview_text(ocr_document),
        settings.menu_asset_extracted_text_max_chars,
    )
    # The visual header extractor is only consulted when no text is available at all.
    header_text = cleaned_text or _extract_visual_header_text(storage_path, kind)
    source_text = parsing_text or cleaned_text

    raw_invoice_document = google_context.get("invoice_document")
    invoice_fields = _extract_invoice_fields_from_document_ai(
        raw_invoice_document if isinstance(raw_invoice_document, dict) else {}
    )

    # Document type: a text classification of instant invoice / delivery note
    # wins over whatever the invoice fields report.
    classified_document_type = _classify_document_type(source_text, display_name)
    invoice_document_type = invoice_fields.get("document_type")
    if not isinstance(invoice_document_type, str):
        invoice_document_type = None
    if classified_document_type == "instant_invoice":
        document_type = "instant_invoice"
    elif classified_document_type == "delivery_note":
        document_type = "delivery_note"
    else:
        document_type = str(invoice_document_type or classified_document_type)

    # Document number: invoice field when it looks like a number, else text extraction.
    extracted_document_number = _extract_document_number(f"{preview_text}\n{source_text}", document_type, display_name)
    invoice_document_number = invoice_fields.get("document_number")
    if not isinstance(invoice_document_number, str):
        invoice_document_number = None
    document_number = (
        invoice_document_number
        if _looks_like_document_number(invoice_document_number)
        else extracted_document_number
    )

    # Document date: invoice field when it parses and is plausible, else text extraction.
    invoice_document_date = invoice_fields.get("document_date")
    if not isinstance(invoice_document_date, str):
        invoice_document_date = None
    parsed_invoice_document_date = _parse_iso_or_italian_date(invoice_document_date)
    document_date = (
        parsed_invoice_document_date
        if parsed_invoice_document_date and _is_plausible_fiscal_document_date(parsed_invoice_document_date)
        else _extract_document_date(source_text, display_name)
    )

    # Supplier: known supplier found in the text, then the invoice field, then
    # the OCR/text/header extractors.
    known_supplier_name = _extract_known_supplier_name_from_text(f"{preview_text}\n{source_text}\n{header_text}")
    invoice_supplier_name = invoice_fields.get("supplier_name")
    if known_supplier_name:
        supplier_name = known_supplier_name
    elif isinstance(invoice_supplier_name, str) and invoice_supplier_name.strip():
        # A blank invoice field must not short-circuit the fallbacks below,
        # otherwise the document ends up with an empty supplier name.
        supplier_name = invoice_supplier_name
    else:
        supplier_name = (
            _extract_supplier_name_from_ocr_document(ocr_document)
            or _extract_supplier_name(source_text)
            or _extract_supplier_name(header_text)
        )

    # Total: the text-extracted amount wins; the invoice field is the fallback.
    extracted_total_amount = _extract_total_amount(f"{preview_text}\n{source_text}")
    invoice_total_amount = (
        float(invoice_fields["total_amount"])
        if invoice_fields.get("total_amount") is not None
        else None
    )
    total_amount = extracted_total_amount if extracted_total_amount is not None else invoice_total_amount

    # Line items are only parsed for document types that can be reconciled
    # with orders: supplier-specific parser, then OCR layout, then plain text.
    parsed_items: list[dict[str, object]] = []
    if document_type in _ORDER_RECONCILABLE_DOCUMENT_TYPES:
        parsed_items = _extract_supplier_specific_delivery_note_items(
            supplier_name,
            preview_text=preview_text,
            source_text=source_text,
        )
        if not parsed_items:
            parsed_items = _extract_delivery_note_items_from_ocr_document(ocr_document)
        if not parsed_items:
            parsed_items = _extract_delivery_note_items_from_text(preview_text or source_text)
    parsed_items = [_enrich_document_item_pricing(item) for item in parsed_items if isinstance(item, dict)]

    summary_text = _build_summary(
        document_type=document_type,
        document_number=document_number,
        document_date=document_date,
        supplier_name=supplier_name,
        total_amount=total_amount,
        currency="EUR",
    )
    return {
        "extracted_text": cleaned_text,
        "document_type": document_type,
        "document_number": document_number,
        "document_date": document_date,
        "supplier_name": supplier_name,
        "total_amount": total_amount,
        "summary_text": summary_text,
        "preview_text": preview_text,
        "parsed_items": parsed_items,
    }


def find_existing_fiscal_document_by_hash(
    session: SessionIdentity,
    *,
    file_hash: str,
) -> FiscalDocumentRecord | None:
    """Look up a tenant's fiscal document whose content hash equals ``file_hash``.

    The store's hash lookup is tried first. If it finds nothing, every
    document of the tenant that has no stored hash is hashed from its file on
    disk; the first one whose SHA-256 matches gets ``file_hash`` written to
    its record and the result of that update is returned. Records whose file
    is missing, is not a regular file, or cannot be read are skipped.

    Returns ``None`` when no document matches.
    """
    store = get_tenant_store()
    tenant_id = session.tenant_id

    indexed_match = store.get_fiscal_document_by_hash(tenant_id, file_hash)
    if indexed_match is not None:
        return indexed_match

    for record in store.list_fiscal_documents(tenant_id):
        # Records that already carry a hash were covered by the lookup above.
        if record.file_hash:
            continue
        stored_file = Path(record.storage_path)
        if not (stored_file.exists() and stored_file.is_file()):
            continue
        try:
            digest = hashlib.sha256(stored_file.read_bytes()).hexdigest()
        except OSError:
            continue
        if digest == file_hash:
            # Persist the hash so the store lookup finds this record directly.
            return store.update_fiscal_document(tenant_id, record.id, file_hash=file_hash)
    return None


async def ingest_fiscal_document(session: SessionIdentity, upload: UploadFile) -> FiscalDocumentRecord:
    """Ingest an uploaded fiscal document and pass it on to the Drive upload.

    Reads the whole upload, runs ``ingest_fiscal_document_bytes`` on it
    (using ``"documento-fiscale"`` when the upload has no filename), then
    awaits ``upload_fiscal_document_to_drive`` with the resulting record and
    the same bytes.

    Returns the record as re-read from the tenant store after the Drive step,
    or the record produced by the ingestion when the store returns nothing.
    """
    payload = await upload.read()
    filename = upload.filename or "documento-fiscale"
    ingested = ingest_fiscal_document_bytes(
        session,
        filename=filename,
        content_type=upload.content_type,
        raw_bytes=payload,
    )
    await upload_fiscal_document_to_drive(session, ingested, raw_bytes=payload)
    # NOTE(review): presumably the Drive step can modify the stored record, hence the re-read — confirm.
    latest = get_tenant_store().get_fiscal_document(session.tenant_id, ingested.id)
    return latest or ingested


def ingest_fiscal_document_bytes(
    session: SessionIdentity,
    *,
    filename: str,
    content_type: str | None,
    raw_bytes: bytes,
) -> FiscalDocumentRecord:
    """Store, analyse and reconcile a fiscal document given as raw bytes.

    Steps:

    1. If a document with the same SHA-256 already exists for the tenant, it
       is returned (after ``ensure_fiscal_document_display_name``) and nothing
       else happens. This check runs before the validation below.
    2. The upload is validated, written to the tenant's fiscal documents
       directory and registered with ``status="processing"``.
    3. The file is analysed; on success the record is updated to
       ``status="ready"``, its items are replaced, and an order match is
       attempted. A discrepancy push notification is sent when applicable.
    4. If analysis or reconciliation raises, the record is updated to
       ``status="error"`` with the (truncated) exception text.

    The discrepancy notification is best-effort: a failure while sending it is
    logged and does not change the state of the document.

    Args:
        session: Identity providing the tenant the document belongs to.
        filename: Original file name; normalised into the display name.
        content_type: Declared MIME type of the upload, if any.
        raw_bytes: File content.

    Returns:
        The existing, updated, or error-marked fiscal document record.

    Raises:
        HTTPException: 415 when the resolved kind is ``"other"``, 400 when
            ``raw_bytes`` is empty, 413 when it exceeds
            ``settings.menu_asset_max_upload_bytes``.
    """
    import logging

    settings = get_settings()
    store = get_tenant_store()

    # Deduplicate by content hash before doing any validation or I/O.
    file_hash = hashlib.sha256(raw_bytes).hexdigest() if raw_bytes else None
    if file_hash:
        existing = find_existing_fiscal_document_by_hash(session, file_hash=file_hash)
        if existing is not None:
            return ensure_fiscal_document_display_name(existing)

    display_name = normalize_asset_name(filename, fallback="documento-fiscale")
    mime_type = resolve_asset_mime_type(display_name, content_type)
    kind = resolve_asset_kind(display_name, mime_type)

    if kind == "other":
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail=f"Formato non supportato per {display_name}. Carica PDF, immagini o testo.",
        )

    if not raw_bytes:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"{display_name}: file vuoto.")
    if len(raw_bytes) > settings.menu_asset_max_upload_bytes:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"{display_name}: supera il limite di {settings.menu_asset_max_upload_bytes // (1024 * 1024)} MB.",
        )

    # Persist the file under a generated id, keeping the original extension when there is one.
    document_id = f"fdoc_{uuid.uuid4().hex}"
    suffix = Path(display_name).suffix.lower() or (".pdf" if kind == "pdf" else ".txt")
    storage_dir = store.fiscal_documents_directory(session.tenant_id)
    storage_path = storage_dir / f"{document_id}{suffix}"
    storage_path.write_bytes(raw_bytes)

    record = store.create_fiscal_document(
        document_id=document_id,
        tenant_id=session.tenant_id,
        original_name=display_name,
        display_name=display_name,
        mime_type=mime_type,
        kind=kind,
        file_size_bytes=len(raw_bytes),
        file_hash=file_hash,
        storage_path=str(storage_path),
        document_type="unknown",
        status="processing",
        matching_status="not_started",
        review_status="to_review",
    )

    # Kept outside the try so the error handler can still store whatever text was extracted.
    extracted_text = ""
    try:
        analysis = _analyze_fiscal_document_file(storage_path, kind=kind, display_name=display_name)
        extracted_text = str(analysis["extracted_text"] or "")
        analyzed_document_type = str(analysis["document_type"])
        analyzed_document_date = analysis["document_date"] if isinstance(analysis["document_date"], str) else None
        analyzed_supplier_name = analysis["supplier_name"] if isinstance(analysis["supplier_name"], str) else None
        fiscal_display_name = build_fiscal_document_display_name(
            document_type=analyzed_document_type,
            supplier_name=analyzed_supplier_name,
            document_date=analyzed_document_date,
            original_name=display_name,
            fallback_date=datetime.now(timezone.utc).date().isoformat(),
        )
        updated = store.update_fiscal_document(
            session.tenant_id,
            document_id,
            display_name=fiscal_display_name,
            document_type=analyzed_document_type,
            document_number=analysis["document_number"] if isinstance(analysis["document_number"], str) else None,
            document_date=analyzed_document_date,
            supplier_name=analyzed_supplier_name,
            total_amount=float(analysis["total_amount"]) if analysis["total_amount"] is not None else None,
            currency="EUR",
            summary_text=str(analysis["summary_text"] or ""),
            extracted_text=extracted_text,
            preview_text=str(analysis["preview_text"] or ""),
            status="ready",
            matching_status="pending",
            review_status="to_review",
        )
        parsed_items = analysis.get("parsed_items")
        if isinstance(parsed_items, list):
            stored_items = store.replace_fiscal_document_items(
                session.tenant_id,
                document_id,
                [item for item in parsed_items if isinstance(item, dict)],
            )
            order_match = build_fiscal_document_order_match(
                session,
                supplier_name=updated.supplier_name,
                document_date=updated.document_date,
                document_type=updated.document_type,
                document_items=_fiscal_document_items_for_order_match(stored_items),
            )
            if isinstance(order_match, dict) and order_match.get("status") == "matched":
                updated = store.update_fiscal_document(
                    session.tenant_id,
                    document_id,
                    matching_status="ready_for_match",
                )
                mark_order_batch_fiscal_document_match(
                    session,
                    document_id=document_id,
                    order_match=order_match,
                )
                if _fiscal_order_match_is_exact_with_prices(order_match):
                    apply_delivery_note_storno_to_matched_order(
                        session,
                        document_id=document_id,
                        order_match=order_match,
                    )
                    # Pick up whatever the storno step wrote to the record.
                    refreshed = store.get_fiscal_document(session.tenant_id, document_id)
                    if refreshed is not None:
                        updated = refreshed
            if _fiscal_order_match_has_discrepancies(order_match):
                # The notification is a side channel: if it fails, the document
                # has still been analysed and must not be downgraded to "error"
                # by the outer handler.
                try:
                    from app.services.push_notification_service import send_fiscal_document_discrepancy_notification

                    send_fiscal_document_discrepancy_notification(session, updated, order_match or {})
                except Exception:
                    logging.getLogger(__name__).exception(
                        "Fiscal document %s: discrepancy push notification failed",
                        document_id,
                    )
        return updated
    except Exception as exc:
        detail = normalize_text_block(str(exc)) or "Analisi documento fiscale non riuscita."
        return store.update_fiscal_document(
            session.tenant_id,
            document_id,
            extracted_text=truncate_text(extracted_text, settings.menu_asset_extracted_text_max_chars),
            summary_text="Documento fiscale caricato ma non ancora leggibile in modo affidabile.",
            status="error",
            error_detail=truncate_text(detail, 500),
            matching_status="not_started",
            review_status="to_review",
        )
