151 lines
5.9 KiB
Python
151 lines
5.9 KiB
Python
# --- preagg.py ---------------------------------------------------------------
|
||
from __future__ import annotations
|
||
import pandas as pd
|
||
import numpy as np
|
||
from typing import Callable, Dict, List
|
||
|
||
# Aggregator registry: name -> function(df) -> DataFrame.
# Populated via the @aggregator decorator below.
AGGREGATORS: Dict[str, Callable[[pd.DataFrame], pd.DataFrame]] = {}
def aggregator(name: str):
    """Decorator that registers an aggregator function under *name*.

    The decorated function is stored in the module-level AGGREGATORS
    registry and returned unchanged, so it stays directly callable.
    """
    def _register(func: Callable[[pd.DataFrame], pd.DataFrame]):
        AGGREGATORS[name] = func
        return func

    return _register
def to_df(rows: List[tuple]) -> pd.DataFrame:
    """Convert raw result rows into a typed DataFrame.

    Column names mirror the SELECT that produced the rows; adjust both
    together if the query changes.
    """
    columns = [
        "document_no",
        "customer_name",   # i.parent_name
        "register_date",   # DATE(i.register_date)
        "product_code",    # ii.code
        "product_name",    # ii.name
        "quantity",        # ii.quantity
        "total_netto",     # ii.total_netto (net sales value)
    ]
    frame = pd.DataFrame(rows, columns=columns)
    if frame.empty:
        return frame

    # Normalise dtypes coming from the driver; unparseable numbers become 0.
    frame["register_date"] = pd.to_datetime(frame["register_date"])
    for numeric_col in ("quantity", "total_netto"):
        frame[numeric_col] = pd.to_numeric(frame[numeric_col], errors="coerce").fillna(0.0)

    # Per-line "unit price" (ASP). NOTE: a proper ASP is the weighted mean
    # computed in the aggregates; this is just line-level total / quantity.
    has_quantity = frame["quantity"] != 0
    frame["unit_price"] = np.where(has_quantity, frame["total_netto"] / frame["quantity"], np.nan)
    return frame
|
||
# ------------------- Built-in aggregators (add more as needed) -------------------
@aggregator("daily_sales")
def daily_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Daily sales: quantity, value, document count, weighted ASP.

    Also adds day-over-day percentage change and a 7-day rolling mean
    of the sales value.
    """
    if df.empty:
        return df

    daily = (
        df.groupby(pd.Grouper(key="register_date", freq="D"))
        .agg(
            qty=("quantity", "sum"),
            sales=("total_netto", "sum"),
            docs=("document_no", "nunique"),
        )
        .reset_index()
    )

    # Weighted ASP = total sales / total quantity (NaN on zero-quantity days).
    daily["asp"] = np.where(daily["qty"] != 0, daily["sales"] / daily["qty"], np.nan)
    # Day-over-day change of sales value.
    daily["sales_pct_change_dod"] = daily["sales"].pct_change()
    # Trailing 7-day average of sales value.
    daily["sales_rolling7"] = daily["sales"].rolling(7, min_periods=1).mean()
    return daily
@aggregator("product_summary")
def product_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Per-product totals with weighted ASP and value-mix share, sorted by sales desc."""
    if df.empty:
        return df

    summary = df.groupby(["product_code", "product_name"], as_index=False).agg(
        qty=("quantity", "sum"),
        sales=("total_netto", "sum"),
        docs=("document_no", "nunique"),
    )
    summary["asp_weighted"] = np.where(
        summary["qty"] != 0, summary["sales"] / summary["qty"], np.nan
    )

    # Basket share: each product's fraction of total sales value.
    grand_total = summary["sales"].sum()
    summary["mix_share_sales"] = np.where(
        grand_total > 0, summary["sales"] / grand_total, 0.0
    )
    return summary.sort_values("sales", ascending=False)
@aggregator("customer_summary")
def customer_summary(df: pd.DataFrame) -> pd.DataFrame:
    """Per-customer totals: quantity, sales, documents, distinct products, weighted ASP."""
    if df.empty:
        return df

    summary = df.groupby(["customer_name"], as_index=False).agg(
        qty=("quantity", "sum"),
        sales=("total_netto", "sum"),
        docs=("document_no", "nunique"),
        distinct_products=("product_code", "nunique"),
    )
    summary["asp_weighted"] = np.where(
        summary["qty"] != 0, summary["sales"] / summary["qty"], np.nan
    )
    return summary.sort_values("sales", ascending=False)
@aggregator("product_daily")
def product_daily(df: pd.DataFrame) -> pd.DataFrame:
    """Daily sales per product (useful for trend / rolling features fed to AI)."""
    if df.empty:
        return df

    group_keys = [
        pd.Grouper(key="register_date", freq="D"),
        "product_code",
        "product_name",
    ]
    per_day = df.groupby(group_keys, as_index=False).agg(
        qty=("quantity", "sum"),
        sales=("total_netto", "sum"),
    )

    # Sort so the per-product rolling window / pct_change see chronological order.
    per_day = per_day.sort_values(["product_code", "register_date"])
    per_day["sales_rolling7"] = per_day.groupby("product_code")["sales"].transform(
        lambda s: s.rolling(7, min_periods=1).mean()
    )
    per_day["sales_pct_change_dod"] = per_day.groupby("product_code")["sales"].pct_change()
    return per_day
@aggregator("top10_products_by_sales")
def top10_products_by_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Top 10 products by net sales value over the input period."""
    # Reuse the registered product summary rather than re-aggregating here.
    summary = AGGREGATORS["product_summary"](df)
    return summary.nlargest(10, "sales")
@aggregator("top10_customers_by_sales")
def top10_customers_by_sales(df: pd.DataFrame) -> pd.DataFrame:
    """Top 10 customers by net sales value."""
    # Reuse the registered customer summary rather than re-aggregating here.
    summary = AGGREGATORS["customer_summary"](df)
    return summary.nlargest(10, "sales")
# ------------------- Runner -------------------


def compute_preaggregates(rows: List[tuple]) -> dict[str, pd.DataFrame]:
    """Main entry point: rows -> DataFrame -> run every registered aggregator.

    A single failing aggregator must not bring down the whole run, so each
    one executes under a broad except; on failure its slot is filled with a
    one-row DataFrame carrying the error message and the aggregator name.

    Returns a mapping of aggregator name -> result DataFrame.
    """
    df = to_df(rows)
    results: dict[str, pd.DataFrame] = {}
    for name, fn in AGGREGATORS.items():
        try:
            # Copy so callers cannot mutate whatever the aggregator returned.
            results[name] = fn(df).copy()
        except Exception as e:  # deliberate best-effort: isolate per-aggregator failures
            results[name] = pd.DataFrame({"_error": [str(e)], "_aggregator": [name]})
    return results
def serialize_for_ai(results: dict[str, pd.DataFrame]) -> dict[str, list[dict]]:
    """Convert aggregator results into lightweight lists of records.

    The output is easy to pass to an AI model or dump to a file. Missing or
    empty frames become empty lists; numeric columns are rounded to 6
    decimals for readability.
    """
    serialized: dict[str, list[dict]] = {}
    for name, frame in results.items():
        if frame is None or frame.empty:
            serialized[name] = []
            continue
        # Round numbers on a copy so the caller's frames stay untouched.
        rounded = frame.copy()
        for col in rounded.select_dtypes(include=[np.number]).columns:
            rounded[col] = rounded[col].round(6)
        serialized[name] = rounded.to_dict(orient="records")
    return serialized
|