diff --git a/modules/EcmInvoiceOuts/ai/analysisAI.py b/modules/EcmInvoiceOuts/ai/analysisAI.py new file mode 100644 index 00000000..77327e87 --- /dev/null +++ b/modules/EcmInvoiceOuts/ai/analysisAI.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +import os, sys, json +from preaggregates import compute_preaggregates, serialize_for_ai + +try: + import mysql.connector +except Exception as e: + sys.stderr.write("MySQL connector not available: %s\n" % e) + sys.exit(1) + +def getenv(key, default=None): + return os.environ.get(key, default) + +def main(): + cfg = { + #"host": getenv("MYSQL_HOST", "twinpol-mysql56"), + "host": getenv("MYSQL_HOST", "localhost"), + "user": getenv("MYSQL_USER", "root"), + "password": getenv("MYSQL_PASSWORD", "rootpassword"), + "database": getenv("MYSQL_DATABASE", "preDb_0dcc87940d3655fa574b253df04ca1c3"), + "port": int(getenv("MYSQL_PORT", "3306")), + } + + try: + cnx = mysql.connector.connect(**cfg) + cur = cnx.cursor() + #cur.execute("SELECT COUNT(*) FROM ecminvoiceouts WHERE YEAR(register_date)=2025") + cur.execute(""" + SELECT i.document_no, + i.parent_name, + DATE(i.register_date) AS register_date, + ii.code, + ii.name, + ii.quantity, + ii.total_netto + FROM ecminvoiceoutitems AS ii + JOIN ecminvoiceouts AS i ON i.id = ii.ecminvoiceout_id + WHERE i.register_date >= %s + AND i.register_date < %s + AND i.type = %s + """, ("2025-07-01", "2025-08-01", "normal")) + rows = cur.fetchall() + + results = compute_preaggregates(rows) + + # 2) podejrzyj wyniki + # ['daily_sales', 'product_summary', 'customer_summary', 'product_daily', + # 'top10_products_by_sales', 'top10_customers_by_sales'] + print(">> available tables:", list(results.keys())) +# print(results["daily_sales"].head(10)) +# print(results["product_summary"]) +# print(results["customer_summary"]) +# print(results["product_daily"]) +# print(results["top10_products_by_sales"]) +# print(results["top10_customers_by_sales"]) + results["daily_sales"].head(10) + results["product_summary"] + results["customer_summary"] + results["product_daily"] + results["top10_products_by_sales"] + results["top10_customers_by_sales"] + + # 3) zserializuj do lekkiego JSON-a (np. do AI lub do pliku) + ai_payload = serialize_for_ai(results) + print(json.dumps(ai_payload, ensure_ascii=False, indent=2, default=str)) + + cur.close() + cnx.close() + except Exception as e: + sys.stderr.write("Query error: %s\n" % e) + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/modules/EcmInvoiceOuts/ai/enqueue.php b/modules/EcmInvoiceOuts/ai/enqueue.php deleted file mode 100644 index d8b10d73..00000000 --- a/modules/EcmInvoiceOuts/ai/enqueue.php +++ /dev/null @@ -1,21 +0,0 @@ - $id]); diff --git a/modules/EcmInvoiceOuts/ai/preaggregates.py b/modules/EcmInvoiceOuts/ai/preaggregates.py new file mode 100644 index 00000000..61e8bbea --- /dev/null +++ b/modules/EcmInvoiceOuts/ai/preaggregates.py @@ -0,0 +1,150 @@ +# --- preagg.py --------------------------------------------------------------- +from __future__ import annotations +import pandas as pd +import numpy as np +from typing import Callable, Dict, List + +# Rejestr agregatorów: name -> funkcja(df) -> DataFrame +# AGGREGATORS: Dict[str, Callable[[pd.DataFrame], pd.DataFrame]] = {} +AGGREGATORS = {} +def aggregator(name: str): + """Dekorator do łatwego rejestrowania nowych agregatorów.""" + def _wrap(func: Callable[[pd.DataFrame], pd.DataFrame]): + AGGREGATORS[name] = func + return func + return _wrap + +def to_df(rows: List[tuple]) -> pd.DataFrame: + """Konwersja rows -> DataFrame (dopasuj nazwy kolumn do SELECT-a).""" + cols = [ + "document_no", + "customer_name", # i.parent_name + "register_date", # DATE(i.register_date) + "product_code", # ii.code + "product_name", # ii.name + "quantity", # ii.quantity + "total_netto", # ii.total_netto (wartość sprzedaży netto) + ] + df = pd.DataFrame(rows, columns=cols) + if df.empty: + return df + # Typy + df["register_date"] = pd.to_datetime(df["register_date"]) + df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce").fillna(0.0) + df["total_netto"] = pd.to_numeric(df["total_netto"], errors="coerce").fillna(0.0) + # ASP (Average Selling Price) – średnia cena pozycji + # Uwaga: ASP lepiej liczyć ważoną średnią w agregatach; tu to „unit price” na pozycji. + df["unit_price"] = np.where(df["quantity"] != 0, df["total_netto"] / df["quantity"], np.nan) + return df + +# ------------------- Wbudowane agregatory (możesz dopisywać kolejne) ------------------- + +@aggregator("daily_sales") +def daily_sales(df: pd.DataFrame) -> pd.DataFrame: + """Dzienna sprzedaż: ilość, wartość, liczba dokumentów, ASP ważony.""" + if df.empty: + return df + g = df.groupby(pd.Grouper(key="register_date", freq="D")) + out = g.agg( + qty=("quantity", "sum"), + sales=("total_netto", "sum"), + docs=("document_no", "nunique"), + ).reset_index() + # ASP ważony (sales / qty) + out["asp"] = np.where(out["qty"] != 0, out["sales"] / out["qty"], np.nan) + # Zmiana d/d + out["sales_pct_change_dod"] = out["sales"].pct_change() + # Rolling 7 + out["sales_rolling7"] = out["sales"].rolling(7, min_periods=1).mean() + return out + +@aggregator("product_summary") +def product_summary(df: pd.DataFrame) -> pd.DataFrame: + """Podsumowanie po produkcie.""" + if df.empty: + return df + g = df.groupby(["product_code", "product_name"], as_index=False).agg( + qty=("quantity", "sum"), + sales=("total_netto", "sum"), + docs=("document_no", "nunique"), + ) + g["asp_weighted"] = np.where(g["qty"] != 0, g["sales"] / g["qty"], np.nan) + # Udział w koszyku (mix % po wartości) + total_sales = g["sales"].sum() + g["mix_share_sales"] = np.where(total_sales > 0, g["sales"] / total_sales, 0.0) + return g.sort_values("sales", ascending=False) + +@aggregator("customer_summary") +def customer_summary(df: pd.DataFrame) -> pd.DataFrame: + """Podsumowanie po kliencie.""" + if df.empty: + return df + g = df.groupby(["customer_name"], as_index=False).agg( + qty=("quantity", "sum"), + sales=("total_netto", "sum"), + docs=("document_no", "nunique"), + distinct_products=("product_code", "nunique"), + ) + g["asp_weighted"] = np.where(g["qty"] != 0, g["sales"] / g["qty"], np.nan) + return g.sort_values("sales", ascending=False) + +@aggregator("product_daily") +def product_daily(df: pd.DataFrame) -> pd.DataFrame: + """Dzienna sprzedaż per produkt (przydatne do trendów/rollingów w AI).""" + if df.empty: + return df + g = (df + .groupby([pd.Grouper(key="register_date", freq="D"), "product_code", "product_name"], as_index=False) + .agg(qty=("quantity", "sum"), + sales=("total_netto", "sum"))) + # Rolling 7 per produkt + g = g.sort_values(["product_code", "register_date"]) + g["sales_rolling7"] = g.groupby("product_code")["sales"].transform(lambda s: s.rolling(7, min_periods=1).mean()) + g["sales_pct_change_dod"] = g.groupby("product_code")["sales"].pct_change() + return g + +@aggregator("top10_products_by_sales") +def top10_products_by_sales(df: pd.DataFrame) -> pd.DataFrame: + """Top 10 produktów po wartości sprzedaży (okres z wejścia).""" + base = AGGREGATORS["product_summary"](df) + return base.nlargest(10, "sales") + +@aggregator("top10_customers_by_sales") +def top10_customers_by_sales(df: pd.DataFrame) -> pd.DataFrame: + """Top 10 klientów po wartości sprzedaży.""" + base = AGGREGATORS["customer_summary"](df) + return base.nlargest(10, "sales") + +# ------------------- Runner ------------------- + +# def compute_preaggregates(rows: List[tuple]) -> dict[str, pd.DataFrame]: +def compute_preaggregates(rows): + """Główny punkt wejścia: rows -> df -> uruchom wszystkie agregatory.""" + df = to_df(rows) +# results: dict[str, pd.DataFrame] = {} + results = {} + for name, fn in AGGREGATORS.items(): + try: + results[name] = fn(df).copy() + except Exception as e: + # Niech agregat nie wysadza całości – zapisz pusty DF + info + results[name] = pd.DataFrame({"_error": [str(e)], "_aggregator": [name]}) + return results + +def serialize_for_ai(results: dict[str, pd.DataFrame]) -> dict[str, list[dict]]: + """ + Konwersja wyników do lekkiego JSON-a (listy rekordów), + który łatwo przekazać do modelu AI lub zapisać do pliku. + """ +# out: dict[str, list[dict]] = {} + out = {} + for name, df in results.items(): + if df is None or df.empty: + out[name] = [] + else: + # zaokrąglij liczby dla czytelności (opcjonalnie) + df2 = df.copy() + for c in df2.select_dtypes(include=[np.number]).columns: + df2[c] = df2[c].round(6) + out[name] = df2.to_dict(orient="records") + return out diff --git a/modules/EcmInvoiceOuts/ai/result.php b/modules/EcmInvoiceOuts/ai/result.php deleted file mode 100644 index 81a59317..00000000 --- a/modules/EcmInvoiceOuts/ai/result.php +++ /dev/null @@ -1,12 +0,0 @@ - pl.DataFrame: - conn = pymysql.connect(**MYSQL_CONF) - try: - with conn.cursor() as cur: - cur.execute(sql, params) - rows = cur.fetchall() - finally: - conn.close() - return pl.from_dicts(rows) - -def to_csv(df: pl.DataFrame) -> str: - buf = io.StringIO() - df.write_csv(buf) - return buf.getvalue() - -SQL_KPIS_DAILY = """ -SELECT DATE(invoice_date) AS d, - SUM(net_amount) AS revenue, - SUM(quantity) AS qty, - ROUND(100*SUM(net_amount - cost_amount)/NULLIF(SUM(net_amount),0), 2) AS gross_margin_pct, - ROUND(100*SUM(discount_amount)/NULLIF(SUM(gross_amount),0), 2) AS discount_pct -FROM fact_invoices -WHERE invoice_date BETWEEN %s AND %s -GROUP BY 1 -ORDER BY 1; -""" - -SQL_TOP_SEGMENTS = """ -SELECT {axis} AS key, - ANY_VALUE({label}) AS label, - SUM(net_amount) AS revenue, - SUM(quantity) AS qty, - ROUND(100*SUM(net_amount - cost_amount)/NULLIF(SUM(net_amount),0), 2) AS gross_margin_pct, - ROUND(100*(SUM(net_amount) - LAG(SUM(net_amount)) OVER(ORDER BY 1))/ - NULLIF(LAG(SUM(net_amount)) OVER(ORDER BY 1),0), 2) AS trend_30d -FROM fact_invoices -WHERE invoice_date BETWEEN DATE_SUB(%s, INTERVAL 60 DAY) AND %s -GROUP BY 1 -ORDER BY revenue DESC -LIMIT %s; -""" - -class AIClient: - def __init__(self, api_key: str): self.api_key = api_key - @retry(wait=wait_exponential(multiplier=1, min=1, max=20), stop=stop_after_attempt(6)) - def structured_analysis(self, prompt: str, schema: Dict[str, Any]) -> Dict[str, Any]: - # TODO: PODMIEŃ na realne wywołanie modelu z "Structured Outputs" - raise NotImplementedError("Wire your model SDK here") - - @retry(wait=wait_exponential(multiplier=1, min=1, max=20), stop=stop_after_attempt(6)) - def batch_submit(self, ndjson_lines: List[str]) -> str: - # TODO: PODMIEŃ na rzeczywiste Batch API - raise NotImplementedError - -def run_online(from_date: str, to_date: str, currency: str, axis: str, label: str, top_n: int, goal: str) -> Dict[str, Any]: - kpis = mysql_query(SQL_KPIS_DAILY, (from_date, to_date)) - top = mysql_query(SQL_TOP_SEGMENTS.format(axis=axis, label=label), (from_date, to_date, top_n)) - - csv_blocks = ("## kpis_daily\n" + to_csv(kpis) + "\n\n" + - "## top_segments\n" + to_csv(top)) - - with open(os.path.join(os.path.dirname(__file__), "sales-analysis.schema.json"), "r", encoding="utf-8") as f: - schema = json.load(f) - - prompt = f""" -Jesteś analitykiem sprzedaży. Otrzymasz: (a) kontekst, (b) dane. -Zwróć **wyłącznie** JSON zgodny ze schema. - -Kontekst: -- Waluta: {currency} -- Zakres: {from_date} → {to_date} -- Cel: {goal} -- Poziom segmentacji: {axis} - -Dane (CSV): -{csv_blocks} - -Wskazówki: -- Użyj danych jak są (nie wymyślaj liczb). -- W meta.scope wpisz opis zakresu i segmentacji. -- Jeśli brak anomalii – anomalies: []. -- Kwoty do 2 miejsc, procenty do 1. -""" - - ai = AIClient(AI_API_KEY) - result = ai.structured_analysis(prompt, schema) - - out_dir = os.path.join(os.path.dirname(__file__), "out") - os.makedirs(out_dir, exist_ok=True) - out_path = os.path.join(out_dir, f"{uuid.uuid4()}.json") - with open(out_path, "w", encoding="utf-8") as f: - json.dump(result, f, ensure_ascii=False) - return {"status": "ok", "path": out_path} - -def run_batch(from_date: str, to_date: str, axis: str, label: str): - # Zgodnie z blueprintem – generujemy linie NDJSON (skrót; pełny wariant w PDF) - # TODO: dodać realne wywołania batch_submit i zapisać ID/stan - raise NotImplementedError("Implement batch per blueprint") - -if __name__ == "__main__": - import argparse - p = argparse.ArgumentParser() - sub = p.add_subparsers(dest="cmd") - o = sub.add_parser("online") - o.add_argument("from_date"); o.add_argument("to_date"); o.add_argument("currency") - o.add_argument("axis", choices=["sku_id","client_id","region_code"]) - o.add_argument("label"); o.add_argument("top_n", type=int, nargs="?", default=50) - o.add_argument("goal") - b = sub.add_parser("batch") - b.add_argument("from_date"); b.add_argument("to_date"); b.add_argument("axis"); b.add_argument("label") - args = p.parse_args() - - if args.cmd == "online": - print(run_online(args.from_date, args.to_date, args.currency, args.axis, args.label, args.top_n, args.goal)) - elif args.cmd == "batch": - print(run_batch(args.from_date, args.to_date, args.axis, args.label)) - else: - p.print_help() diff --git a/modules/EcmInvoiceOuts/test.php b/modules/EcmInvoiceOuts/test.php index fc8dc317..06faf59f 100644 --- a/modules/EcmInvoiceOuts/test.php +++ b/modules/EcmInvoiceOuts/test.php @@ -1,16 +1,44 @@ &1', $output, $returnVar); - -if ($returnVar !== 0) { - http_response_code(500); - echo "Error running Python script:\n" . implode("\n", $output); - exit; +$bins = [ + '/var/www/venv/bin/python', + '/usr/bin/python3.11', + '/usr/bin/python3.10', + '/usr/bin/python3.9', + '/usr/local/bin/python3.11', + '/usr/local/bin/python3.10', + '/usr/local/bin/python3.9', + '/usr/bin/python3', + 'python3', +]; +foreach ($bins as $b) { + $out = []; $ret = 0; + exec("$b -V 2>&1", $out, $ret); + echo htmlspecialchars("$b -> ".($out ? implode(' ', $out) : "not found / not executable")." (ret=$ret)")."
"; } - -// Expect a single line with the count -echo trim(implode("\n", $output)); +//$python = '/usr/bin/python3'; +//$script = '/var/www/html/modules/EcmInvoiceOuts/ai/analysisAI.py'; +//$cmd = escapeshellcmd("$python $script"); +// +//// odczyt +//$output = []; +//$returnVar = 0; +//exec($cmd . ' 2>&1', $output, $returnVar); +//$body = implode("\n", $output); +// +//// błąd Pythona +//if ($returnVar !== 0) { +// // pokaż błąd jako tekst +// while (ob_get_level()) { ob_end_clean(); } +// header_remove(); +// header('Content-Type: text/plain; charset=utf-8'); +// http_response_code(500); +// echo "Error running Python script:\n".$body; +// exit; +//} +// +//// --- WYMUSZENIE RENDEROWANIA HTML --- +//while (ob_get_level()) { ob_end_clean(); } // wyczyść wszystkie bufory +//header_remove(); // usuń nagłówki ustawione wcześniej przez framework +//header('Content-Type: text/html; charset=utf-8'); +//echo $body; +//exit; // ZATRZYMAJ framework (np. SugarCRM), żeby nic już nie dopisywał