Skip to content

[FEATURE] Implement native async iterator support  #83

@awsarron

Description

@awsarron

Problem Statement

The Agent class provides the stream_async function that returns an AsyncIterator:

async def stream_async(self, prompt: str, **kwargs: Any) -> AsyncIterator[Any]:

The current stream_async implementation calls _run_loop and uses threads with a queue to yield streamed data events from the Strands Agents event loop:

def _run_loop(

thread = Thread(target=target_callback, daemon=True)
thread.start()
try:
while True:
item = await queue.get()
if item == _stop_event:
break
if isinstance(item, BaseException):
raise item
yield item
finally:
thread.join()

The problem is that this threading implementation doesn't work with Python features like thread local contextvars. Certain Python libraries (e.g. https://github.com/UKGovernmentBEIS/inspect_ai) use thread local context vars and require them to function correctly. The only reasonable solution for correctly supporting concurrency within a single thread is async.

This issue is to propose, discuss, and refine a true async implementation of the Agent.stream_async function and Strands Agent event loop.

Proposed Solution

No response

Use Case

  • Thread local contextvars
  • A more maintainable implementation of async agent requests
  • Allows for future enhancements that enable streamed tool results

Alternatives Solutions

No response

Additional Context

No response

Metadata

Metadata

Assignees

Labels

area-asyncRelated to asynchronous flows or multi-threadingenhancementNew feature or request

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions