tasktronaut package
Subpackages
Submodules
tasktronaut.backend module
Backend module.
- class tasktronaut.backend.Backend
Bases: ABC
Abstract base class defining the interface for process execution backends.
Backends are responsible for enqueueing tasks, managing job dependencies, and executing process lifecycle callbacks. Subclasses must implement the abstract enqueue methods to integrate with specific job queue systems.
The backend handles three main phases of process execution:
- Process start callbacks
- Individual task execution with error handling
- Process completion callbacks
- abstractmethod enqueue_perform_complete(identifier, module_name, definition_class, depends_on=None)
Enqueue a process completion callback.
Schedules the execution of the process completion callback, which allows the process definition to perform finalization logic when all tasks have completed. Can depend on one or more jobs.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
depends_on (Optional[Union[Job, List[Job]]]) – Optional job or list of jobs that this job depends on. If provided, this job will not execute until all dependencies complete.
- Returns:
Job object representing the enqueued task
- Return type:
Job
- abstractmethod enqueue_perform_start(identifier, module_name, definition_class, depends_on=None)
Enqueue a process start callback.
Schedules the execution of the process start callback, which allows the process definition to perform initialization logic when execution begins.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.
- Returns:
Job object representing the enqueued task
- Return type:
Job
- abstractmethod enqueue_perform_task(identifier, module_name, definition_class, function_name, description, kwargs, depends_on=None)
Enqueue a process task for execution.
Schedules the execution of a specific task function within a process definition. The task will be executed with the provided keyword arguments and can optionally depend on another job.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
function_name (str) – Name of the task method to execute
description (Description) – Human-readable description of the task
kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function
depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.
- Returns:
Job object representing the enqueued task
- Return type:
Job
- static perform_complete(identifier, module_name, definition_class)
Execute the process completion callback.
Loads the process definition and invokes its completion callback. This method is typically called by the backend after all process tasks have completed successfully.
It is implemented as a staticmethod, to allow for the most flexibility in backend implementations.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
- static perform_start(identifier, module_name, definition_class)
Execute the process start callback.
Loads the process definition and invokes its start callback. This method is typically called by the backend after the job has been dequeued.
It is implemented as a staticmethod, to allow for the most flexibility in backend implementations.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
- classmethod perform_task(job, identifier, module_name, definition_class, function_name, description, kwargs)
Execute a process task with comprehensive error handling.
Loads the process definition, creates an execution context, and invokes the specified task function. Handles three categories of errors:
NonRetryableProcessError: Logs error, calls on_failed callback, and re-raises
CancelProcessError: Cancels the job and its dependents, calls on_cancelled callback
Other exceptions: Logs error, calls on_failed callback, and re-raises for retry
The task execution is wrapped with the definition’s around_task context manager, allowing the definition to perform setup and teardown logic.
- Parameters:
job (Job) – Job object that can be cancelled or have dependents deleted
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
function_name (str) – Name of the task method to execute
description (Description) – Human-readable description of the task
kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function
- Raises:
NonRetryableProcessError – When task execution raises a non-retryable error
Exception – When task execution raises an unhandled error (retryable)
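The three error categories handled by perform_task can be sketched with standard-library code only. This is an illustration under stated assumptions, not the library's implementation: the exception classes are local stand-ins for those in tasktronaut.errors, RecordingJob, RecordingDefinition and run_task are hypothetical names, the around_task wrapper is omitted, and the order in which the job is cancelled and its dependents deleted is assumed.

```python
import logging

logger = logging.getLogger("perform_task_sketch")


class NonRetryableProcessError(Exception):
    """Local stand-in for tasktronaut.errors.NonRetryableProcessError."""


class CancelProcessError(Exception):
    """Local stand-in for tasktronaut.errors.CancelProcessError."""


class RecordingJob:
    """Hypothetical job that records lifecycle calls instead of touching a queue."""

    def __init__(self):
        self.calls = []

    def cancel(self):
        self.calls.append("cancel")

    def delete_dependents(self):
        self.calls.append("delete_dependents")


class RecordingDefinition:
    """Hypothetical definition exposing only the two hooks used below."""

    def __init__(self):
        self.events = []

    def on_failed(self, identifier, step, e):
        self.events.append(("failed", step, type(e).__name__))

    def on_cancelled(self, identifier):
        self.events.append(("cancelled", identifier))


def run_task(job, definition, identifier, step, func, kwargs):
    """Call func(**kwargs) and dispatch on the three documented error categories."""
    try:
        func(**kwargs)
    except NonRetryableProcessError as e:
        # Permanent failure: report it and re-raise.
        logger.error("non-retryable failure in %s: %s", step, e)
        definition.on_failed(identifier, step, e)
        raise
    except CancelProcessError:
        # Cancellation: drop this job and its downstream jobs, do not re-raise.
        job.cancel()
        job.delete_dependents()
        definition.on_cancelled(identifier)
    except Exception as e:
        # Anything else: report it and re-raise so the queue system may retry.
        logger.error("failure in %s: %s", step, e)
        definition.on_failed(identifier, step, e)
        raise
```

In this sketch only the cancellation branch swallows the exception; both failure branches re-raise, leaving the retry decision to whatever queue system the backend wraps.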
- class tasktronaut.backend.Job(*args, **kwargs)
Bases: Protocol
Protocol defining the interface for a job object.
A job represents an enqueued unit of work in the backend system and provides methods to manage its lifecycle.
- cancel()
Cancel the execution of this job.
Stops the job from being executed if it hasn’t already started.
- delete_dependents()
Delete all jobs that depend on this job.
Removes any downstream jobs that were scheduled to run after completion of this job.
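As a rough illustration of what satisfying this protocol might look like, the sketch below defines a structurally equivalent protocol and a toy in-memory job. JobLike and InMemoryJob are hypothetical names, and the recursive removal of downstream jobs is an assumption about what a backend might do, not documented behaviour.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class JobLike(Protocol):
    """Hypothetical mirror of the two methods the Job protocol documents."""

    def cancel(self) -> None: ...

    def delete_dependents(self) -> None: ...


class InMemoryJob:
    """Toy job that tracks its own state and its downstream jobs."""

    def __init__(self, name):
        self.name = name
        self.cancelled = False
        self.dependents = []

    def cancel(self):
        # Mark the job so a worker would skip it if it has not started yet.
        self.cancelled = True

    def delete_dependents(self):
        # Drop downstream jobs, recursing so their own dependents go too
        # (the recursion is an assumption of this sketch).
        for dependent in self.dependents:
            dependent.delete_dependents()
        self.dependents.clear()
```

Because the protocol is structural, InMemoryJob never needs to inherit from it; having the two methods is enough for isinstance(job, JobLike) to succeed.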
tasktronaut.builder module
Builder module.
- class tasktronaut.builder.Builder(process, kwargs, options=None, description=None)
Bases: object
A builder class for constructing and configuring process workflows.
The Builder provides a fluent interface for defining task execution graphs, including sequential and concurrent execution patterns, argument validation, iteration, and transformation capabilities.
- Parameters:
process (Process) – The process instance being built
kwargs (Dict[str, Any]) – Keyword arguments to pass to tasks and transformations
options (SimpleNamespace) – Optional configuration options for the process
description (Description) – Optional description for the builder context
- Variables:
process – The process instance being constructed
description – Description for the current builder context
options – Configuration options converted to SimpleNamespace
kwargs – Keyword arguments for task execution
- concurrent(description=None)
Create a concurrent execution block where tasks run in parallel.
This context manager creates a new ConcurrentProcess that executes its contained tasks concurrently. The resulting process is appended to the parent process’s steps.
- Parameters:
description (Description) – Optional description for the concurrent block
- Returns:
Builder instance for the concurrent process
- Return type:
BuilderIterator
- Yield:
Builder configured for concurrent execution
Example:
    with builder.concurrent() as b:
        b.task(task1)
        b.task(task2)
- each(func, description=None)
Iterate over the items yielded by the given func method, yielding a builder for each iteration.
Executes the provided function to generate items, then yields a Builder instance for each item with updated kwargs.
The provided function can yield either a dict of kwargs or a tuple of (kwargs, description).
- Parameters:
func (ForEachMethod) – A function that yields dicts or (dict, str) tuples. The function receives the current kwargs as parameters.
description (Description) – Optional default description for iterations
- Returns:
Iterator of Builder instances, one per yielded item
- Return type:
BuilderIterator
- Yield:
Builder with kwargs updated for each iteration
Example:
    def items(self, count: int, **_):
        for i in range(count):
            yield {"index": i}

    for b in builder.each(self.items):
        b.task(task_for_item)
Note
The provided function can yield either dict or (dict, str) tuples, where the string provides a per-item description.
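The handling of the two yield shapes accepted by each can be sketched as a small generator. This is a simplified stand-in, not the library's code: iterate_kwargs is a hypothetical helper, and merging each yielded dict over the current kwargs is an assumption based on the phrase "kwargs updated for each iteration".

```python
def iterate_kwargs(func, base_kwargs, default_description=None):
    """Yield (kwargs, description) pairs, one per item produced by func."""
    for item in func(**base_kwargs):
        if isinstance(item, tuple):
            # (dict, str) form: the string describes this particular item.
            item_kwargs, description = item
        else:
            # Plain dict form: fall back to the default description.
            item_kwargs, description = item, default_description
        # Assumed merge: the item's values override the current kwargs.
        yield {**base_kwargs, **item_kwargs}, description


def items(count, **_):
    for i in range(count):
        yield {"index": i}
    yield {"index": count}, "last item"


pairs = list(iterate_kwargs(items, {"count": 2}))
# pairs[0] == ({"count": 2, "index": 0}, None)
# pairs[2] == ({"count": 2, "index": 2}, "last item")
```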
- expected_arguments(**kwargs)
Validate that expected arguments are present and of the correct type.
This method checks that all specified arguments exist in the builder’s kwargs and validates their types using Pydantic type adapters. Type validation is only performed when a non-None type is specified.
- Parameters:
kwargs (Dict[str, Optional[Type]]) – Mapping of argument names to expected types. Use None to skip type validation for an argument.
- Raises:
TypeError – If a required argument is not found in kwargs
ValueError – If an argument’s type does not match the expected type
Note
This method does not check for additional arguments beyond those specified.
Example:
    builder.expected_arguments(
        foo=str,
        bar=int,
        baz=None,  # No type checking
        qux=Optional[datetime.date],
    )
- option(name, default=None)
Retrieve an option value with optional default.
Accesses configuration options passed to the builder, returning the specified option’s value or a default if the option is not set.
- Parameters:
name (str) – The name of the option to retrieve
default (Optional[Value]) – Default value to return if option is not found
- Returns:
The option’s value or the default
- Return type:
Example:
    if builder.option("debug", False):
        builder.task(verbose_logging)
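The lookup-with-default behaviour described for option can be approximated with SimpleNamespace and getattr. The sketch below is an assumption about the mechanics, not the library's source; to_namespace and get_option are hypothetical helper names.

```python
from types import SimpleNamespace


def to_namespace(options=None):
    """Normalise None, a dict, or a SimpleNamespace into a SimpleNamespace."""
    if options is None:
        return SimpleNamespace()
    if isinstance(options, dict):
        return SimpleNamespace(**options)
    return options


def get_option(options, name, default=None):
    """Return the named option, or the default when it is not set."""
    return getattr(options, name, default)


opts = to_namespace({"debug": True})
debug = get_option(opts, "debug", False)      # True
retries = get_option(opts, "retries", 3)      # 3, because "retries" is not set
```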
- sequential(description=None)
Create a sequential execution block where tasks run one after another.
This context manager creates a new SequentialProcess that executes its contained tasks sequentially. The resulting process is appended to the parent process’s steps.
- Parameters:
description (Description) – Optional description for the sequential block
- Returns:
Builder instance for the sequential process
- Return type:
BuilderIterator
- Yield:
Builder configured for sequential execution
Example:
    with builder.sequential() as b:
        b.task(task1)
        b.task(task2)
- sub_process(definition, description=None)
Embed a sub-process within the current process.
Creates and configures a new process based on the provided ProcessDefinition, then appends it to the parent process’s steps. The sub-process inherits the current options and kwargs.
- Parameters:
definition (ProcessDefinitionType) – A ProcessDefinition class defining the sub-process
description (Description) – Optional description for the sub-process
Example:
    builder.sub_process(MySubProcess)
    builder.sub_process(MySubProcess, description="Data validation")
- task(func, description=None)
Add a task to the process.
Creates a Step wrapping the provided function and appends it to the process’s steps. The task inherits the builder’s kwargs and description unless overridden.
- Parameters:
func (TaskMethod) – The function to execute as a task. Can be a regular method, a method accepting a Context parameter, or a @task decorated method.
description (Description) – Optional description for the task. If not provided, uses the function’s ‘description’ attribute or the builder’s description.
Example:
    builder.task(my_task)
    builder.task(my_task, description="Custom description")
- transform(func, description=None)
Transform kwargs for tasks within a context block.
Executes the provided function to generate new kwargs, then yields a Builder with the transformed kwargs. Tasks defined within the context use the transformed arguments. The function can return either a dict or a tuple of (dict, description).
- Parameters:
func (TransformMethod) – A function that returns a dict of new kwargs or a (dict, str) tuple. The function receives the current kwargs as parameters.
description (Description) – Optional description for the transformation context
- Returns:
Builder instance with transformed kwargs
- Return type:
BuilderIterator
- Yield:
Builder configured with transformed kwargs
Example:
    def double_value(self, value: int, **_):
        return {"value": value * 2}

    # Pass the bound method, since double_value takes self.
    with builder.transform(self.double_value) as t:
        t.task(task_with_doubled_value)
Note
The transform function can return either dict or (dict, str), where the string overrides the description for the context.
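The shape of transform, a context manager that computes new kwargs once and hands them to the block, can be sketched as follows. The helper name transformed is hypothetical, and the sketch yields the returned dict as-is; whether the library merges it with the original kwargs is not stated here, so no merge is shown.

```python
from contextlib import contextmanager


@contextmanager
def transformed(func, kwargs, description=None):
    """Yield (new_kwargs, description) computed by calling func(**kwargs)."""
    result = func(**kwargs)
    if isinstance(result, tuple):
        # (dict, str) form: the string overrides the context description.
        new_kwargs, description = result
    else:
        new_kwargs = result
    yield new_kwargs, description


def double_value(value, **_):
    return {"value": value * 2}


with transformed(double_value, {"value": 21}) as (new_kwargs, desc):
    doubled = new_kwargs["value"]  # 42
```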
tasktronaut.context module
Context module.
- class tasktronaut.context.Context
Bases: object
A context object that carries execution information during process execution.
The Context class serves as a container for state and metadata that may be needed by tasks during process execution. Tasks can optionally accept a Context parameter to access execution-specific information and maintain state across the process.
This is a minimal class designed to be extended or used as a base for storing execution context data passed between tasks in a Tasktronaut process.
tasktronaut.decorators module
Decorators module.
- tasktronaut.decorators.task(func=None, /, *, description=None)
Decorator for marking methods as tasks within a process definition.
This decorator can be applied to methods with or without arguments and optionally assigns a description to the task. When applied, it attaches metadata to the decorated function that identifies it as a task and stores any provided description.
- Parameters:
func (UndecoratedTaskMethod or None) – The undecorated task method. This parameter is only provided when the decorator is used without parentheses (bare decorator form). When None, the decorator is being used with keyword arguments and returns a decorator function.
description (Description or None) – An optional description of the task’s purpose and behavior. This description may be used for logging, documentation generation, or user-facing displays of the process flow.
- Returns:
Either the decorated method (when used as a bare decorator) or a decorator function (when used with keyword arguments).
- Return type:
DecoratedTaskMethod or Callable
Note
The decorator supports two usage patterns:
Bare decorator: Applied directly without parentheses or with empty parentheses, where no description is provided.
Parameterized decorator: Applied with the description keyword argument to specify a task description.
See also
UndecoratedTaskMethod – Type for undecorated task methods
DecoratedTaskMethod – Type for decorated task methods
Description – Type for task descriptions
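A decorator that works both bare and with keyword arguments is commonly written with a positional-only func parameter, matching the signature shown above. The following is a minimal sketch of that pattern rather than the library's implementation; the is_task marker attribute is a hypothetical name, while the description attribute follows the DecoratedTaskMethod documentation.

```python
def task(func=None, /, *, description=None):
    """Mark a function as a task, optionally attaching a description."""

    def decorate(f):
        f.is_task = True              # hypothetical marker attribute
        f.description = description   # None unless given as a keyword
        return f

    if func is None:
        # Used as @task() or @task(description="..."): return the decorator.
        return decorate
    # Used as bare @task: decorate the function immediately.
    return decorate(func)


@task
def bare():
    pass


@task(description="Send the welcome email")
def described():
    pass
```

Both forms return the original function object, so a decorated method can still be called directly.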
tasktronaut.errors module
Errors module.
- exception tasktronaut.errors.CancelProcessError
Bases: Exception
Raised to cancel the execution of a process.
This exception is used to signal that a process should be cancelled and terminated immediately. When raised during process execution, it will halt the current process and any pending tasks without completing the normal process flow.
A NonRetryableProcessError also stops the execution of a process, but CancelProcessError additionally causes any enqueued tasks to be removed from the queue.
- Raises:
CancelProcessError – When a process needs to be cancelled.
- exception tasktronaut.errors.NonRetryableProcessError
Bases: Exception
Exception to indicate a process error that should not be retried.
This exception is used to signal that a process has encountered an error condition that is permanent and attempting to retry the process would be futile. When this exception is raised during process execution, the process will terminate without attempting any configured retry logic.
- Raises:
NonRetryableProcessError – When a process encounters an unrecoverable error.
tasktronaut.process module
Process module.
- class tasktronaut.process.ConcurrentProcess(identifier, definition)
Bases: Process
A process that executes its steps concurrently.
ConcurrentProcess is a concrete implementation of the Process base class that enqueues steps for parallel execution. All steps depend only on the initial start job and can execute simultaneously, with a final completion job that depends on all step jobs finishing.
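The fan-out and fan-in dependency shape described above can be made concrete with a toy backend that only records what was enqueued. RecordingBackend and enqueue_concurrent are hypothetical names for this sketch; the real Backend has separate enqueue_perform_start, enqueue_perform_task and enqueue_perform_complete methods, collapsed here into one for brevity.

```python
class RecordingBackend:
    """Toy backend that records (job name, dependency names) pairs."""

    def __init__(self):
        self.jobs = []

    def enqueue(self, name, depends_on=None):
        # Accept a single dependency, a list of dependencies, or none.
        if depends_on is None:
            deps = []
        elif isinstance(depends_on, list):
            deps = list(depends_on)
        else:
            deps = [depends_on]
        self.jobs.append((name, deps))
        return name


def enqueue_concurrent(backend, steps):
    """Every step depends only on start; complete depends on every step."""
    start = backend.enqueue("start")
    step_jobs = [backend.enqueue(step, depends_on=start) for step in steps]
    return backend.enqueue("complete", depends_on=step_jobs)


backend = RecordingBackend()
enqueue_concurrent(backend, ["resize", "upload"])
# backend.jobs == [
#     ("start", []),
#     ("resize", ["start"]),
#     ("upload", ["start"]),
#     ("complete", ["resize", "upload"]),
# ]
```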
- class tasktronaut.process.ExecutionMode(*values)
Bases: Enum
Defines the available execution modes for the process.
The execution mode determines how tasks within a process are executed. Sequential execution runs tasks one after another in order, while concurrent execution allows tasks to run in parallel.
- CONCURRENT(identifier, definition) = <class 'tasktronaut.process.ConcurrentProcess'>
Concurrent execution mode. Tasks are executed in parallel, allowing multiple tasks to run simultaneously. This mode is suitable for processes with independent tasks that can benefit from parallel execution.
- SEQUENTIAL(identifier, definition) = <class 'tasktronaut.process.SequentialProcess'>
Sequential execution mode. Tasks are executed one at a time in the order they are defined. Each task completes before the next begins. This is the default execution mode for processes.
- class tasktronaut.process.Process(identifier, definition)
Bases: ABC
Abstract base class for process execution workflows.
A Process represents an executable workflow composed of multiple steps that can be enqueued and executed by a backend. Processes are created with an identifier and a process definition that specifies the workflow structure.
This is an abstract base class that must be subclassed to implement specific execution strategies (e.g., sequential, concurrent).
- Parameters:
identifier (str) – Unique identifier for this process instance
definition (Type[ProcessDefinition]) – The ProcessDefinition class that defines the workflow structure
- Variables:
identifier – Unique identifier for this process instance
definition – The ProcessDefinition class associated with this process
steps – Collection of steps that comprise this process workflow
- abstractmethod enqueue(backend, start_job=None)
Enqueue this process for execution on the specified backend.
This abstract method must be implemented by subclasses to define how the process and its steps are queued for execution. Different execution modes (sequential, concurrent) will have different enqueueing strategies.
- Parameters:
- Returns:
A Job representing the enqueued process execution
- Return type:
Job
- Raises:
NotImplementedError – If subclass does not implement this method
Note
Subclasses must implement this method to define their specific execution strategy (sequential vs concurrent ordering, dependency management, etc.)
- property is_process: bool
Check if this object is a Process.
This property provides a consistent way to identify Process instances throughout the framework.
- Returns:
Always returns True
- Return type:
bool
- class tasktronaut.process.ProcessDefinition
Bases: ABC
Base class for defining workflow processes in Tasktronaut.
This class serves as the foundation for creating custom process definitions. Subclasses must implement the define_process() method to specify the workflow structure using the provided Builder instance.
- Variables:
description – Optional description of the process. Defaults to None.
execution_mode – The execution mode for the process (SEQUENTIAL or CONCURRENT). Defaults to SEQUENTIAL.
Example:
    class MyProcess(ProcessDefinition):
        execution_mode = ExecutionMode.SEQUENTIAL

        def define_process(self, builder: Builder):
            builder.task(self.my_task)

        @task
        def my_task(self, context, **kwargs):
            print("Executing task")
- around_task(identifier, step, description, context)
Lifecycle hook called around the execution of a task. It is a context manager that wraps individual task execution.
This method provides a context for task execution, allowing pre- and post-task actions such as logging, metrics collection, or resource management. Override to implement custom behavior around task execution.
- Parameters:
identifier (str) – The unique identifier of the process instance.
step (str) – The step identifier within the process.
description (Description) – Human-readable description of the task being executed.
context (Context) – The context for the invocation of the task. This object can be used to pass additional metadata to the task if needed.
- Yield:
Control is yielded during task execution.
- Return type:
Generator[None, None, None]
Example:
    # Requires: import time; from contextlib import contextmanager
    @contextmanager
    def around_task(self, identifier, step, description, context):
        start_time = time.time()
        # Pass the arguments in the order of the documented signature.
        with super().around_task(identifier, step, description, context):
            yield
        duration = time.time() - start_time
        logger.info(f"Task took {duration:.2f}s")
- Note:
The default implementation logs when task execution begins and completes.
- classmethod build(identifier=None, options=None, **kwargs)
Build and return a Process instance from this definition.
This class method constructs a complete process by instantiating the definition class, invoking define_process(), and returning the fully configured process.
- Parameters:
identifier (Optional[str]) – Unique identifier for the process instance. If not provided, a random UUID hex string is generated.
options (Optional[Options]) – Configuration options to pass to the builder for conditional process logic.
kwargs (Dict[str, Any]) – Additional keyword arguments passed to tasks during execution.
- Returns:
A fully configured process ready for execution.
- Return type:
Process
Example:
    process = MyProcess.build(
        identifier="my-process-001",
        options={"debug": True},
        user_id=123,
    )
- abstractmethod define_process(builder)
Define the workflow process structure.
This method must be implemented by subclasses to specify the tasks and execution flow of the process using the provided builder.
- Parameters:
builder (Builder) – The builder instance used to construct the process workflow.
- Raises:
NotImplementedError – If not implemented by subclass.
Example:
    def define_process(self, builder: Builder):
        builder.task(self.first_task)
        with builder.concurrent() as b:
            # Add tasks through the yielded builder so they join the concurrent block.
            b.task(self.second_task)
            b.task(self.third_task)
        builder.task(self.fourth_task)
- description: Optional[str] = None
Optional description of the process. Defaults to None.
- execution_mode(identifier, definition): ExecutionMode = <class 'tasktronaut.process.SequentialProcess'>
The execution mode for the process (SEQUENTIAL or CONCURRENT).
- on_cancelled(identifier)
Lifecycle hook called when the process is cancelled.
This method is invoked automatically when process execution is deliberately cancelled before completion. Override to implement custom cancellation logic such as cleanup or state restoration.
- Parameters:
identifier (str) – The unique identifier of the process instance.
- Note:
The default implementation logs a warning message indicating the process was cancelled.
- on_completed(identifier)
Lifecycle hook called when the process completes successfully.
This method is invoked automatically when all process tasks complete without errors. Override to implement custom completion logic such as cleanup, notifications, or result processing.
- Parameters:
identifier (str) – The unique identifier of the process instance.
- Note:
The default implementation logs an informational message indicating the process has completed.
- on_failed(identifier, step, e)
Lifecycle hook called when the process fails due to an exception.
This method is invoked automatically when a task raises an exception during process execution. Override to implement custom error handling such as rollback operations, error notifications, or recovery attempts.
- Parameters:
identifier (str) – The unique identifier of the process instance.
step (str) – The step identifier where the failure occurred.
e (Exception) – The exception that caused the failure.
- Note:
The default implementation logs an error message with the exception details.
- on_started(identifier)
Lifecycle hook called when the process starts execution.
This method is invoked automatically when process execution begins. Override to implement custom startup logic such as resource initialization or logging.
- Parameters:
identifier (str) – The unique identifier of the process instance.
- Note:
The default implementation logs an informational message indicating the process has started.
- class tasktronaut.process.SequentialProcess(identifier, definition)
Bases: Process
A process that executes its steps in sequential order.
SequentialProcess is a concrete implementation of the Process base class that enqueues steps for execution in a strict sequential manner. Each step must complete before the next step begins, creating a chain of dependent jobs.
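The chain of dependent jobs can be pictured as a list of (job, dependency) pairs in which each job waits on the one before it. The helper below is purely illustrative: sequential_chain is a hypothetical name, and the "start" and "complete" entries stand in for the start and completion callbacks a backend would enqueue.

```python
def sequential_chain(steps):
    """Return (job, dependency) pairs for a strictly sequential run."""
    order = ["start", *steps, "complete"]
    # Pair every job with its predecessor; the first job has no dependency.
    return [(job, previous) for previous, job in zip([None, *order], order)]


chain = sequential_chain(["extract", "load"])
# chain == [
#     ("start", None),
#     ("extract", "start"),
#     ("load", "extract"),
#     ("complete", "load"),
# ]
```

Compared with the concurrent case, no job here has more than one dependency, which is what forces each step to finish before the next begins.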
tasktronaut.steps module
Steps module.
- class tasktronaut.steps.Step(func, description, kwargs)
Bases: object
Represents a single executable step in a process.
A Step encapsulates a task function along with its metadata and execution parameters. Steps form the building blocks of process definitions and are executed by the process engine according to the defined execution mode.
- Variables:
func – The callable function or method that performs the task
description – Human-readable description of the step
kwargs – Keyword arguments to be passed to the function during execution
- Parameters:
func (TaskMethod) – The task function to execute
description (Description) – A description of what this step does
kwargs (Dict[str, Any]) – Additional keyword arguments for the task function
- property is_process: bool
Determine if this step represents a process.
- Returns:
Always returns False since a Step is not a Process
- Return type:
bool
- class tasktronaut.steps.Steps(iterable=(), /)
Bases: list[Step | Process]
A container for holding a collection of steps and sub-processes.
This class extends the built-in list type to provide a typed container that holds both Step instances and Process instances. It is used internally to manage the collection of execution items within a process definition.
- Inherits:
list[Union[Step, Process]]
tasktronaut.types module
Type definitions and protocols.
- tasktronaut.types.BuilderIterator
Type alias for an iterator of Builder instances.
Represents an iterator that yields Builder objects, typically used in control flow patterns such as sequential, concurrent, or each blocks.
- Type:
Iterator[Builder]
alias of
Iterator[Builder]
- class tasktronaut.types.DecoratedTaskMethod(*args, **kwargs)
Bases: Protocol
Protocol for a task method decorated with the @task decorator.
Defines the interface that decorated task methods must implement. Decorated tasks have metadata attributes and can optionally accept a Context parameter.
- Variables:
__name__ – The name of the task method.
description – An optional human-readable description of the task.
- description: Optional[str]
- tasktronaut.types.Description
Type alias for an optional task or item description.
Used to provide human-readable descriptions for tasks, items in iterations, or other process elements.
- Type:
Optional[str]
alias of
str|None
- tasktronaut.types.ForEachMethod
Type alias for the each method.
Represents a callable that accepts any arguments and keyword arguments, and returns an iterator of task arguments. Used to define iteration methods for the each pattern.
- Type:
Callable[…, ForEachReturn]
alias of
Callable[[…], Iterator[Dict[str, Any] | Tuple[Dict[str, Any], str | None]]]
- tasktronaut.types.ForEachReturn
Type alias for the return type of the each method.
Represents an iterator that yields task arguments. Used in the each pattern to iterate over a collection of items, yielding arguments for each iteration.
- Type:
Iterator[TaskArgs]
alias of
Iterator[Dict[str, Any] | Tuple[Dict[str, Any], str | None]]
- tasktronaut.types.Options
Type alias for process options.
Represents configuration options that can be passed to a process, either as a SimpleNamespace object or as a dictionary.
- Type:
Union[SimpleNamespace, Dict[str, Any]]
alias of
SimpleNamespace | Dict[str, Any]
- tasktronaut.types.ProcessDefinitionType
Type alias for a ProcessDefinition class.
Represents a class type that inherits from ProcessDefinition, used when specifying sub-processes or other process references.
- Type:
Type[ProcessDefinition]
alias of
Type[ProcessDefinition]
- tasktronaut.types.TaskArgs
Type alias for task arguments.
Represents arguments passed to a task, which can be either a dictionary of arguments or a tuple containing a dictionary of arguments and an optional description string.
- Type:
Union[Dict[str, Any], Tuple[Dict[str, Any], Description]]
alias of
Dict[str, Any] | Tuple[Dict[str, Any], str | None]
- tasktronaut.types.TaskMethod
Type alias for any task method.
Represents either a decorated or undecorated task method. This is the primary type used when registering methods as tasks with a builder.
- Type:
Union[UndecoratedTaskMethod, DecoratedTaskMethod]
alias of
Callable[[…], None] | DecoratedTaskMethod
- tasktronaut.types.TransformMethod
Type alias for a transformation method.
Represents a callable that accepts any arguments and keyword arguments, and returns task arguments. Used to define transformation methods for the transform pattern to modify arguments before task execution.
- Type:
Callable[…, TaskArgs]
alias of
Callable[[…], Dict[str, Any] | Tuple[Dict[str, Any], str | None]]
- tasktronaut.types.UndecoratedTaskMethod
Type alias for an undecorated task method.
Represents a callable that accepts any arguments and keyword arguments, and returns None. These are regular methods that can be registered as tasks without explicit decoration.
- Type:
Callable[…, None]
alias of
Callable[[…], None]
- class tasktronaut.types.Value
Generic type variable for generic operations.
Used in generic function or class definitions where a type parameter is needed to represent any arbitrary type.
- Type:
TypeVar
alias of TypeVar(‘Value’)
tasktronaut.utils module
Utility and supporting methods.
- tasktronaut.utils.load_definition(module_name, definition_class)
Dynamically load and instantiate a process definition class.
This function provides dynamic loading of ProcessDefinition subclasses from specified modules. It imports the module by name, retrieves the definition class from that module, and returns a new instance of the class.
- Parameters:
module_name (str) – Fully qualified module name to import
definition_class (str) – Name of the ProcessDefinition class to load from the module
- Returns:
A new instance of the specified ProcessDefinition class
- Return type:
ProcessDefinition
- Raises:
ImportError – If the module cannot be imported
AttributeError – If the definition_class does not exist in the module
TypeError – If the loaded class cannot be instantiated
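The import, lookup and instantiate sequence described above maps directly onto importlib. The sketch below is an assumption about how such a loader can be written, not the library's source; load_instance is a hypothetical name, and collections.OrderedDict is used only because it is an importable class with a no-argument constructor.

```python
import importlib


def load_instance(module_name, class_name):
    """Import module_name, look up class_name in it, and return a new instance."""
    module = importlib.import_module(module_name)  # ImportError if the module is missing
    cls = getattr(module, class_name)              # AttributeError if the name is missing
    return cls()                                   # TypeError if the class needs arguments


instance = load_instance("collections", "OrderedDict")
```

Passing the module and class as strings keeps the enqueued payload serialisable, which fits the str-typed module_name and definition_class parameters on the backend methods.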
- tasktronaut.utils.to_dict(**kwargs)
Convert keyword arguments to a dictionary.
This utility function packages keyword arguments into a dictionary, providing a convenient way to convert variadic keyword arguments into a dictionary structure.
- Parameters:
kwargs (Dict[str, Any]) – Arbitrary keyword arguments to convert
- Returns:
Dictionary containing all passed keyword arguments
- Return type:
dict[str, Any]
- tasktronaut.utils.to_kwargs(base, **kwargs)
Merge a base dictionary with additional keyword arguments.
Creates a new dictionary by copying the base dictionary and updating it with the provided keyword arguments. The original base dictionary is not modified. Keyword arguments take precedence over base dictionary values in case of key conflicts.
- Parameters:
base (dict[str, Any]) – Base dictionary to copy and update
kwargs (Dict[str, Any]) – Keyword arguments to merge into the base dictionary
- Returns:
New dictionary containing merged key-value pairs
- Return type:
dict[str, Any]
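The behaviour described for to_dict and to_kwargs fits in a few lines. These are illustrative re-implementations written from the descriptions above, not copies of the library's code.

```python
def to_dict(**kwargs):
    """Package keyword arguments into a dictionary."""
    return kwargs


def to_kwargs(base, **kwargs):
    """Return a new dict: a copy of base updated with kwargs (kwargs win)."""
    return {**base, **kwargs}


base = to_dict(user_id=123, region="eu")
merged = to_kwargs(base, region="us", retries=2)
# merged == {"user_id": 123, "region": "us", "retries": 2}
# base is unchanged: {"user_id": 123, "region": "eu"}
```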
Module contents
Tasktronaut: A Python library for defining and executing process workflows.
The tasktronaut package provides a framework for building complex process definitions with support for sequential and concurrent execution, sub-processes, conditional logic, and iterative patterns.
This module serves as the main entry point, exposing the primary public API for the library.
- class tasktronaut.Backend
Bases: ABC
Abstract base class defining the interface for process execution backends.
Backends are responsible for enqueueing tasks, managing job dependencies, and executing process lifecycle callbacks. Subclasses must implement the abstract enqueue methods to integrate with specific job queue systems.
The backend handles three main phases of process execution:
- Process start callbacks
- Individual task execution with error handling
- Process completion callbacks
- abstractmethod enqueue_perform_complete(identifier, module_name, definition_class, depends_on=None)
Enqueue a process completion callback.
Schedules the execution of the process completion callback, which allows the process definition to perform finalization logic when all tasks have completed. Can depend on one or more jobs.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
depends_on (Optional[Union[Job, List[Job]]]) – Optional job or list of jobs that this job depends on. If provided, this job will not execute until all dependencies complete.
- Returns:
Job object representing the enqueued task
- Return type:
Job
- abstractmethod enqueue_perform_start(identifier, module_name, definition_class, depends_on=None)
Enqueue a process start callback.
Schedules the execution of the process start callback, which allows the process definition to perform initialization logic when execution begins.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.
- Returns:
Job object representing the enqueued task
- Return type:
Job
- abstractmethod enqueue_perform_task(identifier, module_name, definition_class, function_name, description, kwargs, depends_on=None)
Enqueue a process task for execution.
Schedules the execution of a specific task function within a process definition. The task will be executed with the provided keyword arguments and can optionally depend on another job.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
function_name (str) – Name of the task method to execute
description (Description) – Human-readable description of the task
kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function
depends_on (Optional[Job]) – Optional job that this job depends on. If provided, this job will not execute until the dependency completes.
- Returns:
Job object representing the enqueued task
- Return type:
Job
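To make the three enqueue methods concrete, here is a self-contained sketch of an in-memory backend. So that it runs without tasktronaut installed, it subclasses a local stand-in ABC (BackendSketch) that only mirrors the documented signatures. FakeJob and InMemoryBackend are hypothetical names, and a real backend would hand the work to an actual job queue.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeJob:
    """Hypothetical stand-in for a queue system's job handle."""
    kind: str
    args: tuple
    depends_on: List["FakeJob"] = field(default_factory=list)


class BackendSketch(ABC):
    """Local stand-in mirroring the documented abstract enqueue methods."""

    @abstractmethod
    def enqueue_perform_start(self, identifier, module_name, definition_class,
                              depends_on=None): ...

    @abstractmethod
    def enqueue_perform_task(self, identifier, module_name, definition_class,
                             function_name, description, kwargs, depends_on=None): ...

    @abstractmethod
    def enqueue_perform_complete(self, identifier, module_name, definition_class,
                                 depends_on=None): ...


class InMemoryBackend(BackendSketch):
    """Records jobs in a list instead of handing them to a real queue."""

    def __init__(self) -> None:
        self.jobs: List[FakeJob] = []

    def _enqueue(self, kind, args, depends_on) -> FakeJob:
        # Normalize None, a single job, or a list of jobs into a list.
        if depends_on is None:
            deps = []
        elif isinstance(depends_on, list):
            deps = list(depends_on)
        else:
            deps = [depends_on]
        job = FakeJob(kind, args, deps)
        self.jobs.append(job)
        return job

    def enqueue_perform_start(self, identifier, module_name, definition_class,
                              depends_on=None):
        return self._enqueue("start", (identifier, module_name, definition_class),
                             depends_on)

    def enqueue_perform_task(self, identifier, module_name, definition_class,
                             function_name, description, kwargs, depends_on=None):
        args = (identifier, module_name, definition_class,
                function_name, description, kwargs)
        return self._enqueue("task", args, depends_on)

    def enqueue_perform_complete(self, identifier, module_name, definition_class,
                                 depends_on=None):
        return self._enqueue("complete", (identifier, module_name, definition_class),
                             depends_on)


backend = InMemoryBackend()
start = backend.enqueue_perform_start("run-1", "my_module", "MyProcess")
step = backend.enqueue_perform_task("run-1", "my_module", "MyProcess", "my_task",
                                    "Example task", {"user_id": 123}, depends_on=start)
done = backend.enqueue_perform_complete("run-1", "my_module", "MyProcess",
                                        depends_on=[step])
print([job.kind for job in backend.jobs])  # ['start', 'task', 'complete']
```

Each enqueue call returns the job it created, so the caller can pass it as depends_on for the next stage and build the dependency chain.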
- static perform_complete(identifier, module_name, definition_class)
Execute the process completion callback.
Loads the process definition and invokes its completion callback. This method is typically called by the backend after all process tasks have completed successfully.
It is implemented as a static method to give backend implementations the most flexibility.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
- static perform_start(identifier, module_name, definition_class)
Execute the process start callback.
Loads the process definition and invokes its start callback. This method is typically called by the backend after the job has been dequeued.
It is implemented as a static method to give backend implementations the most flexibility.
- Parameters:
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
- classmethod perform_task(job, identifier, module_name, definition_class, function_name, description, kwargs)
Execute a process task with comprehensive error handling.
Loads the process definition, creates an execution context, and invokes the specified task function. Handles three categories of errors:
NonRetryableProcessError: Logs error, calls on_failed callback, and re-raises
CancelProcessError: Cancels the job and its dependents, calls on_cancelled callback
Other exceptions: Logs error, calls on_failed callback, and re-raises for retry
The task execution is wrapped with the definition’s around_task context manager, allowing the definition to perform setup and teardown logic.
- Parameters:
job (Job) – Job object that can be cancelled or have dependents deleted
identifier (str) – Unique identifier for the process execution instance
module_name (str) – Fully qualified module name containing the process definition
definition_class (str) – Name of the process definition class to instantiate
function_name (str) – Name of the task method to execute
description (Description) – Human-readable description of the task
kwargs (Dict[str, Any]) – Keyword arguments to pass to the task function
- Raises:
NonRetryableProcessError – When task execution raises a non-retryable error
Exception – When task execution raises an unhandled error (retryable)
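The three error categories above map naturally onto an ordered try/except chain. The sketch below is an assumption about the general shape, not the library's code: the two exception classes are local stand-ins that share the documented names, FakeJob and RecordingDefinition are hypothetical, and the around_task wrapper is omitted for brevity.

```python
import logging

logger = logging.getLogger(__name__)


class NonRetryableProcessError(Exception):
    """Local stand-in for the library's exception of the same name."""


class CancelProcessError(Exception):
    """Local stand-in for the library's exception of the same name."""


class FakeJob:
    """Hypothetical job handle; real queue systems expose their own API."""

    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True


class RecordingDefinition:
    """Hypothetical definition that records which hooks were called."""

    def __init__(self):
        self.events = []

    def on_failed(self, identifier, step, e):
        self.events.append(("failed", step, type(e).__name__))

    def on_cancelled(self, identifier):
        self.events.append(("cancelled",))


def run_task_sketch(job, definition, identifier, step, func, kwargs):
    try:
        func(**kwargs)
    except NonRetryableProcessError as e:
        logger.error("step %s failed permanently: %s", step, e)
        definition.on_failed(identifier, step, e)
        raise  # re-raised; skipping retries is up to the queue integration
    except CancelProcessError:
        job.cancel()  # a real backend would also cancel dependent jobs
        definition.on_cancelled(identifier)
    except Exception as e:
        logger.error("step %s failed: %s", step, e)
        definition.on_failed(identifier, step, e)
        raise  # re-raised so the queue can retry the job


def cancelling_task(**_):
    raise CancelProcessError("stop")


job, definition = FakeJob(), RecordingDefinition()
run_task_sketch(job, definition, "run-1", "step-1", cancelling_task, {})
print(job.cancelled, definition.events)  # True [('cancelled',)]
```

The specific exception types are caught before the generic Exception clause, so cancellation is never mistaken for a retryable failure.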
- class tasktronaut.Builder(process, kwargs, options=None, description=None)
Bases: object
A builder class for constructing and configuring process workflows.
The Builder provides a fluent interface for defining task execution graphs, including sequential and concurrent execution patterns, argument validation, iteration, and transformation capabilities.
- Parameters:
process (Process) – The process instance being built
kwargs (Dict[str, Any]) – Keyword arguments to pass to tasks and transformations
options (SimpleNamespace) – Optional configuration options for the process
description (Description) – Optional description for the builder context
- Variables:
process – The process instance being constructed
description – Description for the current builder context
options – Configuration options converted to SimpleNamespace
kwargs – Keyword arguments for task execution
- concurrent(description=None)
Create a concurrent execution block where tasks run in parallel.
This context manager creates a new ConcurrentProcess that executes its contained tasks concurrently. The resulting process is appended to the parent process’s steps.
- Parameters:
description (Description) – Optional description for the concurrent block
- Returns:
Builder instance for the concurrent process
- Return type:
BuilderIterator
- Yield:
Builder configured for concurrent execution
Example:
    with builder.concurrent() as b:
        b.task(task1)
        b.task(task2)
- each(func, description=None)
Iterate over the items yielded by the given func method and yield a builder for each iteration.
Executes the provided function to generate items, then yields a Builder instance for each item with updated kwargs.
The provided function can yield either a dict of kwargs or a tuple of (kwargs, description).
- Parameters:
func (ForEachMethod) – A function that yields dicts or (dict, str) tuples. The function receives the current kwargs as parameters.
description (Description) – Optional default description for iterations
- Returns:
Iterator of Builder instances, one per yielded item
- Return type:
BuilderIterator
- Yield:
Builder with kwargs updated for each iteration
Example:
    def items(self, count: int, **_):
        for i in range(count):
            yield {"index": i}

    for b in builder.each(self.items):
        b.task(task_for_item)
Note
The provided function can yield either dict or (dict, str) tuples, where the string provides a per-item description.
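One way to handle the two yield shapes is to normalize each item before use. The sketch below is hypothetical (iterate_kwargs_sketch is not a tasktronaut function) and assumes that "updated kwargs" means the yielded dict is merged over the current kwargs.

```python
from typing import Any, Dict, Iterator, Optional, Tuple


def iterate_kwargs_sketch(
    func, kwargs: Dict[str, Any], description: Optional[str] = None
) -> Iterator[Tuple[Dict[str, Any], Optional[str]]]:
    for item in func(**kwargs):
        if isinstance(item, tuple):
            # (dict, str) form: the string is a per-item description.
            item_kwargs, item_description = item
        else:
            # Plain dict form: fall back to the default description.
            item_kwargs, item_description = item, description
        # Assumption: per-item values are merged over the current kwargs.
        yield {**kwargs, **item_kwargs}, item_description


def items(count: int, **_):
    yield {"index": 0}            # plain dict
    yield {"index": 1}, "Item 1"  # dict with a description


results = list(iterate_kwargs_sketch(items, {"count": 2}, description="Default"))
for merged, desc in results:
    print(merged, desc)
# {'count': 2, 'index': 0} Default
# {'count': 2, 'index': 1} Item 1
```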
- expected_arguments(**kwargs)
Validate that expected arguments are present and of the correct type.
This method checks that all specified arguments exist in the builder’s kwargs and validates their types using Pydantic type adapters. Type validation is only performed when a non-None type is specified.
- Parameters:
kwargs (Dict[str, Optional[Type]]) – Mapping of argument names to expected types. Use None to skip type validation for an argument.
- Raises:
TypeError – If a required argument is not found in kwargs
ValueError – If an argument’s type does not match the expected type
Note
This method does not check for additional arguments beyond those specified.
Example:
    builder.expected_arguments(
        foo=str,
        bar=int,
        baz=None,  # No type checking
        qux=Optional[datetime.date],
    )
- option(name, default=None)
Retrieve an option value with optional default.
Accesses configuration options passed to the builder, returning the specified option’s value or a default if the option is not set.
- Parameters:
name (str) – The name of the option to retrieve
default (Optional[Value]) – Default value to return if option is not found
- Returns:
The option’s value or the default
- Return type:
Example:
    if builder.option("debug", False):
        builder.task(verbose_logging)
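Since the builder stores its options as a SimpleNamespace, a lookup with a default can be sketched with getattr. The helper name option_sketch is hypothetical, and the real method may differ in detail.

```python
from types import SimpleNamespace
from typing import Any, Optional


def option_sketch(options: SimpleNamespace, name: str,
                  default: Optional[Any] = None) -> Any:
    # getattr with a fallback returns `default` when the attribute is absent.
    return getattr(options, name, default)


options = SimpleNamespace(**{"debug": True})
print(option_sketch(options, "debug", False))    # True
print(option_sketch(options, "verbose", False))  # False
```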
- sequential(description=None)
Create a sequential execution block where tasks run one after another.
This context manager creates a new SequentialProcess that executes its contained tasks sequentially. The resulting process is appended to the parent process’s steps.
- Parameters:
description (Description) – Optional description for the sequential block
- Returns:
Builder instance for the sequential process
- Return type:
BuilderIterator
- Yield:
Builder configured for sequential execution
Example:
    with builder.sequential() as b:
        b.task(task1)
        b.task(task2)
- sub_process(definition, description=None)
Embed a sub-process within the current process.
Creates and configures a new process based on the provided ProcessDefinition, then appends it to the parent process’s steps. The sub-process inherits the current options and kwargs.
- Parameters:
definition (ProcessDefinitionType) – A ProcessDefinition class defining the sub-process
description (Description) – Optional description for the sub-process
Example:
    builder.sub_process(MySubProcess)
    builder.sub_process(MySubProcess, description="Data validation")
- task(func, description=None)
Add a task to the process.
Creates a Step wrapping the provided function and appends it to the process’s steps. The task inherits the builder’s kwargs and description unless overridden.
- Parameters:
func (TaskMethod) – The function to execute as a task. Can be a regular method, a method accepting a Context parameter, or a @task decorated method.
description (Description) – Optional description for the task. If not provided, uses the function’s ‘description’ attribute or the builder’s description.
Example:
    builder.task(my_task)
    builder.task(my_task, description="Custom description")
- transform(func, description=None)
Transform kwargs for tasks within a context block.
Executes the provided function to generate new kwargs, then yields a Builder with the transformed kwargs. Tasks defined within the context use the transformed arguments. The function can return either a dict or a tuple of (dict, description).
- Parameters:
func (TransformMethod) – A function that returns a dict of new kwargs or a (dict, str) tuple. The function receives the current kwargs as parameters.
description (Description) – Optional description for the transformation context
- Returns:
Builder instance with transformed kwargs
- Return type:
BuilderIterator
- Yield:
Builder configured with transformed kwargs
Example:
    def double_value(self, value: int, **_):
        return {"value": value * 2}

    with builder.transform(self.double_value) as t:
        t.task(task_with_doubled_value)
Note
The transform function can return either dict or (dict, str), where the string overrides the description for the context.
- class tasktronaut.Context
Bases: object
A context object that carries execution information during process execution.
The Context class serves as a container for state and metadata that may be needed by tasks during process execution. Tasks can optionally accept a Context parameter to access execution-specific information and maintain state across the process.
This is a minimal class designed to be extended or used as a base for storing execution context data passed between tasks in a Tasktronaut process.
- class tasktronaut.ExecutionMode(*values)
Bases: Enum
Defines the available execution modes for the process.
The execution mode determines how tasks within a process are executed. Sequential execution runs tasks one after another in order, while concurrent execution allows tasks to run in parallel.
- CONCURRENT(identifier, definition) = <class 'tasktronaut.process.ConcurrentProcess'>
Concurrent execution mode. Tasks are executed in parallel, allowing multiple tasks to run simultaneously. This mode is suitable for processes with independent tasks that can benefit from parallel execution.
- SEQUENTIAL(identifier, definition) = <class 'tasktronaut.process.SequentialProcess'>
Sequential execution mode. Tasks are executed one at a time in the order they are defined. Each task completes before the next begins. This is the default execution mode for processes.
- class tasktronaut.ProcessDefinition
Bases: ABC
Base class for defining workflow processes in Tasktronaut.
This class serves as the foundation for creating custom process definitions. Subclasses must implement the define_process() method to specify the workflow structure using the provided Builder instance.
- Variables:
description – Optional description of the process. Defaults to None.
execution_mode – The execution mode for the process (SEQUENTIAL or CONCURRENT). Defaults to SEQUENTIAL.
Example:
    class MyProcess(ProcessDefinition):
        execution_mode = ExecutionMode.SEQUENTIAL

        def define_process(self, builder: Builder):
            builder.task(self.my_task)

        @task
        def my_task(self, context, **kwargs):
            print("Executing task")
- around_task(identifier, step, description, context)
Lifecycle hook called around the execution of a task. It is a context manager that wraps individual task execution.
This method provides a context for task execution, allowing pre- and post-task actions such as logging, metrics collection, or resource management. Override to implement custom behavior around task execution.
- Parameters:
identifier (str) – The unique identifier of the process instance.
step (str) – The step identifier within the process.
description (Description) – Human-readable description of the task being executed.
context (Context) – The context for the invocation of the task. This object can be used to pass additional metadata to the task if needed.
- Yield:
Control is yielded during task execution.
- Return type:
Generator[None, None, None]
Example:
    @contextmanager
    def around_task(self, identifier, step, description, context):
        start_time = time.time()
        # Arguments are passed in the same order as the method signature.
        with super().around_task(identifier, step, description, context):
            yield
        duration = time.time() - start_time
        logger.info(f"Task took {duration:.2f}s")
- Note:
The default implementation logs when task execution begins and completes.
- classmethod build(identifier=None, options=None, **kwargs)
Build and return a Process instance from this definition.
This class method constructs a complete process by instantiating the definition class, invoking define_process(), and returning the fully configured process.
- Parameters:
identifier (Optional[str]) – Unique identifier for the process instance. If not provided, a random UUID hex string is generated.
options (Optional[Options]) – Configuration options to pass to the builder for conditional process logic.
kwargs (Dict[str, Any]) – Additional keyword arguments passed to tasks during execution.
- Returns:
A fully configured process ready for execution.
- Return type:
Process
Example:
    process = MyProcess.build(
        identifier="my-process-001",
        options={"debug": True},
        user_id=123,
    )
- abstractmethod define_process(builder)
Define the workflow process structure.
This method must be implemented by subclasses to specify the tasks and execution flow of the process using the provided builder.
- Parameters:
builder (Builder) – The builder instance used to construct the process workflow.
- Raises:
NotImplementedError – If not implemented by subclass.
Example:
    def define_process(self, builder: Builder):
        builder.task(self.first_task)
        # Tasks added through the yielded builder run concurrently.
        with builder.concurrent() as b:
            b.task(self.second_task)
            b.task(self.third_task)
        builder.task(self.fourth_task)
- description: Optional[str] = None
Optional description of the process. Defaults to None.
- execution_mode(identifier, definition): ExecutionMode = <class 'tasktronaut.process.SequentialProcess'>
The execution mode for the process (SEQUENTIAL or CONCURRENT).
- on_cancelled(identifier)
Lifecycle hook called when the process is cancelled.
This method is invoked automatically when process execution is deliberately cancelled before completion. Override to implement custom cancellation logic such as cleanup or state restoration.
- Parameters:
identifier (str) – The unique identifier of the process instance.
- Note:
The default implementation logs a warning message indicating the process was cancelled.
- on_completed(identifier)
Lifecycle hook called when the process completes successfully.
This method is invoked automatically when all process tasks complete without errors. Override to implement custom completion logic such as cleanup, notifications, or result processing.
- Parameters:
identifier (str) – The unique identifier of the process instance.
- Note:
The default implementation logs an informational message indicating the process has completed.
- on_failed(identifier, step, e)
Lifecycle hook called when the process fails due to an exception.
This method is invoked automatically when a task raises an exception during process execution. Override to implement custom error handling such as rollback operations, error notifications, or recovery attempts.
- Parameters:
identifier (str) – The unique identifier of the process instance.
step (str) – The step identifier where the failure occurred.
e (Exception) – The exception that caused the failure.
- Note:
The default implementation logs an error message with the exception details.
- on_started(identifier)
Lifecycle hook called when the process starts execution.
This method is invoked automatically when process execution begins. Override to implement custom startup logic such as resource initialization or logging.
- Parameters:
identifier (str) – The unique identifier of the process instance.
- Note:
The default implementation logs an informational message indicating the process has started.
- tasktronaut.task(func=None, /, *, description=None)
Decorator for marking methods as tasks within a process definition.
This decorator can be applied to methods with or without arguments and optionally assigns a description to the task. When applied, it attaches metadata to the decorated function that identifies it as a task and stores any provided description.
- Parameters:
func (UndecoratedTaskMethod or None) – The undecorated task method. This parameter is only provided when the decorator is used without parentheses (bare decorator form). When None, the decorator is being used with keyword arguments and returns a decorator function.
description (Description or None) – An optional description of the task’s purpose and behavior. This description may be used for logging, documentation generation, or user-facing displays of the process flow.
- Returns:
Either the decorated method (when used as a bare decorator) or a decorator function (when used with keyword arguments).
- Return type:
DecoratedTaskMethod or Callable
Note
The decorator supports two usage patterns:
Bare decorator: Applied directly without parentheses or with empty parentheses, where no description is provided.
Parameterized decorator: Applied with the description keyword argument to specify a task description.
See also
UndecoratedTaskMethod – Type for undecorated task methods
DecoratedTaskMethod – Type for decorated task methods
Description – Type for task descriptions
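A decorator that works both bare and with keyword arguments typically follows the pattern sketched below. This is a generic illustration under assumptions, not the library's source: task_sketch is a hypothetical name, and the is_task marker attribute is invented for the example.

```python
from typing import Callable, Optional


def task_sketch(func: Optional[Callable] = None, /, *,
                description: Optional[str] = None):
    def decorate(f: Callable) -> Callable:
        f.is_task = True             # hypothetical marker attribute
        f.description = description  # None when no description is given
        return f

    if func is not None:
        # Bare form: @task_sketch
        return decorate(func)
    # Called form: @task_sketch() or @task_sketch(description="...")
    return decorate


@task_sketch
def bare(**kwargs):
    pass


@task_sketch(description="Send the welcome email")
def described(**kwargs):
    pass


print(bare.description, described.description)  # None Send the welcome email
```

Making func positional-only and description keyword-only keeps the two call forms unambiguous: a positional argument is always the function being decorated.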