17/03/26 03:04:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 10, 192.168.56.5, executor 1): java.io.IOException: java.lang.UnsupportedOperationException
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at GowallaTask$2.call(GowallaTask.java:214)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException
at java.util.AbstractMap.put(AbstractMap.java:209)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$10.apply(TorrentBroadcast.scala:286)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:287)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:221)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
... 19 more
I get this exception when I use the KryoSerializer:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryoserializer.buffer.mb", "24");
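Side note, unrelated to the crash: on Spark 1.4 and later, spark.kryoserializer.buffer.mb is deprecated in favor of spark.kryoserializer.buffer, which takes a size with units. A sketch of the equivalent configuration, assuming a recent Spark version:

    // Assuming Spark 1.4+: buffer sizes take units via
    // spark.kryoserializer.buffer (initial) and spark.kryoserializer.buffer.max (ceiling).
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryoserializer.buffer", "24m");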
This is my code (the anonymous class was missing a closing brace; fixed here):

JavaPairRDD<Object, Iterable<GowallaDataLocation>> line_RDD_2 = sc
        .textFile("/home/piero/gowalla_location.txt", 2)
        .map(new GowallaMapperDataLocation())
        .groupBy(new Function<GowallaDataLocation, Object>() {
            private static final long serialVersionUID = -6773509902594100325L;

            @Override
            public Object call(GowallaDataLocation v1) throws Exception {
                DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
                return dateFormat.format(v1.getDATE());
            }
        }).persist(StorageLevel.MEMORY_AND_DISK_SER());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(line_RDD_2.collectAsMap());
//System.out.println(broadcastVar_2.getValue().size());
JavaRDD<Object> keys = line_RDD_2.keys().persist(StorageLevel.MEMORY_ONLY_SER());
line_RDD_2.unpersist();
keys.foreach(new VoidFunction<Object>() {
    private static final long serialVersionUID = -8148877518271969523L;

    @Override
    public void call(Object t) throws Exception {
        //System.out.println("KEY:" + t + " ");
        Iterable<GowallaDataLocation> dr = broadcastVar_2.getValue().get(t);
    }
});
I suspect this happens because you are broadcasting the result of line_RDD_2.collectAsMap() directly: the broadcast value's declared type is just Map, so Kryo doesn't know the concrete implementation and falls back to AbstractMap for its internal work, and AbstractMap.put() throws UnsupportedOperationException (that is the top frame of your "Caused by" trace).
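A minimal sketch of that failure mode in plain Java, with nothing Spark- or Kryo-specific involved: any Map built on AbstractMap that doesn't override put() throws on the first insertion, which is what the deserializer runs into.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class AbstractMapPutDemo {
    // A Map backed by AbstractMap with no put() override,
    // mimicking what Kryo's MapSerializer ends up writing into.
    static Map<String, String> viewOnlyMap() {
        return new AbstractMap<String, String>() {
            @Override
            public Set<Entry<String, String>> entrySet() {
                return Collections.emptySet();
            }
        };
    }

    public static void main(String[] args) {
        try {
            viewOnlyMap().put("a", "b");
        } catch (UnsupportedOperationException e) {
            // Same frame as the "Caused by" line in the trace: AbstractMap.put
            System.out.println("put() threw UnsupportedOperationException");
        }
    }
}
```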
For example, if I do this:
Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = a.keySet();
c.add("e");
I will get an UnsupportedOperationException from AbstractCollection, because keySet() returns an unmodifiable view. It's easily resolved by copying into a concrete collection:
Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = new TreeSet<String>();
c.addAll(a.keySet());
c.add("e");
If my guess is right, you can probably resolve it the same way, by copying into a concrete HashMap before broadcasting:
Map<Object, Iterable<GowallaDataLocation>> a = new HashMap<>();
a.putAll(line_RDD_2.collectAsMap());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(a);
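The same defensive-copy idea in plain Java, assuming nothing Spark-specific: copying a read-only map into a concrete HashMap (the copy constructor is equivalent to the putAll above) yields a mutable, well-known implementation that serializers can round-trip safely.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class DefensiveCopyDemo {
    public static void main(String[] args) {
        // A read-only map, standing in for whatever collectAsMap() hands back.
        Map<String, String> readOnly =
                Collections.unmodifiableMap(Collections.singletonMap("a", "b"));

        // Defensive copy into a concrete implementation.
        Map<String, String> copy = new HashMap<>(readOnly);
        copy.put("c", "d"); // fine on the copy; would throw on readOnly

        System.out.println(copy.size()); // prints 2
    }
}
```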
Let me know if this works.