# -*- coding: utf-8 -*-
import asyncio
import asyncpg
import aiohttp
from datetime import datetime
from io import BytesIO
from PIL import Image, UnidentifiedImageError
import imagehash

from dome_group_match import find_or_create_group_id

# Number of candidate rows fetched per batch; passed as the LIMIT of the main query.
FETCH_SIZE = 100
# Upper bound on connections kept by the asyncpg pool (max_size).
DB_POOL_SIZE = 20

# Performance option:
# - Preloading the whole shopprod_group table costs a lot of DB reads and memory
#   (several GB when the table is large).
# - Instead, dome_group_match.find_or_create_group_id() was enhanced to first do an exact
#   lookup through the group_key UNIQUE index, so exact matches are resolved quickly even
#   without the preload.
PRELOAD_GROUP_INDEX = False

# Maximum number of image downloads allowed in flight at the same time.
HTTP_CONCURRENCY = 64
# NOTE(review): this semaphore (and the locks below) are created at import time, before
# asyncio.run() starts the event loop; on Python < 3.10 asyncio primitives bind to the loop
# that is current at construction — confirm the target runtime.
http_sem = asyncio.Semaphore(HTTP_CONCURRENCY)

# In-memory cache handed to find_or_create_group_id(); when preloaded it maps
# (phash, dhash) string pairs to group_id.
group_index = {}
# Held around find_or_create_group_id() calls and group_index mutations.
group_lock = asyncio.Lock()
# Guards the progress counter below.
counter_lock = asyncio.Lock()
# Running total of rows handled, used only for progress logging.
processed = 0


# -----------------------------
# Utils
# -----------------------------
def extract_reg_date(img_url):
    """Return the calendar date carried by a registration timestamp.

    Despite the parameter name, the caller in this file passes a row's
    ``up_datetime`` / ``reg_datetime`` value, not an image URL.

    Args:
        img_url: Either a ``datetime``/``date`` object, or a string whose
            first ten characters look like ``YYYY-MM-DD``.

    Returns:
        A ``datetime.date``, or ``None`` when the value is missing or cannot
        be interpreted as a date.
    """
    # Local import: the module itself only imports ``datetime``.
    from datetime import date

    # Timestamp columns arrive as datetime objects; slicing them would raise
    # TypeError and the date would be silently lost.
    if isinstance(img_url, datetime):
        return img_url.date()
    if isinstance(img_url, date):
        return img_url

    try:
        y, m, d = img_url[:10].split("-")[:3]
        return datetime.strptime(f"{y}-{m}-{d}", "%Y-%m-%d").date()
    except (TypeError, ValueError, AttributeError):
        # TypeError: None / non-sliceable input; ValueError: wrong shape or
        # not a real date; AttributeError: sliceable but not a string.
        return None
        
async def get_image_hash(session, url):
    """Download an image and compute its perceptual and difference hashes.

    Args:
        session: An ``aiohttp.ClientSession`` used for the GET request.
        url: Image URL to download.

    Returns:
        ``(phash, dhash)`` as returned by ``imagehash.phash`` /
        ``imagehash.dhash``, or ``(None, None)`` when the download fails, the
        response status is not 200, or the payload cannot be decoded as an
        image.
    """
    try:
        # The semaphore only covers the network transfer, not the decoding.
        async with http_sem:
            async with session.get(
                url,
                timeout=aiohttp.ClientTimeout(total=8)
            ) as r:
                if r.status != 200:
                    return None, None
                content = await r.read()

        bio = BytesIO(content)
        # verify() checks integrity without decoding; the image object is
        # unusable afterwards, so the buffer is rewound and reopened.
        Image.open(bio).verify()
        bio.seek(0)
        img = Image.open(bio).convert("RGB")

        return imagehash.phash(img), imagehash.dhash(img)

    except (
        aiohttp.ClientError,
        asyncio.TimeoutError,
        UnidentifiedImageError,
        OSError,
        # Pillow plugins also signal broken files with these; letting them
        # escape would make the caller skip the row on every batch.
        ValueError,
        SyntaxError,
        Image.DecompressionBombError,
    ):
        return None, None


# -----------------------------
# DB preload
# -----------------------------
async def load_group_index(pool):
    """Fill the module-level ``group_index`` cache from the group table.

    Every ``group_key`` is split on ``"_"`` into exactly two parts, and that
    pair becomes the dictionary key for the row's ``group_id``. A key that
    does not split into two parts raises ``ValueError``.

    Args:
        pool: Connection pool to borrow a connection from.
    """
    query = """
            SELECT group_id, group_key
            FROM mlinkdw.shopprod_group2
        """
    async with pool.acquire() as connection:
        records = await connection.fetch(query)
        for record in records:
            first_part, second_part = record["group_key"].split("_")
            group_index[(first_part, second_part)] = record["group_id"]


# -----------------------------
# HTTP / image stage (parallel)
# -----------------------------
async def process_row_http(session, row):
    """Turn one source row into a plain dict enriched with image hashes.

    The registration date is derived from ``up_datetime`` when it is truthy,
    otherwise from ``reg_datetime``. Hashes are only requested when the row
    has an image; otherwise both stay ``None``.

    Args:
        session: HTTP session forwarded to ``get_image_hash``.
        row: Mapping with the columns selected by the main query.

    Returns:
        Dict with keys ``vender_code``, ``vender_grade``, ``icode``,
        ``iname``, ``price``, ``img``, ``reg_date``, ``phash``, ``dhash``
        and ``content``.
    """
    record = {
        "vender_code": row["vender_code"],
        "vender_grade": row["vender_grade"],
        "icode": row["icode"],
        "iname": row["iname"],
        "price": row["price"],
        "img": row["img"],
    }

    timestamp = row["up_datetime"] or row["reg_datetime"]
    record["reg_date"] = extract_reg_date(timestamp)

    if record["img"]:
        hashes = await get_image_hash(session, record["img"])
    else:
        hashes = (None, None)
    record["phash"], record["dhash"] = hashes

    record["content"] = row["content"]
    return record


# -----------------------------
# main
# -----------------------------
async def _resolve_group_id(conn, item):
    """Find or create the group id for one hashed product row.

    Keeps the matching configuration for this feed in one place so the first
    attempt and the retry cannot drift apart. Callers hold ``group_lock``.
    """
    return await find_or_create_group_id(
        conn,
        group_index,
        item["phash"],
        item["dhash"],
        threshold=8,
        dhash_threshold=10,
        iname=item["iname"],
        iname_threshold=30,
        fallback_iname_tokens=True,
        fallback_content_html=item.get("content"),
        fallback_iname_dome_code='3MR',
        fallback_content_min_matches=1,
        fallback_content_min_ratio=1.0,
        current_img_url=item["img"],
        color_check=False,
        current_vender_code=item["vender_code"],
        current_icode=item["icode"],
        current_price=item["price"],
        current_vender_grade=item.get("vender_grade"),
        current_reg_date=item.get("reg_date"),
        current_dome_code='3MR',
        current_content_html=item.get("content"),
        verify_gid_exists=False,
        maintain_winner_tokens_on_existing_group=True,
        merge_moved_tokens_on_existing_group=True,
    )


async def _insert_group_map(conn, group_id, item):
    """Insert one product into the group map; existing rows are left alone."""
    await conn.execute(
        """
        INSERT INTO mlinkdw.shopprod_group_map2
        (group_id, vender_code, icode, iname,
         price, img_url, reg_date, vender_grade,content,dome_code)
        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
        ON CONFLICT DO NOTHING
        """,
        group_id,
        item["vender_code"],
        item["icode"],
        item["iname"],
        item["price"],
        item["img"],
        item["reg_date"],
        str(item["vender_grade"]),
        item["content"],
        '3MR',
    )


async def _upsert_unmaster(conn, item):
    """Record a product whose image could not be hashed."""
    await conn.execute(
        """
        INSERT INTO mlinkdw.shopprodinfo_unmaster
        (icode, representative_img, iname, price,
         vender_code, vender_grade, reg_date)
        VALUES ($1,$2,$3,$4,$5,$6,$7)
        ON CONFLICT (vender_code,icode)
        DO UPDATE SET
            representative_img = EXCLUDED.representative_img,
            iname = EXCLUDED.iname,
            price = EXCLUDED.price,
            vender_grade = EXCLUDED.vender_grade,
            reg_date = EXCLUDED.reg_date,
            created_at = CURRENT_TIMESTAMP
        """,
        item["icode"], item["img"], item["iname"], item["price"],
        item["vender_code"], item["vender_grade"], item["reg_date"],
    )


async def _store_grouped_row(conn, item):
    """Resolve the group of a hashed product and insert it into the group map.

    If the insert hits a foreign-key violation, the cached entry for this
    hash pair is dropped, the group id is resolved again and the insert is
    retried once.
    """
    key = (str(item["phash"]), str(item["dhash"]))

    async with group_lock:
        group_id = await _resolve_group_id(conn, item)

    try:
        # Nested transaction = savepoint. Without it the foreign-key error
        # aborts the whole batch transaction and the retry below would fail
        # with "current transaction is aborted".
        async with conn.transaction():
            await _insert_group_map(conn, group_id, item)
    except asyncpg.exceptions.ForeignKeyViolationError:
        async with group_lock:
            group_index.pop(key, None)
            group_id = await _resolve_group_id(conn, item)
        await _insert_group_map(conn, group_id, item)


async def main():
    """Run the duplicate-grouping job until no unprocessed rows remain.

    Each iteration fetches up to ``FETCH_SIZE`` rows that are in neither the
    group map nor the unmaster table, hashes their images concurrently, then
    writes the results one by one on a single connection inside one
    transaction per batch. Rows without usable hashes go to the unmaster
    table; the rest are attached to a group.

    The loop also stops when a non-empty batch handles no row at all, since
    the same rows would otherwise be selected again indefinitely.
    """
    global processed

    from db_config import DB_INFO_ASYNCPG
    pool = await asyncpg.create_pool(
        **DB_INFO_ASYNCPG,
        min_size=5,
        max_size=DB_POOL_SIZE
    )

    try:
        if PRELOAD_GROUP_INDEX:
            await load_group_index(pool)

        SQL = """
            SELECT
                coalesce(a.vender_code, '3MRO') as vender_code,
                16 AS vender_grade,
                a.icode,
                a.iname,
                a.price,
                (a.img::jsonb) ->> 0 AS img,
                a.reg_datetime,
                a.up_datetime,
                a.content
             FROM mlinkdw.shopprodinfo_threemro a
            WHERE a.status='0'
              AND is_overseas = '0'
              AND NOT EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group_map2 c
                    WHERE c.icode = a.icode
              )
              AND NOT EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprodinfo_unmaster d
                    WHERE d.vender_code = coalesce(a.vender_code, '3MRO')
                      AND d.icode = a.icode
              )
            LIMIT $1
        """

        timeout = aiohttp.ClientTimeout(total=10)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with pool.acquire() as conn:
                # Tag the session so it is easy to trace in pg_stat_activity /
                # pg_stat_statements. Best effort: a failure must not stop the job.
                try:
                    await conn.execute("SET application_name = 'Search_DomeDuplicate_4st3Mro'")
                except Exception as exc:
                    print(f"[WARN] could not set application_name: {exc!r}")

                while True:
                    handled = 0
                    async with conn.transaction():  # one commit per fetched batch
                        rows = await conn.fetch(SQL, FETCH_SIZE)
                        print(f"[INFO] batch start, size={len(rows)}")

                        if not rows:
                            print("[INFO] no more rows, batch finished")
                            break

                        # 1) HTTP / image stage runs concurrently.
                        results = await asyncio.gather(
                            *(process_row_http(session, r) for r in rows),
                            return_exceptions=True
                        )

                        # 2) DB work must stay serial: everything shares one connection.
                        for row, r in zip(rows, results):
                            # BaseException also covers CancelledError, which
                            # gather() can return and which is not an Exception.
                            if isinstance(r, BaseException):
                                print(f"[WARN] row skipped, icode={row['icode']}: {r!r}")
                                continue

                            if r["phash"] is None or r["dhash"] is None:
                                await _upsert_unmaster(conn, r)
                            else:
                                await _store_grouped_row(conn, r)

                            handled += 1
                            async with counter_lock:
                                processed += 1
                                if processed % 50 == 0:
                                    print(f"[INFO] processed={processed}, groups={len(group_index)}")

                        print(f"[INFO] batch committed, size={len(rows)}")

                    if handled == 0:
                        # Every row failed before reaching the DB, so none was
                        # recorded and the next fetch would repeat them.
                        print("[ERROR] batch made no progress, stopping to avoid an endless loop")
                        break
    finally:
        # Release the pool even when a batch raises.
        await pool.close()


# Script entry point: run the batch job to completion when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
