385 lines
16 KiB
Python

import traceback
import threading
import queue
from langchain.prompts import ChatPromptTemplate
from langchain_ollama import ChatOllama
from rag_chatbot import MultimodalRAGChatbot
from config.settings import QDRANT_URL, QDRANT_COLLECTION_NAME, EMBEDDING_MODEL, OLLAMA_URL, DEFAULT_MODEL
from translations.lang_mappings import LANGUAGE_MAPPING
from utils.image_utils import base64_to_image
from langchain.callbacks.base import BaseCallbackHandler
import re
def clean_llm_response(text):
"""Nettoie la réponse du LLM en enlevant les balises de pensée et autres éléments non désirés."""
# Supprimer les blocs de pensée (<think>...</think>)
text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
# Supprimer les espaces supplémentaires au début de la réponse
text = text.lstrip()
return text
# Handler personnalisé pour le streaming
class GradioStreamingHandler(BaseCallbackHandler):
def __init__(self):
self.tokens_queue = queue.Queue()
self.full_text = ""
def on_llm_new_token(self, token, **kwargs):
self.tokens_queue.put(token)
self.full_text += token
# Initialiser le chatbot
rag_bot = MultimodalRAGChatbot(
qdrant_url=QDRANT_URL,
qdrant_collection_name=QDRANT_COLLECTION_NAME,
ollama_model=DEFAULT_MODEL,
embedding_model=EMBEDDING_MODEL,
ollama_url=OLLAMA_URL
)
print(f"Chatbot initialisé avec modèle: {DEFAULT_MODEL}")
# Variables globales
current_images = []
current_tables = []
# Fonctions utilitaires
def display_images(images_list=None):
"""Crée une liste de tuples (image, caption) pour Gradio Gallery"""
images_to_use = images_list if images_list is not None else current_images
if not images_to_use:
return None
gallery = []
for img_data in images_to_use:
image = img_data["image"]
if image:
caption = f"{img_data['caption']} (Source: {img_data['source']}, Page: {img_data['page']})"
gallery.append((image, caption))
return gallery if gallery else None
def display_tables(tables_list=None, language=None):
"""Crée le HTML pour afficher les tableaux"""
tables_to_use = tables_list if tables_list is not None else current_tables
if not tables_to_use:
return None
html = ""
for idx, table in enumerate(tables_to_use):
table_data = table['data']
table_html = ""
try:
if isinstance(table_data, str):
if '|' in table_data:
rows = table_data.strip().split('\n')
table_html = '<div class="table-container"><table>'
for i, row in enumerate(rows):
if i == 1 and all(c in ':-|' for c in row):
continue
cells = row.split('|')
if cells and cells[0].strip() == '':
cells = cells[1:]
if cells and cells[-1].strip() == '':
cells = cells[:-1]
if cells:
is_header = (i == 0)
table_html += '<tr>'
for cell in cells:
cell_content = cell.strip()
if is_header:
table_html += f'<th>{cell_content}</th>'
else:
table_html += f'<td>{cell_content}</td>'
table_html += '</tr>'
table_html += '</table></div>'
else:
table_html = f'<pre>{table_data}</pre>'
else:
table_html = f'<pre>{table_data}</pre>'
except Exception as e:
print(f"Error formatting table {idx}: {e}")
table_html = f'<pre>{table_data}</pre>'
html += f"""
<div style="margin-bottom: 20px; border: 1px solid #ddd; padding: 15px; border-radius: 8px;">
<h3>{table.get('caption', 'Tableau')}</h3>
<p style="color:#666; font-size:0.9em;">Source: {table.get('source', 'N/A')}, Page: {table.get('page', 'N/A')}</p>
<p><strong>Description:</strong> {table.get('description', '')}</p>
{table_html}
</div>
"""
return html if html else None
# Fonction pour changer de modèle
def change_model(model_name, language="Français"):
global rag_bot
try:
rag_bot = MultimodalRAGChatbot(
qdrant_url=QDRANT_URL,
qdrant_collection_name=QDRANT_COLLECTION_NAME,
ollama_model=model_name,
embedding_model=EMBEDDING_MODEL,
ollama_url=OLLAMA_URL
)
print(f"Modèle changé pour: {model_name}")
return f"✅ Modèle changé pour: {model_name}"
except Exception as e:
print(f"Erreur lors du changement de modèle: {e}")
return f"❌ Erreur: {str(e)}"
# Fonction pour changer de collection
def change_collection(collection_name, language="Français"):
global rag_bot
try:
rag_bot = MultimodalRAGChatbot(
qdrant_url=QDRANT_URL,
qdrant_collection_name=collection_name,
ollama_model=rag_bot.llm.model,
embedding_model=EMBEDDING_MODEL,
ollama_url=OLLAMA_URL
)
print(f"Collection changée pour: {collection_name}")
return f"✅ Collection changée pour: {collection_name}"
except Exception as e:
print(f"Erreur lors du changement de collection: {e}")
return f"❌ Erreur: {str(e)}"
# Fonction de traitement de requête
def process_query(message, history, streaming, show_sources, max_images, language):
global current_images, current_tables
if not message.strip():
return history, "", None, None
current_images = []
current_tables = []
print(f"Traitement du message: {message}")
print(f"Streaming: {streaming}")
try:
if streaming:
# Version avec streaming dans Gradio
history = history + [(message, "")]
# 1. Récupérer les documents pertinents
docs = rag_bot._retrieve_relevant_documents(message)
# 2. Préparer le contexte et l'historique
context = rag_bot._format_documents(docs)
history_text = rag_bot._format_chat_history()
# 3. Préparer le prompt
prompt_template = ChatPromptTemplate.from_template("""
Tu es un assistant documentaire spécialisé qui utilise toutes les informations disponibles dans le contexte fourni.
TRÈS IMPORTANT: Tu dois répondre EXCLUSIVEMENT en {language}. Ne réponds JAMAIS dans une autre langue.
Instructions spécifiques:
1. Pour chaque image mentionnée dans le contexte, inclue TOUJOURS dans ta réponse:
- La légende/caption exacte de l'image
- La source et le numéro de page
- Une description brève de ce qu'elle montre
2. Pour chaque tableau mentionné dans le contexte, inclue TOUJOURS:
- Le titre/caption exact du tableau
- La source et le numéro de page
- Ce que contient et signifie le tableau
3. Lorsque tu cites des équations mathématiques:
- Utilise la syntaxe LaTeX exacte comme dans le document ($...$ ou $$...$$)
- Reproduis-les fidèlement sans modification
4. IMPORTANT: Ne pas inventer d'informations - si une donnée n'est pas explicitement fournie dans le contexte,
indique clairement que cette information n'est pas disponible dans les documents fournis.
5. Cite précisément les sources pour chaque élément d'information (format: [Source, Page]).
6. CRUCIAL: Ta réponse doit être UNIQUEMENT et INTÉGRALEMENT en {language}, quelle que soit la langue de la question.
Historique de conversation:
{chat_history}
Contexte (à utiliser pour répondre):
{context}
Question: {question}
Réponds de façon structurée et précise en intégrant activement les images, tableaux et équations disponibles dans le contexte.
Ta réponse doit être exclusivement en {language}.
""")
# 4. Formater les messages pour le LLM
messages = prompt_template.format_messages(
chat_history=history_text,
context=context,
question=message,
language=LANGUAGE_MAPPING.get(language, "français")
)
# 5. Créer un handler de streaming personnalisé
handler = GradioStreamingHandler()
# 6. Créer un modèle LLM avec notre handler
streaming_llm = ChatOllama(
model=rag_bot.llm.model,
base_url=rag_bot.llm.base_url,
streaming=True,
callbacks=[handler]
)
# 7. Lancer la génération dans un thread pour ne pas bloquer l'UI
def generate_response():
streaming_llm.invoke(messages)
thread = threading.Thread(target=generate_response)
thread.start()
# 8. Récupérer les tokens et mettre à jour l'interface
partial_response = ""
# Attendre les tokens avec un timeout
while thread.is_alive() or not handler.tokens_queue.empty():
try:
token = handler.tokens_queue.get(timeout=0.05)
partial_response += token
# Nettoyer la réponse uniquement pour l'affichage (pas pour l'historique interne)
clean_response = clean_llm_response(partial_response)
history[-1] = (message, clean_response)
yield history, "", None, None
except queue.Empty:
continue
# Après la boucle, nettoyer la réponse complète pour l'historique interne
partial_response = clean_llm_response(partial_response)
rag_bot.chat_history.append({"role": "user", "content": message})
rag_bot.chat_history.append({"role": "assistant", "content": partial_response})
# 10. Récupérer les sources, images, tableaux
texts, images, tables = rag_bot._process_documents(docs)
# Préparer les informations sur les sources
source_info = ""
if texts:
source_info += f"📚 {len(texts)} textes • "
if images:
source_info += f"🖼️ {len(images)} images • "
if tables:
source_info += f"📊 {len(tables)} tableaux"
if source_info:
source_info = "Sources trouvées: " + source_info
# 11. Traiter les images
if show_sources and images:
images = images[:max_images]
for img in images:
img_data = img.get("image_data")
if img_data:
image = base64_to_image(img_data)
if image:
current_images.append({
"image": image,
"caption": img.get("caption", ""),
"source": img.get("source", ""),
"page": img.get("page", ""),
"description": img.get("description", "")
})
# 12. Traiter les tableaux
if show_sources and tables:
for table in tables:
current_tables.append({
"data": rag_bot.format_table(table.get("table_data", "")),
"caption": table.get("caption", ""),
"source": table.get("source", ""),
"page": table.get("page", ""),
"description": table.get("description", "")
})
# 13. Retourner les résultats finaux
images_display = display_images()
tables_display = display_tables()
yield history, source_info, images_display, tables_display
else:
# Version sans streaming
print("Mode non-streaming activé")
source_info = ""
result = rag_bot.chat(message, stream=False)
# Nettoyer la réponse des balises <think>
result["response"] = clean_llm_response(result["response"])
history = history + [(message, result["response"])]
# Mise à jour de l'historique interne
rag_bot.chat_history.append({"role": "user", "content": message})
rag_bot.chat_history.append({"role": "assistant", "content": result["response"]})
# Traiter les sources
if "texts" in result:
source_info += f"📚 {len(result['texts'])} textes • "
if "images" in result:
source_info += f"🖼️ {len(result['images'])} images • "
if "tables" in result:
source_info += f"📊 {len(result['tables'])} tableaux"
if source_info:
source_info = "Sources trouvées: " + source_info
# Traiter les images et tableaux
if show_sources and "images" in result and result["images"]:
images = result["images"][:max_images]
for img in images:
img_data = img.get("image_data")
if img_data:
image = base64_to_image(img_data)
if image:
current_images.append({
"image": image,
"caption": img.get("caption", ""),
"source": img.get("source", ""),
"page": img.get("page", ""),
"description": img.get("description", "")
})
if show_sources and "tables" in result and result["tables"]:
tables = result["tables"]
for table in tables:
current_tables.append({
"data": rag_bot.format_table(table.get("table_data", "")),
"caption": table.get("caption", ""),
"source": table.get("source", ""),
"page": table.get("page", ""),
"description": table.get("description", "")
})
yield history, source_info, display_images(), display_tables()
except Exception as e:
error_msg = f"Une erreur est survenue: {str(e)}"
traceback_text = traceback.format_exc()
print(error_msg)
print(traceback_text)
history = history + [(message, error_msg)]
yield history, "Erreur lors du traitement de la requête", None, None
# Fonction pour réinitialiser la conversation
def reset_conversation():
global current_images, current_tables
current_images = []
current_tables = []
rag_bot.clear_history()
return [], "", None, None