API

feature_class_to_parquet(input_table, output_parquet, include_geometry=True, geometry_format='WKB', batch_size=300000)

Export a Feature Class to Parquet.

Parameters:

    input_table (Path): Path to the feature class or table. Required.
    output_parquet (Path): Path to the directory where the output Parquet part files will be saved. Required.
    include_geometry (bool): Whether to include the geometry in the output Parquet dataset. Default: True.
    geometry_format (str): If including the geometry, the format the geometry should be in: XY, WKT, or WKB. Default: 'WKB'.
    batch_size (int): Count of records per part-*.parquet file. Default: 300000.
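A minimal usage sketch, assuming the package exposes the function at its top level (the paths below are hypothetical):

from pathlib import Path

from arcpy_parquet import feature_class_to_parquet  # import path assumed

# hypothetical feature class and output directory
gdb_fc = Path(r"C:\data\demo.gdb\observations")
out_dir = Path(r"C:\data\observations_parquet")

# export with WKB geometry, writing part files of up to 300,000 rows each
feature_class_to_parquet(
    input_table=gdb_fc,
    output_parquet=out_dir,
    include_geometry=True,
    geometry_format="WKB",
    batch_size=300000,
)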
Source code in src/arcpy_parquet/__main__.py
def feature_class_to_parquet(
    input_table: Path,
    output_parquet: Path,
    include_geometry: bool = True,
    geometry_format: str = "WKB",
    batch_size: int = 300000,
) -> Path:
    """
    Export a Feature Class to Parquet.

    Args:
        input_table: Path to feature class or table.
        output_parquet: Path to the directory where the output Parquet part files will be saved.
        include_geometry: Whether to include the geometry in the output Parquet dataset.
        geometry_format: If including the geometry, what format the geometry should be in, either
            ``XY``, ``WKT``, or ``WKB``. Default is ``WKB``.
        batch_size: Count of records per ``part-*.parquet`` file.
    """
    # downgrade path so arcpy can use strings
    fc_pth = str(input_table)

    # check the geometry output type
    geometry_format = geometry_format.upper()
    assert geometry_format in (
        "XY",
        "WKT",
        "WKB",
    ), "Geometry output format must be either XY, WKB or WKT."

    # make sure the full output path exists where the data will be saved
    if not output_parquet.exists():
        output_parquet.mkdir(parents=True)

    # describe the data to access properties for validation
    desc = arcpy.da.Describe(fc_pth)

    # fields to be excluded
    exclude_fld_typ = ["Raster"]
    exclude_fld_lst = []

    if desc["hasOID"]:
        exclude_fld_lst.append(desc["OIDFieldName"])

    for shp_fld_key in ["lengthFieldName", "areaFieldName"]:
        shp_fld = desc.get(shp_fld_key)
        if shp_fld is not None:
            exclude_fld_lst.append(shp_fld)

    # get a list of input fields to use with the search cursor
    sc_col_lst = [
        f.name
        for f in desc["fields"]
        if f.name not in exclude_fld_lst and f.type not in exclude_fld_typ
    ]

    # iterate fields to create output schema, excluding geometry since handled explicitly later
    pa_fld_typ_xcld = exclude_fld_typ + ["Geometry"]
    pa_fld_lst = [
        f
        for f in desc["fields"]
        if f.name not in exclude_fld_lst and f.type not in pa_fld_typ_xcld
    ]
    pa_typ_lst = [
        (f.name, export_dtype_dict.get(f.type, pa.string())) for f in pa_fld_lst
    ]
    pq_schema = pa.schema(pa_typ_lst)

    # if the input has geometry (is a feature class)
    if desc.get("shapeFieldName") is not None and include_geometry:
        # get the name of the geometry column
        geom_nm = desc.get("shapeFieldName")

        # since the geometry must be specially formatted, it needs to first be removed from the list
        sc_col_lst.remove(geom_nm)

        # if just outputting centroid (point coordinates)
        if geometry_format == "XY":
            # add X and Y columns to search cursor list and the output schema
            for prt in ["X", "Y"]:
                sc_col_lst.append(f"{geom_nm}@{prt}")
                pq_schema = pq_schema.append(pa.field(f"geometry_{prt}", pa.float64()))

        # if working with any other output type, just use the specific output format
        else:
            # add the correct search cursor type
            sc_col_lst.append(f"{geom_nm}@{geometry_format}")

            # add the geometry to the output schema (WKB is binary, WKT is text)
            geom_pa_type = pa.binary() if geometry_format == "WKB" else pa.string()
            pq_schema = pq_schema.append(
                pa.field(geometry_format.lower(), geom_pa_type)
            )

    # if the geometry is not desired in the output, remove it from the search cursor column list
    if "shapeFieldName" in desc.keys() and not include_geometry:
        sc_col_lst.remove(desc["shapeFieldName"])

    # get values from the data to track progress
    max_range = int(arcpy.management.GetCount(str(input_table))[0])
    rep_range = max(1, max_range // 100)

    # report progress
    features_tense = "feature" if max_range == 1 else "features"
    arcpy.AddMessage(f"Starting export of {max_range:,} {features_tense}.")
    arcpy.SetProgressor("step", "Exporting...", 0, max_range, rep_range)

    # turn off auto cancelling since handling in loop
    arcpy.env.autoCancelling = False

    # create a template dictionary for data export
    pa_dict = {col: [] for col in pq_schema.names}

    # create a search cursor to work through the data
    with arcpy.da.SearchCursor(str(input_table), sc_col_lst) as search_cur:
        # variable for batch numbering
        prt_num = 0

        # begin to iterate through the features
        for idx, row in enumerate(search_cur):
            # add each row value to the respective key in the dictionary
            for col, val in zip(pq_schema.names, row):
                pa_dict[col].append(val)

            # if at a percent interval
            if idx % rep_range == 0:
                # report progress
                arcpy.SetProgressorPosition(idx)

                # check if cancelled, and if so, break out
                if arcpy.env.isCancelled:
                    break

            # if at a batch size (part) interval or end of dataset
            if (idx + 1) % batch_size == 0 or (idx + 1) == max_range:
                # create a PyArrow table object instance from the accumulated dictionary
                pa_tbl = pa.Table.from_pydict(pa_dict, pq_schema)

                # create a name and path string for the table
                part_nm = f"part-{uuid.uuid4().hex}-{prt_num:05d}.snappy.parquet"
                part_pth = output_parquet / part_nm

                # write the parquet part
                pq.write_table(
                    table=pa_tbl,
                    where=str(part_pth),
                    version="2.0",
                    flavor="spark",
                    compression="snappy",
                )

                # increment the naming index
                prt_num += 1

                # reset the loading dictionary
                pa_dict = {col: [] for col in pq_schema.names}

    # reset the progress indicator
    arcpy.ResetProgressor()

    return output_parquet
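To sanity-check an export, the resulting directory of part files can be read back as one dataset with PyArrow (the path below is hypothetical):

import pyarrow.dataset as ds

# hypothetical output directory produced by feature_class_to_parquet
dataset = ds.dataset(r"C:\data\observations_parquet", format="parquet")

# confirm the schema and row count round-tripped as expected
print(dataset.schema)
print(dataset.count_rows())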

parquet_to_feature_class(parquet_path, output_feature_class, schema_file=None, geometry_type='POINT', parquet_partitions=None, geometry_column='wkb', spatial_reference=4326, sample_count=None, build_spatial_index=False, compact=True)

Convert a properly formatted Parquet source into a Feature Class in a Geodatabase.

.. note:: The geometry must be encoded as well-known binary (WKB), and complex field types such as arrays and structs are not supported.

Parameters:

    parquet_path (Path): The directory or Parquet part file to convert. Required.
    output_feature_class (Path): Where to save the new Feature Class. Required.
    schema_file (Path): CSV file with detailed schema properties. Default: None.
    geometry_type (str): POINT, COORDINATES, H3, POLYLINE, or POLYGON describing the geometry type.
        COORDINATES is a point geometry type described by two coordinate columns. H3 is a column
        containing H3 indices as hexadecimal strings; polygon geometries will be created from the
        H3 index for each row. Default: 'POINT'.
    parquet_partitions (Optional[List[str]]): Partition names and values, if available, to select.
        For instance, if partitioned by a country column using ISO2 identifiers, select Mexico
        using country=mx. Default: None.
    geometry_column (Union[List[str], str]): Column from the Parquet table containing the geometry
        encoded as WKB. If the geometry type is COORDINATES, this must be an iterable (tuple or
        list) of the x (longitude) and y (latitude) column names. Default: 'wkb'.
    spatial_reference (Union[SpatialReference, str, int]): Spatial reference of the input data.
        Default: WGS84 (WKID 4326).
    sample_count (Optional[int]): If only importing enough data to understand the schema, specify
        the count of records with this parameter. If left blank, all records are imported.
        Default: None.
    build_spatial_index (bool): Whether to build the spatial index after inserting all the data.
        Default: False.
    compact (bool): Whether the File Geodatabase should be compacted following import. Default: True.
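A minimal usage sketch, assuming the package exposes the function at its top level (the paths and column names below are hypothetical):

from pathlib import Path

from arcpy_parquet import parquet_to_feature_class  # import path assumed

# hypothetical Parquet directory and target feature class
pqt_dir = Path(r"C:\data\observations_parquet")
out_fc = Path(r"C:\data\demo.gdb\observations_imported")

# import WKB point geometries stored in a column named "wkb"
parquet_to_feature_class(
    parquet_path=pqt_dir,
    output_feature_class=out_fc,
    geometry_type="POINT",
    geometry_column="wkb",
    spatial_reference=4326,
)

# alternatively, build point geometries from two coordinate columns
parquet_to_feature_class(
    parquet_path=pqt_dir,
    output_feature_class=out_fc.parent / "observations_xy",
    geometry_type="COORDINATES",
    geometry_column=["longitude", "latitude"],
)

For H3 input, geometry_type="H3" with geometry_column pointing at the column of H3 index strings follows the same pattern.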
Source code in src/arcpy_parquet/__main__.py
def parquet_to_feature_class(
    parquet_path: Path,
    output_feature_class: Path,
    schema_file: Path = None,
    geometry_type: str = "POINT",
    parquet_partitions: Optional[List[str]] = None,
    geometry_column: Union[List[str], str] = "wkb",
    spatial_reference: Union[arcpy.SpatialReference, str, int] = 4326,
    sample_count: Optional[int] = None,
    build_spatial_index: bool = False,
    compact: bool = True,
) -> Path:
    """
    Convert a *properly formatted* Parquet source into a Feature Class in a Geodatabase.

    .. note::
        The geometry *must* be encoded as well-known binary (WKB), and complex field types
        such as arrays and structs are not supported.

    Args:
        parquet_path: The directory or Parquet part file to convert.
        output_feature_class: Where to save the new Feature Class.
        schema_file: CSV file with detailed schema properties.
        geometry_type: ``POINT``, ``COORDINATES``, ``H3``, ``POLYLINE``, or ``POLYGON`` describing the geometry type.
            Default is ``POINT``. ``COORDINATES`` is a point geometry type described by two coordinate columns.
            ``H3`` is a column containing H3 indices as hexadecimal strings. Polygon geometries will be created based
            on the H3 indices for the rows.
        parquet_partitions: Partition name and values, if available, to select. For instance,
            if partitioned by country column using ISO2 identifiers, select Mexico using
            ``country=mx``.
        geometry_column: Column from parquet table containing the geometry encoded as WKB. Default
            is ``wkb``. If the geometry type is ``COORDINATES``, this must be an iterable (tuple or list) of
            the x (longitude) and y (latitude) columns containing the coordinates.
        spatial_reference: Spatial reference of input data. Default is WGS84 (WKID: 4326).
        sample_count: If only wanting to import enough data to understand the schema, specify
            the count of records with this parameter. If left blank, will import all records.
        build_spatial_index: Whether to build the spatial index after inserting all the data.
            Default is ``False``.
        compact: If the File Geodatabase should be compacted following import. Default is ``True``.
    """
    # convert integer to Spatial Reference
    if isinstance(spatial_reference, (int, str)):
        spatial_reference = arcpy.SpatialReference(spatial_reference)

    # if used as a tool in pro, let user know what is going on...kind of
    arcpy.SetProgressorLabel("Warming up the Flux Capacitor...")

    # make sure paths...are paths
    parquet_path = (
        parquet_path if isinstance(parquet_path, Path) else Path(parquet_path)
    )
    output_feature_class = (
        output_feature_class
        if isinstance(output_feature_class, Path)
        else Path(output_feature_class)
    )

    # ensure will not encounter unexpected results based on incompatible input parameter or parameter combinations
    if not parquet_path.exists():
        raise ValueError(
            f"Cannot locate the input path {parquet_path}. Please double check to ensure the path is "
            f"correct and reachable."
        )

    elif parquet_path.is_dir():
        # get all the part files to start with
        pqt_prts = [prt for prt in parquet_path.rglob("part-*.parquet")]

        # now, filter based on partitions being part of the path string - enables specifying nested partitions
        if isinstance(parquet_partitions, list):
            for partition in parquet_partitions:
                pqt_prts = [prt for prt in pqt_prts if partition in str(prt)]

        # if a list is not provided, throw a fit
        elif parquet_partitions is not None:
            raise ValueError("parquet_partitions must be a list")

        # ensure we have part files still left to work with
        assert len(pqt_prts) > 0, (
            "The provided directory and partitions do not appear to contain any parquet "
            "part files."
        )
    elif parquet_path.is_file():
        assert (
            parquet_partitions is None
        ), "If providing a parquet part file, you cannot specify a parquet partition."
        assert parquet_path.stem.startswith(
            "part-"
        ), "The provided file does not appear to be a parquet part file."

        # assemble into list so iteration below works
        pqt_prts = [parquet_path]

    else:
        raise ValueError(
            "parquet_path must be either a directory for parquet data or a specific part file."
        )

    # validate the input geometry type
    if geometry_type not in geom_dict.keys():
        raise ValueError(
            f'geometry_type must be from the following list [{", ".join(geom_dict.keys())}]. You '
            f'provided "{geometry_type}".'
        )

    # create a PyArrow Table to read from
    pqt_ds = pq.ParquetDataset(parquet_path, use_legacy_dataset=False)

    # slightly change how column names are handled if using coordinates or h3
    if isinstance(geometry_column, (tuple, list)):
        # ensure coordinate columns are in input data
        if (
            geometry_column[0] not in pqt_ds.schema.names
            or geometry_column[1] not in pqt_ds.schema.names
        ):
            raise ValueError(
                f"The geometry_column names provided for the coordinate columns do not appear to be in "
                f"the input parquet columns."
            )

        # get a list of the string column types and field aliases from parquet
        col_typ_lst, attr_alias_lst = zip(
            *[
                (str(c.type.value_type), c.name)
                if isinstance(c.type, pa.DictionaryType)
                else (str(c.type), c.name)
                for c in pqt_ds.schema
                # if c.name not in geometry_column  # geometry columns get dropped
            ]
        )

    else:
        # ensure geometry column is in input data
        if geometry_column not in pqt_ds.schema.names:
            raise ValueError(
                "The geometry_column does not appear to be in the input parquet columns."
            )

        # get a list of the string column types and field aliases from parquet
        col_typ_lst, attr_alias_lst = zip(
            *[
                (str(c.type.value_type), c.name)
                if isinstance(c.type, pa.DictionaryType)
                else (str(c.type), c.name)
                for c in pqt_ds.schema
                if c.name != geometry_column
            ]
        )

    # prefix any column name starting with a digit with a 'c' and save as the field names
    attr_nm_lst = [f"c{c}" if c[0].isdigit() else c for c in attr_alias_lst]

    # use these to map to esri field types
    fld_typ_lst = [import_dtype_dict[typ] for typ in col_typ_lst]

    # check for really strange and uncaught error in naming
    if output_feature_class.name.lower().startswith("delta"):
        raise ValueError('Feature Class name cannot start with "delta".')

    # create the new feature class
    arcpy.management.CreateFeatureclass(
        out_path=str(output_feature_class.parent),
        out_name=output_feature_class.name,
        geometry_type=geom_dict[geometry_type][0],
        spatial_reference=spatial_reference,
        has_m=geom_dict[geometry_type][1],
        has_z=geom_dict[geometry_type][2],
    )

    logger.info(f"Created feature class at {str(output_feature_class)}")

    # if a schema file is provided as part of input, load it to a dict using Pandas because it's easy
    if schema_file is None:
        schema_dict = {}

    else:

        # if a directory for the schema is provided, get the enclosed csv file
        if schema_file.is_dir() and schema_file.stem == 'schema':

            # try to get the csv file
            csv_lst = list(schema_file.glob('*.csv'))
            if len(csv_lst) > 0:
                schema_file = csv_lst[0]

        # read the csv into a Pandas DataFrame
        schema_df = pd.read_csv(schema_file)

        # swap out the string types for text so add field works
        schema_df.loc[schema_df["field_type"] == "String", "field_type"] = "TEXT"

        # dump to a dict
        schema_dict = {
            k: v for k, v in zip(schema_df["field_name"], schema_df.to_dict("records"))
        }

    # iteratively add columns using the introspected field name, alias, and type
    for nm, alias, typ in zip(attr_nm_lst, attr_alias_lst, fld_typ_lst):
        # if the field name exists in the dictionary, peel off a single field's properties and add a field using them
        if nm in schema_dict.keys():
            prop_dict = schema_dict.pop(nm)
            arcpy.management.AddField(in_table=str(output_feature_class), **prop_dict)

            # for logging progress
            log_dict = dict()
            log_dict["in_table"] = str(output_feature_class)
            log_dict = {**log_dict, **prop_dict}

        # otherwise, add based on introspected properties
        else:
            arcpy.management.AddField(
                in_table=str(output_feature_class),
                field_name=nm,
                field_type=typ,
                field_length=512,
                field_alias=alias,
                field_is_nullable="NULLABLE",
            )

            # for logging progress
            log_dict = dict(
                in_table=str(output_feature_class),
                field_name=nm,
                field_type=typ,
                field_length=512,
                field_alias=alias,
                field_is_nullable="NULLABLE",
            )

        # log progress
        logger.info(f"Field added to Feature Class {log_dict}")

    # if any fields defined in the schema file are still left over, add them
    for nm in schema_dict.keys():
        prop_dict = schema_dict.get(nm)
        arcpy.management.AddField(in_table=str(output_feature_class), **prop_dict)

        # log remaining results
        log_dict = {"in_table": str(output_feature_class), **prop_dict}
        logger.info(
            f"Field added from schema file, but not detected in input data {log_dict}"
        )

    # interrogate the ACTUAL column names since, depending on the database, names can get truncated
    fc_fld_dict = {
        c.aliasName: c.name
        for c in arcpy.ListFields(str(output_feature_class))
        if c.aliasName in attr_alias_lst
    }

    # depending on the input geometry type, set the insert cursor geometry type
    if geometry_type == "COORDINATES":
        insert_geom_typ = "SHAPE@XY"
    elif geometry_type == "H3":
        insert_geom_typ = "SHAPE@"
    else:
        insert_geom_typ = "SHAPE@WKB"

    # create the feature class column lists for the insert cursor and for row lookup from the parquet pydict object
    insert_col_lst = list(fc_fld_dict.values()) + [insert_geom_typ]
    pydict_col_lst = list(fc_fld_dict.keys())

    # this prevents pyarrow from getting hung up
    arcpy.env.autoCancelling = False

    # set up so progress is communicated to user
    arcpy.SetProgressorLabel("Importing rows...")

    # variable to track completed count
    added_cnt = 0

    # variable to track fail count
    fail_cnt = 0

    # create a cursor for inserting rows
    with arcpy.da.InsertCursor(
        str(output_feature_class), insert_col_lst
    ) as insert_cursor:
        # partition replacement values dictionary
        partition_values_dict = {}

        # flag for if at sample count and need to break out of loop
        at_sample_count = False

        # variable to track start time
        start_time = time.time()

        # iterate the parquet part files
        for part_file in pqt_prts:
            # get path parts for potential parent partitions
            partition_lst = [
                pth_prt.split("=")
                for pth_prt in part_file.relative_to(parquet_path).parent.parts
            ]

            # if there are parent partitions, load them into a dictionary
            if len(partition_lst) > 0:
                partition_values_dict = dict(partition_lst)
            else:
                partition_values_dict = {}

            # load part file into a PyArrow Table
            pa_tbl = pq.read_table(part_file)

            # pull the parquet data into a dict
            pqt_pydict = pa_tbl.to_pydict()

            # for every row index in the number of rows
            for pqt_idx in range(pa_tbl.num_rows):
                # instantiate the row variable so error messages can be formatted.
                row = None

                # try to add the row
                try:
                    # start creating a dict of single key-value pairs for this row of data
                    row_pydict = {k: v[pqt_idx] for k, v in pqt_pydict.items()}

                    # populate the row dictionary with values from the partition dict
                    row_dict = {k: row_pydict.get(k) for k in pydict_col_lst}

                    # fill any partition values
                    for p_key in partition_values_dict.keys():
                        row_dict[p_key] = partition_values_dict[p_key]

                    # if the geometry is being generated from coordinate columns, create the coordinate tuple
                    if geometry_type == "COORDINATES":
                        geom_tpl = (
                            row_pydict[geometry_column[0]],
                            row_pydict[geometry_column[1]],
                        )
                        row_dict[insert_geom_typ] = geom_tpl

                    # if geometry created from H3 index, create the geometry
                    elif geometry_type == "H3":
                        geom = h3_index_to_geometry(row_pydict[geometry_column])
                        row_dict[insert_geom_typ] = geom

                    # otherwise, just tack wkb geometry onto list
                    else:
                        row_dict[insert_geom_typ] = row_pydict[geometry_column]

                    # create a row object by plucking out the values from the row dictionary
                    row = tuple(row_dict.values())

                    # insert the row
                    insert_cursor.insertRow(row)

                    # update the completed count
                    added_cnt += 1

                # if cannot add the row
                except Exception as e:
                    # handle case of having issues prior to even getting the row
                    if row is None:
                        raise

                    else:
                        # update the fail count
                        fail_cnt += 1

                        # make sure the reason is tracked
                        logger.warning(
                            f"Could not import row.\n\nContents:{row}\n\nMessage: {e}"
                        )

                # check if at sample count
                if added_cnt == sample_count:
                    at_sample_count = True
                    break

                # provide status updates every 1000 features, and provide an exit if cancelled
                if added_cnt % 1000 == 0:
                    arcpy.SetProgressorLabel(f"Imported {added_cnt:,} rows...")

                    if arcpy.env.isCancelled:
                        break

                # provide messages every 10,000 features
                if added_cnt % 10000 == 0:
                    # find the elapsed time
                    elapsed_time = time.time() - start_time

                    # calculate the rate per hour
                    rate = round(added_cnt / elapsed_time * 3600)

                    logger.info(
                        f"Imported {added_cnt:,} rows at a rate of {rate:,} per hour..."
                    )

            # ensure next batch is not run if cancelled or only running a sample
            if arcpy.env.isCancelled or at_sample_count:
                break

    # declare success, and track failure if necessary
    success_msg = f"Successfully imported {added_cnt:,} rows."
    arcpy.SetProgressorLabel(success_msg)
    arcpy.ResetProgressor()
    logger.info(success_msg)

    if fail_cnt > 0:
        fail_msg = f"Failure count: {fail_cnt:,}"
        logger.warning(fail_msg)

    # if compacting, do it
    if compact:
        arcpy.SetProgressorLabel("Compacting data.")
        arcpy.management.Compact(str(output_feature_class.parent))
        logger.info("Successfully compacted data.")

    # build spatial index if requested
    if build_spatial_index:
        arcpy.SetProgressorLabel("Building spatial index.")
        arcpy.management.AddSpatialIndex(str(output_feature_class))
        logger.info("Completed building spatial index.")

    return output_feature_class
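The schema_file rows are read with pandas and passed straight to arcpy.management.AddField as keyword arguments (with String swapped for TEXT), so a sketch of building such a file could look like the following; the exact column set beyond field_name and field_type is an assumption inferred from the code above:

from pathlib import Path

import pandas as pd

# hypothetical schema definition: field_name and field_type are read explicitly by the importer;
# the remaining columns are assumed to mirror arcpy.management.AddField keyword arguments
schema_df = pd.DataFrame(
    [
        {"field_name": "name", "field_type": "String", "field_length": 256,
         "field_alias": "Name", "field_is_nullable": "NULLABLE"},
        {"field_name": "category", "field_type": "String", "field_length": 64,
         "field_alias": "Category", "field_is_nullable": "NULLABLE"},
    ]
)

# a directory named "schema" containing a single CSV is also accepted for schema_file
schema_dir = Path("schema")
schema_dir.mkdir(exist_ok=True)
schema_df.to_csv(schema_dir / "schema.csv", index=False)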