相关文章推荐
重感情的番茄  ·  使用阿里云CLI Visual ...·  4 月前    · 
豪爽的海龟  ·  学习OPENSEADRAGON之二 ...·  11 月前    · 
至今单身的仙人球  ·  pandas ...·  1 年前    · 

由Pyspark地图调用的函数不修改全局列表

1 人不认可

我定义了这个对全局列表进行操作的函数 签名 , 我已经测试了这个功能,它是有效的。

def add_to_list_initial(x):
    global signature
    signature.append([x])
    print(x)
    return x

The print will check if the function is invoked.

我必须为Pyspark rdd的每一行运行这个函数,所以我写了这段代码。

rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(lambda x: add_to_list_initial(x))

但这个函数没有被调用,所以,为了避免地图的 "懒惰",我试着在最后加上".count()",以这种方式。

rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(lambda x: add_to_list_initial(x)).count()

而现在,打印工作已经完成。我甚至已经检查了那份名单签名被更新,但当我试图打印列表的大小时,结果是0,因为列表根本没有被更新。

我甚至尝试过用foreach而不是地图,但结果是一样的。

rdd1 = rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x]))
rdd1.foreach(add_to_list_initial)

这些是输出的第一行,它们在我的Pycharm控制台中是用红色写的,甚至是打印出来的。

19/11/19 21:56:51 WARN TaskSetManager: Stage 2 contains a task of very large size (76414 KB). The maximum recommended task size is 100 KB.
1000052032941703168135263382785614272239884872602
1001548144792848500380180424836160638323674923493
1001192257270049214326810337735024900266705408878
1005273115771118475643621392239203192516851021236
100392090499199786517408984837575190060861208673
1001304115299775295352319010425102201971454728176
1009952688729976061710890304226612996334789156125
1001064097828097404652846404629529563217707288121
1001774517560471388799843553771453069473894089066
1001111820875570611167329779043376285257015448116
1001339474866718130058118603277141156508303423308
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716
1003194269601172112216983411469283303300285500716

我怎样才能以有效的方式解决? 我使用Python 3.7和Pyspark 3.2.1

I'm doing this in order to obtain a min-hash 签名 for each set of hashed shingles, where the id of the document is

然后,为了计算其他的排列组合,我认为要以这种方式行事。

def add_to_list(x):
    global num_announcements
    global signature
    global i
    print(len(signature))
    if i == num_announcements:
        i = 0
    signature[i].append(x)
    print(i)
    i += 1
for function in hash_functions[1:]:
    rdd.map(lambda x: min([str(int.from_bytes(function(str(shingle)), 'big')) for shingle in x])).foreach(add_to_list)

但问题是一样的。 我甚至会很高兴对我的洗矿问题提出建议,但问题是关于上述的问题。

python
apache-spark
lambda
pyspark
global-variables
antoninus96
antoninus96
发布于 2019-11-20
2 个回答
antoninus96
antoninus96
发布于 2019-11-20
已采纳
0 人赞同

我以这种方式解决了问题,即使我没有找到一般问题的有用解决方案。

signatures = shingles.flatMap(lambda x: [[(x[1]+1, (x[1]+1)%lsh_b), min([int.from_bytes(function(str(s)), 'big') for s in x[0]])] for function in hash_functions]).cache()
    
Laurent LAPORTE
Laurent LAPORTE
发布于 2019-11-20
0 人赞同

你可以使用一个类来代替全局变量(一个 可调用 ).

from collections.abc import Callable
class Signature(Callable):
    def __init__(self):
        self.signature = []
    def __call__(self, x):
        self.signature.append([x])
        return x

然后,你可以在你需要的地方把这个可调用的东西实例化。

add_to_list_initial = Signature()
rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(
    lambda x: add_to_list_initial(x)
).count()
print(add_to_list_initial.signature)

注意:你可以在这里避免使用lambda表达式,而是使用简化的方式。

rdd.map(lambda x: min([str(int.from_bytes(hash_functions[0](str(shingle)), 'big')) for shingle in x])).map(
    add_to_list_initial
).count()

为了允许腌制,你可以使用。

class Signature:
    def __init__(self):
        self.signature = []