pyThreadWorker¶
- Version
0.10.0
- Github
A library that helps you build threaded applications. It adds an event queue, a parent, and children to each thread.
This document mentions “thread” objects many times, but the term refers to Worker rather than the built-in threading.Thread.
Event loop¶
This library implements an event loop for each thread, and each thread has its own event queue. With the event loop, we can pause, resume, or stop a thread by sending specific events to its event queue. For example:
from worker import create_worker, wait_forever

@create_worker
def worker():
    print("thread created")
    wait_forever()
In the previous code:
- A thread is created.
- The thread prints “thread created”.
- The thread enters the event loop.
The event loop does the following:
- Processes events.
- Calls listeners.
- If there is a “STOP_THREAD” event, WorkerExit is raised. Keep this in mind and place the “breakpoints” in your application carefully.
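The loop described above can be sketched with only the standard library. This is an illustration of the idea, not the library's implementation: MiniWorker and MiniExit are hypothetical names, and the sketch leaves out pause/resume, parents, and children.

```python
import queue
import threading


class MiniExit(Exception):
    """Hypothetical stand-in for WorkerExit."""


class MiniWorker:
    """Hypothetical, stdlib-only stand-in for a worker with an event queue."""

    def __init__(self, task):
        self.task = task
        self.events = queue.Queue()
        self.listeners = {}  # event name -> list of callbacks
        self.thread = threading.Thread(target=self._run)

    def _run(self):
        try:
            self.task(self)
        except MiniExit:
            pass  # leaving the event loop ends the thread quietly

    def listen(self, name, callback):
        self.listeners.setdefault(name, []).append(callback)

    def fire(self, name, data=None):
        self.events.put((name, data))

    def wait_forever(self):
        # The event loop: process events, call listeners, exit on STOP_THREAD.
        while True:
            name, data = self.events.get()
            if name == "STOP_THREAD":
                raise MiniExit
            for callback in list(self.listeners.get(name, [])):
                callback(data)


received = []


def task(worker):
    worker.listen("PRINT", received.append)
    worker.wait_forever()


w = MiniWorker(task)
w.thread.start()
w.fire("PRINT", "foo")
w.fire("PRINT", "bar")
w.fire("STOP_THREAD")
w.thread.join()
```

Because the stop request is just another queued event, the two PRINT events fired before it are handled first, and the thread only exits once it reaches the stop event inside the loop.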
Event system¶
When you stop a thread by calling Worker.stop(), the thread doesn’t stop immediately:
from worker import create_worker, wait_forever

@create_worker
def thread():
    wait_forever()

thread.stop()
print(thread.is_running()) # True
When stop is called, a “STOP_THREAD” event is put in the worker’s event queue. After the thread processes the event, it exits the event loop by raising WorkerExit.
To wait until the thread exits:
from worker import create_worker, wait_forever, wait_thread

@create_worker
def thread():
    wait_forever()

thread.stop()
wait_thread(thread)
print(thread.is_running()) # False
Daemon thread¶
A daemon thread is a thread that doesn’t prevent the process from exiting. This is dangerous because the daemon thread may be terminated without any cleanup.
In this library, there is no “real” daemon thread. However, we do have a daemon argument when creating threads, but it works in a different way:
When a thread is created, it has a parent attribute pointing to the creator thread (the parent thread). When the parent thread exits, it broadcasts a “STOP_THREAD” event to its children and waits until all child threads are stopped.
However, if the child thread is marked as daemon=True, the parent thread will not wait for it. Since the daemon child thread has received the “STOP_THREAD” event, it will eventually stop, but the parent thread doesn’t know when.
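The shutdown behaviour described above can be modelled with plain threads. This is a rough sketch under stated assumptions, not the library's code: Child and shutdown_parent are hypothetical names, a threading.Event stands in for the “STOP_THREAD” event, and the sleep stands in for whatever cleanup a child does after it is asked to stop.

```python
import threading
import time


class Child:
    """Hypothetical stand-in for a child worker; not the library's class."""

    def __init__(self, name, daemon, cleanup_seconds):
        self.name = name
        self.daemon = daemon  # models the library's flag, not Thread.daemon
        self.stop_requested = threading.Event()
        self.thread = threading.Thread(target=self._run, args=(cleanup_seconds,))
        self.thread.start()

    def _run(self, cleanup_seconds):
        self.stop_requested.wait()   # stands in for the child's event loop
        time.sleep(cleanup_seconds)  # simulated cleanup after the stop event


def shutdown_parent(children):
    """Broadcast stop to every child, then wait only for non-daemon ones."""
    for child in children:
        child.stop_requested.set()
    waited = []
    for child in children:
        if not child.daemon:
            child.thread.join()
            waited.append(child.name)
    return waited


children = [
    Child("regular", daemon=False, cleanup_seconds=0.05),
    Child("background", daemon=True, cleanup_seconds=0.05),
]
waited_for = shutdown_parent(children)
```

Both children receive the stop request, but the parent only blocks on the one that is not marked as daemon; the other finishes on its own schedule.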
Handle WorkerExit¶
If you need to clean something up when the thread stops:
from worker import create_worker, wait_forever

@create_worker
def server_thread():
    server = Server() # some kind of multiprocess server
    server.run()
    try:
        wait_forever()
    finally:
        server.terminate() # the server would be correctly terminated when
                           # the event loop raises WorkerExit

# ... do something ...

server_thread.stop()
The cleanup reads better when it is wrapped in a context manager:
from contextlib import contextmanager
from worker import create_worker, wait_forever

@contextmanager
def open_server():
    server = Server()
    server.run()
    try:
        yield server
    finally:
        server.terminate()

@create_worker
def server_thread():
    with open_server() as server:
        wait_forever()

# ... do something ...

server_thread.stop()
Exceptions¶
- exception WorkerExit¶
Raise this error to exit the thread.
Functions¶
- is_main(thread=None)¶
Check if the thread is the main thread.
- sleep(timeout)¶
Use this function instead of time.sleep() so that the thread enters the event loop while waiting.
This function is a shortcut for current().wait_timeout(timeout).
- Parameters
timeout (float) – Time to wait.
- create_worker(callback: Callable, *args, parent: bool | None | worker.Worker = None, daemon: bool | None = None, print_traceback: bool = True, **kwargs) worker.Worker ¶
Create and start a Worker.
callback, parent, daemon, and print_traceback are sent to Worker; other arguments are sent to Worker.start().
This function can be used as a decorator:
def my_task():
    ...
my_thread = create_worker(my_task, daemon=True)
# my_thread is running

# v.s.

@create_worker(daemon=True)
def my_thread():
    ...
# my_thread is running
- Return type
Worker
- async_(callback: Callable, *args, **kwargs) worker.Async ¶
Create and start an Async task.
callback is sent to Async, and other arguments are sent to Async.start().
Shortcut functions for the current thread¶
- listen(*args, **kwargs)¶
A shortcut function for current().listen().
Note
Listeners created by the listen shortcut have permanent=False, so that the listener isn’t added multiple times when the thread is restarted.
- unlisten(*args, **kwargs)¶
A shortcut function for current().unlisten().
- later(*args, **kwargs)¶
A shortcut function for current().later().
- update(*args, **kwargs)¶
A shortcut function for current().update().
- wait_timeout(*args, **kwargs)¶
A shortcut function for current().wait_timeout().
- wait_forever(*args, **kwargs)¶
A shortcut function for current().wait_forever().
- wait_thread(*args, **kwargs)¶
A shortcut function for current().wait_thread().
- wait_event(*args, **kwargs)¶
A shortcut function for current().wait_event().
- wait_until(*args, **kwargs)¶
A shortcut function for current().wait_until().
With these shortcuts, we can write code without referencing thread objects explicitly:
from worker import listen, wait_forever, create_worker

@create_worker
def printer():
    # this function runs in a new thread
    @listen("PRINT") # the listener is registered on printer thread
    def _(event):
        print(event.data)
    wait_forever() # printer's event loop

printer.fire("PRINT", "foo")
printer.fire("PRINT", "bar")
printer.stop().join()
Classes¶
- class Worker(task=None, parent=None, daemon=None, print_traceback=True)¶
The main Worker class.
- Parameters
task (Callable) – The function to call when the thread starts. If this is not provided, Worker.wait_forever() is used as the default.
parent – The parent thread.
If parent is None (the default), the current thread is used as the parent, unless the current thread is the main thread.
If parent is False, the thread is parent-less.
daemon (bool) – Create a daemon thread. See also is_daemon().
print_traceback – If True, print the error traceback when the thread crashes (task raises an error).
- fire(event, *args, **kwargs)¶
Put an event to the event queue.
- is_daemon()¶
Return True if the thread is a daemon thread.
If the daemon flag is not None, return the flag value.
Otherwise, return parent.is_daemon().
If there is no parent thread, return False.
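The lookup rule above amounts to walking up the parent chain until an explicit flag is found. A minimal sketch of that rule follows; Node is a hypothetical class that models only the daemon flag and the parent link, not a real Worker.

```python
class Node:
    """Hypothetical stand-in that models only the daemon flag and parent link."""

    def __init__(self, parent=None, daemon=None):
        self.parent = parent
        self.daemon = daemon

    def is_daemon(self):
        if self.daemon is not None:
            return self.daemon              # an explicit flag wins
        if self.parent is not None:
            return self.parent.is_daemon()  # otherwise inherit from the parent
        return False                        # no flag and no parent


root = Node()
background = Node(parent=root, daemon=True)
inherited = Node(parent=background)              # no flag: inherits True
override = Node(parent=background, daemon=False)  # explicit flag wins
```

Under this rule, a child created without a daemon argument behaves like its parent, while an explicit daemon=False on the child takes precedence over a daemon parent.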
- is_running()¶
Return True if the thread is alive.
- join()¶
Join the thread.
join() is a little different from wait_thread():
join() uses the native threading.Thread.join(); it doesn’t enter the event loop.
wait_thread() enters the event loop and waits for the WAIT_THREAD_PENDING_DONE event. It also has a return value: (thread_err, thread_ret).
- later(callback, *args, timeout=0, **kwargs)¶
Schedule a task on this thread.
- Parameters
callback (callable) – The task to execute.
timeout (float) – In seconds. Wait this long before executing the task.
- Returns
If timeout is used, this method returns a daemon Worker that first calls sleep(timeout) before executing the task. Otherwise, it returns None.
- Return type
Worker or None
Other arguments are sent to the callback.
The scheduled task is executed inside the event loop, i.e. inside an event listener, so you should avoid blocking in the task.
If a Worker is returned, you can call Worker.stop() on it to cancel the task before it is executed.
- listen(callback, *args, **kwargs)¶
Register a listener. See Listener for argument details.
If callback is not provided, this method becomes a decorator, so you can use it like:
@thread.listen("EVENT_NAME")
def handler(event):
    ...  # handle event
- pause()¶
Pause the thread.
- resume()¶
Resume the thread.
- start(*args, **kwargs)¶
Start the thread. The arguments are passed into
task
.
- stop()¶
Stop the thread.
- unlisten(callback)¶
Unlisten a callback
- update()¶
Process all events inside the event queue. This allows you to create a break point without waiting.
Use this to hook the event loop into other frameworks. For example, tkinter:
from tkinter import Tk
from worker import update

root = Tk()

def worker_update():
    update()
    root.after(100, worker_update)

worker_update()
root.mainloop()
- wait_event(name, timeout=None, target=None)¶
Wait for a specific event.
- wait_forever()¶
Create an infinite event loop.
- wait_thread(thread)¶
Wait for the thread to end. Return a (thread_error, thread_result) tuple.
- class Async(task)¶
Bases: worker.Worker
Async class. Creates an asynchronous (threaded) task.
- Parameters
task (Callable) – The worker target.
This class creates a parent-less daemon thread that does not print tracebacks.
- get()¶
Get the result.
If the task failed, this method raises an error. If the task is not completed, enter the event loop.
- class Defer¶
Defer object. Handy in cross-thread communication. For example, update tkinter GUI in the main thread:
from tkinter import *
from worker import current, update, create_worker, Defer, is_main

main_thread = current()
root = Tk()

def hook():
    root.after(100, hook)
    update()

@create_worker
def worker():
    i = 0
    def update_some_gui(on_finished=None):
        print("gui", is_main())
        def remove_button():
            button.destroy()
            on_finished("OK")
        button = Button(
            root,
            text="Click me to fulfill defer {}".format(i),
            command=remove_button
        )
        button.pack()
    while True:
        defer = Defer()
        print("worker", is_main())
        main_thread.later(update_some_gui, on_finished=defer.resolve)
        defer.get()
        i += 1

hook()
root.mainloop()
worker.stop()
- get()¶
Enter the event loop and wait until the defer is fulfilled.
If the defer is resolved, return the result. If the defer is rejected, raise the result.
- reject(err)¶
Reject with err.
- resolve(value)¶
Resolve with value.
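The resolve/reject/get contract can be illustrated without the library. The sketch below is a simplified model, not the real Defer: MiniDefer is a hypothetical name, and its get() simply blocks on a threading.Event, whereas the documented Defer.get() enters the event loop while waiting.

```python
import threading


class MiniDefer:
    """Hypothetical, blocking stand-in for Defer built on threading.Event."""

    def __init__(self):
        self._done = threading.Event()
        self._value = None
        self._error = None

    def resolve(self, value):
        self._value = value
        self._done.set()

    def reject(self, err):
        self._error = err
        self._done.set()

    def get(self):
        # Simplification: block the caller instead of entering an event loop.
        self._done.wait()
        if self._error is not None:
            raise self._error  # rejected: raise the stored error
        return self._value     # resolved: return the stored value


# Another thread fulfils the defer; the caller picks up the result.
ok = MiniDefer()
threading.Thread(target=ok.resolve, args=("OK",)).start()
result = ok.get()

failed = MiniDefer()
threading.Thread(target=failed.reject, args=(ValueError("boom"),)).start()
try:
    failed.get()
    caught = None
except ValueError as err:
    caught = str(err)
```

The useful property is that the thread that fulfils the defer and the thread that reads it never need a reference to each other, only to the defer object.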
- class Channel¶
Channel class. Broadcast events to multiple threads.
- pub(*args, **kwargs)¶
Publish an event to the channel. See Event for the arguments.
Events published to the channel are broadcast to all subscriber threads.
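The fan-out behaviour of pub can be pictured as one queue per subscriber. This page does not show how threads subscribe to a Channel, so the sketch below invents a subscribe() helper purely for illustration; MiniChannel and subscribe are hypothetical and are not part of the documented API.

```python
import queue


class MiniChannel:
    """Hypothetical stand-in for Channel; subscribers are plain queues here."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self):
        # Hypothetical helper: gives each subscriber its own event queue.
        inbox = queue.Queue()
        self.subscribers.append(inbox)
        return inbox

    def pub(self, name, data=None):
        # Every subscriber receives its own copy of the event.
        for inbox in self.subscribers:
            inbox.put((name, data))


channel = MiniChannel()
first = channel.subscribe()
second = channel.subscribe()
channel.pub("NEWS", "hello")
```

Each subscriber drains its own queue independently, so a slow subscriber does not delay delivery to the others.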
- class Event(name, data=None, *, bubble=False, broadcast=False, target=None)¶
Event data class.
- Parameters
name (str) – Event name.
data – Event data.
bubble (bool) – If true, the event bubbles up through the parent threads.
broadcast (bool) – If true, the event is broadcast to all child threads.
target (Worker) – Event target. If None, it is set to the thread calling Worker.fire.
- class Listener(callback, event_name, *, target=None, priority=0, once=False, permanent=True)¶
Listener data class.
- Parameters
callback (callable) – The listener callback.
event_name (str) – The event name.
target (Worker) – Only match a specific event.target.
priority (int) – Listeners are ordered by priority; listeners with a higher priority are called first.
once (bool) – If True, remove the listener once it has been called.
permanent (bool) – If False, remove the listener once the thread is stopped. Listeners created by the listen() shortcut are non-permanent listeners.
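How priority and once interact can be shown with a small dispatcher. This is a sketch under stated assumptions rather than the library's dispatch code: MiniListener and dispatch are hypothetical names, and keeping registration order among equal priorities is an assumption of the sketch.

```python
class MiniListener:
    """Hypothetical stand-in mirroring a few Listener fields."""

    def __init__(self, callback, event_name, priority=0, once=False):
        self.callback = callback
        self.event_name = event_name
        self.priority = priority
        self.once = once


def dispatch(listeners, event_name, data):
    """Call matching listeners, highest priority first; drop `once` listeners."""
    matching = [item for item in listeners if item.event_name == event_name]
    # sorted() is stable, so equal priorities keep registration order.
    for listener in sorted(matching, key=lambda item: -item.priority):
        listener.callback(data)
        if listener.once:
            listeners.remove(listener)


calls = []
listeners = [
    MiniListener(lambda d: calls.append(("low", d)), "PING", priority=0),
    MiniListener(lambda d: calls.append(("high", d)), "PING", priority=10),
    MiniListener(lambda d: calls.append(("once", d)), "PING", priority=5, once=True),
]
dispatch(listeners, "PING", 1)
dispatch(listeners, "PING", 2)
```

On the first dispatch all three listeners run in priority order; on the second, the once listener has already been removed, so only the other two are called.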