|
发表于 2020-10-25 12:40:45
|
显示全部楼层
本帖最后由 Stubborn 于 2020-10-25 12:42 编辑
- @func_set_timeout(10)
- def askChoice():
- return input('yes or no:')
- try:
- s = askChoice()
- except FunctionTimedOut as e:
- s = 'f'
- print(s)
复制代码
- import copy
- import ctypes
- import os
- import sys
- import threading
- import types
- from functools import wraps
- RETRY_SAME_TIMEOUT = 'RETRY_SAME_TIMEOUT'
- class FunctionTimedOut(BaseException):
- def __init__(self, msg='', timedOutAfter=None, timedOutFunction=None, timedOutArgs=None, timedOutKwargs=None):
- self.timedOutAfter = timedOutAfter
- self.timedOutFunction = timedOutFunction
- self.timedOutArgs = timedOutArgs
- self.timedOutKwargs = timedOutKwargs
- if not msg:
- msg = self.getMsg()
- BaseException.__init__(self, msg)
- self.msg = msg
- def getMsg(self):
- if self.timedOutFunction is not None:
- timedOutFuncName = self.timedOutFunction.__name__
- else:
- timedOutFuncName = 'Unknown Function'
- if self.timedOutAfter is not None:
- timedOutAfterStr = "%f" % (self.timedOutAfter,)
- else:
- timedOutAfterStr = "Unknown"
- return 'Function %s (args=%s) (kwargs=%s) timed out after %s seconds.\n' % (
- timedOutFuncName, repr(self.timedOutArgs), repr(self.timedOutKwargs), timedOutAfterStr)
- def retry(self, timeout=RETRY_SAME_TIMEOUT):
- if timeout is None:
- return self.timedOutFunction(*(self.timedOutArgs), **self.timedOutKwargs)
- if timeout == RETRY_SAME_TIMEOUT:
- timeout = self.timedOutAfter
- return func_timeout(timeout, self.timedOutFunction, args=self.timedOutArgs, kwargs=self.timedOutKwargs)
- class StoppableThread(threading.Thread):
- def _stopThread(self, exception, raiseEvery=2.0):
- if self.is_alive() is False:
- return True
- self._stderr = open(os.devnull, 'w')
- joinThread = JoinThread(self, exception, repeatEvery=raiseEvery)
- joinThread._stderr = self._stderr
- joinThread.start()
- joinThread._stderr = self._stderr
- def stop(self, exception, raiseEvery=2.0):
- return self._stopThread(exception, raiseEvery)
- class JoinThread(threading.Thread):
- def __init__(self, otherThread, exception, repeatEvery=2.0):
- threading.Thread.__init__(self)
- self.otherThread = otherThread
- self.exception = exception
- self.repeatEvery = repeatEvery
- self.daemon = True
- def run(self):
- self.otherThread._Thread__stderr = self._stderr
- if hasattr(self.otherThread, '_Thread__stop'):
- self.otherThread._Thread__stop()
- while self.otherThread.is_alive():
- ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.otherThread.ident),
- ctypes.py_object(self.exception))
- self.otherThread.join(self.repeatEvery)
- try:
- self._stderr.close()
- except:
- pass
- def func_timeout(timeout, func, args=(), kwargs=None):
- def raise_exception(exception):
- raise exception[0] from None
- if not kwargs:
- kwargs = {}
- if not args:
- args = ()
- ret = []
- exception = []
- isStopped = False
- def funcwrap(args2, kwargs2):
- try:
- ret.append(func(*args2, **kwargs2))
- except FunctionTimedOut:
- pass
- except Exception as e:
- exc_info = sys.exc_info()
- if isStopped is False:
- e.__traceback__ = exc_info[2].tb_next
- exception.append(e)
- thread = StoppableThread(target=funcwrap, args=(args, kwargs))
- thread.daemon = True
- thread.start()
- thread.join(timeout)
- stopException = None
- if thread.is_alive():
- isStopped = True
- class FunctionTimedOutTempType(FunctionTimedOut):
- def __init__(self):
- return FunctionTimedOut.__init__(self, '', timeout, func, args, kwargs)
- FunctionTimedOutTemp = type(
- 'FunctionTimedOut' + str(hash("%d_%d_%d_%d" % (id(timeout), id(func), id(args), id(kwargs)))),
- FunctionTimedOutTempType.__bases__, dict(FunctionTimedOutTempType.__dict__))
- stopException = FunctionTimedOutTemp
- thread._stopThread(stopException)
- thread.join(min(.1, timeout / 50.0))
- raise FunctionTimedOut('', timeout, func, args, kwargs)
- else:
- thread.join(.5)
- if exception:
- raise_exception(exception)
- if ret:
- return ret[0]
- def func_set_timeout(timeout, allowOverride=False):
- defaultTimeout = copy.copy(timeout)
- isTimeoutAFunction = bool(issubclass(timeout.__class__, (
- types.FunctionType, types.MethodType, types.LambdaType, types.BuiltinFunctionType, types.BuiltinMethodType)))
- if not isTimeoutAFunction:
- if not issubclass(timeout.__class__, (float, int)):
- try:
- timeout = float(timeout)
- except:
- raise ValueError(
- 'timeout argument must be a float/int for number of seconds, or a function/lambda which gets passed the function arguments and returns a calculated timeout (as float or int). Passed type: < %s > is not of any of these, and cannot be converted to a float.' % (
- timeout.__class__.__name__,))
- if not allowOverride and not isTimeoutAFunction:
- def _function_decorator(func):
- return wraps(func)(lambda *args, **kwargs: func_timeout(defaultTimeout, func, args=args, kwargs=kwargs))
- return _function_decorator
- if not isTimeoutAFunction:
- def _function_decorator(func):
- def _function_wrapper(*args, **kwargs):
- if 'forceTimeout' in kwargs:
- useTimeout = kwargs.pop('forceTimeout')
- else:
- useTimeout = defaultTimeout
- return func_timeout(useTimeout, func, args=args, kwargs=kwargs)
- return wraps(func)(_function_wrapper)
- return _function_decorator
- timeoutFunction = timeout
- if allowOverride:
- def _function_decorator(func):
- def _function_wrapper(*args, **kwargs):
- if 'forceTimeout' in kwargs:
- useTimeout = kwargs.pop('forceTimeout')
- else:
- useTimeout = timeoutFunction(*args, **kwargs)
- return func_timeout(useTimeout, func, args=args, kwargs=kwargs)
- return wraps(func)(_function_wrapper)
- return _function_decorator
- def _function_decorator(func):
- def _function_wrapper(*args, **kwargs):
- useTimeout = timeoutFunction(*args, **kwargs)
- return func_timeout(useTimeout, func, args=args, kwargs=kwargs)
- return wraps(func)(_function_wrapper)
- return _function_decorator
复制代码 |
|