#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
그룹 자동 병합 배치 (winner 상품명(iname) 기준)

목표
- public.mv_winner_iname(group_id, iname)에서 동일 iname으로 분리된 group들을 찾아 병합한다.
- 병합 동작은 UI/이미지 배치와 동일하게:
  - shopprod_group_map2.group_id 를 TARGET으로 이동
  - shopprod_sub_group2.group_id 를 TARGET으로 이동
  - SOURCE 그룹이 비면 shopprod_group2에서 삭제 시도(실패해도 무시)

주의(운영 안전)
- iname이 같다고 해서 항상 같은 상품이 아님(일반명/카테고리명 등). 기본은 --dry-run.
- 기본 안전장치:
  - 클러스터 크기 상한(--max-cluster-size)
  - (선택) winner 해시 유사도 요구(--require-hash)
  - (선택) winner img_url 분산 비율이 높으면 스킵(--skip-if-url-distinct-ratio)

환경
- 프로젝트에 이미 포함된 psycopg2 기반(Windows에서도 바로 실행 가능).
"""

from __future__ import annotations

import argparse
import difflib
import importlib
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable

import psycopg2
from psycopg2.extras import DictCursor
import requests
from functools import lru_cache

from db_config import DB_INFO_PSYCOPG2 as DB_INFO

try:
    # optional dependency (content/image hashing)
    _pil = importlib.import_module("PIL.Image")
    _pil_ops = importlib.import_module("PIL.ImageOps")
    _pil_filter = importlib.import_module("PIL.ImageFilter")
    _imagehash = importlib.import_module("imagehash")

    Image = _pil
    ImageOps = _pil_ops
    ImageFilter = _pil_filter
    imagehash = _imagehash
except Exception:
    Image = None
    ImageOps = None
    ImageFilter = None
    imagehash = None


@lru_cache(maxsize=4096)
def _fetch_http_bytes(url: str, *, timeout: int = 10) -> bytes | None:
    """
    http/https 리소스 다운로드 헬퍼.
    - 일부 CDN(예: alicdn)은 기본 요청을 차단(420/403)하므로 UA/Referer를 강제한다.
    - 너무 큰 파일(10MB 초과)은 메모리/시간 문제로 스킵.
    """
    u = str(url or "").strip()
    if not u:
        return None
    if u.startswith("//"):
        u = "https:" + u
    if not (u.startswith("http://") or u.startswith("https://")):
        return None
    try:
        try:
            m = re.match(r"^(https?://[^/]+)", u, flags=re.IGNORECASE)
            referer = (m.group(1) + "/") if m else "https://www.alibaba.com/"
        except Exception:
            referer = "https://www.alibaba.com/"

        headers = {
            "User-Agent": "Mozilla/5.0",
            "Referer": referer,
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        }
        r = requests.get(u, timeout=int(timeout), headers=headers)
        r.raise_for_status()
        data = bytes(r.content)
        if len(data) > 10 * 1024 * 1024:
            return None
        return data
    except Exception:
        return None


def _extract_img_urls_from_html(html_text: str | None, *, limit: int = 20) -> list[str]:
    raw = str(html_text or "")
    srcs = re.findall(r"""<img[^>]+src\s*=\s*(?:(["'])(.*?)\1|([^'"\s>]+))""", raw, flags=re.IGNORECASE)
    urls = []
    for q, a, b in srcs:
        u = (a or b or "").strip()
        if not u:
            continue
        if u.startswith("//"):
            u = "https:" + u
        if u.startswith("http://") or u.startswith("https://"):
            urls.append(u)
    seen = set()
    out = []
    for u in urls:
        if u in seen:
            continue
        seen.add(u)
        out.append(u)
        if len(out) >= int(limit):
            break
    return out


@lru_cache(maxsize=4096)
def _image_hash_pair_from_url(img_url: str):
    """
    URL -> bytes(fetch with headers) -> PIL -> (phash, dhash)
    """
    if not img_url or not (Image and ImageOps and ImageFilter and imagehash):
        return None
    data = _fetch_http_bytes(str(img_url), timeout=10)
    if not data:
        return None
    try:
        from io import BytesIO

        bio = BytesIO(data)
        img = Image.open(bio)
        try:
            img.verify()
        except Exception:
            pass
        bio.seek(0)
        img = Image.open(bio)
        img = ImageOps.exif_transpose(img)
        # palette(P) + transparency 경고/품질 이슈 회피
        try:
            if getattr(img, "mode", None) == "P" and "transparency" in getattr(img, "info", {}):
                img = img.convert("RGBA")
        except Exception:
            pass
        img = img.convert("RGB")

        img = img.filter(ImageFilter.GaussianBlur(radius=0.6))
        resampling = getattr(Image, "Resampling", None)
        method = resampling.LANCZOS if resampling else getattr(Image, "LANCZOS", 1)
        img = ImageOps.fit(img, (256, 256), method=method)

        ph = imagehash.phash(img)  # ImageHash
        dh = imagehash.dhash(img)  # ImageHash
        return (ph, dh)
    except Exception:
        return None


def _content_imgset_match_stats(
    html_a: str | None,
    html_b: str | None,
    *,
    max_imgs: int = 12,
    ph_thr: int = 12,
    dh_thr: int = 14,
) -> dict:
    """
    content 내 이미지 세트 간 매칭 기반 판단.
    - A/B 각각 최대 max_imgs개 이미지 해시(ph/dh)를 계산
    - B의 각 이미지가 A의 어떤 이미지와도 (ph<=thr and dh<=thr)면 match로 카운트
    """
    if imagehash is None or Image is None:
        return {"err": "NO_LIB"}
    urls_a = _extract_img_urls_from_html(html_a, limit=max_imgs)
    urls_b = _extract_img_urls_from_html(html_b, limit=max_imgs)
    if not urls_a or not urls_b:
        return {"err": "NO_IMG", "na": len(urls_a), "nb": len(urls_b)}

    pairs_a = []
    for u in urls_a:
        p = _image_hash_pair_from_url(u)
        if p:
            pairs_a.append(p)
    pairs_b = []
    for u in urls_b:
        p = _image_hash_pair_from_url(u)
        if p:
            pairs_b.append(p)

    if not pairs_a or not pairs_b:
        return {"err": "HASH_FAIL", "na": len(pairs_a), "nb": len(pairs_b)}

    match = 0
    min_ph = None
    min_dh = None
    for pb in pairs_b:
        best_ok = False
        for pa in pairs_a:
            try:
                ph = int(pa[0] - pb[0])
                dh = int(pa[1] - pb[1])
            except Exception:
                continue
            if min_ph is None or ph < min_ph:
                min_ph = ph
            if min_dh is None or dh < min_dh:
                min_dh = dh
            if ph <= ph_thr and dh <= dh_thr:
                best_ok = True
                break
        if best_ok:
            match += 1

    denom = min(len(pairs_a), len(pairs_b))
    ratio = (match / float(denom)) if denom > 0 else 0.0
    return {
        "err": None,
        "na": len(pairs_a),
        "nb": len(pairs_b),
        "match": match,
        "ratio": ratio,
        "min_ph": min_ph,
        "min_dh": min_dh,
        "ph_thr": ph_thr,
        "dh_thr": dh_thr,
    }


def _fmt_hms(seconds: float) -> str:
    try:
        s = int(seconds)
    except Exception:
        s = 0
    if s < 0:
        s = 0
    h = s // 3600
    m = (s % 3600) // 60
    ss = s % 60
    return f"{h:02d}:{m:02d}:{ss:02d}"


def _default_log_file_name() -> str:
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"Batch_MergeGroups_ByIname_{ts}.log"


def _spawn_background(argv: list[str], log_file: str) -> subprocess.Popen:
    """
    Ubuntu/Linux: setsid(start_new_session=True)로 세션을 분리해서 백그라운드 실행.
    stdout/stderr는 log_file로 리다이렉트.
    """
    log_path = os.path.abspath(log_file)
    os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)
    f = open(log_path, "a", buffering=1, encoding="utf-8")
    return subprocess.Popen(
        argv,
        stdout=f,
        stderr=subprocess.STDOUT,
        close_fds=True,
        start_new_session=True,
    )


def _hamming64_hex(a: str | None, b: str | None) -> int:
    if not a or not b:
        return 10**9
    try:
        x = int(str(a), 16) ^ int(str(b), 16)
        return x.bit_count()
    except Exception:
        return 10**9


def _norm_img_url(u: str | None) -> str:
    if not u:
        return ""
    u = str(u).strip()
    if not u:
        return ""
    u = u.split("#", 1)[0]
    u = u.split("?", 1)[0]
    m = re.match(r"^(https?://)([^/]+)(/.*)?$", u, flags=re.IGNORECASE)
    if m:
        scheme = m.group(1).lower()
        host = (m.group(2) or "").lower()
        rest = m.group(3) or ""
        return f"{scheme}{host}{rest}"
    return u


def _norm_iname(s: str | None) -> str:
    if not s:
        return ""
    s = str(s).strip().lower()
    if not s:
        return ""
    s = re.sub(r"\s+", " ", s)
    return s


def _iname_similarity(a: str | None, b: str | None) -> float:
    a2 = _norm_iname(a)
    b2 = _norm_iname(b)
    if not a2 or not b2:
        return 0.0
    return difflib.SequenceMatcher(None, a2, b2).ratio() * 100.0


@dataclass
class GroupMeta:
    group_id: int
    created_at: datetime | None
    winner_price: int | None
    winner_vender_grade: int | None
    winner_img_url: str | None
    winner_iname: str | None
    phash_hex: str | None
    dhash_hex: str | None
    winner_content: str | None


def fetch_group_meta(conn, group_ids: Iterable[int]) -> dict[int, GroupMeta]:
    gids_all = list({int(x) for x in group_ids})
    if not gids_all:
        return {}

    m: dict[int, GroupMeta] = {}
    CHUNK = 50000
    for off in range(0, len(gids_all), CHUNK):
        gids = gids_all[off : off + CHUNK]
        cur = conn.cursor(cursor_factory=DictCursor)
        cur.execute(
            """
            SELECT
              g.group_id,
              g.created_at,
              g.winner_price,
              CASE
                WHEN gm.vender_grade ~ '^[0-9]+$' THEN gm.vender_grade::int
                ELSE NULL
              END AS winner_vender_grade,
              gm.img_url AS winner_img_url,
              gm.iname   AS winner_iname,
              g.phash_hex,
              g.dhash_hex,
              gm.content AS winner_content
            FROM mlinkdw.shopprod_group2 g
            LEFT JOIN mlinkdw.shopprod_group_map2 gm
                   ON gm.icode = g.winner_icode AND gm.group_id = g.group_id
            WHERE g.group_id = ANY(%s)
            """,
            (gids,),
        )
        rows = cur.fetchall()
        cur.close()
        for r in rows:
            m[int(r["group_id"])] = GroupMeta(
                group_id=int(r["group_id"]),
                created_at=r["created_at"],
                winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
                winner_vender_grade=int(r["winner_vender_grade"]) if r["winner_vender_grade"] is not None else None,
                winner_img_url=str(r["winner_img_url"]) if r["winner_img_url"] else None,
                winner_iname=str(r["winner_iname"]) if r["winner_iname"] else None,
                phash_hex=str(r["phash_hex"]) if r["phash_hex"] else None,
                dhash_hex=str(r["dhash_hex"]) if r["dhash_hex"] else None,
                winner_content=str(r["winner_content"]) if r["winner_content"] else None,
            )

    # 없는 것은 None으로 채움
    for g in gids_all:
        m.setdefault(
            g,
            GroupMeta(
                group_id=g,
                created_at=None,
                winner_price=None,
                winner_vender_grade=None,
                winner_img_url=None,
                winner_iname=None,
                phash_hex=None,
                dhash_hex=None,
                winner_content=None,
            ),
        )
    return m


def merge_cluster(conn, *, target: int, sources: list[int], dry_run: bool) -> None:
    if not sources or dry_run:
        return

    srcs = [int(x) for x in sources]
    cur = conn.cursor()
    try:
        # 0) content_token 정리 정책:
        # - winner_icode(=winner 상품 1개)의 token만 유지하는 구조이므로
        #   source token은 삭제하고, target에서도 winner가 아닌 token은 제거한다.
        try:
            # source token 삭제
            cur.execute(
                "DELETE FROM mlinkdw.shopprod_group_content_token WHERE group_id = ANY(%s)",
                (srcs,),
            )
            # target token 중 winner가 아닌 row 제거
            cur.execute(
                """
                DELETE FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = %s
                  AND EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group2 g
                    WHERE g.group_id = t.group_id
                      AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                           OR t.icode IS DISTINCT FROM g.winner_icode)
                  )
                """,
                (int(target),),
            )
        except Exception:
            pass

        cur.execute(
            """
            UPDATE mlinkdw.shopprod_group_map2
            SET group_id = %s
            WHERE group_id = ANY(%s)
            """,
            (target, srcs),
        )
        cur.execute(
            """
            UPDATE mlinkdw.shopprod_sub_group2
            SET group_id = %s
            WHERE group_id = ANY(%s)
            """,
            (target, srcs),
        )
        try:
            cur.execute(
                """
                DELETE FROM mlinkdw.shopprod_group2 g
                WHERE g.group_id = ANY(%s)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_map2 gm WHERE gm.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_sub_group2 sg WHERE sg.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_content_token ct WHERE ct.group_id = g.group_id)
                """,
                (srcs,),
            )
        except Exception:
            pass
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument("--dry-run", action="store_true", default=True, help="기본: 적용 안함(로그만)")
    ap.add_argument("--apply", action="store_true", help="실제 DB 병합 적용")
    ap.add_argument("--apply-limit", type=int, default=None, help="apply 모드에서 병합 작업을 N건까지만 수행")
    ap.add_argument("--min-count", type=int, default=2, help="동일 iname 최소 group 개수(기본 2)")
    ap.add_argument("--min-iname-len", type=int, default=8, help="안전장치: 너무 짧은 iname은 스킵(기본 8)")
    ap.add_argument("--max-cluster-size", type=int, default=50, help="안전장치: 1개 iname 클러스터 최대 크기(기본 50)")
    ap.add_argument(
        "--top-inames",
        type=int,
        default=None,
        help="분석/튜닝용: (cnt desc) 상위 N개의 iname 클러스터만 처리. 미지정이면 전체 대상.",
    )
    ap.add_argument("--require-hash", action="store_true", default=False, help="안전장치: winner phash/dhash 유사도 통과한 source만 병합")
    ap.add_argument("--ph-threshold", type=int, default=12, help="--require-hash 시 phash 해밍거리 임계값(기본 12)")
    ap.add_argument("--dh-threshold", type=int, default=14, help="--require-hash 시 dhash 해밍거리 임계값(기본 14)")
    ap.add_argument(
        "--require-content-imgset",
        action="store_true",
        default=False,
        help="안전장치: winner_content의 <img> 세트 매칭이 통과한 source만 병합(기본 OFF).",
    )
    ap.add_argument("--content-max-imgs", type=int, default=12, help="--require-content-imgset 사용 시 content에서 비교할 최대 이미지 수(기본 12)")
    ap.add_argument("--content-ph-threshold", type=int, default=12, help="content 이미지 매칭 기준: phash 거리 임계값(기본 12)")
    ap.add_argument("--content-dh-threshold", type=int, default=14, help="content 이미지 매칭 기준: dhash 거리 임계값(기본 14)")
    ap.add_argument("--content-min-matches", type=int, default=2, help="content 이미지 매칭 PASS 기준: 최소 매칭 이미지 수(기본 2)")
    ap.add_argument("--content-min-ratio", type=float, default=0.30, help="content 이미지 매칭 PASS 기준: 매칭 비율(기본 0.30)")
    ap.add_argument("--iname-min-sim", type=float, default=100.0, help="추가 필터: winner_iname 유사도(%) (기본 100=사실상 동일)")
    ap.add_argument("--skip-if-url-distinct-ratio", type=float, default=0.8, help="apply 안전장치: winner img_url distinct 비율이 높으면 스킵(기본 0.8). 0 이하면 비활성")
    ap.add_argument("--skip-if-url-min-size", type=int, default=10, help="url distinct ratio 적용 최소 클러스터 크기(기본 10)")
    ap.add_argument("--progress-every", type=int, default=200, help="진행 로그 출력 주기(iname 클러스터 단위)")
    ap.add_argument("--max-inames", type=int, default=None, help="테스트용: 처리할 iname 클러스터 상한")
    ap.add_argument(
        "--no-refresh-mv",
        action="store_true",
        default=False,
        help="기본은 시작 시 public.mv_winner_iname 을 REFRESH CONCURRENTLY 수행. 이 옵션으로 비활성화.",
    )
    ap.add_argument(
        "--background",
        action="store_true",
        help="Ubuntu/Linux: 백그라운드(detach)로 실행하고 즉시 종료. 로그는 --log-file로 저장 권장.",
    )
    ap.add_argument("--log-file", default=None, help="--background 사용 시 stdout/stderr 로그 파일")
    ap.add_argument("--_background-child", action="store_true", help=argparse.SUPPRESS)
    args = ap.parse_args()

    # 백그라운드(detach) 실행
    if args.background and not args._background_child:
        log_file = str(args.log_file or _default_log_file_name())
        child_args: list[str] = []
        skip_next = False
        for a in sys.argv[1:]:
            if skip_next:
                skip_next = False
                continue
            if a == "--background":
                continue
            if a == "--log-file":
                skip_next = True
                continue
            child_args.append(a)
        child_args.extend(["--_background-child", "--log-file", log_file])
        cmd = [sys.executable, os.path.abspath(sys.argv[0]), *child_args]
        p = _spawn_background(cmd, log_file=log_file)
        print(f"[INFO] background started: pid={p.pid} log_file={log_file}", flush=True)
        return

    dry_run = not bool(args.apply)
    min_count = max(2, int(args.min_count or 2))
    min_iname_len = max(0, int(args.min_iname_len or 0))
    max_cluster_size = max(2, int(args.max_cluster_size or 2))
    max_inames = int(args.max_inames) if args.max_inames is not None else None
    thr_sim = float(args.iname_min_sim or 0.0)
    require_hash = bool(args.require_hash)
    ph_thr = int(args.ph_threshold)
    dh_thr = int(args.dh_threshold)
    require_content = bool(args.require_content_imgset)
    c_max_imgs = int(args.content_max_imgs or 12)
    c_ph_thr = int(args.content_ph_threshold or 12)
    c_dh_thr = int(args.content_dh_threshold or 14)
    c_min_matches = int(args.content_min_matches or 2)
    c_min_ratio = float(args.content_min_ratio or 0.0)
    top_inames = int(args.top_inames) if args.top_inames is not None else None

    t0 = time.perf_counter()

    # mv_winner_iname 최신화
    # - REFRESH MATERIALIZED VIEW CONCURRENTLY 는 트랜잭션 블록 내 실행 불가 → autocommit 필요
    if not bool(args.no_refresh_mv):
        t_mv0 = time.perf_counter()
        try:
            conn_refresh = psycopg2.connect(**DB_INFO)
            conn_refresh.autocommit = True
            cur_mv = conn_refresh.cursor()
            cur_mv.execute("REFRESH MATERIALIZED VIEW CONCURRENTLY public.mv_winner_iname;")
            cur_mv.close()
            conn_refresh.close()
            mv_elapsed = time.perf_counter() - t_mv0
            print(f"[INFO] mv refresh ok: public.mv_winner_iname elapsed={mv_elapsed:.1f}s", flush=True)
        except Exception as e:
            mv_elapsed = time.perf_counter() - t_mv0
            print(f"[WARN] mv refresh failed: public.mv_winner_iname elapsed={mv_elapsed:.1f}s err={e!r}", flush=True)

    # mv_winner_iname 은 public 스키마 matview
    sql = """
        SELECT group_id, iname
          FROM public.mv_winner_iname
         WHERE iname IS NOT NULL 
           AND iname <> ''
         ORDER BY iname, group_id
    """

    processed_inames = 0
    merge_jobs = 0
    skipped = 0
    seen_rows = 0

    # 중요: mv_winner_iname 스트리밍 커서(named cursor)를 사용하는 동안
    # 같은 커넥션에서 commit/rollback을 하면 portal이 무효화되어 반복 중 예외가 발생할 수 있다.
    # 따라서 읽기/쓰기 커넥션을 분리한다.
    conn_read = psycopg2.connect(**DB_INFO)
    conn_write = psycopg2.connect(**DB_INFO)
    try:
        allowed_inames: set[str] | None = None
        if top_inames is not None and top_inames > 0:
            cur0 = conn_read.cursor()
            cur0.execute(
                """
                SELECT iname
                FROM public.mv_winner_iname
                WHERE iname IS NOT NULL AND iname <> ''
                GROUP BY iname
                HAVING COUNT(*) >= %s
                ORDER BY COUNT(*) DESC, iname
                LIMIT %s
                """,
                (min_count, top_inames),
            )
            allowed_inames = {str(r[0]) for r in cur0.fetchall() if r and r[0]}
            cur0.close()
            print(f"[INFO] top_inames enabled: N={top_inames} selected={len(allowed_inames)}", flush=True)

        read_cur = conn_read.cursor(name="mv_iname_cursor", cursor_factory=DictCursor)
        read_cur.itersize = 5000
        read_cur.execute(sql)

        current_iname: str | None = None
        current_ids: list[int] = []
        stopped_early = False

        def flush_cluster(iname: str, gids: list[int]) -> None:
            nonlocal processed_inames, merge_jobs, skipped
            if not iname:
                return
            if allowed_inames is not None and iname not in allowed_inames:
                return
            if len(gids) < min_count:
                return
            if min_iname_len and len(str(iname)) < min_iname_len:
                return
            if len(gids) > max_cluster_size:
                skipped += 1
                print(f"[SKIP] cluster too large: iname={iname!r} size={len(gids)} > max={max_cluster_size}", flush=True)
                return

            processed_inames += 1
            metas = fetch_group_meta(conn_write, gids)

            def _key(g: int):
                m = metas.get(g) or GroupMeta(g, None, None, None, None, None, None, None)
                wp = m.winner_price if m.winner_price is not None else 10**18
                vg = m.winner_vender_grade if m.winner_vender_grade is not None else 10**9
                ca = m.created_at or datetime.max
                return (wp, vg, ca, g)

            comp = sorted({int(x) for x in gids})
            target = sorted(comp, key=_key)[0]
            sources_all = [g for g in comp if g != target]

            kept: list[int] = []
            dropped_sim = 0
            t_iname = metas.get(target).winner_iname if metas.get(target) else None
            for s in sources_all:
                s_iname = metas.get(s).winner_iname if metas.get(s) else None
                sim = _iname_similarity(t_iname, s_iname)
                if sim < thr_sim:
                    dropped_sim += 1
                    continue
                kept.append(s)
            if dropped_sim:
                print(f"[INFO] iname filter: iname={iname!r} target={target} dropped={dropped_sim} kept={len(kept)} thr={thr_sim}", flush=True)
            sources = kept
            if not sources:
                return

            url_thr = float(args.skip_if_url_distinct_ratio or 0.0)
            min_sz = int(args.skip_if_url_min_size or 0)
            if url_thr > 0 and len(comp) >= max(1, min_sz):
                counts: dict[str, int] = {}
                for gid in comp:
                    u = metas.get(gid).winner_img_url if metas.get(gid) else None
                    nu = _norm_img_url(u)
                    key = nu if nu else "(null)"
                    counts[key] = counts.get(key, 0) + 1
                ratio = (len(counts) / float(len(comp))) if comp else 1.0
                if (not dry_run) and ratio > url_thr:
                    skipped += 1
                    print(
                        f"[SKIP] url_distinct_ratio too high: {ratio:.3f} (distinct={len(counts)} size={len(comp)} thr={url_thr}) "
                        f"iname={iname!r} target={target} sources={len(sources)}",
                        flush=True,
                    )
                    return

            if require_hash:
                t_ph = metas.get(target).phash_hex if metas.get(target) else None
                t_dh = metas.get(target).dhash_hex if metas.get(target) else None
                kept2: list[int] = []
                dropped_hash = 0
                for s in sources:
                    s_ph = metas.get(s).phash_hex if metas.get(s) else None
                    s_dh = metas.get(s).dhash_hex if metas.get(s) else None
                    if _hamming64_hex(t_ph, s_ph) <= ph_thr and _hamming64_hex(t_dh, s_dh) <= dh_thr:
                        kept2.append(s)
                    else:
                        dropped_hash += 1
                if dropped_hash:
                    print(
                        f"[INFO] hash filter: iname={iname!r} target={target} dropped={dropped_hash} kept={len(kept2)} ph_thr={ph_thr} dh_thr={dh_thr}",
                        flush=True,
                    )
                sources = kept2
                if not sources:
                    return

            # (옵션) content(이미지세트) 매칭 요구: target과 유사한 source만 남김
            if require_content:
                if imagehash is None or Image is None:
                    raise RuntimeError("--require-content-imgset 사용에는 imagehash/PIL 설치가 필요합니다.")
                t_ct = metas.get(target).winner_content if metas.get(target) else None
                kept3: list[int] = []
                dropped_ct = 0
                for s in sources:
                    s_ct = metas.get(s).winner_content if metas.get(s) else None
                    st = _content_imgset_match_stats(
                        t_ct,
                        s_ct,
                        max_imgs=max(1, c_max_imgs),
                        ph_thr=c_ph_thr,
                        dh_thr=c_dh_thr,
                    )
                    if st.get("err"):
                        dropped_ct += 1
                        continue
                    match = int(st.get("match") or 0)
                    ratio = float(st.get("ratio") or 0.0)
                    if match >= c_min_matches or ratio >= c_min_ratio:
                        kept3.append(s)
                    else:
                        dropped_ct += 1
                if dropped_ct:
                    print(
                        f"[INFO] content-imgset filter: iname={iname!r} target={target} "
                        f"dropped={dropped_ct} kept={len(kept3)} "
                        f"(matches>={c_min_matches} or ratio>={c_min_ratio:.2f}) "
                        f"(ph_thr={c_ph_thr} dh_thr={c_dh_thr} max_imgs={c_max_imgs})",
                        flush=True,
                    )
                sources = kept3
                if not sources:
                    return

            SAMPLE_N = 30
            src_sample = sources[:SAMPLE_N]
            src_more = max(0, len(sources) - len(src_sample))
            if dry_run:
                print(f"[DRY] iname={iname!r} target={target} <- sources_count={len(sources)} sample={src_sample}" + (f" (+{src_more} more)" if src_more else ""))
            else:
                print(f"[APPLY] iname={iname!r} target={target} <- sources_count={len(sources)} sample={src_sample}" + (f" (+{src_more} more)" if src_more else ""))

            merge_cluster(conn_write, target=target, sources=sources, dry_run=dry_run)
            merge_jobs += 1

            if args.apply_limit is not None and (not dry_run) and merge_jobs >= int(args.apply_limit):
                raise StopIteration

            if args.progress_every and (processed_inames % int(args.progress_every) == 0):
                elapsed = time.perf_counter() - t0
                rate = (processed_inames / elapsed) if elapsed > 0 else 0.0
                print(
                    f"[INFO] progress: inames={processed_inames} merge_jobs={merge_jobs} skipped={skipped} "
                    f"seen_rows={seen_rows} elapsed={_fmt_hms(elapsed)} rate={rate:.2f}inames/s",
                    flush=True,
                )

            if max_inames is not None and processed_inames >= max_inames:
                raise StopIteration

        try:
            for r in read_cur:
                seen_rows += 1
                gid = int(r["group_id"])
                iname = str(r["iname"] or "")
                if current_iname is None:
                    current_iname = iname
                    current_ids = [gid]
                    continue
                if iname == current_iname:
                    current_ids.append(gid)
                    continue
                flush_cluster(current_iname, current_ids)
                current_iname = iname
                current_ids = [gid]
        except StopIteration:
            stopped_early = True
        finally:
            # 조기 중단 시 마지막 클러스터를 중복 flush하지 않도록 방지
            if (not stopped_early) and current_iname is not None and current_ids:
                try:
                    flush_cluster(current_iname, current_ids)
                except StopIteration:
                    pass

        read_cur.close()

        elapsed = time.perf_counter() - t0
        print(
            f"[INFO] done dry_run={dry_run} inames={processed_inames} merge_jobs={merge_jobs} skipped={skipped} "
            f"seen_rows={seen_rows} elapsed={_fmt_hms(elapsed)}",
            flush=True,
        )
    finally:
        try:
            conn_read.close()
        except Exception:
            pass
        try:
            conn_write.close()
        except Exception:
            pass


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("[INFO] interrupted by user (Ctrl+C).", flush=True)

