# -*- coding: utf-8 -*-
import argparse
import asyncio
import asyncpg
import aiohttp
from datetime import datetime
from io import BytesIO
from PIL import Image, UnidentifiedImageError
import imagehash

from dome_group_match import find_or_create_group_id
from shopprod_group_winner_policy import refresh_group_winner_and_tokens_if_needed
from db_config import DB_INFO_ASYNCPG as DB_INFO

# =========================
# CONFIG
# =========================
FETCH_SIZE = 100
DB_POOL_SIZE = 20
HTTP_CONCURRENCY = 64

# 성능 옵션:
# - shopprod_group 전체 preload는 DB read + 메모리 사용량이 큼(테이블이 크면 수 GB).
# - 대신 dome_group_match.find_or_create_group_id()가 group_key UNIQUE 인덱스로 exact lookup을 먼저 수행하도록 보강되어,
#   preload 없이도 정확 매칭은 빠르게 처리된다.
PRELOAD_GROUP_INDEX = False

http_sem = asyncio.Semaphore(HTTP_CONCURRENCY)

# in-memory group index
group_index = {}
group_lock = asyncio.Lock()

# =========================
# DATE UTILS
# =========================
def to_date_safe(value):
    """datetime/date/str -> date (실패 시 None)"""
    try:
        if value is None:
            return None
        # asyncpg가 datetime/date로 주는 케이스
        if hasattr(value, "date"):
            return value.date()
        s = str(value)
        if len(s) >= 10:
            return datetime.strptime(s[:10], "%Y-%m-%d").date()
    except Exception:
        return None
    return None

# =========================
# IMAGE UTILS
# =========================
async def get_image_hash(session, url):
    try:
        async with http_sem:
            async with session.get(
                url,
                timeout=aiohttp.ClientTimeout(total=8)
            ) as r:
                if r.status != 200:
                    return None, None
                content = await r.read()

        bio = BytesIO(content)
        img = Image.open(bio)
        img.verify()
        bio.seek(0)
        img = Image.open(bio).convert("RGB")

        return imagehash.phash(img), imagehash.dhash(img)

    except (
        aiohttp.ClientError,
        asyncio.TimeoutError,
        UnidentifiedImageError,
        OSError
    ):
        return None, None


# =========================
# DB PRELOAD
# =========================
async def load_group_index(pool):
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT group_id, group_key
            FROM mlinkdw.shopprod_group2
        """)
        for r in rows:
            try:
                ph, dh = r["group_key"].split("_")
                group_index[(ph, dh)] = r["group_id"]
            except Exception:
                continue


# =========================
# SQL
# =========================
CHANGE_SQL = """
SELECT
    a.status,
    c.group_id,
    c.vender_grade AS old_vender_grade,
    CASE WHEN b.vender_grade ~ '^[0-9]+$'
         THEN b.vender_grade::int ELSE 16 END AS new_vender_grade,
    c.iname   AS old_iname,
    a.iname   AS new_iname,
    c.price   AS old_price,
    a.price   AS new_price,
    c.img_url AS old_img_url,
    (a.img::jsonb)->>0 AS new_img_url,
    c.content AS old_content,
    a.content AS new_content,
    a.icode,
    a.vender_code,
    a.up_datetime,
    (c.img_url IS DISTINCT FROM (a.img::jsonb)->>0) AS img_changed
FROM mlinkdw.shopprodinfo_domeggook a
JOIN mlinkdw.supplier_info b
  ON b.dome='DMM' AND b.supplier_code=a.vender_code
JOIN mlinkdw.shopprod_group_map2 c
  ON c.dome_code='DMM' AND c.icode=a.icode
WHERE a.is_overseas='0'
  AND a.up_datetime > $1
  AND NOT EXISTS (
        SELECT 1
        FROM mlinkdw.shopprodinfo_unmaster d
        WHERE d.vender_code=a.vender_code
          AND d.icode=a.icode
  )
  AND (
        c.iname   IS DISTINCT FROM a.iname OR
        '0'       IS DISTINCT FROM a.status OR
        c.price   IS DISTINCT FROM a.price OR
        c.img_url IS DISTINCT FROM (a.img::jsonb)->>0 OR
        c.content IS DISTINCT FROM a.content OR
        c.vender_grade IS DISTINCT FROM coalesce(b.vender_grade,'16') 
  )
LIMIT $2;
"""

UPDATE_SQL = """
UPDATE mlinkdw.shopprod_group_map2
SET
    group_id     = $1,
    vender_grade = $2,
    iname        = $3,
    price        = $4,
    img_url      = $5,
    content      = $6,
    up_datetime  = to_char(now(), 'YYYY-MM-DD HH24:MI:SS'::text)
WHERE dome_code='DMM'
  AND icode=$7;
"""

DELETE_SQL = """
DELETE FROM mlinkdw.shopprod_group_map2
WHERE dome_code='DMM'
  AND icode=$1;
"""


# =========================
# MAIN
# =========================
async def main(last_up_dt: str):
    pool = await asyncpg.create_pool(
        **DB_INFO,
        min_size=5,
        max_size=DB_POOL_SIZE
    )

    if PRELOAD_GROUP_INDEX:
        await load_group_index(pool)

    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with pool.acquire() as conn:
            # 세션 식별(운영에서 pg_stat_activity/pg_stat_statements로 원인 추적 쉽게)
            try:
                await conn.execute("SET application_name = 'Update_DomeDuplicate_4stDomeme'")
            except Exception:
                pass

            while True:
                rows = await conn.fetch(
                    CHANGE_SQL,
                    last_up_dt,
                    FETCH_SIZE
                )
                if not rows:
                    print("[INFO] no more changes")
                    break

                # -----------------------------
                # 1️⃣ 이미지 변경된 row만 병렬 hash 계산
                # -----------------------------
                img_tasks = {}
                for r in rows:
                    if r["status"] == '0' and r["img_changed"] and r["new_img_url"]:
                        img_tasks[r["icode"]] = asyncio.create_task(
                            get_image_hash(session, r["new_img_url"])
                        )

                if img_tasks:
                    await asyncio.gather(
                        *img_tasks.values(),
                        return_exceptions=True
                    )

                # Task 결과 → 안전한 결과 dict
                img_results = {}
                for icode, task in img_tasks.items():
                    try:
                        res = task.result()
                        if isinstance(res, tuple) and len(res) == 2:
                            img_results[icode] = res
                        else:
                            img_results[icode] = (None, None)
                    except Exception:
                        img_results[icode] = (None, None)

                # -----------------------------
                # 2️⃣ DB 반영 (직렬)
                # -----------------------------
                async with conn.transaction():
                    for r in rows:
                        icode = r["icode"]
                        created_new_group = False

                        if r["status"] != '0':
                            # ❌ 판매중지/품절 → 삭제
                            old_gid = None
                            try:
                                old_gid = int(r["group_id"]) if r["group_id"] is not None else None
                            except Exception:
                                old_gid = None
                            await conn.execute(DELETE_SQL, icode)
                            if old_gid is not None:
                                await refresh_group_winner_and_tokens_if_needed(conn, old_gid)
                        else:
                            new_group_id = r["group_id"]

                            # 🔥 이미지 변경 → group 재계산
                            if r["img_changed"]:
                                # new_img_url이 없거나 해시 계산 실패(None)이면 unmaster로 이동
                                if not r["new_img_url"]:
                                    await conn.execute("""
                                        INSERT INTO mlinkdw.shopprodinfo_unmaster
                                        (icode, representative_img, iname, price,
                                         vender_code, vender_grade, reg_date)
                                        VALUES ($1,$2,$3,$4,$5,$6,$7)
                                        ON CONFLICT (vender_code,icode)
                                        DO UPDATE SET
                                            representative_img = EXCLUDED.representative_img,
                                            iname = EXCLUDED.iname,
                                            price = EXCLUDED.price,
                                            vender_grade = EXCLUDED.vender_grade,
                                            reg_date = EXCLUDED.reg_date,
                                            created_at = CURRENT_TIMESTAMP
                                    """,
                                        icode,
                                        r["new_img_url"],
                                        r["new_iname"],
                                        r["new_price"],
                                        r["vender_code"],
                                        r["new_vender_grade"],
                                        to_date_safe(r["up_datetime"]),
                                    )
                                    try:
                                        old_gid = int(r["group_id"]) if r["group_id"] is not None else None
                                    except Exception:
                                        old_gid = None
                                    await conn.execute(DELETE_SQL, icode)
                                    if old_gid is not None:
                                        await refresh_group_winner_and_tokens_if_needed(conn, old_gid)
                                    continue

                                phash, dhash = img_results.get(icode, (None, None))
                                if phash is None or dhash is None:
                                    await conn.execute("""
                                        INSERT INTO mlinkdw.shopprodinfo_unmaster
                                        (icode, representative_img, iname, price,
                                         vender_code, vender_grade, reg_date)
                                        VALUES ($1,$2,$3,$4,$5,$6,$7)
                                        ON CONFLICT (vender_code,icode)
                                        DO UPDATE SET
                                            representative_img = EXCLUDED.representative_img,
                                            iname = EXCLUDED.iname,
                                            price = EXCLUDED.price,
                                            vender_grade = EXCLUDED.vender_grade,
                                            reg_date = EXCLUDED.reg_date,
                                            created_at = CURRENT_TIMESTAMP
                                    """,
                                        icode,
                                        r["new_img_url"],
                                        r["new_iname"],
                                        r["new_price"],
                                        r["vender_code"],
                                        r["new_vender_grade"],
                                        to_date_safe(r["up_datetime"]),
                                    )
                                    try:
                                        old_gid = int(r["group_id"]) if r["group_id"] is not None else None
                                    except Exception:
                                        old_gid = None
                                    await conn.execute(DELETE_SQL, icode)
                                    if old_gid is not None:
                                        await refresh_group_winner_and_tokens_if_needed(conn, old_gid)
                                    continue

                                if phash and dhash:
                                    key = (str(phash), str(dhash))
                                    async with group_lock:
                                        new_group_id = group_index.get(key)
                                        if not new_group_id:
                                            new_group_id, created_new_group = await find_or_create_group_id(
                                                conn,
                                                group_index,
                                                phash,
                                                dhash,
                                                threshold=8,
                                                dhash_threshold=10,
                                                iname=r["new_iname"],
                                                iname_threshold=30,
                                                fallback_iname_tokens=True,
                                                fallback_content_html=r.get("new_content"),
                                                fallback_iname_dome_code='DMM',
                                                fallback_content_min_matches=1,
                                                fallback_content_min_ratio=1.0,
                                                current_img_url=r["new_img_url"],
                                                color_check=False,
                                                current_vender_code=r["vender_code"],
                                                current_icode=icode,
                                                current_price=r["new_price"],
                                                current_vender_grade=r["new_vender_grade"],
                                                current_reg_date=to_date_safe(r["up_datetime"]),
                                                current_dome_code='DMM',
                                                current_content_html=r.get("new_content"),
                                                verify_gid_exists=False,
                                                maintain_winner_tokens_on_existing_group=True,
                                                merge_moved_tokens_on_existing_group=True,
                                                return_meta=True,
                                            )

                            await conn.execute(
                                UPDATE_SQL,
                                new_group_id,
                                str(r["new_vender_grade"]),
                                r["new_iname"],
                                r["new_price"],
                                r["new_img_url"],
                                r["new_content"],
                                icode
                            )

                            # ✅ winner 재선정 + winner 변경 시 content_token 정리/재생성
                            old_gid = None
                            new_gid = None
                            try:
                                old_gid = int(r["group_id"]) if r["group_id"] is not None else None
                            except Exception:
                                old_gid = None
                            try:
                                new_gid = int(new_group_id) if new_group_id is not None else None
                            except Exception:
                                new_gid = None

                            need_refresh = False
                            try:
                                need_refresh = (r["old_price"] != r["new_price"]) or (r["old_vender_grade"] != r["new_vender_grade"])
                            except Exception:
                                pass
                            if old_gid is not None and new_gid is not None and old_gid != new_gid:
                                need_refresh = True

                            if need_refresh:
                                if new_gid is not None and not created_new_group:
                                    await refresh_group_winner_and_tokens_if_needed(conn, new_gid)
                                if old_gid is not None and old_gid != new_gid:
                                    await refresh_group_winner_and_tokens_if_needed(conn, old_gid)

                        # last_up_dt = max(last_up_dt, r["up_datetime"])

                print(f"[INFO] batch processed={len(rows)}, last_up_dt={last_up_dt}")

    await pool.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Update DomeDuplicate (DMM) based on up_datetime")
    parser.add_argument(
        "last_up_dt",
        nargs="?",
        default="2026-01-01",
        help="변경분 조회 시작 시각. 예) 2026-01-01 또는 2026-01-01 00:00:00 (default: 2026-01-01)",
    )
    args = parser.parse_args()
    asyncio.run(main(args.last_up_dt))
