Task manager
Introduction¶
The code implements a lightweight task manager in pure Python, supporting concurrent task execution using threading. It allows users to:
- Register Tasks:
- Decorator-Based: Use the
@manager.taskdecorator to register functions that automatically add tasks to the queue when called. - Manual Addition: Use
manager.add_task(func, *args, **kwargs)to manually submit tasks for execution.
- Decorator-Based: Use the
- Execute Tasks: Tasks are executed asynchronously by a pool of worker threads.
- Retrieve Results: Task results are stored and can be fetched later using their unique
task_id.
Key Features¶
- Dual Registration: Supports both decorator (
@task) and manual task addition. - Threaded Execution: Runs tasks in parallel using worker threads.
- Result Management: Tracks task statuses (
queued,completed,failed) and stores results. - Task Flexibility: Works with any Python function.
How It Works¶
- Initialize the
TaskManagerwith a configurable number of worker threads. - Register tasks via:
- Decorator-Based:
@manager.task - Manual:
add_task()
- Decorator-Based:
- Workers fetch tasks from the queue and execute them.
- Retrieve results using
manager.get_result(task_id).
Use Cases¶
- Ideal for asynchronous task execution (e.g., I/O tasks, lightweight background processing).
- Great for learning and experimenting with decorators and threaded task queues.
Limitations¶
- Tasks aren't persistent (lost after shutdown).
- CPU-bound tasks are limited by Python's Global Interpreter Lock (consider multiprocessing for better performance).
- Lacks task prioritization and retry mechanisms.
Code: Support for Both Decorator and Normal add_task¶
import threading
import queue
import time
import random
from functools import wraps
class TaskManager:
def __init__(self, num_workers=2):
self.tasks = queue.Queue() # Queue to store tasks
self.results = {} # Store the results of execution
self.task_registry = {} # Registry to store all @task-decorated functions
self.workers = [] # List of worker threads
self.num_workers = num_workers
self._setup_workers() # Start worker threads
def _setup_workers(self):
for i in range(self.num_workers):
worker = threading.Thread(target=self._worker, daemon=True)
worker.start()
self.workers.append(worker)
def _worker(self):
while True:
try:
# Get the next task from the queue
task_id, task_func, args, kwargs = self.tasks.get(timeout=1)
print(f"Worker picked up task {task_id}.")
result = task_func(*args, **kwargs)
self.results[task_id] = {"status": "completed", "result": result}
except queue.Empty:
break # Exit silently if there are no more tasks
except Exception as e:
self.results[task_id] = {"status": "failed", "error": str(e)}
finally:
self.tasks.task_done() # Mark the task as done
def add_task(self, func, *args, **kwargs):
"""
Manually add a function to the task queue.
"""
task_id = f"task-{random.randint(1000, 9999)}" # Generate a unique task ID
self.tasks.put((task_id, func, args, kwargs))
self.results[task_id] = {"status": "queued"}
print(f"Task {task_id} added (manual add).")
return task_id
def task(self, func):
"""
Decorator to register a function as a task.
"""
@wraps(func)
def task_wrapper(*args, **kwargs):
return self.add_task(func, *args, **kwargs) # Wrap registered function to queue it
self.task_registry[func.__name__] = func # Add to task registry for later use if needed
return task_wrapper
def get_result(self, task_id):
"""
Fetch the result of a task.
"""
return self.results.get(task_id, "Task not found")
def wait_for_all(self):
"""
Wait for all tasks in the queue to complete.
"""
self.tasks.join()
Example usage¶
if __name__ == "__main__":
manager = TaskManager(num_workers=3)
# Register a function as a task using the @task decorator
@manager.task
def example_task(duration, name):
print(f"Starting task {name}.")
time.sleep(duration)
print(f"Finished task {name}.")
return f"Task {name} completed in {duration} seconds."
# Create tasks with the decorator
tid1 = example_task(2, "Task One") # Automatically added to the queue
tid2 = example_task(3, "Task Two") # Automatically added to the queue
# Create tasks using the manual function
def another_task(duration, name):
print(f"Starting another task {name}.")
time.sleep(duration)
print(f"Finished another task {name}.")
return f"Another task {name} done in {duration} seconds."
tid3 = manager.add_task(another_task, 1, "Task Three")
tid4 = manager.add_task(another_task, 4, "Task Four")
# Wait for all tasks to complete
manager.wait_for_all()
# Retrieve results
print(manager.get_result(tid1))
print(manager.get_result(tid2))
print(manager.get_result(tid3))
print(manager.get_result(tid4))
Output Example¶
Task task-1234 added (manual add).
Task task-5678 added (manual add).
Worker picked up task task-5678.
Worker picked up task task-1234.
Starting another task Task Three.
Starting task Task One.
Finished another task Task Three.
Worker picked up task task-9012.
Starting another task Task Four.
Finished task Task One.
Finished another task Task Four.
{'status': 'completed', 'result': 'Task Task One completed in 2 seconds.'}
{'status': 'completed', 'result': 'Task Task Two completed in 3 seconds.'}
{'status': 'completed', 'result': 'Another task Task Three done in 1 seconds.'}
{'status': 'completed', 'result': 'Another task Task Four done in 4 seconds.'}