"""
CREATE MATERIALIZED VIEW mlinkdw.mv_split_candidate
TABLESPACE pg_default
AS SELECT a.group_id,
    a.vender_code,
    count(*) AS cnt
   FROM mlinkdw.shopprod_group_map2 a
     JOIN mlinkdw.shopprod_group2 b ON a.group_id = b.group_id AND a.vender_code::text = b.winner_vender_code::text
  GROUP BY a.group_id, a.vender_code
 HAVING count(*) > 1
WITH DATA;
"""

import argparse
import psycopg2
from psycopg2.extras import DictCursor, execute_values
from db_config import DB_INFO_PSYCOPG2
import time
import subprocess
import sys
import os
from datetime import datetime

# Number of candidate rows pulled per fetchmany() from the server-side cursor.
FETCH_SIZE = 2000

# Table that pins the contents of mv_split_candidate per run (see --snapshot).
SNAPSHOT_TABLE = "mlinkdw.split_candidate_snapshot"

def _page_size(values) -> int:
    """
    psycopg2.extras.execute_values는 기본 page_size=100으로 내부적으로 여러 번 실행될 수 있다.
    SELECT ... JOIN (VALUES %s) 형태에서 fetchall()을 기대한다면 page_size를 충분히 크게 줘서
    '한 번의 실행'으로 끝나게 해야 결과 누락(마지막 page만 fetch) 버그를 피할 수 있다.
    """
    try:
        n = len(values)
    except Exception:
        n = 0
    return n if n and n > 0 else 1

def refresh_mv(conn):
    """Re-materialize mlinkdw.mv_split_candidate and commit on *conn*."""
    cur = conn.cursor()
    try:
        cur.execute("REFRESH MATERIALIZED VIEW mlinkdw.mv_split_candidate;")
    finally:
        cur.close()
    conn.commit()

def ensure_snapshot_table(conn):
    """
    Create (if missing) the snapshot table used to pin mv_split_candidate
    at run time.  Candidates (group_id, vender_code, cnt) are stored per
    snapshot_id.
    """
    ddl = f"""
            CREATE TABLE IF NOT EXISTS {SNAPSHOT_TABLE} (
              snapshot_id   text        NOT NULL,
              created_at    timestamptz NOT NULL DEFAULT now(),
              group_id      bigint      NOT NULL,
              vender_code   text        NOT NULL,
              cnt           int         NOT NULL,
              PRIMARY KEY (snapshot_id, group_id, vender_code)
            );
            """
    cur = conn.cursor()
    try:
        cur.execute(ddl)
    finally:
        cur.close()
    conn.commit()


def _default_snapshot_id() -> str:
    # 사람이 보기 쉬운 run id
    return datetime.now().strftime("%Y%m%d_%H%M%S")


def create_snapshot(conn, snapshot_id: str) -> int:
    """
    Persist the current contents of mv_split_candidate under *snapshot_id*.

    Re-running with the same snapshot_id is safe: existing rows are kept and
    duplicates are skipped (ON CONFLICT DO NOTHING).  Returns the number of
    rows actually inserted.
    """
    ensure_snapshot_table(conn)
    sql = f"""
            INSERT INTO {SNAPSHOT_TABLE} (snapshot_id, group_id, vender_code, cnt)
            SELECT %s, group_id, vender_code, cnt
            FROM mlinkdw.mv_split_candidate
            ON CONFLICT DO NOTHING
            """
    cur = conn.cursor()
    try:
        cur.execute(sql, (snapshot_id,))
        # rowcount can be None for some statements; treat that as 0 inserted.
        inserted = cur.rowcount if cur.rowcount is not None else 0
    finally:
        cur.close()
    conn.commit()
    return int(inserted)

def cleanup_snapshots(conn, *, ttl_days: int) -> int:
    """
    Delete snapshot rows older than *ttl_days* (by created_at) to keep the
    snapshot table small.

    A non-positive or non-numeric ttl_days is a no-op that returns 0 without
    touching the database.  Returns the number of rows deleted.
    """
    try:
        days = int(ttl_days)
    except Exception:
        return 0
    if days <= 0:
        return 0

    ensure_snapshot_table(conn)
    sql = f"""
            DELETE FROM {SNAPSHOT_TABLE}
            WHERE created_at < (now() - (%s || ' days')::interval)
            """
    cur = conn.cursor()
    try:
        cur.execute(sql, (days,))
        deleted = cur.rowcount if cur.rowcount is not None else 0
    finally:
        cur.close()
    conn.commit()
    return int(deleted)


def _default_log_file_name() -> str:
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"Split_Products-by_Thumbnail_2st_{ts}.log"

def _spawn_background(argv: list[str], log_file: str) -> subprocess.Popen:
    """
    Ubuntu/Linux: setsid(start_new_session=True)로 세션을 분리해서 백그라운드 실행.
    stdout/stderr는 log_file로 리다이렉트.
    """
    log_path = os.path.abspath(log_file)
    os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)
    f = open(log_path, "a", buffering=1, encoding="utf-8")
    return subprocess.Popen(
        argv,
        stdout=f,
        stderr=subprocess.STDOUT,
        close_fds=True,
        start_new_session=True,
    )

def _fmt_hms(seconds: float) -> str:
    try:
        s = int(seconds)
    except Exception:
        s = 0
    if s < 0:
        s = 0
    h = s // 3600
    m = (s % 3600) // 60
    ss = s % 60
    return f"{h:02d}:{m:02d}:{ss:02d}"


def split_groups(
    conn_read,
    conn_write,
    *,
    log_every_batches: int = 1,
    workers: int = 1,
    worker_id: int = 0,
    refresh: bool = True,
    only_group_ids: list[int] | None = None,
    snapshot: bool = False,
    snapshot_id: str | None = None,
    snapshot_ttl_days: int = 0,
):
    """
    Synchronize the sub-group tables from the split-candidate list.

    Streams (group_id, vender_code) candidate pairs — from
    mlinkdw.mv_split_candidate or, when snapshot=True, from the snapshot
    table — through a named server-side cursor on conn_read.  For each batch
    of FETCH_SIZE rows it (0) deletes sub_groups that became invalid,
    (1) creates missing sub_groups with a provisional winner, then diffs
    shopprod_sub_group_map2 against shopprod_group_map2 and (4) inserts new
    rows / (5) deletes vanished ones.  All writes go through conn_write and
    are committed once per batch.

    Args:
        conn_read: psycopg2 connection used only for the named read cursor.
        conn_write: psycopg2 connection used for all writes / MV refresh.
        log_every_batches: progress-log period in batches (coerced to >= 1).
        workers / worker_id: partition input via mod(group_id, workers);
            worker_id must lie in [0, workers).
        refresh: refresh the MV before reading (callers normally let only
            worker 0 do this).
        only_group_ids: debug aid — process only these group_ids
            (partitioning is then ignored).
        snapshot / snapshot_id / snapshot_ttl_days: pin the candidate list
            into the snapshot table and read from it instead of the MV.

    Raises:
        ValueError: if worker_id is outside [0, workers).
    """
    # Pin the input to a snapshot (recommended): even if the MV shifts during
    # the run (winner changes / merges, etc.) the input stays fixed.
    if snapshot:
        sid = str(snapshot_id or _default_snapshot_id())
        # The caller arranges for worker 0 to do the refresh, but the refresh
        # flag is honored here too for consistency when running directly with
        # workers=1.
        if refresh:
            refresh_mv(conn_write)
        if worker_id == 0:
            if snapshot_ttl_days and int(snapshot_ttl_days) > 0:
                deleted = cleanup_snapshots(conn_write, ttl_days=int(snapshot_ttl_days))
                if deleted:
                    print(f"[INFO] snapshot cleanup: ttl_days={int(snapshot_ttl_days)} deleted_rows={deleted}", flush=True)
            ins = create_snapshot(conn_write, sid)
            print(f"[INFO] snapshot created: snapshot_id={sid} inserted={ins}", flush=True)
        snapshot_id = sid
    else:
        if refresh:
            refresh_mv(conn_write)

    # Named (server-side) cursor: candidates are streamed via fetchmany()
    # instead of being materialized on the client.
    read_cur = conn_read.cursor(
        name="split_cursor",
        cursor_factory=DictCursor
    )

    workers = int(workers) if workers and int(workers) > 0 else 1
    worker_id = int(worker_id) if worker_id is not None else 0
    if worker_id < 0 or worker_id >= workers:
        raise ValueError(f"invalid worker_id={worker_id} for workers={workers}")

    if workers > 1:
        # Worker partitioning: split by group_id so workers never overlap.
        if only_group_ids:
            # Debug/verification: force-process only the given groups
            # (partitioning is ignored).
            if snapshot:
                read_cur.execute(
                    f"""
                    SELECT group_id, vender_code
                    FROM {SNAPSHOT_TABLE}
                    WHERE snapshot_id = %s
                      AND group_id = ANY(%s)
                    ORDER BY group_id
                    """,
                    (snapshot_id, only_group_ids),
                )
            else:
                read_cur.execute(
                    """
                    SELECT group_id, vender_code
                    FROM mlinkdw.mv_split_candidate
                    WHERE group_id = ANY(%s)
                    ORDER BY group_id
                    """,
                    (only_group_ids,),
                )
            print(f"[INFO] only_group_ids enabled: {only_group_ids}", flush=True)
        else:
            if snapshot:
                read_cur.execute(
                    f"""
                    SELECT group_id, vender_code
                    FROM {SNAPSHOT_TABLE}
                    WHERE snapshot_id = %s
                      AND mod(group_id, %s) = %s
                    ORDER BY group_id
                    """,
                    (snapshot_id, workers, worker_id),
                )
            else:
                read_cur.execute(
                    """
                    SELECT group_id, vender_code
                    FROM mlinkdw.mv_split_candidate
                    WHERE mod(group_id, %s) = %s
                    ORDER BY group_id
                    """,
                    (workers, worker_id),
                )
            print(f"[INFO] worker partition enabled: worker_id={worker_id}/{workers}", flush=True)
    else:
        if only_group_ids:
            if snapshot:
                read_cur.execute(
                    f"""
                    SELECT group_id, vender_code
                    FROM {SNAPSHOT_TABLE}
                    WHERE snapshot_id = %s
                      AND group_id = ANY(%s)
                    ORDER BY group_id
                    """,
                    (snapshot_id, only_group_ids),
                )
            else:
                read_cur.execute(
                    """
                    SELECT group_id, vender_code
                    FROM mlinkdw.mv_split_candidate
                    WHERE group_id = ANY(%s)
                    ORDER BY group_id
                    """,
                    (only_group_ids,),
                )
            print(f"[INFO] only_group_ids enabled: {only_group_ids}", flush=True)
        else:
            if snapshot:
                read_cur.execute(
                    f"""
                    SELECT group_id, vender_code
                    FROM {SNAPSHOT_TABLE}
                    WHERE snapshot_id = %s
                    ORDER BY group_id
                    """,
                    (snapshot_id,),
                )
            else:
                read_cur.execute(
                    """
                    SELECT group_id, vender_code
                    FROM mlinkdw.mv_split_candidate
                    ORDER BY group_id
                    """
                )

    write_cur = conn_write.cursor(cursor_factory=DictCursor)

    # Run-wide counters for the final summary log.
    t0 = time.perf_counter()
    batch_no = 0
    total_pairs = 0
    total_obsolete_subs = 0
    total_new_sub_groups = 0
    total_inserts = 0
    total_deletes = 0
    log_every_batches = int(log_every_batches) if log_every_batches and int(log_every_batches) > 0 else 1

    while True:
        rows = read_cur.fetchmany(FETCH_SIZE)
        if not rows:
            break

        batch_no += 1
        bt0 = time.perf_counter()

        # ==========
        # Batch-level optimization: handle many (group_id, vender_code)
        # pairs per round trip instead of one at a time.
        # ==========
        pairs = [(r["group_id"], r["vender_code"]) for r in rows]
        # De-duplicate while preserving order.
        seen = set()
        uniq_pairs = []
        for p in pairs:
            if p in seen:
                continue
            seen.add(p)
            uniq_pairs.append(p)
        pairs = uniq_pairs
        total_pairs += len(pairs)

        group_ids = list({gid for gid, _ in pairs})

        # 0) Clean up sub_groups that became invalid for these groups
        #    (batched): a sub_group whose winner differs from the group's
        #    winner and that no longer maps the group winner's vender_code.
        obsolete_count = 0
        if group_ids:
            execute_values(
                write_cur,
                """
                SELECT sg.sub_group_id
                  FROM mlinkdw.shopprod_sub_group2 sg
                  JOIN mlinkdw.shopprod_group2 g ON g.group_id = sg.group_id
                  JOIN (VALUES %s) AS v(group_id) ON v.group_id = sg.group_id
                 WHERE sg.winner_vender_code != g.winner_vender_code
                   AND NOT EXISTS (
                        SELECT 1
                          FROM mlinkdw.shopprod_sub_group_map2 sm
                         WHERE sm.sub_group_id = sg.sub_group_id
                           AND sm.vender_code  = g.winner_vender_code
                   )
                """,
                [(gid,) for gid in group_ids],
                # Single-page execution so fetchall() below sees every row.
                page_size=_page_size(group_ids),
            )
            obsolete_subs = [rr["sub_group_id"] for rr in write_cur.fetchall()]
            obsolete_count = len(obsolete_subs)
            total_obsolete_subs += obsolete_count

            if obsolete_subs:
                # Delete mapping rows first, then the sub_group rows.
                execute_values(
                    write_cur,
                    """
                    DELETE FROM mlinkdw.shopprod_sub_group_map2 sm
                    USING (VALUES %s) AS v(sub_group_id)
                    WHERE sm.sub_group_id = v.sub_group_id
                    """,
                    [(sid,) for sid in obsolete_subs],
                )
                execute_values(
                    write_cur,
                    """
                    DELETE FROM mlinkdw.shopprod_sub_group2 sg
                    USING (VALUES %s) AS v(sub_group_id)
                    WHERE sg.sub_group_id = v.sub_group_id
                    """,
                    [(sid,) for sid in obsolete_subs],
                )

        # 1) Which sub_groups already exist for these pairs (batched).
        execute_values(
            write_cur,
            """
            SELECT sg.group_id, sg.winner_vender_code, sg.sub_group_id
              FROM mlinkdw.shopprod_sub_group2 sg
              JOIN (VALUES %s) AS v(group_id, vender_code)
                ON sg.group_id = v.group_id
               AND sg.winner_vender_code = v.vender_code
            """,
            pairs,
            page_size=_page_size(pairs),
        )
        sub_map = {(rr["group_id"], rr["winner_vender_code"]): rr["sub_group_id"] for rr in write_cur.fetchall()}

        missing = [p for p in pairs if p not in sub_map]

        # Create the missing sub_groups (the winner is a provisional
        # representative) — batched.
        new_sub_groups = 0
        if missing:
            # Pick one representative product per pair: cheapest price
            # (NULLs last), ties broken by icode.
            execute_values(
                write_cur,
                """
                SELECT DISTINCT ON (m.group_id, m.vender_code)
                       m.group_id, m.vender_code, m.icode, m.price
                  FROM mlinkdw.shopprod_group_map2 m
                  JOIN (VALUES %s) AS v(group_id, vender_code)
                    ON m.group_id = v.group_id
                   AND m.vender_code = v.vender_code
                 ORDER BY m.group_id, m.vender_code, m.price NULLS LAST, m.icode
                """,
                missing,
                page_size=_page_size(missing),
            )
            winners = write_cur.fetchall()

            insert_rows = []
            for w in winners:
                insert_rows.append((w["group_id"], "WINNER_VENDOR_MULTI", w["icode"], w["vender_code"], w["price"]))

            if insert_rows:
                execute_values(
                    write_cur,
                    """
                    INSERT INTO mlinkdw.shopprod_sub_group2
                      (group_id, split_reason, winner_icode, winner_vender_code, winner_price, winner_updated_at)
                    VALUES %s
                    RETURNING sub_group_id, group_id, winner_vender_code
                    """,
                    insert_rows,
                    template="(%s, %s, %s, %s, %s, now())",
                    page_size=_page_size(insert_rows),
                )
                inserted = write_cur.fetchall()
                new_sub_groups = len(inserted)
                total_new_sub_groups += new_sub_groups
                for ins in inserted:
                    sub_map[(ins["group_id"], ins["winner_vender_code"])] = ins["sub_group_id"]

        # 2) Current group_map products for these pairs (batched).
        execute_values(
            write_cur,
            """
            SELECT m.group_id, m.vender_code, m.icode, m.iname, m.price
              FROM mlinkdw.shopprod_group_map2 m
              JOIN (VALUES %s) AS v(group_id, vender_code)
                ON m.group_id = v.group_id
               AND m.vender_code = v.vender_code
            """,
            pairs,
            page_size=_page_size(pairs),
        )
        current_rows = write_cur.fetchall()

        # 3) Existing sub_group_map products (batched).
        sub_ids = list({sub_map[p] for p in pairs if p in sub_map})
        existing_rows = []
        if sub_ids:
            execute_values(
                write_cur,
                """
                SELECT sm.sub_group_id, sm.vender_code, sm.icode
                  FROM mlinkdw.shopprod_sub_group_map2 sm
                  JOIN (VALUES %s) AS v(sub_group_id) ON v.sub_group_id = sm.sub_group_id
                """,
                [(sid,) for sid in sub_ids],
                page_size=_page_size(sub_ids),
            )
            existing_rows = write_cur.fetchall()

        # Build current/existing key sets per sub_group_id.
        # NOTE: `sid` below shadows the snapshot id variable from the top of
        # the function; it is only reached after that value has been copied
        # into snapshot_id, so no state is lost.
        current_keys_by_sub = {}
        current_detail_by_sub = {}
        for cr in current_rows:
            key_pair = (cr["group_id"], cr["vender_code"])
            sid = sub_map.get(key_pair)
            if not sid:
                # Pair without a sub_group (should not happen after step 1).
                continue
            k = (cr["vender_code"], cr["icode"])
            current_keys_by_sub.setdefault(sid, set()).add(k)
            current_detail_by_sub.setdefault(sid, {})[k] = cr

        existing_keys_by_sub = {}
        for er in existing_rows:
            sid = er["sub_group_id"]
            k = (er["vender_code"], er["icode"])
            existing_keys_by_sub.setdefault(sid, set()).add(k)

        # Set difference per sub_group: current-only rows get inserted,
        # existing-only rows get deleted.
        inserts = []
        deletes = []
        for sid in sub_ids:
            cur_set = current_keys_by_sub.get(sid, set())
            ex_set = existing_keys_by_sub.get(sid, set())

            for k in cur_set - ex_set:
                d = current_detail_by_sub[sid][k]
                inserts.append((sid, d["vender_code"], d["icode"], d["iname"], d["price"]))

            for k in ex_set - cur_set:
                deletes.append((sid, k[0], k[1]))

        # 4) INSERT new rows (batched).
        ins_count = len(inserts)
        if inserts:
            execute_values(
                write_cur,
                """
                INSERT INTO mlinkdw.shopprod_sub_group_map2
                  (sub_group_id, vender_code, icode, iname, price)
                VALUES %s
                ON CONFLICT DO NOTHING
                """,
                inserts,
            )
            total_inserts += ins_count

        # 5) DELETE vanished rows (batched).
        del_count = len(deletes)
        if deletes:
            execute_values(
                write_cur,
                """
                DELETE FROM mlinkdw.shopprod_sub_group_map2 sm
                USING (VALUES %s) AS v(sub_group_id, vender_code, icode)
                WHERE sm.sub_group_id = v.sub_group_id
                  AND sm.vender_code  = v.vender_code
                  AND sm.icode        = v.icode
                """,
                deletes,
            )
            total_deletes += del_count

        # One commit per batch keeps transactions short.
        conn_write.commit()

        if (batch_no % log_every_batches) == 0:
            belapsed = time.perf_counter() - bt0
            elapsed = time.perf_counter() - t0
            rate = (total_pairs / elapsed) if elapsed > 0 else 0.0
            print(
                f"[INFO] batch={batch_no} rows={len(rows)} pairs={len(pairs)} "
                f"obsolete_subs={obsolete_count} new_sub_groups={new_sub_groups} "
                f"ins={ins_count} del={del_count} "
                f"batch_elapsed={_fmt_hms(belapsed)} total_elapsed={_fmt_hms(elapsed)} rate={rate:.1f}pairs/s",
                flush=True,
            )

    read_cur.close()
    write_cur.close()

    # Final summary across all batches.
    elapsed = time.perf_counter() - t0
    rate = (total_pairs / elapsed) if elapsed > 0 else 0.0
    print(
        f"[INFO] done batches={batch_no} total_pairs={total_pairs} "
        f"obsolete_subs={total_obsolete_subs} new_sub_groups={total_new_sub_groups} "
        f"ins={total_inserts} del={total_deletes} elapsed={_fmt_hms(elapsed)} rate={rate:.1f}pairs/s",
        flush=True,
    )


if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--log-every-batches",
        type=int,
        default=1,
        help="진행 로그 출력 주기(배치 단위). 1이면 매 배치 출력, 10이면 10배치마다 출력.",
    )
    ap.add_argument(
        "--background",
        action="store_true",
        help="Ubuntu/Linux: 백그라운드(detach)로 실행하고 즉시 종료. 로그는 --log-file로 저장 권장.",
    )
    ap.add_argument(
        "--log-file",
        default=None,
        help="--background 사용 시 stdout/stderr를 저장할 로그 파일 경로. 미지정이면 자동 생성.",
    )
    ap.add_argument("--_background-child", action="store_true", help=argparse.SUPPRESS)
    ap.add_argument(
        "--workers",
        type=int,
        default=1,
        help="병렬 실행 프로세스 수. (group_id %% workers 기준으로 입력을 분할하여 중복 없이 처리)",
    )
    ap.add_argument(
        "--worker-id",
        type=int,
        default=None,
        help="(내부용) workers>1일 때 현재 프로세스의 worker id (0..workers-1).",
    )
    ap.add_argument(
        "--no-refresh",
        action="store_true",
        help="MV refresh 생략(병렬 실행 시 자식 프로세스에서 사용).",
    )
    ap.add_argument(
        "--only-group-id",
        type=int,
        nargs="*",
        default=None,
        help="디버그/검증용: 지정한 group_id만 mv_split_candidate에서 골라 처리. (partition 무시)",
    )
    ap.add_argument(
        "--snapshot",
        action="store_true",
        help="실행 시작 시점의 mv_split_candidate를 스냅샷 테이블로 고정해서 처리(권장). MV가 실행 중/후에 변해도 입력이 흔들리지 않음.",
    )
    ap.add_argument(
        "--snapshot-id",
        default=None,
        help="--snapshot 사용 시 스냅샷 식별자. 미지정이면 자동 생성(YYYYmmdd_HHMMSS).",
    )
    ap.add_argument(
        "--snapshot-ttl-days",
        type=int,
        default=0,
        help="--snapshot 사용 시 스냅샷 테이블 정리 보관 기간(일). 0이면 정리 안함. 예) 14",
    )
    args = ap.parse_args()

    # Background (detach) mode: the parent exits immediately; a detached
    # child process performs the actual work.
    if args.background and not args._background_child:
        log_file = str(args.log_file or _default_log_file_name())
        # Rebuild argv for the child: drop --background and --log-file, add
        # the internal child flag so the child does not detach again.
        # NOTE(review): assumes the "--log-file VALUE" form; a fused
        # "--log-file=VALUE" argument would not be stripped here.
        child_args: list[str] = []
        skip_next = False
        for a in sys.argv[1:]:
            if skip_next:
                skip_next = False
                continue
            if a == "--background":
                continue
            if a == "--log-file":
                skip_next = True
                continue
            child_args.append(a)
        child_args.extend(["--_background-child", "--log-file", log_file])

        cmd = [sys.executable, os.path.abspath(sys.argv[0]), *child_args]
        p = _spawn_background(cmd, log_file=log_file)
        print(f"[INFO] background started: pid={p.pid} log_file={log_file}", flush=True)
        sys.exit(0)

    # workers>1 with no worker-id: this is the coordinating parent — spawn
    # one child per worker id, then exit.
    if args.workers and int(args.workers) > 1 and args.worker_id is None:
        workers = int(args.workers)
        script = os.path.abspath(sys.argv[0])
        base = [sys.executable, script, "--workers", str(workers), "--log-every-batches", str(args.log_every_batches)]
        procs = []

        # Snapshot mode: the parent creates the snapshot BEFORE spawning the
        # children (avoids the race where simultaneously started workers
        # SELECT before the snapshot table/rows exist).
        if args.snapshot:
            snapshot_id = str(args.snapshot_id or _default_snapshot_id())

            conn_write_parent = psycopg2.connect(**DB_INFO_PSYCOPG2)
            try:
                if not args.no_refresh:
                    refresh_mv(conn_write_parent)
                ttl = int(args.snapshot_ttl_days or 0)
                if ttl > 0:
                    deleted = cleanup_snapshots(conn_write_parent, ttl_days=ttl)
                    if deleted:
                        print(f"[INFO] snapshot cleanup(parent): ttl_days={ttl} deleted_rows={deleted}", flush=True)
                ins = create_snapshot(conn_write_parent, snapshot_id)
                print(f"[INFO] snapshot created(parent): snapshot_id={snapshot_id} inserted={ins}", flush=True)
            finally:
                try:
                    conn_write_parent.close()
                except Exception:
                    pass

            base += [
                "--snapshot",
                "--snapshot-id",
                snapshot_id,
                "--snapshot-ttl-days",
                str(int(args.snapshot_ttl_days or 0)),
            ]

        for wid in range(workers):
            cmd = base + ["--worker-id", str(wid)]
            # refresh only in worker 0 (reduces locks/load).  Fix: in snapshot
            # mode the parent has already refreshed the MV above, so NO child
            # needs to refresh — previously worker 0 ran an expensive
            # duplicate REFRESH MATERIALIZED VIEW.
            if wid != 0 or args.snapshot:
                cmd.append("--no-refresh")
            # Forward only-group-id if given (workers then process the same
            # explicit targets; partitioning is ignored inside split_groups).
            if args.only_group_id:
                cmd += ["--only-group-id", *[str(x) for x in args.only_group_id]]
            # If a log file is given, write each worker to its own file.
            # (Fix: the two separate `if args.log_file:` checks were merged.)
            if args.log_file:
                worker_log = f"{args.log_file}.w{wid}"
                cmd += ["--log-file", worker_log]
                log_path = os.path.abspath(worker_log)
                os.makedirs(os.path.dirname(log_path) or ".", exist_ok=True)
                f = open(log_path, "a", buffering=1, encoding="utf-8")
                try:
                    procs.append(subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT, close_fds=True))
                finally:
                    # Fix: close the parent's copy of the log fd — the child
                    # keeps its own duplicate after spawn.
                    f.close()
            else:
                procs.append(subprocess.Popen(cmd))
        print(f"[INFO] spawned {len(procs)} workers. pids={[p.pid for p in procs]}", flush=True)
        sys.exit(0)

    # Single-process (or single-worker child) execution path.
    conn_read = psycopg2.connect(**DB_INFO_PSYCOPG2)
    conn_write = psycopg2.connect(**DB_INFO_PSYCOPG2)

    split_groups(
        conn_read,
        conn_write,
        log_every_batches=args.log_every_batches,
        workers=int(args.workers) if args.workers else 1,
        worker_id=int(args.worker_id) if args.worker_id is not None else 0,
        refresh=(not args.no_refresh),
        only_group_ids=list(args.only_group_id) if args.only_group_id else None,
        snapshot=bool(args.snapshot),
        snapshot_id=str(args.snapshot_id) if args.snapshot_id else None,
        snapshot_ttl_days=int(args.snapshot_ttl_days or 0),
    )

    conn_read.close()
    conn_write.close()
