# -*- coding: utf-8 -*-
"""
OWN ↔ DMM similarity
멀티프로세싱 버전 (실전용)
"""

import psycopg2
from psycopg2.extras import DictCursor
from rapidfuzz import fuzz
from collections import defaultdict
from multiprocessing import Pool, cpu_count
import argparse
import time
from datetime import datetime
import importlib
import re
from functools import lru_cache
import os
import subprocess
import sys

try:
    # optional dependency (content/image hashing)
    _pil = importlib.import_module("PIL.Image")
    _pil_ops = importlib.import_module("PIL.ImageOps")
    _pil_filter = importlib.import_module("PIL.ImageFilter")
    _imagehash = importlib.import_module("imagehash")
    _requests = importlib.import_module("requests")

    Image = _pil
    ImageOps = _pil_ops
    ImageFilter = _pil_filter
    imagehash = _imagehash
    requests = _requests
except Exception:
    Image = None
    ImageOps = None
    ImageFilter = None
    imagehash = None
    requests = None


def _default_log_file_name() -> str:
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"ownerTodomemeTransfer_{ts}.log"


def _spawn_background(argv: list[str], log_file: str) -> subprocess.Popen:
    """
    Launch *argv* detached in the background and return immediately.
    - Linux/Unix: detach into a new session via setsid
    - Windows: DETACHED_PROCESS / CREATE_NEW_PROCESS_GROUP

    stdout/stderr are appended to *log_file*; the returned Popen is not
    waited on by the caller.
    """
    log_path = os.path.abspath(str(log_file))
    os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)
    f = open(log_path, "a", encoding="utf-8")
    # Write (and fsync) a first line so the log file appears/shows up immediately.
    f.write(f"[{_now_str()}] BACKGROUND spawn argv={' '.join(argv)}\n")
    f.flush()
    try:
        os.fsync(f.fileno())
    except Exception:
        # fsync may not be supported everywhere; the flush above is enough.
        pass

    # Force unbuffered output in the child so the log fills in real time.
    env = dict(os.environ)
    env["PYTHONUNBUFFERED"] = "1"
    kwargs: dict = {"stdout": f, "stderr": f, "stdin": subprocess.DEVNULL, "env": env}
    if os.name == "posix":
        kwargs["preexec_fn"] = os.setsid
        kwargs["close_fds"] = True
    else:
        creationflags = 0
        # type: ignore[attr-defined]
        creationflags |= getattr(subprocess, "DETACHED_PROCESS", 0)
        # type: ignore[attr-defined]
        creationflags |= getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
        kwargs["creationflags"] = creationflags
        kwargs["close_fds"] = True

    # NOTE: even if the returned Popen is GC'd, the log file handle stays
    # alive because the spawned process keeps using it.
    return subprocess.Popen(argv, **kwargs)


def _now_str() -> str:
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _fmt_seconds(seconds: float) -> str:
    seconds = float(seconds)
    if seconds < 60:
        return f"{seconds:.1f}s"
    minutes = int(seconds // 60)
    rem = seconds - (minutes * 60)
    if minutes < 60:
        return f"{minutes}m {rem:.0f}s"
    hours = minutes // 60
    rem_min = minutes % 60
    return f"{hours}h {rem_min}m {rem:.0f}s"

# ===============================
# Configuration
# ===============================
from db_config import DB_INFO_PSYCOPG2 as DB_CONFIG

FETCH_SIZE = 2000            # rows fetched per server-side cursor batch (STEP1)
SIM_THRESHOLD = 0.5          # minimum fuzzy name similarity (0..1) to keep a match
PRICE_RATIO = 1              # merge requires dmm_price < own_price * PRICE_RATIO
MIN_TOKEN_LEN = 4            # minimum token length used for candidate joining
MAX_CANDIDATE_PER_OWN = 30   # cap of DMM candidates collected per OWN item
MAX_TOKEN_FREQ_OWN = 500   # exclude tokens that are too common on the OWN side (tuning point)
MAX_TOKEN_FREQ_DMM = 500   # exclude tokens that are too common on the DMM side (tuning point)

# Generic marketing/category tokens excluded from candidate matching.
STOP_TOKENS = {
    '여성','남성','신상','정품','무료배송',
    '겨울','여름','봄','가을','용','용품'
}

# Default worker count: leave one core for the parent/DB.
WORKERS = max(cpu_count() - 1, 1)

# ===============================
# content 이미지셋 매칭 (Batch_MergeGroups_ByIname_1st.py --require-content-imgset와 동일 로직)
# ===============================
@lru_cache(maxsize=4096)
def _fetch_http_bytes(url: str, *, timeout: int = 10) -> bytes | None:
    u = str(url or "").strip()
    if not u:
        return None
    if u.startswith("//"):
        u = "https:" + u
    if not (u.startswith("http://") or u.startswith("https://")):
        return None
    if requests is None:
        return None
    try:
        try:
            m = re.match(r"^(https?://[^/]+)", u, flags=re.IGNORECASE)
            referer = (m.group(1) + "/") if m else "https://www.alibaba.com/"
        except Exception:
            referer = "https://www.alibaba.com/"

        headers = {
            "User-Agent": "Mozilla/5.0",
            "Referer": referer,
            "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        }
        r = requests.get(u, timeout=int(timeout), headers=headers)
        r.raise_for_status()
        data = bytes(r.content)
        if len(data) > 10 * 1024 * 1024:
            return None
        return data
    except Exception:
        return None


def _extract_img_urls_from_html(html_text: str | None, *, limit: int = 20) -> list[str]:
    raw = str(html_text or "")
    srcs = re.findall(r"""<img[^>]+src\s*=\s*(?:(["'])(.*?)\1|([^'"\s>]+))""", raw, flags=re.IGNORECASE)
    urls: list[str] = []
    for q, a, b in srcs:
        u = (a or b or "").strip()
        if not u:
            continue
        if u.startswith("//"):
            u = "https:" + u
        if u.startswith("http://") or u.startswith("https://"):
            urls.append(u)
    seen = set()
    out: list[str] = []
    for u in urls:
        if u in seen:
            continue
        seen.add(u)
        out.append(u)
        if len(out) >= int(limit):
            break
    return out


@lru_cache(maxsize=4096)
def _image_hash_pair_from_url(img_url: str):
    """Download an image and return its (phash, dhash) pair, or None.

    Results are memoized per URL (including failures). Requires the optional
    PIL/imagehash dependencies; returns None when they are missing.
    """
    if not img_url or not (Image and ImageOps and ImageFilter and imagehash):
        return None
    data = _fetch_http_bytes(str(img_url), timeout=10)
    if not data:
        return None
    try:
        from io import BytesIO

        bio = BytesIO(data)
        img = Image.open(bio)
        try:
            # verify() catches truncated/corrupt files; failures are ignored
            # (best-effort — some usable images still fail verification).
            img.verify()
        except Exception:
            pass
        # verify() consumes the Image object, so rewind and reopen.
        bio.seek(0)
        img = Image.open(bio)
        img = ImageOps.exif_transpose(img)
        # Avoid Pillow palette(P)+transparency warning / quality issues.
        try:
            if getattr(img, "mode", None) == "P" and "transparency" in getattr(img, "info", {}):
                img = img.convert("RGBA")
        except Exception:
            pass
        img = img.convert("RGB")

        # Slight blur + fixed 256x256 crop before hashing, presumably to make
        # the hashes robust to resize/compression differences between shops.
        img = img.filter(ImageFilter.GaussianBlur(radius=0.6))
        resampling = getattr(Image, "Resampling", None)
        method = resampling.LANCZOS if resampling else getattr(Image, "LANCZOS", 1)
        img = ImageOps.fit(img, (256, 256), method=method)

        ph = imagehash.phash(img)
        dh = imagehash.dhash(img)
        return (ph, dh)
    except Exception:
        return None


def _content_imgset_match_stats(
    html_a: str | None,
    html_b: str | None,
    *,
    max_imgs: int = 12,
    ph_thr: int = 12,
    dh_thr: int = 14,
) -> dict:
    """Compare the <img> sets of two HTML blobs via perceptual hashes.

    An image from B counts as matched when some image from A is within both
    the phash and dhash distance thresholds. Returns a stats dict; on
    failure the 'err' key names the reason (NO_LIB / NO_IMG / HASH_FAIL).
    """
    if imagehash is None or Image is None:
        return {"err": "NO_LIB"}
    urls_a = _extract_img_urls_from_html(html_a, limit=max_imgs)
    urls_b = _extract_img_urls_from_html(html_b, limit=max_imgs)
    if not urls_a or not urls_b:
        return {"err": "NO_IMG", "na": len(urls_a), "nb": len(urls_b)}

    hashes_a = [h for h in (_image_hash_pair_from_url(u) for u in urls_a) if h]
    hashes_b = [h for h in (_image_hash_pair_from_url(u) for u in urls_b) if h]
    if not hashes_a or not hashes_b:
        return {"err": "HASH_FAIL", "na": len(hashes_a), "nb": len(hashes_b)}

    matched = 0
    min_ph = None
    min_dh = None
    for hb in hashes_b:
        hit = False
        for ha in hashes_a:
            try:
                d_ph = int(ha[0] - hb[0])
                d_dh = int(ha[1] - hb[1])
            except Exception:
                continue
            # Track the global minimum distances for diagnostics.
            if min_ph is None or d_ph < min_ph:
                min_ph = d_ph
            if min_dh is None or d_dh < min_dh:
                min_dh = d_dh
            if d_ph <= ph_thr and d_dh <= dh_thr:
                hit = True
                break
        if hit:
            matched += 1

    denom = min(len(hashes_a), len(hashes_b))
    ratio = (matched / float(denom)) if denom > 0 else 0.0
    return {
        "err": None,
        "na": len(hashes_a),
        "nb": len(hashes_b),
        "match": matched,
        "ratio": ratio,
        "min_ph": min_ph,
        "min_dh": min_dh,
        "ph_thr": ph_thr,
        "dh_thr": dh_thr,
    }


def _fetch_winner_content(conn, group_id: int) -> str | None:
    """Load the winner row's `content` HTML for a group, or None if absent."""
    cursor = conn.cursor(cursor_factory=DictCursor)
    try:
        cursor.execute(
            """
            SELECT gm.content AS winner_content
            FROM mlinkdw.shopprod_group2 g
            LEFT JOIN mlinkdw.shopprod_group_map2 gm
              ON gm.group_id = g.group_id
             AND gm.vender_code = g.winner_vender_code
             AND gm.icode = g.winner_icode
            WHERE g.group_id = %s
            """,
            (int(group_id),),
        )
        first = cursor.fetchone()
        if not first:
            return None
        return first["winner_content"]
    finally:
        cursor.close()

def _fmt_f3(v) -> str:
    try:
        if v is None:
            return "NA"
        return f"{float(v):.3f}"
    except Exception:
        return "NA"


def merge_cluster(conn, *, target: int, sources: list[int], dry_run: bool) -> None:
    """Merge the `sources` groups into `target`.

    Same behavior as merge_cluster in Batch_MergeGroups_ByIname_1st.py.
    Re-points map/sub-group rows to `target`, prunes content tokens, then
    deletes source group rows that became fully unreferenced. Commits on
    success, rolls back and re-raises on error. No-op when `sources` is
    empty or `dry_run` is True.
    """
    if not sources or dry_run:
        return

    srcs = [int(x) for x in sources]
    cur = conn.cursor()
    try:
        # 0) content_token cleanup policy:
        # - the schema keeps tokens only for the winner_icode (the single
        #   winner product), so source tokens are deleted outright, and any
        #   non-winner tokens on the target are removed as well.
        try:
            cur.execute(
                "DELETE FROM mlinkdw.shopprod_group_content_token WHERE group_id = ANY(%s)",
                (srcs,),
            )
            cur.execute(
                """
                DELETE FROM mlinkdw.shopprod_group_content_token t
                WHERE t.group_id = %s
                  AND EXISTS (
                    SELECT 1
                    FROM mlinkdw.shopprod_group2 g
                    WHERE g.group_id = t.group_id
                      AND (t.vender_code IS DISTINCT FROM g.winner_vender_code
                           OR t.icode IS DISTINCT FROM g.winner_icode)
                  )
                """,
                (int(target),),
            )
        except Exception:
            # Best-effort: token cleanup failure must not block the merge.
            pass

        # Re-point all product mappings of the source groups to the target.
        cur.execute(
            """
            UPDATE mlinkdw.shopprod_group_map2
            SET group_id = %s
            WHERE group_id = ANY(%s)
            """,
            (int(target), srcs),
        )
        cur.execute(
            """
            UPDATE mlinkdw.shopprod_sub_group2
            SET group_id = %s
            WHERE group_id = ANY(%s)
            """,
            (int(target), srcs),
        )
        try:
            # Drop source group rows only when nothing references them anymore.
            cur.execute(
                """
                DELETE FROM mlinkdw.shopprod_group2 g
                WHERE g.group_id = ANY(%s)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_map2 gm WHERE gm.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_sub_group2 sg WHERE sg.group_id = g.group_id)
                  AND NOT EXISTS (SELECT 1 FROM mlinkdw.shopprod_group_content_token ct WHERE ct.group_id = g.group_id)
                """,
                (srcs,),
            )
        except Exception:
            pass
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()

# STEP1: flag so tmp_tok_ok is rebuilt only when explicitly requested
# (set from the --step1-prep-tmp-tok-ok CLI flag in __main__).
STEP1_PREP_TMP_TOK_OK = False

# ===============================
# STEP 0. Preparation: rebuild shopprod_name_token
# ===============================
def step0_prepare_shopprod_name_token():
    """Drop and rebuild mlinkdw.shopprod_name_token as an UNLOGGED table.

    The table holds the lowercase name tokens of each group's winner
    product; STEP1 joins OWN and DMM rows on these tokens. Verification
    info is printed at each stage so failures (wrong DB, permissions,
    locks) can be diagnosed from the log alone.
    """
    t0 = time.perf_counter()
    print(f"[{_now_str()}] STEP0 start: rebuild mlinkdw.shopprod_name_token")
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    try:
        # Log exactly which DB/server/user this runs against (identifies the
        # cause when "the DROP didn't happen": wrong DB connection, missing
        # permissions, locks, etc.)
        try:
            cur.execute(
                """
                SELECT
                  current_database()::text AS db,
                  current_user::text       AS usr,
                  current_schema()::text   AS schema,
                  inet_server_addr()::text AS server_addr,
                  inet_server_port()::int  AS server_port
                """
            )
            info = cur.fetchone()
            print(
                f"[{_now_str()}] STEP0 db_info: db={info[0]} user={info[1]} schema={info[2]} server={info[3]}:{info[4]}",
                flush=True,
            )
        except Exception as e:
            print(f"[{_now_str()}] STEP0 warn: failed to fetch db_info: {e}", flush=True)

        # NOTE: DROP and recreate the table per the requested SQL.
        # - verification logs are emitted along the way so DROP/CREATE
        #   success is visible.
        try:
            cur.execute("SELECT to_regclass('mlinkdw.shopprod_name_token')")
            before_reg = cur.fetchone()[0]
        except Exception:
            before_reg = None
        print(f"[{_now_str()}] STEP0 before: to_regclass={before_reg}", flush=True)

        cur.execute("DROP TABLE IF EXISTS mlinkdw.shopprod_name_token;")
        conn.commit()

        try:
            cur.execute("SELECT to_regclass('mlinkdw.shopprod_name_token')")
            after_drop_reg = cur.fetchone()[0]
        except Exception:
            after_drop_reg = None
        print(f"[{_now_str()}] STEP0 after_drop: to_regclass={after_drop_reg}", flush=True)

        cur.execute(
            """
            CREATE UNLOGGED TABLE mlinkdw.shopprod_name_token (
                dome_code text,
                group_id bigint,
                icode text,
                token text
            );
            """
        )
        conn.commit()

        # Verify the table really is UNLOGGED (relpersistence='u')
        try:
            cur.execute(
                """
                SELECT c.relpersistence::text
                FROM pg_class c
                JOIN pg_namespace n ON n.oid = c.relnamespace
                WHERE n.nspname = 'mlinkdw' AND c.relname = 'shopprod_name_token'
                """
            )
            relp = cur.fetchone()
            relp = relp[0] if relp else None
        except Exception:
            relp = None
        print(f"[{_now_str()}] STEP0 after_create: relpersistence={relp}", flush=True)

        # Tokenize winner-product names: replace everything except Korean,
        # alphanumerics and spaces, split on whitespace, keep tokens >= 2 chars.
        cur.execute(
            """
            INSERT INTO mlinkdw.shopprod_name_token (dome_code, group_id, icode, token)
            SELECT
                a.dome_code,
                a.group_id,
                a.icode,
                lower(trim(t))
            FROM mlinkdw.shopprod_group_map2 a
            JOIN mlinkdw.shopprod_group2 b ON b.group_id = a.group_id
             AND b.winner_vender_code = a.vender_code
             AND b.winner_icode = a.icode
            CROSS JOIN LATERAL regexp_split_to_table(
                regexp_replace(a.iname, '[^가-힣a-zA-Z0-9 ]', ' ', 'g'),
                '\\s+'
            ) t
            WHERE length(t) >= 2;
            """
        )
        inserted = cur.rowcount
        conn.commit()
        print(f"[{_now_str()}] STEP0 insert: rows={inserted}", flush=True)

        cur.execute(
            """
            CREATE INDEX idx_token_dome_token
            ON mlinkdw.shopprod_name_token (dome_code, token);
            """
        )
        cur.execute(
            """
            CREATE INDEX idx_token_icode
            ON mlinkdw.shopprod_name_token (icode);
            """
        )
        cur.execute("ANALYZE mlinkdw.shopprod_name_token;")
        conn.commit()
    except Exception as e:
        try:
            conn.rollback()
        except Exception:
            pass
        print(f"[{_now_str()}] STEP0 ERROR: {e}", flush=True)
        raise
    finally:
        try:
            cur.close()
        except Exception:
            pass
        try:
            conn.close()
        except Exception:
            pass
        print(f"[{_now_str()}] STEP0 end: elapsed={_fmt_seconds(time.perf_counter() - t0)}", flush=True)

# ===============================
# STEP 1. Token streaming (single process)
# ===============================
def stream_token_candidates():
    """Stream OWN↔DMM token-join pairs and build the candidate map.

    Returns a dict {own_icode: set(dmm_icode, ...)} with at most
    MAX_CANDIDATE_PER_OWN DMM candidates per OWN item. Reads the
    module-level flags STEP1_PREP_TMP_TOK_OK / STEP1_OWN_LIMIT that
    __main__ sets from the CLI.
    """
    t0 = time.perf_counter()
    print(f"[{_now_str()}] STEP1 start: stream_token_candidates")
    conn = psycopg2.connect(**DB_CONFIG)

    # STEP1 preparation: materialize the set of usable tokens into an
    # UNLOGGED table (stable plans / faster re-runs).
    # - default: reuse the existing tmp_tok_ok
    # - only when needed, main sets STEP1_PREP_TMP_TOK_OK=True to rebuild it
    # NOTE: inner functions are used to avoid changing this function's signature.
    def _step1_prepare_tmp_tok_ok():
        # Rebuild tmp_tok_ok: tokens that are long enough, not stop words,
        # and present but not overly frequent on BOTH the OWN and DMM sides.
        t_prep0 = time.perf_counter()
        cur_prep = conn.cursor()
        cur_prep.execute("DROP TABLE IF EXISTS tmp_tok_ok;")
        cur_prep.execute("""
            CREATE UNLOGGED TABLE tmp_tok_ok AS
            SELECT token
            FROM (
              SELECT
                token,
                COUNT(*) FILTER (WHERE dome_code = 'OWN') AS own_cnt,
                COUNT(*) FILTER (WHERE dome_code = 'DMM') AS dmm_cnt
              FROM mlinkdw.shopprod_name_token
              WHERE dome_code IN ('OWN','DMM')
                AND length(token) >= %s
                AND NOT (token = ANY(%s))
              GROUP BY token
            ) s
            WHERE own_cnt BETWEEN 1 AND %s
              AND dmm_cnt BETWEEN 1 AND %s
        """, (MIN_TOKEN_LEN, list(STOP_TOKENS), MAX_TOKEN_FREQ_OWN, MAX_TOKEN_FREQ_DMM))
        cur_prep.execute("CREATE INDEX idx_tmp_tok_ok_token ON tmp_tok_ok(token);")
        cur_prep.execute("ANALYZE tmp_tok_ok;")
        conn.commit()
        cur_prep.close()
        print(f"[{_now_str()}] STEP1 prep: tmp_tok_ok ready elapsed={_fmt_seconds(time.perf_counter() - t_prep0)}")

    def _step1_ensure_tmp_tok_ok_exists():
        # Fail fast with an actionable message when tmp_tok_ok is missing.
        try:
            cur_chk = conn.cursor()
            cur_chk.execute("SELECT 1 FROM tmp_tok_ok LIMIT 1;")
            cur_chk.close()
        except psycopg2.Error as e:
            # UndefinedTable etc.: the user ran without the prep flag but
            # tmp_tok_ok does not exist yet.
            conn.close()
            raise RuntimeError(
                "tmp_tok_ok 테이블이 없습니다. "
                "처음 실행이거나 tmp_tok_ok를 재생성해야 하면 "
                "`python ownerTodomemeTransfer.py --step1-prep-tmp-tok-ok` 로 먼저 준비하세요."
            ) from e

    if STEP1_PREP_TMP_TOK_OK:
        _step1_prepare_tmp_tok_ok()
    else:
        _step1_ensure_tmp_tok_ok_exists()

    # Server-side (named) cursor: streams rows in batches without loading
    # the whole result set into memory.
    cur = conn.cursor(
        name="token_stream_cursor",
        cursor_factory=DictCursor
    )

    cur.execute("""
        WITH pairs AS (
            SELECT
                o.icode AS own_icode,
                d.icode AS dmm_icode,
                min(o.token) AS token
            FROM mlinkdw.shopprod_name_token o
            JOIN mlinkdw.shopprod_name_token d ON o.token = d.token
            JOIN tmp_tok_ok t ON t.token = o.token
            WHERE o.dome_code = 'OWN'
              AND d.dome_code = 'DMM'
            GROUP BY o.icode, d.icode
        ),
        ranked AS (
            SELECT
                own_icode,
                dmm_icode,
                row_number() OVER (PARTITION BY own_icode ORDER BY token) AS rn
            FROM pairs
        )
        SELECT own_icode, dmm_icode
        FROM ranked
        WHERE rn <= %s
        ORDER BY own_icode, dmm_icode
    """, (
        MAX_CANDIDATE_PER_OWN,
    ))

    candidate_map = defaultdict(set)
    rows_read = 0

    # Optional test limit on the number of OWN icodes collected (set via
    # --step1-own-limit; unset or <= 0 means "no limit").
    step1_own_limit = None
    try:
        step1_own_limit = int(globals().get("STEP1_OWN_LIMIT") or 0)
    except Exception:
        step1_own_limit = 0
    if step1_own_limit <= 0:
        step1_own_limit = None

    while True:
        rows = cur.fetchmany(FETCH_SIZE)
        if not rows:
            break

        for r in rows:
            rows_read += 1
            own_icode = r["own_icode"]

            # Test aid: stop once N distinct OWN icodes have been collected.
            if step1_own_limit is not None and own_icode not in candidate_map and len(candidate_map) >= step1_own_limit:
                break

            if len(candidate_map[own_icode]) >= MAX_CANDIDATE_PER_OWN:
                continue

            candidate_map[own_icode].add(r["dmm_icode"])

        if step1_own_limit is not None and len(candidate_map) >= step1_own_limit:
            break

        # FETCH_SIZE (2000) divides 200000, so this fires every 100 batches.
        if rows_read % 200000 == 0:
            elapsed = time.perf_counter() - t0
            rate = (rows_read / elapsed) if elapsed > 0 else 0.0
            print(
                f"[{_now_str()}] STEP1 progress: rows={rows_read:,} own={len(candidate_map):,} "
                f"elapsed={_fmt_seconds(elapsed)} rate={rate:,.0f} rows/s"
            )

    cur.close()
    conn.close()
    elapsed = time.perf_counter() - t0
    rate = (rows_read / elapsed) if elapsed > 0 else 0.0
    print(
        f"[{_now_str()}] STEP1 end: rows={rows_read:,} own={len(candidate_map):,} "
        f"elapsed={_fmt_seconds(elapsed)} rate={rate:,.0f} rows/s"
    )
    return candidate_map

# ===============================
# STEP 2. Worker function
# ===============================
def similarity_worker(args):
    """STEP2 worker: find the best DMM match for one OWN item.

    `args` is an `(own_icode, dmm_candidate_icodes)` pair produced by STEP1.
    Opens its own DB connection (this runs inside a multiprocessing worker),
    scores every DMM candidate name against the OWN name with
    fuzz.token_set_ratio, and keeps the highest score at or above
    SIM_THRESHOLD.

    Returns the best-match dict — with `own_content`/`dmm_content` attached
    only when the conservative price condition (dmm < own * PRICE_RATIO)
    holds — or None when the OWN row is missing or nothing qualifies.

    Fixes vs. the previous version:
    - the connection is now closed via try/finally (it leaked on error);
    - the first OWN query no longer fetches the large `content` column,
      which was never used (content is re-fetched lazily below);
    - NULL prices no longer raise TypeError in the price comparison.
    """
    own_icode, dmm_set = args

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        cur = conn.cursor(cursor_factory=DictCursor)

        # OWN info (content is loaded later, only for the final candidate)
        cur.execute("""
            SELECT group_id, iname, price, img_url
            FROM mlinkdw.shopprod_group_map2
            WHERE dome_code = 'OWN' AND icode = %s
        """, (own_icode,))
        own = cur.fetchone()
        if not own:
            return None

        dmm_list = list(dmm_set)
        if not dmm_list:
            return None

        # Fetch all DMM candidates in one query (avoids N+1 round-trips).
        cur.execute("""
            SELECT icode, group_id, iname, price, img_url
            FROM mlinkdw.shopprod_group_map2
            WHERE dome_code = 'DMM' AND icode = ANY(%s)
        """, (dmm_list,))
        dmm_rows = cur.fetchall()

        best = None
        for dmm in dmm_rows:
            score = fuzz.token_set_ratio(own["iname"], dmm["iname"]) / 100.0
            if score < SIM_THRESHOLD:
                continue

            if best is None or score > best["similarity"]:
                best = {
                    "own_group_id": own["group_id"],
                    "own_icode": own_icode,
                    "own_iname": own["iname"],
                    "own_price": own["price"],
                    "own_img_url": own["img_url"],

                    "dmm_group_id": dmm["group_id"],
                    "dmm_icode": dmm["icode"],
                    "dmm_iname": dmm["iname"],
                    "dmm_price": dmm["price"],
                    "dmm_img_url": dmm["img_url"],

                    "similarity": score
                }

        # Conservative-mode check: attach the heavy `content` columns only
        # when the OWN -> DMM price condition holds. NULL prices are treated
        # as "condition not met" instead of crashing.
        if (
            best is not None
            and best["dmm_price"] is not None
            and best["own_price"] is not None
            and best["dmm_price"] < best["own_price"] * PRICE_RATIO
        ):
            # content is a large column, so it is loaded for the final
            # candidate only.
            cur.execute("""
                SELECT content
                FROM mlinkdw.shopprod_group_map2
                WHERE dome_code = 'OWN' AND icode = %s
            """, (best["own_icode"],))
            own_content_row = cur.fetchone()
            best["own_content"] = own_content_row["content"] if own_content_row else None

            cur.execute("""
                SELECT content
                FROM mlinkdw.shopprod_group_map2
                WHERE dome_code = 'DMM' AND icode = %s
            """, (best["dmm_icode"],))
            dmm_content_row = cur.fetchone()
            best["dmm_content"] = dmm_content_row["content"] if dmm_content_row else None

        return best
    finally:
        conn.close()

# ===============================
# STEP 3. Main execution
# ===============================
if __name__ == "__main__":

    # CLI: step toggles, merge apply/dry-run knobs, content-image gating
    # thresholds, and background (detached) execution options.
    parser = argparse.ArgumentParser(description="OWN ↔ DMM similarity (multiprocessing)")
    parser.add_argument("--workers", type=int, default=WORKERS, help="multiprocessing workers")
    parser.add_argument("--step0", action="store_true", help="STEP0: rebuild mlinkdw.shopprod_name_token")
    parser.add_argument("--step0-only", action="store_true", help="run STEP0 then exit")
    parser.add_argument("--step1-prep-tmp-tok-ok", action="store_true",
                        help="STEP1 prep: rebuild tmp_tok_ok (DROP/CREATE/INDEX/ANALYZE)")
    parser.add_argument("--apply", action="store_true", help="그룹 병합을 실제 DB에 적용")
    parser.add_argument("--apply-limit", type=int, default=None, help="apply 모드에서 병합을 N건까지만 수행")
    parser.add_argument("--check-limit", type=int, default=None, help="dry-run/검증에서 처리할 병합 후보 상한(N). 미지정이면 전체.")
    parser.add_argument("--require-content-imgset", action="store_true", default=False,
                        help="병합 전 winner_content 이미지셋 매칭 PASS만 병합")
    parser.add_argument("--content-max-imgs", type=int, default=12, help="content 비교 최대 이미지 수(기본 12)")
    parser.add_argument("--content-ph-threshold", type=int, default=12, help="content 이미지 매칭 기준: phash 거리(기본 12)")
    parser.add_argument("--content-dh-threshold", type=int, default=14, help="content 이미지 매칭 기준: dhash 거리(기본 14)")
    parser.add_argument("--content-min-matches", type=int, default=2, help="PASS 기준: 최소 매칭 이미지 수(기본 2)")
    parser.add_argument("--content-min-ratio", type=float, default=0.30, help="PASS 기준: 매칭 비율(기본 0.30)")
    parser.add_argument("--step1-own-limit", type=int, default=None, help="테스트용: STEP1에서 OWN icode를 N개까지만 수집하고 진행")
    parser.add_argument(
        "--background",
        action="store_true",
        help="백그라운드(detach)로 실행하고 즉시 종료. 로그는 --log-file로 저장 권장.",
    )
    parser.add_argument("--log-file", default=None, help="--background 사용 시 stdout/stderr 로그 파일")
    # Hidden marker flag so the detached child does not re-detach itself.
    parser.add_argument("--_background-child", action="store_true", help=argparse.SUPPRESS)
    args = parser.parse_args()

    # Background (detached) execution: re-spawn this script without the
    # --background/--log-file arguments, tag the child with the hidden
    # marker flag, then exit the parent immediately.
    if args.background and not args._background_child:
        log_file = str(args.log_file or _default_log_file_name())
        log_file_abs = os.path.abspath(log_file)
        child_args: list[str] = []
        skip_next = False
        for a in sys.argv[1:]:
            if skip_next:
                # This was the value following --log-file; drop it too.
                skip_next = False
                continue
            if a == "--background":
                continue
            if a == "--log-file":
                skip_next = True
                continue
            child_args.append(a)
        # -u: stdout/stderr unbuffered (prevents the log file appearing empty)
        child_argv = [sys.executable, "-u", os.path.abspath(__file__), *child_args, "--_background-child"]
        p = _spawn_background(child_argv, log_file_abs)
        print(f"[{_now_str()}] BACKGROUND started pid={p.pid} log_file={log_file_abs}")
        raise SystemExit(0)

    # Effective worker count comes from the CLI (floor of 1).
    WORKERS = max(int(args.workers), 1)

    t_all = time.perf_counter()
    print(f"[{_now_str()}] START ownerTodomemeTransfer")
    print(f"Workers: {WORKERS}")

    if args.step0 or args.step0_only:
        step0_prepare_shopprod_name_token()
        if args.step0_only:
            print(f"[{_now_str()}] DONE(step0-only) total_elapsed={_fmt_seconds(time.perf_counter() - t_all)}")
            raise SystemExit(0)

    # 1️⃣ Token streaming
    t1 = time.perf_counter()
    # Module-level flags read by stream_token_candidates() (kept as globals
    # to avoid changing its signature).
    STEP1_PREP_TMP_TOK_OK = bool(args.step1_prep_tmp_tok_ok)
    STEP1_OWN_LIMIT = int(args.step1_own_limit) if args.step1_own_limit is not None else None
    candidate_map = stream_token_candidates()
    print(f"OWN count: {len(candidate_map)}")
    print(f"[{_now_str()}] STEP1 total_elapsed={_fmt_seconds(time.perf_counter() - t1)}")

    # 2️⃣ Multiprocessing similarity
    tasks = list(candidate_map.items())

    final_results = []
    t2 = time.perf_counter()
    processed = 0

    pool = Pool(processes=WORKERS)
    try:
        for res in pool.imap_unordered(similarity_worker, tasks, chunksize=50):
            processed += 1
            if res:
                final_results.append(res)

            if processed % 50 == 0:
                elapsed = time.perf_counter() - t2
                rate = (processed / elapsed) if elapsed > 0 else 0.0
                remain = len(tasks) - processed
                eta = (remain / rate) if rate > 0 else 0.0
                print(
                    f"[{_now_str()}] STEP2 progress: processed={processed:,}/{len(tasks):,} "
                    f"found={len(final_results):,} elapsed={_fmt_seconds(elapsed)} "
                    f"rate={rate:,.1f} own/s eta={_fmt_seconds(eta)}"
                )

        # Normal completion: close/join the pool.
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        # On Windows, Ctrl+C / forced termination can leave the internal
        # feeder thread behind and trigger a 'memoryview exported buffer'
        # warning, so the pool is torn down explicitly here.
        print(f"[{_now_str()}] STEP2 interrupted: terminating pool...")
        pool.terminate()
        pool.join()
        raise
    except Exception:
        print(f"[{_now_str()}] STEP2 error: terminating pool...")
        pool.terminate()
        pool.join()
        raise

    print(f"Final conservative matches: {len(final_results)}")
    print(f"[{_now_str()}] STEP2 end: elapsed={_fmt_seconds(time.perf_counter() - t2)}")

    # 3️⃣ Automatic group merge (DMM_group_id <- OWN_group_id)
    # - Best-match selection works in the "OWN is absorbed into DMM
    #   (OWN -> DMM)" direction, so merging also uses target=DMM, source=OWN.
    # - Default: dry-run (log only)
    # - --apply: actually apply the merge
    # Important: the worker always returns its best match, so only candidates
    # that passed the price condition (OWN -> DMM) are used for merging.
    # Also, the same OWN group_id may match through multiple icodes, so only
    # one target is selected per source (OWN group).
    by_source_best: dict[int, dict] = {}
    for r in final_results:
        try:
            own_gid = int(r.get("own_group_id") or 0)
            dmm_gid = int(r.get("dmm_group_id") or 0)
            sim = float(r.get("similarity") or 0.0)
            own_price = float(r.get("own_price") or 0.0)
            dmm_price = float(r.get("dmm_price") or 0.0)
        except Exception:
            continue
        if own_gid <= 0 or dmm_gid <= 0 or own_gid == dmm_gid:
            continue
        # OWN -> DMM direction condition
        if not (dmm_price < (own_price * float(PRICE_RATIO))):
            continue

        cur = by_source_best.get(own_gid)
        if cur is None or sim > float(cur.get("similarity") or 0.0):
            by_source_best[own_gid] = {
                "target_gid": dmm_gid,   # target = DMM
                "source_gid": own_gid,   # source = OWN
                "similarity": sim,
                "own_icode": r.get("own_icode"),
                "dmm_icode": r.get("dmm_icode"),
                "own_price": r.get("own_price"),
                "dmm_price": r.get("dmm_price"),
            }

    # Deduplicate identical (target, source) pairs
    seen_ms = set()
    merge_pairs_dedup = []
    for own_gid, v in by_source_best.items():
        t_gid = int(v["target_gid"])
        s_gid = int(v["source_gid"])
        sim = float(v["similarity"])
        k = (t_gid, s_gid)
        if k in seen_ms:
            continue
        seen_ms.add(k)
        merge_pairs_dedup.append((t_gid, s_gid, sim, v.get("own_icode"), v.get("dmm_icode"), v.get("own_price"), v.get("dmm_price")))

    do_apply = bool(args.apply)
    apply_limit = int(args.apply_limit) if args.apply_limit is not None else None
    check_limit = int(args.check_limit) if args.check_limit is not None else None

    # Content-image gating needs the optional PIL/imagehash/requests stack.
    if args.require_content_imgset and (Image is None or imagehash is None or requests is None):
        raise RuntimeError("--require-content-imgset 사용에는 PIL/imagehash/requests 설치가 필요합니다.")

    if merge_pairs_dedup:
        print(
            f"[{_now_str()}] STEP3 start: merge_groups pairs={len(merge_pairs_dedup):,} "
            f"mode={'APPLY' if do_apply else 'DRY-RUN'} require_content_imgset={bool(args.require_content_imgset)} "
            f"apply_limit={apply_limit} check_limit={check_limit}"
        )
        conn_write = psycopg2.connect(**DB_CONFIG)
        merged = 0
        skipped = 0
        checked = 0
        for i, (t_gid, s_gid, sim, own_icode, dmm_icode, own_price, dmm_price) in enumerate(merge_pairs_dedup, start=1):
            # Stop after check_limit candidates (dry-run/validation cap).
            if check_limit is not None and i > check_limit:
                break
            # Stop after apply_limit actual merges in APPLY mode.
            if do_apply and apply_limit is not None and merged >= apply_limit:
                break

            ok = True
            st = None
            if args.require_content_imgset:
                # Gate the merge on winner_content image-set matching.
                html_t = _fetch_winner_content(conn_write, t_gid)
                html_s = _fetch_winner_content(conn_write, s_gid)
                st = _content_imgset_match_stats(
                    html_t,
                    html_s,
                    max_imgs=int(args.content_max_imgs),
                    ph_thr=int(args.content_ph_threshold),
                    dh_thr=int(args.content_dh_threshold),
                )
                checked += 1
                if st.get("err") is not None:
                    ok = False
                else:
                    if int(st.get("match") or 0) < int(args.content_min_matches):
                        ok = False
                    if float(st.get("ratio") or 0.0) < float(args.content_min_ratio):
                        ok = False

            if not ok:
                skipped += 1
                if st:
                    print(
                        f"[{_now_str()}] STEP3 skip: {s_gid} -> {t_gid} sim={sim:.3f} "
                        f"own_icode={own_icode} dmm_icode={dmm_icode} own_price={own_price} dmm_price={dmm_price} "
                        f"content_err={st.get('err')} match={st.get('match')} ratio={_fmt_f3(st.get('ratio'))}"
                    )
                else:
                    print(
                        f"[{_now_str()}] STEP3 skip: {s_gid} -> {t_gid} sim={sim:.3f} "
                        f"own_icode={own_icode} dmm_icode={dmm_icode} own_price={own_price} dmm_price={dmm_price}"
                    )
                continue

            if st:
                print(
                    f"[{_now_str()}] STEP3 merge: {s_gid} -> {t_gid} sim={sim:.3f} "
                    f"own_icode={own_icode} dmm_icode={dmm_icode} own_price={own_price} dmm_price={dmm_price} "
                    f"content_match={st.get('match')} ratio={_fmt_f3(st.get('ratio'))}"
                )
            else:
                print(
                    f"[{_now_str()}] STEP3 merge: {s_gid} -> {t_gid} sim={sim:.3f} "
                    f"own_icode={own_icode} dmm_icode={dmm_icode} own_price={own_price} dmm_price={dmm_price}"
                )

            try:
                # dry_run=True makes merge_cluster a no-op (log-only mode).
                merge_cluster(conn_write, target=t_gid, sources=[s_gid], dry_run=(not do_apply))
                merged += 1
            except Exception as e:
                skipped += 1
                print(f"[{_now_str()}] STEP3 error: {s_gid} -> {t_gid} err={type(e).__name__}: {e}")

        try:
            conn_write.close()
        except Exception:
            pass

        print(
            f"[{_now_str()}] STEP3 end: merged={merged:,} skipped={skipped:,} "
            f"checked={checked:,} total_elapsed={_fmt_seconds(time.perf_counter() - t_all)}"
        )

    # Sample output: first 14 conservative matches for eyeballing.
    for r in final_results[:14]:
        print(
            f"[OWN] {r.get('own_icode')} | {r.get('own_iname')} | {r.get('own_price')} | {r.get('own_img_url')} | {r.get('own_content')}\n"
            f"[DMM] {r.get('dmm_icode')} | {r.get('dmm_iname')} | {r.get('dmm_price')} | {r.get('dmm_img_url')} | {r.get('dmm_content')}\n"
            f"SIM  : {r['similarity']:.3f}\n"
            "----------------------------------"
        )

    print(f"[{_now_str()}] DONE total_elapsed={_fmt_seconds(time.perf_counter() - t_all)}")
