# -*- coding: utf-8 -*-
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime
from io import BytesIO

import asyncpg
import imagehash
import requests
from PIL import Image, UnidentifiedImageError

from dome_group_match import find_or_create_group_id

# -----------------------------
# CONFIG
# -----------------------------
FETCH_SIZE = 100   # rows fetched (and committed) per batch
DB_POOL_SIZE = 20  # asyncpg pool max_size
HTTP_WORKERS = 32   # number of concurrent `requests` calls (thread pool size)

# Performance option:
# - Preloading all of shopprod_group is heavy on DB reads and memory (several GB
#   for a large table).
# - Instead, dome_group_match.find_or_create_group_id() was extended to do an
#   exact lookup first via the UNIQUE index on group_key, so exact matches stay
#   fast even without the preload.
PRELOAD_GROUP_INDEX = False

# -----------------------------
# GLOBAL
# -----------------------------
group_index = {}               # (phash_str, dhash_str) -> group_id cache for this run
group_lock = asyncio.Lock()    # serializes group lookup/creation
counter_lock = asyncio.Lock()  # protects the `processed` counter
processed = 0                  # rows successfully mapped so far

# Thread pool that runs the blocking requests/PIL work off the event loop.
executor = ThreadPoolExecutor(max_workers=HTTP_WORKERS)

# Browser-like headers; some image hosts reject requests without a real UA.
REQ_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0.0.0 Safari/537.36"
    ),
    "Accept": "image/*,*/*;q=0.8",
    "Connection": "close",
}

# -----------------------------
# Utils
# -----------------------------
def extract_reg_date(img_url):
    """Extract a date from a value's leading ``YYYY-MM-DD`` prefix.

    Despite the historical parameter name, callers actually pass the row's
    ``up_datetime``/``reg_datetime`` value, which may be a string
    ("2024-01-05 ..."), a ``datetime`` or a ``date``. The old implementation
    sliced the value unconditionally, so a ``datetime`` raised TypeError that
    the bare ``except`` silently turned into ``None`` — every non-string
    registration date was lost.

    Returns a ``datetime.date``, or ``None`` when nothing parseable is given.
    """
    # datetime must be checked before date: datetime is a date subclass.
    if isinstance(img_url, datetime):
        return img_url.date()
    if isinstance(img_url, date):
        return img_url
    try:
        # First 10 characters are expected to be "YYYY-MM-DD".
        y, m, d = img_url[:10].split("-")[:3]
        return datetime.strptime(f"{y}-{m}-{d}", "%Y-%m-%d").date()
    except (AttributeError, TypeError, ValueError):
        # None, non-string, or malformed prefix -> no date available.
        return None


# -----------------------------
# HTTP (requests, sync)
# -----------------------------
def get_image_hash_requests(url):
    if not url or not isinstance(url, str) or url.strip() == "":
        return None, None

    try:
        r = requests.get(
            url,
            headers=REQ_HEADERS,
            timeout=(5, 10)
        )
        if r.status_code != 200:
            return None, None

        bio = BytesIO(r.content)
        img = Image.open(bio)
        img.verify()

        bio.seek(0)
        img = Image.open(bio).convert("RGB")

        return imagehash.phash(img), imagehash.dhash(img)

    except Exception:
        return None, None


# -----------------------------
# async wrapper
# -----------------------------
async def get_image_hash(url):
    """Async facade: run the blocking requests/PIL hashing on the thread pool."""
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        get_image_hash_requests,
        url,
    )

import re
import socket
from urllib.parse import urlparse

# Matches absolute http(s) image URLs embedded in an HTML blob.
CONTENT_IMG_PATTERN = re.compile(
    r'https?://[^\s"\']+\.(?:jpg|jpeg|png|gif|webp)',
    re.IGNORECASE
)


def content_has_invalid_image(content_html):
    """Return True when the FIRST content image is provably gone.

    Policy (deliberately conservative, to avoid mass "unmaster"):
    - only the first image URL found in *content_html* is probed;
    - DNS resolution failure counts as invalid;
    - HTTP 404 / 410 counts as invalid;
    - timeouts, connection errors and any other failure count as *valid*,
      so transient outages never unmaster products.
    """
    if not content_html:
        return False

    urls = CONTENT_IMG_PATTERN.findall(content_html)
    if not urls:
        return False

    # Option 3: probe only the first image.
    url = urls[0]
    host = urlparse(url).hostname
    if not host:
        # Malformed URL (e.g. "http:///x.jpg"): previously hostname could be
        # None and socket.gethostbyname(None) raised an uncaught TypeError.
        # Treat as valid, consistent with the conservative policy above.
        return False
    try:
        socket.gethostbyname(host)
    except socket.gaierror:
        # Host does not resolve at all -> the image cannot exist.
        return True

    try:
        r = requests.head(
            url,
            headers=REQ_HEADERS,
            timeout=(3, 5),
            allow_redirects=True
        )
        # Only definitive "gone" statuses trigger unmaster.
        return r.status_code in (404, 410)
    except Exception:
        # Timeouts / connection errors / anything else: assume valid
        # (never unmaster on a transient network problem).
        return False


async def check_content_images(content_html):
    """Async facade: run the blocking content-image probe on the thread pool."""
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        content_has_invalid_image,
        content_html,
    )

# -----------------------------
# DB preload
# -----------------------------
async def load_group_index(pool):
    """Warm the in-memory (phash, dhash) -> group_id cache from the group table."""
    async with pool.acquire() as conn:
        records = await conn.fetch("""
            SELECT group_id, group_key
            FROM mlinkdw.shopprod_group2
        """)
    # group_key is stored as "<phash>_<dhash>"; index by the split pair.
    for record in records:
        phash_hex, dhash_hex = record["group_key"].split("_")
        group_index[(phash_hex, dhash_hex)] = record["group_id"]


# -----------------------------
# main
# -----------------------------
async def main():
    """Batch pipeline: hash new Changgo products and group or unmaster each row.

    Loops until no unmapped rows remain:
      1. fetch up to FETCH_SIZE candidate rows,
      2. hash the representative image and probe the first content image
         for every row in parallel (thread pool),
      3. serially map each row to a group (shopprod_group_map2) or record it
         as unmaster (shopprodinfo_unmaster) when its images are unusable.
    Each batch runs in its own transaction on a single pooled connection.
    """
    global processed

    from db_config import DB_INFO_ASYNCPG
    pool = await asyncpg.create_pool(
        **DB_INFO_ASYNCPG,
        min_size=5,
        max_size=DB_POOL_SIZE
    )

    if PRELOAD_GROUP_INDEX:
        await load_group_index(pool)

    # Candidate rows: active ('0'), domestic, not yet grouped, not already unmastered.
    SQL = """
        SELECT
            coalesce(a.vender_code, 'Chango') as vender_code,
            16 AS vender_grade,
            a.icode,
            a.iname,
            a.price,
            (a.img::jsonb) ->> 0 AS img,
            a.reg_datetime,
            a.up_datetime,
            a.content 
        FROM mlinkdw.shopprodinfo_domechanggo a
        WHERE a.status='0'
          AND is_overseas = '0'
          AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprod_group_map2 c
                WHERE c.icode = a.icode
          )
          AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprodinfo_unmaster d
                WHERE d.vender_code = coalesce(a.vender_code, 'Chango')
                  AND d.icode = a.icode
          )  
        LIMIT $1
    """

    async with pool.acquire() as conn:
        # Tag the session so production issues are easy to trace via
        # pg_stat_activity / pg_stat_statements.
        try:
            await conn.execute("SET application_name = 'Search_DomeDuplicate_4stChanggo'")
        except Exception:
            pass
        while True:
            async with conn.transaction():  # commit once per LIMIT-sized batch
                rows = await conn.fetch(SQL, FETCH_SIZE)
                print(f"[INFO] batch start, size={len(rows)}")

                if not rows:
                    print("[INFO] no more rows, batch finished")
                    break

                # 1) run all HTTP work (image hashing + content probe) in parallel
                hash_tasks = [get_image_hash(r["img"]) for r in rows]
                content_tasks = [check_content_images(r["content"]) for r in rows]

                hash_results = await asyncio.gather(*hash_tasks)
                content_invalid_results = await asyncio.gather(*content_tasks)

                # 2) process DB writes serially on the single connection
                for i, (row, (phash, dhash)) in enumerate(zip(rows, hash_results)):

                    vender_code  = row["vender_code"]
                    vender_grade = row["vender_grade"]
                    icode        = row["icode"]
                    iname        = row["iname"]
                    price        = row["price"]
                    img          = row["img"]
                    # Prefer the last-updated timestamp when present.
                    reg_datetime = row["up_datetime"] if row["up_datetime"] else row["reg_datetime"]
                    reg_date     = extract_reg_date(reg_datetime)
                    content      = row["content"]

                    if (
                        phash is None
                        or dhash is None
                        or content_invalid_results[i]  # content image returned 404/410
                    ):
                        # Images unusable -> upsert into unmaster so the row is
                        # excluded from future batches (see NOT EXISTS above).
                        await conn.execute("""
                            INSERT INTO mlinkdw.shopprodinfo_unmaster
                            (icode, representative_img, iname, price,
                             vender_code, vender_grade, reg_date)
                            VALUES ($1,$2,$3,$4,$5,$6,$7)
                            ON CONFLICT (vender_code,icode)
                            DO UPDATE SET
                                representative_img = EXCLUDED.representative_img,
                                iname = EXCLUDED.iname,
                                price = EXCLUDED.price,
                                vender_grade = EXCLUDED.vender_grade,
                                reg_date = EXCLUDED.reg_date,
                                created_at = CURRENT_TIMESTAMP
                        """,
                            icode, img, iname, price,
                            vender_code, vender_grade, reg_date
                        )
                        continue

                    key = (str(phash), str(dhash))
                    async with group_lock:
                        # Even on a cache hit, let dome_group_match re-verify the
                        # group's existence in the DB / invalidate stale entries.
                        group_id = await find_or_create_group_id(
                            conn,
                            group_index,
                            phash,
                            dhash,
                            threshold=8,
                            dhash_threshold=10,
                            iname=iname,
                            iname_threshold=30,
                            fallback_iname_tokens=True,
                            fallback_content_html=content,
                            fallback_iname_dome_code='CHG',
                            fallback_content_min_matches=1,
                            fallback_content_min_ratio=0.8,
                            current_img_url=img,
                            color_check=False,
                            current_vender_code=vender_code,
                            current_icode=icode,
                            current_price=price,
                            current_vender_grade=vender_grade,
                            current_reg_date=reg_date,
                            current_dome_code='CHG',
                            current_content_html=content,
                            verify_gid_exists=False,
                            maintain_winner_tokens_on_existing_group=True,
                            merge_moved_tokens_on_existing_group=True,
                        )

                    insert_sql = """
                        INSERT INTO mlinkdw.shopprod_group_map2
                        (group_id, vender_code, icode, iname,
                         price, img_url, reg_date, vender_grade,content,dome_code)
                        VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
                        ON CONFLICT DO NOTHING
                    """
                    try:
                        await conn.execute(
                            insert_sql,
                            group_id, vender_code, icode,
                            iname, price, img, reg_date, str(vender_grade),
                            content, 'CHG',
                        )
                    except asyncpg.exceptions.ForeignKeyViolationError:
                        # The cache held a dead group_id, or the group vanished
                        # (external cleanup / rollback): drop the cached key and
                        # retry exactly once with a fresh group resolution.
                        async with group_lock:
                            group_index.pop(key, None)
                            group_id = await find_or_create_group_id(
                                conn,
                                group_index,
                                phash,
                                dhash,
                                threshold=8,
                                dhash_threshold=10,
                                iname=iname,
                                iname_threshold=30,
                                fallback_iname_tokens=True,
                                fallback_content_html=content,
                                fallback_iname_dome_code='CHG',
                                fallback_content_min_matches=1,
                                fallback_content_min_ratio=0.8,
                                current_img_url=img,
                                color_check=False,
                                current_vender_code=vender_code,
                                current_icode=icode,
                                current_price=price,
                                current_vender_grade=vender_grade,
                                current_reg_date=reg_date,
                                current_dome_code='CHG',
                                current_content_html=content,
                                verify_gid_exists=False,
                                maintain_winner_tokens_on_existing_group=True,
                                merge_moved_tokens_on_existing_group=True,
                            )
                        await conn.execute(
                            insert_sql,
                            group_id, vender_code, icode,
                            iname, price, img, reg_date, str(vender_grade),
                            content, 'CHG',
                        )

                    async with counter_lock:
                        processed += 1
                        if processed % 50 == 0:
                            print(
                                f"[INFO] processed={processed}, "
                                f"groups={len(group_index)}"
                            )

                print(f"[INFO] batch committed, size={len(rows)}")

    await pool.close()
    executor.shutdown(wait=True)


# Script entry point: run the whole async pipeline to completion.
if __name__ == "__main__":
    asyncio.run(main())
