Skip to content

API

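All of the data conversion functionality for ArcPy Parquet is in a Python package, `arcpy_parquet`. The package provides two primary functions, `feature_class_to_parquet` for export and `parquet_to_feature_class` for import, plus a helper, `create_schema_file`, that generates a CSV schema file for use during import.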

create_schema_file(input_dataset, output_schema_file)

Create a CSV Schema file for use with parquet_to_feature_class.

Parameters:

Name Type Description Default
input_dataset Path

Path to Feature Class or Parquet dataset with schema to describe.

required
output_schema_file Path

Path where the CSV schema file will be stored.

required
Source code in src/arcpy_parquet/__main__.py
# map ``arcpy.ListFields`` field type names onto the keywords ``arcpy.management.AddField`` expects
_ADD_FIELD_TYPE_KEYWORDS = {
    "BigInteger": "BIGINTEGER",
    "Blob": "BLOB",
    "Date": "DATE",
    "DateOnly": "DATEONLY",
    "Double": "DOUBLE",
    "GlobalID": "GUID",
    "Guid": "GUID",
    "Integer": "LONG",
    "Raster": "RASTER",
    "Single": "FLOAT",
    "SmallInteger": "SHORT",
    "String": "TEXT",
    "TimeOnly": "TIMEONLY",
    "TimestampOffset": "TIMESTAMPOFFSET",
}

# arcpy data types whose schema is read with ``arcpy.ListFields`` instead of PyArrow
_ARCPY_SCHEMA_DATA_TYPES = ("FeatureClass", "ShapeFile", "Table")


def create_schema_file(input_dataset: Path, output_schema_file: Path) -> Path:
    """
    Create a CSV Schema file for use with ``parquet_to_feature_class``.

    Each CSV row describes one field. The columns are named after the
    ``arcpy.management.AddField`` parameters (``field_name``, ``field_type``,
    ``field_precision``, ...). Values are written as AddField keywords (e.g.
    ``TEXT``, ``NULLABLE``, ``NON_REQUIRED``).

    Args:
        input_dataset: Path to Feature Class (or table) or Parquet dataset with schema to describe.
        output_schema_file: Path where the CSV schema file will be stored.

    Returns:
        The ``output_schema_file`` that was written.

    Raises:
        ValueError: If the input is read as Parquet and its schema has no columns.
    """
    # columns to include in the schema file (named after the AddField parameters)
    param_keys = [
        "field_name",
        "field_type",
        "field_precision",
        "field_scale",
        "field_length",
        "field_alias",
        "field_is_nullable",
        "field_is_required",
        "field_domain",
    ]

    # arcpy.da.Describe reports the data type under ``dataType``; ``type`` is kept as a fallback
    desc = arcpy.da.Describe(str(input_dataset))
    data_type = desc.get("dataType") or desc.get("type")

    # if a feature class (or table), read the field properties using arcpy
    if data_type in _ARCPY_SCHEMA_DATA_TYPES:
        fld_lst = [
            fld
            for fld in arcpy.ListFields(str(input_dataset))
            if fld.type not in ("OID", "Geometry")
        ]

        # translate field properties into AddField keyword values
        prop_lst = [
            [
                fld.name,
                _ADD_FIELD_TYPE_KEYWORDS.get(fld.type, fld.type),
                fld.precision,
                fld.scale,
                fld.length,
                fld.aliasName,
                "NULLABLE" if fld.isNullable else "NON_NULLABLE",
                "REQUIRED" if fld.required else "NON_REQUIRED",
                fld.domain,
            ]
            for fld in fld_lst
        ]

        # create a dataframe to make creating the CSV easier
        schema_df = pd.DataFrame(prop_lst, columns=param_keys)

    # otherwise, treat the input as a parquet dataset and read using pyarrow
    else:
        pqt_ds = pq.ParquetDataset(input_dataset, use_legacy_dataset=False)

        # (string column type, column name) pairs; dictionary-encoded columns report their value type
        col_pairs = [
            (str(c.type.value_type), c.name)
            if isinstance(c.type, pa.DictionaryType)
            else (str(c.type), c.name)
            for c in pqt_ds.schema
        ]

        # zip(*[]) cannot be unpacked into two names, so fail with a clear message instead
        if not col_pairs:
            raise ValueError(
                f"The Parquet dataset {input_dataset} does not appear to contain any columns."
            )

        col_typ_lst, attr_alias_lst = zip(*col_pairs)

        # prepend any column names starting with a number with a 'c' and save as the field names
        attr_nm_lst = [f"c{c}" if c[0].isdigit() else c for c in attr_alias_lst]

        # use these to map to esri field types
        fld_typ_lst = [import_dtype_dict[typ] for typ in col_typ_lst]

        # get the maximum string lengths from the parquet data for use in field creation
        schema_dict = get_parquet_max_string_lengths(pqt_ds)

        # create a dataframe to make creating the CSV easier
        schema_df = pd.DataFrame(
            [
                [
                    nm,
                    typ,
                    None,
                    None,
                    schema_dict.get(nm, 512) if typ == "TEXT" else None,
                    alias,
                    "NULLABLE",
                    "NON_REQUIRED",
                    None,
                ]
                for nm, alias, typ in zip(attr_nm_lst, attr_alias_lst, fld_typ_lst)
            ],
            columns=param_keys,
        )

        # ensure field lengths are integers (nullable so non-text fields stay empty)
        schema_df["field_length"] = schema_df["field_length"].astype(pd.Int64Dtype())

    # write to a file
    schema_df.to_csv(output_schema_file, encoding="utf-8", index=False)

    return output_schema_file

feature_class_to_parquet(input_table, output_parquet, partition_columns=None, include_geometry=True, geometry_format='WKB', batch_size=300000)

Export a Feature Class to Parquet with options for exporting to Geoparquet.

Parameters:

Name Type Description Default
input_table Path

Path to feature class or table.

required
output_parquet Path

Path to where the output Parquet file will be saved.

required
partition_columns Optional[Union[list[str], str]]

List of columns to use for partitioning the output Parquet dataset.

None
include_geometry bool

Whether to include the geometry in the output Parquet dataset.

True
geometry_format Literal['WKB']

If including the geometry, the format to write it in: either XY (point coordinates written to `geometry_X` and `geometry_Y` columns) or WKB (a `wkb` column with GeoParquet metadata). Default is WKB.

'WKB'
batch_size int

Count of records per part-*.parquet file.

300000
Source code in src/arcpy_parquet/__main__.py
def _expand_bbox(bbox: list, extent) -> list:
    """
    Grow a ``[x_min, y_min, x_max, y_max]`` bounding box to include ``extent``.

    ``extent`` is an object exposing ``XMin``, ``YMin``, ``XMax`` and ``YMax`` (e.g. an
    ``arcpy.Extent``). Entries of ``bbox`` that are still ``None`` are replaced by the
    extent's values.
    """
    x_min, y_min, x_max, y_max = bbox
    return [
        extent.XMin if x_min is None or extent.XMin < x_min else x_min,
        extent.YMin if y_min is None or extent.YMin < y_min else y_min,
        extent.XMax if x_max is None or extent.XMax > x_max else x_max,
        extent.YMax if y_max is None or extent.YMax > y_max else y_max,
    ]


def _write_parquet_part(
    pa_dict: dict,
    pq_schema: "pa.Schema",
    output_parquet: Path,
    partition_columns: list,
    part_number: int,
    geo_header: Optional[dict] = None,
) -> None:
    """
    Write one batch of accumulated column values to ``output_parquet``.

    Args:
        pa_dict: Column name to list of values for the rows in this batch.
        pq_schema: PyArrow schema describing the output columns.
        output_parquet: Directory the part file (or partitioned dataset) is written into.
        partition_columns: Columns to partition by; when empty a single ``part-*.parquet``
            file is written.
        part_number: Index included (zero padded) in the part file name.
        geo_header: Optional GeoParquet header stored as the ``geo`` schema metadata.
    """
    if geo_header is not None:
        pq_schema = pq_schema.with_metadata(
            {b"geo": str.encode(json.dumps(geo_header), encoding="utf-8")}
        )

    # create a PyArrow table object instance from the accumulated dictionary
    pa_tbl = pa.Table.from_pydict(pa_dict, pq_schema)

    # write out the part, optionally partitioned
    if len(partition_columns) > 0:
        pq.write_to_dataset(
            table=pa_tbl,
            root_path=output_parquet,
            partition_cols=partition_columns,
            version="2.0",
            compression="snappy",
        )
    else:
        part_nm = f"part-{uuid.uuid4().hex}-{part_number:05d}.snappy.parquet"
        pq.write_table(
            table=pa_tbl,
            where=str(output_parquet / part_nm),
            version="2.0",
            flavor="spark_session",
            compression="snappy",
        )


def feature_class_to_parquet(
    input_table: Path,
    output_parquet: Path,
    partition_columns: Optional[Union[list[str], str]] = None,
    include_geometry: bool = True,
    geometry_format: Literal["WKB", "XY"] = "WKB",
    batch_size: int = 300000,
) -> Path:
    """
    Export a Feature Class to Parquet with options for exporting to Geoparquet.

    Rows are read with an ``arcpy.da.SearchCursor`` and written in parts of ``batch_size``
    records. When geometry is written as ``WKB``, each part carries GeoParquet ``geo``
    metadata that includes the bounding box of the features in that part. Inputs without a
    shape field (tables) are exported without geometry regardless of ``include_geometry``.

    Args:
        input_table: Path to feature class or table.
        output_parquet: Path to where the output Parquet file will be saved.
        partition_columns: List of columns to use for partitioning the output Parquet dataset.
        include_geometry: Whether to include the geometry in the output Parquet dataset.
        geometry_format: If including the geometry, what format the geometry should be in, either
            ``XY`` (``geometry_X``/``geometry_Y`` columns), or ``WKB``. Default is ``WKB``.
        batch_size: Count of records per ``part-*.parquet`` file.

    Returns:
        The ``output_parquet`` directory as a ``Path``.

    Raises:
        ValueError: If the input does not exist, ``batch_size`` is not positive,
            ``geometry_format`` is unsupported, or a partition column is missing.
        NotImplementedError: If ``geometry_format`` is ``GEOJSON`` or ``JSON``.
    """
    # downgrade path so arcpy can use strings, and accept string output paths
    fc_pth = str(input_table)
    output_parquet = Path(output_parquet)

    # ensure source exists
    if not arcpy.Exists(fc_pth):
        raise ValueError(f"The input path {input_table} does not appear to exist.")

    # a non-positive batch size would never (or always) trigger a write
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer.")

    # check the geometry output type
    geometry_format = geometry_format.upper()

    # TODO: add support for GeoJSON geometry format
    if geometry_format in ("GEOJSON", "JSON"):
        raise NotImplementedError(
            "Support for encoding Geoparquet geometry as GeoJSON is not yet implemented."
        )

    if geometry_format not in ("XY", "WKB"):
        raise ValueError('geometry_format must be one of "XY" or "WKB".')

    # make sure the full output path exists where the data will be saved
    output_parquet.mkdir(parents=True, exist_ok=True)

    # describe the data to access properties for validation
    desc = arcpy.da.Describe(fc_pth)

    # fields to be excluded
    exclude_fld_typ = ["Raster"]
    exclude_fld_lst = []

    if desc["hasOID"]:
        exclude_fld_lst.append(desc["OIDFieldName"])

    for shp_fld_key in ("lengthFieldName", "areaFieldName"):
        shp_fld = desc.get(shp_fld_key)
        if shp_fld is not None:
            exclude_fld_lst.append(shp_fld)

    # get a list of input fields to use with the search cursor
    sc_col_lst = [
        f.name
        for f in desc["fields"]
        if f.name not in exclude_fld_lst and f.type not in exclude_fld_typ
    ]

    # normalize partition columns to a list
    if partition_columns is None:
        partition_columns = []
    if isinstance(partition_columns, str):
        partition_columns = [partition_columns]

    # if partitioning, ensure the columns are in the input data
    missing_prt_cols = [p for p in partition_columns if p not in sc_col_lst]
    if len(missing_prt_cols) > 0:
        raise ValueError(
            f"The following partition columns do not appear to be in the input data: {', '.join(missing_prt_cols)}"
        )

    # output schema for attribute columns; geometry is handled explicitly below
    pa_fld_typ_xcld = exclude_fld_typ + ["Geometry"]
    pq_schema = pa.schema(
        [
            (f.name, export_dtype_dict.get(f.type, pa.string()))
            for f in desc["fields"]
            if f.name not in exclude_fld_lst and f.type not in pa_fld_typ_xcld
        ]
    )

    shape_fld = desc.get("shapeFieldName")

    # geometry is only exported when requested AND the input has a shape field (tables do not)
    has_geometry = include_geometry and shape_fld is not None

    # WKB output also reads the geometry object (first cursor column) for extents and metadata
    write_geo = has_geometry and geometry_format != "XY"

    in_sr, geom_typ = None, None

    # the raw shape field is never written directly; it is re-added in the requested format
    if shape_fld is not None:
        sc_col_lst.remove(shape_fld)

    if has_geometry and geometry_format == "XY":
        # add X and Y columns to search cursor list and the output schema
        for prt in ("X", "Y"):
            sc_col_lst.append(f"{shape_fld}@{prt}")
            pq_schema = pq_schema.append(pa.field(f"geometry_{prt}", pa.float64()))

    elif write_geo:
        # add the encoded geometry to the cursor and output schema
        sc_col_lst.append(f"{shape_fld}@{geometry_format}")
        pq_schema = pq_schema.append(pa.field(geometry_format.lower(), pa.binary()))

        # spatial reference and geometry type for the geoparquet header
        in_sr = desc.get("spatialReference")
        geom_typ = desc.get("shapeType")

        # geometry object first so its extent can be read; it is dropped before schema matching
        sc_col_lst.insert(0, f"{shape_fld}@")

    # get values from the data to track progress
    max_range = int(arcpy.management.GetCount(fc_pth)[0])
    rep_range = max(1, max_range // 100)

    # report progress
    features_tense = "feature" if max_range == 1 else "features"
    arcpy.AddMessage(f"Starting export of {max_range:,} {features_tense}.")
    arcpy.SetProgressor("step", "Exporting...", 0, max_range, rep_range)

    def _geo_header(bbox: list) -> Optional[dict]:
        """Build the GeoParquet header for one part, or ``None`` when not writing GeoParquet."""
        if not write_geo:
            return None
        return get_geoparquet_header(
            geometry_type=geom_typ,
            encoding=geometry_format,
            spatial_reference=in_sr,
            bounding_box=bbox if None not in bbox else None,
        )

    # turn off auto cancelling since handling in loop; restored afterwards
    prev_auto_cancelling = arcpy.env.autoCancelling
    arcpy.env.autoCancelling = False

    try:
        with arcpy.da.SearchCursor(fc_pth, sc_col_lst) as search_cur:
            prt_num = 0
            pa_dict = {col: [] for col in pq_schema.names}
            bbox = [None, None, None, None]
            pending = 0  # rows accumulated since the last part was written
            cancelled = False

            for idx, row in enumerate(search_cur):
                if write_geo:
                    geom = row[0]
                    if geom is not None and isinstance(geom, arcpy.Geometry):
                        bbox = _expand_bbox(bbox, geom.extent)
                    else:
                        logger.warning(f"Feature at index {idx} has no geometry.")

                    # remove the geometry object so the rest of the columns line up with the schema
                    row = row[1:]

                for col, val in zip(pq_schema.names, row):
                    pa_dict[col].append(val)
                pending += 1

                # at a percent interval, report progress and honor cancellation
                if idx % rep_range == 0:
                    arcpy.SetProgressorPosition(idx)
                    if arcpy.env.isCancelled:
                        cancelled = True
                        break

                # at a batch size (part) interval, write a part and reset accumulators
                if pending >= batch_size:
                    _write_parquet_part(
                        pa_dict,
                        pq_schema,
                        output_parquet,
                        partition_columns,
                        prt_num,
                        _geo_header(bbox),
                    )
                    prt_num += 1
                    pa_dict = {col: [] for col in pq_schema.names}
                    bbox = [None, None, None, None]
                    pending = 0

            # write whatever remains, even if GetCount disagreed with the rows actually read
            if pending > 0 and not cancelled:
                _write_parquet_part(
                    pa_dict,
                    pq_schema,
                    output_parquet,
                    partition_columns,
                    prt_num,
                    _geo_header(bbox),
                )

    finally:
        # reset the progress indicator and the environment even if the export fails
        arcpy.ResetProgressor()
        arcpy.env.autoCancelling = prev_auto_cancelling

    return output_parquet

parquet_to_feature_class(parquet_path, output_feature_class, schema_file=None, geometry_type='GEOPARQUET', parquet_partitions=None, geometry_column='wkb', spatial_reference=4326, sample_count=None, build_spatial_index=True, compact=True)

Convert a properly formatted Parquet source into a Feature Class in a Geodatabase.

Note

Unless the geometry type is COORDINATES or H3, the geometry must be encoded as well-known binary (WKB). Complex field types such as arrays and structs are not supported.

Parameters:

Name Type Description Default
parquet_path Path

The directory or Parquet part file to convert.

required
output_feature_class Path

Where to save the new Feature Class.

required
schema_file Path

CSV file with detailed schema properties.

None
geometry_type Optional[Literal['GEOPARQUET', 'COORDINATES', 'H3']]

GEOPARQUET, COORDINATES, or H3 describing how the geometry is stored (POINT, POLYLINE, or POLYGON may also be given for WKB geometry). Default is GEOPARQUET, which reads the geometry column, geometry type, and spatial reference from the GeoParquet metadata. COORDINATES is a point geometry type described by two coordinate columns. H3 is a column containing H3 indices as hexadecimal strings; polygon geometries are created from the H3 indices for the rows.

'GEOPARQUET'
parquet_partitions Optional[List[str]]

Partition name and values, if available, to select. For instance, if partitioned by country column using ISO2 identifiers, select Mexico using country=mx.

None
geometry_column Union[List[str], str]

Column from the Parquet table containing the geometry encoded as WKB. Default is wkb. When the geometry type is GEOPARQUET, the column is read from the GeoParquet metadata instead. If the geometry type is COORDINATES, this must be an iterable (tuple or list) of the x (longitude) and y (latitude) columns containing the coordinates.

'wkb'
spatial_reference Union[SpatialReference, str, int]

Spatial reference of input data. Default is WGS84 (WKID: 4326).

4326
sample_count Optional[int]

If only wanting to import enough data to understand the schema, specify the count of records with this parameter. If left blank, will import all records.

None
build_spatial_index bool

Whether to build the spatial index after inserting all the data. Default is True.

True
compact bool

If the File Geodatabase should be compacted following import. Default is True.

True
Source code in src/arcpy_parquet/__main__.py
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
def parquet_to_feature_class(
    parquet_path: Path,
    output_feature_class: Path,
    schema_file: Path = None,
    geometry_type: Optional[Literal["GEOPARQUET", "COORDINATES", "H3"]] = "GEOPARQUET",
    parquet_partitions: Optional[List[str]] = None,
    geometry_column: Union[List[str], str] = "wkb",
    spatial_reference: Union[arcpy.SpatialReference, str, int] = 4326,
    sample_count: Optional[int] = None,
    build_spatial_index: bool = True,
    compact: bool = True,
) -> Path:
    """
    Convert a *properly formatted* Parquet source into a Feature Class in a Geodatabase.

    !!! note
        The geometry *must* be encoded as well known binary (WKB), and complex field types
        such as arrays and structs are not supported.

    Args:
        parquet_path: The directory or Parquet part file to convert.
        output_feature_class: Where to save the new Feature Class.
        schema_file: CSV file with detailed schema properties.
        geometry_type: ``POINT``, ``COORDINATES``, ``H3`` ``POLYLINE``, or ``POLYGON`` describing the geometry type.
            Default is ``COORDINATES``. ``COORDINATES``, is a point geometry type described by two coordinate columns.
            ``H3`` is a column containing H3 indices as hexadecimal strings. Polygon geometries will be created based
            on the H3 indices for the rows.
        parquet_partitions: Partition name and values, if available, to select. For instance,
            if partitioned by country column using ISO2 identifiers, select Mexico using
            ``country=mx``.
        geometry_column: Column from parquet table containing the geometry encoded as WKB. Default
            is ``wkb``. If the geometry type is ``COORDINATES``, this must be an iterable (tuple or list) of
            the x (longitude) and y (latitude) columns containing the coordinates.
        spatial_reference: Spatial reference of input data. Default is WGS84 (WKID: 4326).
        sample_count: If only wanting to import enough data to understand the schema, specify
            the count of records with this parameter. If left blank, will import all records.
        build_spatial_index: Optional if desired to build the spatial index once inserting all the data.
            Default is ``False``.
        compact: If the File Geodatabase should be compacted following import. Default is ``True``.
    """
    # if used as a tool in pro, let user know what is going on...kind of
    arcpy.SetProgressorLabel("Warming up the Flux Capacitor...")

    # ensure coordinates are provided if using COORDINATES geometry type
    if geometry_type == "COORDINATES" and not isinstance(
        geometry_column, (tuple, list)
    ):
        raise ValueError(
            "If using COORDINATES as the geometry type, you must provide an iterable (list or tuple) "
            "of the x and y column names for the coordinates."
        )

    # make sure paths...are paths
    parquet_path = (
        parquet_path if isinstance(parquet_path, Path) else Path(parquet_path)
    )
    output_feature_class = (
        output_feature_class
        if isinstance(output_feature_class, Path)
        else Path(output_feature_class)
    )

    # ensure will not encounter unexpected results based on incompatible input parameter or parameter combinations
    if not parquet_path.exists():
        raise ValueError(
            f"Cannot locate the input path {parquet_path}. Please double check to ensure the path is "
            f"correct and reachable."
        )

    elif parquet_path.is_dir():
        # get all the part files to start with
        pqt_prts = [prt for prt in parquet_path.rglob("part-*.parquet")]

        # now, filter based on parts being part of string - enables to specify nested partition
        if isinstance(parquet_partitions, list):
            for partition in parquet_partitions:
                pqt_prts = [prt for prt in pqt_prts if partition in str(prt)]

        # if a list is not provided, throw a fit
        elif parquet_partitions is not None:
            raise ValueError("parquet_partitions must be a list")

        # ensure we have part files still left to work with
        assert len(pqt_prts) > 0, (
            "The provided directory and partitions do not appear to contain any parquet "
            "part files."
        )
    elif parquet_path.is_file() and parquet_partitions is not None or len(parquet_partitions) > 0:
        raise ValueError("If providing a parquet part file, you cannot specify a parquet partition.")

    else:
        raise ValueError(
            "parquet_path must be either a directory for parquet data or a specific part file."
        )

    # create a PyArrow Dataset to read from
    dataset = pq.ParquetDataset(parquet_path, use_legacy_dataset=False)

    # extract the spatial reference from the parquet metadata if using geoparquet
    if geometry_type == "GEOPARQUET":
        # get the geometry information from the parquet metadata if available
        geo_str = dataset.schema.metadata.get(b"geo")

        # if no geometry metadata, raise an error
        if geo_str is None:
            raise ValueError(
                "The input parquet data does not appear to be formatted as Geoparquet. "
                "No geometry metadata was found."
            )

        # get the geometry metadata as a dictionary
        geo_dict = json.loads(geo_str.decode("utf-8"))

        # get the geometry column
        geometry_column = geo_dict.get("primary_column")

        # get the geometry column metadata
        col_meta = geo_dict.get("columns").get(geometry_column)

        # extract the geometry type from the metadata
        geometry_type = col_meta.get("geometry_type").upper()

        # account for line type names difference
        if geometry_type == "LINESTRING":
            geometry_type = "POLYLINE"

        # extract the spatial reference from the metadata if available
        if col_meta.get("crs") is not None:
            spatial_reference = arcpy.SpatialReference(col_meta.get("crs"))

    # create a spatial reference object if needed
    if isinstance(spatial_reference, (int, str)):
        spatial_reference = arcpy.SpatialReference(spatial_reference)

    # slightly change how column names are handled if using coordinates or h3
    if isinstance(geometry_column, (tuple, list)):
        # ensure coordinate columns are in input data
        if (
            geometry_column[0] not in dataset.schema.names
            or geometry_column[1] not in dataset.schema.names
        ):
            raise ValueError(
                f"The geometry_column names provided for the coordinate columns do not appear to be in "
                f"the input parquet columns."
            )

        # get a list of the string column types and field aliases from parquet
        col_typ_lst, attr_alias_lst = zip(
            *[
                (str(c.type.value_type), c.name)
                if isinstance(c.type, pa.DictionaryType)
                else (str(c.type), c.name)
                for c in dataset.schema
                # if c.name not in geometry_column  # geometry columns get dropped
            ]
        )

    else:
        # ensure geometry column is in input data
        if geometry_column not in dataset.schema.names:
            raise ValueError(
                "The geometry_column does not appear to be in the input parquet columns."
            )

        # get a list of the string column types and field aliases from parquet
        col_typ_lst, attr_alias_lst = zip(
            *[
                (str(c.type.value_type), c.name)
                if isinstance(c.type, pa.DictionaryType)
                else (str(c.type), c.name)
                for c in dataset.schema
                if c.name not in geometry_column
            ]
        )

    # prepend any column names starting with a number with an 'c' and save as the field names
    attr_nm_lst = [f"c{c}" if c[0].isdigit() else c for c in attr_alias_lst]

    # use these to map to esri field types
    fld_typ_lst = [import_dtype_dict[typ] for typ in col_typ_lst]

    # check for really strange and uncaught error in naming
    if output_feature_class.name.lower().startswith("delta"):
        raise ValueError('Feature Class name cannot start with "delta".')

    # create the new feature class
    arcpy.management.CreateFeatureclass(
        out_path=str(output_feature_class.parent),
        out_name=output_feature_class.name,
        geometry_type=geom_dict[geometry_type][0],
        spatial_reference=spatial_reference,
        has_m=geom_dict[geometry_type][1],
        has_z=geom_dict[geometry_type][2],
    )

    logger.info(f"Created feature class at {str(output_feature_class)}")

    # if a schema file is not provided read from the input data
    if schema_file is None:
        # get the maximum string lengths from the parquet data for use in field creation
        schema_dict = get_parquet_max_string_lengths(dataset)

    # if a schema file is provided as part of input, load it to a dict using Pandas because it's easy
    else:
        # if a directory for the schema is provided, get the enclosed csv file
        if schema_file.is_dir() and schema_file.stem == "schema":
            # try to get the csv file
            csv_lst = list(schema_file.glob("*.csv"))
            if len(csv_lst) > 0:
                schema_file = csv_lst[0]

        # read the csv into a Pandas DataFrame
        schema_df = pd.read_csv(schema_file)

        # swap out the string types for text so add field works
        schema_df.loc[schema_df["field_type"] == "String", "field_type"] = "TEXT"

        # dump to a dict
        schema_dict = {
            k: v for k, v in zip(schema_df["field_name"], schema_df.to_dict("records"))
        }

    # iteratively add columns using the introspected field name, alias, and type
    for nm, alias, typ in zip(attr_nm_lst, attr_alias_lst, fld_typ_lst):
        # if the field name exists in the dictionary, peel off a single field's properties and add a field using them
        if nm in schema_dict.keys():
            prop_dict = schema_dict.pop(nm)

            # handle if field properties are just a numeric string length introspected from parquet metadata
            if isinstance(prop_dict, int):
                prop_dict = dict(
                    field_name=nm,
                    field_type=typ,
                    field_length=int(prop_dict),
                    field_alias=alias,
                    field_is_nullable="NULLABLE",
                )

            arcpy.management.AddField(in_table=str(output_feature_class), **prop_dict)

            # for logging progress
            log_dict = dict()
            log_dict["in_table"] = str(output_feature_class)
            log_dict = {**log_dict, **prop_dict}

        # otherwise, add based on introspected properties
        else:
            arcpy.management.AddField(
                in_table=str(output_feature_class),
                field_name=nm,
                field_type=typ,
                field_length=512,
                field_alias=alias,
                field_is_nullable="NULLABLE",
            )

            # for logging progress
            log_dict = dict(
                in_table=str(output_feature_class),
                field_name=nm,
                field_type=typ,
                field_length=512,
                field_alias=alias,
                field_is_nullable="NULLABLE",
            )

        # log progress
        logger.info(f"Field added to Feature Class {log_dict}")

    # if any fields are defined in the schema file still left over, add them
    for nm in schema_dict.keys():
        arcpy.management.AddField(
            in_table=str(output_feature_class), **schema_dict.get(nm)
        )

        # log remaining results
        log_dict = dict()
        log_dict["in_table"] = str(output_feature_class)
        log_dict = {**log_dict, **schema_dict}
        logger.info(
            f"Field added from schema file, but not detected in input data {log_dict}"
        )

    # interrogate the ACTUAL column names since, depending on the database, names can get truncated
    fc_fld_dict = {
        c.aliasName: c.name
        for c in arcpy.ListFields(str(output_feature_class))
        if c.aliasName in attr_alias_lst
    }

    # depending on the input geometry type, set the insert cursor geometry type
    if geometry_type == "COORDINATES":
        insert_geom_typ = "SHAPE@XY"
    elif geometry_type == "H3":
        insert_geom_typ = "SHAPE@"
    else:
        insert_geom_typ = "SHAPE@WKB"

    # create the list of feature class columns for the insert cursor and for row lookup from parquet from pydict object
    insert_col_lst = list(fc_fld_dict.values()) + [insert_geom_typ]
    pydict_col_lst = list(fc_fld_dict.keys())

    # this prevents pyarrow from getting hung up
    arcpy.env.autoCancelling = False

    # set up so progress is communicated to user
    arcpy.SetProgressorLabel("Importing rows...")

    # variable to track completed count
    added_cnt = 0

    # variable to track fail count
    fail_cnt = 0

    # create a cursor for inserting rows
    with arcpy.da.InsertCursor(
        str(output_feature_class), insert_col_lst
    ) as insert_cursor:
        # partition replacement values dictionary
        partition_values_dict = {}

        # flag for if at sample count and need to break out of loop
        at_sample_count = False

        # variable to track start time
        start_time = time.time()

        # create a PyArrow table to iterate
        ds_tbl = dataset.read(columns=pydict_col_lst)

        # get the total number of rows in the dataset
        ds_row_cnt = ds_tbl.num_rows

        logger.info(f"Starting to import {ds_row_cnt:,} rows from parquet data.")

        # iteratively process the full dataset to avoid memory overruns
        for batch_idx, batch_tbl in enumerate(ds_tbl.to_batches(max_chunksize=30000)):
            # pull the parquet data into a dict
            pqt_pydict = batch_tbl.to_pydict()

            # transpose the dictionary of lists into a list of dictionaries
            dict_lst = [
                dict(zip(pqt_pydict.keys(), values))
                for values in zip(*pqt_pydict.values())
            ]

            # for every row index in the number of rows
            for pqt_idx, row_pydict in enumerate(dict_lst):
                # instantiate the row variable so error messages can be formatted.
                row = None

                # try to add the row
                try:
                    # populate the row dictionary with values from the partition dict to match the insert cursor columns
                    row_dict = {k: row_pydict.get(k) for k in pydict_col_lst}

                    # if the geometry is being generated from coordinate columns, create the coordinate tuple
                    if geometry_type == "COORDINATES":
                        geom_tpl = (
                            row_pydict[geometry_column[0]],
                            row_pydict[geometry_column[1]],
                        )
                        row_dict[insert_geom_typ] = geom_tpl

                    # if geometry created from H3 index, create the geometry
                    elif geometry_type == "H3":
                        geom = h3_index_to_geometry(row_pydict[geometry_column])
                        row_dict[insert_geom_typ] = geom

                    # otherwise, just tack wkb geometry onto list
                    else:
                        row_dict[insert_geom_typ] = row_pydict[geometry_column]

                    # create a row object by plucking out the values from the row dictionary
                    row = tuple(row_dict.values())

                    # insert the row
                    insert_cursor.insertRow(row)

                    # update the completed count
                    added_cnt += 1

                # if cannot add the row
                except Exception as e:
                    # handle case of having issues prior to even getting the row
                    if row is None:
                        logger.error(
                            f"Could create row object for parquet row index {pqt_idx}.\npydict: {pqt_pydict}\n"
                            f"row_dict: {row_dict}\nbatch index: {batch_idx}\n\nMessage: {e}"
                        )
                        raise

                    else:
                        # update the fail count
                        fail_cnt += 1

                        # make sure the reason is tracked
                        logger.warning(
                            f"Could not import row.\n\nContents:{row}\n\nMessage: {e}"
                        )

                # check of at sample count
                if added_cnt == sample_count:
                    at_sample_count = True
                    break

                # provide status updates every 1000 features, and provide an exit if cancelled
                if added_cnt % 1000 == 0:
                    arcpy.SetProgressorLabel(f"Imported {added_cnt:,} rows...")

                    if arcpy.env.isCancelled:
                        break

                # provide messages every 10,000 features
                if added_cnt % 10000 == 0:
                    # find the elapsed time
                    elapsed_time = time.time() - start_time

                    # calculate the rate per hour
                    rate = round(added_cnt / elapsed_time * 3600)

                    # get the remaining count
                    remaining_cnt = ds_row_cnt - added_cnt

                    # calculate the per record time
                    per_record_time = elapsed_time / added_cnt if added_cnt > 0 else 0

                    # estimate the remaining time
                    est_remain_time = timedelta(
                        seconds=round(remaining_cnt * per_record_time)
                    )

                    # format remaining time as a string
                    if est_remain_time.days > 0:
                        remain_str = f"{est_remain_time.days} days, {str(timedelta(seconds=est_remain_time.seconds))}"
                    else:
                        remain_str = str(timedelta(seconds=est_remain_time.seconds))

                    logger.info(
                        f"Imported {added_cnt:,} rows at a rate of {rate:,} per hour. Estimated time remaining: {remain_str}."
                    )

            # ensure next batch is not run if cancelled or only running a sample
            if arcpy.env.isCancelled or at_sample_count:
                break

    # declare success, and track failure if necessary
    success_msg = f"Successfully imported {added_cnt:,} rows."
    arcpy.SetProgressorLabel(success_msg)
    arcpy.ResetProgressor()
    logger.info(success_msg)

    if fail_cnt > 0:
        fail_msg = f"Failure count: {fail_cnt:,}"
        logger.warning(fail_msg)

    # build spatial index if requested
    if build_spatial_index:
        arcpy.SetProgressorLabel("Building spatial index.")
        arcpy.management.AddSpatialIndex(str(output_feature_class))
        logger.info("Completed building spatial index.")

    # if compacting, do it
    if compact:
        arcpy.SetProgressorLabel("Compacting data.")
        arcpy.management.Compact(str(output_feature_class.parent))
        logger.info("Successfully compacted data.")

    return output_feature_class