Collectives™ on Stack Overflow

Find centralized, trusted content and collaborate around the technologies you use most.

Learn more about Collectives

Teams

Q&A for work

Connect and share knowledge within a single location that is structured and easy to search.

Learn more about Teams

=========== UPDATED ========

I added some more details in my JSON (the struct_c and the array_d) to make it clearer on where I got the exception.

============================

Good day,

I have a Spark DataFrame with a nested array of type struct. I want to select a column from that struct, but got the error message: " org.apache.spark.sql.AnalysisException: cannot resolve ' home . array_a . array_b ['a']' due to data type mismatch: argument 2 requires integral type, however, ''a'' is of string type ".

Here is my data:

"home": { "a_number": 5, "a_string": "six", "array_a": [ "array_b": [{"a": "1", "b": 2}], "struct_c": {"a": 1.1, "b": 1.3}, "array_d": ["a", "b", "c"] "array_b": [{"a": "3", "b": 4}], "struct_c": {"a": 1.5, "b": 1.6}, "array_d": ["x", "y", "z"]

Here is my data schema:

mydf1 = spark.read.option("multiline", "true").json("myJson.json")
mydf1.printSchema()
 |-- home: struct (nullable = true)
 |    |-- a_number: long (nullable = true)
 |    |-- a_string: string (nullable = true)
 |    |-- array_a: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- array_b: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- a: string (nullable = true)
 |    |    |    |    |    |-- b: long (nullable = true)
 |    |    |    |-- array_d: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- struct_c: struct (nullable = true)
 |    |    |    |    |-- a: double (nullable = true)
 |    |    |    |    |-- b: double (nullable = true)

When I select data from either the struct_c or the array_d (array of strings) inside that array_a, there was no issue.

mydf1.select("home.array_a.array_d").show(10, False)
+----------------------+
|array_d               |
+----------------------+
|[[a, b, c], [x, y, z]]|
+----------------------+
mydf1.select(col("home.array_a.struct_c.a").alias("struct_field_inside_arrayA")).show(10, False)
+--------------------------+
|struct_field_inside_arrayA|
+--------------------------+
|[1.1, 1.5]                |
+--------------------------+

And here is where it failed:

mydf1.select("home.array_a.array_b.a").printSchema()
mydf1.select("home.array_a.array_b.a").show()

What I expect is a two-dimension array of string ([["1", "3"]] is my sample JSON)

Could you please help on why it failed?

Thanks for your help.

Fail to execute line 4: mydf1.select("home.array_a.array_b.a").printSchema() Traceback (most recent call last): File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o15300.select. : org.apache.spark.sql.AnalysisException: cannot resolve 'home.array_a.array_b['a']' due to data type mismatch: argument 2 requires integral type, however, ''a'' is of string type.;; 'Project [home#18213.array_a.array_b[a] AS a#18217] +- Relation[home#18213] json

org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:115) org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$3.applyOrElse(CheckAnalysis.scala:107) org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:278) org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:277) org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:275) org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:275) org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326) org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324) org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:275) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:93) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105) org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104) org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$2.apply(QueryPlan.scala:121) scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:121) org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:126) org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126) org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93) org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:107) org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85) org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85) org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95) org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108) org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201) org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105) org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57) org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55) org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3407) at org.apache.spark.sql.Dataset.select(Dataset.scala:1335) at sun.reflect.GeneratedMethodAccessor348.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/tmp/zeppelin_pyspark-5197917387349583174.py", line 380, in exec(code, _zcUserQueryNameSpace) File "", line 4, in File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1320, in select jdf = self._jdf.select(self._jcols(*cols)) File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in call answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: "cannot resolve 'home.array_a.array_b['a']' due to data type mismatch: argument 2 requires integral type, however, ''a'' is of string type.;;\n'Project [home#18213.array_a.array_b[a] AS a#18217]\n+- Relation[home#18213] json\n"

Since array_a and array_b are array type you cannot select its element directly

You need to explode them as below or you can get by Index

mydf1.withColumn("array_a", explode($"home.array_a"))
  .withColumn("array_b", explode($"array_a.array_b"))
  .select("array_b.a").show(false)

This will gice you

+---+
|a  |
+---+
|1  |
|3  |
+---+
                Thank you Shankar. However, "Since array_a and array_b are array type you cannot select its element directly" <<< this is not true, as in my original post, it is possible to select "home.array_a.another_number".  I don't want to use explode though, as I will end up having too many records with duplicated value on other columns. And in the subsequent aggregations, there's a the need to do groupBy.
– Averell
                Jul 31, 2019 at 11:04

Since you have no problem with the element_at() function, I supposed you are using the spark 2.4+, then you can try Spark SQL built-in functions: transform [1][2] + flatten:

>>> mydf1.selectExpr('flatten(transform(home.array_a.array_b, x -> x.a)) as array_field_inside_array').show()
+------------------------+
|array_field_inside_array|
+------------------------+
|                  [1, 3]|
+------------------------+

Where we use transform() function to retrieve only the values of field a of each array element of home.array_a.array_b and transform them to the array [[1], [3]]. then flatten that array into [1, 3]. If you need the result to be [[1, 3]], then just add array() function

array(flatten(transform(home.array_a.array_b, x -> x.a)))
                Thank you @jxc. It works. To have that [[1, 3]] I just only need to remove that "flatten". Does it make any difference between having array(flatten(transform())) and just transform()?  And I couldn't find transform()'s equivalent in PySpark. Is it in the form of some other name? Thanks
– Averell
                Aug 1, 2019 at 3:51
                @Averell, with transform() only, we get [[1], [3]] which I think is not what you are looking for? that's why I added flatten() function. transform() is one of the Spark SQL buildin functions (spark.apache.org/docs/2.4.3/api/sql/index.html). With pyspark dataframes, we can always use df.selectExpr() or spark.sql.functions.expr() to run these SQL functions :), you can google spark sql higher order functions for some more examples of functions related to the array operations.
– jxc
                Aug 1, 2019 at 4:10
                But I need the struct field "a" from ALL elements in array_b, not all struct fields from the 1st element in array_b.
– Averell
                Aug 1, 2019 at 0:07

In your example, it failed because you are trying to print the schema of a value not a column.

So if you remove "a" from the select statement then you can print the desired schema.

scala> dataDF.select("home.array_a.array_b").printSchema
 |-- array_b: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- a: string (nullable = true)
 |    |    |    |-- b: long (nullable = true)

If you want value from the array ( array_b ), you need to give the index.

scala> dataDF.select(element_at(col("home.array_a.array_b"),1)).show
+-----------------------------------+
|element_at(home.array_a.array_b, 1)|
+-----------------------------------+
|                           [[1, 2]]|
+-----------------------------------+

Can you also give the expected dataframe.

Thank you Praveen. But that would only give the first element of array_b, which is a struct of 2 struct-fields "a" and "b". What I need is the struct field "a" from ALL elements in array_b (which should be [["1", "3"]] in my sample json. – Averell Aug 1, 2019 at 0:12 your output is a dataframe with each record to be an array of structs/tuples [(1, 2)]. My expected. While I need a dataframe with each record to be an array of array of string [["1", "3"]]. – Averell Aug 1, 2019 at 0:31

Thanks for contributing an answer to Stack Overflow!

  • Please be sure to answer the question. Provide details and share your research!

But avoid

  • Asking for help, clarification, or responding to other answers.
  • Making statements based on opinion; back them up with references or personal experience.

To learn more, see our tips on writing great answers.