Tutorial: Distributed Tasks (E22)¶
This tutorial corresponds to the example file examples/E22_distributed_task_example.py
.
It demonstrates how to offload long-running tasks to a distributed queue like Celery or RQ. It shows how to:
- Configure a DistributedTaskQueuePlugin
for Celery or RQ.
- Use genie.task_queue.submit_task()
to send a task to a worker.
- Use genie.task_queue.get_task_status()
and genie.task_queue.get_task_result()
to monitor and retrieve results.
Note: This example focuses on the client-side API. A running Celery/RQ worker environment is required to execute the tasks.
Example Code¶
examples/E22_distributed_task_example.py¶
""" Example: Distributed Task Offloading (Conceptual)
This example outlines how one might use the Distributed Task Queue feature
with Celery or RQ. It assumes:
1. Celery/RQ and a broker (e.g., Redis) are installed.
2. A worker is running and configured to find tasks.
3. A task (e.g., execute_genie_tool_task
) is defined for the worker
that can instantiate a minimal Genie environment or directly execute a tool.
This example focuses on the Genie client-side configuration and submission. The worker-side task implementation is beyond this basic example.
To Run (Conceptual - requires worker setup):
1. Start Redis: docker run -d -p 6379:6379 redis
2. Start a Celery or RQ worker (details depend on your task definitions).
3. Run this script: poetry run python examples/E22_distributed_task_example.py
"""
import asyncio
import logging
from typing import Optional
from genie_tooling.config.features import FeatureSettings from genie_tooling.config.models import MiddlewareConfig from genie_tooling.genie import Genie
REMOTE_TOOL_EXEC_TASK_NAME = "genie_tooling.worker_tasks.execute_genie_tool_task" # Placeholder
async def run_distributed_task_demo(): print("--- Distributed Task Offloading Demo (Conceptual) ---") logging.basicConfig(level=logging.INFO) # logging.getLogger("genie_tooling").setLevel(logging.DEBUG)
# --- Configuration for Celery ---
app_config_celery = MiddlewareConfig(
features=FeatureSettings(
llm="none",
command_processor="none",
task_queue="celery",
task_queue_celery_broker_url="redis://localhost:6379/1",
task_queue_celery_backend_url="redis://localhost:6379/2",
),
distributed_task_queue_configurations={
"celery_task_queue_v1": {
"celery_app_name": "genie_example_tasks_celery",
}
}
)
# --- Configuration for RQ (Redis Queue) ---
app_config_rq = MiddlewareConfig(
features=FeatureSettings(
llm="none",
command_processor="none",
task_queue="rq",
),
distributed_task_queue_configurations={
"redis_queue_task_plugin_v1": { # Canonical ID for RQ plugin
"redis_url": "redis://localhost:6379/3", # Separate Redis DB for RQ
"default_queue_name": "genie-rq-jobs"
}
}
)
# Select the configuration to use
app_config = app_config_celery
# app_config = app_config_rq # Uncomment to test with RQ
print(f"Using task queue: {app_config.features.task_queue}")
genie: Optional[Genie] = None
try:
genie = await Genie.create(config=app_config)
print(f"Genie initialized with {app_config.features.task_queue} task queue support.")
tool_exec_params = {
"tool_id": "calculator_tool",
"tool_params": {"num1": 200, "num2": 25, "operation": "multiply"},
"context_info": {"user_id": "demo_user"}
}
task_id_tool = await genie.task_queue.submit_task(
task_name=REMOTE_TOOL_EXEC_TASK_NAME,
kwargs=tool_exec_params
)
if task_id_tool:
print(f"Tool execution task for 'calculator_tool' submitted with ID: {task_id_tool}")
result_output = "Polling for result..."
for i in range(15): # Poll for up to 15 seconds
status = await genie.task_queue.get_task_status(task_id_tool)
print(f" (Poll {i+1}) Task {task_id_tool} status: {status}")
if status == "success":
tool_result = await genie.task_queue.get_task_result(task_id_tool)
result_output = f"Tool task '{task_id_tool}' successful. Result: {tool_result}"
break
elif status in ["failure", "revoked"]:
result_output = f"Tool task '{task_id_tool}' failed or revoked. Status: {status}"
try:
error_details = await genie.task_queue.get_task_result(task_id_tool)
result_output += f" Details: {error_details}"
except Exception:
pass # Error details might not be available or might raise
break
await asyncio.sleep(1)
print(result_output)
else:
print("Failed to submit tool execution task.")
except Exception as e:
print(f"An error occurred: {e}")
logging.exception("Distributed task demo error details:")
finally:
if genie:
await genie.close()
print("\nGenie torn down.")
if name == "main": print("This example is conceptual and requires a configured Celery/RQ worker.") print("It demonstrates the client-side API usage for both.") asyncio.run(run_distributed_task_demo())