Nov 24, 2024

Meta-Prompt for Seamless Unstructured Ingest Pipeline Integration

Nina Lopatina + Tarun Narayanan

Unstructured

Effortlessly managing and processing unstructured data is at the heart of modern data pipelines. The Unstructured Ingest Pipeline simplifies this by offering a robust framework for indexing, filtering, partitioning, and embedding unstructured data. But what if you could supercharge your workflows with the precision and adaptability of an LLM? Introducing the Meta-Prompt for the Unstructured Ingest Pipeline — your go-to guide for generating production-ready, API-compatible code and CLI commands with ease.

Let’s explore how the Meta-Prompt unlocks the full potential of the Unstructured Ingest Pipeline, offering developers a streamlined way to tackle complex data processing tasks. You can get started with our Serverless API on a free two-week trial.

What is a meta-prompt?

Our personalized Meta-Prompt acts as an AI-engineer-in-a-box, guiding you through the entire Unstructured Ingest Pipeline. Whether you're working with local files, cloud storage, or specialized connectors like Azure or S3, Meta-Prompt ensures accurate, efficient, and production-ready implementations. 

Key features include:

  1. Built-in API Context: Avoid hallucinations and ensure LLMs use the correct API calls and settings.

  2. Simplified Implementations: Generate code and CLI commands tailored to your use case, avoiding over-engineered solutions.

  3. No Placeholder Data: Always generates actionable, real-world-ready code.

  4. Connector-Aware Design: Automatically aligns with your source and destination connectors for seamless integration.

  5. Unstructured Ingest Compliance: Leverages all core stages, from indexing to embedding.

Example Use Cases:

  • Local file ingestion with filtering and chunking for structured analysis.

  • Indexing and partitioning PDFs from S3 for downstream RAG systems.

  • Integrating Unstructured into your codebase.

  • Writing a workflow to process all of your documents in Azure blob storage, classifying the output to add additional metadata, then writing a new workflow to embed these documents in your destination of choice.

Meta-Prompt in Action

How to use the meta-prompt: first, open your LLM code assistant of choice. We have a full version in our repo that can be used directly with most frontier models (GPT, Claude 3, etc.).

For models with a longer context window, you can copy this prompt as a single message. If you are using the free version of ChatGPT, or any custom open-source model, we also provide a split version of the meta-prompt in three parts, each under 5k tokens, so that every message stays within the model’s context length limits. To use this version, paste the three prompts one after another, each as its own message.

Once you have pasted the meta-prompt to start, you can try one of our examples below as a follow-up prompt that will have you coding with Unstructured’s Serverless API in no time. Note that we were not able to fit every connector in the meta-prompt, so check out our documentation if your prompt doesn’t work.
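The idea behind the split version can be sketched for any long prompt. This is an illustration only, and not necessarily how the split prompts in our repo were produced: it assumes a rough heuristic of about four characters per token (real tokenizers differ by model), and the names `estimate_tokens`, `split_prompt`, and `CHARS_PER_TOKEN` are hypothetical.

```python
CHARS_PER_TOKEN = 4  # Assumption: rough average for English text; varies by tokenizer


def estimate_tokens(text: str) -> int:
    """Very rough token estimate based on character count (rounded up)."""
    return (len(text) + CHARS_PER_TOKEN - 1) // CHARS_PER_TOKEN


def split_prompt(prompt: str, max_tokens: int = 5000) -> list[str]:
    """Split a prompt on paragraph boundaries into parts within max_tokens.

    A single paragraph larger than the budget is kept whole rather than cut
    mid-sentence, so check the sizes of the returned parts.
    """
    parts: list[str] = []
    current: list[str] = []
    for paragraph in prompt.split("\n\n"):
        candidate = "\n\n".join(current + [paragraph])
        if current and estimate_tokens(candidate) > max_tokens:
            # Adding this paragraph would exceed the budget: close the part.
            parts.append("\n\n".join(current))
            current = [paragraph]
        else:
            current.append(paragraph)
    if current:
        parts.append("\n\n".join(current))
    return parts


if __name__ == "__main__":
    demo = "\n\n".join(f"Section {i}: " + "x" * 4000 for i in range(12))
    for number, part in enumerate(split_prompt(demo), start=1):
        print(f"Part {number}: ~{estimate_tokens(part)} tokens")
```

Splitting on paragraph boundaries keeps each instruction intact, which matters more for a meta-prompt than filling every message to the limit.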

Scenario 1: Local File Processing

You have a directory of PDFs and want to partition them using the high-resolution strategy, then generate embeddings with an OpenAI embedding model. With the meta-prompt in place, you can prompt an LLM to generate this code in seconds. In these examples, we are using Claude. Simply type this as your next prompt after the meta-prompt:

Please write all the code for using the ingest pipeline with OpenAI ada002 embeddings and writing files from my "content" dir to "output" in my local directories. I am running in a colab notebook so please import all my variables as secrets

Response:

Here's the complete code for a Google Colab notebook that securely handles credentials and processes files from "content" to "output" using OpenAI ada-002 embeddings:

```python
# First, install the required dependencies
!pip install "unstructured-ingest[openai]"

# Import required libraries
import os
from pathlib import Path
from google.colab import userdata
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalIndexerConfig,
    LocalDownloaderConfig,
    LocalConnectionConfig,
    LocalUploaderConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.embedder import EmbedderConfig

# Get access to the Unstructured Serverless API key for free: app.unstructured.io
# Securely get API keys from Colab Secrets
try:
    UNSTRUCTURED_API_KEY = userdata.get('UNSTRUCTURED_API_KEY')
    UNSTRUCTURED_API_URL = userdata.get('UNSTRUCTURED_API_URL')
    OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
except Exception as e:
    print("Please set up your secrets in Colab:")
    print("1. Click on the 'Files' tab in the left sidebar")
    print("2. Click on the '...' menu next to 'Files'")
    print("3. Select 'Add secrets'")
    print("4. Add the following secrets:")
    print("   - UNSTRUCTURED_API_KEY")
    print("   - UNSTRUCTURED_API_URL")
    print("   - OPENAI_API_KEY")
    raise Exception("Missing required secrets") from e

# Create directories if they don't exist
!mkdir -p content output

# Validate directories
input_dir = Path("content")
output_dir = Path("output")
if not input_dir.exists():
    raise ValueError("Input directory 'content' does not exist")

# Create and run the pipeline
try:
    pipeline = Pipeline.from_configs(
        # Basic processor configuration
        context=ProcessorConfig(
            verbose=True,  # Enable detailed logging
            raise_on_error=True,  # Stop on errors
            num_processes=2  # Number of parallel processes
        ),

        # Source configuration (local files)
        indexer_config=LocalIndexerConfig(
            input_path=str(input_dir)
        ),
        downloader_config=LocalDownloaderConfig(),
        source_connection_config=LocalConnectionConfig(),

        # Partitioning configuration
        partitioner_config=PartitionerConfig(
            partition_by_api=True,
            api_key=UNSTRUCTURED_API_KEY,
            partition_endpoint=UNSTRUCTURED_API_URL,
            strategy="hi_res",
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True,
                "split_pdf_concurrency_level": 15
            }
        ),

        # Chunking configuration
        chunker_config=ChunkerConfig(
            chunking_strategy="by_title",
            chunk_max_characters=500
        ),

        # Embedding configuration (OpenAI)
        embedder_config=EmbedderConfig(
            embedding_provider="openai",
            embedding_model_name="text-embedding-ada-002",
            api_key=OPENAI_API_KEY
        ),

        # Destination configuration (local files)
        uploader_config=LocalUploaderConfig(
            output_dir=str(output_dir)
        )
    )

    # Execute the pipeline
    pipeline.run()

    print(f"Successfully processed files from 'content'
```

Note that code generation can still be a little buggy. Claude likes to trail off in the last line, so we just deleted that one. We also needed to add a “/” after the content directory name, and when we ran the code we hit a few errors that prompted us to add a few more pip installs. Here is the notebook we successfully ran to process a file, with the tweaks we noted. It may take several prompts to get the output you are looking for and to debug the generated code.
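Some of the fixes described above can be caught before the pipeline starts. Below is a minimal sketch of such a pre-flight check, under stated assumptions: `with_trailing_slash` and `find_missing` are hypothetical helpers, not part of `unstructured-ingest`, and the settings are read from environment variables so that the snippet also runs outside Colab.

```python
import os
from typing import Mapping, Optional


def with_trailing_slash(path: str) -> str:
    """Return the path with exactly one trailing '/', e.g. 'content' -> 'content/'."""
    return path.rstrip("/") + "/"


def find_missing(settings: Mapping[str, Optional[str]], required: list[str]) -> list[str]:
    """List the required setting names that are absent or empty."""
    return [name for name in required if not settings.get(name)]


# The three secret names used by the generated code in this scenario.
REQUIRED = ["UNSTRUCTURED_API_KEY", "UNSTRUCTURED_API_URL", "OPENAI_API_KEY"]

if __name__ == "__main__":
    # Assumption: settings come from environment variables in this sketch.
    settings = {name: os.environ.get(name) for name in REQUIRED}
    missing = find_missing(settings, REQUIRED)
    if missing:
        print("Missing settings:", ", ".join(missing))
    print("Input path:", with_trailing_slash("content"))
```

In Colab, the same dictionary could be filled with `userdata.get`, as in the generated code, instead of `os.environ.get`.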

Scenario 2: Full RAG Pipeline

Now we will attempt to write a full RAG pipeline starting with Unstructured. Here is the prompt:

Can you write me a RAG pipeline starting with unstructured (s3 to local) then follow with a simple RAG application with a FAISS local vector store and llama3.2 as the inference model? I am running in a colab notebook so please import all my variables as secrets

Here is the output (after reminding Claude to continue after a mid-code trail off):

Response:

Yes, I'll continue with the RAG implementation. Here's the complete code:

```python

# Install required packages
!pip install "unstructured-ingest[s3]" langchain faiss-cpu transformers torch
# For Colab, first let's set up our secrets
from google.colab import userdata
# Get access to the Unstructured Serverless API key for free: app.unstructured.io
UNSTRUCTURED_API_KEY = userdata.get('UNSTRUCTURED_API_KEY')
UNSTRUCTURED_API_URL = userdata.get('UNSTRUCTURED_API_URL')
AWS_ACCESS_KEY_ID = userdata.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = userdata.get('AWS_SECRET_ACCESS_KEY')
AWS_S3_URL = userdata.get('AWS_S3_URL') # format: s3://bucket-name/folder/
import os
import tempfile
from unstructured_ingest.v2.pipeline.pipeline import Pipeline
from unstructured_ingest.v2.interfaces import ProcessorConfig
from unstructured_ingest.v2.processes.connectors.fsspec.s3 import (
    S3IndexerConfig,
    S3DownloaderConfig,
    S3ConnectionConfig,
    S3AccessConfig
)
from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
from unstructured_ingest.v2.processes.chunker import ChunkerConfig
from unstructured_ingest.v2.processes.embedder import EmbedderConfig
from unstructured_ingest.v2.processes.connectors.local import (
    LocalConnectionConfig,
    LocalUploaderConfig
)
# Create temporary directory for output
output_dir = tempfile.mkdtemp()
def process_documents():
    """Process documents from S3 using Unstructured Ingest"""
    pipeline = Pipeline.from_configs(
        context=ProcessorConfig(
            work_dir=output_dir,
            raise_on_error=True
        ),
        indexer_config=S3IndexerConfig(
            remote_url=AWS_S3_URL
        ),
        downloader_config=S3DownloaderConfig(),
        source_connection_config=S3ConnectionConfig(
            access_config=S3AccessConfig(
                key=AWS_ACCESS_KEY_ID,
                secret=AWS_SECRET_ACCESS_KEY
            )
        ),
        partitioner_config=PartitionerConfig(
            partition_by_api=True,
            api_key=UNSTRUCTURED_API_KEY,
            partition_endpoint=UNSTRUCTURED_API_URL,
            strategy="hi_res",
            additional_partition_args={
                "split_pdf_page": True,
                "split_pdf_allow_failed": True
            }
        ),
        chunker_config=ChunkerConfig(
            chunking_strategy="by_title",
            chunk_max_characters=500
        ),
        destination_connection_config=LocalConnectionConfig(),
        uploader_config=LocalUploaderConfig(
            output_dir=output_dir
        )
    )
    pipeline.run()
    return output_dir
# Now let's create the RAG application
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.llms import HuggingFacePipeline
from langchain.chains import RetrievalQA
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import json
import glob
```

Yes, let me continue from where I left off with the complete RAG implementation:

```python
def build_vector_store(processed_docs_dir):
    """Build FAISS vector store from processed documents"""
    # Initialize embeddings
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-mpnet-base-v2"
    )
    
    # Load processed documents
    texts = []
    for json_file in glob.glob(f"{processed_docs_dir}/*.json"):
        with open(json_file, 'r') as f:
            elements = json.load(f)
            for element in elements:
                if isinstance(element, dict) and 'text' in element:
                    texts.append(element['text'])
    
    # Create FAISS vector store
    vectorstore = FAISS.from_texts(texts, embeddings)
    return vectorstore
def setup_llm():
    """Initialize Llama 2 model"""
    model_id = "meta-llama/Llama-2-7b-chat-hf"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        device_map="auto",
        torch_dtype="auto"
    )
    
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_length=2048,
        temperature=0.7,
        top_p=0.95,
        repetition_penalty=1.15
    )
    
    llm = HuggingFacePipeline(pipeline=pipe)
    return llm
def create_rag_chain(vectorstore, llm):
    """Create RAG chain combining retrieval and generation"""
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever(
            search_kwargs={"k": 3}
        ),
        return_source_documents=True
    )
    return qa_chain
# Main execution
if __name__ == "__main__":
    # Process documents from S3
    print("Processing documents from S3...")
    processed_docs_dir = process_documents()
    
    # Build vector store
    print("Building vector store...")
    vectorstore = build_vector_store(processed_docs_dir)
    
    # Setup LLM
    print("Setting up LLM...")
    llm = setup_llm()
    
    # Create RAG chain
    print("Creating RAG chain...")
    qa_chain = create_rag_chain(vectorstore, llm)
    
    # Interactive query loop
    while True:
        query = input("\nEnter your question (or 'quit' to exit): ")
        if query.lower() == 'quit':
            break
            
        try:
            result = qa_chain({"query": query})
            print("\nAnswer:", result['result'])
            print("\nSource Documents:")
            for doc in result['source_documents']:
                print("-", doc.page_content[:200], "...")
        except Exception as e:
```

This completes the implementation. Here's what each part does:

  1. build_vector_store(): Creates a FAISS vector store from the processed documents

  2. setup_llm(): Initializes the Llama 2 model for generation

  3. create_rag_chain(): Combines the vector store and LLM into a RAG chain

  4. The main execution block sets everything up and provides an interactive query interface

__________

We did need to debug this one as well: we pasted in an error message, then added more pip installs as subsequent errors came up. The final notebook we ran with those tweaks is here. With a few tweaks the pipeline ran, but it was not well optimized for speed. Combining additional meta-prompts and refining the prompt might yield better results, but this is a great start for creating a complete RAG pipeline in two prompts!
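To make the retrieval step of the chain more concrete: with `search_kwargs={"k": 3}`, the retriever returns the three stored chunks closest to the query embedding. The sketch below shows that idea as a brute-force search in plain Python. It assumes Euclidean distance (the actual metric depends on how the index is configured) and uses made-up three-dimensional vectors in place of real embeddings; `euclidean` and `top_k` are hypothetical helpers, and FAISS itself is far more optimized than this.

```python
import math


def euclidean(a: list[float], b: list[float]) -> float:
    """Euclidean (L2) distance between two vectors of equal length."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def top_k(query: list[float], store: list[tuple[str, list[float]]], k: int = 3) -> list[str]:
    """Return the texts of the k stored vectors closest to the query."""
    ranked = sorted(store, key=lambda item: euclidean(query, item[1]))
    return [text for text, _ in ranked[:k]]


if __name__ == "__main__":
    # Toy "embeddings" for illustration; real ones have hundreds of dimensions.
    store = [
        ("chunk about invoices", [0.9, 0.1, 0.0]),
        ("chunk about contracts", [0.8, 0.2, 0.1]),
        ("chunk about hiking", [0.0, 0.1, 0.9]),
        ("chunk about payments", [0.7, 0.3, 0.0]),
    ]
    print(top_k([1.0, 0.0, 0.0], store, k=3))
```

The retrieved chunks are what the "stuff" chain type places into the prompt alongside the question, so a larger `k` gives the model more context at the cost of a longer prompt.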

We also have an example of our meta prompt in action with ChatGPT.

Are you ready to unlock the full potential of your data workflows with RAG? Explore more in the Unstructured Documentation.
