Commit cdc5f8ea authored by Mirza Mohammed Baig's avatar Mirza Mohammed Baig

Refactored the code

parent a9becc62
......@@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="Poetry (reviewsense-ecom)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Poetry (reviewsense-ecom)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (featurepulse-ecom)" project-jdk-type="Python SDK" />
</project>
\ No newline at end of file
......@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Poetry (reviewsense-ecom)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.12 (featurepulse-ecom)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
from langchain_groq import ChatGroq
from functools import lru_cache
from pydantic import SecretStr
from threading import Lock
class ChatGroqFactory:
    """
    Factory exposing a memoised ChatGroq client.

    ``get_llm`` is wrapped in ``lru_cache(maxsize=1)``, so only the result for
    the most recently used token is retained; asking for a different token
    replaces the cached entry.
    """

    # Held while a ChatGroq instance is being constructed.
    _lock: Lock = Lock()

    @classmethod
    @lru_cache(maxsize=1)
    def get_llm(cls, groq_token: SecretStr) -> ChatGroq:
        """
        Build (or fetch from the cache) the ChatGroq client for ``groq_token``.

        Args:
            groq_token (SecretStr): The Groq API token.

        Returns:
            ChatGroq: Client for the "llama-3.3-70b-versatile" model at temperature 1.
        """
        settings = {
            "model_name": "llama-3.3-70b-versatile",
            "temperature": 1,
            "groq_api_key": groq_token,
        }
        with cls._lock:
            llm = ChatGroq(**settings)
        return llm

    @classmethod
    def clear_cache(cls) -> None:
        """Drop the memoised client so the next ``get_llm`` call builds a new one."""
        cls.get_llm.cache_clear()
from langchain_groq import ChatGroq
from functools import lru_cache
@lru_cache(maxsize=1)  # keep only the most recently requested client
def get_llm(groq_token: str):
    """
    Return the memoised ChatGroq client for ``groq_token``.

    Args:
        groq_token (str): The Groq API token.

    Returns:
        ChatGroq: Client for the "llama-3.3-70b-versatile" model at temperature 1.
    """
    llm = ChatGroq(
        groq_api_key=groq_token,
        model_name="llama-3.3-70b-versatile",
        temperature=1,
    )
    return llm
......@@ -2,6 +2,7 @@ from fastapi import FastAPI
from router.routes import router
from fastapi.middleware.cors import CORSMiddleware
def create_app() -> FastAPI:
"""
Create and configure the FastAPI application.
......@@ -34,4 +35,4 @@ app = create_app()
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='127.0.0.1', port=8003)
\ No newline at end of file
uvicorn.run(app, host='127.0.0.1', port=8002)
from dataclasses import dataclass
@dataclass
class FeatureRating:
    """Aggregated rating statistics kept for a single product feature."""

    # Running mean of the per-review ratings recorded for the feature.
    average_rating: float = 0.0
    # Number of reviews that contributed to ``average_rating``.
    review_count: int = 0
    # Accumulated positive / negative counts for the feature.
    positive_count: int = 0
    negative_count: int = 0
from pydantic import BaseModel
from typing import List
# Validation wrapper for a parsed LLM response that carries a
# top-level "feature_reviews" list.
class FeatureReviews(BaseModel):
    # One dict per extracted sentence; presumably with "feature", "sentence"
    # and "sentiment" keys as requested by the extraction prompt — TODO confirm.
    feature_reviews: List[dict]
from pydantic import BaseModel
# Pydantic model pairing one feature-related sentence with its sentiment label.
class FeatureSentiment(BaseModel):
    # Sentence text that talks about the feature.
    feature_sentence: str
    # Sentiment label for that sentence; presumably "positive"/"negative"/"neutral" — TODO confirm.
    feature_sentiment: str  # Fixed the typo from `feature_sentment`
# from dataclasses import dataclass, field
# from datetime import datetime
# from typing import Dict
#
# from src.reviewsense.FeatureRatingModel import FeatureRating
#
#
# @dataclass
# class Product:
# product_id: str
# category: str
# features: Dict[str, FeatureRating]
# ratings_distribution: Dict[int, int]
# overall_rating: float = 0.0
# total_reviews: int = 0
# created_at: datetime = field(default_factory=datetime.utcnow)
# updated_at: datetime = field(default_factory=datetime.utcnow)
#
# @classmethod
# def from_dict(cls, data):
# """Convert a dictionary from MongoDB to a Product instance."""
# return cls(
# product_id=data["product_id"],
# category=data["category"],
# features={k: FeatureRating(**v) for k, v in data["features"].items()},
# ratings_distribution=data["ratings_distribution"],
# overall_rating=data["overall_rating"],
# total_reviews=data["total_reviews"],
# created_at=datetime.fromisoformat(data["created_at"]["$date"].replace("Z", "")),
# updated_at=datetime.fromisoformat(data["updated_at"]["$date"].replace("Z", ""))
# )
#
# def to_dict(self):
# """Convert the data class to a dictionary for MongoDB insertion."""
# return {
# "product_id": self.product_id,
# "category": self.category,
# "features": {k: v.__dict__ for k, v in self.features.items()},
# "ratings_distribution": self.ratings_distribution,
# "overall_rating": self.overall_rating,
# "total_reviews": self.total_reviews,
# "created_at": {"$date": self.created_at.isoformat() + "Z"},
# "updated_at": {"$date": self.updated_at.isoformat() + "Z"}
# }
#
#
#
#
from dataclasses import dataclass, field
from datetime import datetime, UTC
from datetime import datetime, timezone
from typing import Dict
from src.reviewsense_ecom.model.FeatureRating import FeatureRating
from src.reviewsense_ecom.model.FeatureRatingModel import FeatureRating
@dataclass
......@@ -15,8 +63,8 @@ class Product:
total_reviews: int = 0
# created_at: datetime = field(default_factory=datetime.utcnow)
# updated_at: datetime = field(default_factory=datetime.utcnow)
created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) # Use timezone-aware UTC datetime
updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) # Use timezone-aware UTC datetime
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
@classmethod
def from_dict(cls, data):
......@@ -58,4 +106,6 @@ class Product:
"total_reviews": self.total_reviews,
"created_at": {"$date": self.created_at.isoformat() + "Z"} if self.created_at else None,
"updated_at": {"$date": self.updated_at.isoformat() + "Z"} if self.updated_at else None
}
\ No newline at end of file
}
from pydantic import BaseModel
from typing import Dict
from datetime import date, datetime
from dataclasses import dataclass, field
from typing import List, Dict, Optional,Any
from bson import ObjectId
from pydantic import Field
from src.reviewsense_ecom.model.FeatureSentiment import FeatureSentiment
# Container holding two independent lists of strings for one feature.
@dataclass
class FeatureSentiment:
    # Presumably the sentences classified as positive — TODO confirm against callers.
    positive: List[str] = field(default_factory=list)
    # Presumably the sentences classified as negative — TODO confirm against callers.
    negative: List[str] = field(default_factory=list)
# Pydantic model describing one stored review together with its per-feature analysis.
class ProductReview(BaseModel):
    # Identifier of the reviewed product.
    product_id: str
    # Identifier of this review.
    review_id: str
    # Review text.
    review: str
    # Rating given by the reviewer.
    product_rating: float
    # NOTE(review): no timezone handling is visible here — confirm whether UTC is expected.
    review_date: datetime
    # Maps a feature name to its FeatureSentiment.
    features_analysis: Dict[str, FeatureSentiment]  # Ensure it's Dict[str, FeatureSentiment]
@dataclass
class ProductReview:
    """
    Plain record for one product review.

    ``review`` defaults to ``None``. A pydantic ``Field(...)`` must not be used
    as a dataclass default: the dataclass would store the FieldInfo object
    itself as the default value and would never enforce its ``max_length``.
    """

    # id: Optional[ObjectId] = None
    product_id: str = ""
    # Review text; length limits (previously declared as 5000) are not
    # validated by a dataclass and must be checked by the caller if required.
    review: Optional[str] = None
    rating: float = 0.0
    # features: List[Dict[str, FeatureSentiment]] = field(default_factory=list)
    # One single-key dict per feature: {feature_name: {...sentiment data...}}.
    features: List[Dict[str, Dict[str, Any]]] = field(default_factory=list)
\ No newline at end of file
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from bson import ObjectId
# Sentences collected for one feature, split into two lists.
@dataclass
class FeatureSentiment:
    # Sentences filed under "positive"; each instance gets its own list.
    positive: List[str] = field(default_factory=list)
    # Sentences filed under "negative"; each instance gets its own list.
    negative: List[str] = field(default_factory=list)
# Record for one product review, including an optional ObjectId.
@dataclass
class ProductReview:
    # Optional ObjectId; presumably the MongoDB document id — TODO confirm.
    id: Optional[ObjectId] = None
    # Identifier of the reviewed product.
    product_id: str = ""
    # Review text.
    review: str = ""
    # Rating attached to the review.
    rating: float = 0.0
    # List of dicts mapping a feature name to its FeatureSentiment.
    features: List[Dict[str, FeatureSentiment]] = field(default_factory=list)
\ No newline at end of file
from pydantic import BaseModel, Field
from typing import List, Optional
class ProductReviewInput(BaseModel):
    """Input model for product review processing"""
    # Required: product the review belongs to.
    product_id: str = Field(..., description="Unique identifier for the product")
    # Required: feature names to analyze.
    features: List[str] = Field(..., description="Features to analyze")
    # Optional review text; None when the caller sends no new review.
    new_review: Optional[str] = Field(default=None, description="Optional new review to add")
    # Defaults to True when the caller omits it.
    is_rating_evaluation_required: bool = Field(
        default=True,
        description="Flag to determine if rating evaluation is needed"
    )
    # Required integer rating.
    # NOTE(review): no range constraint is declared here — confirm that
    # downstream code copes with values outside 1-5 (the example below uses 0).
    new_rating: int = Field(..., description="Rating for the product")

    class Config:
        """Pydantic model configuration"""
        # Example payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "product_id": "",
                "features": ["camera", "battery", "display", "design"],
                "new_review": "",
                "is_rating_evaluation_required": True,
                "new_rating": 0,
            }
        }
from pymongo import MongoClient
from threading import Lock
from functools import lru_cache
class MongoClientFactory:
    """
    Factory that hands out a memoised MongoClient and can close it again.

    ``get_client`` is wrapped in ``lru_cache(maxsize=1)``: repeated calls with
    the same arguments return the same client, while a call with different
    arguments replaces the cached entry. Every client ever created is also
    remembered in ``_open_clients`` so that ``close_client`` can close exactly
    the clients that exist, without creating a new one just to close it.
    """

    _lock: Lock = Lock()
    # Clients created by get_client() that close_client() has not closed yet.
    _open_clients: list = []

    @classmethod
    @lru_cache(maxsize=1)
    def get_client(cls, uri: str = "mongodb://localhost:27017/") -> MongoClient:
        """
        Return the memoised MongoClient for ``uri``.

        Args:
            uri (str): MongoDB connection URI. Defaults to localhost.

        Returns:
            MongoClient: The MongoDB client instance.
        """
        with cls._lock:
            client = MongoClient(uri)
            cls._open_clients.append(client)
            return client

    @classmethod
    def close_client(cls) -> None:
        """
        Close every client created by ``get_client`` and clear the cache.

        The previous implementation tested ``cache_info().hits``, which is 0
        for a client that was fetched only once, so that client was never
        closed. It then called ``get_client()`` with the default URI, which
        could build (and close) a different client than the cached one.
        """
        with cls._lock:
            to_close = list(cls._open_clients)
            cls._open_clients.clear()
        for client in to_close:
            client.close()
        cls.get_client.cache_clear()
import urllib.parse
from pymongo import MongoClient
from datetime import datetime, timezone
from src.reviewsense_ecom.model.ProductReview import ProductReview
from src.reviewsense_ecom.model.Product import Product
from src.reviewsense_ecom.model.productReviewsModel import FeatureSentiment
def get_db_N_connection(collection_name: str):  # LOCAL DB
    """Return ``collection_name`` from the "productRater" database on the local MongoDB server."""
    local_client = MongoClient("mongodb://localhost:27017/")
    return local_client["productRater"][collection_name]
def get_db_connection(collection_name: str):  # get_db_cloud_connection
    """
    Return ``collection_name`` from the "productRater" database.

    The connection string is taken from the ``MONGODB_URI`` environment
    variable when it is set; otherwise the historical Atlas URI is used so
    existing deployments keep working.

    Args:
        collection_name (str): Name of the collection to return.

    Returns:
        The pymongo collection object ``client["productRater"][collection_name]``.
    """
    import os  # local import: keeps this fix self-contained

    uri = os.environ.get("MONGODB_URI")
    if not uri:
        # SECURITY(review): hard-coded credential kept only as a backward-compatible
        # fallback. Rotate this password and supply MONGODB_URI instead.
        password = "nisum@123"
        escaped_password = urllib.parse.quote_plus(password)
        uri = f"mongodb+srv://genai:{escaped_password}@productrater.hpbsn.mongodb.net/?retryWrites=true&w=majority&appName=productRater"
    # NOTE(review): a new MongoClient is created on every call; consider sharing one client.
    client = MongoClient(uri)
    db = client["productRater"]
    return db[collection_name]
def insert_product(product_id):  # Use this method to insert if null
    """Insert a blank product document for ``product_id`` into "productFeatureRater"."""
    collection = get_db_connection("productFeatureRater")

    # One zeroed bucket per star value, keyed "1".."5".
    empty_distribution = {}
    for stars in range(1, 6):
        empty_distribution[str(stars)] = 0

    document = dict(
        product_id=product_id,
        category="Electronics",  # default category, may be updated later
        features={},  # filled in as reviews arrive
        ratings_distribution=empty_distribution,
        overall_rating=0.0,
        total_reviews=0,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    collection.insert_one(document)
    print("Product inserted successfully.")
def update_product(product_id, update_data):
    """
    Upsert ``update_data`` into the "productFeatureRater" document of ``product_id``.

    ``update_data`` is modified in place: "updated_at" is overwritten and
    "review_ratings" / "ratings_summary" are added when missing.

    Returns:
        The (mutated) ``update_data`` dict, or None when the write raised.
    """
    print(f"Product ID: {product_id}")
    print(f"Update Data: {update_data}")
    collection = get_db_connection("productFeatureRater")

    update_data["updated_at"] = datetime.now(timezone.utc)
    for required_key in ("review_ratings", "ratings_summary"):
        update_data.setdefault(required_key, {})
    print(f"Updated Data with Timestamp: {update_data}")

    selector = {"product_id": product_id}
    try:
        # upsert=True creates the product when no document matches.
        result = collection.update_one(selector, {"$set": update_data}, upsert=True)
    except Exception as e:
        print(f"Error updating product: {e}")
        return None
    else:
        if result.matched_count > 0:
            print("Product updated successfully.")
        elif result.upserted_id:
            print(f"New product created with ID: {result.upserted_id}")
    return update_data
def get_product_by_id(product_id):
    """Load the product document for ``product_id`` as a Product, or return None when absent."""
    raw = get_db_connection("productFeatureRater").find_one({"product_id": product_id})
    if not raw:
        print("No product found with the given ID.")
        return None
    product = Product.from_dict(raw)
    print("Product found:", product)
    return product
# add New Reviews to MongoDB
def add_review(input_data, reviews_by_feature):
    """
    Store the review carried by ``input_data`` in the "productReviews" collection.

    ``reviews_by_feature`` is accepted but not used; no feature data is stored.
    """
    collection = get_db_connection("productReviews")
    review_fields = {
        "product_id": input_data.product_id,
        "review": input_data.new_review,
        "rating": input_data.new_rating,
    }
    new_review = ProductReview(**review_fields)
    collection.insert_one(new_review.__dict__)
# adding New Reviews to productReview Collection
def add_review_features(input_data, reviews_by_feature):
    """
    Store a review plus its per-feature sentences in "productReviews".

    Each item of ``reviews_by_feature`` must provide "feature", "sentiment" and
    "sentence". Sentences are grouped per feature into positive / negative
    lists; any other sentiment still registers the feature but adds no sentence.

    Returns:
        The ``__dict__`` of the ProductReview that was inserted.
    """
    collection = get_db_connection("productReviews")

    grouped = {}
    for item in reviews_by_feature:
        feature = item["feature"]
        sentiment = item["sentiment"]
        sentence = item["sentence"]

        bucket = grouped.get(feature)
        if bucket is None:
            bucket = grouped[feature] = FeatureSentiment()

        if sentiment == "positive":
            bucket.positive.append(sentence)
        elif sentiment == "negative":
            bucket.negative.append(sentence)

    # One single-key dict per feature, with the sentiment object flattened to a plain dict.
    features_list = []
    for name, sentiment_obj in grouped.items():
        features_list.append({name: sentiment_obj.__dict__})

    new_review = ProductReview(
        product_id=input_data.product_id,
        review=input_data.new_review,
        rating=input_data.new_rating,
        features=features_list,
    )
    payload = new_review.__dict__
    inserted_result = collection.insert_one(payload)
    if inserted_result.inserted_id:
        print(f"Review successfully added with ID: {inserted_result.inserted_id}")
    return new_review.__dict__
def product_feature(product_id):
    """Return the "feature" value stored for ``product_id`` in "productFeatures", or [] when missing."""
    document = get_db_connection("productFeatures").find_one(
        {"product_id": product_id},
        {"_id": 0, "feature": 1},
    )
    if not document or "feature" not in document:
        return []
    return document["feature"]
from fastapi import APIRouter, HTTPException
import logging
from typing import Dict, Optional, List
from pyexpat import features
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import logging
from src.reviewsense_ecom.model.ProductReview import ProductReview
from src.reviewsense_ecom.service.FeatureUpdater import FeatureUpdater
from src.reviewsense_ecom.model.Product import Product
from src.reviewsense_ecom.mongo.MongoClientFactory import MongoClientFactory
from src.reviewsense_ecom.service.ProductDefinitionService import ProductDefinitionService
from src.reviewsense_ecom.service.ReviewAnalysisService import ReviewAnalysisService
from src.reviewsense_ecom.service.StoreReviewAnalysis import add_review_features
from src.reviewsense_ecom.service.ReviewService import ReviewService
from src.reviewsense_ecom.service.feature_extractor import FeatureExtractor
from src.reviewsense_ecom.mongo.mongo_db_config import get_product_by_id, update_product, add_review_features, insert_product,product_feature
from src.reviewsense_ecom.model.product_review_input import ProductReviewInput
# Module-level wiring: everything below runs once, when this module is imported.
router = APIRouter()
# review_service = ReviewService()
# feature_extractor = FeatureExtractor()
# Service that extracts feature sentences from a review through the LLM.
review_analysis_service = ReviewAnalysisService()
# Memoised client for the factory's default URI.
client = MongoClientFactory.get_client()
# Reads per-product definitions through the shared client.
product_definition_service = ProductDefinitionService(client)
# Services used by the route handlers below.
review_service = ReviewService()
feature_extractor = FeatureExtractor()
# Logger named after this module.
logger = logging.getLogger(__name__)
......@@ -35,10 +29,118 @@ class ReviewResponse(BaseModel):
review_ratings: Dict[str, Optional[float]]
ratings_summary: RatingSummary
# Response body of the /fetch_percentage/ route.
class PercentageResponseModel(BaseModel):
    # Percentages are strings; presumably formatted with a trailing "%" — TODO confirm against the service.
    positive_percentage: str
    negative_percentage: str
    # Average rating of the requested feature.
    average_rating: float
# Response body of the /fetch_reviews/ and /fetch_full_reviews/ routes.
class ReviewResponseModel(BaseModel):
    # Review strings returned for the requested product and feature.
    reviews: List[str]
@router.post("/calculate_ratings/", response_model=Product)
async def calculate_ratings(input_data: ProductReview):
categories = product_definition_service.get_product_categories(input_data.product_id)
async def calculate_ratings(input_data: ProductReviewInput):
    """
    Endpoint to calculate feature-specific ratings.

    Loads the product (creating it first when it does not exist) and, when
    ``is_rating_evaluation_required`` is set, runs the review through
    ``update_product_data``.

    Args:
        input_data (ProductReviewInput): Input data for review processing

    Returns:
        The product object, updated when rating evaluation was requested.

    Raises:
        HTTPException: 500 when the product cannot be read back after insertion.
    """
    logger.info(f"Received request: {input_data.dict()}")
    logger.info(f"InputData: {input_data.new_review}")

    product_id = input_data.product_id
    product_data = get_product_by_id(product_id)
    if product_data is None:
        logger.warning(f"Product with ID {product_id} not found. Creating a new product.")
        insert_product(product_id)
        # Read it back to make sure the insert really produced a document.
        product_data = get_product_by_id(product_id)
        if product_data is None:
            logger.error(f"Failed to insert product with ID {product_id}.")
            raise HTTPException(status_code=500, detail="Product creation failed")

    logger.info(f"Product Data: {product_data}")

    if input_data.is_rating_evaluation_required:
        final_product = await update_product_data(product_data, input_data, product_data)
    else:
        final_product = product_data

    logger.info(f"Final Return to Mongo: {final_product}")
    return final_product
@router.get("/fetch_percentage/", response_model=PercentageResponseModel)
async def fetch_percentage(product_id: str, features: str):
    """
    Return positive/negative percentages and the average rating of one feature.

    Args:
        product_id (str): Product to look up.
        features (str): Name of the single feature to report on.

    Returns:
        dict: Body matching PercentageResponseModel.

    Raises:
        HTTPException: 404 when no statistics exist. The former
            ``{"message": ...}`` bodies lack the fields the declared response
            model requires, so they could never be returned successfully.
    """
    logger.info(f"Fetching reviews for product_id: {product_id}, feature: {features}")
    stats = review_service.get_percentage_by_feature(product_id, features)

    if isinstance(stats, dict) and "positive_percentage" in stats:
        return stats

    # Reuse the service's own explanation when it supplied one.
    detail = stats.get("message", "No data found") if isinstance(stats, dict) else "No data found"
    raise HTTPException(status_code=404, detail=detail)
@router.get("/fetch_reviews/", response_model=ReviewResponseModel)
async def fetch_review_by_feature(product_id: str, features: str):
    """
    Return the sentences stored for one feature of a product.

    A dict result from the service is passed through unchanged; anything else
    is wrapped as ``{"reviews": ...}``.
    """
    logger.info(f"Fetching reviews for product_id: {product_id}, feature: {features}")
    found = review_service.get_review_by_feature(product_id, features)
    return found if isinstance(found, dict) else {"reviews": found}
@router.get("/fetch_full_reviews/", response_model=ReviewResponseModel)
async def fetch_review_by_feature(product_id: str, features: str):
    """
    Return the full review texts of a product that mention one feature.

    A dict result from the service is passed through unchanged; anything else
    is wrapped as ``{"reviews": ...}``.
    """
    logger.info(f"Fetching reviews for product_id: {product_id}, feature: {features}")
    found = review_service.get_full_review_by_feature(product_id, features)
    return found if isinstance(found, dict) else {"reviews": found}
async def update_product_data(final_product, input_data, product_data):
    """
    Analyse the new review, store it, and fold its ratings into the product.

    Args:
        final_product: Ignored on entry; replaced by the updated Product.
        input_data: Request payload (product id, review text, rating).
        product_data: Current Product; its ``to_dict()`` output seeds the update.

    Returns:
        The Product rebuilt from the data written back to the database.

    Raises:
        HTTPException: 500 when ``update_product`` reports a failure.
    """
    # Extract feature sentences from the review and rate them.
    product_features = product_feature(input_data.product_id)
    reviews_by_feature = feature_extractor.extract_feature_reviews(
        input_data.new_review, product_features)
    logger.info(f"REVIEW BY LLM --->{reviews_by_feature}")
    ratings = review_service.fetch_feature_ratings(reviews_by_feature)
    logger.info(f"Generated ratings: {ratings}")

    # Store the review itself in the reviews collection.
    stored_review = add_review_features(input_data, reviews_by_feature)
    if not stored_review:
        logger.warning("add_review() returned None. Review was not added.")
    else:
        logger.info(f"Successfully added review data: {stored_review}")

    # Merge the new ratings into the product aggregate.
    updated_product = FeatureUpdater(product_data.to_dict()).update_feature_ratings(
        ratings, input_data.new_rating)
    logger.info(f"Updated Product Data: {updated_product}")

    # Persist the aggregate and rebuild the Product from what was saved.
    saved = update_product(input_data.product_id, updated_product)
    if not saved:
        logger.error("update_product() returned None. Update failed.")
        raise HTTPException(status_code=500, detail="Failed to update product")
    final_product = Product.from_dict(saved)
    logger.info(f"Successfully updated product: {final_product}")
    return final_product
feature_reviews_analysis = review_analysis_service.generate_feature_reviews_analysis(input_data.review, categories)
add_review_features(input_data, feature_reviews_analysis)
\ No newline at end of file
from typing import Dict, Optional
class FeatureUpdater:
    """Folds the ratings of one new review into a product's aggregate dict."""

    def __init__(self, product_data: Optional[Dict] = None):
        """
        Args:
            product_data: Product dict to update in place; missing aggregate
                keys are filled with zeroed defaults. None starts from empty.
        """
        if product_data is None:
            product_data = {}
        self.product_data = product_data
        self.product_data.setdefault("total_reviews", 0)
        self.product_data.setdefault("features", {})
        self.product_data.setdefault("ratings_distribution", {str(i): 0 for i in range(1, 6)})
        self.product_data.setdefault("overall_rating", 0.0)

    def update_feature_ratings(self, feature_data: Dict[str, Dict[str, Optional[float]]],
                               new_overall_rating: int) -> Dict:
        """
        Apply one review to the product aggregate.

        Args:
            feature_data: Maps a feature name to a dict that may hold "rating",
                "positive_count" and "negative_count"; missing or None values
                count as 0.
            new_overall_rating: Overall rating given by the review.

        Returns:
            The updated product dict (the same object held by this instance).
        """
        features = self.product_data.get("features", {})
        ratings_distribution = self.product_data.get("ratings_distribution", {str(i): 0 for i in range(1, 6)})
        existing_total_reviews = self.product_data.get("total_reviews", 0)
        current_overall_rating = self.product_data.get("overall_rating", 0.0)

        for feature, new_data in feature_data.items():
            current_feature = features.setdefault(feature, {
                "average_rating": 0.0,
                "review_count": 0,
                "positive_count": 0,
                "negative_count": 0,
            })
            existing_review_count = current_feature["review_count"]

            new_rating = new_data.get("rating", 0) or 0
            new_positive = new_data.get("positive_count", 0) or 0
            new_negative = new_data.get("negative_count", 0) or 0

            # Running mean over the stored (already rounded) average.
            if existing_review_count > 0:
                new_average_rating = (
                    (current_feature["average_rating"] * existing_review_count) + new_rating
                ) / (existing_review_count + 1)
            else:
                new_average_rating = new_rating

            current_feature["average_rating"] = round(new_average_rating, 1)
            current_feature["review_count"] += 1
            current_feature["positive_count"] += new_positive
            current_feature["negative_count"] += new_negative

        # Count the rating even when its bucket was not pre-seeded (e.g. a 0
        # rating, or a stored distribution missing a key); a plain `+= 1`
        # raised KeyError in that case.
        bucket = str(new_overall_rating)
        ratings_distribution[bucket] = ratings_distribution.get(bucket, 0) + 1

        updated_total_reviews = existing_total_reviews + 1
        if existing_total_reviews > 0:
            updated_overall_rating = (
                (current_overall_rating * existing_total_reviews) + new_overall_rating
            ) / updated_total_reviews
        else:
            updated_overall_rating = new_overall_rating

        self.product_data["features"] = features
        self.product_data["ratings_distribution"] = ratings_distribution
        self.product_data["overall_rating"] = round(updated_overall_rating, 1)
        self.product_data["total_reviews"] = updated_total_reviews
        return self.product_data
from pymongo.collection import Collection
from typing import Dict, List, Optional
from pymongo import MongoClient
class ProductDefinitionService:
    """
    Read-only access to per-product definitions stored in MongoDB.
    """

    def __init__(self, client: MongoClient, db_name: str = "productRater", collection_name: str = "productDefinition"):
        """
        Bind the service to one collection.

        Args:
            client (MongoClient): MongoDB client instance.
            db_name (str): Name of the database.
            collection_name (str): Name of the collection.
        """
        database = client[db_name]
        self.collection: Collection = database[collection_name]

    def get_product_categories(self, product_id: str) -> Optional[List[str]]:
        """
        Return the 'feature_ratings' field of the document for ``product_id``.

        Returns:
            The stored 'feature_ratings' value, or None when the product has no
            document or the document lacks that field.
        """
        query = {'product_id': product_id}
        projection = {'feature_ratings': 1, '_id': 0}
        document = self.collection.find_one(query, projection)
        if not document:
            return None
        return document.get('feature_ratings')
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from typing import List, Dict
from pydantic import SecretStr
from src.reviewsense_ecom.llm.chatGroq import ChatGroqFactory
from src.reviewsense_ecom.model.FeatureReviews import FeatureReviews
import re
def _create_reviews_parser() -> JsonOutputParser:
    """Build the JSON output parser used at the end of the extraction chain."""
    parser = JsonOutputParser()
    return parser
def _create_extraction_prompt() -> ChatPromptTemplate:
    """
    Create the prompt that extracts feature-specific sentences with sentiment.

    The template takes two variables, ``reviews`` and ``feature``. Doubled
    braces are literal braces in the rendered prompt. The JSON example quotes
    the feature value and separates every member with a comma, so the format
    shown to the model is itself valid JSON.
    """
    template = """Extract sentences about the given feature from the list of reviews.
Rules:
- Extract only parts discussing the specific feature.
- Remove unrelated parts connected by 'and' or 'but'.
- Keep original wording and capitalization.
- If there is only one review, apply the same rules to extract sentences about the feature.
Reviews: {reviews}
Feature: {feature}
Return only the parts discussing the specific feature and perform sentiment analysis for each extracted sentence in this JSON format:
{{
"feature_reviews": [
{{
"feature": "{feature}",
"sentence": "relevant sentence 1",
"sentiment": "positive/negative/neutral"
}},
{{
"feature": "{feature}",
"sentence": "relevant sentence 2",
"sentiment": "positive/negative/neutral"
}}
]
}}
"""
    return ChatPromptTemplate.from_template(template)
class ReviewAnalysisService:
    """Extracts feature-specific sentences (with sentiment) from a review through the LLM."""

    def __init__(self):
        """Create the LLM client, the JSON parser and the extraction prompt."""
        import os  # local import: keeps this fix self-contained

        # SECURITY(review): the literal key below is kept only as a
        # backward-compatible fallback. It is exposed in source control and
        # should be revoked; supply GROQ_API_KEY through the environment.
        raw_token = os.environ.get("GROQ_API_KEY") or "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"
        token: SecretStr = SecretStr(raw_token)
        self.llm = ChatGroqFactory.get_llm(token)
        self.parser = _create_reviews_parser()
        self.prompt = _create_extraction_prompt()

    def generate_feature_reviews_analysis(self, review: str, features: List[str]) -> List[Dict[str, str]]:
        """
        Extract feature-specific sentences from a review with sentiment analysis.

        The review is split into sentences; every sentence that mentions a
        feature (case-insensitive substring match) is sent to the LLM together
        with that feature.

        Args:
            review (str): Review text.
            features (List[str]): List of features to extract.

        Returns:
            List[Dict[str, str]]: Feature-specific sentences with sentiment
            analysis. Empty when there is nothing to analyse or when any step
            raised (the error is printed and partial results are discarded).
        """
        # Nothing to analyse: avoid building the chain or calling the LLM.
        if not review or not features:
            return []
        try:
            # The chain does not depend on the sentence, so build it once.
            chain = self.prompt | self.llm | self.parser
            sentences = re.split(r'(?<=[.!?])\s+', review)  # Split review into sentences
            extracted_reviews = []
            for feature in features:
                needle = feature.lower()
                for sentence in sentences:
                    if needle not in sentence.lower():
                        continue
                    response = chain.invoke({
                        "reviews": sentence,
                        "feature": feature
                    })
                    parsed_data = FeatureReviews(**response)  # Validate and parse result
                    extracted_reviews.extend(parsed_data.feature_reviews)
            return extracted_reviews
        except Exception as e:
            print(f"Error extracting feature reviews: {e}")
            return []
# router/review_service.py
#
# from typing import List, Dict, Optional
#
# from src.reviewsense.mongo_db_config import get_db_connection
# from src.reviewsense.ratings_fetcher import RatingsFetcher
# from src.reviewsense.review_fetcher import ReviewFetcher
# from src.reviewsense.review_rater import ReviewRater
# from src.reviewsense.review_adder import ReviewAdder
# class ReviewService:
# """Service class for handling product reviews"""
#
# def __init__(self, collection_name: str = "android_phone_reviews",
# embedding_model: str = "sentence-transformers/all-mpnet-base-v2"):
# """
# Initialize the review service with vector database and embeddings
# """
# self.fetcher = ReviewFetcher()
# self.rater = ReviewRater()
# self.adder = ReviewAdder()
# self.ratingsFetcher = RatingsFetcher()
#
# def fetch_reviews(self, product_id: str, features: List[str], threshold: float = 0.5) -> Dict[str, List[str]]:
# return self.fetcher.fetch_reviews(product_id, features, threshold)
#
# def fetch_feature_ratings(self, reviews_by_feature: List[Dict[str, str]]) -> Dict[str, Dict[str, Optional[float]]]:
# return self.rater.generate_feature_ratings(reviews_by_feature)
#
# def add_review(self, product_id: str, review: str, rating: int) -> str:
# return self.adder.add_review(product_id, review, rating)
#
# def fetch_ratings(self, product_id):
# return self.ratingsFetcher.fetch_ratings(product_id=product_id)
from typing import List, Dict, Optional
from src.reviewsense_ecom.mongo.mongo_db_config import get_db_connection
from src.reviewsense_ecom.service.ratings_fetcher import RatingsFetcher
from src.reviewsense_ecom.service.review_fetcher import ReviewFetcher
from src.reviewsense_ecom.service.review_rater import ReviewRater
from src.reviewsense_ecom.service.review_adder import ReviewAdder
class ReviewService:
"""Service class for handling product reviews"""
def __init__(self, embedding_model="sentence-transformers/all-mpnet-base-v2"):
"""
Initialize the review service with MongoDB connection.
"""
# Vector-based processing components
self.embedding_model = embedding_model
self.fetcher = ReviewFetcher()
self.rater = ReviewRater()
self.adder = ReviewAdder()
self.ratingsFetcher = RatingsFetcher()
def _get_collection(self, collection_name):
"""Returns the requested MongoDB collection."""
return get_db_connection(collection_name)
# def get_review_by_feature(self, product_id, feature, collection_name="productReviews"):
# """
# Fetch reviews from MongoDB based on `product_id` and `feature`
# """
# collection = self._get_collection(collection_name)
#
# query = {"product_id": str(product_id)}
# if feature.lower() == "overall":
# projection = {"_id": 0, "review": 1}
# else:
# query[f"features.{feature}"] = {"$exists": True}
# projection = {
# "_id": 0,
# f"features.{feature}.positive": 1,
# f"features.{feature}.negative": 1
# }
#
# reviews = collection.find(query, projection)
#
# result = []
# for review in reviews:
# if feature.lower() == "overall":
# result.append(review.get("review", ""))
# else:
# feature_data = review.get("features", {}).get(feature, {})
# result.extend(feature_data.get("positive", []))
# result.extend(feature_data.get("negative", []))
#
# print(f"result of Afreedi API ----->>> {result}")
# return result
#WORKING
def get_review_by_feature(self, product_id, feature):
"""
Fetch reviews from MongoDB based on `product_id` and `feature`
"""
collection = self._get_collection("productReviews")
query = {"product_id": str(product_id)}
projection = {"_id": 0, "review": 1, "features": 1}
reviews = collection.find(query, projection)
p=list(collection.find(query, projection))
result = []
for review in reviews:
if feature.lower() == "overall":
result.append(review.get("review", ""))
else:
features = review.get("features", [])
for feature_dict in features:
if feature in feature_dict:
feature_data = feature_dict[feature]
result.extend(feature_data.get("positive", []))
result.extend(feature_data.get("negative", []))
logger.info(f"results------->>>>> {result}")
return result
def get_full_review_by_feature(self, product_id, feature):
"""
Fetch reviews from MongoDB based on product_id and feature.
If the feature is present, return the 'review' field.
"""
collection = self._get_collection("productReviews")
query = {"product_id": str(product_id)}
projection = {"_id": 0, "review": 1, "features": 1}
reviews = collection.find(query, projection)
result = []
for review in reviews:
if feature.lower() == "overall":
result.append(review.get("review", ""))
else:
features = review.get("features", [])
for feature_dict in features:
if feature in feature_dict:
result.append(review.get("review", ""))
break
return result
def get_percentage_by_feature(self, product_id, feature, collection_name="productFeatureRater"):
"""
Retrieves positive and negative review counts along with average rating for a given product feature.
"""
collection = self._get_collection(collection_name)
query = {
"product_id": str(product_id),
f"features.{feature}": {"$exists": True}
}
projection = {
"_id": 0,
f"features.{feature}.positive_count": 1,
f"features.{feature}.negative_count": 1,
f"features.{feature}.average_rating": 1
}
document = collection.find_one(query, projection)
if not document:
return {"message": "No reviews found for this product and feature."}
feature_data = document.get("features", {}).get(feature, {})
positive_count = feature_data.get("positive_count", 0)
negative_count = feature_data.get("negative_count", 0)
average_rating = feature_data.get("average_rating", None)
sum_reviews = positive_count + negative_count
positive_percentage = float((positive_count / sum_reviews) * 100) if sum_reviews else 0
negative_percentage = float((negative_count / sum_reviews) * 100) if sum_reviews else 0
return {
"positive_percentage": f"{positive_percentage}%",
"negative_percentage": f"{negative_percentage}%",
"average_rating": average_rating
}
# 🔹 Fetch reviews using vector-based method
def fetch_reviews(self, product_id: str, features: List[str], threshold: float = 0.5) -> Dict[str, List[str]]:
    """
    Delegate to ``self.fetcher.fetch_reviews`` and return its result unchanged.

    Args:
        product_id (str): Product identifier forwarded to the fetcher.
        features (List[str]): Feature names forwarded to the fetcher.
        threshold (float): Score threshold forwarded to the fetcher.

    Returns:
        Whatever ``self.fetcher.fetch_reviews`` returns.
    """
    fetcher = self.fetcher
    return fetcher.fetch_reviews(product_id, features, threshold)
# 🔹 Generate feature-specific ratings
def generate_feature_ratings(self, reviews_by_feature: Dict[str, List[str]]) -> Dict[str, Optional[float]]:
    """
    Delegate to ``self.rater.generate_feature_ratings`` and return its result.

    Args:
        reviews_by_feature: Review data passed through to the rater untouched.

    Returns:
        Whatever ``self.rater.generate_feature_ratings`` returns.
    """
    rater = self.rater
    return rater.generate_feature_ratings(reviews_by_feature)
def fetch_feature_ratings(self, reviews_by_feature: List[Dict[str, str]]) -> Dict[str, Dict[str, Optional[float]]]:
    """
    Compute per-feature ratings by delegating to ``self.rater.generate_feature_ratings``.

    Args:
        reviews_by_feature (List[Dict[str, str]]): Review entries passed
            through to the rater untouched.

    Returns:
        Whatever ``self.rater.generate_feature_ratings`` returns.
    """
    rater = self.rater
    return rater.generate_feature_ratings(reviews_by_feature)
# 🔹 Add a new review
def add_review(self, product_id: str, review: str, rating: int) -> str:
    """
    Store a review by delegating to ``self.adder.add_review``.

    Args:
        product_id (str): Product identifier forwarded to the adder.
        review (str): Review text forwarded to the adder.
        rating (int): Rating forwarded to the adder.

    Returns:
        Whatever ``self.adder.add_review`` returns.
    """
    adder = self.adder
    return adder.add_review(product_id, review, rating)
# 🔹 Fetch product ratings
def fetch_ratings(self, product_id: str):
    """
    Fetch rating statistics by delegating to ``self.ratingsFetcher.fetch_ratings``.

    Args:
        product_id (str): Product identifier, forwarded as a keyword argument.

    Returns:
        Whatever ``self.ratingsFetcher.fetch_ratings`` returns.
    """
    ratings_fetcher = self.ratingsFetcher
    return ratings_fetcher.fetch_ratings(product_id=product_id)
import uuid
from typing import List, Dict
from datetime import date, datetime
from src.reviewsense_ecom.model.FeatureSentiment import FeatureSentiment
from src.reviewsense_ecom.model.ProductReview import ProductReview
from src.reviewsense_ecom.mongo.MongoClientFactory import MongoClientFactory
def add_review_features(input_data: ProductReview, reviews_by_feature: List[Dict[str, str]]):
    """
    Persist a review together with its per-feature sentiment analysis.

    The entries of ``reviews_by_feature`` are folded into one
    ``FeatureSentiment`` per feature: the first entry for a feature sets both
    sentence and sentiment, and later entries for the same feature only append
    their sentence (the first sentiment is kept). The resulting
    ``ProductReview`` is inserted into the ``productRater.test`` collection.

    Args:
        input_data (ProductReview): Source of ``product_id``, ``review`` and
            ``product_rating`` for the stored document.
        reviews_by_feature (List[Dict[str, str]]): Entries with the keys
            "feature", "sentence" and "sentiment".

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """
    db_name = "productRater"
    collection_name = "test"

    # One FeatureSentiment per feature; repeated features merge their sentences.
    features_analysis: Dict[str, FeatureSentiment] = {}
    for review in reviews_by_feature:
        feature = review["feature"]
        new_sentence = review["sentence"]
        new_sentiment = review["sentiment"]

        existing_sentiment = features_analysis.get(feature)
        if existing_sentiment is not None:
            existing_sentiment.feature_sentence += f" {new_sentence}"
        else:
            features_analysis[feature] = FeatureSentiment(
                feature_sentence=new_sentence,
                feature_sentiment=new_sentiment,
            )

    new_review = ProductReview(
        product_id=input_data.product_id,
        review_id=f"rev_{uuid.uuid4().hex[:16]}",
        review=input_data.review,
        product_rating=input_data.product_rating,
        review_date=datetime.now(),
        features_analysis=features_analysis,
    )

    # The document is fully built before a connection is requested, so invalid
    # input never opens a client, and try/finally closes the client even when
    # the insert fails.
    # NOTE(review): if MongoClientFactory hands out a shared/cached client,
    # closing it here also closes it for other users -- confirm.
    client = MongoClientFactory.get_client()
    try:
        client[db_name][collection_name].insert_one(new_review.dict())
    finally:
        client.close()
from pydantic import BaseModel
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from typing import List, Optional, Dict
from src.reviewsense_ecom.llm.llm import get_llm
import re
class FeatureReviews(BaseModel):
    """Pydantic model used to validate the parsed LLM response before use."""
    feature_reviews: List[dict] # Each review will include a sentence and sentiment
class FeatureExtractor:
    """
    Extract feature-specific sentences, with sentiment, from review text via an LLM.

    The Groq API key is read from the ``GROQ_API_KEY`` environment variable so
    that no credential is stored in source control.
    """

    # Splits after sentence-ending punctuation that is followed by whitespace.
    _SENTENCE_BOUNDARY = re.compile(r'(?<=[.!?])\s+')

    def __init__(self):
        """
        Build the LLM client, output parser and prompt.

        Raises:
            RuntimeError: If ``GROQ_API_KEY`` is not set.
        """
        import os  # local import keeps this block self-contained

        groq_token = os.environ.get("GROQ_API_KEY")
        if not groq_token:
            raise RuntimeError(
                "GROQ_API_KEY environment variable is not set; "
                "it is required to create the Groq LLM client."
            )
        self.llm = get_llm(groq_token)
        self.parser = self._create_reviews_parser()
        self.prompt = self._create_extraction_prompt()

    def _create_reviews_parser(self) -> JsonOutputParser:
        """Create JSON parser for feature-specific reviews extraction"""
        return JsonOutputParser()

    def _create_extraction_prompt(self) -> ChatPromptTemplate:
        """Create prompt for extracting feature-specific reviews with enhanced rules and sentiment analysis."""
        # The example shown to the model must itself be valid JSON: the feature
        # value is quoted and followed by a comma. Doubled braces are literal
        # braces in the template; single braces are template variables.
        template = (
            "Extract sentences about the given feature from the list of reviews.\n"
            "Rules:\n"
            "- Extract only parts discussing the specific feature.\n"
            "- Remove unrelated parts connected by 'and' or 'but'.\n"
            "- Keep original wording and capitalization.\n"
            "- If there is only one review, apply the same rules to extract sentences about the feature.\n"
            "Reviews: {reviews}\n"
            "Feature: {feature}\n"
            "Return only the parts discussing the specific feature and perform sentiment analysis "
            "for each extracted sentence in this JSON format:\n"
            "{{\n"
            '    "feature_reviews": [\n'
            "        {{\n"
            '            "feature": "{feature}",\n'
            '            "sentence": "relevant sentence 1",\n'
            '            "sentiment": "positive/negative/neutral"\n'
            "        }},\n"
            "        {{\n"
            '            "feature": "{feature}",\n'
            '            "sentence": "relevant sentence 2",\n'
            '            "sentiment": "positive/negative/neutral"\n'
            "        }}\n"
            "    ]\n"
            "}}\n"
        )
        return ChatPromptTemplate.from_template(template)

    def extract_feature_reviews(self, review: str, features: List[str]) -> List[Dict[str, str]]:
        """
        Extract feature-specific sentences from reviews with sentiment analysis.

        The review is split into sentences; every sentence that contains a
        feature name (case-insensitive substring match) is sent to the LLM,
        and the validated ``feature_reviews`` entries are collected.

        Args:
            review (str): Review text. A list/tuple of review texts is also
                accepted; each one is split into sentences separately.
            features (List[str]): List of features to extract. A single
                feature name given as a plain string is also accepted.

        Returns:
            List[Dict[str, str]]: Feature-specific sentences with sentiment
            analysis. An empty list is returned if any step raises.
        """
        try:
            # A bare string would otherwise be iterated character by character.
            if isinstance(features, str):
                features = [features]
            review_texts = [review] if isinstance(review, str) else list(review)

            sentences = [
                sentence
                for text in review_texts
                for sentence in self._SENTENCE_BOUNDARY.split(text)
            ]

            # The chain does not depend on the sentence, so build it once.
            chain = self.prompt | self.llm | self.parser

            extracted_reviews = []
            for feature in features:
                needle = feature.lower()
                for sentence in sentences:
                    if needle not in sentence.lower():
                        continue
                    response = chain.invoke({
                        "reviews": sentence,
                        "feature": feature
                    })
                    parsed_data = FeatureReviews(**response)  # Validate and parse result
                    extracted_reviews.extend(parsed_data.feature_reviews)
                    print(f"Responce from LLM : {extracted_reviews}")
            return extracted_reviews
        except Exception as e:
            # Best-effort contract kept from the original: report and return nothing.
            print(f"Error extracting feature reviews: {e}")
            return []
# services/review_fetcher.py
from typing import Dict
from src.reviewsense_ecom.service.retrieval import get_vector_store
from src.reviewsense_ecom.service.feature_extractor import FeatureExtractor
from typing import (Any)
class RatingsFetcher:
    """Class for aggregating rating statistics of a product from the vector store"""

    def __init__(self):
        self.vector_store = get_vector_store()
        self.feature_extractor = FeatureExtractor()

    def fetch_ratings(self, product_id: str) -> Dict[str, Any]:
        """
        Summarise the stored reviews of one product.

        Up to 100 documents whose ``title`` metadata equals ``product_id`` are
        read from the vector store.

        Args:
            product_id (str): Value matched against the ``title`` metadata.

        Returns:
            Dict[str, Any]: A dict with
                - ``ratings_count``: number of documents returned,
                - ``average_rating``: mean of the available ratings rounded to
                  two decimals (0 when no document carries a rating),
                - ``rating_distribution``: count per rating, keyed by the
                  rating rendered as a string,
                - ``total_reviews``: review texts with a leading "Review: "
                  prefix removed.
        """
        documents = self.vector_store.metadata_search(filter={"title": product_id}, n=100)

        rating_distribution = {}
        total_reviews = []
        total_rating = 0
        rated_documents = 0

        for doc in documents:
            # removeprefix() drops the exact prefix only; lstrip("Review: ")
            # would strip any leading run of those characters and eat into the
            # review text itself (e.g. "Really good" -> "ally good").
            total_reviews.append(doc.page_content.removeprefix("Review: "))

            if 'rating' in doc.metadata:
                rating = str(doc.metadata['rating'])
                rating_distribution[rating] = rating_distribution.get(rating, 0) + 1
                total_rating += float(rating)
                rated_documents += 1

        # Average over rated documents only; 0 when there are none.
        average_rating = round(total_rating / rated_documents, 2) if rated_documents > 0 else 0

        return {
            "ratings_count": len(documents),
            "average_rating": average_rating,
            "rating_distribution": rating_distribution,
            "total_reviews": total_reviews
        }
# src/reviewsense/core/database.py
from functools import lru_cache
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_astradb import AstraDBVectorStore
@lru_cache(maxsize=1)
def get_vector_store(
        collection_name: str = "feature_reviews",
        embedding_model: str = "sentence-transformers/all-mpnet-base-v2"
):
    """
    Create a cached vector store instance

    The most recent result is cached, so repeated calls with the same
    arguments return the same instance.

    Credentials are taken from the environment instead of being stored in
    source control:
        - ``ASTRA_DB_APPLICATION_TOKEN`` (required): Astra DB application token.
        - ``ASTRA_DB_API_ENDPOINT`` (optional): overrides the default endpoint.

    Args:
        collection_name (str): Database collection name
        embedding_model (str): Embedding model to use

    Returns:
        AstraDBVectorStore: Configured vector store instance

    Raises:
        RuntimeError: If ``ASTRA_DB_APPLICATION_TOKEN`` is not set. The failure
            is not cached, so a later call succeeds once the variable is set.
    """
    import os  # local import keeps this block self-contained

    token = os.environ.get("ASTRA_DB_APPLICATION_TOKEN")
    if not token:
        raise RuntimeError(
            "ASTRA_DB_APPLICATION_TOKEN environment variable is not set; "
            "it is required to connect to the Astra DB vector store."
        )
    # The endpoint is not a secret, so the previous value remains the default.
    api_endpoint = os.environ.get(
        "ASTRA_DB_API_ENDPOINT",
        "https://19b6fcda-3fe8-4585-a5f5-6a464a382426-westus3.apps.astra.datastax.com",
    )

    # Initialize embeddings
    embeddings = HuggingFaceEmbeddings(model_name=embedding_model)

    # Create and return vector store
    return AstraDBVectorStore(
        collection_name=collection_name,
        embedding=embeddings,
        api_endpoint=api_endpoint,
        token=token,
        namespace="default_keyspace",
    )
# router/review_adder.py
from langchain_community.docstore.document import Document
from src.reviewsense_ecom.service.retrieval import get_vector_store
class ReviewAdder:
    """Stores individual product reviews as documents in the vector store."""

    def __init__(self):
        self.vector_store = get_vector_store()

    def add_review(self, product_id: str, review: str, rating: int) -> str:
        """
        Wrap a review in a ``Document`` and add it to the vector store.

        Args:
            product_id (str): Stored as the ``title`` metadata of the document.
            review (str): Review text, used as the document content.
            rating (int): Stored as the ``rating`` metadata of the document.

        Returns:
            str: First element of the list returned by ``add_documents``.
        """
        metadata = {"title": product_id, "rating": rating}
        document = Document(page_content=review, metadata=metadata)
        inserted = self.vector_store.add_documents([document])
        return inserted[0]
# router/review_fetcher.py
from typing import List, Dict
from src.reviewsense_ecom.service.retrieval import get_vector_store
from src.reviewsense_ecom.service.feature_extractor import FeatureExtractor
class ReviewFetcher:
    """Class for fetching reviews from the vector store"""

    def __init__(self):
        self.vector_store = get_vector_store()
        self.feature_extractor = FeatureExtractor()

    def fetch_reviews(self, product_id: str, features: List[str], threshold: float = 0.5) -> Dict[str, List[str]]:
        """
        Collect feature-specific review extracts for a product.

        For every feature, up to 100 documents whose ``title`` metadata equals
        ``product_id`` are retrieved by similarity to the feature name. Each
        document scoring above ``threshold`` is passed to the feature
        extractor, and the extracted entries are grouped under the feature.

        Args:
            product_id (str): Value matched against the ``title`` metadata.
            features (List[str]): Feature names to search for.
            threshold (float): Minimum (exclusive) similarity score.

        Returns:
            Dict[str, List[str]]: Extracted entries per feature. Features with
            no extracted entries are omitted.
            NOTE(review): the extractor is annotated as returning dicts rather
            than plain strings -- confirm the annotation of this method.
        """
        feature_reviews = {}
        for feature in features:
            filter_criteria = {"title": product_id}
            # self.generate_feature_query(feature) may be used as the query
            # instead of the bare feature name to widen the semantic search.
            documents = self.vector_store.similarity_search_with_score_id(
                query=feature, k=100, filter=filter_criteria
            )

            extracted_reviews = []
            for doc, score, _ in documents:
                if score <= threshold:
                    continue
                # The extractor takes one review text and a *list* of features.
                # Passing (list_of_reviews, feature) made it fail internally
                # and return nothing for every feature.
                extracted_reviews.extend(
                    self.feature_extractor.extract_feature_reviews(doc.page_content, [feature])
                )

            if extracted_reviews:
                feature_reviews[feature] = extracted_reviews
        return feature_reviews

    def generate_feature_query(self, feature: str) -> str:
        """
        Generate semantic search query for different product features

        Args:
            feature (str): Product feature to search

        Returns:
            str: Semantic search query
        """
        feature_queries = {
            "battery": "battery performance, battery life, charging speed, power consumption, battery drain, long-lasting battery",
            "backup": "data backup, storage capacity, backup options, cloud storage, data protection, file recovery",
            "design": "build quality, aesthetic design, material, form factor, ergonomics, look and feel, physical appearance",
            "display": "screen quality, display resolution, color accuracy, brightness, viewing angles, screen technology, display performance"
        }
        # Default to feature name if not found
        return feature_queries.get(feature.lower(), feature)
# router/review_rater.py
from collections import defaultdict
from typing import Dict, List, Optional
import numpy as np
from transformers import pipeline
class ReviewRater:
    """Class for generating feature-specific ratings based on sentiment analysis"""

    def __init__(self):
        # NOTE(review): this pipeline is not used by generate_feature_ratings;
        # it is kept because other code may rely on the attribute -- confirm.
        self.sentiment_analyzer = pipeline("sentiment-analysis")

    def generate_feature_ratings(self, reviews_by_feature: List[Dict[str, str]]) -> Dict[
        str, Dict[str, Optional[float]]]:
        """
        Generate ratings and count sentiment occurrences for each feature.

        Each entry scores 5 when its sentiment contains "positive", 1 when it
        contains "negative" and 3 otherwise. Matching is case-insensitive, so
        labels such as "Positive" are not miscounted as neutral. Entries
        without a feature or sentiment are skipped.

        Args:
            reviews_by_feature (List[Dict[str, str]]): List of feature-specific reviews with sentiment.

        Returns:
            Dict[str, Dict[str, Optional[float]]]: A plain dict mapping each
            feature to ``positive_count``, ``negative_count``, ``neutral_count``
            (present only when a neutral entry was seen) and ``rating`` (mean
            score rounded to one decimal).
        """
        feature_data = {}
        scores_by_feature = defaultdict(list)

        for review_data in reviews_by_feature:
            feature = review_data.get("feature")
            sentiment = review_data.get("sentiment")
            if not feature or not sentiment:
                continue  # Skip invalid entries

            counts = feature_data.setdefault(feature, {"positive_count": 0, "negative_count": 0})
            label = str(sentiment).lower()

            if "positive" in label:
                score = 5
                counts["positive_count"] += 1
            elif "negative" in label:
                score = 1
                counts["negative_count"] += 1
            else:
                score = 3  # Neutral sentiment
                counts["neutral_count"] = counts.get("neutral_count", 0) + 1

            scores_by_feature[feature].append(score)

        # Every feature in feature_data has at least one score at this point.
        # float() turns the NumPy scalar into a builtin float for serialisation.
        for feature, counts in feature_data.items():
            counts["rating"] = float(round(np.mean(scores_by_feature[feature]), 1))

        return feature_data
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment