# -*- coding: utf-8 -*-
import argparse
import asyncio
import asyncpg
import aiohttp
from datetime import datetime
from io import BytesIO
from PIL import Image, UnidentifiedImageError
import imagehash
from concurrent.futures import ThreadPoolExecutor
import requests

from dome_group_match import find_or_create_group_id
from shopprod_group_winner_policy import refresh_group_winner_and_tokens_if_needed
from db_config import DB_INFO_ASYNCPG as DB_INFO

# =========================
# CONFIG
# =========================
FETCH_SIZE = 100      # rows fetched per CHANGE_SQL batch
DB_POOL_SIZE = 20     # max connections in the asyncpg pool
HTTP_WORKERS = 32     # how many blocking `requests` downloads may run at once

# Performance switch:
# - Preloading the whole shopprod_group table costs a full DB read and a lot
#   of memory (several GB when the table is large).
# - dome_group_match.find_or_create_group_id() now performs an exact lookup on
#   the UNIQUE group_key index first, so exact matches stay fast without any
#   preload.
PRELOAD_GROUP_INDEX = False

# Worker threads that run the blocking image download + hashing.
executor = ThreadPoolExecutor(max_workers=HTTP_WORKERS)

# Browser-like request headers for image downloads.
_BROWSER_UA = " ".join(
    [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/120.0.0.0 Safari/537.36",
    ]
)
REQ_HEADERS = {
    "User-Agent": _BROWSER_UA,
    "Accept": "image/*,*/*;q=0.8",
    "Connection": "close",
}

# In-memory cache mapping (phash_str, dhash_str) -> group_id. It is filled by
# load_group_index() when PRELOAD_GROUP_INDEX is enabled and is handed to
# find_or_create_group_id().
group_index = {}
# Serializes lookups/creation that go through group_index.
group_lock = asyncio.Lock()

# =========================
# DATE UTILS
# =========================
def to_date_safe(value):
    """Best-effort conversion of *value* to a ``datetime.date``.

    - ``None`` -> ``None``.
    - Objects exposing a ``date`` attribute (e.g. ``datetime``) -> ``value.date()``.
    - Anything else is stringified; the first 10 characters are parsed as
      ``YYYY-MM-DD``. Strings shorter than 10 characters give ``None``.

    Any exception raised during conversion is swallowed and ``None`` returned.
    """
    if value is None:
        return None
    try:
        if hasattr(value, "date"):
            return value.date()
        text = str(value)
        return (
            datetime.strptime(text[:10], "%Y-%m-%d").date()
            if len(text) >= 10
            else None
        )
    except Exception:
        return None

# -----------------------------
# HTTP (requests, sync)
# -----------------------------
def get_image_hash_requests(url):
    """Download *url* and return its ``(phash, dhash)`` perceptual hashes.

    This is blocking (it uses ``requests``); ``get_image_hash`` runs it on a
    worker thread so the event loop is not stalled.

    Args:
        url: Image URL. Leading/trailing whitespace is stripped before fetching.

    Returns:
        ``(imagehash.phash(img), imagehash.dhash(img))`` for the image
        converted to RGB, or ``(None, None)`` when *url* is not a non-blank
        string, the response status is not 200, or the body cannot be
        verified/decoded as an image. Errors are deliberately reported as
        ``(None, None)`` so callers can treat them as "no usable image".
    """
    if not isinstance(url, str):
        return None, None
    # The blank check and the fetch must use the same string. requests strips
    # only *leading* whitespace; a trailing space/newline (common in scraped
    # data) would be percent-encoded into the path (".jpg%0A") and 404.
    url = url.strip()
    if not url:
        return None, None

    try:
        # timeout=(connect, read) in seconds.
        resp = requests.get(url, headers=REQ_HEADERS, timeout=(5, 10))
        if resp.status_code != 200:
            return None, None

        buf = BytesIO(resp.content)
        # verify() checks integrity without decoding pixels but leaves the
        # image object unusable, so the image is reopened from the start.
        Image.open(buf).verify()
        buf.seek(0)
        img = Image.open(buf).convert("RGB")

        return imagehash.phash(img), imagehash.dhash(img)

    except Exception:
        # Best-effort: network, HTTP and image-decoding failures all mean "no hash".
        return None, None


# -----------------------------
# async wrapper
# -----------------------------
async def get_image_hash(url):
    """Compute ``(phash, dhash)`` for *url* without blocking the event loop.

    Hands ``get_image_hash_requests`` to the shared ``executor`` thread pool
    and returns whatever it returns.
    """
    blocking_job = get_image_hash_requests
    pending = asyncio.get_running_loop().run_in_executor(executor, blocking_job, url)
    return await pending


# =========================
# DB PRELOAD
# =========================
async def load_group_index(pool):
    """Populate the module-level ``group_index`` from mlinkdw.shopprod_group2.

    Every ``group_key`` is split on ``"_"`` into ``(phash, dhash)`` and stored
    as ``group_index[(phash, dhash)] = group_id``. Keys that are NULL or do not
    split into exactly two parts are skipped silently.
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT group_id, group_key
            FROM mlinkdw.shopprod_group2
        """)
        for rec in rows:
            try:
                phash_part, dhash_part = rec["group_key"].split("_")
            except Exception:
                # NULL key (no .split) or unexpected number of "_"-separated parts.
                continue
            else:
                group_index[(phash_part, dhash_part)] = rec["group_id"]


# =========================
# SQL
# =========================
# Changed-row query for CHG products.
#   $1: exclusive lower bound on a.up_datetime (passed through from the CLI)
#   $2: maximum number of rows returned (batch size)
# Returns source rows (a) that are already mapped in shopprod_group_map2 (c)
# with dome_code='CHG', have is_overseas='0', are not present in
# shopprodinfo_unmaster, and either have status other than '0' or differ from
# the mapped row in iname / price / first image URL / content.
# new_vender_grade is always the int literal 16; img_changed flags a
# first-image-URL difference.
# NOTE: there is no ORDER BY and no moving cursor — the caller must make each
# processed row stop matching (update or delete its map row), otherwise the
# same rows are returned again.
CHANGE_SQL = """
SELECT
    a.status,
    c.group_id,
    c.vender_grade AS old_vender_grade,
    16 AS new_vender_grade,
    c.iname   AS old_iname,
    a.iname   AS new_iname,
    c.price   AS old_price,
    a.price   AS new_price,
    c.img_url AS old_img_url,
    (a.img::jsonb)->>0 AS new_img_url,
    c.content AS old_content,
    a.content AS new_content,
    a.icode,
    a.vender_code,
    a.up_datetime,
    (c.img_url IS DISTINCT FROM (a.img::jsonb)->>0) AS img_changed
FROM mlinkdw.shopprodinfo_domechanggo a
JOIN mlinkdw.shopprod_group_map2 c ON c.dome_code='CHG' AND c.icode=a.icode
WHERE a.is_overseas='0'
  AND a.up_datetime > $1
  AND NOT EXISTS (
        SELECT 1
        FROM mlinkdw.shopprodinfo_unmaster d
        WHERE d.vender_code=a.vender_code
          AND d.icode=a.icode
  )
  AND (
        c.iname   IS DISTINCT FROM a.iname OR
        '0'       IS DISTINCT FROM a.status OR
        c.price   IS DISTINCT FROM a.price OR
        c.img_url IS DISTINCT FROM (a.img::jsonb)->>0 OR
        c.content IS DISTINCT FROM a.content
  )
LIMIT $2;
"""

# Overwrites one CHG row of shopprod_group_map2 with the source values.
#   $1 group_id, $2 vender_grade, $3 iname, $4 price, $5 img_url,
#   $6 content, $7 icode
# up_datetime is set server-side to now() rendered as 'YYYY-MM-DD HH24:MI:SS' text.
UPDATE_SQL = """
UPDATE mlinkdw.shopprod_group_map2
SET
    group_id     = $1,
    vender_grade = $2,
    iname        = $3,
    price        = $4,
    img_url      = $5,
    content      = $6,
    up_datetime  = to_char(now(), 'YYYY-MM-DD HH24:MI:SS'::text)
WHERE dome_code='CHG'
  AND icode=$7;
"""

# Removes the CHG row(s) for one product from shopprod_group_map2.
#   $1 icode
DELETE_SQL = """
DELETE FROM mlinkdw.shopprod_group_map2
WHERE dome_code='CHG'
  AND icode=$1;
"""


# =========================
# MAIN
# =========================
# Upsert that parks a product in shopprodinfo_unmaster when it has no usable
# image (no new image URL, or the image could not be hashed).
#   $1 icode, $2 representative_img, $3 iname, $4 price, $5 vender_code,
#   $6 vender_grade, $7 reg_date
_UNMASTER_UPSERT_SQL = """
    INSERT INTO mlinkdw.shopprodinfo_unmaster
    (icode, representative_img, iname, price,
     vender_code, vender_grade, reg_date)
    VALUES ($1,$2,$3,$4,$5,$6,$7)
    ON CONFLICT (vender_code,icode)
    DO UPDATE SET
        representative_img = EXCLUDED.representative_img,
        iname = EXCLUDED.iname,
        price = EXCLUDED.price,
        vender_grade = EXCLUDED.vender_grade,
        reg_date = EXCLUDED.reg_date,
        created_at = CURRENT_TIMESTAMP
"""


def _to_int_or_none(value):
    """Return ``int(value)``, or ``None`` if *value* is None or not int-convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


async def _collect_image_hashes(rows):
    """Hash, in parallel, the new image of every active row whose image changed.

    Returns ``{icode: (phash, dhash)}``. A task that raised or returned an
    unexpected shape is recorded as ``(None, None)``.
    """
    tasks = {
        r["icode"]: asyncio.create_task(get_image_hash(r["new_img_url"]))
        for r in rows
        if r["status"] == '0' and r["img_changed"] and r["new_img_url"]
    }
    if not tasks:
        return {}
    outcomes = await asyncio.gather(*tasks.values(), return_exceptions=True)
    return {
        icode: res if isinstance(res, tuple) and len(res) == 2 else (None, None)
        for icode, res in zip(tasks, outcomes)
    }


async def _delete_and_refresh(conn, r):
    """Delete *r*'s CHG row from group_map2, then refresh the group it left.

    Removing a member can change that group's winner, hence the refresh.
    """
    old_gid = _to_int_or_none(r["group_id"])
    await conn.execute(DELETE_SQL, r["icode"])
    if old_gid is not None:
        await refresh_group_winner_and_tokens_if_needed(conn, old_gid)


async def _move_to_unmaster(conn, r):
    """Upsert *r* into shopprodinfo_unmaster and drop it from group_map2."""
    await conn.execute(
        _UNMASTER_UPSERT_SQL,
        r["icode"],
        r["new_img_url"],
        r["new_iname"],
        r["new_price"],
        r["vender_code"],
        r["new_vender_grade"],
        to_date_safe(r["up_datetime"]),
    )
    await _delete_and_refresh(conn, r)


async def _resolve_group_id(conn, r, phash, dhash):
    """Return ``(group_id, created_new_group)`` for *r*'s new image hashes.

    The in-memory ``group_index`` is consulted first; on a miss the lookup is
    delegated to ``find_or_create_group_id`` (which may create a new group).
    """
    key = (str(phash), str(dhash))
    async with group_lock:
        cached_gid = group_index.get(key)
        if cached_gid:
            return cached_gid, False
        gid, created = await find_or_create_group_id(
            conn,
            group_index,
            phash,
            dhash,
            threshold=8,
            dhash_threshold=10,
            iname=r["new_iname"],
            iname_threshold=30,
            fallback_iname_tokens=True,
            fallback_content_html=r.get("new_content"),
            fallback_iname_dome_code='CHG',
            fallback_content_min_matches=1,
            fallback_content_min_ratio=1.0,
            current_img_url=r["new_img_url"],
            color_check=False,
            current_vender_code=r["vender_code"],
            current_icode=r["icode"],
            current_price=r["new_price"],
            current_vender_grade=r["new_vender_grade"],
            current_reg_date=to_date_safe(r["up_datetime"]),
            current_dome_code='CHG',
            current_content_html=r.get("new_content"),
            verify_gid_exists=False,
            maintain_winner_tokens_on_existing_group=True,
            merge_moved_tokens_on_existing_group=True,
            return_meta=True,
        )
        return gid, created


async def _process_row(conn, r, img_results):
    """Apply one changed source row to group_map2 (inside the batch transaction)."""
    icode = r["icode"]

    # Status other than '0' (sale stopped / sold out): remove from the group map.
    if r["status"] != '0':
        await _delete_and_refresh(conn, r)
        return

    new_group_id = r["group_id"]
    # True only when find_or_create_group_id just created the group; it has then
    # already set winner/tokens, so the new group is not refreshed a second time.
    created_new_group = False

    # Image changed -> recompute the group from the new image's hashes.
    if r["img_changed"]:
        phash, dhash = img_results.get(icode, (None, None))
        # No new image URL, or hashing failed: park the product in unmaster.
        if not r["new_img_url"] or phash is None or dhash is None:
            await _move_to_unmaster(conn, r)
            return
        new_group_id, created_new_group = await _resolve_group_id(conn, r, phash, dhash)

    await conn.execute(
        UPDATE_SQL,
        new_group_id,
        str(r["new_vender_grade"]),
        r["new_iname"],
        r["new_price"],
        r["new_img_url"],
        r["new_content"],
        icode,
    )

    # Re-elect the winner (and rebuild content tokens if the winner changes)
    # when a winner-relevant field moved or the row switched groups.
    old_gid = _to_int_or_none(r["group_id"])
    new_gid = _to_int_or_none(new_group_id)
    group_changed = old_gid is not None and new_gid is not None and old_gid != new_gid
    price_changed = r["old_price"] != r["new_price"]
    # vender_grade is written back as str(...) above (so the column presumably
    # holds text), while new_vender_grade is the SQL int literal 16. Comparing
    # the raw values ('16' != 16) was always True and refreshed every row.
    grade_changed = str(r["old_vender_grade"]) != str(r["new_vender_grade"])
    if not (group_changed or price_changed or grade_changed):
        return

    if new_gid is not None and not created_new_group:
        await refresh_group_winner_and_tokens_if_needed(conn, new_gid)
    if old_gid is not None and old_gid != new_gid:
        await refresh_group_winner_and_tokens_if_needed(conn, old_gid)


async def main(last_up_dt: str):
    """Sync CHG changes newer than *last_up_dt* into shopprod_group_map2.

    Repeatedly fetches up to FETCH_SIZE changed rows (CHANGE_SQL), hashes the
    changed images in parallel, then applies the batch in one transaction.
    Processed rows stop matching CHANGE_SQL, so the loop ends when it returns
    nothing — or, as a safety net, when a batch holds only icodes already
    processed in this run (rows that never converge would otherwise loop
    forever, because last_up_dt is never advanced).

    Args:
        last_up_dt: Exclusive lower bound for ``up_datetime``, passed to CHANGE_SQL as-is.
    """
    async with asyncpg.create_pool(
        **DB_INFO,
        min_size=5,
        max_size=DB_POOL_SIZE
    ) as pool:  # pool is closed on exit, including on errors
        if PRELOAD_GROUP_INDEX:
            await load_group_index(pool)

        async with pool.acquire() as conn:
            # Tag the session so it is easy to trace in pg_stat_activity / pg_stat_statements.
            try:
                await conn.execute("SET application_name = 'Update_DomeDuplicate_4stChanggo'")
            except Exception as exc:
                print(f"[WARN] could not set application_name: {exc}")

            processed_icodes = set()
            while True:
                rows = await conn.fetch(
                    CHANGE_SQL,
                    last_up_dt,
                    FETCH_SIZE
                )
                if not rows:
                    print("[INFO] no more changes")
                    break

                batch_icodes = {r["icode"] for r in rows}
                if batch_icodes <= processed_icodes:
                    print(
                        f"[WARN] {len(rows)} row(s) still differ after being processed "
                        f"(e.g. {list(batch_icodes)[:10]}); stopping to avoid an endless loop"
                    )
                    break
                processed_icodes |= batch_icodes

                # 1) Hash changed images in parallel (outside the transaction).
                img_results = await _collect_image_hashes(rows)

                # 2) Apply the batch serially in a single transaction.
                async with conn.transaction():
                    for r in rows:
                        await _process_row(conn, r, img_results)

                print(f"[INFO] batch processed={len(rows)}, last_up_dt={last_up_dt}")


if __name__ == "__main__":
    # CLI: optional positional start timestamp for the change scan.
    cli = argparse.ArgumentParser(description="Update DomeDuplicate (CHG) based on up_datetime")
    cli.add_argument(
        "last_up_dt",
        nargs="?",
        default="2026-01-01",
        help="변경분 조회 시작 시각. 예) 2026-01-01 또는 2026-01-01 00:00:00 (default: 2026-01-01)",
    )
    start_dt = cli.parse_args().last_up_dt
    asyncio.run(main(start_dt))
