# -*- coding: utf-8 -*-
import asyncio
import asyncpg
import aiohttp
from datetime import datetime
from io import BytesIO
from PIL import Image, UnidentifiedImageError
import imagehash
import os

from env_loader import load_env_file

# Auto-load the local env file (`env` at the project root).
# - With override=False, variables already set in the shell (e.g. PowerShell $env:...) take precedence over the file.
load_env_file("env", override=False)

from dome_group_match import find_or_create_group_id
from dmm_itemview_api import (
    fetch_dmm_item_status as fetch_dmm_item_status_api,
    extract_best_thumb_url,
)
import time

FETCH_SIZE = 200  # NOTE(review): dead default — unconditionally overwritten below by the env-aware assignment (DMM_FETCH_SIZE)

def _env_int(name: str, default: int) -> int:
    try:
        v = str(os.environ.get(name, "")).strip()
        if not v:
            return int(default)
        return int(float(v))
    except Exception:
        return int(default)


# Defaults tuned to run stably on a ~1GB-RAM server (all tunable via env vars).
# Precedence: shell `export` > env file > hard-coded default.
# - Values exported by dmm_multi_8_process.sh, if present, win.
# - env_loader is called with override=False, so env-file values are ignored
#   whenever the variable already exists in the environment.
FETCH_SIZE = _env_int("DMM_FETCH_SIZE", 200)
DB_POOL_SIZE = _env_int("DMM_DB_POOL_SIZE", 20)
# Candidate LIMIT: too low creates excessive new groups; too high raises CPU/DB load.
CANDIDATE_LIMIT = _env_int("DMM_CANDIDATE_LIMIT", 30)
# Cap on how many images from the HTML are inspected during fallback content
# verification (keeps network usage / blocking to a minimum).
FALLBACK_CONTENT_MAX_IMGS = _env_int("DMM_FALLBACK_CONTENT_MAX_IMGS", 6)

# Echo the effective values (for debugging / operational checks).
print(f"[CONFIG] FETCH_SIZE={FETCH_SIZE}, DB_POOL_SIZE={DB_POOL_SIZE}, CANDIDATE_LIMIT={CANDIDATE_LIMIT}, FALLBACK_CONTENT_MAX_IMGS={FALLBACK_CONTENT_MAX_IMGS}", flush=True)

# Multi-server parallel execution (sharding).
# - Rows are partitioned by icode only; vender_code plays no part in sharding.
# - e.g. total=4 -> index in 0..3

# =========================
# SHARD CONFIG
# =========================
# NOTE(review): these use int(os.getenv(...)) directly, so a non-numeric value
# crashes at import — unlike the tolerant _env_int used elsewhere. Confirm
# fail-fast is intended here.
DMM_SHARD_TOTAL = int(os.getenv("DMM_SHARD_TOTAL", "1"))
DMM_SHARD_INDEX = int(os.getenv("DMM_SHARD_INDEX", "0"))

if DMM_SHARD_TOTAL < 1:
    DMM_SHARD_TOTAL = 1

# Fail fast on a misconfigured shard index rather than silently wrapping it.
if DMM_SHARD_INDEX < 0 or DMM_SHARD_INDEX >= DMM_SHARD_TOTAL:
    raise ValueError("Invalid shard index configuration")

# Performance option:
# - Preloading all of shopprod_group costs heavy DB reads + memory (can be
#   several GB for a large table).
# - Instead, dome_group_match.find_or_create_group_id() performs an exact
#   lookup first via the group_key UNIQUE index, so exact matches stay fast
#   even without the preload.
PRELOAD_GROUP_INDEX = False

HTTP_CONCURRENCY = _env_int("DMM_HTTP_CONCURRENCY", 32)
# Global cap on concurrent image downloads across all tasks.
http_sem = asyncio.Semaphore(HTTP_CONCURRENCY)

print(f"[SHARD] total={DMM_SHARD_TOTAL}, index={DMM_SHARD_INDEX} HTTP_CONCURRENCY={HTTP_CONCURRENCY}")
print(f"DMM_DB_POOL_SIZE={DB_POOL_SIZE},DMM_FETCH_SIZE={FETCH_SIZE},DMM_CANDIDATE_LIMIT={CANDIDATE_LIMIT},DMM_FALLBACK_CONTENT_MAX_IMGS={FALLBACK_CONTENT_MAX_IMGS}")

# Module-level shared state:
#   group_index  - in-memory cache of (phash, dhash) -> group_id
#   group_lock   - serializes group lookup/creation
#   counter_lock - protects the `processed` counter
group_index = {}
group_lock = asyncio.Lock()
counter_lock = asyncio.Lock()
processed = 0


# -----------------------------
# Utils
# -----------------------------
def extract_reg_date(img_url):
    """Parse a date from the first 10 characters of *img_url*.

    The value is expected to start with a 'YYYY-MM-DD' prefix (despite the
    parameter name, callers pass a reg/up datetime value — see
    process_row_http). Returns a datetime.date, or None when the prefix is
    missing or not a valid date.
    """
    try:
        prefix = img_url[:10]
        y, m, d = prefix.split("-")[:3]
        return datetime.strptime(f"{y}-{m}-{d}", "%Y-%m-%d").date()
    # Narrowed from a bare `except:` (which also swallowed SystemExit /
    # KeyboardInterrupt):
    # - TypeError: input not sliceable (e.g. None or a datetime object)
    # - ValueError: wrong number of '-' parts, or strptime failure
    # - AttributeError: slice result has no .split (defensive)
    except (TypeError, ValueError, AttributeError):
        return None
        
async def get_image_hash(session, url):
    """Download *url* and compute its perceptual hashes.

    Returns an (phash, dhash) pair of imagehash values, or (None, None) on
    any network/timeout/decoding failure. The semaphore guards only the HTTP
    request; image decoding runs after it is released.
    """
    try:
        async with http_sem:
            async with session.get(
                url,
                timeout=aiohttp.ClientTimeout(total=8)
            ) as resp:
                if resp.status != 200:
                    return None, None
                payload = await resp.read()

        buffer = BytesIO(payload)
        # First pass validates the bytes as an image without a full decode.
        Image.open(buffer).verify()
        # verify() consumes the stream; rewind and reopen for the real decode.
        buffer.seek(0)
        decoded = Image.open(buffer).convert("RGB")
        return imagehash.phash(decoded), imagehash.dhash(decoded)

    except (
        aiohttp.ClientError,
        asyncio.TimeoutError,
        UnidentifiedImageError,
        OSError
    ):
        return None, None


# -----------------------------
# DB preload
# -----------------------------
async def load_group_index(pool):
    """Populate the module-level group_index cache from shopprod_group2.

    Each group_key is assumed to be '<phash>_<dhash>'; the cache maps
    (phash, dhash) string tuples to group_id.
    """
    async with pool.acquire() as conn:
        records = await conn.fetch("""
            SELECT group_id, group_key
            FROM mlinkdw.shopprod_group2
        """)
        for record in records:
            phash_part, dhash_part = record["group_key"].split("_")
            group_index[(phash_part, dhash_part)] = record["group_id"]


# -----------------------------
# HTTP / image stage (병렬)
# -----------------------------
async def process_row_http(session, row):
    """HTTP stage for one DB row: hash its representative image.

    Prefers up_datetime over reg_datetime as the registration timestamp and
    returns a plain dict carrying the row fields plus phash/dhash (both None
    when there is no image or hashing failed).
    """
    timestamp = row["up_datetime"] or row["reg_datetime"]
    image_url = row["img"]

    p_hash = d_hash = None
    if image_url:
        p_hash, d_hash = await get_image_hash(session, image_url)

    return {
        "vender_code": row["vender_code"],
        "vender_grade": row["vender_grade"],
        "icode": row["icode"],
        "iname": row["iname"],
        "price": row["price"],
        "img": image_url,
        "reg_date": extract_reg_date(timestamp),
        "phash": p_hash,
        "dhash": d_hash,
        "content": row["content"],
    }


# -----------------------------
# main
# -----------------------------
async def main():
    """Batch pipeline: read rows, hash images concurrently, group, persist.

    Loops until the source query yields no rows. Each batch runs three
    stages:
      1. READ  — fetch up to FETCH_SIZE ungrouped rows (optionally sharded
                 by icode across processes).
      2. HTTP  — download + perceptually hash each row's representative
                 image in parallel.
      3. WRITE — per row: upsert into shopprodinfo_unmaster when hashing
                 failed, otherwise match/create a group and insert into
                 shopprod_group_map2.
    """
    global processed

    # DB credentials come from the project-local db_config module.
    from db_config import DB_INFO_ASYNCPG
    pool = await asyncpg.create_pool(
        **DB_INFO_ASYNCPG,
        min_size=5,
        max_size=DB_POOL_SIZE
    )

    if PRELOAD_GROUP_INDEX:
        await load_group_index(pool)

    # Rows not yet mapped to a group (shopprod_group_map2) and not already
    # marked unmatchable (shopprodinfo_unmaster). {SHARD_CLAUSE} is filled
    # per-iteration depending on the shard configuration.
    SQL = """
        SELECT
            a.vender_code,
            CASE
                WHEN b.vender_grade ~ '^[0-9]+$'
                    THEN b.vender_grade::int
                ELSE 16
            END AS vender_grade,
            a.icode,
            a.iname,
            a.price,
            (a.img::jsonb) ->> 0 AS img,
            a.reg_datetime,
            a.up_datetime,
            a.content 
        FROM mlinkdw.shopprodinfo_ownerclan a
        JOIN mlinkdw.supplier_info b ON b.dome='OWN' AND b.supplier_code = a.vender_code
        WHERE a.status='0'
          AND is_overseas = '0'
          AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprod_group_map2 c
                WHERE c.icode = a.icode
          )
          AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprodinfo_unmaster d
                WHERE d.vender_code = a.vender_code
                  AND d.icode = a.icode
          )
          {SHARD_CLAUSE}          
        LIMIT $1
    """

    timeout = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession(timeout=timeout) as session:

        while True:
            # -------------------------
            # 1️⃣ READ STAGE
            # -------------------------
            async with pool.acquire() as read_conn:

                # Tag the session for pg_stat_activity; best-effort only.
                try:
                    await read_conn.execute(
                        "SET application_name = 'Search_DomeDuplicate_4stDomeme_READ'"
                    )
                except Exception:
                    pass

                if int(DMM_SHARD_TOTAL) > 1:
                    # Numeric icodes shard by value, all others by
                    # hashtext(); both reduce modulo the shard total.
                    shard_clause = """
                    AND (
                        CASE
                        WHEN a.icode ~ '^[0-9]+$' THEN (a.icode::bigint % $2)
                        ELSE (abs(hashtext(a.icode))::bigint % $2)
                        END
                    ) = $3
                    """
                    q = SQL.format(SHARD_CLAUSE=shard_clause)

                    rows = await read_conn.fetch(
                        q,
                        int(FETCH_SIZE),
                        int(DMM_SHARD_TOTAL),
                        int(DMM_SHARD_INDEX),
                    )
                else:
                    q = SQL.format(SHARD_CLAUSE="")
                    rows = await read_conn.fetch(q, int(FETCH_SIZE))

            if not rows:
                print("[INFO] no more rows, batch finished")
                break

            print(f"[INFO] fetched {len(rows)} rows")
            start_time = time.time()
            # NOTE(review): start_time was set on the previous line, so this
            # elapsed value always prints ~0.00s; only the batch-end log
            # below carries a meaningful duration.
            print(f"[INFO] batch start, size={len(rows)}, time={time.time() - start_time:.2f}s")


            # -------------------------
            # 2️⃣ HTTP STAGE
            # -------------------------
            # return_exceptions=True: one failed download/hash must not abort
            # the whole batch; exceptions are skipped in the write stage.
            results = await asyncio.gather(
                *(process_row_http(session, r) for r in rows),
                return_exceptions=True
            )

            # -------------------------
            # 3️⃣ WRITE STAGE
            # -------------------------
            async with pool.acquire() as write_conn:

                # NOTE(review): READ tags itself '...4stDomeme_READ' but
                # WRITE uses '...4stOwner_WRITE' — confirm the naming
                # mismatch is intentional.
                try:
                    await write_conn.execute(
                        "SET application_name = 'Search_DomeDuplicate_4stOwner_WRITE'"
                    )
                except Exception:
                    pass

                for r in results:
                    if isinstance(r, Exception) or r is None:
                        continue

                    try:
                        # 1) Hashing failed -> record as "unmaster" (short transaction)
                        if r["phash"] is None or r["dhash"] is None:
                            async with write_conn.transaction():
                                await write_conn.execute(
                                    """
                                    INSERT INTO mlinkdw.shopprodinfo_unmaster
                                    (icode, representative_img, iname, price,
                                     vender_code, vender_grade, reg_date)
                                    VALUES ($1,$2,$3,$4,$5,$6,$7)
                                    ON CONFLICT (vender_code,icode)
                                    DO UPDATE SET
                                        representative_img = EXCLUDED.representative_img,
                                        iname = EXCLUDED.iname,
                                        price = EXCLUDED.price,
                                        vender_grade = EXCLUDED.vender_grade,
                                        reg_date = EXCLUDED.reg_date,
                                        created_at = CURRENT_TIMESTAMP
                                    """,
                                    r["icode"],
                                    r["img"],
                                    r["iname"],
                                    r["price"],
                                    r["vender_code"],
                                    r["vender_grade"],
                                    r["reg_date"],
                                )
                            continue

                        # 2) Compute group_id OUTSIDE an explicit transaction.
                        # - dome_group_match's fallback verification may use
                        #   requests (synchronous HTTP); waiting on the network
                        #   with a transaction open would reintroduce
                        #   idle-in-transaction problems.
                        # Cache key; also used below to evict a stale cache
                        # entry on FK violation.
                        key = (str(r["phash"]), str(r["dhash"]))
                        async with group_lock:
                            group_id = await find_or_create_group_id(
                                write_conn,
                                group_index,
                                r["phash"],
                                r["dhash"],
                                threshold=8,
                                dhash_threshold=10,
                                candidate_limit=int(CANDIDATE_LIMIT),
                                iname=r["iname"],
                                iname_threshold=30,
                                fallback_iname_tokens=True,
                                fallback_content_html=r.get("content"),
                                fallback_content_max_imgs=int(FALLBACK_CONTENT_MAX_IMGS),
                                fallback_content_min_matches=1,
                                fallback_content_min_ratio=0.80,
                                current_img_url=r["img"],
                                color_check=False,
                                current_vender_code=r["vender_code"],
                                current_icode=r["icode"],
                                current_price=r["price"],
                                current_vender_grade=r.get("vender_grade"),
                                current_reg_date=r.get("reg_date"),
                                current_dome_code="OWN",
                                verify_gid_exists=False,
                                maintain_winner_tokens_on_existing_group=True,
                                merge_moved_tokens_on_existing_group=True,
                            )

                        # 3) Insert the group mapping (short transaction)
                        insert_sql = """
                            INSERT INTO mlinkdw.shopprod_group_map2
                            (group_id, vender_code, icode, iname,
                             price, img_url, reg_date, vender_grade, content, dome_code)
                            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
                            ON CONFLICT DO NOTHING
                        """
                        try:
                            async with write_conn.transaction():
                                await write_conn.execute(
                                    insert_sql,
                                    group_id,
                                    r["vender_code"],
                                    r["icode"],
                                    r["iname"],
                                    r["price"],
                                    r["img"],
                                    r["reg_date"],
                                    str(r["vender_grade"]),
                                    r.get("content"),
                                    "OWN",
                                )
                        except asyncpg.exceptions.ForeignKeyViolationError:
                            # group_id exists only in the cache, not in the DB
                            # (rollback / race): evict the cache entry,
                            # recompute the group, then retry the insert once.
                            async with group_lock:
                                group_index.pop(key, None)
                                group_id = await find_or_create_group_id(
                                    write_conn,
                                    group_index,
                                    r["phash"],
                                    r["dhash"],
                                    threshold=8,
                                    dhash_threshold=10,
                                    candidate_limit=int(CANDIDATE_LIMIT),
                                    iname=r["iname"],
                                    iname_threshold=30,
                                    fallback_iname_tokens=True,
                                    fallback_content_html=r.get("content"),
                                    fallback_content_max_imgs=int(FALLBACK_CONTENT_MAX_IMGS),
                                    fallback_content_min_matches=1,
                                    fallback_content_min_ratio=0.80,
                                    current_img_url=r["img"],
                                    color_check=False,
                                    current_vender_code=r["vender_code"],
                                    current_icode=r["icode"],
                                    current_price=r["price"],
                                    current_vender_grade=r.get("vender_grade"),
                                    current_reg_date=r.get("reg_date"),
                                    current_dome_code="OWN",
                                    verify_gid_exists=False,
                                    maintain_winner_tokens_on_existing_group=True,
                                    merge_moved_tokens_on_existing_group=True,
                                )
                            async with write_conn.transaction():
                                await write_conn.execute(
                                    insert_sql,
                                    group_id,
                                    r["vender_code"],
                                    r["icode"],
                                    r["iname"],
                                    r["price"],
                                    r["img"],
                                    r["reg_date"],
                                    str(r["vender_grade"]),
                                    r.get("content"),
                                    "OWN",
                                )
                    except Exception as e:
                        # Opt-in debug logging for per-row failures.
                        if os.environ.get("DOME_OWN_DB_DEBUG", ""):
                            try:
                                print(
                                    f"[DB_DEBUG] row failed: vender={r.get('vender_code')} icode={r.get('icode')} err={type(e).__name__}: {e}",
                                    flush=True,
                                )
                            except Exception:
                                pass
                        # This row rolls back; continue with the next one.
                        continue

            print(f"[INFO] batch committed: {len(rows)} rows")
            # NOTE(review): `processed` counts batches (one increment per
            # loop iteration), not rows — the log wording may mislead.
            async with counter_lock:
                processed += 1
                if processed % 200 == 0:
                    print(f"[INFO] processed={processed}, groups={len(group_index)}")

            print(f"[INFO] batch committed, size={len(rows)}, time={time.time() - start_time:.2f}s")

    await pool.close()

if __name__ == "__main__":
    asyncio.run(main())
