Commit 0b88f5f4 authored by BRellu

semantic search

parent 33089a85
......@@ -3,4 +3,4 @@ ASTRA_DB_API_ENDPOINT="https://0c52512b-0c65-4e70-ac47-71edf5244a82-us-east-2.ap
ASTRA_DB_APPLICATION_TOKEN="AstraCS:EvXpFFafufegdQJvhqlYxmxt:ef86b5996013b12140b69254bd554d7e8e10eb5a7137859b9c432f92a5a3b65c"
ASTRA_DB_NAMESPACE="default_keyspace"
HF_TOKEN="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"
GROQ_API_KEY="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"
GROQ_API_KEY="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"
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from typing import Dict, Optional
from reviewsense.api.schemas.UserMessage import UserMessage
from reviewsense.api.schemas.product_review_input import ProductReviewInput
from reviewsense.services.BotService import BotService
from reviewsense.services.ReviewService import ReviewService
......@@ -33,3 +36,18 @@ async def calculate_ratings(input_data: ProductReviewInput):
# Generate and return feature ratings
return review_service.generate_feature_ratings(reviews_by_feature)
@router.post("/chat")
async def chat(user_message: UserMessage):
"""
Endpoint to handle user messages and return chatbot responses.
"""
if not user_message.message:
raise HTTPException(status_code=400, detail="Message cannot be empty")
    bot_service = BotService()

    # Delegate the query to the bot service and return its response
    response = bot_service.handle_user_query(user_message.message)
    return {"response": response}
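For reference, a minimal client call against the new endpoint might look like the sketch below; the host, the 8007 port (taken from the uvicorn change further down), the unprefixed /chat path, and the `requests` dependency are assumptions, not part of this commit.

# Hypothetical client call for the /chat endpoint (illustrative only).
import requests

resp = requests.post(
    "http://localhost:8007/chat",
    json={"message": "suggest me a mobile with okay battery backup"},
)
resp.raise_for_status()
print(resp.json()["response"])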
from pydantic import BaseModel, Field
from typing import List, Optional
class UserMessage(BaseModel):
message: str = "suggest me a mobile with okay battery backup"
\ No newline at end of file
from pydantic_settings import BaseSettings
from functools import lru_cache
from pydantic.v1 import BaseSettings
class Settings(BaseSettings):
"""Application configuration settings"""
ASTRA_DB_API_ENDPOINT: str
ASTRA_DB_APPLICATION_TOKEN: str
ASTRA_DB_NAMESPACE: str
ASTRA_DB_API_ENDPOINT: str = "https://0c52512b-0c65-4e70-ac47-71edf5244a82-us-east-2.apps.astra.datastax.com"
ASTRA_DB_APPLICATION_TOKEN: str = "AstraCS:EvXpFFafufegdQJvhqlYxmxt:ef86b5996013b12140b69254bd554d7e8e10eb5a7137859b9c432f92a5a3b65c"
ASTRA_DB_NAMESPACE: str = "default_keyspace"
# Default settings
EMBEDDING_MODEL: str = "sentence-transformers/all-mpnet-base-v2"
......
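`get_settings` is imported from this config module elsewhere in the commit (see BotService and database.py below), but its definition is collapsed here; a minimal sketch of what the cached accessor presumably looks like, given the `lru_cache` import above:

# Sketch only: the actual definition is collapsed in this diff.
@lru_cache()
def get_settings() -> Settings:
    """Return a cached Settings instance so configuration is loaded once."""
    return Settings()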
# src/reviewsense/core/llm.py
from langchain_core.messages import BaseMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_groq import ChatGroq
from functools import lru_cache
import logging

logger = logging.getLogger(__name__)
@lru_cache()
def get_llm():
"""Get singleton LLM instance"""
return ChatGroq(
model_name="llama-3.3-70b-versatile",
temperature=1
)
\ No newline at end of file
temperature=1,
groq_api_key="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"
)
def generate_response(
query: str,
context: str,
max_context_length: int = 2000
) -> str:
"""
Generate response using LLM for ecommerce product queries.
Args:
query: User query string
context: Context string containing product details and reviews
max_context_length: Maximum allowed length for context
Returns:
Generated response string
Raises:
ValueError: If input parameters are invalid
Exception: For other processing errors
"""
# Input validation
if not query or not query.strip():
raise ValueError("Query cannot be empty")
if not context or not context.strip():
raise ValueError("Context cannot be empty")
try:
# Preprocess context
context = context.strip()
if len(context) > max_context_length:
context = context[:max_context_length] + "..."
llm_template = """
Your task is to act as an ecommerce product expert assistant.
Analyze the provided product information and reviews carefully.
Guidelines:
- Provide accurate product-specific information based on the context
- Keep responses concise and focused on the query
- Include relevant product features and customer feedback
- If information is not in the context, acknowledge the limitation
- Maintain a helpful and professional tone
CONTEXT:
{context}
QUESTION: {input}
YOUR ANSWER:
"""
# Create and execute the chain
chain = ChatPromptTemplate.from_template(llm_template) | get_llm()
result = chain.invoke({
"context": context,
"input": query
})
# Extract response text based on message type
if isinstance(result, BaseMessage):
response = result.content
elif isinstance(result, str):
response = result
else:
response = str(result)
# Validate response
if not response or not response.strip():
raise ValueError("Empty response received from LLM")
return response.strip()
    except Exception as e:
        logger.error(f"Error generating LLM response: {e}")
        return "I apologize, but I encountered an error while processing your request. Please try again or contact support if the issue persists."
......@@ -32,4 +32,4 @@ app.add_middleware(
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=8000)
uvicorn.run(app, host='0.0.0.0', port=8007)
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple, Any
from langchain_astradb import AstraDBVectorStore
from logging import getLogger
from functools import lru_cache
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from reviewsense.core.llm import get_llm, generate_response
from reviewsense.services.retrieval import get_vector_store
logger = getLogger(__name__)
@dataclass
class ProductContext:
product_name: str
product_reviews: str
class BotServiceError(Exception):
"""Base exception for bot service errors"""
pass
class VectorStoreError(BotServiceError):
"""Exception for vector store related errors"""
pass
class ReviewProcessor:
@staticmethod
def clean_review(review: str) -> str:
"""Clean and format a single review"""
return review.strip() + ('.' if not review.strip().endswith('.') else '')
@staticmethod
def process_reviews(doc_content: str) -> List[str]:
"""Process document content into cleaned review sentences"""
return [
ReviewProcessor.clean_review(sentence)
for sentence in doc_content.split('.')
if sentence.strip()
]
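For clarity, the splitting behaviour of `ReviewProcessor` on a short document looks like this (the input text is illustrative):

# "Great screen. Battery drains fast" -> ["Great screen.", "Battery drains fast."]
sentences = ReviewProcessor.process_reviews("Great screen. Battery drains fast")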
class BotService:
def __init__(self, vector_store: Optional[AstraDBVectorStore] = None):
self.vector_store = vector_store or self._initialize_vector_store()
self.llm = get_llm()
@staticmethod
@lru_cache(maxsize=1)
def _initialize_vector_store() -> AstraDBVectorStore:
"""Initialize and cache vector store connection"""
try:
return get_vector_store()
except Exception as e:
logger.error(f"Failed to initialize vector store: {e}")
raise VectorStoreError(f"Vector store initialization failed: {e}")
def get_product_reviews(
self,
product_id: str,
query: str,
k: int = 1
) -> str:
"""
Retrieve and combine product reviews from the vector store.
Args:
product_id: Product identifier
query: Search query
k: Number of results to retrieve
Returns:
Combined review string
"""
try:
results = self.vector_store.similarity_search_with_score_id(
query=query,
k=k,
filter={"product_name": product_id}
)
all_reviews = []
for doc, _, _ in results:
reviews = ReviewProcessor.process_reviews(doc.page_content)
all_reviews.extend(reviews)
return ' '.join(all_reviews)
except Exception as e:
logger.error(f"Error retrieving product reviews: {e}")
raise VectorStoreError(f"Failed to retrieve reviews: {e}")
def retrieve_context(
self,
query: str,
search_query: str,
k: int = 1
) -> Optional[ProductContext]:
"""
Retrieve context from vector store.
Args:
query: User query
search_query: Search query for vector store
k: Number of results to retrieve
Returns:
ProductContext if found, None otherwise
"""
try:
            as_retriever = self.vector_store.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"k": 1, "score_threshold": 0.8},
            )
results = as_retriever.invoke(search_query)
if not results:
logger.info("No results found for query")
return None
document = results[0]
product_name = document.metadata.get('product_name')
if not product_name:
logger.warning("Product name missing in document metadata")
return None
return ProductContext(
product_name=product_name,
product_reviews=self.get_product_reviews(query=query, product_id=product_name)
)
except Exception as e:
logger.error(f"Error retrieving context: {e}")
raise VectorStoreError(f"Context retrieval failed: {e}")
def handle_user_query(self, query: str) -> str:
"""
Process user query end-to-end.
Args:
query: User query
Returns:
Generated response
"""
try:
context = self.retrieve_context(query=query, search_query=query)
if not context:
return "I'm sorry, I couldn't find relevant information."
formatted_context = f"""Product Information:
Name: {context.product_name}
Customer Reviews:
{context.product_reviews}"""
return generate_response(
query=query,
context=formatted_context
)
except BotServiceError as e:
logger.error(f"Error handling user query: {e}")
return "I apologize, but I encountered an error processing your request."
# src/reviewsense/core/database.py
from functools import lru_cache
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_astradb import AstraDBVectorStore
from reviewsense.core.config import get_settings
@lru_cache(maxsize=1)
def get_vector_store(
collection_name: str = "android_phone_reviews",
collection_name: str = "One",
embedding_model: str = "sentence-transformers/all-mpnet-base-v2"
):
"""
......@@ -22,13 +24,13 @@ def get_vector_store(
settings = get_settings()
# Initialize embeddings
embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
embeddings = HuggingFaceInferenceAPIEmbeddings(api_key="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", model_name="BAAI/bge-base-en-v1.5")
# Create and return vector store
return AstraDBVectorStore(
collection_name=collection_name,
embedding=embeddings,
api_endpoint=settings.ASTRA_DB_API_ENDPOINT,
token=settings.ASTRA_DB_APPLICATION_TOKEN,
namespace=settings.ASTRA_DB_NAMESPACE,
api_endpoint="https://9706ee0b-b3e8-4ee2-bb26-45e8a0db1586-us-east-2.apps.astra.datastax.com",
token="AstraCS:OEbpcggZbUjOvRiesULZkTnf:9d215d8d32c2ffb09081212208e474f81a0bcdde45e7b683cd67f4a1a936a8bf",
namespace="default_keyspace",
)
\ No newline at end of file
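`BotService` filters on a `product_name` metadata key, so documents in this store are presumably indexed with that key; a hedged sketch of seeding one review through the standard LangChain `add_texts` method (the review text and product name are made up):

# Sketch only: seed a review so the product_name filter in BotService can match it.
store = get_vector_store()
store.add_texts(
    texts=["Battery easily lasts a full day. Display is sharp and bright."],
    metadatas=[{"product_name": "ExamplePhone X"}],
)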
......@@ -18,7 +18,7 @@ class ReviewFetcher:
for feature in features:
filter_criteria = {"title": product_id}
# documents = self.vector_store.similarity_search_with_score_id(query=self.generate_feature_query(feature), k=100, filter=filter_criteria)
documents = self.vector_store.similarity_search_with_score_id(query=feature, k=100, filter=filter_criteria)
documents = self.vector_store.similarity_search()
if len(documents) != 0:
filtered_reviews = [doc.page_content for doc, score, _ in documents if score > threshold]
if len(filtered_reviews) != 0 :
......