User Guide
Introduction
Tasktronaut is a Python library for defining and executing processes composed of individual tasks.
It provides a declarative API for organizing tasks into complex workflows with support for sequential execution, concurrent execution, conditional logic, iteration, and nested sub-processes.
Getting Started
Basic Process Definition
To create a process, subclass ProcessDefinition and implement the define_process method:
from tasktronaut import ProcessDefinition, Builder

class MyProcess(ProcessDefinition):
    def define_process(self, builder: Builder):
        builder.task(self.step_one)
        builder.task(self.step_two)
        builder.task(self.step_three)

    def step_one(self):
        print("Executing step one")

    def step_two(self):
        print("Executing step two")

    def step_three(self):
        print("Executing step three")
The builder object is used to add tasks and structure your process flow.
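Note that define_process passes the task methods to the builder without calling them. The sketch below illustrates that idea with a toy stand-in; RecordingBuilder is a hypothetical class written for this example and is not Tasktronaut's Builder, whose internals are not described in this guide.

```python
from typing import Callable, List


class RecordingBuilder:
    """Hypothetical stand-in: it only records tasks in the order they are added."""

    def __init__(self) -> None:
        self.tasks: List[Callable[[], None]] = []

    def task(self, func: Callable[[], None]) -> None:
        # Store the callable; nothing is executed at definition time.
        self.tasks.append(func)


class MyProcess:
    def define_process(self, builder: RecordingBuilder) -> None:
        builder.task(self.step_one)
        builder.task(self.step_two)

    def step_one(self) -> None:
        print("Executing step one")

    def step_two(self) -> None:
        print("Executing step two")


builder = RecordingBuilder()
MyProcess().define_process(builder)

# The definition step produced an ordered list of callables.
task_names = [t.__name__ for t in builder.tasks]
print(task_names)
```

Separating definition from execution in this way is what lets a library decide later how and where each recorded task runs.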
Tasks
Defining Tasks
Tasks are methods that perform work within a process. There are several ways to define tasks:
Simple Methods:
Any method can be used as a task:
def my_task(self, *args, **kwargs):
    # Task implementation
    pass
Context-Aware Methods:
Methods can receive a Context object to access execution information:
from tasktronaut import Context

def my_task(self, context: Context, **kwargs):
    # Access runtime information via context
    pass
Task Decorator:
Use the @task decorator to add metadata like descriptions:
from tasktronaut import task

@task(description="Processes data input")
def my_task(self, *args, **kwargs):
    pass

@task()
def another_task(self):
    pass

@task
def simple_task(self):
    pass
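The three spellings above (with arguments, with empty parentheses, and bare) rely on a common Python decorator idiom. The following is a minimal sketch of that idiom only; toy_task and its description attribute are hypothetical names chosen for illustration and do not reflect how Tasktronaut's task decorator is implemented.

```python
import functools
from typing import Callable, Optional


def toy_task(func: Optional[Callable] = None, *, description: Optional[str] = None):
    """Hypothetical decorator usable as @toy_task, @toy_task() or @toy_task(description=...)."""

    def decorate(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)

        # Attach metadata to the wrapper; the attribute name is an assumption.
        wrapper.description = description
        return wrapper

    # Bare usage (@toy_task): Python passes the function in directly.
    if func is not None:
        return decorate(func)
    # Called usage (@toy_task() or @toy_task(description=...)): return the decorator.
    return decorate


@toy_task(description="Processes data input")
def described():
    return "a"


@toy_task()
def empty_call():
    return "b"


@toy_task
def bare():
    return "c"
```

Making description keyword-only keeps the bare form unambiguous: the only positional value the decorator can receive is the function itself.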
Execution Modes
Sequential Execution
By default, tasks execute one after another in the order they are defined. You can also set this mode explicitly:
from tasktronaut import Builder, ExecutionMode, ProcessDefinition

class SequentialProcess(ProcessDefinition):
    process_execution_mode = ExecutionMode.SEQUENTIAL

    def define_process(self, builder: Builder):
        builder.task(self.task_a)
        builder.task(self.task_b)
        builder.task(self.task_c)
Tasks execute in order: task_a, then task_b, then task_c.
Concurrent Execution
Execute multiple tasks in parallel:
class ConcurrentProcess(ProcessDefinition):
    process_execution_mode = ExecutionMode.CONCURRENT

    def define_process(self, builder: Builder):
        builder.task(self.task_a)
        builder.task(self.task_b)
        builder.task(self.task_c)
All three tasks may execute simultaneously.
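The practical difference between the two modes is ordering: sequential tasks finish in definition order, while concurrent tasks may finish in any order. The sketch below shows this with the standard library's ThreadPoolExecutor. Using threads is an assumption made for illustration; this guide does not say how Tasktronaut schedules concurrent tasks, and make_task, run_sequential, and run_concurrent are hypothetical helpers.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def make_task(name: str, log: List[str]) -> Callable[[], None]:
    """Build a toy task that records its name when it finishes."""

    def run() -> None:
        time.sleep(0.05)  # simulate some work
        log.append(name)

    return run


def run_sequential(tasks: List[Callable[[], None]]) -> None:
    # Each task starts only after the previous one has returned.
    for t in tasks:
        t()


def run_concurrent(tasks: List[Callable[[], None]]) -> None:
    # All tasks are submitted at once; completion order is not guaranteed.
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        futures = [pool.submit(t) for t in tasks]
        for f in futures:
            f.result()  # re-raise any exception from the task


names = ("task_a", "task_b", "task_c")

sequential_log: List[str] = []
run_sequential([make_task(n, sequential_log) for n in names])

concurrent_log: List[str] = []
run_concurrent([make_task(n, concurrent_log) for n in names])
```

Because the concurrent completion order can vary between runs, tasks placed in a concurrent process should not depend on one another's results.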
Process Structure
Sequential and Concurrent Blocks
Group tasks into sequential or concurrent sections within a larger process:
class MixedProcess(ProcessDefinition):
    def define_process(self, builder: Builder):
        builder.task(self.setup)

        # Run these tasks sequentially
        with builder.sequential() as b:
            b.task(self.step_one)
            b.task(self.step_two)

        # Run these tasks concurrently
        with builder.concurrent() as b:
            b.task(self.parallel_task_a)
            b.task(self.parallel_task_b)

        builder.task(self.cleanup)
This allows fine-grained control over execution flow within a single process.
Input Arguments
Defining Expected Arguments
Specify the input arguments your process accepts with type hints:
import datetime
from typing import List, Optional

class ProcessWithArguments(ProcessDefinition):
    def define_process(self, builder: Builder):
        builder.expected_arguments(
            name=str,
            count=int,
            date=datetime.date,
            tags=List[str],
            description=Optional[str],
            data=None,  # Any type accepted
        )
        builder.task(self.process_data)

    def process_data(self, name: str, count: int, **kwargs):
        pass
These arguments are passed to your tasks and can be accessed via keyword arguments or through the Context object.
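The process_data signature above names only the arguments it needs and collects the rest in **kwargs. The sketch below shows that plain-Python behavior with a standalone function. It assumes the arguments reach the task as keyword arguments, as the sentence above suggests; the sample values are made up for illustration.

```python
import datetime


def process_data(name: str, count: int, **kwargs):
    # name and count are bound explicitly; every other argument lands in kwargs.
    return name, count, sorted(kwargs)


# Illustrative values matching the expected arguments declared above.
arguments = {
    "name": "report",
    "count": 3,
    "date": datetime.date(2024, 1, 1),
    "tags": ["monthly"],
    "description": None,
    "data": {"key": "value"},
}

result = process_data(**arguments)
print(result)
```

Ending each task signature with **kwargs means a task keeps working when new arguments are added to the process later.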
Conditional Logic
Using Options
Include or exclude tasks based on boolean options:
class ConditionalProcess(ProcessDefinition):
    def define_process(self, builder: Builder):
        if builder.option("enable_validation", False):
            builder.task(self.validate_input)

        builder.task(self.process)

        if builder.option("enable_logging", True):
            builder.task(self.log_results)
The first argument is the option name, and the second is the default value. Tasks are only added if the option is True.
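To see how the defaults shape the resulting task list, here is a toy model of the same definition. ToyBuilder is a hypothetical stand-in that looks options up in a plain dictionary; how options are actually supplied to a Tasktronaut process is not covered in this guide, so treat the constructor argument as an assumption.

```python
from typing import Dict, List, Optional


class ToyBuilder:
    """Hypothetical stand-in that resolves options from a dictionary."""

    def __init__(self, options: Optional[Dict[str, bool]] = None) -> None:
        self.options = dict(options or {})
        self.tasks: List[str] = []

    def option(self, name: str, default: bool) -> bool:
        # Fall back to the default when the option was not supplied.
        return bool(self.options.get(name, default))

    def task(self, name: str) -> None:
        self.tasks.append(name)


def define_process(builder: ToyBuilder) -> List[str]:
    if builder.option("enable_validation", False):
        builder.task("validate_input")
    builder.task("process")
    if builder.option("enable_logging", True):
        builder.task("log_results")
    return builder.tasks


# No options supplied: validation is off and logging is on by default.
default_tasks = define_process(ToyBuilder())

# Both defaults overridden.
custom_tasks = define_process(
    ToyBuilder({"enable_validation": True, "enable_logging": False})
)
```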
Iteration
The Each Pattern
Iterate over a collection and execute tasks for each item:
from typing import Iterator, List

class IterativeProcess(ProcessDefinition):
    def items(self, files: List[str], **_) -> Iterator[dict]:
        for file in files:
            yield {"filename": file}

    def define_process(self, builder: Builder):
        for iteration in builder.each(self.items):
            iteration.task(self.process_file)

    def process_file(self, filename: str, **kwargs):
        print(f"Processing {filename}")
The items method is a generator that yields dictionaries of arguments for each iteration.
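The data flow of the pattern can be checked without the library: each yielded dictionary supplies the keyword arguments for one run of the task. This sketch uses standalone copies of items and process_file and drives them with a plain loop; the loop is only an approximation of what builder.each arranges, and the file names are illustrative.

```python
from typing import Iterator, List


def items(files: List[str], **_) -> Iterator[dict]:
    # One dictionary of task arguments per file.
    for file in files:
        yield {"filename": file}


def process_file(filename: str, **kwargs) -> str:
    return f"Processing {filename}"


# Extra process arguments are absorbed by **_ in the generator.
results = [
    process_file(**arguments)
    for arguments in items(files=["a.csv", "b.csv"], unused="ignored")
]
print(results)
```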
With Descriptions:
Optionally provide descriptions for each iteration:
from typing import Iterator, List, Tuple

class IterativeProcessWithDescriptions(ProcessDefinition):
    def items(self, files: List[str], **_) -> Iterator[Tuple[dict, str]]:
        for file in files:
            yield ({"filename": file}, f"Processing {file}")

    def define_process(self, builder: Builder):
        for iteration in builder.each(self.items):
            iteration.task(self.process_file)
Transformation
The Transform Pattern
Transform input arguments before executing tasks:
class TransformingProcess(ProcessDefinition):
    def transform_input(self, raw_data: str, **_) -> dict:
        return {
            "processed_data": raw_data.upper(),
        }

    def define_process(self, builder: Builder):
        with builder.transform(self.transform_input) as transformed:
            transformed.task(self.process)

    def process(self, processed_data: str, **kwargs):
        pass
The transform method receives the input arguments and returns a dictionary of transformed arguments.
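The same flow can be traced with standalone functions: the transform maps the incoming arguments to a new dictionary, and that dictionary supplies the keyword arguments of the tasks inside the block. This is a sketch of the data flow only. Whether the original arguments also remain visible to those tasks is not stated in this guide, so the example passes only the transformed ones.

```python
def transform_input(raw_data: str, **_) -> dict:
    # Unused input arguments are absorbed by **_.
    return {"processed_data": raw_data.upper()}


def process(processed_data: str, **kwargs) -> str:
    return processed_data


# Illustrative process inputs; "other" is ignored by the transform.
inputs = {"raw_data": "hello", "other": 1}

transformed = transform_input(**inputs)
output = process(**transformed)
print(transformed, output)
```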
With Descriptions:
Include descriptions for transformed sections:
from typing import Tuple

class TransformingProcessWithDescription(ProcessDefinition):
    def transform_input(self, item_id: int, **_) -> Tuple[dict, str]:
        return (
            {"processed_id": item_id * 2},
            f"Processing item {item_id}",
        )

    def define_process(self, builder: Builder):
        with builder.transform(self.transform_input) as transformed:
            transformed.task(self.process)
Sub-Processes
Nesting Processes
Include another ProcessDefinition as a sub-process:
class ValidationProcess(ProcessDefinition):
    def define_process(self, builder: Builder):
        builder.task(self.check_schema)
        builder.task(self.check_constraints)

class MainProcess(ProcessDefinition):
    def define_process(self, builder: Builder):
        builder.task(self.load_data)
        builder.sub_process(ValidationProcess)
        builder.task(self.save_data)
Sub-processes allow you to compose larger workflows from smaller, reusable process definitions.
Advanced Patterns
Combining Multiple Patterns
Complex workflows can combine multiple patterns:
from typing import Iterator, List, Tuple

class ComplexProcess(ProcessDefinition):
    # setup, prepare_batch, process_item_a, process_item_b, and finalize
    # are methods of this class, omitted here for brevity.
    def get_batches(self, items: List[str], **_) -> Iterator[Tuple[dict, str]]:
        for i in range(0, len(items), 10):
            batch = items[i:i + 10]
            yield ({"batch": batch}, f"Batch {i // 10}")

    def define_process(self, builder: Builder):
        builder.task(self.setup)

        for batch_iter in builder.each(self.get_batches):
            with batch_iter.transform(self.prepare_batch) as prepared:
                with prepared.concurrent() as b:
                    b.task(self.process_item_a)
                    b.task(self.process_item_b)

        builder.task(self.finalize)
This process splits the items into batches of up to 10, transforms each batch, and then runs two tasks concurrently for each batch.
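The batching generator is plain Python and can be checked on its own. The sketch below runs a standalone copy of get_batches over 25 made-up items and inspects the batch sizes and descriptions it yields.

```python
from typing import Iterator, List, Tuple


def get_batches(items: List[str], **_) -> Iterator[Tuple[dict, str]]:
    # Step through the list in slices of 10; the last slice may be shorter.
    for i in range(0, len(items), 10):
        batch = items[i:i + 10]
        yield ({"batch": batch}, f"Batch {i // 10}")


items = [f"item-{n}" for n in range(25)]
batches = list(get_batches(items=items))

descriptions = [description for _, description in batches]
sizes = [len(arguments["batch"]) for arguments, _ in batches]
print(descriptions, sizes)
```

With 25 items this yields three batches, and the final batch holds the five remaining items.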
Best Practices
Keep tasks focused: Each task should have a single responsibility.
Use descriptions: Add task descriptions for clarity and debugging.
Handle exceptions: Include error handling within task methods.
Leverage context: Use the Context object to access execution metadata when needed.
Organize related tasks: Use sub-processes to organize large workflows into logical units.
Document arguments: Clearly define expected arguments in expected_arguments().
Use type hints: Include type hints for better IDE support and documentation.
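As one illustration of handling exceptions inside a task, the sketch below catches a narrow exception and re-raises it with a message that names the offending input. parse_count and raw_count are hypothetical names, and how Tasktronaut treats a task that raises is not described in this guide.

```python
def parse_count(raw_count: str, **kwargs) -> int:
    """Hypothetical task body: convert a string argument to an integer."""
    try:
        return int(raw_count)
    except ValueError as exc:
        # Re-raise with context so the failing input shows up in the error.
        raise ValueError(f"count must be an integer, got {raw_count!r}") from exc


print(parse_count("3"))
```

Catching only the exception the task knows how to explain keeps unexpected failures visible instead of silently swallowing them.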