NO valves to update #2857

Closed
opened 2025-11-11 15:15:53 -06:00 by GiteaMirror · 0 comments
Owner

Originally created by @tanishachoudhary on GitHub (Nov 28, 2024).

I am integrating my custom rag via pipeline to openwebui.
Connection is successfully verified and pipeline url is: http://host.docker.internal:9099
but when I am uploading my python file it is showing valves to update, and pipelined not detected.
here is my python code:
from typing import List, Union, Generator, Iterator
import os
import asyncio
from pydantic import BaseModel

class Pipeline:
class Valves(BaseModel):
LLAMAINDEX_OLLAMA_BASE_URL: str
LLAMAINDEX_MODEL_NAME: str
LLAMAINDEX_EMBEDDING_MODEL_NAME: str

def __init__(self):
    self.documents = None
    self.index = None
    self.uploaded_files = []
    self.last_company_name = None

    # Initialize valves with environment variables or defaults
    self.valves = self.Valves(
        LLAMAINDEX_OLLAMA_BASE_URL=os.getenv("LLAMAINDEX_OLLAMA_BASE_URL", "http://localhost:11434"),
        LLAMAINDEX_MODEL_NAME=os.getenv("LLAMAINDEX_MODEL_NAME", "llama3.1:8b"),
        LLAMAINDEX_EMBEDDING_MODEL_NAME=os.getenv("LLAMAINDEX_EMBEDDING_MODEL_NAME", "nomic-embed-text"),
    )

def extract_company_name_from_file_path(self, file_path: str) -> str:
    """
    Extract company name from file path.
    Assumes the first directory in the path is the company name.
    """
    # Remove the base data directory and split the path
    relative_path = os.path.relpath(file_path, "/app/backend/data")
    path_parts = relative_path.split(os.path.sep)
    
    # Return the first directory as company name, or 'default'
    return path_parts[0].lower() if path_parts and len(path_parts) > 1 else 'default'

def extract_company_name_from_query(self, query: str) -> str:
    """Extract company name from query based on uploaded files."""
    if not self.uploaded_files:
        return None
        
    query_lower = query.lower()
    
    for file_info in self.uploaded_files:
        company_name = file_info['company'].lower()
        if company_name in query_lower:
            self.last_company_name = company_name
            return company_name
    
    return self.last_company_name

def create_metadata_filter(self, company_name: str):
    """Create a metadata filter function for the specified company."""
    def filter_fn(node):
        return node.metadata.get('company') == company_name
    return filter_fn

async def on_startup(self):
    """Initialize the pipeline with local directory data and embeddings."""
    from llama_index.embeddings.ollama import OllamaEmbedding
    from llama_index.llms.ollama import Ollama
    from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader, Document
    from llama_index.core.node_parser import RecursiveCharacterTextSplitter

    # Configure LlamaIndex settings
    Settings.embed_model = OllamaEmbedding(
        model_name=self.valves.LLAMAINDEX_EMBEDDING_MODEL_NAME,
        base_url=self.valves.LLAMAINDEX_OLLAMA_BASE_URL,
    )
    Settings.llm = Ollama(
        model=self.valves.LLAMAINDEX_MODEL_NAME,
        base_url=self.valves.LLAMAINDEX_OLLAMA_BASE_URL,
    )
    
    # Define the data directory path
    data_directory = "/app/backend/data"
    
    try:
        # Load documents from the local directory
        reader = SimpleDirectoryReader(data_directory, recursive=True)
        raw_docs = reader.load_data()
        
        # Process documents with enhanced metadata
        processed_docs = []
        for doc in raw_docs:
            # Extract company name from file path
            file_path = doc.metadata.get('file_path', '')
            company_name = self.extract_company_name_from_file_path(file_path)
            
            # Create new document with enhanced metadata
            new_doc = Document(
                text=doc.text,
                metadata={
                    **doc.metadata,
                    'company': company_name,
                    'source_file': file_path
                }
            )
            processed_docs.append(new_doc)
            
            # Track uploaded files
            self.uploaded_files.append({
                'name': file_path,
                'company': company_name
            })
        
        # Create index from processed documents
        self.documents = processed_docs
        self.index = VectorStoreIndex.from_documents(processed_docs)
        
        if self.uploaded_files:
            self.last_company_name = self.uploaded_files[0]['company']
            
    except Exception as e:
        print(f"Error loading documents: {e}")
        raise

def get_available_companies(self) -> List[str]:
    """Get list of available companies from documents."""
    return list(set(file['company'] for file in self.uploaded_files))

def pipe(
    self,
    user_message: str,
    model_id: str,
    messages: List[dict],
    body: dict
) -> Union[str, Generator, Iterator]:
    """Process query with company-specific metadata filtering."""
    from llama_index.core.retrievers import VectorIndexRetriever
    from llama_index.core.query_engine import RetrieverQueryEngine
    from llama_index.core.postprocessor import MetadataReplacementPostProcessor

    if not self.index:
        return "Index not initialized"
        
    # Extract company name from query
    company_name = self.extract_company_name_from_query(user_message)
    
    # Create retriever 
    retriever = VectorIndexRetriever(
        index=self.index,
        similarity_top_k=10,
    )
    
    if company_name:
        # Add metadata filtering if company name is found
        retriever = VectorIndexRetriever(
            index=self.index,
            similarity_top_k=10,
            filters=self.create_metadata_filter(company_name)
        )
    
    # Create query engine with streaming
    query_engine = RetrieverQueryEngine.from_args(
        retriever=retriever,
        streaming=True,
        node_postprocessors=[
            MetadataReplacementPostProcessor(
                target_metadata_key="source_file"
            )
        ]
    )
    
    # Generate response
    response = query_engine.query(user_message)
    return response.response_gen

async def on_shutdown(self):
    """Cleanup operations on shutdown."""
    self.documents = None
    self.index = None
Originally created by @tanishachoudhary on GitHub (Nov 28, 2024). I am integrating my custom rag via pipeline to openwebui. Connection is successfully verified and pipeline url is: http://host.docker.internal:9099 but when I am uploading my python file it is showing valves to update, and pipelined not detected. here is my python code: from typing import List, Union, Generator, Iterator import os import asyncio from pydantic import BaseModel class Pipeline: class Valves(BaseModel): LLAMAINDEX_OLLAMA_BASE_URL: str LLAMAINDEX_MODEL_NAME: str LLAMAINDEX_EMBEDDING_MODEL_NAME: str def __init__(self): self.documents = None self.index = None self.uploaded_files = [] self.last_company_name = None # Initialize valves with environment variables or defaults self.valves = self.Valves( LLAMAINDEX_OLLAMA_BASE_URL=os.getenv("LLAMAINDEX_OLLAMA_BASE_URL", "http://localhost:11434"), LLAMAINDEX_MODEL_NAME=os.getenv("LLAMAINDEX_MODEL_NAME", "llama3.1:8b"), LLAMAINDEX_EMBEDDING_MODEL_NAME=os.getenv("LLAMAINDEX_EMBEDDING_MODEL_NAME", "nomic-embed-text"), ) def extract_company_name_from_file_path(self, file_path: str) -> str: """ Extract company name from file path. Assumes the first directory in the path is the company name. """ # Remove the base data directory and split the path relative_path = os.path.relpath(file_path, "/app/backend/data") path_parts = relative_path.split(os.path.sep) # Return the first directory as company name, or 'default' return path_parts[0].lower() if path_parts and len(path_parts) > 1 else 'default' def extract_company_name_from_query(self, query: str) -> str: """Extract company name from query based on uploaded files.""" if not self.uploaded_files: return None query_lower = query.lower() for file_info in self.uploaded_files: company_name = file_info['company'].lower() if company_name in query_lower: self.last_company_name = company_name return company_name return self.last_company_name def create_metadata_filter(self, company_name: str): """Create a metadata filter function for the specified company.""" def filter_fn(node): return node.metadata.get('company') == company_name return filter_fn async def on_startup(self): """Initialize the pipeline with local directory data and embeddings.""" from llama_index.embeddings.ollama import OllamaEmbedding from llama_index.llms.ollama import Ollama from llama_index.core import Settings, VectorStoreIndex, SimpleDirectoryReader, Document from llama_index.core.node_parser import RecursiveCharacterTextSplitter # Configure LlamaIndex settings Settings.embed_model = OllamaEmbedding( model_name=self.valves.LLAMAINDEX_EMBEDDING_MODEL_NAME, base_url=self.valves.LLAMAINDEX_OLLAMA_BASE_URL, ) Settings.llm = Ollama( model=self.valves.LLAMAINDEX_MODEL_NAME, base_url=self.valves.LLAMAINDEX_OLLAMA_BASE_URL, ) # Define the data directory path data_directory = "/app/backend/data" try: # Load documents from the local directory reader = SimpleDirectoryReader(data_directory, recursive=True) raw_docs = reader.load_data() # Process documents with enhanced metadata processed_docs = [] for doc in raw_docs: # Extract company name from file path file_path = doc.metadata.get('file_path', '') company_name = self.extract_company_name_from_file_path(file_path) # Create new document with enhanced metadata new_doc = Document( text=doc.text, metadata={ **doc.metadata, 'company': company_name, 'source_file': file_path } ) processed_docs.append(new_doc) # Track uploaded files self.uploaded_files.append({ 'name': file_path, 'company': company_name }) # Create index from processed documents self.documents = processed_docs self.index = VectorStoreIndex.from_documents(processed_docs) if self.uploaded_files: self.last_company_name = self.uploaded_files[0]['company'] except Exception as e: print(f"Error loading documents: {e}") raise def get_available_companies(self) -> List[str]: """Get list of available companies from documents.""" return list(set(file['company'] for file in self.uploaded_files)) def pipe( self, user_message: str, model_id: str, messages: List[dict], body: dict ) -> Union[str, Generator, Iterator]: """Process query with company-specific metadata filtering.""" from llama_index.core.retrievers import VectorIndexRetriever from llama_index.core.query_engine import RetrieverQueryEngine from llama_index.core.postprocessor import MetadataReplacementPostProcessor if not self.index: return "Index not initialized" # Extract company name from query company_name = self.extract_company_name_from_query(user_message) # Create retriever retriever = VectorIndexRetriever( index=self.index, similarity_top_k=10, ) if company_name: # Add metadata filtering if company name is found retriever = VectorIndexRetriever( index=self.index, similarity_top_k=10, filters=self.create_metadata_filter(company_name) ) # Create query engine with streaming query_engine = RetrieverQueryEngine.from_args( retriever=retriever, streaming=True, node_postprocessors=[ MetadataReplacementPostProcessor( target_metadata_key="source_file" ) ] ) # Generate response response = query_engine.query(user_message) return response.response_gen async def on_shutdown(self): """Cleanup operations on shutdown.""" self.documents = None self.index = None
Sign in to join this conversation.
1 Participants
Notifications
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: github-starred/open-webui#2857