-
Notifications
You must be signed in to change notification settings - Fork 0
/
threading_decorators.py
47 lines (32 loc) · 1.07 KB
/
threading_decorators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# coding=utf-8
# Author: Diego Gonzalez Chavez
# email : [email protected] / [email protected]
import threading
import types
class _Exception_StopThread(Exception):
pass
class Threaded_Function(object):
def __init__(self, target):
self._target = target
self.thread = None
def stop(self):
if self.thread is not None:
self.thread._TD_stop = True
def __get__(self, instance, owner):
# Allows the use of the decorator inside classes
return types.MethodType(self, instance)
def __call__(self, *args, **kwargs):
def stoppable_target():
try:
self._target(*args, **kwargs)
except _Exception_StopThread:
pass
self.thread = threading.Thread(target=stoppable_target)
self.thread._TD_stop = False
self.thread.start()
def as_thread(target):
'''Executes traget as a thread'''
return Threaded_Function(target)
def check_stop():
if threading.current_thread()._TD_stop:
raise _Exception_StopThread()