Skip to content

Python API

get_features(output_feature_class, overture_type, bbox, connect_timeout=None, request_timeout=None)

Retrieve data from Overture Maps and save it as an ArcGIS Feature Class.

Parameters:

Name Type Description Default
output_feature_class Union[str, Path]

Path to the output feature class.

required
overture_type str

Overture feature type to retrieve.

required
bbox tuple[float, float, float, float]

Bounding box to filter the data. Format: (minx, miny, maxx, maxy).

required
connect_timeout int

Optional timeout in seconds for establishing a connection to the AWS S3.

None
request_timeout int

Optional timeout in seconds for waiting for a response from the AWS S3.

None

Returns:

Type Description
Path

Path to the created feature class.

Source code in src/overture_to_arcgis/_main.py
def get_features(
    output_feature_class: Union[str, Path],
    overture_type: str,
    bbox: tuple[float, float, float, float],
    connect_timeout: Union[int, None] = None,
    request_timeout: Union[int, None] = None,
) -> Path:
    """
    Retrieve data from Overture Maps and save it as an ArcGIS Feature Class.

    Args:
        output_feature_class: Path to the output feature class.
        overture_type: Overture feature type to retrieve.
        bbox: Bounding box to filter the data. Format: (minx, miny, maxx, maxy).
        connect_timeout: Optional timeout in seconds for establishing a connection to AWS S3.
        request_timeout: Optional timeout in seconds for waiting for a response from AWS S3.

    Returns:
        Path to the created feature class.

    Raises:
        EnvironmentError: If ArcPy is not available in the active environment.
    """
    # ensure arcpy is available (error message references this function's actual name)
    if find_spec("arcpy") is None:
        raise EnvironmentError("ArcPy is required for get_features.")

    # validate the bounding box
    bbox = validate_bounding_box(bbox)

    # get a temporary geodatabase to hold the batch feature classes
    tmp_gdb = get_temp_gdb()

    # intermediate feature class paths collected for the final merge
    fc_list = []

    # get the record batch generator
    batches = get_record_batches(overture_type, bbox, connect_timeout, request_timeout)

    # iterate through the record batches, converting each non-empty batch
    for btch_idx, batch in enumerate(batches):
        # skip empty batches
        if batch.num_rows == 0:
            continue

        # report progress (guarded so formatting only happens at DEBUG level)
        if logger.level <= logging.DEBUG:
            tbl_cnt = batch.num_rows
            logger.debug(
                f"In batch {btch_idx:,} fetched {tbl_cnt:,} rows of '{overture_type}' data from Overture Maps."
            )

        # create the temporary feature class path
        tmp_fc = tmp_gdb / f"overture_{overture_type}_{btch_idx:04d}"

        # convert the batch to a feature class
        table_to_features(batch, output_features=tmp_fc)

        # track the feature class for the merge step
        fc_list.append(str(tmp_fc))

    # merge the feature classes into a single feature class if any data was found
    if fc_list:
        arcpy.management.Merge(fc_list, str(output_feature_class))
    else:
        logger.warning(
            "No data found for the specified bounding box. No output feature class created."
        )

    # cleanup temporary data - remove temporary geodatabase using arcpy to avoid any locks
    arcpy.management.Delete(str(tmp_gdb))

    # honor the annotated Path return type even when a string path was provided
    return Path(output_feature_class)

get_spatially_enabled_dataframe(overture_type, bbox, connect_timeout=None, request_timeout=None)

Retrieve data from Overture Maps as an ArcGIS spatially enabled Pandas DataFrame.

Note

To see available overture types, use arcgis_overture.utils.get_all_overture_types().

Parameters:

Name Type Description Default
overture_type str

Overture feature type to retrieve.

required
bbox tuple[float, float, float, float]

Bounding box to filter the data. Format: (minx, miny, maxx, maxy).

required
connect_timeout int

Optional timeout in seconds for establishing a connection to the Overture Maps service.

None
request_timeout int

Optional timeout in seconds for waiting for a response from the Overture Maps service.

None

Returns:

Type Description
DataFrame

A spatially enabled pandas DataFrame containing the requested Overture Maps data.

Source code in src/overture_to_arcgis/_main.py
def get_spatially_enabled_dataframe(
    overture_type: str,
    bbox: tuple[float, float, float, float],
    connect_timeout: Union[int, None] = None,
    request_timeout: Union[int, None] = None,
) -> pd.DataFrame:
    """
    Retrieve data from Overture Maps as an
    [ArcGIS spatially enabled Pandas DataFrame](https://developers.arcgis.com/python/latest/guide/introduction-to-the-spatially-enabled-dataframe/).

    !!! note

        To see available overture types, use `arcgis_overture.utils.get_all_overture_types()`.

    Args:
        overture_type: Overture feature type to retrieve.
        bbox: Bounding box to filter the data. Format: (minx, miny, maxx, maxy).
        connect_timeout: Optional timeout in seconds for establishing a connection to the Overture Maps service.
        request_timeout: Optional timeout in seconds for waiting for a response from the Overture Maps service.

    Returns:
        A spatially enabled pandas DataFrame containing the requested Overture Maps data.

    Raises:
        ValueError: If ``overture_type`` is not a recognized Overture type.
    """
    # validate the overture type
    available_types = get_all_overture_types()
    if overture_type not in available_types:
        raise ValueError(
            f"Invalid overture type: {overture_type}. Valid types are: {available_types}"
        )

    # validate the bounding box
    bbox = validate_bounding_box(bbox)

    # get the record batch generator
    batches = get_record_batches(overture_type, bbox, connect_timeout, request_timeout)

    # collect one dataframe per non-empty batch and concatenate ONCE at the end;
    # concatenating inside the loop is quadratic in the number of batches
    df_parts = []
    geom_col = None

    for batch in batches:
        # skip empty batches
        if batch.num_rows == 0:
            continue

        # convert the batch to a spatially enabled dataframe
        part_df = table_to_spatially_enabled_dataframe(batch)

        # capture the geometry column name from the first batch
        if geom_col is None:
            geom_col = part_df.spatial.name

        df_parts.append(part_df)

    # if data found, combine and perform post processing
    if df_parts:
        df = pd.concat(df_parts, ignore_index=True)

        # set the geometry column using the ArcGIS GeoAccessor to get a Spatially Enabled DataFrame
        df.spatial.set_geometry(geom_col, sr=4326, inplace=True)

        # reset the index so it makes sense after concatenation
        df.reset_index(drop=True, inplace=True)

        # log the number of rows fetched
        df_cnt = df.shape[0]
        logger.debug(
            f"Fetched {df_cnt} rows of '{overture_type}' data from Overture Maps."
        )

    # if no data found, log a warning and create an empty dataframe to return
    else:
        df = pd.DataFrame()
        logger.warning(
            f"No '{overture_type}' data found for the specified bounding box: {bbox}"
        )

    return df

Module overture_to_arcgis.utils

add_alternate_category_field(features)

Add an 'alternate_category' field to the input features and populate it from the 'categories' field.

Parameters:

Name Type Description Default
features Union[Layer, str, Path]

The input feature layer or feature class.

required
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_alternate_category_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add an 'alternate_category' field to the input features and populate it from the 'categories' field.

    Args:
        features: The input feature layer or feature class.
    """
    # normalize the input to a catalog path string for arcpy
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # add the 'alternate_category' field only when it is not already present
    existing_fields = [fld.name for fld in arcpy.ListFields(features)]
    if "alternate_category" not in existing_fields:
        arcpy.management.AddField(
            in_table=features,
            field_name="alternate_category",
            field_type="TEXT",
            field_length=255,
        )
        logger.debug("Added 'alternate_category' field to features.")

    # populate 'alternate_category' by parsing the JSON stored in 'categories'
    with arcpy.da.UpdateCursor(
        features, ["categories", "alternate_category"]
    ) as update_cursor:
        for row in update_cursor:
            raw_categories = row[0]

            # skip rows whose categories value is missing or a null placeholder
            if (
                raw_categories is None
                or not isinstance(raw_categories, str)
                or len(raw_categories) == 0
                or raw_categories.strip() == "None"
                or raw_categories.strip().lower() == "null"
            ):
                continue

            # pull the alternate entry out of the parsed JSON
            alternate = json.loads(raw_categories).get("alternate")

            # a list of alternates becomes a comma-delimited string
            if isinstance(alternate, list):
                alternate = ", ".join(alternate)

            # normalize null-like placeholder values to an actual None
            if alternate in [None, "None", "none", ""]:
                alternate = None

            # write the extracted value back to the row
            row[1] = alternate
            update_cursor.updateRow(row)

add_boolean_access_restrictions_fields(features, access_field='access_restrictions', remove_original_field=False)

Add boolean access restriction fields to the input features based on the access_restrictions field.

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class.

required
access_field str

The name of the access restrictions field.

'access_restrictions'
remove_original_field Optional[bool]

When True, the access_field field is deleted from the feature class after the boolean fields have been populated.

False
Source code in src/overture_to_arcgis/utils/_arcgis_access.py
def add_boolean_access_restrictions_fields(
    features: Union[str, Path, arcpy._mp.Layer],
    access_field: str = "access_restrictions",
    remove_original_field: Optional[bool] = False,
) -> None:
    """
    Add boolean access restriction fields to the input features based on the access_restrictions field.

    Args:
        features: The input feature layer or feature class.
        access_field: The name of the access restrictions field.
        remove_original_field: When ``True``, the ``access_field``
            field is deleted from the feature class after the boolean
            fields have been populated.

    Raises:
        ValueError: If the input features do not exist.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # ensure the features exist
    if not arcpy.Exists(features):
        raise ValueError("Input features do not exist.")

    # first pass to collect all unique keys
    unique_keys = set()
    with arcpy.da.SearchCursor(features, [access_field]) as cursor:
        for row in cursor:
            if row[0] is not None:
                bool_dict = flatten_dict_to_bool_keys(row[0])
                unique_keys.update(bool_dict.keys())

    # nothing to add if no access restriction keys were found; calling
    # AddFields with an empty field list would error out
    if not unique_keys:
        logger.warning("No access restriction keys found; no fields added.")
        return

    # freeze the key order so the AddFields order and cursor field order
    # are deterministic and guaranteed to match
    ordered_keys = sorted(unique_keys)

    # create a list of fields to add
    add_fields = [[slugify(key), "SHORT"] for key in ordered_keys]

    # add fields to feature class
    arcpy.management.AddFields(features, add_fields)

    logger.info(
        "Added boolean access restriction fields to features: "
        + ", ".join([f[0] for f in add_fields])
    )

    # second pass to populate the fields - field order mirrors ordered_keys
    field_names = [slugify(key) for key in ordered_keys]

    with arcpy.da.UpdateCursor(features, [access_field] + field_names) as cursor:
        for row in cursor:
            bool_dict = {}
            if row[0] is not None:
                bool_dict = flatten_dict_to_bool_keys(row[0])
            for idx, key in enumerate(ordered_keys):
                # default to 0 (false) when the key is absent for this row
                row[idx + 1] = bool_dict.get(key, 0)
            cursor.updateRow(row)

    # remove the original access restrictions field if requested
    if remove_original_field:
        arcpy.management.DeleteField(features, [access_field])
        logger.debug(
            f"Removed '{access_field}' field from features."
        )

add_h3_indices(features, resolution=9, h3_field=None)

Add an H3 index field to the input features based on their geometry.

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class.

required
resolution int

The H3 resolution to use for indexing.

9
h3_field Optional[str]

The name of the H3 index field to add.

None
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_h3_indices(
    features: Union[str, Path, arcpy._mp.Layer],
    resolution: int = 9,
    h3_field: Optional[str] = None,
) -> None:
    """
    Add an H3 index field to the input features based on their geometry.

    Args:
        features: The input feature layer or feature class.
        resolution: The H3 resolution to use for indexing.
        h3_field: The name of the H3 index field to add.

    Raises:
        ImportError: If the optional 'h3' library is not installed.
        ValueError: If the resolution is outside the 0-15 range.
    """
    # the h3 package is an optional dependency; fail fast if absent
    if find_spec("h3") is None:
        raise ImportError(
            "The 'h3' library is not installed. Please install it to use this function."
        )

    import h3

    # arcpy cannot handle Path objects, so use a string path
    if isinstance(features, Path):
        features = str(features)

    # resolve a layer to its catalog path to avoid schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # resolution must be an integer within the valid H3 range
    if not isinstance(resolution, int) or not (0 <= resolution <= 15):
        raise ValueError(
            "Invalid H3 resolution. Please choose a resolution between 0 and 15."
        )

    # default the field name, e.g. "h3_09" for resolution 9
    if h3_field is None:
        h3_field = f"h3_{resolution:02d}"

    # add the index field only when it does not already exist
    existing_fields = [fld.name for fld in arcpy.ListFields(features)]
    if h3_field not in existing_fields:
        arcpy.management.AddField(
            in_table=features,
            field_name=h3_field,
            field_type="TEXT",
            field_length=20,
        )

        logger.debug(f"Added '{h3_field}' field to features.")

    # compute the H3 cell for each feature's centroid coordinates
    with arcpy.da.UpdateCursor(features, ["SHAPE@XY", h3_field]) as update_cursor:
        for row in update_cursor:
            # SHAPE@XY yields (x, y) == (longitude, latitude)
            lon, lat = row[0]

            # h3 expects latitude first, then longitude
            row[1] = h3.latlng_to_cell(lat, lon, resolution)

            update_cursor.updateRow(row)

add_impedance_column(edge_features, modality_prefix, coefficients=None)

Add impedance columns to the edge features for routing.

Parameters:

Name Type Description Default
edge_features Union[str, Path, Layer]

The input line feature layer or feature class.

required
modality_prefix str

The modality name (e.g. "walk", "bike"). Must be a key in _IMPEDANCE_REGISTRY unless coefficients is supplied.

required
coefficients Optional[dict]

Optional coefficient table override. When provided, the registry is not consulted and modality_prefix may be any slugifiable string.

None

Returns:

Type Description
Union[Path, Layer]

Path or layer reference to the updated edge features.

Source code in src/overture_to_arcgis/utils/_arcgis_routing.py
def add_impedance_column(
    edge_features: Union[str, Path, arcpy._mp.Layer],
    modality_prefix: str,
    coefficients: Optional[dict] = None,
) -> Union[Path, arcpy._mp.Layer]:
    """
    Add impedance columns to the edge features for routing.

    Args:
        edge_features: The input line feature layer or feature class.
        modality_prefix: The modality name (e.g. "walk", "bike"). Must be a key in
            ``_IMPEDANCE_REGISTRY`` unless ``coefficients`` is supplied.
        coefficients: Optional coefficient table override. When provided, the registry is
            not consulted and ``modality_prefix`` may be any slugifiable string.

    Returns:
        Path or layer reference to the updated edge features.

    Raises:
        FileNotFoundError: If the input features cannot be found.
        ValueError: If the modality is unknown or required fields are missing.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(edge_features, Path):
        edge_features = str(edge_features)

    # ensure the input features exist
    if not arcpy.Exists(edge_features):
        raise FileNotFoundError("Cannot access the path for the input features.")

    # ensure the modality prefix is valid for the field name
    modality_prefix = slugify(modality_prefix)

    # validate modality against registry (unless a custom coefficient table is provided)
    if coefficients is None and modality_prefix not in _IMPEDANCE_REGISTRY:
        raise ValueError(
            f"Unknown modality_prefix '{modality_prefix}'. "
            f"Valid options: {', '.join(sorted(_IMPEDANCE_REGISTRY))}."
        )

    # resolve the coefficient table to use
    # NOTE(review): a caller-supplied table is assumed to contain 'class' and
    # 'subtype' keys; a missing key would raise KeyError in the loop below —
    # confirm against callers.
    _coeff_table = coefficients if coefficients is not None else _IMPEDANCE_REGISTRY[modality_prefix]

    # ensure the subtype and class fields exist
    fields = [f.name for f in arcpy.ListFields(edge_features)]
    if "subtype" not in fields or "class" not in fields:
        raise ValueError("The input features must have 'subtype' and 'class' fields.")

    # type fields necessary to be in the schema (cursor order: class, subtype)
    type_fields = ("class", "subtype")

    # add the impedance field if it does not exist
    impedance_field = f"{modality_prefix}_impedance"
    if impedance_field not in fields:
        arcpy.management.AddField(
            in_table=edge_features,
            field_name=impedance_field,
            field_type="FLOAT",
        )
        logger.info(f"Added field '{impedance_field}' to edge features.")
    else:
        logger.info(f"Field '{impedance_field}' already exists in edge features.")

    # update the impedance fields based on the resolved coefficient table
    with arcpy.da.UpdateCursor(
        edge_features, type_fields + (impedance_field,)
    ) as cursor:

        # iterate through the rows
        for row in cursor:

            # get the type values (row[0]='class', row[1]='subtype' per type_fields)
            type_values = {
                "class": row[0],
                "subtype": row[1],
            }

            # reset restriction value
            row[2] = None

            # set restriction values based on the type values
            # NOTE(review): iteration order is "class" then "subtype"; because the
            # 1.0 default below is applied INSIDE the loop, a later "subtype" match
            # overrides the "class" coefficient, while an unmatched "subtype" keeps
            # the earlier value — subtype effectively takes precedence. Confirm
            # this ordering is intended.
            for type_field, type_value in type_values.items():

                # check if there are restrictions for this type field and value
                if type_value in _coeff_table[type_field]:

                    # get the restrictions for this type value
                    restriction = _coeff_table[type_field][type_value]

                    # set the restriction value in the row
                    row[2] = restriction

                # provide a default restriction value if none is set
                if row[2] is None:
                    row[2] = 1.0

            cursor.updateRow(row)

    # make sure edge features are a path to return if path is a string
    if isinstance(edge_features, str):
        edge_features = Path(edge_features)

    return edge_features

add_overture_taxonomy_fields(features)

Add 'category_&lt;n&gt;' fields to the input features based on the Overture taxonomy.

The category for each row is read from the primary key in the JSON-encoded categories field.

Note

This function attempts to read the value for the primary key from string JSON in the categories field. If this field does not exist, this will raise an error.

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class.

required
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_overture_taxonomy_fields(
    features: Union[str, Path, arcpy._mp.Layer],
) -> None:
    """
    Add 'category_<n>' fields to the input features based on the Overture taxonomy.

    The category for each row is read from the ``primary`` key in the JSON-encoded
    ``categories`` field.

    !!! note
        This function attempts to read the value for the ``primary`` key from string JSON in the ``categories`` field.
        If this field does not exist, this will raise an error.

    Args:
        features: The input feature layer or feature class.

    Raises:
        ValueError: If the features are not point geometry or the 'categories'
            field is missing.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # describe the features and ensure it is point geometry
    desc = arcpy.Describe(features)
    if desc.shapeType not in ["Point", "Multipoint"]:
        raise ValueError(
            "Input features must be of point geometry type to add Overture taxonomy fields."
        )

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure the 'categories' field exists
    if "categories" not in field_names:
        raise ValueError(
            "Field for category extraction, 'categories', does not exist in features."
        )

    # generator extracting the primary category from the 'categories' field;
    # guard against null/empty values so json.loads does not raise on them
    categories_gen = (
        json.loads(row[0]).get("primary") if row[0] else None
        for row in arcpy.da.SearchCursor(features, ["categories"])
    )

    # root name for the taxonomy fields
    root_name = "primary_category"

    # get taxonomy dataframe
    taxonomy_df = get_overture_taxonomy_dataframe()

    # get the max lengths for each category field
    max_lengths = get_overture_taxonomy_category_field_max_lengths(taxonomy_df)

    # set the index to category_code for easier lookup
    taxonomy_df.set_index("category_code", inplace=True)

    # only keep the category columns in the taxonomy dataframe
    taxonomy_df = taxonomy_df.loc[
        :, [col for col in taxonomy_df.columns if col.startswith("category_")]
    ]

    # replace category in the field names with the root name
    taxonomy_df.columns = [
        col.replace("category_", f"{root_name}_") for col in taxonomy_df.columns
    ]
    max_lengths = {
        col.replace("category_", f"{root_name}_"): max_len
        for col, max_len in max_lengths.items()
    }

    # iterate through the maximum lengths and add fields to the features
    for col, max_len in max_lengths.items():
        # add the field to the features
        arcpy.management.AddField(
            in_table=features,
            field_name=col,
            field_type="TEXT",
            field_length=max_len,
        )

        # log the actual field length (was previously hard-coded as "66,974")
        logger.info(f"Added field '{col}' with length {max_len:,} to features.")

    # get the intersection of rows and taxonomy columns
    col_lst = [col for col in max_lengths.keys() if col in taxonomy_df.columns]

    # add the primary category column to the list
    col_lst.insert(0, f"{root_name}_code")

    # calculate the category code fields from the categories generator
    with arcpy.da.UpdateCursor(features, col_lst) as update_cursor:
        # iterate through the rows and categories in lockstep
        for row, category in zip(update_cursor, categories_gen):
            # set the category fields only when the category is valid and known;
            # the index membership check avoids a KeyError for category codes
            # absent from the taxonomy table
            if (
                category is not None
                and isinstance(category, str)
                and len(category) > 0
                and not category.strip() == "None"
                and not category.strip().lower() == "null"
                and category in taxonomy_df.index
            ):
                # get the taxonomy row for the category
                taxonomy_row = taxonomy_df.loc[category]

                # if a taxonomy row is found, set the category fields
                if not taxonomy_row.empty:

                    # hydrate the first column with the category code
                    row[0] = taxonomy_row.name

                    # populate the rest of the values with values from the taxonomy row
                    for idx, col in enumerate(col_lst[1:]):
                        row[idx + 1] = taxonomy_row.loc[col]

                    # update the row
                    update_cursor.updateRow(row)

add_primary_category_field(features)

Add a 'primary_category' field to the input features and populate it from the 'categories' field.

Parameters:

Name Type Description Default
features Union[Layer, str, Path]

The input feature layer or feature class.

required
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_primary_category_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'primary_category' field to the input features and populate it from the 'categories' field.

    Args:
        features: The input feature layer or feature class.

    Raises:
        ValueError: If the 'categories' field is missing from the features.
    """
    # normalize the input to a catalog path string for arcpy
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # list current fields so the schema can be validated
    existing_fields = [fld.name for fld in arcpy.ListFields(features)]

    # the JSON source column must be present
    if "categories" not in existing_fields:
        raise ValueError("Source field 'categories' does not exist in features.")

    # create the target field only when missing
    if "primary_category" not in existing_fields:
        arcpy.management.AddField(
            in_table=features,
            field_name="primary_category",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'primary_category' field to features.")

    # populate 'primary_category' by parsing the JSON stored in 'categories'
    with arcpy.da.UpdateCursor(
        features, ["categories", "primary_category"]
    ) as update_cursor:
        for row in update_cursor:
            raw_value = row[0]

            # skip rows whose categories value is missing or a null placeholder
            if (
                raw_value is None
                or not isinstance(raw_value, str)
                or len(raw_value) == 0
                or raw_value.strip() == "None"
                or raw_value.strip().lower() == "null"
            ):
                continue

            # pull the primary entry out of the parsed JSON
            primary = json.loads(raw_value).get("primary")

            # normalize null-like placeholder values to an actual None
            if primary in [None, "None", "none", ""]:
                primary = None

            # write the extracted value back to the row
            row[1] = primary
            update_cursor.updateRow(row)

add_primary_name(features)

Add a 'primary_name' field to the input features and populate it from the 'names' column.

Parses the JSON-encoded 'names' field and extracts the first common name entry to populate a new 'primary_name' text field.

Parameters:

Name Type Description Default
features Union[Layer, str, Path]

The input feature layer or feature class.

required
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_primary_name(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'primary_name' field to the input features and populate it from the 'names' column.

    Parses the JSON-encoded 'names' field and extracts the first common name entry to
    populate a new 'primary_name' text field.

    Args:
        features: The input feature layer or feature class.

    Raises:
        ValueError: If the 'names' field is missing from the features.
    """
    # normalize the input to a catalog path string for arcpy
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # list current fields so the schema can be validated
    existing_fields = [fld.name for fld in arcpy.ListFields(features)]

    # the JSON source column must be present
    if "names" not in existing_fields:
        raise ValueError("Source field 'names' does not exist in features.")

    # create the target field only when missing
    if "primary_name" not in existing_fields:
        arcpy.management.AddField(
            in_table=features,
            field_name="primary_name",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'primary_name' field to features.")

    # populate 'primary_name' by parsing the JSON stored in 'names'
    with arcpy.da.UpdateCursor(features, ["names", "primary_name"]) as update_cursor:
        for row in update_cursor:
            raw_names = row[0]

            # skip rows whose names value is missing or a null placeholder
            if (
                raw_names is None
                or not isinstance(raw_names, str)
                or len(raw_names) == 0
                or raw_names.strip() == "None"
                or raw_names.strip().lower() == "null"
            ):
                continue

            # pull the primary entry out of the parsed JSON
            primary_name = json.loads(raw_names).get("primary")

            # write the extracted value back to the row
            row[1] = primary_name
            update_cursor.updateRow(row)

            logger.debug(f"Set 'primary_name' to '{primary_name}' for feature.")

add_trail_field(features)

Add a 'trail' boolean field to the input features if it does not already exist.

Features with a class of 'track', 'path', 'footway', 'trail' or 'cycleway' are flagged with a value of 1.

Parameters:

Name Type Description Default
features Union[Layer, str, Path]

The input feature layer or feature class.

required
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_trail_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'trail' boolean field to the input features if it does not already exist.

    Features with a class of 'track', 'path', 'footway', 'trail' or 'cycleway' are
    flagged with a value of ``1``. Other features are left untouched (NULL).

    Args:
        features: The input feature layer or feature class.

    Raises:
        ValueError: If the source 'class' field does not exist in the features.
    """
    # arcpy cannot handle Path objects, and a layer must be resolved to its dataset path
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get all field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure source field 'class' exists
    if "class" not in field_names:
        raise ValueError("Source field 'class' does not exist in features.")

    # add the 'trail' field if it does not already exist
    if "trail" not in field_names:
        arcpy.management.AddField(
            in_table=features,
            field_name="trail",
            field_type="SHORT",
        )

        # fixed message: the field added above is named 'trail', not 'trail_field'
        logger.debug("Added 'trail' field to features.")

    # list of classes flagged as trails
    trail_classes = ["track", "path", "footway", "trail", "cycleway"]

    # calculate 'trail' from the 'class' field
    with arcpy.da.UpdateCursor(features, ["class", "trail"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the class value for the feature
            class_value = row[0]

            # flag the feature if its class is one of the trail classes
            if class_value in trail_classes:
                row[1] = 1
                update_cursor.updateRow(row)

add_website_field(features)

Add a 'website' field to the input features and populate it with the first entry from the 'websites' field.

Parameters:

Name Type Description Default
features Union[Layer, str, Path]

The input feature layer or feature class.

required
Source code in src/overture_to_arcgis/utils/_arcgis_fields.py
def add_website_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'website' field to the input features and populate it with the first
    entry from the 'websites' field.

    Args:
        features: The input feature layer or feature class.
    """
    # arcpy cannot handle Path objects, and a layer must be resolved to its dataset path
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # add the 'website' field if it does not already exist
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "website" not in field_names:
        arcpy.management.AddField(
            in_table=features,
            field_name="website",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'website' field to features.")

    # calculate 'website' from 'websites' field
    with arcpy.da.UpdateCursor(features, ["websites", "website"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the websites value and extract website
            website_value = row[0]

            # skip rows without a usable JSON string of websites
            # (isinstance also rejects None, so no separate None check is needed)
            if not (
                isinstance(website_value, str)
                and len(website_value) > 0
                and website_value.strip() != "None"
                and website_value.strip().lower() != "null"
            ):
                continue

            # parse the website value into a list
            website_lst = json.loads(website_value)

            # extract the first website from the list
            if isinstance(website_lst, list) and len(website_lst) > 0:
                website = website_lst[0]

                # only use the website if it is a usable string of 255 characters or less
                if (
                    isinstance(website, str)
                    and website.lower().strip() != "none"
                    and 0 < len(website) <= 255
                ):
                    row[1] = website

                    # update the row
                    update_cursor.updateRow(row)

                # only warn about length when length is actually the problem; the
                # previous implementation emitted this warning for any rejected value
                elif isinstance(website, str) and len(website) > 255:
                    logger.warning(
                        f"Website exceeds 255 characters and will not be set for the feature: '{website}'"
                    )

create_network_dataset(segment_features, connector_features, geodatabase, feature_dataset_name='overture_transportation', network_dataset_name='overture_network', modalities=None)

Create a network dataset from the input features.

Parameters:

Name Type Description Default
segment_features Union[str, Path, Layer]

The input line feature layer or feature class.

required
connector_features Union[str, Path, Layer]

Point feature layer or feature class for connector features.

required
geodatabase Union[str, Path]

The output geodatabase to create the network dataset in.

required
feature_dataset_name Optional[str]

The name of the feature dataset to create the network dataset in.

'overture_transportation'
network_dataset_name Optional[str]

The name of the network dataset to create.

'overture_network'
modalities Optional[list]

List of modality names for which impedance columns should be added (e.g. ["walk", "bike"]). Defaults to ["walk", "bike"].

None

Returns:

Type Description
Path

Path to the created network dataset.

Source code in src/overture_to_arcgis/utils/_arcgis_routing.py
def create_network_dataset(
    segment_features: Union[str, Path, arcpy._mp.Layer],
    connector_features: Union[str, Path, arcpy._mp.Layer],
    geodatabase: Union[str, Path],
    feature_dataset_name: Optional[str] = "overture_transportation",
    network_dataset_name: Optional[str] = "overture_network",
    modalities: Optional[list] = None,
) -> Path:
    """
    Create a network dataset from the input features.

    Args:
        segment_features: The input line feature layer or feature class.
        connector_features: Point feature layer or feature class for connector features.
        geodatabase: The output geodatabase to create the network dataset in.
        feature_dataset_name: The name of the feature dataset to create the network dataset in.
        network_dataset_name: The name of the network dataset to create.
        modalities: List of modality names for which impedance columns should be added
            (e.g. ``["walk", "bike"]``). Defaults to ``["walk", "bike"]``.

    Returns:
        Path to the created network dataset.

    Raises:
        FileNotFoundError: If the network template XML, the segment features, or the
            connector features cannot be located.
    """
    # function constant - XML template describing the network dataset schema
    NETWORK_WALK_PATH = Path(__file__).parent.parent / "assets" / "walk_network.xml"

    # ensure network walk path exists
    if not NETWORK_WALK_PATH.exists():
        err_msg = f"Network walk path does not exist: {NETWORK_WALK_PATH}"
        logger.error(err_msg)
        raise FileNotFoundError(err_msg)

    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(segment_features, Path):
        segment_features = str(segment_features)

    # if the segment features is a layer, get the data source path
    if isinstance(segment_features, arcpy._mp.Layer):
        segment_features = segment_features.dataSource

    # ensure the input features exist
    if not arcpy.Exists(segment_features):
        err_msg = f"Cannot access the path for the input features: {segment_features}"
        logger.error(err_msg)
        raise FileNotFoundError(err_msg)

    # if the connector features are a path, convert to string - arcpy cannot handle Path objects
    if isinstance(connector_features, Path):
        connector_features = str(connector_features)

    # if the connector features are a layer, get the data source path
    if isinstance(connector_features, arcpy._mp.Layer):
        connector_features = connector_features.dataSource

    # ensure the connector features exist
    if not arcpy.Exists(connector_features):
        err_msg = (
            f"Cannot access the path for the connector features: {connector_features}"
        )
        logger.error(err_msg)
        raise FileNotFoundError(err_msg)

    # if the geodatabase is a Path, convert to string
    if isinstance(geodatabase, Path):
        geodatabase = str(geodatabase)

    # if the geodatabase does not exist, create it
    if not arcpy.Exists(geodatabase):
        arcpy.management.CreateFileGDB(
            out_folder=os.path.dirname(geodatabase),
            out_name=os.path.basename(geodatabase),
        )
        logger.info(f"Created geodatabase at '{geodatabase}'.")
    else:
        logger.info(f"Using existing geodatabase at '{geodatabase}'.")

    # if the feature dataset does not exist, create it
    feature_dataset_path = os.path.join(geodatabase, feature_dataset_name)
    if not arcpy.Exists(feature_dataset_path):
        # get the spatial reference from the input features so the feature dataset
        # matches the segment data's coordinate system
        spatial_ref = arcpy.Describe(segment_features).spatialReference

        arcpy.management.CreateFeatureDataset(
            out_dataset_path=geodatabase,
            out_name=feature_dataset_name,
            spatial_reference=spatial_ref,
        )

        logger.info(f"Created feature dataset '{feature_dataset_name}' in geodatabase.")

    else:
        logger.info(
            f"Using existing feature dataset '{feature_dataset_name}' in geodatabase."
        )

    # delete existing network dataset first — segments/connectors cannot be deleted while
    # they participate in a controller dataset such as a network dataset
    network_dataset_path = os.path.join(feature_dataset_path, network_dataset_name)
    if arcpy.Exists(network_dataset_path):
        arcpy.management.Delete(network_dataset_path)
        logger.info(
            f"Deleted existing network dataset '{network_dataset_name}' before recreating."
        )

    # copy segment features into the feature dataset, overwriting any previous run's data.
    # feature class names must be unique across the entire geodatabase, so also remove any
    # root-level copy that could conflict with the name inside the feature dataset.
    segments_in_dataset = os.path.join(feature_dataset_path, "segments")
    for segments_path in (segments_in_dataset, os.path.join(geodatabase, "segments")):
        if arcpy.Exists(segments_path):
            arcpy.management.Delete(segments_path)
            logger.info(f"Deleted existing segment features at '{segments_path}'.")
    arcpy.management.CopyFeatures(segment_features, segments_in_dataset)
    logger.info(
        f"Copied segment features '{segment_features}' to feature dataset '{feature_dataset_name}'."
    )

    # copy connector features into the feature dataset, overwriting any previous run's data.
    # same root-level conflict guard as for segments.
    # NOTE(review): connector_features cannot be None at this point — a None value would
    # already have failed the arcpy.Exists check above — so this guard appears to be dead;
    # confirm whether None connectors were ever meant to be supported.
    if connector_features is not None:
        connector_features_in_dataset = os.path.join(feature_dataset_path, "connectors")
        for connectors_path in (
            connector_features_in_dataset,
            os.path.join(geodatabase, "connectors"),
        ):
            if arcpy.Exists(connectors_path):
                arcpy.management.Delete(connectors_path)
                logger.info(
                    f"Deleted existing connector features at '{connectors_path}'."
                )
        arcpy.management.CopyFeatures(connector_features, connector_features_in_dataset)
        logger.info(
            f"Copied connector features '{connector_features}' to feature dataset '{feature_dataset_name}'."
        )

    # if the primary name column is not in the segment features, add and populate
    if "primary_name" not in [f.name for f in arcpy.ListFields(segments_in_dataset)]:
        logger.info(
            f"Adding and populating 'primary_name' field in segment features '{segments_in_dataset}'."
        )
        add_primary_name(segments_in_dataset)
    else:
        logger.info(
            f"Primary name field already exists in segment features '{segments_in_dataset}'."
        )

    # split into segments by subclass rules
    logger.info(
        f"Splitting segment features '{segments_in_dataset}' into segments by subclass rules."
    )
    split_into_subclass_features(segments_in_dataset, remove_original_field=True)

    # split segments into subsegments by level rules
    logger.info(
        f"Splitting segment features '{segments_in_dataset}' into subsegments by level (z-index) rules."
    )
    split_into_level_features(segments_in_dataset, remove_original_field=True)

    # split the segment features at the connector points
    # NOTE(review): connector_features_in_dataset is only assigned inside the guard above;
    # if that guard were ever skipped this would raise NameError — verify intent.
    logger.info(
        f"Splitting segment features '{segments_in_dataset}' at connector points '{connector_features_in_dataset}'."
    )
    split_segments_at_connectors(
        segments_in_dataset, connector_features_in_dataset, delete_connectors_field=True
    )

    # add boolean access restriction fields
    logger.info(f"Adding boolean access restriction fields.")
    add_boolean_access_restrictions_fields(
        segments_in_dataset, remove_original_field=True
    )

    # ensure the walk-prohibition field always exists for the network dataset template
    _FOOT_ACCESS_FIELD = "access_denied_when_mode_foot"
    if _FOOT_ACCESS_FIELD not in [
        f.name for f in arcpy.ListFields(segments_in_dataset)
    ]:
        arcpy.management.AddField(
            in_table=segments_in_dataset,
            field_name=_FOOT_ACCESS_FIELD,
            field_type="SHORT",
        )
        logger.debug(
            f"Added missing field '{_FOOT_ACCESS_FIELD}' as SHORT to segment features."
        )
    else:
        logger.debug(
            f"Field '{_FOOT_ACCESS_FIELD}' already exists in segment features."
        )

    # add impedance columns for each requested modality
    if modalities is None:
        modalities = ["walk", "bike"]
    for modality in modalities:
        logger.info(
            f"Adding {modality} impedance column to segment features '{segments_in_dataset}'."
        )
        add_impedance_column(segments_in_dataset, modality_prefix=modality)

    # ensure the network template XML file has a valid XML declaration; only the first
    # line is read up front so an already-valid file is not rewritten
    with open(NETWORK_WALK_PATH, "r", encoding="utf-8") as f:
        first_line = f.readline()
    if not first_line.strip().startswith("<?xml"):
        logger.warning(
            "Network template XML missing declaration. Prepending XML declaration."
        )
        with open(NETWORK_WALK_PATH, "r", encoding="utf-8") as f:
            content = f.read()
        with open(NETWORK_WALK_PATH, "w", encoding="utf-8") as f:
            f.write('<?xml version="1.0" encoding="utf-8"?>\n' + content)

    # create the network dataset
    logger.info(
        f"Creating network dataset '{network_dataset_name}' in feature dataset '{feature_dataset_path}'."
    )
    # index [0] pulls the first output (the dataset path) from the geoprocessing result
    network_dataset = arcpy.na.CreateNetworkDatasetFromTemplate(
        network_dataset_template=str(NETWORK_WALK_PATH),
        output_feature_dataset=feature_dataset_path,
    )[0]

    # build the network so it is ready to use
    logger.info("Building network dataset.")
    arcpy.na.BuildNetwork(network_dataset)

    return Path(network_dataset)

get_all_overture_types(release=None, s3=None)

Returns a list of all available Overture dataset types for a given release.

Parameters:

Name Type Description Default
release Optional[str]

Optional release version. If not provided, the most current release will be used.

None
s3 Optional[S3FileSystem]

Optional pre-configured S3 filesystem. If not provided, an anonymous S3 filesystem will be created.

None

Returns:

Type Description
list[str]

List of available overture types for the release.

Source code in src/overture_to_arcgis/utils/_core.py
def get_all_overture_types(
    release: Optional[str] = None, s3: Optional[fs.S3FileSystem] = None
) -> list[str]:
    """
    Returns a list of all available Overture dataset types for a given release.

    Args:
        release: Optional release version. If not provided, the most current
            release will be used.
        s3: Optional pre-configured S3 filesystem. If not provided, an anonymous
            S3 filesystem will be created.

    Returns:
        List of available overture types for the release.
    """
    # fall back to the most current release when none is specified
    release = get_current_release() if release is None else release

    # the keys of the type-to-theme mapping are the available overture types
    types = list(get_type_theme_map(release=release, s3=s3).keys())

    logger.debug(f"Available types for release {release}: {types}")

    return types

get_current_release()

Returns the most current Overture dataset release string.

Returns:

Type Description
str

Most current release string.

Source code in src/overture_to_arcgis/utils/_core.py
def get_current_release() -> str:
    """
    Returns the most current Overture dataset release string.

    Returns:
        Most current release string.

    Raises:
        RuntimeError: If no Overture dataset releases are found.
    """
    # retrieve the list of releases
    releases = get_release_list()

    # make sure there is at least one release
    if not releases:
        raise RuntimeError("No Overture dataset releases found.")

    # release strings sort lexicographically, so the maximum is the newest;
    # max() avoids sorting the whole list just to take the last element
    current_release = max(releases)

    logger.debug(f"Current release: {current_release}")

    return current_release

get_geometry_column(table)

Get the name of the geometry column from the PyArrow Table or RecordBatch metadata.

Parameters:

Name Type Description Default
table Union[Table, RecordBatch]

PyArrow Table or RecordBatch with GeoArrow metadata.

required

Returns:

Type Description
str

Name of the geometry column.

Source code in src/overture_to_arcgis/utils/_core.py
def get_geometry_column(table: Union[pa.Table, pa.RecordBatch]) -> str:
    """
    Get the name of the geometry column from the PyArrow Table or RecordBatch metadata.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.

    Returns:
        Name of the geometry column.
    """
    # the GeoArrow metadata is stored as a JSON document under the b"geo" schema key
    raw_meta = table.schema.metadata.get(b"geo")
    if raw_meta is None:
        raise ValueError("No geometry metadata found in the Overture Maps data.")

    # decode the metadata and look up the declared primary geometry column
    primary_column = json.loads(raw_meta.decode("utf-8")).get("primary_column")

    # the declared column must actually exist on the table
    if primary_column is None or primary_column not in table.column_names:
        raise ValueError(
            "No valid primary_geometry column defined in the Overture Maps metadata."
        )

    return primary_column

get_layers_for_unique_values(input_features, field_name, arcgis_map=None)

Create layers from unique values in a specified field of the input features.

Parameters:

Name Type Description Default
input_features Union[Layer, str, Path]

The input feature layer or feature class.

required
field_name str

The field name to get unique values from.

required
arcgis_map Optional[Map]

The ArcGIS map object to add the layers to.

None

Returns:

Type Description
list[Layer]

A list of ArcGIS layers created from the unique values.

Source code in src/overture_to_arcgis/utils/_arcgis_features.py
def get_layers_for_unique_values(
    input_features: Union[arcpy._mp.Layer, str, Path],
    field_name: str,
    arcgis_map: Optional[arcpy._mp.Map] = None,
) -> list[arcpy._mp.Layer]:
    """
    Create layers from unique values in a specified field of the input features.

    Args:
        input_features: The input feature layer or feature class.
        field_name: The field name to get unique values from.
        arcgis_map: The ArcGIS map object to add the layers to.

    Returns:
        A list of ArcGIS layers created from the unique values.
    """
    # get unique values; the with-statement ensures the cursor is released promptly
    with arcpy.da.SearchCursor(input_features, [field_name]) as search_cursor:
        unique_values = {row[0] for row in search_cursor}

    # list to hydrate with created layers
    layers = []

    # iterate unique values
    for value in unique_values:
        # create layer name
        layer_name = f"{field_name}_{value}"

        # create definition query; single quotes in string values are doubled so the
        # SQL stays valid (e.g. O'Brien -> 'O''Brien'), and NULL values use IS NULL
        # instead of producing invalid "field = None" SQL
        if value is None:
            definition_query = f"{field_name} IS NULL"
        elif isinstance(value, str):
            escaped_value = value.replace("'", "''")
            definition_query = f"{field_name} = '{escaped_value}'"
        else:
            definition_query = f"{field_name} = {value}"

        # use definition query to create layer object
        layer = arcpy.management.MakeFeatureLayer(
            in_features=input_features,
            out_layer=layer_name,
            where_clause=definition_query,
        )[0]

        # if the map is provided, add the layer to the map
        if arcgis_map:
            arcgis_map.addLayer(layer)
        layers.append(layer)

    return layers

get_logger(level='INFO', logger_name=None, logfile_path=None, log_format='%(asctime)s | %(name)s | %(levelname)s | %(message)s', propagate=True, add_stream_handler=True, add_arcpy_handler=False)

Get Python :class:Logger<logging.Logger> configured to provide stream, file or, if available, ArcPy output. The way the method is set up, logging will be routed through ArcPy messaging using :class:ArcpyHandler if ArcPy is available. If ArcPy is not available, messages will be sent to the console using a :class:StreamHandler<logging.StreamHandler>. Next, if the logfile_path is provided, log messages will also be written to the provided path to a logfile using a :class:FileHandler<logging.FileHandler>.

Valid log_level inputs include: * DEBUG - Detailed information, typically of interest only when diagnosing problems. * INFO - Confirmation that things are working as expected. * WARNING or WARN - An indication that something unexpected happened, or indicative of some problem in the near future (e.g. "disk space low"). The software is still working as expected. * ERROR - Due to a more serious problem, the software has not been able to perform some function. * CRITICAL - A serious error, indicating that the program itself may be unable to continue running.

Parameters:

Name Type Description Default
level Optional[Union[str, int]]

Logging level to use. Default is 'INFO'.

'INFO'
logger_name Optional[str]

Name of the logger. If None, the root logger is used.

None
log_format Optional[str]

Format string for the logging messages. Default is '%(asctime)s | %(name)s | %(levelname)s | %(message)s'.

'%(asctime)s | %(name)s | %(levelname)s | %(message)s'
propagate bool

If True, log messages are passed to the handlers of ancestor loggers. Default is True.

True
logfile_path Union[Path, str]

Where to save the logfile if file output is desired.

None
add_stream_handler bool

If True, add a StreamHandler to route logging to the console. Default is True.

True
add_arcpy_handler bool

If True and ArcPy is available, add the ArcpyHandler to route logging through ArcPy messaging. Default is False.

False
configure_logging('DEBUG')
logging.debug('nauseatingly detailed debugging message')
logging.info('something actually useful to know')
logging.warning('The sky may be falling')
logging.error('The sky is falling.')
logging.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
Source code in src/overture_to_arcgis/utils/_logging.py
def get_logger(
    level: Optional[Union[str, int]] = "INFO",
    logger_name: Optional[str] = None,
    logfile_path: Union[Path, str] = None,
    log_format: Optional[str] = "%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    propagate: bool = True,
    add_stream_handler: bool = True,
    add_arcpy_handler: bool = False,
) -> logging.Logger:
    """
    Get Python :class:`Logger<logging.Logger>` configured to provide stream, file or, if available, ArcPy output.
    The way the method is set up, logging will be routed through ArcPy messaging using :class:`ArcpyHandler` if
    ArcPy is available. If ArcPy is *not* available, messages will be sent to the console using a
    :class:`StreamHandler<logging.StreamHandler>`. Next, if the `logfile_path` is provided, log messages will also
    be written to the provided path to a logfile using a :class:`FileHandler<logging.FileHandler>`.

    Valid `log_level` inputs include:
    * `DEBUG` - Detailed information, typically of interest only when diagnosing problems.
    * `INFO` - Confirmation that things are working as expected.
    * `WARNING` or ``WARN`` -  An indication that something unexpected happened, or indicative of some problem in the
        near future (e.g. "disk space low"). The software is still working as expected.
    * `ERROR` - Due to a more serious problem, the software has not been able to perform some function.
    * `CRITICAL` - A serious error, indicating that the program itself may be unable to continue running.

    Args:
        level: Logging level to use. Default is `'INFO'`.
        logger_name: Name of the logger. If `None`, the root logger is used.
        log_format: Format string for the logging messages. Default is `'%(asctime)s | %(name)s | %(levelname)s | %(message)s'`.
        propagate: If `True`, log messages are passed to the handlers of ancestor loggers. Default is `True`.
        logfile_path: Where to save the logfile if file output is desired. May be a `Path` or `str`.
        add_stream_handler: If `True`, add a `StreamHandler` to route logging to the console. Default is `True`.
        add_arcpy_handler: If `True` and ArcPy is available, add the `ArcpyHandler` to route logging through
            ArcPy messaging. Default is `False`.

    Raises:
        ValueError: If `level` is not a recognized logging level string or integer.

    ``` python
    configure_logging('DEBUG')
    logging.debug('nauseatingly detailed debugging message')
    logging.info('something actually useful to know')
    logging.warning('The sky may be falling')
    logging.error('The sky is falling.')
    logging.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
    ```

    """
    # ensure valid logging level
    log_str_lst = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "WARN", "FATAL"]
    log_int_lst = [0, 10, 20, 30, 40, 50]

    if not isinstance(level, (str, int)):
        raise ValueError(
            "You must define a specific logging level for log_level as a string or integer."
        )
    elif isinstance(level, str) and level not in log_str_lst:
        raise ValueError(
            f'The log_level must be one of {log_str_lst}. You provided "{level}".'
        )
    elif isinstance(level, int) and level not in log_int_lst:
        raise ValueError(
            f"If providing an integer for log_level, it must be one of the following, {log_int_lst}."
        )

    # get default logger and set logging level at the same time
    logger = logging.getLogger(logger_name)
    logger.setLevel(level=level)

    # clear handlers so repeated calls do not accumulate duplicate output
    logger.handlers.clear()

    # configure formatting
    log_frmt = logging.Formatter(log_format)

    # set propagation
    logger.propagate = propagate

    # make sure at least a stream handler is present
    if add_stream_handler:
        # create and add the stream handler
        sh = logging.StreamHandler()
        sh.setFormatter(log_frmt)
        logger.addHandler(sh)

    # if in an environment with ArcPy, add handler to bubble logging up to ArcGIS through ArcPy
    if add_arcpy_handler:
        ah = ArcpyHandler()
        ah.setFormatter(log_frmt)
        logger.addHandler(ah)

    # if a path for the logfile is provided, log results to the file
    if logfile_path is not None:
        # accept str input as documented - previously a str crashed on .parent
        logfile_path = Path(logfile_path)

        # ensure the full path exists
        logfile_path.parent.mkdir(parents=True, exist_ok=True)

        # create and add the file handler
        fh = logging.FileHandler(str(logfile_path))
        fh.setFormatter(log_frmt)
        logger.addHandler(fh)

    return logger

get_record_batches(overture_type, bbox=None, connect_timeout=None, request_timeout=None)

Return a pyarrow RecordBatchReader for the desired bounding box and S3 path.

Parameters:

Name Type Description Default
overture_type str

Overture feature type to load.

required
bbox Optional[Tuple[float, float, float, float]]

Optional bounding box for data fetch (xmin, ymin, xmax, ymax).

None
connect_timeout Optional[float]

Optional connection timeout in seconds.

None
request_timeout Optional[float]

Optional request timeout in seconds.

None

Yields:

Type Description
RecordBatch

pa.RecordBatch: Record batches with the requested data.

Source code in src/overture_to_arcgis/utils/_core.py
def get_record_batches(
    overture_type: str,
    bbox: Optional[Tuple[float, float, float, float]] = None,
    connect_timeout: Optional[float] = None,
    request_timeout: Optional[float] = None,
) -> Generator[pa.RecordBatch, None, None]:
    """
    Return a pyarrow RecordBatchReader for the desired bounding box and S3 path.

    Args:
        overture_type: Overture feature type to load.
        bbox: Optional bounding box for data fetch (xmin, ymin, xmax, ymax).
        connect_timeout: Optional connection timeout in seconds.
        request_timeout: Optional request timeout in seconds.

    Yields:
        pa.RecordBatch: Record batches with the requested data.
    """
    # create connection to the S3 filesystem (Overture data is hosted in us-west-2)
    s3 = fs.S3FileSystem(
        anonymous=True,
        region="us-west-2",
        connect_timeout=connect_timeout,
        request_timeout=request_timeout,
    )

    # get the overture type to theme mapping
    type_theme_map = get_type_theme_map(s3=s3)

    # validate the overture type
    available_types = type_theme_map.keys()
    if overture_type not in available_types:
        raise ValueError(
            f"Invalid overture type: {overture_type}. Available types are: {list(available_types)}"
        )

    # validate the bounding box coordinates
    # NOTE(review): bbox is documented as Optional but is validated unconditionally —
    # confirm validate_bounding_box accepts None (or supplies a default extent)
    bbox = validate_bounding_box(bbox)

    # extract the coordinates from the bounding box and create the filter;
    # a feature matches when its bbox overlaps the requested extent
    xmin, ymin, xmax, ymax = bbox
    dataset_filter = (
        (pc.field("bbox", "xmin") < xmax)
        & (pc.field("bbox", "xmax") > xmin)
        & (pc.field("bbox", "ymin") < ymax)
        & (pc.field("bbox", "ymax") > ymin)
    )

    # get the most current release version
    release = get_current_release()

    # create the dataset path
    s3_pth = get_dataset_path(overture_type, release)

    # create the PyArrow dataset
    dataset = ds.dataset(s3_pth, filesystem=s3)

    # get the record batches with the extent filter applied
    batches = dataset.to_batches(filter=dataset_filter)

    # iterate through the batches and yield with geoarrow metadata
    # (idx from enumerate is currently unused)
    for idx, batch in enumerate(batches):
        # get the geometry field
        geo_fld_idx = batch.schema.get_field_index("geometry")
        geo_fld = batch.schema.field(geo_fld_idx)

        # set the geoarrow metadata on the geometry field
        geoarrow_geo_fld = geo_fld.with_metadata(
            {b"ARROW:extension:name": b"geoarrow.wkb"}
        )

        # create an updated schema with the correct metadata for the geometry field
        geoarrow_schema = batch.schema.set(geo_fld_idx, geoarrow_geo_fld)

        # replace the batch schema with the updated geoarrow schema
        # NOTE(review): replace_schema_metadata only swaps the schema-level metadata;
        # the field-level geoarrow extension metadata set above may not actually be
        # attached to the yielded batch — verify downstream consumers see it
        batch = batch.replace_schema_metadata(geoarrow_schema.metadata)

        # yield the batch to the caller
        yield batch

get_release_list(s3=None)

Returns a list of all available Overture dataset releases.

Parameters:

Name Type Description Default
s3 Optional[S3FileSystem]

Optional pre-configured S3 filesystem. If not provided, an anonymous S3 filesystem will be created.

None

Returns:

Type Description
list[str]

List of release version strings available on S3.

Source code in src/overture_to_arcgis/utils/_core.py
def get_release_list(s3: Optional[fs.S3FileSystem] = None) -> list[str]:
    """
    Returns a list of all available Overture dataset releases.

    Args:
        s3: Optional pre-configured S3 filesystem. If not provided, an anonymous
            S3 filesystem will be created.

    Returns:
        List of release version strings available on S3.
    """
    # fall back to an anonymous S3 filesystem when one was not supplied
    if s3 is None:
        s3 = fs.S3FileSystem(anonymous=True, region="us-west-2")

    # enumerate the release directory on S3 (non-recursive listing)
    selector = fs.FileSelector(
        base_dir="overturemaps-us-west-2/release/", recursive=False
    )
    file_infos = s3.get_file_info(selector)

    # keep only directories and reduce each path to its final component (the release name)
    releases = [
        info.path.rsplit("/", 1)[-1]
        for info in file_infos
        if info.type == fs.FileType.Directory
    ]

    # filter out releases without data yet (a complete release exposes at least
    # five themes; fewer can happen while a new release is still being loaded)
    releases = [rel for rel in releases if len(get_themes(rel, s3)) >= 5]

    logger.debug(f"Available releases: {releases}")

    return releases

get_temp_gdb()

Get a shared temporary file geodatabase, creating it if it does not exist.

Returns:

Type Description
Path

Path to the temporary file geodatabase.

Source code in src/overture_to_arcgis/utils/_core.py
def get_temp_gdb() -> Path:
    """
    Get a shared temporary file geodatabase, creating it if it does not exist.

    Returns:
        Path to the temporary file geodatabase.
    """
    gdb_path = get_temp_dir() / "tmp_data.gdb"

    # create the geodatabase on first use; this requires arcpy
    if not gdb_path.exists():
        if not has_arcpy:
            raise EnvironmentError("arcpy is required to create a File Geodatabase.")

        import arcpy

        arcpy.management.CreateFileGDB(str(gdb_path.parent), gdb_path.name)

    return gdb_path

split_into_level_features(features, output_features=None, remove_original_field=False)

Split features into subsegments based on the level_rules field and populate a z_index field.

The level_rules field uses the same structure as subclass_rules: a JSON array of objects, each with an integer value (the vertical level / z-index) and an optional between pair of fractions [start, end] describing which portion of the geometry the value applies to.

When output_features is provided the input data is first copied to the specified location and the split is performed on the copy. If the process fails, the newly created output dataset is deleted so the caller never sees a half-processed result.

Note

Any gaps between rules (leading, interior, or trailing) are filled with segments that retain the original feature's properties but have a None z_index value.

Note

When features share the same id value (e.g. after being split by [split_segments_at_connectors][]), the between fractions are evaluated relative to the combined original geometry rather than each individual sub-segment.

Warning

When output_features is not provided this function modifies the input features in place by adding new features and deleting the original ones.

# Example level_rules values:
# 1. [{"value": 1, "between": null}]
#    -> same geometry with 'z_index' field populated with 1
# 2. [{"value": 1, "between": [0.5, 1.0]}]
#    -> two features: 0-50% with null z_index, 50-100% with z_index=1
# 3. [{"value": -1, "between": [0.0, 0.3]}, {"value": 1, "between": [0.7, 1.0]}]
#    -> three features: 0-30% z_index=-1, 30-70% gap (null z_index),
#       70-100% z_index=1

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class.

required
output_features Optional[Union[str, Path]]

Optional path to an output feature class. When supplied, the input features are copied here before splitting and the original data is left untouched.

None
remove_original_field Optional[bool]

When True, the level_rules field is deleted from the feature class after the split is complete.

False

Returns:

Type Description
Optional[str]

The path to the output feature class when output_features is

Optional[str]

provided, otherwise None (in-place modification).

Raises:

Type Description
ValueError

If the required level_rules field is missing.

Source code in src/overture_to_arcgis/utils/_arcgis_features.py
def split_into_level_features(
    features: Union[str, Path, arcpy._mp.Layer],
    output_features: Optional[Union[str, Path]] = None,
    remove_original_field: Optional[bool] = False,
) -> Optional[str]:
    """
    Split features into subsegments based on the `level_rules` field and populate a `z_index` field.

    The `level_rules` field uses the same structure as `subclass_rules`:
    a JSON array of objects, each with an integer `value` (the vertical
    level / z-index) and an optional `between` pair of fractions
    `[start, end]` describing which portion of the geometry the value
    applies to.

    When `output_features` is provided the input data is first copied to
    the specified location and the split is performed on the copy.  If the
    process fails, the newly created output dataset is deleted so the
    caller never sees a half-processed result.

    !!! note
        Any gaps between rules (leading, interior, or trailing) are filled
        with segments that retain the original feature's properties but have
        a `None` z_index value.

    !!! note
        When features share the same `id` value (e.g. after being split
        by [split_segments_at_connectors][]), the `between`
        fractions are evaluated relative to the combined original
        geometry rather than each individual sub-segment.

    !!! warning
        When `output_features` is *not* provided this function modifies
        the input features in place by adding new features and deleting the
        original ones.

    ``` python
    # Example level_rules values:
    # 1. [{"value": 1, "between": null}]
    #    -> same geometry with 'z_index' field populated with 1
    # 2. [{"value": 1, "between": [0.5, 1.0]}]
    #    -> two features: 0-50% with null z_index, 50-100% with z_index=1
    # 3. [{"value": -1, "between": [0.0, 0.3]}, {"value": 1, "between": [0.7, 1.0]}]
    #    -> three features: 0-30% z_index=-1, 30-70% gap (null z_index),
    #       70-100% z_index=1
    ```

    Args:
        features: The input feature layer or feature class.
        output_features: Optional path to an output feature class.  When
            supplied, the input features are copied here before splitting
            and the original data is left untouched.
        remove_original_field: When ``True``, the ``level_rules``
            field is deleted from the feature class after the split is
            complete.

    Returns:
        The path to the output feature class when `output_features` is
        provided, otherwise `None` (in-place modification).

    Raises:
        ValueError: If the required `level_rules` field is missing.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # ------------------------------------------------------------------
    # If an output location was requested, copy the features there first
    # and redirect all subsequent operations to the copy.
    # ------------------------------------------------------------------
    if output_features is not None:
        if isinstance(output_features, Path):
            output_features = str(output_features)

        logger.debug(f"Copying features to output location: {output_features}")
        arcpy.management.CopyFeatures(features, output_features)

        # from here on, operate on the copy
        features = output_features

    # log the initial feature count
    initial_count = int(arcpy.management.GetCount(features)[0])
    logger.info(
        f"Starting split_into_level_features with {initial_count:,} features."
    )

    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure the necessary source field exists
    level_rules_field = "level_rules"
    if level_rules_field not in field_names:
        # roll back the copy if it was created before the validation error
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.debug(
                "Rolled back output feature class after validation failure."
            )
        raise ValueError(
            f"Source field '{level_rules_field}' does not exist in features. "
            f"This is necessary to split features by level."
        )

    try:
        # add z_index field if it does not already exist
        if "z_index" not in field_names:
            arcpy.management.AddField(
                in_table=features,
                field_name="z_index",
                field_type="LONG",
            )
            logger.debug("Added 'z_index' field to features.")

            # update field names list
            field_names = [f.name for f in arcpy.ListFields(features)]
        else:
            logger.debug("'z_index' field already exists in features.")

        # describe once for OID/shape metadata
        desc = arcpy.Describe(features)

        # ------------------------------------------------------------------
        # Pre-compute fraction ranges for features that may share an 'id'
        # (e.g. after split_segments_at_connectors).  For each feature we
        # store (group_start_frac, group_end_frac) representing where that
        # feature sits within the combined original segment.
        # ------------------------------------------------------------------
        id_field = "id"
        has_id_field = id_field in field_names
        oid_frac_map: dict[int, tuple[float, float]] = {}

        if has_id_field:
            id_groups: dict[object, list[tuple[int, object]]] = {}
            with arcpy.da.SearchCursor(
                features, [desc.OIDFieldName, id_field, "SHAPE@"]
            ) as sc:
                for oid, fid, geom in sc:
                    if fid is not None:
                        if geom is not None:
                            id_groups.setdefault(fid, []).append(
                                (oid, geom)
                            )
                        else:
                            logger.debug(
                                "Skipping OID %s with null geometry "
                                "(id=%s).",
                                oid,
                                fid,
                            )
                            oid_frac_map[oid] = (0.0, 1.0)
                    else:
                        oid_frac_map[oid] = (0.0, 1.0)

            for fid, members in id_groups.items():
                if len(members) == 1:
                    oid_frac_map[members[0][0]] = (0.0, 1.0)
                else:
                    # NOTE(review): _chain_sub_segments presumably orders the
                    # sub-segments end-to-end along the original geometry —
                    # confirm in the helper's implementation
                    chained = _chain_sub_segments(members)
                    total_length = sum(
                        g.length for _, g in chained if g is not None
                    )
                    if total_length == 0:
                        for oid, _ in chained:
                            oid_frac_map[oid] = (0.0, 1.0)
                        continue
                    cum = 0.0
                    for oid, geom in chained:
                        start_f = cum / total_length
                        cum += geom.length
                        end_f = cum / total_length
                        oid_frac_map[oid] = (start_f, end_f)

            logger.debug(
                f"Pre-computed fraction ranges for {len(oid_frac_map):,} features "
                f"across {len(id_groups):,} id groups."
            )
        else:
            logger.debug(
                "No 'id' field found — treating each feature as an independent segment."
            )

        # counters
        add_cnt = 0
        update_cnt = 0
        del_cnt = 0

        # delete oid tracker — use a set for O(1) membership checks during deletion
        del_oid_set: set[int] = set()

        # create a temporary feature class with the same schema to hold new features
        # NOTE(review): the sibling core module exposes get_temp_gdb(); confirm
        # get_tmp_gdb is the intended helper name here and not a typo
        tmp_gdb = get_tmp_gdb()
        tmp_fc = arcpy.management.CreateFeatureclass(
            out_path=str(tmp_gdb),
            out_name=f"temp_level_{uuid.uuid4().hex}",
            geometry_type=desc.shapeType,
            template=features,
            spatial_reference=desc.spatialReference,
        )[0]

        logger.debug(f"Created temporary feature class for level features: {tmp_fc}")

        # cursor field names not including the geometry column
        cursor_fields = [f for f in field_names if f != desc.shapeFieldName]

        # add geometry token to cursor field names
        cursor_fields = cursor_fields + ["SHAPE@"]

        # use an update cursor to read and update features
        with arcpy.da.UpdateCursor(features, cursor_fields) as update_cursor:
            # use an insert cursor to add new features to the temporary feature class
            with arcpy.da.InsertCursor(tmp_fc, cursor_fields) as insert_cursor:
                # iterate through the update_cursor rows
                for row in update_cursor:
                    # get the level_rules as a raw string
                    level_rules_str = row[cursor_fields.index(level_rules_field)]

                    # only process if level_rules is valid
                    if not (
                        level_rules_str is None
                        or not isinstance(level_rules_str, str)
                        or level_rules_str.strip() == "null"
                        or len(level_rules_str) == 0
                    ):
                        # parse the level_rules string into a list of dicts
                        level_rules = json.loads(level_rules_str)

                        # common indices resolved once per feature
                        geom = row[-1]
                        if geom is None:
                            # row[0] is the first cursor field (typically the
                            # OID as listed by ListFields) — used only for logging
                            logger.warning(
                                "Skipping feature with null geometry "
                                f"(OID {row[0]})."
                            )
                            continue
                        z_index_idx = cursor_fields.index("z_index")
                        oid_idx = cursor_fields.index(desc.OIDFieldName)

                        # sort rules by start fraction so gaps can be detected in order
                        between_rules = [
                            r for r in level_rules if r.get("between") is not None
                        ]
                        no_between_rules = [
                            r for r in level_rules if r.get("between") is None
                        ]

                        # handle rules without a 'between' range (whole-geometry assignment)
                        for rule in no_between_rules:
                            value = rule.get("value")
                            # ensure value is stored as int (or None)
                            row[z_index_idx] = (
                                int(value) if value is not None else None
                            )
                            update_cursor.updateRow(row)
                            logger.debug(
                                f"Updated feature with OID {row[0]} to have "
                                f"z_index={value} for entire geometry."
                            )
                            update_cnt += 1

                        # process rules that define subsegments
                        if between_rules:
                            # sort by start fraction to process in order
                            between_rules.sort(key=lambda r: r["between"][0])

                            # determine this feature's position within the
                            # original (possibly pre-split) segment
                            group_start, group_end = oid_frac_map.get(
                                row[oid_idx], (0.0, 1.0)
                            )

                            # build a full [0,1] segment map and find pieces
                            # that overlap with this feature's range
                            segment_map = _build_segment_map(between_rules)
                            pieces = _get_overlapping_pieces(
                                segment_map, group_start, group_end
                            )

                            if len(pieces) <= 1:
                                # feature falls entirely within a single
                                # segment (or gap) — update in place
                                value = pieces[0][2] if pieces else None
                                row[z_index_idx] = (
                                    int(value) if value is not None else None
                                )
                                update_cursor.updateRow(row)
                                logger.debug(
                                    f"Updated feature OID {row[oid_idx]} with "
                                    f"z_index={value} (range "
                                    f"{group_start:.4f}-{group_end:.4f})."
                                )
                                update_cnt += 1
                            else:
                                # feature spans multiple segments — split it
                                # feat_range is nonzero here: multiple
                                # overlapping pieces imply group_start < group_end
                                feat_range = group_end - group_start
                                for p_start, p_end, value in pieces:
                                    local_start = (
                                        (p_start - group_start) / feat_range
                                    )
                                    local_end = (
                                        (p_end - group_start) / feat_range
                                    )
                                    new_row = list(row)
                                    new_row[z_index_idx] = (
                                        int(value) if value is not None else None
                                    )
                                    new_row[-1] = geom.segmentAlongLine(
                                        local_start * geom.length,
                                        local_end * geom.length,
                                    )
                                    insert_cursor.insertRow(new_row)
                                    logger.debug(
                                        f"Inserted level segment z_index={value} "
                                        f"local {local_start:.4f}-"
                                        f"{local_end:.4f} (global "
                                        f"{p_start:.4f}-{p_end:.4f}) for "
                                        f"OID {row[oid_idx]}."
                                    )
                                    add_cnt += 1

                                # mark the original feature for deletion
                                del_oid_set.add(row[oid_idx])

        # append the new features from the temporary feature class to the original features
        arcpy.management.Append(
            inputs=tmp_fc,
            target=features,
            schema_type="NO_TEST",
        )

        logger.debug("Appended new level-split features to original features.")

        # delete the split features - deleting after appending new features to avoid data loss
        with arcpy.da.UpdateCursor(features, "OID@") as drop_cursor:
            for row in drop_cursor:
                if row[0] in del_oid_set:
                    drop_cursor.deleteRow()

        logger.debug("Deleted original split features.")

        # delete the temporary file geodatabase
        shutil.rmtree(tmp_gdb, ignore_errors=True)

        logger.debug("Deleted temporary file geodatabase.")

        # remove the level_rules field if requested
        if remove_original_field:
            arcpy.management.DeleteField(features, [level_rules_field])
            logger.debug(
                f"Removed '{level_rules_field}' field from features."
            )

        # log the final counts
        final_count = int(arcpy.management.GetCount(features)[0])
        logger.info(
            f"Added {add_cnt:,} new level-split features, updated {update_cnt:,} "
            f"existing features, and deleted {len(del_oid_set):,} original features. "
            f"Final feature count: {final_count:,}."
        )

    except Exception:
        # if output_features was requested, roll back by deleting the copy
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.error(
                "Level split failed — rolled back by deleting the output feature class."
            )
        raise

    return output_features

split_into_subclass_features(features, output_features=None, remove_original_field=False)

Split features into subsegments based on the definition in the subclass_rules field.

When output_features is provided the input data is first copied to the specified location and the split is performed on the copy. If the process fails, the newly created output dataset is deleted so the caller never sees a half-processed result.

Note

Any gaps between rules (leading, interior, or trailing) are filled with segments that retain the original feature's properties but have a None subsegment value.

Note

When features share the same id value (e.g. after being split by [split_segments_at_connectors][]), the between fractions are evaluated relative to the combined original geometry rather than each individual sub-segment.

Warning

When output_features is not provided this function modifies the input features in place by adding new features and deleting the original ones.

# Example subclass_rules values:
# 1. [{"value": "driveway", "between": null}]
#    -> same geometry with 'subsegment' field populated with 'driveway'
# 2. [{"value": "driveway", "between": [0.772783061, 1.0]}]
#    -> two features: 0-77.28% with null subsegment, 77.28-100% with 'driveway'
# 3. [{"value": "driveway", "between": [0.0, 0.5]}, {"value": "alley", "between": [0.5, 1.0]}]
#    -> two subsegments with 'subsegment' field populated accordingly
# 4. [{"value": "driveway", "between": [0.0, 0.3]}, {"value": "alley", "between": [0.6, 1.0]}]
#    -> three features: 0-30% 'driveway', 30-60% gap (original properties, no subsegment),
#       60-100% 'alley'
# 5. [{"value": "driveway", "between": [0.0, 0.5]}]
#    -> two features: 0-50% 'driveway', 50-100% trailing gap (original properties,
#       no subsegment)

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class.

required
output_features Optional[Union[str, Path]]

Optional path to an output feature class. When supplied, the input features are copied here before splitting and the original data is left untouched.

None
remove_original_field Optional[bool]

When True, the subclass_rules field is deleted from the feature class after the split is complete.

False

Returns:

Type Description
Optional[str]

The path to the output feature class when output_features is

Optional[str]

provided, otherwise None (in-place modification).

Raises:

Type Description
ValueError

If the required subclass_rules field is missing.

Source code in src/overture_to_arcgis/utils/_arcgis_features.py
def split_into_subclass_features(
    features: Union[str, Path, arcpy._mp.Layer],
    output_features: Optional[Union[str, Path]] = None,
    remove_original_field: Optional[bool] = False
) -> Optional[str]:
    """
    Split features into subsegments based on the definition in the `subclass_rules` field.

    When `output_features` is provided the input data is first copied to the
    specified location and the split is performed on the copy.  If the process
    fails, the newly created output dataset is deleted so the caller never sees
    a half-processed result.

    !!! note
        Any gaps between rules (leading, interior, or trailing) are filled
        with segments that retain the original feature's properties but have
        a `None` subsegment value.

    !!! note
        When features share the same `id` value (e.g. after being split
        by [split_segments_at_connectors][]), the `between`
        fractions are evaluated relative to the combined original
        geometry rather than each individual sub-segment.

    !!! warning
        When `output_features` is *not* provided this function modifies
        the input features in place by adding new features and deleting the
        original ones.

    ``` python
    # Example subclass_rules values:
    # 1. [{"value": "driveway", "between": null}]
    #    -> same geometry with 'subsegment' field populated with 'driveway'
    # 2. [{"value": "driveway", "between": [0.772783061, 1.0]}]
    #    -> two features: 0-77.28% with null subsegment, 77.28-100% with 'driveway'
    # 3. [{"value": "driveway", "between": [0.0, 0.5]}, {"value": "alley", "between": [0.5, 1.0]}]
    #    -> two subsegments with 'subsegment' field populated accordingly
    # 4. [{"value": "driveway", "between": [0.0, 0.3]}, {"value": "alley", "between": [0.6, 1.0]}]
    #    -> three features: 0-30% 'driveway', 30-60% gap (original properties, no subsegment),
    #       60-100% 'alley'
    # 5. [{"value": "driveway", "between": [0.0, 0.5]}]
    #    -> two features: 0-50% 'driveway', 50-100% trailing gap (original properties,
    #       no subsegment)
    ```

    Args:
        features: The input feature layer or feature class.
        output_features: Optional path to an output feature class.  When
            supplied, the input features are copied here before splitting
            and the original data is left untouched.
        remove_original_field: When ``True``, the ``subclass_rules``
            field is deleted from the feature class after the split is
            complete.

    Returns:
        The path to the output feature class when `output_features` is
        provided, otherwise `None` (in-place modification).

    Raises:
        ValueError: If the required `subclass_rules` field is missing.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # ------------------------------------------------------------------
    # If an output location was requested, copy the features there first
    # and redirect all subsequent operations to the copy.
    # ------------------------------------------------------------------
    if output_features is not None:
        if isinstance(output_features, Path):
            output_features = str(output_features)

        logger.debug(f"Copying features to output location: {output_features}")
        arcpy.management.CopyFeatures(features, output_features)

        # from here on, operate on the copy
        features = output_features

    # log the initial feature count
    initial_count = int(arcpy.management.GetCount(features)[0])
    logger.info(f"Starting split_into_subclass_features with {initial_count:,} features.")

    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure the necessary source field exists
    subclass_rules_field = "subclass_rules"
    if subclass_rules_field not in field_names:
        # roll back the copy if it was created before the validation error
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.debug(
                "Rolled back output feature class after validation failure."
            )
        raise ValueError(
            f"Source field '{subclass_rules_field}' does not exist in features. This is necessary to split features "
            f"into subclasses."
        )

    # everything below runs inside one try so any arcpy failure can roll
    # back the output copy (see the except clause at the bottom)
    try:
        # add subsegment field if it does not already exist
        if "subsegment" not in field_names:
            arcpy.management.AddField(
                in_table=features,
                field_name="subsegment",
                field_type="TEXT",
                field_length=50,
            )
            logger.debug("Added 'subsegment' field to features.")

            # update field names list
            field_names = [f.name for f in arcpy.ListFields(features)]
        else:
            logger.debug("'subsegment' field already exists in features.")

        # describe once for OID/shape metadata
        desc = arcpy.Describe(features)

        # ------------------------------------------------------------------
        # Pre-compute fraction ranges for features that may share an 'id'
        # (e.g. after split_segments_at_connectors).  For each feature we
        # store (group_start_frac, group_end_frac) representing where that
        # feature sits within the combined original segment.
        # ------------------------------------------------------------------
        id_field = "id"
        has_id_field = id_field in field_names
        oid_frac_map: dict[int, tuple[float, float]] = {}

        if has_id_field:
            id_groups: dict[object, list[tuple[int, object]]] = {}
            with arcpy.da.SearchCursor(
                features, [desc.OIDFieldName, id_field, "SHAPE@"]
            ) as sc:
                for oid, fid, geom in sc:
                    if fid is not None:
                        if geom is not None:
                            id_groups.setdefault(fid, []).append(
                                (oid, geom)
                            )
                        else:
                            logger.debug(
                                "Skipping OID %s with null geometry "
                                "(id=%s).",
                                oid,
                                fid,
                            )
                            # null geometry: treat as spanning the whole range
                            oid_frac_map[oid] = (0.0, 1.0)
                    else:
                        # no id: the feature stands alone, full range
                        oid_frac_map[oid] = (0.0, 1.0)

            for fid, members in id_groups.items():
                if len(members) == 1:
                    # single member: it IS the whole original segment
                    oid_frac_map[members[0][0]] = (0.0, 1.0)
                else:
                    # order the sub-segments end-to-end, then assign each
                    # one its cumulative-length fraction range
                    chained = _chain_sub_segments(members)
                    total_length = sum(
                        g.length for _, g in chained if g is not None
                    )
                    if total_length == 0:
                        # degenerate (zero-length) group: avoid divide-by-zero
                        for oid, _ in chained:
                            oid_frac_map[oid] = (0.0, 1.0)
                        continue
                    cum = 0.0
                    for oid, geom in chained:
                        start_f = cum / total_length
                        cum += geom.length
                        end_f = cum / total_length
                        oid_frac_map[oid] = (start_f, end_f)

            logger.debug(
                f"Pre-computed fraction ranges for {len(oid_frac_map):,} features "
                f"across {len(id_groups):,} id groups."
            )
        else:
            logger.debug(
                "No 'id' field found — treating each feature as an independent segment."
            )

        # counters
        add_cnt = 0
        update_cnt = 0
        del_cnt = 0

        # delete oid tracker — use a set for O(1) membership checks during deletion
        del_oid_set: set[int] = set()

        # create a temporary feature class with the same schema to hold new features
        tmp_gdb = get_tmp_gdb()
        tmp_fc = arcpy.management.CreateFeatureclass(
            out_path=str(tmp_gdb),
            out_name=f"temp_subclass_{uuid.uuid4().hex}",
            geometry_type=desc.shapeType,
            template=features,
            spatial_reference=desc.spatialReference,
        )[0]

        logger.debug(f"Created temporary feature class for subclass features: {tmp_fc}")

        # cursor field names not including the geometry column
        cursor_fields = [f for f in field_names if f != desc.shapeFieldName]

        # add geometry token to cursor field names
        cursor_fields = cursor_fields + ["SHAPE@"]

        # use an update cursor to read and update features
        with arcpy.da.UpdateCursor(features, cursor_fields) as update_cursor:
            # use an insert cursor to add new features to the temporary feature class
            with arcpy.da.InsertCursor(tmp_fc, cursor_fields) as insert_cursor:
                # iterate through the update_cursor rows
                for row in update_cursor:
                    # get the subclass_rules as a raw string
                    subclass_rules_str = row[cursor_fields.index(subclass_rules_field)]

                    # only process if subclass_rules is valid
                    if not (
                        subclass_rules_str is None
                        or not isinstance(subclass_rules_str, str)
                        or subclass_rules_str.strip() == "null"
                        or len(subclass_rules_str) == 0
                    ):
                        # parse the subclass_rules string into a list of dictionaries
                        # NOTE(review): json.loads is unguarded here, unlike
                        # split_segments_at_connectors which catches
                        # JSONDecodeError — a malformed value aborts the whole
                        # run (and rolls back the copy when output_features was
                        # given).  Confirm this all-or-nothing behavior is
                        # intended.
                        subclass_rules = json.loads(subclass_rules_str)

                        # common indices resolved once per feature
                        # NOTE(review): these lookups are loop-invariant and
                        # could be hoisted above the cursor loop.
                        geom = row[-1]
                        subclass_idx = cursor_fields.index("subsegment")
                        oid_idx = cursor_fields.index(desc.OIDFieldName)

                        # sort rules by start fraction so gaps can be detected in order
                        between_rules = [
                            r for r in subclass_rules if r.get("between") is not None
                        ]
                        no_between_rules = [
                            r for r in subclass_rules if r.get("between") is None
                        ]

                        # handle rules without a 'between' range (whole-geometry assignment)
                        # if several rules lack 'between', each overwrites
                        # 'subsegment' in turn — the last rule's value wins
                        for rule in no_between_rules:
                            value = rule.get("value")
                            row[subclass_idx] = value
                            update_cursor.updateRow(row)
                            # NOTE(review): log uses row[0] rather than
                            # row[oid_idx]; correct only when the OID field is
                            # first in cursor_fields.
                            logger.debug(
                                f"Updated feature with OID {row[0]} to have subsegment '{value}' for entire geometry."
                            )
                            update_cnt += 1

                        # process rules that define subsegments
                        if between_rules:
                            # sort by start fraction to process in order
                            between_rules.sort(key=lambda r: r["between"][0])

                            # determine this feature's position within the
                            # original (possibly pre-split) segment
                            group_start, group_end = oid_frac_map.get(
                                row[oid_idx], (0.0, 1.0)
                            )

                            # build a full [0,1] segment map and find pieces
                            # that overlap with this feature's range
                            segment_map = _build_segment_map(between_rules)
                            pieces = _get_overlapping_pieces(
                                segment_map, group_start, group_end
                            )

                            if len(pieces) <= 1:
                                # feature falls entirely within a single
                                # segment (or gap) — update in place
                                # (pieces may be empty; subsegment is then None)
                                value = pieces[0][2] if pieces else None
                                row[subclass_idx] = value
                                update_cursor.updateRow(row)
                                logger.debug(
                                    f"Updated feature OID {row[oid_idx]} with "
                                    f"subsegment '{value}' (range "
                                    f"{group_start:.4f}-{group_end:.4f})."
                                )
                                update_cnt += 1
                            else:
                                # feature spans multiple segments — split it
                                # by mapping each global piece range into this
                                # feature's local [0, 1] fraction space
                                feat_range = group_end - group_start
                                for p_start, p_end, value in pieces:
                                    local_start = (
                                        (p_start - group_start) / feat_range
                                    )
                                    local_end = (
                                        (p_end - group_start) / feat_range
                                    )
                                    new_row = list(row)
                                    new_row[subclass_idx] = value
                                    new_row[-1] = geom.segmentAlongLine(
                                        local_start * geom.length,
                                        local_end * geom.length,
                                    )
                                    insert_cursor.insertRow(new_row)
                                    logger.debug(
                                        f"Inserted subsegment '{value}' "
                                        f"local {local_start:.4f}-"
                                        f"{local_end:.4f} (global "
                                        f"{p_start:.4f}-{p_end:.4f}) for "
                                        f"OID {row[oid_idx]}."
                                    )
                                    add_cnt += 1

                                # mark the original feature for deletion
                                del_oid_set.add(row[oid_idx])

        # append the new features from the temporary feature class to the original features
        arcpy.management.Append(
            inputs=tmp_fc,
            target=features,
            schema_type="NO_TEST",
        )

        logger.debug("Appended new subsegment features to original features.")

        # delete the split features - deleting after appending new features to avoid data loss
        with arcpy.da.UpdateCursor(features, "OID@") as drop_cursor:
            for row in drop_cursor:
                if row[0] in del_oid_set:
                    drop_cursor.deleteRow()

        logger.debug("Deleted original split features.")

        # delete the temporary file geodatabase
        # (tmp_gdb is a dedicated throwaway gdb from get_tmp_gdb, so removing
        # the whole directory tree is safe)
        shutil.rmtree(tmp_gdb, ignore_errors=True)

        logger.debug("Deleted temporary file geodatabase.")

        # remove the subclass_rules field if requested
        if remove_original_field:
            arcpy.management.DeleteField(features, [subclass_rules_field])
            logger.debug(
                f"Removed '{subclass_rules_field}' field from features."
            )

        # log the final counts
        final_count = int(arcpy.management.GetCount(features)[0])
        logger.info(
            f"Added {add_cnt:,} new subsegment features, updated {update_cnt:,} existing features, and deleted "
            f"{len(del_oid_set):,} original features. Final feature count: {final_count:,}."
        )

    except Exception:
        # if output_features was requested, roll back by deleting the copy
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.error(
                "Split failed — rolled back by deleting the output feature class."
            )
        raise

    # None when operating in place (output_features was never assigned)
    return output_features

split_segments_at_connectors(features, connector_features, output_features=None, search_radius='10 Meters', delete_connectors_field=False)

Split segment polylines at connector point geometries listed in the connectors field.

For each segment, the connectors JSON field is parsed to obtain the list of connector_id values. The corresponding point geometries are looked up from connector_features and their positions along the segment are computed using queryPointAndDistance. The segment is then split into sub-segments between consecutive connector positions using segmentAlongLine.

Only connector points explicitly referenced in a segment's connectors field are used to split that segment, ensuring unrelated nearby connectors do not interfere.

When output_features is provided the input data is first copied to the specified location and all processing is performed on the copy. If the process fails, the newly created output dataset is deleted so the caller never sees a half-processed result.

Note

Features whose connectors field is null, empty, unparseable, or references fewer than three connector points (i.e. only start and end) are left untouched because no interior split is required.

Note

Connector points are snapped to the nearest position on the segment polyline. Points farther than search_radius from any listed segment are logged as warnings and skipped.

Warning

When output_features is not provided this function modifies the input features in place by inserting new sub-segment features and deleting the originals that were split.

# Example connectors values:
# [{"connector_id": "abc", "at": 0.0}, {"connector_id": "def", "at": 1.0}]
#    -> no split needed (start and end only)
# [{"connector_id": "abc", "at": 0.0}, {"connector_id": "mid", "at": 0.4},
#  {"connector_id": "def", "at": 1.0}]
#    -> two features split at the "mid" connector point location

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class containing Overture segment polylines.

required
connector_features Union[str, Path, Layer]

A point feature layer or feature class containing Overture connector geometries. Must have an id field matching the connector_id values stored in each segment's connectors JSON.

required
output_features Optional[Union[str, Path]]

Optional path to an output feature class. When supplied, the input features are copied here before splitting and the original data is left untouched.

None
search_radius str

Maximum distance a connector point may be from a segment to be considered valid. Points farther away are skipped with a warning. Accepts any linear unit string recognised by arcpy (e.g. "10 Meters").

'10 Meters'
delete_connectors_field Optional[bool]

When True, the connectors field is deleted from the feature class after the split is complete.

False

Returns:

Type Description
Optional[str]

The path to the output feature class when output_features is provided, otherwise None (in-place modification).

Raises:

Type Description
ValueError

If the required connectors field is missing from features or the required id field is missing from connector_features.

Source code in src/overture_to_arcgis/utils/_arcgis_features.py
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
def split_segments_at_connectors(
    features: Union[str, Path, arcpy._mp.Layer],
    connector_features: Union[str, Path, arcpy._mp.Layer],
    output_features: Optional[Union[str, Path]] = None,
    search_radius: str = "10 Meters",
    delete_connectors_field: Optional[bool] = False
) -> Optional[str]:
    """
    Split segment polylines at connector point geometries listed in the `connectors` field.

    For each segment, the `connectors` JSON field is parsed to obtain the
    list of `connector_id` values.  The corresponding point geometries are
    looked up from `connector_features` and their positions along the
    segment are computed using `queryPointAndDistance`.  The segment is
    then split into sub-segments between consecutive connector positions
    using `segmentAlongLine`.

    Only connector points explicitly referenced in a segment's
    `connectors` field are used to split that segment, ensuring unrelated
    nearby connectors do not interfere.

    When `output_features` is provided the input data is first copied to
    the specified location and all processing is performed on the copy.
    If the process fails, the newly created output dataset is deleted so
    the caller never sees a half-processed result.

    !!! note
        Features whose `connectors` field is *null*, empty, unparseable,
        or references fewer than three connector points (i.e. only start
        and end) are left untouched because no interior split is required.

    !!! note
        Connector points are snapped to the nearest position on the
        segment polyline.  Points farther than `search_radius` from
        any listed segment are logged as warnings and skipped.

    !!! warning
        When `output_features` is *not* provided this function modifies
        the input features in place by inserting new sub-segment features
        and deleting the originals that were split.

    ``` python
    # Example connectors values:
    # [{"connector_id": "abc", "at": 0.0}, {"connector_id": "def", "at": 1.0}]
    #    -> no split needed (start and end only)
    # [{"connector_id": "abc", "at": 0.0}, {"connector_id": "mid", "at": 0.4},
    #  {"connector_id": "def", "at": 1.0}]
    #    -> two features split at the "mid" connector point location
    ```

    Args:
        features: The input feature layer or feature class containing
            Overture segment polylines.
        connector_features: A point feature layer or feature class
            containing Overture connector geometries.  Must have an
            `id` field matching the `connector_id` values stored in
            each segment's `connectors` JSON.
        output_features: Optional path to an output feature class.  When
            supplied, the input features are copied here before splitting
            and the original data is left untouched.
        search_radius: Maximum distance a connector point may be from a
            segment to be considered valid.  Points farther away are
            skipped with a warning.  Accepts any linear unit string
            recognised by arcpy (e.g. ``"10 Meters"``).
        delete_connectors_field: When ``True``, the ``connectors``
            field is deleted from the feature class after the split is
            complete.

    Returns:
        The path to the output feature class when `output_features` is
        provided, otherwise `None` (in-place modification).

    Raises:
        ValueError: If the required `connectors` field is missing from
            `features` or the required `id` field is missing from
            `connector_features`.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # resolve to catalog path when a layer is provided to avoid schema locks
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # normalise connector_features to a string path
    if isinstance(connector_features, Path):
        connector_features = str(connector_features)
    if isinstance(connector_features, arcpy._mp.Layer):
        connector_features = arcpy.Describe(connector_features).catalogPath

    # parse the search_radius into a linear unit value for distance comparison
    radius_parts = search_radius.strip().split()
    radius_value = float(radius_parts[0])

    # ------------------------------------------------------------------
    # If an output location was requested, copy the features there first
    # and redirect all subsequent operations to the copy.
    # ------------------------------------------------------------------
    if output_features is not None:
        if isinstance(output_features, Path):
            output_features = str(output_features)

        logger.debug(f"Copying features to output location: {output_features}")
        arcpy.management.CopyFeatures(features, output_features)

        # from here on, operate on the copy
        features = output_features

    # log the initial feature count
    initial_count = int(arcpy.management.GetCount(features)[0])
    logger.info(
        f"Starting split_segments_at_connectors with {initial_count:,} features."
    )

    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure the connectors field exists
    connectors_field = "connectors"
    if connectors_field not in field_names:
        # roll back the copy if it was created before the validation error
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.debug(
                "Rolled back output feature class after validation failure."
            )
        raise ValueError(
            f"Source field '{connectors_field}' does not exist in features. "
            f"This is necessary to split segments at connector points."
        )

    # validate connector_features has an 'id' field
    conn_field_names = [f.name for f in arcpy.ListFields(connector_features)]
    if "id" not in conn_field_names:
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.debug(
                "Rolled back output feature class after connector validation failure."
            )
        raise ValueError(
            "Connector features must contain an 'id' field matching the "
            "'connector_id' values in the segment connectors JSON."
        )

    try:
        # ------------------------------------------------------------------
        # Build connector_id -> point geometry lookup dict (single pass).
        # ------------------------------------------------------------------
        connector_geom_map: dict[str, arcpy.Geometry] = {}
        with arcpy.da.SearchCursor(
            connector_features, ["id", "SHAPE@"]
        ) as conn_cursor:
            for conn_id, geom in conn_cursor:
                if conn_id is not None and geom is not None:
                    connector_geom_map[str(conn_id)] = geom

        logger.debug(
            f"Built connector geometry lookup with "
            f"{len(connector_geom_map):,} entries."
        )

        # counters
        add_cnt = 0
        del_oid_lst: list[int] = []

        # create a temporary feature class with the same schema to hold new features
        tmp_gdb = get_tmp_gdb()
        desc = arcpy.Describe(features)
        tmp_fc = arcpy.management.CreateFeatureclass(
            out_path=str(tmp_gdb),
            out_name=f"temp_connectors_{uuid.uuid4().hex}",
            geometry_type=desc.shapeType,
            template=features,
            spatial_reference=desc.spatialReference,
        )[0]

        logger.debug(
            f"Created temporary feature class for connector-split features: {tmp_fc}"
        )

        # build cursor field list (all fields except shape, plus SHAPE@ token)
        cursor_fields = [f for f in field_names if f != desc.shapeFieldName]
        cursor_fields = cursor_fields + ["SHAPE@"]

        # resolve field indices once
        connectors_idx = cursor_fields.index(connectors_field)
        oid_idx = cursor_fields.index(desc.OIDFieldName)

        # read + split
        with arcpy.da.UpdateCursor(features, cursor_fields) as update_cursor:
            with arcpy.da.InsertCursor(tmp_fc, cursor_fields) as insert_cursor:
                for row in update_cursor:
                    connectors_str = row[connectors_idx]

                    # skip features with no valid connectors value
                    if (
                        connectors_str is None
                        or not isinstance(connectors_str, str)
                        or connectors_str.strip() in ("", "null")
                    ):
                        continue

                    # attempt to parse the JSON
                    try:
                        connectors_list = json.loads(connectors_str)
                    except (json.JSONDecodeError, TypeError):
                        logger.debug(
                            f"Skipping OID {row[oid_idx]}: unable to parse connectors JSON."
                        )
                        continue

                    # must be a non-empty list
                    if not isinstance(connectors_list, list) or len(connectors_list) == 0:
                        continue

                    # need at least 3 connector entries to have an interior split
                    if len(connectors_list) < 3:
                        continue

                    geom = row[-1]
                    if geom is None:
                        continue

                    line_length = geom.length
                    if line_length == 0:
                        continue

                    # ----------------------------------------------------------
                    # Resolve connector_ids to point geometries and compute
                    # their distance along the segment polyline.
                    # ----------------------------------------------------------
                    distances: list[float] = []
                    for entry in connectors_list:
                        cid = entry.get("connector_id")
                        if cid is None:
                            continue

                        pt_geom = connector_geom_map.get(str(cid))
                        if pt_geom is None:
                            logger.debug(
                                f"Connector '{cid}' referenced by OID "
                                f"{row[oid_idx]} not found in connector "
                                f"features — skipping."
                            )
                            continue

                        # queryPointAndDistance returns:
                        # (point_on_line, distance_along, distance_from, right_side)
                        result = geom.queryPointAndDistance(pt_geom)
                        distance_along = result[1]
                        distance_from_line = result[2]

                        # skip if the connector is too far from the segment
                        if distance_from_line > radius_value:
                            logger.warning(
                                f"Connector '{cid}' is {distance_from_line:.2f} "
                                f"units from OID {row[oid_idx]} (exceeds "
                                f"search_radius={search_radius}) — skipping."
                            )
                            continue

                        distances.append(distance_along)

                    # deduplicate and sort distances
                    unique_distances = sorted(set(
                        round(d, 8) for d in distances
                    ))

                    # need at least 3 unique positions to produce interior splits
                    if len(unique_distances) < 3:
                        continue

                    # create one sub-segment for each consecutive pair of distances
                    for i in range(len(unique_distances) - 1):
                        start_dist = unique_distances[i]
                        end_dist = unique_distances[i + 1]

                        new_row = list(row)
                        new_row[-1] = geom.segmentAlongLine(
                            start_dist,
                            end_dist,
                        )
                        insert_cursor.insertRow(new_row)
                        logger.debug(
                            f"Inserted connector sub-segment from "
                            f"{start_dist:.2f} to {end_dist:.2f} for "
                            f"OID {row[oid_idx]}."
                        )
                        add_cnt += 1

                    # mark the original feature for deletion
                    del_oid_lst.append(row[oid_idx])

        # append the new features from the temporary feature class into the target
        arcpy.management.Append(
            inputs=tmp_fc,
            target=features,
            schema_type="NO_TEST",
        )
        logger.debug("Appended connector-split features to target features.")

        # delete the original features that were split
        del_oid_set = set(del_oid_lst)
        with arcpy.da.UpdateCursor(features, "OID@") as drop_cursor:
            for row in drop_cursor:
                if row[0] in del_oid_set:
                    drop_cursor.deleteRow()

        logger.debug("Deleted original features that were split at connectors.")

        # clean up temporary geodatabase
        shutil.rmtree(tmp_gdb, ignore_errors=True)
        logger.debug("Deleted temporary file geodatabase.")

        # remove the connectors field if requested
        if delete_connectors_field:
            arcpy.management.DeleteField(features, [connectors_field])
            logger.debug(
                f"Removed '{connectors_field}' field from features."
            )

        # log final counts
        final_count = int(arcpy.management.GetCount(features)[0])
        logger.info(
            f"Added {add_cnt:,} connector-split sub-segments and deleted "
            f"{len(del_oid_lst):,} original features. "
            f"Final feature count: {final_count:,}."
        )

    except Exception:
        # roll back output copy on failure
        if output_features is not None and arcpy.Exists(output_features):
            arcpy.management.Delete(output_features)
            logger.error(
                "Split at connectors failed — rolled back by deleting the output feature class."
            )
        raise

    return output_features

table_to_features(table, output_features)

Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Feature Class.

Parameters:

Name Type Description Default
table Union[Table, RecordBatch]

PyArrow Table or RecordBatch with GeoArrow metadata.

required
output_features Union[str, Path]

Path to the output feature class.

required

Returns:

Type Description
Path

Path to the created feature class.

Source code in src/overture_to_arcgis/utils/_core.py
def table_to_features(
    table: Union[pa.Table, pa.RecordBatch], output_features: Union[str, Path]
) -> Path:
    """
    Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Feature Class.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.
        output_features: Path to the output feature class.

    Returns:
        Path to the created feature class.
    """
    # convert the table to a spatially enabled dataframe
    df = table_to_spatially_enabled_dataframe(table)

    # save the dataframe to a feature class using the ArcGIS GeoAccessor
    df.spatial.to_featureclass(output_features)

    return output_features

table_to_spatially_enabled_dataframe(table)

Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Spatially Enabled DataFrame.

Parameters:

Name Type Description Default
table Union[Table, RecordBatch]

PyArrow Table or RecordBatch with GeoArrow metadata.

required

Returns:

Type Description
DataFrame

ArcGIS Spatially Enabled DataFrame.

Source code in src/overture_to_arcgis/utils/_core.py
def table_to_spatially_enabled_dataframe(
    table: Union[pa.Table, pa.RecordBatch]
) -> pd.DataFrame:
    """
    Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Spatially Enabled DataFrame.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.

    Returns:
        ArcGIS Spatially Enabled DataFrame.
    """
    # identify the geometry column from the GeoArrow metadata on the original table
    geometry_column = get_geometry_column(table)

    # flatten any complex (nested) columns to strings, then move to pandas
    data_frame = convert_complex_columns_to_strings(table).to_pandas()

    # replace the WKB geometry values with arcgis Geometry objects
    data_frame[geometry_column] = convert_wkb_column_to_arcgis_geometry(
        data_frame[geometry_column]
    )

    # register the geometry column (WGS84) so the GeoAccessor treats the
    # dataframe as spatially enabled
    data_frame.spatial.set_geometry(geometry_column, sr=4326, inplace=True)

    return data_frame

validate_bounding_box(bbox)

Validate and normalize bounding box coordinates.

Ensures the bounding box has four numeric values within valid geographic ranges and that min values are less than max values.

Parameters:

Name Type Description Default
bbox tuple[float, float, float, float]

Tuple of (minx, miny, maxx, maxy) coordinates.

required

Returns:

Type Description
tuple[float, float, float, float]

Validated bounding box tuple with float values.

Raises:

Type Description
ValueError

If the bounding box is malformed or out of range.

Source code in src/overture_to_arcgis/utils/_core.py
def validate_bounding_box(
    bbox: tuple[float, float, float, float]
) -> tuple[float, float, float, float]:
    """
    Validate and normalize bounding box coordinates.

    Ensures the bounding box has four numeric values within valid geographic
    ranges and that min values are less than max values.

    Args:
        bbox: Tuple of (minx, miny, maxx, maxy) coordinates.

    Returns:
        Validated bounding box tuple with float values.

    Raises:
        ValueError: If the bounding box is malformed or out of range.
    """
    # ensure four values are provided; a non-sized input (e.g. a generator or
    # a bare number) previously leaked a TypeError from len() instead of the
    # documented ValueError
    try:
        coord_count = len(bbox)
    except TypeError:
        raise ValueError(
            "Bounding box must be a tuple of four values: (minx, miny, maxx, maxy)."
        ) from None
    if coord_count != 4:
        raise ValueError(
            "Bounding box must be a tuple of four values: (minx, miny, maxx, maxy)."
        )

    # ensure all coordinates are numeric; bool is a subclass of int, so it
    # must be excluded explicitly or True/False would pass as coordinates
    if not all(
        isinstance(coord, (int, float)) and not isinstance(coord, bool)
        for coord in bbox
    ):
        raise ValueError(
            "All coordinates in the bounding box must be numeric (int or float)."
        )

    # normalize all coordinates to float
    bbox = tuple(float(coord) for coord in bbox)

    # ensure minx < maxx and miny < maxy
    if bbox[0] >= bbox[2] or bbox[1] >= bbox[3]:
        raise ValueError(
            "Invalid bounding box coordinates: ensure that minx < maxx and miny < maxy."
        )

    # ensure coordinates are within valid geographic (WGS84) ranges
    if not (
        -180.0 <= bbox[0] <= 180.0
        and -90.0 <= bbox[1] <= 90.0
        and -180.0 <= bbox[2] <= 180.0
        and -90.0 <= bbox[3] <= 90.0
    ):
        raise ValueError(
            "Bounding box coordinates must be within valid ranges: minx/maxx [-180, 180], miny/maxy [-90, 90]."
        )

    # If all checks pass, the bounding box is valid
    return bbox