Skip to content

call_tool Hangs During In-Memory Testing When Server Sends Notifications Before Response #594

@maxibove13

Description

@maxibove13

Describe the Bug

When using the mcp.shared.memory.create_connected_server_and_client_session utility for integration testing, calls to client_session.call_tool hang indefinitely if the server-side tool handler sends a notification (e.g., ProgressNotification via ctx.report_progress or session.send_progress_notification before returning its final result.

It appears this occurs because the underlying memory streams created by create_client_server_memory_streams are unbuffered (max_buffer_size=0 by default). When the server sends the notification, the client's internal _receive_loop needs to consume it. However, the await client_session.call_tool(...) call is waiting specifically for the JSON-RPC response. If the event loop doesn't schedule the _receive_loop quickly enough to consume the notification, the server might block when trying to send the final response into the (full) unbuffered stream, leading to a deadlock where the client never receives the response it's waiting for.

Environment:
MCP Python SDK Version: 1.3.0
Python Version: 3.12
Operating System: macOS Sequoia 15.3.1
anyio version: 4.8.0

Steps to Reproduce
Create a simple MCP server using mcp.server.lowlevel.Server or mcp.server.fastmcp.FastMCP.
Define a tool that sends a notification (e.g., session.send_progress_notification or ctx.report_progress) before returning its result.
Set up an integration test using mcp.shared.memory.create_connected_server_and_client_session to connect a ClientSession to this server over memory streams.
Within the test, await a call to client_session.call_tool targeting the tool defined in step 2.

Minimal Reproducible Example:

import asyncio
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator, List

import anyio
import pytest

from mcp import types, ClientSession, Server
from mcp.shared.memory import create_connected_server_and_client_session
from mcp.types import CallToolResult, TextContent, ProgressToken

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# --- Server Setup ---
@pytest.fixture
def mcp_server() -> Server:
    server = Server(name="test_notification_server")

    @server.call_tool()
    async def tool_with_notification(name: str, arguments: dict) -> List[TextContent]:
        logger.info("Server: Tool 'tool_with_notification' started.")
        # Get session to send notification
        session = server.request_context.session
        progress_token: ProgressToken | None = None
        if server.request_context.meta:
            progress_token = server.request_context.meta.progressToken

        if progress_token is not None:
            logger.info(f"Server: Sending progress notification for token {progress_token}...")
            await session.send_progress_notification(
                progress_token=progress_token, progress=50.0, total=100.0
            )
            logger.info("Server: Progress notification sent.")
        else:
            logger.warning("Server: No progress token found in request meta.")

        # Simulate some work after notification
        await anyio.sleep(0.05)

        logger.info("Server: Tool 'tool_with_notification' finishing.")
        return [TextContent(type="text", text="Tool finished successfully")]

    return server

# --- Test ---
@pytest.mark.asyncio
async def test_call_tool_with_notification_hangs(mcp_server: Server):
    """
    This test demonstrates the hang when using the default memory stream utility.
    """
    logger.info("Starting test...")
    async with create_connected_server_and_client_session(
        mcp_server,
        raise_exceptions=True # Make test fail clearly on server errors
    ) as client_session:
        logger.info("Client session initialized.")

        tool_name = "tool_with_notification"
        tool_args = {}

        # Add progress token to trigger notification
        request_meta = types.RequestParams.Meta(progressToken="test-token-123")

        logger.info(f"Client: Calling tool '{tool_name}'...")
        try:
            # This await call hangs indefinitely with default unbuffered streams
            result: CallToolResult = await asyncio.wait_for(
                client_session.send_request( # Use send_request to add meta easily
                    types.ClientRequest(
                        types.CallToolRequest(
                            method="tools/call",
                            params=types.CallToolRequestParams(
                                name=tool_name,
                                arguments=tool_args,
                                meta=request_meta # Add meta here
                            )
                        )
                    ),
                    types.CallToolResult # Expected result type
                ),
                timeout=5.0 # Add a timeout to prevent infinite hang in test run
            )
            logger.info(f"Client: Received result: {result}")

            # Assertions (won't be reached if it hangs)
            assert not result.isError
            assert len(result.content) == 1
            assert isinstance(result.content[0], TextContent)
            assert result.content[0].text == "Tool finished successfully"

        except asyncio.TimeoutError:
            pytest.fail("Test timed out - call_tool likely hanged.")
        except Exception as e:
             logger.error(f"Client: Error during call_tool: {e}", exc_info=True)
             pytest.fail(f"call_tool raised an unexpected exception: {e}")

Expected Behavior

The await client_session.call_tool(...) (or send_request) should complete successfully after the server sends both the ProgressNotification and the final CallToolResult response. The test should pass the assertions.
Actual Behavior
The await client_session.call_tool(...) call hangs indefinitely and the test eventually times out (if a timeout is applied). Logs show the server sending the notification but the client never seems to receive the final response.
Workaround/Analysis
We observed that modifying the create_client_server_memory_streams utility (or replicating its logic within the test) to use a non-zero buffer size for anyio.create_memory_object_stream (e.g., buffer_size=10) resolves the hang. This suggests the issue is related to the server potentially blocking on writing the final response to the unbuffered stream because the client's receive loop hasn't yet consumed the preceding notification.

Question/Suggestion

Is this hanging behavior expected with the default unbuffered memory streams when notifications precede responses?

What is the recommended approach for writing integration tests using the in-memory transport utilities when server-sent notifications (like progress) are involved?

Could the create_connected_server_and_client_session utility be updated to either:
Use a small default buffer for the memory streams?

Accept a message_handler argument to be passed to the ClientSession it creates, allowing tests to explicitly consume all messages?

Alternatively, is there a different pattern recommended for testing these concurrent interactions with the current SDK?

Thank you

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    Status

    To triage

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions