From f3090307e1c506e2f025fbd128efe4c4277a15a1 Mon Sep 17 00:00:00 2001 From: Somanath Date: Thu, 7 May 2026 19:43:45 +0530 Subject: [PATCH 1/3] fix: replace garbled mojibake characters with clean ASCII in comments --- python/cluster_info.py | 14 +++++----- python/cluster_setup_basic.py | 48 +++++++++++++++++++++++++++++----- python/nfs_provision.py | 49 +++++++++++++++++------------------ 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/python/cluster_info.py b/python/cluster_info.py index ce2a48f..703517d 100644 --- a/python/cluster_info.py +++ b/python/cluster_info.py @@ -2,8 +2,8 @@ """Retrieve ONTAP cluster version and list all nodes with serial numbers. Steps: - 1. GET /cluster ΓÇö retrieve cluster name and ONTAP version - 2. GET /cluster/nodes ΓÇö list all nodes with serial numbers + 1. GET /cluster - retrieve cluster name and ONTAP version + 2. GET /cluster/nodes - list all nodes with serial numbers Prerequisites:: @@ -31,15 +31,15 @@ def main() -> None: with OntapClient.from_env() as client: - # Step 1 ΓÇö cluster version + # Step 1: cluster version cluster = client.get("/cluster", fields="version") logger.info( - "Cluster: %s ΓÇö ONTAP %s", + "Cluster: %s - ONTAP %s", cluster.get("name", "unknown"), cluster.get("version", {}).get("full", "unknown"), ) - # Step 2 ΓÇö node list with serial numbers + # Step 2: node list with serial numbers nodes_resp = client.get("/cluster/nodes", fields="name,serial_number") records = nodes_resp.get("records", []) logger.info("Nodes in cluster: %d", nodes_resp.get("num_records", len(records))) @@ -47,8 +47,8 @@ def main() -> None: for node in records: logger.info( " %-30s serial: %s", - node.get("name", "ΓÇö"), - node.get("serial_number", "ΓÇö"), + node.get("name", "N/A"), + node.get("serial_number", "N/A"), ) diff --git a/python/cluster_setup_basic.py b/python/cluster_setup_basic.py index 4f6682d..db17c22 100644 --- a/python/cluster_setup_basic.py +++ b/python/cluster_setup_basic.py @@ -12,6 +12,7 @@ Usage:: + # env vars directly export ONTAP_HOST=10.x.x.x # pre-cluster node IP export ONTAP_USER=admin # usually admin, empty pass on pre-cluster nodes export ONTAP_PASS= @@ -22,14 +23,20 @@ export CLUSTER_GATEWAY=10.x.x.1 export PARTNER_MGMT_IP=10.x.x.y python cluster_setup_basic.py + + # or use a per-build .env file (analogous to -ir ) + python cluster_setup_basic.py --env-file r9141_build.env + python cluster_setup_basic.py --env-file r919_build.env """ from __future__ import annotations +import argparse import logging import os import sys import time +from pathlib import Path from ontap_client import OntapClient @@ -43,15 +50,15 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "ONTAP_HOST": "", # Node 1 management IP — set via ONTAP_HOST env var + "ONTAP_HOST": "10.140.108.120", # Node 1 management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # set via ONTAP_PASS env var — leave empty for pre-cluster nodes - "CLUSTER_NAME": "", # choose your cluster name — set via CLUSTER_NAME env var + "CLUSTER_NAME": "sp57388-cluster", # choose your cluster name — set via CLUSTER_NAME env var "CLUSTER_PASS": "", # set via CLUSTER_PASS env var — choose your cluster admin password - "CLUSTER_MGMT_IP": "", # cluster management IP — set via CLUSTER_MGMT_IP env var - "CLUSTER_NETMASK": "", # e.g. 
255.255.255.0 — set via CLUSTER_NETMASK env var - "CLUSTER_GATEWAY": "", # default gateway — set via CLUSTER_GATEWAY env var - "PARTNER_MGMT_IP": "", # Node 2 management IP — set via PARTNER_MGMT_IP env var + "CLUSTER_MGMT_IP": "10.140.108.120", # cluster management IP — set via CLUSTER_MGMT_IP env var + "CLUSTER_NETMASK": "255.255.192.0", # e.g. 255.255.255.0 — set via CLUSTER_NETMASK env var + "CLUSTER_GATEWAY": "10.140.64.1", # default gateway — set via CLUSTER_GATEWAY env var + "PARTNER_MGMT_IP": "10.140.108.124", # Node 2 management IP — set via PARTNER_MGMT_IP env var } # --------------------------------------------------------------------------- @@ -259,7 +266,36 @@ def main() -> None: ) +def _load_env_file(path: str) -> None: + """Load KEY=VALUE pairs from a .env file into the INPUTS dict.""" + for line in Path(path).read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, _, value = line.partition("=") + INPUTS[key.strip()] = value.strip().strip('"').strip("'") + + if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Create an ONTAP cluster from two pre-cluster nodes." + ) + parser.add_argument( + "--env-file", + metavar="FILE", + help="Path to a .env file with KEY=VALUE pairs (one per build, like -ir in ha_create.exp).", + ) + args = parser.parse_args() + + if args.env_file: + _load_env_file(args.env_file) + + # env vars always win over INPUTS block defaults + for key in list(INPUTS): + val = os.environ.get(key) + if val: + INPUTS[key] = val + try: main() except KeyboardInterrupt: diff --git a/python/nfs_provision.py b/python/nfs_provision.py index 59f8d92..44cb235 100644 --- a/python/nfs_provision.py +++ b/python/nfs_provision.py @@ -26,11 +26,11 @@ python nfs_provision.py --env-file nfs-provision.env Default values (vs0, vol_nfs_test_01, 0.0.0.0/0, etc.) are for illustration -only. Replace them with values appropriate for your environment ΓÇö +only. Replace them with values appropriate for your environment - in particular, restrict ``--client-match`` to your actual client subnet. This script is *not* idempotent: running it twice with the same volume name -will fail. See ``python/README.md`` ΓåÆ "Adapting for Your Environment" for +will fail. See ``python/README.md`` -> "Adapting for Your Environment" for guidance on adding existence checks. """ @@ -50,19 +50,18 @@ ) logger = logging.getLogger(__name__) -# ΓöÇΓöÇ Inputs (edit these directly, same as the YAML env: block) ΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇ +# Inputs (edit these directly, same as the YAML env: block) # These are the defaults. CLI args and env vars override them. 
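# Illustrative sketch (not part of the original file): a file passed via --env-file
# holds plain KEY=VALUE lines; blank lines and lines starting with '#' are ignored,
# and surrounding quotes are stripped by the repo's _load_env_file helper. The
# values below are placeholders, not real cluster details:
#
#   ONTAP_HOST=cluster-mgmt.example.com
#   ONTAP_USER=admin
#   ONTAP_PASS=change-me
#   SVM_NAME=vs1
#   VOLUME_NAME=vol_001
#   AGGR_NAME=aggr1
#   CLIENT_MATCH=10.0.0.0/24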
ENV = { - "ONTAP_HOST": "", # cluster management IP ΓÇö set here or via ONTAP_HOST env var + "ONTAP_HOST": "", # cluster management IP - set here or via ONTAP_HOST env var "ONTAP_USER": "admin", - "ONTAP_PASS": "", # never hardcode ΓÇö set via ONTAP_PASS env var + "ONTAP_PASS": "", # never hardcode - set via ONTAP_PASS env var "SVM_NAME": "vs1", "VOLUME_NAME": "vol_001", "VOLUME_SIZE": "100MB", - "AGGR_NAME": "sti232_vsim_sr091o_aggr1", # required ΓÇö set via --aggregate or AGGR_NAME env var + "AGGR_NAME": "sti232_vsim_sr091o_aggr1", # required - set via --aggregate or AGGR_NAME env var "CLIENT_MATCH": "0.0.0.0/0", } -# ΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇΓöÇ def _load_env_file(path: str) -> None: @@ -128,9 +127,9 @@ def main() -> None: policy_name = f"{volume}_export_policy" with OntapClient.from_env() as client: - # Step 1 ΓÇö create volume (idempotent: skip if already exists) + # Step 1 - create volume (idempotent: skip if already exists) # POST /storage/volumes to create a new FlexVol with a NAS junction path. - # Volume creation is asynchronous ΓÇö the response contains a job UUID. + # Volume creation is asynchronous - the response contains a job UUID. existing_vol = client.get( "/storage/volumes", fields="name,uuid", @@ -138,9 +137,9 @@ def main() -> None: **{"svm.name": svm}, ) if existing_vol.get("records"): - logger.info("Volume '%s' already exists ΓÇö skipping create", volume) + logger.info("Volume '%s' already exists - skipping create", volume) else: - logger.info("Creating volume '%s' (%s) on SVM '%s'ΓǪ", volume, size, svm) + logger.info("Creating volume '%s' (%s) on SVM '%s'...", volume, size, svm) create_resp = client.post( "/storage/volumes", body={ @@ -152,7 +151,7 @@ def main() -> None: }, ) - # Step 2 ΓÇö poll volume-creation job + # Step 2 - poll volume-creation job # Block until the async job finishes before proceeding. # poll_job raises RuntimeError if the job ends in a failure state. job_uuid = create_resp["job"]["uuid"] @@ -160,7 +159,7 @@ def main() -> None: client.poll_job(job_uuid) logger.info("Volume '%s' created successfully", volume) - # Step 3 ΓÇö fetch volume UUID + # Step 3 - fetch volume UUID # The UUID is required to PATCH the volume later when assigning the export policy. # Filter by name + svm.name to pinpoint exactly the volume just created. vol_resp = client.get( @@ -173,7 +172,7 @@ def main() -> None: raise RuntimeError(f"Volume '{volume}' not found on SVM '{svm}' after creation") volume_uuid = vol_resp["records"][0]["uuid"] - # Step 4 ΓÇö create export policy (idempotent: skip if already exists) + # Step 4 - create export policy (idempotent: skip if already exists) # Creates a dedicated policy named _export_policy scoped to the SVM. # A per-volume policy makes it easy to manage access rules independently. 
existing_policy = client.get( @@ -183,15 +182,15 @@ def main() -> None: **{"svm.name": svm}, ) if existing_policy.get("records"): - logger.info("Export policy '%s' already exists ΓÇö skipping create", policy_name) + logger.info("Export policy '%s' already exists - skipping create", policy_name) else: - logger.info("Creating export policy '%s'ΓǪ", policy_name) + logger.info("Creating export policy '%s'...", policy_name) client.post( "/protocols/nfs/export-policies", body={"name": policy_name, "svm": {"name": svm}}, ) - # Step 5 ΓÇö fetch export policy ID + # Step 5 - fetch export policy ID # The numeric ID is required when POSTing rules to the policy. # Filter by name + svm.name to retrieve only this policy's record. policy_resp = client.get( @@ -206,7 +205,7 @@ def main() -> None: ) policy_id = policy_resp["records"][0]["id"] - # Step 6 ΓÇö add client rule (idempotent: skip if a matching rule already exists) + # Step 6 - add client rule (idempotent: skip if a matching rule already exists) # POST a rule to the export policy allowing the given client IP or CIDR range. # ro_rule, rw_rule, superuser = 'any' is suitable for lab; tighten for production. existing_rules = client.get( @@ -218,9 +217,9 @@ def main() -> None: for r in existing_rules.get("records", []) ) if rule_exists: - logger.info("Client rule '%s' already exists in policy ΓÇö skipping", client_match) + logger.info("Client rule '%s' already exists in policy - skipping", client_match) else: - logger.info("Adding client rule '%s' to policyΓǪ", client_match) + logger.info("Adding client rule '%s' to policy...", client_match) client.post( f"/protocols/nfs/export-policies/{policy_id}/rules", body={ @@ -231,26 +230,26 @@ def main() -> None: }, ) - # Step 7 ΓÇö assign export policy to volume + # Step 7 - assign export policy to volume # PATCH the volume's nas.export_policy field to link the policy. # This makes the volume accessible to NFS clients that match the rule. - logger.info("Assigning export policy to volumeΓǪ") + logger.info("Assigning export policy to volume...") patch_resp = client.patch( f"/storage/volumes/{volume_uuid}", body={"nas": {"export_policy": {"name": policy_name}}}, ) - # Step 8 ΓÇö poll assign-policy job + # Step 8 - poll assign-policy job # The PATCH may return a job UUID if the operation is async. # Only poll if a UUID was returned; sync responses skip this block. if "job" in patch_resp: client.poll_job(patch_resp["job"]["uuid"]) - # Step 9 ΓÇö print summary + # Step 9 - print summary # Log a single success line with volume, size, SVM, mount path, # export policy name, and client rule for quick confirmation. 
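        # Illustrative note (assumption, not something this script configures): with the
        # export rule in place, a client whose address matches CLIENT_MATCH can mount the
        # junction path over NFS, roughly:
        #   mount -t nfs <svm-data-lif-ip>:/vol_001 /mnt/vol_001
        # The data LIF address and local mount point are placeholders; the SVM is assumed
        # to already have NFS enabled and a data LIF, which this script does not create.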
logger.info( - "Γ£ô Volume '%s' (%s) created on SVM '%s' | Mount path: /%s | " + "[OK] Volume '%s' (%s) created on SVM '%s' | Mount path: /%s | " "Export policy '%s' created with client rule '%s' and assigned to volume", volume, size, From d9fc979d3316795807cc0c81ddc2b6c788f5eda0 Mon Sep 17 00:00:00 2001 From: Somanath Date: Thu, 7 May 2026 20:39:15 +0530 Subject: [PATCH 2/3] fix: remove internal cluster IPs and lab aggregate name from defaults --- python/cifs_provision.py | 420 +++++++++++++++++++--------------- python/cluster_setup_basic.py | 13 +- python/nfs_provision.py | 286 ++++++++++++----------- 3 files changed, 378 insertions(+), 341 deletions(-) diff --git a/python/cifs_provision.py b/python/cifs_provision.py index 8adddc3..720213e 100644 --- a/python/cifs_provision.py +++ b/python/cifs_provision.py @@ -48,7 +48,7 @@ "SVM_NAME": "vs1", "VOLUME_NAME": "vol_002", "VOLUME_SIZE": "100MB", - "AGGR_NAME": "sti232_vsim_sr091o_aggr1", # required — set via --aggregate or AGGR_NAME env var + "AGGR_NAME": "", # required — set via --aggregate or AGGR_NAME env var "CLIENT_MATCH": "0.0.0.0/0", # required — set via --client-match or CLIENT_MATCH env var "SHARE_NAME": "cifs_share_demo", "SHARE_COMMENT": "Provisioned by orchestrio", @@ -114,215 +114,255 @@ def parse_args() -> argparse.Namespace: return p.parse_args() -def main() -> None: - args = parse_args() - - # Load env file first so its values can be read via os.environ below +def _resolve_config(args: argparse.Namespace) -> dict[str, str | bool]: + """Load env file and CLI args, then return the resolved configuration dict.""" if args.env_file: _load_env_file(args.env_file) - # Push ENV block values into os.environ so OntapClient.from_env() picks them up for key, value in ENV.items(): if value and key not in os.environ: os.environ[key] = value - # Resolve each value: CLI arg > env var > ENV block > built-in default (matches YAML priority) - svm = args.svm or os.environ.get("SVM_NAME") or ENV["SVM_NAME"] or "vs0" - volume = args.volume or os.environ.get("VOLUME_NAME") or ENV["VOLUME_NAME"] or "cifs_test_env" - size = args.size or os.environ.get("VOLUME_SIZE") or ENV["VOLUME_SIZE"] or "100MB" aggregate = args.aggregate or os.environ.get("AGGR_NAME") or ENV["AGGR_NAME"] or "" - share_name = ( - args.share_name or os.environ.get("SHARE_NAME") or ENV["SHARE_NAME"] or "cifs_share_demo" + if not aggregate: + logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") + sys.exit(1) + + return { + "svm": args.svm or os.environ.get("SVM_NAME") or ENV["SVM_NAME"] or "vs0", + "volume": ( + args.volume or os.environ.get("VOLUME_NAME") or ENV["VOLUME_NAME"] or "cifs_test_env" + ), + "size": args.size or os.environ.get("VOLUME_SIZE") or ENV["VOLUME_SIZE"] or "100MB", + "aggregate": aggregate, + "share_name": ( + args.share_name + or os.environ.get("SHARE_NAME") + or ENV["SHARE_NAME"] + or "cifs_share_demo" + ), + "share_comment": ( + args.share_comment + or os.environ.get("SHARE_COMMENT") + or ENV["SHARE_COMMENT"] + or "Provisioned by orchestrio" + ), + "acl_user": (args.acl_user or os.environ.get("ACL_USER") or ENV["ACL_USER"] or "Everyone"), + "acl_permission": ( + args.acl_permission + or os.environ.get("ACL_PERMISSION") + or ENV["ACL_PERMISSION"] + or "full_control" + ), + "create_cifs_server": args.create_cifs_server, + "cifs_server_name": ( + args.cifs_server_name + or os.environ.get("CIFS_SERVER_NAME") + or ENV["CIFS_SERVER_NAME"] + or "ONTAP-CIFS" + ), + "workgroup": ( + args.workgroup + or os.environ.get("CIFS_WORKGROUP") + or 
ENV["CIFS_WORKGROUP"] + or "WORKGROUP" + ), + } + + +def _ensure_cifs_server( + client: OntapClient, + svm: str, + create_cifs_server: bool, + cifs_server_name: str, + workgroup: str, +) -> None: + """Verify a CIFS server exists on the SVM, optionally creating one if missing.""" + cifs_svc_resp = client.get( + "/protocols/cifs/services", + fields="svm.name,enabled", + **{"svm.name": svm}, + ) + if cifs_svc_resp.get("num_records", 0) > 0: + logger.info("CIFS server confirmed on SVM '%s'", svm) + return + + if not create_cifs_server: + logger.error( + "ABORTED - no CIFS server found on SVM '%s'. " + "Pass --create-cifs-server to create one automatically, or use " + "'vserver cifs create' before running this script.", + svm, + ) + sys.exit(1) + + logger.info( + "No CIFS server on SVM '%s' - creating workgroup server '%s' in workgroup '%s'...", + svm, + cifs_server_name, + workgroup, ) - share_comment = ( - args.share_comment - or os.environ.get("SHARE_COMMENT") - or ENV["SHARE_COMMENT"] - or "Provisioned by orchestrio" + resp = client.post( + "/protocols/cifs/services", + body={ + "svm": {"name": svm}, + "name": cifs_server_name, + "workgroup": workgroup, + "enabled": True, + }, ) - acl_user = args.acl_user or os.environ.get("ACL_USER") or ENV["ACL_USER"] or "Everyone" - acl_permission = ( - args.acl_permission - or os.environ.get("ACL_PERMISSION") - or ENV["ACL_PERMISSION"] - or "full_control" + if resp.get("job"): + client.poll_job(resp["job"]["uuid"]) + logger.info( + "CIFS server '%s' created in workgroup '%s' on SVM '%s'", + cifs_server_name, + workgroup, + svm, ) - create_cifs_server = args.create_cifs_server - cifs_server_name = ( - args.cifs_server_name - or os.environ.get("CIFS_SERVER_NAME") - or ENV["CIFS_SERVER_NAME"] - or "ONTAP-CIFS" + +def _ensure_volume_ntfs( + client: OntapClient, svm: str, volume: str, size: str, aggregate: str +) -> dict: + """Create the FlexVol (NTFS security style) if it does not exist. 
Returns the job result.""" + existing = client.get( + "/storage/volumes", + fields="name,uuid", + name=volume, + **{"svm.name": svm}, + ) + if existing.get("records"): + logger.info("Volume '%s' already exists - skipping create", volume) + return {"state": "skipped", "message": "volume already existed"} + + logger.info("Creating volume '%s' (%s) on SVM '%s'...", volume, size, svm) + resp = client.post( + "/storage/volumes", + body={ + "name": volume, + "svm": {"name": svm}, + "aggregates": [{"name": aggregate}], + "size": size, + "nas": { + "security_style": "ntfs", + "path": f"/{volume}", + }, + }, ) - workgroup = ( - args.workgroup or os.environ.get("CIFS_WORKGROUP") or ENV["CIFS_WORKGROUP"] or "WORKGROUP" + job_uuid = resp["job"]["uuid"] + logger.info("Volume creation job: %s", job_uuid) + return client.poll_job(job_uuid) + + +def _get_svm_uuid(client: OntapClient, svm: str) -> str: + """Fetch and return the UUID for the named SVM.""" + resp = client.get("/svm/svms", fields="name,uuid", name=svm) + return resp["records"][0]["uuid"] + + +def _ensure_cifs_share( + client: OntapClient, + svm_uuid: str, + share_name: str, + volume: str, + svm: str, + share_comment: str, +) -> None: + """Create the CIFS share if it does not already exist.""" + try: + existing = client.get( + f"/protocols/cifs/shares/{svm_uuid}/{share_name}", + fields="name", + ) + share_exists = bool(existing.get("name")) + except OntapApiError as exc: + if exc.status_code == 404: + share_exists = False + else: + raise + + if share_exists: + logger.info("CIFS share '%s' already exists - skipping create", share_name) + return + + logger.info("Creating CIFS share '%s' on path '/%s'...", share_name, volume) + client.post( + "/protocols/cifs/shares", + body={ + "name": share_name, + "path": f"/{volume}", + "svm": {"name": svm}, + "comment": share_comment, + }, ) - if not aggregate: - logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") - sys.exit(1) - with OntapClient.from_env() as client: - # Pre-flight — verify CIFS server is enabled on the SVM - # A CIFS share cannot be created if no CIFS server exists on the SVM. - # Exits early with a clear error rather than failing mid-workflow. - cifs_svc_resp = client.get( - "/protocols/cifs/services", - fields="svm.name,enabled", - **{"svm.name": svm}, +def _set_share_acl( + client: OntapClient, + svm_uuid: str, + share_name: str, + acl_user: str, + acl_permission: str, +) -> None: + """Patch the share ACL entry for the given user with the specified permission.""" + logger.info("Setting ACL: %s -> %s...", acl_user, acl_permission) + client.patch( + f"/protocols/cifs/shares/{svm_uuid}/{share_name}/acls/{acl_user}/windows", + body={"permission": acl_permission}, + ) + + +def _verify_and_log_acls(client: OntapClient, svm_uuid: str, share_name: str) -> None: + """Fetch the share and log each ACL entry for confirmation.""" + logger.info("Verifying share '%s'...", share_name) + resp = client.get( + f"/protocols/cifs/shares/{svm_uuid}/{share_name}", + fields="name,path,acls", + ) + for acl in resp.get("acls", []): + logger.info( + " ACL: %s (%s) -> %s", + acl.get("user_or_group", "N/A"), + acl.get("type", "N/A"), + acl.get("permission", "N/A"), ) - if cifs_svc_resp.get("num_records", 0) == 0: - if not create_cifs_server: - logger.error( - "ABORTED — no CIFS server found on SVM '%s'. 
" - "Pass --create-cifs-server to create one automatically, or use " - "'vserver cifs create' before running this script.", - svm, - ) - sys.exit(1) - logger.info( - "No CIFS server on SVM '%s' — creating workgroup server '%s' in workgroup '%s'…", - svm, - cifs_server_name, - workgroup, - ) - cifs_create_resp = client.post( - "/protocols/cifs/services", - body={ - "svm": {"name": svm}, - "name": cifs_server_name, - "workgroup": workgroup, - "enabled": True, - }, - ) - # ONTAP may return an async job for CIFS server creation - if cifs_create_resp.get("job"): - cifs_job_uuid = cifs_create_resp["job"]["uuid"] - logger.info("CIFS server creation job: %s", cifs_job_uuid) - client.poll_job(cifs_job_uuid) - logger.info( - "CIFS server '%s' created in workgroup '%s' on SVM '%s'", - cifs_server_name, - workgroup, - svm, - ) - else: - logger.info("CIFS server confirmed on SVM '%s'", svm) - - # Step 1 — create volume with NTFS security style (idempotent: skip if exists) - # POST /storage/volumes to create a FlexVol with security_style=ntfs. - # NTFS security style is required for CIFS/SMB share ACL enforcement. - existing_vol = client.get( - "/storage/volumes", - fields="name,uuid", - name=volume, - **{"svm.name": svm}, + + +def main() -> None: + cfg = _resolve_config(parse_args()) + svm = cfg["svm"] + volume = cfg["volume"] + size = cfg["size"] + aggregate = cfg["aggregate"] + share_name = cfg["share_name"] + share_comment = cfg["share_comment"] + acl_user = cfg["acl_user"] + acl_permission = cfg["acl_permission"] + + with OntapClient.from_env() as client: + _ensure_cifs_server( + client, svm, cfg["create_cifs_server"], cfg["cifs_server_name"], cfg["workgroup"] ) - if existing_vol.get("records"): - logger.info("Volume '%s' already exists — skipping create", volume) - job_result = {"state": "skipped", "message": "volume already existed"} - else: - logger.info("Creating volume '%s' (%s) on SVM '%s'…", volume, size, svm) - create_resp = client.post( - "/storage/volumes", - body={ - "name": volume, - "svm": {"name": svm}, - "aggregates": [{"name": aggregate}], - "size": size, - "nas": { - "security_style": "ntfs", - "path": f"/{volume}", - }, - }, - ) - - # Step 2 — poll volume-creation job - # Block until the async job finishes; the job result is logged in Step 3. - job_uuid = create_resp["job"]["uuid"] - logger.info("Volume creation job: %s", job_uuid) - job_result = client.poll_job(job_uuid) - - # Step 3 — print volume creation status - # Log the final job state and message for confirmation before continuing. + + job_result = _ensure_volume_ntfs(client, svm, volume, size, aggregate) state = job_result.get("state", "unknown") message = job_result.get("message", "") - logger.info("Volume '%s' job → %s: %s", volume, state, message) - - # Step 4 — create CIFS share (idempotent: skip if already exists) - # POST /protocols/cifs/shares to create the share pointing at the volume junction. - # ONTAP auto-creates a default 'Everyone / Full Control' ACL entry on creation. 
- svm_resp = client.get( - "/svm/svms", - fields="name,uuid", - name=svm, - ) - svm_uuid = svm_resp["records"][0]["uuid"] - - try: - existing_share = client.get( - f"/protocols/cifs/shares/{svm_uuid}/{share_name}", - fields="name", - ) - share_exists = bool(existing_share.get("name")) - except OntapApiError as exc: - if exc.status_code == 404: - share_exists = False - else: - raise - if share_exists: - logger.info("CIFS share '%s' already exists — skipping create", share_name) - else: - logger.info("Creating CIFS share '%s' on path '/%s'…", share_name, volume) - client.post( - "/protocols/cifs/shares", - body={ - "name": share_name, - "path": f"/{volume}", - "svm": {"name": svm}, - "comment": share_comment, - }, - ) - - # Step 6 — set share ACL (PATCH the auto-created Everyone entry) - # svm_uuid was resolved in Step 4 above (needed for the ACL URL). - # PATCH replaces the permission on the existing ACL entry for the given user. - # Default is 'Everyone' with 'full_control'; customise via ACL_USER/ACL_PERMISSION. - logger.info("Setting ACL: %s → %s…", acl_user, acl_permission) - client.patch( - f"/protocols/cifs/shares/{svm_uuid}/{share_name}/acls/{acl_user}/windows", - body={"permission": acl_permission}, - ) - - # Step 7 — verify share and ACL - # GET the share and inspect the acls array to confirm the permission was applied. - # Logs each ACL entry (user, type, permission) for visual confirmation. - logger.info("Verifying share '%s'…", share_name) - verify_resp = client.get( - f"/protocols/cifs/shares/{svm_uuid}/{share_name}", - fields="name,path,acls", - ) - acls = verify_resp.get("acls", []) - for acl in acls: - logger.info( - " ACL: %s (%s) → %s", - acl.get("user_or_group", "—"), - acl.get("type", "—"), - acl.get("permission", "—"), - ) - - # Step 8 — print summary - # Log a single success line with share name, volume, SVM, path, and ACL. - logger.info( - "✓ CIFS share '%s' created on volume '%s' (SVM: %s) | Path: /%s | ACL: %s → %s", - share_name, - volume, - svm, - volume, - acl_user, - acl_permission, - ) + logger.info("Volume '%s' job -> %s: %s", volume, state, message) + + svm_uuid = _get_svm_uuid(client, svm) + _ensure_cifs_share(client, svm_uuid, share_name, volume, svm, share_comment) + _set_share_acl(client, svm_uuid, share_name, acl_user, acl_permission) + _verify_and_log_acls(client, svm_uuid, share_name) + + logger.info( + "[OK] CIFS share '%s' on volume '%s' (SVM: %s) | Path: /%s | ACL: %s -> %s", + share_name, + volume, + svm, + volume, + acl_user, + acl_permission, + ) if __name__ == "__main__": diff --git a/python/cluster_setup_basic.py b/python/cluster_setup_basic.py index db17c22..1d636b6 100644 --- a/python/cluster_setup_basic.py +++ b/python/cluster_setup_basic.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 """Create an ONTAP cluster from two pre-cluster nodes. -Equivalent to: orchestrio run yaml-workflows/workflows/cluster_setup_basic.yaml Steps: 1. 
discover_nodes — GET /api/cluster/nodes (membership=available, retry 3x/30s) @@ -50,15 +49,15 @@ # USER INPUTS — fill in your values here before running # --------------------------------------------------------------------------- INPUTS = { - "ONTAP_HOST": "10.140.108.120", # Node 1 management IP — set via ONTAP_HOST env var + "ONTAP_HOST": "", # Node 1 management IP — set via ONTAP_HOST env var "ONTAP_USER": "admin", "ONTAP_PASS": "", # set via ONTAP_PASS env var — leave empty for pre-cluster nodes - "CLUSTER_NAME": "sp57388-cluster", # choose your cluster name — set via CLUSTER_NAME env var + "CLUSTER_NAME": "", # choose your cluster name — set via CLUSTER_NAME env var "CLUSTER_PASS": "", # set via CLUSTER_PASS env var — choose your cluster admin password - "CLUSTER_MGMT_IP": "10.140.108.120", # cluster management IP — set via CLUSTER_MGMT_IP env var - "CLUSTER_NETMASK": "255.255.192.0", # e.g. 255.255.255.0 — set via CLUSTER_NETMASK env var - "CLUSTER_GATEWAY": "10.140.64.1", # default gateway — set via CLUSTER_GATEWAY env var - "PARTNER_MGMT_IP": "10.140.108.124", # Node 2 management IP — set via PARTNER_MGMT_IP env var + "CLUSTER_MGMT_IP": "", # cluster management IP — set via CLUSTER_MGMT_IP env var + "CLUSTER_NETMASK": "", # e.g. 255.255.255.0 — set via CLUSTER_NETMASK env var + "CLUSTER_GATEWAY": "", # default gateway — set via CLUSTER_GATEWAY env var + "PARTNER_MGMT_IP": "", # Node 2 management IP — set via PARTNER_MGMT_IP env var } # --------------------------------------------------------------------------- diff --git a/python/nfs_provision.py b/python/nfs_provision.py index 44cb235..3c2201d 100644 --- a/python/nfs_provision.py +++ b/python/nfs_provision.py @@ -59,7 +59,7 @@ "SVM_NAME": "vs1", "VOLUME_NAME": "vol_001", "VOLUME_SIZE": "100MB", - "AGGR_NAME": "sti232_vsim_sr091o_aggr1", # required - set via --aggregate or AGGR_NAME env var + "AGGR_NAME": "", # required - set via --aggregate or AGGR_NAME env var "CLIENT_MATCH": "0.0.0.0/0", } @@ -97,167 +97,165 @@ def parse_args() -> argparse.Namespace: return p.parse_args() -def main() -> None: - args = parse_args() - - # Load env file first so its values can be read via os.environ below +def _resolve_config(args: argparse.Namespace) -> dict[str, str]: + """Load env file and CLI args, then return the resolved configuration dict.""" if args.env_file: _load_env_file(args.env_file) - # Push ENV block values into os.environ so OntapClient.from_env() picks them up for key, value in ENV.items(): if value and key not in os.environ: os.environ[key] = value - # Resolve each value: CLI arg > env var > ENV block > built-in default (matches YAML priority) - svm = args.svm or os.environ.get("SVM_NAME") or ENV["SVM_NAME"] or "vs0" - volume = ( - args.volume or os.environ.get("VOLUME_NAME") or ENV["VOLUME_NAME"] or "vol_nfs_test_01" - ) - size = args.size or os.environ.get("VOLUME_SIZE") or ENV["VOLUME_SIZE"] or "100MB" aggregate = args.aggregate or os.environ.get("AGGR_NAME") or ENV["AGGR_NAME"] or "" - client_match = ( - args.client_match or os.environ.get("CLIENT_MATCH") or ENV["CLIENT_MATCH"] or "0.0.0.0/0" - ) - if not aggregate: logger.error("--aggregate is required (or set AGGR_NAME in env / --env-file)") sys.exit(1) - policy_name = f"{volume}_export_policy" - - with OntapClient.from_env() as client: - # Step 1 - create volume (idempotent: skip if already exists) - # POST /storage/volumes to create a new FlexVol with a NAS junction path. - # Volume creation is asynchronous - the response contains a job UUID. 
- existing_vol = client.get( - "/storage/volumes", - fields="name,uuid", - name=volume, - **{"svm.name": svm}, - ) - if existing_vol.get("records"): - logger.info("Volume '%s' already exists - skipping create", volume) - else: - logger.info("Creating volume '%s' (%s) on SVM '%s'...", volume, size, svm) - create_resp = client.post( - "/storage/volumes", - body={ - "name": volume, - "svm": {"name": svm}, - "aggregates": [{"name": aggregate}], - "size": size, - "nas": {"path": f"/{volume}"}, - }, - ) - - # Step 2 - poll volume-creation job - # Block until the async job finishes before proceeding. - # poll_job raises RuntimeError if the job ends in a failure state. - job_uuid = create_resp["job"]["uuid"] - logger.info("Volume creation job: %s", job_uuid) - client.poll_job(job_uuid) - logger.info("Volume '%s' created successfully", volume) - - # Step 3 - fetch volume UUID - # The UUID is required to PATCH the volume later when assigning the export policy. - # Filter by name + svm.name to pinpoint exactly the volume just created. - vol_resp = client.get( + return { + "svm": args.svm or os.environ.get("SVM_NAME") or ENV["SVM_NAME"] or "vs0", + "volume": ( + args.volume or os.environ.get("VOLUME_NAME") or ENV["VOLUME_NAME"] or "vol_nfs_test_01" + ), + "size": args.size or os.environ.get("VOLUME_SIZE") or ENV["VOLUME_SIZE"] or "100MB", + "aggregate": aggregate, + "client_match": ( + args.client_match + or os.environ.get("CLIENT_MATCH") + or ENV["CLIENT_MATCH"] + or "0.0.0.0/0" + ), + } + + +def _ensure_volume(client: OntapClient, svm: str, volume: str, size: str, aggregate: str) -> str: + """Create the FlexVol if it does not exist. Returns the volume UUID.""" + existing = client.get( + "/storage/volumes", + fields="name,uuid", + name=volume, + **{"svm.name": svm}, + ) + if existing.get("records"): + logger.info("Volume '%s' already exists - skipping create", volume) + else: + logger.info("Creating volume '%s' (%s) on SVM '%s'...", volume, size, svm) + resp = client.post( "/storage/volumes", - fields="name,uuid", - name=volume, - **{"svm.name": svm}, - ) - if not vol_resp.get("records"): - raise RuntimeError(f"Volume '{volume}' not found on SVM '{svm}' after creation") - volume_uuid = vol_resp["records"][0]["uuid"] - - # Step 4 - create export policy (idempotent: skip if already exists) - # Creates a dedicated policy named _export_policy scoped to the SVM. - # A per-volume policy makes it easy to manage access rules independently. - existing_policy = client.get( - "/protocols/nfs/export-policies", - fields="name,id", - name=policy_name, - **{"svm.name": svm}, + body={ + "name": volume, + "svm": {"name": svm}, + "aggregates": [{"name": aggregate}], + "size": size, + "nas": {"path": f"/{volume}"}, + }, ) - if existing_policy.get("records"): - logger.info("Export policy '%s' already exists - skipping create", policy_name) - else: - logger.info("Creating export policy '%s'...", policy_name) - client.post( - "/protocols/nfs/export-policies", - body={"name": policy_name, "svm": {"name": svm}}, - ) - - # Step 5 - fetch export policy ID - # The numeric ID is required when POSTing rules to the policy. - # Filter by name + svm.name to retrieve only this policy's record. 
- policy_resp = client.get( + job_uuid = resp["job"]["uuid"] + logger.info("Volume creation job: %s", job_uuid) + client.poll_job(job_uuid) + logger.info("Volume '%s' created successfully", volume) + + vol_resp = client.get( + "/storage/volumes", + fields="name,uuid", + name=volume, + **{"svm.name": svm}, + ) + if not vol_resp.get("records"): + raise RuntimeError(f"Volume '{volume}' not found on SVM '{svm}' after creation") + return vol_resp["records"][0]["uuid"] + + +def _ensure_export_policy(client: OntapClient, svm: str, policy_name: str) -> int: + """Create the NFS export policy if it does not exist. Returns the policy ID.""" + existing = client.get( + "/protocols/nfs/export-policies", + fields="name,id", + name=policy_name, + **{"svm.name": svm}, + ) + if existing.get("records"): + logger.info("Export policy '%s' already exists - skipping create", policy_name) + else: + logger.info("Creating export policy '%s'...", policy_name) + client.post( "/protocols/nfs/export-policies", - fields="name,id", - name=policy_name, - **{"svm.name": svm}, - ) - if not policy_resp.get("records"): - raise RuntimeError( - f"Export policy '{policy_name}' not found on SVM '{svm}' after creation" - ) - policy_id = policy_resp["records"][0]["id"] - - # Step 6 - add client rule (idempotent: skip if a matching rule already exists) - # POST a rule to the export policy allowing the given client IP or CIDR range. - # ro_rule, rw_rule, superuser = 'any' is suitable for lab; tighten for production. - existing_rules = client.get( - f"/protocols/nfs/export-policies/{policy_id}/rules", - fields="index,clients", - ) - rule_exists = any( - any(c.get("match") == client_match for c in r.get("clients", [])) - for r in existing_rules.get("records", []) - ) - if rule_exists: - logger.info("Client rule '%s' already exists in policy - skipping", client_match) - else: - logger.info("Adding client rule '%s' to policy...", client_match) - client.post( - f"/protocols/nfs/export-policies/{policy_id}/rules", - body={ - "clients": [{"match": client_match}], - "ro_rule": ["any"], - "rw_rule": ["any"], - "superuser": ["any"], - }, - ) - - # Step 7 - assign export policy to volume - # PATCH the volume's nas.export_policy field to link the policy. - # This makes the volume accessible to NFS clients that match the rule. - logger.info("Assigning export policy to volume...") - patch_resp = client.patch( - f"/storage/volumes/{volume_uuid}", - body={"nas": {"export_policy": {"name": policy_name}}}, + body={"name": policy_name, "svm": {"name": svm}}, ) - # Step 8 - poll assign-policy job - # The PATCH may return a job UUID if the operation is async. - # Only poll if a UUID was returned; sync responses skip this block. - if "job" in patch_resp: - client.poll_job(patch_resp["job"]["uuid"]) - - # Step 9 - print summary - # Log a single success line with volume, size, SVM, mount path, - # export policy name, and client rule for quick confirmation. 
- logger.info( - "[OK] Volume '%s' (%s) created on SVM '%s' | Mount path: /%s | " - "Export policy '%s' created with client rule '%s' and assigned to volume", - volume, - size, - svm, - volume, - policy_name, - client_match, + policy_resp = client.get( + "/protocols/nfs/export-policies", + fields="name,id", + name=policy_name, + **{"svm.name": svm}, + ) + if not policy_resp.get("records"): + raise RuntimeError( + f"Export policy '{policy_name}' not found on SVM '{svm}' after creation" ) + return policy_resp["records"][0]["id"] + + +def _ensure_client_rule(client: OntapClient, policy_id: int, client_match: str) -> None: + """Add a client-match rule to the export policy if one does not already exist.""" + existing_rules = client.get( + f"/protocols/nfs/export-policies/{policy_id}/rules", + fields="index,clients", + ) + rule_exists = any( + any(c.get("match") == client_match for c in r.get("clients", [])) + for r in existing_rules.get("records", []) + ) + if rule_exists: + logger.info("Client rule '%s' already exists in policy - skipping", client_match) + return + logger.info("Adding client rule '%s' to policy...", client_match) + client.post( + f"/protocols/nfs/export-policies/{policy_id}/rules", + body={ + "clients": [{"match": client_match}], + "ro_rule": ["any"], + "rw_rule": ["any"], + "superuser": ["any"], + }, + ) + + +def _assign_export_policy(client: OntapClient, volume_uuid: str, policy_name: str) -> None: + """Assign the export policy to the volume and wait for any async job to complete.""" + logger.info("Assigning export policy to volume...") + patch_resp = client.patch( + f"/storage/volumes/{volume_uuid}", + body={"nas": {"export_policy": {"name": policy_name}}}, + ) + if "job" in patch_resp: + client.poll_job(patch_resp["job"]["uuid"]) + + +def main() -> None: + cfg = _resolve_config(parse_args()) + svm = cfg["svm"] + volume = cfg["volume"] + size = cfg["size"] + aggregate = cfg["aggregate"] + client_match = cfg["client_match"] + policy_name = f"{volume}_export_policy" + + with OntapClient.from_env() as client: + volume_uuid = _ensure_volume(client, svm, volume, size, aggregate) + policy_id = _ensure_export_policy(client, svm, policy_name) + _ensure_client_rule(client, policy_id, client_match) + _assign_export_policy(client, volume_uuid, policy_name) + + logger.info( + "[OK] Volume '%s' (%s) on SVM '%s' | Mount: /%s | " + "Export policy '%s' with client rule '%s' assigned", + volume, + size, + svm, + volume, + policy_name, + client_match, + ) if __name__ == "__main__": From 565fed06b918069f06e6588f6275893590a42e12 Mon Sep 17 00:00:00 2001 From: Somanath Date: Fri, 8 May 2026 12:20:42 +0530 Subject: [PATCH 3/3] docs+refactor: add docstrings and break large functions into focused helpers - Add docstrings to every function across all 8 Python scripts - Extract phases from large main() functions into named helpers: * snapmirror_provision_dest_managed: _preflight_source, _setup_peering, _ensure_dest_volume, _create_sm_relationship, _convergence_and_report * snapmirror_provision_src_managed: same phase helpers * snapmirror_test_failover: _select_target_volume, _preflight_and_get_rel, _get_latest_sm_snapshot, _create_test_clone, _verify_and_tag_clone * snapmirror_cleanup_test_failover: _find_relationship, _find_tagged_clone, _remove_smas_and_restore_online, _unmount_clone, _offline_clone * cifs_provision: _ensure_cifs_server, _ensure_volume_ntfs, _get_svm_uuid, _ensure_cifs_share, _set_share_acl, _verify_and_log_acls * nfs_provision: _ensure_volume, _ensure_export_policy, 
_ensure_client_rule, _assign_export_policy * cluster_setup_basic: _build_cluster_body extracted from create_cluster - All main() functions now <=40 lines (were up to 232 lines) --- python/cifs_provision.py | 2 + python/cluster_info.py | 1 + python/cluster_setup_basic.py | 56 ++- python/nfs_provision.py | 2 + python/snapmirror_cleanup_test_failover.py | 349 ++++++++------ python/snapmirror_provision_dest_managed.py | 482 +++++++++++--------- python/snapmirror_provision_src_managed.py | 433 ++++++++++-------- python/snapmirror_test_failover.py | 317 ++++++++----- 8 files changed, 950 insertions(+), 692 deletions(-) diff --git a/python/cifs_provision.py b/python/cifs_provision.py index 720213e..636c346 100644 --- a/python/cifs_provision.py +++ b/python/cifs_provision.py @@ -78,6 +78,7 @@ def _load_env_file(path: str) -> None: def parse_args() -> argparse.Namespace: + """Parse CLI arguments for CIFS share provisioning.""" p = argparse.ArgumentParser(description="Provision a CIFS share on ONTAP") p.add_argument( "--env-file", @@ -329,6 +330,7 @@ def _verify_and_log_acls(client: OntapClient, svm_uuid: str, share_name: str) -> def main() -> None: + """Provision a CIFS share on an ONTAP SVM, including volume and ACL setup.""" cfg = _resolve_config(parse_args()) svm = cfg["svm"] volume = cfg["volume"] diff --git a/python/cluster_info.py b/python/cluster_info.py index 703517d..7a48811 100644 --- a/python/cluster_info.py +++ b/python/cluster_info.py @@ -30,6 +30,7 @@ def main() -> None: + """Retrieve the cluster ONTAP version and list all nodes with serial numbers.""" with OntapClient.from_env() as client: # Step 1: cluster version cluster = client.get("/cluster", fields="version") diff --git a/python/cluster_setup_basic.py b/python/cluster_setup_basic.py index 1d636b6..fa12d5e 100644 --- a/python/cluster_setup_basic.py +++ b/python/cluster_setup_basic.py @@ -23,9 +23,8 @@ export PARTNER_MGMT_IP=10.x.x.y python cluster_setup_basic.py - # or use a per-build .env file (analogous to -ir ) - python cluster_setup_basic.py --env-file r9141_build.env - python cluster_setup_basic.py --env-file r919_build.env + # or supply values via a KEY=VALUE env file + python cluster_setup_basic.py --env-file cluster.env """ from __future__ import annotations @@ -81,6 +80,10 @@ def _env(key: str, required: bool = True) -> str: + """Return the value for *key* from the INPUTS dict, falling back to the environment. + + Exits with an error log if *required* is True and the key is unset. + """ # Prefer value from INPUTS dict; fall back to environment variable. 
val = INPUTS.get(key) or os.environ.get(key, "") if required and not val: @@ -163,20 +166,24 @@ def discover_partner(client: OntapClient, local_uuid: str) -> dict: return result -def create_cluster(client: OntapClient, local: dict, partner: dict) -> dict: - """Step 4 — POST /api/cluster to create the cluster.""" - cluster_name = _env("CLUSTER_NAME") - cluster_pass = _env("CLUSTER_PASS") - cluster_mgmt_ip = _env("CLUSTER_MGMT_IP") - cluster_netmask = _env("CLUSTER_NETMASK") - cluster_gateway = _env("CLUSTER_GATEWAY") - ontap_host = _env("ONTAP_HOST") - partner_mgmt_ip = _env("PARTNER_MGMT_IP") - - local_node = local["records"][0] - partner_node = partner["records"][0] - - body = { +def _build_cluster_body( + local_node: dict, + partner_node: dict, + cluster_name: str, + cluster_pass: str, + cluster_mgmt_ip: str, + cluster_netmask: str, + cluster_gateway: str, + ontap_host: str, + partner_mgmt_ip: str, +) -> dict: + """Build and return the POST body for the /cluster endpoint. + + The body includes cluster management interface, both node management + interfaces, cluster interfaces, and empty placeholder keys required + by the ONTAP REST API. + """ + return { "name": cluster_name, "password": cluster_pass, "management_interface": { @@ -208,6 +215,20 @@ def create_cluster(client: OntapClient, local: dict, partner: dict) -> dict: "configuration_backup": {}, } + +def create_cluster(client: OntapClient, local: dict, partner: dict) -> dict: + """Step 4 — POST /api/cluster to create the cluster.""" + body = _build_cluster_body( + local_node=local["records"][0], + partner_node=partner["records"][0], + cluster_name=_env("CLUSTER_NAME"), + cluster_pass=_env("CLUSTER_PASS"), + cluster_mgmt_ip=_env("CLUSTER_MGMT_IP"), + cluster_netmask=_env("CLUSTER_NETMASK"), + cluster_gateway=_env("CLUSTER_GATEWAY"), + ontap_host=_env("ONTAP_HOST"), + partner_mgmt_ip=_env("PARTNER_MGMT_IP"), + ) result = client.post("/cluster?keep_precluster_config=true", body) job_uuid = result.get("job", {}).get("uuid") logger.info("create_cluster — job %s", job_uuid) @@ -241,6 +262,7 @@ def track_job(client: OntapClient, job_uuid: str) -> dict: def main() -> None: + """Create an ONTAP cluster from two pre-cluster nodes discovered via the REST API.""" host = _env("ONTAP_HOST") user = _env("ONTAP_USER") passwd = os.environ.get("ONTAP_PASS", "") # empty on pre-cluster nodes diff --git a/python/nfs_provision.py b/python/nfs_provision.py index 3c2201d..3a35d9e 100644 --- a/python/nfs_provision.py +++ b/python/nfs_provision.py @@ -82,6 +82,7 @@ def _load_env_file(path: str) -> None: def parse_args() -> argparse.Namespace: + """Parse CLI arguments for NFS volume provisioning.""" p = argparse.ArgumentParser(description="Provision an NFS volume on ONTAP") p.add_argument( "--env-file", @@ -232,6 +233,7 @@ def _assign_export_policy(client: OntapClient, volume_uuid: str, policy_name: st def main() -> None: + """Provision an NFS volume with a dedicated export policy on an ONTAP SVM.""" cfg = _resolve_config(parse_args()) svm = cfg["svm"] volume = cfg["volume"] diff --git a/python/snapmirror_cleanup_test_failover.py b/python/snapmirror_cleanup_test_failover.py index fd255db..d1cfc88 100644 --- a/python/snapmirror_cleanup_test_failover.py +++ b/python/snapmirror_cleanup_test_failover.py @@ -60,6 +60,10 @@ def _env(key: str, default: str = "") -> str: + """Return the value for *key* from the INPUTS dict, falling back to the environment. + + Exits with an error log if the resolved value is empty and *default* is not set. 
+ """ # Prefer value from INPUTS dict; fall back to environment variable. val = INPUTS.get(key) or os.environ.get(key, default) if not val: @@ -72,6 +76,10 @@ def _env(key: str, default: str = "") -> str: def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: + """Poll an ONTAP async job until it leaves the 'running' state. + + Returns the final job record. Callers should inspect the returned state/error fields. + """ while True: result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") state = result.get("state", "unknown") @@ -114,26 +122,175 @@ def _pick_cluster_by_relationship( sys.exit(1) -def main() -> None: - cluster_a = _env("CLUSTER_A") - cluster_b = _env("CLUSTER_B") - dest_user = _env("DEST_USER") - dest_pass = _env("DEST_PASS") - source_volume = _env("SOURCE_VOLUME") - source_svm = _env("SOURCE_SVM") +def _find_tagged_clone( + client: OntapClient, rel_uuid: str, source_svm: str, source_volume: str, dest_host: str +) -> dict | None: + """Phase A: Find the FlexClone tagged ':test'. - # ── Phase 0: Find SM relationship on correct cluster ───────────────────── - # Search both clusters for the SnapMirror relationship matching the given - # source SVM and volume. Returns the cluster that owns the destination side - # so all subsequent API calls go to the correct cluster. + Returns the volume record if found, or None if no tagged clone exists. + Only clones tagged by snapmirror_test_failover.py are matched. + """ + logger.info("=== Phase A: Find tagged clone ===") + resp = client.get( + "/storage/volumes", + fields="name,uuid,svm.name,state,nas.path", + **{"_tags": f"{rel_uuid}:test", "max_records": "1"}, + ) + if resp.get("num_records", 0) == 0: + logger.info( + "NO TAGGED CLONE FOUND for %s:%s on %s — nothing to clean up", + source_svm, + source_volume, + dest_host, + ) + return None + clone = resp["records"][0] + logger.info( + "CLONE FOUND | name=%s | uuid=%s | svm=%s | cluster=%s", + clone.get("name"), + clone.get("uuid"), + clone.get("svm", {}).get("name"), + dest_host, + ) + return clone + + +def _remove_smas_and_restore_online( + client: OntapClient, clone_uuid: str, clone_svm: str, clone_name: str +) -> None: + """Phase B: Delete any SMAS relationships on the clone and bring it online. + + A SnapMirror Active Sync relationship on the clone holds an internal ONTAP + job lock that prevents unmount and delete (errors 917536, 23003209). + Deleting it first releases the lock. The volume is also brought online in + case a previous failed cleanup run left it in an offline state. 
+ """ + logger.info("=== Phase B: Remove SMAS relationship on clone (if any) ===") + smas_resp = client.get( + "/snapmirror/relationships", + fields="uuid,state", + **{"destination.path": f"{clone_svm}:{clone_name}", "max_records": "10"}, + ) + for smas_rel in smas_resp.get("records", []): + smas_uuid = smas_rel.get("uuid", "") + logger.info(" Deleting SMAS relationship %s on clone", smas_uuid) + try: + resp = client.delete( + f"/snapmirror/relationships/{smas_uuid}?return_timeout=120&force=true" + ) + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid) + except Exception as exc: + logger.warning("delete_smas_rel %s — %s (continuing)", smas_uuid, exc) + + if smas_resp.get("num_records", 0) == 0: + logger.info(" No SMAS relationships found on clone — continuing") + + try: + resp = client.patch( + f"/storage/volumes/{clone_uuid}?return_timeout=120", + body={"state": "online"}, + ) + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid) + except Exception as exc: + logger.warning("bring_online — %s (continuing)", exc) + + +def _unmount_clone(client: OntapClient, clone_uuid: str) -> None: + """Phase C: Remove NAS junction path to unmount the clone. + + Retries up to 6 times with a 10-second delay to allow ONTAP to fully + release background locks before giving up. + """ + logger.info("=== Phase C: Unmount clone ===") + for attempt in range(1, 7): + try: + resp = client.patch( + f"/storage/volumes/{clone_uuid}?return_timeout=120", + body={"nas": {"path": ""}}, + ) + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid) + return + except Exception as exc: + logger.warning("unmount_clone attempt %d/6 — %s", attempt, exc) + if attempt < 6: + time.sleep(10) + logger.error("Failed to unmount clone after 6 attempts — aborting") + sys.exit(1) + + +def _offline_clone(client: OntapClient, clone_uuid: str) -> None: + """Phase D: Set volume state to offline. + + A volume must be offline before it can be deleted in ONTAP. + """ + logger.info("=== Phase D: Offline clone ===") + try: + resp = client.patch( + f"/storage/volumes/{clone_uuid}?return_timeout=120", + body={"state": "offline"}, + ) + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid) + except Exception as exc: + logger.warning("offline_clone — %s", exc) + + +def _delete_clone_and_verify( + client: OntapClient, clone_uuid: str, clone_name: str, dest_host: str +) -> None: + """Phase E: Delete the clone and verify it no longer exists. + + Treats a 'not found' response as success (idempotent). + """ + logger.info("=== Phase E: Delete clone ===") + try: + resp = client.delete(f"/storage/volumes/{clone_uuid}?return_timeout=120") + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid) + except Exception as exc: + logger.warning("delete_clone — %s", exc) + + confirm = client.get( + "/storage/volumes", + fields="name,uuid", + **{"uuid": clone_uuid, "max_records": "1"}, + ) + if confirm.get("num_records", 0) == 0: + logger.info( + "=== CLEANUP COMPLETE — clone '%s' deleted from cluster %s ===", + clone_name, + dest_host, + ) + else: + logger.error("Clone '%s' still exists after delete attempt", clone_name) + sys.exit(1) + + +def _find_relationship( + cluster_a: str, + cluster_b: str, + dest_user: str, + dest_pass: str, + source_svm: str, + source_volume: str, +) -> tuple[str, dict, str]: + """Phase 0: Locate the SnapMirror relationship on the correct cluster. 
+ + Searches both clusters and returns (dest_host, rel, rel_uuid) for the + cluster that owns the destination side of the relationship. + Logs a warning if the relationship is not in 'snapmirrored' state. + """ logger.info("=== Phase 0: Find SnapMirror relationship ===") dest_host, rel = _pick_cluster_by_relationship( - cluster_a, - cluster_b, - dest_user, - dest_pass, - source_svm, - source_volume, + cluster_a, cluster_b, dest_user, dest_pass, source_svm, source_volume ) rel_uuid = rel.get("uuid", "") logger.info( @@ -145,153 +302,45 @@ def main() -> None: rel.get("state"), rel.get("healthy"), ) - if rel.get("state") != "snapmirrored": logger.warning( "Relationship state=%s healthy=%s — proceeding with cleanup anyway", rel.get("state"), rel.get("healthy"), ) + return dest_host, rel, rel_uuid + + +def main() -> None: + """Delete the test-failover FlexClone and restore the SnapMirror relationship. + + Identifies the correct cluster via the SnapMirror relationship UUID, then + removes SMAS entries, unmounts, offlines, and deletes the FlexClone. + """ + cluster_a = _env("CLUSTER_A") + cluster_b = _env("CLUSTER_B") + dest_user = _env("DEST_USER") + dest_pass = _env("DEST_PASS") + source_volume = _env("SOURCE_VOLUME") + source_svm = _env("SOURCE_SVM") + + dest_host, rel, rel_uuid = _find_relationship( + cluster_a, cluster_b, dest_user, dest_pass, source_svm, source_volume + ) with OntapClient(dest_host, dest_user, dest_pass, verify_ssl=False) as client: - # ── Phase A: Find tagged clone ──────────────────────────────────── - # Search for volumes tagged ':test' — only volumes created by - # snapmirror_test_failover.py carry this tag, preventing accidental - # deletion of manually created volumes with similar names. - logger.info("=== Phase A: Find tagged clone ===") - tagged_resp = client.get( - "/storage/volumes", - fields="name,uuid,svm.name,state,nas.path", - **{"_tags": f"{rel_uuid}:test", "max_records": "1"}, - ) - if tagged_resp.get("num_records", 0) == 0: - logger.info( - "NO TAGGED CLONE FOUND for %s:%s on %s — nothing to clean up", - source_svm, - source_volume, - dest_host, - ) + clone = _find_tagged_clone(client, rel_uuid, source_svm, source_volume, dest_host) + if clone is None: return - clone = tagged_resp["records"][0] clone_uuid = clone.get("uuid", "") clone_name = clone.get("name", "") clone_svm = clone.get("svm", {}).get("name", "") - logger.info( - "CLONE FOUND | name=%s | uuid=%s | svm=%s | cluster=%s", - clone_name, - clone_uuid, - clone_svm, - dest_host, - ) - - # ── Phase B: Delete SMAS relationship protecting the clone ──────────── - # The clone may be the destination of a SnapMirror Active Sync (SMAS) - # relationship. This holds an internal ONTAP job lock that prevents - # unmount and delete (errors 917536, 23003209). Delete the SMAS - # relationship first to release the lock entirely. 
- logger.info("=== Phase B: Remove SMAS relationship on clone (if any) ===") - smas_resp = client.get( - "/snapmirror/relationships", - fields="uuid,state", - **{ - "destination.path": f"{clone_svm}:{clone_name}", - "max_records": "10", - }, - ) - for smas_rel in smas_resp.get("records", []): - smas_uuid = smas_rel.get("uuid", "") - logger.info(" Deleting SMAS relationship %s on clone", smas_uuid) - try: - resp = client.delete( - f"/snapmirror/relationships/{smas_uuid}?return_timeout=120&force=true" - ) - job_uuid = resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid) - except Exception as exc: - logger.warning("delete_smas_rel %s — %s (continuing)", smas_uuid, exc) - if smas_resp.get("num_records", 0) == 0: - logger.info(" No SMAS relationships found on clone — continuing") - - # Also bring online in case a previous failed run left it offline - try: - resp = client.patch( - f"/storage/volumes/{clone_uuid}?return_timeout=120", - body={"state": "online"}, - ) - job_uuid = resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid) - except Exception as exc: - logger.warning("bring_online — %s (continuing)", exc) - - # ── Phase C: Unmount clone ────────────────────────────────────────── - # Remove the NAS junction path to unmount. Retry up to 6 times with a - # 10-second delay to let ONTAP fully release background locks. - logger.info("=== Phase C: Unmount clone ===") - unmounted = False - for attempt in range(1, 7): - try: - resp = client.patch( - f"/storage/volumes/{clone_uuid}?return_timeout=120", - body={"nas": {"path": ""}}, - ) - job_uuid = resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid) - unmounted = True - break - except Exception as exc: - logger.warning("unmount_clone attempt %d/6 — %s", attempt, exc) - if attempt < 6: - time.sleep(10) - if not unmounted: - logger.error("Failed to unmount clone after 6 attempts — aborting") - sys.exit(1) - - # ── Phase D: Offline clone ─────────────────────────────────────────── - # Set the volume state to offline. A volume must be offline before it - # can be deleted in ONTAP. - logger.info("=== Phase D: Offline clone ===") - try: - resp = client.patch( - f"/storage/volumes/{clone_uuid}?return_timeout=120", - body={"state": "offline"}, - ) - job_uuid = resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid) - except Exception as exc: - logger.warning("offline_clone — %s", exc) - - # ── Phase E: Delete clone ───────────────────────────────────────── - # DELETE the volume then verify removal by querying by UUID. - # Idempotent: if the volume is already gone, log success and exit cleanly. 
- logger.info("=== Phase E: Delete clone ===") - try: - resp = client.delete(f"/storage/volumes/{clone_uuid}?return_timeout=120") - job_uuid = resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid) - except Exception as exc: - logger.warning("delete_clone — %s", exc) - # Confirm deletion (idempotent: not-found = success) - confirm = client.get( - "/storage/volumes", - fields="name,uuid", - **{"uuid": clone_uuid, "max_records": "1"}, - ) - if confirm.get("num_records", 0) == 0: - logger.info( - "=== CLEANUP COMPLETE — clone '%s' deleted from cluster %s ===", - clone_name, - dest_host, - ) - else: - logger.error("Clone '%s' still exists after delete attempt", clone_name) - sys.exit(1) + _remove_smas_and_restore_online(client, clone_uuid, clone_svm, clone_name) + _unmount_clone(client, clone_uuid) + _offline_clone(client, clone_uuid) + _delete_clone_and_verify(client, clone_uuid, clone_name, dest_host) if __name__ == "__main__": diff --git a/python/snapmirror_provision_dest_managed.py b/python/snapmirror_provision_dest_managed.py index a12b391..a8399f5 100644 --- a/python/snapmirror_provision_dest_managed.py +++ b/python/snapmirror_provision_dest_managed.py @@ -77,6 +77,10 @@ def _env(key: str, default: str = "") -> str: + """Return the value for *key* from the INPUTS dict, falling back to the environment. + + Exits with an error log if the resolved value is empty and *default* is not set. + """ # Prefer value from INPUTS dict; fall back to environment variable. val = INPUTS.get(key) or os.environ.get(key, default) if not val: @@ -89,6 +93,10 @@ def _env(key: str, default: str = "") -> str: def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: + """Poll an ONTAP async job until it leaves the 'running' state. + + Returns the final job record. Callers should inspect the returned state/error fields. + """ while True: result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") state = result.get("state", "unknown") @@ -101,6 +109,10 @@ def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: def _wait_snapmirrored( client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 ) -> dict: + """Poll a SnapMirror relationship until state reaches 'snapmirrored'. + + Raises RuntimeError if *max_wait* seconds elapse before convergence. + """ elapsed = 0 while elapsed < max_wait: result = client.get( @@ -161,6 +173,7 @@ def _check_ic_lif_preconditions( sys.exit(1) def _subnet24(ip: str) -> str: + """Return the /24 prefix (first three octets) of *ip*.""" return ".".join(ip.split(".")[:3]) src_subnets = {_subnet24(ip) for ip in src_ips} @@ -367,239 +380,290 @@ def _setup_svm_peer( return alias -def main() -> None: - source_host = _env("SOURCE_HOST") - source_user = _env("SOURCE_USER") - source_pass = _env("SOURCE_PASS") - source_svm = _env("SOURCE_SVM") - source_volume = _env("SOURCE_VOLUME") +def _preflight_source( + src: OntapClient, source_host: str, source_svm: str, source_volume: str +) -> dict: + """Phase A: Verify source cluster and confirm the source volume is RW. - dest_host = _env("DEST_HOST") - dest_user = _env("DEST_USER") - dest_pass = _env("DEST_PASS") - dest_svm = _env("DEST_SVM") - sm_policy = os.environ.get("SM_POLICY", "Asynchronous") + Returns the source volume record. + Aborts if the volume is missing or is a DP type (DP volumes cannot be + used as a SnapMirror source). 
+ """ + logger.info("=== Phase A: Source pre-flight ===") + src_cluster = src.get("/cluster", fields="name,version") + logger.info( + "SOURCE CLUSTER | name=%s | ontap=%s", + src_cluster.get("name"), + src_cluster.get("version", {}).get("full"), + ) + src_vol_resp = src.get( + "/storage/volumes", + fields="name,uuid,state,type,space.size", + **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, + ) + if src_vol_resp.get("num_records", 0) == 0: + logger.error("ABORTED — source volume '%s' not found on %s", source_volume, source_host) + sys.exit(1) + src_vol = src_vol_resp["records"][0] + if src_vol.get("type") == "dp": + logger.error("ABORTED — source volume is type=dp; specify the RW volume") + sys.exit(1) + logger.info( + "SOURCE VOLUME | svm=%s | name=%s | uuid=%s | state=%s | type=%s | size=%s", + source_svm, + src_vol["name"], + src_vol["uuid"], + src_vol["state"], + src_vol["type"], + src_vol.get("space", {}).get("size"), + ) + return src_vol - dest_volume = f"{source_volume}_dest" - src = OntapClient(source_host, source_user, source_pass, verify_ssl=False) - dst = OntapClient(dest_host, dest_user, dest_pass, verify_ssl=False) +def _setup_peering( + src: OntapClient, dst: OntapClient, source_svm: str, dest_svm: str +) -> tuple[str, str, str]: + """Phases B0+B1: Ensure cluster peer and SVM peer exist. - with src, dst: - # ── Phase A: Source pre-flight ──────────────────────────────────── - # Verify source cluster is reachable and the source volume is a - # writable (RW) type — DP volumes cannot be used as a SnapMirror source. - logger.info("=== Phase A: Source pre-flight ===") - src_cluster = src.get("/cluster", fields="name,version") - logger.info( - "SOURCE CLUSTER | name=%s | ontap=%s", - src_cluster.get("name"), - src_cluster.get("version", {}).get("full"), - ) + Returns (peer_name, source_svm_alias, aggr_name). + peer_name - name the destination uses to reference the source cluster. + source_svm_alias - SVM alias used in the SnapMirror source path. + aggr_name - destination aggregate to host the DP volume. + """ + logger.info("=== Phase B: Dest pre-flight ===") + dst_cluster = dst.get("/cluster", fields="name,version") + logger.info( + "DEST CLUSTER | name=%s | ontap=%s", + dst_cluster.get("name"), + dst_cluster.get("version", {}).get("full"), + ) - src_vol_resp = src.get( - "/storage/volumes", - fields="name,uuid,state,type,space.size", - **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, - ) - if src_vol_resp.get("num_records", 0) == 0: - logger.error( - "ABORTED — source volume '%s' not found on %s", - source_volume, - source_host, - ) - sys.exit(1) - src_vol = src_vol_resp["records"][0] - if src_vol.get("type") == "dp": - logger.error("ABORTED — source volume is type=dp; specify the RW volume") - sys.exit(1) - logger.info( - "SOURCE VOLUME | svm=%s | name=%s | uuid=%s | state=%s | type=%s | size=%s", - source_svm, - src_vol["name"], - src_vol["uuid"], - src_vol["state"], - src_vol["type"], - src_vol.get("space", {}).get("size"), - ) + logger.info("=== Phase B0: Cluster peer setup ===") + src_peer_name, peer_name, dst_peer_uuid = _setup_cluster_peer(src, dst, source_svm, dest_svm) - # ── Phase B: Dest pre-flight ────────────────────────────────────── - # Verify destination cluster connectivity, get the cluster peer name - # (required to reference the source from the destination side), and - # pick an aggregate to host the new destination DP volume. 
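If the destination has several aggregates, the first record returned is not necessarily the best home for the DP volume; a small sketch of ordering by free space instead, reusing the query that the src-managed variant later in this patch issues (placeholder credentials):

    from ontap_client import OntapClient

    with OntapClient("10.0.0.1", "admin", "password", verify_ssl=False) as dst:
        aggr_resp = dst.get(
            "/storage/aggregates",
            fields="name,space.block_storage.available",
            state="online",
            **{"max_records": "1", "order_by": "space.block_storage.available desc"},
        )
        # Online aggregate with the most available block storage.
        print(aggr_resp.get("records", [{}])[0].get("name", ""))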
- logger.info("=== Phase B: Dest pre-flight ===") - dst_cluster = dst.get("/cluster", fields="name,version") - logger.info( - "DEST CLUSTER | name=%s | ontap=%s", - dst_cluster.get("name"), - dst_cluster.get("version", {}).get("full"), - ) + aggr_resp = dst.get("/storage/aggregates", fields="name,state", **{"max_records": "1"}) + aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") + logger.info("DEST AGGREGATE | name=%s", aggr_name) - # ── Phase B0: Cluster peer setup ────────────────────────────────── - logger.info("=== Phase B0: Cluster peer setup ===") - src_peer_name, peer_name, dst_peer_uuid = _setup_cluster_peer( - src, dst, source_svm, dest_svm - ) + logger.info("=== Phase B1: SVM peer setup ===") + source_svm_alias = _setup_svm_peer( + src, dst, source_svm, dest_svm, src_peer_name, peer_name, dst_peer_uuid + ) + return peer_name, source_svm_alias, aggr_name - aggr_resp = dst.get( - "/storage/aggregates", - fields="name,state", - **{"max_records": "1"}, - ) - aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") - logger.info("DEST AGGREGATE | name=%s", aggr_name) - # ── Phase B1: SVM peer setup ────────────────────────────────────── - logger.info("=== Phase B1: SVM peer setup ===") - source_svm_alias = _setup_svm_peer( - src, dst, source_svm, dest_svm, src_peer_name, peer_name, dst_peer_uuid +def _ensure_dest_volume( + dst: OntapClient, + dest_volume: str, + dest_svm: str, + aggr_name: str, + src_vol_size: str, +) -> None: + """Phase C: Create the DP destination volume if it does not already exist. + + DP (data-protection) type volumes are required as SnapMirror destinations. + The create is skipped silently if the volume already exists. + """ + logger.info("=== Phase C: Dest volume setup ===") + try: + dst.post( + "/storage/volumes?return_timeout=120", + body={ + "name": dest_volume, + "type": "dp", + "svm": {"name": dest_svm}, + "aggregates": [{"name": aggr_name}], + "space": {"size": src_vol_size}, + }, ) + logger.info("DEST VOLUME | created '%s' on aggregate '%s'", dest_volume, aggr_name) + except Exception as exc: + logger.info("create_dest_volume — %s (skipped — may already exist)", exc) - # ── Phase C: Dest volume setup ──────────────────────────────────── - # Auto-create a DP volume on the destination to receive SnapMirror - # transfers. The create is skipped silently if the volume already exists. 
- logger.info("=== Phase C: Dest volume setup ===") - try: - dst.post( - "/storage/volumes?return_timeout=120", - body={ - "name": dest_volume, - "type": "dp", - "svm": {"name": dest_svm}, - "aggregates": [{"name": aggr_name}], - "space": {"size": str(src_vol.get("space", {}).get("size", ""))}, - }, - ) - logger.info( - "DEST VOLUME | created '%s' on aggregate '%s'", - dest_volume, - aggr_name, - ) - except Exception as exc: - logger.info("create_dest_volume — %s (skipped — may already exist)", exc) + dst_vol_resp = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + dst_vol = dst_vol_resp.get("records", [{}])[0] + logger.info( + "DEST VOLUME | svm=%s | name=%s | uuid=%s | state=%s | type=%s", + dest_svm, + dst_vol.get("name"), + dst_vol.get("uuid"), + dst_vol.get("state"), + dst_vol.get("type"), + ) - dst_vol_resp = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - dst_vol = dst_vol_resp.get("records", [{}])[0] - logger.info( - "DEST VOLUME | svm=%s | name=%s | uuid=%s | state=%s | type=%s", - dest_svm, - dst_vol.get("name"), - dst_vol.get("uuid"), - dst_vol.get("state"), - dst_vol.get("type"), + +def _create_sm_relationship( + dst: OntapClient, + src: OntapClient, + source_svm_alias: str, + source_volume: str, + dest_svm: str, + dest_volume: str, + peer_name: str, + sm_policy: str, +) -> str: + """Phase D: Create and initialize the SnapMirror relationship from the destination. + + Returns the relationship UUID. + All SnapMirror relationship API calls are driven from the destination cluster + (ONTAP requirement). If the relationship already exists, the POST fails + gracefully. A baseline transfer is then triggered explicitly. + """ + logger.info("=== Phase D: Relationship setup ===") + existing = dst.get( + "/snapmirror/relationships", + fields="uuid,state,healthy", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) + + try: + create_resp = dst.post( + "/snapmirror/relationships?return_timeout=120", + body={ + "source": { + "path": f"{source_svm_alias}:{source_volume}", + "cluster": {"name": peer_name}, + }, + "destination": {"path": f"{dest_svm}:{dest_volume}"}, + "policy": {"name": sm_policy}, + }, ) + job_uuid = create_resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(dst, job_uuid) + except Exception as exc: + logger.info("create_and_initialize_relationship — %s (may already exist)", exc) - # ── Phase D: Relationship setup ─────────────────────────────────── - # Create and initialize the SnapMirror relationship from destination. - # If the relationship already exists the POST fails gracefully. - # After create, the relationship UUID is fetched and a baseline - # transfer is triggered explicitly to start data replication. 
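Stripped of job polling and error handling, the create request described above reduces to a small body; a sketch with placeholder names, where the policy matches the SM_POLICY default used by these scripts:

    from ontap_client import OntapClient

    PEER_NAME = "src-cluster"               # dest-side name for the source cluster (from /cluster/peers)
    SRC_PATH = "svm_src:vol_nfs_01"         # "svm:volume" on the source (SVM peer alias on the dest side)
    DST_PATH = "svm_dst:vol_nfs_01_dest"

    with OntapClient("10.0.0.1", "admin", "password", verify_ssl=False) as dst:
        dst.post(
            "/snapmirror/relationships?return_timeout=120",
            body={
                "source": {"path": SRC_PATH, "cluster": {"name": PEER_NAME}},
                "destination": {"path": DST_PATH},
                "policy": {"name": "Asynchronous"},
            },
        )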
- logger.info("=== Phase D: Relationship setup ===") - existing = dst.get( - "/snapmirror/relationships", - fields="uuid,state,healthy", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + rel_resp = dst.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + rel_records = rel_resp.get("records", []) + if not rel_records: + logger.error( + "ABORTED — SnapMirror relationship not found for '%s:%s'", dest_svm, dest_volume ) - logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) - - try: - create_resp = dst.post( - "/snapmirror/relationships?return_timeout=120", - body={ - "source": { - "path": f"{source_svm_alias}:{source_volume}", - "cluster": {"name": peer_name}, - }, - "destination": {"path": f"{dest_svm}:{dest_volume}"}, - "policy": {"name": sm_policy}, - }, - ) - job_uuid = create_resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(dst, job_uuid) - except Exception as exc: - logger.info("create_and_initialize_relationship — %s (may already exist)", exc) - - rel_resp = dst.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + sys.exit(1) + rel = rel_records[0] + rel_uuid = rel.get("uuid", "") + logger.info( + "RELATIONSHIP | uuid=%s | state=%s | healthy=%s | policy=%s", + rel_uuid, + rel.get("state"), + rel.get("healthy"), + rel.get("policy", {}).get("name"), + ) + + try: + dst.post( + f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", + body={}, ) - rel_records = rel_resp.get("records", []) - if not rel_records: + except Exception as exc: + exc_s = str(exc) + if "13303812" in exc_s: + src_ips = _get_ic_lif_ips(src) + dst_ips = _get_ic_lif_ips(dst) logger.error( - "ABORTED — SnapMirror relationship not found for '%s:%s'", dest_svm, dest_volume + "ABORTED — SnapMirror initialize failed: intercluster LIF connectivity issue.\n" + " Error : %s\n" + " src IC : %s\n" + " dst IC : %s\n" + " Cause : TCP ports 11104/11105 are likely blocked between these IPs.\n" + " Fix : Ask your lab admin to open TCP 11104 and 11105 between\n" + " %s <-> %s", + exc_s, + src_ips, + dst_ips, + src_ips[0] if src_ips else "", + dst_ips[0] if dst_ips else "", ) sys.exit(1) - rel = rel_records[0] - rel_uuid = rel.get("uuid", "") - logger.info( - "RELATIONSHIP | uuid=%s | state=%s | healthy=%s | policy=%s", - rel_uuid, - rel.get("state"), - rel.get("healthy"), - rel.get("policy", {}).get("name"), - ) + logger.info("initialize_relationship — %s (may already be initialized)", exc) + return rel_uuid - try: - dst.post( - f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", - body={}, - ) - except Exception as exc: - exc_s = str(exc) - if "13303812" in exc_s: - src_ips = _get_ic_lif_ips(src) - dst_ips = _get_ic_lif_ips(dst) - logger.error( - "ABORTED — SnapMirror initialize failed: intercluster LIF connectivity issue.\n" - " Error : %s\n" - " src IC : %s\n" - " dst IC : %s\n" - " Cause : TCP ports 11104/11105 are likely blocked between these IPs.\n" - " Fix : Ask your lab admin to open TCP 11104 and 11105 between\n" - " %s <-> %s", - exc_s, - src_ips, - dst_ips, - src_ips[0] if src_ips else "", - dst_ips[0] if dst_ips else "", - ) - sys.exit(1) - logger.info("initialize_relationship — %s (may already be initialized)", exc) - - # ── Phase E: Convergence 
polling ────────────────────────────────── - # Poll the relationship until state=snapmirrored (baseline transfer done). - logger.info("=== Phase E: Convergence polling ===") - _wait_snapmirrored(dst, rel_uuid) - - # ── Phase F: Final validation ───────────────────────────────────── - logger.info("=== Phase F: Final validation ===") - final = dst.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - ) - logger.info( - "=== SNAPMIRROR PROVISION COMPLETE ===\n" - " source : %s:%s\n" - " destination : %s:%s\n" - " state : %s\n" - " healthy : %s\n" - " policy : %s\n" - " lag_time : %s", - source_svm, + +def _convergence_and_report( + dst: OntapClient, + rel_uuid: str, + source_svm: str, + source_volume: str, + dest_svm: str, + dest_volume: str, +) -> None: + """Phases E+F: Poll until snapmirrored, then log the final health report.""" + logger.info("=== Phase E: Convergence polling ===") + _wait_snapmirrored(dst, rel_uuid) + + logger.info("=== Phase F: Final validation ===") + final = dst.get( + f"/snapmirror/relationships/{rel_uuid}", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + ) + logger.info( + "=== SNAPMIRROR PROVISION COMPLETE ===\n" + " source : %s:%s\n" + " destination : %s:%s\n" + " state : %s\n" + " healthy : %s\n" + " policy : %s\n" + " lag_time : %s", + source_svm, + source_volume, + dest_svm, + dest_volume, + final.get("state"), + final.get("healthy"), + final.get("policy", {}).get("name"), + final.get("lag_time"), + ) + + +def main() -> None: + """Provision a SnapMirror relationship driven from the destination cluster. + + Orchestrates cluster/SVM peering, DP volume creation, relationship + initialization, and convergence polling across source and destination clusters. + """ + source_host = _env("SOURCE_HOST") + source_user = _env("SOURCE_USER") + source_pass = _env("SOURCE_PASS") + source_svm = _env("SOURCE_SVM") + source_volume = _env("SOURCE_VOLUME") + + dest_host = _env("DEST_HOST") + dest_user = _env("DEST_USER") + dest_pass = _env("DEST_PASS") + dest_svm = _env("DEST_SVM") + sm_policy = os.environ.get("SM_POLICY", "Asynchronous") + + dest_volume = f"{source_volume}_dest" + + src = OntapClient(source_host, source_user, source_pass, verify_ssl=False) + dst = OntapClient(dest_host, dest_user, dest_pass, verify_ssl=False) + + with src, dst: + src_vol = _preflight_source(src, source_host, source_svm, source_volume) + peer_name, source_svm_alias, aggr_name = _setup_peering(src, dst, source_svm, dest_svm) + src_vol_size = str(src_vol.get("space", {}).get("size", "")) + _ensure_dest_volume(dst, dest_volume, dest_svm, aggr_name, src_vol_size) + rel_uuid = _create_sm_relationship( + dst, + src, + source_svm_alias, source_volume, dest_svm, dest_volume, - final.get("state"), - final.get("healthy"), - final.get("policy", {}).get("name"), - final.get("lag_time"), + peer_name, + sm_policy, ) + _convergence_and_report(dst, rel_uuid, source_svm, source_volume, dest_svm, dest_volume) if __name__ == "__main__": diff --git a/python/snapmirror_provision_src_managed.py b/python/snapmirror_provision_src_managed.py index 3fcefa2..202a0db 100644 --- a/python/snapmirror_provision_src_managed.py +++ b/python/snapmirror_provision_src_managed.py @@ -67,6 +67,10 @@ def _env(key: str, default: str = "") -> str: + """Return the value for *key* from the INPUTS dict, falling back to the environment. + + Exits with an error log if the resolved value is empty and *default* is not set. 
+ """ # Prefer value from INPUTS dict; fall back to environment variable. val = INPUTS.get(key) or os.environ.get(key, default) if not val: @@ -79,6 +83,10 @@ def _env(key: str, default: str = "") -> str: def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: + """Poll an ONTAP async job until it leaves the 'running' state. + + Returns the final job record. Callers should inspect the returned state/error fields. + """ while True: result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") state = result.get("state", "unknown") @@ -91,6 +99,10 @@ def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: def _wait_snapmirrored( client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 ) -> dict: + """Poll a SnapMirror relationship until state reaches 'snapmirrored'. + + Raises RuntimeError if *max_wait* seconds elapse before convergence. + """ elapsed = 0 while elapsed < max_wait: result = client.get( @@ -106,7 +118,232 @@ def _wait_snapmirrored( raise RuntimeError(f"Timed out waiting for relationship {rel_uuid} to reach snapmirrored") +def _preflight_source( + src: OntapClient, source_host: str, source_svm: str, source_volume: str +) -> dict: + """Phase A: Verify source cluster is reachable and the source volume is RW. + + Returns the source volume record. + Aborts if the volume is missing or is a DP type (DP volumes cannot be + used as a SnapMirror source). + """ + logger.info("=== Phase A: Source pre-flight ===") + src_cluster = src.get("/cluster", fields="name,version") + logger.info( + "SOURCE CLUSTER | name=%s | ontap=%s", + src_cluster.get("name"), + src_cluster.get("version", {}).get("full"), + ) + src_vol_resp = src.get( + "/storage/volumes", + fields="name,uuid,state,type,space.size", + **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, + ) + if src_vol_resp.get("num_records", 0) == 0: + logger.error("ABORTED — source volume '%s' not found on %s", source_volume, source_host) + sys.exit(1) + src_vol = src_vol_resp["records"][0] + if src_vol.get("type") == "dp": + logger.error("ABORTED — source volume is type=dp; specify the RW volume") + sys.exit(1) + logger.info( + "SOURCE VOLUME | name=%s | uuid=%s | state=%s | type=%s | size=%s", + src_vol["name"], + src_vol["uuid"], + src_vol["state"], + src_vol["type"], + src_vol.get("space", {}).get("size"), + ) + return src_vol + + +def _preflight_dest(dst: OntapClient) -> tuple[str, str]: + """Phase B: Verify destination cluster and pick a cluster peer and aggregate. + + Returns (peer_name, aggr_name). + The peer_name is the name the destination cluster uses to reference the + source cluster — required when constructing the SnapMirror source path. 
+ """ + logger.info("=== Phase B: Dest pre-flight ===") + dst_cluster = dst.get("/cluster", fields="name,version") + logger.info( + "DEST CLUSTER | name=%s | ontap=%s", + dst_cluster.get("name"), + dst_cluster.get("version", {}).get("full"), + ) + peer_resp = dst.get("/cluster/peers", fields="name,status.state", **{"max_records": "1"}) + peer_name = peer_resp.get("records", [{}])[0].get("name", "") + logger.info("CLUSTER PEER | name=%s", peer_name) + + aggr_resp = dst.get( + "/storage/aggregates", + fields="name,space.block_storage.available", + state="online", + **{"max_records": "1", "order_by": "space.block_storage.available desc"}, + ) + aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") + logger.info("DEST AGGREGATE | name=%s", aggr_name) + return peer_name, aggr_name + + +def _ensure_dest_volume( + dst: OntapClient, + dest_volume: str, + dest_svm: str, + aggr_name: str, + src_vol: dict, +) -> dict: + """Phase C: Create the DP destination volume if it does not already exist. + + Returns the destination volume record. + DP (data-protection) type volumes are required as SnapMirror destinations. + """ + logger.info("=== Phase C: Dest volume setup ===") + check = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + if check.get("num_records", 0) == 0: + logger.info("Creating dest DP volume '%s' on '%s'...", dest_volume, aggr_name) + try: + dst.post( + "/storage/volumes?return_timeout=120", + body={ + "name": dest_volume, + "type": "dp", + "svm": {"name": dest_svm}, + "aggregates": [{"name": aggr_name}], + "size": str(src_vol.get("space", {}).get("size", "")), + }, + ) + except Exception as exc: + logger.warning("create_dest_volume — %s (may already exist)", exc) + else: + logger.info("Dest volume '%s' already exists — skipping create", dest_volume) + + vol_resp = dst.get( + "/storage/volumes", + fields="name,uuid,state,type", + **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, + ) + dst_vol = vol_resp.get("records", [{}])[0] + logger.info( + "DEST VOLUME | name=%s | uuid=%s | state=%s | type=%s", + dst_vol.get("name"), + dst_vol.get("uuid"), + dst_vol.get("state"), + dst_vol.get("type"), + ) + return dst_vol + + +def _create_sm_relationship( + dst: OntapClient, + source_svm: str, + source_volume: str, + dest_svm: str, + dest_volume: str, + peer_name: str, + sm_policy: str, +) -> str: + """Phase D: Create, initialize, and return the SnapMirror relationship UUID. + + Creates the relationship from the destination cluster (ONTAP requirement). + If the relationship already exists, the POST fails gracefully. + Triggers a baseline transfer and returns the relationship UUID. 
+ """ + logger.info("=== Phase D: Relationship setup ===") + existing = dst.get( + "/snapmirror/relationships", + fields="uuid,state,healthy", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) + + try: + create_resp = dst.post( + "/snapmirror/relationships?return_timeout=120", + body={ + "source": { + "path": f"{source_svm}:{source_volume}", + "cluster": {"name": peer_name}, + }, + "destination": {"path": f"{dest_svm}:{dest_volume}"}, + "policy": {"name": sm_policy}, + }, + ) + job_uuid = create_resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(dst, job_uuid) + except Exception as exc: + logger.warning("create_and_initialize_relationship — %s (may already exist)", exc) + + rel_resp = dst.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, + ) + rel = rel_resp.get("records", [{}])[0] + rel_uuid = rel.get("uuid", "") + logger.info( + "RELATIONSHIP FOUND | uuid=%s | state=%s | healthy=%s", + rel_uuid, + rel.get("state"), + rel.get("healthy"), + ) + try: + dst.post( + f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", + body={}, + ) + except Exception as exc: + logger.warning("initialize_relationship — %s (may already be initialized)", exc) + return rel_uuid + + +def _convergence_and_report( + dst: OntapClient, + rel_uuid: str, + source_svm: str, + source_volume: str, + dest_svm: str, + dest_volume: str, +) -> None: + """Phases E+F: Poll until snapmirrored, then log the final health report.""" + logger.info("=== Phase E: Convergence polling ===") + _wait_snapmirrored(dst, rel_uuid) + + logger.info("=== Phase F: Final validation ===") + final = dst.get( + f"/snapmirror/relationships/{rel_uuid}", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + ) + logger.info( + "=== SNAPMIRROR PROVISION COMPLETE ===\n" + " source : %s:%s\n" + " destination : %s:%s\n" + " state : %s\n" + " healthy : %s\n" + " policy : %s\n" + " lag_time : %s", + source_svm, + source_volume, + dest_svm, + dest_volume, + final.get("state"), + final.get("healthy"), + final.get("policy", {}).get("name"), + final.get("lag_time"), + ) + + def main() -> None: + """Provision a SnapMirror relationship driven from the source cluster. + + Connects to both clusters, verifies the source volume, ensures the destination + volume and relationship exist, then polls for convergence. + """ source_host = _env("SOURCE_HOST") source_user = _env("SOURCE_USER") source_pass = _env("SOURCE_PASS") @@ -125,197 +362,13 @@ def main() -> None: dst = OntapClient(dest_host, dest_user, dest_pass, verify_ssl=False) with src, dst: - # ── Phase A: Source pre-flight ─────────────────────────────────────────── - # Verify source cluster is reachable and the specified volume is a - # writable (RW) type. DP volumes cannot be used as a SnapMirror source. 
- logger.info("=== Phase A: Source pre-flight ===") - src_cluster = src.get("/cluster", fields="name,version") - logger.info( - "SOURCE CLUSTER | name=%s | ontap=%s", - src_cluster.get("name"), - src_cluster.get("version", {}).get("full"), - ) - - src_vol_resp = src.get( - "/storage/volumes", - fields="name,uuid,state,type,space.size", - **{"max_records": "1", "name": source_volume, "svm.name": source_svm}, - ) - if src_vol_resp.get("num_records", 0) == 0: - logger.error( - "ABORTED — source volume '%s' not found on %s", - source_volume, - source_host, - ) - sys.exit(1) - src_vol = src_vol_resp["records"][0] - if src_vol.get("type") == "dp": - logger.error("ABORTED — source volume is type=dp; specify the RW volume") - sys.exit(1) - logger.info( - "SOURCE VOLUME | name=%s | uuid=%s | state=%s | type=%s | size=%s", - src_vol["name"], - src_vol["uuid"], - src_vol["state"], - src_vol["type"], - src_vol.get("space", {}).get("size"), - ) - - # ── Phase B: Dest pre-flight ───────────────────────────────────── - # Verify destination cluster connectivity. Retrieve the cluster peer name - # (used to reference the source from the destination side) and pick an - # available aggregate to host the new destination DP volume. - logger.info("=== Phase B: Dest pre-flight ===") - dst_cluster = dst.get("/cluster", fields="name,version") - logger.info( - "DEST CLUSTER | name=%s | ontap=%s", - dst_cluster.get("name"), - dst_cluster.get("version", {}).get("full"), - ) - - peer_resp = dst.get( - "/cluster/peers", - fields="name,status.state", - **{"max_records": "1"}, - ) - peer_name = peer_resp.get("records", [{}])[0].get("name", "") - logger.info("CLUSTER PEER | name=%s", peer_name) - - aggr_resp = dst.get( - "/storage/aggregates", - fields="name,space.block_storage.available", - state="online", - **{"max_records": "1", "order_by": "space.block_storage.available desc"}, - ) - aggr_name = aggr_resp.get("records", [{}])[0].get("name", "") - logger.info("DEST AGGREGATE | name=%s", aggr_name) - - # ── Phase C: Auto-create dest DP volume ────────────────────────── - # Check if the destination DP volume already exists; create it if not. - # DP (data-protection) type volumes are required as SnapMirror destinations. - # Volume creation is skipped with a warning if it already exists. 
- logger.info("=== Phase C: Dest volume setup ===") - check_dest = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - if check_dest.get("num_records", 0) == 0: - logger.info("Creating dest DP volume '%s' on '%s'…", dest_volume, aggr_name) - try: - dst.post( - "/storage/volumes?return_timeout=120", - body={ - "name": dest_volume, - "type": "dp", - "svm": {"name": dest_svm}, - "aggregates": [{"name": aggr_name}], - "size": str(src_vol.get("space", {}).get("size", "")), - }, - ) - except Exception as exc: - logger.warning("create_dest_volume — %s (may already exist)", exc) - else: - logger.info("Dest volume '%s' already exists — skipping create", dest_volume) - - dst_vol_resp = dst.get( - "/storage/volumes", - fields="name,uuid,state,type", - **{"max_records": "1", "name": dest_volume, "svm.name": dest_svm}, - ) - dst_vol = dst_vol_resp.get("records", [{}])[0] - logger.info( - "DEST VOLUME | name=%s | uuid=%s | state=%s | type=%s", - dst_vol.get("name"), - dst_vol.get("uuid"), - dst_vol.get("state"), - dst_vol.get("type"), - ) - - # ── Phase D: Create + initialize relationship ───────────────────── - # Create the SnapMirror relationship and trigger a baseline transfer. - # All relationship API calls are made from the destination cluster - # (ONTAP requirement). POST is skipped gracefully if it already exists. - logger.info("=== Phase D: Relationship setup ===") - existing = dst.get( - "/snapmirror/relationships", - fields="uuid,state,healthy", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, - ) - logger.info("RELATIONSHIP CHECK | existing=%d", existing.get("num_records", 0)) - - try: - create_resp = dst.post( - "/snapmirror/relationships?return_timeout=120", - body={ - "source": { - "path": f"{source_svm}:{source_volume}", - "cluster": {"name": peer_name}, - }, - "destination": {"path": f"{dest_svm}:{dest_volume}"}, - "policy": {"name": sm_policy}, - }, - ) - job_uuid = create_resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(dst, job_uuid) - except Exception as exc: - logger.warning("create_and_initialize_relationship — %s (may already exist)", exc) - - # ── Phase E: Convergence polling ───────────────────────────────── - # Fetch the relationship UUID, trigger a baseline transfer explicitly, - # then poll until state=snapmirrored confirming initial replication is done. - # Times out after 30 minutes. - logger.info("=== Phase E: Convergence polling ===") - rel_resp = dst.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - **{"destination.path": f"{dest_svm}:{dest_volume}", "max_records": "1"}, - ) - rel = rel_resp.get("records", [{}])[0] - rel_uuid = rel.get("uuid", "") - logger.info( - "RELATIONSHIP FOUND | uuid=%s | state=%s | healthy=%s", - rel_uuid, - rel.get("state"), - rel.get("healthy"), - ) - - try: - dst.post( - f"/snapmirror/relationships/{rel_uuid}/transfers?return_timeout=120", - body={}, - ) - except Exception as exc: - logger.warning("initialize_relationship — %s (may already be initialized)", exc) - - _wait_snapmirrored(dst, rel_uuid) - - # ── Phase F: Final validation ───────────────────────────────────── - # Fetch the final relationship state and print a human-readable summary - # with source, destination, health status, policy, and lag time. 
- logger.info("=== Phase F: Final validation ===") - final = dst.get( - f"/snapmirror/relationships/{rel_uuid}", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - ) - logger.info( - "=== SNAPMIRROR PROVISION COMPLETE ===\n" - " source : %s:%s\n" - " destination : %s:%s\n" - " state : %s\n" - " healthy : %s\n" - " policy : %s\n" - " lag_time : %s", - source_svm, - source_volume, - dest_svm, - dest_volume, - final.get("state"), - final.get("healthy"), - final.get("policy", {}).get("name"), - final.get("lag_time"), + src_vol = _preflight_source(src, source_host, source_svm, source_volume) + peer_name, aggr_name = _preflight_dest(dst) + _ensure_dest_volume(dst, dest_volume, dest_svm, aggr_name, src_vol) + rel_uuid = _create_sm_relationship( + dst, source_svm, source_volume, dest_svm, dest_volume, peer_name, sm_policy ) + _convergence_and_report(dst, rel_uuid, source_svm, source_volume, dest_svm, dest_volume) if __name__ == "__main__": diff --git a/python/snapmirror_test_failover.py b/python/snapmirror_test_failover.py index 0bf1f97..71c36c2 100644 --- a/python/snapmirror_test_failover.py +++ b/python/snapmirror_test_failover.py @@ -61,6 +61,10 @@ def _env(key: str, default: str = "") -> str: + """Return the value for *key* from the INPUTS dict, falling back to the environment. + + Exits with an error log if the resolved value is empty and *default* is not set. + """ # Prefer value from INPUTS dict; fall back to environment variable. val = INPUTS.get(key) or os.environ.get(key, default) if not val: @@ -73,6 +77,10 @@ def _env(key: str, default: str = "") -> str: def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: + """Poll an ONTAP async job until it leaves the 'running' state. + + Returns the final job record. Callers should inspect the returned state/error fields. + """ while True: result = client.get(f"/cluster/jobs/{job_uuid}", fields="state,message,error,code") state = result.get("state", "unknown") @@ -85,6 +93,10 @@ def _poll_job(client: OntapClient, job_uuid: str, interval: int = 10) -> dict: def _wait_snapmirrored( client: OntapClient, rel_uuid: str, interval: int = 15, max_wait: int = 1800 ) -> dict: + """Poll a SnapMirror relationship until state reaches 'snapmirrored'. + + Raises RuntimeError if *max_wait* seconds elapse before convergence. + """ elapsed = 0 while elapsed < max_wait: result = client.get( @@ -137,17 +149,164 @@ def _pick_cluster( return best_cluster, best_vol -def main() -> None: - cluster_a = _env("CLUSTER_A") - cluster_b = _env("CLUSTER_B") - dest_user = _env("DEST_USER") - dest_pass = _env("DEST_PASS") - source_volume = INPUTS.get("SOURCE_VOLUME") or os.environ.get("SOURCE_VOLUME", "*") +def _preflight_and_get_rel(client: OntapClient, dp_vol_name: str, dp_svm_name: str) -> dict: + """Phase A: Verify destination cluster and fetch the SnapMirror relationship. + + Returns the relationship record for the given DP volume. 
+ """ + logger.info("=== Phase A: Pre-flight ===") + cluster = client.get("/cluster", fields="name,version") + logger.info( + "DEST CLUSTER | name=%s | ontap=%s", + cluster.get("name"), + cluster.get("version", {}).get("full"), + ) + rel_resp = client.get( + "/snapmirror/relationships", + fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", + **{"destination.path": f"{dp_svm_name}:{dp_vol_name}", "max_records": "1"}, + ) + rel = rel_resp.get("records", [{}])[0] + logger.info( + "RELATIONSHIP | uuid=%s | source=%s | dest=%s | state=%s | healthy=%s | lag=%s", + rel.get("uuid"), + rel.get("source", {}).get("path"), + rel.get("destination", {}).get("path"), + rel.get("state"), + rel.get("healthy"), + rel.get("lag_time"), + ) + return rel - # ── Phase 0: Pick cluster ───────────────────────────────────────────── - # Scan both clusters to find which one holds the target DP volume. - # AUTO mode (SOURCE_VOLUME=* or unset): picks the most recently created - # DP volume. TARGETED mode: finds _dest on either cluster. + +def _get_latest_sm_snapshot(client: OntapClient, dp_vol_uuid: str, dp_vol_name: str) -> str: + """Phase B: Return the name of the most recent SnapMirror snapshot on the DP volume. + + The FlexClone must be based on a SnapMirror snapshot to guarantee a + consistent point-in-time copy of the replicated data. + Aborts if no snapshots exist. + """ + logger.info("=== Phase B: Get latest SnapMirror snapshot ===") + resp = client.get( + f"/storage/volumes/{dp_vol_uuid}/snapshots", + fields="name,create_time", + **{"max_records": "1", "order_by": "create_time desc"}, + ) + if resp.get("num_records", 0) == 0: + logger.error("No SnapMirror snapshots on %s — run provision workflow first", dp_vol_name) + sys.exit(1) + snapshot_name = resp["records"][0]["name"] + logger.info( + "LATEST SM SNAPSHOT | name=%s | created=%s", + snapshot_name, + resp["records"][0].get("create_time"), + ) + return snapshot_name + + +def _create_test_clone( + client: OntapClient, dp_vol_name: str, dp_svm_name: str, snapshot_name: str +) -> str: + """Phase C: Create a writable FlexClone of the DP volume from the given snapshot. + + Returns the clone volume name. + The clone is mounted at a NAS junction path so it can be accessed + immediately by test clients without touching the source data. + """ + logger.info("=== Phase C: Create FlexClone ===") + clone_name = f"{dp_vol_name}_clone" + try: + resp = client.post( + "/storage/volumes?return_timeout=120", + body={ + "name": clone_name, + "svm": {"name": dp_svm_name}, + "nas": {"path": f"/{clone_name}"}, + "clone": { + "is_flexclone": True, + "parent_volume": {"name": dp_vol_name}, + "parent_snapshot": {"name": snapshot_name}, + }, + }, + ) + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid) + except Exception as exc: + logger.warning("create_test_clone — %s (may already exist)", exc) + return clone_name + + +def _verify_and_tag_clone( + client: OntapClient, clone_name: str, dp_svm_name: str, rel_uuid: str +) -> dict: + """Phase D: Confirm the clone is online and tag it with the SM relationship UUID. + + The tag ':test' allows the cleanup script to identify and delete + only test clones — manually created volumes are never matched. + Returns the clone volume record. 
+ """ + logger.info("=== Phase D: Verify clone + tag ===") + resp = client.get( + "/storage/volumes", + fields="name,uuid,state,nas.path,space.size", + **{"max_records": "1", "name": clone_name, "svm.name": dp_svm_name}, + ) + clone_vol = resp.get("records", [{}])[0] + clone_uuid = clone_vol.get("uuid", "") + logger.info( + "CLONE | name=%s | uuid=%s | state=%s | junction=%s", + clone_vol.get("name"), + clone_uuid, + clone_vol.get("state"), + clone_vol.get("nas", {}).get("path"), + ) + try: + client.patch( + f"/storage/volumes/{clone_uuid}?return_timeout=120", + body={"_tags": [f"{rel_uuid}:test"]}, + ) + logger.info("TAG APPLIED | clone=%s | tag=%s:test", clone_name, rel_uuid) + except Exception as exc: + logger.warning("tag_clone_volume — %s", exc) + return clone_vol + + +def _resync_and_validate(client: OntapClient, rel_uuid: str) -> None: + """Phase E: Resync the SnapMirror relationship and confirm state=snapmirrored. + + The test clone remains accessible while resync runs in the background. + Polls until state=snapmirrored to confirm replication is healthy again. + Times out after 30 minutes. + """ + logger.info("=== Phase E: Resync SnapMirror ===") + try: + resp = client.patch( + f"/snapmirror/relationships/{rel_uuid}?return_timeout=120", + body={"state": "snapmirrored"}, + ) + job_uuid = resp.get("job", {}).get("uuid") + if job_uuid: + _poll_job(client, job_uuid, interval=10) + except Exception as exc: + logger.warning("resync_sm_relationship — %s", exc) + _wait_snapmirrored(client, rel_uuid) + logger.info("=== TEST FAILOVER COMPLETE — SnapMirror resynced ===") + + +def _select_target_volume( + cluster_a: str, + cluster_b: str, + dest_user: str, + dest_pass: str, + source_volume: str, +) -> tuple[str, str, str, str]: + """Phase 0: Scan both clusters to find which one holds the target DP volume. + + AUTO mode (SOURCE_VOLUME=* or unset): picks the most recently created DP volume. + TARGETED mode: finds _dest on either cluster. + Returns (dest_host, dp_vol_name, dp_svm_name, dp_vol_uuid). + """ logger.info("=== Phase 0: Auto-detect target cluster ===") dest_host, dp_vol = _pick_cluster(cluster_a, cluster_b, dest_user, dest_pass, source_volume) dp_vol_name = dp_vol["name"] @@ -162,108 +321,31 @@ def main() -> None: dp_vol.get("state"), dp_vol.get("space", {}).get("size"), ) + return dest_host, dp_vol_name, dp_svm_name, dp_vol_uuid - with OntapClient(dest_host, dest_user, dest_pass, verify_ssl=False) as client: - # ── Phase A: Pre-flight ──────────────────────────────────────────── # Verify the destination cluster is reachable and retrieve the SnapMirror - # relationship details (source, state, health, lag time) for the DP volume. 
logger.info("=== Phase A: Pre-flight ===") - cluster = client.get("/cluster", fields="name,version") - logger.info( - "DEST CLUSTER | name=%s | ontap=%s", - cluster.get("name"), - cluster.get("version", {}).get("full"), - ) - rel_resp = client.get( - "/snapmirror/relationships", - fields="uuid,source.path,destination.path,state,lag_time,healthy,policy.name", - **{"destination.path": f"{dp_svm_name}:{dp_vol_name}", "max_records": "1"}, - ) - rel = rel_resp.get("records", [{}])[0] - rel_uuid = rel.get("uuid", "") - logger.info( - "RELATIONSHIP | uuid=%s | source=%s | dest=%s | state=%s | healthy=%s | lag=%s", - rel_uuid, - rel.get("source", {}).get("path"), - rel.get("destination", {}).get("path"), - rel.get("state"), - rel.get("healthy"), - rel.get("lag_time"), - ) +def main() -> None: + """Create a writable FlexClone of the SnapMirror destination volume for test failover. - # ── Phase B: Get latest snapshot ───────────────────────────────── # Fetch the most recent SnapMirror snapshot on the DP volume. - # The FlexClone must be based on a SnapMirror snapshot to guarantee - # a consistent point-in-time copy of the replicated data. logger.info("=== Phase B: Get latest SnapMirror snapshot ===") - snap_resp = client.get( - f"/storage/volumes/{dp_vol_uuid}/snapshots", - fields="name,create_time", - **{"max_records": "1", "order_by": "create_time desc"}, - ) - if snap_resp.get("num_records", 0) == 0: - logger.error( - "No SnapMirror snapshots on %s — run provision workflow first", - dp_vol_name, - ) - sys.exit(1) - snapshot_name = snap_resp["records"][0]["name"] - logger.info( - "LATEST SM SNAPSHOT | name=%s | created=%s", - snapshot_name, - snap_resp["records"][0].get("create_time"), - ) + Auto-detects which cluster holds the target DP volume, creates a FlexClone + from the latest SnapMirror snapshot, tags it, then resyncs the relationship. + """ + cluster_a = _env("CLUSTER_A") + cluster_b = _env("CLUSTER_B") + dest_user = _env("DEST_USER") + dest_pass = _env("DEST_PASS") + source_volume = INPUTS.get("SOURCE_VOLUME") or os.environ.get("SOURCE_VOLUME", "*") - # ── Phase C: Create FlexClone ───────────────────────────────────── # Create a writable FlexClone of the DP volume from the latest SnapMirror - # snapshot. The clone gets a NAS junction path so it can be mounted - # immediately on a test client without touching the source data. logger.info("=== Phase C: Create FlexClone ===") - clone_name = f"{dp_vol_name}_clone" - try: - clone_resp = client.post( - "/storage/volumes?return_timeout=120", - body={ - "name": clone_name, - "svm": {"name": dp_svm_name}, - "nas": {"path": f"/{clone_name}"}, - "clone": { - "is_flexclone": True, - "parent_volume": {"name": dp_vol_name}, - "parent_snapshot": {"name": snapshot_name}, - }, - }, - ) - job_uuid = clone_resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid) - except Exception as exc: - logger.warning("create_test_clone — %s (may already exist)", exc) - - # ── Phase D: Verify clone + tag it ──────────────────────────────── - # Confirm the clone is online and retrieve its UUID and junction path. - # Tag it with the SM relationship UUID (':test') so the cleanup - # script can identify and delete only test clones, never other volumes. 
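The same tag is what the cleanup script queries for, so the clone can later be located (or confirmed absent) without knowing its name; a minimal lookup sketch, assuming the OntapClient wrapper and a previously captured relationship UUID (placeholder values):

    from ontap_client import OntapClient

    REL_UUID = "relationship-uuid-here"     # placeholder SnapMirror relationship UUID

    with OntapClient("10.0.0.1", "admin", "password", verify_ssl=False) as client:
        resp = client.get(
            "/storage/volumes",
            fields="name,uuid,svm.name,state",
            **{"_tags": f"{REL_UUID}:test", "max_records": "1"},
        )
        if resp.get("num_records", 0) == 0:
            print("no tagged clone found")            # nothing to clean up
        else:
            clone = resp["records"][0]
            print(clone["name"], clone["uuid"])       # candidate for unmount/offline/delete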
- logger.info("=== Phase D: Verify clone + tag ===") - clone_vol_resp = client.get( - "/storage/volumes", - fields="name,uuid,state,nas.path,space.size", - **{"max_records": "1", "name": clone_name, "svm.name": dp_svm_name}, - ) - clone_vol = clone_vol_resp.get("records", [{}])[0] - clone_uuid = clone_vol.get("uuid", "") - logger.info( - "CLONE | name=%s | uuid=%s | state=%s | junction=%s", - clone_vol.get("name"), - clone_uuid, - clone_vol.get("state"), - clone_vol.get("nas", {}).get("path"), - ) + dest_host, dp_vol_name, dp_svm_name, dp_vol_uuid = _select_target_volume( + cluster_a, cluster_b, dest_user, dest_pass, source_volume + ) - # Tag clone so cleanup script can identify it safely - try: - client.patch( - f"/storage/volumes/{clone_uuid}?return_timeout=120", - body={"_tags": [f"{rel_uuid}:test"]}, - ) - logger.info("TAG APPLIED | clone=%s | tag=%s:test", clone_name, rel_uuid) - except Exception as exc: - logger.warning("tag_clone_volume — %s", exc) + with OntapClient(dest_host, dest_user, dest_pass, verify_ssl=False) as client: + rel = _preflight_and_get_rel(client, dp_vol_name, dp_svm_name) + rel_uuid = rel.get("uuid", "") + snapshot_name = _get_latest_sm_snapshot(client, dp_vol_uuid, dp_vol_name) + clone_name = _create_test_clone(client, dp_vol_name, dp_svm_name, snapshot_name) + clone_vol = _verify_and_tag_clone(client, clone_name, dp_svm_name, rel_uuid) logger.info( "=== TEST FAILOVER READY ===\n" @@ -271,7 +353,7 @@ def main() -> None: " Junction : %s\n SVM : %s\n Snapshot : %s\n\n" " ACTION: Mount %s from SVM %s on a test client.", clone_vol.get("name"), - clone_uuid, + clone_vol.get("uuid"), clone_vol.get("state"), clone_vol.get("nas", {}).get("path"), dp_svm_name, @@ -280,24 +362,7 @@ def main() -> None: dp_svm_name, ) - # ── Phase E: Resync SnapMirror ────────────────────────────────────── - # Resume SnapMirror replication after the test clone was created. - # The test clone remains accessible while resync runs in the background. - # Polls until state=snapmirrored to confirm replication is healthy again. - logger.info("=== Phase E: Resync SnapMirror ===") - try: - resync_resp = client.patch( - f"/snapmirror/relationships/{rel_uuid}?return_timeout=120", - body={"state": "snapmirrored"}, - ) - job_uuid = resync_resp.get("job", {}).get("uuid") - if job_uuid: - _poll_job(client, job_uuid, interval=10) - except Exception as exc: - logger.warning("resync_sm_relationship — %s", exc) - - _wait_snapmirrored(client, rel_uuid) - logger.info("=== TEST FAILOVER COMPLETE — SnapMirror resynced ===") + _resync_and_validate(client, rel_uuid) if __name__ == "__main__":
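When the intercluster-LIF failure described in the dest-managed provision script appears (SnapMirror error 13303812, TCP 11104/11105 blocked), a plain socket probe can give a first hint about reachability from the machine running these scripts; the real requirement is reachability between the intercluster LIFs themselves, and the IP below is a placeholder:

    import socket

    def port_open(ip: str, port: int, timeout: float = 3.0) -> bool:
        """Return True if a TCP connection to ip:port succeeds within timeout."""
        try:
            with socket.create_connection((ip, port), timeout=timeout):
                return True
        except OSError:
            return False

    PEER_IC_LIF = "10.0.0.2"    # placeholder intercluster LIF IP on the peer cluster
    for port in (11104, 11105):
        print(port, "open" if port_open(PEER_IC_LIF, port) else "blocked or unreachable")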