I'm trying to run two different functions at the same time with a shared queue and I get an error. How can I run two functions at the same time with a shared queue? This is Python 3.6 on Windows 7.

from multiprocessing import Process
from queue import Queue
import logging
def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")
class DataGenerator:
    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)
    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")
    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)
    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of_queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.append(packet)
                logging.info("Sending datagram ")
                print(str(datagram))
                byte_array(0)
if __name__ == "__main__":
    main()

The log shows an error. I tried running the console as administrator and I get the same message:

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
queue.Queue is for inter-thread communication. multiprocessing.Queue is for sending things between processes. – user2357112, May 23, 2017 at 20:45

@user2357112 I made the change to multiprocessing.Queue and that fixed the issue. Thank you. – Jonathan Kittell, May 23, 2017 at 20:57

Does this answer your question? multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed – Camilo Terevinto, Jan 28, 2022 at 14:27
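
A minimal sketch of the fix described in the comments, assuming the rest of the question's code stays the same: import Queue from multiprocessing instead of from queue, since multiprocessing.Queue is designed to be handed to child processes.

from multiprocessing import Process, Queue  # instead of: from queue import Queue
import logging

class DataGenerator:
    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        queue = Queue()  # picklable by the process-spawning machinery
        Process(target=self.package, args=(queue,)).start()
        Process(target=self.send, args=(queue,)).start()

    # package and send as in the question

if __name__ == "__main__":
    DataGenerator().run()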

I had the same problem with Pool() in Python 3.6.3.

Error received: TypeError: can't pickle _thread.RLock objects

Let's say we want to add some number num_to_add to each element of some list num_list in parallel. The code is schematically like this:

from multiprocessing import Pool, Manager

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list  # e.g. [4, 2, 5, 7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        pool = Pool(processes=50)
        results = [pool.apply_async(self.run_parallel, (num, new_num_list))
                   for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add  # uses a class attribute
        shared_new_num_list.append(new_num)

The problem here is that self in the function run_parallel() can't be pickled, as it is a class instance. Moving the parallelized function run_parallel() out of the class helped. But that's not the best solution on its own, because the function probably still needs class attributes like self.num_to_add, and then you have to pass them as arguments.

Solution:

from multiprocessing import Pool, Manager

def run_parallel(num, shared_new_num_list, to_add):  # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        self.num_list = num_list  # e.g. [4, 2, 5, 7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add))  # num_to_add is passed as an argument
                   for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()
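
A hypothetical usage sketch (not part of the original answer): the constructor triggers run() itself, and adding return list(new_num_list) at the end of run() would make the result visible to the caller.

if __name__ == "__main__":
    DataGenerator([4, 2, 5, 7], 1)  # workers append num + 1, so the shared list holds [5, 3, 6, 8] in some order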

Other suggestions above didn't help me.

I encountered this same error when launching a process from within a class, just using multiprocessing.Process. I moved the function outside the class, as per your suggestion, and it worked for me! – Sandeep S D, Nov 29, 2021 at 16:39

multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Move the queue to self instead of passing it as an argument to your functions package and send.
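
A minimal sketch of that idea, assuming the queue is a multiprocessing.Queue (a queue.Queue stored on self would still fail to pickle):

from multiprocessing import Process, Queue

class DataGenerator:
    def __init__(self):
        self.queue = Queue()  # stored on the instance instead of passed as an argument

    def run(self):
        # self, including self.queue, is pickled into each child by the
        # process-spawning machinery, which multiprocessing.Queue supports
        Process(target=self.package).start()
        Process(target=self.send).start()

    def package(self):
        for i in range(16):
            self.queue.put(i)

    def send(self):
        for _ in range(16):
            print(self.queue.get())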

Complementing Marina's answer, here is a way to access the whole class. It also works with Pool.map, which I needed today.

from multiprocessing import Pool, Manager

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add):  # to_add is accepted but unused; the class is reached through fakeSelf
    new_num = num + fakeSelf.num_to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add):
        globals()['fakeSelf'] = self  # expose the instance under a module-level name
        self.num_list = num_list  # e.g. [4, 2, 5, 7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        # ... rest as in Marina's answer above

Note that this relies on the workers inheriting module globals, i.e. a fork start method; with spawn (the Windows default), fakeSelf stays None in the children.

As this is the first answer that shows up when searching for this issue, I will also add my solutions here.

This issue can be caused by many things. Here are two scenarios I have encountered:

  • Package incompatibility
    Problem: trying to use multiprocessing when another part of your code that eventually gets called also needs to create new processes, or is incompatible with being copied to a new process because it uses locks, for example.
    Solution: my issue was with trying to use a single connection to a MongoDB instance for all of my processes. Creating a new connection for each process resolved the issue (a sketch of this follows after the code below).
  • Class instance
    Problem: trying to call pool.starmap from inside of a class to another function in the class. Making it a staticmethod, or having a function on the outside call it, didn't work and gave the same error. A class instance just can't be pickled, so we need to create the instance after we start the multiprocessing.
    Solution: what I ended up doing that worked for me was to separate my class into two classes. Basically, the function you are calling the multiprocessing on needs to be called right after you instantiate a new object for the class it belongs to. Something like this:
from multiprocessing import Pool

class B:
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass

def multiprocess_feature(process_args):
    b_instance = B()  # the instance is created inside the worker, after the pool starts
    return b_instance.process_feature(*process_args)

class A:
    def process_stuff(self):
        # num_processes and features come from the surrounding code
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                ((idx, feature) for idx, feature in enumerate(features)),
                chunksize=100,
            )
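A hedged sketch of the first scenario's fix, assuming pymongo and a per-worker connection created through a Pool initializer (get_doc, the connection string, and the database names are illustrative, not from the original answer):

from multiprocessing import Pool
from pymongo import MongoClient

client = None  # one MongoClient per worker process

def init_worker():
    global client
    client = MongoClient("mongodb://localhost:27017")  # hypothetical connection string

def get_doc(doc_id):
    # each worker uses its own connection instead of one pickled from the parent
    return client.mydb.mycollection.find_one({"_id": doc_id})

if __name__ == "__main__":
    with Pool(processes=4, initializer=init_worker) as pool:
        results = pool.map(get_doc, [1, 2, 3])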

