%%capture --no-stderr
%pip install --quiet -U langchain langchain_community tiktoken langchain-nomic "nomic[local]" langchain-ollama scikit-learn langgraph tavily-python bs4
Local RAG agent with LLaMA3¶
We'll combine ideas from paper RAG papers into a RAG agent:
- Routing: Adaptive RAG (paper). Route questions to different retrieval approaches
- Fallback: Corrective RAG (paper). Fallback to web search if docs are not relevant to query
- Self-correction: Self-RAG (paper). Fix answers w/ hallucinations or don’t address question
Local models¶
Embedding¶
LLM¶
### LLM
from langchain_ollama import ChatOllama
local_llm = "llama3.2:3b-instruct-fp16"
llm = ChatOllama(model=local_llm, temperature=0)
llm_json_mode = ChatOllama(model=local_llm, temperature=0, format="json")
Search¶
For search, we use Tavily, which is a search engine optimized for LLMs and RAG.
import os
import getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("TAVILY_API_KEY")
os.environ["TOKENIZERS_PARALLELISM"] = "true"
Tracing¶
Optionally, use LangSmith for tracing.
_set_env("LANGSMITH_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "local-llama32-rag"
Vectorstore¶
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import SKLearnVectorStore
from langchain_nomic.embeddings import NomicEmbeddings
urls = [
"https://lilianweng.github.io/posts/2023-06-23-agent/",
"https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
"https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
# Load documents
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
# Split documents
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
chunk_size=1000, chunk_overlap=200
)
doc_splits = text_splitter.split_documents(docs_list)
# Add to vectorDB
vectorstore = SKLearnVectorStore.from_documents(
documents=doc_splits,
embedding=NomicEmbeddings(model="nomic-embed-text-v1.5", inference_mode="local"),
)
# Create retriever
retriever = vectorstore.as_retriever(k=3)
API Reference: RecursiveCharacterTextSplitter | WebBaseLoader | SKLearnVectorStore | NomicEmbeddings
Components¶
### Router
import json
from langchain_core.messages import HumanMessage, SystemMessage
# Prompt
router_instructions = """You are an expert at routing a user question to a vectorstore or web search.
The vectorstore contains documents related to agents, prompt engineering, and adversarial attacks.
Use the vectorstore for questions on these topics. For all else, and especially for current events, use web-search.
Return JSON with single key, datasource, that is 'websearch' or 'vectorstore' depending on the question."""
# Test router
test_web_search = llm_json_mode.invoke(
[SystemMessage(content=router_instructions)]
+ [
HumanMessage(
content="Who is favored to win the NFC Championship game in the 2024 season?"
)
]
)
test_web_search_2 = llm_json_mode.invoke(
[SystemMessage(content=router_instructions)]
+ [HumanMessage(content="What are the models released today for llama3.2?")]
)
test_vector_store = llm_json_mode.invoke(
[SystemMessage(content=router_instructions)]
+ [HumanMessage(content="What are the types of agent memory?")]
)
print(
json.loads(test_web_search.content),
json.loads(test_web_search_2.content),
json.loads(test_vector_store.content),
)
API Reference: HumanMessage | SystemMessage
### Retrieval Grader
# Doc grader instructions
doc_grader_instructions = """You are a grader assessing relevance of a retrieved document to a user question.
If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant."""
# Grader prompt
doc_grader_prompt = """Here is the retrieved document: \n\n {document} \n\n Here is the user question: \n\n {question}.
This carefully and objectively assess whether the document contains at least some information that is relevant to the question.
Return JSON with single key, binary_score, that is 'yes' or 'no' score to indicate whether the document contains at least some information that is relevant to the question."""
# Test
question = "What is Chain of thought prompting?"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
doc_grader_prompt_formatted = doc_grader_prompt.format(
document=doc_txt, question=question
)
result = llm_json_mode.invoke(
[SystemMessage(content=doc_grader_instructions)]
+ [HumanMessage(content=doc_grader_prompt_formatted)]
)
json.loads(result.content)
### Generate
# Prompt
rag_prompt = """You are an assistant for question-answering tasks.
Here is the context to use to answer the question:
{context}
Think carefully about the above context.
Now, review the user question:
{question}
Provide an answer to this questions using only the above context.
Use three sentences maximum and keep the answer concise.
Answer:"""
# Post-processing
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
# Test
docs = retriever.invoke(question)
docs_txt = format_docs(docs)
rag_prompt_formatted = rag_prompt.format(context=docs_txt, question=question)
generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
print(generation.content)
Chain of Thought (CoT) prompting is a technique used in natural language processing to generate human-like responses by iteratively asking questions and refining the search space through external search queries, such as Wikipedia APIs. CoT prompting involves decomposing problems into multiple thought steps, generating multiple thoughts per step, and evaluating each state using a classifier or majority vote. The goal is to find an optimal instruction that leads to the desired output, which can be achieved by optimizing prompt parameters directly on the embedding space via gradient descent or searching over a pool of model-generated instruction candidates.
### Hallucination Grader
# Hallucination grader instructions
hallucination_grader_instructions = """
You are a teacher grading a quiz.
You will be given FACTS and a STUDENT ANSWER.
Here is the grade criteria to follow:
(1) Ensure the STUDENT ANSWER is grounded in the FACTS.
(2) Ensure the STUDENT ANSWER does not contain "hallucinated" information outside the scope of the FACTS.
Score:
A score of yes means that the student's answer meets all of the criteria. This is the highest (best) score.
A score of no means that the student's answer does not meet all of the criteria. This is the lowest possible score you can give.
Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct.
Avoid simply stating the correct answer at the outset."""
# Grader prompt
hallucination_grader_prompt = """FACTS: \n\n {documents} \n\n STUDENT ANSWER: {generation}.
Return JSON with two two keys, binary_score is 'yes' or 'no' score to indicate whether the STUDENT ANSWER is grounded in the FACTS. And a key, explanation, that contains an explanation of the score."""
# Test using documents and generation from above
hallucination_grader_prompt_formatted = hallucination_grader_prompt.format(
documents=docs_txt, generation=generation.content
)
result = llm_json_mode.invoke(
[SystemMessage(content=hallucination_grader_instructions)]
+ [HumanMessage(content=hallucination_grader_prompt_formatted)]
)
json.loads(result.content)
{'binary_score': 'yes',
'explanation': 'The student answer provides a clear and accurate description of Chain of Thought (CoT) prompting, its components, and its goals. It also mentions various techniques used in CoT prompting, such as external search queries, prompt tuning, and automatic prompt engineering. The answer demonstrates an understanding of the concept and its applications in natural language processing.'}
### Answer Grader
# Answer grader instructions
answer_grader_instructions = """You are a teacher grading a quiz.
You will be given a QUESTION and a STUDENT ANSWER.
Here is the grade criteria to follow:
(1) The STUDENT ANSWER helps to answer the QUESTION
Score:
A score of yes means that the student's answer meets all of the criteria. This is the highest (best) score.
The student can receive a score of yes if the answer contains extra information that is not explicitly asked for in the question.
A score of no means that the student's answer does not meet all of the criteria. This is the lowest possible score you can give.
Explain your reasoning in a step-by-step manner to ensure your reasoning and conclusion are correct.
Avoid simply stating the correct answer at the outset."""
# Grader prompt
answer_grader_prompt = """QUESTION: \n\n {question} \n\n STUDENT ANSWER: {generation}.
Return JSON with two two keys, binary_score is 'yes' or 'no' score to indicate whether the STUDENT ANSWER meets the criteria. And a key, explanation, that contains an explanation of the score."""
# Test
question = "What are the vision models released today as part of Llama 3.2?"
answer = "The Llama 3.2 models released today include two vision models: Llama 3.2 11B Vision Instruct and Llama 3.2 90B Vision Instruct, which are available on Azure AI Model Catalog via managed compute. These models are part of Meta's first foray into multimodal AI and rival closed models like Anthropic's Claude 3 Haiku and OpenAI's GPT-4o mini in visual reasoning. They replace the older text-only Llama 3.1 models."
# Test using question and generation from above
answer_grader_prompt_formatted = answer_grader_prompt.format(
question=question, generation=answer
)
result = llm_json_mode.invoke(
[SystemMessage(content=answer_grader_instructions)]
+ [HumanMessage(content=answer_grader_prompt_formatted)]
)
json.loads(result.content)
{'binary_score': 'yes',
'explanation': "The student's answer helps to answer the question by providing specific details about the vision models released as part of Llama 3.2. The answer mentions two vision models (Llama 3.2 11B Vision Instruct and Llama 3.2 90B Vision Instruct) and their availability on Azure AI Model Catalog via managed compute. Additionally, the student provides context about Meta's first foray into multimodal AI and compares these models to other visual reasoning models like Claude 3 Haiku and GPT-4o mini. This extra information is not explicitly asked for in the question, but it demonstrates a thorough understanding of the topic. The answer also correctly states that these models replace the older text-only Llama 3.1 models, which meets all the criteria specified in the question."}
Web Search Tool¶
### Search
from langchain_community.tools.tavily_search import TavilySearchResults
web_search_tool = TavilySearchResults(k=3)
API Reference: TavilySearchResults
Graph¶
We build the above workflow as a graph using LangGraph.
Graph state¶
The graph state
schema contains keys that we want to:
- Pass to each node in our graph
- Optionally, modify in each node of our graph
See conceptual docs here.
import operator
from typing_extensions import TypedDict
from typing import List, Annotated
class GraphState(TypedDict):
"""
Graph state is a dictionary that contains information we want to propagate to, and modify in, each graph node.
"""
question: str # User question
generation: str # LLM generation
web_search: str # Binary decision to run web search
max_retries: int # Max number of retries for answer generation
answers: int # Number of answers generated
loop_step: Annotated[int, operator.add]
documents: List[str] # List of retrieved documents
Each node in our graph is simply a function that:
(1) Take state
as an input
(2) Modifies state
(3) Write the modified state
to the state schema (dict)
See conceptual docs here.
Each edge routes between nodes in the graph.
See conceptual docs here.
from langchain.schema import Document
from langgraph.graph import END
### Nodes
def retrieve(state):
"""
Retrieve documents from vectorstore
Args:
state (dict): The current graph state
Returns:
state (dict): New key added to state, documents, that contains retrieved documents
"""
print("---RETRIEVE---")
question = state["question"]
# Write retrieved documents to documents key in state
documents = retriever.invoke(question)
return {"documents": documents}
def generate(state):
"""
Generate answer using RAG on retrieved documents
Args:
state (dict): The current graph state
Returns:
state (dict): New key added to state, generation, that contains LLM generation
"""
print("---GENERATE---")
question = state["question"]
documents = state["documents"]
loop_step = state.get("loop_step", 0)
# RAG generation
docs_txt = format_docs(documents)
rag_prompt_formatted = rag_prompt.format(context=docs_txt, question=question)
generation = llm.invoke([HumanMessage(content=rag_prompt_formatted)])
return {"generation": generation, "loop_step": loop_step + 1}
def grade_documents(state):
"""
Determines whether the retrieved documents are relevant to the question
If any document is not relevant, we will set a flag to run web search
Args:
state (dict): The current graph state
Returns:
state (dict): Filtered out irrelevant documents and updated web_search state
"""
print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
question = state["question"]
documents = state["documents"]
# Score each doc
filtered_docs = []
web_search = "No"
for d in documents:
doc_grader_prompt_formatted = doc_grader_prompt.format(
document=d.page_content, question=question
)
result = llm_json_mode.invoke(
[SystemMessage(content=doc_grader_instructions)]
+ [HumanMessage(content=doc_grader_prompt_formatted)]
)
grade = json.loads(result.content)["binary_score"]
# Document relevant
if grade.lower() == "yes":
print("---GRADE: DOCUMENT RELEVANT---")
filtered_docs.append(d)
# Document not relevant
else:
print("---GRADE: DOCUMENT NOT RELEVANT---")
# We do not include the document in filtered_docs
# We set a flag to indicate that we want to run web search
web_search = "Yes"
continue
return {"documents": filtered_docs, "web_search": web_search}
def web_search(state):
"""
Web search based based on the question
Args:
state (dict): The current graph state
Returns:
state (dict): Appended web results to documents
"""
print("---WEB SEARCH---")
question = state["question"]
documents = state.get("documents", [])
# Web search
docs = web_search_tool.invoke({"query": question})
web_results = "\n".join([d["content"] for d in docs])
web_results = Document(page_content=web_results)
documents.append(web_results)
return {"documents": documents}
### Edges
def route_question(state):
"""
Route question to web search or RAG
Args:
state (dict): The current graph state
Returns:
str: Next node to call
"""
print("---ROUTE QUESTION---")
route_question = llm_json_mode.invoke(
[SystemMessage(content=router_instructions)]
+ [HumanMessage(content=state["question"])]
)
source = json.loads(route_question.content)["datasource"]
if source == "websearch":
print("---ROUTE QUESTION TO WEB SEARCH---")
return "websearch"
elif source == "vectorstore":
print("---ROUTE QUESTION TO RAG---")
return "vectorstore"
def decide_to_generate(state):
"""
Determines whether to generate an answer, or add web search
Args:
state (dict): The current graph state
Returns:
str: Binary decision for next node to call
"""
print("---ASSESS GRADED DOCUMENTS---")
question = state["question"]
web_search = state["web_search"]
filtered_documents = state["documents"]
if web_search == "Yes":
# All documents have been filtered check_relevance
# We will re-generate a new query
print(
"---DECISION: NOT ALL DOCUMENTS ARE RELEVANT TO QUESTION, INCLUDE WEB SEARCH---"
)
return "websearch"
else:
# We have relevant documents, so generate answer
print("---DECISION: GENERATE---")
return "generate"
def grade_generation_v_documents_and_question(state):
"""
Determines whether the generation is grounded in the document and answers question
Args:
state (dict): The current graph state
Returns:
str: Decision for next node to call
"""
print("---CHECK HALLUCINATIONS---")
question = state["question"]
documents = state["documents"]
generation = state["generation"]
max_retries = state.get("max_retries", 3) # Default to 3 if not provided
hallucination_grader_prompt_formatted = hallucination_grader_prompt.format(
documents=format_docs(documents), generation=generation.content
)
result = llm_json_mode.invoke(
[SystemMessage(content=hallucination_grader_instructions)]
+ [HumanMessage(content=hallucination_grader_prompt_formatted)]
)
grade = json.loads(result.content)["binary_score"]
# Check hallucination
if grade == "yes":
print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
# Check question-answering
print("---GRADE GENERATION vs QUESTION---")
# Test using question and generation from above
answer_grader_prompt_formatted = answer_grader_prompt.format(
question=question, generation=generation.content
)
result = llm_json_mode.invoke(
[SystemMessage(content=answer_grader_instructions)]
+ [HumanMessage(content=answer_grader_prompt_formatted)]
)
grade = json.loads(result.content)["binary_score"]
if grade == "yes":
print("---DECISION: GENERATION ADDRESSES QUESTION---")
return "useful"
elif state["loop_step"] <= max_retries:
print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
return "not useful"
else:
print("---DECISION: MAX RETRIES REACHED---")
return "max retries"
elif state["loop_step"] <= max_retries:
print("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
return "not supported"
else:
print("---DECISION: MAX RETRIES REACHED---")
return "max retries"
Control Flow¶
from langgraph.graph import StateGraph
from IPython.display import Image, display
workflow = StateGraph(GraphState)
# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generate
# Build graph
workflow.set_conditional_entry_point(
route_question,
{
"websearch": "websearch",
"vectorstore": "retrieve",
},
)
workflow.add_edge("websearch", "generate")
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
"grade_documents",
decide_to_generate,
{
"websearch": "websearch",
"generate": "generate",
},
)
workflow.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": END,
"not useful": "websearch",
"max retries": END,
},
)
# Compile
graph = workflow.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
API Reference: StateGraph
inputs = {"question": "What are the types of agent memory?", "max_retries": 3}
for event in graph.stream(inputs, stream_mode="values"):
print(event)
Trace:
https://smith.langchain.com/public/1e01baea-53e9-4341-a6d1-b1614a800a97/r
# Test on current events
inputs = {
"question": "What are the models released today for llama3.2?",
"max_retries": 3,
}
for event in graph.stream(inputs, stream_mode="values"):
print(event)
Trace:
https://smith.langchain.com/public/acdfa49d-aa11-48fb-9d9c-13a687ff311f/r