import copy
import ctypes
import os
import sys
import threading
import types
from functools import wraps
# Sentinel accepted by FunctionTimedOut.retry meaning "reuse the timeout that
# was in effect when the function originally timed out".
RETRY_SAME_TIMEOUT = 'RETRY_SAME_TIMEOUT'


class FunctionTimedOut(BaseException):
    """Raised when a function exceeds its allotted run time.

    Derives from BaseException (not Exception), so a generic
    ``except Exception`` inside the timed function does not catch it.

    Attributes:
        timedOutAfter: Timeout (seconds) that was exceeded, or None if unknown.
        timedOutFunction: The callable that timed out, or None if unknown.
        timedOutArgs: Positional arguments the callable was invoked with.
        timedOutKwargs: Keyword arguments the callable was invoked with.
        msg: The message passed to BaseException.
    """

    def __init__(self, msg='', timedOutAfter=None, timedOutFunction=None, timedOutArgs=None, timedOutKwargs=None):
        """Store the timeout details; build a default message when `msg` is empty."""
        self.timedOutAfter = timedOutAfter
        self.timedOutFunction = timedOutFunction
        self.timedOutArgs = timedOutArgs
        self.timedOutKwargs = timedOutKwargs

        # getMsg() reads the attributes assigned above, so it must run after them.
        if not msg:
            msg = self.getMsg()

        BaseException.__init__(self, msg)
        self.msg = msg

    def getMsg(self):
        """Return the default message describing what timed out and after how long."""
        if self.timedOutFunction is not None:
            # Not every callable has __name__ (functools.partial objects and
            # instances defining __call__ do not); fall back to repr().
            timedOutFuncName = getattr(self.timedOutFunction, '__name__', None) or repr(self.timedOutFunction)
        else:
            timedOutFuncName = 'Unknown Function'

        if self.timedOutAfter is not None:
            timedOutAfterStr = "%f" % (self.timedOutAfter,)
        else:
            timedOutAfterStr = "Unknown"

        return 'Function %s (args=%s) (kwargs=%s) timed out after %s seconds.\n' % (
            timedOutFuncName, repr(self.timedOutArgs), repr(self.timedOutKwargs), timedOutAfterStr)

    def retry(self, timeout=RETRY_SAME_TIMEOUT):
        """Call the timed-out function again with the same arguments.

        Args:
            timeout: RETRY_SAME_TIMEOUT (default) reuses `timedOutAfter`;
                None calls the function directly with no timeout; any other
                value is handed to func_timeout as the new timeout.

        Returns:
            Whatever the retried function returns.
        """
        # The constructor defaults both to None; unpacking None would raise
        # TypeError, so substitute empty containers.
        retryArgs = self.timedOutArgs if self.timedOutArgs is not None else ()
        retryKwargs = self.timedOutKwargs if self.timedOutKwargs is not None else {}

        if timeout is None:
            return self.timedOutFunction(*retryArgs, **retryKwargs)

        if timeout == RETRY_SAME_TIMEOUT:
            timeout = self.timedOutAfter

        return func_timeout(timeout, self.timedOutFunction, args=retryArgs, kwargs=retryKwargs)
class StoppableThread(threading.Thread):
    """A thread that can be asked to stop by having an exception raised inside it."""

    def _stopThread(self, exception, raiseEvery=2.0):
        """Start a helper thread that raises `exception` in this thread until it exits.

        Args:
            exception: Exception class handed to JoinThread for injection.
            raiseEvery: Seconds between repeated injections (JoinThread's
                `repeatEvery`).

        Returns:
            True if this thread was already not alive (nothing to do);
            otherwise None, after the helper thread has been started.
        """
        if not self.is_alive():
            return True

        # Point this thread's `_stderr` at the null device -- presumably so that
        # any traceback printed when the injected exception ends the thread is
        # discarded (TODO confirm against the interpreter's threading internals).
        self._stderr = open(os.devnull, 'w')

        joinThread = JoinThread(self, exception, repeatEvery=raiseEvery)
        # The helper shares the handle and closes it once this thread has exited.
        joinThread._stderr = self._stderr
        try:
            joinThread.start()
        except Exception:
            # The helper never ran, so nobody else will close the handle.
            self._stderr.close()
            raise

    def stop(self, exception, raiseEvery=2.0):
        """Public alias of `_stopThread`; same arguments and return value."""
        return self._stopThread(exception, raiseEvery)
class JoinThread(threading.Thread):
    """Daemon helper that repeatedly raises an exception in another thread until it exits.

    The owner may assign a stream to `_stderr` before calling start(); that
    stream is closed once the other thread has exited.
    """

    def __init__(self, otherThread, exception, repeatEvery=2.0):
        """
        Args:
            otherThread: The thread to terminate.
            exception: Exception class to raise inside `otherThread`.
            repeatEvery: Seconds to wait for `otherThread` to exit before
                raising the exception again.
        """
        threading.Thread.__init__(self)
        self.otherThread = otherThread
        self.exception = exception
        self.repeatEvery = repeatEvery
        self.daemon = True

    def run(self):
        """Inject the exception until the other thread is dead, then close the stream."""
        stream = getattr(self, '_stderr', None)

        # Legacy hooks: `_Thread__stderr` / `_Thread__stop` look like Python 2
        # threading internals (TODO confirm); where they do not exist the
        # assignment is a harmless extra attribute and the call is skipped.
        self.otherThread._Thread__stderr = stream
        if hasattr(self.otherThread, '_Thread__stop'):
            self.otherThread._Thread__stop()

        # Raise repeatedly: the target may catch the exception and keep
        # running, so keep injecting until it has actually exited.
        while self.otherThread.is_alive():
            # The C API declares the thread id as `unsigned long`.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_ulong(self.otherThread.ident), ctypes.py_object(self.exception))
            self.otherThread.join(self.repeatEvery)

        # Never close the process-wide stderr: if no owner replaced `_stderr`,
        # it may still be the interpreter's own stream.
        if stream is None or stream is sys.stderr or stream is sys.__stderr__:
            return
        try:
            stream.close()
        except Exception:
            # Best-effort cleanup; a failure to close must not surface here.
            pass
def func_timeout(timeout, func, args=(), kwargs=None):
    """Run ``func(*args, **kwargs)`` in a daemon thread, giving up after `timeout` seconds.

    Args:
        timeout: Seconds to wait for `func` to finish. It is passed to
            Thread.join, so None waits without limit.
        func: The callable to run.
        args: Positional arguments for `func` (falsy values become ``()``).
        kwargs: Keyword arguments for `func` (falsy values become ``{}``).

    Returns:
        The return value of `func`.

    Raises:
        FunctionTimedOut: If `func` is still running after `timeout` seconds,
            or if `func` itself raised FunctionTimedOut (e.g. from a nested
            func_timeout call).
        Exception: Any Exception subclass raised by `func` is re-raised in
            the calling thread.
    """
    if not kwargs:
        kwargs = {}
    if not args:
        args = ()

    # The worker communicates through these lists; at most one of them
    # receives a single element.
    ret = []
    exception = []
    isStopped = False

    def funcwrap(args2, kwargs2):
        # Runs in the worker thread. `isStopped` is read through the closure,
        # so it reflects the value the calling thread set after the timeout.
        try:
            ret.append(func(*args2, **kwargs2))
        except FunctionTimedOut as timedOut:
            # Swallow only the exception this call injected to stop the
            # worker. A FunctionTimedOut raised by `func` itself must reach
            # the caller rather than turning into a silent None result.
            if not isStopped:
                exception.append(timedOut)
        except Exception as e:
            exc_info = sys.exc_info()
            if not isStopped:
                # Drop this wrapper's frame so the traceback starts in `func`.
                e.__traceback__ = exc_info[2].tb_next
                exception.append(e)

    thread = StoppableThread(target=funcwrap, args=(args, kwargs))
    thread.daemon = True

    thread.start()
    thread.join(timeout)

    if thread.is_alive():
        isStopped = True

        # The class handed to the stop machinery gets a zero-argument
        # __init__ that fills in the details of this particular call.
        class FunctionTimedOutTempType(FunctionTimedOut):
            def __init__(self):
                return FunctionTimedOut.__init__(self, '', timeout, func, args, kwargs)

        # Re-create that class under a name derived from the ids of this
        # call's arguments.
        FunctionTimedOutTemp = type(
            'FunctionTimedOut' + str(hash("%d_%d_%d_%d" % (id(timeout), id(func), id(args), id(kwargs)))),
            FunctionTimedOutTempType.__bases__, dict(FunctionTimedOutTempType.__dict__))

        thread._stopThread(FunctionTimedOutTemp)
        # Give the worker a short, bounded moment to unwind; then report the
        # timeout whether or not it has finished dying.
        thread.join(min(.1, timeout / 50.0))
        raise FunctionTimedOut('', timeout, func, args, kwargs)

    # The worker has finished; this join returns promptly.
    thread.join(.5)

    if exception:
        # `from None` hides the worker-thread context from the traceback.
        raise exception[0] from None

    if ret:
        return ret[0]
    return None
def func_set_timeout(timeout, allowOverride=False):
    """Decorator factory that runs the decorated function through func_timeout.

    Args:
        timeout: Either a number of seconds (int/float, or anything float()
            accepts), or a function/lambda/builtin that is called with the
            decorated function's arguments and returns the timeout to use.
        allowOverride: When True, callers may pass a ``forceTimeout`` keyword
            argument; it is removed from the kwargs and used as the timeout
            for that call instead of the default/calculated one.

    Returns:
        A decorator whose wrapper preserves the wrapped function's metadata.

    Raises:
        ValueError: If `timeout` is neither a function nor convertible to float.
    """
    isTimeoutAFunction = isinstance(timeout, (
        types.FunctionType, types.MethodType, types.LambdaType, types.BuiltinFunctionType, types.BuiltinMethodType))

    if not isTimeoutAFunction and not isinstance(timeout, (float, int)):
        try:
            timeout = float(timeout)
        except Exception as conversionError:
            raise ValueError(
                'timeout argument must be a float/int for number of seconds, or a function/lambda which gets passed the function arguments and returns a calculated timeout (as float or int). Passed type: < %s > is not of any of these, and cannot be converted to a float.' % (
                    timeout.__class__.__name__,)) from conversionError

    # Snapshot taken after the conversion above, so that a value such as "2.5"
    # reaches func_timeout as a float rather than as the original string.
    defaultTimeout = copy.copy(timeout)
    timeoutFunction = timeout

    def _function_decorator(func):
        @wraps(func)
        def _function_wrapper(*args, **kwargs):
            if allowOverride and 'forceTimeout' in kwargs:
                # Popped so it is passed neither to `func` nor to the timeout function.
                useTimeout = kwargs.pop('forceTimeout')
            elif isTimeoutAFunction:
                useTimeout = timeoutFunction(*args, **kwargs)
            else:
                useTimeout = defaultTimeout
            return func_timeout(useTimeout, func, args=args, kwargs=kwargs)

        return _function_wrapper

    return _function_decorator