| 
 | 
 
 
发表于 2020-10-25 12:40:45
|
显示全部楼层
 
 
 
 本帖最后由 Stubborn 于 2020-10-25 12:42 编辑  
- @func_set_timeout(10)
 
 - def askChoice():
 
 -     return input('yes or no:')
 
  
- try:
 
 -     s = askChoice()
 
 - except FunctionTimedOut as e:
 
 -     s = 'f'
 
 - print(s)
 
  复制代码 
- import copy
 
 - import ctypes
 
 - import os
 
 - import sys
 
 - import threading
 
 - import types
 
 - from functools import wraps
 
  
- RETRY_SAME_TIMEOUT = 'RETRY_SAME_TIMEOUT'
 
  
 
- class FunctionTimedOut(BaseException):
 
  
-     def __init__(self, msg='', timedOutAfter=None, timedOutFunction=None, timedOutArgs=None, timedOutKwargs=None):
 
  
-         self.timedOutAfter = timedOutAfter
 
  
-         self.timedOutFunction = timedOutFunction
 
 -         self.timedOutArgs = timedOutArgs
 
 -         self.timedOutKwargs = timedOutKwargs
 
  
-         if not msg:
 
 -             msg = self.getMsg()
 
  
-         BaseException.__init__(self, msg)
 
  
-         self.msg = msg
 
  
-     def getMsg(self):
 
 -         if self.timedOutFunction is not None:
 
 -             timedOutFuncName = self.timedOutFunction.__name__
 
 -         else:
 
 -             timedOutFuncName = 'Unknown Function'
 
 -         if self.timedOutAfter is not None:
 
 -             timedOutAfterStr = "%f" % (self.timedOutAfter,)
 
 -         else:
 
 -             timedOutAfterStr = "Unknown"
 
  
-         return 'Function %s (args=%s) (kwargs=%s) timed out after %s seconds.\n' % (
 
 -             timedOutFuncName, repr(self.timedOutArgs), repr(self.timedOutKwargs), timedOutAfterStr)
 
  
-     def retry(self, timeout=RETRY_SAME_TIMEOUT):
 
 -         if timeout is None:
 
 -             return self.timedOutFunction(*(self.timedOutArgs), **self.timedOutKwargs)
 
  
-         if timeout == RETRY_SAME_TIMEOUT:
 
 -             timeout = self.timedOutAfter
 
  
-         return func_timeout(timeout, self.timedOutFunction, args=self.timedOutArgs, kwargs=self.timedOutKwargs)
 
  
 
- class StoppableThread(threading.Thread):
 
  
-     def _stopThread(self, exception, raiseEvery=2.0):
 
 -         if self.is_alive() is False:
 
 -             return True
 
  
-         self._stderr = open(os.devnull, 'w')
 
 -         joinThread = JoinThread(self, exception, repeatEvery=raiseEvery)
 
 -         joinThread._stderr = self._stderr
 
 -         joinThread.start()
 
 -         joinThread._stderr = self._stderr
 
  
-     def stop(self, exception, raiseEvery=2.0):
 
 -         return self._stopThread(exception, raiseEvery)
 
  
 
- class JoinThread(threading.Thread):
 
  
-     def __init__(self, otherThread, exception, repeatEvery=2.0):
 
 -         threading.Thread.__init__(self)
 
 -         self.otherThread = otherThread
 
 -         self.exception = exception
 
 -         self.repeatEvery = repeatEvery
 
 -         self.daemon = True
 
  
-     def run(self):
 
  
-         self.otherThread._Thread__stderr = self._stderr
 
 -         if hasattr(self.otherThread, '_Thread__stop'):
 
 -             self.otherThread._Thread__stop()
 
 -         while self.otherThread.is_alive():
 
 -             ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.otherThread.ident),
 
 -                                                        ctypes.py_object(self.exception))
 
 -             self.otherThread.join(self.repeatEvery)
 
  
-         try:
 
 -             self._stderr.close()
 
 -         except:
 
 -             pass
 
  
 
- def func_timeout(timeout, func, args=(), kwargs=None):
 
 -     def raise_exception(exception):
 
 -         raise exception[0] from None
 
  
-     if not kwargs:
 
 -         kwargs = {}
 
 -     if not args:
 
 -         args = ()
 
  
-     ret = []
 
 -     exception = []
 
 -     isStopped = False
 
  
-     def funcwrap(args2, kwargs2):
 
 -         try:
 
 -             ret.append(func(*args2, **kwargs2))
 
 -         except FunctionTimedOut:
 
 -             pass
 
 -         except Exception as e:
 
 -             exc_info = sys.exc_info()
 
 -             if isStopped is False:
 
 -                 e.__traceback__ = exc_info[2].tb_next
 
 -                 exception.append(e)
 
  
-     thread = StoppableThread(target=funcwrap, args=(args, kwargs))
 
 -     thread.daemon = True
 
  
-     thread.start()
 
 -     thread.join(timeout)
 
  
-     stopException = None
 
 -     if thread.is_alive():
 
 -         isStopped = True
 
  
-         class FunctionTimedOutTempType(FunctionTimedOut):
 
 -             def __init__(self):
 
 -                 return FunctionTimedOut.__init__(self, '', timeout, func, args, kwargs)
 
  
-         FunctionTimedOutTemp = type(
 
 -             'FunctionTimedOut' + str(hash("%d_%d_%d_%d" % (id(timeout), id(func), id(args), id(kwargs)))),
 
 -             FunctionTimedOutTempType.__bases__, dict(FunctionTimedOutTempType.__dict__))
 
  
-         stopException = FunctionTimedOutTemp
 
 -         thread._stopThread(stopException)
 
 -         thread.join(min(.1, timeout / 50.0))
 
 -         raise FunctionTimedOut('', timeout, func, args, kwargs)
 
 -     else:
 
 -         thread.join(.5)
 
  
-     if exception:
 
 -         raise_exception(exception)
 
  
-     if ret:
 
 -         return ret[0]
 
  
 
- def func_set_timeout(timeout, allowOverride=False):
 
 -     defaultTimeout = copy.copy(timeout)
 
  
-     isTimeoutAFunction = bool(issubclass(timeout.__class__, (
 
 -         types.FunctionType, types.MethodType, types.LambdaType, types.BuiltinFunctionType, types.BuiltinMethodType)))
 
  
-     if not isTimeoutAFunction:
 
 -         if not issubclass(timeout.__class__, (float, int)):
 
 -             try:
 
 -                 timeout = float(timeout)
 
 -             except:
 
 -                 raise ValueError(
 
 -                     'timeout argument must be a float/int for number of seconds, or a function/lambda which gets passed the function arguments and returns a calculated timeout (as float or int). Passed type: < %s > is not of any of these, and cannot be converted to a float.' % (
 
 -                         timeout.__class__.__name__,))
 
  
-     if not allowOverride and not isTimeoutAFunction:
 
 -         def _function_decorator(func):
 
 -             return wraps(func)(lambda *args, **kwargs: func_timeout(defaultTimeout, func, args=args, kwargs=kwargs))
 
  
-         return _function_decorator
 
  
-     if not isTimeoutAFunction:
 
 -         def _function_decorator(func):
 
 -             def _function_wrapper(*args, **kwargs):
 
 -                 if 'forceTimeout' in kwargs:
 
 -                     useTimeout = kwargs.pop('forceTimeout')
 
 -                 else:
 
 -                     useTimeout = defaultTimeout
 
  
-                 return func_timeout(useTimeout, func, args=args, kwargs=kwargs)
 
  
-             return wraps(func)(_function_wrapper)
 
  
-         return _function_decorator
 
  
-     timeoutFunction = timeout
 
  
-     if allowOverride:
 
 -         def _function_decorator(func):
 
 -             def _function_wrapper(*args, **kwargs):
 
 -                 if 'forceTimeout' in kwargs:
 
 -                     useTimeout = kwargs.pop('forceTimeout')
 
 -                 else:
 
 -                     useTimeout = timeoutFunction(*args, **kwargs)
 
  
-                 return func_timeout(useTimeout, func, args=args, kwargs=kwargs)
 
  
-             return wraps(func)(_function_wrapper)
 
  
-         return _function_decorator
 
  
-     def _function_decorator(func):
 
 -         def _function_wrapper(*args, **kwargs):
 
 -             useTimeout = timeoutFunction(*args, **kwargs)
 
  
-             return func_timeout(useTimeout, func, args=args, kwargs=kwargs)
 
  
-         return wraps(func)(_function_wrapper)
 
  
-     return _function_decorator
 
  复制代码 |   
 
 
 
 |