How to migrate notebook environments at scale

Python script to batch-migrate notebook environment versions and Workspace Base Environments using the Workspace REST API.

Written by Dinesh Pawar

Last published at: April 16th, 2026

Problem

Customers who need to upgrade their serverless environment version (for example, from version 1 to version 4) or adopt a Workspace Base Environment (WBE) must update the environment metadata on every notebook in their workspace. The Databricks UI only supports updating one notebook at a time. For workspaces with hundreds or thousands of notebooks, manual migration is impractical.

 

Cause

Each Databricks notebook stores its environment configuration individually, including environment_version, base_environment, and dependencies. There is no built-in bulk update mechanism in the UI. The Workspace REST API does support reading and writing this metadata through the export and import endpoints, but migrating at scale requires orchestration for recursive folder traversal, error handling, post-import validation, and resilience to transient API errors.
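For reference, a JUPYTER-format export stores this metadata under the `application/vnd.databricks.v1+notebook` notebook-metadata key, with the export content base64-encoded. The sketch below shows how the decode-and-read step works; the notebook JSON here is a minimal illustrative stand-in, not real API output:

```python
import base64
import json

# Illustrative stand-in for the JSON body returned by
# GET /api/2.0/workspace/export?format=JUPYTER ('content' is base64-encoded).
fake_export = {
    "content": base64.b64encode(json.dumps({
        "metadata": {
            "application/vnd.databricks.v1+notebook": {
                "environmentMetadata": {
                    "environment_version": "1",
                    "base_environment": "",
                    "dependencies": ["numpy==1.26.4"]
                }
            }
        },
        "cells": []
    }).encode("utf-8")).decode("utf-8")
}

# Decode the export payload and read the environment metadata,
# mirroring what the migration script does per notebook.
nb_json = json.loads(base64.b64decode(fake_export["content"]).decode("utf-8"))
env_meta = nb_json["metadata"]["application/vnd.databricks.v1+notebook"]["environmentMetadata"]
print(env_meta["environment_version"])
```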

 

Solution

Use the following Python script to recursively scan a workspace folder and migrate all notebooks to a target environment version and, optionally, a target base environment (WBE).

 

Use cases

This script supports two migration scenarios:

  • Environment version upgrade: Update all notebooks to a newer serverless environment version (for example, v3 to v4). This applies to all customers regardless of whether they use Workspace Base Environments.
  • Workspace Base Environment (WBE) adoption: Migrate notebooks to bind to a specific WBE. Pass the --base_environment argument with the WBE ID.

 

Prerequisites

  • Python 3.6 or later
  • The requests library (pip install requests)
  • A Databricks personal access token (PAT) with workspace-level permissions
  • Your workspace URL and org ID (the org ID appears after ?o= in the workspace URL)

 

Configuration

Before running the script, update the three configuration constants at the top of the file:

  • DOMAIN: Your workspace URL, for example, https://your-domain.cloud.databricks.com
  • TOKEN: Your personal access token
  • ORG_ID: Your org ID, found in your workspace URL after ?o=

 

Warning

Do not commit your API token to version control. Consider using environment variables or a secrets manager for production use.
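One way to follow this advice is to read the constants from environment variables instead of editing them into the file. A minimal sketch; the variable names (DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_ORG_ID) are illustrative choices, not names the script requires:

```python
import os

def load_config():
    """Read connection settings from environment variables rather than
    hard-coding them in the script. Variable names here are illustrative."""
    domain = os.environ.get("DATABRICKS_HOST", "")
    token = os.environ.get("DATABRICKS_TOKEN", "")
    org_id = os.environ.get("DATABRICKS_ORG_ID", "")
    missing = [name for name, value in [("DATABRICKS_HOST", domain),
                                        ("DATABRICKS_TOKEN", token),
                                        ("DATABRICKS_ORG_ID", org_id)] if not value]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return domain, token, org_id
```

With this in place, the script's DOMAIN, TOKEN, and ORG_ID assignments could call `load_config()` instead of holding literal values.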

 

 

Finding your WBE ID (optional, only for WBE adoption)

If you are migrating notebooks to a Workspace Base Environment, you need the WBE ID:

  1. In your Databricks workspace, go to Settings > Workspace admin > Compute.
  2. Find the workspace base environment row.
  3. Click the three-dot menu and select Copy ID. The ID format is: workspace-base-environments/wbe-id-here.

 

Script

import requests

import base64

import json

import time

import random

import datetime

import sys

import argparse

import gc

import re

import hashlib

 

# --- Configuration ---

DOMAIN = 'https://<your-domain>.cloud.databricks.com'

TOKEN = '<your-api-token>'

ORG_ID = '<org-id>'  # Found in your workspace URL after ?o=

 

 

# --- Retry / Backoff ---

MAX_RETRIES = 5

BASE_DELAY = 1.0

 

# --- Memory Cleanup ---

GC_INTERVAL = 50

 

PREAMBLE = "# Databricks notebook source"

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}

 

# PEP 723 regex to match `# /// script ... # ///` blocks

PEP723_REGEX = re.compile(r'(?m)^# /// (?P<type>[a-zA-Z0-9-]+)$\s(?P<content>(^#(| .*)$\s)+)^# ///$\n*')

 

# Use a persistent Session for connection pooling across 140K+ requests

session = requests.Session()

session.headers.update({

    'Authorization': f'Bearer {TOKEN}',

    'X-Databricks-Org-Id': ORG_ID,

    'Content-Type': 'application/json'

})

 

 

# --- Custom Exception ---

 

class MigrationError(Exception):

    """Raised when a migration operation fails after exhausting retries."""

 

    def __init__(self, operation, path, message, status_code=None, response_body=None):

        self.operation = operation

        self.path = path

        self.status_code = status_code

        self.response_body = response_body

        detail = f"{message}"

        if status_code:

            detail += f" (HTTP {status_code})"

        if response_body:

            detail += f"\n        Response: {response_body[:500]}"

        super().__init__(f"{operation} failed for {path}: {detail}")

 

 

# --- Helper Functions ---

 

def format_dependencies(deps):

    """Formats a list of strings into a TOML array string with comments."""

    if not deps or not isinstance(deps, list):

        return "[]"

    dep_lines = [f'#   "{d}"' for d in deps]

    return "[\n" + ",\n".join(dep_lines) + "\n# ]"

 

 

def make_request(method, url, operation, path, **kwargs):

    """

    HTTP request wrapper with exponential backoff using persistent Session.

    Retries on 429 and 5xx errors up to MAX_RETRIES times.

    Raises MigrationError if all retries are exhausted or a non-retryable error occurs.

    """

    for attempt in range(MAX_RETRIES + 1):

        response = None

        try:

            response = session.request(method, url, **kwargs)

 

            if response.status_code == 200:

                return response

 

            status = response.status_code

            body = response.text

            response.close()

            response = None

 

            if status in RETRYABLE_STATUS_CODES and attempt < MAX_RETRIES:

                delay = BASE_DELAY * (2 ** attempt) + random.uniform(0, 1)

                print(f"  [RETRY] Attempt {attempt + 1}/{MAX_RETRIES} for {operation} — "

                      f"waiting {delay:.1f}s (HTTP {status})")

                time.sleep(delay)

                continue

 

            raise MigrationError(

                operation=operation,

                path=path,

                message=f"HTTP request failed after {attempt + 1} attempt(s)",

                status_code=status,

                response_body=body

            )

 

        except requests.exceptions.ConnectionError as e:

            if response:

                response.close()

            if attempt < MAX_RETRIES:

                delay = BASE_DELAY * (2 ** attempt) + random.uniform(0, 1)

                print(f"  [RETRY] Attempt {attempt + 1}/{MAX_RETRIES} for {operation} — "

                      f"waiting {delay:.1f}s (ConnectionError)")

                time.sleep(delay)

                continue

            raise MigrationError(

                operation=operation,

                path=path,

                message=f"Connection error after {MAX_RETRIES + 1} attempts: {e}"

            )

 

        except requests.exceptions.Timeout as e:

            if response:

                response.close()

            if attempt < MAX_RETRIES:

                delay = BASE_DELAY * (2 ** attempt) + random.uniform(0, 1)

                print(f"  [RETRY] Attempt {attempt + 1}/{MAX_RETRIES} for {operation} — "

                      f"waiting {delay:.1f}s (Timeout)")

                time.sleep(delay)

                continue

            raise MigrationError(

                operation=operation,

                path=path,

                message=f"Request timed out after {MAX_RETRIES + 1} attempts: {e}"

            )

 

        except MigrationError:

            raise

 

        except requests.exceptions.RequestException as e:

            if response:

                response.close()

            raise MigrationError(

                operation=operation,

                path=path,

                message=f"Unexpected request error: {e}"

            )

 

 

def compute_source_sha(source_code):

    """Compute SHA-256 of SOURCE export after stripping preamble and PEP 723 blocks."""

    # Remove preamble line

    stripped = source_code.replace(PREAMBLE + '\n', '', 1)

    # Remove all PEP 723 blocks with [tool.databricks.environment]

    stripped = PEP723_REGEX.sub('', stripped)

    stripped = stripped.strip()

    return hashlib.sha256(stripped.encode('utf-8')).hexdigest()

 

 

def strip_pep723_blocks(source_code):

    """Remove any existing PEP 723 blocks that appear right after the preamble line."""

    lines = source_code.split('\n', 1)

    if len(lines) < 2:

        return source_code

    preamble = lines[0]

    rest = lines[1]

    # Strip leading blank lines before checking for PEP 723

    stripped_rest = rest.lstrip('\n')

    # Remove all PEP 723 blocks from the top of the remaining source

    while PEP723_REGEX.match(stripped_rest):

        stripped_rest = PEP723_REGEX.sub('', stripped_rest, count=1).lstrip('\n')

    return preamble + '\n' + stripped_rest

 

 

def export_as_jupyter(obj_path):

    """Export a notebook as JUPYTER and return parsed notebook JSON."""

    res = make_request(

        'GET', f"{DOMAIN}/api/2.0/workspace/export",

        operation="Export notebook as JUPYTER",

        path=obj_path,

        params={'path': obj_path, 'format': 'JUPYTER'}

    )

    data = res.json()

    res.close()

    nb_json = json.loads(base64.b64decode(data['content']).decode('utf-8'))

    del data

    return nb_json

 

 

def load_previous_results(path):

    """Load previous migration log and return the results dict."""

    try:

        with open(path, 'r') as f:

            data = json.load(f)

    except FileNotFoundError:

        print(f"[FATAL] Previous results file not found: {path}")

        sys.exit(1)

    except json.JSONDecodeError as e:

        print(f"[FATAL] Previous results file is not valid JSON: {path}\n        {e}")

        sys.exit(1)

 

    results = data.get("results")

    if results is None:

        print(f"[FATAL] Previous results file has no 'results' key: {path}")

        sys.exit(1)

 

    if not isinstance(results, dict):

        print(f"[FATAL] 'results' in previous results file is not a dict: {path}")

        sys.exit(1)

 

    count = sum(1 for v in results.values() if v.get("status") == "success")

    print(f"[INFO] Loaded {count} successful migrations from previous results: {path}")

    return results

 

 

def save_log(migration_log, log_file):

    """Write migration log to disk as formatted JSON."""

    with open(log_file, 'w') as f:

        json.dump(migration_log, f, indent=2, default=str)

 

 

def print_summary(migration_log):

    """Print a summary of the migration results."""

    results = migration_log.get("results", {})

    success = 0

    failed = 0

    from_previous = 0

    for entry in results.values():

        if entry.get("from_previous_results"):

            from_previous += 1

        elif entry.get("status") == "success":

            success += 1

        elif entry.get("status") == "failed":

            failed += 1

 

    print(f"\n{'=' * 60}")

    print(f"Migration Summary")

    print(f"{'=' * 60}")

    print(f"  Succeeded (this run):     {success}")

    print(f"  Failed:                   {failed}")

    print(f"  From previous results:    {from_previous}")

    print(f"  Total:                    {len(results)}")

    print(f"{'=' * 60}")

 

 

# --- Main Migration Logic ---

 

def process_hybrid_upgrade(source_path, dry_run, migration_log, log_file, previous_results, notebook_counter, target_base_environment, target_environment_version, target_format):

    """

    Recursively process all notebooks under the given workspace path.

    Returns the updated notebook_counter.

    """

    # 1. List objects

    res = make_request(

        'GET', f"{DOMAIN}/api/2.0/workspace/list",

        operation="List workspace objects",

        path=source_path,

        params={'path': source_path}

    )

    objects = res.json().get('objects', [])

    res.close()

    del res

 

    for obj in objects:

        if obj['object_type'] == 'DIRECTORY':

            notebook_counter = process_hybrid_upgrade(

                obj['path'], dry_run, migration_log, log_file, previous_results, notebook_counter,

                target_base_environment, target_environment_version, target_format

            )

            continue

 

        obj_path = obj['path']

        print(f"\n--- Processing: {obj_path} ---")

 

        # Check previous results — skip if already successful

        if previous_results and obj_path in previous_results:

            prev_entry = previous_results[obj_path]

            if prev_entry.get("status") == "success":

                if not dry_run:

                    carried_entry = dict(prev_entry)

                    carried_entry["from_previous_results"] = True

                    migration_log["results"][obj_path] = carried_entry

                    save_log(migration_log, log_file)

                print(f"  [SKIP] Already migrated in previous run.")

                continue

 

        start_time = time.time()

 

        try:

            # 2. Export as JUPYTER — read metadata

            nb_json = export_as_jupyter(obj_path)

            db_meta = nb_json.get("metadata", {}).get("application/vnd.databricks.v1+notebook", {})

            env_meta = db_meta.get("environmentMetadata")

 

            # Build env_before snapshot

            if env_meta:

                env_before = {

                    "environment_version": str(env_meta.get("environment_version", "")),

                    "base_environment": env_meta.get("base_environment", ""),

                    "dependencies": env_meta.get("dependencies", [])

                }

            else:

                env_before = {

                    "environment_version": None,

                    "base_environment": "",

                    "dependencies": []

                }

 

            # Check if already at target version and base environment

            already_at_target = (

                env_meta

                and str(env_meta.get("environment_version")) == target_environment_version

                and (target_base_environment is None or env_meta.get("base_environment") == target_base_environment)

            )

            if already_at_target:

                if not dry_run:

                    duration_ms = int((time.time() - start_time) * 1000)

                    migration_log["results"][obj_path] = {

                        "status": "success",

                        "env_before": env_before,

                        "env_after": env_before,

                        "duration_milliseconds": duration_ms,

                        "from_previous_results": False

                    }

                    save_log(migration_log, log_file)

                print(f"  [OK] Already at target (version={target_environment_version}, base_environment={target_base_environment}). Skipping.")

                del nb_json, db_meta, env_meta

                continue

 

            base_env = target_base_environment if target_base_environment is not None else env_before["base_environment"]

            deps = env_before["dependencies"]

 

            env_after = {

                "environment_version": target_environment_version,

                "base_environment": base_env,

                "dependencies": deps

            }

 

            if target_format == "JUPYTER":

                # --- JUPYTER path: patch metadata in-place and re-import ---

                del env_meta

                env_meta_new = {

                    "base_environment": base_env,

                    "environment_version": target_environment_version,

                    "dependencies": deps

                }

                db_meta["environmentMetadata"] = env_meta_new

                nb_json["metadata"]["application/vnd.databricks.v1+notebook"] = db_meta

 

                if not dry_run:

                    jupyter_encoded = base64.b64encode(

                        json.dumps(nb_json).encode('utf-8')

                    ).decode('utf-8')

                    del nb_json, db_meta

 

                    payload = {

                        "path": obj_path,

                        "format": "JUPYTER",

                        "content": jupyter_encoded,

                        "overwrite": True

                    }

                    import_res = make_request(

                        'POST', f"{DOMAIN}/api/2.0/workspace/import",

                        operation="Import updated notebook (JUPYTER)",

                        path=obj_path,

                        json=payload

                    )

                    import_res.close()

                    del jupyter_encoded, payload, import_res

 

                    # Validate by re-exporting JUPYTER metadata

                    nb_json_after = export_as_jupyter(obj_path)

                    env_meta_after = nb_json_after.get("metadata", {}).get(

                        "application/vnd.databricks.v1+notebook", {}

                    ).get("environmentMetadata")

                    del nb_json_after

 

                    if not env_meta_after or str(env_meta_after.get("environment_version")) != target_environment_version:

                        actual_ver = env_meta_after.get("environment_version") if env_meta_after else None

                        raise MigrationError(

                            operation="Post-import validation",

                            path=obj_path,

                            message=f"environment_version is '{actual_ver}' after import, expected '{target_environment_version}'"

                        )

                    if target_base_environment is not None and env_meta_after.get("base_environment") != target_base_environment:

                        actual_base = env_meta_after.get("base_environment")

                        raise MigrationError(

                            operation="Post-import validation",

                            path=obj_path,

                            message=f"base_environment is '{actual_base}' after import, expected '{target_base_environment}'"

                        )

                    del env_meta_after

 

                    duration_ms = int((time.time() - start_time) * 1000)

                    migration_log["results"][obj_path] = {

                        "status": "success",

                        "env_before": env_before,

                        "env_after": env_after,

                        "duration_milliseconds": duration_ms,

                        "from_previous_results": False

                    }

                    save_log(migration_log, log_file)

                    print(f"  [SUCCESS] v{target_environment_version} metadata applied via JUPYTER (base_environment={base_env}). ({duration_ms}ms)")

                else:

                    del nb_json, db_meta

                    duration_ms = int((time.time() - start_time) * 1000)

                    print(f"  [DRY RUN] Target: Version {target_environment_version}, base_environment={base_env} (JUPYTER). ({duration_ms}ms)")

 

            else:

                # --- SOURCE path: PEP 723 header manipulation ---

                del nb_json, db_meta, env_meta

 

                # 3. Export in SOURCE format + compute SHA before

                export_s_res = make_request(

                    'GET', f"{DOMAIN}/api/2.0/workspace/export",

                    operation="Export notebook as SOURCE",

                    path=obj_path,

                    params={'path': obj_path, 'format': 'SOURCE'}

                )

                export_s = export_s_res.json()

                source_code = base64.b64decode(export_s['content']).decode('utf-8')

                language = export_s.get('language', 'PYTHON')

                export_s_res.close()

                del export_s_res

 

                source_sha_before = compute_source_sha(source_code)

 

                if not source_code.startswith(PREAMBLE):

                    raise MigrationError(

                        operation="Validate notebook preamble",

                        path=obj_path,

                        message=f"Notebook missing required preamble '{PREAMBLE}'"

                    )

 

                # 4. Strip existing PEP 723 blocks, then construct new header

                source_code = strip_pep723_blocks(source_code)

 

                pep723_header = f"""# /// script

# [tool.databricks.environment]

# base_environment = "{base_env}"

# environment_version = "{target_environment_version}"

# dependencies = {format_dependencies(deps)}

# ///

 

"""

                lines = source_code.splitlines()

                new_source = lines[0] + "\n" + pep723_header + "\n".join(lines[1:])

 

                del source_code, lines, export_s

 

                # 5. Import back

                if not dry_run:

                    encoded_content = base64.b64encode(new_source.encode('utf-8')).decode('utf-8')

                    payload = {

                        "path": obj_path,

                        "format": "SOURCE",

                        "language": language,

                        "content": encoded_content,

                        "overwrite": True

                    }

 

                    import_res = make_request(

                        'POST', f"{DOMAIN}/api/2.0/workspace/import",

                        operation="Import updated notebook",

                        path=obj_path,

                        json=payload

                    )

                    import_res.close()

                    del encoded_content, payload, import_res, new_source

 

                    # 6. Validation — re-export as SOURCE and verify SHA

                    verify_s_res = make_request(

                        'GET', f"{DOMAIN}/api/2.0/workspace/export",

                        operation="Post-import SOURCE export for validation",

                        path=obj_path,

                        params={'path': obj_path, 'format': 'SOURCE'}

                    )

                    verify_s = verify_s_res.json()

                    source_after = base64.b64decode(verify_s['content']).decode('utf-8')

                    verify_s_res.close()

                    del verify_s_res, verify_s

 

                    source_sha_after = compute_source_sha(source_after)

                    del source_after

 

                    # Verify env metadata via JUPYTER

                    nb_json_after = export_as_jupyter(obj_path)

                    db_meta_after = nb_json_after.get("metadata", {}).get("application/vnd.databricks.v1+notebook", {})

                    env_meta_after = db_meta_after.get("environmentMetadata")

                    del nb_json_after, db_meta_after

 

                    if not env_meta_after or str(env_meta_after.get("environment_version")) != target_environment_version:

                        actual_ver = env_meta_after.get("environment_version") if env_meta_after else None

                        del env_meta_after

                        raise MigrationError(

                            operation="Post-import validation",

                            path=obj_path,

                            message=f"environment_version is '{actual_ver}' after import, expected '{target_environment_version}'"

                        )

 

                    if target_base_environment is not None and env_meta_after.get("base_environment") != target_base_environment:

                        actual_base = env_meta_after.get("base_environment")

                        del env_meta_after

                        raise MigrationError(

                            operation="Post-import validation",

                            path=obj_path,

                            message=f"base_environment is '{actual_base}' after import, expected '{target_base_environment}'"

                        )

                    del env_meta_after

 

                    # Verify source integrity

                    if source_sha_before != source_sha_after:

                        raise MigrationError(

                            operation="Post-import validation",

                            path=obj_path,

                            message=f"Source SHA mismatch! Before: {source_sha_before}, After: {source_sha_after}. Notebook content may have been corrupted."

                        )

 

                    duration_ms = int((time.time() - start_time) * 1000)

                    migration_log["results"][obj_path] = {

                        "status": "success",

                        "env_before": env_before,

                        "env_after": env_after,

                        "source_sha_before": source_sha_before,

                        "source_sha_after": source_sha_after,

                        "duration_milliseconds": duration_ms,

                        "from_previous_results": False

                    }

                    save_log(migration_log, log_file)

                    print(f"  [SUCCESS] v{target_environment_version} metadata applied (base_environment={base_env}). Source SHA verified. ({duration_ms}ms)")

                else:

                    duration_ms = int((time.time() - start_time) * 1000)

                    print(f"  [DRY RUN] Target: Version {target_environment_version}, base_environment={base_env}. ({duration_ms}ms)")

                    del new_source

 

        except MigrationError as e:

            duration_ms = int((time.time() - start_time) * 1000)

            migration_log["results"][obj_path] = {

                "status": "failed",

                "error": str(e),

                "duration_milliseconds": duration_ms,

                "from_previous_results": False

            }

            save_log(migration_log, log_file)

 

            print(f"\n{'=' * 60}")

            print(f"[FATAL] Migration failed.")

            print(f"        Notebook: {obj_path}")

            print(f"        Operation: {e.operation}")

            print(f"        Error: {e}")

            print(f"")

            print(f"Results saved to: {log_file}")

            print(f"")

            print(f"To resume after fixing the issue, run:")

            print(f"  python large_scale_environment_migration.py \\")

            print(f"    --source_path '{source_path}' \\")

            print(f"    --environment_version '{target_environment_version}' \\")

            if target_base_environment is not None:

                print(f"    --base_environment '{target_base_environment}' \\")

            print(f"    --target_format '{target_format}' \\")

            print(f"    --resume_with_results true \\")

            print(f"    --previous_results_path {log_file}")

            print(f"{'=' * 60}")

            sys.exit(1)

 

        except Exception as e:

            duration_ms = int((time.time() - start_time) * 1000)

            migration_log["results"][obj_path] = {

                "status": "failed",

                "error": f"Unexpected error: {e}",

                "duration_milliseconds": duration_ms,

                "from_previous_results": False

            }

            save_log(migration_log, log_file)

 

            print(f"\n{'=' * 60}")

            print(f"[FATAL] Unexpected error during migration.")

            print(f"        Notebook: {obj_path}")

            print(f"        Error: {e}")

            print(f"")

            print(f"Results saved to: {log_file}")

            print(f"")

            print(f"To resume after fixing the issue, run:")

            print(f"  python large_scale_environment_migration.py \\")

            print(f"    --source_path '{source_path}' \\")

            print(f"    --environment_version '{target_environment_version}' \\")

            if target_base_environment is not None:

                print(f"    --base_environment '{target_base_environment}' \\")

            print(f"    --target_format '{target_format}' \\")

            print(f"    --resume_with_results true \\")

            print(f"    --previous_results_path {log_file}")

            print(f"{'=' * 60}")

            sys.exit(1)

 

        # Periodic garbage collection

        notebook_counter += 1

        if notebook_counter % GC_INTERVAL == 0:

            gc.collect()

            print(f"  [GC] Memory cleanup after {notebook_counter} notebooks.")

 

    return notebook_counter

 

 

# --- Entry Point ---

 

def main():

    parser = argparse.ArgumentParser(

        description="Migrate Databricks notebook environments to a target base_environment and environment_version (PEP 723)."

    )

    parser.add_argument(

        '--source_path',

        type=str,

        required=True,

        help="Databricks workspace folder path to scan recursively (e.g. '/Users/user@email.com/')."

    )

    parser.add_argument(

        '--environment_version',

        type=str,

        required=True,

        help="Target environment_version to set on all notebooks (e.g. '4')."

    )

    parser.add_argument(

        '--base_environment',

        type=str,

        default=None,

        help="Target base_environment to set on all notebooks. If omitted, each notebook keeps its existing base_environment. "

             "For workspace base environments (WBE), the ID can be found in the workspace UI: "

             "Settings > Workspace admin > Compute > three dots in the workspace base environment row > Copy ID. "

             "Format: 'workspace-base-environments/dbe_<uuid>'."

    )

    parser.add_argument(

        '--target_format',

        type=str,

        choices=['SOURCE', 'JUPYTER'],

        default='SOURCE',

        help="Format to use for importing notebooks. SOURCE (default) uses PEP 723 headers; JUPYTER patches notebook metadata directly."

    )

    parser.add_argument(

        '--dry_run',

        type=lambda v: v.lower() in ('true', '1', 'yes'),

        default=True,

        help='Preview changes without writing back to the workspace (true/false). Default: true.'

    )

    parser.add_argument(

        '--resume_with_results',

        type=lambda v: v.lower() in ('true', '1', 'yes'),

        default=False,

        help='Resume migration using results from a previous run (true/false). Default: false.'

    )

    parser.add_argument(

        '--previous_results_path',

        type=str,

        default=None,

        help='Path to the JSON log file from a previous run (required when --resume_with_results is set).'

    )

    args = parser.parse_args()

 

    # Validate resume args

    if args.resume_with_results and not args.previous_results_path:

        parser.error("--previous_results_path is required when --resume_with_results is set.")

 

    if args.previous_results_path and not args.resume_with_results:

        parser.error("--resume_with_results must be set when --previous_results_path is provided.")

 

    # Load previous results if resuming

    previous_results = None

    if args.resume_with_results:

        previous_results = load_previous_results(args.previous_results_path)

 

    # Initialize migration log

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    log_file = f"migration_log_{timestamp}.json"

    migration_log = {

        "started_at": datetime.datetime.now().isoformat(),

        "source_path": args.source_path,

        "target_environment_version": args.environment_version,

        "target_base_environment": args.base_environment,

        "target_format": args.target_format,

        "domain": DOMAIN,

        "org_id": ORG_ID,

        "dry_run": args.dry_run,

        "results": {}

    }

 

    print(f"--- Running Environment Migration (Dry Run: {args.dry_run}) ---")

    print(f"--- Target: environment_version={args.environment_version}, base_environment={args.base_environment or '(keep existing)'}, format={args.target_format} ---")

    if args.resume_with_results:

        print(f"--- Resuming with previous results from: {args.previous_results_path} ---")

    print(f"--- Log file: {log_file} ---\n")

 

    try:

        process_hybrid_upgrade(

            args.source_path, args.dry_run, migration_log, log_file, previous_results, 0,

            args.base_environment, args.environment_version, args.target_format

        )

    except SystemExit:

        raise

    except Exception as e:

        save_log(migration_log, log_file)

        print(f"\n{'=' * 60}")

        print(f"[FATAL] Unexpected error: {e}")

        print(f"")

        print(f"Results saved to: {log_file}")

        print(f"")

        print(f"To resume after fixing the issue, run:")

        print(f"  python large_scale_environment_migration.py \\")

        print(f"    --source_path '{args.source_path}' \\")

        print(f"    --environment_version '{args.environment_version}' \\")

        if args.base_environment is not None:

            print(f"    --base_environment '{args.base_environment}' \\")

        print(f"    --target_format '{args.target_format}' \\")

        print(f"    --resume_with_results true \\")

        print(f"    --previous_results_path {log_file}")

        print(f"{'=' * 60}")

        sys.exit(1)

    finally:

        session.close()

        gc.collect()

 

    # Final save and summary

    save_log(migration_log, log_file)

    print_summary(migration_log)

    print(f"\nResults saved to: {log_file}")

 

 

if __name__ == "__main__":

    main()

 

 
 

This script recursively scans a Databricks workspace folder and migrates all notebooks to a target environment_version and optionally a target base_environment. It supports two import formats: SOURCE (PEP 723 headers) and JUPYTER (metadata patching). The script skips notebooks already at the target state, supports resuming from a previous run's log file, saves results after each notebook for crash recovery, and includes retry with exponential backoff for transient HTTP errors. SHA-256 validation ensures notebook content is not corrupted during migration. Dry-run mode is enabled by default.
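To see the SOURCE-format mechanics in isolation: the script prepends a PEP 723 comment block after the notebook preamble line, and before hashing it strips any such block with PEP723_REGEX, so the SHA-256 integrity check covers only the cell code. A standalone sketch using the same regex and header shape as the script (the notebook source below is illustrative):

```python
import re

# Same regex the script uses to find `# /// script ... # ///` blocks.
PEP723_REGEX = re.compile(
    r'(?m)^# /// (?P<type>[a-zA-Z0-9-]+)$\s(?P<content>(^#(| .*)$\s)+)^# ///$\n*'
)

# The header shape the script writes after the preamble line.
header = (
    '# /// script\n'
    '# [tool.databricks.environment]\n'
    '# base_environment = ""\n'
    '# environment_version = "4"\n'
    '# dependencies = []\n'
    '# ///\n'
    '\n'
)

source = "# Databricks notebook source\n" + header + "print('hello')\n"

# Drop the preamble line, then strip the PEP 723 block: only the cell
# code remains, which is what the script hashes for its integrity check.
stripped = PEP723_REGEX.sub('', source.split('\n', 1)[1])
print(stripped)
```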