| Title: | A 'sparklyr' Extension for Nested Data |
|---|---|
| Description: | A 'sparklyr' extension adding the capability to work easily with nested data. |
| Authors: | Matt Pollock [aut, cre], The MITRE Corporation [cph] |
| Maintainer: | Matt Pollock |
| License: | Apache License 2.0 \| file LICENSE |
| Version: | 0.0.4 |
| Built: | 2026-05-28 07:02:11 UTC |
| Source: | https://github.com/mitre/sparklyr.nested |
Returns a simplified Avro schema containing only the specified fields.
Nested fields (e.g. points.latitude in a points array of records)
are supported. The field specification is the same for a field in a nested array
of records or for a field in a nested record. The function will detect the type
and produce a valid sub-schema.
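To illustrate the path rules described above, here is a minimal base-R sketch that subsets a parsed Avro record schema (a nested list, as jsonlite would produce) by dot or dollar paths. The helper subset_record() and the sample schema are hypothetical; this is not how avro_subschema() is implemented. It handles only plain records and arrays of records, not unions, maps, or the keep_* options.

```r
# Hypothetical helper, NOT the package's implementation: keep only the
# dot- (or dollar-) separated paths in a parsed Avro record schema.
subset_record <- function(record, paths) {
  paths <- gsub("$", ".", paths, fixed = TRUE)   # x$y -> x.y
  heads <- sub("\\..*$", "", paths)               # first path component
  kept <- list()
  for (field in record$fields) {
    if (!(field$name %in% heads)) next            # field not requested
    # remaining path below this field ("" means keep the field whole)
    rest <- sub("^[^.]*\\.?", "", paths[heads == field$name])
    rest <- rest[nzchar(rest)]
    ftype <- field$type
    if (length(rest) > 0 && is.list(ftype)) {
      if (identical(ftype$type, "array")) {
        # array of records: subset the item record
        ftype$items <- subset_record(ftype$items, rest)
      } else if (identical(ftype$type, "record")) {
        # nested record: subset it directly
        ftype <- subset_record(ftype, rest)
      }
      field$type <- ftype
    }
    kept[[length(kept) + 1]] <- field
  }
  record$fields <- kept
  record
}

# Sample schema with a `points` array of records (made up for illustration)
schema <- list(type = "record", name = "X", fields = list(
  list(name = "id", type = "string"),
  list(name = "points", type = list(type = "array", items = list(
    type = "record", name = "P", fields = list(
      list(name = "latitude", type = "double"),
      list(name = "longitude", type = "double")
    )
  )))
))

sub_schema <- subset_record(schema, c("id", "points.latitude"))
```

The same path string works whether points is an array of records or a plain nested record, which mirrors the behaviour the description promises.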
```r
avro_subschema(
  schema,
  fields,
  keep_docs = FALSE,
  keep_namespace = FALSE,
  keep_java_class = keep_namespace,
  keep_default = FALSE,
  ...
)

avro_subschema_jobj(
  sc,
  schema,
  fields,
  keep_docs = FALSE,
  keep_namespace = FALSE,
  keep_java_class = keep_namespace,
  keep_default = FALSE,
  ...
)
```
| Argument | Description |
|---|---|
| schema | Character or list. The Avro schema to subset (JSON string or parsed list). This can be extracted using |
| fields | List or character. The fields to keep. If a list, use a nested structure mirroring the schema; if a character vector, use dot or dollar notation for nested paths. List form: Character form: |
| keep_docs | Logical. If |
| keep_namespace | Logical. If |
| keep_java_class | Logical. If |
| keep_default | Logical. If |
| ... | Arguments passed to |
| sc | A spark_connection. Used only by avro_subschema_jobj. |
A sub-schema is intended for use with spark_read_avro or
spark_read_parquet, so that data can be read into Spark
DataFrames more efficiently.
avro_subschema: A JSON string (subsetted schema).
avro_subschema_jobj: A Spark StructType Java object (spark_jobj).
Both functions accept the same schema and field arguments but differ in their return value:

avro_subschema: Returns a JSON string representing the subsetted Avro schema. Useful when you need the schema as text (e.g., for storage, logging, or passing to another system).

avro_subschema_jobj: Returns a Spark StructType Java object (spark_jobj), ready to pass directly to spark_read_parquet or spark_read_avro as the schema argument. Requires an active spark_connection.
```r
## Not run:
schema <- '{"type":"record","name":"X","fields":[{"name":"a","type":"string"},{"name":"b","type":"int"}]}'

# JSON string output
avro_subschema(schema, "a")
avro_subschema(schema, list("a", "b"))

# Spark StructType output (requires a spark connection)
sc <- sparklyr::spark_connect(master = "local")
jobj <- avro_subschema_jobj(sc, schema, "a")
df <- sparklyr::spark_read_parquet(sc, path = "data.parquet", schema = jobj)
## End(Not run)
```
Parses an Avro JSON schema (character or list) and builds the corresponding
Spark StructType via sparklyr.nested type constructors. This is
useful for supplying a schema argument to
spark_read_parquet or
spark_read_avro.
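The idea of mapping an Avro schema onto the type constructors can be sketched in base R. The hypothetical avro_to_call() below only builds the text of the constructor calls (struct_type(), struct_field(), array_type(), map_type() and the primitive constructors documented in this manual), so it runs without Spark. The Avro-to-Spark type pairing is an assumption rather than the package's actual table, and unions, enums, and fixed types are not handled.

```r
# Assumed pairing of Avro primitive names with sparklyr.nested constructors
avro_primitive_ctor <- c(
  boolean = "boolean_type", int = "integer_type", long = "long_type",
  float = "float_type", double = "double_type", bytes = "binary_type",
  string = "string_type"
)

# Hypothetical helper: return the constructor calls as text (no Spark needed)
avro_to_call <- function(type) {
  if (is.character(type)) {
    ctor <- avro_primitive_ctor[[type]]  # errors on an unknown primitive
    return(sprintf("%s(sc)", ctor))
  }
  switch(type$type,
    record = {
      fields <- vapply(type$fields, function(f) {
        sprintf('struct_field(sc, "%s", %s)', f$name, avro_to_call(f$type))
      }, character(1))
      sprintf("struct_type(sc, list(%s))", paste(fields, collapse = ", "))
    },
    array = sprintf("array_type(sc, %s)", avro_to_call(type$items)),
    # Avro map keys are always strings
    map = sprintf("map_type(sc, string_type(sc), %s)", avro_to_call(type$values)),
    stop("unsupported Avro type: ", type$type)
  )
}

schema <- list(type = "record", name = "X", fields = list(
  list(name = "a", type = "string"),
  list(name = "tags", type = list(type = "array", items = "string"))
))
cat(avro_to_call(schema), "\n")
```

In practice schema_to_jobj() does this work for you against a live connection; the sketch only shows the shape of the resulting constructor tree.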
```r
schema_to_jobj(sc, schema)
```
| Argument | Description |
|---|---|
| sc | A spark_connection. |
| schema | Character or list. An Avro schema (JSON string or parsed list). |
A Spark StructType Java object (spark_jobj).
```r
## Not run:
sc <- sparklyr::spark_connect(master = "local")
schema <- '{"type":"record","name":"X","fields":[{"name":"a","type":"string"}]}'
jobj <- schema_to_jobj(sc, schema)
df <- sparklyr::spark_read_parquet(sc, path = "data.parquet", schema = jobj)
## End(Not run)
```
Exploding an array column of length N will replicate the top-level record N times.
The i-th replicated record will contain a struct (not an array) corresponding to the i-th element
of the exploded array. Apart from replacing the array with its element, exploding will not promote
any fields or otherwise change the schema of the data.
```r
sdf_explode(x, column, is_map = FALSE, keep_all = FALSE)
```
| Argument | Description |
|---|---|
| x | An object (usually a |
| column | The field to explode. |
| is_map | Logical. The (scala) |
| keep_all | Logical. If |
Two types of exploding are possible. The default method calls the Scala explode method.
This operation is supported in Spark versions > 1.6; however, it drops records where the
exploding field is empty or null. Alternatively, keep_all = TRUE uses the explode_outer
Scala method, introduced in Spark 2.2, so that no records are dropped.
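The difference between the two modes can be mimicked on a local data frame. In this base-R sketch, explode_sketch() is a hypothetical name, not part of the package, and the array column is an R list column of plain numeric vectors rather than structs. Each parent row is replicated once per element; empty arrays disappear unless keep_all = TRUE, in which case they survive as a single row holding NA, as explode_outer does with null.

```r
# Toy table: each row has an id and an "array" stored in a list column
df <- data.frame(id = c("a", "b", "c"), stringsAsFactors = FALSE)
df$data <- list(c(1, 2), numeric(0), 3)

explode_sketch <- function(df, column, keep_all = FALSE) {
  lens <- lengths(df[[column]])
  if (keep_all) {
    # explode_outer-like: an empty array still yields one row, holding NA
    reps <- pmax(lens, 1L)
    values <- unlist(lapply(df[[column]], function(v) if (length(v) == 0) NA else v))
  } else {
    # explode-like: rows whose array is empty are dropped
    reps <- lens
    values <- unlist(df[[column]])
  }
  # replicate the other columns once per element
  out <- df[rep(seq_len(nrow(df)), reps), setdiff(names(df), column), drop = FALSE]
  out[[column]] <- values
  rownames(out) <- NULL
  out
}

explode_sketch(df, "data")                   # row "b" is gone
explode_sketch(df, "data", keep_all = TRUE)  # row "b" kept with NA
```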
```r
## Not run:
# first get some nested data
iris_tbl <- copy_to(sc, iris, name="iris")
iris_nst <- iris_tbl |>
  sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data") |>
  group_by(Species) |>
  summarize(data=collect_list(data))

# then explode it
iris_nst |> sdf_explode(data)
## End(Not run)
```
This function is like tidyr::nest. Calling it does not aggregate over other
columns; the output has the same number of rows/records as the input. See the
examples for how to reduce the number of rows by aggregating elements with
collect_list, a Spark SQL function.
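A local analogue of the nest-then-aggregate pattern: the hypothetical nest_sketch() packs the chosen columns of each row into a named list (standing in for a Spark struct) without changing the row count, and split() then plays the role of group_by() plus collect_list(). The small flowers table is made up for illustration.

```r
flowers <- data.frame(
  Species = c("setosa", "setosa", "virginica"),
  Sepal_Length = c(5.1, 4.9, 6.3),
  Sepal_Width = c(3.5, 3.0, 3.3),
  stringsAsFactors = FALSE
)

# Hypothetical helper: one named list ("struct") per row, same number of rows
nest_sketch <- function(df, cols, key = "data") {
  structs <- lapply(seq_len(nrow(df)), function(i) as.list(df[i, cols, drop = FALSE]))
  out <- df[, setdiff(names(df), cols), drop = FALSE]
  out[[key]] <- structs
  out
}

nested <- nest_sketch(flowers, c("Sepal_Length", "Sepal_Width"))
nrow(nested)  # still one row per flower

# Row reduction happens only in a separate aggregation step
collected <- split(nested$data, nested$Species)
lengths(collected)
```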
```r
sdf_nest(x, ..., .key = "data")
```
| Argument | Description |
|---|---|
| x | A Spark dataframe. |
| ... | Columns to nest. |
| .key | Character. A name for the new column containing the nested fields. |
```r
## Not run:
# produces a dataframe with an array of characteristics nested under
# each unique species identifier
iris_tbl <- copy_to(sc, iris, name="iris")
iris_tbl |>
  sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data") |>
  group_by(Species) |>
  summarize(data=collect_list(data))
## End(Not run)
```
These functions support flexible schema inspection both algorithmically and in human-friendly ways.
```r
sdf_schema_json(
  x,
  parse_json = TRUE,
  simplify = FALSE,
  append_complex_type = TRUE
)

sdf_schema_viewer(
  x,
  simplify = TRUE,
  append_complex_type = TRUE,
  use_react = FALSE
)
```
| Argument | Description |
|---|---|
| x | An |
| parse_json | Logical. If |
| simplify | Logical. If |
| append_complex_type | Logical. This only matters if |
| use_react | Logical. If |
```r
## Not run:
library(testthat)
library(jsonlite)
library(sparklyr)
library(sparklyr.nested)

sample_json <- paste0(
  '{"aircraft_id":["string"],"phase_sequence":["string"],"phases (array)":{"start_point (struct)":',
  '{"segment_phase":["string"],"agl":["double"],"elevation":["double"],"time":["long"],',
  '"latitude":["double"],"longitude":["double"],"altitude":["double"],"course":["double"],',
  '"speed":["double"],"source_point_keys (array)":["[string]"],"primary_key":["string"]},',
  '"end_point (struct)":{"segment_phase":["string"],"agl":["double"],"elevation":["double"],',
  '"time":["long"],"latitude":["double"],"longitude":["double"],"altitude":["double"],',
  '"course":["double"],"speed":["double"],"source_point_keys (array)":["[string]"],',
  '"primary_key":["string"]},"phase":["string"],"primary_key":["string"]},"primary_key":["string"]}'
)

with_mock(
  # I am mocking functions so that the example works without a real spark connection
  spark_read_parquet = function(x, ...){return("this is a spark dataframe")},
  sdf_schema_json = function(x, ...){return(fromJSON(sample_json))},
  spark_connect = function(...){return("this is a spark connection")},

  # the meat of the example is here
  sc <- spark_connect(),
  spark_data <- spark_read_parquet(sc, path="path/to/data/*.parquet", name="some_name"),
  sdf_schema_viewer(spark_data)
)
## End(Not run)
```
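Assuming the parsed schema has the shape of the mocked sdf_schema_json() value in the example above (named lists for nested structures, character leaves, and complex types tagged with " (array)" or " (struct)"), a short recursive walk can turn it into dot-notation paths of the kind sdf_select() accepts. leaf_paths() is a hypothetical base-R helper, not part of the package.

```r
# Hypothetical helper: list every leaf field as a dot-separated path
leaf_paths <- function(node, prefix = "") {
  paths <- character(0)
  for (key in names(node)) {
    # strip the " (array)" / " (struct)" tag added by append_complex_type
    name <- sub(" \\((array|struct)\\)$", "", key)
    path <- if (nzchar(prefix)) paste0(prefix, ".", name) else name
    child <- node[[key]]
    if (is.list(child)) {
      paths <- c(paths, leaf_paths(child, path))  # descend into nested fields
    } else {
      paths <- c(paths, path)                     # leaf: record its path
    }
  }
  paths
}

# A trimmed-down version of the sample schema, built by hand
simplified <- list(
  aircraft_id = "string",
  `phases (array)` = list(
    phase = "string",
    `start_point (struct)` = list(time = "long", latitude = "double")
  )
)
leaf_paths(simplified)
```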
The dplyr select function works well for keeping/dropping top-level fields. It does not,
however, support access to nested data. This function accepts complex field names
such as x.y.z, where z is a field nested within y, which is in turn
nested within x. Since R uses "$" to access nested elements and Java/Scala use ".",
sdf_select(data, x.y.z) and sdf_select(data, x$y$z) are equivalent.
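Why the two notations can be treated interchangeably is easy to sketch in base R: an unevaluated field reference is deparsed to text and every "$" is rewritten as ".", after which the path can be followed through a nested list. field_path() and get_path() are hypothetical helpers for illustration, not sdf_select() internals.

```r
# Hypothetical helper: turn x$y$z or x.y.z (unevaluated) into "x.y.z"
field_path <- function(expr) {
  gsub("$", ".", deparse(substitute(expr)), fixed = TRUE)
}

# Hypothetical helper: follow a dot path through a nested list
get_path <- function(record, path) {
  keys <- strsplit(path, ".", fixed = TRUE)[[1]]
  Reduce(function(node, key) node[[key]], keys, record)
}

record <- list(Sepal = list(Sepal_Length = 5.1, Sepal_Width = 3.5), Species = "setosa")
field_path(Sepal$Sepal_Width)                       # "Sepal.Sepal_Width"
get_path(record, field_path(Sepal.Sepal_Width))     # 3.5
```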
```r
sdf_select(x, ..., .aliases, .drop_parents = TRUE, .full_name = FALSE)
```
| Argument | Description |
|---|---|
| x | An object (usually a |
| ... | Fields to select. |
| .aliases | Character. Optional. If provided, these names will be matched positionally with selected fields provided in |
| .drop_parents | Logical. If |
| .full_name | Logical. If |
dplyr allows the use of selection helpers (e.g., see everything).
However, these helpers only work for top-level fields. For now, every nested field that should
be promoted must be identified explicitly.
```r
## Not run:
# nests the two sepal measurements into a struct column named Sepal
# (one row per input row; no aggregation)
iris_tbl <- copy_to(sc, iris, name="iris")
iris_nst <- iris_tbl |>
  sdf_nest(Sepal_Length, Sepal_Width, .key="Sepal")

# using java-like dot-notation
iris_nst |> sdf_select(Species, Petal_Width, Sepal.Sepal_Width)

# using R-like dollar-sign-notation
iris_nst |> sdf_select(Species, Petal_Width, Sepal$Sepal_Width)

# using dplyr selection helpers
iris_nst |> sdf_select(Species, matches("Petal"), Sepal$Sepal_Width)
## End(Not run)
```
Unnesting is an (optional) explode operation coupled with a nested select that promotes the sub-fields of
the exploded top-level array/map/struct to the top level. Hence, given a, an array of structs with fields
a1, a2, a3, sdf_unnest(df, a) will produce output with each record replicated
for every element in the a array and with the fields a1, a2, a3 (but not a)
at the top level. This is similar to tidyr::unnest.
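The explode-then-promote behaviour can be sketched locally. In this hypothetical base-R version (unnest_sketch() is not part of the package), the array column is an R list column whose elements are lists of records (named lists). Each parent row is replicated once per element, and the record fields become top-level columns while the original column is dropped. As with the default keep_all = FALSE, rows with empty arrays are lost.

```r
species_tbl <- data.frame(Species = c("setosa", "virginica"), stringsAsFactors = FALSE)
species_tbl$data <- list(
  list(list(a1 = 1, a2 = 2), list(a1 = 3, a2 = 4)),  # array with two records
  list(list(a1 = 5, a2 = 6))                         # array with one record
)

unnest_sketch <- function(df, column) {
  lens <- lengths(df[[column]])
  # explode: replicate each parent row once per array element
  parent <- df[rep(seq_len(nrow(df)), lens), setdiff(names(df), column), drop = FALSE]
  # promote: each record's fields become top-level columns
  elements <- unlist(df[[column]], recursive = FALSE)
  promoted <- do.call(rbind, lapply(elements, as.data.frame))
  out <- cbind(parent, promoted)
  rownames(out) <- NULL
  out
}

unnest_sketch(species_tbl, "data")
```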
```r
sdf_unnest(x, column, keep_all = FALSE)
```
| Argument | Description |
|---|---|
| x | An object (usually a |
| column | The field to explode. |
| keep_all | Logical. If |
Note that this is a less precise tool than using sdf_explode and sdf_select
directly, because all fields of the exploded array are kept and promoted. Calling those
functions directly allows more targeted use of sdf_select to promote only the fields that
are wanted to the top level of the data frame.
Additionally, although sdf_select lets users reach arbitrarily far into a nested
structure, this function only reaches one layer deep. The unnested fields may themselves
be nested structures that need further handling.
Note that map types are supported, but there is no is_map argument. This is because the
function interrogates the schema of the input data anyway to determine whether an explode
operation is required (it is for maps and arrays, but not for bare structs). The result
of that schema interrogation then determines the value of is_map passed to sdf_explode.
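The decision described above amounts to a small lookup from the column's type to the arguments handed to sdf_explode(). The sketch below is hypothetical: the function name, the type labels, and the error raised for other types are assumptions, not the package's code.

```r
# Hypothetical: decide whether to explode and what is_map should be
plan_unnest <- function(type_name) {
  switch(type_name,
    array  = list(explode = TRUE,  is_map = FALSE),
    map    = list(explode = TRUE,  is_map = TRUE),
    struct = list(explode = FALSE, is_map = FALSE),  # only promote fields
    stop("expected an array, map, or struct column, got: ", type_name)
  )
}

plan_unnest("map")
```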
```r
## Not run:
# first get some nested data
iris_tbl <- copy_to(sc, iris, name="iris")
iris_nst <- iris_tbl |>
  sdf_nest(Sepal_Length, Sepal_Width, Petal_Length, Petal_Width, .key="data") |>
  group_by(Species) |>
  summarize(data=collect_list(data))

# then explode it
iris_nst |> sdf_unnest(data)
## End(Not run)
```
These functions support supplying a read schema to Spark. This is particularly useful when reading data with nested arrays when you are not interested in several of the nested fields.
```r
struct_type(sc, struct_fields)
struct_field(sc, name, data_type, nullable = FALSE)
array_type(sc, data_type, nullable = FALSE)
binary_type(sc)
boolean_type(sc)
byte_type(sc)
date_type(sc)
double_type(sc)
float_type(sc)
integer_type(sc)
numeric_type(sc)
long_type(sc)
map_type(sc, key_type, value_type, nullable = FALSE)
string_type(sc)
character_type(sc)
timestamp_type(sc)
```
| Argument | Description |
|---|---|
| sc | A spark_connection. |
| struct_fields | A vector of fields obtained from |
| name | A field name to use in the output struct type. |
| data_type | A (java) data type (e.g., |
| nullable | Logical. Describes whether the field can be missing for some rows. |
| key_type | A (java) data type describing the map keys (usually |
| value_type | A (java) data type describing the map values. |