
Commit 4f7147a

Merge branch 'main' into fix-tool-retry-error-message
2 parents: 09a0ca4 + d2b08ad

23 files changed (+2812, -815 lines)

docs/agents.md

Lines changed: 4 additions & 3 deletions
@@ -125,10 +125,11 @@ It also takes an optional `event_stream_handler` argument that you can use to ga
 The example below shows how to stream events and text output. You can also [stream structured output](output.md#streaming-structured-output).
 
 !!! note
-    As the `run_stream()` method will consider the first output matching the [output type](output.md#structured-output) to be the final output,
-    it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
+    The `run_stream()` method will consider the first output that matches the [output type](output.md#structured-output) to be the final output of the agent run, even when the model generates tool calls after this "final" output.
 
-    If you want to always run the agent graph to completion and stream all events from the model's streaming response and the agent's execution of tools,
+    These "dangling" tool calls will not be executed unless the agent's [`end_strategy`][pydantic_ai.agent.Agent.end_strategy] is set to `'exhaustive'`, and even then their results will not be sent back to the model as the agent run will already be considered completed.
+
+    If you want to always keep running the agent when it performs tool calls, and stream all events from the model's streaming response and the agent's execution of tools,
     use [`agent.run_stream_events()`][pydantic_ai.agent.AbstractAgent.run_stream_events] or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead, as described in the following sections.
 
 ```python {title="run_stream_event_stream_handler.py"}

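To make the behavior described in the updated note concrete, here is a minimal sketch of streaming with `end_strategy='exhaustive'`. The model string `'test'`, the tool, and the prompt are placeholders for illustration and are not part of the commit.

```python
import asyncio

from pydantic_ai import Agent

# Assumed setup for illustration: 'test' selects the built-in test model.
agent = Agent('test', end_strategy='exhaustive')


@agent.tool_plain
def record_audit(entry: str) -> str:
    """A side-effecting tool that we don't want silently skipped."""
    print(f'audit: {entry}')
    return 'recorded'


async def main():
    async with agent.run_stream('What is the capital of Mexico?') as result:
        async for text in result.stream_text():
            print(text)
    # With end_strategy='exhaustive', tool calls made alongside the final output
    # are still executed, but their results are not sent back to the model.


asyncio.run(main())
```
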
docs/durable_execution/temporal.md

Lines changed: 19 additions & 18 deletions
@@ -86,8 +86,8 @@ from temporalio.worker import Worker
 
 from pydantic_ai import Agent
 from pydantic_ai.durable_exec.temporal import (
-    AgentPlugin,
     PydanticAIPlugin,
+    PydanticAIWorkflow,
     TemporalAgent,
 )
 
@@ -101,26 +101,27 @@ temporal_agent = TemporalAgent(agent) # (1)!
 
 
 @workflow.defn
-class GeographyWorkflow:  # (2)!
+class GeographyWorkflow(PydanticAIWorkflow):  # (2)!
+    __pydantic_ai_agents__ = [temporal_agent]  # (3)!
+
     @workflow.run
     async def run(self, prompt: str) -> str:
-        result = await temporal_agent.run(prompt)  # (3)!
+        result = await temporal_agent.run(prompt)  # (4)!
         return result.output
 
 
 async def main():
-    client = await Client.connect(  # (4)!
-        'localhost:7233',  # (5)!
-        plugins=[PydanticAIPlugin()],  # (6)!
+    client = await Client.connect(  # (5)!
+        'localhost:7233',  # (6)!
+        plugins=[PydanticAIPlugin()],  # (7)!
     )
 
-    async with Worker(  # (7)!
+    async with Worker(  # (8)!
         client,
         task_queue='geography',
         workflows=[GeographyWorkflow],
-        plugins=[AgentPlugin(temporal_agent)],  # (8)!
     ):
-        output = await client.execute_workflow(  # (9)!
+        output = await client.execute_workflow(  # (10)!
            GeographyWorkflow.run,
            args=['What is the capital of Mexico?'],
            id=f'geography-{uuid.uuid4()}',
@@ -131,15 +132,15 @@ async def main():
 ```
 
 1. The original `Agent` cannot be used inside a deterministic Temporal workflow, but the `TemporalAgent` can.
-2. As explained above, the workflow represents a deterministic piece of code that can use non-deterministic activities for operations that require I/O.
-3. [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] works just like [`Agent.run()`][pydantic_ai.Agent.run], but it will automatically offload model requests, tool calls, and MCP server communication to Temporal activities.
-4. We connect to the Temporal server which keeps track of workflow and activity execution.
-5. This assumes the Temporal server is [running locally](https://github.com/temporalio/temporal#download-and-start-temporal-server-locally).
-6. The [`PydanticAIPlugin`][pydantic_ai.durable_exec.temporal.PydanticAIPlugin] tells Temporal to use Pydantic for serialization and deserialization, and to treat [`UserError`][pydantic_ai.exceptions.UserError] exceptions as non-retryable.
-7. We start the worker that will listen on the specified task queue and run workflows and activities. In a real world application, this might be run in a separate service.
-8. The [`AgentPlugin`][pydantic_ai.durable_exec.temporal.AgentPlugin] registers the `TemporalAgent`'s activities with the worker.
-9. We call on the server to execute the workflow on a worker that's listening on the specified task queue.
-10. The agent's `name` is used to uniquely identify its activities.
+2. As explained above, the workflow represents a deterministic piece of code that can use non-deterministic activities for operations that require I/O. Subclassing [`PydanticAIWorkflow`][pydantic_ai.durable_exec.temporal.PydanticAIWorkflow] is optional but provides proper typing for the `__pydantic_ai_agents__` class variable.
+3. List the `TemporalAgent`s used by this workflow. The [`PydanticAIPlugin`][pydantic_ai.durable_exec.temporal.PydanticAIPlugin] will automatically register their activities with the worker. Alternatively, if modifying the worker initialization is easier than the workflow class, you can use [`AgentPlugin`][pydantic_ai.durable_exec.temporal.AgentPlugin] to register agents directly on the worker.
+4. [`TemporalAgent.run()`][pydantic_ai.durable_exec.temporal.TemporalAgent.run] works just like [`Agent.run()`][pydantic_ai.Agent.run], but it will automatically offload model requests, tool calls, and MCP server communication to Temporal activities.
+5. We connect to the Temporal server which keeps track of workflow and activity execution.
+6. This assumes the Temporal server is [running locally](https://github.com/temporalio/temporal#download-and-start-temporal-server-locally).
+7. The [`PydanticAIPlugin`][pydantic_ai.durable_exec.temporal.PydanticAIPlugin] tells Temporal to use Pydantic for serialization and deserialization, treats [`UserError`][pydantic_ai.exceptions.UserError] exceptions as non-retryable, and automatically registers activities for agents listed in `__pydantic_ai_agents__`.
+8. We start the worker that will listen on the specified task queue and run workflows and activities. In a real world application, this might be run in a separate service.
+9. The agent's `name` is used to uniquely identify its activities.
+10. We call on the server to execute the workflow on a worker that's listening on the specified task queue.
 
 _(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_

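Pieced together from the added lines above, the updated Temporal example would look roughly like the sketch below. The agent definition (model string and `name`) and the `task_queue` argument to `execute_workflow` are assumed from the surrounding docs example, not shown in this diff.

```python
import uuid

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

from pydantic_ai import Agent
from pydantic_ai.durable_exec.temporal import (
    PydanticAIPlugin,
    PydanticAIWorkflow,
    TemporalAgent,
)

# Assumed agent setup; the docs example defines this above the diffed hunks.
agent = Agent('openai:gpt-4o', name='geography')
temporal_agent = TemporalAgent(agent)


@workflow.defn
class GeographyWorkflow(PydanticAIWorkflow):
    # Agents listed here get their activities registered by PydanticAIPlugin.
    __pydantic_ai_agents__ = [temporal_agent]

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await temporal_agent.run(prompt)
        return result.output


async def main():
    client = await Client.connect('localhost:7233', plugins=[PydanticAIPlugin()])

    async with Worker(client, task_queue='geography', workflows=[GeographyWorkflow]):
        output = await client.execute_workflow(
            GeographyWorkflow.run,
            args=['What is the capital of Mexico?'],
            id=f'geography-{uuid.uuid4()}',
            task_queue='geography',  # assumed; truncated out of the visible hunk
        )
        print(output)
```
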
docs/output.md

Lines changed: 9 additions & 0 deletions
@@ -306,6 +306,15 @@ print(repr(result.output))
 
 _(This example is complete, it can be run "as is")_
 
+##### Parallel Output Tool Calls
+
+When the model calls other tools in parallel with an output tool, you can control how tool calls are executed by setting the agent's [`end_strategy`][pydantic_ai.agent.Agent.end_strategy]:
+
+- `'early'` (default): Output tools are executed first. Once a valid final result is found, remaining function and output tool calls are skipped
+- `'exhaustive'`: Output tools are executed first, then all function tools are executed. The first valid output tool result becomes the final output
+
+The `'exhaustive'` strategy is useful when tools have important side effects (like logging, sending notifications, or updating metrics) that should always execute.
+
 #### Native Output
 
 Native Output mode uses a model's native "Structured Outputs" feature (aka "JSON Schema response format"), where the model is forced to only output text matching the provided JSON schema. Note that this is not supported by all models, and sometimes comes with restrictions. For example, Gemini cannot use tools at the same time as structured output, and attempting to do so will result in an error.

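As a rough illustration of the new "Parallel Output Tool Calls" section, the sketch below opts into the `'exhaustive'` strategy so a side-effecting function tool still runs when the model calls it in parallel with the output tool. The model string `'test'`, the output model, and the tool are placeholders, not part of the commit.

```python
from pydantic import BaseModel

from pydantic_ai import Agent


class CityInfo(BaseModel):
    city: str
    country: str


# 'early' (the default) would skip remaining tool calls once a valid output
# tool result is found; 'exhaustive' runs the function tools as well.
agent = Agent('test', output_type=CityInfo, end_strategy='exhaustive')


@agent.tool_plain
def send_notification(message: str) -> str:
    """A tool whose side effect should always happen."""
    print(f'notify: {message}')
    return 'sent'


result = agent.run_sync('What is the capital of Mexico?')
print(repr(result.output))
```
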
docs/tools-advanced.md

Lines changed: 39 additions & 0 deletions
@@ -371,6 +371,38 @@ def my_flaky_tool(query: str) -> str:
 
 Raising `ModelRetry` also generates a `RetryPromptPart` containing the exception message, which is sent back to the LLM to guide its next attempt. Both `ValidationError` and `ModelRetry` respect the `retries` setting configured on the `Tool` or `Agent`.
 
+### Tool Timeout
+
+You can set a timeout for tool execution to prevent tools from running indefinitely. If a tool exceeds its timeout, it is treated as a failure and a retry prompt is sent to the model (counting towards the retry limit).
+
+```python
+import asyncio
+
+from pydantic_ai import Agent
+
+# Set a default timeout for all tools on the agent
+agent = Agent('test', tool_timeout=30)
+
+
+@agent.tool_plain
+async def slow_tool() -> str:
+    """This tool will use the agent's default timeout (30 seconds)."""
+    await asyncio.sleep(10)
+    return 'Done'
+
+
+@agent.tool_plain(timeout=5)
+async def fast_tool() -> str:
+    """This tool has its own timeout (5 seconds) that overrides the agent default."""
+    await asyncio.sleep(1)
+    return 'Done'
+```
+
+- **Agent-level timeout**: Set `tool_timeout` on the [`Agent`][pydantic_ai.Agent] to apply a default timeout to all tools.
+- **Per-tool timeout**: Set `timeout` on individual tools via [`@agent.tool`][pydantic_ai.Agent.tool], [`@agent.tool_plain`][pydantic_ai.Agent.tool_plain], or the [`Tool`][pydantic_ai.tools.Tool] dataclass. This overrides the agent-level default.
+
+When a timeout occurs, the tool is considered to have failed and the model receives a retry prompt with the message `"Timed out after {timeout} seconds."`. This counts towards the tool's retry limit just like validation errors or explicit [`ModelRetry`][pydantic_ai.exceptions.ModelRetry] exceptions.
+
 ### Parallel tool calls & concurrency
 
 When a model returns multiple tool calls in one response, Pydantic AI schedules them concurrently using `asyncio.create_task`.
@@ -381,6 +413,13 @@ Async functions are run on the event loop, while sync functions are offloaded to
 !!! note "Limiting tool executions"
     You can cap tool executions within a run using [`UsageLimits(tool_calls_limit=...)`](agents.md#usage-limits). The counter increments only after a successful tool invocation. Output tools (used for [structured output](output.md)) are not counted in the `tool_calls` metric.
 
+#### Output Tool Calls
+
+When a model calls an [output tool](output.md#tool-output) in parallel with other tools, the agent's [`end_strategy`][pydantic_ai.agent.Agent.end_strategy] parameter controls how these tool calls are executed.
+The `'exhaustive'` strategy ensures all tools are executed even after a final result is found, which is useful when tools have side effects (like logging, sending notifications, or updating metrics) that should always execute.
+
+For more information of how `end_strategy` works with both function tools and output tools, see the [Output Tool](output.md#parallel-output-tool-calls) docs.
+
 ## See Also
 
 - [Function Tools](tools.md) - Basic tool concepts and registration

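The bullets in the new "Tool Timeout" section also mention setting `timeout` through the `Tool` dataclass, which the diffed example doesn't show. A possible sketch, assuming the per-tool `timeout` parameter is accepted there as described:

```python
import asyncio

from pydantic_ai import Agent
from pydantic_ai.tools import Tool


async def lookup(query: str) -> str:
    """A slow lookup we want to cap at 2 seconds."""
    await asyncio.sleep(5)
    return f'result for {query}'


# Assumption based on the bullet above: Tool accepts a per-tool `timeout`,
# which overrides the agent-level `tool_timeout` default.
agent = Agent('test', tool_timeout=30, tools=[Tool(lookup, timeout=2)])
```

If `lookup` exceeds two seconds, the model should receive the `"Timed out after 2 seconds."` retry prompt described above, counted against the tool's retry limit.
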
pydantic_ai_slim/pydantic_ai/_agent_graph.py

Lines changed: 48 additions & 29 deletions
@@ -60,11 +60,6 @@
 S = TypeVar('S')
 NoneType = type(None)
 EndStrategy = Literal['early', 'exhaustive']
-"""The strategy for handling multiple tool calls when a final result is found.
-
-- `'early'`: Stop processing other tool calls once a final result is found
-- `'exhaustive'`: Process all tool calls even after finding a final result
-"""
 DepsT = TypeVar('DepsT')
 OutputT = TypeVar('OutputT')
 
@@ -865,35 +860,56 @@ async def process_tool_calls(  # noqa: C901
 
     # First, we handle output tool calls
     for call in tool_calls_by_kind['output']:
-        if final_result:
-            if final_result.tool_call_id == call.tool_call_id:
-                part = _messages.ToolReturnPart(
-                    tool_name=call.tool_name,
-                    content='Final result processed.',
-                    tool_call_id=call.tool_call_id,
-                )
-            else:
-                yield _messages.FunctionToolCallEvent(call)
-                part = _messages.ToolReturnPart(
-                    tool_name=call.tool_name,
-                    content='Output tool not used - a final result was already processed.',
-                    tool_call_id=call.tool_call_id,
-                )
-                yield _messages.FunctionToolResultEvent(part)
-
+        # `final_result` can be passed into `process_tool_calls` from `Agent.run_stream`
+        # when streaming and there's already a final result
+        if final_result and final_result.tool_call_id == call.tool_call_id:
+            part = _messages.ToolReturnPart(
+                tool_name=call.tool_name,
+                content='Final result processed.',
+                tool_call_id=call.tool_call_id,
+            )
+            output_parts.append(part)
+        # Early strategy is chosen and final result is already set
+        elif ctx.deps.end_strategy == 'early' and final_result:
+            yield _messages.FunctionToolCallEvent(call)
+            part = _messages.ToolReturnPart(
+                tool_name=call.tool_name,
+                content='Output tool not used - a final result was already processed.',
+                tool_call_id=call.tool_call_id,
+            )
+            yield _messages.FunctionToolResultEvent(part)
             output_parts.append(part)
+        # Early strategy is chosen and final result is not yet set
+        # Or exhaustive strategy is chosen
         else:
             try:
                 result_data = await tool_manager.handle_call(call)
             except exceptions.UnexpectedModelBehavior as e:
-                ctx.state.increment_retries(
-                    ctx.deps.max_result_retries, error=e, model_settings=ctx.deps.model_settings
-                )
-                raise e  # pragma: lax no cover
+                # If we already have a valid final result, don't fail the entire run
+                # This allows exhaustive strategy to complete successfully when at least one output tool is valid
+                if final_result:
+                    # If output tool fails when we already have a final result, skip it without retrying
+                    yield _messages.FunctionToolCallEvent(call)
+                    part = _messages.ToolReturnPart(
+                        tool_name=call.tool_name,
+                        content='Output tool not used - output failed validation.',
+                        tool_call_id=call.tool_call_id,
+                    )
+                    output_parts.append(part)
+                    yield _messages.FunctionToolResultEvent(part)
+                else:
+                    # No valid result yet, so this is a real failure
+                    ctx.state.increment_retries(
+                        ctx.deps.max_result_retries, error=e, model_settings=ctx.deps.model_settings
+                    )
+                    raise e  # pragma: lax no cover
             except ToolRetryError as e:
-                ctx.state.increment_retries(
-                    ctx.deps.max_result_retries, error=e, model_settings=ctx.deps.model_settings
-                )
+                # If we already have a valid final result, don't increment retries for invalid output tools
+                # This allows the run to succeed if at least one output tool returned a valid result
+                if not final_result:
+                    ctx.state.increment_retries(
+                        ctx.deps.max_result_retries, error=e, model_settings=ctx.deps.model_settings
+                    )
                 yield _messages.FunctionToolCallEvent(call)
                 output_parts.append(e.tool_retry)
                 yield _messages.FunctionToolResultEvent(e.tool_retry)
@@ -904,7 +920,10 @@
                     tool_call_id=call.tool_call_id,
                 )
                 output_parts.append(part)
-                final_result = result.FinalResult(result_data, call.tool_name, call.tool_call_id)
+
+                # In both `early` and `exhaustive` modes, use the first output tool's result as the final result
+                if not final_result:
+                    final_result = result.FinalResult(result_data, call.tool_name, call.tool_call_id)
 
     # Then, we handle function tool calls
     calls_to_run: list[_messages.ToolCallPart] = []

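The new branching in `process_tool_calls` is easier to follow as a standalone decision helper. The function below is a simplified restatement for illustration only; the real code also emits `FunctionToolCallEvent`/`FunctionToolResultEvent`s, builds `ToolReturnPart`s, and handles validation failures and retries.

```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class OutputCallDecision:
    run_tool: bool
    note: str


def classify_output_call(
    end_strategy: Literal['early', 'exhaustive'],
    have_final_result: bool,
    is_streamed_final_call: bool,
) -> OutputCallDecision:
    """Simplified restatement of the branching added above (illustration only)."""
    if is_streamed_final_call:
        # The call that already produced the streamed final result is only acknowledged.
        return OutputCallDecision(False, 'Final result processed.')
    if end_strategy == 'early' and have_final_result:
        # Early strategy skips remaining output tools once a final result exists.
        return OutputCallDecision(False, 'Output tool not used - a final result was already processed.')
    # No final result yet, or exhaustive strategy: run the output tool.
    # The first valid output tool result becomes the final result.
    return OutputCallDecision(True, 'Run the output tool; first valid result wins.')


print(classify_output_call('early', True, False))
print(classify_output_call('exhaustive', True, False))
```
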
pydantic_ai_slim/pydantic_ai/_tool_manager.py

Lines changed: 1 addition & 3 deletions
@@ -172,9 +172,7 @@ async def _call_tool(
                call.args or {}, allow_partial=pyd_allow_partial, context=ctx.validation_context
            )
 
-            result = await self.toolset.call_tool(name, args_dict, ctx, tool)
-
-            return result
+            return await self.toolset.call_tool(name, args_dict, ctx, tool)
        except (ValidationError, ModelRetry) as e:
            max_retries = tool.max_retries if tool is not None else 1
            current_retry = self.ctx.retries.get(name, 0)
