tasktronaut package

Submodules

tasktronaut.backend module

Backend module.

class tasktronaut.backend.Backend

Bases: ABC

Abstract base class defining the interface for process execution backends.

Backends are responsible for enqueueing tasks, managing job dependencies, and executing process lifecycle callbacks. Subclasses must implement the abstract enqueue methods to integrate with specific job queue systems.

The backend handles three main phases of process execution:

  • Process start callbacks

  • Individual task execution with error handling

  • Process completion callbacks

abstractmethod enqueue_perform_complete(identifier, module_name, definition_class, depends_on=None)

Enqueue a process completion callback.

Schedules the execution of the process completion callback, which allows the process definition to perform finalization logic when all tasks have completed. Can depend on one or more jobs.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • depends_on (Optional[Union[Job, List[Job]]]) – Optional job or list of jobs that this job depends on. If provided, this job will not execute until all dependencies complete.

Returns:

Job object representing the enqueued task

Return type:

Job

abstractmethod enqueue_perform_start(identifier, module_name, definition_class, depends_on=None)

Enqueue a process start callback.

Schedules the execution of the process start callback, which allows the process definition to perform initialization logic when execution begins.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.

Returns:

Job object representing the enqueued task

Return type:

Job

abstractmethod enqueue_perform_task(identifier, module_name, definition_class, function_name, description, kwargs, depends_on=None)

Enqueue a process task for execution.

Schedules the execution of a specific task function within a process definition. The task will be executed with the provided keyword arguments and can optionally depend on another job.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • function_name (str) – Name of the task method to execute

  • description (Description) – Human-readable description of the task

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function

  • depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.

Returns:

Job object representing the enqueued task

Return type:

Job
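
The three enqueue methods can be pictured with a minimal in-memory sketch. It deliberately does not import tasktronaut: RecordingBackend and RecordedJob are hypothetical stand-ins that only record what would be enqueued and which jobs each entry waits for. A real backend would subclass Backend and hand these calls to an actual job queue.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RecordedJob:
    """Hypothetical stand-in for a Job: remembers its kind and dependencies."""
    kind: str
    args: tuple
    depends_on: List["RecordedJob"] = field(default_factory=list)
    cancelled: bool = False

    def cancel(self) -> None:
        self.cancelled = True

    def delete_dependents(self) -> None:
        pass  # nothing to delete in this sketch


class RecordingBackend:
    """Hypothetical backend sketch: records jobs instead of queueing them."""

    def __init__(self) -> None:
        self.jobs: List[RecordedJob] = []

    def _record(self, kind, args, depends_on) -> RecordedJob:
        # Normalise None / a single job / a list of jobs into a list.
        if depends_on is None:
            deps = []
        elif isinstance(depends_on, list):
            deps = list(depends_on)
        else:
            deps = [depends_on]
        job = RecordedJob(kind, args, deps)
        self.jobs.append(job)
        return job

    def enqueue_perform_start(self, identifier, module_name, definition_class,
                              depends_on=None):
        args = (identifier, module_name, definition_class)
        return self._record("start", args, depends_on)

    def enqueue_perform_task(self, identifier, module_name, definition_class,
                             function_name, description, kwargs, depends_on=None):
        args = (identifier, module_name, definition_class,
                function_name, description, kwargs)
        return self._record("task", args, depends_on)

    def enqueue_perform_complete(self, identifier, module_name, definition_class,
                                 depends_on=None):
        args = (identifier, module_name, definition_class)
        return self._record("complete", args, depends_on)


backend = RecordingBackend()
start = backend.enqueue_perform_start("p1", "my.module", "MyProcess")
step = backend.enqueue_perform_task(
    "p1", "my.module", "MyProcess", "my_task", None, {"x": 1}, depends_on=start
)
done = backend.enqueue_perform_complete(
    "p1", "my.module", "MyProcess", depends_on=[step]
)
```

Only the completion callback is documented as accepting a list of jobs, which is why the helper normalises both shapes before recording them.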

static perform_complete(identifier, module_name, definition_class)

Execute the process completion callback.

Loads the process definition and invokes its completion callback. This method is typically called by the backend after all process tasks have completed successfully.

It is implemented as a static method to allow the most flexibility in backend implementations.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

static perform_start(identifier, module_name, definition_class)

Execute the process start callback.

Loads the process definition and invokes its start callback. This method is typically called by the backend after the job has been dequeued.

It is implemented as a static method to allow the most flexibility in backend implementations.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

classmethod perform_task(job, identifier, module_name, definition_class, function_name, description, kwargs)

Execute a process task with comprehensive error handling.

Loads the process definition, creates an execution context, and invokes the specified task function. Handles three categories of errors:

  • NonRetryableProcessError: Logs error, calls on_failed callback, and re-raises

  • CancelProcessError: Cancels the job and its dependents, calls on_cancelled callback

  • Other exceptions: Logs error, calls on_failed callback, and re-raises for retry

The task execution is wrapped with the definition’s around_task context manager, allowing the definition to perform setup and teardown logic.

Parameters:
  • job (Job) – Job object that can be cancelled or have dependents deleted

  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • function_name (str) – Name of the task method to execute

  • description (Description) – Human-readable description of the task

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function

Raises:
  • NonRetryableProcessError – When task execution raises a non-retryable error

  • Exception – When task execution raises an unhandled error (retryable)
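
The three error categories can be sketched as a plain try/except ladder. This is an illustration under stated assumptions, not the library's implementation: the exception classes are local stand-ins, run_task, FakeJob and FakeDefinition are hypothetical names, the order of cancel() and delete_dependents() is a guess, and the around_task wrapping is omitted.

```python
import logging

logger = logging.getLogger(__name__)


class NonRetryableProcessError(Exception):
    """Local stand-in for tasktronaut.errors.NonRetryableProcessError."""


class CancelProcessError(Exception):
    """Local stand-in for tasktronaut.errors.CancelProcessError."""


def run_task(job, definition, identifier, step, func, kwargs):
    """Hypothetical sketch of the three error categories described above."""
    try:
        func(**kwargs)
    except NonRetryableProcessError as e:
        logger.error("non-retryable failure in %s: %s", step, e)
        definition.on_failed(identifier, step, e)
        raise
    except CancelProcessError:
        # Cancel this job and drop everything scheduled after it.
        job.cancel()
        job.delete_dependents()
        definition.on_cancelled(identifier)
    except Exception as e:
        logger.error("retryable failure in %s: %s", step, e)
        definition.on_failed(identifier, step, e)
        raise  # re-raised so the queue can apply its retry policy


class FakeJob:
    """Hypothetical job that records which lifecycle methods were called."""
    def __init__(self):
        self.calls = []

    def cancel(self):
        self.calls.append("cancel")

    def delete_dependents(self):
        self.calls.append("delete_dependents")


class FakeDefinition:
    """Hypothetical definition that records which hooks were invoked."""
    def __init__(self):
        self.events = []

    def on_failed(self, identifier, step, e):
        self.events.append(("failed", identifier, step))

    def on_cancelled(self, identifier):
        self.events.append(("cancelled", identifier))


def cancelling_task(**_):
    raise CancelProcessError("stop")


job, definition = FakeJob(), FakeDefinition()
run_task(job, definition, "p1", "cancelling_task", cancelling_task, {})
```

Note that only the cancellation branch swallows the exception; both failure branches re-raise after calling on_failed.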

class tasktronaut.backend.Job(*args, **kwargs)

Bases: Protocol

Protocol defining the interface for a job object.

A job represents an enqueued unit of work in the backend system and provides methods to manage its lifecycle.

cancel()

Cancel the execution of this job.

Stops the job from being executed if it hasn’t already started.

delete_dependents()

Delete all jobs that depend on this job.

Removes any downstream jobs that were scheduled to run after completion of this job.
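
Because Job is a Protocol, any object with matching cancel() and delete_dependents() methods satisfies it without inheriting from anything. The sketch below restates the interface locally instead of importing it; ListJob and abort are hypothetical names used only for illustration.

```python
from typing import List, Protocol


class Job(Protocol):
    """Local re-statement of the two-method interface described above."""

    def cancel(self) -> None: ...

    def delete_dependents(self) -> None: ...


class ListJob:
    """Hypothetical job backed by plain lists; no base class is needed."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.cancelled = False
        self.dependents: List["ListJob"] = []

    def cancel(self) -> None:
        self.cancelled = True

    def delete_dependents(self) -> None:
        self.dependents.clear()


def abort(job: Job) -> None:
    # Accepts anything that structurally satisfies the protocol.
    job.cancel()
    job.delete_dependents()


first = ListJob("first")
first.dependents.append(ListJob("second"))
abort(first)
```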

tasktronaut.builder module

Builder module.

class tasktronaut.builder.Builder(process, kwargs, options=None, description=None)

Bases: object

A builder class for constructing and configuring process workflows.

The Builder provides a fluent interface for defining task execution graphs, including sequential and concurrent execution patterns, argument validation, iteration, and transformation capabilities.

Parameters:
  • process (Process) – The process instance being built

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to tasks and transformations

  • options (SimpleNamespace) – Optional configuration options for the process

  • description (Description) – Optional description for the builder context

Variables:
  • process – The process instance being constructed

  • description – Description for the current builder context

  • options – Configuration options converted to SimpleNamespace

  • kwargs – Keyword arguments for task execution

concurrent(description=None)

Create a concurrent execution block where tasks run in parallel.

This context manager creates a new ConcurrentProcess that executes its contained tasks concurrently. The resulting process is appended to the parent process’s steps.

Parameters:

description (Description) – Optional description for the concurrent block

Returns:

Builder instance for the concurrent process

Return type:

BuilderIterator

Yield:

Builder configured for concurrent execution

Example:

with builder.concurrent() as b:
    b.task(task1)
    b.task(task2)

each(func, description=None)

Iterate over the items yielded by func, yielding a builder for each iteration.

Executes the provided function to generate items, then yields a Builder instance for each item with updated kwargs.

The provided function can yield either a dict of kwargs or a tuple of (kwargs, description).

Parameters:
  • func (ForEachMethod) – A function that yields dicts or (dict, str) tuples. The function receives the current kwargs as parameters.

  • description (Description) – Optional default description for iterations

Returns:

Iterator of Builder instances, one per yielded item

Return type:

BuilderIterator

Yield:

Builder with kwargs updated for each iteration

Example:

def items(self, count: int, **_):
    for i in range(count):
        yield {"index": i}

for b in builder.each(self.items):
    b.task(task_for_item)

Note

The provided function can yield either dict or (dict, str) tuples, where the string provides a per-item description.
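
The two accepted shapes can be normalised with a small helper. The sketch below is an assumption about how such items might be unpacked, not the library's code: normalise is a hypothetical name, and falling back to the default description when an item carries none is a guess.

```python
from typing import Any, Dict, Iterator, Optional, Tuple, Union

TaskArgs = Union[Dict[str, Any], Tuple[Dict[str, Any], Optional[str]]]


def normalise(
    item: TaskArgs, default: Optional[str] = None
) -> Tuple[Dict[str, Any], Optional[str]]:
    """Split a yielded item into (kwargs, description)."""
    if isinstance(item, tuple):
        kwargs, description = item
        # Assumed behaviour: a missing per-item description uses the default.
        return kwargs, (description if description is not None else default)
    return item, default


def items(count: int, **_) -> Iterator[TaskArgs]:
    for i in range(count):
        if i == 0:
            yield {"index": i}                # plain dict
        else:
            yield {"index": i}, f"Item {i}"   # (dict, description) tuple


results = [normalise(item, default="Process item") for item in items(count=3)]
```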

expected_arguments(**kwargs)

Validate that expected arguments are present and of the correct type.

This method checks that all specified arguments exist in the builder’s kwargs and validates their types using Pydantic type adapters. Type validation is only performed when a non-None type is specified.

Parameters:

kwargs (Dict[str, Optional[Type]]) – Mapping of argument names to expected types. Use None to skip type validation for an argument.

Raises:
  • TypeError – If a required argument is not found in kwargs

  • ValueError – If an argument’s type does not match the expected type

Note

This method does not check for additional arguments beyond those specified.
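
The presence and type rules can be approximated with the standard library alone. The sketch below is a simplified stand-in, not the method itself: check_arguments is a hypothetical name, and isinstance is stricter than Pydantic validation, which may coerce values. The exception types follow the Raises section above.

```python
import datetime
from typing import Any, Dict, Optional, Union, get_args, get_origin


def check_arguments(actual: Dict[str, Any], **expected: Any) -> None:
    """Hypothetical stand-in: presence check plus a simplified type check."""
    for name, expected_type in expected.items():
        if name not in actual:
            raise TypeError(f"missing expected argument: {name}")
        if expected_type is None:
            continue  # None means "must be present, any type"
        # Unwrap Optional[X] / Union[X, Y] into a tuple of plain classes.
        if get_origin(expected_type) is Union:
            allowed = get_args(expected_type)
        else:
            allowed = (expected_type,)
        if not isinstance(actual[name], allowed):
            raise ValueError(f"argument {name!r} is not of type {expected_type}")


kwargs = {"foo": "a", "bar": 1, "baz": object(), "qux": None, "extra": 42}
# "extra" is ignored: only the named arguments are checked.
check_arguments(kwargs, foo=str, bar=int, baz=None, qux=Optional[datetime.date])
```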

Example:

builder.expected_arguments(
    foo=str,
    bar=int,
    baz=None,  # No type checking
    qux=Optional[datetime.date]
)

option(name, default=None)

Retrieve an option value with optional default.

Accesses configuration options passed to the builder, returning the specified option’s value or a default if the option is not set.

Parameters:
  • name (str) – The name of the option to retrieve

  • default (Optional[Value]) – Default value to return if option is not found

Returns:

The option’s value or the default

Return type:

Value
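
Since options are documented as being converted to a SimpleNamespace, the lookup can be pictured as an attribute access with a fallback. This is a sketch under that assumption; to_namespace and the free function option are hypothetical helpers, not the library's code.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional, Union

Options = Union[SimpleNamespace, Dict[str, Any]]


def to_namespace(options: Optional[Options]) -> SimpleNamespace:
    """Hypothetical helper: accept None, a dict, or a SimpleNamespace."""
    if options is None:
        return SimpleNamespace()
    if isinstance(options, dict):
        return SimpleNamespace(**options)
    return options


def option(options: SimpleNamespace, name: str, default: Any = None) -> Any:
    # getattr with a default mirrors "the value, or the default if not set".
    return getattr(options, name, default)


opts = to_namespace({"debug": True})
debug = option(opts, "debug", False)
verbose = option(opts, "verbose", False)
```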

Example:

if builder.option("debug", False):
    builder.task(verbose_logging)

sequential(description=None)

Create a sequential execution block where tasks run one after another.

This context manager creates a new SequentialProcess that executes its contained tasks sequentially. The resulting process is appended to the parent process’s steps.

Parameters:

description (Description) – Optional description for the sequential block

Returns:

Builder instance for the sequential process

Return type:

BuilderIterator

Yield:

Builder configured for sequential execution

Example:

with builder.sequential() as b:
    b.task(task1)
    b.task(task2)

sub_process(definition, description=None)

Embed a sub-process within the current process.

Creates and configures a new process based on the provided ProcessDefinition, then appends it to the parent process’s steps. The sub-process inherits the current options and kwargs.

Parameters:
  • definition (ProcessDefinitionType) – A ProcessDefinition class defining the sub-process

  • description (Description) – Optional description for the sub-process

Example:

builder.sub_process(MySubProcess)
builder.sub_process(MySubProcess, description="Data validation")

task(func, description=None)

Add a task to the process.

Creates a Step wrapping the provided function and appends it to the process’s steps. The task inherits the builder’s kwargs and description unless overridden.

Parameters:
  • func (TaskMethod) – The function to execute as a task. Can be a regular method, a method accepting a Context parameter, or a @task decorated method.

  • description (Description) – Optional description for the task. If not provided, uses the function’s ‘description’ attribute or the builder’s description.

Example:

builder.task(my_task)
builder.task(my_task, description="Custom description")

transform(func, description=None)

Transform kwargs for tasks within a context block.

Executes the provided function to generate new kwargs, then yields a Builder with the transformed kwargs. Tasks defined within the context use the transformed arguments. The function can return either a dict or a tuple of (dict, description).

Parameters:
  • func (TransformMethod) – A function that returns a dict of new kwargs or a (dict, str) tuple. The function receives the current kwargs as parameters.

  • description (Description) – Optional description for the transformation context

Returns:

Builder instance with transformed kwargs

Return type:

BuilderIterator

Yield:

Builder configured with transformed kwargs

Example:

def double_value(self, value: int, **_):
    return {"value": value * 2}

with builder.transform(double_value) as t:
    t.task(task_with_doubled_value)

Note

The transform function can return either dict or (dict, str) where the string overrides the description for the context.

tasktronaut.context module

Context module.

class tasktronaut.context.Context

Bases: object

A context object that carries execution information during process execution.

The Context class serves as a container for state and metadata that may be needed by tasks during process execution. Tasks can optionally accept a Context parameter to access execution-specific information and maintain state across the process.

This is a minimal class designed to be extended or used as a base for storing execution context data passed between tasks in a Tasktronaut process.

tasktronaut.decorators module

Decorators module.

tasktronaut.decorators.task(func=None, /, *, description=None)

Decorator for marking methods as tasks within a process definition.

This decorator can be applied to methods with or without arguments and optionally assigns a description to the task. When applied, it attaches metadata to the decorated function that identifies it as a task and stores any provided description.

Parameters:
  • func (UndecoratedTaskMethod or None) – The undecorated task method. This parameter is only provided when the decorator is used without parentheses (bare decorator form). When None, the decorator is being used with keyword arguments and returns a decorator function.

  • description (Description or None) – An optional description of the task’s purpose and behavior. This description may be used for logging, documentation generation, or user-facing displays of the process flow.

Returns:

Either the decorated method (when used as a bare decorator) or a decorator function (when used with keyword arguments).

Return type:

DecoratedTaskMethod or Callable

Note

The decorator supports two usage patterns:

  1. Bare decorator: Applied directly without parentheses or with empty parentheses, where no description is provided.

  2. Parameterized decorator: Applied with the description keyword argument to specify a task description.
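
Both usage patterns follow from the func=None, /, *, description=None signature. The sketch below is an illustrative re-implementation of that general pattern, not the library's source; storing the description as a plain attribute on the function is an assumption.

```python
from typing import Callable, Optional


def task(func: Optional[Callable] = None, /, *, description: Optional[str] = None):
    """Illustrative re-implementation of the dual-form decorator pattern."""

    def decorate(f: Callable) -> Callable:
        # Attach metadata to the function itself; no wrapper is needed.
        f.description = description
        return f

    if func is None:
        # Called as @task() or @task(description=...): return the decorator.
        return decorate
    # Called as bare @task: decorate immediately.
    return decorate(func)


@task
def bare(**kwargs):
    return "bare"


@task()
def empty_parens(**kwargs):
    return "empty"


@task(description="Send the welcome email")
def described(**kwargs):
    return "described"
```

The positional-only marker keeps func from being passed by keyword, so a keyword argument can only ever be the description.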

See also

  • UndecoratedTaskMethod – Type for undecorated task methods

  • DecoratedTaskMethod – Type for decorated task methods

  • Description – Type for task descriptions

tasktronaut.errors module

Errors module.

exception tasktronaut.errors.CancelProcessError

Bases: Exception

Raised to cancel the execution of a process.

This exception is used to signal that a process should be cancelled and terminated immediately. When raised during process execution, it will halt the current process and any pending tasks without completing the normal process flow.

While NonRetryableProcessError also stops the execution of a process, CancelProcessError additionally causes any enqueued tasks to be removed from the queue.

Raises:

CancelProcessError – When a process needs to be cancelled.

exception tasktronaut.errors.NonRetryableProcessError

Bases: Exception

Exception to indicate a process error that should not be retried.

This exception is used to signal that a process has encountered an error condition that is permanent and attempting to retry the process would be futile. When this exception is raised during process execution, the process will terminate without attempting any configured retry logic.

Raises:

NonRetryableProcessError – When a process encounters an unrecoverable error.

tasktronaut.process module

Process module.

class tasktronaut.process.ConcurrentProcess(identifier, definition)

Bases: Process

A process that executes its steps concurrently.

ConcurrentProcess is a concrete implementation of the Process base class that enqueues steps for parallel execution. All steps depend only on the initial start job and can execute simultaneously, with a final completion job that depends on all step jobs finishing.

enqueue(backend, start_job=None)

Enqueue this process for execution on the specified backend.

Return type:

Job

class tasktronaut.process.ExecutionMode(*values)

Bases: Enum

Defines the available execution modes for the process.

The execution mode determines how tasks within a process are executed. Sequential execution runs tasks one after another in order, while concurrent execution allows tasks to run in parallel.

CONCURRENT(identifier, definition) = <class 'tasktronaut.process.ConcurrentProcess'>

Concurrent execution mode. Tasks are executed in parallel, allowing multiple tasks to run simultaneously. This mode is suitable for processes with independent tasks that can benefit from parallel execution.

SEQUENTIAL(identifier, definition) = <class 'tasktronaut.process.SequentialProcess'>

Sequential execution mode. Tasks are executed one at a time in the order they are defined. Each task completes before the next begins. This is the default execution mode for processes.
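
The difference between the two modes comes down to how job dependencies are wired. The sketch below models that wiring as a mapping from each job name to the names it waits for. It is a simplification based on the class descriptions in this module; wire_sequential and wire_concurrent are hypothetical helpers, and the behaviour for an empty step list is an assumption.

```python
from typing import Dict, List


def wire_sequential(steps: List[str]) -> Dict[str, List[str]]:
    """Chain: each job waits for the one before it."""
    deps: Dict[str, List[str]] = {"start": []}
    previous = "start"
    for step in steps:
        deps[step] = [previous]
        previous = step
    deps["complete"] = [previous]
    return deps


def wire_concurrent(steps: List[str]) -> Dict[str, List[str]]:
    """Fan-out: every step waits only for start; complete waits for all steps."""
    deps: Dict[str, List[str]] = {"start": []}
    for step in steps:
        deps[step] = ["start"]
    # Assumption: with no steps, complete simply follows start.
    deps["complete"] = list(steps) or ["start"]
    return deps


sequential = wire_sequential(["a", "b", "c"])
concurrent = wire_concurrent(["a", "b", "c"])
```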

class tasktronaut.process.Process(identifier, definition)

Bases: ABC

Abstract base class for process execution workflows.

A Process represents an executable workflow composed of multiple steps that can be enqueued and executed by a backend. Processes are created with an identifier and a process definition that specifies the workflow structure.

This is an abstract base class that must be subclassed to implement specific execution strategies (e.g., sequential, concurrent).

Parameters:
  • identifier (str) – Unique identifier for this process instance

  • definition (Type[ProcessDefinition]) – The ProcessDefinition class that defines the workflow structure

Variables:
  • identifier – Unique identifier for this process instance

  • definition – The ProcessDefinition class associated with this process

  • steps – Collection of steps that comprise this process workflow

abstractmethod enqueue(backend, start_job=None)

Enqueue this process for execution on the specified backend.

This abstract method must be implemented by subclasses to define how the process and its steps are queued for execution. Different execution modes (sequential, concurrent) will have different enqueueing strategies.

Parameters:
  • backend (Backend) – The execution backend that will run the process

  • start_job (Optional[Job]) – Optional job to execute before this process begins. Used for chaining processes together.

Returns:

A Job representing the enqueued process execution

Return type:

Job

Raises:

NotImplementedError – If subclass does not implement this method

Note

Subclasses must implement this method to define their specific execution strategy (sequential vs concurrent ordering, dependency management, etc.)

property is_process: bool

Check if this object is a Process.

This property provides a consistent way to identify Process instances throughout the framework.

Returns:

Always returns True

Return type:

bool

class tasktronaut.process.ProcessDefinition

Bases: ABC

Base class for defining workflow processes in Tasktronaut.

This class serves as the foundation for creating custom process definitions. Subclasses must implement the define_process() method to specify the workflow structure using the provided Builder instance.

Variables:
  • description – Optional description of the process. Defaults to None.

  • execution_mode – The execution mode for the process (SEQUENTIAL or CONCURRENT). Defaults to SEQUENTIAL.

Example:

class MyProcess(ProcessDefinition):
    execution_mode = ExecutionMode.SEQUENTIAL

    def define_process(self, builder: Builder):
        builder.task(self.my_task)

    @task
    def my_task(self, context, **kwargs):
        print("Executing task")

around_task(identifier, step, description, context)

Lifecycle hook called around the execution of a task. It is a context manager that wraps individual task execution.

This method provides a context for task execution, allowing pre- and post-task actions such as logging, metrics collection, or resource management. Override to implement custom behavior around task execution.

Parameters:
  • identifier (str) – The unique identifier of the process instance.

  • step (str) – The step identifier within the process.

  • description (Description) – Human-readable description of the task being executed.

  • context (Context) – The context for the invocation of the task. This object can be used to pass additional metadata to the task if needed.

Yield:

Control is yielded during task execution.

Return type:

Generator[None, None, None]

Example:

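import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def around_task(self, identifier, step, description, context):
    start_time = time.time()
    # Pass arguments in the order the hook declares them.
    with super().around_task(identifier, step, description, context):
        yield
    duration = time.time() - start_time
    logger.info(f"Task took {duration:.2f}s")

Note: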

The default implementation logs when task execution begins and completes.

classmethod build(identifier=None, options=None, **kwargs)

Build and return a Process instance from this definition.

This class method constructs a complete process by instantiating the definition class, invoking define_process(), and returning the fully configured process.

Parameters:
  • identifier (Optional[str]) – Unique identifier for the process instance. If not provided, a random UUID hex string is generated.

  • options (Optional[Options]) – Configuration options to pass to the builder for conditional process logic.

  • kwargs (Dict[str, Any]) – Additional keyword arguments passed to tasks during execution.

Returns:

A fully configured process ready for execution.

Return type:

Process

Example:

process = MyProcess.build(
    identifier="my-process-001",
    options={"debug": True},
    user_id=123
)

abstractmethod define_process(builder)

Define the workflow process structure.

This method must be implemented by subclasses to specify the tasks and execution flow of the process using the provided builder.

Parameters:

builder (Builder) – The builder instance used to construct the process workflow.

Raises:

NotImplementedError – If not implemented by subclass.

Example:

def define_process(self, builder: Builder):
    builder.task(self.first_task)
    # Add tasks through the builder yielded by the block so they run concurrently.
    with builder.concurrent() as b:
        b.task(self.second_task)
        b.task(self.third_task)
    builder.task(self.fourth_task)

description: Optional[str] = None

Optional description of the process. Defaults to None.

execution_mode(identifier, definition): ExecutionMode = <class 'tasktronaut.process.SequentialProcess'>

The execution mode for the process (SEQUENTIAL or CONCURRENT).

on_cancelled(identifier)

Lifecycle hook called when the process is cancelled.

This method is invoked automatically when process execution is deliberately cancelled before completion. Override to implement custom cancellation logic such as cleanup or state restoration.

Parameters:

identifier (str) – The unique identifier of the process instance.

Note:

The default implementation logs a warning message indicating the process was cancelled.

on_completed(identifier)

Lifecycle hook called when the process completes successfully.

This method is invoked automatically when all process tasks complete without errors. Override to implement custom completion logic such as cleanup, notifications, or result processing.

Parameters:

identifier (str) – The unique identifier of the process instance.

Note:

The default implementation logs an informational message indicating the process has completed.

on_failed(identifier, step, e)

Lifecycle hook called when the process fails due to an exception.

This method is invoked automatically when a task raises an exception during process execution. Override to implement custom error handling such as rollback operations, error notifications, or recovery attempts.

Parameters:
  • identifier (str) – The unique identifier of the process instance.

  • step (str) – The step identifier where the failure occurred.

  • e (Exception) – The exception that caused the failure.

Note:

The default implementation logs an error message with the exception details.

on_started(identifier)

Lifecycle hook called when the process starts execution.

This method is invoked automatically when process execution begins. Override to implement custom startup logic such as resource initialization or logging.

Parameters:

identifier (str) – The unique identifier of the process instance.

Note:

The default implementation logs an informational message indicating the process has started.

class tasktronaut.process.SequentialProcess(identifier, definition)

Bases: Process

A process that executes its steps in sequential order.

SequentialProcess is a concrete implementation of the Process base class that enqueues steps for execution in a strict sequential manner. Each step must complete before the next step begins, creating a chain of dependent jobs.

enqueue(backend, start_job=None)

Enqueue this process for execution on the specified backend.

Return type:

Job

tasktronaut.steps module

Steps module.

class tasktronaut.steps.Step(func, description, kwargs)

Bases: object

Represents a single executable step in a process.

A Step encapsulates a task function along with its metadata and execution parameters.

Steps form the building blocks of process definitions and are executed by the process engine according to the defined execution mode.

Variables:
  • func – The callable function or method that performs the task

  • description – Human-readable description of the step

  • kwargs – Keyword arguments to be passed to the function during execution

Parameters:
  • func (TaskMethod) – The task function to execute

  • description (Description) – A description of what this step does

  • kwargs (Dict[str, Any]) – Additional keyword arguments for the task function

property is_process: bool

Determine if this step represents a process.

Returns:

Always returns False since a Step is not a Process

Return type:

bool

class tasktronaut.steps.Steps(iterable=(), /)

Bases: list[Step | Process]

A container for holding a collection of steps and sub-processes.

This class extends the built-in list type to provide a typed container that holds both Step instances and Process instances. It is used internally to manage the collection of execution items within a process definition.

Inherits:

list[Union[Step, Process]]
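
Because a Steps list can mix leaf steps with nested processes, the is_process flag gives a uniform way to walk the tree. The sketch below uses local stand-in classes rather than the library's own (and a class attribute where the library uses a property); leaf_names is a hypothetical helper.

```python
from typing import Iterator, List, Union


class Step:
    """Local stand-in: a leaf that wraps one task name."""
    is_process = False

    def __init__(self, name: str) -> None:
        self.name = name


class Process:
    """Local stand-in: a node holding steps and nested processes."""
    is_process = True

    def __init__(self, steps: List[Union["Step", "Process"]]) -> None:
        self.steps = steps


def leaf_names(process: Process) -> Iterator[str]:
    # Branch on is_process instead of isinstance checks.
    for item in process.steps:
        if item.is_process:
            yield from leaf_names(item)
        else:
            yield item.name


tree = Process([Step("a"), Process([Step("b"), Step("c")]), Step("d")])
names = list(leaf_names(tree))
```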

tasktronaut.types module

Type definitions and protocols.

tasktronaut.types.BuilderIterator

Type alias for an iterator of Builder instances.

Represents an iterator that yields Builder objects, typically used in control flow patterns such as sequential, concurrent, or each blocks.

Type:

Iterator[Builder]

alias of Iterator[Builder]

class tasktronaut.types.DecoratedTaskMethod(*args, **kwargs)

Bases: Protocol

Protocol for a task method decorated with the @task decorator.

Defines the interface that decorated task methods must implement. Decorated tasks have metadata attributes and can optionally accept a Context parameter.

Variables:
  • __name__ – The name of the task method.

  • description – An optional human-readable description of the task.

description: Optional[str]

tasktronaut.types.Description

Type alias for an optional task or item description.

Used to provide human-readable descriptions for tasks, items in iterations, or other process elements.

Type:

Optional[str]

alias of str | None

tasktronaut.types.ForEachMethod

Type alias for the each method.

Represents a callable that accepts any arguments and keyword arguments, and returns an iterator of task arguments. Used to define iteration methods for the each pattern.

Type:

Callable[…, ForEachReturn]

alias of Callable[[…], Iterator[Dict[str, Any] | Tuple[Dict[str, Any], str | None]]]

tasktronaut.types.ForEachReturn

Type alias for the return type of the each method.

Represents an iterator that yields task arguments. Used in the each pattern to iterate over a collection of items, yielding arguments for each iteration.

Type:

Iterator[TaskArgs]

alias of Iterator[Dict[str, Any] | Tuple[Dict[str, Any], str | None]]

tasktronaut.types.Options

Type alias for process options.

Represents configuration options that can be passed to a process, either as a SimpleNamespace object or as a dictionary.

Type:

Union[SimpleNamespace, Dict[str, Any]]

alias of SimpleNamespace | Dict[str, Any]

tasktronaut.types.ProcessDefinitionType

Type alias for a ProcessDefinition class.

Represents a class type that inherits from ProcessDefinition, used when specifying sub-processes or other process references.

Type:

Type[ProcessDefinition]

alias of Type[ProcessDefinition]

tasktronaut.types.TaskArgs

Type alias for task arguments.

Represents arguments passed to a task, which can be either a dictionary of arguments or a tuple containing a dictionary of arguments and an optional description string.

Type:

Union[Dict[str, Any], Tuple[Dict[str, Any], Description]]

alias of Dict[str, Any] | Tuple[Dict[str, Any], str | None]

tasktronaut.types.TaskMethod

Type alias for any task method.

Represents either a decorated or undecorated task method. This is the primary type used when registering methods as tasks with a builder.

Type:

Union[UndecoratedTaskMethod, DecoratedTaskMethod]

alias of Callable[[…], None] | DecoratedTaskMethod

tasktronaut.types.TransformMethod

Type alias for a transformation method.

Represents a callable that accepts any arguments and keyword arguments, and returns task arguments. Used to define transformation methods for the transform pattern to modify arguments before task execution.

Type:

Callable[…, TaskArgs]

alias of Callable[[…], Dict[str, Any] | Tuple[Dict[str, Any], str | None]]

tasktronaut.types.UndecoratedTaskMethod

Type alias for an undecorated task method.

Represents a callable that accepts any arguments and keyword arguments, and returns None. These are regular methods that can be registered as tasks without explicit decoration.

Type:

Callable[…, None]

alias of Callable[[…], None]

class tasktronaut.types.Value

Generic type variable for generic operations.

Used in generic function or class definitions where a type parameter is needed to represent any arbitrary type.

Type:

TypeVar

alias of TypeVar(‘Value’)

tasktronaut.utils module

Utility and supporting methods.

tasktronaut.utils.load_definition(module_name, definition_class)

Dynamically load and instantiate a process definition class.

This function provides dynamic loading of ProcessDefinition subclasses from specified modules. It imports the module by name, retrieves the definition class from that module, and returns a new instance of the class.

Parameters:
  • module_name (str) – Fully qualified module name to import

  • definition_class (str) – Name of the ProcessDefinition class to load from the module

Returns:

A new instance of the specified ProcessDefinition class

Return type:

ProcessDefinition

Raises:
  • ImportError – If the module cannot be imported

  • AttributeError – If the definition_class does not exist in the module

  • TypeError – If the loaded class cannot be instantiated
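
The import, lookup and instantiate sequence described above maps directly onto importlib. The sketch below is an illustrative equivalent rather than the function's actual source; load_by_name is a hypothetical name, and a standard-library class is used so the example runs without a process definition module.

```python
import importlib


def load_by_name(module_name: str, class_name: str):
    """Illustrative equivalent: import the module, find the class, instantiate it."""
    module = importlib.import_module(module_name)  # ImportError if missing
    cls = getattr(module, class_name)              # AttributeError if absent
    return cls()                                   # TypeError if it needs arguments


# Demonstrated with a standard-library class, since any importable name works.
instance = load_by_name("collections", "OrderedDict")
```

Passing names as strings is what lets a queue worker rebuild the definition from serialized job arguments.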

tasktronaut.utils.to_dict(**kwargs)

Convert keyword arguments to a dictionary.

This utility function packages keyword arguments into a dictionary, providing a convenient way to convert variadic keyword arguments into a dictionary structure.

Parameters:

kwargs (Dict[str, Any]) – Arbitrary keyword arguments to convert

Returns:

Dictionary containing all passed keyword arguments

Return type:

dict[str, Any]

tasktronaut.utils.to_kwargs(base, **kwargs)

Merge a base dictionary with additional keyword arguments.

Creates a new dictionary by copying the base dictionary and updating it with the provided keyword arguments. The original base dictionary is not modified. Keyword arguments take precedence over base dictionary values in case of key conflicts.

Parameters:
  • base (dict[str, Any]) – Base dictionary to copy and update

  • kwargs (Dict[str, Any]) – Keyword arguments to merge into the base dictionary

Returns:

New dictionary containing merged key-value pairs

Return type:

dict[str, Any]
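The merge semantics described for to_kwargs (copy the base, let keyword arguments win, leave the base untouched) can be illustrated as follows. The function name to_kwargs_sketch is hypothetical and the body is an assumption about how such a merge could be written, not the library's source.

```python
from typing import Any, Dict


def to_kwargs_sketch(base: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical stand-in showing the documented merge semantics."""
    merged = dict(base)    # shallow copy, so `base` is left untouched
    merged.update(kwargs)  # keyword arguments win on key conflicts
    return merged


base = {"user_id": 1, "debug": False}
merged = to_kwargs_sketch(base, debug=True, retries=3)
print(merged)
print(base)
```

Note that the copy in this sketch is shallow: nested mutable values would still be shared between the base and the result.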

Module contents

Tasktronaut: A Python library for defining and executing process workflows.

The tasktronaut package provides a framework for building complex process definitions with support for sequential and concurrent execution, sub-processes, conditional logic, and iterative patterns.

This module serves as the main entry point, exposing the primary public API for the library.

class tasktronaut.Backend

Bases: ABC

Abstract base class defining the interface for process execution backends.

Backends are responsible for enqueueing tasks, managing job dependencies, and executing process lifecycle callbacks. Subclasses must implement the abstract enqueue methods to integrate with specific job queue systems.

The backend handles three main phases of process execution:

  • Process start callbacks

  • Individual task execution with error handling

  • Process completion callbacks

abstractmethod enqueue_perform_complete(identifier, module_name, definition_class, depends_on=None)

Enqueue a process completion callback.

Schedules the execution of the process completion callback, which allows the process definition to perform finalization logic when all tasks have completed. Can depend on one or more jobs.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • depends_on (Optional[Union[Job, List[Job]]]) – Optional job or list of jobs that this job depends on. If provided, this job will not execute until all dependencies complete.

Returns:

Job object representing the enqueued task

Return type:

Job

abstractmethod enqueue_perform_start(identifier, module_name, definition_class, depends_on=None)

Enqueue a process start callback.

Schedules the execution of the process start callback, which allows the process definition to perform initialization logic when execution begins.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.

Returns:

Job object representing the enqueued task

Return type:

Job

abstractmethod enqueue_perform_task(identifier, module_name, definition_class, function_name, description, kwargs, depends_on=None)

Enqueue a process task for execution.

Schedules the execution of a specific task function within a process definition. The task will be executed with the provided keyword arguments and can optionally depend on another job.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • function_name (str) – Name of the task method to execute

  • description (Description) – Human-readable description of the task

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function

  • depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.

Returns:

Job object representing the enqueued task

Return type:

Job
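To show how the three enqueue methods and their depends_on parameters fit together, here is a minimal in-memory sketch. Everything in it is hypothetical: FakeJob and InMemoryBackendSketch are illustrative names, the class does not subclass the real Backend so that the snippet stays self-contained, and the module and class names passed in are placeholders. A real backend would subclass Backend and return the Job type of its queue system.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Union


@dataclass
class FakeJob:
    """Hypothetical job record; real backends return their queue's Job type."""
    name: str
    depends_on: List["FakeJob"] = field(default_factory=list)


def _as_list(depends_on: Optional[Union[FakeJob, List[FakeJob]]]) -> List[FakeJob]:
    # Normalise "no job", "one job" and "list of jobs" to a list.
    if depends_on is None:
        return []
    return depends_on if isinstance(depends_on, list) else [depends_on]


class InMemoryBackendSketch:
    """Records enqueued jobs instead of sending them to a queue."""

    def __init__(self) -> None:
        self.jobs: List[FakeJob] = []

    def _enqueue(self, name, depends_on=None) -> FakeJob:
        job = FakeJob(name, _as_list(depends_on))
        self.jobs.append(job)
        return job

    def enqueue_perform_start(self, identifier, module_name, definition_class,
                              depends_on=None):
        return self._enqueue(f"{identifier}:start", depends_on)

    def enqueue_perform_task(self, identifier, module_name, definition_class,
                             function_name, description, kwargs, depends_on=None):
        return self._enqueue(f"{identifier}:{function_name}", depends_on)

    def enqueue_perform_complete(self, identifier, module_name, definition_class,
                                 depends_on=None):
        return self._enqueue(f"{identifier}:complete", depends_on)


backend = InMemoryBackendSketch()
args = ("run-1", "my_app.processes", "MyProcess")  # placeholder names
start = backend.enqueue_perform_start(*args)
a = backend.enqueue_perform_task(*args, "task_a", None, {}, depends_on=start)
b = backend.enqueue_perform_task(*args, "task_b", None, {}, depends_on=start)
done = backend.enqueue_perform_complete(*args, depends_on=[a, b])
print([job.name for job in done.depends_on])
```

The completion callback is the only method documented as accepting a list of jobs, which matches the case sketched here: two tasks fan out from the start job and the completion job waits for both.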

static perform_complete(identifier, module_name, definition_class)

Execute the process completion callback.

Loads the process definition and invokes its completion callback. This method is typically called by the backend after all process tasks have completed successfully.

It is implemented as a staticmethod so that backend implementations can call it without a backend instance.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

static perform_start(identifier, module_name, definition_class)

Execute the process start callback.

Loads the process definition and invokes its start callback. This method is typically called by the backend after the job has been dequeued.

It is implemented as a staticmethod so that backend implementations can call it without a backend instance.

Parameters:
  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

classmethod perform_task(job, identifier, module_name, definition_class, function_name, description, kwargs)

Execute a process task with comprehensive error handling.

Loads the process definition, creates an execution context, and invokes the specified task function. Handles three categories of errors:

  • NonRetryableProcessError: Logs error, calls on_failed callback, and re-raises

  • CancelProcessError: Cancels the job and its dependents, calls on_cancelled callback

  • Other exceptions: Logs error, calls on_failed callback, and re-raises for retry

The task execution is wrapped with the definition’s around_task context manager, allowing the definition to perform setup and teardown logic.

Parameters:
  • job (Job) – Job object that can be cancelled or have dependents deleted

  • identifier (str) – Unique identifier for the process execution instance

  • module_name (str) – Fully qualified module name containing the process definition

  • definition_class (str) – Name of the process definition class to instantiate

  • function_name (str) – Name of the task method to execute

  • description (Description) – Human-readable description of the task

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function

Raises:
  • NonRetryableProcessError – When task execution raises a non-retryable error

  • Exception – When task execution raises an unhandled error (retryable)
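The three error categories listed above can be sketched as a plain try/except chain. This is an illustration under assumptions, not the library's code: the two exception classes are local stand-ins for the library's exceptions of the same name, RecordingDefinition and run_task_sketch are hypothetical, the hooks take simplified arguments, and whether the real around_task wrapper sits inside or outside the error handling is not stated in the source.

```python
from contextlib import contextmanager


class NonRetryableProcessError(Exception):
    """Local stand-in for the library's exception of the same name."""


class CancelProcessError(Exception):
    """Local stand-in for the library's exception of the same name."""


class RecordingDefinition:
    """Hypothetical definition that records which hooks were called."""

    def __init__(self):
        self.events = []

    @contextmanager
    def around_task(self):
        self.events.append("before")
        try:
            yield
        finally:
            self.events.append("after")

    def on_failed(self, error):
        self.events.append(f"failed:{type(error).__name__}")

    def on_cancelled(self):
        self.events.append("cancelled")


def run_task_sketch(definition, func, cancel_job):
    try:
        with definition.around_task():
            func()
    except NonRetryableProcessError as e:
        definition.on_failed(e)
        raise  # re-raised; the queue is expected not to retry
    except CancelProcessError:
        cancel_job()  # stand-in for cancelling the job and its dependents
        definition.on_cancelled()
    except Exception as e:
        definition.on_failed(e)
        raise  # re-raised so that the queue can retry


def cancelling_task():
    raise CancelProcessError("stop")


definition = RecordingDefinition()
cancelled = []
run_task_sketch(definition, cancelling_task, lambda: cancelled.append(True))
print(definition.events)
```

Cancellation is the only branch that swallows the exception, so from the queue's point of view a cancelled task finishes without error.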

class tasktronaut.Builder(process, kwargs, options=None, description=None)

Bases: object

A builder class for constructing and configuring process workflows.

The Builder provides a fluent interface for defining task execution graphs, including sequential and concurrent execution patterns, argument validation, iteration, and transformation capabilities.

Parameters:
  • process (Process) – The process instance being built

  • kwargs (Dict[str, Any]) – Keyword arguments to pass to tasks and transformations

  • options (SimpleNamespace) – Optional configuration options for the process

  • description (Description) – Optional description for the builder context

Variables:
  • process – The process instance being constructed

  • description – Description for the current builder context

  • options – Configuration options converted to SimpleNamespace

  • kwargs – Keyword arguments for task execution

concurrent(description=None)

Create a concurrent execution block where tasks run in parallel.

This context manager creates a new ConcurrentProcess that executes its contained tasks concurrently. The resulting process is appended to the parent process’s steps.

Parameters:

description (Description) – Optional description for the concurrent block

Returns:

Builder instance for the concurrent process

Return type:

BuilderIterator

Yield:

Builder configured for concurrent execution

Example:

with builder.concurrent() as b:
    b.task(task1)
    b.task(task2)

each(func, description=None)

Iterate over the items yielded by the given func method and yield a builder for each iteration.

Executes the provided function to generate items, then yields a Builder instance for each item with updated kwargs.

The provided function can yield either a dict of kwargs or a tuple of (kwargs, description).

Parameters:
  • func (ForEachMethod) – A function that yields dicts or (dict, str) tuples. The function receives the current kwargs as parameters.

  • description (Description) – Optional default description for iterations

Returns:

Iterator of Builder instances, one per yielded item

Return type:

BuilderIterator

Yield:

Builder with kwargs updated for each iteration

Example:

def items(self, count: int, **_):
    for i in range(count):
        yield {"index": i}

for b in builder.each(self.items):
    b.task(task_for_item)

Note

The provided function can yield either dict or (dict, str) tuples, where the string provides a per-item description.
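The two accepted yield forms can be handled along the following lines. This is a sketch under assumptions: expand_items is a hypothetical helper, and merging each item's kwargs over the current kwargs is one plausible reading of "kwargs updated for each iteration", not confirmed behaviour.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


def expand_items(
    items: Iterable[Any],
    base_kwargs: Dict[str, Any],
    default_description: Optional[str] = None,
) -> Iterator[Tuple[Dict[str, Any], Optional[str]]]:
    """Hypothetical helper: pair merged kwargs with a description per item."""
    for item in items:
        if isinstance(item, tuple):
            # (kwargs, description) form: the string describes this item.
            item_kwargs, description = item
        else:
            # Plain dict form: fall back to the default description.
            item_kwargs, description = item, default_description
        yield {**base_kwargs, **item_kwargs}, description


def items(count: int, **_):
    # Mirrors the documented example, but mixes both yield forms.
    for i in range(count):
        if i == 0:
            yield {"index": i}
        else:
            yield {"index": i}, f"Item {i}"


kwargs = {"count": 2}
pairs = list(expand_items(items(**kwargs), kwargs, "Process item"))
for item_kwargs, description in pairs:
    print(item_kwargs, description)
```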

expected_arguments(**kwargs)

Validate that expected arguments are present and of the correct type.

This method checks that all specified arguments exist in the builder’s kwargs and validates their types using Pydantic type adapters. Type validation is only performed when a non-None type is specified.

Parameters:

kwargs (Dict[str, Optional[Type]]) – Mapping of argument names to expected types. Use None to skip type validation for an argument.

Raises:
  • TypeError – If a required argument is not found in kwargs

  • ValueError – If an argument’s type does not match the expected type

Note

This method does not check for additional arguments beyond those specified.

Example:

builder.expected_arguments(
    foo=str,
    bar=int,
    baz=None,  # No type checking
    qux=Optional[datetime.date]
)

option(name, default=None)

Retrieve an option value with optional default.

Accesses configuration options passed to the builder, returning the specified option’s value or a default if the option is not set.

Parameters:
  • name (str) – The name of the option to retrieve

  • default (Optional[Value]) – Default value to return if option is not found

Returns:

The option’s value or the default

Return type:

Value

Example:

if builder.option("debug", False):
    builder.task(verbose_logging)

sequential(description=None)

Create a sequential execution block where tasks run one after another.

This context manager creates a new SequentialProcess that executes its contained tasks sequentially. The resulting process is appended to the parent process’s steps.

Parameters:

description (Description) – Optional description for the sequential block

Returns:

Builder instance for the sequential process

Return type:

BuilderIterator

Yield:

Builder configured for sequential execution

Example:

with builder.sequential() as b:
    b.task(task1)
    b.task(task2)

sub_process(definition, description=None)

Embed a sub-process within the current process.

Creates and configures a new process based on the provided ProcessDefinition, then appends it to the parent process’s steps. The sub-process inherits the current options and kwargs.

Parameters:
  • definition (ProcessDefinitionType) – A ProcessDefinition class defining the sub-process

  • description (Description) – Optional description for the sub-process

Example:

builder.sub_process(MySubProcess)
builder.sub_process(MySubProcess, description="Data validation")

task(func, description=None)

Add a task to the process.

Creates a Step wrapping the provided function and appends it to the process’s steps. The task inherits the builder’s kwargs and description unless overridden.

Parameters:
  • func (TaskMethod) – The function to execute as a task. Can be a regular method, a method accepting a Context parameter, or a @task decorated method.

  • description (Description) – Optional description for the task. If not provided, uses the function’s ‘description’ attribute or the builder’s description.

Example:

builder.task(my_task)
builder.task(my_task, description="Custom description")

transform(func, description=None)

Transform kwargs for tasks within a context block.

Executes the provided function to generate new kwargs, then yields a Builder with the transformed kwargs. Tasks defined within the context use the transformed arguments. The function can return either a dict or a tuple of (dict, description).

Parameters:
  • func (TransformMethod) – A function that returns a dict of new kwargs or a (dict, str) tuple. The function receives the current kwargs as parameters.

  • description (Description) – Optional description for the transformation context

Returns:

Builder instance with transformed kwargs

Return type:

BuilderIterator

Yield:

Builder configured with transformed kwargs

Example:

def double_value(self, value: int, **_):
    return {"value": value * 2}

with builder.transform(double_value) as t:
    t.task(task_with_doubled_value)

Note

The transform function can return either dict or (dict, str) where the string overrides the description for the context.

class tasktronaut.Context

Bases: object

A context object that carries execution information during process execution.

The Context class serves as a container for state and metadata that may be needed by tasks during process execution. Tasks can optionally accept a Context parameter to access execution-specific information and maintain state across the process.

This is a minimal class designed to be extended or used as a base for storing execution context data passed between tasks in a Tasktronaut process.

class tasktronaut.ExecutionMode(*values)

Bases: Enum

Defines the available execution modes for the process.

The execution mode determines how tasks within a process are executed. Sequential execution runs tasks one after another in order, while concurrent execution allows tasks to run in parallel.

CONCURRENT(identifier, definition) = <class 'tasktronaut.process.ConcurrentProcess'>

Concurrent execution mode. Tasks are executed in parallel, allowing multiple tasks to run simultaneously. This mode is suitable for processes with independent tasks that can benefit from parallel execution.

SEQUENTIAL(identifier, definition) = <class 'tasktronaut.process.SequentialProcess'>

Sequential execution mode. Tasks are executed one at a time in the order they are defined. Each task completes before the next begins. This is the default execution mode for processes.

class tasktronaut.ProcessDefinition

Bases: ABC

Base class for defining workflow processes in Tasktronaut.

This class serves as the foundation for creating custom process definitions. Subclasses must implement the define_process() method to specify the workflow structure using the provided Builder instance.

Variables:
  • description – Optional description of the process. Defaults to None.

  • execution_mode – The execution mode for the process (SEQUENTIAL or CONCURRENT). Defaults to SEQUENTIAL.

Example:

class MyProcess(ProcessDefinition):
    execution_mode = ExecutionMode.SEQUENTIAL

    def define_process(self, builder: Builder):
        builder.task(self.my_task)

    @task
    def my_task(self, context, **kwargs):
        print("Executing task")

around_task(identifier, step, description, context)

Lifecycle hook called around the execution of a task. It is a context manager that wraps individual task execution.

This method provides a context for task execution, allowing pre- and post-task actions such as logging, metrics collection, or resource management. Override to implement custom behavior around task execution.

Parameters:
  • identifier (str) – The unique identifier of the process instance.

  • step (str) – The step identifier within the process.

  • description (Description) – Human-readable description of the task being executed.

  • context (Context) – The context for the invocation of the task. This object can be used to pass additional metadata to the task if needed.

Yield:

Control is yielded during task execution.

Return type:

Generator[None, None, None]

Example:

@contextmanager
def around_task(self, identifier, step, description, context):
    start_time = time.time()
    # Pass the arguments in the same order as the method signature.
    with super().around_task(identifier, step, description, context):
        yield
    duration = time.time() - start_time
    logger.info(f"Task took {duration:.2f}s")

Note:

The default implementation logs when task execution begins and completes.

classmethod build(identifier=None, options=None, **kwargs)

Build and return a Process instance from this definition.

This class method constructs a complete process by instantiating the definition class, invoking define_process(), and returning the fully configured process.

Parameters:
  • identifier (Optional[str]) – Unique identifier for the process instance. If not provided, a random UUID hex string is generated.

  • options (Optional[Options]) – Configuration options to pass to the builder for conditional process logic.

  • kwargs (Dict[str, Any]) – Additional keyword arguments passed to tasks during execution.

Returns:

A fully configured process ready for execution.

Return type:

Process

Example:

process = MyProcess.build(
    identifier="my-process-001",
    options={"debug": True},
    user_id=123
)

abstractmethod define_process(builder)

Define the workflow process structure.

This method must be implemented by subclasses to specify the tasks and execution flow of the process using the provided builder.

Parameters:

builder (Builder) – The builder instance used to construct the process workflow.

Raises:

NotImplementedError – If not implemented by subclass.

Example:

def define_process(self, builder: Builder):
    builder.task(self.first_task)
    # Add the parallel tasks to the builder yielded by concurrent().
    with builder.concurrent() as b:
        b.task(self.second_task)
        b.task(self.third_task)
    builder.task(self.fourth_task)

description: Optional[str] = None

Optional description of the process. Defaults to None.

execution_mode(identifier, definition): ExecutionMode = <class 'tasktronaut.process.SequentialProcess'>

The execution mode for the process (SEQUENTIAL or CONCURRENT).

on_cancelled(identifier)

Lifecycle hook called when the process is cancelled.

This method is invoked automatically when process execution is deliberately cancelled before completion. Override to implement custom cancellation logic such as cleanup or state restoration.

Parameters:

identifier (str) – The unique identifier of the process instance.

Note:

The default implementation logs a warning message indicating the process was cancelled.

on_completed(identifier)

Lifecycle hook called when the process completes successfully.

This method is invoked automatically when all process tasks complete without errors. Override to implement custom completion logic such as cleanup, notifications, or result processing.

Parameters:

identifier (str) – The unique identifier of the process instance.

Note:

The default implementation logs an informational message indicating the process has completed.

on_failed(identifier, step, e)

Lifecycle hook called when the process fails due to an exception.

This method is invoked automatically when a task raises an exception during process execution. Override to implement custom error handling such as rollback operations, error notifications, or recovery attempts.

Parameters:
  • identifier (str) – The unique identifier of the process instance.

  • step (str) – The step identifier where the failure occurred.

  • e (Exception) – The exception that caused the failure.

Note:

The default implementation logs an error message with the exception details.

on_started(identifier)

Lifecycle hook called when the process starts execution.

This method is invoked automatically when process execution begins. Override to implement custom startup logic such as resource initialization or logging.

Parameters:

identifier (str) – The unique identifier of the process instance.

Note:

The default implementation logs an informational message indicating the process has started.

tasktronaut.task(func=None, /, *, description=None)

Decorator for marking methods as tasks within a process definition.

This decorator can be applied with or without arguments and optionally assigns a description to the task. When applied, it attaches metadata to the decorated function that identifies it as a task and stores any provided description.

Parameters:
  • func (UndecoratedTaskMethod or None) – The undecorated task method. This parameter is only provided when the decorator is used without parentheses (bare decorator form). When None, the decorator is being used with keyword arguments and returns a decorator function.

  • description (Description or None) – An optional description of the task’s purpose and behavior. This description may be used for logging, documentation generation, or user-facing displays of the process flow.

Returns:

Either the decorated method (when used as a bare decorator) or a decorator function (when used with keyword arguments).

Return type:

DecoratedTaskMethod or Callable

Note

The decorator supports two usage patterns:

  1. Bare decorator: Applied directly without parentheses or with empty parentheses, where no description is provided.

  2. Parameterized decorator: Applied with the description keyword argument to specify a task description.
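The two usage patterns above follow a common Python idiom for decorators that work both bare and with keyword arguments. The sketch below illustrates that idiom only: task_sketch is a hypothetical name, and the is_task attribute is an assumption made for this example rather than the metadata the library actually attaches.

```python
from typing import Callable, Optional


def task_sketch(func: Optional[Callable] = None, /, *,
                description: Optional[str] = None):
    """Hypothetical decorator usable bare or with keyword arguments."""

    def decorate(f: Callable) -> Callable:
        # Attach metadata; the attribute names are assumptions for this sketch.
        f.is_task = True
        f.description = description
        return f

    # Bare form (@task_sketch): the function arrives as the positional argument.
    if func is not None:
        return decorate(func)
    # Parameterized form (@task_sketch(...)): return the real decorator.
    return decorate


@task_sketch
def first():
    pass


@task_sketch(description="Send the welcome email")
def second():
    pass


print(first.description, second.description)
```

Making func positional-only and description keyword-only, as in the documented signature, removes any ambiguity between the two forms: a positional argument is always the function being decorated.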

See also

  • UndecoratedTaskMethod – Type for undecorated task methods

  • DecoratedTaskMethod – Type for decorated task methods

  • Description – Type for task descriptions