from __future__ import annotations

import asyncio
import base64
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
import hashlib
import io
import re
import time
import uuid

import httpx
from fastapi import HTTPException
from PIL import Image, ImageOps

from app.core.config import get_settings
from app.services.fiscal_document_service import (
    find_existing_fiscal_document_by_hash,
    ingest_fiscal_document_bytes,
    upload_fiscal_document_to_drive,
)
from app.services.google_workspace_session import ensure_google_workspace_configured, get_active_google_workspace_connection
from app.services.google_workspace_store import get_google_workspace_store
from app.services.menu_asset_service import normalize_asset_name, resolve_asset_kind, resolve_asset_mime_type
from app.services.tenant_store import FiscalDocumentInboxItemRecord, SessionIdentity, get_tenant_store


GMAIL_READONLY_SCOPE = "https://www.googleapis.com/auth/gmail.readonly"
GMAIL_API_BASE = "https://gmail.googleapis.com/gmail/v1/users/me"
_FISCAL_KEYWORDS = (
    "fattura",
    "invoice",
    "bolla",
    "ddt",
    "d.d.t",
    "documento di trasporto",
    "nota di credito",
)

_MAILBOX_SYNC_LOCKS: dict[str, asyncio.Lock] = {}
_MAILBOX_RATE_LIMIT_UNTIL: dict[str, float] = {}
_MAILBOX_RATE_LIMIT_COOLDOWN_SECONDS = 600
_GROUPABLE_IMAGE_MIME_TYPES = {
    "image/jpeg",
    "image/jpg",
    "image/png",
    "image/webp",
    "image/tiff",
    "image/bmp",
}


def _drive_fiscal_folder_name(document_type: str | None = None) -> str:
    root = f"Documenti fiscali {datetime.now(timezone.utc).year}"
    normalized_type = (document_type or "").strip().lower()
    if normalized_type == "invoice":
        return f"{root}/Fatture"
    if normalized_type == "delivery_note":
        return f"{root}/Bolle"
    return root


def _decode_gmail_base64(value: str) -> bytes:
    padded = value + "=" * (-len(value) % 4)
    return base64.urlsafe_b64decode(padded.encode("utf-8"))


def _natural_sort_key(value: str | None) -> list[object]:
    return [
        int(part) if part.isdigit() else part.casefold()
        for part in re.split(r"(\d+)", (value or "").strip())
    ]


def _is_groupable_image_attachment(attachment: dict[str, str]) -> bool:
    mime_type = (attachment.get("resolved_mime_type") or "").split(";", 1)[0].strip().lower()
    return attachment.get("resolved_kind") == "image" and mime_type in _GROUPABLE_IMAGE_MIME_TYPES


def _grouped_image_attachment_id(attachments: list[dict[str, str]]) -> str:
    raw_key = "\n".join(
        f"{attachment.get('attachment_id') or ''}|{attachment.get('attachment_name') or attachment.get('filename') or ''}"
        for attachment in attachments
    )
    digest = hashlib.sha1(raw_key.encode("utf-8")).hexdigest()[:24]
    return f"group:images:{digest}"


def _grouped_image_attachment_name(subject: str | None) -> str:
    normalized_subject = normalize_asset_name(subject or "documento-fiscale", fallback="documento-fiscale")
    stem = re.sub(r"\.[A-Za-z0-9]{1,8}$", "", normalized_subject).strip(" ._-")
    return f"{stem or 'documento-fiscale'}-multipagina.pdf"


def _images_to_pdf_bytes(images: list[tuple[str, bytes]]) -> bytes:
    pdf_pages: list[Image.Image] = []
    try:
        for filename, raw_bytes in images:
            if not raw_bytes:
                raise ValueError(f"Allegato immagine vuoto: {filename}")
            with Image.open(io.BytesIO(raw_bytes)) as image:
                page = ImageOps.exif_transpose(image).convert("RGB")
                pdf_pages.append(page.copy())

        if not pdf_pages:
            raise ValueError("Nessuna pagina immagine disponibile.")

        output = io.BytesIO()
        pdf_pages[0].save(output, format="PDF", save_all=True, append_images=pdf_pages[1:])
        return output.getvalue()
    finally:
        for page in pdf_pages:
            page.close()


def _mailbox_sync_lock(key: str) -> asyncio.Lock:
    normalized = key.strip().lower() or "__default__"
    lock = _MAILBOX_SYNC_LOCKS.get(normalized)
    if lock is None:
        lock = asyncio.Lock()
        _MAILBOX_SYNC_LOCKS[normalized] = lock
    return lock


def _completed_inbox_item(item: FiscalDocumentInboxItemRecord | None) -> bool:
    if item is None:
        return False
    status = (item.sync_status or "").strip()
    if status == "unsupported":
        return True
    return status == "imported" and bool((item.document_id or "").strip() or (item.file_hash or "").strip())


def _extract_header_map(payload: dict[str, object]) -> dict[str, str]:
    headers: dict[str, str] = {}
    for item in payload.get("headers", []) if isinstance(payload.get("headers"), list) else []:
        if not isinstance(item, dict):
            continue
        name = str(item.get("name") or "").strip().lower()
        value = str(item.get("value") or "").strip()
        if name and value and name not in headers:
            headers[name] = value
    return headers


def _parse_received_at(header_map: dict[str, str], internal_date: object) -> str | None:
    raw_date = header_map.get("date")
    if raw_date:
        try:
            parsed = parsedate_to_datetime(raw_date)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.astimezone(timezone.utc).isoformat()
        except (TypeError, ValueError, IndexError):
            pass

    if isinstance(internal_date, str) and internal_date.isdigit():
        return datetime.fromtimestamp(int(internal_date) / 1000, tz=timezone.utc).isoformat()
    return None


def _normalize_text(value: str | None) -> str:
    return re.sub(r"\s+", " ", (value or "").strip().casefold())


def _mailbox_looks_dedicated(inbound_email: str) -> bool:
    local_part = inbound_email.partition("@")[0].strip().casefold()
    return any(
        keyword in local_part
        for keyword in ("invoice", "fattur", "document", "bolla", "ddt", "fornitor")
    )


def _looks_like_fiscal_email(subject: str, snippet: str, attachments: list[dict[str, str]]) -> bool:
    searchable = " ".join([_normalize_text(subject), _normalize_text(snippet)])
    if any(keyword in searchable for keyword in _FISCAL_KEYWORDS):
        return True

    for attachment in attachments:
        candidate = _normalize_text(attachment.get("filename"))
        if any(keyword in candidate for keyword in _FISCAL_KEYWORDS):
            return True
    return False


def _compute_after_timestamp(connection_started_at: str, existing_items: list[FiscalDocumentInboxItemRecord]) -> int:
    try:
        connection_started = datetime.fromisoformat(connection_started_at.replace("Z", "+00:00"))
    except ValueError:
        connection_started = datetime.now(timezone.utc)

    completed_timestamps: list[datetime] = []
    for item in existing_items:
        if not _completed_inbox_item(item):
            continue
        if not item.received_at:
            continue
        try:
            completed_timestamps.append(datetime.fromisoformat(item.received_at.replace("Z", "+00:00")))
        except ValueError:
            continue

    if completed_timestamps:
        return max(0, int(max(completed_timestamps).timestamp()) + 1)
    return max(0, int(connection_started.timestamp()) - 60)


def _collect_attachment_parts(part: dict[str, object]) -> list[dict[str, str]]:
    attachments: list[dict[str, str]] = []
    filename = str(part.get("filename") or "").strip()
    mime_type = str(part.get("mimeType") or "application/octet-stream").strip() or "application/octet-stream"
    part_id = str(part.get("partId") or "").strip()
    body = part.get("body") if isinstance(part.get("body"), dict) else {}
    attachment_id = str(body.get("attachmentId") or "").strip() if isinstance(body, dict) else ""
    inline_data = str(body.get("data") or "").strip() if isinstance(body, dict) else ""

    if filename and (attachment_id or inline_data):
        attachments.append(
            {
                "filename": filename,
                "mime_type": mime_type,
                "attachment_id": attachment_id or f"inline:{part_id or filename}",
                "inline_data": inline_data,
            }
        )

    raw_parts = part.get("parts")
    if isinstance(raw_parts, list):
        for child in raw_parts:
            if isinstance(child, dict):
                attachments.extend(_collect_attachment_parts(child))
    return attachments


async def _gmail_get_json(
    client: httpx.AsyncClient,
    *,
    access_token: str,
    path: str,
    params: dict[str, object] | None = None,
) -> dict[str, object]:
    last_response: httpx.Response | None = None
    for attempt in range(2):
        response = await client.get(
            f"{GMAIL_API_BASE}{path}",
            headers={"Authorization": f"Bearer {access_token}"},
            params=params,
        )
        last_response = response
        if response.status_code != 429:
            break
        retry_after_raw = response.headers.get("Retry-After", "").strip()
        try:
            retry_after = int(retry_after_raw)
        except (TypeError, ValueError):
            retry_after = 3
        if attempt == 0:
            await asyncio.sleep(max(1, min(retry_after, 12)))
            continue
    if last_response is None:
        raise HTTPException(status_code=502, detail="Risposta Gmail non disponibile.")
    try:
        last_response.raise_for_status()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code == 429:
            raise HTTPException(
                status_code=429,
                detail=(
                    "Gmail sta limitando temporaneamente la sincronizzazione della mailbox. "
                    "Attendi circa 10 minuti e riprova. Se stai usando la stessa casella in piu tenant, "
                    "conviene separarla per evitare conflitti."
                ),
            ) from exc
        raise
    payload = last_response.json()
    return payload if isinstance(payload, dict) else {}


async def _download_attachment_bytes(
    client: httpx.AsyncClient,
    *,
    access_token: str,
    message_id: str,
    attachment_id: str,
    inline_data: str,
) -> bytes:
    if inline_data:
        return _decode_gmail_base64(inline_data)

    payload = await _gmail_get_json(
        client,
        access_token=access_token,
        path=f"/messages/{message_id}/attachments/{attachment_id}",
    )
    data = str(payload.get("data") or "").strip()
    if not data:
        raise HTTPException(status_code=502, detail="Allegato Gmail privo di contenuto.")
    return _decode_gmail_base64(data)


async def sync_fiscal_document_inbox(
    session: SessionIdentity,
    *,
    max_messages: int = 15,
) -> list[FiscalDocumentInboxItemRecord]:
    ensure_google_workspace_configured()
    store = get_tenant_store()
    settings = store.get_fiscal_document_settings(session.tenant_id)
    inbound_email = (settings.inbound_email or "").strip().lower()
    if not inbound_email:
        raise HTTPException(status_code=409, detail="Configura prima la mailbox del locale.")

    connection = await get_active_google_workspace_connection(session)
    granted_scopes = {item.strip() for item in connection.scope.split() if item.strip()}
    if GMAIL_READONLY_SCOPE not in granted_scopes:
        raise HTTPException(
            status_code=409,
            detail="Permesso Gmail non disponibile. Ricollega Google e concedi l'accesso in sola lettura alla mailbox.",
        )

    if connection.account_email and inbound_email != connection.account_email.strip().lower():
        raise HTTPException(
            status_code=409,
            detail="La mailbox configurata non coincide con l'account Google collegato.",
        )

    mailbox_key = (connection.account_email or inbound_email or session.tenant_id).strip().lower()
    async with _mailbox_sync_lock(mailbox_key):
        cooldown_until = _MAILBOX_RATE_LIMIT_UNTIL.get(mailbox_key, 0.0)
        now = time.monotonic()
        if cooldown_until > now:
            wait_seconds = max(1, int(cooldown_until - now))
            raise HTTPException(
                status_code=429,
                detail=f"Gmail ha appena limitato questa mailbox. Riprova tra circa {wait_seconds} secondi.",
            )

        dedicated_mailbox = _mailbox_looks_dedicated(inbound_email)
        existing_items = store.list_fiscal_document_inbox_items(session.tenant_id, limit=200)
        after_timestamp = _compute_after_timestamp(connection.connected_at, existing_items)
        query = f"in:inbox has:attachment after:{after_timestamp}"
        synced_items: list[FiscalDocumentInboxItemRecord] = []
        known_hash_items = {
            (item.file_hash or "").strip(): item
            for item in existing_items
            if (item.file_hash or "").strip()
        }

        def save_inbox_item(
            *,
            retry_item: FiscalDocumentInboxItemRecord | None,
            message_id: str,
            attachment_id: str,
            file_hash: str | None,
            subject: str,
            sender: str | None,
            received_at: str | None,
            attachment_name: str,
            mime_type: str,
            sync_status: str,
            document_id: str | None = None,
            error_detail: str | None = None,
        ) -> FiscalDocumentInboxItemRecord:
            if retry_item is not None:
                return store.update_fiscal_document_inbox_item(
                    session.tenant_id,
                    message_id=retry_item.message_id,
                    attachment_id=retry_item.attachment_id,
                    file_hash=file_hash,
                    subject=subject,
                    sender=sender,
                    received_at=received_at,
                    attachment_name=attachment_name,
                    mime_type=mime_type,
                    sync_status=sync_status,
                    document_id=document_id,
                    error_detail=error_detail,
                )

            return store.create_fiscal_document_inbox_item(
                item_id=f"finbox_{uuid.uuid4().hex}",
                tenant_id=session.tenant_id,
                message_id=message_id,
                attachment_id=attachment_id,
                file_hash=file_hash,
                subject=subject,
                sender=sender,
                received_at=received_at,
                attachment_name=attachment_name,
                mime_type=mime_type,
                sync_status=sync_status,
                document_id=document_id,
                error_detail=error_detail,
            )

        async def import_document_payload(
            *,
            retry_item: FiscalDocumentInboxItemRecord | None,
            message_id: str,
            attachment_id: str,
            subject: str,
            sender: str | None,
            received_at: str | None,
            attachment_name: str,
            mime_type: str,
            raw_bytes: bytes,
        ) -> FiscalDocumentInboxItemRecord:
            file_hash = hashlib.sha256(raw_bytes).hexdigest()
            existing_hash_item = known_hash_items.get(file_hash) or store.get_fiscal_document_inbox_item_by_hash(
                session.tenant_id,
                file_hash,
            )
            if existing_hash_item is not None:
                if retry_item is not None:
                    item = save_inbox_item(
                        retry_item=retry_item,
                        message_id=message_id,
                        attachment_id=attachment_id,
                        file_hash=file_hash,
                        subject=subject,
                        sender=sender,
                        received_at=received_at,
                        attachment_name=attachment_name,
                        mime_type=mime_type,
                        sync_status=existing_hash_item.sync_status,
                        document_id=existing_hash_item.document_id or "",
                        error_detail="",
                    )
                else:
                    item = existing_hash_item
                known_hash_items[file_hash] = item
                return item

            existing_document = find_existing_fiscal_document_by_hash(session, file_hash=file_hash)
            if existing_document is not None:
                await upload_fiscal_document_to_drive(session, existing_document, raw_bytes=raw_bytes)
                item = save_inbox_item(
                    retry_item=retry_item,
                    message_id=message_id,
                    attachment_id=attachment_id,
                    file_hash=file_hash,
                    subject=subject,
                    sender=sender,
                    received_at=received_at,
                    attachment_name=attachment_name,
                    mime_type=mime_type,
                    sync_status="imported",
                    document_id=existing_document.id,
                    error_detail="",
                )
                known_hash_items[file_hash] = item
                return item

            document = ingest_fiscal_document_bytes(
                session,
                filename=attachment_name,
                content_type=mime_type,
                raw_bytes=raw_bytes,
            )
            await upload_fiscal_document_to_drive(session, document, raw_bytes=raw_bytes)
            item = save_inbox_item(
                retry_item=retry_item,
                message_id=message_id,
                attachment_id=attachment_id,
                file_hash=file_hash,
                subject=subject,
                sender=sender,
                received_at=received_at,
                attachment_name=attachment_name,
                mime_type=mime_type,
                sync_status="imported",
                document_id=document.id,
                error_detail="",
            )
            known_hash_items[file_hash] = item
            return item

        def record_inbox_error(
            *,
            retry_item: FiscalDocumentInboxItemRecord | None,
            message_id: str,
            attachment_id: str,
            file_hash: str | None,
            subject: str,
            sender: str | None,
            received_at: str | None,
            attachment_name: str,
            mime_type: str,
            exc: Exception,
        ) -> FiscalDocumentInboxItemRecord:
            return save_inbox_item(
                retry_item=retry_item,
                message_id=message_id,
                attachment_id=attachment_id,
                file_hash=file_hash,
                subject=subject,
                sender=sender,
                received_at=received_at,
                attachment_name=attachment_name,
                mime_type=mime_type,
                sync_status="error",
                error_detail=str(exc)[:500],
            )

        client_timeout = get_settings().google_workspace_request_timeout_seconds

        async def gmail_get_json(
            client: httpx.AsyncClient,
            *,
            access_token: str,
            path: str,
            params: dict[str, object] | None = None,
        ) -> dict[str, object]:
            try:
                return await _gmail_get_json(client, access_token=access_token, path=path, params=params)
            except HTTPException as exc:
                if exc.status_code == 429:
                    _MAILBOX_RATE_LIMIT_UNTIL[mailbox_key] = time.monotonic() + _MAILBOX_RATE_LIMIT_COOLDOWN_SECONDS
                raise

        async with httpx.AsyncClient(timeout=client_timeout) as client:
            payload = await gmail_get_json(
                client,
                access_token=connection.access_token,
                path="/messages",
                params={"maxResults": max(1, min(max_messages, 50)), "q": query},
            )
            raw_messages = payload.get("messages", []) if isinstance(payload.get("messages"), list) else []

            for raw_message in raw_messages:
                if not isinstance(raw_message, dict):
                    continue
                message_id = str(raw_message.get("id") or "").strip()
                if not message_id:
                    continue

                message_payload = await gmail_get_json(
                    client,
                    access_token=connection.access_token,
                    path=f"/messages/{message_id}",
                    params={"format": "full"},
                )
                payload_root = message_payload.get("payload") if isinstance(message_payload.get("payload"), dict) else {}
                header_map = _extract_header_map(payload_root)
                subject = header_map.get("subject") or "Documento fornitori"
                sender = header_map.get("from")
                received_at = _parse_received_at(header_map, message_payload.get("internalDate"))
                snippet = str(message_payload.get("snippet") or "")
                attachment_parts = _collect_attachment_parts(payload_root)
                if not attachment_parts:
                    continue

                normalized_attachments: list[dict[str, str]] = []
                has_supported_attachment = False
                for attachment in attachment_parts:
                    attachment_name = normalize_asset_name(attachment["filename"], fallback="documento-fiscale")
                    mime_type = resolve_asset_mime_type(attachment_name, attachment["mime_type"])
                    kind = resolve_asset_kind(attachment_name, mime_type)
                    if kind != "other":
                        has_supported_attachment = True
                    normalized_attachments.append(
                        {
                            **attachment,
                            "attachment_name": attachment_name,
                            "resolved_mime_type": mime_type,
                            "resolved_kind": kind,
                        }
                    )

                if not has_supported_attachment:
                    continue

                if not dedicated_mailbox and not _looks_like_fiscal_email(subject, snippet, normalized_attachments):
                    continue

                image_attachments = sorted(
                    [attachment for attachment in normalized_attachments if _is_groupable_image_attachment(attachment)],
                    key=lambda attachment: _natural_sort_key(
                        attachment.get("attachment_name") or attachment.get("filename") or ""
                    ),
                )
                grouped_attachment_ids: set[str] = set()
                if len(image_attachments) >= 2 and len(image_attachments) == len(normalized_attachments):
                    group_attachment_id = _grouped_image_attachment_id(image_attachments)
                    group_attachment_name = _grouped_image_attachment_name(subject)
                    group_mime_type = "application/pdf"
                    grouped_attachment_ids = {attachment["attachment_id"] for attachment in image_attachments}
                    existing = store.get_fiscal_document_inbox_item(
                        session.tenant_id,
                        message_id=message_id,
                        attachment_id=group_attachment_id,
                    )
                    stable_existing = store.get_fiscal_document_inbox_item_by_message_attachment_name(
                        session.tenant_id,
                        message_id=message_id,
                        attachment_name=group_attachment_name,
                    )
                    completed_existing = stable_existing if _completed_inbox_item(stable_existing) else None
                    if completed_existing is None and _completed_inbox_item(existing):
                        completed_existing = existing

                    if completed_existing is not None:
                        synced_items.append(completed_existing)
                    else:
                        retry_item = existing or stable_existing
                        group_hash: str | None = None
                        try:
                            image_payloads: list[tuple[str, bytes]] = []
                            for attachment in image_attachments:
                                raw_bytes = await _download_attachment_bytes(
                                    client,
                                    access_token=connection.access_token,
                                    message_id=message_id,
                                    attachment_id=attachment["attachment_id"],
                                    inline_data=attachment["inline_data"],
                                )
                                image_payloads.append((attachment["attachment_name"], raw_bytes))

                            pdf_bytes = _images_to_pdf_bytes(image_payloads)
                            group_hash = hashlib.sha256(pdf_bytes).hexdigest()
                            item = await import_document_payload(
                                retry_item=retry_item,
                                message_id=message_id,
                                attachment_id=group_attachment_id,
                                subject=subject,
                                sender=sender,
                                received_at=received_at,
                                attachment_name=group_attachment_name,
                                mime_type=group_mime_type,
                                raw_bytes=pdf_bytes,
                            )
                        except HTTPException as exc:
                            if exc.status_code == 429:
                                _MAILBOX_RATE_LIMIT_UNTIL[mailbox_key] = time.monotonic() + _MAILBOX_RATE_LIMIT_COOLDOWN_SECONDS
                                raise
                            item = record_inbox_error(
                                retry_item=retry_item,
                                message_id=message_id,
                                attachment_id=group_attachment_id,
                                file_hash=group_hash,
                                subject=subject,
                                sender=sender,
                                received_at=received_at,
                                attachment_name=group_attachment_name,
                                mime_type=group_mime_type,
                                exc=exc,
                            )
                        except Exception as exc:
                            item = record_inbox_error(
                                retry_item=retry_item,
                                message_id=message_id,
                                attachment_id=group_attachment_id,
                                file_hash=group_hash,
                                subject=subject,
                                sender=sender,
                                received_at=received_at,
                                attachment_name=group_attachment_name,
                                mime_type=group_mime_type,
                                exc=exc,
                            )
                        synced_items.append(item)

                for attachment in normalized_attachments:
                    if attachment["attachment_id"] in grouped_attachment_ids:
                        continue

                    attachment_id = attachment["attachment_id"]
                    attachment_name = attachment["attachment_name"]
                    mime_type = attachment["resolved_mime_type"]
                    kind = attachment["resolved_kind"]
                    existing = store.get_fiscal_document_inbox_item(
                        session.tenant_id,
                        message_id=message_id,
                        attachment_id=attachment_id,
                    )
                    stable_existing = store.get_fiscal_document_inbox_item_by_message_attachment_name(
                        session.tenant_id,
                        message_id=message_id,
                        attachment_name=attachment_name,
                    )
                    completed_existing = stable_existing if _completed_inbox_item(stable_existing) else None
                    if completed_existing is None and _completed_inbox_item(existing):
                        completed_existing = existing
                    if completed_existing is not None:
                        synced_items.append(completed_existing)
                        continue

                    retry_item = existing or stable_existing

                    if kind == "other":
                        unsupported_error = "Formato allegato non supportato per l'analisi fiscale automatica."
                        item = save_inbox_item(
                            retry_item=retry_item,
                            message_id=message_id,
                            attachment_id=attachment_id,
                            file_hash="",
                            subject=subject,
                            sender=sender,
                            received_at=received_at,
                            attachment_name=attachment_name,
                            mime_type=mime_type,
                            sync_status="unsupported",
                            document_id="",
                            error_detail=unsupported_error,
                        )
                        synced_items.append(item)
                        continue

                    file_hash: str | None = None
                    try:
                        raw_bytes = await _download_attachment_bytes(
                            client,
                            access_token=connection.access_token,
                            message_id=message_id,
                            attachment_id=attachment_id,
                            inline_data=attachment["inline_data"],
                        )
                        file_hash = hashlib.sha256(raw_bytes).hexdigest()
                        item = await import_document_payload(
                            retry_item=retry_item,
                            message_id=message_id,
                            attachment_id=attachment_id,
                            subject=subject,
                            sender=sender,
                            received_at=received_at,
                            attachment_name=attachment_name,
                            mime_type=mime_type,
                            raw_bytes=raw_bytes,
                        )
                    except HTTPException as exc:
                        if exc.status_code == 429:
                            _MAILBOX_RATE_LIMIT_UNTIL[mailbox_key] = time.monotonic() + _MAILBOX_RATE_LIMIT_COOLDOWN_SECONDS
                            raise
                        item = record_inbox_error(
                            retry_item=retry_item,
                            message_id=message_id,
                            attachment_id=attachment_id,
                            file_hash=file_hash,
                            subject=subject,
                            sender=sender,
                            received_at=received_at,
                            attachment_name=attachment_name,
                            mime_type=mime_type,
                            exc=exc,
                        )
                    except Exception as exc:
                        item = record_inbox_error(
                            retry_item=retry_item,
                            message_id=message_id,
                            attachment_id=attachment_id,
                            file_hash=file_hash,
                            subject=subject,
                            sender=sender,
                            received_at=received_at,
                            attachment_name=attachment_name,
                            mime_type=mime_type,
                            exc=exc,
                        )
                    synced_items.append(item)

        return synced_items


async def sync_all_ready_fiscal_document_inboxes() -> None:
    tenant_store = get_tenant_store()
    seen_mailboxes: set[str] = set()
    for tenant_id in get_google_workspace_store().list_connected_tenant_ids():
        session = tenant_store.build_service_session(tenant_id)
        if session is None:
            continue
        settings = tenant_store.get_fiscal_document_settings(tenant_id)
        if not (settings.inbound_email or "").strip():
            continue
        connection = get_google_workspace_store().get_connection(tenant_id, adopt_legacy_if_needed=True)
        mailbox_key = ((connection.account_email if connection else None) or settings.inbound_email or "").strip().lower()
        if mailbox_key and mailbox_key in seen_mailboxes:
            continue
        if mailbox_key:
            seen_mailboxes.add(mailbox_key)
        try:
            await sync_fiscal_document_inbox(session, max_messages=10)
        except Exception:
            continue
