# -*- coding: utf-8 -*-
from __future__ import annotations
import asyncio
try:
    import asyncpg  # type: ignore
except Exception:  # pragma: no cover
    asyncpg = None  # type: ignore
try:
    import aiohttp  # type: ignore
except Exception:  # pragma: no cover
    aiohttp = None  # type: ignore

import os
import re
import html as _html
from urllib.parse import urlparse, unquote
from typing import List

# Rows fetched per keyset-paginated batch in main().
FETCH_SIZE = 30000
# Upper bound for the asyncpg connection pool.
DB_POOL_SIZE = 20

# Guards `processed`; `processed` is incremented once per committed batch.
counter_lock = asyncio.Lock()
processed = 0
# Hard cap on image-URL tokens emitted per group (keeps the token table bounded).
MAX_CONTENT_TOKENS_PER_GROUP = 8

# Very common (shipping/notice/info) image filenames are excluded: using them
# as tokens would inflate the candidate set with false matches.
# - Compared after lower/unquote/basename have been applied.
STOP_IMG_FILENAMES: set[str] = {
    "배송공지.jpg",
    "delivery.jpg",
    "notice.jpg",
    "noti.jpg",
    "top_open_dome.jpg",
    "down_open_dome.jpg",
    "dome_bottom.jpg",
    "vender_top.jpg",
    "top.jpg",
    "title.jpg",
    "info-title.jpg",
    "info.jpg",
    "information.jpg",
    "product_info_bottom.jpg",
    "info_issue.jpg",
    "도매의신.jpg",
    "dome.jpg",
    # derived from shopprod_group_content_token_202602101706.txt (high-frequency template images)
    "privacy_policy.jpg",
    "lawprivacy.jpg",
    "bslogo-on.jpg",
    "sllogo.jpg",
    "info-dome.jpg",
    "info-slmall.jpg",
    "all_top_img.jpg",
    "openmarket_top_oc.jpg",
    "openmarket_bottom_oc.jpg",
    "outletbanner.jpg",
    "coatingbottom.jpg",
    "acftop.jpg",
    "acfbottom.jpg",
    "foodnine_parcel_info_001.jpg",
    "20231214165047_product_bottom.jpg",
}
# Filename stems (basename without extension) that are too generic to match on.
STOP_IMG_STEMS: set[str] = {
    "notice",
    # overly generic filename stems are excluded
    "img",
    "image",
    "photo",
    "pic",
    "logo",
    "banner",
    # derived from shopprod_group_content_token_202602101706.txt (high-frequency stems)
    "privacy_policy",
    "lawprivacy",
    "info-dome",
    "info-slmall",
    "bslogo-on",
    "sllogo",
    "all_top_img",
    "openmarket_top_oc",
    "openmarket_bottom_oc",
    "outletbanner",
    "coatingbottom",
    "acftop",
    "acfbottom",
    "foodnine_parcel_info_001",
    "20231214165047_product_bottom",
}
# Stems starting with any of these prefixes are treated as template images
# (Korean entries: "도매의신"=Dome's God brand, "무료배송"=free shipping,
# "개인정보"=privacy, "교환반품"=exchange/return, "안내서"=guide, "공지"=notice).
STOP_IMG_STEM_PREFIXES: set[str] = {
    "top",
    "bottom",
    "title",
    "noti",
    "intro",
    "dome",
    "도매의신",
    "무료배송",
    "개인정보",
    "교환반품",
    "안내서",
    "공지",
    "도매신",
}
# Stems containing any of these substrings are treated as template images
# (Korean entries: "배송공지"=shipping notice, "정품인증"=authenticity
# certification, "스튜디오"=studio).
STOP_IMG_STEM_SUBSTRINGS: set[str] = {
    "배송공지",
    "delivery",
    "notice",
    "top_open_dome",
    "down_open_dome",
    "dome_bottom",
    "vender_top",
    "info-title",
    "information",
    "product_info_bottom",
    "info_issue",
    "도매의신",
    "정품인증",
    "스튜디오",
    # derived from shopprod_group_content_token_202602101706.txt (template keyword substrings)
    "privacy",
    "policy",
    "return",
    "exchange",
    "refund",
    "parcel_info",
    "product_bottom",
}

def _extract_img_urls_from_html(html_text: str | None) -> List[str]:
    raw = str(html_text or "")
    # DB/JSON 등을 거치며 \" 형태로 이스케이프된 HTML이 들어오는 케이스 대응
    # - 예: <img src=\"http://...jpg\" ...>
    raw = raw.replace('\\"', '"').replace("\\'", "'")
    # HTML 엔티티(&quot; 등)도 정리
    raw = _html.unescape(raw)
    # <img ...> 태그를 먼저 뽑고, 속성(src/data-src/srcset 등)을 안전하게 추출한다.
    attr_names = ("src", "data-src", "data-original", "data-lazy-src", "data-zoom-image", "data-url")
    urls: List[str] = []
    for m in re.finditer(r"<img\b[^>]*>", raw, flags=re.IGNORECASE):
        tag = m.group(0) or ""
        # 1) 우선순위 속성들
        for an in attr_names:
            mm = re.search(
                rf"""\b{re.escape(an)}\s*=\s*(?:(["'])(.*?)\1|([^'"\s>]+))""",
                tag,
                flags=re.IGNORECASE | re.DOTALL,
            )
            if mm:
                u = (mm.group(2) or mm.group(3) or "").strip()
                if u:
                    urls.append(u)
                break
        # 2) srcset (첫 URL만)
        mm2 = re.search(
            r"""\bsrcset\s*=\s*(?:(["'])(.*?)\1|([^'"\s>]+))""",
            tag,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if mm2:
            ss = (mm2.group(2) or mm2.group(3) or "").strip()
            if ss:
                first = ss.split(",")[0].strip()
                first = first.split(" ")[0].strip()
                if first:
                    urls.append(first)

        # 깨진 HTML 방어:
        # - 예: <img src="https:<img src="https://ai.esmplus.com/...jpg">
        #   속성 파싱이 깨져도 태그 문자열 안에 실제 URL이 존재할 수 있다.
        for u2 in re.findall(r"""https?://[^\s<>"']+""", tag, flags=re.IGNORECASE):
            u2 = str(u2).strip()
            if u2:
                urls.append(u2)

    # <img> 태그가 없는 케이스: 본문에 URL이 단독으로 들어오는 경우를 보완적으로 허용
    if not urls:
        for u in re.findall(
            r"""https?://[^\s<>"']+\.(?:jpg|jpeg|png|gif|webp|bmp|svg)(?:\?[^\s<>"']*)?""",
            raw,
            flags=re.IGNORECASE,
        ):
            urls.append(str(u).strip())

    # 스키마/상대경로/깨진 URL 최소 보정은 normalize 단계에서 처리한다.
    seen = set()
    out: List[str] = []
    for u in urls:
        if u in seen:
            continue
        seen.add(u)
        out.append(u)

    return out


def _normalize_img_url_token(u: str) -> str | None:
    """
    content_token용 이미지 URL 정규화.
    - query/fragment 제거(추적 파라미터 등으로 토큰 분산 방지)
    - scheme/host는 소문자
    - path는 unquote 후 소문자(대부분 호스팅은 케이스 무시/또는 동일 컨텐츠로 운영되는 경우가 많아 매칭 우선)
    """
    try:
        # Postgres btree index entry has a hard size limit (~8KB).
        # Very long URL paths can crash inserts with:
        #   asyncpg.exceptions.ProgramLimitExceededError: index row requires ... maximum size is 8191
        MAX_LEN = 512
        raw = str(u).strip()
        if not raw:
            return None
        # scheme-less URL: //cdn.example.com/x.jpg
        if raw.startswith("//"):
            raw = "https:" + raw
        if raw.startswith("http:////"):
            raw = "http://" + raw[len("http:////") :]
        if raw.startswith("https:////"):
            raw = "https://" + raw[len("https:////") :]
        if raw.startswith("http:/") and (not raw.startswith("http://")):
            raw = "http://" + raw[len("http:/") :]
        if raw.startswith("https:/") and (not raw.startswith("https://")):
            raw = "https://" + raw[len("https:/") :]
        # 상대경로도 토큰으로 허용(도메인 정보가 없어도 동일 콘텐츠끼리 매칭 가능)
        if raw.startswith("/"):
            path = unquote(raw).strip().lower()
            tok = f"rel:{path}"
            return tok if len(tok) <= MAX_LEN else None
        if raw.startswith("./") or raw.startswith("../"):
            path = unquote(raw).strip().lower()
            tok = f"rel:{path}"
            return tok if len(tok) <= MAX_LEN else None
        # leading slash가 없는 상대경로도 일부 존재 → rel:로 허용(단, scheme 없는 경우만)
        if (not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:", raw)) and ("/" in raw):
            path = unquote(raw).strip().lower()
            tok = f"rel:{path}"
            return tok if len(tok) <= MAX_LEN else None
        p = urlparse(raw)
        if p.scheme not in ("http", "https"):
            return None
        netloc = (p.netloc or "").strip().lower()
        if not netloc:
            return None
        path = unquote(p.path or "").strip()
        if not path:
            return None
        # URL path 대소문자 변형 대응(매칭 우선)
        path = path.lower()
        # http/https는 동일 리소스로 취급(토큰 분산 방지): scheme을 https로 통일
        tok = f"https://{netloc}{path}"
        return tok if len(tok) <= MAX_LEN else None
    except Exception:
        return None


def _extract_img_url_tokens_from_html(
    html_text: str | None,
) -> List[str]:
    """Turn content HTML into up to MAX_CONTENT_TOKENS_PER_GROUP normalized
    image-URL tokens, skipping template/boilerplate images.

    All filters operate on the lowercased, unquoted basename of the URL:
    exact stop filenames, substring variants of those filenames
    (e.g. steadylady_notice.jpg, NOTICE_end.jpg), exact stop stems,
    stem prefixes, and stem substrings.
    """
    found = _extract_img_urls_from_html(html_text)
    if not found:
        return []
    limit = int(MAX_CONTENT_TOKENS_PER_GROUP)
    stop_prefixes = tuple(STOP_IMG_STEM_PREFIXES)
    tokens: List[str] = []
    emitted: set[str] = set()
    for candidate in found:
        try:
            parsed = urlparse(str(candidate))
            name = unquote(os.path.basename(parsed.path or "") or "").strip().lower()
        except Exception:
            continue
        if not name:
            continue
        stem = name.rsplit(".", 1)[0].strip()
        # Exact stop filename, or any variant containing a stop filename.
        if name in STOP_IMG_FILENAMES:
            continue
        if any(stop in name for stop in STOP_IMG_FILENAMES):
            continue
        # Stem-level filters: exact, prefix, and substring variants.
        if stem in STOP_IMG_STEMS:
            continue
        if stem.startswith(stop_prefixes):
            continue
        if any(frag in stem for frag in STOP_IMG_STEM_SUBSTRINGS):
            continue
        token = _normalize_img_url_token(str(candidate))
        if not token or token in emitted:
            continue
        emitted.add(token)
        tokens.append(token)
        if len(tokens) >= limit:
            break

    return tokens

def _row_to_content_tokens(row):
    """Map a DB row (group winner + its content HTML) to a token payload dict.

    The "fns" entry holds the normalized image-URL tokens extracted from the
    winner's content HTML; the remaining keys pass through unchanged.
    """
    return {
        "group_id": row["group_id"],
        "winner_vender_code": row["winner_vender_code"],
        "winner_icode": row["winner_icode"],
        "fns": _extract_img_url_tokens_from_html(row["content"]),
        "dome_code": row["dome_code"],
    }

# -----------------------------
# main
# -----------------------------
async def main():
    """Batch job: for each group winner missing content tokens, parse its
    content HTML into image-URL tokens and upsert them into
    mlinkdw.shopprod_group_content_token, keyset-paginating by group_id.
    """
    global processed
    if asyncpg is None or aiohttp is None:
        raise ImportError("Z_Batch_Create_Content_Token.py 실행에는 asyncpg/aiohttp 설치가 필요합니다.")
    from db_config import DB_INFO_ASYNCPG
    pool = await asyncpg.create_pool(
        **DB_INFO_ASYNCPG,
        min_size=5,
        max_size=DB_POOL_SIZE
    )

    # Keyset pagination: $1 = batch size, $2 = last group_id already seen.
    # NOT EXISTS skips winners whose tokens were already generated, so the
    # job is restartable/idempotent.
    SQL = """
            SELECT 
                g.group_id,
                g.winner_vender_code,
                g.winner_icode,
                gm.content,
                gm.dome_code
            FROM mlinkdw.shopprod_group2 g
            JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id AND gm.vender_code = g.winner_vender_code AND gm.icode = g.winner_icode
            WHERE g.group_id > $2
              and NOT EXISTS (
                    SELECT 1 FROM mlinkdw.shopprod_group_content_token m
                    WHERE m.group_id = g.group_id
                      AND m.vender_code = g.winner_vender_code
                      AND m.icode = g.winner_icode
                    )
            ORDER BY g.group_id
            LIMIT $1
        """

    timeout = aiohttp.ClientTimeout(total=10)
    # NOTE(review): this HTTP session is created but never used inside the
    # loop — the job only parses HTML already stored in the DB. Confirm
    # whether fetching was removed intentionally; if so, the session (and the
    # aiohttp requirement above) could be dropped.
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with pool.acquire() as conn:

            last_gid = 0
            while True:
                # Each batch (fetch + insert) runs in its own transaction.
                async with conn.transaction():
                    rows = await conn.fetch(SQL, int(FETCH_SIZE), int(last_gid))
                    print(f"[INFO] batch start, size={len(rows)}")
                    if not rows:
                        break
                    # Advance the keyset cursor to the last row of this batch.
                    last_gid = int(rows[-1]["group_id"])

                    # Only HTML parsing happens here, so asyncio.gather is
                    # unnecessary (it would just add overhead).
                    results = [_row_to_content_tokens(r) for r in rows]

                    # Upsert keyed on (group_id, vender_code, icode, token);
                    # re-runs only refresh dome_code.
                    insert_sql = """
                        INSERT INTO mlinkdw.shopprod_group_content_token
                        (group_id, vender_code, icode, token, dome_code)
                        VALUES ($1,$2,$3,$4,$5)
                        ON CONFLICT (group_id, vender_code, icode, token)
                        DO UPDATE SET
                            dome_code = EXCLUDED.dome_code
                    """

                    # Flatten: one insert row per (winner, token) pair.
                    rows_to_insert = []
                    for r in results:
                        fns = r.get("fns") or []
                        for tok in fns:
                            if not tok:
                                continue
                            rows_to_insert.append((
                                r["group_id"],
                                r["winner_vender_code"],
                                r["winner_icode"],
                                str(tok),
                                r["dome_code"],
                            ))

                    if rows_to_insert:
                        await conn.executemany(insert_sql, rows_to_insert)

                    # NOTE(review): `processed` counts committed *batches*,
                    # not rows; the lock is redundant while this runs as a
                    # single task, but harmless.
                    async with counter_lock:
                        processed += 1
                        if processed % 50 == 0:
                            print(f"[INFO] processed={processed}")

                print(f"[INFO] batch committed, size={len(rows)}")

    await pool.close()

# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())
