import glob
import os
import uuid

import pytesseract
from typing import Dict, List, Any, Optional
from langchain_community.document_loaders import UnstructuredPDFLoader
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import httpx
from tqdm import tqdm

# Shared HTTP client with TLS verification disabled (e.g. behind proxies with
# self-signed certificates); drop verify=False if certificate checks are required.
http_client = httpx.Client(verify=False)


class PdfProcessor:
    """
    A configurable PDF processor that extracts text, images, and tables from PDFs,
    summarizes them using LLMs, and stores them in a Qdrant vector database.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the PDF processor with the given configuration.

        Args:
            config: Dictionary of configuration options
        """
        # Default configuration
        self.config = {
            # Embeddings
            "embedding_provider": "ollama",  # "ollama" or "openai"
            "ollama_embedding_url": "http://localhost:11434",
            "ollama_embedding_model": "mxbai-embed-large",
            "openai_embedding_model": "text-embedding-3-small",
            # LLM for text/table summarization
            "summary_provider": "ollama",  # "ollama" or "openai"
            "ollama_summary_url": "http://localhost:11434",
            "ollama_summary_model": "llama3.2",
            "openai_summary_model": "gpt-3.5-turbo",
            # Image processing
            "image_provider": "ollama",  # "ollama" or "openai"
            "ollama_image_url": "http://localhost:11434",
            "ollama_image_model": "llama3.2-vision",
            "openai_image_model": "gpt-4o-mini",
            # Vector store
            "qdrant_url": "http://localhost:6333",
            "collection_name": "pdf_documents",
            # PDF processing
            "extract_images": True,
            "extract_tables": True,
            "chunk_size": 10000,
            "chunk_overlap": 2000,
            "tesseract_path": r"C:\Program Files\Tesseract-OCR\tesseract.exe",
            "image_output_dir": "./temp_images",
            "summary_language": "English",
            # API keys
            "openai_api_key": None,
        }

        # Update with user-provided configuration
        if config:
            self.config.update(config)

        # Set up components
        self._setup_components()
        self._setup_models()

    def _setup_components(self):
        """Set up necessary components based on configuration."""
        # Set up Tesseract for OCR
        if self.config["tesseract_path"]:
            pytesseract.pytesseract.tesseract_cmd = self.config["tesseract_path"]

        # Set up OpenAI key if using OpenAI services
        if (self.config["embedding_provider"] == "openai"
                or self.config["summary_provider"] == "openai"
                or self.config["image_provider"] == "openai"):
            if not self.config["openai_api_key"]:
                raise ValueError("OpenAI API key is required when using OpenAI models")
            os.environ["OPENAI_API_KEY"] = self.config["openai_api_key"]

        # Initialize Qdrant client
        from qdrant_client import QdrantClient
        from qdrant_client.http import models as rest

        self.qdrant_client = QdrantClient(url=self.config["qdrant_url"])

        # Check if collection exists and create it if not
        collections = self.qdrant_client.get_collections().collections
        collection_exists = any(
            collection.name == self.config["collection_name"]
            for collection in collections
        )

        if not collection_exists:
            # Get vector size based on embedding model
            if self.config["embedding_provider"] == "ollama":
                # Ollama embedding dimensions vary by model
                # (e.g. mxbai-embed-large -> 1024, nomic-embed-text -> 768)
                ollama_dimensions = {
                    "mxbai-embed-large": 1024,
                    "nomic-embed-text": 768,
                }
                vector_size = ollama_dimensions.get(
                    self.config["ollama_embedding_model"], 1024
                )
            else:  # openai
                # OpenAI embedding dimensions vary by model
                model_dimensions = {
                    "text-embedding-ada-002": 1536,
                    "text-embedding-3-small": 1536,
                    "text-embedding-3-large": 3072,
                }
                vector_size = model_dimensions.get(
                    self.config["openai_embedding_model"], 1536
                )

            # Create the collection
            self.qdrant_client.create_collection(
                collection_name=self.config["collection_name"],
                vectors_config=rest.VectorParams(
                    size=vector_size,
                    distance=rest.Distance.COSINE,
                ),
            )
            print(f"Created new Qdrant collection: {self.config['collection_name']}")

    def _setup_models(self):
        """Initialize models based on configuration."""
        # Set up embedding model
        if self.config["embedding_provider"] == "ollama":
            from langchain_ollama import OllamaEmbeddings
            self.embedding_model = OllamaEmbeddings(
                base_url=self.config["ollama_embedding_url"],
                model=self.config["ollama_embedding_model"],
            )
        else:  # openai
            from langchain_openai import OpenAIEmbeddings
            self.embedding_model = OpenAIEmbeddings(
                model=self.config["openai_embedding_model"]
            )

        # Set up text summarization model
        if self.config["summary_provider"] == "ollama":
            from langchain_ollama import OllamaLLM
            self.summary_model = OllamaLLM(
                base_url=self.config["ollama_summary_url"],
                model=self.config["ollama_summary_model"],
            )
        else:  # openai
            from langchain_openai import ChatOpenAI
            self.summary_model = ChatOpenAI(
                http_client=http_client,
                model=self.config["openai_summary_model"],
            )

        # Create summarization chain
        prompt_text = """
        You are an assistant tasked with summarizing tables and text.
        Give a concise summary of the table or text.

        Respond only with the summary, no additional comment.
        Do not start your message by saying "Here is a summary" or anything like that.
        Just give the summary as it is.

        All summaries will be in {language}

        Text or table to summarize: {element}
        """
        self.summarize_prompt = ChatPromptTemplate.from_template(prompt_text)
        self.summarize_chain = (
            {
                "element": lambda x: x,
                "language": lambda _: self.config["summary_language"],
            }
            | self.summarize_prompt
            | self.summary_model
            | StrOutputParser()
        )

    def process_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """
        Process a PDF file and store its contents in Qdrant.

        Args:
            pdf_path: Path to the PDF file

        Returns:
            Dictionary with processing statistics
        """
        # Create a master progress bar
        with tqdm(total=5, desc="PDF Processing", position=0) as master_bar:
            # Load and extract content from PDF
            master_bar.set_description("Loading PDF")
            documents = self._load_pdf(pdf_path)
            master_bar.update(1)

            # Process text chunks
            master_bar.set_description("Processing text chunks")
            title_chunks = self._process_text(documents)
            text_summaries = self._summarize_text(title_chunks)
            processed_text = self._convert_text_to_documents(title_chunks, text_summaries)
            master_bar.update(1)

            # Process images if configured
            master_bar.set_description("Processing images")
            processed_images = []
            if self.config["extract_images"]:
                images = self._extract_images(documents)
                image_summaries = self._process_images(images)
                processed_images = self._convert_images_to_documents(images, image_summaries)
            master_bar.update(1)

            # Process tables if configured
            master_bar.set_description("Processing tables")
            processed_tables = []
            if self.config["extract_tables"]:
                tables = self._extract_tables(documents)
                table_summaries = self._process_tables(tables)
                processed_tables = self._convert_tables_to_documents(tables, table_summaries)
            master_bar.update(1)

            master_bar.set_description("Storing in Qdrant")
            # Combine all processed elements
            final_documents = processed_text + processed_images + processed_tables

            # Store in Qdrant
            self._store_documents(final_documents)
            master_bar.update(1)

        return {
            "text_chunks": len(processed_text),
            "image_chunks": len(processed_images),
            "table_chunks": len(processed_tables),
            "total_chunks": len(final_documents),
            "collection_name": self.config["collection_name"],
        }

    def _load_pdf(self, pdf_path: str) -> List[Document]:
        """Load PDF and extract elements."""
        loader = UnstructuredPDFLoader(
            pdf_path,
            infer_table_structure=True,
            extract_images=self.config["extract_images"],
            image_output_dir=self.config["image_output_dir"],
            mode="elements",
            strategy="hi_res",
            extract_image_block_types=["Image"],
            extract_image_block_to_payload=True,
        )
        return loader.load()

    def _process_text(self, documents: List[Document]) -> List[Document]:
        """Process text and create title-based chunks."""
        return self._chunk_by_title(
            documents,
            max_chunk_size=self.config["chunk_size"],
            chunk_overlap=self.config["chunk_overlap"],
        )

    def _summarize_text(self, chunks: List[Document]) -> List[str]:
        """Generate summaries for text chunks."""
        if not chunks:
            return []

        print(f"Summarizing {len(chunks)} text chunks...")
        results = []
        for chunk in tqdm(chunks, desc="Text summarization", leave=False):
            result = self.summarize_chain.invoke(chunk.page_content)
            results.append(result)
        return results

    def _extract_images(self, documents: List[Document]) -> List[Dict[str, Any]]:
        """Extract images with captions from documents."""
        images_info = []
        for i, chunk in enumerate(documents):
            if chunk.metadata.get("category") == "Image":
                image_b64 = chunk.metadata.get("image_base64")
                caption = ""
                # Look for caption in the next chunk
                if i < len(documents) - 1:
                    next_chunk = documents[i + 1]
                    if next_chunk.metadata.get("category") == "FigureCaption":
                        caption = next_chunk.page_content.strip()
                images_info.append({
                    "image_base64": image_b64,
                    "caption": caption,
                    "source": os.path.basename(chunk.metadata.get("source", "")),
                    "page": chunk.metadata.get("page_number", ""),
                })
        return images_info

    def _process_images(self, images: List[Dict[str, Any]]) -> List[str]:
        """Generate descriptions for images using configured model."""
        if not images:
            return []

        print(f"Processing {len(images)} images...")
images...") if self.config["image_provider"] == "ollama": from ollama import Client client = Client(host=self.config["ollama_image_url"]) image_summaries = [] for img in tqdm(images, desc="Image processing", leave=False): prompt = f"Caption of image: {img.get('caption', '')}. Describe this image in detail in {self.config['summary_language']}." response = client.chat( model=self.config["ollama_image_model"], messages=[ {"role": "user", "content": prompt, "images": [img.get("image_base64")]} ] ) image_summaries.append(response["message"]["content"]) return image_summaries else: # openai from langchain_openai import ChatOpenAI prompt_template = f"""Describe the image in detail in {self.config['summary_language']}. If there's a caption, use it for context: {{caption}}""" messages = [ ( "user", [ {"type": "text", "text": prompt_template}, { "type": "image_url", "image_url": {"url": "data:image/jpeg;base64,{image_base64}"}, }, ], ) ] prompt = ChatPromptTemplate.from_messages(messages) chain = prompt | ChatOpenAI(model=self.config["openai_image_model"], http_client=http_client) | StrOutputParser() # Process images with progress bar results = [] image_data = [{"image_base64": img["image_base64"], "caption": img.get("caption", "")} for img in images] for img_data in tqdm(image_data, desc="Image processing", leave=False): result = chain.invoke(img_data) results.append(result) return results def _extract_tables(self, documents: List[Document]) -> List[Dict[str, Any]]: """Extract tables with captions from documents.""" tables_info = [] for idx, chunk in enumerate(documents): if chunk.metadata.get("category") == "Table" or "table" in chunk.metadata.get("category", "").lower(): # Extract table content and caption payload = chunk.metadata.get("payload", {}) caption = payload.get("caption", "").strip() # Look for caption in next chunk if not caption and idx + 1 < len(documents): next_chunk = documents[idx + 1] if next_chunk.metadata.get("category") == "FigureCaption": caption = next_chunk.page_content.strip() tables_info.append({ "table_data": chunk.page_content, "caption": caption, "source": os.path.basename(chunk.metadata.get("source", "")), "page": chunk.metadata.get("page_number", ""), }) return tables_info def _process_tables(self, tables: List[Dict[str, Any]]) -> List[str]: """Generate summaries for tables.""" if not tables: return [] print(f"Processing {len(tables)} tables...") table_summaries = [] for table in tqdm(tables, desc="Table processing", leave=False): prompt = f"""Caption of table: {table.get('caption', '')}. Describe this table in detail in {self.config['summary_language']}. 
            Table content: {table.get('table_data', '')}"""

            if self.config["summary_provider"] == "ollama":
                summary = self.summary_model.invoke(prompt)
            else:  # openai
                summary = self.summary_model.invoke(prompt).content

            table_summaries.append(summary)
        return table_summaries

    def _convert_text_to_documents(self, texts: List[Document], summaries: List[str]) -> List[Document]:
        """Convert text chunks and summaries into Document objects."""
        documents = []
        txt_ids = [str(uuid.uuid4()) for _ in texts]
        for idx, item in enumerate(texts):
            summary_text = summaries[idx] if idx < len(summaries) else ""
            metadata = {
                "source": item.metadata.get("source", ""),
                "page_number": item.metadata.get("page_numbers", []),
                "text": item.page_content,
                "id_key": txt_ids[idx],
                "txt": item.metadata.get("title", ""),
            }
            doc = Document(page_content=summary_text, metadata=metadata)
            documents.append(doc)
        return documents

    def _convert_images_to_documents(self, images: List[Dict[str, Any]], summaries: List[str]) -> List[Document]:
        """Convert image data and summaries into Document objects."""
        documents = []
        img_ids = [str(uuid.uuid4()) for _ in images]
        for idx, item in enumerate(images):
            summary_text = summaries[idx] if idx < len(summaries) else ""
            metadata = {
                "source": item.get("source", ""),
                "page_number": item.get("page", ""),
                "caption": item.get("caption", ""),
                "id_key": img_ids[idx],
                "image_base64": item.get("image_base64"),
            }
            doc = Document(page_content=summary_text, metadata=metadata)
            documents.append(doc)
        return documents

    def _convert_tables_to_documents(self, tables: List[Dict[str, Any]], summaries: List[str]) -> List[Document]:
        """Convert table data and summaries into Document objects."""
        documents = []
        table_ids = [str(uuid.uuid4()) for _ in tables]
        for idx, item in enumerate(tables):
            summary_text = summaries[idx] if idx < len(summaries) else ""
            metadata = {
                "source": item.get("source", ""),
                "page_number": item.get("page", ""),
                "caption": item.get("caption", ""),
                "id_key": table_ids[idx],
                "table_content": item.get("table_data"),
            }
            doc = Document(page_content=summary_text, metadata=metadata)
            documents.append(doc)
        return documents

    def _store_documents(self, documents: List[Document]) -> None:
        """Store documents in Qdrant vector database."""
        from langchain_qdrant import QdrantVectorStore

        QdrantVectorStore.from_documents(
            documents,
            self.embedding_model,
            url=self.config["qdrant_url"],
            collection_name=self.config["collection_name"],
        )

    def _chunk_by_title(self, documents: List[Document],
                        max_chunk_size: int = 10000,
                        chunk_overlap: int = 2000) -> List[Document]:
        """
        Create chunks based on document title structure.
        Each title starts a new chunk.
""" # Identify title positions title_positions = [] for i, doc in enumerate(documents): if doc.metadata.get("category") == "Title": title_positions.append(i) # Add final position title_positions.append(len(documents)) # Create chunks based on titles title_based_chunks = [] # If no titles found, process as single chunk if len(title_positions) <= 1: text_elements = [doc for doc in documents if doc.metadata.get("category") not in ["Table", "Image", "FigureCaption"]] combined_text = " ".join([doc.page_content for doc in text_elements]) title_based_chunks.append(Document( page_content=combined_text, metadata={ "source": os.path.basename(documents[0].metadata.get("source", "")), "title": "Document without title", "page_numbers": list(set(doc.metadata.get("page_number") for doc in text_elements if doc.metadata.get("page_number"))) } )) else: # Process each title-delimited section for i in range(len(title_positions) - 1): start_idx = title_positions[i] end_idx = title_positions[i + 1] # Get section title title_doc = documents[start_idx] title_text = title_doc.page_content # Get section text elements section_docs = [ doc for doc in documents[start_idx+1:end_idx] if doc.metadata.get("category") not in ["Table", "Image", "FigureCaption"] ] if section_docs: # Combine section text section_text = " ".join([doc.page_content for doc in section_docs]) # Get page numbers page_numbers = list(set( doc.metadata.get("page_number") for doc in section_docs if doc.metadata.get("page_number") )) source = os.path.basename(section_docs[0].metadata.get("source", "")) # Create Document for section title_based_chunks.append(Document( page_content=section_text, metadata={ "source": source, "title": title_text, "page_numbers": page_numbers } )) # Further chunk if sections are too large final_chunks = [] text_splitter = RecursiveCharacterTextSplitter( chunk_size=max_chunk_size, chunk_overlap=chunk_overlap ) for chunk in title_based_chunks: if len(chunk.page_content) <= max_chunk_size: final_chunks.append(chunk) else: # Split large sections sub_chunks = text_splitter.split_documents([chunk]) # Preserve title info in sub-chunks for i, sub_chunk in enumerate(sub_chunks): sub_chunk.metadata["title"] = chunk.metadata["title"] sub_chunk.metadata["sub_chunk"] = i + 1 sub_chunk.metadata["total_sub_chunks"] = len(sub_chunks) final_chunks.extend(sub_chunks) return final_chunks def process_directory(self, directory_path: str) -> Dict[str, Any]: """ Process all PDF files in the specified directory. 

        Args:
            directory_path: Path to the directory containing PDF files

        Returns:
            Dictionary with processing statistics for all files
        """
        # Check if directory exists
        if not os.path.isdir(directory_path):
            raise ValueError(f"Directory not found: {directory_path}")

        # Find all PDF files in the directory
        pdf_files = glob.glob(os.path.join(directory_path, "*.pdf"))

        if not pdf_files:
            print(f"No PDF files found in {directory_path}")
            return {"files_processed": 0}

        # Track overall statistics
        overall_stats = {
            "files_processed": 0,
            "total_text_chunks": 0,
            "total_image_chunks": 0,
            "total_table_chunks": 0,
            "total_chunks": 0,
            "collection_name": self.config["collection_name"],
            "file_details": [],
        }

        # Process each PDF file with a progress bar
        print(f"Found {len(pdf_files)} PDF files in {directory_path}")
        for pdf_file in tqdm(pdf_files, desc="Processing PDF files", unit="file"):
            try:
                print(f"\nProcessing: {os.path.basename(pdf_file)}")
                result = self.process_pdf(pdf_file)

                # Update statistics
                overall_stats["files_processed"] += 1
                overall_stats["total_text_chunks"] += result.get("text_chunks", 0)
                overall_stats["total_image_chunks"] += result.get("image_chunks", 0)
                overall_stats["total_table_chunks"] += result.get("table_chunks", 0)
                overall_stats["total_chunks"] += result.get("total_chunks", 0)

                # Store individual file results
                file_detail = {
                    "filename": os.path.basename(pdf_file),
                    "text_chunks": result.get("text_chunks", 0),
                    "image_chunks": result.get("image_chunks", 0),
                    "table_chunks": result.get("table_chunks", 0),
                    "total_chunks": result.get("total_chunks", 0),
                }
                overall_stats["file_details"].append(file_detail)
                print(f"Completed: {file_detail['filename']} - {file_detail['total_chunks']} chunks processed")
            except Exception as e:
                print(f"Error processing {pdf_file}: {str(e)}")
                # Continue with next file

        print("\nDirectory processing complete!")
        print(f"Processed {overall_stats['files_processed']} files")
        print(f"Total chunks: {overall_stats['total_chunks']}")
        print(f" - Text chunks: {overall_stats['total_text_chunks']}")
        print(f" - Image chunks: {overall_stats['total_image_chunks']}")
        print(f" - Table chunks: {overall_stats['total_table_chunks']}")
        print(f"All content stored in collection: {overall_stats['collection_name']}")

        return overall_stats


processor = PdfProcessor({
    # To use OpenAI for image descriptions, set the provider and supply a key.
    # Read the key from the environment instead of hard-coding it in source:
    # "image_provider": "openai",
    # "openai_api_key": os.environ.get("OPENAI_API_KEY"),
    "collection_name": "my_control_and_calibration",
    "summary_language": "English",
})

results = processor.process_directory(r"C:\Users\serameza\host-data")
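
# Optional follow-up (not part of the original pipeline): a minimal sketch of
# reading the stored summaries back out of Qdrant, reusing the client, collection
# name, and embedding model configured above. QdrantVectorStore and
# similarity_search come from langchain_qdrant; the query string and k value
# below are illustrative placeholders, not values from this project.
from langchain_qdrant import QdrantVectorStore

vector_store = QdrantVectorStore(
    client=processor.qdrant_client,
    collection_name=processor.config["collection_name"],
    embedding=processor.embedding_model,
)

# Each hit's page_content is an LLM-generated summary; the original text, image
# (base64), or table content travels along in the hit's metadata.
hits = vector_store.similarity_search("calibration procedure overview", k=4)
for hit in hits:
    print(hit.metadata.get("source", ""), "->", hit.page_content[:120])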