import traceback
import threading
import queue
import re
from typing import List, Union, Dict, Any

from langchain.prompts import ChatPromptTemplate
from langchain.callbacks.base import BaseCallbackHandler
from langchain_ollama import ChatOllama

from rag_chatbot import MultimodalRAGChatbot
from config.settings import QDRANT_URL, QDRANT_COLLECTION_NAME, EMBEDDING_MODEL, OLLAMA_URL, DEFAULT_MODEL
from translations.lang_mappings import LANGUAGE_MAPPING
from utils.image_utils import base64_to_image

# For Gradio 4.x
# from gradio.types.message import ImageMessage, HtmlMessage, TextMessage

def clean_llm_response(text):
    """Clean the LLM response by removing reasoning tags and other unwanted elements."""
    # Strip <think>...</think> reasoning blocks emitted by some models
    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
    # Strip leading whitespace from the response
    text = text.lstrip()
    return text

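# Example (illustrative): clean_llm_response("<think>draft</think>  Bonjour")
# returns "Bonjour" -- the reasoning block and the leading whitespace are gone.
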
# Custom callback handler for streaming: collects tokens on a thread-safe queue
class GradioStreamingHandler(BaseCallbackHandler):
    def __init__(self):
        self.tokens_queue = queue.Queue()
        self.full_text = ""

    def on_llm_new_token(self, token, **kwargs):
        self.tokens_queue.put(token)
        self.full_text += token

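# Usage pattern (sketch, see process_query below): pass the handler via
# callbacks=[...] to a streaming ChatOllama, call invoke() on a worker thread,
# and drain handler.tokens_queue from the UI loop to stream partial text.
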
# Initialize the chatbot
rag_bot = MultimodalRAGChatbot(
    qdrant_url=QDRANT_URL,
    qdrant_collection_name=QDRANT_COLLECTION_NAME,
    ollama_model=DEFAULT_MODEL,
    embedding_model=EMBEDDING_MODEL,
    ollama_url=OLLAMA_URL
)
print(f"Chatbot initialisé avec modèle: {DEFAULT_MODEL}")

# Global state shared across Gradio callbacks
current_images = []
current_tables = []

# Regex stripping "(Texte N)" chunk markers from captions; defined at module
# level (outside any f-string) so display_images and process_query share it.
pattern_texte = r'\(Texte \d+\)'

# Utility functions
def display_images(images_list=None):
    """Build a list of (image, caption) tuples for a Gradio Gallery."""
    images_to_use = images_list if images_list is not None else current_images

    if not images_to_use:
        return None

    gallery = []
    for img_data in images_to_use:
        image = img_data["image"]
        if image:
            # Strip "(Texte N)" markers from the caption
            caption = re.sub(pattern_texte, '', img_data["caption"])
            caption = f"{caption} (Source: {img_data['source']}, Page: {img_data['page']})"
            gallery.append((image, caption))

    return gallery if gallery else None

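# Example (illustrative; pil_img stands in for a hypothetical PIL.Image):
#   display_images([{"image": pil_img, "caption": "Fig. 1",
#                    "source": "doc.pdf", "page": 2}])
#   -> [(pil_img, "Fig. 1 (Source: doc.pdf, Page: 2)")]
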
def display_tables(tables_list=None, language=None):
    """Build the HTML used to display the retrieved tables."""
    # language is accepted for interface parity but is currently unused
    tables_to_use = tables_list if tables_list is not None else current_tables

    if not tables_to_use:
        return None

    html = ""
    for idx, table in enumerate(tables_to_use):
        table_data = table['data']
        table_html = ""

        try:
            if isinstance(table_data, str):
                if '|' in table_data:
                    # Pipe-delimited (Markdown-style) table: convert to HTML
                    rows = table_data.strip().split('\n')
                    table_html = '<div class="table-container"><table>'

                    for i, row in enumerate(rows):
                        # Skip the Markdown separator row (e.g. "| --- | --- |");
                        # the allowed set includes a space so padded separators match
                        if i == 1 and all(c in ':-| ' for c in row):
                            continue

                        cells = row.split('|')

                        # Drop the empty leading/trailing cells produced by outer pipes
                        if cells and cells[0].strip() == '':
                            cells = cells[1:]
                        if cells and cells[-1].strip() == '':
                            cells = cells[:-1]

                        if cells:
                            is_header = (i == 0)
                            table_html += '<tr>'
                            for cell in cells:
                                cell_content = cell.strip()
                                if is_header:
                                    table_html += f'<th>{cell_content}</th>'
                                else:
                                    table_html += f'<td>{cell_content}</td>'
                            table_html += '</tr>'

                    table_html += '</table></div>'
                else:
                    table_html = f'<pre>{table_data}</pre>'
            else:
                table_html = f'<pre>{table_data}</pre>'
        except Exception as e:
            print(f"Error formatting table {idx}: {e}")
            table_html = f'<pre>{table_data}</pre>'

        html += f"""
        <div style="margin-bottom: 20px; border: 1px solid #ddd; padding: 15px; border-radius: 8px;">
            <h3>{table.get('caption', 'Tableau')}</h3>
            <p style="color:#666; font-size:0.9em;">Source: {table.get('source', 'N/A')}, Page: {table.get('page', 'N/A')}</p>
            <p><strong>Description:</strong> {table.get('description', '')}</p>
            {table_html}
        </div>
        """

    return html if html else None

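# Example (illustrative): an entry such as
#   {"data": "| A | B |\n| --- | --- |\n| 1 | 2 |", "caption": "Demo",
#    "source": "doc.pdf", "page": 3, "description": ""}
# renders as a two-column HTML <table> inside a styled, captioned <div>.
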
# Switch the active LLM model (rebuilds the chatbot on the same collection)
def change_model(model_name, language="Français"):
    global rag_bot

    try:
        rag_bot = MultimodalRAGChatbot(
            qdrant_url=QDRANT_URL,
            qdrant_collection_name=QDRANT_COLLECTION_NAME,
            ollama_model=model_name,
            embedding_model=EMBEDDING_MODEL,
            ollama_url=OLLAMA_URL
        )
        print(f"Modèle changé pour: {model_name}")
        return f"✅ Modèle changé pour: {model_name}"
    except Exception as e:
        print(f"Erreur lors du changement de modèle: {e}")
        return f"❌ Erreur: {str(e)}"

# Switch the active Qdrant collection (keeps the currently selected model)
def change_collection(collection_name, language="Français"):
    global rag_bot

    try:
        rag_bot = MultimodalRAGChatbot(
            qdrant_url=QDRANT_URL,
            qdrant_collection_name=collection_name,
            # rag_bot.llm.model is read before the reassignment, so the
            # previously selected model is preserved
            ollama_model=rag_bot.llm.model,
            embedding_model=EMBEDDING_MODEL,
            ollama_url=OLLAMA_URL
        )
        print(f"Collection changée pour: {collection_name}")
        return f"✅ Collection changée pour: {collection_name}"
    except Exception as e:
        print(f"Erreur lors du changement de collection: {e}")
        return f"❌ Erreur: {str(e)}"

# History conversion helper
def convert_to_messages_format(history):
    """Convert the various Gradio history formats to the messages format."""
    messages = []

    # Already in messages format?
    if history and isinstance(history[0], dict) and "role" in history[0]:
        return history

    # Tuples format [(user_msg, assistant_msg), ...]
    try:
        for item in history:
            if isinstance(item, tuple) and len(item) == 2:
                user_msg, assistant_msg = item
                messages.append({"role": "user", "content": user_msg})
                if assistant_msg:  # Skip empty assistant messages
                    messages.append({"role": "assistant", "content": assistant_msg})
    except Exception as e:
        # Log the error for debugging
        print(f"Format d'historique non reconnu: {history}")
        print(f"Erreur: {str(e)}")
        # Return an empty history on error
        return []

    return messages

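# Example (illustrative):
#   convert_to_messages_format([("Bonjour", "Salut !")])
#   -> [{"role": "user", "content": "Bonjour"},
#       {"role": "assistant", "content": "Salut !"}]
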
def process_query(message, history, streaming, show_sources, max_images, language):
    global current_images, current_tables

    print(f"Language selected for response: {language} -> {LANGUAGE_MAPPING.get(language, 'français')}")

    if not message.strip():
        # process_query is a generator, so yield (not return) the unchanged state
        yield history, "", None, None
        return

    current_images = []
    current_tables = []

    try:
        # Convert history to messages format
        messages_history = convert_to_messages_format(history)

        if streaming:
            # Add user message to history
            messages_history.append({"role": "user", "content": message})
            # Add empty message for assistant response
            messages_history.append({"role": "assistant", "content": ""})

            # Get relevant documents
            docs = rag_bot._retrieve_relevant_documents(message)

            # Process context and history
            context = rag_bot._format_documents(docs)
            history_text = rag_bot._format_chat_history()

            # Create prompt
            prompt_template = ChatPromptTemplate.from_template("""
You are a specialized document assistant that uses the provided context.

===== CRITICAL LANGUAGE INSTRUCTION =====
RESPOND ONLY IN {language}. This is an ABSOLUTE requirement.
NEVER RESPOND in any language other than {language}, regardless of question language.
==============================================

Specific instructions:
1. For each image mentioned: include caption, source, page and description
2. For each table: include title, source, page and significance
3. For equations: use exact LaTeX syntax
4. Don't invent information outside the provided context
5. Cite sources precisely

Conversation history:
{chat_history}

Context:
{context}

Question: {question}

Respond in a structured way incorporating available images, tables and equations.
YOUR RESPONSE MUST BE SOLELY AND ENTIRELY IN {language}. THIS RULE IS ABSOLUTE.
""")

            # Set language for the response
            selected_language = LANGUAGE_MAPPING.get(language, "français")
            messages = prompt_template.format_messages(
                chat_history=history_text,
                context=context,
                question=message,
                language=selected_language
            )

            # Create streaming handler
            handler = GradioStreamingHandler()

            # Create LLM instance wired to our handler
            streaming_llm = ChatOllama(
                model=rag_bot.llm.model,
                base_url=rag_bot.llm.base_url,
                streaming=True,
                callbacks=[handler]
            )

            # Generate the response on a separate thread
            def generate_response():
                streaming_llm.invoke(messages)

            thread = threading.Thread(target=generate_response)
            thread.start()

            # Process tokens and update the interface
            partial_response = ""

            # Drain tokens with a short timeout so the UI stays responsive
            while thread.is_alive() or not handler.tokens_queue.empty():
                try:
                    token = handler.tokens_queue.get(timeout=0.05)
                    partial_response += token

                    # Clean response for display
                    clean_response = clean_llm_response(partial_response)
                    # Update assistant message - just text, not multimodal
                    messages_history[-1]["content"] = clean_response
                    yield messages_history, "", None, None
                except queue.Empty:
                    continue

            # After the loop, clean the complete response for the internal history
            partial_response = clean_llm_response(partial_response)
            rag_bot.chat_history.append({"role": "user", "content": message})
            rag_bot.chat_history.append({"role": "assistant", "content": partial_response})

            # Get sources, images, tables
            texts, images, tables = rag_bot._process_documents(docs)

            # Build the source info line
            source_info = ""
            if texts:
                clean_texts = [re.sub(pattern_texte, '', t.get("source", "")) for t in texts]
                # Remove duplicates and empty items
                clean_texts = [t for t in clean_texts if t.strip()]
                clean_texts = list(set(clean_texts))
                if clean_texts:
                    source_info += f"📚 Sources: {', '.join(clean_texts)} • "

            # Process images and tables for SEPARATE display only
            if show_sources and images and max_images > 0:
                for img in images[:max_images]:
                    img_data = img.get("image_data")
                    if img_data:
                        image = base64_to_image(img_data)
                        if image:
                            caption = re.sub(pattern_texte, '', img.get("caption", ""))
                            # Only add to the gallery, not to chat messages
                            current_images.append({
                                "image": image,
                                "caption": caption,
                                "source": img.get("source", ""),
                                "page": img.get("page", "")
                            })

            # Final yield with the separate image gallery
            yield messages_history, source_info, display_images(), display_tables()

        else:
            # Non-streaming version
            print("Mode non-streaming activé")
            source_info = ""

            # Add the user message to the history in messages format
            messages_history.append({"role": "user", "content": message})

            # Run the full RAG pipeline in a single call
            result = rag_bot.chat(
                message,
                stream=False,
                language=LANGUAGE_MAPPING.get(language, "français")
            )
            # Strip <think> tags from the response
            result["response"] = clean_llm_response(result["response"])

            # Add the assistant response in messages format
            messages_history.append({"role": "assistant", "content": result["response"]})

            # Update the chatbot's internal history
            rag_bot.chat_history.append({"role": "user", "content": message})
            rag_bot.chat_history.append({"role": "assistant", "content": result["response"]})

            # Build the source info line
            if "texts" in result:
                source_info += f"📚 {len(result['texts'])} textes • "
            if "images" in result:
                source_info += f"🖼️ {len(result['images'])} images • "
            if "tables" in result:
                source_info += f"📊 {len(result['tables'])} tableaux"

            if source_info:
                source_info = "Sources trouvées: " + source_info

            # Process images for the SEPARATE gallery
            if show_sources and "images" in result and result["images"]:
                for img in result["images"][:max_images]:
                    img_data = img.get("image_data")
                    if img_data:
                        image = base64_to_image(img_data)
                        if image:
                            caption = re.sub(pattern_texte, '', img.get("caption", ""))
                            # Only add to the gallery
                            current_images.append({
                                "image": image,
                                "caption": caption,
                                "source": img.get("source", ""),
                                "page": img.get("page", "")
                            })

            # Final yield with separate displays
            yield messages_history, source_info, display_images(), display_tables()

    except Exception as e:
        error_msg = f"Une erreur est survenue: {str(e)}"
        traceback_text = traceback.format_exc()
        print(error_msg)
        print(traceback_text)

        # Report the error in messages format
        error_history = convert_to_messages_format(history)
        error_history.append({"role": "user", "content": message})
        error_history.append({"role": "assistant", "content": error_msg})

        yield error_history, "Erreur lors du traitement de la requête", None, None

# Reset the conversation
def reset_conversation():
    global current_images, current_tables
    current_images = []
    current_tables = []

    rag_bot.clear_history()

    # Return an empty list in messages format (empty list = no messages)
    return [], "", None, None