Oct 3, 2023

Setting up a Private Retrieval Augmented Generation (RAG) System

Unstructured

Data Transforming

In the realm of AI, access to current and accurate data is paramount. Retrieval Augmented Generation (RAG) addresses this need: it is an established technique that pairs a large language model with an external data store so that answers are more precise and up to date. With RAG, you aren’t just getting black box answers; you’re receiving responses grounded in passages retrieved directly from your most recent data sources.

However, the power of RAG is truly unlocked when it has a reliable and efficient source of data. This is where Unstructured shines. Imagine having a treasure trove of data spread out in various formats and places, and the task of harnessing it feels herculean. Unstructured is purpose-built to bridge this gap. It acts as the ETL pipeline specifically designed for LLMs, connecting to your data regardless of its format or location, transforming and cleansing it into a streamlined and usable format. In essence, Unstructured is empowering your AI models with real-time, actionable data. It’s the first, yet the most crucial step in your AI journey, transforming unstructured data chaos into data-driven insights.

While many organizations are inclined toward cloud-based RAG solutions, there’s a growing sentiment in favor of local implementations. This shift is driven by concerns over data privacy, the desire for lower latency, and considerations of cost. Opting for a local RAG system means ensuring that your data never leaves your premises, a crucial aspect for entities dealing with sensitive or proprietary information.

In this article, we’ll explore the specifics of constructing a private RAG system using a local model and vector database. We’ll break down the process and offer a step-by-step guide for those interested in such a setup. We will also spotlight why using Unstructured in your setup is not just a choice but a necessity.



How to use Unstructured in your Local RAG System:

Unstructured is a critical tool when setting up your own RAG system. Here’s a step-by-step guide to get you started:


Prerequisites:
  1. Unstructured: Grab it from PyPI or directly clone its GitHub repository.

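  2. Pyenv: Install it via Homebrew or by following the instructions in its GitHub repository. The pyenv virtualenv commands used below also require the pyenv-virtualenv plugin.

  3. Docker: Used to run containerized services locally, such as the Weaviate vector database this guide connects to.

  4. Jupyter Notebook: I run the code in a Jupyter notebook. If you haven’t installed Jupyter Notebook yet, you can follow these steps.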


Setting up your Environment:

Now that you have installed the required packages, open up your terminal and run the following commands.

PYENV Installation: Use pyenv to manage your Python versions seamlessly.


pyenv install 3.9.1
pyenv virtualenv 3.9.1 local-rag
pyenv activate local-rag


Clone Repo:


git clone https://github.com/jackretterer/local-rag.git


Dependencies Installation: Install necessary packages from the provided requirements file.


cd local-rag
pip install -r requirements.txt


Update Hugging Face Hub: To avoid any unforeseen issues, it’s a good practice to ensure that you’re running the latest version of huggingface_hub.


pip install --upgrade huggingface_hub


Docker Setup: Fire up Docker.

Creating the model directory: This directory will store the required model files.


mkdir model_files


Installing the llama-cpp-python package with specific build arguments:


CMAKE_ARGS="-DLLAMA_METAL=on" FORCE_CMAKE=1 pip install llama-cpp-python


Downloading a pre-trained model from Hugging Face Hub:


huggingface-cli download TheBloke/Llama-2-7b-Chat-GGUF --local-dir model_files --local-dir-use-symlinks False --include='*Q4_K*gguf'


I also made a new kernel for my Jupyter notebook:

  1. Install ipykernel: pip install ipykernel

  2. Run: python -m ipykernel install --user --name=local-rag

  3. Restart the notebook

  4. Kernel -> Change Kernel -> local-rag


Set Parameters and Constants:

Before diving into the coding part, let’s first establish the foundation. Here, we define some key parameters and constants (this tutorial was designed to be run on Apple devices).


output_dir = 'my-docs' # Path to output directory
input_dir = 'files_used' # Path to input directory
weaviate_url = "http://localhost:8080"
embedding_model_name = 'all-MiniLM-L6-v2'
device = 'mps'


Notice how we’ve set up directories for both input and output files. We also specify the weaviate_url, a crucial endpoint for our upcoming interactions.
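
Because every later step talks to that endpoint, it can help to confirm the instance is reachable before going further. The following is a minimal sketch using only the standard library, assuming a Weaviate server that exposes its standard readiness route at /v1/.well-known/ready. The helper name weaviate_is_ready is made up for this example and is not part of the original notebook.

```python
import urllib.error
import urllib.request


def weaviate_is_ready(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if the readiness endpoint answers with HTTP 200."""
    ready_url = base_url.rstrip("/") + "/v1/.well-known/ready"
    try:
        with urllib.request.urlopen(ready_url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, non-2xx status, or a malformed URL
        return False


if __name__ == "__main__":
    # Same value as the weaviate_url constant defined above
    print(weaviate_is_ready("http://localhost:8080"))
```

If this prints False, check that Docker is running and that the Weaviate container has finished starting.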


Helper Functions to Ingest Documents and Pre-Process Them:

Next, we’re entering the realm of document ingestion and pre-processing. These helper functions will be our trusty tools to ensure a seamless data flow.


import subprocess
import os
from typing import List, Dict


def process_local(output_dir: str, num_processes: int, input_path: str):
    # Build the unstructured-ingest CLI call for a local directory
    command = [
        "unstructured-ingest",
        "local",
        "--input-path", input_path,
        "--output-dir", output_dir,
        "--num-processes", str(num_processes),
        "--recursive",
        "--verbose",
    ]

    # Run the command; stderr is captured too so that failures can be reported
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, error = process.communicate()

    # Print output
    if process.returncode == 0:
        print('Command executed successfully. Output:')
        print(output.decode())
    else:
        print('Command failed. Error:')
        print(error.decode())


def get_result_files(folder_path) -> List[str]:
    # Walk the output directory and collect the path of every JSON file
    file_list = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.json'):
                file_path = os.path.join(root, file)
                file_list.append(file_path)
    return file_list


Once our data is processed, the get_result_files function comes into play. This utility function scours a directory to fetch a list of all JSON files, providing us with the much-needed collection of processed results.
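
For readers who prefer pathlib, the same recursive search can be sketched as below. This is an alternative illustration rather than the notebook's own code; the function name list_json_outputs and the file names in the demo are invented. It also sorts the result, which makes the processing order predictable.

```python
import tempfile
from pathlib import Path
from typing import List


def list_json_outputs(folder_path: str) -> List[str]:
    """Recursively collect .json files under folder_path, in sorted order."""
    return sorted(str(p) for p in Path(folder_path).rglob("*.json") if p.is_file())


# Demo on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "nested").mkdir()
    (Path(tmp) / "a.pdf.json").write_text("[]")
    (Path(tmp) / "nested" / "b.html.json").write_text("[]")
    (Path(tmp) / "notes.txt").write_text("ignored")
    found = [Path(p).name for p in list_json_outputs(tmp)]

print(found)
```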

Run Desired Pipeline To Generate Clean JSON using Unstructured


process_local(output_dir=output_dir, num_processes=2, input_path=input_dir)
files = get_result_files(output_dir)
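
Before indexing, it can be useful to peek at what the pipeline produced. The sketch below assumes each output file holds a JSON list of element dictionaries with type, text, and metadata keys, which is how Unstructured serializes elements. The sample content and the helper name summarize_element_types are made up for illustration.

```python
import json
from collections import Counter
from typing import Dict, List


def summarize_element_types(elements: List[Dict]) -> Counter:
    """Count how many elements of each type a partitioned document contains."""
    return Counter(element.get("type", "Unknown") for element in elements)


# Made-up stand-in for the contents of one output file
sample_json = """
[
  {"type": "Title", "text": "Scouting Report", "metadata": {"filename": "example.html"}},
  {"type": "NarrativeText", "text": "First paragraph.", "metadata": {"filename": "example.html"}},
  {"type": "NarrativeText", "text": "Second paragraph.", "metadata": {"filename": "example.html"}}
]
"""

type_counts = summarize_element_types(json.loads(sample_json))
print(type_counts.most_common())
```

To run it on real output, load one of the paths in files with json.load instead of the sample string.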


Helper Functions to Setup Weaviate Schema and Client


import uuid
import weaviate
from weaviate.util import get_valid_uuid

def create_local_weaviate_client(db_url: str):
    return weaviate.Client(
        url=db_url,
    )

def get_schema(vectorizer: str = "none"):
    # vectorizer="none" because embeddings are computed locally and supplied on upload
    return {
        "classes": [
            {
                "class": "Doc",
                "description": "A generic document class",
                "vectorizer": vectorizer,
                "properties": [
                    {
                        "name": "last_modified",
                        "dataType": ["text"],
                        "description": "Last modified date for the document",
                    },
                    {
                        "name": "player",
                        "dataType": ["text"],
                        "description": "Player related to the document",
                    },
                    {
                        "name": "position",
                        "dataType": ["text"],
                        "description": "Player Position related to the document",
                    },
                    {
                        "name": "text",
                        "dataType": ["text"],
                        "description": "Text content for the document",
                    },
                ],
            },
        ],
    }

def upload_schema(my_schema, weaviate):
    # Caution: delete_all() removes every existing class and its data
    weaviate.schema.delete_all()
    weaviate.schema.create(my_schema)

def count_documents(client: weaviate.Client) -> Dict:
    # Returns the raw aggregate response, which includes the object count for "Doc"
    response = (
        client.query
        .aggregate("Doc")
        .with_meta_count()
        .do()
    )
    return response


Setup Weaviate Client and Schema

Before diving into the intricate process of document indexing, the primary step is to set up our Weaviate client. This client will act as a bridge, connecting us to the Weaviate instance.


client = create_local_weaviate_client(db_url=weaviate_url)
my_schema = get_schema()
upload_schema(my_schema, weaviate=client)


With this groundwork, we have our Weaviate system primed and ready to accept the upcoming document chunks.
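
Since the upload step sends plain dictionaries, one optional sanity check is to compare their keys against the properties declared in the schema, which helps catch typos in property names early. This is a small sketch and not part of the original notebook: the helper names schema_property_names and undeclared_keys are hypothetical, and example_schema is a trimmed copy of the schema shape shown above.

```python
from typing import Dict, List, Set


def schema_property_names(schema: Dict, class_name: str) -> Set[str]:
    """Return the property names declared for class_name in a schema dict."""
    for cls in schema.get("classes", []):
        if cls.get("class") == class_name:
            return {prop["name"] for prop in cls.get("properties", [])}
    raise KeyError(f"Class {class_name!r} not found in schema")


def undeclared_keys(data_object: Dict, schema: Dict, class_name: str = "Doc") -> List[str]:
    """List keys of data_object that the schema does not declare."""
    return sorted(set(data_object) - schema_property_names(schema, class_name))


# Trimmed copy of the schema shape used in this article
example_schema = {
    "classes": [
        {
            "class": "Doc",
            "properties": [
                {"name": "last_modified", "dataType": ["text"]},
                {"name": "player", "dataType": ["text"]},
                {"name": "position", "dataType": ["text"]},
                {"name": "text", "dataType": ["text"]},
            ],
        }
    ]
}

ok_result = undeclared_keys({"last_modified": "2023-10-03", "text": "hello"}, example_schema)
typo_result = undeclared_keys({"txt": "typo in the key"}, example_schema)
print(ok_result, typo_result)
```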


Helper Functions to Stage Unstructured Documents for Indexing

Handling unstructured data can be tricky. Hence, we’ll use a series of helper functions built on Unstructured’s library to process the data, chunk it appropriately, and index it in our system.


from unstructured.chunking.title import chunk_by_title
from unstructured.documents.elements import DataSourceMetadata
from unstructured.partition.json import partition_json
from sentence_transformers import SentenceTransformer

embedding_model = SentenceTransformer(embedding_model_name, device=device)

def compute_embedding(chunk_text: List[str]):
    embeddings = embedding_model.encode(chunk_text, device=device)
    return embeddings

def get_chunks(elements, chunk_under_n_chars=500, chunk_new_after_n_chars=1500):
    # Strip data_source (when it is not a DataSourceMetadata) and coordinates
    # from each element's metadata before chunking
    for element in elements:
        if not isinstance(element.metadata.data_source, DataSourceMetadata):
            delattr(element.metadata, "data_source")

        if hasattr(element.metadata, "coordinates"):
            delattr(element.metadata, "coordinates")

    chunks = chunk_by_title(
        elements,
        combine_under_n_chars=chunk_under_n_chars,
        new_after_n_chars=chunk_new_after_n_chars
    )

    # Keep only the fields that map onto properties of the Doc class
    for i in range(len(chunks)):
        chunks[i] = {"last_modified": chunks[i].metadata.last_modified, "text": chunks[i].text}

    chunk_texts = [x['text'] for x in chunks]
    embeddings = compute_embedding(chunk_texts)
    return chunks, embeddings


def add_data_to_weaviate(files, client, chunk_under_n_chars=500, chunk_new_after_n_chars=1500):
    for filename in files:
        try:
            elements = partition_json(filename=filename)
            chunks, embeddings = get_chunks(elements, chunk_under_n_chars, chunk_new_after_n_chars)
        except IndexError as e:
            print(e)
            continue

        print(f"Uploading {len(chunks)} chunks for {str(filename)}.")
        for i, chunk in enumerate(chunks):
            client.batch.add_data_object(
                data_object=chunk,
                class_name="Doc",  # matches the class name defined in the schema
                uuid=get_valid_uuid(uuid.uuid4()),
                vector=embeddings[i]
            )

    # Send any objects still waiting in the batch
    client.batch.flush()


Now, the spotlight is on add_data_to_weaviate. This function is the captain of our ship, orchestrating the entire process of reading, chunking, embedding, and eventually storing the data into Weaviate.
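
The chunking step inside that pipeline is governed by two size parameters, and their interaction is easier to see on a toy example. The sketch below is a deliberately simplified, title-aware chunker written for illustration only. It is not Unstructured's chunk_by_title implementation, and the function name toy_chunk_by_title and the (is_title, text) input format are assumptions made for this example. A title starts a new chunk only once the current chunk has reached combine_under_n_chars, and any chunk is closed before it would exceed new_after_n_chars.

```python
from typing import List, Tuple


def toy_chunk_by_title(
    elements: List[Tuple[bool, str]],
    combine_under_n_chars: int = 500,
    new_after_n_chars: int = 1500,
) -> List[str]:
    """Group (is_title, text) pairs into chunks using two size thresholds."""
    chunks: List[str] = []
    current = ""
    for is_title, text in elements:
        # A title opens a new chunk only if the current one is already big enough
        starts_section = is_title and len(current) >= combine_under_n_chars
        # Close the chunk if appending this text (plus a newline) would overshoot
        too_long = bool(current) and len(current) + len(text) + 1 > new_after_n_chars
        if current and (starts_section or too_long):
            chunks.append(current)
            current = ""
        current = f"{current}\n{text}" if current else text
    if current:
        chunks.append(current)
    return chunks


toy_elements = [
    (True, "Intro"),
    (False, "a" * 30),
    (True, "Details"),
    (False, "b" * 30),
    (False, "c" * 30),
]

small_sections = toy_chunk_by_title(toy_elements, combine_under_n_chars=20, new_after_n_chars=60)
merged_sections = toy_chunk_by_title(toy_elements, combine_under_n_chars=100, new_after_n_chars=60)
print([len(c) for c in small_sections])
print([len(c) for c in merged_sections])
```

With the lower threshold, the second title begins its own chunk. With the higher one, the short first section absorbs the next title until the size cap forces a split.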


Add Chunks to Weaviate

Using the predefined functions, we’ll ingest our document chunks into the Weaviate system.


add_data_to_weaviate(
    files=files,
    client=client,
    chunk_under_n_chars=250,
    chunk_new_after_n_chars=500
)

print(count_documents(client=client)['data']['Aggregate']['Doc'])


Upon completion, a quick print statement gives us insight into the total number of chunks we’ve successfully added, each stored as a Doc object.
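
The value printed there is a list containing a meta dictionary rather than a bare number. If only the integer is needed, it can be unpacked as in this small sketch, which assumes the usual shape of a Weaviate aggregate response with a meta count. The helper name extract_count and the figure 42 are invented for the example.

```python
from typing import Dict


def extract_count(response: Dict, class_name: str = "Doc") -> int:
    """Pull the object count out of an aggregate response, or 0 if absent."""
    groups = response.get("data", {}).get("Aggregate", {}).get(class_name) or []
    return groups[0]["meta"]["count"] if groups else 0


# Made-up response in the shape returned by an aggregate query with a meta count
example_response = {"data": {"Aggregate": {"Doc": [{"meta": {"count": 42}}]}}}
print(extract_count(example_response))
```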


LangChain RAG Application

For those unfamiliar, LangChain is a framework for building applications on top of LLMs. Here we use its llama.cpp wrapper, its Weaviate vector store integration, and its prompt templates.


from langchain.llms import LlamaCpp
from langchain.vectorstores.weaviate import Weaviate
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.prompts import PromptTemplate


Instantiate Local Llama 2 LLM

The heart of our question-answering system lies in the open source Llama 2 LLM. At its core, it’s an intricate yet powerful model designed to generate human-like text based on the input provided.


callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
n_gpu_layers = 1 # Metal set to 1 is enough.
n_batch = 100 # Should be between 1 and n_ctx, consider the amount of RAM of your Apple Silicon Chip.
# Make sure the model path is correct for your system!
llm = LlamaCpp(
    model_path="model_files/llama-2-7b-chat.Q4_K_S.gguf",
    n_gpu_layers=n_gpu_layers,
    n_batch=n_batch,
    n_ctx=2048, # context window. By default 512
    f16_kv=True, # MUST set to True, otherwise you will run into problem after a couple of calls
    callback_manager=callback_manager,
    verbose=True, # Verbose is required to pass to the callback manager
)


Helper Function to Run RAG Process

Our main endeavor is to fetch relevant answers to any posed question. Hence, creating a function that encapsulates the entire RAG (Retrieval Augmented Generation) process is crucial.


def question_answer(question: str, vectorstore: Weaviate):
    # Embed the question, then retrieve related chunks from the vector store
    embedding = compute_embedding(question)
    similar_docs = vectorstore.max_marginal_relevance_search_by_vector(embedding)
    content = [x.page_content for x in similar_docs]
    prompt_template = PromptTemplate.from_template(
        """\
Given context about the subject, answer the question based on the context provided to the best of your ability.
Context: {context}
Question:
{question}
Answer:
"""
    )
    # Join the chunks so the prompt contains plain text rather than a Python list
    prompt = prompt_template.format(context="\n\n".join(content), question=question)
    answer = llm(prompt)
    return answer, similar_docs


This function, question_answer, works in stages. Initially, it computes an embedding of the provided question using our earlier-defined method. With this embedding, the system scours the vector store to find documents most related to our query. These documents serve as the context for our LLM, helping it craft a well-informed response.
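
To make those stages concrete without a model or a database, here is a toy walk-through using only the standard library. It is a sketch under stated assumptions: the three-dimensional vectors and chunk texts are invented, the helper names are hypothetical, and it ranks by plain cosine similarity instead of the maximal marginal relevance search that question_answer uses.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def retrieve(query_vec: Sequence[float],
             store: List[Tuple[str, Sequence[float]]],
             k: int = 2) -> List[str]:
    """Return the texts of the k stored chunks most similar to the query."""
    ranked = sorted(store, key=lambda item: cosine_similarity(query_vec, item[1]), reverse=True)
    return [text for text, _ in ranked[:k]]


def build_prompt(context_chunks: List[str], question: str) -> str:
    """Assemble retrieved chunks and the question into one prompt string."""
    context = "\n\n".join(context_chunks)
    return f"Context: {context}\nQuestion:\n{question}\nAnswer:\n"


# Invented chunks with hand-written toy embeddings
toy_store = [
    ("Chunk A: scouting notes on a running back", [0.9, 0.1, 0.0]),
    ("Chunk B: stadium seating information", [0.0, 0.2, 0.9]),
    ("Chunk C: college rushing statistics", [0.8, 0.3, 0.1]),
]
toy_query_vec = [1.0, 0.2, 0.0]

top_chunks = retrieve(toy_query_vec, toy_store, k=2)
toy_prompt = build_prompt(top_chunks, "Summarize the scouting report.")
print(toy_prompt)
```

In the real pipeline the vectors come from the sentence-transformers model, the search runs inside Weaviate, and the prompt is passed to the local Llama 2 model.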


Run RAG on a Question

Time to put our system to the test.


client = weaviate.Client(weaviate_url)
vectorstore = Weaviate(client, "Doc", "text")

question = "Give a summary of NFL Draft 2020 Scouting Reports: RB Jonathan Taylor, Wisconsin?"

answer, similar_docs = question_answer(question, vectorstore)

print("\n\n\n-------------------------")
print(f"QUERY: {question}")
print("\n\n\n-------------------------")
print(f"Answer: {answer}")
print("\n\n\n-------------------------")
for index, result in enumerate(similar_docs):
    print(f"\n\n-- RESULT {index+1}:\n")
    print(result)


Results:


This result does show an accurate retrieval.


Conclusion:

In an era dominated by digital innovation, building robust RAG systems is no longer a herculean task. With the step-by-step guide laid out in this blog, we’ve offered an approach that is both effective and accessible. While the presented workflow can undergo further refinements to meet the demands of a production-grade environment, our primary focus was to maintain its simplicity for clarity’s sake. Moreover, the emphasis on local setups underscores the significance of data privacy. By opting for in-house RAG implementations, organizations can strike a balance between harnessing the power of cutting-edge AI technology and ensuring that sensitive information remains safeguarded. As part of our ongoing commitment to enhancing data privacy, we’re thrilled to announce that support for RBAC (Role-Based Access Control) is on our roadmap and will be rolling out in the coming weeks. As AI continues to permeate various sectors, prioritizing data security becomes paramount. Through local RAG systems, we are not just stepping into the future of AI; we are doing so responsibly and ethically.

Encountered challenges following this tutorial or have questions? I encourage you to join our community Slack group. There, you can connect with fellow users, exchange insights, and stay updated on the latest developments. We’re excited to see the innovations you come up with!

A link to the full code can be found in this GitHub repo.