Concurrency of reading and writing a JSON file from multiple threads in Python

1 follower

I want to read and write a JSON file from multiple threads in Python.

Initial setup: open(file_path, "w+") (if the file is empty, just dump an empty JSON object into it)

When writing with threading.Lock:

1) Load the JSON file into memory.

2) Update the loaded JSON in memory with the new key and value.

3) Dump the current (in-memory) JSON back to the file.

Because there is a lock held while writing, I thought reading and writing the file would be safe even with multiple threads running, but it errors out.

import threading
import json

class Writer(object):
    def __init__(self, path):
        self._lock = threading.Lock()
        self.path = path
        self.current_json = None
        self._init_opend_file()
    def _init_opend_file(self):
        with self._lock:
            self._opened_file = open(self.path, "w+")
            if self._opened_file.read() == "":
                json.dump({}, self._opened_file)
            else:
                pass
    def write(self, key, value):
        with self._lock:
            self._opened_file.seek(0)
            self.current_json = json.load(self._opened_file)  # reload file contents
            self.current_json[key] = value                    # update in memory
            self._opened_file.seek(0)
            self._opened_file.truncate()
            json.dump(self.current_json, self._opened_file)   # overwrite the file
if __name__ == "__main__":
    path = r"D:\test.json"
    def run(name, range_):
        writer = Writer(path)
        for i in range(range_):
            writer.write(name,i)
    t1 = threading.Thread(target=run, args=("one", 1000))
    t2 = threading.Thread(target=run, args=("two", 2000))
    t1.start()
    t2.start()

I expected to end up with {"one": 1000, "two": 2000} in test.json, but what I got was something like {"one": 1}"two": 1}. It seems that multiple threads access the file at the same time and write different things, but I can't understand why this happens even though I'm using threading.Lock().

Exception in thread Thread-2:
Traceback (most recent call last):
  File "D:\Anaconda3_64\envs\atom\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "D:\Anaconda3_64\envs\atom\lib\threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "D:/Dropbox/000_ComputerScience/000_개발/Quant/Seperator/json_test.py", line 37, in run
    writer.write(name,i)
  File "D:/Dropbox/000_ComputerScience/000_개발/Quant/Seperator/json_test.py", line 24, in write
    self.current_json = json.load(self._opened_file)
  File "D:\Anaconda3_64\envs\atom\lib\json\__init__.py", line 296, in load
    parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)
  File "D:\Anaconda3_64\envs\atom\lib\json\__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "D:\Anaconda3_64\envs\atom\lib\json\decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "D:\Anaconda3_64\envs\atom\lib\json\decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
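
For reference, this particular message is simply what json.load raises whenever the data it reads is empty, which is the state the file can be left in here, since each thread builds its own Writer and _init_opend_file re-opens the path with "w+", truncating it. A minimal reproduction of just the error:

import io
import json

# json.load produces exactly this error when it is handed empty input
try:
    json.load(io.StringIO(""))
except json.JSONDecodeError as exc:
    print(exc)    # Expecting value: line 1 column 1 (char 0)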
    
python
json
multithreading
concurrency
thread-safety
Terry Jo
Posted on 2019-08-23
2 Answers
Nitin
Posted on 2019-08-23
Accepted
0 upvotes

This happens because the two threads do not share the same lock. Try using ThreadPoolExecutor, or extend the class as class Writer(threading.Thread):
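
To make the shared-lock point concrete, here is a minimal sketch (assuming the Writer class from the question, unchanged) in which both threads use a single Writer instance and therefore a single threading.Lock:

import threading

if __name__ == "__main__":
    path = r"D:\test.json"
    writer = Writer(path)              # one shared instance -> one shared lock

    def run(name, range_):
        for i in range(range_):
            writer.write(name, i)      # both threads serialize on writer._lock

    t1 = threading.Thread(target=run, args=("one", 1000))
    t2 = threading.Thread(target=run, args=("two", 2000))
    t1.start()
    t2.start()
    t1.join()
    t2.join()                          # expected final contents: {"one": 999, "two": 1999}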

ThreadPoolExecutor handles the shared resources between threads itself, so you don't need to worry about the lock.

ThreadPoolExecutor documentation

threading : Refer HERE

Example of ThreadPoolExecutor:

import random
import time
from concurrent.futures import ThreadPoolExecutor

def data_write(z):
    # simulate some work, then append one line to the shared file handle f
    sleep_wait = random.randint(0, 2)
    print("sleeping:", sleep_wait, ", data:", z)
    time.sleep(sleep_wait)
    print('{field: %s}' % z, file=f)
    return z

with open('test', 'a') as f:
    data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() hands the items to 3 worker threads and collects the return values in order
        future = list(executor.map(data_write, data))
    print(future)
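
One way to tie this back to the question's JSON file is a minimal sketch like the following (the test.json path and the (key, value) updates are taken from the question; the json_write helper and everything else here are assumptions for illustration). It keeps the read-update-dump cycle but guards it with a single module-level lock shared by every worker in the pool:

import json
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()                      # one lock shared by all workers
path = r"D:\test.json"

def json_write(item):
    key, value = item
    with lock:                               # serialize the read-update-dump cycle
        with open(path, "r+") as fp:
            text = fp.read()
            data = json.loads(text) if text else {}
            data[key] = value
            fp.seek(0)
            fp.truncate()
            json.dump(data, fp)
    return item

if __name__ == "__main__":
    open(path, "a").close()                  # make sure the file exists for "r+"
    items = [("one", i) for i in range(1000)] + [("two", i) for i in range(2000)]
    with ThreadPoolExecutor(max_workers=3) as executor:
        list(executor.map(json_write, items))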
    
Ivan Popov
Posted on 2019-08-23
0 upvotes

It's better to keep things simple. Your class does nothing but some writing, so a plain function does the job just as well. The w+ mode you were using also truncates the file on open, so you never get to see its previous state. The truncate method takes the number of bytes to truncate to, so I changed it to truncate(0). There is a single lock, which is acquired and released in the write function. Finally, range(1000) gives you values up to 999 at most ;) Here is the result:

import threading
import json
def write(path, key, value):
    lock.acquire()
    with open(path, "r+") as opened_file:
        current_json = opened_file.read()
        if current_json == "":
            current_json = {}
        else:
            current_json = json.loads(current_json)
        current_json[key] = value
        opened_file.seek(0)
        opened_file.truncate(0)
        json.dump(current_json, opened_file)
    lock.release()
if __name__ == "__main__":
    path = r"test.json"
    lock = threading.Lock()
    def run(name, range_):
        for i in range(range_):
            write(path, name,i)
    t1 = threading.Thread(target=run, args=("one", 1001))
    t2 = threading.Thread(target=run, args=("two", 2001))
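
A small variation on the same write function, as a minimal sketch: threading.Lock also works as a context manager, so the acquire/release pair can be written as a with block, which releases the lock even if json.loads or json.dump raises:

import threading
import json

lock = threading.Lock()

def write(path, key, value):
    with lock:                                   # released automatically, even on error
        with open(path, "r+") as opened_file:
            text = opened_file.read()
            current_json = json.loads(text) if text else {}
            current_json[key] = value
            opened_file.seek(0)
            opened_file.truncate(0)
            json.dump(current_json, opened_file)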