Using Distributed Task Queues (genie.task_queue
)¶
Genie Tooling supports offloading tasks, such as long-running tool executions, to distributed task queues like Celery or RQ. This allows your main agent loop to remain responsive.
Core Concepts¶
TaskQueueInterface
(genie.task_queue
): Facade for submitting and managing distributed tasks.DistributedTaskQueuePlugin
: Plugin for a specific task queue system.- Built-in:
CeleryTaskQueuePlugin
(alias:celery
),RedisQueueTaskPlugin
(alias:rq
).
- Built-in:
Configuration¶
Enable and configure your chosen task queue system via FeatureSettings
.
from genie_tooling.config.models import MiddlewareConfig
from genie_tooling.config.features import FeatureSettings
# Example for Celery
app_config_celery = MiddlewareConfig(
features=FeatureSettings(
task_queue="celery",
task_queue_celery_broker_url="redis://localhost:6379/1", # Your Celery broker
task_queue_celery_backend_url="redis://localhost:6379/2", # Your Celery result backend
)
)
# Example for RQ (Redis Queue)
app_config_rq = MiddlewareConfig(
features=FeatureSettings(
task_queue="rq"
),
# RQ plugin specific config
distributed_task_queue_configurations={
"redis_queue_task_plugin_v1": { # Canonical ID
"redis_url": "redis://localhost:6379/3", # Redis for RQ
"default_queue_name": "genie-rq-default"
}
}
)
Using genie.task_queue
¶
This interface allows you to directly submit tasks to the configured queue. You must have tasks defined and registered with your Celery/RQ worker environment.
# Assuming 'genie' is initialized with a task queue configured
# and you have a worker task defined as 'my_project.tasks.long_computation'
task_id = await genie.task_queue.submit_task(
task_name="my_project.tasks.long_computation",
args=(10, 20),
kwargs={"operation": "multiply"},
task_options={"countdown": 5} # Example: Celery-specific option
)
if task_id:
print(f"Task submitted with ID: {task_id}")
# Poll for status and result (example polling loop)
for _ in range(30):
status = await genie.task_queue.get_task_status(task_id)
print(f"Task {task_id} status: {status}")
if status == "success":
result = await genie.task_queue.get_task_result(task_id)
print(f"Task result: {result}")
break
elif status in ["failure", "revoked"]:
print(f"Task failed or was revoked.")
break
await asyncio.sleep(1)
Note on Worker Implementation: A common pattern is to create a generic worker task that can initialize a minimal Genie environment to execute tools. This keeps your web-facing application lightweight while offloading heavy work.
Note on Security and Context: Securely passing API keys and sensitive context to distributed workers requires careful design. Options include workers fetching credentials themselves from a secure vault or using short-lived, task-specific credentials. The direct genie.task_queue.submit_task
method gives you full control over what gets sent to the worker.