Python API

get_features(output_feature_class, overture_type, bbox, connect_timeout=None, request_timeout=None)

Retrieve data from Overture Maps and save it as an ArcGIS Feature Class.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_feature_class` | `Union[str, Path]` | Path to the output feature class. | *required* |
| `overture_type` | `str` | Overture feature type to retrieve. | *required* |
| `bbox` | `tuple[float, float, float, float]` | Bounding box to filter the data. Format: `(minx, miny, maxx, maxy)`. | *required* |
| `connect_timeout` | `Optional[int]` | Optional timeout in seconds for establishing a connection to AWS S3. | `None` |
| `request_timeout` | `Optional[int]` | Optional timeout in seconds for waiting for a response from AWS S3. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Path` | Path to the created feature class. |

Source code in src/overture_to_arcgis/__main__.py
def get_features(
    output_feature_class: Union[str, Path],
    overture_type: str,
    bbox: tuple[float, float, float, float],
    connect_timeout: Optional[int] = None,
    request_timeout: Optional[int] = None,
) -> Path:
    """
    Retrieve data from Overture Maps and save it as an ArcGIS Feature Class.

    Args:
        output_feature_class: Path to the output feature class.
        overture_type: Overture feature type to retrieve.
        bbox: Bounding box to filter the data. Format: (minx, miny, maxx, maxy).
        connect_timeout: Optional timeout in seconds for establishing a connection to AWS S3.
        request_timeout: Optional timeout in seconds for waiting for a response from AWS S3.

    Returns:
        Path to the created feature class.
    """
    # ensure arcpy is available
    if find_spec('arcpy') is None:
        raise EnvironmentError("ArcPy is required for get_features.")

    # validate the bounding box
    bbox = validate_bounding_box(bbox)

    # get a temporary geodatabase to hold the batch feature classes
    tmp_gdb = get_temp_gdb()

    # list to hold the feature classes
    fc_list = []

    # get the record batch generator
    batches = get_record_batches(overture_type, bbox, connect_timeout, request_timeout)

    # iterate through the record batches to see if we have any data
    for btch_idx, batch in enumerate(batches):
        # warn of no data found for the batch
        if batch.num_rows == 0:
            logger.warning(
                f"No '{overture_type}' data found for the specified bounding box: {bbox}. No temporary feature "
                f"class will be created for this batch."
            )

        # if there is data to work with, process it
        else:
            # report progress
            if logger.level <= logging.DEBUG:
                tbl_cnt = batch.num_rows
                logger.debug(
                    f"In batch {btch_idx:,} fetched {tbl_cnt:,} rows of '{overture_type}' data from Overture Maps."
                )

            # create the temporary feature class path
            tmp_fc = tmp_gdb / f"overture_{overture_type}_{btch_idx:04d}"

            # convert the batch to a feature class
            table_to_features(batch, output_features=tmp_fc)

            # add the feature class to the list if there is data to work with
            fc_list.append(str(tmp_fc))

    # merge the feature classes into a single feature class if any data was found
    if len(fc_list) > 0:
        arcpy.management.Merge(fc_list, str(output_feature_class))
    else:
        logger.warning("No data found for the specified bounding box. No output feature class created.")

    # cleanup temporary data - remove temporary geodatabase using arcpy to avoid any locks
    arcpy.management.Delete(str(tmp_gdb))

    return output_feature_class
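
A minimal usage sketch, assuming an ArcPy-enabled Python environment and that `get_features` is importable from the package root (the output path and bounding box are illustrative):

``` python
from pathlib import Path

from overture_to_arcgis import get_features  # assumed import path

# fetch Overture 'place' features for an illustrative bounding box
out_fc = get_features(
    output_feature_class=Path(r"C:\data\overture.gdb\places"),  # hypothetical path
    overture_type="place",
    bbox=(-122.96, 46.98, -122.82, 47.08),
    request_timeout=120,
)
print(f"Created feature class: {out_fc}")
```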

get_spatially_enabled_dataframe(overture_type, bbox, connect_timeout=None, request_timeout=None)

Retrieve data from Overture Maps as an ArcGIS spatially enabled Pandas DataFrame.

Note

To see available overture types, use overture_to_arcgis.utils.get_all_overture_types().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overture_type` | `str` | Overture feature type to retrieve. | *required* |
| `bbox` | `tuple[float, float, float, float]` | Bounding box to filter the data. Format: `(minx, miny, maxx, maxy)`. | *required* |
| `connect_timeout` | `Optional[int]` | Optional timeout in seconds for establishing a connection to the Overture Maps service. | `None` |
| `request_timeout` | `Optional[int]` | Optional timeout in seconds for waiting for a response from the Overture Maps service. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | A spatially enabled pandas DataFrame containing the requested Overture Maps data. |

Source code in src/overture_to_arcgis/__main__.py
def get_spatially_enabled_dataframe(
    overture_type: str,
    bbox: tuple[float, float, float, float],
    connect_timeout: Optional[int] = None,
    request_timeout: Optional[int] = None,
) -> pd.DataFrame:
    """
    Retrieve data from Overture Maps as an
    [ArcGIS spatially enabled Pandas DataFrame](https://developers.arcgis.com/python/latest/guide/introduction-to-the-spatially-enabled-dataframe/).

    !!! note

        To see available overture types, use `overture_to_arcgis.utils.get_all_overture_types()`.

    Args:
        overture_type: Overture feature type to retrieve.
        bbox: Bounding box to filter the data. Format: (minx, miny, maxx, maxy).
        connect_timeout: Optional timeout in seconds for establishing a connection to the Overture Maps service.
        request_timeout: Optional timeout in seconds for waiting for a response from the Overture Maps service.

    Returns:
        A spatially enabled pandas DataFrame containing the requested Overture Maps data.
    """
    # validate the overture type
    available_types = get_all_overture_types()
    if overture_type not in available_types:
        raise ValueError(
            f"Invalid overture type: {overture_type}. Valid types are: {available_types}"
        )

    # validate the bounding box
    bbox = validate_bounding_box(bbox)

    # get the record batch generator
    batches = get_record_batches(overture_type, bbox, connect_timeout, request_timeout)

    # initialize the dataframe and geometry column name
    df = None

    # iterate the batches
    for idx, batch in enumerate(batches):

        # if the batch has any rows and the dataframe is not yet initialized
        if batch.num_rows > 0 and df is None:

            # create the initial dataframe
            df = table_to_spatially_enabled_dataframe(batch)

            # save the geometry column name
            geom_col = df.spatial.name

        elif batch.num_rows > 0:
            # get the batch as a spatially enabled dataframe
            tmp_df = table_to_spatially_enabled_dataframe(batch)

            # append the batch dataframe to the main dataframe
            df = pd.concat([df, tmp_df], ignore_index=True)

    # if data found, perform post processing
    if isinstance(df, pd.DataFrame):
        # set the geometry column using the ArcGIS GeoAccessor to get a Spatially Enabled DataFrame
        df.spatial.set_geometry(geom_col, sr=4326, inplace=True)

        # reset the index so it makes sense after concatenation
        df.reset_index(drop=True, inplace=True)

        # log the number of rows fetched
        df_cnt = df.shape[0]
        logger.debug(
            f"Fetched {df_cnt} rows of '{overture_type}' data from Overture Maps."
        )

    # if no data found, log a warning and create an empty dataframe to return
    else:
        df = pd.DataFrame()
        logger.warning(
            f"No '{overture_type}' data found for the specified bounding box: {bbox}"
        )

    return df
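
A minimal usage sketch, assuming `get_spatially_enabled_dataframe` is importable from the package root (bounding box illustrative):

``` python
from overture_to_arcgis import get_spatially_enabled_dataframe  # assumed import path

# fetch 'building' features as a spatially enabled DataFrame
df = get_spatially_enabled_dataframe(
    overture_type="building",
    bbox=(-122.96, 46.98, -122.82, 47.08),
)
print(f"{df.shape[0]:,} rows, geometry column: {df.spatial.name}")
```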

Module overture_to_arcgis.utils

add_alternate_category_field(features)

Add an 'alternate_category' field to the input features if it does not already exist, and calculate from the 'categories' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_alternate_category_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add an 'alternate_category' field to the input features if it does not already exist, and calculate from
    the 'categories' field.

    Args:
        features: The input feature layer or feature class.
    """
    # check if 'alternate_category' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "alternate_category" not in field_names:
        # add 'alternate_category' field
        arcpy.management.AddField(
            in_table=features,
            field_name="alternate_category",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'alternate_category' field to features.")

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate 'alternate_category' from 'categories' field
    with arcpy.da.UpdateCursor(features, ["categories", "alternate_category"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the categories value and extract alternate category
            categories_value = row[0]

            # set the alternate category if categories_value is valid
            if (
                categories_value is not None
                and isinstance(categories_value, str)
                and len(categories_value) > 0
                and not categories_value.strip() == "None"
                and not categories_value.strip().lower() == 'null'
            ):
                # parse the categories value into a dictionary
                categories_dict = json.loads(categories_value)

                # extract the alternate category
                alternate_category = categories_dict.get("alternate")

                # convert to string if it is a list
                if isinstance(alternate_category, list):
                    alternate_category = ", ".join(alternate_category)

                # ensure the alternate category is not some variation of None
                if alternate_category in [None, "None", "none", ""]:
                    alternate_category = None

                # set the alternate category in the row
                row[1] = alternate_category

                # update the row
                update_cursor.updateRow(row)

    return
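
A minimal usage sketch against a hypothetical places feature class that carries the JSON `categories` column:

``` python
from overture_to_arcgis.utils import add_alternate_category_field

# adds (if needed) and populates 'alternate_category' from the 'categories' JSON
add_alternate_category_field(r"C:\data\overture.gdb\places")  # hypothetical path
```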

add_boolean_access_restrictions_fields(features, access_field='access_restrictions')

Add boolean access restriction fields to the input features based on the access_restrictions field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
| `access_field` | `str` | The name of the access restrictions field. | `'access_restrictions'` |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_boolean_access_restrictions_fields(features: Union[str, Path, arcpy._mp.Layer], access_field: str = "access_restrictions") -> None:
    """
    Add boolean access restriction fields to the input features based on the access_restrictions field.

    Args:
        features: The input feature layer or feature class.
        access_field: The name of the access restrictions field.
    """
    # if features is a path, convert to string
    if isinstance(features, Path):
        features = str(features)

    # ensure the features exist
    if not arcpy.Exists(features):
        raise ValueError("Input features do not exist.")

    # first pass to collect all unique keys
    unique_keys = set()
    with arcpy.da.SearchCursor(features, [access_field]) as cursor:
        for row in cursor:
            if row[0] is not None:
                bool_dict = flatten_dict_to_bool_keys(row[0])
                unique_keys.update(bool_dict.keys())

    # create a list of fields to add
    add_fields = sorted([
        [slugify(key), "SHORT"] for key in unique_keys
    ])

    # add fields to feature class
    arcpy.management.AddFields(features, add_fields)

    logger.info('Added boolean access restriction fields to features: ' + ', '.join([f[0] for f in add_fields]))

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # second pass to populate the fields
    field_names = [slugify(key) for key in unique_keys]

    with arcpy.da.UpdateCursor(features, [access_field] + field_names) as cursor:
        for row in cursor:
            bool_dict = {}
            if row[0] is not None:
                bool_dict = flatten_dict_to_bool_keys(row[0])
            for idx, key in enumerate(unique_keys):
                row[idx + 1] = bool_dict.get(key, 0)
            cursor.updateRow(row)

    return
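
A minimal usage sketch, assuming the feature class holds the JSON `access_restrictions` column (path hypothetical):

``` python
from overture_to_arcgis.utils import add_boolean_access_restrictions_fields

# expands access restriction keys into SHORT (0/1) fields
add_boolean_access_restrictions_fields(r"C:\data\overture.gdb\segments")
```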

add_h3_indices(features, resolution=9, h3_field=None)

Add an H3 index field to the input features based on their geometry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
| `resolution` | `int` | The H3 resolution to use for indexing. | `9` |
| `h3_field` | `Optional[str]` | The name of the H3 index field to add. | `None` |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_h3_indices(
    features: Union[str, Path, arcpy._mp.Layer],
    resolution: int = 9,
    h3_field: Optional[str] = None,
) -> None:
    """
    Add an H3 index field to the input features based on their geometry.

    Args:
        features: The input feature layer or feature class.
        resolution: The H3 resolution to use for indexing.
        h3_field: The name of the H3 index field to add.
    """
    if find_spec("h3") is None:
        raise ImportError("The 'h3' library is not installed. Please install it to use this function.")

    import h3

    # validate resolution
    if not isinstance(resolution, int) or not (0 <= resolution <= 15):
        raise ValueError("Invalid H3 resolution. Please choose a resolution between 0 and 15.")

    # if h3_field is None, set to default
    if h3_field is None:
        h3_field = f"h3_{resolution:02d}"

    # check if h3_field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if h3_field not in field_names:
        # add h3_field
        arcpy.management.AddField(
            in_table=features,
            field_name=h3_field,
            field_type="TEXT",
            field_length=20,
        )

        logger.debug(f"Added '{h3_field}' field to features.")

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate H3 indices from geometry
    with arcpy.da.UpdateCursor(features, ['SHAPE@XY', h3_field]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:

            # get the geometry coordinates
            x, y = row[0]

            # get the H3 index for the centroid
            h3_index = h3.latlng_to_cell(y, x, resolution)

            # set the H3 index in the row
            row[1] = h3_index

            # update the row
            update_cursor.updateRow(row)

    return
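
A minimal usage sketch; with the default field name, resolution 9 produces a field named `h3_09` (path hypothetical):

``` python
from overture_to_arcgis.utils import add_h3_indices

# index feature centroids at H3 resolution 9 (requires the 'h3' package)
add_h3_indices(r"C:\data\overture.gdb\places", resolution=9)
```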

add_overture_taxonomy_fields(features, single_category_field=None)

Add 'category_<n>' fields to the input features from the Overture taxonomy, using the category provided for each row. The category for each row can be specified using the single_category_field parameter.

Note

If a single category field is not provided, the function will attempt to read the value for the primary key from string JSON in the categories field, if this field exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[str, Path, Layer]` | The input feature layer or feature class. | *required* |
| `single_category_field` | `Optional[str]` | The field name containing a single category. | `None` |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_overture_taxonomy_fields(features: Union[str, Path, arcpy._mp.Layer], single_category_field: Optional[str] = None) -> None:
    """
    Add 'category_<n>' fields to the input features from the Overture taxonomy, using the category provided for each row.
    The category for each row can be specified using the `single_category_field` parameter.

    !!! note
        If a single category field is not provided, the function will attempt to read the value for the `primary` key from 
        string JSON in the `categories` field, if this field exists.

    Args:
        features: The input feature layer or feature class.
        single_category_field: The field name containing a single category.
    """
    # get a list of existing field names
    field_names = [f.name for f in arcpy.ListFields(features)]

    # if single category not provided, attempt to use the 'categories' field to extract the primary category
    if single_category_field is None:

        # ensure the 'categories' field exists
        if "categories" not in field_names:
            raise ValueError("Field for category extraction, 'categories', does not exist in features.")

        # create a generator to extract categories from the 'categories' field
        categories_gen = (
            json.loads(row[0]).get("primary")
            for row in arcpy.da.SearchCursor(features, ["categories"])
        )

        # root name for the taxonomy fields
        root_name = "primary_category"

    # if single category field is provided
    else:

        # ensure the single category field exists
        if single_category_field not in field_names:
            raise ValueError(f"Provided single category field '{single_category_field}' does not exist in features.")

        # create a generator to extract categories from the single category field
        categories_gen = (
            row[0]
            for row in arcpy.da.SearchCursor(features, [single_category_field])
        )

        # root name for the taxonomy fields
        root_name = slugify(single_category_field)

    # get taxonomy dataframe
    taxonomy_df = get_overture_taxonomy_dataframe()

    # get the max lengths for each category field
    max_lengths = get_overture_taxonomy_category_field_max_lengths(taxonomy_df)

    # set the index to category_code for easier lookup
    taxonomy_df.set_index("category_code", inplace=True)

    # only keep the category columns in the taxonomy dataframe
    taxonomy_df = taxonomy_df.loc[:,[col for col in taxonomy_df.columns if col.startswith("category_")]]

    # replace category in the field names with the root name
    taxonomy_df.columns = [col.replace("category_", f"{root_name}_") for col in taxonomy_df.columns]
    max_lengths = {col.replace("category_", f"{root_name}_"): max_len for col, max_len in max_lengths.items()}

    # iterate through the maximum lengths and add fields to the features
    for col, max_len in max_lengths.items():

        # add the field to the features
        arcpy.management.AddField(
            in_table=features,
            field_name=col,
            field_type="TEXT",
            field_length=max_len,
        )

        logger.info(f"Added field '{col}' with length {max_len} to features.")

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate the category code fields from the categories generator
    with arcpy.da.UpdateCursor(features, list(max_lengths.keys())) as update_cursor:
        # iterate through the rows and categories
        for row, category in zip(update_cursor, categories_gen):

            # set the category fields if category is valid
            if (
                category is not None
                and isinstance(category, str)
                and len(category) > 0
                and not category.strip() == "None"
                and not category.strip().lower() == 'null'
            ):
                # get the taxonomy row for the category
                taxonomy_row = taxonomy_df.loc[category]

                # if a taxonomy row is found, set the category fields
                if not taxonomy_row.empty:

                    # iterate through the category fields and set their values
                    for idx, col in enumerate(max_lengths.keys()):
                        row[idx] = taxonomy_row.loc[col]

                    # update the row
                    update_cursor.updateRow(row)

    return
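
A minimal usage sketch showing both modes (paths hypothetical):

``` python
from overture_to_arcgis.utils import add_overture_taxonomy_fields

# derive primary_category_<n> fields from the JSON 'categories' column
add_overture_taxonomy_fields(r"C:\data\overture.gdb\places")

# or expand an existing single-category field instead
# (assumes 'primary_category' exists, e.g. via add_primary_category_field)
add_overture_taxonomy_fields(
    r"C:\data\overture.gdb\places",
    single_category_field="primary_category",
)
```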

add_primary_category_field(features)

Add a 'primary_category' field to the input features if it does not already exist, and calculate from the 'categories' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_primary_category_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'primary_category' field to the input features if it does not already exist, and calculate from
    the 'categories' field.

    Args:
        features: The input feature layer or feature class.
    """
    # check if 'primary_category' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "primary_category" not in field_names:
        # add 'primary_category' field
        arcpy.management.AddField(
            in_table=features,
            field_name="primary_category",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'primary_category' field to features.")

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate 'primary_category' from 'categories' field
    with arcpy.da.UpdateCursor(features, ["categories", "primary_category"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the categories value and extract primary category
            categories_value = row[0]

            # set the primary category if categories_value is valid
            if (
                categories_value is not None
                and isinstance(categories_value, str)
                and len(categories_value) > 0
                and not categories_value.strip() == "None"
                and not categories_value.strip().lower() == 'null'
            ):
                # parse the categories value into a dictionary
                categories_dict = json.loads(categories_value)

                # extract the primary category
                primary_category = categories_dict.get("primary")

                # ensure the primary category is not some variation of None
                if primary_category in [None, "None", "none", ""]:
                    primary_category = None

                # set the primary category in the row
                row[1] = primary_category

                # update the row
                update_cursor.updateRow(row)

    return

add_primary_name(features)

Add a 'primary_name' field to the input features if it does not already exist, and calculate from the 'names' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_primary_name(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'primary_name' field to the input features if it does not already exist, and calculate from
    the 'names' field.
    Args:
        features: The input feature layer or feature class.
    """
    # check if 'primary_name' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "primary_name" not in field_names:
        # add 'primary_name' field
        arcpy.management.AddField(
            in_table=features,
            field_name="primary_name",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'primary_name' field to features.")

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate 'primary_name' from 'names' field
    with arcpy.da.UpdateCursor(features, ["names", "primary_name"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the name value and extract primary name
            name_str = row[0]

            # set the primary name if name_value is populated
            if (
                name_str is not None
                and isinstance(name_str, str)
                and len(name_str) > 0
                and not name_str.strip() == "None"
                and not name_str.strip().lower() == 'null'
            ):
                # parse the name value into a dictionary
                name_dict = json.loads(name_str)

                # extract the primary name
                primary_name = name_dict.get("primary")

                # set the primary name in the row
                row[1] = primary_name

                # update the row
                update_cursor.updateRow(row)

                logger.debug(f"Set 'primary_name' to '{primary_name}' for feature.")

    return

add_trail_field(features)

Add a 'trail' boolean field to the input features if it does not already exist. Features are flagged as trails when their 'class' value is one of 'track', 'path', 'footway', 'trail', or 'cycleway'.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_trail_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'trail' boolean field to the input features if it does not already exist. Features are flagged
    as trails when their 'class' value is one of 'track', 'path', 'footway', 'trail', or 'cycleway'.

    Args:
        features: The input feature layer or feature class.
    """
    # check if 'trail' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "trail" not in field_names:
        # add 'trail' field
        arcpy.management.AddField(
            in_table=features,
            field_name="trail",
            field_type="SHORT",
        )

        logger.debug("Added 'trail' field to features.")

    # list of classes to search for
    trail_classes = ["track", "path", "footway", "trail", "cycleway"]

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate 'trail' from the 'class' field
    with arcpy.da.UpdateCursor(features, ["class", "trail"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the class value
            class_value = row[0]

            # set the trail field if class_value is one of trail classes
            if class_value in trail_classes:
                # set the trail field in the row
                row[1] = 1

                # update the row
                update_cursor.updateRow(row)

    return

add_website_field(features)

Add a 'website' field to the input features if it does not already exist, and calculate from the 'contact_info' field.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
Source code in src/overture_to_arcgis/utils/_arcgis.py
def add_website_field(features: Union[arcpy._mp.Layer, str, Path]) -> None:
    """
    Add a 'website' field to the input features if it does not already exist, and calculate from
    the 'contact_info' field.

    Args:
        features: The input feature layer or feature class.
    """
    # check if 'website' field exists
    field_names = [f.name for f in arcpy.ListFields(features)]
    if "website" not in field_names:
        # add 'website' field
        arcpy.management.AddField(
            in_table=features,
            field_name="website",
            field_type="TEXT",
            field_length=255,
        )

        logger.debug("Added 'website' field to features.")

    # ensure schema lock is released by forcing garbage collection
    gc.collect()

    # calculate 'website' from 'websites' field
    with arcpy.da.UpdateCursor(features, ["websites", "website"]) as update_cursor:
        # iterate through the rows
        for row in update_cursor:
            # get the websites value and extract website
            website_value = row[0]

            # set the website if website_value is valid
            if (
                website_value is not None
                and isinstance(website_value, str)
                and len(website_value) > 0
                and not website_value.strip() == "None"
                and not website_value.strip().lower() == 'null'
            ):
                # parse the website value into a list
                website_lst = json.loads(website_value)

                # extract the first website from the list
                if isinstance(website_lst, list) and len(website_lst) > 0:
                    website = website_lst[0]

                    # only use the website if it is less than 255 characters
                    if isinstance(website, str) and website.lower().strip() != "none" and 0 < len(website) <= 255:
                        row[1] = website

                        # update the row
                        update_cursor.updateRow(row)

                    else:
                        logger.warning(
                            f"Website value is invalid or exceeds 255 characters and will not be set for the feature: '{website}'"
                        )

    return

get_all_overture_types(release=None, s3=None)

Returns a list of all available Overture dataset types for a given release.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `release` | `Optional[str]` | Optional release version. If not provided, the most current release will be used. | `None` |
| `s3` | `Optional[S3FileSystem]` | Optional pre-configured S3 filesystem. If not provided, an anonymous S3 filesystem will be created. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | List of available overture types for the release. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_all_overture_types(
    release: Optional[str] = None, s3: Optional[fs.S3FileSystem] = None
) -> list[str]:
    """
    Returns a list of all available Overture dataset types for a given release.

    Args:
        release: Optional release version. If not provided, the most current
            release will be used.
        s3: Optional pre-configured S3 filesystem. If not provided, an anonymous
            S3 filesystem will be created.

    Returns:
        List of available overture types for the release.
    """
    # if no release provided, get the most current one
    if release is None:
        release = get_current_release()

    # get the type theme map
    type_theme_map = get_type_theme_map(release=release, s3=s3)

    # get the types from the mapping
    types = list(type_theme_map.keys())

    logger.debug(f"Available types for release {release}: {types}")

    return types
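
A quick sketch for discovering valid `overture_type` values:

``` python
from overture_to_arcgis.utils import get_all_overture_types

# list feature types available in the most current release
for overture_type in get_all_overture_types():
    print(overture_type)
```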

get_current_release()

Returns the most current Overture dataset release string.

Returns:

| Type | Description |
| --- | --- |
| `str` | Most current release string. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_current_release() -> str:
    """
    Returns the most current Overture dataset release string.

    Returns:
        Most current release string.
    """
    # retrieve the list of releases
    releases = get_release_list()

    # make sure there is at least one release
    if not releases:
        raise RuntimeError("No Overture dataset releases found.")

    # get the most current release by sorting the list
    current_release = sorted(releases)[-1]

    logger.debug(f"Current release: {current_release}")

    return current_release

get_geometry_column(table)

Get the name of the geometry column from the PyArrow Table or RecordBatch metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Union[Table, RecordBatch]` | PyArrow Table or RecordBatch with GeoArrow metadata. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | Name of the geometry column. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_geometry_column(table: Union[pa.Table, pa.RecordBatch]) -> str:
    """
    Get the name of the geometry column from the PyArrow Table or RecordBatch metadata.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.

    Returns:
        Name of the geometry column.
    """
    geo_meta = table.schema.metadata.get(b"geo")
    if geo_meta is None:
        raise ValueError("No geometry metadata found in the Overture Maps data.")
    geo_meta = json.loads(geo_meta.decode("utf-8"))
    geom_col = geo_meta.get("primary_column")
    if geom_col is None or geom_col not in table.column_names:
        raise ValueError(
            "No valid primary_geometry column defined in the Overture Maps metadata."
        )
    return geom_col
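
A small sketch using a toy table that carries GeoParquet-style `geo` metadata (the table contents are hypothetical):

``` python
import json

import pyarrow as pa

from overture_to_arcgis.utils import get_geometry_column  # assumed import path

# build a schema whose metadata names 'geometry' as the primary geometry column
schema = pa.schema(
    [pa.field("id", pa.string()), pa.field("geometry", pa.binary())],
    metadata={b"geo": json.dumps({"primary_column": "geometry"}).encode("utf-8")},
)
table = pa.table({"id": ["a"], "geometry": [b"\x01"]}, schema=schema)

print(get_geometry_column(table))  # geometry
```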

get_layers_for_unique_values(input_features, field_name, arcgis_map=None)

Create layers from unique values in a specified field of the input features.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input_features` | `Union[Layer, str, Path]` | The input feature layer or feature class. | *required* |
| `field_name` | `str` | The field name to get unique values from. | *required* |
| `arcgis_map` | `Optional[Map]` | The ArcGIS map object to add the layers to. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[Layer]` | A list of ArcGIS layers created from the unique values. |

Source code in src/overture_to_arcgis/utils/_arcgis.py
def get_layers_for_unique_values(
    input_features: Union[arcpy._mp.Layer, str, Path],
    field_name: str,
    arcgis_map: Optional[arcpy._mp.Map] = None,
) -> list[arcpy._mp.Layer]:
    """
    Create layers from unique values in a specified field of the input features.

    Args:
        input_features: The input feature layer or feature class.
        field_name: The field name to get unique values from.
        arcgis_map: The ArcGIS map object to add the layers to.

    Returns:
        A list of ArcGIS layers created from the unique values.
    """
    # get unique values using a search cursor to generate value into a set
    unique_values = set(
        (val[0] for val in arcpy.da.SearchCursor(input_features, [field_name]))
    )

    # list to hydrate with created layers
    layers = []

    # iterate unique values
    for value in unique_values:
        # create layer name
        layer_name = f"{field_name}_{value}"

        # create definition query
        definition_query = (
            f"{field_name} = '{value}'"
            if isinstance(value, str)
            else f"{field_name} = {value}"
        )

        # use definition query to create layer object
        layer = arcpy.management.MakeFeatureLayer(
            in_features=input_features,
            out_layer=layer_name,
            where_clause=definition_query,
        )[0]

        # if the map is provided, add the layer to the map
        if arcgis_map:
            arcgis_map.addLayer(layer)
        layers.append(layer)

    return layers
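
A minimal sketch creating one layer per unique `class` value (path hypothetical):

``` python
from overture_to_arcgis.utils import get_layers_for_unique_values

# e.g. yields layers named class_footway, class_track, ...
layers = get_layers_for_unique_values(r"C:\data\overture.gdb\segments", "class")
for layer in layers:
    print(layer.name)
```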

get_logger(level='INFO', logger_name=None, logfile_path=None, log_format='%(asctime)s | %(name)s | %(levelname)s | %(message)s', propagate=True, add_stream_handler=True, add_arcpy_handler=False)

Get a Python Logger configured to provide stream, file or, if available, ArcPy output. The way the method is set up, logging will be routed through ArcPy messaging using ArcpyHandler if ArcPy is available. If ArcPy is not available, messages will be sent to the console using a StreamHandler. Next, if the logfile_path is provided, log messages will also be written to a logfile at the provided path using a FileHandler.

Valid log_level inputs include:

* DEBUG - Detailed information, typically of interest only when diagnosing problems.
* INFO - Confirmation that things are working as expected.
* WARNING or WARN - An indication that something unexpected happened, or indicative of some problem in the near future (e.g. "disk space low"). The software is still working as expected.
* ERROR - Due to a more serious problem, the software has not been able to perform some function.
* CRITICAL - A serious error, indicating that the program itself may be unable to continue running.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `level` | `Optional[Union[str, int]]` | Logging level to use. Default is 'INFO'. | `'INFO'` |
| `logger_name` | `Optional[str]` | Name of the logger. If None, the root logger is used. | `None` |
| `log_format` | `Optional[str]` | Format string for the logging messages. | `'%(asctime)s \| %(name)s \| %(levelname)s \| %(message)s'` |
| `propagate` | `bool` | If True, log messages are passed to the handlers of ancestor loggers. Default is True. | `True` |
| `logfile_path` | `Union[Path, str]` | Where to save the logfile if file output is desired. | `None` |
| `add_stream_handler` | `bool` | If True, add a StreamHandler to route logging to the console. Default is True. | `True` |
| `add_arcpy_handler` | `bool` | If True and ArcPy is available, add the ArcpyHandler to route logging through ArcPy messaging. Default is False. | `False` |
``` python
logger = get_logger('DEBUG')
logger.debug('nauseatingly detailed debugging message')
logger.info('something actually useful to know')
logger.warning('The sky may be falling')
logger.error('The sky is falling.')
logger.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
```
Source code in src/overture_to_arcgis/utils/_logging.py
def get_logger(
    level: Optional[Union[str, int]] = "INFO",
    logger_name: Optional[str] = None,
    logfile_path: Union[Path, str] = None,
    log_format: Optional[str] = "%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    propagate: bool = True,
    add_stream_handler: bool = True,
    add_arcpy_handler: bool = False,
) -> logging.Logger:
    """
    Get Python :class:`Logger<logging.Logger>` configured to provide stream, file or, if available, ArcPy output.
    The way the method is set up, logging will be routed through ArcPy messaging using :class:`ArcpyHandler` if
    ArcPy is available. If ArcPy is *not* available, messages will be sent to the console using a
    :class:`StreamHandler<logging.StreamHandler>`. Next, if the `logfile_path` is provided, log messages will also
    be written to the provided path to a logfile using a :class:`FileHandler<logging.FileHandler>`.

    Valid `log_level` inputs include:
    * `DEBUG` - Detailed information, typically of interest only when diagnosing problems.
    * `INFO` - Confirmation that things are working as expected.
    * `WARNING` or `WARN` - An indication that something unexpected happened, or indicative of some problem in the
        near future (e.g. "disk space low"). The software is still working as expected.
    * `ERROR` - Due to a more serious problem, the software has not been able to perform some function.
    * `CRITICAL` - A serious error, indicating that the program itself may be unable to continue running.

    Args:
        level: Logging level to use. Default is `'INFO'`.
        logger_name: Name of the logger. If `None`, the root logger is used.
        log_format: Format string for the logging messages. Default is `'%(asctime)s | %(name)s | %(levelname)s | %(message)s'`.
        propagate: If `True`, log messages are passed to the handlers of ancestor loggers. Default is `True`.
        logfile_path: Where to save the logfile if file output is desired.
        add_stream_handler: If `True`, add a `StreamHandler` to route logging to the console. Default is `True`.
        add_arcpy_handler: If `True` and ArcPy is available, add the `ArcpyHandler` to route logging through
            ArcPy messaging. Default is `False`.

    ``` python
    logger = get_logger('DEBUG')
    logger.debug('nauseatingly detailed debugging message')
    logger.info('something actually useful to know')
    logger.warning('The sky may be falling')
    logger.error('The sky is falling.')
    logger.critical('The sky appears to be falling because a giant meteor is colliding with the earth.')
    ```

    """
    # ensure valid logging level
    log_str_lst = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL", "WARN", "FATAL"]
    log_int_lst = [0, 10, 20, 30, 40, 50]

    if not isinstance(level, (str, int)):
        raise ValueError(
            "You must define a specific logging level for log_level as a string or integer."
        )
    elif isinstance(level, str) and level not in log_str_lst:
        raise ValueError(
            f'The log_level must be one of {log_str_lst}. You provided "{level}".'
        )
    elif isinstance(level, int) and level not in log_int_lst:
        raise ValueError(
            f"If providing an integer for log_level, it must be one of the following, {log_int_lst}."
        )

    # get default logger and set logging level at the same time
    logger = logging.getLogger(logger_name)
    logger.setLevel(level=level)

    # clear handlers
    logger.handlers.clear()

    # configure formatting
    log_frmt = logging.Formatter(log_format)

    # set propagation
    logger.propagate = propagate

    # make sure at least a stream handler is present
    if add_stream_handler:
        # create and add the stream handler
        sh = logging.StreamHandler()
        sh.setFormatter(log_frmt)
        logger.addHandler(sh)

    # if in an environment with ArcPy, add handler to bubble logging up to ArcGIS through ArcPy
    if add_arcpy_handler:
        ah = ArcpyHandler()
        ah.setFormatter(log_frmt)
        logger.addHandler(ah)

    # if a path for the logfile is provided, log results to the file
    if logfile_path is not None:
        # coerce to a Path object so a string input also works
        logfile_path = Path(logfile_path)

        # ensure the full path exists
        if not logfile_path.parent.exists():
            logfile_path.parent.mkdir(parents=True)

        # create and add the file handler
        fh = logging.FileHandler(str(logfile_path))
        fh.setFormatter(log_frmt)
        logger.addHandler(fh)

    return logger
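
A short sketch adding file output on top of the console handler (path hypothetical, import path assumed):

``` python
from pathlib import Path

from overture_to_arcgis.utils import get_logger

# console plus logfile output; parent directories are created as needed
logger = get_logger(
    "INFO",
    logger_name="overture_to_arcgis",
    logfile_path=Path("logs/overture.log"),
)
logger.info("something actually useful to know")
```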

get_record_batches(overture_type, bbox=None, connect_timeout=None, request_timeout=None)

Yield pyarrow RecordBatches of Overture Maps data for the desired feature type and bounding box.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `overture_type` | `str` | Overture feature type to load. | *required* |
| `bbox` | `Optional[Tuple[float, float, float, float]]` | Optional bounding box for data fetch (xmin, ymin, xmax, ymax). | `None` |
| `connect_timeout` | `Optional[float]` | Optional connection timeout in seconds. | `None` |
| `request_timeout` | `Optional[float]` | Optional request timeout in seconds. | `None` |

Yields:

| Type | Description |
| --- | --- |
| `RecordBatch` | Record batches with the requested data. |

Source code in src/overture_to_arcgis/utils/__main__.py
def get_record_batches(
    overture_type: str,
    bbox: Optional[Tuple[float, float, float, float]] = None,
    connect_timeout: Optional[float] = None,
    request_timeout: Optional[float] = None,
) -> Generator[pa.RecordBatch, None, None]:
    """
    Yield pyarrow RecordBatches of Overture Maps data for the desired feature type and bounding box.

    Args:
        overture_type: Overture feature type to load.
        bbox: Optional bounding box for data fetch (xmin, ymin, xmax, ymax).
        connect_timeout: Optional connection timeout in seconds.
        request_timeout: Optional request timeout in seconds.

    Yields:
        pa.RecordBatch: Record batches with the requested data.
    """
    # create connection to the S3 filesystem
    s3 = fs.S3FileSystem(
        anonymous=True,
        region="us-west-2",
        connect_timeout=connect_timeout,
        request_timeout=request_timeout,
    )

    # get the overture type to theme mapping
    type_theme_map = get_type_theme_map(s3=s3)

    # validate the overture type
    available_types = type_theme_map.keys()
    if overture_type not in available_types:
        raise ValueError(
            f"Invalid overture type: {overture_type}. Available types are: {list(available_types)}"
        )

    # validate the bounding box coordinates
    bbox = validate_bounding_box(bbox)

    # extract the coordinates from the bounding box and create the filter
    xmin, ymin, xmax, ymax = bbox
    dataset_filter = (
        (pc.field("bbox", "xmin") < xmax)
        & (pc.field("bbox", "xmax") > xmin)
        & (pc.field("bbox", "ymin") < ymax)
        & (pc.field("bbox", "ymax") > ymin)
    )

    # get the most current release version
    release = get_current_release()

    # create the dataset path
    s3_pth = get_dataset_path(overture_type, release)

    # create the PyArrow dataset
    dataset = ds.dataset(s3_pth, filesystem=s3)

    # get the record batches with the extent filter applied
    batches = dataset.to_batches(filter=dataset_filter)

    # iterate through the batches and yield with geoarrow metadata
    for idx, batch in enumerate(batches):
        # if this is the first batch, and it's empty, warn of no data found
        if idx == 0 and batch.num_rows == 0:
            warn(
                f"No '{overture_type}' data found for the specified bounding box: {bbox}"
            )

        # get the geometry field
        geo_fld_idx = batch.schema.get_field_index("geometry")
        geo_fld = batch.schema.field(geo_fld_idx)

        # set the geoarrow metadata on the geometry field
        geoarrow_geo_fld = geo_fld.with_metadata(
            {b"ARROW:extension:name": b"geoarrow.wkb"}
        )

        # create an updated schema with the correct metadata for the geometry field
        geoarrow_schema = batch.schema.set(geo_fld_idx, geoarrow_geo_fld)

        # replace the batch schema with the updated geoarrow schema
        batch = batch.replace_schema_metadata(geoarrow_schema.metadata)

        # yield the batch to the caller
        yield batch
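
A minimal streaming sketch (bounding box illustrative):

``` python
from overture_to_arcgis.utils import get_record_batches

# stream 'segment' batches and tally rows without holding them all in memory
total_rows = 0
for batch in get_record_batches("segment", bbox=(-122.96, 46.98, -122.82, 47.08)):
    total_rows += batch.num_rows
print(f"Fetched {total_rows:,} rows.")
```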

get_release_list(s3=None)

Returns a list of all available Overture dataset releases.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3` | `Optional[S3FileSystem]` | Optional pre-configured S3 filesystem. If not provided, an anonymous S3 filesystem will be created. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | List of available release strings. |
Source code in src/overture_to_arcgis/utils/__main__.py
def get_release_list(s3: Optional[fs.S3FileSystem] = None) -> list[str]:
    """
    Returns a list of all available Overture dataset releases.

    Args:
        s3: Optional pre-configured S3 filesystem. If not provided, an anonymous
            S3 filesystem will be created.

    Returns:
        List of available release strings.
    """
    # create S3 filesystem if not provided
    if s3 is None:
        s3 = fs.S3FileSystem(anonymous=True, region="us-west-2")

    # create fileselector
    selector = fs.FileSelector(
        base_dir="overturemaps-us-west-2/release/", recursive=False
    )

    # get the most current releases from S3 as FileInfo objects
    file_infos = s3.get_file_info(selector)

    # extract the directory names from the FileInfo objects
    directories = [
        info.path for info in file_infos if info.type == fs.FileType.Directory
    ]

    # get the directory names only (last part of the path)
    releases = [dir_path.split("/")[-1] for dir_path in directories]

    # for each release, ensure it has data (a new release may still be loading)
    releases = [rel for rel in releases if len(get_themes(rel, s3)) >= 5]

    logger.debug(f"Available releases: {releases}")

    return releases

get_temp_gdb()

Get a temporary File Geodatabase path.

Source code in src/overture_to_arcgis/utils/__main__.py
def get_temp_gdb() -> Path:
    """Get a temporary File Geodatabase path."""
    tmp_dir = get_temp_dir()
    tmp_gdb = tmp_dir / "tmp_data.gdb"
    if not tmp_gdb.exists():
        if has_arcpy:
            import arcpy

            arcpy.management.CreateFileGDB(str(tmp_dir), tmp_gdb.name)
        else:
            raise EnvironmentError("arcpy is required to create a File Geodatabase.")
    return tmp_gdb

table_to_features(table, output_features)

Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Feature Class.

Parameters:

Name Type Description Default
table Union[Table, RecordBatch]

PyArrow Table or RecordBatch with GeoArrow metadata.

required

Returns:

| Type | Description |
| --- | --- |
| `Path` | Path to the created feature class. |

Source code in src/overture_to_arcgis/utils/__main__.py
def table_to_features(
    table: Union[pa.Table, pa.RecordBatch], output_features: Union[str, Path]
) -> Path:
    """
    Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Feature Class.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.
        output_features: Path to the output feature class.

    Returns:
        Path to the created feature class.
    """
    # convert the table to a spatially enabled dataframe
    df = table_to_spatially_enabled_dataframe(table)

    # save the dataframe to a feature class
    df.spatial.to_featureclass(output_features)

    return output_features
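
A minimal sketch converting the first non-empty batch (paths and bounding box hypothetical):

``` python
from overture_to_arcgis.utils import get_record_batches, table_to_features

# write the first non-empty record batch to a feature class
for batch in get_record_batches("place", bbox=(-122.96, 46.98, -122.82, 47.08)):
    if batch.num_rows > 0:
        table_to_features(batch, r"C:\data\overture.gdb\places_first_batch")
        break
```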

table_to_spatially_enabled_dataframe(table)

Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Spatially Enabled DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `table` | `Union[Table, RecordBatch]` | PyArrow Table or RecordBatch with GeoArrow metadata. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | ArcGIS Spatially Enabled DataFrame. |

Source code in src/overture_to_arcgis/utils/__main__.py
def table_to_spatially_enabled_dataframe(
    table: Union[pa.Table, pa.RecordBatch]
) -> pd.DataFrame:
    """
    Convert a PyArrow Table or RecordBatch with GeoArrow metadata to an ArcGIS Spatially Enabled DataFrame.

    Args:
        table: PyArrow Table or RecordBatch with GeoArrow metadata.

    Returns:
        ArcGIS Spatially Enabled DataFrame.
    """
    # clean up any complex columns
    smpl_table = convert_complex_columns_to_strings(table)

    # convert table to a pandas DataFrame
    df = smpl_table.to_pandas()

    # get the geometry column from the metadata using the helper function
    geom_col = get_geometry_column(table)

    # convert the geometry column from WKB to arcgis Geometry objects
    df[geom_col] = convert_wkb_column_to_arcgis_geometry(df[geom_col])

    # set the geometry column using the ArcGIS GeoAccessor to get a Spatially Enabled DataFrame
    df.spatial.set_geometry(geom_col, sr=4326, inplace=True)

    return df

validate_bounding_box(bbox)

Validate the bounding box coordinates.

Source code in src/overture_to_arcgis/utils/__main__.py
def validate_bounding_box(
    bbox: tuple[float, float, float, float]
) -> tuple[float, float, float, float]:
    """Validate the bounding box coordinates."""
    # ensure four numeric values are provided
    if len(bbox) != 4:
        raise ValueError(
            "Bounding box must be a tuple of four values: (minx, miny, maxx, maxy)."
        )

    # ensure all coordinates are numeric, and if so convert to float
    if not all(isinstance(coord, (int, float)) for coord in bbox):
        raise ValueError(
            "All coordinates in the bounding box must be numeric (int or float)."
        )
    else:
        bbox = tuple(float(coord) for coord in bbox)

    # ensure minx < maxx and miny < maxy
    if bbox[0] >= bbox[2] or bbox[1] >= bbox[3]:
        raise ValueError(
            "Invalid bounding box coordinates: ensure that minx < maxx and miny < maxy."
        )

    # ensure coordinates are within valid ranges
    if not (
        -180.0 <= bbox[0] <= 180.0
        and -90.0 <= bbox[1] <= 90.0
        and -180.0 <= bbox[2] <= 180.0
        and -90.0 <= bbox[3] <= 90.0
    ):
        raise ValueError(
            "Bounding box coordinates must be within valid ranges: minx/maxx [-180, 180], miny/maxy [-90, 90]."
        )

    # If all checks pass, the bounding box is valid
    return bbox
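
A quick sketch of the validation behavior:

``` python
from overture_to_arcgis.utils import validate_bounding_box

# valid input: integer coordinates are coerced to floats
print(validate_bounding_box((-122, 46, -121, 47)))  # (-122.0, 46.0, -121.0, 47.0)

# invalid input: minx >= maxx raises ValueError
try:
    validate_bounding_box((-121.0, 46.0, -122.0, 47.0))
except ValueError as err:
    print(err)
```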