Commit a9becc62 authored by BRellu's avatar BRellu

reviewsense - init

parents
# SECURITY(review): live credentials are committed to version control below —
# rotate every one of these tokens and keep this file untracked (.gitignore).
ASTRA_DB_API_ENDPOINT="https://0c52512b-0c65-4e70-ac47-71edf5244a82-us-east-2.apps.astra.datastax.com"
ASTRA_DB_APPLICATION_TOKEN="AstraCS:EvXpFFafufegdQJvhqlYxmxt:ef86b5996013b12140b69254bd554d7e8e10eb5a7137859b9c432f92a5a3b65c"
ASTRA_DB_NAMESPACE="default_keyspace"
HF_TOKEN="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"
GROQ_API_KEY="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"
# Default ignored files
/shelf/
/workspace.xml
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Poetry (reviewsense-ecom)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Poetry (reviewsense-ecom)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/reviewsense-ecom.iml" filepath="$PROJECT_DIR$/.idea/reviewsense-ecom.iml" />
</modules>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Poetry (reviewsense-ecom)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
[project]
name = "reviewsense-ecom"
version = "0.1.0"
description = ""
authors = [
{name = "BRellu",email = "brellu@nisum.com"}
]
readme = "README.md"
requires-python = ">=3.12,<4.0"
dependencies = [
"fastapi (>=0.115.8,<0.116.0)",
"uvicorn (>=0.34.0,<0.35.0)",
"langchain-community (>=0.3.18,<0.4.0)",
"langchain-core (>=0.3.37,<0.4.0)",
"langchain-astradb (>=0.5.3,<0.6.0)",
"python-dotenv (>=1.0.1,<2.0.0)",
"transformers (>=4.49.0,<5.0.0)",
"numpy (>=1.26.0,<2.0.0)",
"pydantic (>=2.10.6,<3.0.0)",
"langchain-groq (>=0.2.4,<0.3.0)"
]
[tool.poetry]
package-mode = false
packages = [{include = "reviewsense_ecom", from = "src"}]
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
from langchain_groq import ChatGroq
from functools import lru_cache
from pydantic import SecretStr
from threading import Lock
class ChatGroqFactory:
    """
    Factory class for creating and managing a singleton ChatGroq instance.
    Utilizes LRU caching for efficiency and ensures thread safety.
    """
    # Guards ChatGroq construction on a cache miss.  NOTE(review): lru_cache
    # itself is not locked, so two threads that miss simultaneously may each
    # build an instance (only one gets cached) — confirm this is acceptable.
    _lock: Lock = Lock()

    @classmethod
    @lru_cache(maxsize=1)  # applied under @classmethod, so `cls` is part of the cache key
    def get_llm(cls, groq_token: SecretStr) -> ChatGroq:
        """
        Returns a cached singleton ChatGroq instance.
        Args:
            groq_token (SecretStr): The Groq API token.
        Returns:
            ChatGroq: A singleton instance of the ChatGroq LLM.
        """
        with cls._lock:
            return ChatGroq(
                model_name="llama-3.3-70b-versatile",
                temperature=1,  # NOTE(review): high temperature for an extraction task — confirm intended
                groq_api_key=groq_token
            )

    @classmethod
    def clear_cache(cls) -> None:
        """
        Clears the cached ChatGroq instance.
        """
        # Bound-method attribute access delegates to the wrapped function, so
        # cache_clear() reaches the lru_cache wrapper.
        cls.get_llm.cache_clear()
from fastapi import FastAPI
from router.routes import router
from fastapi.middleware.cors import CORSMiddleware
def create_app() -> FastAPI:
    """Build the ReviewSense FastAPI application.

    Returns:
        FastAPI: Application instance with routes and CORS registered.
    """
    application = FastAPI(
        title="ReviewSense",
        description="AI-powered Product Review Analysis Platform",
    )

    # Register the API routes defined in router.routes
    application.include_router(router)

    # Allow the local frontend origin to call the API from the browser
    application.add_middleware(
        CORSMiddleware,
        allow_origins=["http://localhost:3000"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    return application
# Initialize FastAPI app at import time so ASGI servers (uvicorn, gunicorn)
# can load it as `<module>:app`.
app = create_app()

if __name__ == '__main__':
    # Development entry point: loopback only, port 8003.
    import uvicorn
    uvicorn.run(app, host='127.0.0.1', port=8003)
\ No newline at end of file
from dataclasses import dataclass
@dataclass
class FeatureRating:
    """Aggregated rating statistics for a single product feature.

    Built from MongoDB documents via Product.from_dict (FeatureRating(**v))
    and serialized back through Product.to_dict.
    """
    average_rating: float = 0.0  # presumably the mean rating for this feature — confirm against writer
    review_count: int = 0        # number of reviews mentioning the feature
    positive_count: int = 0      # reviews judged positive for this feature
    negative_count: int = 0      # reviews judged negative for this feature
\ No newline at end of file
from pydantic import BaseModel
from typing import List
class FeatureReviews(BaseModel):
    """Validated shape of the LLM's extraction response.

    Each dict is expected to carry "feature", "sentence" and "sentiment"
    keys (see the extraction prompt in ReviewAnalysisService).
    """
    # TODO(review): model the entries as a typed class instead of raw dicts.
    feature_reviews: List[dict]
from pydantic import BaseModel
class FeatureSentiment(BaseModel):
    """Sentiment analysis result for one product feature within a review."""
    feature_sentence: str   # sentence(s) from the review that mention the feature
    feature_sentiment: str  # "positive" / "negative" / "neutral" (typo `feature_sentment` fixed earlier)
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Dict
from src.reviewsense_ecom.model.FeatureRating import FeatureRating
@dataclass
class Product:
    """Aggregated rating document for a product, as stored in MongoDB."""
    product_id: str
    category: str
    features: Dict[str, FeatureRating]    # feature name -> aggregated stats
    ratings_distribution: Dict[str, int]  # star value -> count; presumably keys "1".."5" — confirm
    overall_rating: float = 0.0
    total_reviews: int = 0
    # Timezone-aware UTC timestamps (datetime.utcnow is naive and deprecated).
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    @classmethod
    def from_dict(cls, data):
        """Convert a dictionary from MongoDB to a Product instance.

        Accepts timestamps as Mongo extended JSON ({"$date": iso}), bare ISO
        strings, or datetime objects (pymongo decodes BSON dates to datetime).
        """

        def parse_datetime(value):
            """Return an aware datetime parsed from *value*, or None."""
            if isinstance(value, dict) and "$date" in value:
                value = value["$date"]
            if isinstance(value, str):
                # Map a trailing "Z" to "+00:00" so the parsed datetime is
                # timezone-aware UTC.  (The old code stripped the "Z",
                # yielding naive datetimes inconsistent with the aware
                # default_factory values.)
                return datetime.fromisoformat(value.replace("Z", "+00:00"))
            if isinstance(value, datetime):
                # The old code returned None for already-decoded datetimes,
                # silently discarding pymongo's BSON date values.
                return value
            return None  # missing or unrecognized

        return cls(
            product_id=data.get("product_id", ""),
            category=data.get("category", ""),
            features={k: FeatureRating(**v) for k, v in data.get("features", {}).items()},
            ratings_distribution=data.get("ratings_distribution", {}),
            overall_rating=data.get("overall_rating", 0),
            total_reviews=data.get("total_reviews", 0),
            created_at=parse_datetime(data.get("created_at")),
            updated_at=parse_datetime(data.get("updated_at")),
        )

    @staticmethod
    def _to_mongo_date(value):
        """Serialize a datetime as {"$date": "...Z"} extended JSON, or None.

        Bug fix: the old code appended "Z" to isoformat() unconditionally,
        producing malformed values like "2024-...+00:00Z" for the class's own
        timezone-aware defaults.
        """
        if value is None:
            return None
        iso = value.isoformat()
        if iso.endswith("+00:00"):
            iso = iso[:-6] + "Z"
        elif not iso.endswith("Z"):
            iso += "Z"  # naive datetimes are assumed to be UTC
        return {"$date": iso}

    def to_dict(self):
        """Convert the data class to a dictionary for MongoDB insertion."""
        return {
            "product_id": self.product_id,
            "category": self.category,
            "features": {k: v.__dict__ if hasattr(v, '__dict__') else v for k, v in self.features.items()},
            "ratings_distribution": self.ratings_distribution,
            "overall_rating": self.overall_rating,
            "total_reviews": self.total_reviews,
            "created_at": self._to_mongo_date(self.created_at),
            "updated_at": self._to_mongo_date(self.updated_at),
        }
\ No newline at end of file
from pydantic import BaseModel
from typing import Dict
from datetime import date, datetime
from src.reviewsense_ecom.model.FeatureSentiment import FeatureSentiment
class ProductReview(BaseModel):
    """A customer review plus its per-feature sentiment breakdown."""
    product_id: str
    review_id: str
    review: str             # raw review text
    product_rating: float   # the reviewer's overall numeric rating
    review_date: datetime
    # Per-feature sentiment; keys are feature names (see FeatureSentiment).
    features_analysis: Dict[str, FeatureSentiment]
from pymongo import MongoClient
from threading import Lock
from functools import lru_cache
class MongoClientFactory:
    """
    Factory class for creating and managing a MongoDB client instance.
    Implements the Singleton pattern with LRU caching to ensure a single
    shared client, and tracks the live client so it can actually be closed.
    """
    _lock: Lock = Lock()
    # The live client built by get_client().  Tracked separately because the
    # lru_cache key (the URI) is not known at close time.
    _client = None

    @classmethod
    @lru_cache(maxsize=1)
    def get_client(cls, uri: str = "mongodb://localhost:27017/") -> MongoClient:
        """
        Returns a cached singleton MongoClient instance.

        Args:
            uri (str): MongoDB connection URI. Defaults to localhost.

        Returns:
            MongoClient: The MongoDB client instance.
        """
        with cls._lock:
            client = MongoClient(uri)
            cls._client = client
            return client

    @classmethod
    def close_client(cls) -> None:
        """
        Closes the MongoDB client connection (if one exists) and clears the cache.

        Bug fix: the old implementation gated on cache_info().hits, which is 0
        until the *second* get_client() call, so the first (and typically only)
        client was never closed; worse, its `and cls.get_client()` fallback
        could construct a brand-new client just to close it.
        """
        with cls._lock:
            if cls._client is not None:
                cls._client.close()
                cls._client = None
        cls.get_client.cache_clear()
from fastapi import APIRouter, HTTPException
from typing import Dict, Optional, List
from pyexpat import features
from pydantic import BaseModel
import logging
from src.reviewsense_ecom.model.ProductReview import ProductReview
from src.reviewsense_ecom.model.Product import Product
from src.reviewsense_ecom.mongo.MongoClientFactory import MongoClientFactory
from src.reviewsense_ecom.service.ProductDefinitionService import ProductDefinitionService
from src.reviewsense_ecom.service.ReviewAnalysisService import ReviewAnalysisService
from src.reviewsense_ecom.service.StoreReviewAnalysis import add_review_features
# Module-level singletons shared by every request handled by this router.
router = APIRouter()
# review_service = ReviewService()
# feature_extractor = FeatureExtractor()
review_analysis_service = ReviewAnalysisService()  # LLM-backed feature/sentiment extraction
client = MongoClientFactory.get_client()  # shared singleton MongoDB client
product_definition_service = ProductDefinitionService(client)
logger = logging.getLogger(__name__)
class RatingSummary(BaseModel):
    """Aggregate rating statistics embedded in a ReviewResponse."""
    ratings_count: int
    average_rating: float
    rating_distribution: Dict[str, int]  # star value -> count; presumably keys "1".."5" — confirm
    # NOTE(review): despite the name, this is a list of strings (review texts
    # or ids?), not a count — confirm against the producer.
    total_reviews: List[str]
class ReviewResponse(BaseModel):
    """Response payload pairing per-feature ratings with an overall summary."""
    review_ratings: Dict[str, Optional[float]]  # feature -> rating; None presumably means "not rated" — confirm
    ratings_summary: RatingSummary
@router.post("/calculate_ratings/", response_model=Product)
async def calculate_ratings(input_data: ProductReview):
    """Analyze a review's per-feature sentiment and persist the result.

    Args:
        input_data: The incoming review to analyze.

    Raises:
        HTTPException: 404 when the product has no feature definition
            (get_product_categories returns None for unknown products).
    """
    categories = product_definition_service.get_product_categories(input_data.product_id)
    if not categories:
        # Bug fix: previously a None/empty feature list was passed straight
        # into the LLM analysis instead of being reported to the caller.
        raise HTTPException(
            status_code=404,
            detail=f"No feature definition found for product '{input_data.product_id}'",
        )
    feature_reviews_analysis = review_analysis_service.generate_feature_reviews_analysis(input_data.review, categories)
    add_review_features(input_data, feature_reviews_analysis)
    # NOTE(review): response_model=Product but nothing is returned — FastAPI
    # will fail to serialize None. TODO: return the updated Product document.
\ No newline at end of file
from pymongo.collection import Collection
from typing import Dict, List, Optional
from pymongo import MongoClient
class ProductDefinitionService:
    """
    Service class that reads a product's feature definition from the
    productDefinition collection.
    """
    def __init__(self, client: MongoClient, db_name: str = "productRater", collection_name: str = "productDefinition"):
        """
        Initializes the service with the MongoDB client.
        Args:
            client (MongoClient): MongoDB client instance.
            db_name (str): Name of the database.
            collection_name (str): Name of the collection.
        """
        self.collection: Collection = client[db_name][collection_name]

    def get_product_categories(self, product_id: str) -> Optional[List[str]]:
        """
        Fetches the 'feature_ratings' value for the given product.

        Args:
            product_id (str): Product identifier to look up.

        Returns:
            Optional[List[str]]: The document's 'feature_ratings' field, or
            None when no document matches (or the field is absent).
            NOTE(review): despite the method name, this reads
            'feature_ratings', not a 'category' field.
        """
        # Projection excludes _id and returns only the field we need.
        document = self.collection.find_one({
            'product_id': product_id
        }, {
            'feature_ratings': 1,
            '_id': 0
        })
        return document.get('feature_ratings') if document else None
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Dict
from pydantic import SecretStr
from src.reviewsense_ecom.llm.chatGroq import ChatGroqFactory
from src.reviewsense_ecom.model.FeatureReviews import FeatureReviews
import re
def _create_reviews_parser() -> JsonOutputParser:
    """Build the JSON parser used to decode the LLM's extraction response."""
    parser = JsonOutputParser()
    return parser
def _create_extraction_prompt() -> ChatPromptTemplate:
    """Create the prompt for extracting feature-specific sentences with sentiment.

    Bug fix: the example JSON in the original template was malformed — the
    "feature" entries lacked trailing commas and the {feature} value was
    unquoted — which encourages the model to emit invalid JSON that
    JsonOutputParser then fails to parse.
    """
    template = """Extract sentences about the given feature from the list of reviews.
Rules:
- Extract only parts discussing the specific feature.
- Remove unrelated parts connected by 'and' or 'but'.
- Keep original wording and capitalization.
- If there is only one review, apply the same rules to extract sentences about the feature.
Reviews: {reviews}
Feature: {feature}
Return only the parts discussing the specific feature and perform sentiment analysis for each extracted sentence in this JSON format:
{{
    "feature_reviews": [
        {{
            "feature": "{feature}",
            "sentence": "relevant sentence 1",
            "sentiment": "positive/negative/neutral"
        }},
        {{
            "feature": "{feature}",
            "sentence": "relevant sentence 2",
            "sentiment": "positive/negative/neutral"
        }}
    ]
}}
"""
    return ChatPromptTemplate.from_template(template)
class ReviewAnalysisService:
    """LLM-backed extraction of feature-specific sentences and their sentiment."""

    def __init__(self):
        # Local import: the file-level import block is outside this class.
        import os
        # Security fix: the Groq API key was hard-coded in source (and thereby
        # leaked with the repository). Read it from the environment instead —
        # the old literal key must be rotated. python-dotenv is already a
        # project dependency for loading .env into the environment.
        token = SecretStr(os.environ.get("GROQ_API_KEY", ""))
        self.llm = ChatGroqFactory.get_llm(token)
        self.parser = _create_reviews_parser()
        self.prompt = _create_extraction_prompt()

    def generate_feature_reviews_analysis(self, review: str, features: List[str]) -> List[Dict[str, str]]:
        """
        Extract feature-specific sentences from a review with sentiment analysis.

        Args:
            review (str): Review text.
            features (List[str]): Features to look for.

        Returns:
            List[Dict[str, str]]: One dict per extracted sentence, carrying
            "feature", "sentence" and "sentiment" keys; empty list on failure.
        """
        try:
            extracted_reviews: List[Dict[str, str]] = []
            # Split into sentences so the LLM is only invoked on sentences
            # that actually mention the feature (cheap keyword filter first).
            sentences = re.split(r'(?<=[.!?])\s+', review)
            # The chain is loop-invariant — build it once, not per sentence.
            chain = self.prompt | self.llm | self.parser
            for feature in features:
                matching = [s for s in sentences if feature.lower() in s.lower()]
                for sentence in matching:
                    response = chain.invoke({
                        "reviews": sentence,
                        "feature": feature
                    })
                    parsed = FeatureReviews(**response)  # validate LLM output shape
                    extracted_reviews.extend(parsed.feature_reviews)
            return extracted_reviews
        except Exception as e:
            # Best-effort: a single bad LLM response should not crash the API.
            print(f"Error extracting feature reviews: {e}")
            return []
import uuid
from typing import List, Dict
from datetime import date, datetime
from src.reviewsense_ecom.model.FeatureSentiment import FeatureSentiment
from src.reviewsense_ecom.model.ProductReview import ProductReview
from src.reviewsense_ecom.mongo.MongoClientFactory import MongoClientFactory
def add_review_features(input_data: ProductReview, reviews_by_feature: List[Dict[str, str]]):
    """Persist a review's per-feature sentiment analysis to MongoDB.

    Args:
        input_data: The original review (id, text, rating).
        reviews_by_feature: Extraction results; each dict carries "feature",
            "sentence" and "sentiment" keys (see ReviewAnalysisService).
    """
    db_name = "productRater"
    collection_name = "test"  # TODO(review): looks like a placeholder collection name — confirm

    # Shared singleton from the factory — do NOT close it here.  (Bug fix:
    # the old code called client.close() at the end, which closed the cached
    # singleton and broke every subsequent caller of the factory.)
    client = MongoClientFactory.get_client()
    collection = client[db_name][collection_name]

    # One FeatureSentiment per feature; additional sentences for the same
    # feature are concatenated onto the first entry.
    features_analysis: Dict[str, FeatureSentiment] = {}
    for entry in reviews_by_feature:
        feature = entry["feature"]
        sentence = entry["sentence"]
        sentiment = entry["sentiment"]
        if feature in features_analysis:
            # NOTE(review): when sentences disagree, the first sentiment wins
            # and later ones are dropped — confirm this is intended.
            features_analysis[feature].feature_sentence += f" {sentence}"
        else:
            features_analysis[feature] = FeatureSentiment(
                feature_sentence=sentence,
                feature_sentiment=sentiment,
            )

    new_review = ProductReview(
        product_id=input_data.product_id,
        review_id=f"rev_{uuid.uuid4().hex[:16]}",  # random id for uniqueness
        review=input_data.review,
        product_rating=input_data.product_rating,
        review_date=datetime.now(),  # naive local time — TODO confirm UTC wanted
        features_analysis=features_analysis,
    )

    # model_dump() is the pydantic-v2 replacement for the deprecated .dict()
    # (pyproject pins pydantic >=2.10).
    collection.insert_one(new_review.model_dump())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment