# -*- coding: utf-8 -*-
import asyncio
import asyncpg
import aiohttp
from datetime import datetime
from io import BytesIO
from PIL import Image, UnidentifiedImageError
import imagehash
from urllib.parse import urlparse, urlunparse

from dome_group_match import find_or_create_group_id

FETCH_SIZE = 100    # rows fetched (and committed) per batch
DB_POOL_SIZE = 20   # asyncpg pool max_size

# Performance note:
# - Preloading all of shopprod_group is a heavy DB read and can use a lot of
#   memory (several GB when the table is large).
# - Instead, dome_group_match.find_or_create_group_id() was extended to try an
#   exact lookup via the UNIQUE index on group_key first, so exact matches are
#   fast even without the preload.
PRELOAD_GROUP_INDEX = False

HTTP_CONCURRENCY = 64
http_sem = asyncio.Semaphore(HTTP_CONCURRENCY)  # caps concurrent image downloads

group_index = {}               # (phash, dhash) -> group_id cache
group_lock = asyncio.Lock()    # serializes group lookup/creation
counter_lock = asyncio.Lock()  # protects the `processed` counter
processed = 0                  # total rows handled during this run


# -----------------------------
# Utils
# -----------------------------
def extract_reg_date(img_url):
    """Parse a leading ``YYYY-MM-DD`` prefix into a ``datetime.date``.

    Despite the parameter name, callers pass the row's update/registration
    timestamp (see process_row_http).  asyncpg may deliver that as either a
    string or a ``datetime`` object; ``str()`` normalises both, because
    ``str(datetime)`` begins with the ISO date.  (The original sliced the raw
    value, so datetime inputs raised TypeError and always yielded ``None``.)

    Returns ``None`` when no valid date prefix is present.
    """
    try:
        # The first 10 characters of the ISO form are exactly "YYYY-MM-DD".
        y, m, d = str(img_url)[:10].split("-")[:3]
        return datetime.strptime(f"{y}-{m}-{d}", "%Y-%m-%d").date()
    except (ValueError, TypeError):
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed.
        return None
        
async def get_image_hash(session, url):
    """Download an image and return its ``(phash, dhash)`` pair.

    Tries the given URL first, then a small set of fallback URLs (scheme
    swap, known mirror hosts).  Returns ``(None, None)`` when no candidate
    yields a decodable image.
    """
    # Hoisted out of the retry loop: one timeout object for all attempts.
    per_request_timeout = aiohttp.ClientTimeout(total=8)

    def _fallback_image_urls(u: str) -> list[str]:
        """Build alternative URLs worth trying when the original fails."""
        u = str(u or "").strip()
        if not u:
            return []
        out: list[str] = []

        def add(x: str):
            # Keep order, skip the original URL and duplicates.
            x = str(x or "").strip()
            if x and x != u and x not in out:
                out.append(x)

        # http <-> https swap.
        if u.startswith("http://"):
            add("https://" + u[len("http://") :])
        elif u.startswith("https://"):
            add("http://" + u[len("https://") :])

        try:
            p = urlparse(u)
            host = (p.netloc or "").lower()
            # zentrade / zentrade2 are mirror hosts for the same assets.
            if host == "zentrade.hgodo.com":
                add(urlunparse(p._replace(netloc="zentrade2.hgodo.com")))
                add(urlunparse(p._replace(netloc="zentrade2.hgodo.com", scheme="https")))
            elif host == "zentrade2.hgodo.com":
                add(urlunparse(p._replace(netloc="zentrade.hgodo.com")))
                add(urlunparse(p._replace(netloc="zentrade.hgodo.com", scheme="https")))
            # gi.esmplus.com sometimes serves images only over https.
            if host == "gi.esmplus.com" and p.scheme == "http":
                add(urlunparse(p._replace(scheme="https")))
        except Exception:
            pass  # malformed URL: just skip fallback generation

        return out

    async def _fetch_bytes(u: str):
        """GET under the global concurrency semaphore.

        Returns (body, 200) on success or (None, status) otherwise.
        """
        async with http_sem:
            async with session.get(u, timeout=per_request_timeout) as r:
                if r.status != 200:
                    return None, int(r.status)
                return await r.read(), 200

    url_s = str(url or "").strip()
    if not url_s:
        return None, None

    tried: set[str] = set()
    for u in [url_s] + _fallback_image_urls(url_s):
        if not u or u in tried:
            continue
        tried.add(u)
        try:
            content, status = await _fetch_bytes(u)
            if not content:
                continue

            # verify() detects truncated/corrupt files but leaves the Image
            # object unusable, so rewind the buffer and reopen for decoding.
            bio = BytesIO(content)
            img = Image.open(bio)
            img.verify()
            bio.seek(0)
            img = Image.open(bio).convert("RGB")

            return imagehash.phash(img), imagehash.dhash(img)
        except Exception:
            # Any network/decode failure moves on to the next candidate.
            # (The original listed ClientError/TimeoutError/
            # UnidentifiedImageError/OSError separately, but a trailing
            # `except Exception: continue` made that clause redundant.)
            continue

    return None, None


# -----------------------------
# DB preload
# -----------------------------
async def load_group_index(pool):
    """Preload the in-memory ``(phash, dhash) -> group_id`` cache.

    Reads every row of mlinkdw.shopprod_group2 — expensive on large tables,
    which is why PRELOAD_GROUP_INDEX defaults to off.  ``group_key`` is
    stored as ``"<phash>_<dhash>"``.
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT group_id, group_key
            FROM mlinkdw.shopprod_group2
        """)
        for r in rows:
            # partition("_") splits on the FIRST underscore only; the original
            # `ph, dh = key.split("_")` raised ValueError (aborting the whole
            # preload) on any key without exactly one underscore.
            ph, _, dh = r["group_key"].partition("_")
            group_index[(ph, dh)] = r["group_id"]


# -----------------------------
# HTTP / image stage (병렬)
# -----------------------------
async def process_row_http(session, row):
    """Turn one fetched DB row into a dict ready for the serial DB stage.

    The (potentially slow) image download + hashing happens here so the
    caller can fan these coroutines out concurrently with asyncio.gather;
    all DB writes stay serialized elsewhere.
    """
    image_url = row["img"]
    # Prefer the update timestamp; fall back to the registration timestamp.
    effective_ts = row["up_datetime"] or row["reg_datetime"]

    if image_url:
        phash, dhash = await get_image_hash(session, image_url)
    else:
        phash = dhash = None

    return {
        "vender_code": row["vender_code"],
        "vender_grade": row["vender_grade"],
        "icode": row["icode"],
        "iname": row["iname"],
        "price": row["price"],
        "img": image_url,
        "reg_date": extract_reg_date(effective_ts),
        "phash": phash,
        "dhash": dhash,
        "content": row["content"],
    }


# -----------------------------
# main
# -----------------------------
async def main():
    """Batch driver.

    Repeatedly: fetch up to FETCH_SIZE ungrouped Zentrade products, hash
    their images concurrently (stage 1), then serially resolve a group id
    and insert the mapping — or record the row as unmatchable when no image
    hash could be produced (stage 2).  Loops until the source query returns
    no rows.  One transaction per batch.
    """
    global processed

    from db_config import DB_INFO_ASYNCPG
    pool = await asyncpg.create_pool(
        **DB_INFO_ASYNCPG,
        min_size=5,
        max_size=DB_POOL_SIZE
    )

    # Optional exact-match cache preload; off by default (see the
    # PRELOAD_GROUP_INDEX note at the top of the file).
    if PRELOAD_GROUP_INDEX:
        await load_group_index(pool)

    # Candidate rows: active ('0'), domestic, not yet mapped to a group and
    # not previously recorded as unmatchable.  The two NOT EXISTS clauses are
    # what makes each batch shrink the remaining work set.
    SQL = """
        SELECT
            coalesce(a.vender_code, 'ZEN') as vender_code,
            16 AS vender_grade,
            a.icode,
            a.iname,
            a.price,
            (a.img::jsonb) ->> 0 AS img,
            a.reg_datetime,
            a.up_datetime,
            a.content 
         FROM mlinkdw.shopprodinfo_zentrade a
        WHERE a.status='0'
          AND is_overseas = '0'
          AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprod_group_map2 c
                WHERE c.icode = a.icode
          )
          AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprodinfo_unmaster d
                WHERE d.vender_code = coalesce(a.vender_code, 'ZEN')
                  AND d.icode = a.icode
          )
        LIMIT $1
    """

    timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with pool.acquire() as conn:
            # Tag the session so operators can trace it via
            # pg_stat_activity / pg_stat_statements.
            try:
                await conn.execute("SET application_name = 'Search_DomeDuplicate_4stZent'")
            except Exception:
                pass

            while True:
                async with conn.transaction():  # one commit per fetched batch
                    rows = await conn.fetch(SQL, FETCH_SIZE)
                    print(f"[INFO] batch start, size={len(rows)}")

                    if not rows:
                        print("[INFO] no more rows, batch finished")
                        break

                    # Stage 1: HTTP / image hashing runs concurrently.
                    results = await asyncio.gather(
                        *(process_row_http(session, r) for r in rows),
                        return_exceptions=True
                    )

                    # Stage 2: DB work must stay strictly serial — everything
                    # below shares a single connection inside one transaction.
                    for r in results:
                        # NOTE(review): a row whose HTTP stage raised is
                        # skipped here and will be re-fetched next batch —
                        # confirm a persistently failing row cannot loop
                        # forever.
                        if isinstance(r, Exception):
                            continue

                        if r["phash"] is None or r["dhash"] is None:
                            # No usable image hash: record as unmatchable so
                            # the second NOT EXISTS filter excludes it from
                            # future batches.
                            await conn.execute("""
                                INSERT INTO mlinkdw.shopprodinfo_unmaster
                                (icode, representative_img, iname, price,
                                 vender_code, vender_grade, reg_date)
                                VALUES ($1,$2,$3,$4,$5,$6,$7)
                                ON CONFLICT (vender_code,icode)
                                DO UPDATE SET
                                    representative_img = EXCLUDED.representative_img,
                                    iname = EXCLUDED.iname,
                                    price = EXCLUDED.price,
                                    vender_grade = EXCLUDED.vender_grade,
                                    reg_date = EXCLUDED.reg_date,
                                    created_at = CURRENT_TIMESTAMP
                            """,
                                r["icode"], r["img"], r["iname"], r["price"],
                                r["vender_code"], r["vender_grade"], r["reg_date"]
                            )
                        else:
                            key = (str(r["phash"]), str(r["dhash"]))

                            async with group_lock:
                                # Even on a cache hit, dome_group_match can
                                # validate the group against the DB and
                                # invalidate stale cache entries.
                                group_id = await find_or_create_group_id(
                                    conn,
                                    group_index,
                                    r["phash"],
                                    r["dhash"],
                                    threshold=8,
                                    dhash_threshold=10,
                                    iname=r["iname"],
                                    iname_threshold=30,
                                    fallback_iname_tokens=True,
                                    fallback_content_html=r.get("content"),
                                    fallback_iname_dome_code='ZEN',
                                    fallback_content_min_matches=1,
                                    fallback_content_min_ratio=1.0,
                                    current_img_url=r["img"],
                                    color_check=False,
                                    current_vender_code=r["vender_code"],
                                    current_icode=r["icode"],
                                    current_price=r["price"],
                                    current_vender_grade=r.get("vender_grade"),
                                    current_reg_date=r.get("reg_date"),
                                    current_dome_code='ZEN',
                                    current_content_html=r.get("content"),
                                    verify_gid_exists=False,
                                    maintain_winner_tokens_on_existing_group=True,
                                    merge_moved_tokens_on_existing_group=True,
                                )

                            insert_sql = """
                                INSERT INTO mlinkdw.shopprod_group_map2
                                (group_id, vender_code, icode, iname,
                                 price, img_url, reg_date, vender_grade,content,dome_code)
                                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
                                ON CONFLICT DO NOTHING
                            """
                            try:
                                await conn.execute(
                                    insert_sql,
                                    group_id,
                                    r["vender_code"],
                                    r["icode"],
                                    r["iname"],
                                    r["price"],
                                    r["img"],
                                    r["reg_date"],
                                    str(r["vender_grade"]),
                                    r["content"],
                                    'ZEN',
                                )
                            except asyncpg.exceptions.ForeignKeyViolationError:
                                # The cached group_id no longer exists in the
                                # parent table: evict the stale cache entry,
                                # resolve a fresh group id and retry the
                                # insert once.
                                async with group_lock:
                                    group_index.pop(key, None)
                                    group_id = await find_or_create_group_id(
                                        conn,
                                        group_index,
                                        r["phash"],
                                        r["dhash"],
                                        threshold=8,
                                        dhash_threshold=10,
                                        iname=r["iname"],
                                        iname_threshold=30,
                                        fallback_iname_tokens=True,
                                        fallback_content_html=r.get("content"),
                                        fallback_iname_dome_code='ZEN',
                                        fallback_content_min_matches=1,
                                        fallback_content_min_ratio=1.0,
                                        current_img_url=r["img"],
                                        color_check=False,
                                        current_vender_code=r["vender_code"],
                                        current_icode=r["icode"],
                                        current_price=r["price"],
                                        current_vender_grade=r.get("vender_grade"),
                                        current_reg_date=r.get("reg_date"),
                                        current_dome_code='ZEN',
                                        current_content_html=r.get("content"),
                                        verify_gid_exists=False,
                                        maintain_winner_tokens_on_existing_group=True,
                                        merge_moved_tokens_on_existing_group=True,
                                    )
                                await conn.execute(
                                    insert_sql,
                                    group_id,
                                    r["vender_code"],
                                    r["icode"],
                                    r["iname"],
                                    r["price"],
                                    r["img"],
                                    r["reg_date"],
                                    str(r["vender_grade"]),
                                    r["content"],
                                    'ZEN',
                                )

                        # Progress accounting (lock is defensive; this loop is
                        # the only writer of `processed`).
                        async with counter_lock:
                            processed += 1
                            if processed % 50 == 0:
                                print(f"[INFO] processed={processed}, groups={len(group_index)}")

                    print(f"[INFO] batch committed, size={len(rows)}")

    await pool.close()


if __name__ == "__main__":
    # Script entry point: run the batch loop to completion.
    asyncio.run(main())
