def test ( _timeout = 2 ) : # 这里ls -lrt可以换成自己的任意模块,这里stdout也可以是文件流 with subprocess . Popen ( "ls -lrt" , shell = True , stdout = subprocess . PIPE ) as p : try : bs = p . communicate ( timeout = _timeout ) [ 0 ] if bs : bs_split = eval ( bs ) res = '' for line in bs_split : res += line . decode ( ) print ( res ) except Exception as e : # 未在指定时间内完成,此处会有异常。主进程在根据情况做处理 print ( e ) p . terminate ( ) test ( 1 )

这里是开启了pipe管道模式,让主进程与子进程进行交互。pipe管道有大小限制(默认64k),如果我们先使用 Popen.poll() 或者 Popen.wait() 后在readline或者readlines,在子进程输出数据较多的情况下很容易造成死锁,原因是子进程写满管道后需要等待管道中有空余空间才能再次写入。 Popen.communicate 就能完成如此操作。这里推荐大家用 subprocess ,因为类似 os.popen 等都是调用 subprocess 。子进程会有巨量输出的情况下,不能使用这种模式。

os.popen

def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)

主线程中对函数设置超时时间

import signal
import time
from functools import wraps
import signal
class TimeoutException(Exception):
def timeout_func(timeout=1, default_return=None):
    def decorateFunction(function):
        @wraps(function)
        def wrapper(*args, **kv):
            def handler(signum, frame):
                raise TimeoutException()
            # 设置回调handler
            signal.signal(signal.SIGALRM, handler)
            signal.alarm(timeout)
            try:
                result = function(*args, **kv)
            except TimeoutException as e:
                print(e)
                result = default_return
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, signal.SIG_DFL)
            return result
        return wrapper
    return decorateFunction
# 装饰器模式
@timeout_func(timeout=2, default_return=0)
def test_timeout(sec, a, b):
    time.sleep(sec)
    return a + b
print(test_timeout(1.99, 2, 3))
# 输出5

注意 : signal.alarm只有linux下才有,window下没有。signal.alarm设置了告警时间以及回调函数后,只会在主线程中进行中断回调。

你可以使用 subprocess.run 函数的 timeout 参数来设置超时时间。如果超时,它会引发一个 TimeoutExpired 异常,你可以在 try-except 语句中捕获这个异常,然后使用 subprocess.Popen 对象的 terminate 方法来终止子进程。 下面是一个例子: import subprocesstry: completed_process = ...
使用subprocess模块,本模块为开辟子进程去执行子程序提供了统一的接口 process = subprocess.Popen(cmd,stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE) output, unused_err = process.communicate(timeout=3
1、os.system 该函数返回命令执行结果的返回值,并不是返回命令的执行输出, system()函数在执行过程中进行了以下三步操作: 1.fork一个子进程; 2.在子进程中调用exec函数去执行命令; 3.在父进程中调用wait(阻塞)去等待子进程结束。 对于fork失败,system()函数返回-1。 **** >>> os.system('ls -a')...
* test11.py ...p = subprocess.Popen(“python test11.py”, shell=True, stdout=subprocess.PIPE) # None表示正在执行中 while p.poll() is None:  out = p.stdout.readline()  if out !