#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
그룹 자동 병합 배치 (이미지 기준)

목표
- 서로 다른 group_id인데도 이미지가 사실상 동일(추천)인 케이스를 찾아 병합
- UI의 "그룹 병합" 로직과 동일하게:
  - shopprod_group_map.group_id 를 TARGET으로 이동
  - shopprod_sub_group.group_id 를 TARGET으로 이동
  - SOURCE 그룹이 비면 shopprod_group에서 삭제 시도(실패해도 무시)

후보 찾기 전략(안전/성능)
- 기본: phash_bands(LSH 밴딩)로 후보를 좁힌 뒤, phash/dhash 해밍거리(threshold)로 "동일(추천)" 판정
- 옵션: 동일 img_url(정규화) 기준으로 후보를 만들고, 필요시 거리 필터 적용

주의
- 대량 병합은 운영에 영향이 큼. 기본값은 --dry-run (미적용) 모드.
"""

from __future__ import annotations

import argparse
import asyncio
import difflib
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import AsyncIterator, Iterable

import asyncpg

from db_config import DB_INFO_ASYNCPG as DB_INFO


def _hamming64_hex(a: str, b: str) -> int:
    if not a or not b:
        return 10**9
    try:
        x = int(a, 16) ^ int(b, 16)
        return x.bit_count()
    except Exception:
        return 10**9


def _norm_img_url(u: str) -> str:
    """
    img_url 정규화:
    - 좌우 공백 제거
    - query string 제거(= 같은 리소스라도 hash 파라미터만 다른 케이스 완화)
    - 소문자화(스킴/호스트)
    """
    if not u:
        return ""
    u = str(u).strip()
    if not u:
        return ""
    u = u.split("#", 1)[0]
    u = u.split("?", 1)[0]
    # scheme/host만 lower 처리(간단화)
    m = re.match(r"^(https?://)([^/]+)(/.*)?$", u, flags=re.IGNORECASE)
    if m:
        scheme = m.group(1).lower()
        host = (m.group(2) or "").lower()
        rest = m.group(3) or ""
        return f"{scheme}{host}{rest}"
    return u


def _parse_dt(s: str | None) -> datetime | None:
    """
    CLI 입력 문자열을 datetime으로 변환(asyncpg timestamp 파라미터용).
    허용 형식:
    - YYYY-MM-DD
    - YYYY-MM-DD HH:MM:SS
    """
    if not s:
        return None
    s = str(s).strip()
    if not s:
        return None
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(s, fmt)
        except Exception:
            pass
    raise ValueError(f"invalid datetime format: {s!r} (expected YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)")


def _norm_iname(s: str | None) -> str:
    if not s:
        return ""
    s = str(s).strip().lower()
    if not s:
        return ""
    s = re.sub(r"\s+", " ", s)
    return s


def _iname_similarity(a: str | None, b: str | None) -> float:
    """
    0~100 (%). 외부 의존성 없이 간단하게 문자열 유사도만 사용.
    - 한글/공백 포함 문자열에 대해 대략적인 오탐 제거용
    """
    a2 = _norm_iname(a)
    b2 = _norm_iname(b)
    if not a2 or not b2:
        return 0.0
    return difflib.SequenceMatcher(None, a2, b2).ratio() * 100.0


def _is_low_entropy_hex64(h: str, *, max_unique_nibbles: int = 4) -> bool:
    """
    64-bit hex(16자) 기준으로 '정보량이 낮은' 해시를 간단히 판정.
    - 단색/패턴/기본이미지에서 흔히 나오는 dhash/phash(예: 4d4d4d..., 0c0c0c...)
    - 정확한 판정이 아니라 '대량 오병합 방지용 안전장치'로 사용
    """
    if not h:
        return True
    h = str(h).strip().lower()
    if len(h) != 16:
        # 예상 길이가 아니면 여기서는 건드리지 않음
        return False
    # hex 문자만 남기고 검사(혹시 모를 언더스코어 등 방지)
    if not re.fullmatch(r"[0-9a-f]{16}", h):
        return False
    return len(set(h)) <= int(max_unique_nibbles)


@dataclass
class GroupRow:
    # One seed row from mlinkdw.shopprod_group2 used for candidate search.
    group_id: int
    created_at: datetime | None
    phash_hex: str          # perceptual hash, hex string ("" when missing in DB)
    dhash_hex: str          # difference hash, hex string ("" when missing in DB)
    phash_bands: list[int]  # LSH band values, matched via `&&` overlap in SQL
    winner_updated_at: datetime | None
    winner_price: int | None


async def fetch_seed_groups(
    conn: asyncpg.Connection,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
) -> list[GroupRow]:
    """
    Load all seed groups matching the filters into memory at once.

    Filters (each optional):
    - since/before: bound winner_updated_at on both ends
    - winner_icodes: restrict to specific winner_icode values (test mode)

    Rows missing either hash are excluded up front.
    """
    # Limit the seed range by winner_updated_at
    where = ["phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    # NOTE: $n placeholder numbers are derived from the current params length,
    # so append-order and numbering must stay in sync.
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    sql = f"""
        SELECT group_id, created_at, phash_hex, dhash_hex, phash_bands, winner_updated_at, winner_price
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
    """
    rows = await conn.fetch(sql, *params) if params else await conn.fetch(sql)
    out: list[GroupRow] = []
    for r in rows:
        out.append(
            GroupRow(
                group_id=int(r["group_id"]),
                created_at=r["created_at"],
                phash_hex=str(r["phash_hex"] or ""),
                dhash_hex=str(r["dhash_hex"] or ""),
                phash_bands=list(r["phash_bands"] or []),
                winner_updated_at=r["winner_updated_at"],
                winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
            )
        )
    return out


async def iter_seed_groups(
    conn: asyncpg.Connection,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
    *,
    chunk_size: int = 5000,
    max_seeds: int | None = None,
) -> AsyncIterator[GroupRow]:
    """
    Stream seed groups via a server-side cursor instead of loading them
    all into memory (same filters as fetch_seed_groups).

    Args:
        chunk_size: cursor prefetch size.
        max_seeds: stop after yielding this many rows (testing aid).
    """
    where = ["phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    # $n placeholder numbers track the current params length — keep in sync.
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    sql = f"""
        SELECT group_id, created_at, phash_hex, dhash_hex, phash_bands, winner_updated_at, winner_price
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
    """
    # asyncpg server-side cursors can only be created/iterated inside a transaction
    async with conn.transaction():
        n = 0
        cur = conn.cursor(sql, *params, prefetch=chunk_size)
        try:
            async for r in cur:
                yield GroupRow(
                    group_id=int(r["group_id"]),
                    created_at=r["created_at"],
                    phash_hex=str(r["phash_hex"] or ""),
                    dhash_hex=str(r["dhash_hex"] or ""),
                    phash_bands=list(r["phash_bands"] or []),
                    winner_updated_at=r["winner_updated_at"],
                    winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
                )
                n += 1
                if max_seeds is not None and n >= int(max_seeds):
                    break
        finally:
            # If the cursor object exposes close(), call it explicitly to
            # reduce the delay in returning the connection (best-effort:
            # any failure here is intentionally ignored).
            try:
                close_meth = getattr(cur, "close", None)
                if close_meth:
                    res = close_meth()
                    if asyncio.iscoroutine(res):
                        await res
            except Exception:
                pass


async def fetch_seed_count(
    conn: asyncpg.Connection,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
) -> int:
    """
    Count seed groups under the same filters as fetch_seed_groups /
    iter_seed_groups (used up front for progress reporting).
    """
    where = ["phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    # $n placeholder numbers track the current params length — keep in sync.
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    sql = f"""
        SELECT COUNT(*) AS cnt
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
    """
    row = await conn.fetchrow(sql, *params) if params else await conn.fetchrow(sql)
    return int(row["cnt"] or 0) if row else 0


async def fetch_common_hashes(
    conn: asyncpg.Connection,
    *,
    kind: str,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
    min_count: int,
    limit: int,
) -> list[tuple[str, int]]:
    """
    Within the (filtered) seed range, find frequently occurring
    phash_hex/dhash_hex values so hashes close to a "default/common image"
    can be excluded from the batch.

    Args:
        kind: "phash" or "dhash" — selects which hash column to aggregate.
        min_count: only report hashes occurring at least this many times.
        limit: return at most this many, most frequent first.

    Returns:
        List of (hex_hash, count) tuples.
    """
    kind = str(kind).strip().lower()
    if kind not in ("phash", "dhash"):
        raise ValueError(f"invalid kind: {kind!r}")
    col = "phash_hex" if kind == "phash" else "dhash_hex"

    where = [f"{col} IS NOT NULL", "phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    # $n placeholder numbers track the current params length — keep in sync.
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    # min_count and limit are always the last two parameters, in this order;
    # the SQL below references them as $len-1 / $len.
    params.append(int(min_count))
    params.append(int(limit))

    sql = f"""
        SELECT {col} AS h, COUNT(*)::int AS cnt
        FROM mlinkdw.shopprod_group2
        WHERE {' AND '.join(where)}
        GROUP BY {col}
        HAVING COUNT(*) >= ${len(params)-1}::int
        ORDER BY cnt DESC
        LIMIT ${len(params)}::int
    """
    rows = await conn.fetch(sql, *params)
    return [(str(r["h"]), int(r["cnt"])) for r in rows if r["h"]]


async def fetch_hot_bands(
    conn: asyncpg.Connection,
    *,
    since: datetime | None,
    before: datetime | None,
    winner_icodes: list[str] | None,
    min_count: int,
    limit: int,
) -> list[tuple[int, int]]:
    """
    Within the (filtered) seed range, find phash_bands elements that occur
    too frequently so they can be excluded from candidate search like
    stopwords.

    e.g. if a particular band value is too common, the `phash_bands && ...`
    candidate lookup explodes, and transitive DSU unions can produce giant
    clusters.

    Returns:
        List of (band_value, count) tuples, most frequent first.
    """
    where = ["phash_bands IS NOT NULL", "phash_hex IS NOT NULL", "dhash_hex IS NOT NULL"]
    params: list[object] = []
    # $n placeholder numbers track the current params length — keep in sync.
    if since:
        params.append(since)
        where.append(f"winner_updated_at >= ${len(params)}::timestamp")
    if before:
        params.append(before)
        where.append(f"winner_updated_at <= ${len(params)}::timestamp")
    if winner_icodes:
        params.append(winner_icodes)
        where.append(f"winner_icode = ANY(${len(params)}::text[])")

    # min_count and limit are always the last two parameters, in this order.
    params.append(int(min_count))
    params.append(int(limit))

    sql = f"""
        SELECT b AS band, COUNT(*)::int AS cnt
        FROM mlinkdw.shopprod_group2 g
        CROSS JOIN LATERAL unnest(g.phash_bands) AS b
        WHERE {' AND '.join(where)}
        GROUP BY b
        HAVING COUNT(*) >= ${len(params)-1}::int
        ORDER BY cnt DESC
        LIMIT ${len(params)}::int
    """
    rows = await conn.fetch(sql, *params)
    return [(int(r["band"]), int(r["cnt"])) for r in rows if r["band"] is not None]


async def fetch_candidates_by_bands(
    conn: asyncpg.Connection, *, group_id: int, bands: list[int], limit: int
) -> list[tuple[int, str, str]]:
    """
    Fetch merge candidates sharing at least one LSH band with the seed.

    Returns (group_id, phash_hex, dhash_hex) tuples, excluding the seed
    group itself and rows missing either hash. Empty bands -> no candidates.
    """
    if not bands:
        return []
    # With a GIN index on phash_bands, the && overlap condition narrows
    # candidates quickly
    rows = await conn.fetch(
        """
        SELECT group_id, phash_hex, dhash_hex
        FROM mlinkdw.shopprod_group2
        WHERE group_id <> $1
          AND phash_hex IS NOT NULL AND dhash_hex IS NOT NULL
          AND phash_bands && $2::int4[]
        LIMIT $3
        """,
        group_id,
        bands,
        limit,
    )
    return [(int(r["group_id"]), str(r["phash_hex"]), str(r["dhash_hex"])) for r in rows]


class DSU:
    """Union-find over int ids with union by size and path halving."""

    def __init__(self):
        self.p: dict[int, int] = {}   # parent pointers; a root points to itself
        self.sz: dict[int, int] = {}  # component sizes, maintained on roots

    def find(self, x: int) -> int:
        """Return the root of x's component, registering x if unseen."""
        if x not in self.p:
            # lazily register as a singleton component
            self.p[x] = x
            self.sz[x] = 1
            return x
        # path halving: point every visited node at its grandparent
        while self.p[x] != x:
            grand = self.p[self.p[x]]
            self.p[x] = grand
            x = grand
        return x

    def union(self, a: int, b: int) -> None:
        """Merge the components containing a and b (no-op if already joined)."""
        root_a = self.find(a)
        root_b = self.find(b)
        if root_a == root_b:
            return
        # attach the smaller tree under the larger one
        if self.sz[root_a] < self.sz[root_b]:
            root_a, root_b = root_b, root_a
        self.p[root_b] = root_a
        self.sz[root_a] = self.sz.get(root_a, 1) + self.sz.get(root_b, 1)

    def groups(self) -> dict[int, list[int]]:
        """Map each component root to the list of all its members."""
        clusters: dict[int, list[int]] = {}
        for node in list(self.p):
            clusters.setdefault(self.find(node), []).append(node)
        return clusters


@dataclass
class GroupMeta:
    # Per-group metadata used to pick the merge TARGET within a cluster.
    group_id: int
    created_at: datetime | None          # group creation time
    winner_price: int | None             # winner product price
    winner_vender_grade: int | None      # numeric vender_grade of the winner row, if parseable
    winner_img_url: str | None           # winner image URL (used for cluster URL stats)
    winner_iname: str | None             # winner item name (used for similarity filtering)


async def fetch_group_meta(conn: asyncpg.Connection, group_ids: Iterable[int]) -> dict[int, GroupMeta]:
    """
    Metadata for selecting each cluster's TARGET group:
    - winner_price (lowest price)
    - winner_vender_grade (lowest vender_grade), taken from the winner_icode
      row in shopprod_group_map
    - created_at (oldest group)

    Every requested group_id gets an entry; groups with no DB row get an
    all-None placeholder.
    """
    gids_all = list({int(x) for x in group_ids})
    if not gids_all:
        return {}

    m: dict[int, GroupMeta] = {}
    CHUNK = 50000  # guard against parameter/packet size limits
    for off in range(0, len(gids_all), CHUNK):
        gids = gids_all[off : off + CHUNK]
        rows = await conn.fetch(
            """
            SELECT
              g.group_id,
              g.created_at,
              g.winner_price,
              CASE
                WHEN gm.vender_grade ~ '^[0-9]+$' THEN gm.vender_grade::int
                ELSE NULL
              END AS winner_vender_grade
              , gm.img_url AS winner_img_url
              , gm.iname   AS winner_iname
            FROM mlinkdw.shopprod_group2 g
            LEFT JOIN mlinkdw.shopprod_group_map2 gm ON gm.icode = g.winner_icode AND gm.group_id = g.group_id
            WHERE g.group_id = ANY($1::int[])
            """,
            gids,
        )
        for r in rows:
            m[int(r["group_id"])] = GroupMeta(
                group_id=int(r["group_id"]),
                created_at=r["created_at"],
                winner_price=int(r["winner_price"]) if r["winner_price"] is not None else None,
                winner_vender_grade=int(r["winner_vender_grade"]) if r["winner_vender_grade"] is not None else None,
                winner_img_url=str(r["winner_img_url"]) if r["winner_img_url"] else None,
                winner_iname=str(r["winner_iname"]) if r["winner_iname"] else None,
            )

    # Missing groups get an all-None placeholder
    for g in gids_all:
        m.setdefault(
            g,
            GroupMeta(
                group_id=g,
                created_at=None,
                winner_price=None,
                winner_vender_grade=None,
                winner_img_url=None,
                winner_iname=None,
            ),
        )
    return m


async def merge_cluster(
    conn: asyncpg.Connection,
    *,
    target: int,
    sources: list[int],
    dry_run: bool,
) -> None:
    """
    Merge SOURCE groups into TARGET inside one transaction.

    Semantically equivalent to the UI "merge groups" action, but done with
    batched UPDATEs for better speed / shorter lock time at volume.
    No-op when sources is empty or dry_run is True.
    """
    if not sources:
        return
    if dry_run:
        return

    # Batched UPDATEs with the same meaning as the UI merge logic
    async with conn.transaction():
        srcs = [int(x) for x in sources]
        # 0) content_token cleanup policy:
        # - the structure keeps only the token of winner_icode (the single
        #   winner product), so source tokens are deleted, and non-winner
        #   tokens on the target are removed as well.
        try:
            await conn.execute(
                "DELETE FROM mlinkdw.shopprod_group_content_token WHERE group_id = ANY($1::int[])",
                srcs,
            )
            await conn.execute(
                """
                DELETE FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = $1
                  AND EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group2 g
                    WHERE g.group_id = t.group_id
                      AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                           OR t.icode IS DISTINCT FROM g.winner_icode)
                  )
                """,
                int(target),
            )
        except Exception:
            # best-effort cleanup: a token failure must not abort the merge
            pass

        await conn.execute(
            """
            UPDATE mlinkdw.shopprod_group_map2
            SET group_id = $1
            WHERE group_id = ANY($2::int[])
            """,
            target,
            srcs,
        )
        await conn.execute(
            """
            UPDATE mlinkdw.shopprod_sub_group2
            SET group_id = $1
            WHERE group_id = ANY($2::int[])
            """,
            target,
            srcs,
        )
        # Delete SOURCE groups that are now completely empty (kept if any
        # reference remains). Failures are deliberately ignored.
        try:
            await conn.execute(
                """
                DELETE FROM mlinkdw.shopprod_group2 g
                WHERE g.group_id = ANY($1::int[])
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_map2 gm WHERE gm.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_sub_group2 sg WHERE sg.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_content_token ct WHERE ct.group_id = g.group_id)
                """,
                srcs,
            )
        except Exception:
            pass


async def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--since", default="2026-01-01 00:00:00", help="seed 그룹 필터: winner_updated_at >= since (예: 2026-01-18 00:00:00)")
    ap.add_argument("--before", default="2026-01-16 23:59:59", help="seed 그룹 필터: winner_updated_at <= before (예: 2026-01-18 23:59:59)")
    ap.add_argument("--yesterday", action="store_true", help="--since 를 어제로 자동 설정 (00:00:00)")
    ap.add_argument(
        "--winner-icode",
        action="append",
        default=None,
        help="테스트용 seed 제한: winner_icode (여러 번 지정 가능). 예) --winner-icode WAC4CBF --winner-icode 60051975",
    )
    ap.add_argument(
        "--winner-icodes",
        default=None,
        help="테스트용 seed 제한: winner_icode 콤마 구분. 예) --winner-icodes WAC4CBF,60051975",
    )
    ap.add_argument("--ph-threshold", type=int, default=12)
    ap.add_argument("--dh-threshold", type=int, default=14)
    ap.add_argument("--candidate-limit", type=int, default=5000, help="seed 1개당 band 후보 상한")
    ap.add_argument(
        "--pairwise-seeds",
        action="store_true",
        default=False,
        help="LSH(bands) overlap으로 후보를 못 찾는 케이스 보완: seed들끼리 직접 해밍거리 비교(O(n^2)). "
             "테스트/소량 seed에서만 권장.",
    )
    ap.add_argument("--dry-run", action="store_true", default=True, help="기본: 적용 안함(로그만)")
    ap.add_argument("--apply", action="store_true", help="실제 DB 병합 적용")
    ap.add_argument("--workers", type=int, default=8, help="seed 후보 조회 병렬 워커 수(DB 부하에 맞게 조절)")
    ap.add_argument("--queue-size", type=int, default=2000, help="producer/consumer 큐 크기")
    ap.add_argument("--seed-chunk", type=int, default=5000, help="서버 커서 prefetch 크기")
    ap.add_argument("--max-seeds", type=int, default=None, help="테스트용 seed 상한(대량 실행시 미사용)")
    ap.add_argument("--no-count", action="store_true", help="seed count 미리 조회 생략(빠르게 시작)")
    ap.add_argument("--apply-limit", type=int, default=None, help="apply 모드에서 병합 작업을 N건까지만 수행(점진 적용용)")
    ap.add_argument("--report-top-clusters", type=int, default=10, help="컴포넌트(클러스터) 크기 상위 N개 리포트(기본 10)")
    ap.add_argument(
        "--max-cluster-size",
        type=int,
        default=200,
        help="안전장치: 1개 컴포넌트(클러스터) 최대 크기. apply 모드에서 이 값을 초과하면 스킵(기본 200).",
    )
    ap.add_argument(
        "--allow-large-cluster",
        action="store_true",
        help="--max-cluster-size 초과 클러스터도 적용 허용(운영 주의).",
    )
    ap.add_argument(
        "--skip-common-phash",
        type=int,
        default=500,
        help="안전장치: seed 범위에서 phash_hex가 N회 이상 등장하면 '공통 이미지'로 보고 비교/병합에서 제외 (기본 500). 0이면 비활성",
    )
    ap.add_argument(
        "--skip-common-dhash",
        type=int,
        default=500,
        help="안전장치: seed 범위에서 dhash_hex가 N회 이상 등장하면 '공통 이미지'로 보고 비교/병합에서 제외 (기본 500). 0이면 비활성",
    )
    ap.add_argument(
        "--common-hash-top",
        type=int,
        default=20,
        help="skip-common-* 산정 시 상위 N개만 차단/출력 (기본 20)",
    )
    ap.add_argument(
        "--skip-low-entropy",
        action="store_true",
        default=True,
        help="안전장치: 단색/패턴/기본이미지 계열로 추정되는 low-entropy 해시(예: 4d4d..., 0c0c...)는 비교/병합에서 제외",
    )
    ap.add_argument(
        "--no-skip-low-entropy",
        action="store_true",
        help="--skip-low-entropy 비활성화",
    )
    ap.add_argument(
        "--low-entropy-max-unique",
        type=int,
        default=4,
        help="low-entropy 판정 기준: 16자리 hex에서 서로 다른 nibble 종류 수가 N 이하이면 제외(기본 4)",
    )
    ap.add_argument(
        "--progress-every",
        type=int,
        default=5000,
        help="진행 로그 출력 주기(기본 5000). producer 진행(스킵 포함)과 worker scanned를 함께 로깅",
    )
    ap.add_argument(
        "--skip-hot-bands",
        type=int,
        default=500,
        help="안전장치: seed 범위에서 특정 phash_bands 요소가 N회 이상 등장하면 후보검색(bands && ...)에서 제외(기본 500). 0이면 비활성",
    )
    ap.add_argument(
        "--hot-bands-top",
        type=int,
        default=50,
        help="skip-hot-bands 산정 시 상위 N개만 제외/출력 (기본 50)",
    )
    ap.add_argument(
        "--report-url-stats",
        action="store_true",
        default=True,
        help="DRY/APPLY 로그에 클러스터 내 winner img_url(정규화) 분포 통계를 함께 출력",
    )
    ap.add_argument(
        "--no-report-url-stats",
        action="store_true",
        help="--report-url-stats 비활성화",
    )
    ap.add_argument(
        "--url-stats-top",
        type=int,
        default=5,
        help="클러스터 url 통계 출력 시 상위 N개만 표시(기본 5)",
    )
    ap.add_argument(
        "--skip-if-url-distinct-ratio",
        type=float,
        default=0.8,
        help=(
            "안전장치(apply): 클러스터 내 winner img_url(정규화) distinct 비율이 임계값을 초과하면 병합 스킵. "
            "예) size=100, distinct=95 => 0.95. 기본 0.8. 0 이하면 비활성"
        ),
    )
    ap.add_argument(
        "--skip-if-url-min-size",
        type=int,
        default=10,
        help="--skip-if-url-distinct-ratio 적용 최소 클러스터 크기(기본 10). 너무 작은 클러스터는 예외로 둠.",
    )
    ap.add_argument(
        "--iname-min-sim",
        type=float,
        default=0.0,
        help="병합 필터: TARGET winner_iname 과 SOURCE winner_iname 유사도(%)가 이 값 미만이면 병합 제외. 예) 30",
    )
    args = ap.parse_args()

    since_s: str | None = args.since
    before_s: str | None = args.before
    if args.yesterday:
        since_s = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")

    try:
        since = _parse_dt(since_s)
        before = _parse_dt(before_s)
    except Exception as e:
        raise SystemExit(f"[ARG] since/before 파싱 실패: {e}")

    winner_icodes: list[str] | None = None
    if args.winner_icodes:
        winner_icodes = [s.strip() for s in str(args.winner_icodes).split(",") if s.strip()]
    if args.winner_icode:
        winner_icodes = (winner_icodes or []) + [str(s).strip() for s in args.winner_icode if str(s).strip()]
    if winner_icodes:
        # 중복 제거(순서 유지)
        seen = set()
        tmp = []
        for s in winner_icodes:
            if s in seen:
                continue
            seen.add(s)
            tmp.append(s)
        winner_icodes = tmp

    # winner_icode로 seed를 강하게 제한하는 테스트 모드라면 pairwise 비교를 기본 켬(안전)
    if winner_icodes and not args.pairwise_seeds:
        args.pairwise_seeds = True

    dry_run = True
    if args.apply:
        dry_run = False

    # skip-low-entropy 토글 (argparse는 상호배타를 안 걸었으므로 여기서 정리)
    if args.no_skip_low_entropy:
        args.skip_low_entropy = False
    if args.no_report_url_stats:
        args.report_url_stats = False

    pool = await asyncpg.create_pool(**DB_INFO, min_size=1, max_size=max(2, int(args.workers)))
    try:
        # 대량 실행에서는 count를 먼저 찍어주면 운영에서 진행률 판단이 쉬움(원치 않으면 --no-count)
        total_seeds: int | None = None
        if not args.no_count:
            async with pool.acquire() as c0:
                total_seeds = await fetch_seed_count(c0, since, before, winner_icodes)
        print(
            f"[INFO] seeds_count={total_seeds if total_seeds is not None else 'N/A'} "
            f"since={since} before={before} winner_icodes={winner_icodes!r} "
            f"workers={int(args.workers)}"
        )

        # 공통(기본) 이미지 해시 차단 세트 계산
        blocked_phash: set[str] = set()
        blocked_dhash: set[str] = set()
        blocked_bands: set[int] = set()
        if int(args.skip_common_phash) > 0 or int(args.skip_common_dhash) > 0:
            async with pool.acquire() as c_block:
                topn = max(1, int(args.common_hash_top))
                if int(args.skip_common_phash) > 0:
                    common = await fetch_common_hashes(
                        c_block,
                        kind="phash",
                        since=since,
                        before=before,
                        winner_icodes=winner_icodes,
                        min_count=int(args.skip_common_phash),
                        limit=topn,
                    )
                    blocked_phash = {h for h, _ in common}
                    if common:
                        print("[INFO] blocked common phash_hex (top): " + ", ".join([f"{h}({c})" for h, c in common]))
                if int(args.skip_common_dhash) > 0:
                    common = await fetch_common_hashes(
                        c_block,
                        kind="dhash",
                        since=since,
                        before=before,
                        winner_icodes=winner_icodes,
                        min_count=int(args.skip_common_dhash),
                        limit=topn,
                    )
                    blocked_dhash = {h for h, _ in common}
                    if common:
                        print("[INFO] blocked common dhash_hex (top): " + ", ".join([f"{h}({c})" for h, c in common]))

        # 핫 밴드(stopword) 차단 세트 계산
        if int(args.skip_hot_bands) > 0:
            async with pool.acquire() as c_band:
                topn_b = max(1, int(args.hot_bands_top))
                hot = await fetch_hot_bands(
                    c_band,
                    since=since,
                    before=before,
                    winner_icodes=winner_icodes,
                    min_count=int(args.skip_hot_bands),
                    limit=topn_b,
                )
                blocked_bands = {b for b, _ in hot}
                if hot:
                    print("[INFO] blocked hot bands (top): " + ", ".join([f"{b}({c})" for b, c in hot]), flush=True)

        dsu = DSU()
        dsu_lock = asyncio.Lock()
        ph_thr = int(args.ph_threshold)
        dh_thr = int(args.dh_threshold)

        q: asyncio.Queue[GroupRow | None] = asyncio.Queue(maxsize=int(args.queue_size))
        edge_cnt = 0
        edge_lock = asyncio.Lock()
        scanned = 0
        skipped_common = 0
        skipped_low_entropy = 0
        seen_total = 0  # producer가 seed를 몇 건 읽었는지(스킵 포함)

        async def producer():
            nonlocal skipped_common, skipped_low_entropy
            nonlocal seen_total
            async with pool.acquire() as c:
                async for s in iter_seed_groups(
                    c,
                    since,
                    before,
                    winner_icodes,
                    chunk_size=int(args.seed_chunk),
                    max_seeds=args.max_seeds,
                ):
                    seen_total += 1
                    if (blocked_phash and s.phash_hex in blocked_phash) or (blocked_dhash and s.dhash_hex in blocked_dhash):
                        skipped_common += 1
                    elif args.skip_low_entropy and (
                        _is_low_entropy_hex64(s.phash_hex, max_unique_nibbles=int(args.low_entropy_max_unique))
                        or _is_low_entropy_hex64(s.dhash_hex, max_unique_nibbles=int(args.low_entropy_max_unique))
                    ):
                        skipped_low_entropy += 1
                    else:
                        await q.put(s)

                    pe = int(args.progress_every) if args.progress_every else 0
                    if pe and (seen_total % pe == 0):
                        if total_seeds:
                            print(
                                f"[INFO] producer seen={seen_total}/{total_seeds} queued~{q.qsize()} "
                                f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}",
                                flush=True,
                            )
                        else:
                            print(
                                f"[INFO] producer seen={seen_total} queued~{q.qsize()} "
                                f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}",
                                flush=True,
                            )

                    continue
            # 종료 시그널
            for _ in range(int(args.workers)):
                await q.put(None)

        async def worker(wid: int):
            nonlocal edge_cnt, scanned
            async with pool.acquire() as c:
                while True:
                    s = await q.get()
                    try:
                        if s is None:
                            return
                        scanned += 1
                        if not s.phash_bands:
                            continue
                        bands = s.phash_bands
                        if blocked_bands:
                            bands = [b for b in bands if b not in blocked_bands]
                        if not bands:
                            continue
                        cands = await fetch_candidates_by_bands(
                            c, group_id=s.group_id, bands=bands, limit=int(args.candidate_limit)
                        )
                        local_edges = 0
                        for gid2, ph2, dh2 in cands:
                            if (blocked_phash and ph2 in blocked_phash) or (blocked_dhash and dh2 in blocked_dhash):
                                continue
                            if args.skip_low_entropy and (
                                _is_low_entropy_hex64(ph2, max_unique_nibbles=int(args.low_entropy_max_unique))
                                or _is_low_entropy_hex64(dh2, max_unique_nibbles=int(args.low_entropy_max_unique))
                            ):
                                continue
                            ph_dist = _hamming64_hex(s.phash_hex, ph2)
                            if ph_dist > ph_thr:
                                continue
                            dh_dist = _hamming64_hex(s.dhash_hex, dh2)
                            if dh_dist > dh_thr:
                                continue
                            async with dsu_lock:
                                dsu.union(s.group_id, gid2)
                            local_edges += 1
                        if local_edges:
                            async with edge_lock:
                                edge_cnt += local_edges

                        if scanned % 5000 == 0:
                            if total_seeds:
                                print(
                                    f"[INFO] scanned seeds={scanned}/{total_seeds} edges={edge_cnt} "
                                    f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}"
                                    f" producer_seen={seen_total} q~{q.qsize()}",
                                    flush=True,
                                )
                            else:
                                print(
                                    f"[INFO] scanned seeds={scanned} edges={edge_cnt} "
                                    f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}"
                                    f" producer_seen={seen_total} q~{q.qsize()}",
                                    flush=True,
                                )
                    finally:
                        q.task_done()

        # 1) 후보 edge 수집 (producer + N workers)
        prod_task = asyncio.create_task(producer())
        worker_tasks = [asyncio.create_task(worker(i)) for i in range(int(args.workers))]
        try:
            await asyncio.gather(prod_task, *worker_tasks)
        finally:
            # 예외/취소 시에도 커넥션이 pool로 반환되도록 태스크 정리
            for t in [prod_task, *worker_tasks]:
                if not t.done():
                    t.cancel()
            await asyncio.gather(prod_task, *worker_tasks, return_exceptions=True)

        # 1-b) 보완: seed들끼리 직접 비교 (밴드 overlap이 0인 케이스 보정)
        if args.pairwise_seeds:
            # pairwise는 seed 리스트가 필요하므로, "테스트/소량"에서만 허용
            async with pool.acquire() as c1:
                seeds = await fetch_seed_groups(c1, since, before, winner_icodes)
            n = len(seeds)
            # 대용량이면 실수로 O(n^2) 터지는 것 방지
            if n > 5000:
                print(f"[WARN] pairwise-seeds skipped (n={n} > 5000). 좁혀서 실행하세요.")
            else:
                print(f"[INFO] pairwise-seeds enabled (n={n})")
                for i in range(n):
                    a = seeds[i]
                    for j in range(i + 1, n):
                        b = seeds[j]
                        ph_dist = _hamming64_hex(a.phash_hex, b.phash_hex)
                        if ph_dist > ph_thr:
                            continue
                        dh_dist = _hamming64_hex(a.dhash_hex, b.dhash_hex)
                        if dh_dist > dh_thr:
                            continue
                        dsu.union(a.group_id, b.group_id)
                        edge_cnt += 1

        comps = [v for v in dsu.groups().values() if len(v) >= 2]
        print(
            f"[INFO] components>=2 : {len(comps)}  (edges={edge_cnt}) "
            f"skipped_common={skipped_common} skipped_low_entropy={skipped_low_entropy}"
        )
        if not comps:
            return

        # 대량에서 이상징후(초대형 클러스터) 빠른 확인용 리포트
        try:
            topn = int(args.report_top_clusters) if args.report_top_clusters is not None else 0
        except Exception:
            topn = 0
        if topn and topn > 0:
            sizes = sorted(((len(c), i) for i, c in enumerate(comps)), reverse=True)[:topn]
            for rank, (sz, idx) in enumerate(sizes, start=1):
                sample = sorted(int(x) for x in comps[idx])[:20]
                more = max(0, sz - len(sample))
                print(f"[INFO] top_cluster#{rank} size={sz} sample={sample}" + (f" (+{more} more)" if more else ""))

        # 2) 타겟 선정
        # - winner_price 가장 낮은 그룹
        # - winner_vender_grade 가장 낮은 그룹
        # - created_at 가장 오래된 그룹
        # - tie-break: group_id 가장 작은 그룹
        all_ids = [gid for comp in comps for gid in comp]
        async with pool.acquire() as c2:
            metas = await fetch_group_meta(c2, all_ids)

        # 3) 병합 실행(또는 dry-run 출력)
        merge_jobs = 0
        skipped_large = 0
        for comp in comps:
            comp = sorted({int(x) for x in comp})
            def _key(g: int):
                """Sort key for picking the merge TARGET within a component.

                Orders by: lowest winner_price, then lowest winner_vender_grade,
                then oldest created_at, then smallest group_id (tie-break).
                Missing values sort last via large sentinels / datetime.max.
                """
                meta = metas.get(g) or GroupMeta(group_id=g, created_at=None, winner_price=None, winner_vender_grade=None, winner_img_url=None, winner_iname=None)
                # Lower price wins; groups without a price go last.
                price = 10**18 if meta.winner_price is None else meta.winner_price
                # Lower vender grade wins; missing grade goes last.
                grade = 10**9 if meta.winner_vender_grade is None else meta.winner_vender_grade
                # Older creation time wins (ascending datetime order).
                created = meta.created_at if meta.created_at is not None else datetime.max
                return (price, grade, created, g)

            target = sorted(comp, key=_key)[0]
            sources = [g for g in comp if g != target]

            # iname 유사도 필터(오탐 병합 방지)
            iname_thr = float(args.iname_min_sim or 0.0)
            if iname_thr > 0:
                t_iname = metas.get(target).winner_iname if metas.get(target) else None
                kept: list[int] = []
                dropped = 0
                for s in sources:
                    s_iname = metas.get(s).winner_iname if metas.get(s) else None
                    sim = _iname_similarity(t_iname, s_iname)
                    if sim < iname_thr:
                        dropped += 1
                        continue
                    kept.append(s)
                if dropped:
                    print(f"[INFO] iname filter: target={target} dropped={dropped} kept={len(kept)} thr={iname_thr}", flush=True)
                sources = kept
                if not sources:
                    continue

            # (선택) 클러스터 내 winner img_url 분포 출력(오병합 원인 파악용)
            url_stats_txt = ""
            url_distinct = 0
            url_topk: list[tuple[str, int]] = []
            if args.report_url_stats:
                counts: dict[str, int] = {}
                for gid in comp:
                    u = (metas.get(gid).winner_img_url if metas.get(gid) else None) if gid in metas else None
                    nu = _norm_img_url(u) if u else ""
                    key = nu if nu else "(null)"
                    counts[key] = counts.get(key, 0) + 1
                url_distinct = len(counts)
                url_topk = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))[: max(1, int(args.url_stats_top))]
                # url_stats_txt = " url_top=" + str(url_topk) + f" url_distinct={url_distinct}"

            # 안전장치: url distinct 비율이 너무 높으면 apply에서 스킵(오탐 가능성 높음)
            if not dry_run:
                thr = float(args.skip_if_url_distinct_ratio or 0.0)
                min_sz = int(args.skip_if_url_min_size or 0)
                if thr > 0 and len(comp) >= max(1, min_sz):
                    # url 통계를 안 찍더라도(distinct 계산은 필요) apply 안전을 위해 계산
                    if url_distinct == 0:
                        counts2: dict[str, int] = {}
                        for gid in comp:
                            u = (metas.get(gid).winner_img_url if metas.get(gid) else None) if gid in metas else None
                            nu = _norm_img_url(u) if u else ""
                            key = nu if nu else "(null)"
                            counts2[key] = counts2.get(key, 0) + 1
                        url_distinct = len(counts2)
                    ratio = (url_distinct / float(len(comp))) if comp else 1.0
                    if ratio > thr:
                        print(
                            f"[SKIP] url_distinct_ratio too high: {ratio:.3f} (distinct={url_distinct} size={len(comp)} thr={thr}) "
                            f"target={target} sources_count={len(sources)}",
                            flush=True,
                        )
                        skipped_large += 1
                        continue

            # 안전장치: 클러스터가 비정상적으로 크면 apply에서 자동 스킵(옵션으로 해제 가능)
            if not dry_run:
                max_sz = int(args.max_cluster_size) if args.max_cluster_size is not None else 0
                if max_sz and len(comp) > max_sz and not args.allow_large_cluster:
                    print(f"[SKIP] cluster too large (size={len(comp)} > max={max_sz}) target={target} (sources_count={len(sources)})")
                    skipped_large += 1
                    continue

            # 로그가 과도하게 길어지는 것 방지: sources 전체 대신 개수 + 샘플만 출력
            SAMPLE_N = 30
            src_sample = sources[:SAMPLE_N]
            src_more = max(0, len(sources) - len(src_sample))
            if dry_run:
                tm = metas.get(target)
                print(
                    f"[DRY] target={target} (winner_price={getattr(tm,'winner_price',None)}, "
                    f"winner_vender_grade={getattr(tm,'winner_vender_grade',None)}, "
                    f"created_at={getattr(tm,'created_at',None)}) <- sources_count={len(sources)} sample={src_sample}"
                    + (f" (+{src_more} more)" if src_more else "")
                    # + url_stats_txt
                )
            else:
                tm = metas.get(target)
                print(
                    f"[APPLY] target={target} (winner_price={getattr(tm,'winner_price',None)}, "
                    f"winner_vender_grade={getattr(tm,'winner_vender_grade',None)}, "
                    f"created_at={getattr(tm,'created_at',None)}) <- sources_count={len(sources)} sample={src_sample}"
                    + (f" (+{src_more} more)" if src_more else "")
                    # + url_stats_txt
                )
            async with pool.acquire() as c3:
                await merge_cluster(c3, target=target, sources=sources, dry_run=dry_run)
            merge_jobs += 1

            # 점진 적용: N건 수행 후 종료
            if not dry_run and args.apply_limit is not None and merge_jobs >= int(args.apply_limit):
                print(f"[INFO] apply_limit reached: {merge_jobs}")
                break

        print(f"[INFO] merge_jobs={merge_jobs} skipped_large={skipped_large} dry_run={dry_run}")
    finally:
        # close가 오래 걸리면(미반환 커넥션 등) terminate로 강제 종료
        try:
            await asyncio.wait_for(pool.close(), timeout=30)
        except Exception:
            try:
                pool.terminate()
            except Exception:
                pass


if __name__ == "__main__":
    # Script entry point: run the async merge batch to completion.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Ctrl+C, print a clean message instead of a stack trace.
        print("[INFO] interrupted by user (Ctrl+C).", flush=True)
