相关文章推荐
腼腆的柠檬  ·  python ...·  1 月前    · 
眉毛粗的电梯  ·  python ...·  3 周前    · 
想出国的拐杖  ·  python dataframe ...·  6 天前    · 
风流的鸵鸟  ·  使用GeoPandas/Fiona从shap ...·  1 年前    · 
乖乖的钱包  ·  useEffect使用指南 - 知乎·  1 年前    · 

在Python中是否有一种简单的方法来等待,直到某些条件为真?

56 人关注

我需要在一个脚本中等待,直到一定数量的条件为真?

我知道我可以使用条件变量和朋友们推出我自己的事件,但我不想经历实现它的所有麻烦,因为一些对象的属性变化来自于包装好的C++库中的外部线程(Boost.Python),所以我不能只是在一个类中劫持 __setattr__ 并在那里放一个条件变量,这让我不得不尝试从C++中创建一个Python条件变量并发出信号,或者在Python中封装一个本地条件变量并等待它,这两种方法听起来都很麻烦,不必要地复杂和无聊。

有什么更简单的方法吗,除非连续轮询条件?

理想情况下,它应该是这样的

res = wait_until(lambda: some_predicate, timeout)
if (not res):
    print 'timed out'
    
python
Alex B
Alex B
发布于 2010-05-07
12 个回答
Alex Martelli
Alex Martelli
发布于 2020-03-15
已采纳
0 人赞同

不幸的是,要满足你的限制条件,唯一的可能性是定期进行 poll , e.g....:

import time
def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
  mustend = time.time() + timeout
  while time.time() < mustend:
    if somepredicate(*args, **kwargs): return True
    time.sleep(period)
  return False

或类似的东西。 如果somepredicate可以被分解,这可以以多种方式进行优化(例如如果已知它是几个子句的and,特别是如果其中一些子句又可以通过threading.Event或其他什么东西检测出来而被优化,等等),但在你要求的一般条件下,这种低效的方法是唯一的出路。

这就是我最后要做的事情。幸运的是,性能不是问题(这是个测试脚本)。
也许有另一种解决办法 stackoverflow.com/a/7836454/995714
在2018年的今天也没有办法做到这一点吗?
记载的".waitvariable "通用小组件方法。 infohost.nmt.edu/tcc/help/pubs/tkinter/web/universal.html might do the trick
Alon Gouldman
Alon Gouldman
发布于 2020-03-15
0 人赞同

另一个不错的软件包是 waiting 。- https://pypi.org/project/waiting/

pip install waiting

使用方法。 你传递一个每次都会被调用的函数作为条件,一个超时,(这个很有用)你可以传递一个等待的描述,如果你得到超时错误,这个描述就会显示出来。

using function:

from waiting import wait
def is_something_ready(something):
    if something.ready():
        return True
    return False
# wait for something to be ready
something = # whatever
wait(lambda: is_something_ready(something), timeout_seconds=120, waiting_for="something to be ready")
# this code will only execute after "something" is ready
print("Done")

注意:该函数必须返回一个布尔值--等待结束时为真,否则为假。

wait函数应该使用传递timeout_seconds来代替timeout。
如果我想只等待(如你所写的)5次准备就绪,然后在执行时出现超时错误,那么代码会是什么?
@a11eksandar 你可以用 sleep_seconds=20 来指定投票的间隔时间,所以我想你可以把它和 timeout_seconds 结合起来,得到你想要的行为。
Alex
Alex
发布于 2020-03-15
0 人赞同

这里是另一个解决方案。我们的目标是让线程在以非常精确的顺序做一些工作之前互相等待。这项工作可能需要未知的时间。持续的轮询不是很好,有两个原因:它占用了CPU时间,而且在条件满足后行动不会立即开始。

class Waiter():
    def __init__(self, init_value):
        self.var = init_value
        self.var_mutex = threading.Lock()
        self.var_event = threading.Event()
    def WaitUntil(self, v):
        while True:
            self.var_mutex.acquire()
            if self.var == v:
                self.var_mutex.release()
                return # Done waiting
            self.var_mutex.release()
            self.var_event.wait(1) # Wait 1 sec
    def Set(self, v):
        self.var_mutex.acquire()
        self.var = v
        self.var_mutex.release()
        self.var_event.set() # In case someone is waiting
        self.var_event.clear()

以及测试的方法

class TestWaiter():
    def __init__(self):
        self.waiter = Waiter(0)
        threading.Thread(name='Thread0', target=self.Thread0).start()
        threading.Thread(name='Thread1', target=self.Thread1).start()
        threading.Thread(name='Thread2', target=self.Thread2).start()
    def Thread0(self):
        while True:
            self.waiter.WaitUntil(0)
            # Do some work
            time.sleep(np.random.rand()*2)
            self.waiter.Set(1)
    def Thread1(self):
        while True:
            self.waiter.WaitUntil(1)
            # Do some work
            time.sleep(np.random.rand())
            self.waiter.Set(2)
    def Thread2(self):
        while True:
            self.waiter.WaitUntil(2)
            # Do some work
            time.sleep(np.random.rand()/10)
            self.waiter.Set(0)

用于多处理的服务器。

import multiprocessing as mp
import ctypes
class WaiterMP():
    def __init__(self, init_value, stop_value=-1):
        self.var = mp.Value(ctypes.c_int, init_value)
        self.stop_value = stop_value
        self.event = mp.Event()
    def Terminate(self):
        self.Set(self.stop_value)
    def Restart(self):
        self.var.value = self.init_value
    def WaitUntil(self, v):
        while True:
            if self.var.value == v or self.var.value == self.stop_value:
                return
            # Wait 1 sec and check aiagn (in case event was missed)
            self.event.wait(1)
    def Set(self, v):
        exit = self.var.value == self.stop_value
        if not exit: # Do not set var if threads are exiting
            self.var.value = v
        self.event.set() # In case someone is waiting
        self.event.clear()

如果这仍然不是最好的解决方案,请评论。

谢谢你的帮助。我不知道事件, set() wait() 。最终在我的代码中使用了这种类型的模式。绝对比 sleep() 要优雅得多。
看起来很有希望!比其他涉及睡眠的解决方案好得多。据我所知,在测试变量时不需要获得锁,只需要修改它。
Yann Ramin
Yann Ramin
发布于 2020-03-15
0 人赞同

你基本上已经回答了你自己的问题:没有。

由于你在 boost.python 中与外部库打交道,它们可能会随意改变对象,你需要让这些例程调用一个事件处理程序来刷新,或者用一个条件来工作。

Can Kavaklıoğlu
Can Kavaklıoğlu
发布于 2020-03-15
0 人赞同

这里是Alex解决方案的线程扩展。

import time
import threading
# based on https://stackoverflow.com/a/2785908/1056345                                                                                                                                                                                                                                                                         
def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
    must_end = time.time() + timeout
    while time.time() < must_end:
        if somepredicate(*args, **kwargs):
            return True
        time.sleep(period)
    return False
def wait_until_par(*args, **kwargs):
    t = threading.Thread(target=wait_until, args=args, kwargs=kwargs)
    t.start()
    print ('wait_until_par exits, thread runs in background')
def test():
    print('test')
wait_until_par(test, 5)
    
Patrik Staron
Patrik Staron
发布于 2020-03-15
0 人赞同

从计算的角度来看,必须在某个地方、某个时间对所有条件进行检查。如果你有两部分代码,一部分产生条件变化,另一部分在某些条件为真时应该被执行,你可以做以下工作。

把改变条件的代码放在比如说主线程中,而当某些条件为真时,应该启动的代码放在工作线程中。

from threading import Thread,Event
locker = Event()
def WhenSomeTrue(locker):
    locker.clear() # To prevent looping, see manual, link below
    locker.wait(2.0) # Suspend the thread until woken up, or 2s timeout is reached
    if not locker.is_set(): # when is_set() false, means timeout was reached
        print('TIMEOUT')
    else:
    # Code when some conditions are true
worker_thread = Thread(target=WhenSomeTrue, args=(locker,))
worker_thread.start()
cond1 = False
cond2 = False
cond3 = False
def evaluate():
    true_conditions = 0
    for i in range(1,4):
        if globals()["cond"+str(i)]: #access a global condition variable one by one
            true_conditions += 1     #increment at each true value
    if true_conditions > 1:
        locker.set() # Resume the worker thread executing the else branch
    #Or just if true_conditions > 1: locker.set();
    #true_conditions would need be incremented when 'True' is written to any of those variables
# some condition change code
evaluate()

关于这种方法的更多信息,请访问。https://docs.python.org/3/library/threading.html#event-objects

Roger Hoem-Martinsen
Roger Hoem-Martinsen
发布于 2020-03-15
0 人赞同

Proposed solution:

def wait_until(delegate, timeout: int):
    end = time.time() + timeout
    while time.time() < end:
        if delegate():
            return True
        else:
            time.sleep(0.1)
    return False

使用方法。

wait_until(lambda: True, 2)
    
Mika C.
Mika C.
发布于 2020-03-15
0 人赞同

In 2022 now you could use https://trio-util.readthedocs.io/en/latest/#trio_util.AsyncValue

我认为这最接近你想要的 "最平滑 "的形式。

PravCoding
PravCoding
发布于 2020-03-15
0 人赞同

I once used this in my code:

while not condition:

Hope this helps

Someone
Someone
发布于 2020-03-15
0 人赞同

This worked for me

direction = ''
t = 0
while direction == '' and t <= 1:
    sleep(0.1)
    t += 0.1

这是为了等待信号,同时确保时间限制为1秒。

Guest
Guest
发布于 2020-03-15
0 人赞同

here's how:

import time
i = false
while i == false:
    if (condition):
        i = true
        break
    
你为什么要包括时间模块?既然break语句足以让你退出循环,为什么还要设置 i=True
Aryaveer Singh Rawat
Aryaveer Singh Rawat
发布于 2020-03-15
0 人赞同

Here's my Code I used during one of my Projects :

import time
def no() :
    if (Condition !!!) :
        it got true
    else:
        time.sleep(1) /Don't remove or don't blame me if ur system gets ""DEAD""
def oh() :   /Ur main program 
    while True:
        if(bla) :
            .......
        else :
            time.sleep(1)