Feb 6, 2025

RAG: Seamlessly Integrating Context from Multiple Sources into Delta Tables in Databricks

Maria Khalusova

RAG

Knowledge is power, but only when it's accessible. In today's data-driven world, crucial information is often fragmented across a myriad of data storage platforms and business tools: blob storage, company wikis, Google Drive, Dropbox, Slack, etc. Each system has its own authentication methods, data formats, access patterns, and quirks - making it a challenge to bring this collective intelligence into a RAG application effectively.

With Unstructured Platform, you can connect to most enterprise systems, ingest the data, and preprocess it in a standardized manner to deliver RAG-ready embedded chunks to a destination of your choice.

In this tutorial, we'll use Amazon S3 and Google Drive as data sources, and we'll show how you can seamlessly process documents from these sources, store the results in a Delta Table in Databricks, and build RAG over this data.

Prepare the data

To start transforming your data with Unstructured Platform, you'll need to sign up on the Unstructured For Developers page. Once you do, you can log into the Platform and process up to 1000 pages per day for free for the first 14 days.

In this tutorial, our data will consist of a few annual 10-K SEC filings from Walmart, Kroger, and Costco for the 2023 fiscal year. These reports offer deep insight into each company's financial performance that year.

The documents are originally in PDF format, two of them are stored in a Google Drive folder, and one more is in an Amazon S3 bucket. After preprocessing, we'll store the document chunks with their embeddings in a Delta Table in Databricks for retrieval. Here is what we need to do to prepare the data:

1) Create an S3 source connector in Unstructured Platform to connect it to the documents stored in the Amazon S3 bucket

2) Create a Google Drive source connector in Unstructured Platform to connect it to the documents in a specified Google Drive folder

3) Set up a Delta Table in Databricks that we will write the processed documents into

4) Create a Delta Table in Databricks destination connector in Unstructured Platform to upload the processed documents

5) Create a workflow that starts with ingestion of the documents from the source connectors, then adds data transformation steps (such as extracting content of the PDFs, enriching the documents with metadata, chunking the text, and generating embedding vectors for the similarity search), and then ends with uploading the results into the destination.
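As a mental model for the chunking step in the workflow above, here's a toy sketch of fixed-size chunking with overlap. This is purely illustrative - the Platform's chunking strategies are configurable and considerably more sophisticated than this:

```python
def chunk_text(text: str, max_chars: int = 500, overlap: int = 50) -> list[str]:
    """Split text into chunks of at most max_chars, with consecutive chunks
    sharing `overlap` characters so context isn't lost at chunk boundaries."""
    if max_chars <= overlap:
        raise ValueError("max_chars must exceed overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + max_chars])
        start += max_chars - overlap
    return chunks

chunks = chunk_text("A" * 1200, max_chars=500, overlap=50)
print(len(chunks))  # -> 3
```

Each chunk later gets its own embedding vector, which is why bounding chunk size matters: embedding models have input limits, and retrieval works best over focused passages.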

Let's go over these steps in detail.

Create an S3 source connector in Unstructured Platform

Log in to your Unstructured Platform account, click Connectors on the left side bar, make sure you have Sources selected, and click New to create a new source connector. Alternatively, use this direct link. Choose S3, and enter the required info about your bucket. If you’re unsure how to obtain the required credentials, Unstructured docs contain helpful instructions and videos that guide you through the setup of your S3 bucket for ingestion and obtaining the necessary credentials - check them out.

Create a Google Drive source connector in Unstructured Platform

In the same manner, create a Google Drive source connector. 

  1. Follow the instructions in Unstructured documentation to learn how to enable Google Drive API in your Google Cloud account, create a service account, and obtain a key. 

  2. Make sure to share your Google Drive folder with your service account. 

  3. Find your Google Drive folder id - it is a part of the URL for your Google Drive folder: https://drive.google.com/drive/folders/{folder-id}.
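If you'd rather not eyeball the URL, here is a small convenience snippet for pulling the folder id out programmatically (this is just a helper sketch, not part of the Platform setup; the URL shown is a hypothetical example):

```python
from urllib.parse import urlparse

def drive_folder_id(url: str) -> str:
    """Extract the folder id from a Google Drive folder URL."""
    path = urlparse(url).path  # e.g. /drive/folders/{folder-id}
    parts = [p for p in path.split("/") if p]
    if "folders" not in parts:
        raise ValueError("Not a Google Drive folder URL")
    return parts[parts.index("folders") + 1]

# Hypothetical folder URL for illustration:
print(drive_folder_id("https://drive.google.com/drive/folders/1AbC_dEf2GhI"))  # -> 1AbC_dEf2GhI
```

Using `urlparse` keeps this robust to query strings such as `?usp=sharing` that Google Drive often appends to shared links.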

Set up a Delta Table in Databricks

You can reproduce the setup in this tutorial by using the trial version of Databricks. 

Once you create an account and the cloud setup is complete, note down the URL of your workspace (you’ll get it in the email), as you will need it later to connect to your workspace.

In your workspace, navigate to your catalog (called demo_workspace here) and find the schema called default. In that schema, create a volume:

Next, you need to create a table - we’ll do that using the SQL Editor. However, before navigating to the SQL Editor, go to SQL Warehouses to make sure that you have a Serverless Starter Warehouse and that it is running:

Once you confirm that your Serverless Starter Warehouse is alive and kicking, click on it and navigate to the Connection details tab. Copy the values for the Server hostname and HTTP path. You’ll need them later when creating a destination connector in Unstructured Platform:

Now go to the SQL Editor and run the following query. Note that you may need to modify the first line to match your setup: <your_catalog>.<your_schema>.elements.

CREATE TABLE IF NOT EXISTS demo_workspace.default.elements (
   id STRING NOT NULL PRIMARY KEY,
   record_id STRING,
   element_id STRING,
   text STRING,
   embeddings ARRAY<FLOAT>,
   type STRING,
   date_created TIMESTAMP,
   date_modified TIMESTAMP,
   date_processed TIMESTAMP,
   permissions_data STRING,
   filesize_bytes FLOAT,
   url STRING,
   version STRING,
   record_locator STRING,
   category_depth INT,
   parent_id STRING,
   attached_filename STRING,
   filetype STRING,
   last_modified TIMESTAMP,
   file_directory STRING,
   filename STRING,
   languages ARRAY<STRING>,
   page_number STRING,
   links STRING,
   page_name STRING,
   link_urls STRING,
   link_texts STRING,
   sent_from STRING,
   sent_to STRING,
   subject STRING,
   section STRING,
   header_footer_type STRING,
   emphasized_text_contents STRING,
   emphasized_text_tags STRING,
   text_as_html STRING,
   regex_metadata STRING,
   detection_class_prob FLOAT,
   is_continuation BOOLEAN,
   orig_elements STRING,
   coordinates_points STRING,
   coordinates_system STRING,
   coordinates_layout_width FLOAT,
   coordinates_layout_height FLOAT
);

Running this query will create a table called elements in your catalog under the default schema. You should see it in your catalog now:

Now, we just need to make sure we have access to write to this table from Unstructured. 

Unstructured Platform currently supports two authentication methods - a Personal Access Token and a Service Principal. A Service Principal is a more secure method of authentication, and is better to use in a production environment. However, in this tutorial we’ll be using an access token, because it’s simpler to experiment with, and also because this is what the LangChain integration supports.

To get a personal access token, click on your profile icon in the upper right corner and go to Settings. Navigate to Developer settings and click Manage next to Access tokens.

Generate a new token, and copy and save its value - you won’t be able to access it again! 

Now we can create a destination connector in the Unstructured Platform for this new Delta Table in Databricks. 

Create a Delta Tables in Databricks destination connector in Unstructured Platform

To create a new destination connector, you can navigate to Connectors in the Platform UI, switch to Destinations, and click New; or use this direct link.

Give your connector a descriptive name, choose Delta Tables in Databricks as the provider, and then provide the authentication information you’ve just obtained.

Click Continue, and enter the remaining information:

When done, click Save and Test, and Unstructured Platform will confirm that it can access and write into this new destination. 

Create and run a data processing workflow in Unstructured Platform

The good news is that we’re past the hardest part of the setup process - preparing the connectors for the data sources and a destination. Setting up a data processing workflow in the Unstructured Platform is really easy. 

Navigate to the Workflows tab in Unstructured Platform, and click New workflow. Choose the Build it with Me option to set up the workflow with pre-configured options. You could start completely from scratch, but the pre-configured options can be edited later, and make building workflows much faster. 

First, choose your sources and destinations using the connectors that you've just created.

Next, select the "Advanced" workflow to preprocess the files:

Optionally, set a schedule (we won’t do that here), and click Complete.

You’ll find your newly created workflow at the top of the list on the Workflows tab. In this tutorial, we’ll modify it slightly. To do so, open the workflow as a DAG:

You can now see the entire flow of the data transformation, and modify it as you like: 

Let’s change the embedding model from the default text-embedding-3-large to text-embedding-3-small. This will be enough for this tutorial:

Note the embedding dimension (1536); you’ll need this information later when configuring the vector search index.
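For reference, here are the output dimensions of OpenAI's embedding models as documented by OpenAI (worth double-checking against OpenAI's docs before configuring the index, since a mismatch will break the vector search index):

```python
# Default output dimensions of OpenAI embedding models.
OPENAI_EMBEDDING_DIMS = {
    "text-embedding-3-small": 1536,
    "text-embedding-3-large": 3072,
    "text-embedding-ada-002": 1536,
}
print(OPENAI_EMBEDDING_DIMS["text-embedding-3-small"])  # -> 1536
```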

When done, save the workflow, and then hit Run to kick off a job. When the job is finished, you can review all the steps your data went through. If there are any errors, you can learn what caused them on the Errors tab. 

If you navigate back to your table in Databricks, you can browse through some sample data. Looking at the data is always a good idea.  

If we want to run similarity search over the embeddings in this Delta table, we now need to set up Databricks Vector Search over the table. The next section will guide you through the steps required to do so.

Set up Databricks Vector Search

To use Databricks AI Vector Search in a RAG application, you need the following:

  • A vector search endpoint that you can query.

  • A vector search index, created from a Delta table.

First, let’s create a vector search endpoint. You can do so in the UI:

  1. In the left sidebar, click Compute.

  2. Click the Vector Search tab and click Create. Then give your endpoint a name and confirm. 


Next, we need to create a vector search index from your Delta table. Before you can do this, make sure that your table has the Change Data Feed enabled. By default, it is not. You can change that by running the following line in the SQL Editor. If needed, update the line to <your_catalog>.<your_schema>.elements:

ALTER TABLE `demo_workspace`.`default`.`elements` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Now, navigate to your table, and click Create | Vector search index:

Fill out the fields in the dialog that opens. Since we’ve already generated the embeddings during data preprocessing, choose the Use existing embedding column option. The embeddings column is called embeddings, and the text-embedding-3-small model that we used produces vectors with an embedding dimension of 1536.

When done, wait a few moments for the data to get indexed. Now, we can build RAG!

Build RAG with Databricks Vector Search 

Find the full example in this Google Colab notebook. Let’s walk step by step through building RAG with Databricks Vector Search. 

First, you need to install the necessary libraries:

!pip install -qU langchain-openai databricks-langchain openai

Set your environment variables: OPENAI_API_KEY, DATABRICKS_HOST, DATABRICKS_TOKEN.

To embed the user query, use the same embedding model that was used to generate embedding vectors in the vector search index. In this case, it's text-embedding-3-small from OpenAI.

from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

Create a LangChain vector store. By default, similarity search only returns the primary key and text column. If you want to retrieve the custom metadata associated with the document, pass the additional columns in the columns parameter when initializing the vector store. In this example, we'll use the element type metadata to know whether it's a table or not, and text_as_html metadata to leverage the preserved table structure.

from databricks_langchain import DatabricksVectorSearch
index_name = "demo_workspace.default.demo_index"  # Format: "<catalog>.<schema>.<index-name>"
endpoint_name = "uns_demo_vector_search"
vector_store = DatabricksVectorSearch(
    endpoint=endpoint_name,
    index_name=index_name,
    embedding=embeddings,
    text_column="text", # The column name in the index that contains the text data
    columns=["text_as_html", "type"], # metadata columns to retrieve
)

Set up this vector store as a retriever:

retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})
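Conceptually, the retriever embeds the query and returns the k chunks whose stored vectors are closest to it. Databricks Vector Search does this with an approximate nearest-neighbor index at scale; here is a minimal, illustrative sketch of the underlying idea - brute-force top-k retrieval by cosine similarity:

```python
from math import sqrt

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))

def top_k(query_vec: list[float], doc_vecs: list[list[float]], k: int = 3) -> list[int]:
    """Indices of the k document vectors most similar to the query."""
    ranked = sorted(range(len(doc_vecs)),
                    key=lambda i: cosine(query_vec, doc_vecs[i]),
                    reverse=True)
    return ranked[:k]

# Toy 2-dimensional "embeddings" for illustration:
docs = [[1.0, 0.0], [0.7, 0.7], [0.0, 1.0], [-1.0, 0.0]]
print(top_k([1.0, 0.1], docs, k=3))  # -> [0, 1, 2]
```

This is why the embedding model must match between indexing and querying: similarity is only meaningful when both vectors live in the same embedding space.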

Finally, build a RAG application in which, when we retrieve a table, we give the LLM the HTML representation of that table instead of plain text.

First, we'll create a function that generates an answer given a question and retrieved documents:

import os
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def generate_answer(question: str, documents: str):  
    prompt = """
You are an assistant that can answer user questions given the provided context.
Your answer should be thorough and technical.
If you don't know the answer, or no documents are provided, say 'I do not have enough context to answer the question.'
"""
    augmented_prompt = (
        f"{prompt}"
        f"User question: {question}\n\n"
        f"{documents}"
    )
    response = client.chat.completions.create(
        messages=[
        {'role': 'system', 'content': 'You answer user questions.'},
            {'role': 'user', 'content': augmented_prompt},
        ],
        model="gpt-4o-2024-11-20",
        temperature=0,
    )
    return response.choices[0].message.content

Next, we'll create a helper function that will format the retrieved documents in the following way: if we retrieved a table, then use the text_as_html representation of the table as a source, otherwise, just use the text:

def format_docs(docs):
    # Use the HTML rendering for tables, and plain text for everything else.
    useful_content = [
        doc.metadata["text_as_html"] if doc.metadata["type"] == "Table" else doc.page_content
        for doc in docs
    ]
    return "\nRetrieved documents:\n" + "".join(
        f"\n\n===== Document {i} =====\n" + doc
        for i, doc in enumerate(useful_content)
    )
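To see what format_docs produces, here's a self-contained sketch using stand-in document objects. The SimpleNamespace stubs mimic the page_content/metadata shape of LangChain documents - they are not the real Document class - and the table content is illustrative:

```python
from types import SimpleNamespace

def format_docs(docs):
    # Prefer the HTML rendering for tables, plain text otherwise.
    useful_content = [
        doc.metadata["text_as_html"] if doc.metadata["type"] == "Table" else doc.page_content
        for doc in docs
    ]
    return "\nRetrieved documents:\n" + "".join(
        f"\n\n===== Document {i} =====\n" + doc
        for i, doc in enumerate(useful_content)
    )

# Stand-ins for retrieved LangChain documents (illustrative values):
docs = [
    SimpleNamespace(page_content="Revenue grew 7%.",
                    metadata={"type": "NarrativeText", "text_as_html": ""}),
    SimpleNamespace(page_content="2022 | 4,126",
                    metadata={"type": "Table",
                              "text_as_html": "<table><tr><td>2022</td><td>4,126</td></tr></table>"}),
]
print(format_docs(docs))
```

Note that the second document is rendered as HTML in the output, while the first keeps its plain text - exactly the behavior we want when passing financial tables to the LLM.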

Bring everything together: 

1) Given a query, invoke the retriever and get the documents

2) Format the documents to preserve the table structure

3) Pass the formatted documents and the user query to the LLM to generate an answer

def rag(query):
    docs = retriever.invoke(query)              # 1) retrieve relevant documents
    documents = format_docs(docs)               # 2) format them, preserving table structure
    answer = generate_answer(query, documents)  # 3) generate the answer
    return answer

Let’s try it out: 

query = "What is Kroger's exact operating profit in 2022?"
rag(query)

Output:

Based on the provided documents, Kroger's operating profit for the fiscal year 2022 (52 weeks) was **$4.126 billion**. This figure is explicitly stated in the financial table in Document 2.

Let’s see what we have in Document 2:

from IPython.display import HTML, display

results = retriever.invoke(query)
display(HTML(results[2].metadata["text_as_html"]))

Output:

Congratulations on making it this far! You’ve learned how to preprocess data from multiple sources through a single workflow in Unstructured Platform, and how to store the results in a Delta Table in Databricks. You’ve learned how to create a Databricks Vector Search index over a Delta table and use it as a retriever with LangChain. Finally, you’ve learned how to leverage the table structure that Unstructured preserves in the metadata to improve your RAG.

Now, it's time to put your knowledge into action! Try Unstructured Platform with a 14-day trial, allowing you to process up to 1,000 pages per day.

For enterprises with more complex needs, we offer tailored solutions. Book a session with our engineers to explore how we can optimize Unstructured for your unique use case.
