import os
import uuid

import pytesseract
from typing import Dict, List, Any, Optional
from langchain_community.document_loaders import UnstructuredPDFLoader
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

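# Optional runtime dependencies, imported lazily in the methods below:
#   - langchain_ollama / langchain_openai, depending on the configured providers
#   - the `ollama` client, when a local vision model is used to describe images
#   - langchain_qdrant, for storing the processed documents
#   - the `unstructured` PDF extras required by UnstructuredPDFLoader's "hi_res" strategy
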
class PdfProcessor:
    """
    A configurable PDF processor that extracts text, images, and tables from PDFs,
    summarizes them using LLMs, and stores them in a Qdrant vector database.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the PDF processor with the given configuration.

        Args:
            config: Dictionary of configuration options
        """
        # Default configuration
        self.config = {
            # Embeddings
            "embedding_provider": "ollama",  # "ollama" or "openai"
            "ollama_embedding_url": "http://localhost:11434",
            "ollama_embedding_model": "mxbai-embed-large",
            "openai_embedding_model": "text-embedding-3-small",

            # LLM for text/table summarization
            "summary_provider": "ollama",  # "ollama" or "openai"
            "ollama_summary_url": "http://localhost:11434",
            "ollama_summary_model": "llama3.2",
            "openai_summary_model": "gpt-3.5-turbo",

            # Image processing
            "image_provider": "ollama",  # "ollama" or "openai"
            "ollama_image_url": "http://localhost:11434",
            "ollama_image_model": "llama3.2-vision",
            "openai_image_model": "gpt-4o-mini",

            # Vector store
            "qdrant_url": "http://localhost:6333",
            "collection_name": "pdf_documents",

            # PDF processing
            "extract_images": True,
            "extract_tables": True,
            "chunk_size": 10000,
            "chunk_overlap": 2000,
            "tesseract_path": r'C:\Program Files\Tesseract-OCR\tesseract.exe',
            "image_output_dir": "./temp_images",
            "summary_language": "English",

            # API keys
            "openai_api_key": None,
        }

        # Update with user-provided configuration
        if config:
            self.config.update(config)

        # Set up components
        self._setup_components()
        self._setup_models()

    def _setup_components(self):
        """Set up necessary components based on configuration."""
        # Set up Tesseract for OCR
        if self.config["tesseract_path"]:
            pytesseract.pytesseract.tesseract_cmd = self.config["tesseract_path"]

        # Set up the OpenAI key if using OpenAI services
        if (self.config["embedding_provider"] == "openai" or
                self.config["summary_provider"] == "openai" or
                self.config["image_provider"] == "openai"):
            if not self.config["openai_api_key"]:
                raise ValueError("OpenAI API key is required when using OpenAI models")
            os.environ["OPENAI_API_KEY"] = self.config["openai_api_key"]

    def _setup_models(self):
        """Initialize models based on configuration."""
        # Set up embedding model
        if self.config["embedding_provider"] == "ollama":
            from langchain_ollama import OllamaEmbeddings
            self.embedding_model = OllamaEmbeddings(
                base_url=self.config["ollama_embedding_url"],
                model=self.config["ollama_embedding_model"]
            )
        else:  # openai
            from langchain_openai import OpenAIEmbeddings
            self.embedding_model = OpenAIEmbeddings(
                model=self.config["openai_embedding_model"]
            )

        # Set up text summarization model
        if self.config["summary_provider"] == "ollama":
            from langchain_ollama import OllamaLLM
            self.summary_model = OllamaLLM(
                base_url=self.config["ollama_summary_url"],
                model=self.config["ollama_summary_model"]
            )
        else:  # openai
            from langchain_openai import ChatOpenAI
            self.summary_model = ChatOpenAI(
                model=self.config["openai_summary_model"]
            )

        # Create summarization chain
        prompt_text = """
        You are an assistant tasked with summarizing tables and text.
        Give a concise summary of the table or text.

        Respond only with the summary, no additional comment.
        Do not start your message by saying "Here is a summary" or anything like that.
        Just give the summary as it is. All summaries will be in {language}.

        Text or table to summarize: {element}
        """

        self.summarize_prompt = ChatPromptTemplate.from_template(prompt_text)
        self.summarize_chain = (
            {"element": lambda x: x, "language": lambda _: self.config["summary_language"]}
            | self.summarize_prompt
            | self.summary_model
            | StrOutputParser()
        )

    def process_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """
        Process a PDF file and store its contents in Qdrant.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            Dictionary with processing statistics
        """
        # Load and extract content from PDF
        print("Loading PDF and extracting elements...")
        documents = self._load_pdf(pdf_path)

        # Process text chunks
        print("Processing text chunks...")
        title_chunks = self._process_text(documents)
        text_summaries = self._summarize_text(title_chunks)
        processed_text = self._convert_text_to_documents(title_chunks, text_summaries)

        # Process images if configured
        processed_images = []
        if self.config["extract_images"]:
            print("Processing images...")
            images = self._extract_images(documents)
            image_summaries = self._process_images(images)
            processed_images = self._convert_images_to_documents(images, image_summaries)

        # Process tables if configured
        processed_tables = []
        if self.config["extract_tables"]:
            print("Processing tables...")
            tables = self._extract_tables(documents)
            table_summaries = self._process_tables(tables)
            processed_tables = self._convert_tables_to_documents(tables, table_summaries)

        # Combine all processed elements and store them in Qdrant
        print("Storing processed elements in Qdrant...")
        final_documents = processed_text + processed_images + processed_tables
        self._store_documents(final_documents)

        return {
            "text_chunks": len(processed_text),
            "image_chunks": len(processed_images),
            "table_chunks": len(processed_tables),
            "total_chunks": len(final_documents),
            "collection_name": self.config["collection_name"],
        }

    def _load_pdf(self, pdf_path: str) -> List[Document]:
        """Load PDF and extract elements."""
        loader = UnstructuredPDFLoader(
            pdf_path,
            infer_table_structure=True,
            extract_images=self.config["extract_images"],
            image_output_dir=self.config["image_output_dir"],
            mode="elements",
            strategy="hi_res",
            extract_image_block_types=["Image"],
            extract_image_block_to_payload=True,
        )
        return loader.load()

    def _process_text(self, documents: List[Document]) -> List[Document]:
        """Process text and create title-based chunks."""
        return self._chunk_by_title(
            documents,
            max_chunk_size=self.config["chunk_size"],
            chunk_overlap=self.config["chunk_overlap"]
        )

    def _summarize_text(self, chunks: List[Document]) -> List[str]:
        """Generate summaries for text chunks."""
        return self.summarize_chain.batch(
            [chunk.page_content for chunk in chunks],
            {"max_concurrency": 3}
        )

    def _extract_images(self, documents: List[Document]) -> List[Dict[str, Any]]:
        """Extract images with captions from documents."""
        images_info = []
        for i, chunk in enumerate(documents):
            if chunk.metadata.get("category") == "Image":
                image_b64 = chunk.metadata.get("image_base64")
                caption = ""

                # Look for a caption in the next chunk
                if i < len(documents) - 1:
                    next_chunk = documents[i + 1]
                    if next_chunk.metadata.get("category") == "FigureCaption":
                        caption = next_chunk.page_content.strip()

                images_info.append({
                    "image_base64": image_b64,
                    "caption": caption,
                    "source": os.path.basename(chunk.metadata.get("source", "")),
                    "page": chunk.metadata.get("page_number", ""),
                })
        return images_info

    def _process_images(self, images: List[Dict[str, Any]]) -> List[str]:
        """Generate descriptions for images using the configured model."""
        if self.config["image_provider"] == "ollama":
            from ollama import Client
            client = Client(host=self.config["ollama_image_url"])

            image_summaries = []
            for img in images:
                prompt = (
                    f"Caption of image: {img.get('caption', '')}. "
                    f"Describe this image in detail in {self.config['summary_language']}."
                )
                response = client.chat(
                    model=self.config["ollama_image_model"],
                    messages=[
                        {"role": "user", "content": prompt, "images": [img.get("image_base64")]}
                    ]
                )
                image_summaries.append(response["message"]["content"])
            return image_summaries

        else:  # openai
            from langchain_openai import ChatOpenAI

            prompt_template = f"""Describe the image in detail in {self.config['summary_language']}.
            If there's a caption, use it for context: {{caption}}"""

            messages = [
                (
                    "user",
                    [
                        {"type": "text", "text": prompt_template},
                        {
                            "type": "image_url",
                            "image_url": {"url": "data:image/jpeg;base64,{image_base64}"},
                        },
                    ],
                )
            ]

            prompt = ChatPromptTemplate.from_messages(messages)
            chain = prompt | ChatOpenAI(model=self.config["openai_image_model"]) | StrOutputParser()

            return chain.batch([
                {"image_base64": img["image_base64"], "caption": img.get("caption", "")}
                for img in images
            ])

    def _extract_tables(self, documents: List[Document]) -> List[Dict[str, Any]]:
        """Extract tables with captions from documents."""
        tables_info = []
        for idx, chunk in enumerate(documents):
            if "table" in chunk.metadata.get("category", "").lower():
                # Extract table content and caption
                payload = chunk.metadata.get("payload", {})
                caption = payload.get("caption", "").strip()

                # Look for a caption in the next chunk
                if not caption and idx + 1 < len(documents):
                    next_chunk = documents[idx + 1]
                    if next_chunk.metadata.get("category") == "FigureCaption":
                        caption = next_chunk.page_content.strip()

                tables_info.append({
                    "table_data": chunk.page_content,
                    "caption": caption,
                    "source": os.path.basename(chunk.metadata.get("source", "")),
                    "page": chunk.metadata.get("page_number", ""),
                })
        return tables_info

    def _process_tables(self, tables: List[Dict[str, Any]]) -> List[str]:
        """Generate summaries for tables."""
        table_summaries = []

        for table in tables:
            prompt = f"""Caption of table: {table.get('caption', '')}.
            Describe this table in detail in {self.config['summary_language']}.
            Table content: {table.get('table_data', '')}"""

            if self.config["summary_provider"] == "ollama":
                # OllamaLLM returns a plain string
                summary = self.summary_model.invoke(prompt)
            else:  # openai
                # ChatOpenAI returns a message object, so take its text content
                summary = self.summary_model.invoke(prompt).content

            table_summaries.append(summary)

        return table_summaries

    def _convert_text_to_documents(self, texts: List[Document], summaries: List[str]) -> List[Document]:
        """Convert text chunks and summaries into Document objects."""
        documents = []
        txt_ids = [str(uuid.uuid4()) for _ in texts]

        for idx, item in enumerate(texts):
            summary_text = summaries[idx] if idx < len(summaries) else ""

            metadata = {
                "source": item.metadata.get("source", ""),
                "page_number": item.metadata.get("page_numbers", []),
                "text": item.page_content,
                "id_key": txt_ids[idx],
                "txt": item.metadata.get("title", ""),  # section title of the chunk
            }

            documents.append(Document(page_content=summary_text, metadata=metadata))

        return documents

    def _convert_images_to_documents(self, images: List[Dict[str, Any]], summaries: List[str]) -> List[Document]:
        """Convert image data and summaries into Document objects."""
        documents = []
        img_ids = [str(uuid.uuid4()) for _ in images]

        for idx, item in enumerate(images):
            summary_text = summaries[idx] if idx < len(summaries) else ""

            metadata = {
                "source": item.get("source", ""),
                "page_number": item.get("page", ""),
                "caption": item.get("caption", ""),
                "id_key": img_ids[idx],
                "image_base64": item.get("image_base64"),
            }

            documents.append(Document(page_content=summary_text, metadata=metadata))

        return documents

    def _convert_tables_to_documents(self, tables: List[Dict[str, Any]], summaries: List[str]) -> List[Document]:
        """Convert table data and summaries into Document objects."""
        documents = []
        table_ids = [str(uuid.uuid4()) for _ in tables]

        for idx, item in enumerate(tables):
            summary_text = summaries[idx] if idx < len(summaries) else ""

            metadata = {
                "source": item.get("source", ""),
                "page_number": item.get("page", ""),
                "caption": item.get("caption", ""),
                "id_key": table_ids[idx],
                "table_content": item.get("table_data"),
            }

            documents.append(Document(page_content=summary_text, metadata=metadata))

        return documents

    def _store_documents(self, documents: List[Document]) -> None:
        """Store documents in the Qdrant vector database."""
        from langchain_qdrant import QdrantVectorStore

        QdrantVectorStore.from_documents(
            documents,
            self.embedding_model,
            url=self.config["qdrant_url"],
            collection_name=self.config["collection_name"],
        )

    def _chunk_by_title(self, documents: List[Document], max_chunk_size: int = 10000,
                        chunk_overlap: int = 2000) -> List[Document]:
        """
        Create chunks based on document title structure.
        Each title starts a new chunk.
        """
        # Identify title positions
        title_positions = []
        for i, doc in enumerate(documents):
            if doc.metadata.get("category") == "Title":
                title_positions.append(i)

        # Add final position
        title_positions.append(len(documents))

        # Create chunks based on titles
        title_based_chunks = []

        # If no titles were found, process the document as a single chunk
        if len(title_positions) <= 1:
            text_elements = [doc for doc in documents
                             if doc.metadata.get("category") not in ["Table", "Image", "FigureCaption"]]
            combined_text = " ".join([doc.page_content for doc in text_elements])

            title_based_chunks.append(Document(
                page_content=combined_text,
                metadata={
                    "source": os.path.basename(documents[0].metadata.get("source", "")),
                    "title": "Document without title",
                    "page_numbers": list(set(doc.metadata.get("page_number")
                                             for doc in text_elements if doc.metadata.get("page_number")))
                }
            ))
        else:
            # Process each title-delimited section
            for i in range(len(title_positions) - 1):
                start_idx = title_positions[i]
                end_idx = title_positions[i + 1]

                # Get section title
                title_doc = documents[start_idx]
                title_text = title_doc.page_content

                # Get section text elements
                section_docs = [
                    doc for doc in documents[start_idx + 1:end_idx]
                    if doc.metadata.get("category") not in ["Table", "Image", "FigureCaption"]
                ]

                if section_docs:
                    # Combine section text
                    section_text = " ".join([doc.page_content for doc in section_docs])

                    # Get page numbers
                    page_numbers = list(set(
                        doc.metadata.get("page_number") for doc in section_docs
                        if doc.metadata.get("page_number")
                    ))

                    source = os.path.basename(section_docs[0].metadata.get("source", ""))

                    # Create a Document for the section
                    title_based_chunks.append(Document(
                        page_content=section_text,
                        metadata={
                            "source": source,
                            "title": title_text,
                            "page_numbers": page_numbers
                        }
                    ))

        # Further chunk any section that exceeds the maximum chunk size
        final_chunks = []
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=max_chunk_size,
            chunk_overlap=chunk_overlap
        )

        for chunk in title_based_chunks:
            if len(chunk.page_content) <= max_chunk_size:
                final_chunks.append(chunk)
            else:
                # Split large sections
                sub_chunks = text_splitter.split_documents([chunk])
                # Preserve title info in sub-chunks
                for i, sub_chunk in enumerate(sub_chunks):
                    sub_chunk.metadata["title"] = chunk.metadata["title"]
                    sub_chunk.metadata["sub_chunk"] = i + 1
                    sub_chunk.metadata["total_sub_chunks"] = len(sub_chunks)
                final_chunks.extend(sub_chunks)

        return final_chunks

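# With local Ollama and Qdrant instances running at the default URLs configured in
# __init__, the processor can also be used with no overrides at all -- a minimal sketch
# (the PDF path below is only illustrative):
#
#     processor = PdfProcessor()
#     stats = processor.process_pdf("some_document.pdf")
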
if __name__ == "__main__":
    # Example usage: OpenAI describes the images, while embeddings and text/table
    # summaries use the local Ollama defaults. The API key is read from the
    # environment rather than hardcoded in source.
    processor = PdfProcessor({
        "image_provider": "openai",
        "openai_api_key": os.environ.get("OPENAI_API_KEY"),
        "collection_name": "my_custom_collection",
        "summary_language": "English",
    })
    result = processor.process_pdf(r"F:\Dev\Rag\chat_bot_rag\T4 Machines thermiques.pdf")
    print(result)
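
# A minimal retrieval sketch over the collection populated above (assumption: the same
# langchain_qdrant package is available and the Qdrant instance is still reachable; the
# query string is only illustrative). Similarity search runs against the stored
# summaries, while the original text/table content and image payloads live in each
# hit's metadata.
#
#     from langchain_qdrant import QdrantVectorStore
#
#     store = QdrantVectorStore.from_existing_collection(
#         collection_name=processor.config["collection_name"],
#         embedding=processor.embedding_model,
#         url=processor.config["qdrant_url"],
#     )
#     hits = store.similarity_search("What does the document cover?", k=4)
#     for hit in hits:
#         print(hit.metadata.get("page_number"), hit.page_content[:120])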