from collections import defaultdict
from datetime import datetime, timedelta, timezone
import csv
from difflib import SequenceMatcher
import hashlib
import io
import json
import re
import unicodedata
from zoneinfo import ZoneInfo

from fastapi import APIRouter, Body, Depends, HTTPException, Query, Response, status
from fastapi.responses import StreamingResponse
from sqlalchemy import case, inspect, select, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, selectinload

from app.api.deps import (
    get_db,
    require_management_token,
    require_query_token,
    require_token,
)
from app.core.config import get_settings
from app.core.database import db_session_context
from app.core.tenancy import AuthContext, authenticate_login
from app.models.order_batch import OrderBatch
from app.models.order_item import OrderItem
from app.models.product import Product
from app.models.seasonal_goal import SeasonalGoal
from app.models.shared_note import SharedNote
from app.models.supplier_catalog import SupplierCatalog, SupplierCatalogItem
from app.models.suspended_order import SuspendedOrder
from app.schemas.common import (
    GoalUpsert,
    LoginRequest,
    OrderCreateRequest,
    OrderDeleteRequest,
    OrderUpdateRequest,
    ProductUpsert,
    SafetyStockSettingsUpdate,
    SharedNoteCreate,
    SupplierCatalogCreateRequest,
    SupplierCatalogPreviewRequest,
    SupplierCatalogRow,
)
from app.services.push_notifications import send_push_payload


router = APIRouter()
protected = APIRouter(dependencies=[Depends(require_management_token)])
DEFAULT_SAFETY_STOCK_DAYS = 7.0
SAFETY_STOCK_NOTIFICATION_STATE_KEY = "understock_items"
ITALIAN_TIMEZONE = ZoneInfo("Europe/Rome")


def _utc_year_bounds(year: int) -> tuple[datetime, datetime]:
    start = datetime(year, 1, 1, tzinfo=timezone.utc)
    end = datetime(year + 1, 1, 1, tzinfo=timezone.utc)
    return start, end


def _format_iso(value: datetime | None) -> str | None:
    if value is None:
        return None
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return value.isoformat()


def _product_inventory_key(product_name: str | None, supplier_name: str | None) -> tuple[str, str]:
    return (_normalize_catalog_text(product_name), _normalize_catalog_text(supplier_name))


def _table_exists(db: Session, table_name: str) -> bool:
    bind = db.get_bind()
    if bind is None:
        return False
    return inspect(bind).has_table(table_name)


def _iso_now() -> str:
    return datetime.now(timezone.utc).isoformat()


def _ensure_safety_stock_tables(db: Session) -> None:
    db.execute(
        text(
            """
            CREATE TABLE IF NOT EXISTS ordini_safety_stock_settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
            """
        )
    )
    db.execute(
        text(
            """
            CREATE TABLE IF NOT EXISTS tenant_inventory_consumption_product_stats (
                id TEXT PRIMARY KEY,
                product_id INTEGER,
                product_lookup TEXT NOT NULL,
                supplier_lookup TEXT NOT NULL,
                product_name TEXT NOT NULL,
                supplier_name TEXT NOT NULL,
                total_consumed_units REAL NOT NULL DEFAULT 0,
                workdays_count INTEGER NOT NULL DEFAULT 0,
                consumed_days_count INTEGER NOT NULL DEFAULT 0,
                average_daily_consumed_units REAL NOT NULL DEFAULT 0,
                average_consumed_units_on_consumption_days REAL NOT NULL DEFAULT 0,
                movement_count INTEGER NOT NULL DEFAULT 0,
                first_consumption_date TEXT,
                last_consumption_date TEXT,
                calculation_source TEXT NOT NULL DEFAULT 'manual_consumption_movements',
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                UNIQUE (product_lookup, supplier_lookup)
            )
            """
        )
    )
    db.execute(
        text(
            """
            CREATE TABLE IF NOT EXISTS ordini_safety_stock_notification_state (
                key TEXT PRIMARY KEY,
                signature TEXT NOT NULL DEFAULT '',
                item_keys_json TEXT NOT NULL DEFAULT '[]',
                count INTEGER NOT NULL DEFAULT 0,
                notified_at TEXT,
                updated_at TEXT NOT NULL
            )
            """
        )
    )


def _read_safety_stock_days(db: Session) -> float:
    _ensure_safety_stock_tables(db)
    row = db.execute(
        text("SELECT value FROM ordini_safety_stock_settings WHERE key = 'minimum_days' LIMIT 1")
    ).mappings().first()
    if row is None:
        return DEFAULT_SAFETY_STOCK_DAYS
    try:
        parsed = float(row.get("value") or DEFAULT_SAFETY_STOCK_DAYS)
    except (TypeError, ValueError):
        return DEFAULT_SAFETY_STOCK_DAYS
    return max(0.0, min(parsed, 365.0))


def _write_safety_stock_days(db: Session, minimum_days: float) -> float:
    _ensure_safety_stock_tables(db)
    sanitized_days = max(0.0, min(float(minimum_days), 365.0))
    db.execute(
        text(
            """
            INSERT INTO ordini_safety_stock_settings (key, value, updated_at)
            VALUES ('minimum_days', :value, :updated_at)
            ON CONFLICT(key) DO UPDATE SET
                value = excluded.value,
                updated_at = excluded.updated_at
            """
        ),
        {"value": str(round(sanitized_days, 3)), "updated_at": _iso_now()},
    )
    db.commit()
    return sanitized_days


def _inventory_consumption_stat_id(product_lookup: str, supplier_lookup: str) -> str:
    import hashlib

    digest = hashlib.sha1(f"{product_lookup}\0{supplier_lookup}".encode("utf-8")).hexdigest()
    return f"inventory_consumption_stat_{digest}"


def _refresh_consumption_product_stats(db: Session) -> int:
    _ensure_safety_stock_tables(db)
    if _table_exists(db, "tenant_inventory_estimated_consumptions"):
        estimated_rows = db.execute(
            text(
                """
                WITH positive AS (
                    SELECT *
                    FROM tenant_inventory_estimated_consumptions
                    WHERE COALESCE(consumed_units, 0) > 0
                      AND COALESCE(product_lookup, '') <> ''
                      AND COALESCE(supplier_lookup, '') <> ''
                      AND COALESCE(consumption_date, '') <> ''
                ),
                product_periods AS (
                    SELECT
                        product_lookup,
                        supplier_lookup,
                        period_start_date,
                        period_end_date,
                        MAX(COALESCE(period_days, 0)) AS period_days
                    FROM positive
                    WHERE COALESCE(period_start_date, '') <> ''
                      AND COALESCE(period_end_date, '') <> ''
                      AND COALESCE(period_days, 0) > 0
                    GROUP BY product_lookup, supplier_lookup, period_start_date, period_end_date
                ),
                product_days AS (
                    SELECT
                        product_lookup,
                        supplier_lookup,
                        ROUND(SUM(period_days), 6) AS workdays_count
                    FROM product_periods
                    GROUP BY product_lookup, supplier_lookup
                )
                SELECT
                    MIN(positive.product_id) AS product_id,
                    positive.product_lookup,
                    positive.supplier_lookup,
                    MAX(positive.product_name) AS product_name,
                    MAX(positive.supplier_name) AS supplier_name,
                    ROUND(SUM(positive.consumed_units), 6) AS total_consumed_units,
                    MAX(COALESCE(product_days.workdays_count, 0)) AS workdays_count,
                    COUNT(DISTINCT positive.consumption_date) AS consumed_days_count,
                    COUNT(*) AS movement_count,
                    MIN(positive.consumption_date) AS first_consumption_date,
                    MAX(positive.consumption_date) AS last_consumption_date
                FROM positive
                LEFT JOIN product_days
                    ON product_days.product_lookup = positive.product_lookup
                   AND product_days.supplier_lookup = positive.supplier_lookup
                GROUP BY positive.product_lookup, positive.supplier_lookup
                HAVING total_consumed_units > 0
                   AND workdays_count > 0
                """
            )
        ).mappings().all()
        if estimated_rows:
            db.execute(text("DELETE FROM tenant_inventory_consumption_product_stats"))
            updated_at = _iso_now()
            inserted_count = 0
            for row in estimated_rows:
                product_lookup = str(row.get("product_lookup") or "").strip()
                supplier_lookup = str(row.get("supplier_lookup") or "").strip()
                total_consumed_units = round(float(row.get("total_consumed_units") or 0), 6)
                workdays_count = float(row.get("workdays_count") or 0)
                if not product_lookup or not supplier_lookup or total_consumed_units <= 0 or workdays_count <= 0:
                    continue
                consumed_days_count = max(int(row.get("consumed_days_count") or 0), 1)
                db.execute(
                    text(
                        """
                        INSERT INTO tenant_inventory_consumption_product_stats (
                            id,
                            product_id,
                            product_lookup,
                            supplier_lookup,
                            product_name,
                            supplier_name,
                            total_consumed_units,
                            workdays_count,
                            consumed_days_count,
                            average_daily_consumed_units,
                            average_consumed_units_on_consumption_days,
                            movement_count,
                            first_consumption_date,
                            last_consumption_date,
                            calculation_source,
                            created_at,
                            updated_at
                        ) VALUES (
                            :id,
                            :product_id,
                            :product_lookup,
                            :supplier_lookup,
                            :product_name,
                            :supplier_name,
                            :total_consumed_units,
                            :workdays_count,
                            :consumed_days_count,
                            :average_daily_consumed_units,
                            :average_consumed_units_on_consumption_days,
                            :movement_count,
                            :first_consumption_date,
                            :last_consumption_date,
                            'estimated_from_inventory_confirmed_orders',
                            :created_at,
                            :updated_at
                        )
                        """
                    ),
                    {
                        "id": _inventory_consumption_stat_id(product_lookup, supplier_lookup),
                        "product_id": int(row.get("product_id")) if row.get("product_id") is not None else None,
                        "product_lookup": product_lookup,
                        "supplier_lookup": supplier_lookup,
                        "product_name": str(row.get("product_name") or product_lookup),
                        "supplier_name": str(row.get("supplier_name") or supplier_lookup),
                        "total_consumed_units": total_consumed_units,
                        "workdays_count": int(round(workdays_count)),
                        "consumed_days_count": consumed_days_count,
                        "average_daily_consumed_units": round(total_consumed_units / workdays_count, 6),
                        "average_consumed_units_on_consumption_days": round(total_consumed_units / float(consumed_days_count), 6),
                        "movement_count": int(row.get("movement_count") or 0),
                        "first_consumption_date": str(row.get("first_consumption_date") or "") or None,
                        "last_consumption_date": str(row.get("last_consumption_date") or "") or None,
                        "created_at": updated_at,
                        "updated_at": updated_at,
                    },
                )
                inserted_count += 1
            return inserted_count

    if not _table_exists(db, "tenant_inventory_movements"):
        return 0

    workdays_row = db.execute(
        text(
            """
            SELECT COUNT(DISTINCT substr(occurred_at, 1, 10)) AS total_workdays
            FROM tenant_inventory_movements
            WHERE movement_kind = 'out'
              AND source_type = 'manual_consumption'
              AND COALESCE(equivalent_units, 0) > 0
              AND substr(COALESCE(occurred_at, ''), 1, 10) <> ''
            """
        )
    ).mappings().first()
    workdays_count = int((workdays_row or {}).get("total_workdays") or 0)

    db.execute(text("DELETE FROM tenant_inventory_consumption_product_stats"))
    if workdays_count <= 0:
        return 0

    rows = db.execute(
        text(
            """
            SELECT
                MIN(product_id) AS product_id,
                product_lookup,
                supplier_lookup,
                MAX(product_name) AS product_name,
                MAX(supplier_name) AS supplier_name,
                ROUND(SUM(equivalent_units), 6) AS total_consumed_units,
                COUNT(DISTINCT substr(occurred_at, 1, 10)) AS consumed_days_count,
                COUNT(*) AS movement_count,
                MIN(substr(occurred_at, 1, 10)) AS first_consumption_date,
                MAX(substr(occurred_at, 1, 10)) AS last_consumption_date
            FROM tenant_inventory_movements
            WHERE movement_kind = 'out'
              AND source_type = 'manual_consumption'
              AND COALESCE(equivalent_units, 0) > 0
              AND COALESCE(product_lookup, '') <> ''
              AND COALESCE(supplier_lookup, '') <> ''
              AND substr(COALESCE(occurred_at, ''), 1, 10) <> ''
            GROUP BY product_lookup, supplier_lookup
            """
        )
    ).mappings().all()

    updated_at = _iso_now()
    inserted_count = 0
    for row in rows:
        product_lookup = str(row.get("product_lookup") or "").strip()
        supplier_lookup = str(row.get("supplier_lookup") or "").strip()
        total_consumed_units = round(float(row.get("total_consumed_units") or 0), 6)
        if not product_lookup or not supplier_lookup or total_consumed_units <= 0:
            continue
        consumed_days_count = max(int(row.get("consumed_days_count") or 0), 1)
        db.execute(
            text(
                """
                INSERT INTO tenant_inventory_consumption_product_stats (
                    id,
                    product_id,
                    product_lookup,
                    supplier_lookup,
                    product_name,
                    supplier_name,
                    total_consumed_units,
                    workdays_count,
                    consumed_days_count,
                    average_daily_consumed_units,
                    average_consumed_units_on_consumption_days,
                    movement_count,
                    first_consumption_date,
                    last_consumption_date,
                    calculation_source,
                    created_at,
                    updated_at
                ) VALUES (
                    :id,
                    :product_id,
                    :product_lookup,
                    :supplier_lookup,
                    :product_name,
                    :supplier_name,
                    :total_consumed_units,
                    :workdays_count,
                    :consumed_days_count,
                    :average_daily_consumed_units,
                    :average_consumed_units_on_consumption_days,
                    :movement_count,
                    :first_consumption_date,
                    :last_consumption_date,
                    'manual_consumption_movements',
                    :created_at,
                    :updated_at
                )
                """
            ),
            {
                "id": _inventory_consumption_stat_id(product_lookup, supplier_lookup),
                "product_id": int(row.get("product_id")) if row.get("product_id") is not None else None,
                "product_lookup": product_lookup,
                "supplier_lookup": supplier_lookup,
                "product_name": str(row.get("product_name") or product_lookup),
                "supplier_name": str(row.get("supplier_name") or supplier_lookup),
                "total_consumed_units": total_consumed_units,
                "workdays_count": workdays_count,
                "consumed_days_count": consumed_days_count,
                "average_daily_consumed_units": round(total_consumed_units / float(workdays_count), 6),
                "average_consumed_units_on_consumption_days": round(total_consumed_units / float(consumed_days_count), 6),
                "movement_count": int(row.get("movement_count") or 0),
                "first_consumption_date": str(row.get("first_consumption_date") or "") or None,
                "last_consumption_date": str(row.get("last_consumption_date") or "") or None,
                "created_at": updated_at,
                "updated_at": updated_at,
            },
        )
        inserted_count += 1

    return inserted_count


def _load_safety_stock_status(db: Session, *, minimum_days: float | None = None) -> dict[str, object]:
    safety_days = _read_safety_stock_days(db) if minimum_days is None else max(0.0, min(float(minimum_days), 365.0))
    _refresh_consumption_product_stats(db)
    db.commit()

    workdays_row = db.execute(
        text(
            """
            SELECT COALESCE(MAX(workdays_count), 0) AS workdays_count
            FROM tenant_inventory_consumption_product_stats
            """
        )
    ).mappings().first()
    workdays_count = int((workdays_row or {}).get("workdays_count") or 0)

    if not _table_exists(db, "tenant_inventory_stock_items"):
        return {
            "minimum_days": safety_days,
            "workdays_count": workdays_count,
            "count": 0,
            "items": [],
            "calculation_mode": "stock corrente / media consumi giornalieri manuali",
        }

    rows = db.execute(
        text(
            """
            WITH stock AS (
                SELECT
                    product_lookup,
                    supplier_lookup,
                    MAX(product_name) AS product_name,
                    MAX(supplier_name) AS supplier_name,
                    ROUND(SUM(COALESCE(total_equivalent_units, 0)), 6) AS current_stock_units,
                    COUNT(DISTINCT warehouse_id) AS warehouse_count
                FROM tenant_inventory_stock_items
                GROUP BY product_lookup, supplier_lookup
            )
            SELECT
                stats.product_id,
                COALESCE(stock.product_name, stats.product_name) AS product_name,
                COALESCE(stock.supplier_name, stats.supplier_name) AS supplier_name,
                COALESCE(stock.current_stock_units, 0) AS current_stock_units,
                COALESCE(stock.warehouse_count, 0) AS warehouse_count,
                stats.total_consumed_units,
                stats.workdays_count,
                stats.consumed_days_count,
                stats.average_daily_consumed_units,
                stats.average_consumed_units_on_consumption_days,
                stats.first_consumption_date,
                stats.last_consumption_date,
                ROUND(stats.average_daily_consumed_units * :minimum_days, 6) AS minimum_required_units,
                ROUND((stats.average_daily_consumed_units * :minimum_days) - COALESCE(stock.current_stock_units, 0), 6) AS missing_units,
                CASE
                    WHEN stats.average_daily_consumed_units > 0
                    THEN ROUND(COALESCE(stock.current_stock_units, 0) / stats.average_daily_consumed_units, 3)
                    ELSE NULL
                END AS covered_days
            FROM tenant_inventory_consumption_product_stats AS stats
            LEFT JOIN stock
                ON stock.product_lookup = stats.product_lookup
               AND stock.supplier_lookup = stats.supplier_lookup
            WHERE stats.average_daily_consumed_units > 0
              AND COALESCE(stock.current_stock_units, 0) < stats.average_daily_consumed_units * :minimum_days
            ORDER BY
                missing_units DESC,
                lower(COALESCE(stock.product_name, stats.product_name)) ASC,
                lower(COALESCE(stock.supplier_name, stats.supplier_name)) ASC
            """
        ),
        {"minimum_days": safety_days},
    ).mappings().all()

    return {
        "minimum_days": safety_days,
        "workdays_count": workdays_count,
        "count": len(rows),
        "items": [
            {
                "product_id": int(row.get("product_id")) if row.get("product_id") is not None else None,
                "product_name": str(row.get("product_name") or ""),
                "supplier_name": str(row.get("supplier_name") or ""),
                "current_stock_units": round(float(row.get("current_stock_units") or 0), 6),
                "minimum_required_units": round(float(row.get("minimum_required_units") or 0), 6),
                "missing_units": round(float(row.get("missing_units") or 0), 6),
                "covered_days": float(row.get("covered_days")) if row.get("covered_days") is not None else None,
                "average_daily_consumed_units": round(float(row.get("average_daily_consumed_units") or 0), 6),
                "average_consumed_units_on_consumption_days": round(
                    float(row.get("average_consumed_units_on_consumption_days") or 0),
                    6,
                ),
                "total_consumed_units": round(float(row.get("total_consumed_units") or 0), 6),
                "workdays_count": int(row.get("workdays_count") or 0),
                "consumed_days_count": int(row.get("consumed_days_count") or 0),
                "warehouse_count": int(row.get("warehouse_count") or 0),
                "first_consumption_date": str(row.get("first_consumption_date") or "") or None,
                "last_consumption_date": str(row.get("last_consumption_date") or "") or None,
            }
            for row in rows
        ],
        "calculation_mode": "sotto-stock quando stock corrente < media giornaliera consumi manuali * giorni safety",
    }


def _safety_stock_item_key(item: dict[str, object]) -> str:
    return "\0".join(
        [
            _normalize_catalog_text(str(item.get("product_name") or "")),
            _normalize_catalog_text(str(item.get("supplier_name") or "")),
        ]
    )


def _safety_stock_item_keys(items: list[dict[str, object]]) -> list[str]:
    keys = sorted({key for item in items if (key := _safety_stock_item_key(item)) != "\0"})
    return keys


def _safety_stock_signature(item_keys: list[str]) -> str:
    return hashlib.sha256("\n".join(item_keys).encode("utf-8")).hexdigest()


def _read_safety_stock_notification_state(db: Session) -> dict[str, object] | None:
    _ensure_safety_stock_tables(db)
    row = db.execute(
        text(
            """
            SELECT signature, item_keys_json, count
            FROM ordini_safety_stock_notification_state
            WHERE key = :key
            LIMIT 1
            """
        ),
        {"key": SAFETY_STOCK_NOTIFICATION_STATE_KEY},
    ).mappings().first()
    if row is None:
        return None

    try:
        item_keys = json.loads(str(row.get("item_keys_json") or "[]"))
    except json.JSONDecodeError:
        item_keys = []
    if not isinstance(item_keys, list):
        item_keys = []

    return {
        "signature": str(row.get("signature") or ""),
        "item_keys": [str(item) for item in item_keys if str(item or "").strip()],
        "count": int(row.get("count") or 0),
    }


def _write_safety_stock_notification_state(
    db: Session,
    *,
    signature: str,
    item_keys: list[str],
    count: int,
    notified: bool,
) -> None:
    timestamp = _iso_now()
    db.execute(
        text(
            """
            INSERT INTO ordini_safety_stock_notification_state (
                key,
                signature,
                item_keys_json,
                count,
                notified_at,
                updated_at
            ) VALUES (
                :key,
                :signature,
                :item_keys_json,
                :count,
                CASE WHEN :notified = 1 THEN :timestamp ELSE NULL END,
                :timestamp
            )
            ON CONFLICT(key) DO UPDATE SET
                signature = excluded.signature,
                item_keys_json = excluded.item_keys_json,
                count = excluded.count,
                notified_at = CASE
                    WHEN :notified = 1 THEN :timestamp
                    ELSE ordini_safety_stock_notification_state.notified_at
                END,
                updated_at = excluded.updated_at
            """
        ),
        {
            "key": SAFETY_STOCK_NOTIFICATION_STATE_KEY,
            "signature": signature,
            "item_keys_json": json.dumps(item_keys, ensure_ascii=False),
            "count": count,
            "notified": 1 if notified else 0,
            "timestamp": timestamp,
        },
    )
    db.commit()


def _format_stock_number(value: object) -> str:
    try:
        numeric = float(value or 0)
    except (TypeError, ValueError):
        numeric = 0.0
    if numeric.is_integer():
        return str(int(numeric))
    return f"{numeric:.1f}".rstrip("0").rstrip(".")


def _build_safety_stock_push_payload(
    *,
    new_items: list[dict[str, object]],
    total_count: int,
    minimum_days: float,
) -> dict[str, object]:
    first_item = new_items[0] if new_items else {}
    first_name = str(first_item.get("product_name") or "Prodotto").strip()
    missing_units = _format_stock_number(first_item.get("missing_units"))
    if len(new_items) == 1:
        body = f"{first_name}: mancano {missing_units} unita rispetto al minimo di {minimum_days:g} giorni."
    else:
        body = f"{len(new_items)} nuovi prodotti sotto-scorta. Primo: {first_name}, mancano {missing_units} unita."
    return {
        "title": "Merce sotto-scorta",
        "body": body[:180],
        "url": "/modules/ordini",
        "tag": "ordini-sotto-stock",
        "data": {
            "type": "safety_stock",
            "new_count": len(new_items),
            "total_count": total_count,
        },
    }


def notify_safety_stock_changes(db: Session, status_payload: dict[str, object]) -> int:
    raw_items = status_payload.get("items")
    items = [item for item in raw_items if isinstance(item, dict)] if isinstance(raw_items, list) else []
    item_keys = _safety_stock_item_keys(items)
    signature = _safety_stock_signature(item_keys)
    previous_state = _read_safety_stock_notification_state(db)

    if previous_state is None:
        _write_safety_stock_notification_state(
            db,
            signature=signature,
            item_keys=item_keys,
            count=len(items),
            notified=False,
        )
        return 0

    previous_keys = set(previous_state.get("item_keys") or [])
    new_items = [item for item in items if _safety_stock_item_key(item) not in previous_keys]
    if not new_items:
        if previous_state.get("signature") != signature or int(previous_state.get("count") or 0) != len(items):
            _write_safety_stock_notification_state(
                db,
                signature=signature,
                item_keys=item_keys,
                count=len(items),
                notified=False,
            )
        return 0

    try:
        minimum_days = float(status_payload.get("minimum_days") or DEFAULT_SAFETY_STOCK_DAYS)
    except (TypeError, ValueError):
        minimum_days = DEFAULT_SAFETY_STOCK_DAYS
    sent_count = send_push_payload(
        db,
        _build_safety_stock_push_payload(
            new_items=new_items,
            total_count=len(items),
            minimum_days=minimum_days,
        ),
    )
    _write_safety_stock_notification_state(
        db,
        signature=signature,
        item_keys=item_keys,
        count=len(items),
        notified=sent_count > 0,
    )
    return sent_count


def run_safety_stock_notification_scan(database_urls: list[str]) -> dict[str, int]:
    checked = 0
    sent = 0
    failed = 0
    for database_url in database_urls:
        try:
            with db_session_context(database_url) as db:
                status_payload = _load_safety_stock_status(db)
                sent += notify_safety_stock_changes(db, status_payload)
                checked += 1
        except Exception:
            failed += 1
    return {"checked": checked, "sent": sent, "failed": failed}


def _load_product_inventory_summaries(db: Session) -> dict[tuple[str, str], dict[str, object]]:
    if not _table_exists(db, "tenant_inventory_stock_items") or not _table_exists(db, "tenant_inventory_warehouses"):
        return {}

    item_rows = db.execute(
        text(
            """
            SELECT
                items.product_name,
                items.supplier_name,
                items.total_equivalent_units,
                warehouses.name AS warehouse_name
            FROM tenant_inventory_stock_items AS items
            JOIN tenant_inventory_warehouses AS warehouses
                ON warehouses.id = items.warehouse_id
            WHERE COALESCE(items.total_equivalent_units, 0) > 0
            ORDER BY lower(warehouses.name) ASC, lower(items.product_name) ASC, lower(items.supplier_name) ASC
            """
        )
    ).mappings().all()

    inventory_by_product: dict[tuple[str, str], dict[str, object]] = {}
    seen_by_product: dict[tuple[str, str], set[str]] = {}
    for row in item_rows:
        product_key = _product_inventory_key(row.get("product_name"), row.get("supplier_name"))
        warehouse_name = str(row.get("warehouse_name") or "").strip()
        if not product_key[0] or not product_key[1] or not warehouse_name:
            continue
        summary = inventory_by_product.setdefault(
            product_key,
            {
                "warehouse_names": [],
                "total_equivalent_units": 0.0,
                "_lot_summaries": {},
            },
        )
        summary["total_equivalent_units"] = float(summary.get("total_equivalent_units") or 0) + float(
            row.get("total_equivalent_units") or 0
        )
        normalized_warehouse_name = _normalize_catalog_text(warehouse_name)
        seen_names = seen_by_product.setdefault(product_key, set())
        if normalized_warehouse_name in seen_names:
            continue
        seen_names.add(normalized_warehouse_name)
        warehouse_names = summary.get("warehouse_names")
        if isinstance(warehouse_names, list):
            warehouse_names.append(warehouse_name)

    if _table_exists(db, "tenant_inventory_stock_lots"):
        lot_rows = db.execute(
            text(
                """
                SELECT
                    items.product_name,
                    items.supplier_name,
                    lots.lot_code,
                    COALESCE(lots.quantity, 0) AS quantity,
                    COALESCE(lots.equivalent_units, 0) AS equivalent_units
                FROM tenant_inventory_stock_lots AS lots
                JOIN tenant_inventory_stock_items AS items
                    ON items.id = lots.item_id
                WHERE COALESCE(lots.quantity, 0) > 0 OR COALESCE(lots.equivalent_units, 0) > 0
                ORDER BY lower(items.product_name) ASC, lower(items.supplier_name) ASC, lower(lots.lot_code) ASC
                """
            )
        ).mappings().all()

        for row in lot_rows:
            product_key = _product_inventory_key(row.get("product_name"), row.get("supplier_name"))
            if not product_key[0] or not product_key[1]:
                continue
            lot_code = str(row.get("lot_code") or "").strip()
            lot_lookup = _normalize_catalog_lot_code(lot_code)
            if not lot_lookup:
                continue
            summary = inventory_by_product.setdefault(
                product_key,
                {
                    "warehouse_names": [],
                    "total_equivalent_units": 0.0,
                    "_lot_summaries": {},
                },
            )
            lot_summaries = summary.get("_lot_summaries")
            if not isinstance(lot_summaries, dict):
                lot_summaries = {}
                summary["_lot_summaries"] = lot_summaries
            lot_summary = lot_summaries.setdefault(
                lot_lookup,
                {
                    "lot_code": lot_code,
                    "quantity": 0.0,
                    "equivalent_units": 0.0,
                },
            )
            lot_summary["quantity"] = float(lot_summary.get("quantity") or 0) + float(row.get("quantity") or 0)
            lot_summary["equivalent_units"] = float(lot_summary.get("equivalent_units") or 0) + float(
                row.get("equivalent_units") or 0
            )

    for summary in inventory_by_product.values():
        summary["total_equivalent_units"] = round(float(summary.get("total_equivalent_units") or 0), 6)
        raw_lot_summaries = summary.pop("_lot_summaries", {})
        if not isinstance(raw_lot_summaries, dict):
            summary["lot_summaries"] = []
            continue
        lot_summaries = []
        for lot_summary in raw_lot_summaries.values():
            if not isinstance(lot_summary, dict):
                continue
            lot_summaries.append(
                {
                    "lot_code": str(lot_summary.get("lot_code") or ""),
                    "quantity": round(float(lot_summary.get("quantity") or 0), 6),
                    "equivalent_units": round(float(lot_summary.get("equivalent_units") or 0), 6),
                }
            )
        summary["lot_summaries"] = sorted(
            lot_summaries,
            key=lambda item: (
                _normalize_catalog_lot_code(str(item.get("lot_code") or "")),
                str(item.get("lot_code") or ""),
            ),
        )

    return inventory_by_product


def _serialize_product(
    product: Product,
    *,
    inventory_summary: dict[str, object] | None = None,
    warehouse_names: list[str] | None = None,
) -> dict[str, object]:
    resolved_inventory_summary = inventory_summary if isinstance(inventory_summary, dict) else {}
    base_warehouse_names = warehouse_names
    if base_warehouse_names is None:
        summary_warehouse_names = resolved_inventory_summary.get("warehouse_names")
        base_warehouse_names = summary_warehouse_names if isinstance(summary_warehouse_names, list) else []
    resolved_warehouse_names = [name for name in (base_warehouse_names or []) if str(name).strip()]
    lot_summaries = (
        resolved_inventory_summary.get("lot_summaries")
        if isinstance(resolved_inventory_summary.get("lot_summaries"), list)
        else []
    )
    normalized_product_lot = _normalize_catalog_lot_code(product.lot_code)
    matched_lot_summary = next(
        (
            lot_summary
            for lot_summary in lot_summaries
            if isinstance(lot_summary, dict)
            and _normalize_catalog_lot_code(str(lot_summary.get("lot_code") or "")) == normalized_product_lot
        ),
        None,
    )
    return {
        "id": product.id,
        "product_name": product.product_name,
        "lot_code": product.lot_code,
        "supplier_name": product.supplier_name,
        "product_code": product.product_code,
        "final_price_vat": product.final_price_vat,
        "vat_rate": product.vat_rate,
        "weight_kg": product.weight_kg,
        "unit_price_per_kg": product.unit_price_per_kg,
        "category": product.category,
        "notes": product.notes,
        "units_per_pack": product.units_per_pack,
        "liters_per_unit": product.liters_per_unit,
        "warehouse_names": resolved_warehouse_names,
        "warehouse_count": len(resolved_warehouse_names),
        "inventory_total_equivalent_units": round(float(resolved_inventory_summary.get("total_equivalent_units") or 0), 6),
        "inventory_lot_quantity": round(float((matched_lot_summary or {}).get("quantity") or 0), 6),
        "inventory_lot_equivalent_units": round(float((matched_lot_summary or {}).get("equivalent_units") or 0), 6),
    }


def _serialize_supplier_catalog(catalog: SupplierCatalog) -> dict[str, object]:
    return {
        "id": catalog.id,
        "catalog_name": catalog.catalog_name,
        "supplier_name": catalog.supplier_name,
        "source_file_name": catalog.source_file_name,
        "total_items": catalog.total_items,
        "created_at": _format_iso(catalog.created_at),
        "updated_at": _format_iso(catalog.updated_at),
    }


def _serialize_supplier_catalog_item(item: SupplierCatalogItem) -> dict[str, object]:
    return {
        "id": item.id,
        "catalog_id": item.catalog_id,
        "source_name": item.source_name,
        "source_lot_code": item.source_lot_code,
        "source_supplier_name": item.source_supplier_name,
        "product_code": item.product_code,
        "final_price_vat": item.final_price_vat,
        "vat_rate": item.vat_rate,
        "weight_kg": item.weight_kg,
        "unit_price_per_kg": item.unit_price_per_kg,
        "category": item.category,
        "notes": item.notes,
        "units_per_pack": item.units_per_pack,
        "liters_per_unit": item.liters_per_unit,
        "sort_order": item.sort_order,
    }


def _normalize_supplier_catalog_row(
    row: SupplierCatalogRow,
    *,
    supplier_name: str | None = None,
    default_lot_code: str | None = None,
) -> dict[str, object]:
    resolved_supplier_name = (row.source_supplier_name or supplier_name or "").strip() or None
    resolved_lot_code = (row.source_lot_code or default_lot_code or "").strip() or None
    return {
        "source_name": row.source_name.strip(),
        "source_lot_code": resolved_lot_code,
        "source_supplier_name": resolved_supplier_name,
        "product_code": row.product_code,
        "final_price_vat": row.final_price_vat,
        "vat_rate": row.vat_rate,
        "weight_kg": row.weight_kg,
        "unit_price_per_kg": row.unit_price_per_kg,
        "category": row.category,
        "notes": row.notes,
        "units_per_pack": row.units_per_pack,
        "liters_per_unit": row.liters_per_unit,
    }


def _estimate_line_total(
    quantity: int,
    final_price_vat: float | None,
    *,
    weight_kg: float | None = None,
    unit_price_per_kg: float | None = None,
    vat_rate: float | None = None,
) -> float | None:
    if final_price_vat is not None and final_price_vat > 0:
        return round(float(quantity) * float(final_price_vat), 2)
    if unit_price_per_kg is None or unit_price_per_kg <= 0:
        return None
    if weight_kg is None or weight_kg <= 0:
        return None
    unit_total = float(unit_price_per_kg) * float(weight_kg)
    if vat_rate is not None and vat_rate > 0:
        unit_total *= 1 + (float(vat_rate) / 100)
    return round(float(quantity) * unit_total, 2)


def _resolved_item_line_total(item: OrderItem) -> float | None:
    if item.estimated_line_total is not None and item.estimated_line_total > 0:
        return round(float(item.estimated_line_total), 2)
    if item.final_price_vat_snapshot is not None and item.final_price_vat_snapshot > 0:
        return _estimate_line_total(item.quantity, item.final_price_vat_snapshot)
    product = item.product
    if product is None:
        return None
    return _estimate_line_total(
        item.quantity,
        product.final_price_vat,
        weight_kg=product.weight_kg,
        unit_price_per_kg=product.unit_price_per_kg,
        vat_rate=product.vat_rate,
    )


def _serialize_item(item: OrderItem) -> dict[str, object]:
    resolved_line_total = _resolved_item_line_total(item)
    return {
        "id": item.id,
        "product_id": item.product_id,
        "product_name": item.product_name,
        "lot_code": item.lot_code,
        "supplier_name": item.supplier_name,
        "quantity": item.quantity,
        "final_price_vat_snapshot": item.final_price_vat_snapshot,
        "estimated_line_total": resolved_line_total,
        "units_per_pack": item.units_per_pack,
        "liters_per_unit": item.liters_per_unit,
    }


def _serialize_batch(batch: OrderBatch) -> dict[str, object]:
    serialized_items = [_serialize_item(item) for item in batch.items]
    resolved_batch_total = batch.total_estimated_amount
    if resolved_batch_total is None or resolved_batch_total <= 0:
        line_totals = [float(item["estimated_line_total"]) for item in serialized_items if item.get("estimated_line_total") is not None]
        resolved_batch_total = round(sum(line_totals), 2) if line_totals else None
    return {
        "id": batch.id,
        "staff": batch.staff,
        "total_estimated_amount": resolved_batch_total,
        "confirmed_at": _format_iso(batch.confirmed_at),
        "fiscal_document_matched": bool(batch.fiscal_document_id),
        "fiscal_document_id": batch.fiscal_document_id,
        "fiscal_document_name": batch.fiscal_document_name,
        "fiscal_document_type": batch.fiscal_document_type,
        "fiscal_document_matched_at": batch.fiscal_document_matched_at,
        "items": serialized_items,
    }


def _extract_liters_from_text(value: str) -> float | None:
    normalized = (value or "").lower().replace(",", ".")
    for pattern, multiplier in (
        (r"(\d+(?:\.\d+)?)\s*(?:lt|l)\b", 1.0),
        (r"(\d+(?:\.\d+)?)\s*cl\b", 0.01),
        (r"(\d+(?:\.\d+)?)\s*ml\b", 0.001),
    ):
        match = re.search(pattern, normalized)
        if not match:
            continue
        try:
            amount = float(match.group(1))
        except ValueError:
            continue
        liters = amount * multiplier
        if liters > 0:
            return liters
    return None


def _resolve_liters_per_unit(product_name: str, lot_code: str, liters_per_unit: float | None) -> float | None:
    if liters_per_unit is not None and liters_per_unit > 0:
        return liters_per_unit
    for source in (lot_code, product_name):
        estimated = _extract_liters_from_text(source or "")
        if estimated is not None:
            return estimated
    return None


def _login_response(context: AuthContext) -> dict[str, object]:
    return {
        "success": True,
        "token": context.token,
        "staff": context.default_staff,
        "tenant_name": context.tenant_name,
        "tenant_slug": context.tenant_slug,
        "user_name": context.user_name or context.username or context.user_email,
    }


def _load_batches_for_year(year: int, db: Session) -> list[OrderBatch]:
    start, end = _utc_year_bounds(year)
    stmt = (
        select(OrderBatch)
        .where(OrderBatch.confirmed_at >= start, OrderBatch.confirmed_at < end)
        .options(selectinload(OrderBatch.items).selectinload(OrderItem.product))
        .order_by(OrderBatch.confirmed_at.desc(), OrderBatch.id.desc())
    )
    return list(db.scalars(stmt))


def _all_years(db: Session) -> list[int]:
    years = {
        (batch.confirmed_at.replace(tzinfo=timezone.utc) if batch.confirmed_at.tzinfo is None else batch.confirmed_at).year
        for batch in db.scalars(select(OrderBatch))
    }
    years.update(db.scalars(select(SeasonalGoal.year)))
    if not years:
        years.add(datetime.now(timezone.utc).year)
    return sorted(years, reverse=True)


def _normalize_staff(staff: str | None, fallback: str | None = None) -> str:
    value = (staff or fallback or get_settings().default_staff).strip().lower()
    return value or get_settings().default_staff


def _normalize_suspended_payload(payload: dict[str, int]) -> dict[str, int]:
    normalized: dict[str, int] = {}
    for key, raw_value in payload.items():
        try:
            quantity = int(raw_value)
        except (TypeError, ValueError):
            continue
        if quantity > 0:
            normalized[str(key)] = quantity
    return normalized


def _normalize_note_text(value: str) -> str:
    text = value.strip()
    if not text:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Il testo della nota e' obbligatorio")
    return text


def _normalize_product_identity(entry: ProductUpsert) -> tuple[str, str, str]:
    return (
        entry.product_name.strip(),
        entry.lot_code.strip(),
        entry.supplier_name.strip(),
    )


def _dedupe_product_payload(payload: list[ProductUpsert]) -> list[ProductUpsert]:
    deduped: dict[tuple[str, str, str], ProductUpsert] = {}
    for entry in payload:
        deduped[_normalize_product_identity(entry)] = entry
    return list(deduped.values())


def _apply_product_metadata(product: Product, entry: ProductUpsert) -> None:
    optional_fields = {
        "product_code": entry.product_code,
        "final_price_vat": entry.final_price_vat,
        "vat_rate": entry.vat_rate,
        "weight_kg": entry.weight_kg,
        "unit_price_per_kg": entry.unit_price_per_kg,
        "category": entry.category,
        "notes": entry.notes,
        "units_per_pack": entry.units_per_pack,
        "liters_per_unit": entry.liters_per_unit,
    }
    for field_name, value in optional_fields.items():
        if field_name in entry.model_fields_set:
            setattr(product, field_name, value)


def _normalize_catalog_text(value: str | None) -> str:
    normalized = unicodedata.normalize("NFKD", str(value or "")).encode("ascii", "ignore").decode("ascii")
    lowered = normalized.lower().replace("’", "'")

    def convert_centiliters(match: re.Match[str]) -> str:
        raw_number = match.group(1).replace(",", ".")
        try:
            liters = float(raw_number) / 100.0
        except ValueError:
            return match.group(0)
        rendered = f"{liters:.2f}".rstrip("0").rstrip(".")
        return f"{rendered}l"

    lowered = re.sub(r"(\d+(?:[.,]\d+)?)\s*cl\b", convert_centiliters, lowered)
    lowered = re.sub(r"[^a-z0-9]+", " ", lowered)
    return " ".join(lowered.split())


def _normalize_catalog_lot_code(value: str | None) -> str:
    normalized = _normalize_catalog_text(value).replace(" ", "")
    if not normalized:
        return ""
    if normalized in {"bt", "bottiglia", "bottiglie"} or normalized.startswith("bottigli"):
        return "bt"
    if normalized in {"ct", "cartone", "cartoni", "cassa", "casse"} or normalized.startswith("carton") or normalized.startswith("cass"):
        return "ct"
    if normalized in {"pz", "pezzo", "pezzi"} or normalized.startswith("pezz"):
        return "pz"
    return normalized


def _italian_day_utc_bounds(reference: datetime | None = None) -> tuple[datetime, datetime]:
    current = reference or datetime.now(timezone.utc)
    if current.tzinfo is None:
        current = current.replace(tzinfo=timezone.utc)
    local = current.astimezone(ITALIAN_TIMEZONE)
    local_start = datetime(local.year, local.month, local.day, tzinfo=ITALIAN_TIMEZONE)
    return (local_start.astimezone(timezone.utc), (local_start + timedelta(days=1)).astimezone(timezone.utc))


def _find_open_daily_supplier_batch(db: Session, supplier_name: str) -> OrderBatch | None:
    supplier_key = _normalize_catalog_text(supplier_name)
    if not supplier_key:
        return None
    start, end = _italian_day_utc_bounds()
    batches = db.scalars(
        select(OrderBatch)
        .where(
            OrderBatch.confirmed_at >= start,
            OrderBatch.confirmed_at < end,
            OrderBatch.fiscal_document_id.is_(None),
        )
        .options(selectinload(OrderBatch.items).selectinload(OrderItem.product))
        .order_by(OrderBatch.confirmed_at.asc(), OrderBatch.id.asc())
    )
    for batch in batches:
        batch_supplier_keys = {
            _normalize_catalog_text(item.supplier_name)
            for item in batch.items
            if _normalize_catalog_text(item.supplier_name)
        }
        if batch_supplier_keys == {supplier_key}:
            return batch
    return None


def _update_order_item_from_product(item: OrderItem, product: Product, quantity: int) -> None:
    item.product = product
    item.product_id = product.id
    item.product_name = product.product_name
    item.lot_code = product.lot_code
    item.supplier_name = product.supplier_name
    item.quantity = quantity
    item.final_price_vat_snapshot = product.final_price_vat
    item.estimated_line_total = _estimate_line_total(
        quantity,
        product.final_price_vat,
        weight_kg=product.weight_kg,
        unit_price_per_kg=product.unit_price_per_kg,
        vat_rate=product.vat_rate,
    )
    item.units_per_pack = product.units_per_pack
    item.liters_per_unit = _resolve_liters_per_unit(product.product_name, product.lot_code, product.liters_per_unit)


def _append_product_to_batch(batch: OrderBatch, product: Product, quantity: int) -> None:
    existing_item = next((item for item in batch.items if item.product_id == product.id), None)
    if existing_item is None:
        item = OrderItem()
        _update_order_item_from_product(item, product, quantity)
        batch.items.append(item)
        return
    _update_order_item_from_product(existing_item, product, int(existing_item.quantity or 0) + quantity)


def _recalculate_batch_total(batch: OrderBatch) -> None:
    line_totals = [
        float(line_total)
        for item in batch.items
        if (line_total := _resolved_item_line_total(item)) is not None
    ]
    batch.total_estimated_amount = round(sum(line_totals), 2) if line_totals else None


def _catalog_name_tokens(value: str | None) -> set[str]:
    return {token for token in _normalize_catalog_text(value).split(" ") if token}


def _catalog_name_similarity(left: str | None, right: str | None) -> tuple[float, float]:
    left_normalized = _normalize_catalog_text(left)
    right_normalized = _normalize_catalog_text(right)
    if not left_normalized or not right_normalized:
        return 0.0, 0.0
    if left_normalized == right_normalized:
        return 1.0, 1.0

    left_tokens = _catalog_name_tokens(left)
    right_tokens = _catalog_name_tokens(right)
    token_overlap = len(left_tokens & right_tokens) / max(len(left_tokens), len(right_tokens), 1)
    sequence_score = SequenceMatcher(None, left_normalized, right_normalized).ratio()
    score = min(1.0, (sequence_score * 0.68) + (token_overlap * 0.32))
    return score, token_overlap


def _catalog_match_label(match_type: str) -> str:
    labels = {
        "exact_code": "codice",
        "exact_identity": "identita esatta",
        "exact_name": "nome esatto",
        "strong_name": "nome forte",
        "new": "nuovo",
        "ambiguous": "ambiguo",
        "skipped": "saltato",
    }
    return labels.get(match_type, match_type)


def _product_upsert_payload_from_preview(
    row: SupplierCatalogRow,
    *,
    matched_product: Product | None,
    supplier_name: str | None,
    lot_code: str | None,
) -> dict[str, object]:
    product_name = matched_product.product_name if matched_product is not None else row.source_name.strip()
    resolved_supplier = (supplier_name or (matched_product.supplier_name if matched_product is not None else None) or "").strip()
    resolved_lot = (lot_code or (matched_product.lot_code if matched_product is not None else None) or "").strip()

    payload: dict[str, object] = {
        "product_name": product_name,
        "supplier_name": resolved_supplier,
        "lot_code": resolved_lot,
    }

    optional_fields = {
        "product_code": row.product_code if row.product_code is not None else (matched_product.product_code if matched_product is not None else None),
        "final_price_vat": row.final_price_vat,
        "vat_rate": row.vat_rate,
        "weight_kg": row.weight_kg,
        "unit_price_per_kg": row.unit_price_per_kg,
        "category": row.category,
        "notes": row.notes,
        "units_per_pack": row.units_per_pack,
        "liters_per_unit": row.liters_per_unit,
    }
    for field_name, value in optional_fields.items():
        if value is not None:
            payload[field_name] = value

    return payload


def _preview_supplier_catalog_rows(
    request: SupplierCatalogPreviewRequest,
    products: list[Product],
) -> dict[str, object]:
    products_by_code: defaultdict[str, list[Product]] = defaultdict(list)
    products_by_identity: defaultdict[tuple[str, str, str], list[Product]] = defaultdict(list)
    products_by_supplier: defaultdict[str, list[Product]] = defaultdict(list)
    products_by_name_and_lot: defaultdict[tuple[str, str], list[Product]] = defaultdict(list)

    for product in products:
        normalized_name = _normalize_catalog_text(product.product_name)
        normalized_supplier = _normalize_catalog_text(product.supplier_name)
        normalized_lot = _normalize_catalog_lot_code(product.lot_code)
        normalized_code = _normalize_catalog_text(product.product_code)

        if normalized_code:
            products_by_code[normalized_code].append(product)
        if normalized_name and normalized_supplier and normalized_lot:
            products_by_identity[(normalized_name, normalized_lot, normalized_supplier)].append(product)
        if normalized_supplier:
            products_by_supplier[normalized_supplier].append(product)
        if normalized_name and normalized_lot:
            products_by_name_and_lot[(normalized_name, normalized_lot)].append(product)

    preview_rows: list[dict[str, object]] = []
    summary = {
        "total_rows": 0,
        "importable_rows": 0,
        "matched_rows": 0,
        "matched_exact_code": 0,
        "matched_exact_identity": 0,
        "matched_exact_name": 0,
        "matched_strong_name": 0,
        "new_rows": 0,
        "ambiguous_rows": 0,
        "skipped_rows": 0,
    }

    default_supplier_name = (request.supplier_name or "").strip()
    default_lot_code = (request.default_lot_code or "").strip()

    for index, row in enumerate(request.rows, start=1):
        summary["total_rows"] += 1
        source_name = row.source_name.strip()
        source_supplier = (row.source_supplier_name or default_supplier_name or "").strip()
        source_lot = (row.source_lot_code or default_lot_code or "").strip()
        normalized_name = _normalize_catalog_text(source_name)
        normalized_supplier = _normalize_catalog_text(source_supplier)
        normalized_lot = _normalize_catalog_lot_code(source_lot)
        normalized_code = _normalize_catalog_text(row.product_code)

        warnings: list[str] = []
        matched_product: Product | None = None
        match_type = "skipped"
        confidence = 0.0
        importable = False

        if not source_name:
            warnings.append("Nome prodotto mancante.")
        if not source_supplier:
            warnings.append("Fornitore mancante: imposta una colonna fornitore o un fornitore di default.")
        if not source_lot:
            warnings.append("Lotto mancante: imposta una colonna lotto o un lotto di default.")

        if normalized_code:
            code_candidates = products_by_code.get(normalized_code, [])
            if normalized_supplier:
                supplier_filtered = [
                    candidate
                    for candidate in code_candidates
                    if _normalize_catalog_text(candidate.supplier_name) == normalized_supplier
                ]
                if supplier_filtered:
                    code_candidates = supplier_filtered
            if len(code_candidates) == 1:
                matched_product = code_candidates[0]
                match_type = "exact_code"
                confidence = 1.0
            elif len(code_candidates) > 1:
                warnings.append("Codice prodotto presente ma associato a piu righe del catalogo locale.")
                match_type = "ambiguous"

        if matched_product is None and normalized_name and normalized_supplier and normalized_lot:
            identity_candidates = products_by_identity.get((normalized_name, normalized_lot, normalized_supplier), [])
            if len(identity_candidates) == 1:
                matched_product = identity_candidates[0]
                match_type = "exact_identity"
                confidence = 0.99
            elif len(identity_candidates) > 1:
                warnings.append("Prodotto gia presente piu volte con la stessa identita locale.")
                match_type = "ambiguous"

        if matched_product is None and normalized_name and normalized_supplier:
            supplier_candidates = products_by_supplier.get(normalized_supplier, [])
            if normalized_lot:
                lot_candidates = [
                    candidate for candidate in supplier_candidates if _normalize_catalog_lot_code(candidate.lot_code) == normalized_lot
                ]
            else:
                lot_candidates = supplier_candidates

            exact_name_candidates = [
                candidate for candidate in lot_candidates if _normalize_catalog_text(candidate.product_name) == normalized_name
            ]
            if len(exact_name_candidates) == 1:
                matched_product = exact_name_candidates[0]
                match_type = "exact_name"
                confidence = 0.97
            elif len(exact_name_candidates) > 1:
                warnings.append("Nome prodotto coincidente su piu varianti locali: specifica meglio il lotto.")
                match_type = "ambiguous"
            elif lot_candidates:
                scored_candidates: list[tuple[float, float, Product]] = []
                for candidate in lot_candidates:
                    score, overlap = _catalog_name_similarity(source_name, candidate.product_name)
                    if normalized_lot and _normalize_catalog_lot_code(candidate.lot_code) == normalized_lot:
                        score = min(1.0, score + 0.02)
                    scored_candidates.append((score, overlap, candidate))
                scored_candidates.sort(key=lambda item: item[0], reverse=True)
                best_score, best_overlap, best_candidate = scored_candidates[0]
                second_score = scored_candidates[1][0] if len(scored_candidates) > 1 else 0.0
                if best_score >= 0.965 or (best_score >= 0.93 and best_overlap >= 0.84 and (best_score - second_score) >= 0.05):
                    matched_product = best_candidate
                    match_type = "strong_name"
                    confidence = round(best_score, 4)
                elif best_score >= 0.86:
                    warnings.append(
                        f"Riconoscimento non abbastanza sicuro: proposta {best_candidate.product_name} ({int(round(best_score * 100))}%)."
                    )
                    match_type = "ambiguous"

        if matched_product is None and match_type != "ambiguous" and normalized_name and normalized_lot:
            fallback_candidates = products_by_name_and_lot.get((normalized_name, normalized_lot), [])
            if len(fallback_candidates) == 1:
                matched_product = fallback_candidates[0]
                match_type = "exact_name"
                confidence = 0.94

        if matched_product is not None:
            source_supplier = matched_product.supplier_name
            source_lot = matched_product.lot_code
            importable = True
            summary["matched_rows"] += 1
            summary[f"matched_{match_type}"] += 1
        elif match_type == "ambiguous":
            summary["ambiguous_rows"] += 1
        elif normalized_name and source_supplier and source_lot and request.create_missing:
            match_type = "new"
            confidence = 0.0
            importable = True
            summary["new_rows"] += 1
        else:
            match_type = "skipped"
            summary["skipped_rows"] += 1

        if importable:
            summary["importable_rows"] += 1

        proposed_product = (
            _product_upsert_payload_from_preview(
                row,
                matched_product=matched_product,
                supplier_name=source_supplier,
                lot_code=source_lot,
            )
            if importable
            else None
        )

        preview_rows.append(
            {
                "row_index": index,
                "source_name": source_name,
                "source_supplier_name": row.source_supplier_name,
                "source_lot_code": row.source_lot_code,
                "source_product_code": row.product_code,
                "match_type": match_type,
                "match_label": _catalog_match_label(match_type),
                "confidence": round(confidence, 4),
                "importable": importable,
                "warnings": warnings,
                "matched_product": _serialize_product(matched_product) if matched_product is not None else None,
                "proposed_product": proposed_product,
            }
        )

    return {
        "ok": True,
        "summary": summary,
        "rows": preview_rows,
    }


def _goal_matches(item: OrderItem, goal: SeasonalGoal, *, secondary: bool = False) -> bool:
    product_match = goal.secondary_product_match if secondary else goal.product_match
    if secondary and not product_match:
        return False
    product_terms = [part.strip(" .,-") for part in re.split(r"\s*\|\s*|\s*,\s*|\s*;\s*", product_match or "") if part.strip(" .,-")]
    if product_terms:
        product_name = item.product_name.lower()
        if not any(term.lower() in product_name for term in product_terms):
            return False
    if goal.supplier_match and goal.supplier_match.lower() not in item.supplier_name.lower():
        return False
    return True


def _is_pack_lot_code(lot_code: str | None) -> bool:
    return _normalize_catalog_lot_code(lot_code) == "ct"


def _order_item_units_per_pack(item: OrderItem) -> float | None:
    raw_value = item.units_per_pack if item.units_per_pack is not None else (item.product.units_per_pack if item.product is not None else None)
    if raw_value is None:
        return None
    try:
        units_per_pack = float(raw_value)
    except (TypeError, ValueError):
        return None
    return units_per_pack if units_per_pack > 1 else None


def _order_item_pack_multiplier(item: OrderItem) -> float:
    if not _is_pack_lot_code(item.lot_code):
        return 1.0
    return _order_item_units_per_pack(item) or 1.0


def _goal_counts_order_packs(goal: SeasonalGoal) -> bool:
    unit_label = _normalize_catalog_text(goal.unit_label or "")
    if not unit_label:
        return False
    return unit_label in {"ct", "cartone", "cartoni", "cassa", "casse"} or unit_label.startswith("carton") or unit_label.startswith("cass")


def _order_item_goal_quantity(item: OrderItem, goal: SeasonalGoal) -> float:
    quantity = float(item.quantity or 0)
    if _goal_counts_order_packs(goal):
        return quantity
    return quantity * _order_item_pack_multiplier(item)


def _goal_progress(goal: SeasonalGoal, batches: list[OrderBatch]) -> dict[str, object]:
    if goal.goal_type == "note":
        return {
            "id": goal.id,
            "year": goal.year,
            "name": goal.name,
            "type": "note",
            "description": goal.description or "",
            "product_match": goal.product_match,
            "secondary_product_match": goal.secondary_product_match,
            "supplier_match": goal.supplier_match,
            "target": goal.target,
            "secondary_target": goal.secondary_target,
            "unit": goal.unit_label,
            "bonus": goal.bonus_label,
        }

    if goal.goal_type == "liters_dual":
        primary = 0.0
        secondary = 0.0
        for batch in batches:
            for item in batch.items:
                resolved_liters = _resolve_liters_per_unit(
                    item.product_name,
                    item.lot_code,
                    item.liters_per_unit if item.liters_per_unit is not None else (item.product.liters_per_unit if item.product is not None else None),
                )
                if resolved_liters is None:
                    continue
                liters = resolved_liters * item.quantity * _order_item_pack_multiplier(item)
                if _goal_matches(item, goal):
                    primary += liters
                if _goal_matches(item, goal, secondary=True):
                    secondary += liters
        return {
            "id": goal.id,
            "year": goal.year,
            "name": goal.name,
            "type": "liters_dual",
            "target_grey": goal.target or 0,
            "target_patron": goal.secondary_target or 0,
            "progress_grey": round(primary, 2),
            "progress_patron": round(secondary, 2),
            "bonus": goal.bonus_label,
            "description": goal.description or "",
            "product_match": goal.product_match,
            "secondary_product_match": goal.secondary_product_match,
            "supplier_match": goal.supplier_match,
            "target": goal.target,
            "secondary_target": goal.secondary_target,
            "unit": goal.unit_label or "L",
        }

    progress = 0.0
    for batch in batches:
        for item in batch.items:
            if not _goal_matches(item, goal):
                continue
            if goal.goal_type == "liters":
                resolved_liters = _resolve_liters_per_unit(
                    item.product_name,
                    item.lot_code,
                    item.liters_per_unit if item.liters_per_unit is not None else (item.product.liters_per_unit if item.product is not None else None),
                )
                if resolved_liters is None:
                    continue
                progress += resolved_liters * item.quantity * _order_item_pack_multiplier(item)
            else:
                progress += _order_item_goal_quantity(item, goal)

    return {
        "id": goal.id,
        "year": goal.year,
        "name": goal.name,
        "type": goal.goal_type,
        "target": goal.target or 0,
        "progress": round(progress, 2),
        "unit": goal.unit_label or ("L" if goal.goal_type == "liters" else "articoli"),
        "bonus": goal.bonus_label,
        "description": goal.description or "",
        "product_match": goal.product_match,
        "secondary_product_match": goal.secondary_product_match,
        "supplier_match": goal.supplier_match,
        "secondary_target": goal.secondary_target,
    }


@router.get("/health")
def health_check() -> dict[str, str]:
    return {"status": "ok", "service": "ordini-backend"}


@router.post("/api/login")
def login(payload: LoginRequest) -> dict[str, object]:
    try:
        context = authenticate_login(
            identifier=payload.identifier,
            password=payload.password,
            session_token=payload.session_token,
        )
    except ValueError:
        return {"success": False}
    if context.role == "staff" and "ordini" not in context.permissions:
        return {"success": False, "detail": "Questo account non puo accedere a Ordini"}
    return _login_response(context)


@protected.get("/api/session")
def session_info(auth: AuthContext = Depends(require_management_token)) -> dict[str, object]:
    return {
        "authenticated": True,
        "token": auth.token,
        "staff": auth.default_staff,
        "tenant_name": auth.tenant_name,
        "tenant_slug": auth.tenant_slug,
        "user_name": auth.user_name or auth.username or auth.user_email,
        "source": auth.source,
    }


@protected.get("/api/sotto-stock")
def get_safety_stock_status(db: Session = Depends(get_db)) -> dict[str, object]:
    status_payload = _load_safety_stock_status(db)
    notify_safety_stock_changes(db, status_payload)
    return status_payload


@protected.get("/api/sotto-stock/settings")
def get_safety_stock_settings(db: Session = Depends(get_db)) -> dict[str, object]:
    return {"minimum_days": _read_safety_stock_days(db)}


@protected.put("/api/sotto-stock/settings")
def update_safety_stock_settings(
    payload: SafetyStockSettingsUpdate,
    db: Session = Depends(get_db),
) -> dict[str, object]:
    minimum_days = _write_safety_stock_days(db, payload.minimum_days)
    status_payload = _load_safety_stock_status(db, minimum_days=minimum_days)
    notify_safety_stock_changes(db, status_payload)
    return status_payload


@protected.get("/api/prodotti")
def list_products(db: Session = Depends(get_db)) -> list[dict[str, object]]:
    stmt = select(Product).where(Product.active.is_(True)).order_by(Product.supplier_name, Product.product_name, Product.lot_code)
    products = list(db.scalars(stmt))
    inventory_summaries_by_product = _load_product_inventory_summaries(db)
    return [
        _serialize_product(
            product,
            inventory_summary=inventory_summaries_by_product.get(
                _product_inventory_key(product.product_name, product.supplier_name),
                {},
            ),
        )
        for product in products
    ]


@protected.get("/api/fornitori/cataloghi")
def list_supplier_catalogs(db: Session = Depends(get_db)) -> dict[str, object]:
    catalogs = list(
        db.scalars(
            select(SupplierCatalog).order_by(SupplierCatalog.updated_at.desc(), SupplierCatalog.id.desc())
        )
    )
    return {"catalogs": [_serialize_supplier_catalog(catalog) for catalog in catalogs]}


@protected.get("/api/fornitori/cataloghi/{catalog_id}")
def get_supplier_catalog(catalog_id: int, db: Session = Depends(get_db)) -> dict[str, object]:
    catalog = db.get(SupplierCatalog, catalog_id)
    if catalog is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Catalogo fornitore non trovato")

    items = list(
        db.scalars(
            select(SupplierCatalogItem)
            .where(SupplierCatalogItem.catalog_id == catalog_id)
            .order_by(SupplierCatalogItem.sort_order.asc(), SupplierCatalogItem.id.asc())
        )
    )
    return {
        "catalog": _serialize_supplier_catalog(catalog),
        "items": [_serialize_supplier_catalog_item(item) for item in items],
    }


@protected.delete("/api/fornitori/cataloghi/{catalog_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_supplier_catalog(catalog_id: int, db: Session = Depends(get_db)) -> Response:
    catalog = db.get(SupplierCatalog, catalog_id)
    if catalog is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Catalogo fornitore non trovato")
    db.delete(catalog)
    db.commit()
    return Response(status_code=status.HTTP_204_NO_CONTENT)


@protected.get("/api/fornitori/cataloghi/items/search")
def search_supplier_catalog_items(
    q: str = Query(default="", alias="q"),
    limit: int = Query(default=80, ge=1, le=300),
    db: Session = Depends(get_db),
) -> dict[str, object]:
    search = (q or "").strip().lower()
    stmt = (
        select(SupplierCatalogItem, SupplierCatalog)
        .join(SupplierCatalog, SupplierCatalog.id == SupplierCatalogItem.catalog_id)
        .order_by(SupplierCatalog.updated_at.desc(), SupplierCatalogItem.sort_order.asc(), SupplierCatalogItem.id.asc())
    )

    if search:
        like_value = f"%{search}%"
        stmt = stmt.where(
            SupplierCatalogItem.source_name.ilike(like_value)
            | SupplierCatalogItem.source_supplier_name.ilike(like_value)
            | SupplierCatalogItem.source_lot_code.ilike(like_value)
            | SupplierCatalogItem.product_code.ilike(like_value)
            | SupplierCatalog.catalog_name.ilike(like_value)
            | SupplierCatalog.supplier_name.ilike(like_value)
        )

    rows = db.execute(stmt.limit(limit)).all()
    items: list[dict[str, object]] = []
    for item, catalog in rows:
        serialized = _serialize_supplier_catalog_item(item)
        serialized["catalog"] = _serialize_supplier_catalog(catalog)
        items.append(serialized)
    return {"items": items}


@protected.post("/api/fornitori/cataloghi")
def create_supplier_catalog(payload: SupplierCatalogCreateRequest, db: Session = Depends(get_db)) -> dict[str, object]:
    normalized_rows = [
        _normalize_supplier_catalog_row(
            row,
            supplier_name=payload.supplier_name,
            default_lot_code=payload.default_lot_code,
        )
        for row in payload.rows
        if row.source_name.strip()
    ]
    if not normalized_rows:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Il catalogo fornitore non contiene righe valide")

    resolved_supplier_name = (payload.supplier_name or normalized_rows[0].get("source_supplier_name") or "").strip() or None
    resolved_catalog_name = (
        (payload.catalog_name or "").strip()
        or (payload.source_file_name or "").strip()
        or (f"Catalogo {resolved_supplier_name}" if resolved_supplier_name else "Catalogo fornitore")
    )

    catalog = SupplierCatalog(
        catalog_name=resolved_catalog_name,
        supplier_name=resolved_supplier_name,
        source_file_name=(payload.source_file_name or "").strip() or None,
        total_items=len(normalized_rows),
    )
    db.add(catalog)
    db.flush()

    for index, row in enumerate(normalized_rows, start=1):
        db.add(
            SupplierCatalogItem(
                catalog_id=catalog.id,
                source_name=str(row.get("source_name") or "").strip(),
                source_lot_code=row.get("source_lot_code"),
                source_supplier_name=row.get("source_supplier_name"),
                product_code=row.get("product_code"),
                final_price_vat=row.get("final_price_vat"),
                vat_rate=row.get("vat_rate"),
                weight_kg=row.get("weight_kg"),
                unit_price_per_kg=row.get("unit_price_per_kg"),
                category=row.get("category"),
                notes=row.get("notes"),
                units_per_pack=row.get("units_per_pack"),
                liters_per_unit=row.get("liters_per_unit"),
                sort_order=index,
            )
        )

    db.commit()
    db.refresh(catalog)
    return {"ok": True, "catalog": _serialize_supplier_catalog(catalog)}


@protected.post("/api/prodotti/catalogo/preview")
def preview_supplier_catalog(payload: SupplierCatalogPreviewRequest, db: Session = Depends(get_db)) -> dict[str, object]:
    if not payload.rows:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Nessuna riga catalogo ricevuta")

    products = list(
        db.scalars(
            select(Product).where(Product.active.is_(True)).order_by(Product.supplier_name, Product.product_name, Product.lot_code)
        )
    )
    return _preview_supplier_catalog_rows(payload, products)


@protected.put("/api/prodotti/{product_id}")
def update_product(product_id: int, payload: ProductUpsert, db: Session = Depends(get_db)) -> dict[str, object]:
    product = db.get(Product, product_id)
    if product is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Prodotto non trovato")

    product_name, lot_code, supplier_name = _normalize_product_identity(payload)
    conflicting_product = db.scalar(
        select(Product).where(
            Product.id != product_id,
            Product.product_name == product_name,
            Product.lot_code == lot_code,
            Product.supplier_name == supplier_name,
        )
    )
    if conflicting_product is not None:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Esiste gia un altro prodotto con lo stesso nome, lotto e fornitore.",
        )

    product.product_name = product_name
    product.lot_code = lot_code
    product.supplier_name = supplier_name
    _apply_product_metadata(product, payload)
    product.active = True

    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Aggiornamento prodotto in conflitto: controlla nome, lotto e fornitore.",
        ) from exc

    db.refresh(product)
    return {"ok": True, "product": _serialize_product(product)}


@protected.delete("/api/prodotti/{product_id}")
def delete_product(product_id: int, db: Session = Depends(get_db)) -> dict[str, object]:
    product = db.get(Product, product_id)
    if product is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Prodotto non trovato")

    product.active = False

    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Eliminazione prodotto non riuscita.",
        ) from exc

    return {"ok": True, "product_id": product_id}


@protected.post("/api/prodotti")
def upsert_products(payload: list[ProductUpsert], db: Session = Depends(get_db)) -> dict[str, object]:
    if not payload:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Nessun prodotto ricevuto")

    normalized_payload = _dedupe_product_payload(payload)
    created = 0
    updated = 0
    for entry in normalized_payload:
        product_name, lot_code, supplier_name = _normalize_product_identity(entry)
        existing = db.scalar(
            select(Product).where(
                Product.product_name == product_name,
                Product.lot_code == lot_code,
                Product.supplier_name == supplier_name,
            )
        )
        if existing is None:
            product = Product(
                product_name=product_name,
                lot_code=lot_code,
                supplier_name=supplier_name,
                active=True,
            )
            _apply_product_metadata(product, entry)
            db.add(product)
            created += 1
            continue

        _apply_product_metadata(existing, entry)
        existing.active = True
        updated += 1

    duplicate_rows = len(payload) - len(normalized_payload)
    try:
        db.commit()
    except IntegrityError as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Import prodotti in conflitto: controlla se il file contiene righe duplicate o se lo stesso prodotto e' stato creato in parallelo.",
        ) from exc
    return {"ok": True, "created": created, "updated": updated, "duplicate_rows": duplicate_rows}


@protected.get("/api/ordini")
def list_orders(year: int | None = Query(default=None), db: Session = Depends(get_db)) -> list[dict[str, object]]:
    target_year = year or datetime.now(timezone.utc).year
    return [_serialize_batch(batch) for batch in _load_batches_for_year(target_year, db)]


@protected.get("/api/ordini/years")
def list_order_years(db: Session = Depends(get_db)) -> dict[str, object]:
    return {"years": _all_years(db)}


@protected.post("/api/ordini")
def create_order(
    payload: OrderCreateRequest,
    auth: AuthContext = Depends(require_token),
    db: Session = Depends(get_db),
) -> dict[str, object]:
    if not payload.items:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Nessun prodotto selezionato")

    product_ids = [item.product_id for item in payload.items]
    products = {
        product.id: product
        for product in db.scalars(select(Product).where(Product.id.in_(product_ids), Product.active.is_(True)))
    }
    missing = [item.product_id for item in payload.items if item.product_id not in products]
    if missing:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Prodotti non trovati: {missing}")

    supplier_names = {
        products[entry.product_id].supplier_name.strip()
        for entry in payload.items
        if products[entry.product_id].supplier_name.strip()
    }
    batch = _find_open_daily_supplier_batch(db, next(iter(supplier_names))) if len(supplier_names) == 1 else None
    if batch is None:
        batch = OrderBatch(staff=_normalize_staff(payload.staff, auth.default_staff))
        db.add(batch)
    else:
        batch.staff = _normalize_staff(payload.staff, batch.staff or auth.default_staff)

    aggregated_quantities: defaultdict[int, int] = defaultdict(int)
    for entry in payload.items:
        aggregated_quantities[entry.product_id] += entry.quantity
    for product_id, quantity in aggregated_quantities.items():
        _append_product_to_batch(batch, products[product_id], quantity)
    _recalculate_batch_total(batch)

    db.commit()
    db.refresh(batch)
    loaded_batch = db.scalar(select(OrderBatch).where(OrderBatch.id == batch.id).options(selectinload(OrderBatch.items)))
    if loaded_batch is None:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Ordine salvato ma non ricaricabile")
    return {"ok": True, "batch": _serialize_batch(loaded_batch)}


@protected.put("/api/ordini")
def update_order(
    payload: OrderUpdateRequest,
    auth: AuthContext = Depends(require_token),
    db: Session = Depends(get_db),
) -> dict[str, object]:
    if not payload.items:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Nessun prodotto selezionato")

    batch = db.scalar(select(OrderBatch).where(OrderBatch.id == payload.batch_id).options(selectinload(OrderBatch.items)))
    if batch is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Ordine non trovato")

    product_ids = [item.product_id for item in payload.items]
    products = {
        product.id: product
        for product in db.scalars(select(Product).where(Product.id.in_(product_ids), Product.active.is_(True)))
    }
    missing = [item.product_id for item in payload.items if item.product_id not in products]
    if missing:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Prodotti non trovati: {missing}")

    batch.staff = _normalize_staff(payload.staff, batch.staff or auth.default_staff)
    batch_total_estimated_amount = 0.0
    has_priced_lines = False

    batch.items.clear()
    db.flush()

    for entry in payload.items:
        product = products[entry.product_id]
        line_total = _estimate_line_total(
            entry.quantity,
            product.final_price_vat,
            weight_kg=product.weight_kg,
            unit_price_per_kg=product.unit_price_per_kg,
            vat_rate=product.vat_rate,
        )
        if line_total is not None:
            has_priced_lines = True
            batch_total_estimated_amount += line_total
        db.add(
            OrderItem(
                batch_id=batch.id,
                product_id=product.id,
                product_name=product.product_name,
                lot_code=product.lot_code,
                supplier_name=product.supplier_name,
                quantity=entry.quantity,
                final_price_vat_snapshot=product.final_price_vat,
                estimated_line_total=line_total,
                units_per_pack=product.units_per_pack,
                liters_per_unit=_resolve_liters_per_unit(product.product_name, product.lot_code, product.liters_per_unit),
            )
        )

    batch.total_estimated_amount = round(batch_total_estimated_amount, 2) if has_priced_lines else None

    db.commit()
    loaded_batch = db.scalar(select(OrderBatch).where(OrderBatch.id == batch.id).options(selectinload(OrderBatch.items)))
    if loaded_batch is None:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Ordine aggiornato ma non ricaricabile")
    return {"ok": True, "batch": _serialize_batch(loaded_batch)}


@protected.delete("/api/ordini")
def delete_order(payload: OrderDeleteRequest, db: Session = Depends(get_db)) -> dict[str, object]:
    batch: OrderBatch | None = None
    if payload.batch_id is not None:
        batch = db.scalar(select(OrderBatch).where(OrderBatch.id == payload.batch_id).options(selectinload(OrderBatch.items)))
    elif payload.timestamp:
        try:
            target = datetime.fromisoformat(payload.timestamp.replace("Z", "+00:00"))
        except ValueError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Timestamp ordine non valido") from exc
        batch = db.scalar(select(OrderBatch).where(OrderBatch.confirmed_at == target).options(selectinload(OrderBatch.items)))
    else:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Specifica batch_id o timestamp")

    if batch is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Ordine non trovato")

    removed_items = [_serialize_item(item) for item in batch.items]
    db.delete(batch)
    db.commit()
    return {"ok": True, "removed": removed_items}


@protected.get("/api/ordini/ultimo")
def latest_order(db: Session = Depends(get_db)) -> dict[str, object]:
    stmt = select(OrderBatch).options(selectinload(OrderBatch.items)).order_by(OrderBatch.confirmed_at.desc(), OrderBatch.id.desc())
    batch = db.scalar(stmt)
    if batch is None:
        return {"batch": None}
    return {"batch": _serialize_batch(batch)}


@protected.get("/api/ordini/confronto")
def compare_orders(year: int | None = Query(default=None), db: Session = Depends(get_db)) -> dict[str, object]:
    target_year = year or datetime.now(timezone.utc).year
    current_batches = _load_batches_for_year(target_year, db)
    previous_year = target_year - 1
    previous_batches = _load_batches_for_year(previous_year, db)

    current_totals: dict[str, int] = defaultdict(int)
    previous_totals: dict[str, int] = defaultdict(int)

    for batch in current_batches:
        for item in batch.items:
            current_totals[item.product_name] += item.quantity

    for batch in previous_batches:
        for item in batch.items:
            previous_totals[item.product_name] += item.quantity

    product_names = sorted(set(current_totals) | set(previous_totals), key=lambda name: (-current_totals.get(name, 0), name))
    items = []
    for product_name in product_names:
        current_quantity = current_totals.get(product_name, 0)
        previous_quantity = previous_totals.get(product_name, 0)
        delta = current_quantity - previous_quantity
        delta_percent = None
        if previous_quantity:
            delta_percent = round((delta / previous_quantity) * 100, 1)
        elif current_quantity:
            delta_percent = None
        items.append(
            {
                "product_name": product_name,
                "current_quantity": current_quantity,
                "previous_quantity": previous_quantity,
                "delta": delta,
                "delta_percent": delta_percent,
            }
        )

    return {
        "year": target_year,
        "previous_year": previous_year,
        "current_total": sum(current_totals.values()),
        "previous_total": sum(previous_totals.values()),
        "items": items,
    }


@protected.get("/api/ordine_sospeso")
def get_suspended_order(
    staff: str | None = Query(default=None),
    auth: AuthContext = Depends(require_token),
    db: Session = Depends(get_db),
) -> dict[str, int]:
    suspended = db.scalar(select(SuspendedOrder).where(SuspendedOrder.staff == _normalize_staff(staff, auth.default_staff)))
    return suspended.payload if suspended else {}


@protected.post("/api/ordine_sospeso")
def save_suspended_order(
    payload: dict[str, int] = Body(...),
    staff: str | None = Query(default=None),
    auth: AuthContext = Depends(require_management_token),
    db: Session = Depends(get_db),
) -> dict[str, object]:
    normalized = _normalize_suspended_payload(payload)
    if not normalized:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Nessun prodotto da sospendere")

    target_staff = _normalize_staff(staff, auth.default_staff)
    suspended = db.scalar(select(SuspendedOrder).where(SuspendedOrder.staff == target_staff))
    if suspended is None:
        suspended = SuspendedOrder(staff=target_staff, payload=normalized)
        db.add(suspended)
    else:
        suspended.payload = normalized
    db.commit()
    return {"ok": True}


@protected.delete("/api/ordine_sospeso")
def delete_suspended_order(
    staff: str | None = Query(default=None),
    auth: AuthContext = Depends(require_management_token),
    db: Session = Depends(get_db),
) -> Response:
    suspended = db.scalar(select(SuspendedOrder).where(SuspendedOrder.staff == _normalize_staff(staff, auth.default_staff)))
    if suspended is not None:
        db.delete(suspended)
        db.commit()
    return Response(status_code=status.HTTP_204_NO_CONTENT)


@protected.get("/api/note")
def list_notes(db: Session = Depends(get_db)) -> list[dict[str, object]]:
    stmt = select(SharedNote).order_by(SharedNote.created_at.asc(), SharedNote.id.asc())
    return [
        {
            "id": note.id,
            "author": note.author,
            "text": note.text,
            "timestamp": _format_iso(note.created_at),
        }
        for note in db.scalars(stmt)
    ]


@protected.post("/api/note")
def create_note(
    payload: SharedNoteCreate,
    auth: AuthContext = Depends(require_management_token),
    db: Session = Depends(get_db),
) -> dict[str, object]:
    note = SharedNote(author=_normalize_staff(None, auth.default_staff), text=_normalize_note_text(payload.text))
    db.add(note)
    db.commit()
    db.refresh(note)
    return {
        "note": {
            "id": note.id,
            "author": note.author,
            "text": note.text,
            "timestamp": _format_iso(note.created_at),
        }
    }


@protected.delete("/api/note/{note_id}")
def delete_note(note_id: int, db: Session = Depends(get_db)) -> Response:
    note = db.get(SharedNote, note_id)
    if note is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Nota non trovata")
    db.delete(note)
    db.commit()
    return Response(status_code=status.HTTP_204_NO_CONTENT)


@protected.put("/api/obiettivi/{goal_id}")
def update_goal(goal_id: int, payload: GoalUpsert, db: Session = Depends(get_db)) -> dict[str, object]:
    goal = db.get(SeasonalGoal, goal_id)
    if goal is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Obiettivo non trovato")

    goal_type = str(payload.goal_type or "").strip()
    if goal_type not in {"quantity", "liters", "liters_dual", "note"}:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Tipo obiettivo non valido")

    goal.name = payload.name.strip()
    goal.goal_type = goal_type
    goal.year = payload.year or goal.year
    goal.description = payload.description
    goal.product_match = payload.product_match
    goal.secondary_product_match = payload.secondary_product_match
    goal.supplier_match = payload.supplier_match
    goal.target = payload.target
    goal.secondary_target = payload.secondary_target
    goal.unit_label = payload.unit_label
    goal.bonus_label = payload.bonus_label

    if goal.goal_type == "note":
        goal.product_match = None
        goal.secondary_product_match = None
        goal.supplier_match = None
        goal.target = None
        goal.secondary_target = None
        goal.unit_label = None
        if not goal.description:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Per una nota serve una descrizione.")
    elif goal.goal_type == "liters_dual":
        if not goal.product_match or not goal.secondary_product_match:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Per un doppio target servono prodotto principale e secondo prodotto.",
            )
        if goal.target is None or goal.secondary_target is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Per un doppio target servono target principale e target secondario.",
            )
        goal.unit_label = goal.unit_label or "L"
    else:
        if goal.target is None:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Per questo obiettivo serve un target numerico.")
        if not (goal.product_match or goal.supplier_match):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Per questo obiettivo indica almeno prodotto o fornitore.",
            )
        goal.secondary_product_match = None
        goal.secondary_target = None
        goal.unit_label = goal.unit_label or ("L" if goal.goal_type == "liters" else "articoli")

    db.commit()
    db.refresh(goal)
    batches = _load_batches_for_year(goal.year, db)
    return {"ok": True, "goal": _goal_progress(goal, batches)}


@protected.get("/api/obiettivi")
def list_goals(year: int | None = Query(default=None), db: Session = Depends(get_db)) -> dict[str, object]:
    target_year = year or datetime.now(timezone.utc).year
    stmt = (
        select(SeasonalGoal)
        .where(SeasonalGoal.year == target_year)
        .order_by(case((SeasonalGoal.goal_type == "note", 1), else_=0).asc(), SeasonalGoal.id.asc())
    )
    goals = list(db.scalars(stmt))
    batches = _load_batches_for_year(target_year, db)
    return {
        "year": target_year,
        "available": _all_years(db),
        "goals": [_goal_progress(goal, batches) for goal in goals],
    }


@router.get("/storico_ordini.csv")
def export_orders_csv(
    year: int | None = Query(default=None),
    _: AuthContext = Depends(require_management_token),
    db: Session = Depends(get_db),
) -> StreamingResponse:
    target_year = year or datetime.now(timezone.utc).year
    batches = _load_batches_for_year(target_year, db)
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Data", "Prodotto", "LOTTO", "FORNITORE", "Quantita"])
    for batch in batches:
        timestamp = _format_iso(batch.confirmed_at) or ""
        for item in batch.items:
            writer.writerow([timestamp, item.product_name, item.lot_code, item.supplier_name, item.quantity])
    buffer.seek(0)
    headers = {"Content-Disposition": f'attachment; filename="storico_ordini_{target_year}.csv"'}
    return StreamingResponse(iter([buffer.getvalue()]), media_type="text/csv; charset=utf-8", headers=headers)


router.include_router(protected)
