Building a Production-Grade PDF Extraction Pipeline: DSPy Signatures + CocoIndex Incremental Processing
Table of Contents
Introduction
Extracting structured data from unstructured documents is one of the most common challenges in production AI systems. Traditional approaches involve:
OCR preprocessing - Converting PDFs to text with tools like Tesseract
Regex patterns - Brittle rules that break on format variations
Prompt engineering - String-based LLM instructions that are hard to test and version
This tutorial demonstrates a fundamentally different approach: declarative extraction with typed contracts.
We'll build a patient intake form extraction pipeline that:
Uses DSPy Signatures to define extraction contracts
Leverages vision models to skip OCR entirely
Employs CocoIndex for incremental processing and caching
Exports to PostgreSQL with automatic change detection
Architecture Overview
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ PDF Files │────▶│ CocoIndex │────▶│ PostgreSQL │
│ (Source) │ │ Flow Engine │ │ (Target) │
└─────────────────┘ └────────┬─────────┘ └─────────────────┘
│
▼
┌──────────────────┐
│ DSPy Module │
│ (Extractor) │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Gemini Vision │
│ (LLM) │
└──────────────────┘
The data flow is:
CocoIndex monitors a directory for PDF files
Changed files trigger the DSPy extraction module
DSPy converts PDFs to images and calls Gemini Vision
Structured
Patientobjects are validated by PydanticResults are upserted to PostgreSQL with lineage tracking
Understanding DSPy's Programming Model
The Problem with Prompt Engineering
Traditional LLM applications embed logic in strings:
# Fragile approach
prompt = f"""
Extract patient information from this form.
Return JSON with fields: name, dob, address...
Form text:
{ocr_text}
"""
response = llm.complete(prompt)
patient = json.loads(response) # Hope it's valid JSON!
This breaks when:
The model returns malformed JSON
Field names don't match your schema
The OCR text has unexpected formatting
DSPy's Solution: Typed Signatures
DSPy introduces Signatures - typed contracts that declare inputs and outputs:
import dspy
from pydantic import BaseModel
class Patient(BaseModel):
name: str
dob: date
address: Address
# ... full schema
class PatientExtractionSignature(dspy.Signature):
"""Extract structured patient information from intake form images."""
form_images: list[dspy.Image] = dspy.InputField(
desc="Images of the patient intake form pages"
)
patient: Patient = dspy.OutputField(
desc="Extracted patient information"
)
Key insight: You define the contract, DSPy figures out the prompting.
Modules: Composable Building Blocks
class PatientExtractor(dspy.Module):
def __init__(self):
super().__init__()
# ChainOfThought adds reasoning steps automatically
self.extract = dspy.ChainOfThought(PatientExtractionSignature)
def forward(self, form_images: list[dspy.Image]) -> Patient:
result = self.extract(form_images=form_images)
return result.patient
dspy.ChainOfThought wraps your signature with reasoning capabilities. The framework:
Generates an appropriate prompt from your signature
Adds chain-of-thought reasoning steps
Parses the response into your Pydantic model
Validates against your schema
CocoIndex: The Incremental Processing Engine
Why Incremental Processing Matters
Consider a directory with 10,000 patient forms. Traditional pipelines reprocess everything on each run:
Run 1: Process 10,000 files → 10,000 API calls
Run 2: 5 new files added → 10,005 API calls (wasteful!)
CocoIndex tracks data lineage and only processes changes:
Run 1: Process 10,000 files → 10,000 API calls
Run 2: 5 new files added → 5 API calls (efficient!)
The Flow Abstraction
CocoIndex uses a declarative flow model:
import cocoindex
@cocoindex.flow_def(name="PatientIntakeExtraction")
def extraction_flow(flow_builder, data_scope):
# 1. Define source
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(
path="data/patient_forms",
binary=True # Read as bytes for PDF
)
)
# 2. Create collector for results
patients_index = data_scope.add_collector()
# 3. Process each document
with data_scope["documents"].row() as doc:
doc["patient_info"] = doc["content"].transform(extract_patient)
patients_index.collect(
filename=doc["filename"],
patient_info=doc["patient_info"]
)
# 4. Export to storage
patients_index.export(
"patients",
cocoindex.storages.Postgres(table_name="patients_info"),
primary_key_fields=["filename"]
)
Caching with Behavior Versioning
@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient:
# ... extraction logic
The cache=True decorator enables result caching. The behavior_version parameter allows you to invalidate caches when your extraction logic changes.
Implementation Deep Dive
Complete Pydantic Schema
from pydantic import BaseModel, Field
from datetime import date
from typing import Optional
class Contact(BaseModel):
name: str
phone: str
relationship: str
class Address(BaseModel):
street: str
city: str
state: str
zip_code: str
class Insurance(BaseModel):
provider: str
policy_number: str
group_number: Optional[str] = None
policyholder_name: str
relationship_to_patient: str
class Medication(BaseModel):
name: str
dosage: str
class Allergy(BaseModel):
name: str
class Patient(BaseModel):
name: str
dob: date
gender: str
address: Address
phone: str
email: str
preferred_contact_method: str
emergency_contact: Contact
insurance: Optional[Insurance] = None
reason_for_visit: str
symptoms_duration: str
current_medications: list[Medication] = Field(default_factory=list)
allergies: list[Allergy] = Field(default_factory=list)
consent_given: bool
consent_date: Optional[str] = None
PDF to Image Conversion
import pymupdf
def pdf_to_images(pdf_content: bytes) -> list[dspy.Image]:
"""Convert PDF pages to DSPy Image objects."""
pdf_doc = pymupdf.open(stream=pdf_content, filetype="pdf")
images = []
for page in pdf_doc:
# Render at 2x resolution for better extraction
pix = page.get_pixmap(matrix=pymupdf.Matrix(2, 2))
img_bytes = pix.tobytes("png")
images.append(dspy.Image(img_bytes))
pdf_doc.close()
return images
The Complete Extraction Function
@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient:
"""Extract patient info from PDF using DSPy vision."""
form_images = pdf_to_images(pdf_content)
extractor = PatientExtractor()
return extractor(form_images=form_images)
Configuration
@cocoindex.settings
def cocoindex_settings() -> cocoindex.Settings:
# Configure DSPy with Gemini
lm = dspy.LM("gemini/gemini-2.5-flash")
dspy.configure(lm=lm)
return cocoindex.Settings.from_env()
Performance Considerations
1. Image Resolution Trade-offs
Higher resolution improves extraction accuracy but increases:
API token costs (larger images)
Processing time
Memory usage
Recommendation: Start with 2x scaling, adjust based on accuracy needs.
2. Caching Strategy
Development: Enable caching to avoid repeated API calls
Production: Use
behavior_versionto invalidate when logic changesDebugging: Temporarily disable caching to test changes
3. Batch Processing
For large document sets, CocoIndex's incremental processing means:
Initial backfill processes all documents
Subsequent runs only process changes
Failed extractions can be retried without reprocessing successes
4. Error Handling
@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient | None:
try:
form_images = pdf_to_images(pdf_content)
extractor = PatientExtractor()
return extractor(form_images=form_images)
except Exception as e:
logging.error(f"Extraction failed: {e}")
return None
Conclusion
This architecture provides several advantages over traditional approaches:
| Aspect | Traditional | DSPy + CocoIndex |
| Extraction Logic | Regex + prompt strings | Typed Signatures |
| Validation | Manual JSON parsing | Pydantic automatic |
| Processing | Full reprocessing | Incremental only |
| Caching | Custom implementation | Built-in |
| Lineage | None | Automatic tracking |
| Testing | String comparison | Module unit tests |
The key insight is separation of concerns:
DSPy handles LLM interaction and structured output
CocoIndex handles data flow and incremental processing
Pydantic handles validation and serialization
Each component does one thing well, and they compose cleanly.
Resources
What challenges have you faced with document extraction? I'd love to hear your experiences in the comments!

