mcp_test/brave_search_mcp.py
2025-03-30 14:47:33 +02:00

174 lines
6.1 KiB
Python

import subprocess
import json
import requests
import time
import os
import signal
import atexit
class BraveSearchMCP:
def __init__(self, config_path="mcp-config.json", port=8087):
self.config_path = config_path
self.port = port
self.server_process = None
self.start_server()
# Enregistrer l'arrêt du serveur à la fin de l'exécution du programme
atexit.register(self.stop_server)
def start_server(self):
"""Démarre le serveur MCP pour Brave Search"""
# Vérifier si le serveur est déjà en cours d'exécution
try:
response = requests.get(f"http://localhost:{self.port}/status")
if response.status_code == 200:
print("Un serveur MCP est déjà en cours d'exécution")
return
except requests.exceptions.ConnectionError:
pass # Le serveur n'est pas en cours d'exécution, ce qui est attendu
# Charger la configuration MCP
with open(self.config_path, 'r') as f:
config = json.load(f)
brave_config = config["mcpServers"]["brave-search"]
command = brave_config["command"]
args = brave_config["args"]
# Préparer l'environnement
env = os.environ.copy()
env.update(brave_config["env"])
# Démarrer le serveur
cmd = [command] + args
print("Starting server with command:", " ".join(cmd))
print("Environment:", env.get("BRAVE_API_KEY", "Not set"))
# Démarrer le processus
self.server_process = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=os.path.dirname(os.path.abspath(self.config_path))
)
# Attendre que le serveur démarre
max_attempts = 10
for attempt in range(max_attempts):
# Vérifier si le processus s'est terminé
if self.server_process.poll() is not None:
stdout, stderr = self.server_process.communicate()
print("Server failed to start!")
print("Server stdout:", stdout)
print("Server stderr:", stderr)
raise Exception("Le serveur MCP Brave Search s'est arrêté inattendument")
try:
response = requests.get(f"http://localhost:{self.port}/status", timeout=1)
if response.status_code == 200:
print(f"Serveur MCP Brave Search démarré sur le port {self.port}")
return
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
print(f"Waiting for server to start (attempt {attempt + 1}/{max_attempts})...")
# Vérifier la sortie du serveur
try:
stdout = self.server_process.stdout.readline()
if stdout:
print("Server stdout:", stdout.strip())
except:
pass
try:
stderr = self.server_process.stderr.readline()
if stderr:
print("Server stderr:", stderr.strip())
except:
pass
time.sleep(1)
# Si nous arrivons ici, le serveur n'a pas démarré
if self.server_process.poll() is None:
self.server_process.terminate()
stdout, stderr = self.server_process.communicate()
print("Server failed to start after maximum attempts!")
print("Final server stdout:", stdout)
print("Final server stderr:", stderr)
raise Exception("Impossible de démarrer le serveur MCP Brave Search")
def stop_server(self):
"""Arrête le serveur MCP"""
if self.server_process:
os.kill(self.server_process.pid, signal.SIGTERM)
self.server_process = None
print("Serveur MCP Brave Search arrêté")
def search(self, query, options=None):
"""
Exécute une recherche via Brave Search MCP
Args:
query (str): La requête de recherche
options (dict): Options de recherche supplémentaires
- count (int): Nombre de résultats à renvoyer (défaut: 10)
- offset (int): Décalage dans les résultats (défaut: 0)
- search_lang (str): Langue de recherche (défaut: 'fr' ou 'en')
- country (str): Code pays (défaut: 'fr' ou 'us')
Returns:
dict: Résultats de recherche
"""
if options is None:
options = {}
default_options = {
"count": 10,
"offset": 0,
"search_lang": "fr",
"country": "fr"
}
# Fusion des options par défaut avec les options fournies
for key, value in default_options.items():
if key not in options:
options[key] = value
# Construire la requête
payload = {
"q": query,
**options
}
# Envoyer la requête au serveur MCP
response = requests.post(
f"http://localhost:{self.port}/search",
json=payload
)
if response.status_code != 200:
raise Exception(f"Erreur lors de la recherche: {response.text}")
return response.json()
def autocomplete(self, query):
"""
Obtient des suggestions d'autocomplétion
Args:
query (str): Début de la requête
Returns:
dict: Suggestions d'autocomplétion
"""
response = requests.post(
f"http://localhost:{self.port}/autocomplete",
json={"q": query}
)
if response.status_code != 200:
raise Exception(f"Erreur lors de l'autocomplétion: {response.text}")
return response.json()