
Spark kryo_serializers and Broadcast<Map<Object, Iterable<GowallaDataLocation>>> java.io.IOException: java.lang.UnsupportedOperationException


17/03/26 03:04:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 10, 192.168.56.5, executor 1): java.io.IOException: java.lang.UnsupportedOperationException
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at GowallaTask$2.call(GowallaTask.java:214)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:917)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.UnsupportedOperationException
    at java.util.AbstractMap.put(AbstractMap.java:209)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:244)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$10.apply(TorrentBroadcast.scala:286)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:287)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:221)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    ... 19 more

I get this exception when I use the KryoSerializer:

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryoserializer.buffer.mb", "24");
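As a side note on the configuration, here is a sketch (assuming an existing SparkConf named conf and GowallaDataLocation on the classpath): spark.kryoserializer.buffer.mb has been deprecated since Spark 1.4 in favor of spark.kryoserializer.buffer, and registering the classes you serialize with Kryo tends to surface serialization problems earlier:

```java
// Sketch only, not a tested drop-in: assumes a SparkConf named `conf`.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// Deprecated key "spark.kryoserializer.buffer.mb" replaced by a size string:
conf.set("spark.kryoserializer.buffer", "24m");
// Register the classes that will be broadcast/shuffled, so Kryo does not
// have to write full class names into the stream:
conf.registerKryoClasses(new Class<?>[] { GowallaDataLocation.class });
```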

This is my code.

JavaPairRDD<Object, Iterable<GowallaDataLocation>> line_RDD_2 = sc
        .textFile("/home/piero/gowalla_location.txt", 2)
        .map(new GowallaMapperDataLocation())
        .groupBy(new Function<GowallaDataLocation, Object>() {
            private static final long serialVersionUID = -6773509902594100325L;

            @Override
            public Object call(GowallaDataLocation v1) throws Exception {
                DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
                return dateFormat.format(v1.getDATE());
            }
        }).persist(StorageLevel.MEMORY_AND_DISK_SER());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(line_RDD_2.collectAsMap());
//System.out.println(broadcastVar_2.getValue().size());
JavaRDD<Object> keys = line_RDD_2.keys().persist(StorageLevel.MEMORY_ONLY_SER());
line_RDD_2.unpersist();
keys.foreach(new VoidFunction<Object>() {
    private static final long serialVersionUID = -8148877518271969523L;

    @Override
    public void call(Object t) throws Exception {
        //System.out.println("KEY:" + t + " ");
        Iterable<GowallaDataLocation> dr = broadcastVar_2.getValue().get(t);
    }
});

I suspect this happens because you are broadcasting line_RDD_2.collectAsMap() directly: the broadcast value is typed only as Map, so Kryo doesn't know the concrete implementation and falls back to AbstractMap for its internal work, and AbstractMap.put throws UnsupportedOperationException.
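To see why AbstractMap.put is the exact method that blows up, here is a plain-Java sketch with no Spark involved (ReadOnlyMap is a hypothetical stand-in for the read-only wrapper that collectAsMap() can hand back; AbstractMap.put throws unless the subclass overrides it):

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class AbstractMapPutDemo {
    // Minimal read-only Map: only entrySet() is implemented, so all write
    // methods fall through to AbstractMap, whose put() always throws.
    static class ReadOnlyMap<K, V> extends AbstractMap<K, V> {
        private final Set<Entry<K, V>> entries;
        ReadOnlyMap(Map<K, V> source) {
            this.entries = new HashMap<>(source).entrySet();
        }
        @Override
        public Set<Entry<K, V>> entrySet() {
            return entries;
        }
    }

    // Returns true if put() on the given map throws UnsupportedOperationException,
    // which is what Kryo's MapSerializer triggers while deserializing.
    static boolean putThrows(Map<String, Integer> m) {
        try {
            m.put("x", 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> src = new HashMap<>();
        src.put("a", 1);
        Map<String, Integer> readOnly = new ReadOnlyMap<>(src);
        System.out.println(readOnly.get("a"));   // reads work fine: 1
        System.out.println(putThrows(readOnly)); // true: put() is unsupported
        // The workaround: copy into a concrete, mutable map before broadcasting.
        Map<String, Integer> copy = new HashMap<>(readOnly);
        System.out.println(putThrows(copy));     // false
    }
}
```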

For example, if I do this:

Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = a.keySet();
c.add("e");

I will get an UnsupportedOperationException from AbstractCollection, which is easily resolved:

Map<String, String> a = new HashMap<String, String>();
a.put("a", "b");
Set<String> c = new TreeSet<String>();
c.addAll(a.keySet());
c.add("e");
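The two snippets above can be packaged into a small runnable check (plain Java, no Spark; the helper addThrows is mine, added just to make the behavior assertable):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class KeySetDemo {
    // Returns true if add() on the given set throws UnsupportedOperationException.
    static boolean addThrows(Set<String> s) {
        try {
            s.add("e");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> a = new HashMap<>();
        a.put("a", "b");
        // keySet() is a view backed by the map; add() falls through to
        // AbstractCollection.add and throws.
        System.out.println(addThrows(a.keySet())); // true
        // Copy the keys into a collection you own; then add() works.
        Set<String> c = new TreeSet<>(a.keySet());
        System.out.println(addThrows(c));          // false
        System.out.println(c);                     // [a, e]
    }
}
```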

If my guess is right, you can probably resolve it like this:

Map<Object, Iterable<GowallaDataLocation>> a = new HashMap<>();
a.putAll(line_RDD_2.collectAsMap());
Broadcast<Map<Object, Iterable<GowallaDataLocation>>> broadcastVar_2 = sc.broadcast(a);

Let me know if this works.
