API

MigrationResult dataclass

Result of a content migration operation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `migrated` | `int` | Number of items successfully cloned to the destination portal. | `0` |
| `skipped` | `int` | Number of items skipped because they were already present (resume mode). | `0` |
| `failed` | `int` | Number of items that could not be cloned due to errors. | `0` |
| `failures` | `list[dict]` | List of failure records, each containing `item_id`, `title`, `type`, and `error` keys. | `list()` |
Source code in src/arcgis_cloning/_main.py
@dataclass
class MigrationResult:
    """Result of a content migration operation.

    Args:
        migrated: Number of items successfully cloned to the destination portal.
        skipped: Number of items skipped because they were already present (resume mode).
        failed: Number of items that could not be cloned due to errors.
        failures: List of failure records, each containing ``item_id``, ``title``,
            ``type``, and ``error`` keys.
    """

    migrated: int = 0
    skipped: int = 0
    failed: int = 0
    failures: list[dict] = field(default_factory=list)
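
For example, a caller might report failures after a run. A minimal sketch using the fields and failure-record keys documented above:

```python
result = migrate_content(resume=True)

# Each failure record carries the item_id, title, type, and error keys.
for failure in result.failures:
    print(f"{failure['title']} ({failure['type']}, id={failure['item_id']}): {failure['error']}")

print(f"Migrated {result.migrated}, skipped {result.skipped}, failed {result.failed}.")
```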

migrate_content(source_gis=None, destination_gis=None, source_env='source', destination_env='destination', resume=False, query=None, max_items=None, url_csv_path=None)

Migrate ArcGIS portal content from a source portal to a destination portal.

Connects to both portals, discovers all matching source items, and clones each one to the destination while mirroring the source folder structure. Per-item clone failures are recorded in the returned MigrationResult and do not halt the migration.

Note

Resume mode compares items by (title, type) only. Items with the same title but a different type are treated as absent and will be cloned. A stable ID-based mapping is deferred to a future version.

result = migrate_content(resume=True, query="owner:myuser")
print(f"Migrated: {result.migrated}, Skipped: {result.skipped}, Failed: {result.failed}")

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_gis` | `GIS \| None` | Pre-built authenticated `GIS` object for the source portal. When `None`, credentials are loaded from `secrets.yml` using `source_env`. | `None` |
| `destination_gis` | `GIS \| None` | Pre-built authenticated `GIS` object for the destination portal. When `None`, credentials are loaded from `secrets.yml` using `destination_env`. | `None` |
| `source_env` | `str` | Key name in `secrets.yml` for the source portal credentials. Ignored when `source_gis` is provided. | `'source'` |
| `destination_env` | `str` | Key name in `secrets.yml` for the destination portal credentials. Ignored when `destination_gis` is provided. | `'destination'` |
| `resume` | `bool` | When `True`, items already present in the destination (matched by title and type) are skipped. Fresh-run mode (`False`) always clones. | `False` |
| `query` | `str \| None` | Item search query string passed to `gis.content.search()`. `None` returns all items accessible to the authenticated source user. | `None` |
| `max_items` | `int \| None` | Optional cap on the number of source items to process. `None` means no limit. Primarily intended for safety checks and testing. | `None` |
| `url_csv_path` | `Path \| str \| None` | Optional path for a CSV file recording the original and updated URLs of every successfully migrated item. When `None`, no CSV is written. The CSV contains columns `name`, `type`, `original_url`, and `updated_url`. The parent directory must already exist. | `None` |
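
When you already hold authenticated connections, pass pre-built `GIS` objects and `secrets.yml` is bypassed entirely. A minimal sketch, assuming the `arcgis` package is installed; the portal URLs and credentials are placeholders:

```python
from arcgis.gis import GIS

# Placeholder portals and credentials for illustration only.
src = GIS("https://source.example.com/portal", "src_user", "src_password")
dest = GIS("https://dest.example.com/portal", "dest_user", "dest_password")

# source_env / destination_env are ignored when GIS objects are supplied.
result = migrate_content(source_gis=src, destination_gis=dest, max_items=10)
```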

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `MigrationResult` | `MigrationResult` | Dataclass with `migrated`, `skipped`, `failed`, and `failures` fields summarising the migration outcome. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the source and destination portal URLs are identical, or if `url_csv_path` is provided but its parent directory does not exist. |
| `KeyError` | If credentials for `source_env` or `destination_env` are missing from `secrets.yml` when a pre-built `GIS` object is not supplied. |
| `RuntimeError` | If the connection to either portal fails. |
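
Because per-item clone failures are recorded in the result rather than raised, only the pre-flight checks above need defensive handling. A sketch of trapping the documented exception types:

```python
try:
    result = migrate_content(url_csv_path="output/url_map.csv")  # hypothetical path
except ValueError as e:
    # Identical source/destination URLs, or the CSV parent directory is missing.
    print(f"Pre-flight check failed: {e}")
except KeyError as e:
    # Credentials missing from secrets.yml.
    print(f"Credential lookup failed: {e}")
except RuntimeError as e:
    # Could not connect to one of the portals.
    print(f"Portal connection failed: {e}")
```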

Source code in src/arcgis_cloning/_main.py
def migrate_content(
    source_gis: GIS | None = None,
    destination_gis: GIS | None = None,
    source_env: str = "source",
    destination_env: str = "destination",
    resume: bool = False,
    query: str | None = None,
    max_items: int | None = None,
    url_csv_path: Path | str | None = None,
) -> MigrationResult:
    """Migrate ArcGIS portal content from a source portal to a destination portal.

    Connects to both portals, discovers all matching source items, and clones each one
    to the destination while mirroring the source folder structure. Per-item clone
    failures are recorded in the returned ``MigrationResult`` and do not halt the
    migration.

    !!! note
        Resume mode compares items by ``(title, type)`` only. Items with the same title
        but a different type are treated as absent and will be cloned. A stable ID-based
        mapping is deferred to a future version.

    ```python
    result = migrate_content(resume=True, query="owner:myuser")
    print(f"Migrated: {result.migrated}, Skipped: {result.skipped}, Failed: {result.failed}")
    ```

    Args:
        source_gis: Pre-built authenticated ``GIS`` object for the source portal. When
            ``None``, credentials are loaded from ``secrets.yml`` using ``source_env``.
        destination_gis: Pre-built authenticated ``GIS`` object for the destination
            portal. When ``None``, credentials are loaded from ``secrets.yml`` using
            ``destination_env``.
        source_env: Key name in ``secrets.yml`` for the source portal credentials.
            Defaults to ``"source"``. Ignored when ``source_gis`` is provided.
        destination_env: Key name in ``secrets.yml`` for the destination portal
            credentials. Defaults to ``"destination"``. Ignored when ``destination_gis``
            is provided.
        resume: When ``True``, items already present in the destination (matched by
            title and type) are skipped. Defaults to ``False`` (fresh-run mode always
            clones).
        query: Item search query string passed to ``gis.content.search()``. ``None``
            returns all items accessible to the authenticated source user.
        max_items: Optional cap on the number of source items to process. ``None`` means
            no limit. Primarily intended for safety checks and testing.
        url_csv_path: Optional path for a CSV file that records the original and updated
            URLs of every successfully migrated item. When ``None`` (default), no CSV is
            written. The CSV contains columns ``name``, ``type``, ``original_url``, and
            ``updated_url``. The parent directory must already exist.

    Returns:
        MigrationResult: Dataclass with ``migrated``, ``skipped``, ``failed``, and
            ``failures`` fields summarising the migration outcome.

    Raises:
        ValueError: If the source and destination portal URLs are identical, or if
            ``url_csv_path`` is provided but its parent directory does not exist.
        KeyError: If credentials for ``source_env`` or ``destination_env`` are missing
            from ``secrets.yml`` when a pre-built ``GIS`` object is not supplied.
        RuntimeError: If the connection to either portal fails.
    """
    logger.debug(
        f"migrate_content called: source_env={source_env!r}, destination_env={destination_env!r}, "
        f"resume={resume}, query={query!r}, max_items={max_items}, "
        f"url_csv_path={url_csv_path!r}, "
        f"source_gis={'<provided>' if source_gis is not None else None}, "
        f"destination_gis={'<provided>' if destination_gis is not None else None}"
    )

    result = MigrationResult()

    # --- Pre-flight: connect to both portals ---
    src_gis = _connect_gis(source_env, source_gis)
    dest_gis = _connect_gis(destination_env, destination_gis)

    # --- Pre-flight: reject identical source/destination ---
    if src_gis.url == dest_gis.url:
        msg = (
            f"Source and destination portals share the same URL: '{src_gis.url}'. "
            "Migration aborted to prevent unintended self-cloning."
        )
        logger.critical(msg)
        raise ValueError(msg)

    # --- Pre-flight: validate CSV output directory exists ---
    if url_csv_path is not None:
        csv_path = Path(url_csv_path)
        if not csv_path.parent.exists():
            msg = f"CSV output directory does not exist: '{csv_path.parent}'"
            logger.error(msg)
            raise ValueError(msg)

    # --- Discover source items ---
    items = _get_all_items(src_gis, query, max_items)
    total = len(items)

    logger.info(
        f"Migration started | source={src_gis.url} | destination={dest_gis.url} | "
        f"resume={resume} | query={query!r} | items_found={total}"
    )

    if total == 0:
        logger.warning(
            f"No items found in source portal for query={query!r}. Nothing to migrate."
        )
        if url_csv_path is not None:
            csv_path = Path(url_csv_path)
            logger.debug(f"Writing empty URL mapping CSV: {csv_path} (0 rows)")
            pd.DataFrame(columns=["name", "type", "original_url", "updated_url"]).to_csv(
                csv_path, index=False
            )
            logger.info(f"URL mapping CSV written: {csv_path}")
        return result

    # --- Resume: build destination index ---
    dest_index: set[tuple[str, str]] = set()
    if resume:
        dest_index = _build_dest_index(dest_gis)
        logger.debug(f"Resume mode active: {len(dest_index)} items already in destination")

    src_folder_map = {f.id: f.title for f in src_gis.users.me.folders}
    logger.debug(f"Source folder map built: {len(src_folder_map)} folders")

    # --- Per-item migration loop with progress tracking ---
    url_rows: list[dict] = []
    for n, item in enumerate(tqdm(items, total=total, desc="Migrating items"), start=1):
        title = item.title
        item_type = item.type
        item_id = item.itemid

        logger.debug(
            f"Processing item {n}/{total}: '{title}' ({item_type}, id={item_id})"
        )

        # Skip already-present items in resume mode
        if resume and (title, item_type) in dest_index:
            logger.info(f"Skipping already-present item: {title} ({item_type})")
            result.skipped += 1
            continue

        # Mirror source folder structure
        folder_name = _resolve_folder_name(item, src_folder_map)
        _ensure_folder(dest_gis, folder_name)

        logger.info(f"Migrating item {n} of {total}: {title} ({item_type})")
        t0 = time.perf_counter()
        try:
            cloned = dest_gis.content.clone_items(items=[item], folder=folder_name)
            # TODO (post-clone WARNING): compare cloned item properties against source
            elapsed = time.perf_counter() - t0
            logger.debug(f"Cloned '{title}' ({item_type}) in {elapsed:.2f}s")
            dest_url = cloned[0].url if cloned else None
            url_rows.append({
                "name": title,
                "type": item_type,
                "original_url": item.url or "",
                "updated_url": dest_url or "",
            })
            result.migrated += 1
        except Exception as e:
            msg = f"Failed to clone '{title}' ({item_type}, id={item_id}): {e}"
            logger.error(msg)
            result.failed += 1
            result.failures.append(
                {"item_id": item_id, "title": title, "type": item_type, "error": str(e)}
            )

    logger.info(
        f"Migration complete | migrated={result.migrated} | "
        f"skipped={result.skipped} | failed={result.failed}"
    )

    # --- Write URL mapping CSV if requested ---
    if url_csv_path is not None:
        csv_path = Path(url_csv_path)
        logger.debug(f"Writing URL mapping CSV: {csv_path} ({len(url_rows)} rows)")
        pd.DataFrame(url_rows, columns=["name", "type", "original_url", "updated_url"]).to_csv(
            csv_path, index=False
        )
        logger.info(f"URL mapping CSV written: {csv_path}")

    return result
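
To audit URL rewrites after a run, pass `url_csv_path` and read the mapping back. A sketch assuming pandas is installed; the CSV path is a placeholder:

```python
import pandas as pd

result = migrate_content(url_csv_path="migration_urls.csv")

# Columns written by migrate_content: name, type, original_url, updated_url.
mapping = pd.read_csv("migration_urls.csv")
changed = mapping[mapping["original_url"] != mapping["updated_url"]]
print(f"{len(changed)} of {len(mapping)} migrated items received a new URL")
```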

Module arcgis_cloning.utils

format_df_for_logging(pandas_df, title, line_tab_prefix='\t\t')

Helper function for writing a Pandas DataFrame into a logfile. This function only formats the data frame into text for output; it should be used in conjunction with a logging method.

logging.info(format_df_for_logging(df, title='Summary Statistics'))

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pandas_df` | `DataFrame` | Pandas DataFrame to be converted to a string and included in the logfile. | *required* |
| `title` | `str` | String title describing the data frame. | *required* |
| `line_tab_prefix` | `str` | Optional string of tabs (`\t\t`) used to prefix each line, providing indentation. | `'\t\t'` |
Source code in src/arcgis_cloning/utils/_logging.py
def format_df_for_logging(
    pandas_df: "pd.DataFrame", title: str, line_tab_prefix: str="\t\t"
) -> str:
    """
    Helper function for writing a Pandas DataFrame into a logfile. This function only
        formats the data frame into text for output; it should be used in conjunction with a logging method.

    ``` python
    logging.info(format_df_for_logging(df, title='Summary Statistics'))
    ```

    Args:
        pandas_df: Pandas DataFrame to be converted to a string and included in the logfile.
        title: String title describing the data frame.
        line_tab_prefix: Optional string of tabs (``\\t\\t``) used to prefix each line, providing indentation.
    """
    if find_spec('pandas') is None:
        raise ImportError("Pandas is required to use 'format_df_for_logging'.")

    # late import to avoid issues in non-Pandas environments
    import pandas as pd

    # ensure proper type
    if not isinstance(pandas_df, pd.DataFrame):
        raise TypeError("The 'pandas_df' argument must be a Pandas DataFrame.")

    # format the data frame to a string with each line prefixed by the provided tab prefix
    log_str = line_tab_prefix.join(pandas_df.to_string(index=False).splitlines(True))

    # add title
    log_str = f"{title}:\n{line_tab_prefix}{log_str}"

    return log_str
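
A short usage sketch, assuming pandas is installed and the root logger is already configured at `INFO` or lower:

```python
import logging
import pandas as pd

df = pd.DataFrame({"portal": ["source", "destination"], "items": [120, 118]})

# Each line of the rendered DataFrame is indented by the tab prefix.
logging.info(format_df_for_logging(df, title="Migration summary"))
```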

get_logger(logger_name=None, level='INFO', logfile_path=None, log_format='%(asctime)s | %(name)s | %(levelname)s | %(message)s', propagate=True, add_stream_handler=False, add_arcpy_handler=False)

Get a Python logging.Logger configured to provide stream, file, or, if available, ArcPy output. When add_stream_handler is True, messages are sent to the console using a logging.StreamHandler. When add_arcpy_handler is True and ArcPy is available, logging is routed through ArcPy messaging using ArcpyHandler. Finally, if logfile_path is provided, log messages are also written to a logfile at that path using a logging.FileHandler.

Valid level inputs include:

* DEBUG - Detailed information, typically of interest only when diagnosing problems.
* INFO - Confirmation that things are working as expected.
* WARNING or WARN - An indication that something unexpected happened, or indicative of some problem in the near future (e.g. "disk space low"). The software is still working as expected.
* ERROR - Due to a more serious problem, the software has not been able to perform some function.
* CRITICAL - A serious error, indicating that the program itself may be unable to continue running.

Note

Logging levels can be provided as strings (e.g. 'DEBUG'), corresponding integer values or using the logging module constants (e.g. logging.DEBUG).
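
For example, the three accepted forms below all request `DEBUG`-level logging (the logger names are placeholders):

```python
import logging

logger_a = get_logger("app.strings", level="DEBUG")
logger_b = get_logger("app.ints", level=10)
logger_c = get_logger("app.constants", level=logging.DEBUG)
```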

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `logger_name` | `Optional[str]` | Name of the logger. If `None`, the root logger is used. | `None` |
| `level` | `Optional[Union[str, int]]` | Logging level to use. | `'INFO'` |
| `log_format` | `Optional[str]` | Format string for the logging messages. | `'%(asctime)s \| %(name)s \| %(levelname)s \| %(message)s'` |
| `propagate` | `bool` | If `True`, log messages are passed to the handlers of ancestor loggers. | `True` |
| `logfile_path` | `Optional[Union[Path, str]]` | Where to save the logfile if file output is desired. | `None` |
| `add_stream_handler` | `bool` | If `True`, add a `StreamHandler` to route logging to the console. | `False` |
| `add_arcpy_handler` | `bool` | If `True` and ArcPy is available, add the `ArcpyHandler` to route logging through ArcPy messaging. | `False` |

logger = get_logger(level='DEBUG')
logger.debug('nauseatingly detailed debugging message')
logger.info('something actually useful to know')
logger.warning('The sky may be falling')
logger.error('The sky is falling.')
logger.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
Source code in src/arcgis_cloning/utils/_logging.py
def get_logger(
    logger_name: Optional[str] = None,
    level: Optional[Union[str, int]] = "INFO",
    logfile_path: Optional[Union[Path, str]] = None,
    log_format: Optional[str] = "%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    propagate: bool = True,
    add_stream_handler: bool = False,
    add_arcpy_handler: bool = False,
) -> logging.Logger:
    """
    Get a Python `logging.Logger` configured to provide stream, file, or, if available, ArcPy output.
    When `add_stream_handler` is `True`, messages are sent to the console using a
    `logging.StreamHandler`. When `add_arcpy_handler` is `True` and ArcPy is available, logging is
    routed through ArcPy messaging using `ArcpyHandler`. Finally, if `logfile_path` is provided, log
    messages are also written to a logfile at that path using a `logging.FileHandler`.

    Valid `level` inputs include:
    * `DEBUG` - Detailed information, typically of interest only when diagnosing problems.
    * `INFO` - Confirmation that things are working as expected.
    * `WARNING` or `WARN` - An indication that something unexpected happened, or indicative of some problem in the
        near future (e.g. "disk space low"). The software is still working as expected.
    * `ERROR` - Due to a more serious problem, the software has not been able to perform some function.
    * `CRITICAL` - A serious error, indicating that the program itself may be unable to continue running.

    !!! note

        Logging levels can be provided as strings (e.g. `'DEBUG'`), corresponding integer values or using the
        logging module constants (e.g. `logging.DEBUG`).

    Args:
        logger_name: Name of the logger. If `None`, the root logger is used.
        level: Logging level to use. Default is INFO.
        log_format: Format string for the logging messages. Default is `'%(asctime)s | %(name)s | %(levelname)s | %(message)s'`.
        propagate: If `True`, log messages are passed to the handlers of ancestor loggers. Default is `True`.
        logfile_path: Where to save the logfile if file output is desired.
        add_stream_handler: If `True`, add a `StreamHandler` to route logging to the console. Default is `False`.
        add_arcpy_handler: If `True` and ArcPy is available, add the `ArcpyHandler` to route logging through
            ArcPy messaging. Default is `False`.

    ``` python
    logger = get_logger(level='DEBUG')
    logger.debug('nauseatingly detailed debugging message')
    logger.info('something actually useful to know')
    logger.warning('The sky may be falling')
    logger.error('The sky is falling.')
    logger.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
    ```

    """
    # ensure valid logging level
    log_str_lst = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "WARN", "FATAL"]
    log_int_lst = [0, 10, 20, 30, 40, 50]

    if not isinstance(level, (str, int)):
        raise ValueError(
            "You must define a specific logging level for log_level as a string or integer."
        )
    elif isinstance(level, str) and level not in log_str_lst:
        raise ValueError(
            f'The log_level must be one of {log_str_lst}. You provided "{level}".'
        )
    elif isinstance(level, int) and level not in log_int_lst:
        raise ValueError(
            f"If providing an integer for log_level, it must be one of the following, {log_int_lst}."
        )

    # get default logger and set logging level at the same time
    logger = logging.getLogger(logger_name)
    logger.setLevel(level=level)

    # configure formatting
    log_frmt = logging.Formatter(log_format)

    # set propagation
    logger.propagate = propagate

    # add or update stream handler if requested
    if add_stream_handler:

        # get existing stream handler if one exists on the logger
        stream_handler = next(
            (h for h in logger.handlers if isinstance(h, logging.StreamHandler)),
            None
        )

        # if no stream handler exists, create one and add it to the logger
        if stream_handler is None:
            stream_handler = logging.StreamHandler()
            logger.addHandler(stream_handler)

        # set the formatter for the stream handler
        stream_handler.setFormatter(log_frmt)


    # if in an environment with ArcPy, and desired, add handler to bubble logging up to ArcGIS through ArcPy
    if find_spec("arcpy") is not None and add_arcpy_handler:
        ah = ArcpyHandler()
        ah.setFormatter(log_frmt)
        logger.addHandler(ah)

    # if a path for the logfile is provided, log results to the file
    if logfile_path is not None:
        # coerce to Path so a string input also supports the parent-directory checks
        logfile_path = Path(logfile_path)

        # ensure the full path exists
        if not logfile_path.parent.exists():
            logfile_path.parent.mkdir(parents=True)

        # create and add the file handler
        fh = logging.FileHandler(str(logfile_path))
        fh.setFormatter(log_frmt)
        logger.addHandler(fh)

    return logger
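
A combined console-and-logfile configuration might look like the sketch below; the logger name and logfile path are placeholders. The parent directory is created automatically if it does not exist:

```python
from pathlib import Path

logger = get_logger(
    "arcgis_cloning",
    level="INFO",
    logfile_path=Path("logs/migration.log"),
    add_stream_handler=True,
)
logger.info("This message goes to both the console and logs/migration.log")
```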