Task manager

Introduction

The code implements a lightweight task manager in pure Python, supporting concurrent task execution using threading. It allows users to:

  1. Register Tasks:
    • Decorator-Based: Decorate a function with @manager.task. Each call to the decorated function then queues a task and returns its task_id instead of running the function immediately.
    • Manual Addition: Use manager.add_task(func, *args, **kwargs) to manually submit tasks for execution.
  2. Execute Tasks: Tasks are executed asynchronously by a pool of worker threads.
  3. Retrieve Results: Task results are stored and can be fetched later using their unique task_id.
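
To make the decorator behaviour concrete, here is a minimal sketch of the idea: calling a decorated function queues the call and returns an ID rather than the function's result. It is a simplified stand-in, not the TaskManager shown later; `pending`, `task`, and `greet` are hypothetical names, and a plain list stands in for the queue.

```python
from functools import wraps

pending = []  # hypothetical stand-in for the task queue


def task(func):
    """Return a wrapper that queues the call instead of running it."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        task_id = f"task-{len(pending) + 1}"
        pending.append((task_id, func, args, kwargs))
        return task_id  # the caller gets an ID, not the function's result
    return wrapper


@task
def greet(name):
    return f"Hello, {name}!"


task_id = greet("Ada")  # nothing has run yet, the call is only queued

# What a worker would do later: take the queued entry and call the original function
queued_id, func, args, kwargs = pending[0]
result = func(*args, **kwargs)
print(task_id, result)
```

The wrapper keeps a reference to the undecorated function, which is why the worker can still run the real body later.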

Key Features

  1. Dual Registration: Supports both the @manager.task decorator and manual addition via add_task().
  2. Threaded Execution: Runs tasks concurrently on a pool of worker threads.
  3. Result Management: Tracks task statuses (queued, completed, failed) and stores results.
  4. Task Flexibility: Works with any Python function.

How It Works

  1. Initialize the TaskManager with a configurable number of worker threads.
  2. Register tasks via:
    • Decorator-Based: @manager.task
    • Manual: add_task()
  3. Workers fetch tasks from the queue and execute them.
  4. Retrieve results using manager.get_result(task_id).
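
The four steps above can be sketched with nothing but the standard library's queue.Queue and threading.Thread. This is a condensed illustration of the pattern, assuming module-level names (`tasks`, `results`, `worker`) chosen for brevity; it is not the class presented below.

```python
import queue
import threading

tasks = queue.Queue()
results = {}


def worker():
    while True:
        task_id, func, args = tasks.get()  # blocks until a task arrives
        try:
            results[task_id] = {"status": "completed", "result": func(*args)}
        except Exception as exc:
            results[task_id] = {"status": "failed", "error": str(exc)}
        finally:
            tasks.task_done()  # one task_done() per successful get()


# Step 1: start the worker threads (daemon threads end with the program)
for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

# Step 2: queue some work, here squaring a few numbers with the built-in pow
for i, n in enumerate([2, 3, 4]):
    tasks.put((f"task-{i}", pow, (n, 2)))

# Step 3: workers process the queue; join() blocks until every task is done
tasks.join()

# Step 4: look up a result by its ID
print(results["task-1"])
```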

Use Cases

  • Ideal for asynchronous task execution (e.g., I/O tasks, lightweight background processing).
  • Great for learning and experimenting with decorators and threaded task queues.

Limitations

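  • Tasks and results live only in memory and are lost on shutdown. Because the workers are daemon threads, unfinished tasks are dropped when the program exits unless wait_for_all() is called first.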
  • CPU-bound tasks are limited by Python's Global Interpreter Lock (consider multiprocessing for better performance).
  • Lacks task prioritization and retry mechanisms.
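
The missing retry mechanism can be approximated without changing the manager by wrapping the function before it is queued. The sketch below is one possible approach, not part of the task manager; `with_retries`, `flaky`, and `calls` are hypothetical names introduced for illustration.

```python
import time
from functools import wraps


def with_retries(max_attempts=3, delay=0.0):
    """Retry the wrapped function until it succeeds or attempts run out."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller see the error
                    time.sleep(delay)  # pause before the next attempt
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_attempts=3)
def flaky():
    # Fails on the first two calls, succeeds on the third
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


outcome = flaky()
print(outcome, calls["count"])
```

A function wrapped this way could be passed to add_task like any other callable; the worker would then record a failure only after the last attempt raises.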

Code: Support for Both Decorator and Normal add_task

import threading
import queue
import time
from functools import wraps

class TaskManager:
    def __init__(self, num_workers=2):
        self.tasks = queue.Queue()  # Queue to store tasks
        self.results = {}  # Status and result of each task, keyed by task ID
        self.task_registry = {}  # Registry of functions decorated with @manager.task
        self.workers = []  # List of worker threads
        self.num_workers = num_workers
        self._next_id = 1000  # Counter for sequential task IDs
        self._id_lock = threading.Lock()  # Guards the ID counter
        self._setup_workers()  # Start worker threads

    def _setup_workers(self):
        for _ in range(self.num_workers):
            # Daemon threads do not keep the program alive at exit
            worker = threading.Thread(target=self._worker, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker(self):
        while True:
            # Block until a task is available, so idle workers never exit early
            task_id, task_func, args, kwargs = self.tasks.get()
            print(f"Worker picked up task {task_id}.")
            try:
                result = task_func(*args, **kwargs)
                self.results[task_id] = {"status": "completed", "result": result}
            except Exception as e:
                self.results[task_id] = {"status": "failed", "error": str(e)}
            finally:
                self.tasks.task_done()  # Exactly one task_done() per fetched task

    def add_task(self, func, *args, **kwargs):
        """
        Manually add a function to the task queue.
        """
        with self._id_lock:  # Sequential IDs cannot collide, unlike random ones
            task_id = f"task-{self._next_id}"
            self._next_id += 1
        # Record the status before queueing so a fast worker's result is never overwritten
        self.results[task_id] = {"status": "queued"}
        self.tasks.put((task_id, func, args, kwargs))
        print(f"Task {task_id} added (manual add).")
        return task_id

    def task(self, func):
        """
        Decorator to register a function as a task.
        """
        @wraps(func)
        def task_wrapper(*args, **kwargs):
            return self.add_task(func, *args, **kwargs)  # Wrap registered function to queue it
        self.task_registry[func.__name__] = func  # Add to task registry for later use if needed
        return task_wrapper

    def get_result(self, task_id):
        """
        Fetch the result of a task.
        """
        return self.results.get(task_id, "Task not found")

    def wait_for_all(self):
        """
        Wait for all tasks in the queue to complete.
        """
        self.tasks.join()
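
wait_for_all() blocks until the whole queue is drained, and get_result() returns immediately. A caller interested in a single task therefore has to poll. The helper below is a minimal sketch of that idea, assuming the status dictionaries used by the class above; `wait_for_result` is a hypothetical function, and a plain dictionary with a timer stands in for a running manager so the example is self-contained.

```python
import threading
import time


def wait_for_result(get_result, task_id, timeout=5.0, poll_interval=0.05):
    """Poll get_result(task_id) until the task is no longer 'queued'."""
    deadline = time.monotonic() + timeout
    while True:
        record = get_result(task_id)
        if not isinstance(record, dict):
            raise KeyError(task_id)  # e.g. the "Task not found" string
        if record.get("status") != "queued":
            return record  # completed or failed
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{task_id} did not finish within {timeout} seconds")
        time.sleep(poll_interval)


# Stand-in for a manager: the status flips to "completed" after 0.1 seconds
store = {"task-1000": {"status": "queued"}}


def finish():
    store["task-1000"] = {"status": "completed", "result": 42}


threading.Timer(0.1, finish).start()
record = wait_for_result(lambda tid: store.get(tid, "Task not found"), "task-1000")
print(record)
```

With a real manager the call would look like wait_for_result(manager.get_result, tid1). Note that "queued" covers both waiting and running tasks, since the class has no separate running status.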

Example usage

if __name__ == "__main__":
    manager = TaskManager(num_workers=3)

    # Register a function as a task using the @task decorator
    @manager.task
    def example_task(duration, name):
        print(f"Starting task {name}.")
        time.sleep(duration)
        print(f"Finished task {name}.")
        return f"Task {name} completed in {duration} seconds."

    # Create tasks with the decorator
    tid1 = example_task(2, "Task One")  # Automatically added to the queue
    tid2 = example_task(3, "Task Two")  # Automatically added to the queue

    # Create tasks using the manual function
    def another_task(duration, name):
        print(f"Starting another task {name}.")
        time.sleep(duration)
        print(f"Finished another task {name}.")
        return f"Another task {name} done in {duration} seconds."

    tid3 = manager.add_task(another_task, 1, "Task Three")
    tid4 = manager.add_task(another_task, 4, "Task Four")

    # Wait for all tasks to complete
    manager.wait_for_all()

    # Retrieve results
    print(manager.get_result(tid1))
    print(manager.get_result(tid2))
    print(manager.get_result(tid3))
    print(manager.get_result(tid4))

Output Example

The exact interleaving of lines varies from run to run because the workers run concurrently.

Task task-1000 added (manual add).
Worker picked up task task-1000.
Starting task Task One.
Task task-1001 added (manual add).
Worker picked up task task-1001.
Starting task Task Two.
Task task-1002 added (manual add).
Worker picked up task task-1002.
Starting another task Task Three.
Task task-1003 added (manual add).
Finished another task Task Three.
Worker picked up task task-1003.
Starting another task Task Four.
Finished task Task One.
Finished task Task Two.
Finished another task Task Four.
{'status': 'completed', 'result': 'Task Task One completed in 2 seconds.'}
{'status': 'completed', 'result': 'Task Task Two completed in 3 seconds.'}
{'status': 'completed', 'result': 'Another task Task Three done in 1 seconds.'}
{'status': 'completed', 'result': 'Another task Task Four done in 4 seconds.'}
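
For comparison, the standard library's concurrent.futures module provides the same submit-then-collect pattern out of the box. The sketch below is not a drop-in replacement for the TaskManager above; it only shows how the example task might look with ThreadPoolExecutor, with shortened durations so it finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def example_task(duration, name):
    time.sleep(duration)
    return f"Task {name} completed in {duration} seconds."


# submit() returns a Future immediately; result() blocks until that task is done
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [
        pool.submit(example_task, 0.1, "One"),
        pool.submit(example_task, 0.2, "Two"),
    ]
    outputs = [future.result() for future in futures]

print(outputs)
```

Here the Future object plays the role of the task_id: it is the handle used to fetch the result later.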