Skip to content

API Reference

This section contains the auto-generated API documentation for Genie Tooling.

Explore the modules and classes below for detailed information.

Core

Configuration

Plugins (Protocols & Implementations)

This section can be expanded by mkdocstrings or by manually linking to specific plugin protocol documentation pages. Key plugin categories include: * Tools * LLM Providers * Command Processors * RAG Components (Loaders, Splitters, Embedders, Vector Stores, Retrievers) * Tool Lookup Providers * Definition Formatters * Caching Providers * Key Providers * Invocation Strategies * Error Handling * Logging & Redaction * Code Executors * Prompt System Plugins (Registries, Templates) * Conversation State Providers * Observability Tracers * HITL Approvers * Token Usage Recorders * Guardrail Plugins * LLM Output Parsers


genie_tooling

genie-tooling

A hyper-pluggable Python middleware for Agentic AI and LLM applications. Async-first for performance.

Classes

BaseAgent

BaseAgent(genie: Genie, agent_config: Optional[Dict[str, Any]] = None)

Bases: ABC

Abstract base class for agents that implement specific execution patterns (e.g., ReAct, Plan-and-Execute) using the Genie facade.

Initializes the BaseAgent.

Parameters:

Name Type Description Default
genie Genie

An initialized instance of the Genie facade.

required
agent_config Optional[Dict[str, Any]]

Optional dictionary for agent-specific configurations.

None
Source code in src/genie_tooling/agents/base_agent.py
def __init__(self, genie: "Genie", agent_config: Optional[Dict[str, Any]] = None):
    """
    Initializes the BaseAgent.

    Args:
        genie: An initialized instance of the Genie facade.
        agent_config: Optional dictionary for agent-specific configurations.
    """
    if not genie:
        raise ValueError("A Genie instance is required to initialize an agent.")
    self.genie = genie
    self.agent_config = agent_config or {}
    logger.info(f"Initialized {self.__class__.__name__} with Genie instance and config: {self.agent_config}")
Functions
run abstractmethod async
run(goal: str, **kwargs: Any) -> Any

The main entry point to execute the agent's logic for a given goal.

Parameters:

Name Type Description Default
goal str

The high-level goal or task for the agent to accomplish.

required
**kwargs Any

Additional runtime parameters specific to the agent's execution.

{}

Returns:

Type Description
Any

The final result or outcome of the agent's execution.

Any

The structure of the result is specific to the agent implementation.

Source code in src/genie_tooling/agents/base_agent.py
@abc.abstractmethod
async def run(self, goal: str, **kwargs: Any) -> Any:
    """
    The main entry point to execute the agent's logic for a given goal.

    Args:
        goal: The high-level goal or task for the agent to accomplish.
        **kwargs: Additional runtime parameters specific to the agent's execution.

    Returns:
        The final result or outcome of the agent's execution.
        The structure of the result is specific to the agent implementation.
    """
    pass
teardown async
teardown() -> None

Optional method for any cleanup specific to the agent. The Genie instance itself should be torn down separately by the application.

Source code in src/genie_tooling/agents/base_agent.py
async def teardown(self) -> None:
    """
    Optional method for any cleanup specific to the agent.
    The Genie instance itself should be torn down separately by the application.
    """
    logger.info(f"{self.__class__.__name__} teardown initiated.")
    # Add any agent-specific cleanup here if needed in the future.
    pass

PlanAndExecuteAgent

PlanAndExecuteAgent(genie: Genie, agent_config: Optional[Dict[str, Any]] = None)

Bases: BaseAgent

Implements the Plan-and-Execute agentic loop. 1. Planner: LLM generates a sequence of steps (tool calls) to achieve the goal. Steps can name their outputs for use in subsequent steps. 2. Executor: Executes these steps sequentially, resolving placeholders.

Source code in src/genie_tooling/agents/plan_and_execute_agent.py
def __init__(self, genie: "Genie", agent_config: Optional[Dict[str, Any]] = None):
    super().__init__(genie, agent_config)
    self.planner_system_prompt_id = self.agent_config.get("planner_system_prompt_id", DEFAULT_PLANNER_SYSTEM_PROMPT_ID)
    self.planner_llm_provider_id = self.agent_config.get("planner_llm_provider_id")
    self.tool_formatter_id = self.agent_config.get("tool_formatter_id", "compact_text_formatter_plugin_v1")
    self.max_plan_retries = self.agent_config.get("max_plan_retries", 1)
    self.max_step_retries = self.agent_config.get("max_step_retries", 0)
    self.replan_on_step_failure = self.agent_config.get("replan_on_step_failure", False)

    logger.info(
        f"PlanAndExecuteAgent initialized. Planner Prompt ID: {self.planner_system_prompt_id}, "
        f"Planner LLM: {self.planner_llm_provider_id or 'Genie Default'}"
    )

ReActAgent

ReActAgent(genie: Genie, agent_config: Optional[Dict[str, Any]] = None)

Bases: BaseAgent

Implements the ReAct (Reason-Act) agentic loop.

Source code in src/genie_tooling/agents/react_agent.py
def __init__(self, genie: "Genie", agent_config: Optional[Dict[str, Any]] = None):
    super().__init__(genie, agent_config)
    self.max_iterations = self.agent_config.get("max_iterations", DEFAULT_REACT_MAX_ITERATIONS)
    self.system_prompt_id = self.agent_config.get("system_prompt_id", DEFAULT_REACT_SYSTEM_PROMPT_ID)
    self.llm_provider_id = self.agent_config.get("llm_provider_id")
    self.tool_formatter_id = self.agent_config.get("tool_formatter_id", "compact_text_formatter_plugin_v1")
    self.stop_sequences = self.agent_config.get("stop_sequences", ["Observation:"])
    self.llm_retry_attempts = self.agent_config.get("llm_retry_attempts", 1)
    self.llm_retry_delay = self.agent_config.get("llm_retry_delay_seconds", 2.0)

    logger.info(
        f"ReActAgent initialized. Max iterations: {self.max_iterations}, "
        f"System Prompt ID: {self.system_prompt_id}, LLM Provider: {self.llm_provider_id or 'Genie Default'}"
    )

AgentOutput

Bases: TypedDict

Standardized output structure from an agent's run method.

PlannedStep

Bases: TypedDict

Represents a single step in a generated plan.

ReActObservation

Bases: TypedDict

Represents one cycle of Thought-Action-Observation in ReAct.

CacheProviderPlugin

Bases: Plugin, Protocol

Protocol for a cache provider, designed for async operations.

Functions
get async
get(key: str) -> Optional[Any]

Retrieves an item from the cache. Args: key: The key of the item to retrieve. Returns: The cached item, or None if the key is not found or item is expired.

Source code in src/genie_tooling/cache_providers/abc.py
async def get(self, key: str) -> Optional[Any]:
    """
    Retrieves an item from the cache.
    Args:
        key: The key of the item to retrieve.
    Returns:
        The cached item, or None if the key is not found or item is expired.
    """
    logger.warning(f"CacheProvider '{self.plugin_id}' get method not fully implemented.")
    return None
set async
set(key: str, value: Any, ttl_seconds: Optional[int] = None) -> None

Stores an item in the cache. Args: key: The key under which to store the item. value: The item to store. Should be serializable if cache is external. ttl_seconds: Optional time-to-live in seconds. If None, item may persist indefinitely or use a default TTL defined by the cache provider.

Source code in src/genie_tooling/cache_providers/abc.py
async def set(self, key: str, value: Any, ttl_seconds: Optional[int] = None) -> None:
    """
    Stores an item in the cache.
    Args:
        key: The key under which to store the item.
        value: The item to store. Should be serializable if cache is external.
        ttl_seconds: Optional time-to-live in seconds. If None, item may persist indefinitely
                     or use a default TTL defined by the cache provider.
    """
    logger.warning(f"CacheProvider '{self.plugin_id}' set method not fully implemented.")
    pass
delete async
delete(key: str) -> bool

Deletes an item from the cache. Args: key: The key of the item to delete. Returns: True if the key existed and was deleted, False otherwise.

Source code in src/genie_tooling/cache_providers/abc.py
async def delete(self, key: str) -> bool:
    """
    Deletes an item from the cache.
    Args:
        key: The key of the item to delete.
    Returns:
        True if the key existed and was deleted, False otherwise.
    """
    logger.warning(f"CacheProvider '{self.plugin_id}' delete method not fully implemented.")
    return False
exists async
exists(key: str) -> bool

Checks if a key exists in the cache (and is not expired). Args: key: The key to check. Returns: True if the key exists and is valid, False otherwise.

Source code in src/genie_tooling/cache_providers/abc.py
async def exists(self, key: str) -> bool:
    """
    Checks if a key exists in the cache (and is not expired).
    Args:
        key: The key to check.
    Returns:
        True if the key exists and is valid, False otherwise.
    """
    # Default implementation using get()
    logger.debug(f"CacheProvider '{self.plugin_id}' exists method using default get() check.")
    return await self.get(key) is not None
clear_all async
clear_all() -> bool

Clears all items from the cache managed by this provider. Use with caution, especially for shared caches. Returns: True if the clear operation was successful or attempted, False on failure.

Source code in src/genie_tooling/cache_providers/abc.py
async def clear_all(self) -> bool:
    """
    Clears all items from the cache managed by this provider.
    Use with caution, especially for shared caches.
    Returns:
        True if the clear operation was successful or attempted, False on failure.
    """
    logger.warning(f"CacheProvider '{self.plugin_id}' clear_all method not fully implemented.")
    return False

CodeExecutionResult

Bases: NamedTuple

Standardized result structure for code execution.

CodeExecutorPlugin

Bases: Plugin, Protocol

Protocol for a plugin that executes code, ideally in a sandboxed environment.

Functions
execute_code async
execute_code(
    language: str,
    code: str,
    timeout_seconds: int,
    input_data: Optional[Dict[str, Any]] = None,
) -> CodeExecutionResult

Asynchronously executes the provided code string in the specified language. Implementations should strive for secure execution (sandboxing). Args: language: The programming language of the code. code: The code script to execute. timeout_seconds: Maximum allowed execution time in seconds. input_data: Optional dictionary to make available as variables within the code's scope (e.g., as a global dict named _input or similar, executor-defined). Returns: A CodeExecutionResult NamedTuple.

Source code in src/genie_tooling/code_executors/abc.py
async def execute_code(
    self,
    language: str,
    code: str,
    timeout_seconds: int,
    input_data: Optional[Dict[str, Any]] = None # For passing structured input to the code's execution scope
) -> CodeExecutionResult:
    """
    Asynchronously executes the provided code string in the specified language.
    Implementations should strive for secure execution (sandboxing).
    Args:
        language: The programming language of the code.
        code: The code script to execute.
        timeout_seconds: Maximum allowed execution time in seconds.
        input_data: Optional dictionary to make available as variables within the code's scope
                    (e.g., as a global dict named `_input` or similar, executor-defined).
    Returns:
        A CodeExecutionResult NamedTuple.
    """
    logger.warning(f"CodeExecutor '{self.plugin_id}' execute_code method not fully implemented.")
    return CodeExecutionResult(stdout="", stderr="Not implemented", result=None, error="Executor not implemented", execution_time_ms=0.0)

CommandProcessorPlugin

Bases: Plugin, Protocol

Protocol for a plugin that processes a natural language command to determine which tool to use and what parameters to pass to it.

Functions
setup async
setup(config: Optional[Dict[str, Any]]) -> None

Initializes the command processor. Args: config: Processor-specific configuration dictionary. Expected to contain 'genie_facade: Genie' for accessing other middleware components.

Source code in src/genie_tooling/command_processors/abc.py
async def setup(self, config: Optional[Dict[str, Any]]) -> None:
    """
    Initializes the command processor.
    Args:
        config: Processor-specific configuration dictionary. Expected to contain
                'genie_facade: Genie' for accessing other middleware components.
    """
    await super().setup(config)
    logger.debug(f"CommandProcessorPlugin '{self.plugin_id}': Base setup logic (if any) completed.")
process_command async
process_command(
    command: str,
    conversation_history: Optional[List[ChatMessage]] = None,
    correlation_id: Optional[str] = None,
) -> CommandProcessorResponse

Processes the given command, potentially using conversation history and other Genie components (via the facade provided in setup), to decide on a tool and its parameters.

Parameters:

Name Type Description Default
command str

The natural language command string from the user.

required
conversation_history Optional[List[ChatMessage]]

Optional list of previous ChatMessages in the conversation.

None
correlation_id Optional[str]

Optional ID to link related trace events.

None

Returns:

Type Description
CommandProcessorResponse

A CommandProcessorResponse dictionary.

Source code in src/genie_tooling/command_processors/abc.py
async def process_command(
    self,
    command: str,
    conversation_history: Optional[List[ChatMessage]] = None,
    correlation_id: Optional[str] = None
) -> CommandProcessorResponse:
    """
    Processes the given command, potentially using conversation history and
    other Genie components (via the facade provided in setup), to decide
    on a tool and its parameters.

    Args:
        command: The natural language command string from the user.
        conversation_history: Optional list of previous ChatMessages in the conversation.
        correlation_id: Optional ID to link related trace events.

    Returns:
        A CommandProcessorResponse dictionary.
    """
    logger.error(f"CommandProcessorPlugin '{self.plugin_id}' process_command method not implemented.")
    raise NotImplementedError(f"CommandProcessorPlugin '{self.plugin_id}' does not implement 'process_command'.")

CommandProcessorResponse

Bases: TypedDict

Standardized response from a CommandProcessorPlugin.

PluginManager

PluginManager(plugin_dev_dirs: Optional[List[str]] = None)

Manages discovery, loading, and access to plugins. Implements Hybrid: Entry Points + Configured Dev Directory discovery.

Source code in src/genie_tooling/core/plugin_manager.py
def __init__(self, plugin_dev_dirs: Optional[List[str]] = None):
    self.plugin_dev_dirs = [Path(p).resolve() for p in plugin_dev_dirs] if plugin_dev_dirs else []
    self._plugin_instances: Dict[str, Plugin] = {}
    self._discovered_plugin_classes: Dict[str, Type[Plugin]] = {}
    self._plugin_source_map: Dict[str, str] = {}
    logger.debug(f"PluginManager initialized. Dev dirs: {self.plugin_dev_dirs}")

Chunk

Bases: Protocol

Represents a chunk of a document after splitting.

Document

Bases: Protocol

Represents a loaded document before splitting.

Plugin

Bases: Protocol

Base protocol for all plugins.

Attributes
plugin_id property
plugin_id: str

A unique string identifier for this plugin instance/type.

Functions
setup async
setup(config: Optional[Dict[str, Any]] = None) -> None

Optional asynchronous setup method for plugins.

This method is called by the PluginManager after a plugin is instantiated. It is the primary mechanism for a plugin to receive its configuration.

Parameters:

Name Type Description Default
config Optional[Dict[str, Any]]

A dictionary containing the specific configuration for this plugin instance. This dictionary is sourced from the relevant *_configurations dictionary in MiddlewareConfig, keyed by this plugin's plugin_id.

None
Source code in src/genie_tooling/core/types.py
async def setup(self, config: Optional[Dict[str, Any]] = None) -> None:
    """
    Optional asynchronous setup method for plugins.

    This method is called by the PluginManager after a plugin is instantiated.
    It is the primary mechanism for a plugin to receive its configuration.

    Args:
        config: A dictionary containing the specific configuration for this
            plugin instance. This dictionary is sourced from the relevant
            `*_configurations` dictionary in `MiddlewareConfig`, keyed by this
            plugin's `plugin_id`.
    """
    pass
teardown async
teardown() -> None

Optional asynchronous teardown method for plugins. Called before application shutdown.

Source code in src/genie_tooling/core/types.py
async def teardown(self) -> None:
    """Optional asynchronous teardown method for plugins. Called before application shutdown."""
    pass

RetrievedChunk

Bases: Chunk, Protocol

Represents a chunk retrieved from a vector store, with a relevance score.

StructuredError

Bases: TypedDict

Standardized structure for reporting errors, especially to LLMs.

DefinitionFormatterPlugin

Bases: Plugin, Protocol

Protocol for a plugin that formats a tool's metadata into a specific structure (e.g., for LLMs, for human readability).

Functions
format
format(tool_metadata: Dict[str, Any]) -> Any

Takes the comprehensive metadata from Tool.get_metadata() and transforms it into the specific output format.

Parameters:

Name Type Description Default
tool_metadata Dict[str, Any]

The raw metadata dictionary from a Tool instance.

required

Returns:

Type Description
Any

The formatted definition (e.g., a dict for JSON, a string for text).

Any

The type Any allows flexibility for various output formats.

Source code in src/genie_tooling/definition_formatters/abc.py
def format(self, tool_metadata: Dict[str, Any]) -> Any:
    """
    Takes the comprehensive metadata from Tool.get_metadata()
    and transforms it into the specific output format.

    Args:
        tool_metadata: The raw metadata dictionary from a Tool instance.

    Returns:
        The formatted definition (e.g., a dict for JSON, a string for text).
        The type `Any` allows flexibility for various output formats.
    """
    ...

DocumentLoaderPlugin

Bases: Plugin, Protocol

Loads documents from a source into an async stream of Document objects.

Functions
load async
load(
    source_uri: str, config: Optional[Dict[str, Any]] = None
) -> AsyncIterable[Document]

Loads documents from the given source URI. Args: source_uri: The URI of the data source (e.g., file path, URL, database connection string). config: Loader-specific configuration dictionary. Yields: Document objects.

Source code in src/genie_tooling/document_loaders/abc.py
async def load(self, source_uri: str, config: Optional[Dict[str, Any]] = None) -> AsyncIterable[Document]:
    """
    Loads documents from the given source URI.
    Args:
        source_uri: The URI of the data source (e.g., file path, URL, database connection string).
        config: Loader-specific configuration dictionary.
    Yields:
        Document objects.
    """
    # Example of how to make an async generator that does nothing if not implemented:
    logger.warning(f"DocumentLoaderPlugin '{self.plugin_id}' load method not fully implemented.")
    if False: # pylint: disable=false-condition
        yield # type: ignore
    return

EmbeddingGeneratorPlugin

Bases: Plugin, Protocol

Generates embeddings for an async stream of Chunks.

Functions
embed async
embed(
    chunks: AsyncIterable[Chunk], config: Optional[Dict[str, Any]] = None
) -> AsyncIterable[Tuple[Chunk, EmbeddingVector]]

Generates embeddings for each chunk. Args: chunks: An async iterable of Chunk objects. config: Embedder-specific configuration (e.g., model_name, batch_size, key_provider). Yields: Tuples of (Chunk, EmbeddingVector).

Source code in src/genie_tooling/embedding_generators/abc.py
async def embed(self, chunks: AsyncIterable[Chunk], config: Optional[Dict[str, Any]] = None) -> AsyncIterable[Tuple[Chunk, EmbeddingVector]]:
    """
    Generates embeddings for each chunk.
    Args:
        chunks: An async iterable of Chunk objects.
        config: Embedder-specific configuration (e.g., model_name, batch_size, key_provider).
    Yields:
        Tuples of (Chunk, EmbeddingVector).
    """
    logger.warning(f"EmbeddingGeneratorPlugin '{self.plugin_id}' embed method not fully implemented.")
    if False: # pylint: disable=false-condition
        yield # type: ignore
    return

ErrorFormatter

Bases: Plugin, Protocol

Protocol for formatting StructuredError for different consumers (e.g., LLM, logs).

Functions
format
format(structured_error: StructuredError, target_format: str = 'llm') -> Any

Formats the structured_error. Args: structured_error: The error dictionary produced by an ErrorHandler. target_format: A hint for the desired output format (e.g., "llm", "json", "human_log"). Default is "llm". Returns: The formatted error (e.g., a string for LLM, a dict for JSON). This method is synchronous.

Source code in src/genie_tooling/error_formatters/abc.py
def format(self, structured_error: StructuredError, target_format: str = "llm") -> Any:
    """
    Formats the structured_error.
    Args:
        structured_error: The error dictionary produced by an ErrorHandler.
        target_format: A hint for the desired output format (e.g., "llm", "json", "human_log").
                       Default is "llm".
    Returns:
        The formatted error (e.g., a string for LLM, a dict for JSON).
    This method is synchronous.
    """
    ...

ErrorHandler

Bases: Plugin, Protocol

Protocol for handling exceptions during tool execution and converting them to StructuredError.

Functions
handle
handle(
    exception: Exception, tool: Any, context: Optional[Dict[str, Any]]
) -> StructuredError

Handles an exception that occurred during tool.execute() or related steps. Args: exception: The exception instance caught. tool: The Tool instance that was being executed (typed as Any to avoid circular import). Implementations can cast or use getattr to access tool.identifier. context: Optional context dictionary active during the call. Returns: A StructuredError dictionary. This method is synchronous as error classification is typically CPU-bound.

Source code in src/genie_tooling/error_handlers/abc.py
def handle(self, exception: Exception, tool: Any, context: Optional[Dict[str, Any]]) -> StructuredError:
    """
    Handles an exception that occurred during tool.execute() or related steps.
    Args:
        exception: The exception instance caught.
        tool: The Tool instance that was being executed (typed as Any to avoid circular import).
              Implementations can cast or use getattr to access tool.identifier.
        context: Optional context dictionary active during the call.
    Returns:
        A StructuredError dictionary.
    This method is synchronous as error classification is typically CPU-bound.
    """
    ...

GuardrailPlugin

Bases: Plugin, Protocol

Base protocol for all guardrail plugins.

InputGuardrailPlugin

Bases: GuardrailPlugin, Protocol

Protocol for guardrails that check input data (e.g., prompts, user messages).

Functions
check_input async
check_input(data: Any, context: Optional[Dict[str, Any]] = None) -> GuardrailViolation

Checks input data. Returns: GuardrailViolation: Contains action (allow, block, warn) and reason.

Source code in src/genie_tooling/guardrails/abc.py
async def check_input(self, data: Any, context: Optional[Dict[str, Any]] = None) -> GuardrailViolation:
    """
    Checks input data.
    Returns:
        GuardrailViolation: Contains action (allow, block, warn) and reason.
    """
    logger.warning(f"InputGuardrailPlugin '{self.plugin_id}' check_input method not fully implemented.")
    return GuardrailViolation(action=self.default_action, reason="Not implemented")

OutputGuardrailPlugin

Bases: GuardrailPlugin, Protocol

Protocol for guardrails that check output data (e.g., LLM responses, tool results).

Functions
check_output async
check_output(data: Any, context: Optional[Dict[str, Any]] = None) -> GuardrailViolation

Checks output data. Returns: GuardrailViolation: Contains action (allow, block, warn) and reason.

Source code in src/genie_tooling/guardrails/abc.py
async def check_output(self, data: Any, context: Optional[Dict[str, Any]] = None) -> GuardrailViolation:
    """
    Checks output data.
    Returns:
        GuardrailViolation: Contains action (allow, block, warn) and reason.
    """
    logger.warning(f"OutputGuardrailPlugin '{self.plugin_id}' check_output method not fully implemented.")
    return GuardrailViolation(action=self.default_action, reason="Not implemented")

ToolUsageGuardrailPlugin

Bases: GuardrailPlugin, Protocol

Protocol for guardrails that check tool usage attempts.

Functions
check_tool_usage async
check_tool_usage(
    tool: Tool, params: Dict[str, Any], context: Optional[Dict[str, Any]] = None
) -> GuardrailViolation

Checks if a tool usage attempt is permissible. Returns: GuardrailViolation: Contains action (allow, block, warn) and reason.

Source code in src/genie_tooling/guardrails/abc.py
async def check_tool_usage(self, tool: Tool, params: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> GuardrailViolation:
    """
    Checks if a tool usage attempt is permissible.
    Returns:
        GuardrailViolation: Contains action (allow, block, warn) and reason.
    """
    logger.warning(f"ToolUsageGuardrailPlugin '{self.plugin_id}' check_tool_usage method not fully implemented.")
    return GuardrailViolation(action=self.default_action, reason="Not implemented")

GuardrailViolation

Bases: TypedDict

Represents the outcome of a guardrail check.

HumanApprovalRequestPlugin

Bases: Plugin, Protocol

Protocol for a plugin that requests human approval for an action.

Functions
request_approval async
request_approval(request: ApprovalRequest) -> ApprovalResponse

Requests human approval for a given action or data. Implementations will vary (e.g., CLI prompt, web UI notification, API call).

Source code in src/genie_tooling/hitl/abc.py
async def request_approval(self, request: ApprovalRequest) -> ApprovalResponse:
    """
    Requests human approval for a given action or data.
    Implementations will vary (e.g., CLI prompt, web UI notification, API call).
    """
    logger.warning(f"HumanApprovalRequestPlugin '{self.plugin_id}' request_approval method not fully implemented.")
    # Default to denied if not implemented
    return ApprovalResponse(
        request_id=request.request_id,
        status="denied",
        approver_id="system_default",
        reason="Plugin not implemented."
    )

HITLManager

HITLManager(
    plugin_manager: PluginManager,
    default_approver_id: Optional[str] = None,
    approver_configurations: Optional[Dict[str, Dict[str, Any]]] = None,
)
Source code in src/genie_tooling/hitl/manager.py
def __init__(self, plugin_manager: PluginManager, default_approver_id: Optional[str] = None, approver_configurations: Optional[Dict[str, Dict[str, Any]]] = None):
    self._plugin_manager = plugin_manager
    self._default_approver_id = default_approver_id
    self._approver_configurations = approver_configurations or {}
    self._default_approver_instance: Optional[HumanApprovalRequestPlugin] = None
    self._initialized_default = False
    logger.info("HITLManager initialized.")
Attributes
is_active property
is_active: bool

Returns True if a default approver is configured (and not 'none').

ApprovalRequest

Bases: TypedDict

Represents a request for human approval.

ApprovalResponse

Bases: TypedDict

Represents the response from a human approval request.

InputValidationException

InputValidationException(
    message: str, errors: Any = None, params: Optional[Dict[str, Any]] = None
)

Bases: ValueError

Custom exception for input validation errors, providing more context.

Source code in src/genie_tooling/input_validators/abc.py
def __init__(self, message: str, errors: Any = None, params: Optional[Dict[str,Any]] = None):
    super().__init__(message)
    self.errors = errors
    self.params = params

InputValidator

Bases: Plugin, Protocol

Protocol for input parameter validators.

Functions
validate
validate(params: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]

Validates parameters against a schema. Should raise InputValidationException on failure. May return params (possibly coerced or with defaults applied by validator). This method is synchronous as validation is typically CPU-bound.

Source code in src/genie_tooling/input_validators/abc.py
def validate(self, params: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
    """
    Validates parameters against a schema.
    Should raise InputValidationException on failure.
    May return params (possibly coerced or with defaults applied by validator).
    This method is synchronous as validation is typically CPU-bound.
    """
    ...

InvocationStrategy

Bases: Plugin, Protocol

Protocol for a strategy that defines the complete lifecycle of invoking a tool. This includes validation, execution, transformation, caching, and error handling. All strategies must be designed to be async.

Functions
invoke async
invoke(
    tool: Tool,
    params: Dict[str, Any],
    key_provider: KeyProvider,
    context: Optional[Dict[str, Any]],
    invoker_config: Dict[str, Any],
) -> Any

Executes the full tool invocation lifecycle according to this strategy.

Parameters:

Name Type Description Default
tool Tool

The Tool instance to invoke.

required
params Dict[str, Any]

The parameters for the tool.

required
key_provider KeyProvider

The async key provider.

required
context Optional[Dict[str, Any]]

Optional context dictionary.

required
invoker_config Dict[str, Any]

Configuration from the ToolInvoker, including: - "plugin_manager": PluginManager instance - "validator_id": Optional[str] - "transformer_id": Optional[str] - "error_handler_id": Optional[str] - "error_formatter_id": Optional[str] - "cache_provider_id": Optional[str] - "cache_config": Optional[Dict[str, Any]] - "tracing_manager": Optional[InteractionTracingManager] - "correlation_id": Optional[str]

required

Returns:

Type Description
Any

The result of the tool execution (possibly transformed by an OutputTransformer)

Any

or a structured error (formatted by an ErrorFormatter). The exact type depends

Any

on the formatter's output.

Source code in src/genie_tooling/invocation_strategies/abc.py
async def invoke(
    self,
    tool: Tool,
    params: Dict[str, Any],
    key_provider: KeyProvider,
    context: Optional[Dict[str, Any]],
    invoker_config: Dict[str, Any] # Contains plugin_manager and override IDs for components
) -> Any:
    """
    Executes the full tool invocation lifecycle according to this strategy.

    Args:
        tool: The Tool instance to invoke.
        params: The parameters for the tool.
        key_provider: The async key provider.
        context: Optional context dictionary.
        invoker_config: Configuration from the ToolInvoker, including:
                        - "plugin_manager": PluginManager instance
                        - "validator_id": Optional[str]
                        - "transformer_id": Optional[str]
                        - "error_handler_id": Optional[str]
                        - "error_formatter_id": Optional[str]
                        - "cache_provider_id": Optional[str]
                        - "cache_config": Optional[Dict[str, Any]]
                        - "tracing_manager": Optional[InteractionTracingManager]
                        - "correlation_id": Optional[str]

    Returns:
        The result of the tool execution (possibly transformed by an OutputTransformer)
        or a structured error (formatted by an ErrorFormatter). The exact type depends
        on the formatter's output.
    """
    ...

LLMProviderPlugin

Bases: Plugin, Protocol

Protocol for a plugin that interacts with a Large Language Model provider.

Functions
setup async
setup(config: Optional[Dict[str, Any]]) -> None

Initializes the LLM provider. The 'config' dictionary is expected to contain 'key_provider: KeyProvider' if the specific LLM provider implementation requires API keys.

Source code in src/genie_tooling/llm_providers/abc.py
async def setup(self, config: Optional[Dict[str, Any]]) -> None:
    """
    Initializes the LLM provider.
    The 'config' dictionary is expected to contain 'key_provider: KeyProvider'
    if the specific LLM provider implementation requires API keys.
    """
    await super().setup(config)
    logger.debug(f"LLMProviderPlugin '{getattr(self, 'plugin_id', 'UnknownPluginID')}': Base setup logic (if any) completed.")

ChatMessage

Bases: TypedDict

Represents a single message in a chat conversation. Compatible with OpenAI's ChatCompletion message structure.

LLMChatResponse

Bases: TypedDict

Standardized response for chat completion LLM calls.

LLMCompletionResponse

Bases: TypedDict

Standardized response for text completion LLM calls.

LLMUsageInfo

Bases: TypedDict

Represents token usage information from an LLM response.

ToolCall

Bases: TypedDict

Represents a tool call requested by the LLM. Compatible with OpenAI's tool_calls structure.

ToolCallFunction

Bases: TypedDict

Represents the function to be called within a ToolCall.

LogAdapterPlugin

Bases: Plugin, Protocol

Protocol for a logging/monitoring adapter.

Functions
setup async
setup(config: Dict[str, Any]) -> None

Configures logging handlers or integrates with external monitoring systems. This method is called after the adapter is instantiated. Args: config: Adapter-specific configuration dictionary. May include 'plugin_manager' if this adapter needs to load other plugins (e.g., a RedactorPlugin).

Source code in src/genie_tooling/log_adapters/abc.py
async def setup(self, config: Dict[str, Any]) -> None:
    """
    Configures logging handlers or integrates with external monitoring systems.
    This method is called after the adapter is instantiated.
    Args:
        config: Adapter-specific configuration dictionary. May include 'plugin_manager'
                if this adapter needs to load other plugins (e.g., a RedactorPlugin).
    """
    pass
process_event async
process_event(
    event_type: str,
    data: Dict[str, Any],
    schema_for_data: Optional[Dict[str, Any]] = None,
) -> None

Processes a structured event (e.g., for monitoring or detailed logging). Data should be pre-sanitized by this method or by a configured RedactorPlugin. Args: event_type: A string identifying the type of event (e.g., "tool_invoked", "tool_error"). data: A dictionary containing event-specific data. schema_for_data: Optional JSON schema corresponding to 'data', to aid redaction.

Source code in src/genie_tooling/log_adapters/abc.py
async def process_event(self, event_type: str, data: Dict[str, Any], schema_for_data: Optional[Dict[str, Any]] = None) -> None:
    """
    Processes a structured event (e.g., for monitoring or detailed logging).
    Data should be pre-sanitized by this method or by a configured RedactorPlugin.
    Args:
        event_type: A string identifying the type of event (e.g., "tool_invoked", "tool_error").
        data: A dictionary containing event-specific data.
        schema_for_data: Optional JSON schema corresponding to 'data', to aid redaction.
    """
    pass

ToolLookupService

ToolLookupService(
    tool_manager: ToolManager,
    plugin_manager: PluginManager,
    default_provider_id: Optional[str] = DEFAULT_LOOKUP_PROVIDER_ID,
    default_indexing_formatter_id: Optional[str] = DEFAULT_INDEXING_FORMATTER_ID,
    tracing_manager: Optional[InteractionTracingManager] = None,
)
Source code in src/genie_tooling/lookup/service.py
def __init__(
    self,
    tool_manager: ToolManager,
    plugin_manager: PluginManager,
    default_provider_id: Optional[str] = DEFAULT_LOOKUP_PROVIDER_ID,
    default_indexing_formatter_id: Optional[str] = DEFAULT_INDEXING_FORMATTER_ID,
    tracing_manager: Optional[InteractionTracingManager] = None,
):
    self._tool_manager = tool_manager
    self._plugin_manager = plugin_manager
    self._default_provider_id = default_provider_id
    self._default_indexing_formatter_id = default_indexing_formatter_id
    self._tracing_manager = tracing_manager
    self._tracing_manager = tracing_manager
    self._is_indexed_map: Dict[str, bool] = {}
    logger.info(f"ToolLookupService initialized. Default provider: '{default_provider_id}', default formatter plugin for indexing: '{default_indexing_formatter_id}'. Tracing enabled: {self._tracing_manager is not None}")
Functions
add_or_update_tools async
add_or_update_tools(
    tool_ids: List[str],
    provider_id_override: Optional[str] = None,
    indexing_formatter_id_override: Optional[str] = None,
    provider_config_override: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None,
) -> None

Adds or updates a list of tools in the specified lookup provider's index. This is an incremental update.

Source code in src/genie_tooling/lookup/service.py
async def add_or_update_tools(
    self,
    tool_ids: List[str],
    provider_id_override: Optional[str] = None,
    indexing_formatter_id_override: Optional[str] = None,
    provider_config_override: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None
) -> None:
    """
    Adds or updates a list of tools in the specified lookup provider's index.
    This is an incremental update.
    """
    corr_id = correlation_id or str(uuid.uuid4())
    await self._trace("tool_lookup.add_or_update.start", {"tool_ids": tool_ids}, "debug", corr_id)

    target_provider_id = provider_id_override or self._default_provider_id
    if not target_provider_id:
        await self._trace("log.warning", {"message": "No tool lookup provider specified for add/update. Skipping."}, "warning", corr_id)
        return

    formatter_id_to_use = indexing_formatter_id_override or self._default_indexing_formatter_id
    if not formatter_id_to_use:
        await self._trace("log.error", {"message": f"No indexing formatter ID available for provider '{target_provider_id}'. Cannot update index."}, "error", corr_id)
        return

    await self._ensure_provider_is_indexed(target_provider_id, formatter_id_to_use, provider_config_override, correlation_id=corr_id)

    provider_instance = await self._plugin_manager.get_plugin_instance(target_provider_id, config=provider_config_override)
    if not isinstance(provider_instance, ToolLookupProviderPlugin):
        await self._trace("log.error", {"message": f"Tool lookup provider '{target_provider_id}' not found or invalid for update."}, "error", corr_id)
        return

    for tool_id in tool_ids:
        tool_data = await self._get_formatted_tool_data(tool_id, formatter_id_to_use, correlation_id=corr_id)
        if tool_data:
            try:
                await provider_instance.update_tool(tool_id, tool_data, config=provider_config_override)
                await self._trace("tool_lookup.add_or_update.success", {"tool_id": tool_id}, "debug", corr_id)
            except Exception as e_update:
                await self._trace("log.error", {"message": f"Error updating tool '{tool_id}' in provider '{target_provider_id}': {e_update}", "exc_info": True}, "error", corr_id)
remove_tools async
remove_tools(
    tool_ids: List[str],
    provider_id_override: Optional[str] = None,
    provider_config_override: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None,
) -> None

Removes a list of tools from the specified lookup provider's index.

Source code in src/genie_tooling/lookup/service.py
async def remove_tools(
    self,
    tool_ids: List[str],
    provider_id_override: Optional[str] = None,
    provider_config_override: Optional[Dict[str, Any]] = None,
    correlation_id: Optional[str] = None
) -> None:
    """Removes a list of tools from the specified lookup provider's index."""
    corr_id = correlation_id or str(uuid.uuid4())
    await self._trace("tool_lookup.remove.start", {"tool_ids": tool_ids}, "debug", corr_id)

    target_provider_id = provider_id_override or self._default_provider_id
    if not target_provider_id:
        await self._trace("log.warning", {"message": "No tool lookup provider specified for remove. Skipping."}, "warning", corr_id)
        return

    if not self._is_indexed_map.get(target_provider_id, False):
        await self._trace("log.debug", {"message": f"Index for provider '{target_provider_id}' not built. Skipping tool removal as it's not indexed anyway."}, "debug", corr_id)
        return

    provider_instance = await self._plugin_manager.get_plugin_instance(target_provider_id, config=provider_config_override)
    if not isinstance(provider_instance, ToolLookupProviderPlugin):
        await self._trace("log.error", {"message": f"Tool lookup provider '{target_provider_id}' not found or invalid for removal."}, "error", corr_id)
        return

    for tool_id in tool_ids:
        try:
            await provider_instance.remove_tool(tool_id, config=provider_config_override)
            await self._trace("tool_lookup.remove.success", {"tool_id": tool_id}, "debug", corr_id)
        except Exception as e_remove:
            await self._trace("log.error", {"message": f"Error removing tool '{tool_id}' from provider '{target_provider_id}': {e_remove}", "exc_info": True}, "error", corr_id)

RankedToolResult

RankedToolResult(
    tool_identifier: str,
    score: float,
    matched_tool_data: Optional[Dict[str, Any]] = None,
    description_snippet: Optional[str] = None,
    matched_keywords: Optional[List[str]] = None,
    similarity_score_details: Optional[Dict[str, float]] = None,
)

Represents a tool found by a lookup provider, with diagnostic info.

Source code in src/genie_tooling/lookup/types.py
def __init__(
    self,
    tool_identifier: str,
    score: float,
    matched_tool_data: Optional[Dict[str, Any]] = None,
    description_snippet: Optional[str] = None,
    matched_keywords: Optional[List[str]] = None,
    similarity_score_details: Optional[Dict[str, float]] = None,
):
    self.tool_identifier: str = tool_identifier
    self.score: float = score # Relevance score (higher is better)
    self.matched_tool_data: Optional[Dict[str, Any]] = matched_tool_data or {}
    self.description_snippet: Optional[str] = description_snippet
    self.matched_keywords: Optional[List[str]] = matched_keywords
    self.similarity_score_details: Optional[Dict[str, float]] = similarity_score_details

InteractionTracerPlugin

Bases: Plugin, Protocol

Protocol for a plugin that records interaction traces.

Functions
record_trace async
record_trace(event: TraceEvent) -> None

Records a single trace event. Implementations should handle batching or asynchronous sending if needed.

Source code in src/genie_tooling/observability/abc.py
async def record_trace(self, event: TraceEvent) -> None:
    """
    Records a single trace event.
    Implementations should handle batching or asynchronous sending if needed.
    """
    logger.warning(f"InteractionTracerPlugin '{self.plugin_id}' record_trace method not fully implemented.")
    pass

TraceEvent

Bases: TypedDict

Represents a single event to be traced.

OutputTransformer

Bases: Plugin, Protocol

Protocol for output transformers.

Functions
transform
transform(output: Any, schema: Dict[str, Any]) -> Any

Transforms raw tool output according to an output_schema or desired format. Raises OutputTransformationException on failure. This method is synchronous as transformation is typically CPU-bound.

Source code in src/genie_tooling/output_transformers/abc.py
def transform(self, output: Any, schema: Dict[str, Any]) -> Any:
    """
    Transforms raw tool output according to an output_schema or desired format.
    Raises OutputTransformationException on failure.
    This method is synchronous as transformation is typically CPU-bound.
    """
    ...

LLMOutputParserPlugin

Bases: Plugin, Protocol

Protocol for a plugin that parses the text output of an LLM.

Functions
parse
parse(text_output: str, schema: Optional[Any] = None) -> ParsedOutput

Parses the LLM's text output. Args: text_output: The raw text string from the LLM. schema: Optional schema (e.g., Pydantic model, JSON schema) to guide parsing. Returns: The parsed data, which could be a dict, list, Pydantic model instance, etc. Raises: ValueError or a custom parsing exception if parsing fails.

Source code in src/genie_tooling/prompts/llm_output_parsers/abc.py
def parse(self, text_output: str, schema: Optional[Any] = None) -> ParsedOutput:
    """
    Parses the LLM's text output.
    Args:
        text_output: The raw text string from the LLM.
        schema: Optional schema (e.g., Pydantic model, JSON schema) to guide parsing.
    Returns:
        The parsed data, which could be a dict, list, Pydantic model instance, etc.
    Raises:
        ValueError or a custom parsing exception if parsing fails.
    """
    logger.warning(f"LLMOutputParserPlugin '{self.plugin_id}' parse method not implemented.")
    # Basic fallback: return as is, or attempt JSON if it looks like it
    if text_output.strip().startswith("{") and text_output.strip().endswith("}"):
        try:
            return json.loads(text_output) # type: ignore
        except Exception:
            pass
    return text_output # type: ignore

RedactorPlugin

Bases: Plugin, Protocol

Protocol for a data redaction plugin.

Functions
sanitize
sanitize(data: Any, schema_hints: Optional[Dict[str, Any]] = None) -> Any

Sanitizes data to remove or mask sensitive information. This method is synchronous as redaction is typically CPU-bound. Args: data: The data to sanitize (can be any Python object, commonly dicts or lists). schema_hints: Optional JSON schema corresponding to 'data'. If provided, the redactor can use schema annotations (e.g., 'x-sensitive', 'format') to guide redaction. Returns: The sanitized data, with sensitive parts replaced or removed.

Source code in src/genie_tooling/redactors/abc.py
def sanitize(self, data: Any, schema_hints: Optional[Dict[str, Any]] = None) -> Any:
    """
    Sanitizes data to remove or mask sensitive information.
    This method is synchronous as redaction is typically CPU-bound.
    Args:
        data: The data to sanitize (can be any Python object, commonly dicts or lists).
        schema_hints: Optional JSON schema corresponding to 'data'. If provided,
                      the redactor can use schema annotations (e.g., 'x-sensitive', 'format')
                      to guide redaction.
    Returns:
        The sanitized data, with sensitive parts replaced or removed.
    """
    logger.warning(f"Redactor '{self.plugin_id}' sanitize method not fully implemented. Returning data as is.")
    return data # Default: no redaction

RetrieverPlugin

Bases: Plugin, Protocol

Retrieves relevant chunks based on a query, typically by composing an embedder and a vector store.

Functions
retrieve async
retrieve(
    query: str, top_k: int, config: Optional[Dict[str, Any]] = None
) -> List[RetrievedChunk]

Retrieves relevant chunks for the given query. Args: query: The natural language query string. top_k: The number of top results to return. config: Retriever-specific configuration. This should include details for the embedder (e.g., "embedder_id", "embedder_config") and vector store (e.g., "vector_store_id", "vector_store_config") that this retriever instance will use. It should also contain "plugin_manager". Returns: A list of RetrievedChunk objects.

Source code in src/genie_tooling/retrievers/abc.py
async def retrieve(self, query: str, top_k: int, config: Optional[Dict[str, Any]] = None) -> List[RetrievedChunk]:
    """
    Retrieves relevant chunks for the given query.
    Args:
        query: The natural language query string.
        top_k: The number of top results to return.
        config: Retriever-specific configuration. This should include details for
                  the embedder (e.g., "embedder_id", "embedder_config") and
                  vector store (e.g., "vector_store_id", "vector_store_config")
                  that this retriever instance will use. It should also contain "plugin_manager".
    Returns:
        A list of RetrievedChunk objects.
    """
    logger.warning(f"RetrieverPlugin '{self.plugin_id}' retrieve method not fully implemented.")
    return []

KeyProvider

Bases: Protocol

Protocol for a component that securely provides API keys. This protocol must be implemented by the consuming application. The middleware itself does not store or manage API keys directly. All methods must be async.

Functions
get_key async
get_key(key_name: str) -> Optional[str]

Asynchronously retrieves the API key value for the given key name.

Parameters:

Name Type Description Default
key_name str

The logical name of the API key required by a tool (e.g., "OPENWEATHERMAP_API_KEY", "OPENAI_API_KEY").

required

Returns:

Type Description
Optional[str]

The API key string if found and accessible, otherwise None.

Optional[str]

Implementations should fetch keys securely (e.g., from environment

Optional[str]

variables, a secrets manager/vault, or application configuration).

Optional[str]

It should log (at debug level) if a key is requested but not found,

Optional[str]

but avoid logging the key value itself.

Source code in src/genie_tooling/security/key_provider.py
async def get_key(self, key_name: str) -> Optional[str]:
    """
    Asynchronously retrieves the API key value for the given key name.

    Args:
        key_name: The logical name of the API key required by a tool
                  (e.g., "OPENWEATHERMAP_API_KEY", "OPENAI_API_KEY").

    Returns:
        The API key string if found and accessible, otherwise None.
        Implementations should fetch keys securely (e.g., from environment
        variables, a secrets manager/vault, or application configuration).
        It should log (at debug level) if a key is requested but not found,
        but avoid logging the key value itself.
    """
    # Example of what an implementation might log (this is a protocol, so no actual logic here)
    # logger.debug(f"KeyProvider: Request received for key '{key_name}'.")
    # value = self._internal_secure_key_storage.get(key_name)
    # if not value:
    #     logger.warning(f"KeyProvider: Key '{key_name}' not found in secure storage.")
    # return value
    ... # Indicates abstract method in Protocol

DistributedTaskQueuePlugin

Bases: Plugin, Protocol

Protocol for a plugin that interacts with a distributed task queue system.

Functions
submit_task async
submit_task(
    task_name: str,
    args: Tuple = (),
    kwargs: Optional[Dict[str, Any]] = None,
    queue_name: Optional[str] = None,
    task_options: Optional[Dict[str, Any]] = None,
) -> str

Submits a task to the distributed queue. Returns: str: The unique ID of the submitted task. Raises: Exception: If submission fails.

Source code in src/genie_tooling/task_queues/abc.py
async def submit_task(
    self,
    task_name: str, # Name of the task registered with the queue system
    args: Tuple = (),
    kwargs: Optional[Dict[str, Any]] = None,
    queue_name: Optional[str] = None, # Specific queue to send to
    task_options: Optional[Dict[str, Any]] = None # Options like countdown, eta, priority
) -> str:
    """
    Submits a task to the distributed queue.
    Returns:
        str: The unique ID of the submitted task.
    Raises:
        Exception: If submission fails.
    """
    logger.error(f"DistributedTaskQueuePlugin '{self.plugin_id}' submit_task not implemented.")
    raise NotImplementedError
get_task_status async
get_task_status(task_id: str, queue_name: Optional[str] = None) -> TaskStatus

Gets the status of a previously submitted task.

Source code in src/genie_tooling/task_queues/abc.py
async def get_task_status(self, task_id: str, queue_name: Optional[str] = None) -> TaskStatus:
    """Gets the status of a previously submitted task."""
    logger.warning(f"DistributedTaskQueuePlugin '{self.plugin_id}' get_task_status not implemented.")
    return "unknown"
get_task_result async
get_task_result(
    task_id: str,
    queue_name: Optional[str] = None,
    timeout_seconds: Optional[float] = None,
) -> Any

Retrieves the result of a completed task. May block until the task is complete or timeout occurs. Raises: TimeoutError: If timeout is specified and exceeded. Exception: If task failed or other error.

Source code in src/genie_tooling/task_queues/abc.py
async def get_task_result(
    self, task_id: str, queue_name: Optional[str] = None, timeout_seconds: Optional[float] = None
) -> Any:
    """
    Retrieves the result of a completed task.
    May block until the task is complete or timeout occurs.
    Raises:
        TimeoutError: If timeout is specified and exceeded.
        Exception: If task failed or other error.
    """
    logger.error(f"DistributedTaskQueuePlugin '{self.plugin_id}' get_task_result not implemented.")
    raise NotImplementedError
revoke_task async
revoke_task(
    task_id: str, queue_name: Optional[str] = None, terminate: bool = False
) -> bool

Revokes (cancels) a pending or running task. Args: terminate: If True, attempt to terminate a running task. Returns: True if revocation was successful or task was already completed/revoked.

Source code in src/genie_tooling/task_queues/abc.py
async def revoke_task(self, task_id: str, queue_name: Optional[str] = None, terminate: bool = False) -> bool:
    """
    Revokes (cancels) a pending or running task.
    Args:
        terminate: If True, attempt to terminate a running task.
    Returns:
        True if revocation was successful or task was already completed/revoked.
    """
    logger.warning(f"DistributedTaskQueuePlugin '{self.plugin_id}' revoke_task not implemented.")
    return False

TextSplitterPlugin

Bases: Plugin, Protocol

Splits an async stream of Documents into an async stream of Chunks.

Functions
split async
split(
    documents: AsyncIterable[Document], config: Optional[Dict[str, Any]] = None
) -> AsyncIterable[Chunk]

Splits documents into smaller chunks. Args: documents: An async iterable of Document objects. config: Splitter-specific configuration (e.g., chunk_size, chunk_overlap). Yields: Chunk objects.

Source code in src/genie_tooling/text_splitters/abc.py
async def split(self, documents: AsyncIterable[Document], config: Optional[Dict[str, Any]] = None) -> AsyncIterable[Chunk]:
    """
    Splits documents into smaller chunks.
    Args:
        documents: An async iterable of Document objects.
        config: Splitter-specific configuration (e.g., chunk_size, chunk_overlap).
    Yields:
        Chunk objects.
    """
    logger.warning(f"TextSplitterPlugin '{self.plugin_id}' split method not fully implemented.")
    if False: # pylint: disable=false-condition
        yield # type: ignore
    return

TokenUsageRecorderPlugin

Bases: Plugin, Protocol

Protocol for a plugin that records LLM token usage.

Functions
record_usage async
record_usage(record: TokenUsageRecord) -> None

Records a single token usage event.

Source code in src/genie_tooling/token_usage/abc.py
async def record_usage(self, record: TokenUsageRecord) -> None:
    """Records a single token usage event."""
    logger.warning(f"TokenUsageRecorderPlugin '{self.plugin_id}' record_usage method not fully implemented.")
    pass
get_summary async
get_summary(filter_criteria: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Retrieves a summary of token usage. The structure of the summary is implementation-dependent.

Source code in src/genie_tooling/token_usage/abc.py
async def get_summary(self, filter_criteria: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """
    Retrieves a summary of token usage.
    The structure of the summary is implementation-dependent.
    """
    logger.warning(f"TokenUsageRecorderPlugin '{self.plugin_id}' get_summary method not fully implemented.")
    return {"error": "Not implemented"}
clear_records async
clear_records(filter_criteria: Optional[Dict[str, Any]] = None) -> bool

Clears recorded usage data, optionally based on criteria.

Source code in src/genie_tooling/token_usage/abc.py
async def clear_records(self, filter_criteria: Optional[Dict[str, Any]] = None) -> bool:
    """Clears recorded usage data, optionally based on criteria."""
    logger.warning(f"TokenUsageRecorderPlugin '{self.plugin_id}' clear_records method not fully implemented.")
    return False

TokenUsageRecord

Bases: TypedDict

Represents a single LLM token usage event.

ToolLookupProviderPlugin

Bases: Plugin, Protocol

Protocol for a plugin that finds relevant tools based on a query.

Functions
index_tools async
index_tools(
    tools_data: List[Dict[str, Any]], config: Optional[Dict[str, Any]] = None
) -> None

Builds or completely replaces an internal index using formatted tool data. This is for full, batch re-indexing. For dynamic updates, use add/update/remove_tool.

Source code in src/genie_tooling/tool_lookup_providers/abc.py
async def index_tools(self, tools_data: List[Dict[str, Any]], config: Optional[Dict[str, Any]] = None) -> None:
    """
    Builds or completely replaces an internal index using formatted tool data.
    This is for full, batch re-indexing. For dynamic updates, use add/update/remove_tool.
    """
    logger.warning(f"ToolLookupProvider '{self.plugin_id}' index_tools method not fully implemented.")
    pass
add_tool async
add_tool(tool_data: Dict[str, Any], config: Optional[Dict[str, Any]] = None) -> bool

Adds a single tool to the index. Returns: True if the tool was added successfully, False otherwise.

Source code in src/genie_tooling/tool_lookup_providers/abc.py
async def add_tool(self, tool_data: Dict[str, Any], config: Optional[Dict[str, Any]] = None) -> bool:
    """
    Adds a single tool to the index.
    Returns:
        True if the tool was added successfully, False otherwise.
    """
    logger.warning(f"ToolLookupProvider '{self.plugin_id}' add_tool method not implemented.")
    return False
update_tool async
update_tool(
    tool_id: str, tool_data: Dict[str, Any], config: Optional[Dict[str, Any]] = None
) -> bool

Updates an existing tool in the index. This may be an add/overwrite operation. Returns: True if the tool was updated successfully, False otherwise.

Source code in src/genie_tooling/tool_lookup_providers/abc.py
async def update_tool(self, tool_id: str, tool_data: Dict[str, Any], config: Optional[Dict[str, Any]] = None) -> bool:
    """
    Updates an existing tool in the index. This may be an add/overwrite operation.
    Returns:
        True if the tool was updated successfully, False otherwise.
    """
    logger.warning(f"ToolLookupProvider '{self.plugin_id}' update_tool method not implemented.")
    return False
remove_tool async
remove_tool(tool_id: str, config: Optional[Dict[str, Any]] = None) -> bool

Removes a single tool from the index by its ID. Returns: True if the tool was removed or did not exist, False on failure.

Source code in src/genie_tooling/tool_lookup_providers/abc.py
async def remove_tool(self, tool_id: str, config: Optional[Dict[str, Any]] = None) -> bool:
    """
    Removes a single tool from the index by its ID.
    Returns:
        True if the tool was removed or did not exist, False on failure.
    """
    logger.warning(f"ToolLookupProvider '{self.plugin_id}' remove_tool method not implemented.")
    return False
find_tools async
find_tools(
    natural_language_query: str, top_k: int = 5, config: Optional[Dict[str, Any]] = None
) -> List[RankedToolResult]

Searches the indexed tools based on the natural_language_query. Returns: A list of RankedToolResult objects, sorted by relevance (highest score first).

Source code in src/genie_tooling/tool_lookup_providers/abc.py
async def find_tools(
    self,
    natural_language_query: str,
    top_k: int = 5,
    config: Optional[Dict[str, Any]] = None
) -> List[RankedToolResult]:
    """
    Searches the indexed tools based on the natural_language_query.
    Returns:
        A list of RankedToolResult objects, sorted by relevance (highest score first).
    """
    logger.warning(f"ToolLookupProvider '{self.plugin_id}' find_tools method not fully implemented.")
    return []

ToolPlugin

Bases: Plugin, Protocol

Protocol for a tool that can be executed by the middleware. All tools must be designed to be async.

Attributes
identifier property
identifier: str

A unique string identifier for this tool.

Functions
get_metadata async
get_metadata() -> Dict[str, Any]

Returns comprehensive metadata about the tool. This metadata is crucial for tool discovery, LLM function calling, and UI display.

Expected structure: { "identifier": str, (matches self.identifier) "name": str, (human-friendly name) "description_human": str, (detailed for developers/UI) "description_llm": str, (concise, token-efficient for LLM prompts/function descriptions) "input_schema": Dict[str, Any], (JSON Schema for tool parameters) "output_schema": Dict[str, Any], (JSON Schema for tool's expected output structure) "key_requirements": List[Dict[str, str]], (e.g., [{"name": "API_KEY_NAME", "description": "Purpose of key"}]) "tags": List[str], (for categorization, e.g., ["weather", "api", "location"]) "version": str, (e.g., "1.0.0") "cacheable": bool, (optional, hints if tool output can be cached, default False) "cache_ttl_seconds": Optional[int] (optional, default TTL if cacheable) }

Source code in src/genie_tooling/tools/abc.py
async def get_metadata(self) -> Dict[str, Any]:
    """
    Returns comprehensive metadata about the tool.
    This metadata is crucial for tool discovery, LLM function calling, and UI display.

    Expected structure:
    {
        "identifier": str, (matches self.identifier)
        "name": str, (human-friendly name)
        "description_human": str, (detailed for developers/UI)
        "description_llm": str, (concise, token-efficient for LLM prompts/function descriptions)
        "input_schema": Dict[str, Any], (JSON Schema for tool parameters)
        "output_schema": Dict[str, Any], (JSON Schema for tool's expected output structure)
        "key_requirements": List[Dict[str, str]], (e.g., [{"name": "API_KEY_NAME", "description": "Purpose of key"}])
        "tags": List[str], (for categorization, e.g., ["weather", "api", "location"])
        "version": str, (e.g., "1.0.0")
        "cacheable": bool, (optional, hints if tool output can be cached, default False)
        "cache_ttl_seconds": Optional[int] (optional, default TTL if cacheable)
    }
    """
    ...
execute async
execute(
    params: Dict[str, Any], key_provider: KeyProvider, context: Dict[str, Any]
) -> Any

Executes the tool with the given parameters. Must be an async method.

Parameters:

Name Type Description Default
params Dict[str, Any]

Validated parameters for the tool, conforming to its input_schema.

required
key_provider KeyProvider

An async key provider instance for fetching necessary API keys.

required
context Dict[str, Any]

A dictionary carrying session or request-specific data. This can include framework-level information (e.g., 'correlation_id', 'otel_context' for distributed tracing, 'genie_framework_instance') as well as application-specific data passed into methods like genie.run_command(..., context_for_tools=...).

required

Returns:

Type Description
Any

The result of the tool execution. The structure should align with output_schema.

Any

If an error occurs during execution that the tool handles, it should still

Any

return a structured response, possibly including an 'error' field, conforming

Any

to its output_schema. Unhandled exceptions will be caught by the InvocationStrategy.

Source code in src/genie_tooling/tools/abc.py
async def execute(
    self,
    params: Dict[str, Any],
    key_provider: KeyProvider,
    context: Dict[str, Any]
) -> Any:
    """
    Executes the tool with the given parameters.
    Must be an async method.

    Args:
        params: Validated parameters for the tool, conforming to its input_schema.
        key_provider: An async key provider instance for fetching necessary API keys.
        context: A dictionary carrying session or request-specific data.
            This can include framework-level information (e.g., 'correlation_id',
            'otel_context' for distributed tracing, 'genie_framework_instance')
            as well as application-specific data passed into methods like
            `genie.run_command(..., context_for_tools=...)`.

    Returns:
        The result of the tool execution. The structure should align with output_schema.
        If an error occurs during execution that the tool handles, it should still
        return a structured response, possibly including an 'error' field, conforming
        to its output_schema. Unhandled exceptions will be caught by the InvocationStrategy.
    """
    ...

ToolManager

ToolManager(
    plugin_manager: PluginManager,
    tracing_manager: Optional[InteractionTracingManager] = None,
)
Source code in src/genie_tooling/tools/manager.py
def __init__(self, plugin_manager: PluginManager, tracing_manager: Optional[InteractionTracingManager] = None):
    self._plugin_manager = plugin_manager
    self._tracing_manager = tracing_manager
    self._tools: Dict[str, Tool] = {}
    self._tool_initial_configs: Dict[str, Dict[str, Any]] = {} # Stores the initial tool_configurations
    logger.debug("ToolManager initialized.")
Functions
register_decorated_tools async
register_decorated_tools(functions: List[Callable], auto_enable: bool)

Processes a list of @tool decorated functions, enabling them based on the auto_enable flag and whether they are listed in the initial tool_configurations.

Source code in src/genie_tooling/tools/manager.py
async def register_decorated_tools(self, functions: List[Callable], auto_enable: bool):
    """
    Processes a list of @tool decorated functions, enabling them based on the auto_enable flag
    and whether they are listed in the initial tool_configurations.
    """
    from genie_tooling.genie import (
        FunctionToolWrapper,
    )
    registered_count = 0
    for func_item in functions:
        metadata = getattr(func_item, "_tool_metadata_", None)
        original_func_to_call = getattr(func_item, "_original_function_", func_item)

        if not (metadata and isinstance(metadata, dict) and callable(original_func_to_call)):
            await self._trace("log.warning", {"message": f"Function '{getattr(func_item, '__name__', str(func_item))}' not @tool decorated. Skipping."})
            continue

        tool_wrapper = FunctionToolWrapper(original_func_to_call, metadata)
        tool_id = tool_wrapper.identifier

        if tool_id in self._tools:
            # This case means a class-based tool with the same identifier was already loaded
            # via tool_configurations. We honor the explicitly configured class-based tool.
            await self._trace("log.debug", {"message": f"Tool '{tool_id}' from decorated function was already loaded (e.g., as a class-based plugin via tool_configurations). Explicit/prior loading takes precedence."})
            continue

        # Determine if the tool should be enabled:
        # 1. If auto_enable is True.
        # 2. OR if auto_enable is False, BUT the tool_id is present in self._tool_initial_configs
        #    (meaning it was listed in the `tool_configurations` dict passed to Genie.create).
        should_enable_this_tool = auto_enable or (tool_id in self._tool_initial_configs)

        if should_enable_this_tool:
            self._tools[tool_id] = tool_wrapper
            registered_count += 1
            if auto_enable: # Logged as auto-enabled
                await self._trace("log.info", {"message": f"Auto-enabled tool '{tool_id}' from decorated function '{func_item.__name__}'."})
            else: # Logged as explicitly enabled via tool_configurations
                await self._trace("log.info", {"message": f"Explicitly enabled tool '{tool_id}' from decorated function '{func_item.__name__}' via tool_configurations."})
        else: # auto_enable is False AND tool_id is NOT in tool_configurations
            await self._trace("log.warning", {"message": f"Tool '{tool_id}' from decorated function '{func_item.__name__}' was registered but is NOT active. To enable it, add '{tool_id}' to the `tool_configurations` dictionary in MiddlewareConfig."})

    if registered_count > 0 and not auto_enable: # Log summary if any tools were enabled explicitly this way
        await self._trace("log.info", {"message": f"Explicitly enabled {registered_count} decorated tools listed in tool_configurations."})
    elif registered_count > 0 and auto_enable:
        await self._trace("log.info", {"message": f"Auto-enabled {registered_count} decorated tools."})

VectorStorePlugin

Bases: Plugin, Protocol

Interface for interacting with a vector database.

Functions
add async
add(
    embeddings: AsyncIterable[Tuple[Chunk, EmbeddingVector]],
    config: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]

Adds chunks and their embeddings to the vector store. Should handle batching internally if the input stream is large. Args: embeddings: An async iterable of (Chunk, EmbeddingVector) tuples. config: Vector store-specific configuration (e.g., collection name, batch_size). Returns: A dictionary with status, e.g., {"added_count": int, "errors": List[str]}

Source code in src/genie_tooling/vector_stores/abc.py
async def add(self, embeddings: AsyncIterable[Tuple[Chunk, EmbeddingVector]], config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: # Returns status/count
    """
    Adds chunks and their embeddings to the vector store.
    Should handle batching internally if the input stream is large.
    Args:
        embeddings: An async iterable of (Chunk, EmbeddingVector) tuples.
        config: Vector store-specific configuration (e.g., collection name, batch_size).
    Returns:
        A dictionary with status, e.g., {"added_count": int, "errors": List[str]}
    """
    logger.warning(f"VectorStorePlugin '{self.plugin_id}' add method not fully implemented.")
    return {"added_count": 0, "errors": ["Not implemented"]}
search async
search(
    query_embedding: EmbeddingVector,
    top_k: int,
    filter_metadata: Optional[Dict[str, Any]] = None,
    config: Optional[Dict[str, Any]] = None,
) -> List[RetrievedChunk]

Searches the vector store for chunks similar to the query_embedding. Args: query_embedding: The embedding vector of the query. top_k: The number of top results to return. filter_metadata: Optional metadata filter to apply during search. config: Search-specific configuration. Returns: A list of RetrievedChunk objects.

Source code in src/genie_tooling/vector_stores/abc.py
async def search(self, query_embedding: EmbeddingVector, top_k: int, filter_metadata: Optional[Dict[str, Any]] = None, config: Optional[Dict[str, Any]] = None) -> List[RetrievedChunk]:
    """
    Searches the vector store for chunks similar to the query_embedding.
    Args:
        query_embedding: The embedding vector of the query.
        top_k: The number of top results to return.
        filter_metadata: Optional metadata filter to apply during search.
        config: Search-specific configuration.
    Returns:
        A list of RetrievedChunk objects.
    """
    logger.warning(f"VectorStorePlugin '{self.plugin_id}' search method not fully implemented.")
    return []
delete async
delete(
    ids: Optional[List[str]] = None,
    filter_metadata: Optional[Dict[str, Any]] = None,
    delete_all: bool = False,
    config: Optional[Dict[str, Any]] = None,
) -> bool

Deletes items from the vector store. Args: ids: Optional list of chunk IDs to delete. filter_metadata: Optional metadata filter to select items for deletion. delete_all: If True, delete all items in the collection/store. config: Deletion-specific configuration. Returns: True if deletion was successful (or partially successful), False otherwise.

Source code in src/genie_tooling/vector_stores/abc.py
async def delete(self, ids: Optional[List[str]] = None, filter_metadata: Optional[Dict[str, Any]] = None, delete_all: bool = False, config: Optional[Dict[str, Any]] = None) -> bool:
    """
    Deletes items from the vector store.
    Args:
        ids: Optional list of chunk IDs to delete.
        filter_metadata: Optional metadata filter to select items for deletion.
        delete_all: If True, delete all items in the collection/store.
        config: Deletion-specific configuration.
    Returns:
        True if deletion was successful (or partially successful), False otherwise.
    """
    logger.warning(f"VectorStorePlugin '{self.plugin_id}' delete method not fully implemented.")
    return False

Functions

tool

tool(func: Callable) -> Callable

Decorator to transform a Python function into a Genie-compatible Tool.

Source code in src/genie_tooling/decorators.py
def tool(func: Callable) -> Callable:
    """Decorator to transform a Python function into a Genie-compatible Tool."""
    globalns = getattr(func, "__globals__", {})
    try:
        type_hints = get_type_hints(func, globalns=globalns)
    except NameError as e:
        try:
            type_hints = get_type_hints(func)
        except Exception:
            type_hints = {}
            print(
                f"Warning: Could not fully resolve type hints for {func.__name__} due to {e}. Schemas might be incomplete."
            )

    sig = inspect.signature(func)
    docstring = inspect.getdoc(func) or ""
    main_description = docstring.split("\n\n")[0].strip() or f"Executes the '{func.__name__}' tool."
    param_descriptions_from_doc = _parse_docstring_for_params(docstring)
    properties: Dict[str, Any] = {}
    required_params: List[str] = []

    for name, param in sig.parameters.items():
        if name in ("self", "cls") or param.kind in (
            inspect.Parameter.VAR_POSITIONAL,
            inspect.Parameter.VAR_KEYWORD,
        ):
            continue

        param_py_type_hint = type_hints.get(name, Any)
        if isinstance(param_py_type_hint, str):
            try:
                param_py_type_hint = ForwardRef(param_py_type_hint)._evaluate(  # type: ignore
                    globalns, {}, recursive_guard=frozenset()
                )
            except Exception:
                param_py_type_hint = Any

        is_optional_from_union_type = False
        actual_param_type_for_schema = param_py_type_hint
        origin = get_origin(param_py_type_hint)
        args = get_args(param_py_type_hint)

        if origin is Union and type(None) in (args or []):
            is_optional_from_union_type = True
            if args:
                non_none_args = [t for t in args if t is not type(None)]
                if len(non_none_args) == 1:
                    actual_param_type_for_schema = non_none_args[0]
                else:
                    actual_param_type_for_schema = Union[tuple(non_none_args)]  # Keep as Union of non-None

        param_schema_def = _map_type_to_json_schema(actual_param_type_for_schema)
        # --- FIX: Default to 'string' if schema is empty (from Any type) ---
        if not param_schema_def:
            param_schema_def["type"] = "string"

        param_schema_def["description"] = param_descriptions_from_doc.get(name, f"Parameter '{name}'.")

        # Handle optionality by adding "null" to the type list
        if is_optional_from_union_type:
            if "type" in param_schema_def and isinstance(param_schema_def["type"], str):
                param_schema_def["type"] = [param_schema_def["type"], "null"]
            elif "type" in param_schema_def and isinstance(param_schema_def["type"], list):
                if "null" not in param_schema_def["type"]:
                    param_schema_def["type"].append("null")

        if param.default is inspect.Parameter.empty:
            if not is_optional_from_union_type and name not in FRAMEWORK_INJECTED_PARAMS:
                required_params.append(name)
        else:
            param_schema_def["default"] = param.default

        properties[name] = param_schema_def

    input_schema: Dict[str, Any] = {"type": "object", "properties": properties}
    if required_params:
        input_schema["required"] = required_params

    return_py_type_hint = type_hints.get("return", Any)
    if isinstance(return_py_type_hint, str):
        try:
            return_py_type_hint = ForwardRef(return_py_type_hint)._evaluate(  # type: ignore
                globalns, {}, recursive_guard=frozenset()
            )
        except Exception:
            return_py_type_hint = Any

    actual_return_type_for_schema = return_py_type_hint
    ret_origin = get_origin(return_py_type_hint)
    ret_args = get_args(return_py_type_hint)
    if ret_origin is Union and type(None) in (ret_args or []):
        if ret_args:
            actual_return_type_for_schema = next((t for t in ret_args if t is not type(None)), Any)

    output_schema_prop_def = _map_type_to_json_schema(actual_return_type_for_schema)
    if not output_schema_prop_def:
        output_schema_prop_def = {"type": "object"}

    output_schema: Dict[str, Any] = {"type": "object", "properties": {"result": output_schema_prop_def}}
    if (
        output_schema_prop_def.get("type") != "null"
        and not (
            isinstance(output_schema_prop_def.get("type"), list)
            and "null" in output_schema_prop_def["type"]
            and len(output_schema_prop_def["type"]) == 1
        )
        and output_schema_prop_def != {}
    ):
        output_schema["required"] = ["result"]

    tool_metadata = {
        "identifier": func.__name__,
        "name": func.__name__.replace("_", " ").title(),
        "description_human": main_description,
        "description_llm": main_description,
        "input_schema": input_schema,
        "output_schema": output_schema,
        "key_requirements": [],
        "tags": ["decorated_tool"],
        "version": "1.0.0",
        "cacheable": False,
    }

    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)

    else:

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

    wrapper._tool_metadata_ = tool_metadata  # type: ignore
    wrapper._original_function_ = func  # type: ignore
    return wrapper