Smooth out document processing and response generation

This commit is contained in:
Madhu
2024-12-04 22:43:23 +05:30
parent c9ea9655bd
commit 5193d040f1

View File

@@ -47,13 +47,13 @@ def process_document(file_path: str) -> None:
try:
import time
start_time = time.time()
# Insert document into PostgreSQL database
insert_document(Path(file_path), config=my_config)
processing_time = time.time() - start_time
logger.info(f"Document processed and embedded in {processing_time:.2f} seconds")
except Exception as e:
logger.error(f"Error processing document: {str(e)}")
raise
@@ -65,15 +65,15 @@ def perform_search(query: str) -> List[dict]:
# First try hybrid search in the database
chunk_ids, scores = hybrid_search(query, num_results=10, config=my_config)
logger.debug(f"Found {len(chunk_ids)} chunks with scores: {scores}")
if not chunk_ids:
logger.info("No relevant chunks found in database")
return []
# Retrieve and rerank chunks
chunks = retrieve_chunks(chunk_ids, config=my_config)
reranked_chunks = rerank_chunks(query, chunks, config=my_config)
return reranked_chunks
except Exception as e:
logger.error(f"Search error: {str(e)}")
@@ -83,15 +83,13 @@ def perform_search(query: str) -> List[dict]:
async def handle_settings_update(settings: dict):
"""Handle settings updates when user submits the form."""
try:
# Validate API keys
def validate_key(key: str, key_type: str, valid_prefixes: tuple) -> bool:
if not key:
raise ValueError(f"{key_type} API key is required")
if valid_prefixes and not any(key.startswith(prefix) for prefix in valid_prefixes):
raise ValueError(f"Invalid {key_type} API key format")
return True
# Validate DB URL
# Validate DB URL
def validate_db_url(url: str) -> bool:
valid_prefixes = ('postgresql://', 'mysql://', 'sqlite:///')
if not url:
@@ -100,12 +98,10 @@ async def handle_settings_update(settings: dict):
raise ValueError("Invalid database URL format")
return True
# Validate all inputs
validate_key(settings["OpenAIApiKey"], "OpenAI", ("sk-", "sk-proj-"))
validate_key(settings["AnthropicApiKey"], "Anthropic", ("sk-ant-",))
validate_key(settings["CohereApiKey"], "Cohere", tuple())
validate_db_url(settings["DBUrl"])
# Store validated values in user_env
user_env = {
"OPENAI_API_KEY": settings["OpenAIApiKey"],
@@ -114,23 +110,65 @@ async def handle_settings_update(settings: dict):
"DB_URL": settings["DBUrl"]
}
# Store in user session
cl.user_session.set("env", user_env)
# Initialize RAGLite config
global my_config
my_config = initialize_config(user_env)
cl.user_session.set("env", user_env)
await cl.Message(content="✅ Successfully configured with your API keys!").send()
# Automatically prompt for PDF upload
await cl.AskFileMessage(
# Ask for file upload with proper configuration
files = await cl.AskFileMessage(
content="Please upload one or more PDF documents to begin!",
accept=["application/pdf"],
max_size_mb=20,
timeout=300,
max_files=5
).send()
if files:
success = False
# Process uploaded files
for file in files:
logger.info(f"Starting to process file: {file.name}")
# Create new message for each file
await cl.Message(f"Processing {file.name}...").send()
try:
logger.info(f"Embedding document: {file.path}")
process_document(file_path=file.path)
success = True
await cl.Message(f"✅ Successfully processed: {file.name}").send()
logger.info(f"Successfully processed and embedded: {file.name}")
except Exception as proc_error:
error_msg = f"Failed to process {file.name}: {str(proc_error)}"
logger.error(error_msg)
await cl.Message(f"{error_msg}").send()
continue
if success:
# Send completion message
await cl.Message(
content="✅ Documents are ready! You can now ask questions about them."
).send()
# Store session state
cl.user_session.set("documents_loaded", True)
# Reset the chat interface
await cl.Message(content="Ask your first question:").send()
# Clear any existing message elements
cl.user_session.set("message_elements", [])
else:
await cl.Message(
content="❌ No documents were successfully processed. Please try uploading again."
).send()
except Exception as e:
error_msg = f"❌ Error with provided settings: {str(e)}"
logger.error(error_msg)
@@ -138,39 +176,38 @@ async def handle_settings_update(settings: dict):
# Chainlit chat-start hook. Resets the session's "chat_history" to an empty
# list and sends a ChatSettings form with four text inputs: OpenAI, Anthropic
# and Cohere API keys plus a database URL. The input ids used here are the
# same keys that handle_settings_update reads from its settings dict.
# NOTE(review): this span comes from a rendered diff with indentation
# stripped. The two ChatSettings statements below build the same four fields
# and are presumably the removed and added versions of a single call, not two
# separate sends -- confirm against the actual file.
@cl.on_chat_start
async def start() -> None:
"""Initialize chat and request API keys."""
try:
logger.info("Chat session started")
# Start every session with an empty history; message handling appends
# (query, response) tuples to this list.
cl.user_session.set("chat_history", [])
# Just show the settings form
await cl.ChatSettings(
[
TextInput(
id="OpenAIApiKey",
label="OpenAI API Key",
initial="",
placeholder="Enter your OpenAI API Key (starts with 'sk-')"
),
TextInput(
id="AnthropicApiKey",
label="Anthropic API Key",
initial="",
placeholder="Enter your Anthropic API Key (starts with 'sk-ant-')"
),
TextInput(
id="CohereApiKey",
label="Cohere API Key",
initial="",
placeholder="Enter your Cohere API Key"
),
TextInput(
id="DBUrl",
label="Database URL",
initial="",
placeholder="Enter your Database URL (e.g., postgresql://user:pass@host:port/db)"
),
]
).send()
# Show settings form first
await cl.ChatSettings([
TextInput(
id="OpenAIApiKey",
label="OpenAI API Key",
initial="",
placeholder="Enter your OpenAI API Key (starts with 'sk-')"
),
TextInput(
id="AnthropicApiKey",
label="Anthropic API Key",
initial="",
placeholder="Enter your Anthropic API Key (starts with 'sk-ant-')"
),
TextInput(
id="CohereApiKey",
label="Cohere API Key",
initial="",
placeholder="Enter your Cohere API Key"
),
TextInput(
id="DBUrl",
label="Database URL",
initial="",
placeholder="Enter your Database URL (e.g., postgresql://user:pass@host:port/db)"
),
]).send()
# In the lines visible here a failure is only logged: nothing is re-raised
# and no message is sent to the user.
# NOTE(review): the hunk ends right after this handler; confirm whether the
# real file does more in this except block.
except Exception as e:
logger.error(f"Error in chat start: {str(e)}")
@@ -178,67 +215,73 @@ async def start() -> None:
# Chainlit message hook. Answers the incoming message with rag() when
# perform_search() returns chunks, and delegates to handle_fallback() when the
# search result is empty or rag() raises.
# NOTE(review): this span comes from a rendered diff with indentation stripped
# and no +/- markers. Several statements appear twice in slightly different
# forms (two `msg` assignments, an `else:` directly followed by
# `if not reranked_chunks:`, two rag() sections, two error-reporting lines at
# the end). They are presumably removed/added pairs rather than code that all
# runs -- confirm against the actual file before editing.
@cl.on_message
async def message_handler(message: cl.Message) -> None:
"""Handle user queries using RAG."""
try:
msg = cl.Message(content="Thinking...")
# Check if documents are loaded
# The "documents_loaded" session flag is set by handle_settings_update once
# at least one uploaded file has been processed.
if not cl.user_session.get("documents_loaded"):
await cl.Message(content="❌ Please upload and process documents first!").send()
return
# my_config is not defined in this function; handle_settings_update
# declares it global and assigns it from initialize_config().
if not my_config:
await cl.Message(content="❌ Please configure your API keys first!").send()
return
# Create message for streaming
# The message is sent with empty content first so that later
# stream_token() calls have a message to append to.
msg = cl.Message(content="")
await msg.send()
query = message.content.strip()
chat_history = cl.user_session.get("chat_history", [])
# Search for relevant chunks using global config
logger.info(f"Processing query: {query}")
# Search for relevant chunks
# The result is only tested for emptiness below; the chunks themselves are
# not passed to rag(), which receives hybrid_search as its search callable.
reranked_chunks = perform_search(query)
if reranked_chunks:
logger.info("Using RAG for response generation")
try:
# Convert chat history to proper format for RAG
# Each history entry is a (user, assistant) tuple and becomes two role dicts.
formatted_messages = []
for user_msg, assistant_msg in chat_history:
formatted_messages.append({"role": "user", "content": user_msg})
formatted_messages.append({"role": "assistant", "content": assistant_msg})
response_stream = rag(
prompt=query,
system_prompt=RAG_SYSTEM_PROMPT,
search=hybrid_search,
messages=formatted_messages,
max_contexts=5,
config=my_config
)
# Accumulate the full text for the history while forwarding each chunk to
# the UI as it arrives.
full_response = ""
for chunk in response_stream:
full_response += chunk
await msg.stream_token(chunk)
await msg.send()
except Exception as e:
logger.error(f"RAG error: {str(e)}")
# If RAG fails, fall back to general Claude
await handle_fallback(query, msg)
return
else:
if not reranked_chunks:
logger.info("No relevant chunks found, falling back to general Claude response")
await handle_fallback(query, msg)
return
# Update chat history
chat_history.append((query, full_response))
cl.user_session.set("chat_history", chat_history)
# Use RAG for response generation
try:
chat_history = cl.user_session.get("chat_history", [])
formatted_messages = []
for user_msg, assistant_msg in chat_history:
formatted_messages.append({"role": "user", "content": user_msg})
formatted_messages.append({"role": "assistant", "content": assistant_msg})
response_stream = rag(
prompt=query,
system_prompt=RAG_SYSTEM_PROMPT,
search=hybrid_search,
messages=formatted_messages,
max_contexts=5,
config=my_config
)
full_response = ""
for chunk in response_stream:
full_response += chunk
await msg.stream_token(chunk)
# NOTE(review): unlike the earlier rag section, nothing after this loop
# calls msg.send() or msg.update(); confirm whether Chainlit needs a
# finalizing call once streaming ends.
# Update chat history
chat_history.append((query, full_response))
cl.user_session.set("chat_history", chat_history)
except Exception as e:
logger.error(f"RAG error: {str(e)}")
await handle_fallback(query, msg)
# Outer handler: logs the error and reports the same text to the user.
# NOTE(review): one reporting line reuses `msg` and passes content= to send(),
# the other creates a fresh cl.Message. Confirm which one the file keeps and
# whether Message.send() accepts a content argument.
except Exception as e:
error_msg = f"Error processing your question: {str(e)}"
logger.error(error_msg)
await msg.send(content=error_msg) # Use send instead of update
await cl.Message(content=error_msg).send()
async def handle_fallback(query: str, msg: cl.Message) -> None:
"""Handle fallback to Claude when RAG is not available or fails."""
try:
user_env = cl.user_session.get("env")
client = anthropic.Anthropic(api_key=user_env["ANTHROPIC_API_KEY"])
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
@@ -246,19 +289,19 @@ async def handle_fallback(query: str, msg: cl.Message) -> None:
{"role": "user", "content": query}
]
)
full_response = response.content[0].text
await msg.send(content=full_response)
# Update chat history
chat_history = cl.user_session.get("chat_history", [])
chat_history.append((query, full_response))
cl.user_session.set("chat_history", chat_history)
except Exception as e:
error_msg = f"Fallback error: {str(e)}"
logger.error(error_msg)
await msg.send(content=error_msg)
# Script entry point: calls cl.run() when the module is executed directly.
# NOTE(review): the two identical cl.run() lines are presumably the removed
# and added sides of an end-of-file change in the rendered diff, not two
# calls -- confirm. Also confirm that chainlit exposes run(); apps are
# normally started with the `chainlit run` command.
if __name__ == "__main__":
cl.run()
cl.run()