# --- preagg.py --------------------------------------------------------------- from __future__ import annotations import pandas as pd import numpy as np from typing import Callable, Dict, List # Rejestr agregatorów: name -> funkcja(df) -> DataFrame # AGGREGATORS: Dict[str, Callable[[pd.DataFrame], pd.DataFrame]] = {} AGGREGATORS = {} def aggregator(name: str): """Dekorator do łatwego rejestrowania nowych agregatorów.""" def _wrap(func: Callable[[pd.DataFrame], pd.DataFrame]): AGGREGATORS[name] = func return func return _wrap def to_df(rows: List[tuple]) -> pd.DataFrame: """Konwersja rows -> DataFrame (dopasuj nazwy kolumn do SELECT-a).""" cols = [ "document_no", "customer_name", # i.parent_name "register_date", # DATE(i.register_date) "product_code", # ii.code "product_name", # ii.name "quantity", # ii.quantity "total_netto", # ii.total_netto (wartość sprzedaży netto) ] df = pd.DataFrame(rows, columns=cols) if df.empty: return df # Typy df["register_date"] = pd.to_datetime(df["register_date"]) df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce").fillna(0.0) df["total_netto"] = pd.to_numeric(df["total_netto"], errors="coerce").fillna(0.0) # ASP (Average Selling Price) – średnia cena pozycji # Uwaga: ASP lepiej liczyć ważoną średnią w agregatach; tu to „unit price” na pozycji. df["unit_price"] = np.where(df["quantity"] != 0, df["total_netto"] / df["quantity"], np.nan) return df # ------------------- Wbudowane agregatory (możesz dopisywać kolejne) ------------------- @aggregator("daily_sales") def daily_sales(df: pd.DataFrame) -> pd.DataFrame: """Dzienna sprzedaż: ilość, wartość, liczba dokumentów, ASP ważony.""" if df.empty: return df g = df.groupby(pd.Grouper(key="register_date", freq="D")) out = g.agg( qty=("quantity", "sum"), sales=("total_netto", "sum"), docs=("document_no", "nunique"), ).reset_index() # ASP ważony (sales / qty) out["asp"] = np.where(out["qty"] != 0, out["sales"] / out["qty"], np.nan) # Zmiana d/d out["sales_pct_change_dod"] = out["sales"].pct_change() # Rolling 7 out["sales_rolling7"] = out["sales"].rolling(7, min_periods=1).mean() return out @aggregator("product_summary") def product_summary(df: pd.DataFrame) -> pd.DataFrame: """Podsumowanie po produkcie.""" if df.empty: return df g = df.groupby(["product_code", "product_name"], as_index=False).agg( qty=("quantity", "sum"), sales=("total_netto", "sum"), docs=("document_no", "nunique"), ) g["asp_weighted"] = np.where(g["qty"] != 0, g["sales"] / g["qty"], np.nan) # Udział w koszyku (mix % po wartości) total_sales = g["sales"].sum() g["mix_share_sales"] = np.where(total_sales > 0, g["sales"] / total_sales, 0.0) return g.sort_values("sales", ascending=False) @aggregator("customer_summary") def customer_summary(df: pd.DataFrame) -> pd.DataFrame: """Podsumowanie po kliencie.""" if df.empty: return df g = df.groupby(["customer_name"], as_index=False).agg( qty=("quantity", "sum"), sales=("total_netto", "sum"), docs=("document_no", "nunique"), distinct_products=("product_code", "nunique"), ) g["asp_weighted"] = np.where(g["qty"] != 0, g["sales"] / g["qty"], np.nan) return g.sort_values("sales", ascending=False) @aggregator("product_daily") def product_daily(df: pd.DataFrame) -> pd.DataFrame: """Dzienna sprzedaż per produkt (przydatne do trendów/rollingów w AI).""" if df.empty: return df g = (df .groupby([pd.Grouper(key="register_date", freq="D"), "product_code", "product_name"], as_index=False) .agg(qty=("quantity", "sum"), sales=("total_netto", "sum"))) # Rolling 7 per produkt g = g.sort_values(["product_code", "register_date"]) g["sales_rolling7"] = g.groupby("product_code")["sales"].transform(lambda s: s.rolling(7, min_periods=1).mean()) g["sales_pct_change_dod"] = g.groupby("product_code")["sales"].pct_change() return g @aggregator("top10_products_by_sales") def top10_products_by_sales(df: pd.DataFrame) -> pd.DataFrame: """Top 10 produktów po wartości sprzedaży (okres z wejścia).""" base = AGGREGATORS["product_summary"](df) return base.nlargest(10, "sales") @aggregator("top10_customers_by_sales") def top10_customers_by_sales(df: pd.DataFrame) -> pd.DataFrame: """Top 10 klientów po wartości sprzedaży.""" base = AGGREGATORS["customer_summary"](df) return base.nlargest(10, "sales") # ------------------- Runner ------------------- def compute_preaggregates(rows: List[tuple]) -> dict[str, pd.DataFrame]: #def compute_preaggregates(rows): """Główny punkt wejścia: rows -> df -> uruchom wszystkie agregatory.""" df = to_df(rows) # results: dict[str, pd.DataFrame] = {} results = {} for name, fn in AGGREGATORS.items(): try: results[name] = fn(df).copy() except Exception as e: # Niech agregat nie wysadza całości – zapisz pusty DF + info results[name] = pd.DataFrame({"_error": [str(e)], "_aggregator": [name]}) return results def serialize_for_ai(results: dict[str, pd.DataFrame]) -> dict[str, list[dict]]: """ Konwersja wyników do lekkiego JSON-a (listy rekordów), który łatwo przekazać do modelu AI lub zapisać do pliku. """ # out: dict[str, list[dict]] = {} out = {} for name, df in results.items(): if df is None or df.empty: out[name] = [] else: # zaokrąglij liczby dla czytelności (opcjonalnie) df2 = df.copy() for c in df2.select_dtypes(include=[np.number]).columns: df2[c] = df2[c].round(6) out[name] = df2.to_dict(orient="records") return out