Building a Production-Grade PDF Extraction Pipeline: DSPy Signatures + CocoIndex Incremental Processing


Introduction

Extracting structured data from unstructured documents is one of the most common challenges in production AI systems. Traditional approaches involve:

  1. OCR preprocessing - Converting PDFs to text with tools like Tesseract

  2. Regex patterns - Brittle rules that break on format variations

  3. Prompt engineering - String-based LLM instructions that are hard to test and version

This tutorial demonstrates a fundamentally different approach: declarative extraction with typed contracts.

We'll build a patient intake form extraction pipeline that:

  • Uses DSPy Signatures to define extraction contracts

  • Leverages vision models to skip OCR entirely

  • Employs CocoIndex for incremental processing and caching

  • Exports to PostgreSQL with automatic change detection


Architecture Overview

┌─────────────────┐     ┌──────────────────┐     ┌─────────────────┐
│   PDF Files     │────▶│   CocoIndex      │────▶│   PostgreSQL    │
│   (Source)      │     │   Flow Engine    │     │   (Target)      │
└─────────────────┘     └────────┬─────────┘     └─────────────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │   DSPy Module    │
                        │   (Extractor)    │
                        └────────┬─────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │   Gemini Vision  │
                        │   (LLM)          │
                        └──────────────────┘

The data flow is:

  1. CocoIndex monitors a directory for PDF files

  2. Changed files trigger the DSPy extraction module

  3. The extraction function renders each PDF page to an image, and the DSPy module sends those images to Gemini Vision

  4. Structured Patient objects are validated by Pydantic

  5. Results are upserted to PostgreSQL with lineage tracking


Understanding DSPy's Programming Model

The Problem with Prompt Engineering

Traditional LLM applications embed logic in strings:

# Fragile approach
prompt = f"""
Extract patient information from this form.
Return JSON with fields: name, dob, address...

Form text:
{ocr_text}
"""
response = llm.complete(prompt)
patient = json.loads(response)  # Hope it's valid JSON!

This breaks when:

  • The model returns malformed JSON

  • Field names don't match your schema

  • The OCR text has unexpected formatting
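To make the first two failure modes concrete, here is a minimal sketch of the manual parsing code the string-based approach forces you to write. The helper name `parse_patient` and the three-field `REQUIRED_FIELDS` subset are hypothetical, chosen only for illustration.

```python
import json

# Hypothetical subset of the schema, for illustration only.
REQUIRED_FIELDS = ("name", "dob", "address")


def parse_patient(response: str) -> dict:
    """Parse a raw LLM reply by hand and report how it failed."""
    try:
        data = json.loads(response)
    except json.JSONDecodeError as exc:
        # Failure mode 1: the model returned malformed JSON.
        raise ValueError(f"malformed JSON: {exc.msg}") from exc

    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")

    # Failure mode 2: field names do not match the expected schema.
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return data
```

Every check here is something you have to write, test, and keep in sync with the prompt text yourself, which is the maintenance burden typed contracts aim to remove.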

DSPy's Solution: Typed Signatures

DSPy introduces Signatures - typed contracts that declare inputs and outputs:

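from datetime import date

import dspy
from pydantic import BaseModel

# Address and the remaining fields are defined in the complete schema
# later in this article; this is an abbreviated view.
class Patient(BaseModel):
    name: str
    dob: date
    address: Address
    # ... full schema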

class PatientExtractionSignature(dspy.Signature):
    """Extract structured patient information from intake form images."""

    form_images: list[dspy.Image] = dspy.InputField(
        desc="Images of the patient intake form pages"
    )
    patient: Patient = dspy.OutputField(
        desc="Extracted patient information"
    )

Key insight: you define the contract; DSPy figures out the prompting.

Modules: Composable Building Blocks

class PatientExtractor(dspy.Module):
    def __init__(self):
        super().__init__()
        # ChainOfThought adds reasoning steps automatically
        self.extract = dspy.ChainOfThought(PatientExtractionSignature)

    def forward(self, form_images: list[dspy.Image]) -> Patient:
        result = self.extract(form_images=form_images)
        return result.patient

dspy.ChainOfThought wraps your signature with reasoning capabilities. The framework:

  1. Generates an appropriate prompt from your signature

  2. Adds chain-of-thought reasoning steps

  3. Parses the response into your Pydantic model

  4. Validates against your schema


CocoIndex: The Incremental Processing Engine

Why Incremental Processing Matters

Consider a directory with 10,000 patient forms. Traditional pipelines reprocess everything on each run:

Run 1: Process 10,000 files → 10,000 API calls
Run 2: 5 new files added → 10,005 API calls (wasteful!)

CocoIndex tracks data lineage and only processes changes:

Run 1: Process 10,000 files → 10,000 API calls
Run 2: 5 new files added → 5 API calls (efficient!)
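The idea behind the second scenario can be sketched in a few lines. This is only an illustration of change detection by content fingerprint, not a description of how CocoIndex implements it internally; the function names and the in-memory `seen` dictionary are assumptions made for the example.

```python
import hashlib


def fingerprint(content: bytes) -> str:
    """Stable fingerprint of a file's bytes."""
    return hashlib.sha256(content).hexdigest()


def files_to_process(current: dict[str, bytes], seen: dict[str, str]) -> list[str]:
    """Return the names whose content is new or changed since the last run."""
    return sorted(
        name
        for name, content in current.items()
        if seen.get(name) != fingerprint(content)
    )


def record_run(current: dict[str, bytes], seen: dict[str, str]) -> None:
    """Remember the fingerprints of everything processed in this run."""
    for name, content in current.items():
        seen[name] = fingerprint(content)
```

With this bookkeeping, a second run over the same directory plus five new files yields only the five new names, so only those would trigger API calls.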

The Flow Abstraction

CocoIndex uses a declarative flow model:

import cocoindex

@cocoindex.flow_def(name="PatientIntakeExtraction")
def extraction_flow(
    flow_builder: cocoindex.FlowBuilder,
    data_scope: cocoindex.DataScope,
) -> None:
    # 1. Define source
    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(
            path="data/patient_forms",
            binary=True  # Read as bytes for PDF
        )
    )

    # 2. Create collector for results
    patients_index = data_scope.add_collector()

    # 3. Process each document
    with data_scope["documents"].row() as doc:
        doc["patient_info"] = doc["content"].transform(extract_patient)
        patients_index.collect(
            filename=doc["filename"],
            patient_info=doc["patient_info"]
        )

    # 4. Export to storage
    patients_index.export(
        "patients",
        cocoindex.storages.Postgres(table_name="patients_info"),
        primary_key_fields=["filename"]
    )

Caching with Behavior Versioning

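@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient:
    ...  # extraction logic, shown in full in the implementation section

The cache=True argument enables result caching, so an unchanged PDF does not trigger a new LLM call. The behavior_version argument lets you invalidate those cached results: when your extraction logic changes, increment the version (for example from 1 to 2) and affected documents are reprocessed.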
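One way to picture how a version number interacts with a cache is to make it part of the cache key. The sketch below is a toy in-memory model under that assumption; `VersionedCache` is a hypothetical class, not part of CocoIndex, and the real engine may work differently.

```python
import hashlib
from typing import Callable


class VersionedCache:
    """Toy cache keyed on (behavior version, input fingerprint)."""

    def __init__(self) -> None:
        self._store: dict[tuple[int, str], str] = {}

    def get_or_compute(
        self,
        content: bytes,
        version: int,
        compute: Callable[[bytes], str],
    ) -> str:
        key = (version, hashlib.sha256(content).hexdigest())
        if key not in self._store:
            # Cache miss: either the input is new or the version was bumped.
            self._store[key] = compute(content)
        return self._store[key]
```

Calling `get_or_compute` twice with the same bytes and version runs `compute` once. Bumping the version changes the key, so the same bytes are computed again with the new logic.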


Implementation Deep Dive

Complete Pydantic Schema

from pydantic import BaseModel, Field
from datetime import date
from typing import Optional

class Contact(BaseModel):
    name: str
    phone: str
    relationship: str

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str

class Insurance(BaseModel):
    provider: str
    policy_number: str
    group_number: Optional[str] = None
    policyholder_name: str
    relationship_to_patient: str

class Medication(BaseModel):
    name: str
    dosage: str

class Allergy(BaseModel):
    name: str

class Patient(BaseModel):
    name: str
    dob: date
    gender: str
    address: Address
    phone: str
    email: str
    preferred_contact_method: str
    emergency_contact: Contact
    insurance: Optional[Insurance] = None
    reason_for_visit: str
    symptoms_duration: str
    current_medications: list[Medication] = Field(default_factory=list)
    allergies: list[Allergy] = Field(default_factory=list)
    consent_given: bool
    consent_date: Optional[str] = None

PDF to Image Conversion

import dspy
import pymupdf

def pdf_to_images(pdf_content: bytes) -> list[dspy.Image]:
    """Convert PDF pages to DSPy Image objects."""
    images = []

    # The context manager closes the document even if rendering fails
    with pymupdf.open(stream=pdf_content, filetype="pdf") as pdf_doc:
        for page in pdf_doc:
            # Render at 2x resolution for better extraction
            pix = page.get_pixmap(matrix=pymupdf.Matrix(2, 2))
            img_bytes = pix.tobytes("png")
            images.append(dspy.Image(img_bytes))

    return images

The Complete Extraction Function

@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient:
    """Extract patient info from PDF using DSPy vision."""
    form_images = pdf_to_images(pdf_content)
    extractor = PatientExtractor()
    return extractor(form_images=form_images)

Configuration

@cocoindex.settings
def cocoindex_settings() -> cocoindex.Settings:
    # Configure DSPy with Gemini
    lm = dspy.LM("gemini/gemini-2.5-flash")
    dspy.configure(lm=lm)
    return cocoindex.Settings.from_env()
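Because the settings come from the environment, a pre-flight check can catch a missing variable before the first document is processed. The sketch below assumes the pipeline needs `COCOINDEX_DATABASE_URL` for the Postgres connection and `GEMINI_API_KEY` for the model; both names are assumptions here, so check them against your own deployment. The helper `missing_env_vars` is hypothetical.

```python
import os
from collections.abc import Mapping

# Assumed variable names; adjust to match your deployment.
REQUIRED_ENV_VARS = ("COCOINDEX_DATABASE_URL", "GEMINI_API_KEY")


def missing_env_vars(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variables that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

Running `missing_env_vars()` at startup and failing fast on a non-empty result gives a clearer error than a connection failure halfway through a run.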

Performance Considerations

1. Image Resolution Trade-offs

Higher resolution improves extraction accuracy but increases:

  • API token costs (larger images)

  • Processing time

  • Memory usage

Recommendation: Start with 2x scaling, adjust based on accuracy needs.
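The cost of a scaling factor is easy to underestimate because pixel count grows with the square of the zoom. The sketch below estimates rendered dimensions, assuming a base of one pixel per PDF point at zoom 1 and a US Letter page of 612 x 792 points; the helper names are hypothetical and the exact rounding of a real renderer may differ slightly.

```python
def rendered_size(width_pt: float, height_pt: float, zoom: float) -> tuple[int, int]:
    """Approximate pixel size of a page rendered at the given zoom factor."""
    return round(width_pt * zoom), round(height_pt * zoom)


def pixel_count(width_pt: float, height_pt: float, zoom: float) -> int:
    """Total pixels in the rendered page image."""
    width_px, height_px = rendered_size(width_pt, height_pt, zoom)
    return width_px * height_px
```

For the Letter page, `rendered_size(612, 792, 2)` gives `(1224, 1584)`, which is four times the pixels of the 1x render. Going to 3x would mean nine times, so each step up should be justified by a measurable accuracy gain.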

2. Caching Strategy

  • Development: Enable caching to avoid repeated API calls

  • Production: Use behavior_version to invalidate when logic changes

  • Debugging: Temporarily disable caching to test changes

3. Batch Processing

For large document sets, CocoIndex's incremental processing means:

  • Initial backfill processes all documents

  • Subsequent runs only process changes

  • Failed extractions can be retried without reprocessing successes

4. Error Handling

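import logging

@cocoindex.op.function(cache=True, behavior_version=1)
def extract_patient(pdf_content: bytes) -> Patient | None:
    try:
        form_images = pdf_to_images(pdf_content)
        extractor = PatientExtractor()
        return extractor(form_images=form_images)
    except Exception:
        # logging.exception records the full traceback, not just the message.
        # Caveat: a returned None is an ordinary result and may be cached too,
        # so verify that failed documents are actually retried in your setup.
        logging.exception("Extraction failed")
        return None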

Conclusion

This architecture provides several advantages over traditional approaches:

Aspect            | Traditional             | DSPy + CocoIndex
------------------+-------------------------+--------------------
Extraction Logic  | Regex + prompt strings  | Typed Signatures
Validation        | Manual JSON parsing     | Pydantic automatic
Processing        | Full reprocessing       | Incremental only
Caching           | Custom implementation   | Built-in
Lineage           | None                    | Automatic tracking
Testing           | String comparison       | Module unit tests

The key insight is separation of concerns:

  • DSPy handles LLM interaction and structured output

  • CocoIndex handles data flow and incremental processing

  • Pydantic handles validation and serialization

Each component does one thing well, and they compose cleanly.


What challenges have you faced with document extraction? I'd love to hear your experiences in the comments!