val df = sqlc.read.format("org.elasticsearch.spark.sql").load("idxcli/exploits")
println(df.count())
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at org.elasticsearch.spark.sql.RowValueReader$class.rowOrder(RowValueReader.scala:24)
at org.elasticsearch.spark.sql.ScalaRowValueReader.rowOrder(ScalaEsRowValueReader.scala:13)
at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:32)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:620)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I think the problem occurs because we have an array of objects in our index.
Our index mapping:
"exploits" : {
  "properties" : {
    "actions" : {
      "properties" : {
        "action" : { "type" : "string" },
        "logobj_author" : { "type" : "string" },
        "logobj_event" : { "type" : "string" },
        "logobj_id" : { "type" : "string" },
        "logobj_section" : { "type" : "string" },
        "logobj_subsection" : { "type" : "string" },
        "logobj_tags" : { "type" : "string" },
        "relation_progress" : { "type" : "string" },
        "subject_geoip_countrycode" : { "type" : "string" },
        "subject_origin" : { "type" : "string" },
        "subject_referrer" : { "type" : "string" },
        "subject_ua_name" : { "type" : "string" },
        "subject_ua_os" : { "type" : "string" }
      }
    },
    "category" : { "type" : "string" },
    "description" : { "type" : "string" },
    "endDate" : { "type" : "string" },
    "evtType" : { "type" : "string" },
    "id" : { "type" : "string" },
    "name" : { "type" : "string", "store" : true },
    "points" : { "type" : "string" },
    "startDate" : { "type" : "string" },
    "status" : { "type" : "boolean" },
    "type" : { "type" : "string" }
  }
}
Seems I have the same error; I also have arrays of objects in the index. I am running Spark-ES-Connector 2.1.0 final with Spark 1.4.0.
All the best,
Sebastian
Unfortunately there's no easy solution for this one. The Elasticsearch mapping treats fields with a single value and fields with multiple (array) values the same way. In Spark SQL, since the DataFrame schema is fixed, each field must be treated either as a single value (the default) or as an array (potentially containing only one value).
When a document breaks this rule (a field assumed to hold a single value suddenly holds an array), the conflict leads to an exception.
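To make the conflict concrete, here is a minimal pure-Scala sketch with no Spark involved. FixedSchemaConflict, Single, ArrayOf and read are hypothetical names, not connector code: once the kind of a field is fixed up front, an array arriving in a single-value field cannot be represented, while a single value arriving in an array field can simply be wrapped.

```scala
// Hypothetical illustration (not es-hadoop code) of a field whose kind is
// fixed in the schema before any document is read.
object FixedSchemaConflict {
  sealed trait FieldKind
  case object Single extends FieldKind  // the default: one value per field
  case object ArrayOf extends FieldKind // field declared as an array

  // Reads a raw value according to the kind fixed in the schema.
  def read(value: Any, kind: FieldKind): Any = (kind, value) match {
    // Single-value schema but the document holds an array: no way to fit it.
    case (Single, s: collection.Seq[_]) =>
      throw new IllegalStateException(
        s"field is declared single-valued but the document holds ${s.size} values")
    case (Single, v) => v
    // Array schema: arrays pass through, single values become one-element arrays.
    case (ArrayOf, s: collection.Seq[_]) => s
    case (ArrayOf, v)                    => Seq(v)
  }
}
```

Treating every field as an array would avoid the exception but changes the shape of every column, which is why the choice has to come from the user rather than be guessed.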
There are two issues here:
1. The exception should be clearer; this is currently not the case.
2. The user should be able to specify which fields are arrays and which are single values. This would be reflected in the resulting schema as well; otherwise es-spark cannot tell which is the case without looking at all the data first.
I'd like to address both 2 and 1 in one go in the upcoming version. I'll let you know once that happens.
relates #464
Is there a way to manually specify the schema or the type of that one column when creating the DataFrame right now?
Also, can elasticsearch-hadoop read the _mapping from the index and use it for the types, instead of relying on the actual values in the documents to guess them? I have the same problem: I have a keywords field where the _mapping is set to
"keywords" : {
  "type" : "nested",
  "properties" : {
    "level" : {
      "type" : "long"
    },
    "word" : {
      "type" : "string"
    }
  }
}
but the actual value in the documents right now is null, and df.printSchema outputs:
scala> df.printSchema
|-- ...
|-- keywords: string (nullable = true)
|-- ...
So it's incorrectly detecting it as a string.
Hi Costin,
Could you provide an interim jar with the fix? At the moment we are stuck because of this issue.
The problem arises because of the following key in the ES document. The printSchema() output for the DataFrame should be the following; we also apply this schema to the DataFrame explicitly, but the result is the same.
|-- SensitiveDataDomains: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- columns: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- columnName: string (nullable = true)
| | | | |-- schemaName: string (nullable = true)
| | | | |-- tableName: string (nullable = true)
| | |-- dataDomainName: string (nullable = true)
The issue is being worked on; hopefully there will be a fix over the weekend.
As for interim jars: that's what the dev builds are for. We publish nightly builds between releases so folks can try things out right away, without having to build them or wait for the release.
Once it is solved, you'll be notified on this thread.
As a workaround, one can create the schema manually and specify which fields have to be arrays. This can be done by manually creating the RDD and then creating the DataFrame from it.
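A minimal sketch of the core of that workaround, assuming the documents are first read through the plain Scala API, where each document comes back as a map of field names to values. NormalizeDocs, asSeq and toRowValues are hypothetical helpers, not part of es-hadoop: they force the chosen fields to always be sequences (and leave the rest as single values) so that every row fits one fixed schema.

```scala
// Hypothetical helpers for the manual-schema workaround (not es-hadoop code).
object NormalizeDocs {
  // Turns any value into a sequence: null -> empty, arrays/sequences as-is,
  // a single value -> one-element sequence.
  def asSeq(value: Any): Seq[Any] = value match {
    case null                   => Seq.empty
    case s: collection.Seq[_]   => s.toSeq
    case a: Array[_]            => a.toSeq
    case single                 => Seq(single)
  }

  // Returns the document's values in the order of `fields`. Fields listed in
  // `arrayFields` are always sequences; other fields are returned untouched
  // (null when the document does not contain them).
  def toRowValues(doc: collection.Map[String, Any],
                  fields: Seq[String],
                  arrayFields: Set[String]): Seq[Any] =
    fields.map { f =>
      val v = doc.getOrElse(f, null)
      if (arrayFields.contains(f)) asSeq(v) else v
    }
}
```

In Spark, one could then map the pairs returned by sc.esRDD(...) (available after import org.elasticsearch.spark._) to Row.fromSeq(NormalizeDocs.toRowValues(doc, fields, arrayFields)) and pass the result to sqlContext.createDataFrame(rowRDD, schema), where schema is a StructType declaring those fields as ArrayType. Nested objects that map to struct fields would additionally need converting into Rows, which this sketch leaves out.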
Thank you for the patience.
Hi everyone,
I've pushed a fix to master as well as to the dev builds. You can now configure which fields the connector reads as arrays through es.field.read.as.array.include and es.field.read.as.array.exclude.
By default, no field will be considered as an array. So for example given the mapping:
"properties" : {
  "arr" : {
    "properties" : {
      "one" : { "type" : "string" },
      "two" : { "type" : "string" }
    }
  },
  "top-level" : { "type" : "string" }
}
by default the detected schema will be:
|-- arr: struct (nullable = true)
| |-- one: string (nullable = true)
| |-- two: string (nullable = true)
|-- top-level: string (nullable = true)
To tell the connector that arr needs to be treated as an array, set es.field.read.as.array.include to arr.
The resulting schema will be:
|-- arr: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- one: string (nullable = true)
| | |-- two: string (nullable = true)
|-- top-level: string (nullable = true)
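For reference, a configuration sketch of passing the setting through the DataFrame reader. It assumes Spark 1.4+, an elasticsearch-hadoop dev build containing this change on the classpath, a job launched with spark-submit (which supplies the master URL), and a hypothetical index/type resource holding the arr mapping above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ReadAsArrayExample {
  def main(args: Array[String]): Unit = {
    // The master URL is expected to come from spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("read-as-array-example"))
    val sqlc = new SQLContext(sc)

    val df = sqlc.read
      .format("org.elasticsearch.spark.sql")
      // Ask the connector to read "arr" as an array of structs.
      .option("es.field.read.as.array.include", "arr")
      .load("index/type") // hypothetical resource name

    // Should print the array-based schema shown above.
    df.printSchema()
    sc.stop()
  }
}
```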
Multiple fields can be specified by separating them with commas, and regular expressions can be used as well.
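Since the setting is a comma-separated list of patterns, the matching idea can be sketched in plain Scala. This is a hypothetical illustration, not the connector's implementation: ArrayFieldFilter and readAsArray are made-up names, each entry is assumed to be a Java regex that must match the whole absolute field name (for example arr or arr.one), and exclusion is assumed to take precedence over inclusion.

```scala
// Hypothetical sketch (not es-hadoop code) of include/exclude matching.
object ArrayFieldFilter {
  // Splits "a, b,c" into Seq("a", "b", "c"), dropping empty entries.
  private def parse(csv: String): Seq[String] =
    csv.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // True when the absolute field name fully matches an include pattern
  // and no exclude pattern (assumed precedence: exclude wins).
  def readAsArray(field: String, include: String, exclude: String = ""): Boolean = {
    val included = parse(include).exists(p => field.matches(p))
    val excluded = parse(exclude).exists(p => field.matches(p))
    included && !excluded
  }
}
```

For example, readAsArray("arr.one", "arr\\..*") is true under these assumptions, while adding "arr\\.one" as an exclude pattern turns it off again.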
Feedback is welcome!
P.S. Thanks for your patience.
@costin which commit are you referring to? Searching for es.field.read.as.array.include in the sources returns nothing.
thanks for being so quick :)
Introduce option to tell the connector what fields in ES need to be
read as arrays. This way the connector always creates the appropriate
structure, in particular in Spark SQL.
relates #484
Introduce option to tell the connector what fields in ES need to be
read as arrays. This way the connector always creates the appropriate
structure, in particular in Spark SQL.
relates #484
(cherry picked from commit 4a76b47)
Conflicts:
spark/sql-13/src/main/scala/org/elasticsearch/spark/sql/SchemaUtils.scala
Refactor logic to use absolute field names as the relative name are
prone to overwrites
Improve field filtering to allow partial or strict matching
relates #482 #484
Refactor logic to use absolute field names as the relative name are
prone to overwrites
Improve field filtering to allow partial or strict matching
relates #482 #484
(cherry picked from commit 6222363)
Folks, as there hasn't been any update, I'm closing the issue. Please check the new releases once they come out (in a couple of hours).
Thanks!