Skip to content

streamable_http client call_tool hangs when receiving invalid JSONRPCMessage #1144

@Nuclear2

Description

@Nuclear2

Initial Checks

Description

Issue Description:
During testing, we encountered a critical issue in the Server-Sent Events (SSE) handling mechanism:

  1. Problem Trigger:
    To some reason, the _handle_sse_event method received a truncated sse.data payload, causing:
    A JSON parsing exception message = JSONRPCMessage.model_validate_json(sse.data)
    Then the exception handle sent the exc and returned false
  2. Current Behavior Gap:
    Despite the failed processing:
    ·No error propagation to the client occurs
    ·No cleanup/retry mechanism is triggered
    ·The client remains stuck in a perpetual wait state for a tool_call result until timeout
  3. Critical Impact: The server has already completed the tool_call workflow and moved to the final /done state, creating a state desynchronization between client and server
  4. Root Cause Hypothesis:
    It appears that the client failed to detect the reception of invalid responses. The await read_stream_writer.send(exc) did not work during exception handling

Or maybe I missed some configs such as the retry mechanism or other error handling methods?

Example Code

async def _handle_sse_event(
    self,
    sse: ServerSentEvent,
    read_stream_writer: StreamWriter,
    original_request_id: RequestId | None = None,
    resumption_callback: Callable[[str], Awaitable[None]] | None = None,
    is_initialization: bool = False,
) -> bool:
    """Handle an SSE event, returning True if the response is complete."""
    if sse.event == "message":
        try:
            message = JSONRPCMessage.model_validate_json(sse.data) ##### threw the exception
            logger.debug(f"SSE message: {message}")

            # Extract protocol version from initialization response
            if is_initialization:
                self._maybe_extract_protocol_version_from_message(message)

            # If this is a response and we have original_request_id, replace it
            if original_request_id is not None and isinstance(message.root, JSONRPCResponse | JSONRPCError):
                message.root.id = original_request_id

            session_message = SessionMessage(message)
            await read_stream_writer.send(session_message)

            # Call resumption token callback if we have an ID
            if sse.id and resumption_callback:
                await resumption_callback(sse.id)

            # If this is a response or error return True indicating completion
            # Otherwise, return False to continue listening
            return isinstance(message.root, JSONRPCResponse | JSONRPCError)

        except Exception as exc:
            ##### handled exception but client did not receive ######
            logger.exception("Error parsing SSE message")
            await read_stream_writer.send(exc)
            return False
    else:
        logger.warning(f"Unknown SSE event: {sse.event}")
        return False

Python & MCP Python SDK

python == 3.12
SDK == 1.11.0

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions