I'm trying to run two different functions at the same time with a shared queue, and I get an error. How can I run two functions at the same time with a shared queue? This is Python 3.6 on Windows 7.
from multiprocessing import Process
from queue import Queue
import logging

def main():
    try:
        x = DataGenerator()
        x.run()
    except Exception as e:
        logging.exception("message")

class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue):
        # Producer: keeps putting one-byte datagrams on the queue
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        # Consumer: batches datagrams once enough of them are queued
        byte_array = bytearray()
        while True:
            size_of_queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.extend(packet)
                logging.info("Sending datagram ")
                print(str(byte_array))
                byte_array.clear()

if __name__ == "__main__":
    main()
The log shows the error below. I tried running the console as administrator and got the same message:
INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
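The last frames of the traceback show multiprocessing pickling the process object, and with it the queue argument, in order to hand it to the child process, which is how processes are started on Windows. Here is a minimal sketch of why that step fails, using only the standard library: queue.Queue is built on a thread lock and condition variables, and pickle refuses those. The helper name pickling_error is made up for illustration.

```python
import pickle
import queue
import threading


def pickling_error(obj):
    """Return the exception pickle raises for obj, or None if it pickles fine."""
    try:
        pickle.dumps(obj)
    except Exception as exc:  # pickle reports failures with several exception types
        return exc
    return None


if __name__ == "__main__":
    # A bare lock is rejected, and queue.Queue keeps one internally.
    print(type(pickling_error(threading.Lock())).__name__)
    print(type(pickling_error(queue.Queue())).__name__)
    # Plain data such as the datagrams themselves pickles without complaint.
    print(pickling_error(bytearray([1, 2, 3])))
```

The exact wording of the message differs between Python versions, but the exception type is TypeError in both cases.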
I had the same problem with Pool() in Python 3.6.3.
Error received: TypeError: can't pickle _thread.RLock objects
Let's say we want to add some number num_to_add to each element of some list num_list in parallel. The code is schematically like this:
from multiprocessing import Manager, Pool

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list  # e.g. [4,2,5,7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        pool = Pool(processes=50)
        # Submitting the bound method self.run_parallel means self has to be pickled
        results = [pool.apply_async(self.run_parallel, (num, new_num_list))
                   for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add  # uses class parameter
        shared_new_num_list.append(new_num)
The problem here is that run_parallel() is a bound method, so submitting it to the pool means pickling self, and that fails when the instance holds anything unpicklable (a lock, for example). Moving the parallelized function run_parallel() out of the class helped. It's not the best solution, though, because the function probably needs class parameters such as self.num_to_add, and each of those then has to be passed as an argument.
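To see the mechanism in isolation, here is a small sketch. The class Worker is hypothetical, and its lock stands in for whatever unpicklable attribute the real instance holds. Pickling a bound method drags the whole instance along, while a module-level function is pickled by name only.

```python
import pickle
import threading


class Worker:
    """Hypothetical class whose instances hold a lock, as many real objects do."""

    def __init__(self, num_to_add):
        self.num_to_add = num_to_add
        self._lock = threading.Lock()  # stand-in for any unpicklable attribute

    def add(self, num):
        return num + self.num_to_add


def add(num, to_add):
    """Module-level equivalent: everything it needs arrives as an argument."""
    return num + to_add


if __name__ == "__main__":
    worker = Worker(num_to_add=1)
    try:
        pickle.dumps(worker.add)  # bound method, so worker is pickled too
    except TypeError as exc:
        print("bound method:", exc)

    # The function travels by name and the arguments are plain integers.
    payload = pickle.dumps((add, 4, worker.num_to_add))
    func, num, to_add = pickle.loads(payload)
    print("function:", func(num, to_add))
```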
Solution:
from multiprocessing import Manager, Pool

def run_parallel(num, shared_new_num_list, to_add):  # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list  # e.g. [4,2,5,7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        pool = Pool(processes=50)
        # run_parallel is a module-level function, so self is never pickled
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add))  # num_to_add is passed as an argument
                   for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()
Other suggestions above didn't help me.
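If passing every class parameter by hand becomes unwieldy, one option that is not part of the answer above is to bind the parameters once with functools.partial. This is only a sketch under that assumption: the function name add_to and the processes argument are invented for the example, and results come back as return values instead of going through a Manager list.

```python
from functools import partial
from multiprocessing import Pool


def add_to(num, to_add):
    """Module-level worker function, so it is pickled by name."""
    return num + to_add


class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list
        self.num_to_add = num_to_add

    def run(self, processes=2):
        # partial freezes to_add; only the function name and an int are pickled,
        # never self.
        task = partial(add_to, to_add=self.num_to_add)
        with Pool(processes=processes) as pool:
            return pool.map(task, self.num_list)


if __name__ == "__main__":
    print(DataGenerator([4, 2, 5, 7], 1).run())  # [5, 3, 6, 8]
```

The if __name__ == "__main__" guard matters on Windows, where each worker starts by importing the main module again.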
multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
Move the queue to self instead of passing it as an argument to your functions package and send.
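A minimal sketch of that suggestion, with one assumption the answer does not state: the queue is also switched from queue.Queue to multiprocessing.Queue, because a queue.Queue attribute would still be pickled along with self and fail the same way. To make the example terminate, the producer sends a fixed number of datagrams followed by a sentinel. The count parameter, the results queue and the sentinel are all additions for illustration.

```python
import multiprocessing as mp

SENTINEL = None  # marks the end of the stream


class DataGenerator:
    def __init__(self, count=16):
        self.count = count
        # Unlike queue.Queue, multiprocessing.Queue may be handed to a child
        # process while that child is being started.
        self.queue = mp.Queue()
        self.results = mp.Queue()

    def package(self):
        """Producer: one single-byte datagram per value, then the sentinel."""
        for i in range(self.count):
            self.queue.put(bytearray([i]))
        self.queue.put(SENTINEL)

    def send(self):
        """Consumer: concatenate datagrams until the sentinel arrives."""
        payload = bytearray()
        while True:
            packet = self.queue.get()
            if packet is SENTINEL:
                break
            payload.extend(packet)
        self.results.put(bytes(payload))

    def run(self):
        workers = [mp.Process(target=self.package), mp.Process(target=self.send)]
        for worker in workers:
            worker.start()
        payload = self.results.get(timeout=30)  # give up rather than hang forever
        for worker in workers:
            worker.join()
        return payload


if __name__ == "__main__":
    print(DataGenerator(count=4).run())
```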
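Complementing Marina's answer, here is something to access the whole class. It also fools Pool.map, as I needed today. Note that it relies on the worker processes inheriting the module-level global, which happens with the fork start method but not with spawn, the default on Windows.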
from multiprocessing import Manager

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add):  # to_add is now unused; fakeSelf supplies the value
    new_num = num + fakeSelf.num_to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        globals()['fakeSelf'] = self  # expose the instance through a module-level global
        self.num_list = num_list  # e.g. [4,2,5,7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
As this is the first answer that shows up when searching for this issue, I will also add my solutions here.
This issue can be caused by many things. Here are two scenarios I have encountered:
Package incompatibility
Problem: Trying to use multiprocessing when another part of your code that eventually gets called also needs to create new processes or is incompatible with being copied to a new process because of the use of locks, for example.
Solution: My issue was with trying to use a single connection to a MongoDB instance for all of my processes. Creating a new connection for each process resolved the issue.
Class instance
Problem: Trying to call pool.starmap from inside a class on another function in the same class. Making it a staticmethod or having a function on the outside call it didn't work and gave the same error. An instance that holds unpicklable state can't be sent to the worker processes, so the instance has to be created after the multiprocessing has started.
Solution: What I ended up doing that worked for me was to separate my class into two classes. Basically, the function you are calling the multiprocessing on needs to be called right after you instantiate a new object for the class it belongs to. Something like this:
from multiprocessing import Pool

class B:
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass

def multiprocess_feature(idx, feature):
    # Runs in the worker: the B instance is created here, so it is never pickled
    b_instance = B()
    return b_instance.process_feature(idx, feature)

class A:
    # features and num_processes are assumed to be supplied by the caller
    def process_stuff(self, features, num_processes):
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [(idx, feature) for idx, feature in enumerate(features)],
                chunksize=100,
            )
        return results