Complete API reference for schedule management operations in the Conductor Python SDK.
📚 Complete Working Example: See schedule_journey.py for a comprehensive implementation covering all schedule management APIs.
- Schedule APIs
- Schedule Execution APIs
- Schedule Tag Management APIs
- API Details
- Model Reference
- Error Handling
Core CRUD operations for managing workflow schedules.
| Method | Endpoint | Description | Example |
|---|---|---|---|
| `save_schedule()` | `POST /api/scheduler/schedules` | Create or update a schedule | Example |
| `get_schedule()` | `GET /api/scheduler/schedules/{name}` | Get a specific schedule | Example |
| `get_all_schedules()` | `GET /api/scheduler/schedules` | Get all schedules (optionally by workflow) | Example |
| `delete_schedule()` | `DELETE /api/scheduler/schedules/{name}` | Delete a schedule | Example |
Operations for controlling schedule execution state.
| Method | Endpoint | Description | Example |
|---|---|---|---|
| `pause_schedule()` | `PUT /api/scheduler/schedules/{name}/pause` | Pause a specific schedule | Example |
| `pause_all_schedules()` | `PUT /api/scheduler/schedules/pause` | Pause all schedules | Example |
| `resume_schedule()` | `PUT /api/scheduler/schedules/{name}/resume` | Resume a specific schedule | Example |
| `resume_all_schedules()` | `PUT /api/scheduler/schedules/resume` | Resume all schedules | Example |
APIs for managing and querying schedule executions.
| Method | Endpoint | Description | Example |
|---|---|---|---|
| `get_next_few_schedule_execution_times()` | `GET /api/scheduler/nextFewRuns` | Get next execution times for a cron expression | Example |
| `search_schedule_executions()` | `GET /api/scheduler/search/executions` | Search schedule execution history | Example |
| `requeue_all_execution_records()` | `POST /api/scheduler/requeue` | Requeue all execution records | Example |
Operations for managing tags associated with schedules.
| Method | Endpoint | Description | Example |
|---|---|---|---|
| `set_scheduler_tags()` | `POST /api/scheduler/schedules/{name}/tags` | Set/overwrite tags on a schedule | Example |
| `get_scheduler_tags()` | `GET /api/scheduler/schedules/{name}/tags` | Get tags for a schedule | Example |
| `delete_scheduler_tags()` | `DELETE /api/scheduler/schedules/{name}/tags` | Delete specific tags from a schedule | Example |
Create or update a workflow schedule.
from conductor.client.http.models.save_schedule_request import SaveScheduleRequest
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
# Create workflow start request
start_workflow_request = StartWorkflowRequest(
    name="order_processing",
    version=1,
    input={
        "source": "scheduled",
        "batch_size": 100
    },
    correlation_id="SCHEDULE_ORDER_BATCH"
)
# Create schedule request
# Note: Conductor uses Spring cron format (6 fields: second minute hour day month weekday)
schedule_request = SaveScheduleRequest(
    name="daily_order_processing",
    description="Process pending orders daily at midnight",
    cron_expression="0 0 0 * * ?",  # Daily at midnight (Spring cron format)
    zone_id="America/New_York",
    start_workflow_request=start_workflow_request,
    paused=False  # Schedule starts active
)
# Save the schedule
scheduler_client.save_schedule(schedule_request)

Parameters:
- `name` (str, required): Unique schedule name
- `description` (str, optional): Schedule description
- `cron_expression` (str, required): Spring cron expression (6 fields: second minute hour day month weekday)
- `zone_id` (str, optional): Time zone ID (default: UTC)
- `start_workflow_request` (StartWorkflowRequest, required): Workflow to execute
- `paused` (bool, optional): Start schedule paused (default: False)
- `schedule_start_time` (int, optional): Schedule start time (epoch millis)
- `schedule_end_time` (int, optional): Schedule end time (epoch millis)
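Both `schedule_start_time` and `schedule_end_time` are expressed in epoch milliseconds. The following is a minimal sketch of producing such values from timezone-aware datetimes; `to_epoch_millis` is a hypothetical helper written for illustration and is not part of the SDK.

```python
from datetime import datetime, timezone


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds.

    Hypothetical helper for illustration; not part of the Conductor SDK.
    """
    if dt.tzinfo is None:
        # Naive datetimes are ambiguous, so reject them instead of guessing a zone.
        raise ValueError("expected a timezone-aware datetime")
    return int(dt.timestamp() * 1000)


# Example window: all of January 2024 in UTC.
window_start = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
window_end = to_epoch_millis(datetime(2024, 2, 1, tzinfo=timezone.utc))
print(window_start, window_end)
```

The resulting integers could then be passed as `schedule_start_time` and `schedule_end_time` when building a `SaveScheduleRequest`.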
Spring Cron Format:
- Format: `second minute hour day month weekday`
- Examples:
  - `0 0 0 * * ?` - Daily at midnight
  - `0 0 * * * ?` - Every hour
  - `0 0 9 ? * MON` - Every Monday at 9 AM
  - `0 0 */2 * * ?` - Every 2 hours
  - `0 0 0,12 * * ?` - Midnight and noon
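A common mistake is passing a five-field Unix-style expression where six fields are expected. The sketch below is a client-side shape check only; `looks_like_spring_cron` is a hypothetical helper, and it does not validate the contents of each field, which remains the server's job.

```python
def looks_like_spring_cron(expression: str) -> bool:
    """Return True if the expression has exactly six whitespace-separated fields.

    Only a shape check; it says nothing about whether each field is valid.
    """
    return len(expression.split()) == 6


print(looks_like_spring_cron("0 0 0 * * ?"))  # six fields
print(looks_like_spring_cron("0 0 * * *"))    # five-field Unix-style cron
```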
Retrieve a specific schedule by name.
schedule = scheduler_client.get_schedule("daily_order_processing")
if schedule:
    print(f"Schedule: {schedule.name}")
    print(f"Cron: {schedule.cron_expression}")
    print(f"Paused: {schedule.paused}")
    print(f"Next Run: {schedule.next_execution_time}")

Returns: `WorkflowSchedule` object or `None` if not found
Retrieve all schedules, optionally filtered by workflow name.
# Get all schedules
all_schedules = scheduler_client.get_all_schedules()
# Get schedules for specific workflow
workflow_schedules = scheduler_client.get_all_schedules("order_processing")
for schedule in workflow_schedules:
    print(f"{schedule.name}: {schedule.cron_expression}")

Parameters:
- `workflow_name` (str, optional): Filter by workflow name
Returns: List of WorkflowSchedule objects
Delete a schedule by name.
scheduler_client.delete_schedule("daily_order_processing")
print("Schedule deleted successfully")

Pause a specific schedule to stop executions.
scheduler_client.pause_schedule("daily_order_processing")
print("Schedule paused")

Pause all schedules in the system.
scheduler_client.pause_all_schedules()
print("All schedules paused")

Resume a paused schedule.
scheduler_client.resume_schedule("daily_order_processing")
print("Schedule resumed")

Resume all paused schedules.
scheduler_client.resume_all_schedules()
print("All schedules resumed")

Calculate next execution times for a cron expression.
import time
from datetime import datetime

# Get next 5 execution times
next_times = scheduler_client.get_next_few_schedule_execution_times(
    cron_expression="0 0 0 * * ?",  # Daily at midnight (Spring cron)
    schedule_start_time=int(time.time() * 1000),
    schedule_end_time=None,
    limit=5
)
for timestamp in next_times:
    # Timestamps are epoch milliseconds; fromtimestamp() expects seconds
    dt = datetime.fromtimestamp(timestamp / 1000)
    print(f"Next execution: {dt}")

Parameters:
- `cron_expression` (str, required): Cron expression to evaluate
- `schedule_start_time` (int, optional): Start time in epoch millis
- `schedule_end_time` (int, optional): End time in epoch millis
- `limit` (int, optional): Number of times to return (default: 3)
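Calling `datetime.fromtimestamp()` without a time zone, as in the example above, renders the result in the local zone of the machine running the script. When comparing runs across environments, an explicit UTC conversion may be easier to reason about. The sketch below uses made-up sample timestamps in place of a real API response, and `millis_to_utc` is a hypothetical helper.

```python
from datetime import datetime, timezone


def millis_to_utc(timestamp_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)


# Made-up sample values standing in for the list returned by the API.
sample_times = [1704067200000, 1704153600000]
for ts in sample_times:
    print(millis_to_utc(ts).isoformat())
# 2024-01-01T00:00:00+00:00
# 2024-01-02T00:00:00+00:00
```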
Search schedule execution history with filtering and pagination.
# Search recent executions
results = scheduler_client.search_schedule_executions(
    start=0,
    size=20,
    sort="startTime:DESC",
    free_text="order",
    query="scheduleName='daily_order_processing' AND status='COMPLETED'"
)
print(f"Total executions: {results.total_hits}")
for execution in results.results:
    print(f"Execution: {execution.workflow_id} - {execution.status}")

Parameters:
- `start` (int, optional): Start index for pagination (default: 0)
- `size` (int, optional): Number of results (default: 100)
- `sort` (str, optional): Sort field and order (e.g., "startTime:DESC")
- `free_text` (str, optional): Free text search
- `query` (str, optional): Query DSL for filtering
Returns: SearchResultWorkflowScheduleExecutionModel with results and metadata
Requeue all execution records for retry.
scheduler_client.requeue_all_execution_records()
print("All execution records requeued")

Set or overwrite all tags on a schedule.
from conductor.client.orkes.models.metadata_tag import MetadataTag
tags = [
    MetadataTag("environment", "production"),
    MetadataTag("priority", "high"),
    MetadataTag("team", "backend")
]
scheduler_client.set_scheduler_tags(tags, "daily_order_processing")
print("Tags set successfully")

Note: This overwrites all existing tags.
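Since setting tags overwrites whatever is already there, adding one tag while keeping the rest requires a read, a merge, and a write. The sketch below covers only the merge step. `Tag` is a stand-in for `MetadataTag` with the same `key` and `value` attributes, `merge_tags` is a hypothetical helper, and the logic assumes at most one value per key.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Tag:
    """Stand-in for MetadataTag with the same key/value attributes."""
    key: str
    value: str


def merge_tags(existing: Iterable[Tag], updates: Iterable[Tag]) -> List[Tag]:
    """Combine tags by key; a key in updates replaces the existing value.

    Assumes one value per key, which may not match every deployment.
    """
    merged = {tag.key: tag for tag in existing}
    for tag in updates:
        merged[tag.key] = tag
    return list(merged.values())


current = [Tag("environment", "production"), Tag("priority", "high")]
combined = merge_tags(current, [Tag("priority", "low"), Tag("team", "backend")])
print([(t.key, t.value) for t in combined])
```

In practice the `existing` list would come from `get_scheduler_tags()` and the merged list would be passed to `set_scheduler_tags()`.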
Retrieve all tags for a schedule.
tags = scheduler_client.get_scheduler_tags("daily_order_processing")
for tag in tags:
    print(f"{tag.key}: {tag.value}")

Returns: List of `MetadataTag` objects
Delete specific tags from a schedule.
tags_to_delete = [
    MetadataTag("priority", "high"),
    MetadataTag("team", "backend")
]
remaining_tags = scheduler_client.delete_scheduler_tags(
    tags_to_delete,
    "daily_order_processing"
)
print(f"Remaining tags: {len(remaining_tags)}")

Returns: List of remaining `MetadataTag` objects
Request model for creating/updating schedules.
class SaveScheduleRequest:
    name: str  # Unique schedule name
    description: Optional[str]  # Schedule description
    cron_expression: str  # Spring cron expression (6 fields)
    zone_id: Optional[str] = "UTC"  # Time zone
    start_workflow_request: StartWorkflowRequest  # Workflow to execute
    paused: Optional[bool] = False  # Start paused
    schedule_start_time: Optional[int]  # Start time (epoch millis)
    schedule_end_time: Optional[int]  # End time (epoch millis)

Schedule configuration and status.
class WorkflowSchedule:
    name: str  # Schedule name
    cron_expression: str  # Spring cron expression
    zone_id: str  # Time zone
    paused: bool  # Pause status
    enabled: bool  # Enable status
    start_workflow_request: dict  # Workflow configuration
    created_time: int  # Creation time (epoch millis)
    updated_time: int  # Last update time
    next_execution_time: Optional[int]  # Next run time
    schedule_start_time: Optional[int]  # Schedule start
    schedule_end_time: Optional[int]  # Schedule end

Workflow execution request.
class StartWorkflowRequest:
    name: str  # Workflow name
    version: Optional[int]  # Workflow version
    input: Optional[dict]  # Input parameters
    correlation_id: Optional[str]  # Correlation ID
    task_to_domain: Optional[dict]  # Task domain mapping
    workflow_def: Optional[WorkflowDef]  # Inline workflow definition
    priority: Optional[int] = 0  # Execution priority

Search results for schedule executions.
class SearchResultWorkflowScheduleExecutionModel:
    results: List[WorkflowScheduleExecution]  # Execution records
    total_hits: int  # Total matching records

try:
    schedule = scheduler_client.get_schedule("non_existent")
except Exception as e:
    if "404" in str(e):
        print("Schedule not found")
    else:
        print(f"Error: {e}")
# Validation errors
try:
    schedule_request = SaveScheduleRequest(
        name="invalid",
        cron_expression="invalid_cron",  # Invalid cron
        start_workflow_request=start_workflow_request
    )
    scheduler_client.save_schedule(schedule_request)
except ValueError as e:
    print(f"Validation error: {e}")
# Permission errors
try:
    scheduler_client.delete_schedule("system_schedule")
except PermissionError as e:
    print(f"Permission denied: {e}")
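Matching on the string "404", as in the first error-handling example, can misfire when the digits appear elsewhere in a message. The sketch below prefers a numeric status when one is available. That the exception exposes a `status` attribute is an assumption here, not something this reference confirms; `FakeApiError` and `is_not_found` are hypothetical names used only for illustration.

```python
class FakeApiError(Exception):
    """Stand-in exception carrying an HTTP status, used only for this sketch."""

    def __init__(self, status: int, message: str = ""):
        super().__init__(f"({status}) {message}")
        self.status = status


def is_not_found(error: Exception) -> bool:
    """Return True if the error looks like an HTTP 404.

    Assumes the exception may expose a numeric `status` attribute and falls
    back to a string match when it does not.
    """
    status = getattr(error, "status", None)
    if status is not None:
        return status == 404
    return "404" in str(error)


print(is_not_found(FakeApiError(404, "Schedule not found")))
print(is_not_found(FakeApiError(500, "id 4041 failed")))
print(is_not_found(RuntimeError("404 Not Found")))
```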
- Schedule Naming:
  - Use descriptive, unique names
  - Include frequency/purpose in name
  - Follow naming conventions
- Cron Expressions:
  - Test expressions before deployment
  - Use `get_next_few_schedule_execution_times()` to verify
  - Consider time zones carefully
- Error Recovery:
  - Monitor execution history regularly
  - Use `search_schedule_executions()` for debugging
  - Implement workflow error handling
- Tagging Strategy:
  - Tag by environment (dev/staging/prod)
  - Tag by team/owner
  - Tag by priority/criticality
For a comprehensive example covering all schedule management APIs with proper error handling and best practices, see schedule_journey.py.
# Quick example
from conductor.client.orkes.orkes_scheduler_client import OrkesSchedulerClient
from conductor.client.configuration.configuration import Configuration
config = Configuration(server_api_url="http://localhost:8080/api")
scheduler = OrkesSchedulerClient(config)
# Create, manage, and monitor schedules
# Full implementation in examples/schedule_journey.py

- Workflow Management - Creating workflows to schedule
- Metadata Management - Task and workflow definitions
- Authorization - Permission management for schedules
- Examples - Complete working examples