demand forecast ai poc

parents
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from prophet import Prophet
from datetime import datetime
import plotly.graph_objects as go
# Custom styling for Streamlit UI
st.markdown(
"""
<style>
.stApp {background-color: #e6f7e6; padding: 10px;}
.header {text-align: center; padding: 20px; font-family: Arial, sans-serif; background-color: #d4a9e8; color: white; border-radius: 8px;}
.header h1 {margin: 0; color: #003366;}
.header p {color: #36454F; }
.stTextInput > div > div > input {background-color: #d9e9f8; color: #333; border: 1px solid #a9cce8; border-radius: 5px; padding: 8px;}
.stSelectbox > div > div {background-color: #d9e9f8; border: 1px solid #a9cce8; border-radius: 5px;}
.stButton > button {background-color: #6c63ff; color: white; border: none; padding: 10px 20px; border-radius: 8px; cursor: pointer; font-size: 16px;}
.stButton > button:hover {background-color: #5a52d6;}
.footer {text-align: center; font-size: 12px; color: #234; padding: 10px; border-top: 1px solid #ccc; margin-top: 50px;}
</style>
<div class="header">
<h1>Franchise Order Forecaster </h1>
<p>Predicting demand quantities for optimized franchise operations</p>
</div>
""",
unsafe_allow_html=True
)
# Get the current year
current_year = datetime.now().year
# Generate future SEASON options dynamically
future_years = [current_year + i for i in range(10)] # Next 10 years
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
st.header("Enter Franchise, Season, and SKU Details")
# Input fields (removed YEAR input)
franchise_id = st.text_input("FRANCHISE_ID", "")
season = st.selectbox("SEASON", season_options)
sku = st.text_input("SKU", "")
# Load data from multiple sheets
@st.cache_data # Cache the data loading to speed up subsequent runs
def load_data():
df = pd.read_excel("/Users/rkadiyala/Downloads/PREDICTION_POC_DATA_Updated.xlsx", sheet_name=None) # Load all sheets
# Extract specific sheets from the loaded data
sales_history = df["sales_history"]
sku_details = df["sku_details"]
franchise_partner = df["franchise_partner"]
sales_prediction = df["sales_prediction"]
# Convert SEASON to datetime (extract year from SEASON)
season_to_month = {
'FALL': '09', # Fall corresponds to September
'HOLIDAY': '12', # Holiday corresponds to December
'SPRING': '03', # Spring corresponds to March
'SUMMER': '06' # Summer corresponds to June
}
# Extract YEAR and construct proper datetime 'ds'
sales_history['SEASON'] = sales_history['SEASON'].astype(str)
sales_history['YEAR'] = sales_history['SEASON'].str.extract(r'(\d{4})').astype(int) # Extract year from SEASON
sales_history['MONTH'] = sales_history['SEASON'].str.extract(r'([A-Za-z]+)-')[0].map(season_to_month) # Map season to month
# Create the 'ds' column for Prophet, ensuring it gets the correct month and year
sales_history['ds'] = pd.to_datetime(sales_history['YEAR'].astype(str) + '-' + sales_history['MONTH'] + '-01')
return sales_history, sku_details, franchise_partner, sales_prediction
sales_history, sku_details, franchise_partner, sales_prediction = load_data()
# Interactive and responsive plot function
def plot_sales_history_interactive(df_filtered, selected_season):
# Extend the data to include selected season
future_season = selected_season.split('-')
future_year = int(future_season[1])
future_season_name = future_season[0]
# Extract existing seasons and add the future one
unique_seasons = list(df_filtered['SEASON'].unique())
if selected_season not in unique_seasons:
unique_seasons.append(selected_season)
# Sort seasons in chronological order
sorted_seasons = sorted(unique_seasons, key=lambda x: (int(x.split('-')[1]), x.split('-')[0]))
# Prepare data for plotting
df_filtered['SEASON_DISPLAY'] = df_filtered['SEASON'] # Use season-year format for x-axis
full_plot_df = pd.DataFrame({
'SEASON_DISPLAY': sorted_seasons,
'QUANTITY_ORDERED': [None] * len(sorted_seasons) # Placeholder for missing data
})
# Merge actual data with placeholders
plot_df = pd.merge(full_plot_df, df_filtered[['SEASON_DISPLAY', 'QUANTITY_ORDERED']],
on='SEASON_DISPLAY', how='left', suffixes=('_placeholder', '_actual'))
plot_df['QUANTITY_ORDERED'] = plot_df['QUANTITY_ORDERED_actual'].fillna(plot_df['QUANTITY_ORDERED_placeholder'])
# Create Plotly figure
fig = go.Figure()
fig.add_trace(go.Scatter(
x=plot_df['SEASON_DISPLAY'],
y=plot_df['QUANTITY_ORDERED'],
mode='lines+markers',
line=dict(color='blue', width=2),
marker=dict(size=8),
name='Quantity Ordered'
))
fig.update_layout(
title=f"Sales History for SKU {sku}",
xaxis_title='Season-Year',
yaxis_title='Quantity Ordered',
xaxis=dict(type='category', title_font=dict(size=14), tickangle=45),
yaxis=dict(title_font=dict(size=14)),
template='plotly_white'
)
st.plotly_chart(fig, use_container_width=True)
if st.button('Predict Quantity'):
if not franchise_id or not season:
st.error("Please fill all required fields (FRANCHISE_ID and SEASON).")
else:
try:
# Extract the year from the SEASON input
year = int(season.split('-')[1]) # Get year from selected season (e.g., "FALL-2024" -> 2024)
# Filter data based on provided inputs
df_filtered = sales_history[
(sales_history['FRANCHISE_ID'] == int(franchise_id)) &
(sales_history['SEASON'].str.contains(season.split('-')[0]))
]
# If SKU is provided, filter further
if sku:
df_filtered = df_filtered[df_filtered['SKU'] == int(sku)]
# Check if filtered dataframe has at least 2 non-null rows
if df_filtered[['ds', 'QUANTITY_ORDERED']].dropna().shape[0] < 2:
st.error(f"Not enough valid data for FRANCHISE_ID: {franchise_id}, SKU: {sku or 'ALL'} in {season}.")
elif df_filtered.empty:
st.error(f"No data found for FRANCHISE_ID: {franchise_id}, SKU: {sku or 'ALL'} in {season}.")
else:
# Prepare data for Prophet
model = Prophet()
model.fit(df_filtered[['ds', 'QUANTITY_ORDERED']].rename(columns={'ds': 'ds', 'QUANTITY_ORDERED': 'y'}))
# Create future dataframe
future = model.make_future_dataframe(periods=1, freq='Y') # Predict for next year
# Make prediction
forecast = model.predict(future)
prediction = forecast['yhat'].iloc[-1] # Get the last prediction
# Save predicted quantity to the database (Mock implementation here)
st.success(f"Predicted Quantity for SKU {sku or 'ALL'} in {season}: {round(max(prediction, 0))}")
st.info("Prediction has been saved to the database.")
# Plot sales history graph
graph_title = f"Sales History for SKU {sku}" if sku else f"Sales History for FRANCHISE_ID {franchise_id}"
plot_sales_history_interactive(df_filtered, season)
# Calculate model accuracy
historical_forecast = model.predict(df_filtered[['ds']])
df_filtered['yhat'] = historical_forecast['yhat']
df_filtered['error'] = abs(df_filtered['QUANTITY_ORDERED'] - df_filtered['yhat'])
df_filtered['percentage_error'] = (
df_filtered['error'] / df_filtered[
'QUANTITY_ORDERED'].replace(0, float('nan'))
) * 100 # Avoid division by zero
# Drop rows where percentage error could not be calculated
valid_errors = df_filtered.dropna(subset=['percentage_error'])
# Debugging: Check what data is being used to calculate accuracy
st.write("Debugging: Filtered Data for Accuracy Calculation")
st.write(valid_errors[['ds', 'QUANTITY_ORDERED', 'yhat', 'error', 'percentage_error']])
if not valid_errors.empty:
mape = valid_errors['percentage_error'].mean() # Mean Absolute Percentage Error
accuracy = 100 - mape
st.info(f"Model Accuracy: {round(accuracy, 2)}%")
else:
st.warning("Model Accuracy could not be calculated due to insufficient valid data.")
except ValueError as e:
st.exception(e) # Show detailed error message for debugging
except Exception as e:
st.exception(f"An error occurred: {e}")
# Correlation Heatmap
st.markdown("<hr>", unsafe_allow_html=True) # Horizontal Line to separate heatmap
st.subheader("Sales History Correlation Heatmap")
numeric_df = sales_history.select_dtypes(include=['float64', 'int64'])
corr = numeric_df.corr()
plt.figure(figsize=(8, 6)) # Reduced size for better display in Streamlit
sns.heatmap(corr, cbar=True, square=True, annot=True, fmt='.2f', cmap='coolwarm')
st.pyplot(plt.gcf())
st.markdown('<div class="footer">Powered by GAP AI</div>', unsafe_allow_html=True)
from datetime import datetime
import streamlit as st
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine
import psycopg2
st.markdown(
"""
<style>
.stApp {
background-color: #e6f7e6;
padding: 10px;
}
.header {
text-align: center;
padding: 10px;
font-family: Arial, sans-serif;
background: linear-gradient(to right, red, orange, yellow, green, blue, indigo, violet); /* Rainbow gradient */
color: white;
border-radius: 8px;
}
.header h1 {
margin: 0;
color: white; /* White text for good contrast */
text-shadow: 1px 1px 2px black; /* Optional: Shadow for readability */
}
.header p {
color: #f0f0f0; /* Light text color for subtitle */
}
<div class="header">
<h1>Franchise Order Forecaster</h1>
<p>Predicting demand quantities for optimized franchise operations</p>
</div>
/* Consistent input styles (number input, text input, and selectbox) */
input[type="number"], input[type="text"], .stSelectbox > div {
background-color: #d9e9f8;
color: #003366; /* Darker text color for better contrast */
border: 1px solid #a9cce8;
border-radius: 5px;
}
/* Ensure dropdown selected value is visible */
.stSelectbox > div > div {
background-color: #d9e9f8 !important;
color: #003366 !important; /* Dark text for dropdown value */
}
/* Target the selected value (specific to Streamlit’s dropdowns) */
.stSelectbox > div > div > div {
color: #003366 !important; /* Dark text for selected option */
}
/* Button styles */
.stButton > button {
background-color: #6c63ff;
color: white;
border: none;
padding: 10px 20px;
border-radius: 8px;
cursor: pointer;
font-size: 16px;
}
.stButton > button:hover {
background-color: #5a52d6;
}
/* Footer styles */
.footer {
text-align: center;
font-size: 12px;
color: #234;
padding: 10px;
border-top: 1px solid #ccc;
margin-top: 50px;
}
</style>
<div class="header">
<h1>Franchise Order Forecaster</h1>
<p>Predicting demand quantities for optimized franchise operations</p>
</div>
""",
unsafe_allow_html=True
)
# Database connection function
def get_database_connection():
try:
connection = psycopg2.connect(
database="postgres",
user="postgres",
password="Welcome@1",
host="localhost",
port="5432"
)
return connection
except Exception as e:
st.error(f"Database connection failed: {str(e)}")
return None
# Function to fetch data from database
def fetch_sales_data(franchise_id, season, sku=None):
conn = get_database_connection()
if conn:
try:
query = """
SELECT franchise_id, season, sku, prediction_quantity
FROM public.sales_prediction
WHERE franchise_id = %s AND season = %s
"""
params = [franchise_id, season]
if sku:
query += " AND sku = %s"
params.append(sku)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
except Exception as e:
st.error(f"Error fetching data: {str(e)}")
conn.close()
return None
return None
current_year = datetime.now().year
future_years = [current_year + i for i in range(10)] # Next 10 years
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
franchise_id = st.number_input("FRANCHISE ID", min_value=30101, value=30101)
season = st.selectbox("SEASON", season_options)
sku = st.text_input("SKU (optional)", "")
# Get Data button
if st.button("Get Data"):
if not season:
st.warning("Please enter a season")
# Fetch data
df = fetch_sales_data(franchise_id, season, sku if sku else None)
if df is not None and not df.empty:
# Display the raw data
st.dataframe(df, use_container_width=True)
fig_bar = px.bar(
df,
x='sku',
y='prediction_quantity',
title='Prediction Quantity by SKU',
labels={'prediction_quantity': 'Prediction Quantity', 'sku': 'SKU'}
)
st.plotly_chart(fig_bar)
else:
st.error("No data found for the given parameters")
from datetime import datetime
import streamlit as st
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine
import psycopg2
# Page configuration
st.set_page_config(page_title="Franchise Data Visualization", layout="wide")
# Database connection function
def get_database_connection():
try:
connection = psycopg2.connect(
database="postgres",
user="postgres",
password="Welcome@1",
host="localhost",
port="5432"
)
return connection
except Exception as e:
st.error(f"Database connection failed: {str(e)}")
return None
# Function to fetch data from database
def fetch_sales_data(franchise_id, season, sku=None):
conn = get_database_connection()
if conn:
try:
query = """
SELECT franchise_id, season, sku, prediction_quantity
FROM public.sales_prediction
WHERE franchise_id = %s AND season = %s
"""
params = [franchise_id, season]
if sku:
query += " AND sku = %s"
params.append(sku)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
except Exception as e:
st.error(f"Error fetching data: {str(e)}")
conn.close()
return None
return None
current_year = datetime.now().year
future_years = [current_year + i for i in range(10)] # Next 10 years
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
# Main app
def main():
st.title("Franchise Order Forecaster")
# Create three columns for inputs
col1, col2, col3 = st.columns(3)
with col1:
franchise_id = st.number_input("Enter Franchise ID", min_value=30101, value=30105)
with col2:
# season = st.text_input("Enter Season")
season = st.selectbox("SEASON", season_options)
with col3:
sku = st.text_input("Enter SKU (optional)", "")
# Get Data button
if st.button("Get Data"):
if not season:
st.warning("Please enter a season")
return
# Fetch data
df = fetch_sales_data(franchise_id, season, sku if sku else None)
if df is not None and not df.empty:
# Display the raw data
st.subheader("Raw Data")
st.dataframe(df)
col1, col2 = st.columns(2)
with col1:
# Bar chart for quantity by SKU
fig_bar = px.bar(
df,
x='sku',
y='prediction_quantity',
title='Prediction Quantity by SKU',
labels={'prediction_quantity': 'Prediction Quantity', 'sku': 'SKU'}
)
st.plotly_chart(fig_bar)
with col2:
# Pie chart showing distribution of quantities
fig_pie = px.pie(
df,
values='prediction_quantity',
names='sku',
title='Distribution of Quantities by SKU'
)
st.plotly_chart(fig_pie)
# Summary statistics
st.subheader("Summary Statistics")
st.write(df.describe())
else:
st.error("No data found for the given parameters")
if __name__ == "__main__":
main()
\ No newline at end of file
from datetime import datetime
import streamlit as st
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine
import psycopg2
st.markdown(
"""
<style>
.stApp {
background-color: #e6f7e6;
padding: 10px;
}
.header {
text-align: center;
padding: 10px;
font-family: Arial, sans-serif;
background: linear-gradient(to right, red, orange, yellow, green, blue, indigo, violet);
color: white;
border-radius: 8px;
}
.header h1 {
margin: 0;
color: white;
text-shadow: 1px 1px 2px black;
}
.header p {
color: #f0f0f0;
}
</style>
<div class="header">
<h1>Franchise Order Forecaster</h1>
<p>Predicting demand quantities for optimized franchise operations</p>
</div>
""",
unsafe_allow_html=True
)
# Database connection function
def get_database_connection():
try:
connection = psycopg2.connect(
database="postgres",
user="postgres",
password="Welcome@1",
host="localhost",
port="5432"
)
return connection
except Exception as e:
st.error(f"Database connection failed: {str(e)}")
return None
# Function to fetch data from database
def fetch_sales_data(franchise_id, season, sku=None):
conn = get_database_connection()
if conn:
try:
query = """
SELECT franchise_id, season, sku, prediction_quantity
FROM public.sales_prediction
WHERE franchise_id = %s AND season = %s
"""
params = [franchise_id, season]
if sku:
query += " AND sku = %s"
params.append(sku)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
except Exception as e:
st.error(f"Error fetching data: {str(e)}")
conn.close()
return None
return None
current_year = datetime.now().year
future_years = [current_year + i for i in range(10)]
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
franchise_id = st.number_input("FRANCHISE ID", min_value=30101, value=30101)
season = st.selectbox("SEASON", season_options)
sku = st.text_input("SKU (optional)", "")
# Get Data button
if st.button("Get Data"):
if not season:
st.warning("Please enter a season")
# Fetch data
df = fetch_sales_data(franchise_id, season, sku if sku else None)
if df is not None and not df.empty:
# Display the raw data
st.dataframe(df.style.set_properties(**{'background-color': '#f9f9f9', 'color': '#333333'}), use_container_width=True)
# Bar Chart
fig_bar = px.bar(
df,
x='sku',
y='prediction_quantity',
title='Prediction Quantity by SKU',
labels={'prediction_quantity': 'Prediction Quantity', 'sku': 'SKU'},
)
fig_bar.update_layout(
plot_bgcolor='#e6f7e6',
paper_bgcolor='#e6f7e6',
font=dict(color='#333333')
)
st.plotly_chart(fig_bar)
# Pie Chart (optional example)
fig_pie = px.pie(
df,
names='sku',
values='prediction_quantity',
title='Distribution of Prediction Quantity by SKU'
)
fig_pie.update_layout(
plot_bgcolor='#e6f7e6',
paper_bgcolor='#e6f7e6',
font=dict(color='#333333')
)
st.plotly_chart(fig_pie)
else:
st.error("No data found for the given parameters")
from datetime import datetime
import streamlit as st
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine
import psycopg2
st.markdown(
"""
<style>
.stApp {
background-color: #e6f7e6;
padding: 10px;
}
.header {
text-align: center;
padding: 10px;
font-family: Arial, sans-serif;
background: linear-gradient(to right, red, orange, yellow, green, blue, indigo, violet);
color: white;
border-radius: 8px;
}
.header h1 {
margin: 0;
color: white;
text-shadow: 1px 1px 2px black;
}
.header p {
color: #f0f0f0;
}
</style>
<div class="header">
<h1>Franchise Order Forecaster</h1>
<p>Predicting demand quantities for optimized franchise operations</p>
</div>
""",
unsafe_allow_html=True
)
# Database connection function
def get_database_connection():
try:
connection = psycopg2.connect(
database="postgres",
user="postgres",
password="Welcome@1",
host="localhost",
port="5432"
)
return connection
except Exception as e:
st.error(f"Database connection failed: {str(e)}")
return None
# Function to fetch data from database
def fetch_sales_data(franchise_id, season, sku=None):
conn = get_database_connection()
if conn:
try:
query = """
SELECT franchise_id, season, sku, prediction_quantity
FROM public.sales_prediction
WHERE franchise_id = %s AND season = %s
"""
params = [franchise_id, season]
if sku:
query += " AND sku = %s"
params.append(sku)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
except Exception as e:
st.error(f"Error fetching data: {str(e)}")
conn.close()
return None
return None
current_year = datetime.now().year
future_years = [current_year + i for i in range(10)]
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
franchise_id = st.number_input("FRANCHISE ID", min_value=30101, value=30101)
season = st.selectbox("SEASON", season_options)
sku = st.text_input("SKU (optional)", "")
# Get Data button
if st.button("Get Data"):
if not season:
st.warning("Please enter a season")
# Fetch data
df = fetch_sales_data(franchise_id, season, sku if sku else None)
if df is not None and not df.empty:
# Display the raw data
st.dataframe(df.style.set_properties(**{'background-color': '#f9f9f9', 'color': '#333333'}), use_container_width=True)
# Display charts only if SKU is not entered
if not sku:
# Bar Chart
fig_bar = px.bar(
df,
x='sku',
y='prediction_quantity',
title='Prediction Quantity by SKU',
labels={'prediction_quantity': 'Prediction Quantity', 'sku': 'SKU'},
)
fig_bar.update_layout(
plot_bgcolor='#e6f7e6',
paper_bgcolor='#e6f7e6',
font=dict(color='#333333')
)
st.plotly_chart(fig_bar)
# Pie Chart (optional example)
fig_pie = px.pie(
df,
names='sku',
values='prediction_quantity',
title='Distribution of Prediction Quantity by SKU'
)
fig_pie.update_layout(
plot_bgcolor='#e6f7e6',
paper_bgcolor='#e6f7e6',
font=dict(color='#333333')
)
st.plotly_chart(fig_pie)
else:
st.error("No data found for the given parameters")
from datetime import datetime
import streamlit as st
import pandas as pd
import plotly.express as px
from sqlalchemy import create_engine
import psycopg2
st.markdown(
"""
<style>
.stApp {
background-color: #f5f5f5; /* Neutral light grey for overall background */
padding: 10px;
}
.header {
text-align: center;
padding: 15px;
font-family: Arial, sans-serif;
background: linear-gradient(to right, #4caf50, #8bc34a); /* Green gradient for positivity */
color: white;
border-radius: 8px;
}
.header h1 {
margin: 0;
color: white;
text-shadow: 1px 1px 2px black;
}
.header p {
color: #f0f0f0;
}
.stNumberInput > div, .stSelectbox > div, .stTextInput > div {
background-color: #ffffff; /* White for clean input areas */
color: #333333; /* Dark grey text for contrast */
border: 1px solid #cccccc; /* Subtle border */
border-radius: 5px;
}
.stButton > button {
background-color: #4caf50; /* Green for primary action */
color: white;
border: none;
padding: 10px 20px;
border-radius: 8px;
cursor: pointer;
font-size: 16px;
}
.stButton > button:hover {
background-color: #388e3c; /* Slightly darker green for hover */
}
.stDataframe {
border: 1px solid #e0e0e0;
border-radius: 5px;
}
</style>
<div class="header">
<h1>Franchise Order Forecaster</h1>
<p>Predicting demand quantities for optimized franchise operations</p>
</div>
""",
unsafe_allow_html=True
)
# Database connection function
def get_database_connection():
try:
connection = psycopg2.connect(
database="postgres",
user="postgres",
password="Welcome@1",
host="localhost",
port="5432"
)
return connection
except Exception as e:
st.error(f"Database connection failed: {str(e)}")
return None
# Function to fetch data from database
def fetch_sales_data(franchise_id, season, sku=None):
conn = get_database_connection()
if conn:
try:
query = """
SELECT franchise_id, season, sku, prediction_quantity
FROM public.sales_prediction
WHERE franchise_id = %s AND season = %s
"""
params = [franchise_id, season]
if sku:
query += " AND sku = %s"
params.append(sku)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
except Exception as e:
st.error(f"Error fetching data: {str(e)}")
conn.close()
return None
return None
current_year = datetime.now().year-4
future_years = [current_year + i for i in range(10)]
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
franchise_id = st.number_input("FRANCHISE ID", min_value=30101, value=30101)
season = st.selectbox("SEASON", season_options)
sku = st.text_input("SKU (optional)", "")
# Get Data button
if st.button("Get Data"):
if not season:
st.warning("Please enter a season")
# Fetch data
df = fetch_sales_data(franchise_id, season, sku if sku else None)
if df is not None and not df.empty:
# Function to style the dataframe as markdown with column header styles
def style_table(df):
# Start with styled header
header_html = """
<style>
.styled-table th {
background-color: lightblue;
color: black;
font-weight: bold;
text-align: left;
padding: 10px;
}
.styled-table td {
text-align: left;
padding: 8px;
}
table {
border-collapse: collapse;
width: 100%;
}
tbody tr:nth-child(even) {
background-color: #f9f9f9;
}
</style>
<table class="styled-table">
<thead>
<tr>
"""
# Add column headers
for col in df.columns:
header_html += f"<th>{col}</th>"
header_html += "</tr></thead><tbody>"
# Add table rows
for _, row in df.iterrows():
header_html += "<tr>"
for cell in row:
header_html += f"<td>{cell}</td>"
header_html += "</tr>"
header_html += "</tbody></table>"
return header_html
# Render styled table
styled_html = style_table(df)
st.markdown(styled_html, unsafe_allow_html=True)
# Check if `prediction_quantity` has valid values
if df['prediction_quantity'].notnull().any():
# Display charts only if SKU is not entered
if not sku:
# Bar Chart
fig_bar = px.bar(
df,
x='sku',
y='prediction_quantity',
title='Prediction Quantity by SKU',
labels={'prediction_quantity': 'Prediction Quantity', 'sku': 'SKU'},
)
fig_bar.update_layout(
plot_bgcolor='#f5f5f5', # Grey background for chart area
paper_bgcolor='#ffffff', # White surrounding chart background
font=dict(color='#333333') # Grey font for labels
)
st.plotly_chart(fig_bar)
# Pie Chart (optional example)
fig_pie = px.pie(
df,
names='sku',
values='prediction_quantity',
title='Distribution of Prediction Quantity by SKU'
)
fig_pie.update_layout(
plot_bgcolor='#f5f5f5',
paper_bgcolor='#ffffff',
font=dict(color='#333333')
)
st.plotly_chart(fig_pie)
else:
st.error("Prediction quantity is null. No charts to display.")
else:
st.error("No data found for the given parameters.")
import asyncpg
from typing import List
import os
# Database connection configuration
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://postgres:Welcome%401@localhost:5432/postgres")
# Async function to get a connection to the database
async def get_db_connection():
conn = await asyncpg.connect(DATABASE_URL)
return conn
# Async function to fetch data
async def fetch_sales_history(query: str, params: List):
conn = await get_db_connection()
records = await conn.fetch(query, *params)
await conn.close()
return records
\ No newline at end of file
import pandas as pd
from prophet import Prophet
# Sample data
data = {'ds': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05']),
'y': [10, 12, 15, 13, 18]}
df = pd.DataFrame(data)
# Initialize and fit the model
model = Prophet()
model.fit(df)
# Create future dataframe for predictions
future = model.make_future_dataframe(periods=7)
# Make predictions
forecast = model.predict(future)
# Print the forecast
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10))
\ No newline at end of file
from prophet import Prophet
import pandas as pd
import os
def convert_season_to_date(season):
"""
Converts a season-year string (e.g., 'SPRING-2023') into a date string (e.g., '2023-03-01').
Args:
season (str): Season and year (e.g., 'SPRING-2023').
Returns:
str: Date string in 'YYYY-MM-DD' format.
"""
season_month_mapping = {
'HOLIDAY': 12, # December
'SPRING': 3, # March
'SUMMER': 6, # June
'FALL': 9 # September
}
season, year = season.split('-')
month = season_month_mapping.get(season.upper(), 1) # Default to January if season not found
return f'{year}-{month:02d}-01'
def convert_date_to_season(date):
"""
Converts a date (e.g., '2023-03-01') into a season-year string (e.g., 'SPRING-2023').
"""
season_month_mapping = {
1: 'HOLIDAY', # January to February (Winter)
2: 'HOLIDAY',
3: 'SPRING', # March to May (Spring)
4: 'SPRING',
5: 'SPRING',
6: 'SUMMER', # June to August (Summer)
7: 'SUMMER',
8: 'SUMMER',
9: 'FALL', # September to November (Fall)
10: 'FALL',
11: 'FALL',
12: 'HOLIDAY' # December (Holiday season)
}
year = date.year
month = date.month
# Map month to season
season = next((name for num, name in season_month_mapping.items() if month == num), None)
if not season:
raise ValueError(f"No season mapping found for month: {month}")
return f'{season}-{year}'
def prepare_and_forecast():
"""
Prepares data and forecasts demand for each SKU using Prophet.
Args:
data_file (str): Path to the CSV file containing historical data.
sku_id (int, optional): Specific SKU ID to retrieve forecast. Defaults to None.
Returns:
dict: A dictionary of SKU forecasts.
"""
# Read data
print("Current working directory:", os.getcwd())
file_path = 'data/History.csv'
if not os.path.exists(file_path):
raise FileNotFoundError(f"The file {file_path} does not exist. Please provide the correct path.")
df = pd.read_csv(file_path)
df['DATE'] = df['SEASON'].apply(convert_season_to_date)
df['DATE'] = pd.to_datetime(df['DATE'])
sku_forecasts = {}
for sku in df['SKU'].unique():
# Filter data for the current SKU
sku_data = df[df['SKU'] == sku]
# print(sku_data)
# Group by Date and sum the quantities ordered
df_aggregated = sku_data.groupby('DATE').agg({'QUANTITY_ORDERED': 'sum'}).reset_index()
# Prepare data for Prophet
sku_data_prophet = df_aggregated.rename(columns={'DATE': 'ds', 'QUANTITY_ORDERED': 'y'})
# Initialize and fit the Prophet model
model = Prophet(yearly_seasonality=True)
model.fit(sku_data_prophet)
last_date = sku_data_prophet['ds'].max()
last_year = last_date.year
future_dates = []
# Generate dates for the next 2 years on a quarterly basis
quarters = [3, 6, 9, 12] # Representing January, April, July, October
for year_offset in range(1,5): # Forecast for the next 4 years
for month in quarters: # Iterate through quarterly months
future_dates.append(pd.to_datetime(f"{last_year + year_offset}-{month:02d}-01"))
# Create a DataFrame with the future dates
future_seasonal_df = pd.DataFrame({'ds': future_dates})
# Make future predictions (4 quarters ahead)
# future = model.make_future_dataframe(periods=5, freq='Q')
forecast = model.predict(future_seasonal_df)
combined_df = pd.concat([sku_data_prophet, forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]])
sku_forecasts[sku] = combined_df
final_data = []
# Iterate over each SKU and its forecast
for sku, forecast_df in sku_forecasts.items():
# Add season to the forecast DataFrame
forecast_df['season'] = forecast_df['ds'].apply(lambda date: convert_date_to_season(date))
forecast_df = forecast_df.rename(columns={'yhat': 'prediction_quantity'})
# Select only the required columns and add SKU information
sku_data = forecast_df.tail(16)[['season', 'prediction_quantity']].copy()
sku_data['sku'] = sku
# sku_data = forecast_df[['season', 'yhat']].copy()
# sku_data['sku'] = sku # Add SKU column
# Append the transformed data to final_data list
final_data.extend(sku_data.to_dict(orient='records'))
# Convert the final_data list into a DataFrame for better viewing/manipulation
final_df = pd.DataFrame(final_data)
# Display the resulting DataFrame
#print(final_df)
# Optionally, print or plot the forecast for the current SKU
# print(f"Forecast for SKU: {sku}")
# print(forecast[['ds', 'yhat']].tail(4)) # Print forecasted values for the next 4 periods
# # Return a specific SKU forecast if provided
# if sku_id is not None:
# return sku_forecasts.get(sku_id, None)
return final_df
from fastapi import FastAPI, HTTPException
from typing import List
from pydantic import BaseModel
import asyncpg
# Import the database functions
from database import fetch_sales_history, get_db_connection
app = FastAPI()
class SalesRecord(BaseModel):
sku_id: int
franchise_id: int
season: str
sku: int
quantity_ordered: int
async def insert_sales_data(sales_data: List[SalesRecord]):
conn = await get_db_connection()
try:
async with conn.transaction():
# Prepare the SQL query to insert data into the table
insert_query = """
INSERT INTO public.sales_history (sku_id, franchise_id, season, sku, quantity_ordered)
VALUES ($1, $2, $3, $4, $5)
"""
# Loop through the sales records and execute the insertion query
for record in sales_data:
await conn.execute(
insert_query,
record.sku_id,
record.franchise_id,
record.season,
record.sku,
record.quantity_ordered
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error inserting data: {str(e)}")
finally:
await conn.close()
# Endpoint to fetch all sales history
@app.get("/sales-history", response_model=List[SalesRecord])
async def get_sales_history():
try:
query = """
SELECT sku_id, franchise_id, season, sku, quantity_ordered
FROM public.sales_history
"""
params = []
# Fetch data from the database
sales_data = await fetch_sales_history(query, params)
sales_records = [
SalesRecord(
sku_id=record['sku_id'],
franchise_id=record['franchise_id'],
season=record['season'],
sku=record['sku'], # Directly assign sku as an integer
quantity_ordered=record['quantity_ordered']
)
for record in sales_data
]
return sales_records
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Endpoint to insert sales data via a POST request
@app.post("/sales-history")
async def create_sales_history(sales_data: List[SalesRecord]):
try:
await insert_sales_data(sales_data)
return {"message": "Sales data successfully inserted."}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error inserting data: {str(e)}")
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Float, Date
from sqlalchemy.orm import declarative_base
# from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import forcastgenerator
# Database URL (replace with your actual database credentials)
DATABASE_URL = "postgresql://postgres:Welcome%401@localhost:5432/postgres"
# Create the FastAPI app instance
app = FastAPI()
# Initialize the database connection
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Define the database table model for sales_prediction
class SalesPrediction(Base):
__tablename__ = "sales_prediction"
id = Column(Integer, primary_key=True, index=True)
franchise_id = Column(Integer, index=True)
season = Column(String, index=True)
sku = Column(Integer, index=True)
prediction_quantity = Column(Float)
# Pydantic model for the request body
class SalesPredictionRequest(BaseModel):
franchise_id: int
season: str
sku: int
class SalesPredictionResponse(BaseModel):
franchise_id: int
season: str
sku: int
prediction_quantity: float
# Load the trained model (Replace with your actual model path)
def load_model():
prediction_model = forcastgenerator.prepare_and_forecast()
return prediction_model
def update_db_from_list_and_dataframe(object_list, df, franchise_id):
"""
Updates the database based on matches between object_list and df.
:param db: SQLAlchemy session object
:param object_list: List of objects to update
:param df: Pandas DataFrame with `sku`, `season`, and new values
"""
db = SessionLocal()
for obj in object_list:
# Find matching row in the DataFrame
matching_row = df[(df['sku'] == obj.sku) & (df['season'] == obj.season)]
# If a match is found, update the database
if not matching_row.empty:
# Extract the new value(s) to be updated
new_value = matching_row.iloc[0]['prediction_quantity'] # Replace 'value_column' with actual column name
# Update the record in the database
db.query(SalesPrediction).filter(
SalesPrediction.sku == obj.sku,
SalesPrediction.season == obj.season,
SalesPrediction.franchise_id == franchise_id
).update({'prediction_quantity': int(new_value)}) # Replace 'quantity' with the actual field to update
# Commit the changes
db.commit()
# db.refresh(sales_prediction)
def predict_quantity(season: str, sku: int, df) -> float:
# Filter the dataframe for the given SKU and season
filtered_df = df[(df['sku'] == sku) & (df['season'] == season)]
# Check if the filtered dataframe is empty
if filtered_df.empty:
return None # Return None if no record is found
else:
return filtered_df['prediction_quantity'].iloc[0]
# Initialize model (for example purposes, loading on app startup)
franchise_id: int = 30101
model = load_model()
db = SessionLocal()
sales_prediction_db = db.query(SalesPrediction).filter(
#franchise_id we are hard coding for now
SalesPrediction.franchise_id == franchise_id
).all()
#storing all the prediction values to db
update_db_from_list_and_dataframe(sales_prediction_db, model,franchise_id)
model
# Route to handle PATCH request to update prediction in DB
@app.patch("/predict/{franchise_id}/{season}/{sku}", response_model=SalesPredictionResponse)
def update_sales_prediction(
franchise_id: int,
season: str,
sku: int,
request: SalesPredictionRequest
):
db = SessionLocal()
try:
# Query the existing prediction record
sales_prediction = db.query(SalesPrediction).filter(
SalesPrediction.franchise_id == franchise_id,
SalesPrediction.season == season,
SalesPrediction.sku == sku
).first()
if sales_prediction:
# Return updated response
return SalesPredictionResponse(
franchise_id=sales_prediction.franchise_id,
season=sales_prediction.season,
sku=sales_prediction.sku,
prediction_quantity=sales_prediction.prediction_quantity
)
else:
raise HTTPException(status_code=404, detail="Record not found")
except Exception as e:
db.rollback()
raise HTTPException(status_code=400, detail=str(e))
finally:
db.close()
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)
\ No newline at end of file
from datetime import datetime
import streamlit as st
import plotly.express as px
import requests
import pandas as pd
# Configure the page
st.set_page_config(page_title="Sales Prediction Dashboard", layout="wide")
# Title and description
st.title("Sales Prediction Dashboard")
st.markdown("Enter the details below to get quantity predictions")
# Get the current year
current_year = datetime.now().year
# Generate future SEASON options dynamically
future_years = [current_year + i for i in range(10)] # Next 10 years
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
season_options = [f"{season}-{year}" for year in future_years for season in seasons]
st.header("Enter Franchise, Season, and SKU Details")
# Input form
with st.form("prediction_form"):
franchise_id = st.text_input("Franchise ID", placeholder="Enter Franchise ID (e.g., F001)")
# season = st.selectbox("Season", ["Summer", "Winter"])
seasons = ["FALL", "HOLIDAY", "SPRING", "SUMMER"]
# season_options = [f"{season}-{year}" for year in future_years for season in seasons]
season = st.selectbox("SEASON", season_options)
sku = st.text_input("SKU (Optional)", placeholder="Enter SKU (e.g., SKU001)")
submitted = st.form_submit_button("Get Prediction")
if submitted:
# Prepare the request data
request_data = {
"franchise_id": franchise_id,
"season": season,
"sku": sku if sku else None
}
try:
# Make API request
response = requests.post("http://localhost:8000/predict", json=request_data)
response.raise_for_status()
result = response.json()
# Display prediction
st.header("Prediction Results")
st.metric("Predicted Quantity", f"{result['predicted_quantity']:.2f}")
# Visualize data
if result['has_sku'] and result['predicted_quantity'] > 0:
# Create pie chart for SKU distribution
fig = px.pie(
values=list(result['data']['SKU Distribution'].values()),
names=list(result['data']['SKU Distribution'].keys()),
title=f"SKU Distribution for {franchise_id} - {season}"
)
st.plotly_chart(fig)
elif result['predicted_quantity'] > 0:
# Create table view for all SKUs
st.subheader("SKU Breakdown")
df = pd.DataFrame(
result['data'].items(),
columns=['SKU', 'Quantity']
)
st.dataframe(df)
else:
st.warning("No data available for the selected criteria")
except requests.exceptions.RequestException as e:
st.error(f"Error connecting to the API: {str(e)}")
except Exception as e:
st.error(f"An error occurred: {str(e)}")
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment