# Chat_bot_Rag/pdfProcessing.py
import os
import glob
import uuid
import pytesseract
from typing import Dict, List, Any, Optional, Union
from langchain_community.document_loaders import UnstructuredPDFLoader
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import httpx
from tqdm import tqdm
http_client = httpx.Client(verify=False)  # NOTE: disables TLS certificate verification (e.g. behind an intercepting proxy); enable it in production
class PdfProcessor:
"""
A configurable PDF processor that extracts text, images, and tables from PDFs,
summarizes them using LLMs, and stores them in a Qdrant vector database.
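
    Example (minimal sketch; assumes local Ollama and Qdrant at their default
    URLs, and an illustrative PDF path):

        processor = PdfProcessor({"collection_name": "my_docs"})
        stats = processor.process_pdf("manual.pdf")
        print(stats["total_chunks"])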
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize the PDF processor with the given configuration.
Args:
config: Dictionary of configuration options
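
        Example (illustrative overrides; any default key may be replaced):
            PdfProcessor({
                "summary_language": "German",
                "chunk_size": 4000,
                "chunk_overlap": 500,
            })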
"""
# Default configuration
self.config = {
# Embeddings
"embedding_provider": "ollama", # "ollama" or "openai"
"ollama_embedding_url": "http://localhost:11434",
"ollama_embedding_model": "mxbai-embed-large",
"openai_embedding_model": "text-embedding-3-small",
# LLM for text/table summarization
"summary_provider": "ollama", # "ollama" or "openai"
"ollama_summary_url": "http://localhost:11434",
"ollama_summary_model": "llama3.2",
"openai_summary_model": "gpt-3.5-turbo",
# Image processing
"image_provider": "ollama", # "ollama" or "openai"
"ollama_image_url": "http://localhost:11434",
"ollama_image_model": "llama3.2-vision",
"openai_image_model": "gpt-4o-mini",
# Vector store
"qdrant_url": "http://localhost:6333",
"collection_name": "pdf_documents",
# PDF processing
"extract_images": True,
"extract_tables": True,
"chunk_size": 10000,
"chunk_overlap": 2000,
"tesseract_path": r'C:\Program Files\Tesseract-OCR\tesseract.exe',
"image_output_dir": "./temp_images",
"summary_language": "English",
# API keys
"openai_api_key": None,
}
# Update with user-provided configuration
if config:
self.config.update(config)
# Set up components
self._setup_components()
self._setup_models()
def _setup_components(self):
"""Set up necessary components based on configuration."""
# Set up Tesseract for OCR
if self.config["tesseract_path"]:
pytesseract.pytesseract.tesseract_cmd = self.config["tesseract_path"]
# Set up OpenAI key if using OpenAI services
if (self.config["embedding_provider"] == "openai" or
self.config["summary_provider"] == "openai" or
self.config["image_provider"] == "openai"):
if not self.config["openai_api_key"]:
raise ValueError("OpenAI API key is required when using OpenAI models")
os.environ["OPENAI_API_KEY"] = self.config["openai_api_key"]
# Initialize Qdrant client
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
self.qdrant_client = QdrantClient(url=self.config["qdrant_url"])
# Check if collection exists and create it if not
collections = self.qdrant_client.get_collections().collections
collection_exists = any(collection.name == self.config["collection_name"] for collection in collections)
if not collection_exists:
# Get vector size based on embedding model
if self.config["embedding_provider"] == "ollama":
# For OllamaEmbeddings, typically 4096 dimensions for newer models
vector_size = 4096
else: # OpenAI
# OpenAI embedding dimensions vary by model
model_dimensions = {
"text-embedding-ada-002": 1536,
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072
}
vector_size = model_dimensions.get(self.config["openai_embedding_model"], 1536)
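            # Alternative (sketch): once the embedding model exists, the exact size could be
            # probed at runtime, e.g. len(self.embedding_model.embed_query("probe")); that
            # would require calling _setup_models() before this collection check.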
# Create the collection
self.qdrant_client.create_collection(
collection_name=self.config["collection_name"],
vectors_config=rest.VectorParams(
size=vector_size,
distance=rest.Distance.COSINE
)
)
print(f"Created new Qdrant collection: {self.config['collection_name']}")
def _setup_models(self):
"""Initialize models based on configuration."""
# Set up embedding model
if self.config["embedding_provider"] == "ollama":
from langchain_ollama import OllamaEmbeddings
self.embedding_model = OllamaEmbeddings(
base_url=self.config["ollama_embedding_url"],
model=self.config["ollama_embedding_model"]
)
else: # openai
from langchain_openai import OpenAIEmbeddings
self.embedding_model = OpenAIEmbeddings(
model=self.config["openai_embedding_model"]
)
# Set up text summarization model
if self.config["summary_provider"] == "ollama":
from langchain_ollama import OllamaLLM
self.summary_model = OllamaLLM(
base_url=self.config["ollama_summary_url"],
model=self.config["ollama_summary_model"]
)
else: # openai
from langchain_openai import ChatOpenAI
self.summary_model = ChatOpenAI(
http_client=http_client,
model=self.config["openai_summary_model"]
)
# Create summarization chain
prompt_text = """
You are an assistant tasked with summarizing tables and text.
Give a concise summary of the table or text.
Respond only with the summary, no additional comment.
Do not start your message by saying "Here is a summary" or anything like that.
Just give the summary as it is. All summaries must be written in {language}.
Text or table to summarize: {element}
"""
self.summarize_prompt = ChatPromptTemplate.from_template(prompt_text)
self.summarize_chain = {"element": lambda x: x, "language": lambda _: self.config["summary_language"]} | self.summarize_prompt | self.summary_model | StrOutputParser()
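        # LCEL coerces the dict above into a runnable map, so the chain can be invoked with a
        # raw string while the configured summary language is injected automatically, e.g.
        # (illustrative input): self.summarize_chain.invoke("Revenue grew 12% year over year ...")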
def process_pdf(self, pdf_path: str) -> Dict[str, Any]:
"""
Process a PDF file and store its contents in Qdrant.
Args:
pdf_path: Path to the PDF file
Returns:
Dictionary with processing statistics
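
        Example return value (illustrative counts):
            {"text_chunks": 12, "image_chunks": 3, "table_chunks": 2,
             "total_chunks": 17, "collection_name": "pdf_documents"}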
"""
# Create a master progress bar
with tqdm(total=5, desc="PDF Processing", position=0) as master_bar:
# Load and extract content from PDF
master_bar.set_description("Loading PDF")
documents = self._load_pdf(pdf_path)
master_bar.update(1)
# Process text chunks
master_bar.set_description("Processing text chunks")
title_chunks = self._process_text(documents)
text_summaries = self._summarize_text(title_chunks)
processed_text = self._convert_text_to_documents(title_chunks, text_summaries)
master_bar.update(1)
# Process images if configured
master_bar.set_description("Processing images")
processed_images = []
if self.config["extract_images"]:
images = self._extract_images(documents)
image_summaries = self._process_images(images)
processed_images = self._convert_images_to_documents(images, image_summaries)
master_bar.update(1)
# Process tables if configured
master_bar.set_description("Processing tables")
processed_tables = []
if self.config["extract_tables"]:
tables = self._extract_tables(documents)
table_summaries = self._process_tables(tables)
processed_tables = self._convert_tables_to_documents(tables, table_summaries)
master_bar.update(1)
master_bar.set_description("Storing in Qdrant")
# Combine all processed elements
final_documents = processed_text + processed_images + processed_tables
# Store in Qdrant
self._store_documents(final_documents)
master_bar.update(1)
return {
"text_chunks": len(processed_text),
"image_chunks": len(processed_images),
"table_chunks": len(processed_tables),
"total_chunks": len(final_documents),
"collection_name": self.config["collection_name"]
}
def _load_pdf(self, pdf_path: str) -> List[Document]:
"""Load PDF and extract elements."""
loader = UnstructuredPDFLoader(
pdf_path,
infer_table_structure=True,
extract_images=self.config["extract_images"],
image_output_dir=self.config["image_output_dir"],
mode="elements",
strategy="hi_res",
extract_image_block_types=["Image"],
extract_image_block_to_payload=True,
)
return loader.load()
def _process_text(self, documents: List[Document]) -> List[Document]:
"""Process text and create title-based chunks."""
return self._chunk_by_title(
documents,
max_chunk_size=self.config["chunk_size"],
chunk_overlap=self.config["chunk_overlap"]
)
def _summarize_text(self, chunks: List[Document]) -> List[str]:
"""Generate summaries for text chunks."""
if not chunks:
return []
print(f"Summarizing {len(chunks)} text chunks...")
results = []
for chunk in tqdm(chunks, desc="Text summarization", leave=False):
result = self.summarize_chain.invoke(chunk.page_content)
results.append(result)
return results
def _extract_images(self, documents: List[Document]) -> List[Dict[str, Any]]:
"""Extract images with captions from documents."""
images_info = []
for i, chunk in enumerate(documents):
if chunk.metadata.get("category") == "Image":
image_b64 = chunk.metadata.get('image_base64')
caption = ""
# Look for caption in next chunk
if i < len(documents) - 1:
next_chunk = documents[i+1]
if next_chunk.metadata.get("category") == "FigureCaption":
caption = next_chunk.page_content.strip()
images_info.append({
"image_base64": image_b64,
"caption": caption,
"source": os.path.basename(chunk.metadata.get("source", "")),
"page": chunk.metadata.get("page_number", ""),
})
return images_info
def _process_images(self, images: List[Dict[str, Any]]) -> List[str]:
"""Generate descriptions for images using configured model."""
if not images:
return []
print(f"Processing {len(images)} images...")
if self.config["image_provider"] == "ollama":
from ollama import Client
client = Client(host=self.config["ollama_image_url"])
image_summaries = []
for img in tqdm(images, desc="Image processing", leave=False):
prompt = f"Caption of image: {img.get('caption', '')}. Describe this image in detail in {self.config['summary_language']}."
response = client.chat(
model=self.config["ollama_image_model"],
messages=[
{"role": "user", "content": prompt, "images": [img.get("image_base64")]}
]
)
image_summaries.append(response["message"]["content"])
return image_summaries
else: # openai
from langchain_openai import ChatOpenAI
prompt_template = f"""Describe the image in detail in {self.config['summary_language']}.
If there's a caption, use it for context: {{caption}}"""
messages = [
(
"user",
[
{"type": "text", "text": prompt_template},
{
"type": "image_url",
"image_url": {"url": "data:image/jpeg;base64,{image_base64}"},
},
],
)
]
prompt = ChatPromptTemplate.from_messages(messages)
chain = prompt | ChatOpenAI(model=self.config["openai_image_model"], http_client=http_client) | StrOutputParser()
# Process images with progress bar
results = []
image_data = [{"image_base64": img["image_base64"], "caption": img.get("caption", "")} for img in images]
for img_data in tqdm(image_data, desc="Image processing", leave=False):
result = chain.invoke(img_data)
results.append(result)
return results
def _extract_tables(self, documents: List[Document]) -> List[Dict[str, Any]]:
"""Extract tables with captions from documents."""
tables_info = []
for idx, chunk in enumerate(documents):
if chunk.metadata.get("category") == "Table" or "table" in chunk.metadata.get("category", "").lower():
# Extract table content and caption
payload = chunk.metadata.get("payload", {})
caption = payload.get("caption", "").strip()
# Look for caption in next chunk
if not caption and idx + 1 < len(documents):
next_chunk = documents[idx + 1]
if next_chunk.metadata.get("category") == "FigureCaption":
caption = next_chunk.page_content.strip()
tables_info.append({
"table_data": chunk.page_content,
"caption": caption,
"source": os.path.basename(chunk.metadata.get("source", "")),
"page": chunk.metadata.get("page_number", ""),
})
return tables_info
def _process_tables(self, tables: List[Dict[str, Any]]) -> List[str]:
"""Generate summaries for tables."""
if not tables:
return []
print(f"Processing {len(tables)} tables...")
table_summaries = []
for table in tqdm(tables, desc="Table processing", leave=False):
prompt = f"""Caption of table: {table.get('caption', '')}.
Describe this table in detail in {self.config['summary_language']}.
Table content: {table.get('table_data', '')}"""
if self.config["summary_provider"] == "ollama":
summary = self.summary_model.invoke(prompt)
else: # openai
summary = self.summary_model.invoke(prompt).content
table_summaries.append(summary)
return table_summaries
def _convert_text_to_documents(self, texts: List[Document], summaries: List[str]) -> List[Document]:
"""Convert text chunks and summaries into Document objects."""
documents = []
txt_ids = [str(uuid.uuid4()) for _ in texts]
for idx, item in enumerate(texts):
if idx < len(summaries):
summary_text = summaries[idx]
else:
summary_text = ""
metadata = {
"source": item.metadata.get("source", ""),
"page_number": item.metadata.get("page_numbers", []),
"text": item.page_content,
"id_key": txt_ids[idx],
"txt": item.metadata.get("title", "")
}
doc = Document(page_content=summary_text, metadata=metadata)
documents.append(doc)
return documents
def _convert_images_to_documents(self, images: List[Dict[str, Any]], summaries: List[str]) -> List[Document]:
"""Convert image data and summaries into Document objects."""
documents = []
img_ids = [str(uuid.uuid4()) for _ in images]
for idx, item in enumerate(images):
if idx < len(summaries):
summary_text = summaries[idx]
else:
summary_text = ""
metadata = {
"source": item.get("source", ""),
"page_number": item.get("page", ""),
"caption": item.get("caption", ""),
"id_key": img_ids[idx],
"image_base64": item.get("image_base64")
}
doc = Document(page_content=summary_text, metadata=metadata)
documents.append(doc)
return documents
def _convert_tables_to_documents(self, tables: List[Dict[str, Any]], summaries: List[str]) -> List[Document]:
"""Convert table data and summaries into Document objects."""
documents = []
table_ids = [str(uuid.uuid4()) for _ in tables]
for idx, item in enumerate(tables):
if idx < len(summaries):
summary_text = summaries[idx]
else:
summary_text = ""
metadata = {
"source": item.get("source", ""),
"page_number": item.get("page", ""),
"caption": item.get("caption", ""),
"id_key": table_ids[idx],
"table_content": item.get("table_data")
}
doc = Document(page_content=summary_text, metadata=metadata)
documents.append(doc)
return documents
def _store_documents(self, documents: List[Document]) -> None:
"""Store documents in Qdrant vector database."""
from langchain_qdrant import QdrantVectorStore
        QdrantVectorStore.from_documents(
documents,
self.embedding_model,
url=self.config["qdrant_url"],
collection_name=self.config["collection_name"],
)
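        # Alternative (sketch): reuse the client created in _setup_components instead of
        # opening a new connection on every call, e.g.
        #     QdrantVectorStore(
        #         client=self.qdrant_client,
        #         collection_name=self.config["collection_name"],
        #         embedding=self.embedding_model,
        #     ).add_documents(documents)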
def _chunk_by_title(self, documents: List[Document], max_chunk_size: int = 10000,
chunk_overlap: int = 2000) -> List[Document]:
"""
Create chunks based on document title structure.
Each title starts a new chunk.
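
        Example (illustrative): elements [Title "1 Intro", text..., Title "2 Methods",
        text...] become two chunks whose metadata carries "title" and "page_numbers";
        any section longer than max_chunk_size is further split below with chunk_overlap.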
"""
# Identify title positions
title_positions = []
for i, doc in enumerate(documents):
if doc.metadata.get("category") == "Title":
title_positions.append(i)
# Add final position
title_positions.append(len(documents))
# Create chunks based on titles
title_based_chunks = []
# If no titles found, process as single chunk
if len(title_positions) <= 1:
text_elements = [doc for doc in documents
if doc.metadata.get("category") not in ["Table", "Image", "FigureCaption"]]
combined_text = " ".join([doc.page_content for doc in text_elements])
title_based_chunks.append(Document(
page_content=combined_text,
metadata={
"source": os.path.basename(documents[0].metadata.get("source", "")),
"title": "Document without title",
"page_numbers": list(set(doc.metadata.get("page_number")
for doc in text_elements if doc.metadata.get("page_number")))
}
))
else:
# Process each title-delimited section
for i in range(len(title_positions) - 1):
start_idx = title_positions[i]
end_idx = title_positions[i + 1]
# Get section title
title_doc = documents[start_idx]
title_text = title_doc.page_content
# Get section text elements
section_docs = [
doc for doc in documents[start_idx+1:end_idx]
if doc.metadata.get("category") not in ["Table", "Image", "FigureCaption"]
]
if section_docs:
# Combine section text
section_text = " ".join([doc.page_content for doc in section_docs])
# Get page numbers
page_numbers = list(set(
doc.metadata.get("page_number") for doc in section_docs
if doc.metadata.get("page_number")
))
source = os.path.basename(section_docs[0].metadata.get("source", ""))
# Create Document for section
title_based_chunks.append(Document(
page_content=section_text,
metadata={
"source": source,
"title": title_text,
"page_numbers": page_numbers
}
))
# Further chunk if sections are too large
final_chunks = []
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=max_chunk_size,
chunk_overlap=chunk_overlap
)
for chunk in title_based_chunks:
if len(chunk.page_content) <= max_chunk_size:
final_chunks.append(chunk)
else:
# Split large sections
sub_chunks = text_splitter.split_documents([chunk])
# Preserve title info in sub-chunks
for i, sub_chunk in enumerate(sub_chunks):
sub_chunk.metadata["title"] = chunk.metadata["title"]
sub_chunk.metadata["sub_chunk"] = i + 1
sub_chunk.metadata["total_sub_chunks"] = len(sub_chunks)
final_chunks.extend(sub_chunks)
return final_chunks
def process_directory(self, directory_path: str) -> Dict[str, Any]:
"""
Process all PDF files in the specified directory.
Args:
directory_path: Path to the directory containing PDF files
Returns:
Dictionary with processing statistics for all files
"""
# Check if directory exists
if not os.path.isdir(directory_path):
raise ValueError(f"Directory not found: {directory_path}")
# Find all PDF files in the directory
pdf_files = glob.glob(os.path.join(directory_path, "*.pdf"))
if not pdf_files:
print(f"No PDF files found in {directory_path}")
return {"files_processed": 0}
# Track overall statistics
overall_stats = {
"files_processed": 0,
"total_text_chunks": 0,
"total_image_chunks": 0,
"total_table_chunks": 0,
"total_chunks": 0,
"collection_name": self.config["collection_name"],
"file_details": []
}
# Process each PDF file with a progress bar
print(f"Found {len(pdf_files)} PDF files in {directory_path}")
for pdf_file in tqdm(pdf_files, desc="Processing PDF files", unit="file"):
try:
print(f"\nProcessing: {os.path.basename(pdf_file)}")
result = self.process_pdf(pdf_file)
# Update statistics
overall_stats["files_processed"] += 1
overall_stats["total_text_chunks"] += result.get("text_chunks", 0)
overall_stats["total_image_chunks"] += result.get("image_chunks", 0)
overall_stats["total_table_chunks"] += result.get("table_chunks", 0)
overall_stats["total_chunks"] += result.get("total_chunks", 0)
# Store individual file results
file_detail = {
"filename": os.path.basename(pdf_file),
"text_chunks": result.get("text_chunks", 0),
"image_chunks": result.get("image_chunks", 0),
"table_chunks": result.get("table_chunks", 0),
"total_chunks": result.get("total_chunks", 0)
}
overall_stats["file_details"].append(file_detail)
print(f"Completed: {file_detail['filename']} - {file_detail['total_chunks']} chunks processed")
except Exception as e:
print(f"Error processing {pdf_file}: {str(e)}")
# Continue with next file
print("\nDirectory processing complete!")
print(f"Processed {overall_stats['files_processed']} files")
print(f"Total chunks: {overall_stats['total_chunks']}")
print(f" - Text chunks: {overall_stats['total_text_chunks']}")
print(f" - Image chunks: {overall_stats['total_image_chunks']}")
print(f" - Table chunks: {overall_stats['total_table_chunks']}")
print(f"All content stored in collection: {overall_stats['collection_name']}")
return overall_stats

if __name__ == "__main__":
    processor = PdfProcessor({
        # "image_provider": "openai",
        # "openai_api_key": os.environ.get("OPENAI_API_KEY"),  # read from the environment; never hard-code keys
        "collection_name": "my_control_and_calibration",
        "summary_language": "English",
    })
    results = processor.process_directory(r"C:\Users\serameza\host-data")
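    # Single-file variant (sketch; the path below is illustrative):
    #     stats = processor.process_pdf(r"C:\Users\serameza\host-data\example.pdf")
    #     print(stats)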