-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Open
Description
Initial Checks
- I confirm that I'm using the latest version of MCP Python SDK
- I confirm that I searched for my issue in https://github.com/modelcontextprotocol/python-sdk/issues before opening this issue
Description
I am new to MCP and I am having the bellow connection failure error and note that with the minimal set up it Works and I have tried using many ways it does not work and I have also followed the official guideline from the docs and used uv
for the installation and environment setup and also the MCP inspector does work when I try it to run the tool.
The server does not show any connection related thing when I try to connect in the client.
Many thanks in advance for any help.
here is the error
(client) PS C:\Users\user\Desktop\codes\companies\Fanaye Technologies\MCP\MCP_X_Final\client> python main.py ../server/server.py
Traceback (most recent call last):
File "C:\Users\user\Desktop\codes\companies\Fanaye Technologies\MCP\MCP_X_Final\client\main.py", line 107, in <module>
asyncio.run(main())
File "D:\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "D:\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\Programs\Python\Python312\Lib\asyncio\base_events.py", line 685, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Users\user\Desktop\codes\companies\Fanaye Technologies\MCP\MCP_X_Final\client\main.py", line 101, in main
await mc.connect_to_server(sys.argv[1])
File "C:\Users\user\Desktop\codes\companies\Fanaye Technologies\MCP\MCP_X_Final\client\main.py", line 30, in connect_to_server
await self.session.initialize()
File "C:\Users\user\Desktop\codes\companies\Fanaye Technologies\MCP\MCP_X_Final\client\.venv\Lib\site-packages\mcp\client\session.py", line 151, in initialize
result = await self.send_request(
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\Desktop\codes\companies\Fanaye Technologies\MCP\MCP_X_Final\client\.venv\Lib\site-packages\mcp\shared\session.py", line 286, in send_request raise McpError(response_or_error.error)
mcp.shared.exceptions.McpError: Connection closed
Example Code
here is my code for the server.py
from typing import Any
import os
import time
import asyncio
import openpyxl
import httpx
import json
from mcp.server.fastmcp import FastMCP
from dotenv import load_dotenv
import sys
load_dotenv()
print("Starting MCP server...", file=sys.stderr)
print(f"Python executable: {sys.executable}", file=sys.stderr)
print(f"Working directory: {os.getcwd()}", file=sys.stderr)
# Initialize FastMCP server
mcp = FastMCP("twitter_monitor")
X_API_KEY = os.getenv("X_API_KEY")
print(f"X_API_KEY configured: {bool(X_API_KEY)}", file=sys.stderr)
TWITTER_SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
HEADERS = {
"Authorization": f"Bearer {X_API_KEY}",
"Content-Type": "application/json"
}
def read_excel(file_path: str, column_name: str) -> list[str]:
print(f"Attempting to read Excel file: {file_path}", file=sys.stderr)
if not os.path.exists(file_path):
raise FileNotFoundError(f"Excel file not found: {file_path}")
try:
wb = openpyxl.load_workbook(file_path)
print(f"Workbook loaded. Available sheets: {wb.sheetnames}", file=sys.stderr)
except Exception as e:
raise Exception(f"Failed to load workbook: {e}")
if "Market Makers" not in wb.sheetnames:
raise ValueError(f"Sheet 'Market Makers' not found. Available sheets: {wb.sheetnames}")
sheet = wb["Market Makers"]
header = [cell.value for cell in next(sheet.iter_rows(min_row=1, max_row=1))]
print(f"Excel headers: {header}", file=sys.stderr)
try:
col_idx = header.index(column_name)
except ValueError:
raise ValueError(f"Column '{column_name}' not found in sheet header: {header}")
twitter_handles = set()
for row in sheet.iter_rows(min_row=2):
handle = row[col_idx].value
if handle and handle not in twitter_handles:
twitter_handles.add(handle)
if len(twitter_handles) == 2:
break
print(f"Found Twitter handles: {list(twitter_handles)}", file=sys.stderr)
return list(twitter_handles)
@mcp.tool()
async def get_company_tweets(column: str = "Twitter", max_results: int = 10) -> str:
"""Get tweets about Company, Announcement, or Update from Excel-provided usernames.
Args:
column: Column name in Excel sheet with Twitter usernames.
max_results: Max tweets to retrieve per handle.
"""
file_path = "mysheet.xlsx"
try:
twitter_handles = read_excel(file_path, column)
except Exception as e:
return json.dumps({"error": str(e)})
all_posts = []
async with httpx.AsyncClient() as client:
for handle in twitter_handles:
query = f"from:{handle} Company Announcement OR Update"
params = {"query": query, "max_results": max_results}
retries = 3
for _ in range(retries):
try:
response = await client.get(TWITTER_SEARCH_URL, headers=HEADERS, params=params)
if response.status_code == 200:
all_posts.extend(response.json().get("data", []))
break
elif response.status_code == 429:
reset_time = int(response.headers.get("x-rate-limit-reset", 0))
current_time = int(time.time())
wait_time = max(reset_time - current_time, 5)
print(f"Rate limited. Waiting {wait_time}s...", file=sys.stderr)
await asyncio.sleep(wait_time)
else:
print(f"Error {response.status_code} for {handle}: {response.text}", file=sys.stderr)
break
except Exception as e:
print(f"Request failed for {handle}: {e}", file=sys.stderr)
break
await asyncio.sleep(1) # Use async sleep
if len(all_posts) >= 25:
break
result = {"total_fetched": len(all_posts), "tweets": all_posts[:25]}
return json.dumps(result, indent=2)
if __name__ == "__main__":
print("Starting MCP server...", file=sys.stderr)
mcp.run(transport="stdio")
here is the client code
import os
import sys
import json
import asyncio
from contextlib import AsyncExitStack
from dotenv import load_dotenv
import httpx
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
load_dotenv()
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY")
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
class MCPClient:
def __init__(self):
self.session = None
self.exit_stack = AsyncExitStack()
self.tool_to_session = {}
self.deepseek_functions = []
async def connect_to_server(self, server_script_path: str):
is_py = server_script_path.endswith('.py')
cmd = "python" if is_py else "node"
params = StdioServerParameters(command=cmd, args=[server_script_path], env=None, stderr_to_stdout=True)
read, write = await self.exit_stack.enter_async_context(stdio_client(params))
self.session = await self.exit_stack.enter_async_context(ClientSession(read, write))
await self.session.initialize()
resp = await self.session.list_tools()
for tool in resp.tools:
self.tool_to_session[tool.name] = self.session
self.deepseek_functions.append({
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
})
print("Connected! Tools:", [f["name"] for f in self.deepseek_functions])
async def call_deepseek(self, messages, functions=None, function_call="auto"):
headers = {
"Authorization": f"Bearer {DEEPSEEK_API_KEY}",
"Content-Type": "application/json"
}
body = {
"model": "deepseek-chat",
"messages": messages,
"max_tokens": 1000,
"function_call": function_call
}
if functions:
body["functions"] = functions
async with httpx.AsyncClient() as client:
response = await client.post(DEEPSEEK_API_URL, headers=headers, json=body)
response.raise_for_status()
return response.json()
async def process_query(self, query: str) -> str:
messages = [{"role": "user", "content": query}]
resp_json = await self.call_deepseek(messages, self.deepseek_functions)
msg = resp_json["choices"][0]["message"]
if "function_call" in msg:
fname = msg["function_call"]["name"]
fargs = json.loads(msg["function_call"]["arguments"])
result = await self.tool_to_session[fname].call_tool(fname, fargs)
messages.append(msg)
messages.append({"role": "function", "name": fname, "content": result.content})
resp2_json = await self.call_deepseek(messages)
return resp2_json["choices"][0]["message"]["content"]
return msg["content"]
async def chat_loop(self):
print("MCP Twitter Client (DeepSeek) started. Type your query or 'quit'.")
while True:
q = input("You: ").strip()
if q.lower() == "quit":
break
try:
print("Assistant:\n", await self.process_query(q))
except Exception as e:
print("Error:", e)
async def cleanup(self):
await self.exit_stack.aclose()
async def main():
if len(sys.argv) != 2:
print("Usage: python client.py <path_to_server_script>")
sys.exit(1)
mc = MCPClient()
try:
await mc.connect_to_server(sys.argv[1])
await mc.chat_loop()
finally:
await mc.cleanup()
if __name__ == "__main__":
asyncio.run(main())
Python & MCP Python SDK
Python 3.12.2
mcp==1.10.1
Metadata
Metadata
Assignees
Labels
No labels