-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Close server session after handle stateless request #1116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Close server session after handle stateless request #1116
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this contribution!
Do you have a way of reproducing this issue? An regression test as part of this PR would be great.
@@ -190,6 +190,9 @@ async def run_stateless_server(*, task_status: TaskStatus[None] = anyio.TASK_STA | |||
# Handle the HTTP request and return the response | |||
await http_transport.handle_request(scope, receive, send) | |||
|
|||
# Terminate the session after the request is handled | |||
await http_transport._terminate_session() # type: ignore[reportPrivateUsage] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create a public cleanup()
method in StreamableHTTPSeverTransport
and use that internally inside _terminate_session()
instead of using a private method?
There's no "Session" in a stateless request, so I think we shouldn't be terminating a non-existent session and extract the cleanup logic instead.
Something like this:
# streamable_http.py
async def cleanup(self) -> None:
"""Clean up all transport resources (streams, connections).
This method closes all streams and cleans up resources but does NOT
mark the session as terminated. Use this for stateless cleanup.
"""
# We need a copy of the keys to avoid modification during iteration
request_stream_keys = list(self._request_streams.keys())
# Close all request streams asynchronously
for key in request_stream_keys:
await self._clean_up_memory_streams(key)
# Clear the request streams dictionary immediately
self._request_streams.clear()
try:
if self._read_stream_writer is not None:
await self._read_stream_writer.aclose()
if self._read_stream is not None:
await self._read_stream.aclose()
if self._write_stream_reader is not None:
await self._write_stream_reader.aclose()
if self._write_stream is not None:
await self._write_stream.aclose()
except Exception as e:
# During cleanup, we catch all exceptions since streams might be in various states
logger.debug(f"Error closing streams: {e}")
async def _terminate_session(self) -> None:
"""Terminate the current session, closing all streams.
Once terminated, all requests with this session ID will receive 404 Not Found.
"""
self._terminated = True
logger.info(f"Terminating session: {self.mcp_session_id}")
await self.cleanup()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your review, I strongly agree with avoiding calls to internal methods.
There's no "Session" in a stateless request
I agree with your point, but I think we can also view it as a temporary session that only exists during this stateless request
Can we create a public
cleanup()
method inStreamableHTTPSeverTransport
and use that internally inside_terminate_session()
instead of using a private method?
I prefer to modify the _terminal_session
method directly to be public rather than extracting another method from it, because the transport itself should also be terminated after handling the stateless request, which means the _terminated
should also be set to True
. I think terminate
might be a good name, because we actually directly terminate the transport itself, and the session is automatically terminated due to the stream being closed.
And this terminated method needs to be called not only in stateless requests but also in stateful requests. If the server-side manager cannot actively clean up these transports or sessions, it means that as long as the client does not send delete requests, they will exist in memory FOREVER, and the same problem exists even in stateful requests.
I think a "session" can exist forever, but a session instance should not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, works for me! Do you want to make that change to this PR? In that case rename _terminate_session
to just terminate()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, I have submitted this change.
I am handling the termination of a stateful server session. Do you think I should continue in this PR or open a new one?
You can use the Example Code in #1076, just change the FastMCP server as stateless
Or for stateful servers, just add
I tried many methods, but there was no way to implement a reasonable test, because in a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks again for the contribution! I'll figure out how we can have at least some kind of basic test for this :)
This test verifies that PR #1116's fix properly cleans up transport resources for stateless requests. The test mocks StreamableHTTPServerTransport to track when _terminate_session() is called and ensures it's invoked for each stateless request to prevent memory leaks. Without the fix (commenting out the _terminate_session() call), this test fails, confirming it properly detects the memory leak issue. Github-Issue:#1116
The previous test was self-fulfilling as it only verified that a mocked method was called. The new test uses real StreamableHTTPServerTransport instances and verifies that: - The transport's _terminated flag is set to True - The _request_streams dictionary is empty - The test fails when _terminate_session() is not called This provides a more robust regression test for PR #1116's memory leak fix. Github-Issue:#1116
I made an (imperfect) attempt to have some kind of test coverage for this, feel free to take a look if you agree with the approach: #1140 |
This test verifies that PR #1116's fix properly cleans up transport resources for stateless requests. The test mocks StreamableHTTPServerTransport to track when _terminate_session() is called and ensures it's invoked for each stateless request to prevent memory leaks. Without the fix (commenting out the _terminate_session() call), this test fails, confirming it properly detects the memory leak issue. Github-Issue:#1116
await http_transport.handle_request(scope, receive, send) | ||
|
||
# Terminate the transport after the request is handled | ||
await http_transport.terminate() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can handle_request
fail? I didn't check, but I think ideally this transport should run in a context manager, which would terminate on exit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that handle_request
might fail, perhaps we could use try...finally...
to handle this situation. Context managers may not be suitable for stateful requests.
Motivation and Context
The
ServerSession
created by stateless Streamable HTTP requests is not being properly cleaned up, leading to memory leaks, which may be related to #1076.This PR allows the transport to immediately close the session after handling the request, which I believe is reasonable, as the session will no longer be needed once the request has been processed.
How Has This Been Tested?
Breaking Changes
Types of changes
Checklist
Additional context
At the same time, for stateful requests, when the client does not send a DELETE request due to abnormal termination or network issues, the service session will not be cleaned up either, which similarly leads to memory leaks.
I think a heartbeat mechanism can be used to determine whether the client is alive. allowing the server to timely clean up expired sessions. I would be happy to open a new PR for this or continue addressing this issue in the current PR.