"""
거리 기반(해밍거리 임계치) 이미지 그룹 매칭 유틸.

전제:
- shopprod_group.group_key = "{phash}_{dhash}" (기존과 동일)
- 아래 컬럼이 있으면 성능이 좋아짐(권장, DBA 권한으로 추가 가능):
    - phash_hex TEXT
    - dhash_hex TEXT
    - phash_bands INT4[]  (pHash 256bit를 32bit x 8 band로 나눈 값)

동작:
1) (phash,dhash) 완전일치 group_key가 메모리 인덱스에 있으면 즉시 매칭
2) 없으면 phash_bands overlap 후보를 DB에서 조회
3) 후보들의 phash_hex와 신규 phash의 해밍거리 계산 후, threshold 이내면 해당 그룹 매칭
4) 없으면 신규 그룹 생성
"""

from __future__ import annotations

import difflib
import html as _html
import os
import re
import time
import asyncio
import random
from datetime import datetime
from urllib.parse import urlparse, unquote
from typing import Dict, Tuple, Optional, List, Iterable, Set

import imagehash
from functools import lru_cache

from PIL import Image, ImageOps, ImageFilter  # type: ignore
import requests  # type: ignore

# asyncpg는 UI/일부 환경에서 미설치일 수 있으므로 optional import
try:
    import asyncpg  # type: ignore
except Exception:  # pragma: no cover
    asyncpg = None  # type: ignore

HASH_SIZE = 16         # util 내부 기준 (단, 기존 데이터는 hash_size=8(16 hex)일 수 있음)
BAND_BITS = 32

# content token(파일명/짧은 링크) 추출 시, 너무 흔한 안내/공지 이미지는 후보 폭발을 유발하므로 제외한다.
STOP_IMG_FILENAMES: set[str] = {
    "배송공지.jpg",
    "delivery.jpg",
    "notice.jpg",
    "noti.jpg",
    "top_open_dome.jpg",
    "down_open_dome.jpg",
    "dome_bottom.jpg",
    "vender_top.jpg",
    "top.jpg",
    "title.jpg",
    "info-title.jpg",
    "info.jpg",
    "information.jpg",
    "product_info_bottom.jpg",
    "info_issue.jpg",
    "도매의신.jpg",
    "dome.jpg",
    # derived from shopprod_group_content_token_202602101706.txt (high-frequency template images)
    "privacy_policy.jpg",
    "lawprivacy.jpg",
    "bslogo-on.jpg",
    "sllogo.jpg",
    "info-dome.jpg",
    "info-slmall.jpg",
    "all_top_img.jpg",
    "openmarket_top_oc.jpg",
    "openmarket_bottom_oc.jpg",
    "outletbanner.jpg",
    "coatingbottom.jpg",
    "acftop.jpg",
    "acfbottom.jpg",
    "foodnine_parcel_info_001.jpg",
    "20231214165047_product_bottom.jpg",
}
STOP_IMG_STEMS: set[str] = {
    "notice",
    # 너무 범용적인 파일명 stem은 제외
    "img",
    "image",
    "photo",
    "pic",
    "logo",
    "banner",
    # derived from shopprod_group_content_token_202602101706.txt (high-frequency stems)
    "privacy_policy",
    "lawprivacy",
    "info-dome",
    "info-slmall",
    "bslogo-on",
    "sllogo",
    "all_top_img",
    "openmarket_top_oc",
    "openmarket_bottom_oc",
    "outletbanner",
    "coatingbottom",
    "acftop",
    "acfbottom",
    "foodnine_parcel_info_001",
    "20231214165047_product_bottom", 
}
STOP_IMG_STEM_PREFIXES: set[str] = {
    "top",
    "bottom",
    "title",
    "noti",
    "intro",
    "dome",
    "도매의신",
    "무료배송",
    "개인정보",
    "교환반품",
    "안내서",
    "공지",
    "도매신",
}
STOP_IMG_STEM_SUBSTRINGS: set[str] = {
    "배송공지",
    "delivery",
    "notice",
    "top_open_dome",
    "down_open_dome",
    "dome_bottom",
    "vender_top",
    "info-title",
    "information",
    "product_info_bottom",
    "info_issue",
    "도매의신",
    "정품인증",
    "스튜디오",
    # derived from shopprod_group_content_token_202602101706.txt (template keyword substrings)
    "privacy",
    "policy",
    "return",
    "exchange",
    "refund",
    "parcel_info",
    "product_bottom",
}

# URL path 레벨 stop 규칙:
# - 파일명이 정상이어도(예: info_holicmaker1.jpg) 경로에 "common/..bottom" 같은 템플릿 디렉토리가 있으면
#   여러 상품에서 반복되는 공통 이미지일 확률이 높아 content-token 후보 폭발/오탐 merge를 유발한다.
# - 너무 공격적으로 막지 않기 위해 "common" + ("top"/"bottom") 조합 중심으로 차단.
STOP_IMG_URL_PATH_SUBSTRINGS: set[str] = {
    "/bottom/",
    "/top/",
    ".bottom/",
    ".top/",
}

# 디버그 로그 토글:
# - Windows CMD/PowerShell: set DOME_GROUP_MATCH_DEBUG=1
# - bash: export DOME_GROUP_MATCH_DEBUG=1
_DBG_FALLBACK = str(os.environ.get("DOME_GROUP_MATCH_DEBUG") or "").strip().lower() in ("1", "true", "yes", "y", "on")


def _dbg(msg: str) -> None:
    if not _DBG_FALLBACK:
        return
    try:
        print(f"[dome_group_match] {msg}", flush=True)
    except Exception:
        pass


def _is_low_entropy_hex64(h: str, *, max_unique_nibbles: int = 4) -> bool:
    """
    64-bit hex(16자) 기준으로 '정보량이 낮은' 해시를 간단히 판정.
    - 단색/패턴/기본이미지에서 흔히 나오는 dhash/phash(예: 4d4d4d..., 0c0c0c...)
    - 정확한 판정이 아니라 '오탐 그룹 매칭 방지용 안전장치'로 사용
    """
    if not h:
        return True
    h = str(h).strip().lower()
    if len(h) != 16:
        return False
    if not re.fullmatch(r"[0-9a-f]{16}", h):
        return False
    return len(set(h)) <= int(max_unique_nibbles)


def _norm_iname(s: str | None) -> str:
    if not s:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"\s+", " ", s)
    return s


def _iname_similarity(a: str | None, b: str | None) -> float:
    """
    0~100 (%) 문자열 유사도. 외부 의존성 없이 difflib 기반.
    """
    a2 = _norm_iname(a)
    b2 = _norm_iname(b)
    if not a2 or not b2:
        return 0.0
    return difflib.SequenceMatcher(None, a2, b2).ratio() * 100.0


def _tokenize_iname(s: str | None, *, min_token_len: int = 4) -> List[str]:
    """
    ownerTodomemeTransfer.py의 토큰 생성 규칙과 유사하게 iname을 토큰화.
    - 한글/영문/숫자/공백만 남기고 나머지는 공백으로 치환
    - 소문자화, 공백 정리
    - 길이 기준 필터
    """
    if not s:
        return []
    s2 = str(s)
    s2 = re.sub(r"[^가-힣a-zA-Z0-9 ]", " ", s2)
    s2 = s2.strip().lower()
    s2 = re.sub(r"\s+", " ", s2)
    toks = [t.strip() for t in s2.split(" ") if t.strip()]
    mt = int(min_token_len)
    toks = [t for t in toks if len(t) >= mt]
    # 중복 제거(순서 유지)
    seen = set()
    out: List[str] = []
    for t in toks:
        if t in seen:
            continue
        seen.add(t)
        out.append(t)
    return out


async def _candidate_groups_by_iname_tokens(
    conn,
    *,
    iname: str, 
    limit: int = 50,
    min_token_len: int = 2,
) -> List[Tuple[int, int]]:
    """
    iname 토큰을 이용해 shopprod_name_token에서 후보 group_id를 Top-K로 뽑는다.
    반환: [(group_id, shared_token_cnt), ...] (shared_token_cnt 내림차순)

    전제:
    - mlinkdw.shopprod_name_token 존재
    - tmp_tok_ok 존재(권장). 없으면 자동으로 tmp_tok_ok 조인 없이 재시도
    """
    toks = _tokenize_iname(iname, min_token_len=min_token_len)
    if not toks:
        return []
    rows = []
    try:
        # 1) tmp_tok_ok가 있으면 가장 안전(흔한 토큰 제거)하므로 우선 사용
        rows = await conn.fetch(
            """
            WITH toks AS (
              SELECT unnest($1::text[]) AS token
            )
            SELECT
              n.group_id,
              COUNT(*)::int AS shared_cnt
            FROM mlinkdw.shopprod_name_token n
            JOIN tmp_tok_ok t ON t.token = n.token
            JOIN toks ON toks.token = n.token
            GROUP BY n.group_id
            ORDER BY shared_cnt DESC
            LIMIT $2
            """,
            toks,
            int(limit),
        )
    except Exception:
        rows = []

    # rows가 비면, tmp_tok_ok 조인 없이 한 번 더 시도한다.
    if not rows:
        try:
            rows = await conn.fetch(
                """
                WITH toks AS (
                  SELECT unnest($1::text[]) AS token
                )
                SELECT
                  n.group_id,
                  COUNT(*)::int AS shared_cnt
                FROM mlinkdw.shopprod_name_token n
                JOIN toks ON toks.token = n.token
                GROUP BY n.group_id
                ORDER BY shared_cnt DESC
                LIMIT $2
                """,
                toks,
                int(limit),
            )
        except Exception as e:
            raise RuntimeError(
                "iname 토큰 후보 조회를 위해서는 mlinkdw.shopprod_name_token이 필요합니다. "
                "먼저 ownerTodomemeTransfer.py의 --step0 로 준비하세요."
            ) from e
    out: List[Tuple[int, int]] = []
    for r in rows:
        try:
            out.append((int(r["group_id"]), int(r["shared_cnt"])))
        except Exception:
            continue
    return out


def _candidate_groups_by_iname_tokens_sync(
    conn,
    *,
    iname: str, 
    limit: int = 50,
    min_token_len: int = 2,
) -> List[Tuple[int, int]]:
    """
    동기(psycopyg2/DB-API) 버전: iname 토큰을 이용해 shopprod_name_token에서 후보 group_id를 Top-K로 뽑는다.
    반환: [(group_id, shared_token_cnt), ...] (shared_token_cnt 내림차순)
    """
    toks = _tokenize_iname(iname, min_token_len=min_token_len)
    if not toks:
        return []

    cur = conn.cursor()
    rows = []
    try:
        # 1) tmp_tok_ok가 있으면 가장 안전(흔한 토큰 제거)하므로 우선 사용
        cur.execute(
            """
            WITH toks AS (
              SELECT unnest(%s::text[]) AS token
            )
            SELECT
              n.group_id,
              COUNT(*)::int AS shared_cnt
            FROM mlinkdw.shopprod_name_token n
            JOIN tmp_tok_ok t ON t.token = n.token
            JOIN toks ON toks.token = n.token
            GROUP BY n.group_id
            ORDER BY shared_cnt DESC
            LIMIT %s
            """,
            (toks, int(limit)),
        )
        rows = cur.fetchall()
    except Exception:
        rows = []

    # rows가 비면, tmp_tok_ok 조인 없이 한 번 더 시도한다.
    if not rows:
        try:
            cur.execute(
                """
                WITH toks AS (
                  SELECT unnest(%s::text[]) AS token
                )
                SELECT
                  n.group_id,
                  COUNT(*)::int AS shared_cnt
                FROM mlinkdw.shopprod_name_token n
                JOIN toks ON toks.token = n.token
                GROUP BY n.group_id
                ORDER BY shared_cnt DESC
                LIMIT %s
                """,
                (toks, int(limit)),
            )
            rows = cur.fetchall()
        except Exception as e:
            raise RuntimeError(
                "iname 토큰 후보 조회를 위해서는 mlinkdw.shopprod_name_token이 필요합니다. "
                "먼저 ownerTodomemeTransfer.py의 --step0 로 준비하세요."
            ) from e

    out: List[Tuple[int, int]] = []
    for r in rows:
        try:
            # DictCursor / tuple 모두 대응
            try:
                gid = int(r[0])
                cnt = int(r[1])
            except Exception:
                gid = int(r["group_id"])
                cnt = int(r["shared_cnt"])
            out.append((gid, cnt))
        except Exception:
            continue
    return out


def _normalize_img_url_token(u: str) -> str | None:
    """
    content_token용 이미지 URL 정규화.
    - query/fragment 제거(추적 파라미터 등으로 토큰 분산 방지)
    - scheme/host는 소문자
    - path는 unquote 후 소문자(매칭 우선)
    """
    try:
        # Guard against Postgres btree index entry size limit (~8KB).
        # Very long URL tokens can crash inserts into shopprod_group_content_token.
        MAX_LEN = 512
        raw = str(u).strip()
        if not raw:
            return None
        # scheme-less URL: //cdn.example.com/x.jpg
        if raw.startswith("//"):
            raw = "https:" + raw
        if raw.startswith("http:////"):
            raw = "http://" + raw[len("http:////") :]
        if raw.startswith("https:////"):
            raw = "https://" + raw[len("https:////") :]
        if raw.startswith("http:/") and (not raw.startswith("http://")):
            raw = "http://" + raw[len("http:/") :]
        if raw.startswith("https:/") and (not raw.startswith("https://")):
            raw = "https://" + raw[len("https:/") :]
        if raw.startswith("/"):
            path = unquote(raw).strip().lower()
            tok = f"rel:{path}"
            return tok if len(tok) <= MAX_LEN else None
        if raw.startswith("./") or raw.startswith("../"):
            path = unquote(raw).strip().lower()
            tok = f"rel:{path}"
            return tok if len(tok) <= MAX_LEN else None
        if (not re.match(r"^[a-zA-Z][a-zA-Z0-9+.-]*:", raw)) and ("/" in raw):
            path = unquote(raw).strip().lower()
            tok = f"rel:{path}"
            return tok if len(tok) <= MAX_LEN else None
        p = urlparse(raw)
        if p.scheme not in ("http", "https"):
            return None
        netloc = (p.netloc or "").strip().lower()
        if not netloc:
            return None
        path = unquote(p.path or "").strip()
        if not path:
            return None
        path = path.lower()
        # http/https는 동일 리소스로 취급(토큰 분산 방지): scheme을 https로 통일
        tok = f"https://{netloc}{path}"
        return tok if len(tok) <= MAX_LEN else None
    except Exception:
        return None


def _extract_img_filenames_from_html(
    html_text: str | None,
    *,
    max_imgs: int = 12,
    max_filenames: int = 4,
    min_fname_len: int = 6,
) -> List[str]:
    """
    (호환용 함수명 유지) content HTML에서 <img src>의 "정규화된 URL 토큰"을 추출.
    - query/fragment 제거(추적 파라미터로 인한 토큰 분산 방지)
    - scheme/host 소문자화, path unquote 후 소문자화(매칭 우선)
    - 너무 흔한 안내/공지 이미지는 제외(휴리스틱)
    """
    urls = _extract_img_urls_from_html(html_text, limit=int(max_imgs))
    if not urls:
        return []
    out: List[str] = []
    seen: set[str] = set()
    for u in urls:
        try:
            # ✅ 추가 정제: URL 끝에 잘못된 태그가 남아있는 경우 제거
            u_clean = re.sub(r'\[?/img\]?$', '', str(u), flags=re.IGNORECASE)
            u_clean = re.sub(r'</?img[^>]*>$', '', u_clean, flags=re.IGNORECASE)
            u_clean = u_clean.strip()
            if not u_clean:
                continue
            p = urlparse(u_clean)
            path_lower = str(p.path or "").strip().lower()
            fn = os.path.basename(p.path or "")
            fn = unquote(fn or "").strip().lower()
            # ✅ 파일명에서도 잘못된 태그 제거
            fn = re.sub(r'\[?/img\]?$', '', fn, flags=re.IGNORECASE)
            fn = re.sub(r'</?img[^>]*>$', '', fn, flags=re.IGNORECASE)
            fn = fn.strip()
        except Exception:
            continue
        if not fn:
            continue

        # ✅ path 기반 템플릿 이미지 차단:
        # - 예: /.../1holicmaker_common/3.bottom/info_...jpg 같은 케이스는 상품 고유 이미지가 아니라 공통 템플릿일 확률↑
        # - 공통(common) + (top/bottom) 경로 조합일 때만 차단(과차단 방지)
        try:
            if "common" in path_lower:
                if any(s in path_lower for s in STOP_IMG_URL_PATH_SUBSTRINGS):
                    continue
                # /3.bottom/ 처럼 숫자 prefix가 붙는 케이스도 포함
                if re.search(r"/\d+\.(?:top|bottom)/", path_lower):
                    continue
        except Exception:
            pass
        # ✅ 템플릿 경로 휴리스틱(오탐/후보폭발 방지)
        # - 예: .../1holicmaker_common/3.bottom/info_holicmaker1.jpg
        # - "common + (top/bottom)" 디렉토리 조합은 안내/푸터/헤더 템플릿일 확률이 높아 토큰에서 제외
        try:
            path_l = unquote(p.path or "").strip().lower()
        except Exception:
            path_l = ""
        if path_l:
            has_common = ("common" in path_l)
            has_top_bottom_dir = bool(re.search(r"/[^/]*(top|bottom)[^/]*/", path_l))
            looks_like_info = (fn.startswith("info") or ("notice" in fn) or fn.startswith("top") or fn.startswith("bottom"))
            if has_common and (has_top_bottom_dir or looks_like_info):
                continue
        # ✅ 안전장치:
        # - 공백이 포함된 URL이 따옴표 없이 들어오면 src 파싱이 첫 토큰까지만 잡혀
        #   "simple" 같은 너무 일반적인 leaf가 토큰으로 저장될 수 있다(오탐/merge 유발).
        # - 기본은 "이미지 파일(확장자 있음)"만 허용.
        # - 단, 과거 데이터/일부 CDN은 확장자 없이 이미지 리소스를 제공하기도 하므로
        #   확장자 없는 경우는 "충분히 구체적인 leaf(숫자/해시 등)"일 때만 예외 허용.
        root, ext = os.path.splitext(fn)
        ext = (ext or "").lstrip(".").lower()
        allowed_exts = ("jpg", "jpeg", "png", "gif", "webp", "bmp", "svg")
        if ext in allowed_exts:
            stem = root.strip()
        else:
            # 확장자 없음(또는 비이미지 확장자): leaf가 너무 일반적이면 제외
            leaf = (fn or "").strip()
            # 구체성 휴리스틱: 숫자 포함 또는 긴 hex(해시) 형태만 허용
            is_hexish = bool(re.fullmatch(r"[0-9a-f]{12,}", leaf))
            has_digit = any(ch.isdigit() for ch in leaf)
            is_all_digits = bool(re.fullmatch(r"\d+", leaf))  # 숫자로만 구성된 경우
            if not (has_digit or is_hexish):
                continue
            # ✅ 숫자로만 구성된 파일명(예: "800", "12345")은 구체적이므로 min_fname_len 완화
            # - 예: gmarket의 "still/800" 같은 케이스 허용
            if is_all_digits:
                # 숫자만으로 구성된 경우 최소 3자 이상만 허용 (너무 짧은 "1", "2" 제외)
                if len(leaf) < 3:
                    continue
            elif int(min_fname_len) > 0 and len(leaf) < int(min_fname_len):
                # 숫자+문자 혼합 또는 hex 형태는 기존 min_fname_len 체크 적용
                continue
            stem = leaf

        # stoplist(정확/포함)
        if fn in STOP_IMG_FILENAMES:
            continue
        # STOP_IMG_FILENAMES 변형(문자 포함)도 제외: 예) steadylady_notice.jpg, NOTICE_end.jpg ...
        if any(sf in fn for sf in STOP_IMG_FILENAMES):
            continue
        if stem in STOP_IMG_STEMS:
            continue
        if any(stem.startswith(p) for p in STOP_IMG_STEM_PREFIXES):
            continue
        if any(p in stem for p in STOP_IMG_STEM_SUBSTRINGS):
            continue

        tok = _normalize_img_url_token(u_clean)
        if not tok:
            continue
        if tok in seen:
            continue
        seen.add(tok)
        out.append(tok)
        if len(out) >= int(max_filenames):
            break
    return out


async def _candidate_groups_by_content_img_filenames(
    conn,
    *,
    content_html: str, 
    limit: int = 80,
    max_imgs: int = 12,
    max_filenames: int = 4,
    min_fname_len: int = 6,
) -> List[Tuple[int, int]]:
    """
    content HTML 이미지 파일명(예: dd045.jpg) 기반으로 후보 group_id를 Top-K로 뽑는다.
    반환: [(group_id, hit_cnt), ...] (hit_cnt 내림차순)

    주의: content LIKE/position 기반이라 인덱스 없으면 느릴 수 있음.
    그래서 폴백 후보 생성용으로만 쓰고 limit을 작게 유지한다.
    """
    fns = _extract_img_filenames_from_html(
        content_html,
        max_imgs=int(max_imgs),
        max_filenames=int(max_filenames),
        min_fname_len=int(min_fname_len),
    )
    if not fns:
        return []

    # 토큰이 너무 흔한 케이스(dd045.jpg 등)면 후보가 과도하게 늘어난다.
    # - 현재 content 폴백은 후보 생성용이므로 "희귀 토큰 우선"으로 fns를 재정렬/필터링한다.
    # - token df(등장 group 수)가 큰 토큰은 버리고, 남은 토큰 중 상위 K만 사용.
    try:
        df_rows = await conn.fetch(
            """
            SELECT
              token,
              COUNT(DISTINCT t.group_id)::int AS group_cnt
            FROM mlinkdw.shopprod_group_content_token t
            JOIN mlinkdw.shopprod_group2 g ON g.group_id = t.group_id
            WHERE token = ANY($1::text[])
            AND t.vender_code = g.winner_vender_code
            AND t.icode = g.winner_icode
            GROUP BY token
            """,
            fns,
        )
        df_map = {str(r["token"]): int(r["group_cnt"]) for r in df_rows}
        # 너무 흔한 토큰은 제외(후보 폭발 방지)
        MAX_TOKEN_GROUP_CNT = 2000
        fns2 = [t for t in fns if df_map.get(t, 0) <= MAX_TOKEN_GROUP_CNT]
        if not fns2:
            # 전부 흔하면, 그나마 제일 덜 흔한 1개는 유지
            fns2 = sorted(fns, key=lambda t: df_map.get(t, 10**9))[:1]
        # 희귀 토큰 우선
        fns = sorted(fns2, key=lambda t: df_map.get(t, 10**9))
        fns = fns[: int(max_filenames)]
    except Exception:
        pass

    min_hits = 2 if len(fns) >= 2 else 1
    try:
        rows = await conn.fetch(
            """
            SELECT
              t.group_id,
              COUNT(*)::int AS hit_cnt
            FROM mlinkdw.shopprod_group_content_token t
            JOIN mlinkdw.shopprod_group2 g ON g.group_id = t.group_id
            WHERE t.token = ANY($1::text[]) 
            AND t.vender_code = g.winner_vender_code
            AND t.icode = g.winner_icode
            GROUP BY t.group_id
            HAVING COUNT(*)::int >= $2
            ORDER BY hit_cnt DESC
            LIMIT $3
            """,
            fns, 
            int(min_hits),
            int(limit),
        )
    except Exception:
        return []
    out: List[Tuple[int, int]] = []
    for r in rows:
        try:
            out.append((int(r["group_id"]), int(r["hit_cnt"])))
        except Exception:
            continue
    return out


def _candidate_groups_by_content_img_filenames_sync(
    conn,
    *,
    content_html: str,
    limit: int = 80,
    max_imgs: int = 12,
    max_filenames: int = 4,
    min_fname_len: int = 6,
) -> List[Tuple[int, int]]:
    """
    동기(psycopyg2/DB-API) 버전: content HTML 이미지 파일명 기반 후보 group_id Top-K.
    반환: [(group_id, hit_cnt), ...] (hit_cnt 내림차순)
    """
    fns = _extract_img_filenames_from_html(
        content_html,
        max_imgs=int(max_imgs),
        max_filenames=int(max_filenames),
        min_fname_len=int(min_fname_len),
    )
    if not fns:
        return []

    cur = conn.cursor()

    # token df 기반으로 "희귀 토큰 우선" 정렬/필터링
    try:
        cur.execute(
            """
            SELECT
              token,
              COUNT(DISTINCT group_id)::int AS group_cnt
            FROM mlinkdw.shopprod_group_content_token
            WHERE token = ANY(%s::text[])
            GROUP BY token
            """,
            (fns,),
        )
        df_rows = cur.fetchall()
        df_map: dict[str, int] = {}
        for r in df_rows:
            try:
                tok = str(r[0])
                cnt = int(r[1])
            except Exception:
                tok = str(r["token"])
                cnt = int(r["group_cnt"])
            df_map[tok] = cnt

        MAX_TOKEN_GROUP_CNT = 2000
        fns2 = [t for t in fns if df_map.get(t, 0) <= MAX_TOKEN_GROUP_CNT]
        if not fns2:
            fns2 = sorted(fns, key=lambda t: df_map.get(t, 10**9))[:1]
        fns = sorted(fns2, key=lambda t: df_map.get(t, 10**9))
        fns = fns[: int(max_filenames)]
    except Exception:
        pass

    min_hits = 2 if len(fns) >= 2 else 1
    try:
        cur.execute(
            """
            SELECT
              t.group_id,
              COUNT(*)::int AS hit_cnt
            FROM mlinkdw.shopprod_group_content_token t
            WHERE t.token = ANY(%s::text[])
            GROUP BY t.group_id
            HAVING COUNT(*)::int >= %s
            ORDER BY hit_cnt DESC
            LIMIT %s
            """,
            (fns, int(min_hits), int(limit)),
        )
        rows = cur.fetchall()
    except Exception:
        return []

    out: List[Tuple[int, int]] = []
    for r in rows:
        try:
            try:
                out.append((int(r[0]), int(r[1])))
            except Exception:
                out.append((int(r["group_id"]), int(r["hit_cnt"])))
        except Exception:
            continue
    return out


@lru_cache(maxsize=4096)
def _fetch_http_bytes(url: str, *, timeout: int = 10) -> bytes | None:
    u = str(url or "").strip()
    if not u:
        return None
    if u.startswith("//"):
        u = "https:" + u
    if not (u.startswith("http://") or u.startswith("https://")):
        return None
    if requests is None:
        return None
    try:
        # 간단 referer (CDN 차단 완화)
        m = re.match(r"^(https?://[^/]+)", u, flags=re.IGNORECASE)
        referer = (m.group(1) + "/") if m else "https://www.alibaba.com/"
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Referer": referer,
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        }
        r = requests.get(u, timeout=int(timeout), headers=headers)
        r.raise_for_status()
        data = bytes(r.content)
        if len(data) > 10 * 1024 * 1024:
            return None
        return data
    except Exception:
        return None


def _extract_img_urls_from_html(html_text: str | None, *, limit: int = 20) -> List[str]:
    raw = str(html_text or "")
    # DB/JSON 등을 거치며 \" 형태로 이스케이프된 HTML 대응
    raw = raw.replace('\\"', '"').replace("\\'", "'")
    raw = _html.unescape(raw)
    # <img ...> 태그를 먼저 뽑고, 속성(src/data-src/srcset 등)을 안전하게 추출한다.
    attr_names = ("src", "data-src", "data-original", "data-lazy-src", "data-zoom-image", "data-url")
    urls: List[str] = []
    for m in re.finditer(r"<img\b[^>]*>", raw, flags=re.IGNORECASE):
        tag = m.group(0) or ""
        for an in attr_names:
            mm = re.search(
                rf"""\b{re.escape(an)}\s*=\s*(?:(["'])(.*?)\1|([^'"\s>]+))""",
                tag,
                flags=re.IGNORECASE | re.DOTALL,
            )
            if mm:
                u = (mm.group(2) or mm.group(3) or "").strip()
                if u:
                    urls.append(u)
                break
        mm2 = re.search(
            r"""\bsrcset\s*=\s*(?:(["'])(.*?)\1|([^'"\s>]+))""",
            tag,
            flags=re.IGNORECASE | re.DOTALL,
        )
        if mm2:
            ss = (mm2.group(2) or mm2.group(3) or "").strip()
            if ss:
                first = ss.split(",")[0].strip()
                first = first.split(" ")[0].strip()
                if first:
                    urls.append(first)

        # 깨진 HTML 방어:
        # - 예: <img src="https:<img src="https://ai.esmplus.com/...jpg">
        for u2 in re.findall(r"""https?://[^\s<>"']+""", tag, flags=re.IGNORECASE):
            u2 = str(u2).strip()
            if u2:
                urls.append(u2)

    if not urls:
        for u in re.findall(
            r"""https?://[^\s<>"']+\.(?:jpg|jpeg|png|gif|webp|bmp|svg)(?:\?[^\s<>"']*)?""",
            raw,
            flags=re.IGNORECASE,
        ):
            urls.append(str(u).strip())
    seen = set()
    out: List[str] = []
    for u in urls:
        # ✅ 잘못된 HTML 태그가 URL에 포함된 경우 정제:
        # - 예: "https://i.imgur.com/03SuoHc.png[/img]" -> "https://i.imgur.com/03SuoHc.png"
        # - 예: "https://example.com/img.jpg</img>" -> "https://example.com/img.jpg"
        u = re.sub(r'\[?/img\]?$', '', u, flags=re.IGNORECASE)
        u = re.sub(r'</?img[^>]*>$', '', u, flags=re.IGNORECASE)
        u = u.strip()
        if not u:
            continue
        if u in seen:
            continue
        seen.add(u)
        out.append(u)
        if len(out) >= int(limit):
            break
    return out


@lru_cache(maxsize=4096)
def _image_hash_pair_from_url(img_url: str):
    if not img_url or not (Image and ImageOps and ImageFilter):
        return None
    data = _fetch_http_bytes(str(img_url), timeout=10)
    if not data:
        return None
    try:
        from io import BytesIO

        bio = BytesIO(data)
        img = Image.open(bio)
        try:
            img.verify()
        except Exception:
            pass
        bio.seek(0)
        img = Image.open(bio)
        img = ImageOps.exif_transpose(img)
        # palette(P) + transparency 경고/품질 이슈 회피
        try:
            if getattr(img, "mode", None) == "P" and "transparency" in getattr(img, "info", {}):
                img = img.convert("RGBA")
        except Exception:
            pass
        img = img.convert("RGB")
        img = img.filter(ImageFilter.GaussianBlur(radius=0.6))
        resampling = getattr(Image, "Resampling", None)
        method = resampling.LANCZOS if resampling else getattr(Image, "LANCZOS", 1)
        img = ImageOps.fit(img, (256, 256), method=method)
        ph = imagehash.phash(img)
        dh = imagehash.dhash(img)
        return (ph, dh)
    except Exception:
        return None

@lru_cache(maxsize=4096)
def _image_hash_pair_from_url_raw(img_url: str):
    """
    배치(Search_DomeDuplicate_4st*.py)에서 사용하는 방식과 최대한 동일하게
    "원본 RGB" 기준으로 phash/dhash를 계산한다.

    - shopprod_group2의 group_key/phash_hex/dhash_hex는 이 값과 일치해야 후보매칭이 안정적이다.
    - (기존 _image_hash_pair_from_url은 blur/fit을 적용하여 결과가 달라질 수 있음)
    """
    if not img_url or not (Image and ImageOps):
        return None
    data = _fetch_http_bytes(str(img_url), timeout=10)
    if not data:
        return None
    try:
        from io import BytesIO

        bio = BytesIO(data)
        img = Image.open(bio)
        try:
            img.verify()
        except Exception:
            pass
        bio.seek(0)
        img = Image.open(bio)
        img = ImageOps.exif_transpose(img)
        img = img.convert("RGB")
        ph = imagehash.phash(img)
        dh = imagehash.dhash(img)
        return (ph, dh)
    except Exception:
        return None


def _group_hash_fields_from_ph_dh_hex(ph_hex: str, dh_hex: str):
    ph_hex_s = str(ph_hex or "").strip()
    dh_hex_s = str(dh_hex or "").strip()
    if not ph_hex_s or not dh_hex_s:
        return None
    try:
        bands = bands_from_phash_hex(ph_hex_s)
    except Exception:
        bands = []
    group_key = f"{ph_hex_s}_{dh_hex_s}"
    return (group_key, ph_hex_s, dh_hex_s, bands)

@lru_cache(maxsize=4096)
def _thumb_bw_sig_from_url(img_url: str):
    """
    썸네일의 "흑/백(밝기 분포)" 차이를 구분하기 위한 간단 시그니처.
    - phash/dhash는 색상/밝기 변화에 강건해서(=차이를 잘 못 느껴서) 흑/백만 다른 상품이 붙을 수 있다.
    - 여기서는 그레이스케일에서 dark/bright 비율을 비교해 큰 차이가 나면 같은 이미지로 보지 않도록 한다.
    """
    if not img_url or not (Image and ImageOps and ImageFilter):
        return None
    data = _fetch_http_bytes(str(img_url), timeout=10)
    if not data:
        return None
    try:
        from io import BytesIO
        from PIL import ImageStat

        bio = BytesIO(data)
        img = Image.open(bio)
        try:
            img.verify()
        except Exception:
            pass
        bio.seek(0)
        img = Image.open(bio)
        img = ImageOps.exif_transpose(img)
        img = img.convert("RGB")
        # 너무 큰 이미지는 비용이 크므로 작게 맞춘다.
        resampling = getattr(Image, "Resampling", None)
        method = resampling.LANCZOS if resampling else getattr(Image, "LANCZOS", 1)
        img = ImageOps.fit(img, (96, 96), method=method)

        gray = img.convert("L")
        stat = ImageStat.Stat(gray)
        mean_v = float(stat.mean[0]) if stat.mean else 0.0  # 0..255

        # 밝기 분포(어두움/밝음 비율). 흑/백 제품은 이 비율 차이가 크게 난다.
        px = list(gray.getdata())
        n = float(len(px)) if px else 1.0
        dark = 0
        bright = 0
        # 임계는 경험적으로 64/192가 안정적(압축/노이즈에 덜 흔들림)
        for p in px:
            try:
                if p < 64:
                    dark += 1
                elif p > 192:
                    bright += 1
            except Exception:
                continue
        dark_ratio = float(dark) / n
        bright_ratio = float(bright) / n
        return (mean_v, dark_ratio, bright_ratio)
    except Exception:
        return None


def _thumb_bw_sig_is_similar(
    sig_a,
    sig_b,
    *,
    max_mean_v_diff: float = 40.0,
    max_dark_ratio_diff: float = 0.25,
    max_bright_ratio_diff: float = 0.25,
) -> bool:
    if not sig_a or not sig_b:
        return True  # 시그니처 계산 불가 시에는 기존 동작 유지(호환/성능 우선)
    try:
        mean_a, dark_a, bright_a = sig_a
        mean_b, dark_b, bright_b = sig_b
    except Exception:
        return True

    try:
        if abs(float(dark_a) - float(dark_b)) > float(max_dark_ratio_diff):
            return False
        if abs(float(bright_a) - float(bright_b)) > float(max_bright_ratio_diff):
            return False
        # dark/bright가 비슷한데 전체 평균 밝기만 크게 다른 경우도 방어
        if abs(float(mean_a) - float(mean_b)) > float(max_mean_v_diff):
            return False
    except Exception:
        return True

    return True


HASH_CACHE: dict[str, tuple] = {}
def _image_hash_pair_from_url_cached(url: str):
    if not url:
        return None

    if url in HASH_CACHE:
        return HASH_CACHE[url]

    try:
        pair = _image_hash_pair_from_url(url)
    except Exception:
        pair = None

    if pair:
        HASH_CACHE[url] = pair

    return pair

def _extract_hash_pairs_from_html(
    html: str | None,
    *,
    max_imgs: int = 12,
):
    if not html:
        return []

    urls = _extract_img_urls_from_html(html, limit=max_imgs)

    pairs = []
    for u in urls:
        p = _image_hash_pair_from_url_cached(u)
        if p:
            pairs.append(p)

    return pairs


def _hash_pairs_match_stats(
    pairs_a,
    pairs_b,
    *,
    ph_thr: int = 8,
    dh_thr: int = 10,
):
    if not pairs_a or not pairs_b:
        return {"err": "NO_HASH"}

    # -------------------------
    # 1️⃣ 거의 동일 이미지 제거
    # -------------------------
    similar_thr = 3

    used_b = set()
    filtered_a = []

    for i, pa in enumerate(pairs_a):
        remove = False
        for j, pb in enumerate(pairs_b):
            if j in used_b:
                continue
            try:
                ph = int(pa[0] - pb[0])
                dh = int(pa[1] - pb[1])
            except Exception:
                continue

            if ph <= similar_thr and dh <= similar_thr:
                remove = True
                used_b.add(j)
                break

        if not remove:
            filtered_a.append(pa)

    filtered_b = [pb for j, pb in enumerate(pairs_b) if j not in used_b]

    # 전부 제거되면 거의 동일 상품
    if not filtered_a or not filtered_b:
        return {
            "err": None,
            "match": len(pairs_a),
            "ratio": 0.85,
        }

    pairs_a = filtered_a
    pairs_b = filtered_b

    # -------------------------
    # 2️⃣ 양방향 평균 매칭
    # -------------------------
    def directional_match(src, tgt):
        match = 0
        for s in src:
            best_ok = False
            for t in tgt:
                try:
                    ph = int(s[0] - t[0])
                    dh = int(s[1] - t[1])
                except Exception:
                    continue

                if ph <= ph_thr and dh <= dh_thr:
                    best_ok = True
                    break

            if best_ok:
                match += 1

        return match / float(len(src)) if src else 0.0

    score_ab = directional_match(pairs_a, pairs_b)
    score_ba = directional_match(pairs_b, pairs_a)

    ratio = (score_ab + score_ba) / 2.0
    ratio = min(ratio, 0.95)

    return {
        "err": None,
        "match": int(score_ab * len(pairs_a)),
        "ratio": ratio,
    }


def bands_from_phash_hex(phash_hex: str) -> List[int]:
    """
    phash(hex) -> 32bit band 배열 (big-endian)
    - 16 chars (64bit)  => 32bit x 2
    - 64 chars (256bit) => 32bit x 8
    """
    phash_hex = (phash_hex or "").strip()
    if len(phash_hex) not in (16, 64):
        return []
    try:
        n = int(phash_hex, 16)
    except Exception:
        return []

    num_bands = 2 if len(phash_hex) == 16 else 8
    bands: List[int] = []
    for i in range(num_bands):
        shift = (num_bands - 1 - i) * BAND_BITS
        band = (n >> shift) & 0xFFFFFFFF  # unsigned 32-bit
        # DB 컬럼은 int4[](signed)라서 범위를 맞춰 저장 (2^31 이상이면 음수로 매핑)
        if band >= 0x80000000:
            band -= 0x100000000
        bands.append(int(band))
    return bands


def _grade_to_int(v):
    try:
        s = str(v or "").strip()
        return int(s) if s.isdigit() else 10**9
    except Exception:
        return 10**9


def _price_to_int(v):
    try:
        return int(v) if v is not None else 10**18
    except Exception:
        return 10**18


def _reg_date_to_key(v):
    """
    reg_date 오래된(과거) 것이 우선.
    - v가 date/datetime/문자열일 수 있어 안전하게 정규화
    - 값이 없거나 파싱 실패면 아주 '최근'으로 취급(불리)
    """
    try:
        if v is None:
            return datetime.max.date()
        # asyncpg/psycopg2가 datetime/date로 주는 케이스
        if hasattr(v, "date") and not isinstance(v, str):
            return v.date()  # datetime -> date
        if hasattr(v, "year") and hasattr(v, "month") and hasattr(v, "day") and not isinstance(v, str):
            return v
        s = str(v).strip()
        if not s:
            return datetime.max.date()
        if len(s) >= 10:
            s = s[:10]
        y, m, d = s.split("-")
        return datetime(int(y), int(m), int(d)).date()
    except Exception:
        return datetime.max.date()


async def _maybe_update_winner_and_tokens_for_group_insertion_asyncpg(
    conn,
    *,
    target_gid: int,
    moved_vender_code: str | None,
    moved_icode: str | int | None,
    moved_price: int | float | None,
    moved_vender_grade: str | int | None = None,
    moved_reg_date=None,
    moved_content_html: str | None = None,
    moved_dome_code: str | None = None,
    moved_phash_hex: str | None = None,
    moved_dhash_hex: str | None = None,
    max_imgs: int = 12,
    max_filenames: int = 8,
    min_fname_len: int = 6,
) -> bool:
    """
    "기존 그룹(target_gid)에 편입될 상품(moved_*)이 winner 조건을 만족하면" shopprod_group.winner_*를 갱신하고,
    winner가 바뀌는 경우 token 정책(비-winner 삭제 + 새 winner 토큰 재생성)을 적용한다.

    - 호출 시점에 아직 shopprod_group_map의 group_id 업데이트 전이어도,
      같은 트랜잭션 내에서 곧 편입될 것을 전제로 winner/token을 "미리" 갱신할 수 있다.
    - 실패해도 그룹 매칭/생성 흐름을 막지 않도록 항상 조용히 False를 반환하도록 설계.
    """
    try:
        gid = int(target_gid)
    except Exception:
        return False

    vcode = str(moved_vender_code or "").strip()
    icode = str(moved_icode or "").strip()
    if not vcode or not icode:
        return False

    # moved key (UI와 동일 기준)
    m_key = (
        _price_to_int(moved_price),
        _grade_to_int(moved_vender_grade),
        _reg_date_to_key(moved_reg_date),
        icode,
    )

    # current winner
    try:
        gw = await conn.fetchrow(
            """
            SELECT winner_icode, winner_price, winner_vender_code
            FROM mlinkdw.shopprod_group2
            WHERE group_id = $1
            FOR UPDATE SKIP LOCKED 
            """,
            gid,
        )
    except Exception:
        return False

    if gw is None:
        return False

    cur_w_icode = None
    cur_w_vcode = None
    try:
        cur_w_icode = str(gw["winner_icode"]) if gw and gw.get("winner_icode") else None
        cur_w_vcode = str(gw["winner_vender_code"]) if gw and gw.get("winner_vender_code") else None
    except Exception:
        try:
            cur_w_icode = str(gw[0]) if gw and gw[0] else None
            cur_w_vcode = str(gw[2]) if gw and len(gw) > 2 and gw[2] else None
        except Exception:
            cur_w_icode = None
            cur_w_vcode = None

    # if current winner missing -> choose moved
    choose_moved = (cur_w_icode is None) or (cur_w_vcode is None)

    if not choose_moved and cur_w_icode and cur_w_vcode:
        # current winner meta (tie-break)
        try:
            w = await conn.fetchrow(
                """
                SELECT icode, price, vender_code, vender_grade, reg_date
                FROM mlinkdw.shopprod_group_map2
                WHERE group_id = $1 AND vender_code = $2 AND icode = $3
                LIMIT 1
                """,
                gid,
                cur_w_vcode,
                cur_w_icode,
            )
        except Exception:
            w = None

        w_key = (
            _price_to_int(w["price"]) if w else 10**18,
            _grade_to_int(w["vender_grade"]) if w else 10**9,
            _reg_date_to_key(w["reg_date"]) if w else datetime.max.date(),
            str(w["icode"]) if w and w.get("icode") else "~~~~",
        )
        choose_moved = (m_key < w_key)

    if not choose_moved:
        return False

    # winner 갱신
    try:
        await conn.execute(
            """
            UPDATE shopprod_group2 g
            SET winner_icode = $2,
                winner_price = $3,
                winner_vender_code = $4,
                winner_updated_at = now()
            WHERE g.group_id = $1
            AND (
                g.winner_icode IS NULL
                OR (
                    $3,
                    COALESCE(NULLIF($5, '')::int, 2147483647),
                    $6,
                    $2
                )
                <
                (
                    g.winner_price,
                    COALESCE(
                        (
                            SELECT NULLIF(m.vender_grade, '')::int
                            FROM mlinkdw.shopprod_group_map2 m
                            WHERE m.group_id = g.group_id
                            AND m.vender_code = g.winner_vender_code
                            AND m.icode = g.winner_icode
                            LIMIT 1
                        ),
                        2147483647
                    ),
                    (
                        SELECT reg_date
                        FROM mlinkdw.shopprod_group_map2 m
                        WHERE m.group_id = g.group_id
                        AND m.vender_code = g.winner_vender_code
                        AND m.icode = g.winner_icode
                        LIMIT 1
                    ),
                    g.winner_icode
                )
            )
            """,
            gid,
            icode,
            moved_price,
            vcode,
        )
    except Exception:
        return False

    # ✅ winner 변경 시 shopprod_group2의 해시 필드도 같이 갱신한다.
    # - moved 상품의 phash/dhash는 이미 호출자에서 계산되어 넘어오므로(배치 HTTP 단계),
    #   여기서 재다운로드 없이 group_key/phash_hex/dhash_hex/phash_bands를 업데이트할 수 있다.
    try:
        if moved_phash_hex and moved_dhash_hex:
            hf = _group_hash_fields_from_ph_dh_hex(str(moved_phash_hex), str(moved_dhash_hex))
        else:
            hf = None
        if hf:
            group_key, ph_hex_s, dh_hex_s, bands = hf
            await conn.execute(
                """
                UPDATE mlinkdw.shopprod_group2
                SET group_key = $2,
                    phash_hex = $3,
                    dhash_hex = $4,
                    phash_bands = $5
                WHERE group_id = $1
                """,
                gid,
                group_key,
                ph_hex_s,
                dh_hex_s,
                bands,
            )
    except Exception:
        # UNIQUE(group_key) 충돌/컬럼 권한/기타 문제는 winner 갱신 자체를 막지 않는다.
        pass

    # token 정책 적용 (winner 변경 시에만)
    # try:
    #     await conn.execute(
    #         """
    #         DELETE FROM mlinkdw.shopprod_group_content_token
    #         WHERE group_id = $1
    #         AND (vender_code <> $2 OR icode <> $3)
    #         """,
    #         gid,
    #         vcode,
    #         icode,
    #     )
    # except Exception:
    #     # token 테이블/권한/스키마 미존재 등은 winner 갱신 자체를 막지 않음
    #     return True

    html_for_tok = str(moved_content_html or "")
    dome_code_for_tok = str(moved_dome_code or "").strip()
    if not html_for_tok or not dome_code_for_tok:
        return True

    try:
        toks = _extract_img_filenames_from_html(
            html_for_tok,
            max_imgs=int(max_imgs),
            max_filenames=int(max_filenames),
            min_fname_len=int(min_fname_len),
        )
    except Exception:
        toks = []

    if not toks:
        return True

    try:
        await conn.executemany(
            """
            INSERT INTO mlinkdw.shopprod_group_content_token
              (group_id, vender_code, icode, token, dome_code)
            VALUES ($1,$2,$3,$4,$5)
            ON CONFLICT (group_id, vender_code, icode, token)
            DO UPDATE SET dome_code = EXCLUDED.dome_code
            """,
            [(gid, vcode, icode, str(t), dome_code_for_tok) for t in toks],
        )
    except Exception:
        pass

    return True


async def merge_source_tokens_into_target_winner_asyncpg(conn, *, target_gid: int, source_gids: list[int]) -> None:
    """
    UI의 그룹 병합 토큰 정책을 asyncpg로 재사용 가능하게 분리:
    - TARGET에서는 winner가 아닌 token 제거
    - SOURCE들의 token 중 TARGET(winner 키)에 없는 token만 TARGET에 누적(INSERT)
    - SOURCE token 삭제(고아 방지)
    """
    try:
        gid = int(target_gid)
        src = [int(x) for x in (source_gids or [])]
    except Exception:
        return
    if not src:
        return

    try:
        wr = await conn.fetchrow(
            "SELECT winner_vender_code, winner_icode FROM mlinkdw.shopprod_group2 WHERE group_id = $1",
            gid,
        )
    except Exception:
        return
    if not wr:
        return
    try:
        wv = wr["winner_vender_code"]
        wi = wr["winner_icode"]
    except Exception:
        wv = wr[0] if len(wr) > 0 else None
        wi = wr[1] if len(wr) > 1 else None

    if wv is None or wi is None:
        return

    try:
        # 1) target에서 winner가 아닌 token 제거
        await conn.execute(
            """
            DELETE FROM mlinkdw.shopprod_group_content_token t
            WHERE t.group_id = $1
              AND EXISTS (
                SELECT 1
                FROM mlinkdw.shopprod_group2 g
                WHERE g.group_id = t.group_id
                  AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                       OR t.icode IS DISTINCT FROM g.winner_icode)
              )
            """,
            gid,
        )

        # 2) source token -> target winner 키로 누적(없는 것만)
        await conn.execute(
            """
            INSERT INTO mlinkdw.shopprod_group_content_token
              (group_id, vender_code, icode, token, dome_code)
            SELECT
              $1 AS group_id,
              $2 AS vender_code,
              $3 AS icode,
              s.token,
              COALESCE(s.dome_code, '') AS dome_code
            FROM mlinkdw.shopprod_group_content_token s
            WHERE s.group_id = ANY($4::bigint[])
              AND NOT EXISTS (
                SELECT 1
                FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = $1
                  AND t.vender_code = $2
                  AND t.icode = $3
                  AND t.token = s.token
              )
            ON CONFLICT (group_id, vender_code, icode, token) DO NOTHING
            """,
            gid,
            str(wv),
            str(wi),
            src,
        )

        # 3) source token 삭제
        await conn.execute(
            "DELETE FROM mlinkdw.shopprod_group_content_token WHERE group_id = ANY($1::bigint[])",
            src,
        )
    except Exception:
        return


async def _merge_moved_item_tokens_into_target_winner_asyncpg(
    conn,
    *,
    target_gid: int,
    moved_content_html: str | None,
    moved_dome_code: str | None,
    max_imgs: int = 12,
    max_filenames: int = 8,
    min_fname_len: int = 6,
) -> None:
    """
    (UI 그룹 병합 정책의 변형)
    - moved item(content)에서 토큰을 추출하여,
      TARGET 그룹의 winner 키(vender_code/icode)에 "없는 token만" 누적 삽입한다.
    - token 테이블이 "winner 1개만 유지" 구조이므로, 먼저 TARGET의 비-winner token을 제거한다.
    """
    try:
        gid = int(target_gid)
    except Exception:
        return
    html_for_tok = str(moved_content_html or "")
    dome_code_for_tok = str(moved_dome_code or "").strip()
    if not html_for_tok or not dome_code_for_tok:
        return

    # target winner key
    try:
        wr = await conn.fetchrow(
            "SELECT winner_vender_code, winner_icode FROM mlinkdw.shopprod_group2 WHERE group_id = $1",
            gid,
        )
    except Exception:
        return
    if not wr:
        return
    try:
        wv = wr["winner_vender_code"]
        wi = wr["winner_icode"]
    except Exception:
        wv = wr[0] if len(wr) > 0 else None
        wi = wr[1] if len(wr) > 1 else None
    if wv is None or wi is None:
        return

    # keep "winner only" invariant for target
    # try:
    #     await conn.execute(
    #         """
    #         DELETE FROM mlinkdw.shopprod_group_content_token t
    #         WHERE t.group_id = $1
    #           AND EXISTS (
    #             SELECT 1
    #             FROM mlinkdw.shopprod_group2 g
    #             WHERE g.group_id = t.group_id
    #               AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
    #                    OR t.icode IS DISTINCT FROM g.winner_icode)
    #           )
    #         """,
    #         gid,
    #     )
    # except Exception:
    #     return

    try:
        toks = _extract_img_filenames_from_html(
            html_for_tok,
            max_imgs=int(max_imgs),
            max_filenames=int(max_filenames),
            min_fname_len=int(min_fname_len),
        )
    except Exception:
        toks = []
    if not toks:
        return

    # "없는 token만" -> ON CONFLICT로 안전 처리 (winner 키로 저장)
    try:
        await conn.executemany(
            """
            INSERT INTO mlinkdw.shopprod_group_content_token
              (group_id, vender_code, icode, token, dome_code)
            VALUES ($1,$2,$3,$4,$5)
            ON CONFLICT (group_id, vender_code, icode, token)
            DO UPDATE SET dome_code = EXCLUDED.dome_code
            """,
            [(gid, str(wv), str(wi), str(t), dome_code_for_tok) for t in toks],
        )
    except Exception:
        return


def _rec_get_sync(rec, key: str, default=None):
    try:
        if rec is None:
            return default
        return rec[key]
    except Exception:
        try:
            return rec.get(key, default)  # type: ignore[attr-defined]
        except Exception:
            return default


def refresh_group_winner_and_tokens_if_needed_sync(
    conn,
    group_id: int,
    *,
    cur=None,
    force_token_cleanup: bool = True,
    max_imgs: int = 12,
    max_filenames: int = 8,
    min_fname_len: int = 6,
) -> bool:
    """
    psycopg2(conn) 기준:
    - group_id의 best winner를 shopprod_group_map에서 재선정(가격->등급->reg_date->icode)
    - winner가 바뀌면 content_token 정책(비-winner 삭제 + 새 winner 토큰 재생성/업서트) 적용
    - 반환: winner가 실제로 변경되었으면 True
    """
    try:
        gid = int(group_id)
    except Exception:
        return False

    own_cur = False
    if cur is None:
        try:
            from psycopg2.extras import DictCursor  # type: ignore
        except Exception:
            DictCursor = None  # type: ignore
        cur = conn.cursor(cursor_factory=DictCursor) if DictCursor else conn.cursor()
        own_cur = True

    try:
        # current winner
        cur.execute(
            """
            SELECT winner_icode, winner_vender_code, winner_price, group_key, phash_hex, dhash_hex
            FROM mlinkdw.shopprod_group2
            WHERE group_id = %s
            LIMIT 1
            """,
            (gid,),
        )
        cur_w = cur.fetchone()

        # best winner from map (UI/배치와 동일)
        cur.execute(
            """
            SELECT vender_code, icode, price, vender_grade, reg_date
            FROM mlinkdw.shopprod_group_map2
            WHERE group_id = %s
            ORDER BY
              price NULLS LAST,
              CASE
                WHEN vender_grade ~ '^[0-9]+$' THEN vender_grade::int
                ELSE 2147483647
              END ASC,
              reg_date ASC NULLS LAST,
              icode ASC
            LIMIT 1
            """,
            (gid,),
        )
        best = cur.fetchone()
        if not best:
            return False

        best_icode = str(_rec_get_sync(best, "icode", "") or "")
        best_vcode = str(_rec_get_sync(best, "vender_code", "") or "")
        best_price = _rec_get_sync(best, "price", None)

        cur_icode = str(_rec_get_sync(cur_w, "winner_icode", "") or "") if cur_w else ""
        cur_vcode = str(_rec_get_sync(cur_w, "winner_vender_code", "") or "") if cur_w else ""
        cur_price = _rec_get_sync(cur_w, "winner_price", None) if cur_w else None
        cur_group_key = str(_rec_get_sync(cur_w, "group_key", "") or "") if cur_w else ""
        cur_ph_hex = str(_rec_get_sync(cur_w, "phash_hex", "") or "") if cur_w else ""
        cur_dh_hex = str(_rec_get_sync(cur_w, "dhash_hex", "") or "") if cur_w else ""

        same_winner = (best_icode, best_vcode, best_price) == (cur_icode, cur_vcode, cur_price)
        if same_winner:
            # ✅ winner가 동일하더라도 shopprod_group2 해시 필드가 누락/오염될 수 있으므로 보정한다.
            if (not cur_group_key) or (not cur_ph_hex) or (not cur_dh_hex):
                try:
                    cur.execute(
                        """
                        SELECT img_url
                        FROM mlinkdw.shopprod_group_map2
                        WHERE vender_code = %s AND icode = %s
                        LIMIT 1
                        """,
                        (best_vcode, best_icode),
                    )
                    rr_img = cur.fetchone()
                    win_img_url = str(_rec_get_sync(rr_img, "img_url", "") or "") if rr_img else ""
                except Exception:
                    win_img_url = ""
                if win_img_url:
                    try:
                        p = _image_hash_pair_from_url_raw(win_img_url)
                    except Exception:
                        p = None
                    if p:
                        try:
                            ph_hex_new = str(p[0])
                            dh_hex_new = str(p[1])
                            hf = _group_hash_fields_from_ph_dh_hex(ph_hex_new, dh_hex_new)
                        except Exception:
                            hf = None
                        if hf:
                            group_key_new, ph_hex_new, dh_hex_new, bands_new = hf
                            try:
                                cur.execute(
                                    """
                                    UPDATE mlinkdw.shopprod_group2
                                    SET group_key=%s, phash_hex=%s, dhash_hex=%s, phash_bands=%s
                                    WHERE group_id=%s
                                    """,
                                    (group_key_new, ph_hex_new, dh_hex_new, bands_new, gid),
                                )
                            except Exception:
                                pass

            # winner가 같아도, 과거 데이터/병합/정책 변경 등으로 non-winner token이 남아있을 수 있다.
            # UI에서는 "winner 1개만 유지" 불변식을 최대한 유지하기 위해 token 정리를 강제한다.
            if force_token_cleanup:
                try:
                    # ✅ winner가 동일하더라도, 토큰이 누적/오염되어 있을 수 있으므로 "그룹 단위로 전부 삭제" 후
                    #    현재 winner의 content로 즉시 재생성한다.
                    cur.execute(
                        "DELETE FROM mlinkdw.shopprod_group_content_token WHERE group_id = %s",
                        (gid,),
                    )
                except Exception:
                    pass

                # winner content/dome_code로 토큰 재생성(가능한 경우에만)
                try:
                    cur.execute(
                        """
                        SELECT content, dome_code
                        FROM mlinkdw.shopprod_group_map2
                        WHERE vender_code = %s AND icode = %s
                        LIMIT 1
                        """,
                        (best_vcode, best_icode),
                    )
                    rr = cur.fetchone()
                    content_html = str(_rec_get_sync(rr, "content", "") or "") if rr else ""
                    dome_code = _rec_get_sync(rr, "dome_code", None) if rr else None
                    dome_code_s = str(dome_code) if dome_code is not None else ""
                    if content_html and dome_code_s:
                        toks = _extract_img_filenames_from_html(
                            content_html,
                            max_imgs=int(max_imgs),
                            max_filenames=int(max_filenames),
                            min_fname_len=int(min_fname_len),
                        )
                        if toks:
                            cur.executemany(
                                """
                                INSERT INTO mlinkdw.shopprod_group_content_token
                                (group_id, vender_code, icode, token, dome_code)
                                VALUES (%s,%s,%s,%s,%s)
                                ON CONFLICT (group_id, vender_code, icode, token)
                                DO UPDATE SET dome_code = EXCLUDED.dome_code
                                """,
                                [(gid, best_vcode, best_icode, str(t), dome_code_s) for t in toks],
                            )
                except Exception:
                    pass
            return False

        # winner update
        cur.execute(
            """
            UPDATE mlinkdw.shopprod_group2
            SET winner_icode=%s, winner_vender_code=%s, winner_price=%s, winner_updated_at=CURRENT_TIMESTAMP
            WHERE group_id=%s
            """,
            (best_icode, best_vcode, best_price, gid),
        )

        # ✅ winner 변경 시 shopprod_group2 해시 필드도 winner 이미지 기준으로 갱신
        try:
            cur.execute(
                """
                SELECT img_url
                FROM mlinkdw.shopprod_group_map2
                WHERE vender_code = %s AND icode = %s
                LIMIT 1
                """,
                (best_vcode, best_icode),
            )
            rr_img = cur.fetchone()
            win_img_url = str(_rec_get_sync(rr_img, "img_url", "") or "") if rr_img else ""
        except Exception:
            win_img_url = ""

        if win_img_url:
            try:
                p = _image_hash_pair_from_url_raw(win_img_url)
            except Exception:
                p = None
            if p:
                try:
                    ph_hex_new = str(p[0])
                    dh_hex_new = str(p[1])
                    hf = _group_hash_fields_from_ph_dh_hex(ph_hex_new, dh_hex_new)
                except Exception:
                    hf = None
                if hf:
                    group_key_new, ph_hex_new, dh_hex_new, bands_new = hf
                    try:
                        cur.execute(
                            """
                            UPDATE mlinkdw.shopprod_group2
                            SET group_key=%s, phash_hex=%s, dhash_hex=%s, phash_bands=%s
                            WHERE group_id=%s
                            """,
                            (group_key_new, ph_hex_new, dh_hex_new, bands_new, gid),
                        )
                    except Exception:
                        # UNIQUE(group_key) 충돌 등은 본 흐름을 막지 않는다.
                        pass

        # delete non-winner tokens
        try:
            cur.execute(
                """
                DELETE FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = %s
                  AND EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group2 g
                    WHERE g.group_id = t.group_id
                      AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                           OR t.icode IS DISTINCT FROM g.winner_icode)
                  )
                """,
                (gid,),
            )
        except Exception:
            return True

        # fetch winner content/dome_code
        cur.execute(
            """
            SELECT content, dome_code
            FROM mlinkdw.shopprod_group_map2
            WHERE vender_code = %s AND icode = %s
            LIMIT 1
            """,
            (best_vcode, best_icode),
        )
        rr = cur.fetchone()
        content_html = str(_rec_get_sync(rr, "content", "") or "") if rr else ""
        dome_code = _rec_get_sync(rr, "dome_code", None) if rr else None
        dome_code_s = str(dome_code) if dome_code is not None else ""
        if not content_html or not dome_code_s:
            return True

        toks = _extract_img_filenames_from_html(
            content_html,
            max_imgs=int(max_imgs),
            max_filenames=int(max_filenames),
            min_fname_len=int(min_fname_len),
        )
        if not toks:
            return True

        # upsert tokens
        try:
            cur.executemany(
                """
                INSERT INTO mlinkdw.shopprod_group_content_token
                (group_id, vender_code, icode, token, dome_code)
                VALUES (%s,%s,%s,%s,%s)
                ON CONFLICT (group_id, vender_code, icode, token)
                DO UPDATE SET dome_code = EXCLUDED.dome_code
                """,
                [(gid, best_vcode, best_icode, str(t), dome_code_s) for t in toks],
            )
        except Exception:
            pass

        return True
    finally:
        if own_cur:
            try:
                cur.close()
            except Exception:
                pass


def merge_moved_item_tokens_into_target_winner_sync(
    conn,
    *,
    target_gid: int,
    moved_content_html: str | None,
    moved_dome_code: str | None,
    cur=None,
    max_imgs: int = 12,
    max_filenames: int = 8,
    min_fname_len: int = 6,
) -> None:
    """
    (UI 그룹 병합 정책의 변형 - sync/psycopg2)
    - moved item(content)에서 토큰을 추출하여,
      TARGET 그룹의 winner 키(vender_code/icode)에 "없는 token만" 누적 삽입한다.
    - token 테이블이 "winner 1개만 유지" 구조이므로, 먼저 TARGET의 비-winner token을 제거한다.
    """
    try:
        gid = int(target_gid)
    except Exception:
        return
    html_for_tok = str(moved_content_html or "")
    dome_code_for_tok = str(moved_dome_code or "").strip()
    if not html_for_tok or not dome_code_for_tok:
        return

    own_cur = False
    if cur is None:
        try:
            from psycopg2.extras import DictCursor  # type: ignore
        except Exception:
            DictCursor = None  # type: ignore
        cur = conn.cursor(cursor_factory=DictCursor) if DictCursor else conn.cursor()
        own_cur = True

    try:
        # target winner key
        cur.execute(
            "SELECT winner_vender_code, winner_icode FROM mlinkdw.shopprod_group2 WHERE group_id = %s LIMIT 1",
            (gid,),
        )
        wr = cur.fetchone()
        if not wr:
            return
        wv = _rec_get_sync(wr, "winner_vender_code", None)
        wi = _rec_get_sync(wr, "winner_icode", None)
        if wv is None or wi is None:
            return

        # keep winner-only invariant
        try:
            cur.execute(
                """
                DELETE FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = %s
                  AND EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group2 g
                    WHERE g.group_id = t.group_id
                      AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                           OR t.icode IS DISTINCT FROM g.winner_icode)
                  )
                """,
                (gid,),
            )
        except Exception:
            return

        toks = _extract_img_filenames_from_html(
            html_for_tok,
            max_imgs=int(max_imgs),
            max_filenames=int(max_filenames),
            min_fname_len=int(min_fname_len),
        )
        if not toks:
            return

        try:
            cur.executemany(
                """
                INSERT INTO mlinkdw.shopprod_group_content_token
                (group_id, vender_code, icode, token, dome_code)
                VALUES (%s,%s,%s,%s,%s)
                ON CONFLICT (group_id, vender_code, icode, token)
                DO UPDATE SET dome_code = EXCLUDED.dome_code
                """,
                [(gid, str(wv), str(wi), str(t), dome_code_for_tok) for t in toks],
            )
        except Exception:
            return
    finally:
        if own_cur:
            try:
                cur.close()
            except Exception:
                pass

def dual_buckets_from_phash_hex(ph_hex: str):
    if not ph_hex or len(ph_hex) != 16:
        return None, None
    try:
        n = int(ph_hex, 16)
        bucket1 = (n >> 48) & 0xFFFF   # 상위 16bit
        bucket2 = (n >> 24) & 0xFFFF   # 중앙 16bit
        return bucket1, bucket2
    except Exception:
        return None, None

async def find_or_create_group_id(
    conn,
    group_index_exact: Dict[Tuple[str, str], int],
    phash,
    dhash,
    *,
    threshold: int = 8,
    dhash_threshold: int = 10,
    candidate_limit: int = 500, 
    iname: str | None = None,
    iname_threshold: float = 50,
    fallback_iname_tokens: bool = False, 
    fallback_iname_token_limit: int = 50,
    fallback_iname_min_token_len: int = 2,
    fallback_content_html: str | None = None,
    fallback_content_max_imgs: int = 12,
    fallback_content_ph_threshold: int = 8,
    fallback_content_dh_threshold: int = 10,
    fallback_content_min_matches: int = 1,
    fallback_content_min_ratio: float = 0.80, 
    skip_low_entropy: bool = True,
    low_entropy_max_unique: int = 3, 
    current_vender_code: str | None = None,
    current_icode: str | int | None = None,
    current_price: int | float | None = None,
    current_vender_grade: str | int | None = None,
    current_reg_date=None,
    current_dome_code: str | None = None,  
    current_img_url: str | None = None,
    color_check: bool = False,
    color_check_top_k: int = 5,
    color_max_mean_v_diff: float = 40.0,
    color_max_dark_ratio_diff: float = 0.25,
    color_max_bright_ratio_diff: float = 0.25, 
    verify_gid_exists: bool = False,
    maintain_winner_tokens_on_existing_group: bool = True,
    merge_moved_tokens_on_existing_group: bool = True,
    return_meta: bool = False,
) -> int | tuple[int, bool]:
    """
    asyncpg Connection 기준으로 group_id를 찾거나 생성.
    - phash/dhash: imagehash.ImageHash
    - threshold: 해밍거리 임계치 (phash 256bit 기준 권장 8~14)
    """
    ph_hex = str(phash)
    dh_hex = str(dhash)
    key = (ph_hex, dh_hex)
    group_key = f"{ph_hex}_{dh_hex}" 

    # print("step 1")

    # ✅ 캐시가 비어있거나 preload를 하지 않은 배치 환경에서도 exact 매칭을 매우 싸게 처리:
    # - shopprod_group.group_key는 UNIQUE 인덱스가 있으므로 point lookup이 빠르다.
    # - low-entropy skip/bands 로직에 앞서 수행해서 "이미 존재하는 exact group"을 놓치지 않는다.
    if key not in group_index_exact:
        try:
            gid_exact = await conn.fetchval(
                "SELECT group_id FROM mlinkdw.shopprod_group2 WHERE group_key = $1 LIMIT 1",
                group_key,
            )
        except Exception:
            gid_exact = None

        if gid_exact:

            # print(f"found exact group_id: {gid_exact}")

            try:
                gid_i = int(gid_exact)
                group_index_exact[key] = gid_i
                if bool(maintain_winner_tokens_on_existing_group):
                    try:
                        await _maybe_update_winner_and_tokens_for_group_insertion_asyncpg(
                            conn,
                            target_gid=gid_i,
                            moved_vender_code=current_vender_code,
                            moved_icode=current_icode,
                            moved_price=current_price,
                            moved_vender_grade=current_vender_grade,
                            moved_reg_date=current_reg_date,
                            moved_content_html=fallback_content_html,
                            moved_dome_code=current_dome_code ,
                            moved_phash_hex=str(phash),
                            moved_dhash_hex=str(dhash),
                        )
                    except Exception:
                        pass
                if bool(merge_moved_tokens_on_existing_group):
                    try:
                        await _merge_moved_item_tokens_into_target_winner_asyncpg(
                            conn,
                            target_gid=gid_i,
                            moved_content_html=fallback_content_html,
                            moved_dome_code=current_dome_code ,
                        )
                    except Exception:
                        pass
                if return_meta:
                    return gid_i, False
                return gid_i
            except Exception:
                pass

    # print("step 2")

    if key in group_index_exact:
        # 메모리 캐시가 DB 롤백/외부정리 등으로 "죽은 group_id"를 들고 있는 경우를 방지
        cached_gid = int(group_index_exact[key])
        if bool(verify_gid_exists):
            try:
                exists = await conn.fetchval(
                    "SELECT 1 FROM mlinkdw.shopprod_group2 WHERE group_id = $1",
                    cached_gid,
                )
            except Exception:
                exists = 1  # 검증 실패 시에는 기존 동작 유지(성능/호환 우선)
        else:
            exists = 1

        if exists:

            # print(f"using cached group_id: {cached_gid}")

            if bool(maintain_winner_tokens_on_existing_group):
                # ✅ 기존 그룹으로 편입될 케이스: moved item이 winner 조건이면 winner/token 정책을 즉시 적용
                # - (동일 트랜잭션 내에서) group_map의 group_id 업데이트 전이라도, 곧 편입될 것을 전제로 갱신 가능
                try:
                    await _maybe_update_winner_and_tokens_for_group_insertion_asyncpg(
                        conn,
                        target_gid=int(cached_gid),
                        moved_vender_code=current_vender_code,
                        moved_icode=current_icode,
                        moved_price=current_price,
                        moved_vender_grade=current_vender_grade,
                        moved_reg_date=current_reg_date,
                        moved_content_html=fallback_content_html,
                        moved_dome_code=current_dome_code,
                        moved_phash_hex=str(phash),
                        moved_dhash_hex=str(dhash),
                    )
                except Exception:
                    pass
            if bool(merge_moved_tokens_on_existing_group):
                # ✅ 추가: moved item의 content token을 TARGET winner 키에 누적(없는 token만)
                try:
                    await _merge_moved_item_tokens_into_target_winner_asyncpg(
                        conn,
                        target_gid=int(cached_gid),
                        moved_content_html=fallback_content_html,
                        moved_dome_code=current_dome_code,
                    )
                except Exception:
                    pass
            if return_meta:
                return int(cached_gid), False
            return cached_gid
        # 캐시 무효화 후 아래 로직(후보검색/신규생성) 진행
        try:
            del group_index_exact[key]
        except Exception:
            pass

    # print("step 3")

    # 단색/패턴/공통 이미지 계열은 오탐 위험이 커서(특히 대량 데이터에서) 후보 매칭을 제한
    # - 단, exact key 매칭은 위에서 처리했으므로 여기서는 "기존 그룹으로 흡수"를 줄이는 방향
    bands = []
    bucket1, bucket2 = None, None
    if not (bool(skip_low_entropy) and (
        _is_low_entropy_hex64(ph_hex, max_unique_nibbles=low_entropy_max_unique) or 
        _is_low_entropy_hex64(dh_hex, max_unique_nibbles=low_entropy_max_unique))
        ): # 후보검색 생략 -> 신규 그룹 생성로 진행
        bands = bands_from_phash_hex(ph_hex)
        bucket1, bucket2 = dual_buckets_from_phash_hex(ph_hex)

    # if blocked_bands and bands:
    #     bands = [b for b in bands if b not in blocked_bands]

    # 1) 후보 검색 (band overlap)
    best_gid: Optional[int] = None
    best_score: Optional[int] = None
    best_ph_dist: Optional[int] = None
    best_dh_dist: Optional[int] = None

    rows = []
    used_band_fallback = False

    if bucket1 is not None:
        try:
            # iname 필터를 쓸 때만 winner_iname을 조인해서 가져온다(비용 최소화)
            use_iname = bool(iname_threshold and iname)
            use_color = bool(color_check and current_img_url)
            need_gm = bool(use_iname or use_color)

            if need_gm:
                rows = await conn.fetch(
                    """
                    (
                    SELECT
                      g.group_id,
                      g.phash_hex,
                      g.dhash_hex,
                      gm.iname AS winner_iname,
                      gm.img_url AS winner_img_url
                    FROM mlinkdw.shopprod_group2 g
                    LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id
                     AND gm.vender_code = g.winner_vender_code
                     AND gm.icode = g.winner_icode
                    WHERE g.phash_bucket = $1
                    LIMIT 250
                    )
                    union all
                    (
                    SELECT g.group_id,
                      g.phash_hex,
                      g.dhash_hex,
                      gm.iname AS winner_iname,
                      gm.img_url AS winner_img_url
                    FROM mlinkdw.shopprod_group2 g
                    LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id
                     AND gm.vender_code = g.winner_vender_code
                     AND gm.icode = g.winner_icode
                    WHERE g.phash_bucket2 = $2
                    LIMIT 250
                    )
                    LIMIT $3
                    """,
                    int(bucket1),
                    int(bucket2),
                    int(candidate_limit),
                )
            else:
                rows = await conn.fetch(
                    """
                    (
                    SELECT group_id, phash_hex, dhash_hex
                    FROM mlinkdw.shopprod_group2
                    WHERE phash_bucket = $1 
                    LIMIT 250
                    )
                    union all
                    (
                    SELECT group_id, phash_hex, dhash_hex
                    FROM mlinkdw.shopprod_group2
                    WHERE phash_bucket2 = $2
                    LIMIT 250
                    )
                    LIMIT $3
                    """,
                    int(bucket1),
                    int(bucket2),
                    int(candidate_limit),
                )

            seen = set()
            filtered_rows = []

            for r in rows:
                gid = r["group_id"]
                if gid not in seen:
                    seen.add(gid)
                    filtered_rows.append(r)

            rows = filtered_rows

            if len(rows) < 5 and bands:
                used_band_fallback = True

                band_rows = await conn.fetch(
                    """
                    SELECT group_id, phash_hex, dhash_hex
                    FROM mlinkdw.shopprod_group2
                    WHERE phash_bands && $1
                    LIMIT $2
                    """,
                    bands,
                    int(30),
                )

                for r in band_rows:
                    gid = r["group_id"]
                    if gid not in seen:
                        seen.add(gid)
                        rows.append(r)

            # best 후보를 1개만 고르면(기존 로직) color_check 실패 시 대안이 없다.
            # score 기준 상위 K개만 모아서(작게) color_check를 통과하는 첫 후보를 선택한다.
            top_k = max(1, int(color_check_top_k or 1))
            scored: list[tuple[int, int, int | None, int, str | None]] = []

            for r in rows:
                cand_hex = r.get("phash_hex")
                cand_dh = r.get("dhash_hex")
                if not cand_hex:
                    continue
                if use_iname:
                    cand_iname = r.get("winner_iname")
                    # 후보 iname이 없거나 유사도가 낮으면 후보 제외
                    if _iname_similarity(iname, cand_iname) < float(iname_threshold):
                        continue
                try:
                    cand_hash = imagehash.hex_to_hash(cand_hex)
                    ph_dist = int(phash - cand_hash)
                    # 1차 강제 필터 (여기 추가)
                    if ph_dist >= threshold:
                        continue
                    dh_dist = None
                    if cand_dh:
                        dh_hash = imagehash.hex_to_hash(cand_dh)
                        dh_dist = int(dhash - dh_hash)
                        # dhash도 강제 필터
                        if dh_dist >= dhash_threshold:
                            continue
                    score = ph_dist + (dh_dist if dh_dist is not None else 999)
                except Exception:
                    continue

                cand_gid = r["group_id"]
                cand_img_url = None
                if need_gm:
                    try:
                        cand_img_url = r.get("winner_img_url")
                    except Exception:
                        cand_img_url = None

                # 상위 K개 유지
                try:
                    scored.append((int(score), int(ph_dist), int(dh_dist) if dh_dist is not None else None, int(cand_gid), str(cand_img_url) if cand_img_url else None))
                except Exception:
                    continue

            if scored:
                scored.sort(key=lambda x: x[0])
                if len(scored) > top_k:
                    scored = scored[:top_k]

                # color_check 통과하는 첫 후보를 선택
                cur_sig = _thumb_bw_sig_from_url(str(current_img_url)) if use_color and current_img_url else None
                chosen = None
                for score, ph_dist, dh_dist, gid, cimg in scored:
                    if use_color and current_img_url and cimg:
                        cand_sig = _thumb_bw_sig_from_url(str(cimg))
                        if not _thumb_bw_sig_is_similar(
                            cur_sig,
                            cand_sig,
                            max_mean_v_diff=float(color_max_mean_v_diff),
                            max_dark_ratio_diff=float(color_max_dark_ratio_diff),
                            max_bright_ratio_diff=float(color_max_bright_ratio_diff),
                        ):
                            continue
                    chosen = (score, ph_dist, dh_dist, gid)
                    break

                if chosen is not None:
                    score, ph_dist, dh_dist, gid = chosen
                    best_score = int(score)
                    best_ph_dist = int(ph_dist)
                    best_dh_dist = int(dh_dist) if dh_dist is not None else None
                    best_gid = int(gid)
        except Exception:
            # 컬럼 미존재 등 -> 후보검색 생략하고 신규 그룹 생성으로 진행
            best_gid = None
            best_score = None
            best_ph_dist = None
            best_dh_dist = None

    # 2) 임계치 이내면 기존 그룹
    # - phash는 threshold 이내
    # - dhash는 있으면 dhash_threshold 이내
    if (
        best_gid is not None
        and best_ph_dist is not None
        and best_ph_dist <= threshold
        and (best_dh_dist is None or best_dh_dist <= dhash_threshold)
    ):
        best_gid_i = int(best_gid)
        # 안전장치: FK 위반을 막기 위해 선택된 group_id가 실제로 존재하는지 확인
        if bool(verify_gid_exists):
            try:
                exists = await conn.fetchval(
                    "SELECT 1 FROM mlinkdw.shopprod_group2 WHERE group_id = $1",
                    best_gid_i,
                )
            except Exception:
                exists = 1
        else:
            exists = 1
        if exists:

            # print(f"found existing group_id by candidate search: {best_gid_i}")

            group_index_exact[key] = best_gid_i
            if bool(maintain_winner_tokens_on_existing_group):
                # ✅ 기존 그룹으로 편입될 케이스: moved item이 winner 조건이면 winner/token 정책을 즉시 적용
                try:
                    await _maybe_update_winner_and_tokens_for_group_insertion_asyncpg(
                        conn,
                        target_gid=int(best_gid_i),
                        moved_vender_code=current_vender_code,
                        moved_icode=current_icode,
                        moved_price=current_price,
                        moved_vender_grade=current_vender_grade,
                        moved_reg_date=current_reg_date,
                        moved_content_html=fallback_content_html,
                        moved_dome_code=current_dome_code,
                        moved_phash_hex=str(phash),
                        moved_dhash_hex=str(dhash),
                    )
                except Exception:
                    pass
            if bool(merge_moved_tokens_on_existing_group):
                # ✅ 추가: moved item의 content token을 TARGET winner 키에 누적(없는 token만)
                try:
                    await _merge_moved_item_tokens_into_target_winner_asyncpg(
                        conn,
                        target_gid=int(best_gid_i),
                        moved_content_html=fallback_content_html,
                        moved_dome_code=current_dome_code,
                    )
                except Exception:
                    pass
            if return_meta:
                return int(best_gid_i), False
            return best_gid_i
        # 존재하지 않으면 신규 생성으로 진행
        best_gid = None


    # print("step 4")



    # 2.5) 폴백: 이미지 후보가 없거나 임계치 밖이면, iname 토큰 후보군을 만들고
    # winner_content 이미지셋으로 검증하여 그룹을 선택한다(옵션).
    if (
        fallback_iname_tokens
        and iname
        and fallback_content_html
        and (best_gid is None)
    ):
        # 라이브러리/환경 준비가 안 되어 있으면 조용히 스킵(기존 동작 유지)
        if requests is not None:
            try:
                cands = await _candidate_groups_by_content_img_filenames(
                    conn,
                    content_html=str(fallback_content_html), 
                    limit=int(fallback_iname_token_limit),
                    max_imgs=int(fallback_content_max_imgs), 
                    max_filenames=int(fallback_iname_token_limit) if int(fallback_iname_token_limit) <= 2 else 4,
                )
            except Exception:
                cands = [] 

            # if not cands: 
            #     try:
            #         cands = await _candidate_groups_by_iname_tokens(
            #             conn,
            #             iname=str(iname), 
            #             limit=int(fallback_iname_token_limit),
            #             min_token_len=int(fallback_iname_min_token_len),
            #         )
            #     except Exception:
            #         cands = [] 

            best_fb_gid: int | None = None  # type: ignore[assignment]

            if cands: 
                # 후보 생성 단계에서의 hit_cnt(shared_cnt)를 tie-breaker로 사용
                cand_hit_cnt: dict[int, int] = {}
                try:
                    cand_hit_cnt = {int(gid): int(cnt) for gid, cnt in cands}
                except Exception:
                    cand_hit_cnt = {}

                # 너무 많은 후보에 대해 네트워크(이미지 fetch)를 하지 않도록 상위 일부만 검증
                cand_gids = [gid for gid, _ in cands][: min(10, len(cands))]
                rows = await conn.fetch(
                    """
                    SELECT
                      g.group_id,
                      gm.iname AS winner_iname,
                      gm.content AS winner_content
                    FROM mlinkdw.shopprod_group2 g
                    LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id
                     AND gm.vender_code = g.winner_vender_code
                     AND gm.icode = g.winner_icode
                    WHERE g.group_id = ANY($1::bigint[])
                    """,
                    cand_gids,
                ) 
 
                best_fb_score: tuple = None  # type: ignore[assignment]
                checked = 0
                passed = 0

                if rows is not None:
                    try:
                        fallback_pairs = _extract_hash_pairs_from_html(
                            str(fallback_content_html),
                            max_imgs=int(fallback_content_max_imgs),
                        )
                    except Exception:
                        fallback_pairs = []

                    if not fallback_pairs:
                        rows = []  # 검증 불가 -> 후보 전체 스킵

                # print("rows: ", len(rows))

                for r in rows:
                    try:
                        gid = int(r["group_id"])
                    except Exception:
                        continue
                    win_iname = r.get("winner_iname")
                    win_content = r.get("winner_content")

                    # print(f"win_iname: {win_iname} , iname: {iname} , iname_threshold: {iname_threshold}")

                    if iname_threshold and iname:
                        # print(f"iname_similarity: {_iname_similarity(iname, win_iname)}")
                        try:
                            if _iname_similarity(iname, win_iname) < float(iname_threshold):
                                continue
                        except Exception:
                            pass

                    # ✅ content 이미지셋 검증(템플릿 이미지 1~2장만 같은 케이스 방지)
                    # - fallback_content_min_matches / fallback_content_min_ratio 파라미터를 실제로 강제한다. 
                    try:
                        win_pairs = _extract_hash_pairs_from_html(
                            str(win_content or ""),
                            max_imgs=int(fallback_content_max_imgs),
                        ) 
                        st = _hash_pairs_match_stats(
                            fallback_pairs,
                            win_pairs,
                            ph_thr=int(fallback_content_ph_threshold),
                            dh_thr=int(fallback_content_dh_threshold),
                        )
                    except Exception:
                        st = {"err": "EXC"}
                    if st.get("err") is not None:
                        continue
                    try:
                        mcnt = int(st.get("match") or 0)
                        mratio = float(st.get("ratio") or 0.0)
                    except Exception:
                        mcnt = 0
                        mratio = 0.0

                    # print(f"mcnt: {mcnt}, mratio: {mratio}")

                    if mcnt < int(fallback_content_min_matches):
                        continue
                    if mratio < float(fallback_content_min_ratio):
                        continue

                    checked += 1
                    try:
                        name_sim = float(_iname_similarity(iname, win_iname)) if iname else 0.0
                    except Exception:
                        name_sim = 0.0
                    hit_cnt = int(cand_hit_cnt.get(int(gid), 0))
                    passed += 1

                    # 랭킹: name_sim -> hit_cnt -> (match, ratio)
                    score = (name_sim, hit_cnt, mcnt, mratio)
                    if best_fb_score is None or score > best_fb_score:
                        best_fb_score = score
                        best_fb_gid = gid 
                        
            # if not best_fb_gid:
            #     # ✅ 후보가 전혀 없으면(UI의 iname 검색과 동일한 전략으로) group_map 전체 상품명에서 후보 group_id를 직접 뽑는다.
            #     try:
            #         from iname_search import candidate_group_ids_by_iname_async

            #         cand_gids2 = await candidate_group_ids_by_iname_async(
            #             conn,
            #             iname=str(iname),
            #             exclude_icode=current_icode,
            #             min_ratio=70,   #배치실행은 95으로 강화
            #             limit=max(50, int(fallback_iname_token_limit)),
            #             min_token_len=int(fallback_iname_min_token_len),
            #             # 성능: pg_trgm 결과가 있는 경우에는 ILIKE(req-only) 추가검색을 생략한다.
            #             # 운영(pg_stat_statements)에서 trgm + ILIKE가 쌍으로 실행되며 디스크 read가 폭증하는 것이 관측됨.
            #             merge_ilike_when_trgm_empty_only=True,
            #             # 배치/폴백 성능: 토큰이 많은 상품명에서 ILIKE AND를 너무 길게 만들지 않도록
            #             # "앞 토큰 + 구체 토큰" 혼합으로 required를 고르고, SQL WHERE required 토큰 수를 캡한다.
            #             required_token_mode="head",
            #             hybrid_head_n=2,
            #             hybrid_specificity_n=1,
            #             max_req_tokens_sql=3,
            #             # out이 비었을 때도 ILIKE 폴백을 최소화: 숫자/치수 같은 구체 토큰이 있을 때만 수행
            #             ilike_fallback_when_trgm_empty=True,
            #             ilike_fallback_requires_digit_or_dim=True,  #true: 필수 토큰 중 숫자/치수 토큰이 있을 때만 폴백 수행(없으면 스킵)
            #         )
            #     except Exception:
            #         cand_gids2 = [] 

            #     if cand_gids2:
            #         print(f"iname={iname}, search hit group_id count={len(cand_gids2)}")
            #         try:    # iname_search 후보 그룹 조회 
            #             async with conn.transaction():
            #                 rows2 = await conn.fetch(
            #                     """
            #                     SELECT
            #                       g.group_id,
            #                       gm.iname AS winner_iname
            #                     FROM mlinkdw.shopprod_group2 g
            #                     LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id
            #                      AND gm.vender_code = g.winner_vender_code
            #                      AND gm.icode = g.winner_icode
            #                     WHERE g.group_id = ANY($1::bigint[])
            #                     """,
            #                     cand_gids2,
            #                 )
            #         except Exception:
            #             rows2 = []  

            #         best_fb_score2: tuple | None = None
            #         for r in rows2:
            #             try:
            #                 gid = int(r["group_id"])
            #             except Exception:
            #                 continue
            #             win_iname = r.get("winner_iname")
            #             try:
            #                 name_sim = float(_iname_similarity(iname, win_iname)) if iname else 0.0
            #             except Exception:
            #                 name_sim = 0.0
            #                 print(f"iname={iname}, search hit group_id={gid}, name_sim: {name_sim}")

            #             if name_sim < float(70):    #배치실행은 70으로 강화
            #                 continue
                        
            #             print(f"fallback iname candidate gid={gid}, name_sim: {name_sim}")

            #             score2 = (name_sim, 0)
            #             if best_fb_score2 is None or score2 > best_fb_score2:
            #                 best_fb_score2 = score2
            #                 best_fb_gid = gid


            if best_fb_gid is not None: 
                # 안전장치: 존재 확인 후 반환
                if bool(verify_gid_exists):
                    try:
                        async with conn.transaction():
                            exists = await conn.fetchval(
                                "SELECT 1 FROM mlinkdw.shopprod_group2 WHERE group_id = $1",
                                int(best_fb_gid),
                            )
                    except Exception:
                        exists = 1
                else:
                    exists = 1
                if exists: 
                    group_index_exact[key] = int(best_fb_gid)
                    if bool(maintain_winner_tokens_on_existing_group):
                        # ✅ 폴백 매칭으로 기존 그룹 편입: moved item winner/token 정책 적용
                        try:
                            await _maybe_update_winner_and_tokens_for_group_insertion_asyncpg(
                                conn,
                                target_gid=int(best_fb_gid),
                                moved_vender_code=current_vender_code,
                                moved_icode=current_icode,
                                moved_price=current_price,
                                moved_vender_grade=current_vender_grade,
                                moved_reg_date=current_reg_date,
                                moved_content_html=fallback_content_html,
                                moved_dome_code=current_dome_code,
                            )
                        except Exception:
                            pass
                    if bool(merge_moved_tokens_on_existing_group):
                        # ✅ 추가: moved item의 content token을 TARGET winner 키에 누적(없는 token만)
                        try:
                            await _merge_moved_item_tokens_into_target_winner_asyncpg(
                                conn,
                                target_gid=int(best_fb_gid),
                                moved_content_html=fallback_content_html,
                                moved_dome_code=current_dome_code,
                            )
                        except Exception:
                            pass
                    if return_meta:
                        return int(best_fb_gid), False
                    return int(best_fb_gid) 

    # print("step 5")

    # 3) 신규 그룹 생성 (멀티 프로세스/멀티 서버 경쟁조건 대비)
    # - group_key UNIQUE 충돌 시 예외로 트랜잭션이 abort되면 이후 SQL이 모두 실패하므로,
    #   반드시 ON CONFLICT로 안전하게 처리하고, 충돌 시 기존 group_id를 조회해 반환한다.
    bands = bands_from_phash_hex(ph_hex)
    bucket1, bucket2 = dual_buckets_from_phash_hex(ph_hex)
    async def _create_or_get_gid_once() -> int:
        row = None
        try:
            row = await conn.fetchrow(
                """
                INSERT INTO mlinkdw.shopprod_group2 (
                  group_key, phash_hex, dhash_hex, phash_bands, phash_bucket, phash_bucket2,
                  winner_icode, winner_price, winner_vender_code, winner_updated_at
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now())
                ON CONFLICT (group_key) DO NOTHING
                RETURNING group_id
                """,
                group_key,
                ph_hex,
                dh_hex,
                bands if bands else None,
                bucket1,
                bucket2,
                (str(current_icode) if current_icode is not None else None),
                current_price,
                (str(current_vender_code) if current_vender_code is not None else None),
            )
        except Exception:
            # 스키마/컬럼 미존재 등 fallback (기존 방식)에서도 ON CONFLICT를 적용
            try:
                row = await conn.fetchrow(
                    """
                    INSERT INTO mlinkdw.shopprod_group2 
                    (group_key, phash_hex, dhash_hex, phash_bands, phash_bucket, phash_bucket2, 
                    winner_icode, winner_price, winner_vender_code, winner_updated_at)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now())
                    ON CONFLICT (group_key) DO NOTHING
                    RETURNING group_id
                    """,
                    group_key,
                    ph_hex,
                    dh_hex,
                    bands,
                    bucket1,
                    bucket2,
                    (str(current_icode) if current_icode is not None else None),
                    current_price,
                    (str(current_vender_code) if current_vender_code is not None else None),
                )
            except Exception:
                row = await conn.fetchrow(
                    """
                    INSERT INTO mlinkdw.shopprod_group2 
                    (group_key, phash_hex, dhash_hex, phash_bands, phash_bucket, phash_bucket2, 
                    winner_icode, winner_price, winner_vender_code, winner_updated_at) 
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now())
                    ON CONFLICT (group_key) DO NOTHING
                    RETURNING group_id
                    """,
                    group_key,
                    ph_hex,
                    dh_hex,
                    bands,
                    bucket1,
                    bucket2,
                    (str(current_icode) if current_icode is not None else None),
                    current_price,
                    (str(current_vender_code) if current_vender_code is not None else None),
                )

        if row is not None:
            return int(row["group_id"])

        gid_existing = await conn.fetchval(
            "SELECT group_id FROM mlinkdw.shopprod_group2 WHERE group_key = $1 LIMIT 1",
            group_key,
        )
        if gid_existing:
            return int(gid_existing)
        raise RuntimeError(f"failed to create/find group_id for group_key={group_key!r}")

    # deadlock/serialization 오류는 SAVEPOINT로 격리 후 재시도
    max_retry = 4
    last_exc = None
    for attempt in range(max_retry + 1):
        try:
            # nested transaction => SAVEPOINT
            async with conn.transaction():
                group_id = await _create_or_get_gid_once()
            break
        except Exception as e:
            last_exc = e
            name = type(e).__name__
            is_retryable = name in ("DeadlockDetectedError", "SerializationError")
            if not is_retryable or attempt >= max_retry:
                raise
            # 지수 백오프 + 지터
            await asyncio.sleep(0.05 * (2**attempt) + random.random() * 0.05)
    else:
        raise last_exc  # pragma: no cover

    # 신규 그룹 생성 직후: content token도 즉시 생성/삽입(가능한 경우에만)
    try:
        html_for_tok = fallback_content_html
        if html_for_tok and current_vender_code and current_icode and current_dome_code:
            toks = _extract_img_filenames_from_html(
                html_for_tok,
                max_imgs=12,
                max_filenames=8,
                min_fname_len=6,
            )
            if toks:
                insert_sql = """
                    INSERT INTO mlinkdw.shopprod_group_content_token
                    (group_id, vender_code, icode, token, dome_code)
                    VALUES ($1,$2,$3,$4,$5)
                    ON CONFLICT (group_id, vender_code, icode, token)
                    DO UPDATE SET dome_code = EXCLUDED.dome_code
                """
                rows_to_insert = [
                    (
                        int(group_id),
                        str(current_vender_code),
                        str(current_icode),
                        str(t),
                        str(current_dome_code),
                    )
                    for t in toks
                ]
                await conn.executemany(insert_sql, rows_to_insert)
    except Exception:
        # token table/권한/스키마 미존재 등은 본 흐름(그룹 생성)을 막지 않는다.
        pass

    group_index_exact[key] = group_id
    if return_meta:
        return int(group_id), True
    return group_id


def find_or_create_group_id_sync(
    conn,
    group_index_exact: Dict[Tuple[str, str], int],
    phash,
    dhash,
    *,
    threshold: int = 8,
    dhash_threshold: int = 10,
    candidate_limit: int = 30,
    # --- 오탐 방지 옵션(기본은 기존 동작 유지) ---
    iname: str | None = None,
    iname_threshold: float = 50.0,
    # --- 폴백: iname 토큰 후보 + winner_content 이미지셋 검증 ---
    fallback_iname_tokens: bool = False,
    # fallback_iname_dome_code: str | None = None,
    fallback_iname_token_limit: int = 50,
    fallback_iname_min_token_len: int = 2,
    fallback_content_html: str | None = None,
    fallback_content_max_imgs: int = 12,
    fallback_content_ph_threshold: int = 8,
    fallback_content_dh_threshold: int = 10,
    fallback_content_min_matches: int = 1,
    fallback_content_min_ratio: float = 0.80,
    blocked_bands: Set[int] | None = None,
    blocked_phash_hex: Set[str] | None = None,
    blocked_dhash_hex: Set[str] | None = None,
    skip_low_entropy: bool = True,
    low_entropy_max_unique: int = 3,
    # --- 신규 그룹 생성 시 winner 필드/컨텐츠 토큰 즉시 반영 ---
    current_vender_code: str | None = None,
    current_icode: str | int | None = None,
    current_price: int | float | None = None,
    current_vender_grade: str | int | None = None,
    current_reg_date=None,
    current_dome_code: str | None = None,
    # current_content_html: str | None = None,
    # --- 추가: 썸네일 "흑/백만 다른" 케이스 방지(밝기 분포 체크) ---
    current_img_url: str | None = None,
    color_check: bool = False,
    color_check_top_k: int = 5,
    color_max_mean_v_diff: float = 40.0,
    color_max_dark_ratio_diff: float = 0.25,
    color_max_bright_ratio_diff: float = 0.25,
    manage_tx: bool = True,
) -> int:
    """
    psycopg2 등 DB-API(동기) Connection 기준으로 group_id를 찾거나 생성.
    - UI(SearchGroupPositionUI_FullIntegrated.py)에서 asyncpg 없이 동일 로직을 재사용하기 위한 wrapper.
    - 핵심 로직은 async 버전(find_or_create_group_id)과 동일: bands overlap 후보 -> 해밍거리 -> 신규생성
    """
    ph_hex = str(phash)
    dh_hex = str(dhash)
    key = (ph_hex, dh_hex)

    cur = conn.cursor()

    # 0) exact 캐시
    if key in group_index_exact:
        cached_gid = int(group_index_exact[key])
        try:
            cur.execute("SELECT 1 FROM mlinkdw.shopprod_group2 WHERE group_id = %s", (cached_gid,))
            exists = cur.fetchone()
        except Exception:
            exists = (1,)
        if exists:
            return cached_gid
        try:
            del group_index_exact[key]
        except Exception:
            pass

    # low-entropy 안전장치
    if skip_low_entropy and (
        _is_low_entropy_hex64(ph_hex, max_unique_nibbles=low_entropy_max_unique)
        or _is_low_entropy_hex64(dh_hex, max_unique_nibbles=low_entropy_max_unique)
    ):
        bands = []
    else:
        bands = bands_from_phash_hex(ph_hex)    #32bit band 분리

    if blocked_bands and bands:
        bands = [b for b in bands if b not in blocked_bands]

    best_gid = None
    best_score = None
    best_ph_dist = None
    best_dh_dist = None

    # 1) 후보 검색(band overlap) + 거리계산
    if bands:
        try:
            use_iname = bool(iname_threshold and iname)
            use_color = bool(color_check and current_img_url)
            need_gm = bool(use_iname or use_color)
            if need_gm:
                cur.execute(
                    """
                    SELECT
                      g.group_id,
                      g.phash_hex,
                      g.dhash_hex,
                      gm.iname AS winner_iname,
                      gm.img_url AS winner_img_url
                    FROM mlinkdw.shopprod_group2 g
                    LEFT JOIN mlinkdw.shopprod_group_map2 gm
                      ON gm.group_id = g.group_id
                     AND gm.vender_code = g.winner_vender_code
                     AND gm.icode = g.winner_icode
                    WHERE g.phash_bands && %s::int4[]
                    LIMIT %s
                    """,
                    (bands, int(candidate_limit)),
                )
            else:
                cur.execute(
                    """
                    SELECT group_id, phash_hex, dhash_hex
                    FROM mlinkdw.shopprod_group2
                    WHERE phash_bands && %s::int4[]
                    LIMIT %s
                    """,
                    (bands, int(candidate_limit)),
                )

            rows = cur.fetchall()
            top_k = max(1, int(color_check_top_k or 1))
            scored: list[tuple[int, int, int | None, int, str | None]] = []
            for r in rows:
                # DictCursor일 수도 있고 tuple일 수도 있음
                try:
                    cand_hex = r["phash_hex"]
                    cand_dh = r.get("dhash_hex")
                    gid = r["group_id"]
                    cand_iname = r.get("winner_iname") if use_iname else None
                    cand_img_url = r.get("winner_img_url") if need_gm else None
                except Exception:
                    gid = r[0]
                    cand_hex = r[1] if len(r) > 1 else None
                    cand_dh = r[2] if len(r) > 2 else None
                    cand_iname = r[3] if (use_iname and len(r) > 3) else None
                    cand_img_url = r[4] if (need_gm and len(r) > 4) else None

                if not cand_hex:
                    continue
                if blocked_phash_hex and str(cand_hex) in blocked_phash_hex:
                    continue
                if blocked_dhash_hex and cand_dh and str(cand_dh) in blocked_dhash_hex:
                    continue
                if use_iname:
                    if _iname_similarity(iname, cand_iname) < float(iname_threshold):
                        continue
                try:
                    cand_hash = imagehash.hex_to_hash(str(cand_hex))
                    ph_dist = int(phash - cand_hash) 
                    if ph_dist >= int(threshold):
                        continue
                    dh_dist = None
                    if cand_dh:
                        dh_hash = imagehash.hex_to_hash(str(cand_dh))
                        dh_dist = int(dhash - dh_hash) 
                        if dh_dist >= int(dhash_threshold):
                            continue
                    score = ph_dist + (dh_dist if dh_dist is not None else 999)
                except Exception:
                    continue
                try:
                    scored.append((int(score), int(ph_dist), int(dh_dist) if dh_dist is not None else None, int(gid), str(cand_img_url) if cand_img_url else None))
                except Exception:
                    continue

            if scored:
                scored.sort(key=lambda x: x[0])
                if len(scored) > top_k:
                    scored = scored[:top_k]

                cur_sig = _thumb_bw_sig_from_url(str(current_img_url)) if use_color and current_img_url else None
                chosen = None
                for score, ph_dist, dh_dist, gid, cimg in scored:
                    if use_color and current_img_url and cimg:
                        cand_sig = _thumb_bw_sig_from_url(str(cimg))
                        if not _thumb_bw_sig_is_similar(
                            cur_sig,
                            cand_sig,
                            max_mean_v_diff=float(color_max_mean_v_diff),
                            max_dark_ratio_diff=float(color_max_dark_ratio_diff),
                            max_bright_ratio_diff=float(color_max_bright_ratio_diff),
                        ):
                            continue
                    chosen = (score, ph_dist, dh_dist, gid)
                    break

                if chosen is not None:
                    score, ph_dist, dh_dist, gid = chosen
                    best_score = int(score)
                    best_ph_dist = int(ph_dist)
                    best_dh_dist = int(dh_dist) if dh_dist is not None else None
                    best_gid = int(gid)
        except Exception:
            best_gid = None
            best_score = None
            best_ph_dist = None
            best_dh_dist = None

    # 2) 임계치 이내면 기존 그룹
    if (
        best_gid is not None
        and best_ph_dist is not None
        and best_ph_dist <= int(threshold)
        and (best_dh_dist is None or best_dh_dist <= int(dhash_threshold))
    ):
        try:
            cur.execute("SELECT 1 FROM mlinkdw.shopprod_group2 WHERE group_id = %s", (int(best_gid),))
            exists = cur.fetchone()
        except Exception:
            exists = (1,)
        if exists:
            group_index_exact[key] = int(best_gid)
            return int(best_gid)

    # 2.5) 폴백: 이미지 후보가 없거나 임계치 밖이면, iname 토큰 후보군을 만들고
    # winner_content 이미지셋으로 검증하여 그룹을 선택한다(옵션).
    if (
        fallback_iname_tokens
        and iname
        and (best_gid is None)
    ):
        # 라이브러리/환경 준비가 안 되어 있으면 조용히 스킵(기존 동작 유지)
        if Image is not None and ImageOps is not None and ImageFilter is not None and requests is not None:
            
            try:
                cands = _candidate_groups_by_content_img_filenames_sync(
                    conn,
                    content_html=str(fallback_content_html), 
                    limit=int(fallback_iname_token_limit),
                    max_imgs=int(fallback_content_max_imgs), 
                    max_filenames=int(fallback_iname_token_limit) if int(fallback_iname_token_limit) <= 2 else 4,
                )
            except Exception:
                cands = []

            # if not cands: 
            #     try:
            #         cands = await _candidate_groups_by_iname_tokens_sync(
            #             conn,
            #             iname=str(iname),
            #             limit=int(fallback_iname_token_limit),
            #             min_token_len=int(fallback_iname_min_token_len),
            #         )
            #     except Exception:
            #         cands = [] 

            best_fb_gid: int | None = None  # type: ignore[assignment]
            if cands:  
                cand_hit_cnt: dict[int, int] = {}
                try:
                    cand_hit_cnt = {int(gid): int(cnt) for gid, cnt in cands}
                except Exception:
                    cand_hit_cnt = {}

                cand_gids = [int(gid) for gid, _ in cands] 
                try:
                    cur_fb = conn.cursor()
                    cur_fb.execute(
                        """
                        SELECT
                          g.group_id,
                          gm.iname AS winner_iname,
                          gm.content AS winner_content
                        FROM mlinkdw.shopprod_group2 g
                        LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id
                         AND gm.vender_code = g.winner_vender_code
                         AND gm.icode = g.winner_icode
                        WHERE g.group_id = ANY(%s::bigint[])
                        """,
                        (cand_gids,),
                    )
                    rows = cur_fb.fetchall()
                except Exception:
                    rows = [] 

                best_fb_gid: int | None = None
                best_fb_score: tuple = None  # type: ignore[assignment]
                checked = 0
                passed = 0

                if rows is not None:
                    try:
                        fallback_pairs = _extract_hash_pairs_from_html(
                            str(fallback_content_html),
                            max_imgs=int(fallback_content_max_imgs),
                        )
                    except Exception:
                        fallback_pairs = []

                    if not fallback_pairs:
                        rows = []  # 검증 불가 -> 후보 전체 스킵

                for r in rows:
                    try:
                        # DictCursor / tuple 모두 대응
                        try:
                            gid = int(r[0])
                            win_iname = r[1] if len(r) > 1 else None
                            win_content = r[2] if len(r) > 2 else None
                        except Exception:
                            gid = int(r["group_id"])
                            win_iname = r.get("winner_iname")
                            win_content = r.get("winner_content")
                    except Exception:
                        continue

                    # iname_threshold가 켜져 있으면 여기서도 동일하게 필터링
                    if iname_threshold and iname:
                        try:
                            if _iname_similarity(iname, win_iname) < float(iname_threshold):
                                continue
                        except Exception:
                            pass
                    
                    # ✅ content 이미지셋 검증(템플릿 이미지 1~2장만 같은 케이스 방지)
                    # - fallback_content_min_matches / fallback_content_min_ratio 파라미터를 실제로 강제한다.
                    try:
                        win_pairs = _extract_hash_pairs_from_html(
                            str(win_content or ""),
                            max_imgs=int(fallback_content_max_imgs),
                        ) 
                        st = _hash_pairs_match_stats(
                            fallback_pairs,
                            win_pairs,
                            ph_thr=int(fallback_content_ph_threshold),
                            dh_thr=int(fallback_content_dh_threshold),
                        )
                    except Exception:
                        st = {"err": "EXC"}
                    if st.get("err") is not None:
                        continue
                    try:
                        mcnt = int(st.get("match") or 0)
                        mratio = float(st.get("ratio") or 0.0)
                    except Exception:
                        mcnt = 0
                        mratio = 0.0
                    if mcnt < int(fallback_content_min_matches):
                        continue
                    if mratio < int(fallback_content_min_ratio):
                        continue

                    checked += 1
                    try:
                        name_sim = float(_iname_similarity(iname, win_iname)) if iname else 0.0
                    except Exception:
                        name_sim = 0.0
                    hit_cnt = int(cand_hit_cnt.get(int(gid), 0))
                    passed += 1

                    # 랭킹: name_sim -> hit_cnt
                    score = (name_sim, hit_cnt)
                    if best_fb_score is None or score > best_fb_score:
                        best_fb_score = score
                        best_fb_gid = gid 

            # if not best_fb_gid:
            #     # ✅ 후보가 전혀 없으면(UI의 iname 검색과 동일한 전략으로) group_map 전체 상품명에서 후보 group_id를 직접 뽑는다.
            #     try:
            #         from iname_search import candidate_group_ids_by_iname_sync

            #         cand_gids2 = candidate_group_ids_by_iname_sync(
            #             conn,
            #             iname=str(iname),
            #             exclude_icode=current_icode,
            #             min_ratio=int(float(iname_threshold)) if (iname_threshold and str(iname_threshold).strip()) else 70,
            #             limit=max(50, int(fallback_iname_token_limit)),
            #             min_token_len=int(fallback_iname_min_token_len),
            #             winner_only=True,
            #             required_token_mode="head",
            #             hybrid_head_n=2,
            #             hybrid_specificity_n=1,
            #             max_req_tokens_sql=3,
            #         )
            #     except Exception:
            #         cand_gids2 = []

            #     if cand_gids2:
            #         try:
            #             cur_fb = conn.cursor()
            #             cur_fb.execute(
            #                 """
            #                 SELECT
            #                   g.group_id,
            #                   gm.iname AS winner_iname
            #                 FROM mlinkdw.shopprod_group2 g
            #                 LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.group_id = g.group_id
            #                  AND gm.vender_code = g.winner_vender_code
            #                  AND gm.icode = g.winner_icode
            #                 WHERE g.group_id = ANY(%s::bigint[])
            #                 """,
            #                 (list(cand_gids2),),
            #             )
            #             rows2 = cur_fb.fetchall()
            #         except Exception:
            #             rows2 = []

            #         best_fb_score2: tuple | None = None
            #         for r in rows2:
            #             try:
            #                 gid = int(r[0])
            #             except Exception:
            #                 continue
            #             win_iname = r[1] if (len(r) > 1) else None
            #             try:
            #                 name_sim = float(_iname_similarity(iname, win_iname)) if iname else 0.0
            #             except Exception:
            #                 name_sim = 0.0
            #             score2 = (name_sim, 0)
            #             if best_fb_score2 is None or score2 > best_fb_score2:
            #                 best_fb_score2 = score2
            #                 best_fb_gid = gid


            if best_fb_gid is not None: 
                # 안전장치: 존재 확인 후 반환
                try:
                    cur_ex = conn.cursor()
                    cur_ex.execute("SELECT 1 FROM mlinkdw.shopprod_group2 WHERE group_id = %s", (int(best_fb_gid),))
                    exists = cur_ex.fetchone()
                except Exception:
                    exists = (1,)
                if exists:
                    group_index_exact[key] = int(best_fb_gid)
                    return int(best_fb_gid) 

    # 3) 신규 그룹 생성
    group_key = f"{ph_hex}_{dh_hex}"
    try:
        # 외부(배치) 트랜잭션에서 호출될 수 있어, 내부에서 COMMIT/ROLLBACK을 하면
        # 호출자 savepoint/트랜잭션이 깨진다. manage_tx=False면 savepoint로 격리한다.
        sp_name = "sp_dgm_new_group"
        if manage_tx:
            cur.execute("BEGIN")
        else:
            try:
                cur.execute(f"SAVEPOINT {sp_name}")
            except Exception:
                # 호출자가 이미 트랜잭션을 열지 않은 경우를 대비
                cur.execute("BEGIN")
                cur.execute(f"SAVEPOINT {sp_name}")
        try:
            cur.execute(
                """
                INSERT INTO mlinkdw.shopprod_group2 (
                  group_key, phash_hex, dhash_hex, phash_bands,
                  winner_icode, winner_price, winner_vender_code, winner_updated_at
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, now())
                RETURNING group_id
                """,
                (
                    group_key,
                    ph_hex,
                    dh_hex,
                    bands if bands else None,
                    (str(current_icode) if current_icode is not None else None),
                    current_price,
                    (str(current_vender_code) if current_vender_code is not None else None),
                ),
            )
            row = cur.fetchone()
        except Exception:
            # 스키마가 아직 준비 안 된 경우 fallback (기존 방식)
            try:
                cur.execute(
                    """
                    INSERT INTO mlinkdw.shopprod_group2 (group_key, phash_hex, dhash_hex, phash_bands)
                    VALUES (%s, %s, %s, %s)
                    RETURNING group_id
                    """,
                    (group_key, ph_hex, dh_hex, bands if bands else None),
                )
                row = cur.fetchone()
            except Exception:
                cur.execute(
                    """
                    INSERT INTO mlinkdw.shopprod_group2 (group_key)
                    VALUES (%s)
                    RETURNING group_id
                    """,
                    (group_key,),
                )
                row = cur.fetchone()

        if not row:
            raise RuntimeError("신규 그룹 생성 실패(RETURNING group_id 없음)")
        group_id = int(row[0] if not isinstance(row, dict) else row.get("group_id"))

        # 신규 그룹 생성 직후: content token도 즉시 생성/삽입(가능한 경우에만)
        try:
            html_for_tok = fallback_content_html
            if html_for_tok and current_vender_code and current_icode and current_dome_code:
                toks = _extract_img_filenames_from_html(
                    str(html_for_tok),
                    max_imgs=12,
                    max_filenames=8,
                    min_fname_len=6,
                )
                if toks:
                    insert_sql = """
                        INSERT INTO mlinkdw.shopprod_group_content_token
                        (group_id, vender_code, icode, token, dome_code)
                        VALUES (%s,%s,%s,%s,%s)
                        ON CONFLICT (group_id, vender_code, icode, token)
                        DO UPDATE SET dome_code = EXCLUDED.dome_code
                    """
                    for t in toks:
                        cur.execute(
                            insert_sql,
                            (int(group_id), str(current_vender_code), str(current_icode), str(t), str(current_dome_code)),
                        )
        except Exception:
            pass

        if manage_tx:
            cur.execute("COMMIT")
        else:
            # 내부 savepoint 정리(트랜잭션은 호출자가 커밋)
            try:
                cur.execute(f"RELEASE SAVEPOINT {sp_name}")
            except Exception:
                pass
    except Exception:
        if manage_tx:
            try:
                conn.rollback()
            except Exception:
                pass
        else:
            # 오류 발생 시: savepoint로만 롤백해서 외부 트랜잭션을 살린다.
            try:
                cur2 = conn.cursor()
                cur2.execute("ROLLBACK TO SAVEPOINT sp_dgm_new_group")
                cur2.execute("RELEASE SAVEPOINT sp_dgm_new_group")
            except Exception:
                # savepoint 사용 불가한 상태면 호출자에게 예외 전달(외부에서 롤백 처리)
                pass
        # group_key가 이미 있으면 기존 그룹 사용
        cur = conn.cursor()
        cur.execute("SELECT group_id FROM mlinkdw.shopprod_group2 WHERE group_key = %s LIMIT 1", (group_key,))
        r2 = cur.fetchone()
        if not r2:
            raise
        group_id = int(r2[0] if not isinstance(r2, dict) else r2.get("group_id"))

    group_index_exact[key] = int(group_id)
    return int(group_id)

