Python API

get_features(output_feature_class, overture_type, bbox, connect_timeout=None, request_timeout=None)

Retrieve data from Overture Maps and save it as an ArcGIS Feature Class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_feature_class` | `Union[str, Path]` | Path to the output feature class. | *required* |
| `overture_type` | `str` | Overture feature type to retrieve. | *required* |
| `bbox` | `tuple[float, float, float, float]` | Bounding box to filter the data. Format: (minx, miny, maxx, maxy). | *required* |
| `connect_timeout` | `int` | Optional timeout in seconds for establishing a connection to AWS S3. | `None` |
| `request_timeout` | `int` | Optional timeout in seconds for waiting for a response from AWS S3. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Path` | Path to the created feature class. |

Source code in src/overture_to_arcgis/__main__.py
def get_features(
    output_feature_class: Union[str, Path],
    overture_type: str,
    bbox: tuple[float, float, float, float],
    connect_timeout: Optional[int] = None,
    request_timeout: Optional[int] = None,
) -> Path:
    """
    Retrieve data from Overture Maps and save it as an ArcGIS Feature Class.

    Args:
        output_feature_class: Path to the output feature class.
        overture_type: Overture feature type to retrieve.
        bbox: Bounding box to filter the data. Format: (minx, miny, maxx, maxy).
        connect_timeout: Optional timeout in seconds for establishing a connection to AWS S3.
        request_timeout: Optional timeout in seconds for waiting for a response from AWS S3.

    Returns:
        Path to the created feature class.
    """
    # ensure arcpy is available
    if find_spec("arcpy") is None:
        raise EnvironmentError("ArcPy is required for get_features.")

    # validate the bounding box
    bbox = validate_bounding_box(bbox)

    # get a temporary geodatabase to hold the batch feature classes
    tmp_gdb = get_temp_gdb()

    # list to hold the feature classes
    fc_list = []

    # get the record batch generator
    batches = get_record_batches(overture_type, bbox, connect_timeout, request_timeout)

    # iterate through the record batches to see if we have any data
    for btch_idx, batch in enumerate(batches):
        # warn of no data found for the batch
        if batch.num_rows == 0:
            logger.warning(
                f"No '{overture_type}' data found for the specified bounding box: {bbox}. No temporary feature "
                f"class will be created for this batch."
            )

        # if there is data to work with, process it
        else:
            # report progress
            if logger.level <= logging.DEBUG:
                tbl_cnt = batch.num_rows
                logger.debug(
                    f"In batch {btch_idx:,} fetched {tbl_cnt:,} rows of '{overture_type}' data from Overture Maps."
                )

            # create the temporary feature class path
            tmp_fc = tmp_gdb / f"overture_{overture_type}_{btch_idx:04d}"

            # convert the batch to a feature class
            table_to_features(batch, output_features=tmp_fc)

            # add the feature class to the list if there is data to work with
            fc_list.append(str(tmp_fc))

    # merge the feature classes into a single feature class if any data was found
    if len(fc_list) > 0:
        arcpy.management.Merge(fc_list, str(output_feature_class))
    else:
        logger.warning(
            "No data found for the specified bounding box. No output feature class created."
        )

    # cleanup temporary data - remove temporary geodatabase using arcpy to avoid any locks
    arcpy.management.Delete(str(tmp_gdb))

    return output_feature_class
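
A minimal usage sketch (assuming the function is importable from the package root; the output path and bounding box are illustrative values, not from the source):

``` python
from pathlib import Path

from overture_to_arcgis import get_features

# fetch Overture 'place' features for a small area of downtown Seattle
fc = get_features(
    output_feature_class=Path(r"C:\data\overture.gdb\seattle_places"),
    overture_type="place",
    bbox=(-122.36, 47.60, -122.32, 47.62),
    connect_timeout=30,
    request_timeout=120,
)
```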

get_spatially_enabled_dataframe(overture_type, bbox, connect_timeout=None, request_timeout=None)

Retrieve data from Overture Maps as an ArcGIS spatially enabled Pandas DataFrame.

Note

To see available overture types, use arcgis_overture.utils.get_all_overture_types().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overture_type` | `str` | Overture feature type to retrieve. | *required* |
| `bbox` | `tuple[float, float, float, float]` | Bounding box to filter the data. Format: (minx, miny, maxx, maxy). | *required* |
| `connect_timeout` | `int` | Optional timeout in seconds for establishing a connection to the Overture Maps service. | `None` |
| `request_timeout` | `int` | Optional timeout in seconds for waiting for a response from the Overture Maps service. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | A spatially enabled pandas DataFrame containing the requested Overture Maps data. |

Source code in src/overture_to_arcgis/__main__.py
def get_spatially_enabled_dataframe(
    overture_type: str,
    bbox: tuple[float, float, float, float],
    connect_timeout: Optional[int] = None,
    request_timeout: Optional[int] = None,
) -> pd.DataFrame:
    """
    Retrieve data from Overture Maps as an
    [ArcGIS spatially enabled Pandas DataFrame](https://developers.arcgis.com/python/latest/guide/introduction-to-the-spatially-enabled-dataframe/).

    !!! note

        To see available overture types, use `arcgis_overture.utils.get_all_overture_types()`.

    Args:
        overture_type: Overture feature type to retrieve.
        bbox: Bounding box to filter the data. Format: (minx, miny, maxx, maxy).
        connect_timeout: Optional timeout in seconds for establishing a connection to the Overture Maps service.
        request_timeout: Optional timeout in seconds for waiting for a response from the Overture Maps service.

    Returns:
        A spatially enabled pandas DataFrame containing the requested Overture Maps data.
    """
    # validate the overture type
    available_types = get_all_overture_types()
    if overture_type not in available_types:
        raise ValueError(
            f"Invalid overture type: {overture_type}. Valid types are: {available_types}"
        )

    # validate the bounding box
    bbox = validate_bounding_box(bbox)

    # get the record batch generator
    batches = get_record_batches(overture_type, bbox, connect_timeout, request_timeout)

    # initialize the dataframe and geometry column name
    df = None
    geom_col = None

    # iterate the batches
    for idx, batch in enumerate(batches):
        # if the batch has any rows and the dataframe is not yet initialized
        if batch.num_rows > 0 and df is None:
            # create the initial dataframe
            df = table_to_spatially_enabled_dataframe(batch)

            # save the geometry column name
            geom_col = df.spatial.name

        elif batch.num_rows > 0:
            # get the batch as a spatially enabled dataframe
            tmp_df = table_to_spatially_enabled_dataframe(batch)

            # append the batch dataframe to the main dataframe
            df = pd.concat([df, tmp_df], ignore_index=True)

    # if data found, perform post processing
    if isinstance(df, pd.DataFrame):
        # set the geometry column using the ArcGIS GeoAccessor to get a Spatially Enabled DataFrame
        df.spatial.set_geometry(geom_col, sr=4326, inplace=True)

        # reset the index so it makes sense after concatenation
        df.reset_index(drop=True, inplace=True)

        # log the number of rows fetched
        df_cnt = df.shape[0]
        logger.debug(
            f"Fetched {df_cnt} rows of '{overture_type}' data from Overture Maps."
        )

    # if no data found, log a warning and create an empty dataframe to return
    else:
        df = pd.DataFrame()
        logger.warning(
            f"No '{overture_type}' data found for the specified bounding box: {bbox}"
        )

    return df
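
A comparable sketch for the DataFrame variant (same illustrative bounding box; assumes a package-root import):

``` python
from overture_to_arcgis import get_spatially_enabled_dataframe

# load Overture 'building' features into a spatially enabled DataFrame
sdf = get_spatially_enabled_dataframe(
    overture_type="building",
    bbox=(-122.36, 47.60, -122.32, 47.62),
)
print(f"{sdf.shape[0]:,} rows; geometry column: '{sdf.spatial.name}'")
```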

Module overture_to_arcgis.utils

add_alternate_category_field(features)

Add an 'alternate_category' field to the input features if it does not already exist, and calculate from the 'categories' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_alternate_category_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add an 'alternate_category' field to the input features if it does not already exist, and calculate from
    the 'categories' field.

    Args:
        features: The input feature layer or feature class.
    """
    # Ensure features is a string path if it's a Path or Layer
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # check if 'alternate_category' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]

    # add 'alternate_category' field
    if "alternate_category" not in field_names:
        arcpy.management.AddField(
            in_table=features,
            field_name="alternate_category",
            field_type="TEXT",
            field_length=255,
        )
        logger.debug("Added 'alternate_category' field to features.")

    # calculate 'alternate_category' from 'categories' field
    with arcpy.da.UpdateCursor(
        features, ["categories", "alternate_category"]
    ) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the categories value and extract alternate category
            categories_value = row[0]

            # set the alternate category if categories_value is valid
            if (
                categories_value is not None
                and isinstance(categories_value, str)
                and len(categories_value) > 0
                and not categories_value.strip() == "None"
                and not categories_value.strip().lower() == "null"
            ):
                # parse the categories value into a dictionary
                categories_dict = json.loads(categories_value)

                # extract the alternate category
                alternate_category = categories_dict.get("alternate")

                # convert to string if it is a list
                if isinstance(alternate_category, list):
                    alternate_category = ", ".join(alternate_category)

                # ensure the alternate category is not some variation of None
                if alternate_category in [None, "None", "none", ""]:
                    alternate_category = None

                # set the alternate category in the row
                row[1] = alternate_category

                # update the row
                update_cursor.updateRow(row)

    return
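
A usage sketch (illustrative feature class path). The 'categories' column is expected to hold JSON such as `{"primary": "restaurant", "alternate": ["italian_restaurant", "bar"]}`; an 'alternate' list is joined into a comma-separated string:

``` python
from overture_to_arcgis.utils import add_alternate_category_field

add_alternate_category_field(r"C:\data\overture.gdb\seattle_places")
```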

add_boolean_access_restrictions_fields(features, access_field='access_restrictions')

Add boolean access restriction fields to the input features based on the access_restrictions field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
| `access_field` | `str` | The name of the access restrictions field. | `'access_restrictions'` |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_boolean_access_restrictions_fields(
    features: Union[str, Path, arcpy._mp.Layer],
    access_field: str = "access_restrictions",
) -> None:
    """
    Add boolean access restriction fields to the input features based on the access_restrictions field.

    Args:
        features: The input feature layer or feature class.
        access_field: The name of the access restrictions field.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # ensure the features exist
    if not arcpy.Exists(features):
        raise ValueError("Input features do not exist.")

    # first pass to collect all unique keys
    unique_keys = set()
    with arcpy.da.SearchCursor(features, [access_field]) as cursor:
        for row in cursor:
            if row[0] is not None:
                bool_dict = flatten_dict_to_bool_keys(row[0])
                unique_keys.update(bool_dict.keys())

    # create a list of fields to add
    add_fields = sorted([[slugify(key), "SHORT"] for key in unique_keys])

    # add fields to feature class
    arcpy.management.AddFields(features, add_fields)

    logger.info(
        "Added boolean access restriction fields to features: "
        + ", ".join([f[0] for f in add_fields])
    )

    # second pass to populate the fields
    field_names = [slugify(key) for key in unique_keys]

    with arcpy.da.UpdateCursor(features, [access_field] + field_names) as cursor:
        for row in cursor:
            bool_dict = {}
            if row[0] is not None:
                bool_dict = flatten_dict_to_bool_keys(row[0])
            for idx, key in enumerate(unique_keys):
                row[idx + 1] = bool_dict.get(key, 0)
            cursor.updateRow(row)

    return
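
A usage sketch (illustrative path). One SHORT (0/1) field is added per unique key discovered in the access restrictions values:

``` python
from overture_to_arcgis.utils import add_boolean_access_restrictions_fields

add_boolean_access_restrictions_fields(
    r"C:\data\overture.gdb\seattle_segments",
    access_field="access_restrictions",
)
```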

add_h3_indices(features, resolution=9, h3_field=None)

Add an H3 index field to the input features based on their geometry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
| `resolution` | `int` | The H3 resolution to use for indexing. | `9` |
| `h3_field` | `Optional[str]` | The name of the H3 index field to add. | `None` |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_h3_indices(
    features: Union[str, Path, arcpy._mp.Layer],
    resolution: int = 9,
    h3_field: Optional[str] = None,
) -> None:
    """
    Add an H3 index field to the input features based on their geometry.

    Args:
        features: The input feature layer or feature class.
        resolution: The H3 resolution to use for indexing.
        h3_field: The name of the H3 index field to add.
    """
    if find_spec("h3") is None:
        raise ImportError(
            "The 'h3' library is not installed. Please install it to use this function."
        )

    import h3

    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # validate resolution
    if not isinstance(resolution, int) or not (0 <= resolution <= 15):
        raise ValueError(
            "Invalid H3 resolution. Please choose a resolution between 0 and 15."
        )

    # if h3_field is None, set to default
    if h3_field is None:
        h3_field = f"h3_{resolution:02d}"

    # check if h3_field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if h3_field not in field_names:
        # add h3_field
        arcpy.management.AddField(
            in_table=features,
            field_name=h3_field,
            field_type="TEXT",
            field_length=20,
        )

        logger.debug(f"Added '{h3_field}' field to features.")

    # calculate H3 indices from geometry
    with arcpy.da.UpdateCursor(features, ["SHAPE@XY", h3_field]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the geometry coordinates
            x, y = row[0]

            # get the H3 index for the centroid
            h3_index = h3.latlng_to_cell(y, x, resolution)

            # set the H3 index in the row
            row[1] = h3_index

            # update the row
            update_cursor.updateRow(row)

    return
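
A usage sketch (illustrative path). With the default field name, resolution 9 produces an 'h3_09' text field of cell identifiers:

``` python
from overture_to_arcgis.utils import add_h3_indices

# requires the optional 'h3' package (v4 API: h3.latlng_to_cell)
add_h3_indices(r"C:\data\overture.gdb\seattle_places", resolution=9)
```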

add_overture_taxonomy_fields(features, single_category_field=None)

Add 'category_<n>' fields to the input features, derived from the Overture taxonomy using the category provided for each row. The category for each row can be specified using the single_category_field parameter.

Note

If a single category field is not provided, the function will attempt to read the value for the primary key from string JSON in the categories field, if this field exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
| `single_category_field` | `Optional[str]` | The field name containing a single category. | `None` |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_overture_taxonomy_fields(
    features: Union[str, Path, arcpy._mp.Layer],
    single_category_field: Optional[str] = None,
) -> None:
    """
    Add 'category_<n>' fields to the input features, derived from the Overture taxonomy using the category provided
    for each row. The category for each row can be specified using the `single_category_field` parameter.

    !!! note
        If a single category field is not provided, the function will attempt to read the value for the `primary` key from
        string JSON in the `categories` field, if this field exists.

    Args:
        features: The input feature layer or feature class.
        single_category_field: The field name containing a single category.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # if single category not provided, attempt to use the 'categories' field to extract the primary category
    if single_category_field is None:
        # ensure the 'categories' field exists
        if "categories" not in field_names:
            raise ValueError(
                "Field for category extraction, 'categories', does not exist in features."
            )

        # create a generator to extract categories from the 'categories' field
        categories_gen = (
            json.loads(row[0]).get("primary")
            for row in arcpy.da.SearchCursor(features, ["categories"])
        )

        # root name for the taxonomy fields
        root_name = "primary_category"

    # if single category field is provided
    else:
        # ensure the single category field exists
        if single_category_field not in field_names:
            raise ValueError(
                f"Provided single category field '{single_category_field}' does not exist in features."
            )

        # create a generator to extract categories from the single category field
        categories_gen = (
            row[0] for row in arcpy.da.SearchCursor(features, [single_category_field])
        )

        # root name for the taxonomy fields
        root_name = slugify(single_category_field)

    # get taxonomy dataframe
    taxonomy_df = get_overture_taxonomy_dataframe()

    # get the max lengths for each category field
    max_lengths = get_overture_taxonomy_category_field_max_lengths(taxonomy_df)

    # set the index to category_code for easier lookup
    taxonomy_df.set_index("category_code", inplace=True)

    # only keep the category columns in the taxonomy dataframe
    taxonomy_df = taxonomy_df.loc[
        :, [col for col in taxonomy_df.columns if col.startswith("category_")]
    ]

    # replace category in the field names with the root name
    taxonomy_df.columns = [
        col.replace("category_", f"{root_name}_") for col in taxonomy_df.columns
    ]
    max_lengths = {
        col.replace("category_", f"{root_name}_"): max_len
        for col, max_len in max_lengths.items()
    }

    # iterate through the maximum lengths and add fields to the features
    for col, max_len in max_lengths.items():
        # add the field to the features
        arcpy.management.AddField(
            in_table=features,
            field_name=col,
            field_type="TEXT",
            field_length=max_len,
        )

        logger.info(f"Added field '{col}' with length {max_len} to features.")

    # calculate the category code fields from the categories generator
    with arcpy.da.UpdateCursor(features, list(max_lengths.keys())) as update_cursor:
        # iterate through the rows and categories
        for row, category in zip(update_cursor, categories_gen):
            # set the category fields if category is valid
            if (
                category is not None
                and isinstance(category, str)
                and len(category) > 0
                and not category.strip() == "None"
                and not category.strip().lower() == "null"
            ):
                # get the taxonomy row for the category
                taxonomy_row = taxonomy_df.loc[category]

                # if a taxonomy row is found, set the category fields
                if not taxonomy_row.empty:
                    # iterate through the category fields and set their values
                    for idx, col in enumerate(max_lengths.keys()):
                        row[idx] = taxonomy_row.loc[col]

                    # update the row
                    update_cursor.updateRow(row)

    return
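
A usage sketch (illustrative path and field name):

``` python
from overture_to_arcgis.utils import add_overture_taxonomy_fields

# derive taxonomy fields from the 'primary' key of the JSON 'categories' column
add_overture_taxonomy_fields(r"C:\data\overture.gdb\seattle_places")

# alternatively, expand an existing single-category field instead:
# add_overture_taxonomy_fields(
#     r"C:\data\overture.gdb\seattle_places",
#     single_category_field="primary_category",
# )
```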

add_primary_category_field(features)

Add a 'primary_category' field to the input features if it does not already exist, and calculate from the 'categories' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_primary_category_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'primary_category' field to the input features if it does not already exist, and calculate from
    the 'categories' field.

    Args:
        features: The input feature layer or feature class.
    """
    # Ensure features is a string path if it's a Path or Layer
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure source field 'categories' exists
    if "categories" not in field_names:
        raise ValueError("Source field 'categories' does not exist in features.")

    # check if 'primary_category' field exists
    if "primary_category" not in field_names:
        # add 'primary_category' field
        arcpy.management.AddField(
            in_table=features,
            field_name="primary_category",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'primary_category' field to features.")

    # calculate 'primary_category' from 'categories' field
    with arcpy.da.UpdateCursor(
        features, ["categories", "primary_category"]
    ) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the categories value and extract primary category
            categories_value = row[0]

            # set the primary category if categories_value is valid
            if (
                categories_value is not None
                and isinstance(categories_value, str)
                and len(categories_value) > 0
                and not categories_value.strip() == "None"
                and not categories_value.strip().lower() == "null"
            ):
                # parse the categories value into a dictionary
                categories_dict = json.loads(categories_value)

                # extract the primary category
                primary_category = categories_dict.get("primary")

                # ensure the primary category is not some variation of None
                if primary_category in [None, "None", "none", ""]:
                    primary_category = None

                # set the primary category in the row
                row[1] = primary_category

                # update the row
                update_cursor.updateRow(row)

    return

add_primary_name(features)

Add a 'primary_name' field to the input features if it does not already exist, and calculate it from the 'names' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_primary_name(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'primary_name' field to the input features if it does not already exist, and calculate it from
    the 'names' field.
    Args:
        features: The input feature layer or feature class.
    """
    # Ensure features is a string path if it's a Path or Layer
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure source field 'names' exists
    if "names" not in field_names:
        raise ValueError("Source field 'names' does not exist in features.")

    # check if 'primary_name' field exists
    if "primary_name" not in field_names:
        arcpy.management.AddField(
            in_table=features,
            field_name="primary_name",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'primary_name' field to features.")

    # calculate 'primary_name' from 'names' field
    with arcpy.da.UpdateCursor(features, ["names", "primary_name"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the name value and extract primary name
            name_str = row[0]

            # set the primary name if name_value is populated
            if (
                name_str is not None
                and isinstance(name_str, str)
                and len(name_str) > 0
                and not name_str.strip() == "None"
                and not name_str.strip().lower() == "null"
            ):
                # parse the name value into a dictionary
                name_dict = json.loads(name_str)

                # extract the primary name
                primary_name = name_dict.get("primary")

                # set the primary name in the row
                row[1] = primary_name

                # update the row
                update_cursor.updateRow(row)

                logger.debug(f"Set 'primary_name' to '{primary_name}' for feature.")

    return
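
A usage sketch combining this function with `add_primary_category_field` (illustrative path); both read the 'primary' key from JSON columns in the Overture data:

``` python
from overture_to_arcgis.utils import add_primary_category_field, add_primary_name

fc = r"C:\data\overture.gdb\seattle_places"
add_primary_category_field(fc)  # parses the 'categories' JSON column
add_primary_name(fc)            # parses the 'names' JSON column
```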

add_trail_field(features)

Add a 'trail' boolean field to the input features if it does not already exist. The field is set to 1 for features whose 'class' is 'track', 'path', 'footway', 'trail', or 'cycleway'.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_trail_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'trail' boolean field to the input features if it does not already exist. The field is set to 1 for
    features whose 'class' is 'track', 'path', 'footway', 'trail', or 'cycleway'.

    Args:
        features: The input feature layer or feature class.
    """
    # Ensure features is a string path if it's a Path or Layer
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get all field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure source field 'class' exists
    if "class" not in field_names:
        raise ValueError("Source field 'class' does not exist in features.")

    # check if 'trail' field exists
    if "trail" not in field_names:
        # add 'trail' field
        arcpy.management.AddField(
            in_table=features,
            field_name="trail",
            field_type="SHORT",
        )

        logger.debug("Added 'trail' field to features.")

    # list of classes to search for
    trail_classes = ["track", "path", "footway", "trail", "cycleway"]

    # calculate 'trail' from the 'class' field
    with arcpy.da.UpdateCursor(features, ["class", "trail"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the class value
            class_value = row[0]

            # set the trail field if class_value is one of trail classes
            if class_value in trail_classes:
                # set the trail field in the row
                row[1] = 1

                # update the row
                update_cursor.updateRow(row)

    return
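
A usage sketch (illustrative path):

``` python
from overture_to_arcgis.utils import add_trail_field

# flags segments whose 'class' is track, path, footway, trail, or cycleway
add_trail_field(r"C:\data\overture.gdb\seattle_segments")
```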

add_website_field(features)

Add a 'website' field to the input features if it does not already exist, and calculate from the 'websites' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_website_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'website' field to the input features if it does not already exist, and calculate from
    the 'websites' field.

    Args:
        features: The input feature layer or feature class.
    """
    # Ensure features is a string path if it's a Path or Layer
    if isinstance(features, Path):
        features = str(features)
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # check if 'website' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "website" not in field_names:
        # add 'website' field
        arcpy.management.AddField(
            in_table=features,
            field_name="website",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'website' field to features.")

    # calculate 'website' from 'websites' field
    with arcpy.da.UpdateCursor(features, ["websites", "website"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the websites value and extract website
            website_value = row[0]

            # set the website if website_value is valid
            if (
                website_value is not None
                and isinstance(website_value, str)
                and len(website_value) > 0
                and not website_value.strip() == "None"
                and not website_value.strip().lower() == "null"
            ):
                # parse the website value into a list
                website_lst = json.loads(website_value)

                # extract the first website from the list
                if isinstance(website_lst, list) and len(website_lst) > 0:
                    website = website_lst[0]

                    # only use the website if it is less than 255 characters
                    if (
                        isinstance(website, str)
                        and website.lower().strip() != "none"
                        and 0 < len(website) <= 255
                    ):
                        row[1] = website

                        # update the row
                        update_cursor.updateRow(row)

                    else:
                        logger.warning(
                            f"Website is empty, 'none', or exceeds 255 characters and will not be set for the feature: '{website}'"
                        )

    return
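
A usage sketch (illustrative path). The 'websites' column is expected to hold a JSON list such as `["https://example.com"]`; the first entry is copied into the new 255-character 'website' field:

``` python
from overture_to_arcgis.utils import add_website_field

add_website_field(r"C:\data\overture.gdb\seattle_places")
```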

get_all_overture_types(release=None, s3=None)

Returns a list of all available Overture dataset types for a given release.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `release` | `Optional[str]` | Optional release version. If not provided, the most current release will be used. | `None` |
| `s3` | `Optional[S3FileSystem]` | Optional pre-configured S3 filesystem. If not provided, an anonymous S3 filesystem will be created. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | List of available overture types for the release. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_all_overture_types(
    release: Optional[str] = None, s3: Optional[fs.S3FileSystem] = None
) -> list[str]:
    """
    Returns a list of all available Overture dataset types for a given release.

    Args:
        release: Optional release version. If not provided, the most current
            release will be used.
        s3: Optional pre-configured S3 filesystem. If not provided, an anonymous
            S3 filesystem will be created.

    Returns:
        List of available overture types for the release.
    """
    # if no release provided, get the most current one
    if release is None:
        release = get_current_release()

    # get the type theme map
    type_theme_map = get_type_theme_map(release=release, s3=s3)

    # get the types from the mapping
    types = list(type_theme_map.keys())

    logger.debug(f"Available types for release {release}: {types}")

    return types
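
A usage sketch pairing this function with `get_current_release()` (the printed values are illustrative):

``` python
from overture_to_arcgis.utils import get_all_overture_types, get_current_release

release = get_current_release()
types = get_all_overture_types(release)
print(types)  # e.g. ['address', 'building', 'place', 'segment', ...]
```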

get_current_release()

Returns the most current Overture dataset release string.

Returns:

| Type | Description |
| --- | --- |
| `str` | Most current release string. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_current_release() -> str:
    """
    Returns the most current Overture dataset release string.

    Returns:
        Most current release string.
    """
    # retrieve the list of releases
    releases = get_release_list()

    # make sure there is at least one release
    if not releases:
        raise RuntimeError("No Overture dataset releases found.")

    # get the most current release by sorting the list
    current_release = sorted(releases)[-1]

    logger.debug(f"Current release: {current_release}")

    return current_release

get_geometry_column(table)

Get the name of the geometry column from the PyArrow Table or RecordBatch metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Union[Table, RecordBatch]` | PyArrow Table or RecordBatch with GeoArrow metadata. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | Name of the geometry column. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_geometry_column(table: Union[pa.Table, pa.RecordBatch]) -> str:
    """
    Get the name of the geometry column from the PyArrow Table or RecordBatch metadata.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.

    Returns:
        Name of the geometry column.
    """
    geo_meta = table.schema.metadata.get(b"geo")
    if geo_meta is None:
        raise ValueError("No geometry metadata found in the Overture Maps data.")
    geo_meta = json.loads(geo_meta.decode("utf-8"))
    geom_col = geo_meta.get("primary_column")
    if geom_col is None or geom_col not in table.column_names:
        raise ValueError(
            "No valid primary_geometry column defined in the Overture Maps metadata."
        )
    return geom_col
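
A self-contained sketch using a hand-built table carrying GeoParquet-style 'geo' schema metadata (the WKB payload is a placeholder):

``` python
import json

import pyarrow as pa

from overture_to_arcgis.utils import get_geometry_column

table = pa.table({"id": [1], "geometry": [b"\x01..."]})
table = table.replace_schema_metadata(
    {b"geo": json.dumps({"primary_column": "geometry"}).encode("utf-8")}
)
print(get_geometry_column(table))  # -> 'geometry'
```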

get_layers_for_unique_values(input_features, field_name, arcgis_map=None)

Create layers from unique values in a specified field of the input features.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
| `field_name` | `str` | The field name to get unique values from. | *required* |
| `arcgis_map` | `Optional[Map]` | The ArcGIS map object to add the layers to. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[Layer]` | A list of ArcGIS layers created from the unique values. |

Source code in src/overture_to_arcgis/utils/_arcgis.py
def get_layers_for_unique_values(
    input_features: Union[arcpy._mp.Layer, str, Path],
    field_name: str,
    arcgis_map: Optional[arcpy._mp.Map] = None,
) -> list[arcpy._mp.Layer]:
    """
    Create layers from unique values in a specified field of the input features.

    Args:
        input_features: The input feature layer or feature class.
        field_name: The field name to get unique values from.
        arcgis_map: The ArcGIS map object to add the layers to.

    Returns:
        A list of ArcGIS layers created from the unique values.
    """
    # collect the unique values from the field into a set using a search cursor
    unique_values = set(
        (val[0] for val in arcpy.da.SearchCursor(input_features, [field_name]))
    )

    # list to hydrate with created layers
    layers = []

    # iterate unique values
    for value in unique_values:
        # create layer name
        layer_name = f"{field_name}_{value}"

        # create definition query
        definition_query = (
            f"{field_name} = '{value}'"
            if isinstance(value, str)
            else f"{field_name} = {value}"
        )

        # use definition query to create layer object
        layer = arcpy.management.MakeFeatureLayer(
            in_features=input_features,
            out_layer=layer_name,
            where_clause=definition_query,
        )[0]

        # if the map is provided, add the layer to the map
        if arcgis_map:
            arcgis_map.addLayer(layer)
        layers.append(layer)

    return layers
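
A usage sketch (illustrative path and field). Each returned layer carries a definition query such as `subtype = 'road'`:

``` python
from overture_to_arcgis.utils import get_layers_for_unique_values

layers = get_layers_for_unique_values(
    r"C:\data\overture.gdb\seattle_segments", field_name="subtype"
)
```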

get_logger(level='INFO', logger_name=None, logfile_path=None, log_format='%(asctime)s | %(name)s | %(levelname)s | %(message)s', propagate=True, add_stream_handler=True, add_arcpy_handler=False)

Get a Python `Logger` configured to provide stream, file, and, optionally, ArcPy output. With `add_stream_handler` (the default), messages are sent to the console using a `StreamHandler`. With `add_arcpy_handler`, logging is routed through ArcPy messaging using `ArcpyHandler`. If `logfile_path` is provided, log messages are also written to the provided path using a `FileHandler`.

Valid `level` inputs include:

* `DEBUG` - Detailed information, typically of interest only when diagnosing problems.
* `INFO` - Confirmation that things are working as expected.
* `WARNING` or `WARN` - An indication that something unexpected happened, or indicative of some problem in the near future (e.g. "disk space low"). The software is still working as expected.
* `ERROR` - Due to a more serious problem, the software has not been able to perform some function.
* `CRITICAL` - A serious error, indicating that the program itself may be unable to continue running.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `level` | `Optional[Union[str, int]]` | Logging level to use. | `'INFO'` |
| `logger_name` | `Optional[str]` | Name of the logger. If None, the root logger is used. | `None` |
| `log_format` | `Optional[str]` | Format string for the logging messages. | `'%(asctime)s \| %(name)s \| %(levelname)s \| %(message)s'` |
| `propagate` | `bool` | If True, log messages are passed to the handlers of ancestor loggers. | `True` |
| `logfile_path` | `Union[Path, str]` | Where to save the logfile if file output is desired. | `None` |
| `add_stream_handler` | `bool` | If True, add a StreamHandler to route logging to the console. | `True` |
| `add_arcpy_handler` | `bool` | If True and ArcPy is available, add the ArcpyHandler to route logging through ArcPy messaging. | `False` |

``` python
get_logger('DEBUG')
logging.debug('nauseatingly detailed debugging message')
logging.info('something actually useful to know')
logging.warning('The sky may be falling')
logging.error('The sky is falling.')
logging.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
```
Source code in src/overture_to_arcgis/utils/_logging.py
def get_logger(
    level: Optional[Union[str, int]] = "INFO",
    logger_name: Optional[str] = None,
    logfile_path: Union[Path, str] = None,
    log_format: Optional[str] = "%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    propagate: bool = True,
    add_stream_handler: bool = True,
    add_arcpy_handler: bool = False,
) -> logging.Logger:
    """
    Get Python :class:`Logger<logging.Logger>` configured to provide stream, file and, optionally, ArcPy output.
    With `add_stream_handler` (the default), messages are sent to the console using a
    :class:`StreamHandler<logging.StreamHandler>`. With `add_arcpy_handler`, logging is routed through ArcPy
    messaging using :class:`ArcpyHandler`. If the `logfile_path` is provided, log messages will also be written
    to the provided path to a logfile using a :class:`FileHandler<logging.FileHandler>`.

    Valid `level` inputs include:
    * `DEBUG` - Detailed information, typically of interest only when diagnosing problems.
    * `INFO` - Confirmation that things are working as expected.
    * `WARNING` or `WARN` - An indication that something unexpected happened, or indicative of some problem in the
        near future (e.g. "disk space low"). The software is still working as expected.
    * `ERROR` - Due to a more serious problem, the software has not been able to perform some function.
    * `CRITICAL` - A serious error, indicating that the program itself may be unable to continue running.

    Args:
        level: Logging level to use. Default is `'INFO'`.
        logger_name: Name of the logger. If `None`, the root logger is used.
        log_format: Format string for the logging messages. Default is `'%(asctime)s | %(name)s | %(levelname)s | %(message)s'`.
        propagate: If `True`, log messages are passed to the handlers of ancestor loggers. Default is `True`.
        logfile_path: Where to save the logfile if file output is desired.
        add_stream_handler: If `True`, add a `StreamHandler` to route logging to the console. Default is `True`.
        add_arcpy_handler: If `True` and ArcPy is available, add the `ArcpyHandler` to route logging through
            ArcPy messaging. Default is `False`.

    ``` python
    get_logger('DEBUG')
    logging.debug('nauseatingly detailed debugging message')
    logging.info('something actually useful to know')
    logging.warning('The sky may be falling')
    logging.error('The sky is falling.')
    logging.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
    ```

    """
    # ensure valid logging level
    log_str_lst = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "WARN", "FATAL"]
    log_int_lst = [0, 10, 20, 30, 40, 50]

    if not isinstance(level, (str, int)):
        raise ValueError(
            "You must define a specific logging level for log_level as a string or integer."
        )
    elif isinstance(level, str) and level not in log_str_lst:
        raise ValueError(
            f'The log_level must be one of {log_str_lst}. You provided "{level}".'
        )
    elif isinstance(level, int) and level not in log_int_lst:
        raise ValueError(
            f"If providing an integer for log_level, it must be one of the following, {log_int_lst}."
        )

    # get default logger and set logging level at the same time
    logger = logging.getLogger(logger_name)
    logger.setLevel(level=level)

    # clear handlers
    logger.handlers.clear()

    # configure formatting
    log_frmt = logging.Formatter(log_format)

    # set propagation
    logger.propagate = propagate

    # make sure at least a stream handler is present
    if add_stream_handler:
        # create and add the stream handler
        sh = logging.StreamHandler()
        sh.setFormatter(log_frmt)
        logger.addHandler(sh)

    # if in an environment with ArcPy, add handler to bubble logging up to ArcGIS through ArcPy
    if add_arcpy_handler:
        ah = ArcpyHandler()
        ah.setFormatter(log_frmt)
        logger.addHandler(ah)

    # if a path for the logfile is provided, log results to the file
    if logfile_path is not None:
        # coerce to a Path in case a string was provided, then ensure the full path exists
        logfile_path = Path(logfile_path)
        if not logfile_path.parent.exists():
            logfile_path.parent.mkdir(parents=True)

        # create and add the file handler
        fh = logging.FileHandler(str(logfile_path))
        fh.setFormatter(log_frmt)
        logger.addHandler(fh)

    return logger

get_record_batches(overture_type, bbox=None, connect_timeout=None, request_timeout=None)

Yield pyarrow RecordBatches of Overture data for the desired bounding box and S3 path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overture_type` | `str` | Overture feature type to load. | *required* |
| `bbox` | `Optional[Tuple[float, float, float, float]]` | Optional bounding box for data fetch (xmin, ymin, xmax, ymax). | `None` |
| `connect_timeout` | `Optional[float]` | Optional connection timeout in seconds. | `None` |
| `request_timeout` | `Optional[float]` | Optional request timeout in seconds. | `None` |

Yields:

| Type | Description |
| --- | --- |
| `RecordBatch` | Record batches with the requested data. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_record_batches(
    overture_type: str,
    bbox: Optional[Tuple[float, float, float, float]] = None,
    connect_timeout: Optional[float] = None,
    request_timeout: Optional[float] = None,
) -> Generator[pa.RecordBatch, None, None]:
    """
    Yield pyarrow RecordBatches of Overture data for the desired bounding box and S3 path.

    Args:
        overture_type: Overture feature type to load.
        bbox: Optional bounding box for data fetch (xmin, ymin, xmax, ymax).
        connect_timeout: Optional connection timeout in seconds.
        request_timeout: Optional request timeout in seconds.

    Yields:
        pa.RecordBatch: Record batches with the requested data.
    """
    # create connection to the S3 filesystem
    s3 = fs.S3FileSystem(
        anonymous=True,
        region="us-west-2",
        connect_timeout=connect_timeout,
        request_timeout=request_timeout,
    )

    # get the overture type to theme mapping
    type_theme_map = get_type_theme_map(s3=s3)

    # validate the overture type
    available_types = type_theme_map.keys()
    if overture_type not in available_types:
        raise ValueError(
            f"Invalid overture type: {overture_type}. Available types are: {list(available_types)}"
        )

    # validate the bounding box coordinates
    bbox = validate_bounding_box(bbox)

    # extract the coordinates from the bounding box and create the filter
    xmin, ymin, xmax, ymax = bbox
    dataset_filter = (
        (pc.field("bbox", "xmin") < xmax)
        & (pc.field("bbox", "xmax") > xmin)
        & (pc.field("bbox", "ymin") < ymax)
        & (pc.field("bbox", "ymax") > ymin)
    )

    # get the most current release version
    release = get_current_release()

    # create the dataset path
    s3_pth = get_dataset_path(overture_type, release)

    # create the PyArrow dataset
    dataset = ds.dataset(s3_pth, filesystem=s3)

    # get the record batches with the extent filter applied
    batches = dataset.to_batches(filter=dataset_filter)

    # iterate through the batches and yield with geoarrow metadata
    for idx, batch in enumerate(batches):
        # if this is the first batch, and it's empty, warn of no data found
        if idx == 0 and batch.num_rows == 0:
            warn(
                f"No '{overture_type}' data found for the specified bounding box: {bbox}"
            )

        # get the geometry field
        geo_fld_idx = batch.schema.get_field_index("geometry")
        geo_fld = batch.schema.field(geo_fld_idx)

        # set the geoarrow metadata on the geometry field
        geoarrow_geo_fld = geo_fld.with_metadata(
            {b"ARROW:extension:name": b"geoarrow.wkb"}
        )

        # create an updated schema with the correct metadata for the geometry field
        geoarrow_schema = batch.schema.set(geo_fld_idx, geoarrow_geo_fld)

        # replace the batch schema with the updated geoarrow schema
        batch = batch.replace_schema_metadata(geoarrow_schema.metadata)

        # yield the batch to the caller
        yield batch
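
A streaming sketch (illustrative bounding box); batches are processed as they arrive rather than accumulating the full dataset in memory:

``` python
from overture_to_arcgis.utils import get_record_batches

total = 0
for batch in get_record_batches("building", bbox=(-122.36, 47.60, -122.32, 47.62)):
    total += batch.num_rows
print(f"{total:,} rows fetched")
```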

get_release_list(s3=None)

Returns a list of all available Overture dataset releases.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3` | `Optional[S3FileSystem]` | Optional pre-configured S3 filesystem. If not provided, an anonymous S3 filesystem will be created. | `None` |
Source code in src/overture_to_arcgis/utils/__main__.py
def get_release_list(s3: Optional[fs.S3FileSystem] = None) -> list[str]:
    """
    Returns a list of all available Overture dataset releases.

    Args:
        s3: Optional pre-configured S3 filesystem. If not provided, an anonymous
            S3 filesystem will be created.
    """
    # create S3 filesystem if not provided
    if s3 is None:
        s3 = fs.S3FileSystem(anonymous=True, region="us-west-2")

    # create fileselector
    selector = fs.FileSelector(
        base_dir="overturemaps-us-west-2/release/", recursive=False
    )

    # get the most current releases from S3 as FileInfo objects
    file_infos = s3.get_file_info(selector)

    # extract the directory names from the FileInfo objects
    directories = [
        info.path for info in file_infos if info.type == fs.FileType.Directory
    ]

    # get the directory names only (last part of the path)
    releases = [dir_path.split("/")[-1] for dir_path in directories]

    # for each of the releases, ensure the release has data (an empty release can happen while a new release is still being loaded)
    releases = [rel for rel in releases if len(get_themes(rel, s3)) >= 5]

    logger.debug(f"Available releases: {releases}")

    return releases
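
A usage sketch; as in `get_current_release()`, the most current release is the lexical maximum of the returned strings:

``` python
from overture_to_arcgis.utils import get_release_list

releases = get_release_list()
print(sorted(releases)[-1])  # most current release
```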

get_temp_gdb()

Get a temporary File Geodatabase path.

Source code in src/overture_to_arcgis/utils/__main__.py
def get_temp_gdb() -> Path:
    """Get a temporary File Geodatabase path."""
    tmp_dir = get_temp_dir()
    tmp_gdb = tmp_dir / "tmp_data.gdb"
    if not tmp_gdb.exists():
        if has_arcpy:
            import arcpy

            arcpy.management.CreateFileGDB(str(tmp_dir), tmp_gdb.name)
        else:
            raise EnvironmentError("arcpy is required to create a File Geodatabase.")
    return tmp_gdb

remove_rail_features(features)

Remove rail features from the input features based on the 'subtype' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def remove_rail_features(features: Union[str, Path, arcpy._mp.Layer]) -> None:
    """
    Remove rail features from the input features based on the 'subtype' field.

    Args:
        features: The input feature layer or feature class.
    """
    subtype_field = "subtype"

    # if features is path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # ensure subtype field is in schema
    if subtype_field not in [f.name for f in arcpy.ListFields(features)]:
        raise ValueError(
            f"Field '{subtype_field}' does not exist in features. Cannot remove rail features."
        )

    # counter for deleted features
    del_cnt = 0

    # use an update cursor to delete rail features
    with arcpy.da.UpdateCursor(features, ["subtype"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # check if subtype is 'rail'
            subtype = row[0]
            if (
                subtype is not None
                and isinstance(subtype, str)
                and subtype.lower() == "rail"
            ):
                # delete the row
                update_cursor.deleteRow()
                del_cnt += 1

    logger.info(f"Deleted {del_cnt:,} rail features.")

    return
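
A usage sketch (illustrative path):

``` python
from overture_to_arcgis.utils import remove_rail_features

# deletes rows whose 'subtype' is 'rail'; the edit happens in place
remove_rail_features(r"C:\data\overture.gdb\seattle_segments")
```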

split_into_subclass_features(features)

Split features into subsegments based on the definition in the 'subclass_rules' field.

Example
  1. [{"value": "driveway", "between": null}] -> same geometry with 'subclass' field populated with 'driveway'
  2. [{"value": "driveway", "between": [0.772783061, 1.0]}] -> two features replacing the original one feature, the first subsegment from 0-77.2783061% with a null subclass and a second subsegment from 77.28% to 100% of geometry with 'subclass' field populated with 'driveway'
  3. [{"value": "driveway", "between": [0.0, 0.5]}, {"value": "alley", "between": [0.5, 1.0]}] -> two subsegments with 'subclass' field populated accordingly

Parameters:

Name Type Description Default
features Union[str, Path, Layer]

The input feature layer or feature class.

required

Warning

This function modifies the input features in place by adding new features and deleting the original ones.

Source code in src/overture_to_arcgis/utils/_arcgis.py
def split_into_subclass_features(features: Union[str, Path, arcpy._mp.Layer]) -> None:
    """
    Split features into subsegments based on the definition in the 'subclass_rules' field.

    Example:
        1. [{"value": "driveway", "between": null}] -> same geometry with 'subclass' field populated with 'driveway'
        2. [{"value": "driveway", "between": [0.772783061, 1.0]}] -> two features replacing the original one feature,
            the first subsegment from 0-77.2783061% with a null subclass and a second subsegment from 77.28% to 100% of
            geometry with 'subclass' field populated with 'driveway'
        3. [{"value": "driveway", "between": [0.0, 0.5]}, {"value": "alley", "between": [0.5, 1.0]}] -> two
            subsegments with 'subclass' field populated accordingly

    Args:
        features: The input feature layer or feature class.

    !!! warning
        This function modifies the input features in place by adding new features and deleting the original ones.
    """
    # if features is a path, convert to string - arcpy cannot handle Path objects
    if isinstance(features, Path):
        features = str(features)

    # make sure features is a path string if a layer is provided - this avoids schema lock issues with AddField
    if isinstance(features, arcpy._mp.Layer):
        features = arcpy.Describe(features).catalogPath

    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # ensure the necessary source field exists
    subclass_rules_field = "subclass_rules"
    if subclass_rules_field not in field_names:
        raise ValueError(
            f"Source field '{subclass_rules_field}' does not exist in features. This is necessary to split features "
            f"into subclasses."
        )

    # add subclass field if it does not exist
    if "subclass" not in field_names:
        arcpy.management.AddField(
            in_table=features,
            field_name="subclass",
            field_type="TEXT",
            field_length=50,
        )
        logger.debug("Added 'subclass' field to features.")

        # update field names list
        field_names = [f.name for f in arcpy.ListFields(features)]
    else:
        logger.debug("'subclass' field already exists in features.")

    # counters
    add_cnt = 0
    update_cnt = 0
    del_cnt = 0

    # delete oid tracker
    del_oid_lst = []

    # create a temporary feature class with the same schema to hold new features
    tmp_gdb = get_temp_gdb()
    desc = arcpy.Describe(features)
    tmp_fc = arcpy.management.CreateFeatureclass(
        out_path=str(tmp_gdb),
        out_name=f"temp_subclass_{uuid.uuid4().hex}",
        geometry_type=desc.shapeType,
        template=features,
        spatial_reference=desc.spatialReference,
    )[0]

    logger.debug(f"Created temporary feature class for subclass features: {tmp_fc}")

    # cursor field names not including the geometry column
    cursor_fields = [f for f in field_names if f != desc.shapeFieldName]

    # add geometry token to cursor field names
    cursor_fields = cursor_fields + ["SHAPE@"]

    # use an update cursor to read and update features
    with arcpy.da.UpdateCursor(features, cursor_fields) as update_cursor:
        # use an insert cursor to add new features to the temporary feature class
        with arcpy.da.InsertCursor(tmp_fc, cursor_fields) as insert_cursor:
            # iterate through the update_cursor rows
            for row in update_cursor:
                # get the subclass_rules as a raw string
                subclass_rules_str = row[cursor_fields.index(subclass_rules_field)]

                # only process if subclass_rules is valid
                if not (
                    subclass_rules_str is None
                    or not isinstance(subclass_rules_str, str)
                    or subclass_rules_str.strip() == "null"
                    or len(subclass_rules_str) == 0
                ):
                    # parse the subclass_rules string into a list of dictionaries
                    subclass_rules = json.loads(subclass_rules_str)

                    # the geometry and field indexes are constant for this row, so
                    # look them up once before iterating the rules
                    geom = row[-1]
                    subclass_idx = cursor_fields.index("subclass")
                    oid_idx = cursor_fields.index(desc.OIDFieldName)

                    # process each subclass rule
                    for idx, rule in enumerate(subclass_rules):
                        # extract the subclass value and the segment range (between)
                        value = rule.get("value")
                        between = rule.get("between")

                        if between is None:
                            # If 'between' is None, update the current row to set the subclass for the entire geometry
                            row[subclass_idx] = value
                            update_cursor.updateRow(row)
                            logger.debug(
                                f"Updated feature with OID {row[oid_idx]} to have subclass '{value}' for entire geometry."
                            )
                            update_cnt += 1
                        else:
                            # If this is the first rule and the segment does not start at 0, retain the original row for the initial segment
                            if idx == 0 and between[0] > 0:
                                new_row = list(row)  # Copy the original row

                                # Create a geometry subsegment from 0 to the start of 'between'
                                new_row[-1] = geom.segmentAlongLine(
                                    0.0, between[0] * geom.length
                                )
                                insert_cursor.insertRow(new_row)
                                logger.debug(
                                    f"Inserted new feature with no subclass covering 0.0000 to {between[0]:.4f} of the geometry length."
                                )
                                add_cnt += 1

                            # For the current rule, create a new row for the specified subclass and segment
                            new_row = list(row)
                            new_row[subclass_idx] = value  # Set the subclass value
                            (
                                start_frac,
                                end_frac,
                            ) = between  # Segment start and end fractions

                            # Create a geometry subsegment for the specified range
                            new_row[-1] = geom.segmentAlongLine(
                                start_frac * geom.length, end_frac * geom.length
                            )
                            insert_cursor.insertRow(new_row)
                            logger.debug(
                                f"Inserted new feature with subclass '{value}' covering {start_frac:.4f} to {end_frac:.4f} of the geometry length."
                            )
                            add_cnt += 1

                            # mark the original feature's OID for deletion after splitting
                            del_oids.add(row[oid_idx])

    # append the new features from the temporary feature class to the original features
    arcpy.management.Append(
        inputs=tmp_fc,
        target=features,
        schema_type="NO_TEST",
    )

    logger.debug("Appended new subclass features to original features.")

    # delete the split features - deleting after appending the new features avoids data loss
    with arcpy.da.UpdateCursor(features, "OID@") as drop_cursor:
        for row in drop_cursor:
            if row[0] in del_oids:
                drop_cursor.deleteRow()
                del_cnt += 1

    logger.debug("Appended new subclass features to original features.")

    # delete the temporary file geodatabase
    shutil.rmtree(tmp_gdb, ignore_errors=True)

    logger.debug("Deleted temporary file geodatabase.")

    # log the final counts
    logger.info(
        f"Added {add_cnt:,} new subclass features, updated {update_cnt:,} existing features, and deleted "
        f"{del_cnt:,} original features."
    )

    return
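
The per-rule splitting above hinges on arcpy's segmentAlongLine, which extracts the portion of a polyline between two linear measures; the between fractions stored in subclass_rules are scaled by the geometry's length to obtain those measures. A minimal sketch of that mapping, assuming an ArcGIS Pro environment where arcpy is available and using an illustrative rule payload:

import json

import arcpy

# a simple 10-unit horizontal polyline standing in for a feature geometry
line = arcpy.Polyline(
    arcpy.Array([arcpy.Point(0, 0), arcpy.Point(10, 0)]),
    arcpy.SpatialReference(3857),
)

# a subclass_rules value as it might appear in the source field (illustrative)
subclass_rules_str = '[{"value": "sidewalk", "between": [0.25, 0.75]}]'

for rule in json.loads(subclass_rules_str):
    start_frac, end_frac = rule["between"]
    # fractions are scaled by the geometry length to get linear measures
    segment = line.segmentAlongLine(start_frac * line.length, end_frac * line.length)
    print(rule["value"], segment.length)  # sidewalk 5.0

Expressing the split points as fractions rather than absolute measures keeps the rules independent of each segment's length and linear unit.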

table_to_features(table, output_features)

Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Feature Class.

Parameters:

Name Type Description Default
table Union[Table, RecordBatch]

PyArrow Table or RecordBatch with GeoArrow metadata.

required
output_features Union[str, Path]

Path to the output feature class.

required

Returns:

Type Description
Path

Path to the created feature class.

Source code in src/overture_to_arcgis/utils/__main__.py
def table_to_features(
    table: Union[pa.Table, pa.RecordBatch], output_features: Union[str, Path]
) -> Path:
    """
    Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Feature Class.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.
        output_features: Path to the output feature class.

    Returns:
        Path to the created feature class.
    """
    # convert the table to a spatially enabled dataframe
    df = table_to_spatially_enabled_dataframe(table)

    # save the dataframe to a feature class
    df.spatial.to_featureclass(output_features)

    # return as a Path to match the annotated return type
    return Path(output_features)
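
A minimal usage sketch, assuming the function is importable from overture_to_arcgis.utils as documented and that batch is a PyArrow RecordBatch already carrying GeoArrow metadata (for example, one read from Overture Maps data):

from pathlib import Path

from overture_to_arcgis.utils import table_to_features

# `batch` is assumed to be a pyarrow RecordBatch with GeoArrow metadata
out_fc = Path("C:/data/overture.gdb/segments")
print(table_to_features(batch, out_fc))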

table_to_spatially_enabled_dataframe(table)

Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Spatially Enabled DataFrame.

Parameters:

Name Type Description Default
table Union[Table, RecordBatch]

PyArrow Table or RecordBatch with GeoArrow metadata.

required

Returns:

Type Description
DataFrame

ArcGIS Spatially Enabled DataFrame.

Source code in src/overture_to_arcgis/utils/__main__.py
def table_to_spatially_enabled_dataframe(
    table: Union[pa.Table, pa.RecordBatch]
) -> pd.DataFrame:
    """
    Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Spatially Enabled DataFrame.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.

    Returns:
        ArcGIS Spatially Enabled DataFrame.
    """
    # clean up any complex columns
    smpl_table = convert_complex_columns_to_strings(table)

    # convert table to a pandas DataFrame
    df = smpl_table.to_pandas()

    # get the geometry column from the metadata using the helper function
    geom_col = get_geometry_column(table)

    # convert the geometry column from WKB to arcgis Geometry objects
    df[geom_col] = convert_wkb_column_to_arcgis_geometry(df[geom_col])

    # set the geometry column using the ArcGIS GeoAccessor to get a Spatially Enabled DataFrame
    df.spatial.set_geometry(geom_col, sr=4326, inplace=True)

    return df
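
A sketch of the intermediate conversion on its own, under the same GeoArrow-metadata assumption; once the geometry column is set, the GeoAccessor (df.spatial) is available on the result:

from overture_to_arcgis.utils import table_to_spatially_enabled_dataframe

# `table` is assumed to be a pyarrow Table with GeoArrow metadata
df = table_to_spatially_enabled_dataframe(table)

# the GeoAccessor now answers spatial questions about the frame
print(len(df))
print(df.spatial.geometry_type)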

validate_bounding_box(bbox)

Validate the bounding box coordinates.

Parameters:

Name Type Description Default
bbox tuple[float, float, float, float]

Bounding box to validate. Format: (minx, miny, maxx, maxy).

required

Returns:

Type Description
tuple[float, float, float, float]

The validated bounding box with all coordinates coerced to float.

Source code in src/overture_to_arcgis/utils/__main__.py
def validate_bounding_box(
    bbox: tuple[float, float, float, float]
) -> tuple[float, float, float, float]:
    """Validate the bounding box coordinates."""
    # ensure four numeric values are provided
    if len(bbox) != 4:
        raise ValueError(
            "Bounding box must be a tuple of four values: (minx, miny, maxx, maxy)."
        )

    # ensure all coordinates are numeric, then convert to float
    if not all(isinstance(coord, (int, float)) for coord in bbox):
        raise ValueError(
            "All coordinates in the bounding box must be numeric (int or float)."
        )
    bbox = tuple(float(coord) for coord in bbox)

    # ensure minx < maxx and miny < maxy
    if bbox[0] >= bbox[2] or bbox[1] >= bbox[3]:
        raise ValueError(
            "Invalid bounding box coordinates: ensure that minx < maxx and miny < maxy."
        )

    # ensure coordinates are within valid ranges
    if not (
        -180.0 <= bbox[0] <= 180.0
        and -90.0 <= bbox[1] <= 90.0
        and -180.0 <= bbox[2] <= 180.0
        and -90.0 <= bbox[3] <= 90.0
    ):
        raise ValueError(
            "Bounding box coordinates must be within valid ranges: minx/maxx [-180, 180], miny/maxy [-90, 90]."
        )

    # If all checks pass, the bounding box is valid
    return bbox
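
Since the validator coerces integer coordinates to floats and raises on malformed input, both paths are easy to exercise; a short sketch, assuming the function is importable from overture_to_arcgis.utils:

from overture_to_arcgis.utils import validate_bounding_box

# integer coordinates are accepted and coerced to floats
print(validate_bounding_box((-123, 47, -122, 48)))
# (-123.0, 47.0, -122.0, 48.0)

# swapped min/max coordinates raise a ValueError
try:
    validate_bounding_box((-122.0, 47.0, -123.0, 48.0))
except ValueError as err:
    print(err)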