413 lines
17 KiB
Python

import traceback
import threading
import queue
from langchain.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama
from rag_chatbot import MultimodalRAGChatbot
from config.settings import QDRANT_URL, QDRANT_COLLECTION_NAME, EMBEDDING_MODEL, OLLAMA_URL, DEFAULT_MODEL
from translations.lang_mappings import LANGUAGE_MAPPING
from utils.image_utils import base64_to_image
from langchain.callbacks.base import BaseCallbackHandler
import re
def clean_llm_response(text):
"""Nettoie la réponse du LLM en enlevant les balises de pensée et autres éléments non désirés."""
# Supprimer les blocs de pensée (<think>...</think>)
text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
# Supprimer les espaces supplémentaires au début de la réponse
text = text.lstrip()
return text
# Handler personnalisé pour le streaming
class GradioStreamingHandler(BaseCallbackHandler):
def __init__(self):
self.tokens_queue = queue.Queue()
self.full_text = ""
def on_llm_new_token(self, token, **kwargs):
self.tokens_queue.put(token)
self.full_text += token
# Initialiser le chatbot
rag_bot = MultimodalRAGChatbot(
qdrant_url=QDRANT_URL,
qdrant_collection_name=QDRANT_COLLECTION_NAME,
ollama_model=DEFAULT_MODEL,
embedding_model=EMBEDDING_MODEL,
ollama_url=OLLAMA_URL
)
print(f"Chatbot initialisé avec modèle: {DEFAULT_MODEL}")
# Variables globales
current_images = []
current_tables = []
# Fonctions utilitaires
def display_images(images_list=None):
"""Crée une liste de tuples (image, caption) pour Gradio Gallery"""
images_to_use = images_list if images_list is not None else current_images
if not images_to_use:
return None
gallery = []
for img_data in images_to_use:
image = img_data["image"]
if image:
caption = f"{img_data['caption']} (Source: {img_data['source']}, Page: {img_data['page']})"
gallery.append((image, caption))
return gallery if gallery else None
def display_tables(tables_list=None, language=None):
"""Crée le HTML pour afficher les tableaux"""
tables_to_use = tables_list if tables_list is not None else current_tables
if not tables_to_use:
return None
html = ""
for idx, table in enumerate(tables_to_use):
table_data = table['data']
table_html = ""
try:
if isinstance(table_data, str):
if '|' in table_data:
rows = table_data.strip().split('\n')
table_html = '<div class="table-container"><table>'
for i, row in enumerate(rows):
if i == 1 and all(c in ':-|' for c in row):
continue
cells = row.split('|')
if cells and cells[0].strip() == '':
cells = cells[1:]
if cells and cells[-1].strip() == '':
cells = cells[:-1]
if cells:
is_header = (i == 0)
table_html += '<tr>'
for cell in cells:
cell_content = cell.strip()
if is_header:
table_html += f'<th>{cell_content}</th>'
else:
table_html += f'<td>{cell_content}</td>'
table_html += '</tr>'
table_html += '</table></div>'
else:
table_html = f'<pre>{table_data}</pre>'
else:
table_html = f'<pre>{table_data}</pre>'
except Exception as e:
print(f"Error formatting table {idx}: {e}")
table_html = f'<pre>{table_data}</pre>'
html += f"""
<div style="margin-bottom: 20px; border: 1px solid #ddd; padding: 15px; border-radius: 8px;">
<h3>{table.get('caption', 'Tableau')}</h3>
<p style="color:#666; font-size:0.9em;">Source: {table.get('source', 'N/A')}, Page: {table.get('page', 'N/A')}</p>
<p><strong>Description:</strong> {table.get('description', '')}</p>
{table_html}
</div>
"""
return html if html else None
# Fonction pour changer de modèle
def change_model(model_name, language="Français"):
global rag_bot
try:
rag_bot = MultimodalRAGChatbot(
qdrant_url=QDRANT_URL,
qdrant_collection_name=QDRANT_COLLECTION_NAME,
ollama_model=model_name,
embedding_model=EMBEDDING_MODEL,
ollama_url=OLLAMA_URL
)
print(f"Modèle changé pour: {model_name}")
return f"✅ Modèle changé pour: {model_name}"
except Exception as e:
print(f"Erreur lors du changement de modèle: {e}")
return f"❌ Erreur: {str(e)}"
# Fonction pour changer de collection
def change_collection(collection_name, language="Français"):
global rag_bot
try:
rag_bot = MultimodalRAGChatbot(
qdrant_url=QDRANT_URL,
qdrant_collection_name=collection_name,
ollama_model=rag_bot.llm.model,
embedding_model=EMBEDDING_MODEL,
ollama_url=OLLAMA_URL
)
print(f"Collection changée pour: {collection_name}")
return f"✅ Collection changée pour: {collection_name}"
except Exception as e:
print(f"Erreur lors du changement de collection: {e}")
return f"❌ Erreur: {str(e)}"
# Fonction de traitement de requête
def convert_to_messages_format(history):
"""Convertit différents formats d'historique au format messages."""
messages = []
# Vérifier si nous avons déjà le format messages
if history and isinstance(history[0], dict) and "role" in history[0]:
return history
# Format tuples [(user_msg, assistant_msg), ...]
try:
for item in history:
if isinstance(item, tuple) and len(item) == 2:
user_msg, assistant_msg = item
messages.append({"role": "user", "content": user_msg})
if assistant_msg: # Éviter les messages vides
messages.append({"role": "assistant", "content": assistant_msg})
except ValueError:
# Journaliser l'erreur pour le débogage
print(f"Format d'historique non reconnu: {history}")
# Retourner un historique vide en cas d'erreur
return []
return messages
def process_query(message, history, streaming, show_sources, max_images, language):
global current_images, current_tables
# Debug plus clair
print(f"Langue sélectionnée pour la réponse: {language} -> {LANGUAGE_MAPPING.get(language, 'français')}")
if not message.strip():
return history, "", None, None
current_images = []
current_tables = []
print(f"Traitement du message: {message}")
print(f"Streaming: {streaming}")
try:
if streaming:
# Convertir history en format messages pour l'affichage
messages_history = convert_to_messages_format(history)
messages_history.append({"role": "user", "content": message})
messages_history.append({"role": "assistant", "content": ""})
# 1. Récupérer les documents pertinents
docs = rag_bot._retrieve_relevant_documents(message)
# 2. Préparer le contexte et l'historique
context = rag_bot._format_documents(docs)
history_text = rag_bot._format_chat_history()
# 3. Préparer le prompt
prompt_template = ChatPromptTemplate.from_template("""
Tu es un assistant documentaire spécialisé qui utilise le contexte fourni.
===== INSTRUCTION CRUCIALE SUR LA LANGUE =====
RÉPONDS UNIQUEMENT EN {language}. C'est une exigence ABSOLUE.
NE RÉPONDS JAMAIS dans une autre langue que {language}, quelle que soit la langue de la question.
==============================================
Instructions spécifiques:
1. Pour chaque image mentionnée: inclure la légende, source, page et description
2. Pour chaque tableau: inclure titre, source, page et signification
3. Pour les équations: utiliser la syntaxe LaTeX exacte
4. Ne pas inventer d'informations hors du contexte fourni
5. Citer précisément les sources
Historique de conversation:
{chat_history}
Contexte:
{context}
Question: {question}
Réponds de façon structurée en intégrant les images, tableaux et équations disponibles.
TA RÉPONSE DOIT ÊTRE UNIQUEMENT ET ENTIÈREMENT EN {language}. CETTE RÈGLE EST ABSOLUE.
""")
# Assurer que la langue est bien passée dans le format du prompt
selected_language = LANGUAGE_MAPPING.get(language, "français")
messages = prompt_template.format_messages(
chat_history=history_text,
context=context,
question=message,
language=selected_language
)
# 5. Créer un handler de streaming personnalisé
handler = GradioStreamingHandler()
# 6. Créer un modèle LLM avec notre handler
streaming_llm = ChatOllama(
model=rag_bot.llm.model,
base_url=rag_bot.llm.base_url,
streaming=True,
callbacks=[handler]
)
# 7. Lancer la génération dans un thread pour ne pas bloquer l'UI
def generate_response():
streaming_llm.invoke(messages)
thread = threading.Thread(target=generate_response)
thread.start()
# 8. Récupérer les tokens et mettre à jour l'interface
partial_response = ""
# Attendre les tokens avec un timeout
while thread.is_alive() or not handler.tokens_queue.empty():
try:
token = handler.tokens_queue.get(timeout=0.05)
partial_response += token
# Nettoyer la réponse uniquement pour l'affichage (pas pour l'historique interne)
clean_response = clean_llm_response(partial_response)
# Mettre à jour le dernier message (assistant)
messages_history[-1]["content"] = clean_response
yield messages_history, "", None, None
except queue.Empty:
continue
# Après la boucle, nettoyer la réponse complète pour l'historique interne
partial_response = clean_llm_response(partial_response)
rag_bot.chat_history.append({"role": "user", "content": message})
rag_bot.chat_history.append({"role": "assistant", "content": partial_response})
# 10. Récupérer les sources, images, tableaux
texts, images, tables = rag_bot._process_documents(docs)
# Préparer les informations sur les sources
source_info = ""
if texts:
source_info += f"📚 {len(texts)} textes • "
if images:
source_info += f"🖼️ {len(images)} images • "
if tables:
source_info += f"📊 {len(tables)} tableaux"
if source_info:
source_info = "Sources trouvées: " + source_info
# 11. Traiter les images
if show_sources and images:
images = images[:max_images]
for img in images:
img_data = img.get("image_data")
if img_data:
image = base64_to_image(img_data)
if image:
current_images.append({
"image": image,
"caption": img.get("caption", ""),
"source": img.get("source", ""),
"page": img.get("page", ""),
"description": img.get("description", "")
})
# 12. Traiter les tableaux
if show_sources and tables:
for table in tables:
current_tables.append({
"data": rag_bot.format_table(table.get("table_data", "")),
"caption": table.get("caption", ""),
"source": table.get("source", ""),
"page": table.get("page", ""),
"description": table.get("description", "")
})
# 13. Retourner les résultats finaux
images_display = display_images()
tables_display = display_tables()
yield messages_history, source_info, images_display, tables_display
else:
# Version sans streaming
print("Mode non-streaming activé")
source_info = ""
result = rag_bot.chat(
message,
stream=False,
language=LANGUAGE_MAPPING.get(language, "français") # Vérifiez que cette ligne existe
)
# Nettoyer la réponse des balises <think>
result["response"] = clean_llm_response(result["response"])
# Convertir l'historique au format messages
messages_history = convert_to_messages_format(history)
messages_history.append({"role": "user", "content": message})
messages_history.append({"role": "assistant", "content": result["response"]})
# Mise à jour de l'historique interne
rag_bot.chat_history.append({"role": "user", "content": message})
rag_bot.chat_history.append({"role": "assistant", "content": result["response"]})
# Traiter les sources
if "texts" in result:
source_info += f"📚 {len(result['texts'])} textes • "
if "images" in result:
source_info += f"🖼️ {len(result['images'])} images • "
if "tables" in result:
source_info += f"📊 {len(result['tables'])} tableaux"
if source_info:
source_info = "Sources trouvées: " + source_info
# Traiter les images et tableaux
if show_sources and "images" in result and result["images"]:
images = result["images"][:max_images]
for img in images:
img_data = img.get("image_data")
if img_data:
image = base64_to_image(img_data)
if image:
current_images.append({
"image": image,
"caption": img.get("caption", ""),
"source": img.get("source", ""),
"page": img.get("page", ""),
"description": img.get("description", "")
})
if show_sources and "tables" in result and result["tables"]:
tables = result["tables"]
for table in tables:
current_tables.append({
"data": rag_bot.format_table(table.get("table_data", "")),
"caption": table.get("caption", ""),
"source": table.get("source", ""),
"page": table.get("page", ""),
"description": table.get("description", "")
})
yield messages_history, source_info, display_images(), display_tables()
except Exception as e:
error_msg = f"Une erreur est survenue: {str(e)}"
traceback_text = traceback.format_exc()
print(error_msg)
print(traceback_text)
history = history + [(message, error_msg)]
yield history, "Erreur lors du traitement de la requête", None, None
# Fonction pour réinitialiser la conversation
def reset_conversation():
global current_images, current_tables
current_images = []
current_tables = []
rag_bot.clear_history()
# Retourner une liste vide au format messages
return [], "", None, None