Simple, linear scripts are great for basic automation, but real-world business processes are rarely that straightforward. They involve parallel tasks, conditional logic, human approvals, and the need to recover from failure. When your "simple script" starts to look like a tangled web of if/else statements, try/except blocks, and complex state management, it's time for a better tool.
Enter the .do Agentic Workflow Platform. With our official Python SDK, you can move beyond basic API calls and start orchestrating sophisticated, resilient, and scalable business logic directly within your Python applications. It provides a simple, elegant interface to define complex processes as code.
Let's explore five powerful workflow patterns you can implement today to solve complex business problems.
The Pattern: Instead of processing items in a list one by one, you "fan out" by starting a concurrent task for each item. Once all tasks are complete, the workflow "fans in" to aggregate the results and continue. This dramatically reduces total execution time.
Real-World Example: You need to enrich a list of 100 new user signups by calling three different external APIs for each user (e.g., Clearbit for company data, Hunter for email validation, and an internal fraud detection service). Doing this sequentially would be incredibly slow.
With the .do Python SDK: You can trigger a child workflow for each user, run the three API calls in parallel within that workflow, and then wait for all 100 enrichment tasks to complete before generating a final report.
from do_sdk import DoClient, DoException

# Initialize the client with your API key
do_client = DoClient(api_key="your_do_api_key")

def enrich_new_users(user_list: list):
    try:
        # Define a parent workflow that fans out tasks.
        # Each user object is passed as input to a parallel child workflow.
        result = do_client.workflows.run(
            name="user-enrichment-parent-workflow",
            input={
                "users": user_list,
                "child_workflow": "enrich-single-user-sub-workflow"
            },
            wait_for_result=True  # Fan-in: wait for all parallel tasks to complete
        )
        print("All users enriched successfully!")
        return result.output
    except DoException as e:
        print(f"Workflow failed: {e}")
Benefit: Massive speed improvements and enhanced efficiency for batch processing tasks.
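For intuition, here is the same fan-out/fan-in shape implemented locally with Python's standard `concurrent.futures` module. The `enrich_user` function is a stand-in for the three per-user API calls; in the .do pattern, the platform runs each task as a durable child workflow instead of a local thread.

```python
import concurrent.futures

def enrich_user(user: dict) -> dict:
    # Stand-in for the three enrichment API calls
    # (company data, email validation, fraud check).
    return {**user, "enriched": True}

def fan_out_fan_in(users: list) -> list:
    # Fan out: one concurrent task per user.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        # pool.map preserves input order, which makes fan-in trivial.
        results = list(pool.map(enrich_user, users))
    # Fan in: all results are aggregated here before the caller continues.
    return results
```

Unlike local threads, the platform version survives process restarts and scales the fan-out beyond a single machine.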
The Pattern: Many critical workflows require human judgment. A human-in-the-loop (HITL) pattern allows a workflow to pause its execution, assign a task to a person or team, and wait for their input (like an approval or data entry) before proceeding.
Real-World Example: An employee submits an expense report over $1,000. The workflow automatically pauses and creates an approval task for their manager in a web portal or Slack. The workflow only resumes to process the reimbursement after the manager clicks "Approve."
With the .do Python SDK: You can design workflows with explicit approval steps that halt execution and expose a secure callback URL.
# A conceptual workflow definition (managed in the .do platform)
# - Step 1: Ingest expense data
# - Step 2: If amount > 1000, start approval
# - Step 3: Wait for human input (e.g., manager clicks "approve")
# - Step 4: Continue to payment processing
def submit_expense_report(report_data: dict):
    """Triggers the expense approval workflow."""
    try:
        run = do_client.workflows.run(
            name="expense-approval-workflow",
            input={"report": report_data}
        )
        print(f"Expense report submitted. Workflow Run ID: {run.run_id}")
        # The workflow will now run and pause at the approval step if necessary
    except DoException as e:
        print(f"Failed to submit report: {e}")
Benefit: Safely integrate manual decision-making into fully automated processes without complex polling or webhook management.
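If your application needs to know when the human step has completed, one option is to poll the run's status. The sketch below assumes a hypothetical `workflows.get_run` method and status names; check the SDK reference for the actual API before relying on them.

```python
import time

def wait_for_approval(do_client, run_id: str, poll_seconds: int = 30):
    """Poll a paused run until its human approval step completes.

    Assumes a hypothetical `workflows.get_run` method returning an
    object with `status` ("running", "completed", "failed") and
    `output` -- illustrative names, not confirmed SDK fields.
    """
    while True:
        run = do_client.workflows.get_run(run_id)
        if run.status == "completed":
            return run.output
        if run.status == "failed":
            raise RuntimeError(f"Run {run_id} failed")
        time.sleep(poll_seconds)
```

In practice the platform's callback URL is the cleaner option; polling is shown only as a fallback for environments that cannot receive callbacks.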
The Pattern: Workflows that can intelligently alter their path based on the data they are processing. This is the foundation of creating truly smart and responsive automation.
Real-World Example: A customer support ticket is created. The workflow inspects the ticket's priority level. If it's "Urgent," it's routed to the senior support channel on Slack and a PagerDuty incident is created. If it's "Low," it's simply added to the support team's general queue.
With the .do Python SDK: The logic is defined within your workflow on the .do platform, and your Python code simply triggers it with the right data.
def route_support_ticket(ticket_details: dict):
    """
    Triggers a workflow that uses conditional logic to route a support ticket.
    """
    try:
        run = do_client.workflows.run(
            name="support-ticket-router-workflow",
            input={"ticket": ticket_details}
        )
        print(f"Ticket routing initiated. Run ID: {run.run_id}")
    except DoException as e:
        print(f"Failed to route ticket: {e}")
Benefit: Build sophisticated, rule-based systems that adapt to changing inputs, reducing the need for hardcoded logic in your application code.
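For contrast, here is what the same routing looks like as hardcoded application logic, which is exactly what moving the branching into the workflow lets you stop maintaining. The action names are illustrative, not real .do step identifiers.

```python
def route_ticket_locally(ticket: dict) -> list:
    """Hardcoded routing logic that the workflow pattern replaces.

    Returns the list of actions to perform for a given ticket;
    the action names are illustrative placeholders.
    """
    priority = ticket.get("priority", "low").lower()
    if priority == "urgent":
        # Escalate: senior support channel plus an incident.
        return ["notify-senior-support-slack", "create-pagerduty-incident"]
    # Default path: low-priority tickets go to the general queue.
    return ["add-to-general-queue"]
```

Every new routing rule in this style means another deploy of your application; in the workflow version, the rules change on the platform without touching your code.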
The Pattern: Some processes take days, weeks, or even months to complete, far exceeding the lifespan of a typical serverless function or script. A stateful workflow engine reliably keeps track of progress, timers, and data over long durations.
Real-World Example: A 30-day trial user onboarding sequence. The workflow sends a welcome email on Day 1, a feature-discovery email on Day 3, checks for a specific user action on Day 7, and sends a conversion offer on Day 25. The workflow must reliably "sleep" and wake up at the correct times.
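The schedule above can be expressed as plain data, which makes the durable "sleep" intervals the engine must track explicit. This dict shape is illustrative only, not the platform's actual workflow definition syntax.

```python
# Illustrative shape only -- the real definition lives on the .do platform.
ONBOARDING_STEPS = [
    {"day": 1,  "action": "send-welcome-email"},
    {"day": 3,  "action": "send-feature-discovery-email"},
    {"day": 7,  "action": "check-user-activation"},
    {"day": 25, "action": "send-conversion-offer"},
]

def delays_between_steps(steps: list) -> list:
    """Days the engine must durably 'sleep' between consecutive steps."""
    days = [s["day"] for s in steps]
    return [later - earlier for earlier, later in zip(days, days[1:])]
```

Those multi-day gaps (2, 4, and 18 days here) are precisely what a serverless function or plain script cannot survive, and what the platform's durable timers handle.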
With the .do Python SDK: The platform manages the state, timers, and durability. Your application code just needs to kick off the process.
def start_user_onboarding_trial(user_id: str, email: str):
    """
    Starts a 30-day onboarding workflow that includes multiple timed delays.
    """
    try:
        # This workflow is defined with "sleep" steps (e.g., wait 3 days)
        run = do_client.workflows.run(
            name="30-day-trial-onboarding",
            input={"userId": user_id, "userEmail": email}
        )
        print(f"Onboarding sequence started for {email}. Run ID: {run.run_id}")
    except DoException as e:
        print(f"Failed to start onboarding: {e}")
Benefit: Reliably orchestrate long-running processes without building and maintaining your own complex state machine, databases, and cron job systems.
The Pattern: External systems fail. Networks glitch. A robust workflow must anticipate these transient errors and handle them gracefully, often by retrying the failed step with an exponential backoff strategy.
Real-World Example: A workflow needs to charge a customer's credit card via the Stripe API. If the API returns a 503 Service Unavailable error, the workflow shouldn't fail immediately. Instead, it should wait 5 seconds and retry. If it fails again, it waits 15 seconds, and so on, before finally marking the step as failed and triggering an alert.
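To make the backoff schedule concrete, here is a plain-Python sketch of what the platform's retry policy automates for you. The helper is illustrative; `operation` is any zero-argument callable that raises on a transient failure.

```python
import time

def call_with_backoff(operation, max_attempts: int = 5,
                      initial_interval: float = 5.0,
                      multiplier: float = 3.0):
    """Retry `operation` with exponential backoff: 5s, 15s, 45s, ...

    Illustrative sketch of a retry policy; the .do platform applies
    this per-step so your code never has to.
    """
    delay = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise  # Retries exhausted: surface the error and alert.
            time.sleep(delay)
            delay *= multiplier
```

Writing, testing, and monitoring loops like this for every external call is exactly the boilerplate a declarative retry policy removes.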
With the .do Python SDK: You configure retry policies directly within your workflow's step definitions on the .do platform, making your process instantly more resilient.
# A conceptual workflow definition on the .do platform for a single step
# - Step: "charge-credit-card"
# - Action: Call Stripe API
# - Retry Policy:
# - Max Attempts: 5
# - Strategy: Exponential Backoff
# - Initial Interval: 5 seconds
# - Catchable Errors: [500, 503, 'timeout']
def trigger_payment_workflow(payment_details: dict):
    """Triggers a payment workflow with built-in retry logic."""
    try:
        run = do_client.workflows.run(
            name="resilient-payment-workflow",
            input={"payment": payment_details}
        )
        print(f"Payment workflow started. Run ID: {run.run_id}")
    except DoException as e:
        print(f"Workflow could not be started: {e}")
Benefit: Build highly reliable systems that can automatically recover from common, temporary failures, improving stability and reducing manual interventions.
The .do Python SDK empowers you to treat complex business logic as a first-class citizen of your application. By combining these powerful patterns, you can orchestrate everything from simple API integrations to mission-critical, long-running agentic processes with confidence and clarity.
Ready to build more powerful workflows?