from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from difflib import SequenceMatcher
import json
from pathlib import Path
import re
from urllib.parse import quote
import sqlite3
from typing import Any
import unicodedata
from urllib.parse import urlencode

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import RedirectResponse
import httpx

from app.api.deps import require_session
from app.core.config import get_settings
from app.models.google_workspace import (
    GoogleWorkspaceAuthorizationResponse,
    GoogleWorkspaceConnection,
    GoogleWorkspaceCreateRequest,
    GoogleWorkspaceCreateResponse,
    GoogleWorkspaceDocumentPreview,
    GoogleWorkspacePreviewRequest,
    GoogleWorkspaceStatusResponse,
)
from app.services.google_workspace_client import (
    GOOGLE_DOCS_ENDPOINT,
    GOOGLE_DRIVE_ENDPOINT,
    GOOGLE_SHEETS_ENDPOINT,
    build_google_authorization_url,
    build_google_code_challenge,
    exchange_google_code_for_tokens,
    fetch_google_account_email,
    is_google_workspace_configured,
)
from app.services.google_workspace_session import (
    build_google_workspace_connection,
    ensure_google_workspace_configured,
    get_active_google_workspace_connection,
)
from app.services.google_workspace_store import get_google_workspace_store
from app.services.llm_client import request_llm_chat_completion
from app.services.tenant_store import SessionIdentity, get_tenant_store
from shared.assistant_profiles import get_assistant_profile


router = APIRouter()
GOOGLE_GMAIL_READONLY_SCOPE = "https://www.googleapis.com/auth/gmail.readonly"


_DOCUMENT_QUERY_STOPWORDS = {
    "a",
    "ad",
    "ai",
    "al",
    "alla",
    "allo",
    "che",
    "con",
    "da",
    "dal",
    "dalla",
    "dei",
    "del",
    "della",
    "di",
    "doc",
    "docs",
    "documento",
    "documenti",
    "e",
    "economico",
    "economici",
    "economiche",
    "fai",
    "fammi",
    "foglio",
    "fogli",
    "genera",
    "genera",
    "google",
    "ho",
    "i",
    "il",
    "in",
    "lista",
    "lo",
    "piu",
    "più",
    "prepara",
    "prezzo",
    "prezzi",
    "prodotti",
    "prodotto",
    "salva",
    "scrivi",
    "sheet",
    "tutta",
    "tutte",
    "tutti",
    "tutto",
    "tra",
    "un",
    "una",
}

_FOOD_QUERY_ALIASES = {
    "food",
    "cibo",
    "cucina",
    "ingredienti",
    "alimentari",
    "mangiare",
    "pranzo",
    "cena",
}

_DRINK_QUERY_ALIASES = {
    "drink",
    "bevande",
    "bevanda",
    "bar",
    "cocktail",
    "spirits",
    "distillati",
    "birra",
    "birre",
    "vino",
    "vini",
    "liquori",
    "analcolici",
}

_FOOD_CATEGORY_HINTS = {
    "carne",
    "pesce",
    "pane",
    "salse",
    "formaggio",
    "pasta",
    "riso",
    "verd",
    "dolc",
    "cibo",
    "food",
}

_DRINK_CATEGORY_HINTS = {
    "drink",
    "bevand",
    "bar",
    "wine",
    "vino",
    "birra",
    "spirit",
    "liquor",
    "cocktail",
    "analcol",
    "soft",
}

_FOOD_NAME_HINTS = {
    "pane",
    "pizza",
    "pasta",
    "riso",
    "pomodoro",
    "mozzarella",
    "bacon",
    "pollo",
    "manzo",
    "maiale",
    "salmone",
    "gamber",
    "salsa",
    "formaggio",
    "burro",
    "farina",
    "crocchett",
    "bun",
    "toast",
    "piadina",
    "bruschett",
    "burger",
    "club",
}

_DRINK_NAME_HINTS = {
    "gin",
    "vodka",
    "rum",
    "whisky",
    "whiskey",
    "tequila",
    "mezcal",
    "amaro",
    "aperol",
    "campari",
    "bitter",
    "birra",
    "prosecco",
    "vino",
    "tonica",
    "coca cola",
    "coca",
    "sprite",
    "fanta",
    "chinotto",
    "aranciata",
    "tisana",
    "succo",
    "soda",
    "brandy",
    "cachaca",
    "cognac",
    "distillato",
    "liq",
    "liquore",
    "giffard",
    "amarico",
    "crodino",
    "gingerino",
    "acq ",
    "acqua",
}


def _document_preview_reply_text(preview: GoogleWorkspaceDocumentPreview) -> str:
    summary = (preview.summary or "").strip()
    if summary:
        return summary
    if preview.kind == "doc":
        content = (preview.content or "").strip()
        compact = " ".join(content.split())
        if compact:
            return compact[:280]
        return f"Anteprima Google Doc pronta: {preview.title}"
    if preview.rows:
        return f"Anteprima Google Sheet pronta: {preview.title} con {len(preview.rows)} righe."
    return f"Anteprima Google Sheet pronta: {preview.title}"


def _record_documents_run(
    *,
    session: SessionIdentity,
    user_prompt: str,
    assistant_reply: str,
    route: str,
    model: str,
    trace: dict[str, object],
) -> None:
    store = get_tenant_store()
    thread = store.ensure_assistant_thread(session, surface="documents")
    store.append_assistant_messages(
        thread.id,
        [
            {"role": "user", "content": user_prompt},
            {"role": "assistant", "content": assistant_reply},
        ],
    )
    store.create_assistant_run(
        thread_id=thread.id,
        session=session,
        surface="documents",
        route=route,
        model=model,
        user_message=user_prompt,
        assistant_reply=assistant_reply,
        trace=trace,
    )


@dataclass(frozen=True)
class CatalogProduct:
    id: int
    product_name: str
    lot_code: str
    supplier_name: str
    product_code: str | None
    final_price_vat: float | None
    category: str | None


def _normalize_search_text(value: str) -> str:
    normalized = unicodedata.normalize("NFKD", value or "").encode("ascii", "ignore").decode("ascii")
    lowered = normalized.lower().replace("’", "'")
    lowered = re.sub(r"[^a-z0-9]+", " ", lowered)
    return " ".join(lowered.split())


def _tokenize_search_query(value: str) -> list[str]:
    tokens = [token for token in _normalize_search_text(value).split(" ") if token]
    filtered = [token for token in tokens if token not in _DOCUMENT_QUERY_STOPWORDS]
    return filtered or tokens


def _tokens_match(left: str, right: str) -> bool:
    if left == right:
        return True
    if len(left) >= 4 and len(right) >= 4 and left[:-1] == right[:-1]:
        return True
    return min(len(left), len(right)) >= 5 and SequenceMatcher(None, left, right).ratio() >= 0.82


def _score_catalog_match(query: str, candidate: str) -> float:
    normalized_query = _normalize_search_text(query)
    normalized_candidate = _normalize_search_text(candidate)
    if not normalized_query or not normalized_candidate:
        return 0.0

    query_tokens = _tokenize_search_query(query)
    candidate_tokens = normalized_candidate.split()
    overlap = sum(1 for token in query_tokens if any(_tokens_match(token, candidate_token) for candidate_token in candidate_tokens))
    phrase_bonus = 2.0 if " " in normalized_query and normalized_query in normalized_candidate else 0.0
    prefix_bonus = sum(1.0 for token in query_tokens if len(token) >= 3 and any(candidate_token.startswith(token) for candidate_token in candidate_tokens))
    if query_tokens and overlap == 0 and phrase_bonus == 0 and prefix_bonus == 0:
        return 0.0
    return overlap * 2.0 + phrase_bonus + prefix_bonus + SequenceMatcher(None, normalized_query, normalized_candidate).ratio()


@contextmanager
def _connect_orders_database(session: SessionIdentity):
    database_path = Path(session.database_path)
    connection = sqlite3.connect(database_path)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA foreign_keys = ON;")
    try:
        yield connection
    finally:
        connection.close()


def _orders_table_columns(connection: sqlite3.Connection, table_name: str) -> set[str]:
    rows = connection.execute(f"PRAGMA table_info({table_name})").fetchall()
    return {str(row["name"]) for row in rows}


def _load_catalog_products(session: SessionIdentity) -> list[CatalogProduct]:
    with _connect_orders_database(session) as connection:
        product_columns = _orders_table_columns(connection, "ordini_products")
        select_product_code = "product_code" if "product_code" in product_columns else "NULL AS product_code"
        select_final_price_vat = "final_price_vat" if "final_price_vat" in product_columns else "NULL AS final_price_vat"
        select_category = "category" if "category" in product_columns else "NULL AS category"
        rows = connection.execute(
            f"""
            SELECT
                id,
                product_name,
                lot_code,
                supplier_name,
                {select_product_code},
                {select_final_price_vat},
                {select_category}
            FROM ordini_products
            WHERE active = 1
            ORDER BY supplier_name ASC, product_name ASC, lot_code ASC
            """
        ).fetchall()

    return [
        CatalogProduct(
            id=int(row["id"]),
            product_name=str(row["product_name"] or "").strip(),
            lot_code=str(row["lot_code"] or "").strip(),
            supplier_name=str(row["supplier_name"] or "").strip(),
            product_code=str(row["product_code"]).strip() if row["product_code"] is not None else None,
            final_price_vat=float(row["final_price_vat"]) if row["final_price_vat"] is not None else None,
            category=str(row["category"]).strip() if row["category"] is not None else None,
        )
        for row in rows
    ]


def _is_catalog_document_request(prompt: str) -> bool:
    normalized = _normalize_search_text(prompt)
    if "prodott" not in normalized and "catalog" not in normalized:
        return False
    if any(
        fragment in normalized
        for fragment in (
            " ordine ",
            " ordini ",
            " ordine#",
            "storico ordini",
            "storico acquisti",
            "righe ordine",
            "batch ordini",
            "acquisti reali",
            "dettagli dell ordine",
            "dettaglio dell ordine",
        )
    ):
        return False
    return any(
        fragment in normalized
        for fragment in (
            "compriamo",
            "acquistiamo",
            "catalogo",
            "prezzo",
            "costoso",
            "economico",
            "fornitore",
            "fornitori",
            "categoria",
            "ordinata",
            "ordinato",
            "ordinati",
            "ordinare",
            "tra i prodotti",
            "nostri fornitori",
            "dei nostri fornitori",
        )
    )


def _clean_query_candidate(candidate: str) -> str | None:
    cleaned = _normalize_search_text(candidate)
    cleaned = re.sub(
        r"\b(?:piu|più|meno|caro|cari|cara|care|economico|economici|economica|economiche|costoso|costosi|costosa|costose|ordinato|ordinata|ordinati|ordinate|prezzo|prezzi)\b",
        " ",
        cleaned,
    )
    tokens = [token for token in cleaned.split() if token and token not in _DOCUMENT_QUERY_STOPWORDS]
    return " ".join(tokens[:6]).strip() or None


def _extract_catalog_query(prompt: str) -> str | None:
    patterns = (
        r"\bprodott[oi]\s+(.+?)\s+che\s+(?:compriamo|acquistiamo|teniamo|usiamo)\b",
        r"\bprodott[oi]\s+(.+?)\s+ordinat[oi]\b",
        r"\bcategoria\s+([A-Za-zÀ-ÿ0-9' ./&_-]{2,80})\b",
    )
    for pattern in patterns:
        match = re.search(pattern, prompt, re.IGNORECASE)
        if not match:
            continue
        candidate = _clean_query_candidate(match.group(1))
        if candidate:
            return candidate

    generic_candidate = _clean_query_candidate(prompt)
    if not generic_candidate:
        return None

    tokens = generic_candidate.split()
    meaningful = [token for token in tokens if token not in {"compriamo", "acquistiamo", "catalogo"}]
    return " ".join(meaningful[:4]).strip() or None


def _extract_catalog_bucket(query: str | None) -> str | None:
    if not query:
        return None
    normalized = _normalize_search_text(query)
    query_tokens = set(_tokenize_search_query(query))
    if normalized in _FOOD_QUERY_ALIASES or query_tokens & _FOOD_QUERY_ALIASES:
        return "food"
    if normalized in _DRINK_QUERY_ALIASES or query_tokens & _DRINK_QUERY_ALIASES:
        return "drink"
    return None


def _product_bucket_labels(product: CatalogProduct) -> set[str]:
    category = _normalize_search_text(product.category or "")
    name = _normalize_search_text(product.product_name)
    supplier = _normalize_search_text(product.supplier_name)
    lot_code = _normalize_search_text(product.lot_code)
    labels: set[str] = set()

    if category:
        if any(hint in category for hint in _FOOD_CATEGORY_HINTS):
            labels.add("food")
        if any(hint in category for hint in _DRINK_CATEGORY_HINTS):
            labels.add("drink")

    if any(hint in name for hint in _DRINK_NAME_HINTS):
        labels.add("drink")
    if any(hint in name for hint in _FOOD_NAME_HINTS):
        labels.add("food")

    if not labels and any(tag in supplier for tag in ("chef", "marr")):
        labels.add("food")

    if not labels and lot_code == "bt":
        labels.add("drink")

    if not labels and any(fragment in lot_code for fragment in ("kg", "g", "pz")) and "ml" not in lot_code and "cl" not in lot_code:
        labels.add("food")

    return labels


def _sort_catalog_products(products: list[CatalogProduct], prompt: str) -> list[CatalogProduct]:
    normalized = _normalize_search_text(prompt)
    ascending_fragments = (
        "piu economico al piu costoso",
        "piu economica al piu costosa",
        "meno caro al piu caro",
        "meno costoso al piu costoso",
        "crescente",
    )
    descending_fragments = (
        "piu costoso al piu economico",
        "piu cara alla meno cara",
        "decrescente",
    )

    def price_key(product: CatalogProduct) -> tuple[int, float, str, str]:
        missing_price = 1 if product.final_price_vat is None else 0
        price_value = float(product.final_price_vat or 0.0)
        return (missing_price, price_value, product.supplier_name.lower(), product.product_name.lower())

    if any(fragment in normalized for fragment in ascending_fragments):
        return sorted(products, key=price_key)
    if any(fragment in normalized for fragment in descending_fragments):
        return sorted(
            products,
            key=lambda product: (
                1 if product.final_price_vat is None else 0,
                -(float(product.final_price_vat or 0.0)),
                product.supplier_name.lower(),
                product.product_name.lower(),
            ),
        )
    return sorted(products, key=lambda product: (product.supplier_name.lower(), product.product_name.lower(), product.lot_code.lower()))


def _filter_catalog_products(products: list[CatalogProduct], query: str | None) -> list[CatalogProduct]:
    if not query:
        return products

    bucket = _extract_catalog_bucket(query)
    if bucket:
        bucket_matches = [product for product in products if bucket in _product_bucket_labels(product)]
        reduced_query = " ".join(
            token
            for token in _tokenize_search_query(query)
            if token not in (_FOOD_QUERY_ALIASES | _DRINK_QUERY_ALIASES)
        ).strip()
        if not reduced_query:
            return bucket_matches
        if bucket_matches:
            nested_matches = _filter_catalog_products(bucket_matches, reduced_query)
            if nested_matches:
                return nested_matches
            return bucket_matches

    ranked: list[tuple[float, CatalogProduct]] = []
    for product in products:
        searchable = " ".join(
            filter(
                None,
                [
                    product.product_name,
                    product.lot_code,
                    product.supplier_name,
                    product.product_code or "",
                    product.category or "",
                ],
            )
        )
        score = _score_catalog_match(query, searchable)
        if score <= 0:
            continue
        ranked.append((score, product))

    ranked.sort(key=lambda item: (-item[0], item[1].supplier_name.lower(), item[1].product_name.lower(), item[1].lot_code.lower()))
    return [product for _, product in ranked]


def _format_eur(value: float | None) -> str:
    if value is None:
        return "Prezzo non disponibile"
    rendered = f"{value:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")
    return f"€{rendered}"


def _product_format_label(product: CatalogProduct) -> str:
    return product.lot_code or product.product_code or "-"


def _build_catalog_doc_preview(
    payload: GoogleWorkspacePreviewRequest,
    *,
    products: list[CatalogProduct],
    query: str | None,
) -> GoogleWorkspaceDocumentPreview:
    if not products:
        detail = "Non ho trovato prodotti reali coerenti con questa richiesta nel catalogo ordini del locale."
        if query:
            detail = f"{detail} Filtro usato: {query}."
        return GoogleWorkspaceDocumentPreview(
            kind="doc",
            title=payload.title,
            summary="Nessun prodotto reale trovato nel catalogo per questa bozza.",
            destination_folder_id=payload.destination_folder_id,
            content=detail,
        )

    supplier_names = sorted({product.supplier_name.strip() for product in products if product.supplier_name.strip()})
    lines = [payload.title, f"Prodotti reali trovati: {len(products)}"]
    if query:
        lines.append(f"Filtro: {query}")
    if supplier_names:
        visible_suppliers = ", ".join(supplier_names)
        lines.append(f"Fornitori di riferimento: {visible_suppliers}")
    lines.append("")

    for index, product in enumerate(products, start=1):
        details = [f"fornitore {product.supplier_name or '-'}"]
        if product.category:
            details.append(f"categoria {product.category}")
        details.append(f"formato { _product_format_label(product) }")
        details.append(_format_eur(product.final_price_vat))
        lines.append(f"{index}. {product.product_name} | " + " | ".join(details))

    return GoogleWorkspaceDocumentPreview(
        kind="doc",
        title=payload.title,
        summary=f"Anteprima basata su {len(products)} prodotti reali del catalogo locale.",
        destination_folder_id=payload.destination_folder_id,
        content="\n".join(lines).strip(),
    )


def _build_catalog_sheet_preview(
    payload: GoogleWorkspacePreviewRequest,
    *,
    products: list[CatalogProduct],
    query: str | None,
) -> GoogleWorkspaceDocumentPreview:
    rows = [
        [
            product.product_name,
            product.category or "",
            product.supplier_name,
            _product_format_label(product),
            _format_eur(product.final_price_vat),
        ]
        for product in products
    ]
    summary = (
        f"Anteprima basata su {len(products)} prodotti reali del catalogo locale."
        if products
        else "Nessun prodotto reale trovato nel catalogo per questa bozza."
    )
    if query:
        summary = f"{summary} Filtro: {query}."
    return GoogleWorkspaceDocumentPreview(
        kind="sheet",
        title=payload.title,
        summary=summary,
        destination_folder_id=payload.destination_folder_id,
        columns=["Prodotto", "Categoria", "Fornitore", "Formato", "Prezzo ivato"],
        rows=rows,
    )


def _build_grounded_catalog_preview(
    session: SessionIdentity,
    payload: GoogleWorkspacePreviewRequest,
) -> GoogleWorkspaceDocumentPreview | None:
    if not _is_catalog_document_request(payload.prompt):
        return None

    products = _load_catalog_products(session)
    query = _extract_catalog_query(payload.prompt)
    filtered_products = _filter_catalog_products(products, query)
    sorted_products = _sort_catalog_products(filtered_products, payload.prompt)

    if payload.kind == "doc":
        return _build_catalog_doc_preview(payload, products=sorted_products, query=query)
    return _build_catalog_sheet_preview(payload, products=sorted_products, query=query)


def _granted_scopes(connection: GoogleWorkspaceConnection | None) -> list[str]:
    if connection is None or not connection.scope.strip():
        return []
    return [item.strip() for item in connection.scope.split() if item.strip()]


def _missing_required_scopes(connection: GoogleWorkspaceConnection | None) -> list[str]:
    required = get_settings().google_workspace_scopes_list
    granted = set(_granted_scopes(connection))
    return [scope for scope in required if scope not in granted]


def _authorization_scopes_for_profile(scope_profile: str | None) -> list[str]:
    base_scopes = list(get_settings().google_workspace_scopes_list)
    normalized = (scope_profile or "").strip().lower()
    if normalized == "fiscal-documents" and GOOGLE_GMAIL_READONLY_SCOPE not in base_scopes:
        base_scopes.append(GOOGLE_GMAIL_READONLY_SCOPE)
    return base_scopes


def _ensure_google_workspace_permissions(
    connection: GoogleWorkspaceConnection,
    *,
    kind: str,
) -> None:
    granted = set(_granted_scopes(connection))
    required_scopes = ["https://www.googleapis.com/auth/drive"]
    if kind == "doc":
        required_scopes.append("https://www.googleapis.com/auth/documents")
    else:
        required_scopes.append("https://www.googleapis.com/auth/spreadsheets")

    missing = [scope for scope in required_scopes if scope not in granted]
    if missing:
        raise HTTPException(
            status_code=409,
            detail=(
                "L'account Google e' collegato ma non ha ancora autorizzato tutti i permessi necessari. "
                "Usa 'Ricollega Google' e accetta anche l'accesso a Docs, Sheets e Drive."
            ),
        )


def _safe_return_to(return_to: str | None) -> str:
    if not return_to:
        return "/documents"
    if not return_to.startswith("/") or return_to.startswith("//"):
        return "/documents"
    return return_to


def _portal_redirect(status: str, *, return_to: str | None = None, detail: str | None = None) -> str:
    settings = get_settings()
    params = {"google": status}
    if detail:
        params["detail"] = detail[:180]
    query = urlencode(params)
    return f"{settings.portal_public_url.rstrip('/')}{_safe_return_to(return_to)}?{query}"


def _extract_json_payload(text: str) -> dict[str, Any]:
    raw = text.strip()
    try:
        payload = json.loads(raw)
        if isinstance(payload, dict):
            return payload
    except json.JSONDecodeError:
        pass

    start_index = raw.find("{")
    end_index = raw.rfind("}")
    if start_index == -1 or end_index == -1 or end_index <= start_index:
        raise HTTPException(status_code=502, detail="L'LLM non ha restituito un JSON valido per la preview")

    try:
        payload = json.loads(raw[start_index : end_index + 1])
    except json.JSONDecodeError as exc:
        raise HTTPException(status_code=502, detail=f"Preview LLM non parseabile: {exc}") from exc

    if not isinstance(payload, dict):
        raise HTTPException(status_code=502, detail="La preview LLM deve essere un oggetto JSON")
    return payload


def _normalize_sheet_rows(payload: dict[str, Any]) -> tuple[list[str], list[list[str]]]:
    settings = get_settings()
    raw_columns = payload.get("columns")
    raw_rows = payload.get("rows")

    if not isinstance(raw_columns, list):
        raise HTTPException(status_code=502, detail="La preview del foglio non contiene colonne valide")

    columns = [str(item).strip() for item in raw_columns if str(item).strip()]
    if not columns:
        raise HTTPException(status_code=502, detail="La preview del foglio richiede almeno una colonna")

    rows: list[list[str]] = []
    if isinstance(raw_rows, list):
        for row in raw_rows[: settings.google_workspace_sheet_preview_max_rows]:
            if not isinstance(row, list):
                continue
            normalized = [str(cell).strip() for cell in row[: len(columns)]]
            while len(normalized) < len(columns):
                normalized.append("")
            rows.append(normalized)

    return columns, rows


async def _generate_preview(
    payload: GoogleWorkspacePreviewRequest,
    *,
    session: SessionIdentity | None = None,
) -> GoogleWorkspaceDocumentPreview:
    documents_profile = get_assistant_profile("documents")
    if session is not None:
        grounded_preview = _build_grounded_catalog_preview(session, payload)
        if grounded_preview is not None:
            return grounded_preview

    if payload.kind == "doc":
        llm_reply, _ = await request_llm_chat_completion(
            [
                {
                    "role": "system",
                    "content": (
                        f"{documents_profile.base_prompt} "
                        "Prepari bozze operative per un locale hospitality. "
                        'Restituisci solo JSON valido con le chiavi "summary" e "document_content". '
                        "document_content deve essere testo semplice in italiano, pronto per Google Docs."
                    ),
                },
                {
                    "role": "user",
                    "content": (
                        f"Titolo documento: {payload.title}\n"
                        f"Istruzioni del gestore:\n{payload.prompt}\n\n"
                        "Crea una bozza concreta, sintetica ma utile."
                    ),
                },
            ],
            temperature=0.2,
        )
        parsed = _extract_json_payload(llm_reply)
        content = str(parsed.get("document_content") or parsed.get("content") or "").strip()
        if not content:
            raise HTTPException(status_code=502, detail="La preview Google Doc non contiene testo utile")
        return GoogleWorkspaceDocumentPreview(
            kind="doc",
            title=payload.title,
            summary=str(parsed.get("summary") or "Bozza pronta per la conferma."),
            destination_folder_id=payload.destination_folder_id,
            content=content,
        )

    llm_reply, _ = await request_llm_chat_completion(
        [
            {
                "role": "system",
                "content": (
                    f"{documents_profile.base_prompt} "
                    "Generi bozze tabellari per un locale hospitality. "
                    'Restituisci solo JSON valido con le chiavi "summary", "columns" e "rows". '
                    "columns deve essere un array di stringhe. rows deve essere un array di righe, ogni riga un array di stringhe."
                ),
            },
            {
                "role": "user",
                "content": (
                    f"Titolo foglio: {payload.title}\n"
                    f"Istruzioni del gestore:\n{payload.prompt}\n\n"
                    "Crea una tabella pronta da salvare su Google Sheets con colonne chiare. Se il brief richiede dati fattuali ma non li hai, dichiaralo nel summary invece di inventarli."
                ),
            },
        ],
        temperature=0.2,
    )
    parsed = _extract_json_payload(llm_reply)
    columns, rows = _normalize_sheet_rows(parsed)
    return GoogleWorkspaceDocumentPreview(
        kind="sheet",
        title=payload.title,
        summary=str(parsed.get("summary") or "Tabella pronta per la conferma."),
        destination_folder_id=payload.destination_folder_id,
        columns=columns,
        rows=rows,
    )


async def _move_file_to_folder(
    client: httpx.AsyncClient,
    *,
    access_token: str,
    file_id: str,
    folder_id: str,
) -> None:
    headers = {"Authorization": f"Bearer {access_token}"}
    metadata_response = await client.get(
        f"{GOOGLE_DRIVE_ENDPOINT}/files/{file_id}",
        headers=headers,
        params={"fields": "parents"},
    )
    metadata_response.raise_for_status()
    parents = metadata_response.json().get("parents", [])
    update_params = {
        "addParents": folder_id,
        "fields": "id, parents",
    }
    if parents:
        update_params["removeParents"] = ",".join(parents)
    update_response = await client.patch(
        f"{GOOGLE_DRIVE_ENDPOINT}/files/{file_id}",
        headers=headers,
        params=update_params,
    )
    update_response.raise_for_status()


async def _resolve_or_create_drive_folder(
    client: httpx.AsyncClient,
    *,
    access_token: str,
    folder_reference: str,
) -> str:
    normalized_reference = folder_reference.strip()
    if not normalized_reference:
        raise HTTPException(status_code=400, detail="La cartella Drive indicata e' vuota")

    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    if "/" not in normalized_reference:
        lookup_response = await client.get(
            f"{GOOGLE_DRIVE_ENDPOINT}/files/{quote(normalized_reference, safe='')}",
            headers={"Authorization": f"Bearer {access_token}"},
            params={"fields": "id, name, mimeType"},
        )
        if lookup_response.status_code == 200:
            payload = lookup_response.json()
            if payload.get("mimeType") != "application/vnd.google-apps.folder":
                raise HTTPException(
                    status_code=400,
                    detail="Il valore inserito punta a un file Drive esistente ma non a una cartella",
                )
            return str(payload["id"])

        if lookup_response.status_code not in {400, 403, 404}:
            lookup_response.raise_for_status()

    escaped_reference = normalized_reference.replace("\\", "\\\\").replace("'", "\\'")
    search_response = await client.get(
        f"{GOOGLE_DRIVE_ENDPOINT}/files",
        headers={"Authorization": f"Bearer {access_token}"},
        params={
            "q": (
                "mimeType = 'application/vnd.google-apps.folder' "
                f"and trashed = false and name = '{escaped_reference}'"
            ),
            "pageSize": 1,
            "orderBy": "createdTime",
            "fields": "files(id, name, mimeType, createdTime)",
        },
    )
    search_response.raise_for_status()
    files = search_response.json().get("files", [])
    if isinstance(files, list) and files:
        payload = files[0] if isinstance(files[0], dict) else None
        if payload and payload.get("id"):
            return str(payload["id"])

    create_response = await client.post(
        f"{GOOGLE_DRIVE_ENDPOINT}/files",
        headers=headers,
        params={"fields": "id, name"},
        json={
            "name": normalized_reference,
            "mimeType": "application/vnd.google-apps.folder",
        },
    )
    try:
        create_response.raise_for_status()
    except httpx.HTTPError as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Creazione cartella Drive fallita per '{normalized_reference}': {exc}",
        ) from exc

    return str(create_response.json()["id"])


async def _create_google_doc(
    payload: GoogleWorkspaceCreateRequest,
    *,
    connection: GoogleWorkspaceConnection,
) -> GoogleWorkspaceCreateResponse:
    _ensure_google_workspace_permissions(connection, kind="doc")
    settings = get_settings()
    headers = {
        "Authorization": f"Bearer {connection.access_token}",
        "Content-Type": "application/json",
    }

    try:
        async with httpx.AsyncClient(timeout=settings.google_workspace_request_timeout_seconds) as client:
            create_response = await client.post(
                f"{GOOGLE_DOCS_ENDPOINT}/documents",
                headers=headers,
                json={"title": payload.title},
            )
            create_response.raise_for_status()
            document = create_response.json()
            document_id = str(document["documentId"])

            update_response = await client.post(
                f"{GOOGLE_DOCS_ENDPOINT}/documents/{document_id}:batchUpdate",
                headers=headers,
                json={
                    "requests": [
                        {
                            "insertText": {
                                "location": {"index": 1},
                                "text": payload.content,
                            }
                        }
                    ]
                },
            )
            update_response.raise_for_status()

            if payload.destination_folder_id:
                folder_id = await _resolve_or_create_drive_folder(
                    client,
                    access_token=connection.access_token,
                    folder_reference=payload.destination_folder_id,
                )
                await _move_file_to_folder(
                    client,
                    access_token=connection.access_token,
                    file_id=document_id,
                    folder_id=folder_id,
                )
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Creazione Google Doc fallita: {exc}") from exc

    return GoogleWorkspaceCreateResponse(
        kind="doc",
        file_id=document_id,
        title=payload.title,
        web_url=f"https://docs.google.com/document/d/{document_id}/edit",
        destination_folder_id=payload.destination_folder_id,
        account_email=connection.account_email,
    )


async def _create_google_sheet(
    payload: GoogleWorkspaceCreateRequest,
    *,
    connection: GoogleWorkspaceConnection,
) -> GoogleWorkspaceCreateResponse:
    _ensure_google_workspace_permissions(connection, kind="sheet")
    settings = get_settings()
    headers = {
        "Authorization": f"Bearer {connection.access_token}",
        "Content-Type": "application/json",
    }

    try:
        async with httpx.AsyncClient(timeout=settings.google_workspace_request_timeout_seconds) as client:
            create_response = await client.post(
                f"{GOOGLE_SHEETS_ENDPOINT}/spreadsheets",
                headers=headers,
                json={"properties": {"title": payload.title}},
            )
            create_response.raise_for_status()
            spreadsheet = create_response.json()
            spreadsheet_id = str(spreadsheet["spreadsheetId"])
            sheet_id = spreadsheet.get("sheets", [{}])[0].get("properties", {}).get("sheetId", 0)

            values_response = await client.put(
                f"{GOOGLE_SHEETS_ENDPOINT}/spreadsheets/{spreadsheet_id}/values/A1",
                headers=headers,
                params={"valueInputOption": "USER_ENTERED"},
                json={
                    "range": "A1",
                    "majorDimension": "ROWS",
                    "values": [payload.columns, *payload.rows],
                },
            )
            values_response.raise_for_status()

            freeze_response = await client.post(
                f"{GOOGLE_SHEETS_ENDPOINT}/spreadsheets/{spreadsheet_id}:batchUpdate",
                headers=headers,
                json={
                    "requests": [
                        {
                            "updateSheetProperties": {
                                "properties": {
                                    "sheetId": sheet_id,
                                    "gridProperties": {"frozenRowCount": 1},
                                },
                                "fields": "gridProperties.frozenRowCount",
                            }
                        }
                    ]
                },
            )
            freeze_response.raise_for_status()

            if payload.destination_folder_id:
                folder_id = await _resolve_or_create_drive_folder(
                    client,
                    access_token=connection.access_token,
                    folder_reference=payload.destination_folder_id,
                )
                await _move_file_to_folder(
                    client,
                    access_token=connection.access_token,
                    file_id=spreadsheet_id,
                    folder_id=folder_id,
                )
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=502, detail=f"Creazione Google Sheet fallita: {exc}") from exc

    return GoogleWorkspaceCreateResponse(
        kind="sheet",
        file_id=spreadsheet_id,
        title=payload.title,
        web_url=f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}/edit",
        destination_folder_id=payload.destination_folder_id,
        account_email=connection.account_email,
    )


@router.get("/status", response_model=GoogleWorkspaceStatusResponse)
def google_workspace_status(session: SessionIdentity = Depends(require_session)) -> GoogleWorkspaceStatusResponse:
    settings = get_settings()
    connection = get_google_workspace_store().get_connection(session.tenant_id, adopt_legacy_if_needed=True)
    configured = is_google_workspace_configured()
    granted_scopes = _granted_scopes(connection)
    missing_scopes = _missing_required_scopes(connection) if configured and connection else []
    ready = configured and connection is not None and not missing_scopes

    if ready:
        next_step = "Account Google collegato. Genera una preview e conferma la creazione nel cloud."
    elif configured and connection:
        next_step = (
            "Account collegato ma autorizzazione incompleta. Usa 'Ricollega Google' e accetta tutti i permessi "
            "per creare file nel cloud."
        )
    elif configured:
        next_step = "Collega l'account Google del titolare per abilitare la creazione di file nel cloud."
    else:
        next_step = "Configura le credenziali OAuth Google nel backend-hub prima di collegare un account."

    return GoogleWorkspaceStatusResponse(
        configured=configured,
        connected=connection is not None,
        ready=ready,
        account_email=connection.account_email if connection else None,
        expires_at=connection.expires_at if connection else None,
        redirect_uri=settings.google_workspace_redirect_uri if configured else None,
        scopes=settings.google_workspace_scopes_list if configured else [],
        granted_scopes=granted_scopes,
        missing_scopes=missing_scopes,
        next_step=next_step,
    )


@router.get("/oauth/start", response_model=GoogleWorkspaceAuthorizationResponse)
def google_workspace_oauth_start(
    return_to: str | None = Query(default="/documents"),
    scope_profile: str | None = Query(default=None),
    session: SessionIdentity = Depends(require_session),
) -> GoogleWorkspaceAuthorizationResponse:
    ensure_google_workspace_configured()
    store = get_google_workspace_store()
    pending = store.create_pending_authorization(session.tenant_id, _safe_return_to(return_to))
    authorization_url = build_google_authorization_url(
        state=pending.state,
        code_challenge=build_google_code_challenge(pending.code_verifier),
        scopes=_authorization_scopes_for_profile(scope_profile),
    )
    return GoogleWorkspaceAuthorizationResponse(authorization_url=authorization_url)


@router.get("/oauth/callback")
async def google_workspace_oauth_callback(
    state: str | None = None,
    code: str | None = None,
    error: str | None = None,
    error_description: str | None = None,
) -> RedirectResponse:
    store = get_google_workspace_store()

    if error:
        return RedirectResponse(
            url=_portal_redirect("error", detail=error_description or error),
            status_code=307,
        )

    if not state or not code:
        return RedirectResponse(
            url=_portal_redirect("error", detail="Callback Google incompleto"),
            status_code=307,
        )

    pending = store.consume_pending_authorization(state)
    if pending is None:
        return RedirectResponse(
            url=_portal_redirect("error", detail="Stato OAuth Google non valido"),
            status_code=307,
        )

    try:
        token_payload = await exchange_google_code_for_tokens(code=code, code_verifier=pending.code_verifier)
        access_token = token_payload.get("access_token")
        if not access_token:
            raise HTTPException(status_code=502, detail="Google non ha restituito un access token")
        account_email = await fetch_google_account_email(str(access_token))
        if not pending.tenant_id:
            raise HTTPException(status_code=409, detail="Autorizzazione Google priva di tenant associato")
        existing_connection = store.get_connection(pending.tenant_id, adopt_legacy_if_needed=True)
        refresh_token = str(token_payload.get("refresh_token") or "")
        if not refresh_token and existing_connection:
            refresh_token = str(existing_connection.refresh_token or "")
        if not refresh_token:
            raise HTTPException(status_code=502, detail="Google non ha restituito un refresh token")
        connection = build_google_workspace_connection(
            token_payload,
            refresh_token=refresh_token,
            account_email=account_email,
            connected_at=existing_connection.connected_at if existing_connection else None,
        )
        if existing_connection and not connection.scope:
            connection.scope = existing_connection.scope
        store.set_connection(pending.tenant_id, connection)
    except (HTTPException, httpx.HTTPError) as exc:
        detail = exc.detail if isinstance(exc, HTTPException) else str(exc)
        return RedirectResponse(
            url=_portal_redirect("error", return_to=pending.return_to, detail=detail),
            status_code=307,
        )

    return RedirectResponse(
        url=_portal_redirect("connected", return_to=pending.return_to),
        status_code=307,
    )


@router.post("/disconnect")
def google_workspace_disconnect(session: SessionIdentity = Depends(require_session)) -> dict[str, bool]:
    get_google_workspace_store().clear_connection(session.tenant_id)
    return {"disconnected": True}


@router.post("/preview", response_model=GoogleWorkspaceDocumentPreview)
async def google_workspace_preview(
    payload: GoogleWorkspacePreviewRequest,
    session: SessionIdentity = Depends(require_session),
) -> GoogleWorkspaceDocumentPreview:
    from app.services.operational_assistant_service import prepare_google_workspace_preview_with_trace

    outcome = await prepare_google_workspace_preview_with_trace(session, payload)
    _record_documents_run(
        session=session,
        user_prompt=payload.prompt,
        assistant_reply=_document_preview_reply_text(outcome.preview),
        route=outcome.route,
        model=outcome.model,
        trace=outcome.trace,
    )
    return outcome.preview


@router.post("/create", response_model=GoogleWorkspaceCreateResponse)
async def google_workspace_create(
    payload: GoogleWorkspaceCreateRequest,
    session: SessionIdentity = Depends(require_session),
) -> GoogleWorkspaceCreateResponse:
    ensure_google_workspace_configured()
    connection = await get_active_google_workspace_connection(session)

    if payload.kind == "doc":
        created = await _create_google_doc(payload, connection=connection)
    else:
        created = await _create_google_sheet(payload, connection=connection)

    _record_documents_run(
        session=session,
        user_prompt=f"Crea {payload.kind} '{payload.title}'",
        assistant_reply=f"Creato {created.kind} '{created.title}'. Link: {created.web_url}",
        route="documents-direct-create",
        model="documents-direct-create",
        trace={
            "surface": "documents",
            "kind": payload.kind,
            "title": payload.title,
            "row_count": len(payload.rows or []),
            "column_count": len(payload.columns or []),
            "destination_folder_id": payload.destination_folder_id,
            "web_url": created.web_url,
            "file_id": created.file_id,
        },
    )
    return created
