# Source listing metadata: 170 lines, 6.0 KiB, Python.
import json
import os
import subprocess
import sys
from pathlib import Path

import requests
import streamlit as st


def get_available_tools():
    """Return the catalogue of MCP tools this application offers.

    The catalogue is static: every entry is reported with version
    ``1.0.0`` and ``installed`` set to ``False``.

    Returns:
        list[dict]: a freshly built list with one dict per tool, each
        holding the keys ``name``, ``description``, ``version``,
        ``installed``, ``package_name`` and ``repo_url``.
    """
    # (tool name, label used in the description) for each catalogued tool.
    known_tools = (
        ("airbnb", "Airbnb"),
        ("booking", "Booking"),
        ("hotels", "Hotels"),
    )
    return [
        {
            "name": slug,
            "description": f"Client {label} pour MCP",
            "version": "1.0.0",
            "installed": False,
            "package_name": f"mcp-{slug}",
            "repo_url": f"https://github.com/your-org/mcp-{slug}.git",
        }
        for slug, label in known_tools
    ]


def get_installed_servers():
    """Load the list of installed MCP servers from ``mcp_servers.json``.

    The file is looked up relative to the current working directory.

    Returns:
        list: the decoded JSON list, or an empty list when the file is
        missing, cannot be decoded, or does not contain a JSON list.
    """
    try:
        with open("mcp_servers.json", "r", encoding="utf-8") as f:
            servers = json.load(f)
    except FileNotFoundError:
        return []
    except (json.JSONDecodeError, UnicodeDecodeError):
        # A truncated or hand-edited registry must not crash every caller;
        # treat it the same as "nothing installed yet".
        return []
    # Callers iterate over and append to the result, so anything that is
    # not a list (e.g. a lone JSON object) is unusable.
    return servers if isinstance(servers, list) else []


def save_server(server_data):
    """Record an MCP server in ``mcp_servers.json``.

    The current list is read with ``get_installed_servers()``. If an entry
    with the same ``name`` is already present it is replaced in place, so
    saving the same server twice does not produce duplicate entries;
    otherwise the new entry is appended. The whole list is then rewritten.

    Args:
        server_data: JSON-serialisable description of the server. When it
            is a dict with a ``name`` key, that name identifies the entry.
    """
    servers = get_installed_servers()

    name = server_data.get("name") if isinstance(server_data, dict) else None
    for index, existing in enumerate(servers):
        if (
            name is not None
            and isinstance(existing, dict)
            and existing.get("name") == name
        ):
            servers[index] = server_data
            break
    else:
        # No entry carries this name (or the data has no name): add a new one.
        servers.append(server_data)

    with open("mcp_servers.json", "w", encoding="utf-8") as f:
        json.dump(servers, f, indent=4)


def install_tool(tool_name):
    """Install one of the catalogued MCP tools.

    The tool is looked up by name in ``get_available_tools()``. Its
    repository is cloned into ``tools/<name>`` unless that directory
    already exists, and the checkout is then installed with
    ``pip install -e`` using the running interpreter.

    Args:
        tool_name: value of the ``name`` key of the tool to install.

    Returns:
        tuple[bool, str]: a success flag and a user-facing message. Any
        exception raised along the way is reported through the message
        rather than propagated.
    """
    try:
        # Look up the catalogue entry for the requested tool.
        matches = [
            entry for entry in get_available_tools()
            if entry['name'] == tool_name
        ]
        if not matches:
            return False, f"Outil {tool_name} non trouvé"
        tool = matches[0]

        # Make sure the tools directory exists.
        checkout_root = Path("tools")
        checkout_root.mkdir(exist_ok=True)

        # Clone the repository only when there is no checkout yet.
        checkout = checkout_root / tool['name']
        if not checkout.exists():
            clone_cmd = ["git", "clone", tool['repo_url'], str(checkout)]
            subprocess.check_call(clone_cmd)

        # Install the checkout in editable mode.
        pip_cmd = [sys.executable, "-m", "pip", "install", "-e", str(checkout)]
        subprocess.check_call(pip_cmd)
    except Exception as e:
        # Covers subprocess.CalledProcessError as well; both cases report
        # the same message.
        return False, f"Erreur lors de l'installation de {tool_name}: {str(e)}"

    return True, f"L'outil {tool_name} a été installé avec succès!"


def install_server(server_url, server_name):
    """Install an MCP server from a Git repository.

    The repository is cloned into ``servers/<server_name>`` unless that
    directory already exists, installed with ``pip install -e`` using the
    running interpreter, and finally recorded through ``save_server``.

    Args:
        server_url: Git URL of the server repository.
        server_name: name of the server; also used as the checkout
            directory under ``servers/``.

    Returns:
        tuple[bool, str]: a success flag and a user-facing message. Any
        exception raised along the way is reported through the message
        rather than propagated.
    """
    try:
        servers_dir = Path("servers")
        repo_dir = servers_dir / server_name

        # Both arguments are typed in by the user. Refuse any name that
        # would place the checkout outside servers/ ("..", absolute paths)
        # or on servers/ itself ("", "."), before touching the filesystem.
        if servers_dir.resolve() not in repo_dir.resolve().parents:
            return False, (
                f"Erreur lors de l'installation du serveur {server_name}: "
                "nom de serveur invalide"
            )

        # Create the servers directory if it does not exist yet.
        servers_dir.mkdir(exist_ok=True)

        # Clone the server repository. The "--" marks the end of options so
        # a URL beginning with "-" cannot be interpreted as a git option.
        if not repo_dir.exists():
            subprocess.check_call(
                ["git", "clone", "--", server_url, str(repo_dir)]
            )

        # Install the dependencies.
        # NOTE(review): an editable install runs the cloned project's build
        # code, so only trusted repositories should be installed this way.
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "-e", str(repo_dir)]
        )

        # Record the server so it is listed as installed.
        save_server({"name": server_name, "url": server_url})

        return True, f"Le serveur {server_name} a été installé avec succès!"
    except Exception as e:
        # Covers subprocess.CalledProcessError as well; both cases report
        # the same message.
        return False, f"Erreur lors de l'installation du serveur {server_name}: {str(e)}"


def main():
    """Render the MCP tools manager page.

    The sidebar holds a form that installs a server from a Git URL, the
    list of servers returned by ``get_installed_servers()`` and an
    "about" note. The main area lists every tool returned by
    ``get_available_tools()`` with an install button.
    """
    st.set_page_config(
        page_title="MCP Tools Manager",
        page_icon="🛠️",
        layout="wide",
    )

    st.title("🛠️ Gestionnaire d'Outils MCP")
    st.write("Gérez vos outils MCP facilement")

    # --- Sidebar: MCP servers -------------------------------------------
    st.sidebar.header("Serveurs MCP")

    # Form used to add a server.
    with st.sidebar.expander("➕ Ajouter un nouveau serveur"):
        name_input = st.text_input("Nom du serveur")
        url_input = st.text_input("URL du serveur (URL Git)")
        if st.button("Installer le serveur"):
            if not (name_input and url_input):
                st.warning("Veuillez remplir tous les champs")
            else:
                ok, text = install_server(url_input, name_input)
                notify = st.success if ok else st.error
                notify(text)

    # Servers that are already installed.
    st.sidebar.subheader("Serveurs installés")
    for entry in get_installed_servers():
        st.sidebar.info(f"📡 {entry['name']}\n{entry['url']}")

    # --- Main area: available tools -------------------------------------
    catalogue = get_available_tools()

    st.subheader("Outils Disponibles")

    for item in catalogue:
        with st.expander(f"{item['name']} - v{item['version']}"):
            details_col, action_col = st.columns([3, 1])

            with details_col:
                st.write(item['description'])
                st.caption(f"Repository: {item['repo_url']}")

            with action_col:
                if item['installed']:
                    st.success("✅ Installé")
                elif st.button("Installer", key=f"install_{item['name']}"):
                    ok, text = install_tool(item['name'])
                    notify = st.success if ok else st.error
                    notify(text)

    # --- Sidebar: about --------------------------------------------------
    st.sidebar.header("À propos")
    st.sidebar.info("""
    Cette application permet de gérer les outils MCP (Message Control Protocol).
    Vous pouvez installer et gérer différents outils pour interagir avec différents services.

    Les outils sont installés en mode développement (-e) pour faciliter les modifications.
    """)


# Build the page only when this file is run as the main module, not when
# it is imported.
if __name__ == "__main__":
    main()