#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
그룹 자동 병합 배치 (이미지 기준)

목표
- 서로 다른 group_id인데도 이미지가 사실상 동일(추천)인 케이스를 찾아 병합
- UI의 "그룹 병합" 로직과 동일하게:
  - shopprod_group_map.group_id 를 TARGET으로 이동
  - shopprod_sub_group.group_id 를 TARGET으로 이동
  - SOURCE 그룹이 비면 shopprod_group에서 삭제 시도(실패해도 무시)

후보 찾기 전략(안전/성능)
- 기본: phash_bands(LSH 밴딩)로 후보를 좁힌 뒤, phash/dhash 해밍거리(threshold)로 "동일(추천)" 판정
- 옵션: 동일 img_url(정규화) 기준으로 후보를 만들고, 필요시 거리 필터 적용

주의
- 대량 병합은 운영에 영향이 큼. 기본값은 --dry-run (미적용) 모드.
"""

from __future__ import annotations

import argparse
import asyncio
import difflib
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import AsyncIterator, Iterable

import asyncpg

from db_config import DB_INFO_ASYNCPG as DB_INFO

def _fmt_hms(seconds: float) -> str:
    try:
        s = int(seconds)
    except Exception:
        s = 0
    if s < 0:
        s = 0
    h = s // 3600
    m = (s % 3600) // 60
    ss = s % 60
    return f"{h:02d}:{m:02d}:{ss:02d}"

def _default_log_file_name() -> str:
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"Batch_MergeGroups_ByImage_{ts}.log"


def _spawn_background(argv: list[str], log_file: str) -> subprocess.Popen:
    """
    Ubuntu/Linux: setsid(start_new_session=True)로 세션을 분리해서 백그라운드 실행.
    stdout/stderr는 log_file로 리다이렉트.
    """
    log_path = os.path.abspath(log_file)
    os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)
    f = open(log_path, "a", buffering=1, encoding="utf-8")
    return subprocess.Popen(
        argv,
        stdout=f,
        stderr=subprocess.STDOUT,
        close_fds=True,
        start_new_session=True,
    )


def _hamming64_hex(a: str, b: str) -> int:
    if not a or not b:
        return 10**9
    try:
        x = int(a, 16) ^ int(b, 16)
        return x.bit_count()
    except Exception:
        return 10**9
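
# Example (illustrative): only the last nibble differs, i.e. 4 bits:
#   >>> _hamming64_hex("ffffffffffffffff", "fffffffffffffff0")
#   4
# A missing/invalid hash returns the 10**9 sentinel, i.e. "infinitely far".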


def _norm_img_url(u: str) -> str:
    """
    img_url 정규화:
    - 좌우 공백 제거
    - query string 제거(= 같은 리소스라도 hash 파라미터만 다른 케이스 완화)
    - 소문자화(스킴/호스트)
    """
    if not u:
        return ""
    u = str(u).strip()
    if not u:
        return ""
    u = u.split("#", 1)[0]
    u = u.split("?", 1)[0]
    # lowercase only the scheme/host (kept simple)
    m = re.match(r"^(https?://)([^/]+)(/.*)?$", u, flags=re.IGNORECASE)
    if m:
        scheme = m.group(1).lower()
        host = (m.group(2) or "").lower()
        rest = m.group(3) or ""
        return f"{scheme}{host}{rest}"
    return u
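
# Example (illustrative): the path case is preserved; only scheme/host are lowercased:
#   >>> _norm_img_url("  HTTPS://Shop.Example.COM/Img/a.jpg?v=3#frag ")
#   'https://shop.example.com/Img/a.jpg'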


def _parse_dt(s: str | None) -> datetime | None:
    """
    CLI 입력 문자열을 datetime으로 변환(asyncpg timestamp 파라미터용).
    허용 형식:
    - YYYY-MM-DD
    - YYYY-MM-DD HH:MM:SS
    """
    if not s:
        return None
    s = str(s).strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(s, fmt)
        except Exception:
            pass
    raise ValueError(f"invalid datetime format: {s!r} (expected YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)")
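
# Example (illustrative):
#   >>> _parse_dt("2026-01-18")
#   datetime.datetime(2026, 1, 18, 0, 0)
#   >>> _parse_dt("  ") is None
#   True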


def _norm_iname(s: str | None) -> str:
    if not s:
        return ""
    s = str(s).strip().lower()
    if not s:
        return ""
    s = re.sub(r"\s+", " ", s)
    return s


def _iname_similarity(a: str | None, b: str | None) -> float:
    """
    0~100 (%). 외부 의존성 없이 간단하게 문자열 유사도만 사용.
    - 한글/공백 포함 문자열에 대해 대략적인 오탐 제거용
    """
    a2 = _norm_iname(a)
    b2 = _norm_iname(b)
    if not a2 or not b2:
        return 0.0
    return difflib.SequenceMatcher(None, a2, b2).ratio() * 100.0
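
# Example (illustrative): case and repeated whitespace are normalized away first:
#   >>> _iname_similarity("Apple iPhone 15", "apple  IPHONE 15")
#   100.0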


def _is_low_entropy_hex64(h: str, *, max_unique_nibbles: int = 4) -> bool:
    """
    Crude check for "low-information" hashes, based on 64-bit hex (16 chars).
    - Catches the dhash/phash values typical of solid-color/pattern/placeholder images (e.g. 4d4d4d..., 0c0c0c...)
    - Not an exact classifier; used as a guard against mass mis-merges.
    """
    if not h:
        return True
    h = str(h).strip().lower()
    if len(h) != 16:
        # not the expected length; leave it untouched here
        return False
    # verify it is pure hex (guards against stray underscores and the like)
    if not re.fullmatch(r"[0-9a-f]{16}", h):
        return False
    return len(set(h)) <= int(max_unique_nibbles)
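
# Example (illustrative): "4d4d..." has only 2 distinct nibbles, so it is flagged:
#   >>> _is_low_entropy_hex64("4d4d4d4d4d4d4d4d")
#   True
#   >>> _is_low_entropy_hex64("8f3a91c2e5d7b046")  # 16 distinct nibbles
#   False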


@dataclass
class GroupRow:
    group_id: int
    created_at: datetime | None
    phash_hex: str
    dhash_hex: str
    phash_bands: list[int]
    winner_updated_at: datetime | None
    winner_price: int | None


async def fetch_seed_groups(
    conn: asyncpg.Connection,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
) -> list[GroupRow]:
    # limit the seed range by winner_updated_at
    where = ["phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    sql = f"""
        SELECT group_id, created_at, phash_hex, dhash_hex, phash_bands, winner_updated_at, winner_price
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
    """
    rows = await conn.fetch(sql, *params) if params else await conn.fetch(sql)
    out: list[GroupRow] = []
    for r in rows:
        out.append(
            GroupRow(
                group_id=int(r["group_id"]),
                created_at=r["created_at"],
                phash_hex=str(r["phash_hex"] or ""),
                dhash_hex=str(r["dhash_hex"] or ""),
                phash_bands=list(r["phash_bands"] or []),
                winner_updated_at=r["winner_updated_at"],
                winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
            )
        )
    return out


async def iter_seed_groups(
    conn: asyncpg.Connection,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
    *,
    chunk_size: int = 5000,
    max_seeds: int | None = None,
) -> AsyncIterator[GroupRow]:
    """
    seeds를 전체 메모리에 올리지 않고 서버 커서로 스트리밍.
    """
    where = ["phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    sql = f"""
        SELECT group_id, created_at, phash_hex, dhash_hex, phash_bands, winner_updated_at, winner_price
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
    """
    # asyncpg server-side cursors can only be created/iterated inside a transaction
    async with conn.transaction():
        n = 0
        cur = conn.cursor(sql, *params, prefetch=chunk_size)
        try:
            async for r in cur:
                yield GroupRow(
                    group_id=int(r["group_id"]),
                    created_at=r["created_at"],
                    phash_hex=str(r["phash_hex"] or ""),
                    dhash_hex=str(r["dhash_hex"] or ""),
                    phash_bands=list(r["phash_bands"] or []),
                    winner_updated_at=r["winner_updated_at"],
                    winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
                )
                n += 1
                if max_seeds is not None and n >= int(max_seeds):
                    break
        finally:
            # if the asyncpg cursor exposes close(), call it explicitly to avoid delaying the connection's return
            try:
                close_meth = getattr(cur, "close", None)
                if close_meth:
                    res = close_meth()
                    if asyncio.iscoroutine(res):
                        await res
            except Exception:
                pass
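
# Usage sketch (illustrative; assumes a pool created from DB_INFO as in main() below):
#   async with pool.acquire() as conn:
#       async for row in iter_seed_groups(conn, None, None, None, max_seeds=10):
#           print(row.group_id, row.phash_hex)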


async def fetch_seed_count(
    conn: asyncpg.Connection,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
) -> int:
    where = ["phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    sql = f"""
        SELECT COUNT(*) AS cnt
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
    """
    row = await conn.fetchrow(sql, *params) if params else await conn.fetchrow(sql)
    return int(row["cnt"] or 0) if row else 0


async def fetch_common_hashes(
    conn: asyncpg.Connection,
    *,
    kind: str,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
    min_count: int,
    limit: int,
) -> list[tuple[str, int]]:
    """
    seed 범위(필터 적용) 내에서 자주 등장하는 phash_hex/dhash_hex를 찾아서
    '기본/공통 이미지'에 가까운 해시를 배치에서 차단할 수 있게 한다.
    """
    kind = str(kind).strip().lower()
    if kind not in ("phash", "dhash"):
        raise ValueError(f"invalid kind: {kind!r}")
    col = "phash_hex" if kind == "phash" else "dhash_hex"

    where = [f"{col} IS NOT NULL", "phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    params.append(int(min_count))
    params.append(int(limit))

    sql = f"""
        SELECT {col} AS h, COUNT(*)::int AS cnt
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
        GROUP BY {col}
        HAVING COUNT(*) >= ${len(params)-1}::int
        ORDER BY cnt DESC
        LIMIT ${len(params)}::int
    """
    rows = await conn.fetch(sql, *params)
    return [(str(r["h"]), int(r["cnt"])) for r in rows if r["h"]]


async def fetch_hot_bands(
    conn: asyncpg.Connection,
    *,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
    min_count: int,
    limit: int,
) -> list[tuple[int, int]]:
    """
    seed 범위(필터 적용) 내에서 자주 등장하는 phash_bands 요소를 찾아,
    'stopword'처럼 후보 검색에서 제외할 수 있게 한다.

    예) 특정 밴드값이 너무 흔하면(phash_bands && ...) 후보가 폭주하고,
        전이(DSU)로 초대형 클러스터가 생길 수 있음.
    """
    where = ["phash_bands IS NOT NULL", "phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    params.append(int(min_count))
    params.append(int(limit))

    sql = f"""
        SELECT b AS band, COUNT(*)::int AS cnt
        FROM mlinkdw.shopprod_group2 g
        CROSS JOIN LATERAL unnest(g.phash_bands) AS b
        WHERE {' AND '.join(where)}
        GROUP BY b
        HAVING COUNT(*) >= ${len(params)-1}::int
        ORDER BY cnt DESC
        LIMIT ${len(params)}::int
    """
    rows = await conn.fetch(sql, *params)
    return [(int(r["band"]), int(r["cnt"])) for r in rows if r["band"] is not None]


async def fetch_candidates_by_bands(
    conn: asyncpg.Connection, *, group_id: int, bands: list[int], limit: int
) -> list[tuple[int, str, str]]:
    if not bands:
        return []
    # with a GIN index on phash_bands, the && overlap condition prunes candidates quickly
    rows = await conn.fetch(
        """
        SELECT group_id, phash_hex, dhash_hex
        FROM mlinkdw.shopprod_group2
        WHERE group_id <> $1
          AND phash_hex IS NOT NULL AND dhash_hex IS NOT NULL
          AND phash_bands && $2::int4[]
        LIMIT $3
        """,
        group_id,
        bands,
        limit,
    )
    return [(int(r["group_id"]), str(r["phash_hex"]), str(r["dhash_hex"])) for r in rows]


class DSU:
    def __init__(self):
        self.p: dict[int, int] = {}
        self.sz: dict[int, int] = {}

    def find(self, x: int) -> int:
        if x not in self.p:
            self.p[x] = x
            self.sz[x] = 1
            return x
        while self.p[x] != x:
            self.p[x] = self.p[self.p[x]]
            x = self.p[x]
        return x

    def union(self, a: int, b: int) -> None:
        ra, rb = self.find(a), self.find(b)
        if ra == rb:
            return
        if self.sz[ra] < self.sz[rb]:
            ra, rb = rb, ra
        self.p[rb] = ra
        self.sz[ra] = self.sz.get(ra, 1) + self.sz.get(rb, 1)

    def groups(self) -> dict[int, list[int]]:
        out: dict[int, list[int]] = {}
        for x in list(self.p.keys()):
            r = self.find(x)
            out.setdefault(r, []).append(x)
        return out
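
# Example (illustrative): unions are transitive, so 1-2 and 2-3 collapse into one component:
#   >>> d = DSU()
#   >>> d.union(1, 2); d.union(2, 3)
#   >>> sorted(d.groups()[d.find(1)])
#   [1, 2, 3]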


@dataclass
class GroupMeta:
    group_id: int
    created_at: datetime | None
    winner_price: int | None
    winner_vender_grade: int | None
    winner_img_url: str | None
    winner_iname: str | None


async def fetch_group_meta(conn: asyncpg.Connection, group_ids: Iterable[int]) -> dict[int, GroupMeta]:
    """
    클러스터별 TARGET 선택을 위한 메타:
    - winner_price(가장 낮은 가격)
    - winner_vender_grade(가장 낮은 vender_grade) : shopprod_group_map에서 winner_icode 행 기준
    - created_at(가장 오래된 그룹)
    """
    gids_all = list({int(x) for x in group_ids})
    if not gids_all:
        return {}

    m: dict[int, GroupMeta] = {}
    CHUNK = 50000  # keep parameter/packet sizes bounded
    for off in range(0, len(gids_all), CHUNK):
        gids = gids_all[off : off + CHUNK]
        rows = await conn.fetch(
            """
            SELECT
              g.group_id,
              g.created_at,
              g.winner_price,
              CASE
                WHEN gm.vender_grade ~ '^[0-9]+$' THEN gm.vender_grade::int
                ELSE NULL
              END AS winner_vender_grade
              , gm.img_url AS winner_img_url
              , gm.iname   AS winner_iname
            FROM mlinkdw.shopprod_group2 g
            LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.icode = g.winner_icode AND gm.group_id = g.group_id
            WHERE g.group_id = ANY($1::int[])
            """,
            gids,
        )
        for r in rows:
            m[int(r["group_id"])] = GroupMeta(
                group_id=int(r["group_id"]),
                created_at=r["created_at"],
                winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
                winner_vender_grade=int(r["winner_vender_grade"]) if r["winner_vender_grade"] is not None else None,
                winner_img_url=str(r["winner_img_url"]) if r["winner_img_url"] else None,
                winner_iname=str(r["winner_iname"]) if r["winner_iname"] else None,
            )

    # fill in placeholders for groups with no row
    for g in gids_all:
        m.setdefault(
            g,
            GroupMeta(
                group_id=g,
                created_at=None,
                winner_price=None,
                winner_vender_grade=None,
                winner_img_url=None,
                winner_iname=None,
            ),
        )
    return m


async def merge_cluster(
    conn: asyncpg.Connection,
    *,
    target: int,
    sources: list[int],
    dry_run: bool,
) -> None:
    if not sources:
        return
    if dry_run:
        return

    # Perform a merge semantically equivalent to the UI logic as batched UPDATEs (better speed/lock time at volume)
    async with conn.transaction():
        srcs = [int(x) for x in sources]
        # 0) content_token cleanup policy:
        # - the structure keeps only the winner_icode's (single winner product's) token, so
        #   delete the source tokens, and drop non-winner tokens from the target as well.
        try:
            await conn.execute(
                "DELETE FROM mlinkdw.shopprod_group_content_token WHERE group_id = ANY($1::int[])",
                srcs,
            )
            await conn.execute(
                """
                DELETE FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = $1
                  AND EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group2 g
                    WHERE g.group_id = t.group_id
                      AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                           OR t.icode IS DISTINCT FROM g.winner_icode)
                  )
                """,
                int(target),
            )
        except Exception:
            pass

        await conn.execute(
            """
            UPDATE mlinkdw.shopprod_group_map2
            SET group_id = $1
            WHERE group_id = ANY($2::int[])
            """,
            target,
            srcs,
        )
        await conn.execute(
            """
            UPDATE mlinkdw.shopprod_sub_group2
            SET group_id = $1
            WHERE group_id = ANY($2::int[])
            """,
            target,
            srcs,
        )
        # Delete SOURCE groups only if completely empty (keep them if rows remain). Ignore failures.
        try:
            await conn.execute(
                """
                DELETE FROM mlinkdw.shopprod_group2 g
                WHERE g.group_id = ANY($1::int[])
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_map2 gm WHERE gm.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_sub_group2 sg WHERE sg.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_content_token ct WHERE ct.group_id = g.group_id)
                """,
                srcs,
            )
        except Exception:
            pass


async def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--since", default="2026-01-01 00:00:00", help="seed 그룹 필터: winner_updated_at >= since (예: 2026-01-18 00:00:00)")
    ap.add_argument("--before", default="2026-01-16 23:59:59", help="seed 그룹 필터: winner_updated_at <= before (예: 2026-01-18 23:59:59)")
    ap.add_argument("--yesterday", action="store_true", help="--since 를 어제로 자동 설정 (00:00:00)")
    ap.add_argument(
        "--winner-icode",
        action="append",
        default=None,
        help="테스트용 seed 제한: winner_icode (여러 번 지정 가능). 예) --winner-icode WAC4CBF --winner-icode 60051975",
    )
    ap.add_argument(
        "--winner-icodes",
        default=None,
        help="테스트용 seed 제한: winner_icode 콤마 구분. 예) --winner-icodes WAC4CBF,60051975",
    )
    ap.add_argument("--ph-threshold", type=int, default=12)
    ap.add_argument("--dh-threshold", type=int, default=14)
    ap.add_argument("--candidate-limit", type=int, default=5000, help="seed 1개당 band 후보 상한")
    ap.add_argument(
        "--pairwise-seeds",
        action="store_true",
        default=False,
        help="LSH(bands) overlap으로 후보를 못 찾는 케이스 보완: seed들끼리 직접 해밍거리 비교(O(n^2)). "
             "테스트/소량 seed에서만 권장.",
    )
    ap.add_argument("--dry-run", action="store_true", default=True, help="기본: 적용 안함(로그만)")
    ap.add_argument("--apply", action="store_true", help="실제 DB 병합 적용")
    ap.add_argument("--workers", type=int, default=8, help="seed 후보 조회 병렬 워커 수(DB 부하에 맞게 조절)")
    ap.add_argument("--queue-size", type=int, default=2000, help="producer/consumer 큐 크기")
    ap.add_argument("--seed-chunk", type=int, default=5000, help="서버 커서 prefetch 크기")
    ap.add_argument("--max-seeds", type=int, default=None, help="테스트용 seed 상한(대량 실행시 미사용)")
    ap.add_argument("--no-count", action="store_true", help="seed count 미리 조회 생략(빠르게 시작)")
    ap.add_argument("--apply-limit", type=int, default=None, help="apply 모드에서 병합 작업을 N건까지만 수행(점진 적용용)")
    ap.add_argument("--report-top-clusters", type=int, default=10, help="컴포넌트(클러스터) 크기 상위 N개 리포트(기본 10)")
    ap.add_argument(
        "--background",
        action="store_true",
        help="Ubuntu/Linux: 백그라운드(detach)로 실행하고 즉시 종료. 로그는 --log-file로 저장 권장.",
    )
    ap.add_argument(
        "--log-file",
        default=None,
        help="--background 사용 시 stdout/stderr를 저장할 로그 파일 경로. 미지정이면 자동 생성.",
    )
    ap.add_argument("--_background-child", action="store_true", help=argparse.SUPPRESS)
    ap.add_argument(
        "--max-cluster-size",
        type=int,
        default=200,
        help="안전장치: 1개 컴포넌트(클러스터) 최대 크기. apply 모드에서 이 값을 초과하면 스킵(기본 200).",
    )
    ap.add_argument(
        "--allow-large-cluster",
        action="store_true",
        help="--max-cluster-size 초과 클러스터도 적용 허용(운영 주의).",
    )
    ap.add_argument(
        "--skip-common-phash",
        type=int,
        default=500,
        help="안전장치: seed 범위에서 phash_hex가 N회 이상 등장하면 '공통 이미지'로 보고 비교/병합에서 제외 (기본 500). 0이면 비활성",
    )
    ap.add_argument(
        "--skip-common-dhash",
        type=int,
        default=500,
        help="안전장치: seed 범위에서 dhash_hex가 N회 이상 등장하면 '공통 이미지'로 보고 비교/병합에서 제외 (기본 500). 0이면 비활성",
    )
    ap.add_argument(
        "--common-hash-top",
        type=int,
        default=20,
        help="skip-common-* 산정 시 상위 N개만 차단/출력 (기본 20)",
    )
    ap.add_argument(
        "--skip-low-entropy",
        action="store_true",
        default=True,
        help="안전장치: 단색/패턴/기본이미지 계열로 추정되는 low-entropy 해시(예: 4d4d..., 0c0c...)는 비교/병합에서 제외",
    )
    ap.add_argument(
        "--no-skip-low-entropy",
        action="store_true",
        help="--skip-low-entropy 비활성화",
    )
    ap.add_argument(
        "--low-entropy-max-unique",
        type=int,
        default=4,
        help="low-entropy 판정 기준: 16자리 hex에서 서로 다른 nibble 종류 수가 N 이하이면 제외(기본 4)",
    )
    ap.add_argument(
        "--progress-every",
        type=int,
        default=5000,
        help="진행 로그 출력 주기(기본 5000). producer 진행(스킵 포함)과 worker scanned를 함께 로깅",
    )
    ap.add_argument(
        "--skip-hot-bands",
        type=int,
        default=500,
        help="안전장치: seed 범위에서 특정 phash_bands 요소가 N회 이상 등장하면 후보검색(bands && ...)에서 제외(기본 500). 0이면 비활성",
    )
    ap.add_argument(
        "--hot-bands-top",
        type=int,
        default=50,
        help="skip-hot-bands 산정 시 상위 N개만 제외/출력 (기본 50)",
    )
    ap.add_argument(
        "--report-url-stats",
        action="store_true",
        default=True,
        help="DRY/APPLY 로그에 클러스터 내 winner img_url(정규화) 분포 통계를 함께 출력",
    )
    ap.add_argument(
        "--no-report-url-stats",
        action="store_true",
        help="--report-url-stats 비활성화",
    )
    ap.add_argument(
        "--url-stats-top",
        type=int,
        default=5,
        help="클러스터 url 통계 출력 시 상위 N개만 표시(기본 5)",
    )
    ap.add_argument(
        "--skip-if-url-distinct-ratio",
        type=float,
        default=0.8,
        help=(
            "안전장치(apply): 클러스터 내 winner img_url(정규화) distinct 비율이 임계값을 초과하면 병합 스킵. "
            "예) size=100, distinct=95 => 0.95. 기본 0.8. 0 이하면 비활성"
        ),
    )
    ap.add_argument(
        "--skip-if-url-min-size",
        type=int,
        default=10,
        help="--skip-if-url-distinct-ratio 적용 최소 클러스터 크기(기본 10). 너무 작은 클러스터는 예외로 둠.",
    )
    ap.add_argument(
        "--iname-min-sim",
        type=float,
        default=0.0,
        help="병합 필터: TARGET winner_iname 과 SOURCE winner_iname 유사도(%)가 이 값 미만이면 병합 제외. 예) 30",
    )
    args = ap.parse_args()

    # Detached background execution: the parent exits immediately; the child process does the real work
    if args.background and not args._background_child:
        log_file = str(args.log_file or _default_log_file_name())
        # drop --background and add the child flag
        child_args: list[str] = []
        skip_next = False
        for a in sys.argv[1:]:
            if skip_next:
                skip_next = False
                continue
            if a == "--background":
                continue
            if a == "--log-file":
                skip_next = True
                continue
            child_args.append(a)
        child_args.extend(["--_background-child", "--log-file", log_file])

        cmd = [sys.executable, os.path.abspath(sys.argv[0]), *child_args]
        p = _spawn_background(cmd, log_file=log_file)
        print(f"[INFO] background started: pid={p.pid} log_file={log_file}", flush=True)
        return

    t0 = time.perf_counter()
    argv_txt = " ".join(sys.argv)

    since_s: str | None = args.since
    before_s: str | None = args.before
    if args.yesterday:
        since_s = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")

    try:
        since = _parse_dt(since_s)
        before = _parse_dt(before_s)
    except Exception as e:
        raise SystemExit(f"[ARG] failed to parse since/before: {e}")

    winner_icodes: list[str] | None = None
    if args.winner_icodes:
        winner_icodes = [s.strip() for s in str(args.winner_icodes).split(",") if s.strip()]
    if args.winner_icode:
        winner_icodes = (winner_icodes or []) + [str(s).strip() for s in args.winner_icode if str(s).strip()]
    if winner_icodes:
        # de-duplicate, preserving order
        seen = set()
        tmp = []
        for s in winner_icodes:
            if s in seen:
                continue
            seen.add(s)
            tmp.append(s)
        winner_icodes = tmp

    # In test mode, when seeds are tightly restricted by winner_icode, turn on pairwise comparison by default (safe)
    if winner_icodes and not args.pairwise_seeds:
        args.pairwise_seeds = True

    dry_run = True
    if args.apply:
        dry_run = False

    # resolve the skip-low-entropy toggle (argparse enforces no mutual exclusion, so settle it here)
    if args.no_skip_low_entropy:
        args.skip_low_entropy = False
    if args.no_report_url_stats:
        args.report_url_stats = False

    pool = await asyncpg.create_pool(**DB_INFO, min_size=1, max_size=max(2, int(args.workers)))
    try:
        # For bulk runs, printing the count upfront makes progress easier to judge in operations (skip with --no-count)
        total_seeds: int | None = None
        if not args.no_count:
            async with pool.acquire() as c0:
                total_seeds = await fetch_seed_count(c0, since, before, winner_icodes)
        print(f"[INFO] argv={argv_txt}", flush=True)
        print(
            f"[INFO] seeds_count={total_seeds if total_seeds is not None else 'N/A'} "
            f"since={since} before={before} winner_icodes={winner_icodes!r} "
            f"workers={int(args.workers)}"
        )

        # Build the blocked sets of common (default) image hashes
        blocked_phash: set[str] = set()
        blocked_dhash: set[str] = set()
        blocked_bands: set[int] = set()
        if int(args.skip_common_phash) > 0 or int(args.skip_common_dhash) > 0:
            async with pool.acquire() as c_block:
                topn = max(1, int(args.common_hash_top))
                if int(args.skip_common_phash) > 0:
                    common = await fetch_common_hashes(
                        c_block,
                        kind="phash",
                        since=since,
                        before=before,
                        winner_icodes=winner_icodes,
                        min_count=int(args.skip_common_phash),
                        limit=topn,
                    )
                    blocked_phash = {h for h, _ in common}
                    if common:
                        print("[INFO] blocked common phash_hex (top): " + ", ".join([f"{h}({c})" for h, c in common]))
                if int(args.skip_common_dhash) > 0:
                    common = await fetch_common_hashes(
                        c_block,
                        kind="dhash",
                        since=since,
                        before=before,
                        winner_icodes=winner_icodes,
                        min_count=int(args.skip_common_dhash),
                        limit=topn,
                    )
                    blocked_dhash = {h for h, _ in common}
                    if common:
                        print("[INFO] blocked common dhash_hex (top): " + ", ".join([f"{h}({c})" for h, c in common]))

        # Build the blocked set of hot bands (stopwords)
        if int(args.skip_hot_bands) > 0:
            async with pool.acquire() as c_band:
                topn_b = max(1, int(args.hot_bands_top))
                hot = await fetch_hot_bands(
                    c_band,
                    since=since,
                    before=before,
                    winner_icodes=winner_icodes,
                    min_count=int(args.skip_hot_bands),
                    limit=topn_b,
                )
                blocked_bands = {b for b, _ in hot}
                if hot:
                    print("[INFO] blocked hot bands (top): " + ", ".join([f"{b}({c})" for b, c in hot]), flush=True)

        dsu = DSU()
        dsu_lock = asyncio.Lock()
        ph_thr = int(args.ph_threshold)
        dh_thr = int(args.dh_threshold)

        q: asyncio.Queue[GroupRow | None] = asyncio.Queue(maxsize=int(args.queue_size))
        edge_cnt = 0
        edge_lock = asyncio.Lock()
        scanned = 0
        skipped_common = 0
        skipped_low_entropy = 0
        seen_total = 0  # how many seeds the producer has read (including skips)
        progress_every = int(args.progress_every) if args.progress_every else 0

        async def producer():
            nonlocal skipped_common, skipped_low_entropy
            nonlocal seen_total
            async with pool.acquire() as c:
                async for s in iter_seed_groups(
                    c,
                    since,
                    before,
                    winner_icodes,
                    chunk_size=int(args.seed_chunk),
                    max_seeds=args.max_seeds,
                ):
                    seen_total += 1
                    if (blocked_phash and s.phash_hex in blocked_phash) or (blocked_dhash and s.dhash_hex in blocked_dhash):
                        skipped_common += 1
                    elif args.skip_low_entropy and (
                        _is_low_entropy_hex64(s.phash_hex, max_unique_nibbles=int(args.low_entropy_max_unique))
                        or _is_low_entropy_hex64(s.dhash_hex, max_unique_nibbles=int(args.low_entropy_max_unique))
                    ):
                        skipped_low_entropy += 1
                    else:
                        await q.put(s)

                    if progress_every and (seen_total % progress_every == 0):
                        elapsed = time.perf_counter() - t0
                        rate = (seen_total / elapsed) if elapsed > 0 else 0.0
                        if total_seeds:
                            pct = (100.0 * float(seen_total) / float(total_seeds)) if total_seeds else 0.0
                            eta = (float(total_seeds - seen_total) / rate) if (rate > 0 and total_seeds) else 0.0
                            print(
                                f"[INFO] producer seen={seen_total}/{total_seeds} ({pct:.2f}%) "
                                f"queued~{q.qsize()} skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy} "
                                f"elapsed={_fmt_hms(elapsed)} rate={rate:.1f}/s eta={_fmt_hms(eta)}",
                                flush=True,
                            )
                        else:
                            print(
                                f"[INFO] producer seen={seen_total} queued~{q.qsize()} "
                                f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy} "
                                f"elapsed={_fmt_hms(elapsed)} rate={rate:.1f}/s",
                                flush=True,
                            )

            # termination signal: one sentinel per worker
            for _ in range(int(args.workers)):
                await q.put(None)

        async def worker(wid: int):
            nonlocal edge_cnt, scanned
            async with pool.acquire() as c:
                while True:
                    s = await q.get()
                    try:
                        if s is None:
                            return
                        scanned += 1
                        if not s.phash_bands:
                            continue
                        bands = s.phash_bands
                        if blocked_bands:
                            bands = [b for b in bands if b not in blocked_bands]
                        if not bands:
                            continue
                        cands = await fetch_candidates_by_bands(
                            c, group_id=s.group_id, bands=bands, limit=int(args.candidate_limit)
                        )
                        local_edges = 0
                        for gid2, ph2, dh2 in cands:
                            if (blocked_phash and ph2 in blocked_phash) or (blocked_dhash and dh2 in blocked_dhash):
                                continue
                            if args.skip_low_entropy and (
                                _is_low_entropy_hex64(ph2, max_unique_nibbles=int(args.low_entropy_max_unique))
                                or _is_low_entropy_hex64(dh2, max_unique_nibbles=int(args.low_entropy_max_unique))
                            ):
                                continue
                            ph_dist = _hamming64_hex(s.phash_hex, ph2)
                            if ph_dist > ph_thr:
                                continue
                            dh_dist = _hamming64_hex(s.dhash_hex, dh2)
                            if dh_dist > dh_thr:
                                continue
                            async with dsu_lock:
                                dsu.union(s.group_id, gid2)
                            local_edges += 1
                        if local_edges:
                            async with edge_lock:
                                edge_cnt += local_edges

                        if progress_every and (scanned % progress_every == 0):
                            elapsed = time.perf_counter() - t0
                            s_rate = (scanned / elapsed) if elapsed > 0 else 0.0
                            e_rate = (edge_cnt / elapsed) if elapsed > 0 else 0.0
                            if total_seeds:
                                pct = (100.0 * float(scanned) / float(total_seeds)) if total_seeds else 0.0
                                eta = (float(total_seeds - scanned) / s_rate) if (s_rate > 0 and total_seeds) else 0.0
                                print(
                                    f"[INFO] scanned seeds={scanned}/{total_seeds} ({pct:.2f}%) edges={edge_cnt} "
                                    f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}"
                                    f" producer_seen={seen_total} q~{q.qsize()} "
                                    f"elapsed={_fmt_hms(elapsed)} rate={s_rate:.1f}/s edges_rate={e_rate:.1f}/s eta={_fmt_hms(eta)}",
                                    flush=True,
                                )
                            else:
                                print(
                                    f"[INFO] scanned seeds={scanned} edges={edge_cnt} "
                                    f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}"
                                    f" producer_seen={seen_total} q~{q.qsize()} "
                                    f"elapsed={_fmt_hms(elapsed)} rate={s_rate:.1f}/s edges_rate={e_rate:.1f}/s",
                                    flush=True,
                                )
                    finally:
                        q.task_done()

        # 1) Collect candidate edges (producer + N workers)
        prod_task = asyncio.create_task(producer())
        worker_tasks = [asyncio.create_task(worker(i)) for i in range(int(args.workers))]
        try:
            await asyncio.gather(prod_task, *worker_tasks)
        finally:
            # clean up tasks so connections return to the pool even on exception/cancellation
            for t in [prod_task, *worker_tasks]:
                if not t.done():
                    t.cancel()
            await asyncio.gather(prod_task, *worker_tasks, return_exceptions=True)

        # 1-b) Fallback: compare seeds against each other directly (covers cases with zero band overlap)
        if args.pairwise_seeds:
            # pairwise needs the full seed list in memory, so allow it only for test/small runs
            async with pool.acquire() as c1:
                seeds = await fetch_seed_groups(c1, since, before, winner_icodes)
            n = len(seeds)
            # guard against accidentally blowing up O(n^2) at scale
            if n > 5000:
                print(f"[WARN] pairwise-seeds skipped (n={n} > 5000). 좁혀서 실행하세요.")
            else:
                print(f"[INFO] pairwise-seeds enabled (n={n})")
                for i in range(n):
                    a = seeds[i]
                    for j in range(i + 1, n):
                        b = seeds[j]
                        ph_dist = _hamming64_hex(a.phash_hex, b.phash_hex)
                        if ph_dist > ph_thr:
                            continue
                        dh_dist = _hamming64_hex(a.dhash_hex, b.dhash_hex)
                        if dh_dist > dh_thr:
                            continue
                        dsu.union(a.group_id, b.group_id)
                        edge_cnt += 1

        comps = [v for v in dsu.groups().values() if len(v) >= 2]
        print(
            f"[INFO] components>=2 : {len(comps)}  (edges={edge_cnt}) "
            f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}"
        )
        if not comps:
            return

        # Quick report for spotting anomalies at scale (oversized clusters)
        try:
            topn = int(args.report_top_clusters) if args.report_top_clusters is not None else 0
        except Exception:
            topn = 0
        if topn and topn > 0:
            sizes = sorted(((len(c), i) for i, c in enumerate(comps)), reverse=True)[:topn]
            for rank, (sz, idx) in enumerate(sizes, start=1):
                sample = sorted(int(x) for x in comps[idx])[:20]
                more = max(0, sz - len(sample))
                print(f"[INFO] top_cluster#{rank} size={sz} sample={sample}" + (f" (+{more} more)" if more else ""))

        # 2) Pick the target:
        # - the group with the lowest winner_price
        # - then the lowest winner_vender_grade
        # - then the oldest created_at
        # - tie-break: the smallest group_id
        all_ids = [gid for comp in comps for gid in comp]
        async with pool.acquire() as c2:
            metas = await fetch_group_meta(c2, all_ids)

        # 3) Execute the merges (or just print in dry-run)
        merge_jobs = 0
        skipped_large = 0
        merge_t0 = time.perf_counter()
        for comp in comps:
            comp = sorted({int(x) for x in comp})
            def _key(g: int):
                m = metas.get(g) or GroupMeta(group_id=g, created_at=None, winner_price=None, winner_vender_grade=None, winner_img_url=None, winner_iname=None)
                # lower winner_price wins
                wp = m.winner_price if m.winner_price is not None else 10**18
                # lower vender_grade wins (missing goes last)
                vg = m.winner_vender_grade if m.winner_vender_grade is not None else 10**9
                # older created_at wins => ascending datetime
                ca = m.created_at or datetime.max
                return (wp, vg, ca, g)

            target = sorted(comp, key=_key)[0]
            sources = [g for g in comp if g != target]

            # iname similarity filter (guards against false-positive merges)
            iname_thr = float(args.iname_min_sim or 0.0)
            if iname_thr > 0:
                t_iname = metas.get(target).winner_iname if metas.get(target) else None
                kept: list[int] = []
                dropped = 0
                for s in sources:
                    s_iname = metas.get(s).winner_iname if metas.get(s) else None
                    sim = _iname_similarity(t_iname, s_iname)
                    if sim < iname_thr:
                        dropped += 1
                        continue
                    kept.append(s)
                if dropped:
                    print(f"[INFO] iname filter: target={target} dropped={dropped} kept={len(kept)} thr={iname_thr}", flush=True)
                sources = kept
                if not sources:
                    continue

            # (optional) Print the winner img_url distribution within the cluster (helps diagnose mis-merges)
            url_stats_txt = ""
            url_distinct = 0
            url_topk: list[tuple[str, int]] = []
            if args.report_url_stats:
                counts: dict[str, int] = {}
                for gid in comp:
                    mg = metas.get(gid)
                    u = mg.winner_img_url if mg else None
                    nu = _norm_img_url(u) if u else ""
                    key = nu if nu else "(null)"
                    counts[key] = counts.get(key, 0) + 1
                url_distinct = len(counts)
                url_topk = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[: max(1, int(args.url_stats_top))]
                # url_stats_txt = " url_top=" + str(url_topk) + f" url_distinct={url_distinct}"

            # Safety guard: if the URL distinct ratio is too high, skip in apply mode (likely false positives)
            if not dry_run:
                thr = float(args.skip_if_url_distinct_ratio or 0.0)
                min_sz = int(args.skip_if_url_min_size or 0)
                if thr > 0 and len(comp) >= max(1, min_sz):
                    # computed for apply safety even when URL stats are not printed (distinct count is still needed)
                    if url_distinct == 0:
                        counts2: dict[str, int] = {}
                        for gid in comp:
                            mg2 = metas.get(gid)
                            u = mg2.winner_img_url if mg2 else None
                            nu = _norm_img_url(u) if u else ""
                            key = nu if nu else "(null)"
                            counts2[key] = counts2.get(key, 0) + 1
                        url_distinct = len(counts2)
                    ratio = (url_distinct / float(len(comp))) if comp else 1.0
                    if ratio > thr:
                        print(
                            f"[SKIP] url_distinct_ratio too high: {ratio:.3f} (distinct={url_distinct} size={len(comp)} thr={thr}) "
                            f"target={target} sources_count={len(sources)}",
                            flush=True,
                        )
                        skipped_large += 1
                        continue

            # Safety guard: auto-skip abnormally large clusters in apply mode (can be overridden via option)
            if not dry_run:
                max_sz = int(args.max_cluster_size) if args.max_cluster_size is not None else 0
                if max_sz and len(comp) > max_sz and not args.allow_large_cluster:
                    print(f"[SKIP] cluster too large (size={len(comp)} > max={max_sz}) target={target} (sources_count={len(sources)})")
                    skipped_large += 1
                    continue

            # Keep logs from ballooning: print a count plus a sample instead of the full sources list
            SAMPLE_N = 30
            src_sample = sources[:SAMPLE_N]
            src_more = max(0, len(sources) - len(src_sample))
            if dry_run:
                tm = metas.get(target)
                print(
                    f"[DRY] target={target} (winner_price={getattr(tm,'winner_price',None)}, "
                    f"winner_vender_grade={getattr(tm,'winner_vender_grade',None)}, "
                    f"created_at={getattr(tm,'created_at',None)}) <- sources_count={len(sources)} sample={src_sample}"
                    + (f" (+{src_more} more)" if src_more else "")
                    # + url_stats_txt
                )
            else:
                tm = metas.get(target)
                print(
                    f"[APPLY] target={target} (winner_price={getattr(tm,'winner_price',None)}, "
                    f"winner_vender_grade={getattr(tm,'winner_vender_grade',None)}, "
                    f"created_at={getattr(tm,'created_at',None)}) <- sources_count={len(sources)} sample={src_sample}"
                    + (f" (+{src_more} more)" if src_more else "")
                    # + url_stats_txt
                )
            async with pool.acquire() as c3:
                await merge_cluster(c3, target=target, sources=sources, dry_run=dry_run)
            merge_jobs += 1
            if progress_every and (merge_jobs % progress_every == 0):
                melapsed = time.perf_counter() - merge_t0
                mrate = (merge_jobs / melapsed) if melapsed > 0 else 0.0
                print(
                    f"[INFO] merge progress: merge_jobs={merge_jobs} skipped_large={skipped_large} "
                    f"elapsed={_fmt_hms(melapsed)} rate={mrate:.2f}/s",
                    flush=True,
                )

            # Gradual rollout: stop after N merge jobs
            if not dry_run and args.apply_limit is not None and merge_jobs >= int(args.apply_limit):
                print(f"[INFO] apply_limit reached: {merge_jobs}")
                break

        print(f"[INFO] merge_jobs={merge_jobs} skipped_large={skipped_large} dry_run={dry_run}")
    finally:
        # if close() drags on (e.g. unreturned connections), force shutdown with terminate
        try:
            await asyncio.wait_for(pool.close(), timeout=30)
        except Exception:
            try:
                pool.terminate()
            except Exception:
                pass


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # exit cleanly on Ctrl+C instead of dumping a stack trace
        print("[INFO] interrupted by user (Ctrl+C).", flush=True)

