diff --git a/REST/functions.php b/REST/functions.php index 88c1d08d..357055cf 100644 --- a/REST/functions.php +++ b/REST/functions.php @@ -746,5 +746,6 @@ function exportToCSVFile($res, $fullpath, array $headers = null, $delimiter = '; } fclose($fp); - return ['ok'=>true, 'path'=>$fullpath, 'rows'=>$count, 'error'=>null]; + $chmod_ok = @chmod($fullpath, 0664); + return ['ok'=>true, 'path'=>$fullpath, 'rows'=>$count, 'chmod'=>$chmod_ok, 'error'=>null]; } \ No newline at end of file diff --git a/modules/EcmInvoiceOuts/ai/analysisAI.py b/modules/EcmInvoiceOuts/ai/analysisAI.py new file mode 100644 index 00000000..5fd6f295 --- /dev/null +++ b/modules/EcmInvoiceOuts/ai/analysisAI.py @@ -0,0 +1,713 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +analysisAI.py — pobiera dane z MySQL, liczy wyłącznie WSKAZANE preagregaty, +renderuje HTML i (opcjonalnie) dodaje analizę AI — tylko jeśli ją zaznaczysz. + +Parametry CLI (z formularza PHP): + --date-from YYYY-MM-DD + --date-to YYYY-MM-DD (zamieniane wewnętrznie na +1 dzień, bo SQL ma warunek '< date_to') + --metric NAZWA (można podać wiele razy: --metric a --metric b ...) + --metrics CSV (opcjonalnie alternatywnie: --metrics a,b,c) + --ai true|false (czy uruchomić analizę AI — tylko gdy są preagregaty z danymi) + +Preagregaty: + - kpis (aliasy: basic, basic_totals) — podstawowe KPI: sprzedaż, ilość, dokumenty, ASP + - daily_sales, product_summary, customer_summary, product_daily, + top10_products_by_sales, top10_customers_by_sales (z preaggregates.py) +""" + +import os, sys, json, math, time, warnings, argparse, traceback, html +from datetime import date, timedelta, datetime + +# (1) Wycisza ostrzeżenia urllib3 (LibreSSL / stary OpenSSL) +try: + from urllib3.exceptions import NotOpenSSLWarning + warnings.filterwarnings("ignore", category=NotOpenSSLWarning) +except Exception: + pass + +# (2) Importy zewnętrzne +import requests +import mysql.connector +import pandas as pd + +LOOKER_URL = "https://lookerstudio.google.com/u/0/reporting/107d4ccc-e7eb-4c38-8dce-00700b44f60e/page/ba1YF" + +# ========== KONFIGURACJA KLUCZA AI ========== +API_KEY = "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" +API_KEY_HARDCODE = API_KEY + +# === Import preagregatów === +from preaggregates import serialize_for_ai +import preaggregates as pre # pre.AGGREGATORS, pre.to_df + +# ========== UTILKI ========== + +def html_fatal(msg, title="Błąd"): + sys.stdout.write( + '
' + f'

{html.escape(title)}

' + f'
{html.escape(msg)}
' + '
' + ) + sys.exit(1) + +def connect_html_or_die(cfg, label="MySQL"): + try: + return mysql.connector.connect(**cfg) + except mysql.connector.Error as e: + host = cfg.get("host"); port = cfg.get("port"); user = cfg.get("user") + base = (f"[{label}] Błąd połączenia ({host}:{port} jako '{user}').\n" + f"errno={getattr(e,'errno',None)} sqlstate={getattr(e,'sqlstate',None)}\n" + f"msg={getattr(e,'msg',str(e))}") + if os.environ.get("DEBUG"): + base += "\n\n" + traceback.format_exc() + html_fatal(base, title="Błąd połączenia MySQL") + +def getenv(k, d=None): + return os.environ.get(k, d) + +def last_full_month_bounds(): + today_first = date.today().replace(day=1) + to_dt = today_first + prev_last = today_first - timedelta(days=1) + from_dt = prev_last.replace(day=1) + return from_dt.isoformat(), to_dt.isoformat() + +def add_one_day(iso_date): + try: + return (datetime.strptime(iso_date, "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d") + except Exception: + return iso_date # w razie czego oddaj wejście + +def safe_num(v, ndigits=None): + try: + f = float(v) + if not math.isfinite(f): + return None + return round(f, ndigits) if ndigits is not None else f + except Exception: + return None + +def safe_date(v): + if v is None: + return None + try: + if hasattr(v, "date"): + return str(v.date()) + s = str(v) + if len(s) >= 10 and s[4] == '-' and s[7] == '-': + return s[:10] + return s + except Exception: + return None + +def fmt_money(v): + try: + return "{:,.2f}".format(float(v)).replace(",", " ").replace(".", ",") + except Exception: + return str(v) + +def compact_table(table, limit=30): + out = [] + if not table: + return out + lim = int(limit) + for i, row in enumerate(table): + if i >= lim: break + new = {} + for k, v in row.items(): + if isinstance(v, float): + new[k] = round(v, 6) if math.isfinite(v) else None + else: + new[k] = v + out.append(new) + return out + +def call_openai_chat(api_key, model, system_prompt, user_payload_json, + temperature=0.3, connect_timeout=10, read_timeout=90, max_retries=3): + url = "https://api.openai.com/v1/chat/completions" + headers = {"Authorization": "Bearer " + api_key, "Content-Type": "application/json"} + body = { + "model": model, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": "Dane (JSON):\n\n" + user_payload_json}, + ], + "temperature": temperature, + } + last_err = None + for attempt in range(1, int(max_retries) + 1): + try: + r = requests.post(url, headers=headers, json=body, timeout=(connect_timeout, read_timeout)) + if 200 <= r.status_code < 300: + data = r.json() + return data.get("choices", [{}])[0].get("message", {}).get("content", "") + last_err = RuntimeError("OpenAI HTTP {}: {}".format(r.status_code, r.text)) + except requests.exceptions.RequestException as e: + last_err = e + time.sleep(min(2 ** attempt, 10)) + raise RuntimeError("OpenAI request failed: {}".format(last_err)) + +def html_table(records, title=None, max_rows=20): + if not records: + return '
<div>Brak danych</div>'
+    cols = list(records[0].keys())
+    body_rows = records[:max_rows]
+    thead = "".join("<th>{}</th>".format(c) for c in cols)
+    trs = []
+    for r in body_rows:
+        tds = []
+        for c in cols:
+            val = r.get(c, "")
+            if isinstance(val, (int, float)):
+                if any(x in c.lower() for x in ("sales", "total", "netto", "value", "asp", "qty", "quantity", "share", "change")):
+                    tds.append('<td>{}</td>'.format(fmt_money(val) if "sales" in c.lower() or "total" in c.lower() or "netto" in c.lower() else val))
+                else:
+                    tds.append('<td>{}</td>'.format(val))
+            else:
+                tds.append('<td>{}</td>'.format(val))
+        trs.append("<tr>{}</tr>".format("".join(tds)))
+    cap = '<div>{}</div>'.format(title) if title else ""
+    return (
+        cap
+        + '<table>'
+        + '<thead><tr>{}</tr></thead><tbody>{}</tbody></table>'.format(thead, "".join(trs))
+    )
+
+def render_report_html(period_label, kpis, parts, ai_section, model_alias):
+    css = (
+        "font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,sans-serif;"
+        "max-width:1200px;margin:24px auto;padding:16px 20px;border:1px solid #e5e7eb;"
+        "border-radius:12px;background:#fff;color:#111827"
+    )
+    kpi_item = (
+        '<div>{label}</div>'
+        '<div>{value}</div>'
+    )
+    kpi_html = "".join(kpi_item.format(label=lbl, value=val) for (lbl, val) in kpis)
+    sections_html = "".join(parts)
+    if ai_section and not ai_section.lstrip().startswith("

Raport sprzedaży — {period_label}

+
+ {kpi_html} +
+ {sections_html if sections_html.strip() else '
Nie wybrano żadnych preagregatów — brak sekcji do wyświetlenia.
'} +
+

Analiza i rekomendacje{(' (AI · ' + model_alias + ')') if model_alias else ''}

+ {ai_section if ai_section else '
Analiza AI wyłączona lub brak danych.
'} +
+ + +
+ + → Otwórz pełny raport w Looker Studio + +
+ + +""" + + +# ========== UPSerTY DO REPORTING (jak u Ciebie) ========== + +def _ensure_rank_and_share(items, key_sales="sales"): + if not items: return + total_sales = sum((x.get(key_sales) or 0) for x in items) + sorted_items = sorted( + items, + key=lambda x: ((x.get(key_sales) or 0), str(x.get("product_code") or x.get("customer_name") or "")), + reverse=True + ) + rank_map, rank = {}, 1 + for x in sorted_items: + key = x.get("product_code") or x.get("customer_name") or "" + if key not in rank_map: + rank_map[key] = rank + rank += 1 + for x in items: + key = x.get("product_code") or x.get("customer_name") or "" + if not x.get("rank_in_period"): + x["rank_in_period"] = rank_map.get(key, 0) + if "mix_share_sales" not in x: + x["mix_share_sales"] = ((x.get(key_sales) or 0) / total_sales) if total_sales else 0.0 + +def upsert_daily_sales(cur, daily): + if not daily: return 0 + sql = """ + INSERT INTO reporting_daily_sales + (period_date, qty, sales, docs, asp, sales_rolling7, sales_dod_pct) + VALUES (%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + qty=VALUES(qty), sales=VALUES(sales), docs=VALUES(docs), + asp=VALUES(asp), sales_rolling7=VALUES(sales_rolling7), sales_dod_pct=VALUES(sales_dod_pct), + generated_at=CURRENT_TIMESTAMP + """ + rows = [] + for r in daily: + period_date = safe_date(r.get("register_date") or r.get("period_date") or r.get("date")) + rows.append(( + period_date, + safe_num(r.get("qty")), + safe_num(r.get("sales")), + safe_num(r.get("docs")), + safe_num(r.get("asp"), 6), + safe_num(r.get("sales_rolling7"), 6), + safe_num(r.get("sales_pct_change_dod") or r.get("sales_dod_pct"), 6), + )) + cur.executemany(sql, rows) + return len(rows) + +def upsert_product_summary(cur, prod, period_from, period_to): + if not prod: return 0 + _ensure_rank_and_share(prod, key_sales="sales") + sql = """ + INSERT INTO reporting_product_summary + (period_start, period_end, product_code, product_name, qty, sales, docs, + asp_weighted, mix_share_sales, rank_in_period) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + qty=VALUES(qty), sales=VALUES(sales), docs=VALUES(docs), + asp_weighted=VALUES(asp_weighted), mix_share_sales=VALUES(mix_share_sales), + rank_in_period=VALUES(rank_in_period), generated_at=CURRENT_TIMESTAMP + """ + rows = [] + for r in prod: + rows.append(( + period_from, period_to, + r.get("product_code"), r.get("product_name"), + safe_num(r.get("qty")), + safe_num(r.get("sales")), + safe_num(r.get("docs")), + safe_num(r.get("asp_weighted"), 6), + safe_num(r.get("mix_share_sales"), 6), + int(r.get("rank_in_period") or 0), + )) + cur.executemany(sql, rows) + return len(rows) + +def upsert_customer_summary(cur, cust, period_from, period_to): + if not cust: return 0 + _ensure_rank_and_share(cust, key_sales="sales") + sql = """ + INSERT INTO reporting_customer_summary + (period_start, period_end, customer_name, qty, sales, docs, + asp_weighted, mix_share_sales, rank_in_period) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + qty=VALUES(qty), sales=VALUES(sales), docs=VALUES(docs), + asp_weighted=VALUES(asp_weighted), mix_share_sales=VALUES(mix_share_sales), + rank_in_period=VALUES(rank_in_period), generated_at=CURRENT_TIMESTAMP + """ + rows = [] + for r in cust: + rows.append(( + period_from, period_to, + r.get("customer_name"), + safe_num(r.get("qty")), + safe_num(r.get("sales")), + safe_num(r.get("docs")), + safe_num(r.get("asp_weighted"), 6), + safe_num(r.get("mix_share_sales"), 6), + int(r.get("rank_in_period") or 0), + )) + 
cur.executemany(sql, rows) + return len(rows) + +def upsert_product_daily(cur, prod_daily): + if not prod_daily: return 0 + sql = """ + INSERT INTO reporting_product_daily + (period_date, product_code, product_name, qty, sales, asp) + VALUES (%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + qty=VALUES(qty), sales=VALUES(sales), asp=VALUES(asp), + generated_at=CURRENT_TIMESTAMP + """ + rows = [] + for r in prod_daily: + period_date = safe_date(r.get("register_date") or r.get("period_date") or r.get("date")) + qty = safe_num(r.get("qty")) + sales = safe_num(r.get("sales")) + asp = safe_num((sales / qty) if (qty and sales is not None and qty != 0) else r.get("asp"), 6) + rows.append(( + period_date, + r.get("product_code"), + r.get("product_name"), + qty, sales, asp + )) + cur.executemany(sql, rows) + return len(rows) + +# ========== ARGPARSE & LOGIKA WYBORU ========== + +def parse_cli_args(): + p = argparse.ArgumentParser() + p.add_argument('--date-from', dest='date_from', required=False, help='YYYY-MM-DD') + p.add_argument('--date-to', dest='date_to', required=False, help='YYYY-MM-DD (inclusive, we add +1 day internally)') + # akceptuj obie formy: wielokrotne --metric oraz (opcjonalnie) --metrics CSV + p.add_argument('--metric', dest='metric', action='append', default=[], help='Nazwa preagregatu; można podać wiele razy') + p.add_argument('--metrics', dest='metrics', action='append', default=[], help='CSV: a,b,c (można podać wiele razy)') + p.add_argument('--ai', dest='ai', choices=['true','false'], default='false') + return p.parse_args() + +def collect_metric_names(args): + names = [] + # z --metric (powtarzalne) + if args.metric: + names.extend([s.strip() for s in args.metric if s and s.strip()]) + # z --metrics (może być kilka wystąpień; każde może być CSV) + for entry in (args.metrics or []): + if not entry: + continue + for part in str(entry).replace(';', ',').replace(' ', ',').split(','): + part = part.strip() + if part: + names.append(part) + # aliasy dla kpis + alias_map = {'basic': 'kpis', 'basic_totals': 'kpis'} + names = [alias_map.get(n, n) for n in names] + # deduplikacja z zachowaniem kolejności + seen = set() + uniq = [] + for n in names: + if n not in seen: + seen.add(n) + uniq.append(n) + return uniq + +def compute_selected_preaggs(rows, names): + """ + Liczy TYLKO wskazane preagregaty. ZAWSZE zwraca DataFrame'y (nigdy listy). + Obsługuje pseudo-agregat 'kpis' (podstawowe KPI). + """ + results = {} + if not names: + return results + df = pre.to_df(rows) + + # kpis — pseudoagregat + def compute_kpis_df(dfx): + if dfx is None or dfx.empty: + return pd.DataFrame([{ + "total_sales": 0.0, + "total_qty": 0.0, + "total_docs": 0, + "asp": None, + }]) + total_sales = float(dfx["total_netto"].sum()) + total_qty = float(dfx["quantity"].sum()) + total_docs = int(dfx["document_no"].nunique()) + asp = (total_sales / total_qty) if total_qty else None + return pd.DataFrame([{ + "total_sales": total_sales, + "total_qty": total_qty, + "total_docs": total_docs, + "asp": asp, + }]) + + for name in names: + if name == 'kpis': + results[name] = compute_kpis_df(df) + continue + + fn = pre.AGGREGATORS.get(name) + if not fn: + results[name] = pd.DataFrame() # nieznany agregat -> pusty + continue + try: + out = fn(df) + if out is None: + results[name] = pd.DataFrame() + elif hasattr(out, "copy"): + results[name] = out.copy() + else: + results[name] = pd.DataFrame(out) + except Exception: + # np. 
top10_* na pustych danych -> zwróć pusty wynik + results[name] = pd.DataFrame() + + return results + +def sanitize_serialized(serialized_dict): + """ + Jeśli jakikolwiek agregat zwrócił błąd (np. _error), zamieniamy na pustą listę. + """ + clean = {} + for k, records in (serialized_dict or {}).items(): + if not records: + clean[k] = [] + continue + if isinstance(records, list) and isinstance(records[0], dict) and ('_error' in records[0]): + clean[k] = [] + else: + clean[k] = records + return clean + +def has_any_rows(serialized_dict): + for records in (serialized_dict or {}).values(): + if records: # lista niepusta + return True + return False + +# ========== MAIN ========== + +def main(): + # --- CLI --- + args = parse_cli_args() + with_ai = (args.ai == 'true') + metric_names = collect_metric_names(args) + + # --- Daty: preferuj CLI; 'date_to' inkluzywne (dodajemy +1 dzień dla SQL '<') --- + if args.date_from and args.date_to: + period_from, period_to = args.date_from, add_one_day(args.date_to) + shown_label = "{} .. {}".format(args.date_from, args.date_to) + else: + env_from, env_to = getenv("PERIOD_FROM"), getenv("PERIOD_TO") + if env_from and env_to: + period_from, period_to = env_from, env_to + # label dla czytelności: to-1d + try: + to_label = (datetime.strptime(period_to, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d") + except Exception: + to_label = period_to + shown_label = "{} .. {}".format(period_from, to_label) + else: + period_from, period_to = last_full_month_bounds() + # label: poprzedni pełny miesiąc + try: + to_label = (datetime.strptime(period_to, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d") + except Exception: + to_label = period_to + shown_label = "{} .. {}".format(period_from, to_label) + + # --- DB --- + cfg = { + "host": getenv("MYSQL_HOST", "twinpol-mysql56"), + "user": getenv("MYSQL_USER", "root"), + "password": getenv("MYSQL_PASSWORD", "rootpassword"), + "database": getenv("MYSQL_DATABASE", "preDb_0dcc87940d3655fa574b253df04ca1c3"), + "port": int(getenv("MYSQL_PORT", "3306")), + } + invoice_type = getenv("INVOICE_TYPE", "normal") + + # --- SQL -> rows (UWZGLĘDNIJ DATY; typ wg ENV) --- + try: + cnx = mysql.connector.connect(**cfg) + cur = cnx.cursor() + if invoice_type: + cur.execute( + """ + SELECT i.document_no, + i.parent_name, + DATE(i.register_date) AS register_date, + ii.code, + ii.name, + ii.quantity, + ii.total_netto + FROM ecminvoiceoutitems AS ii + JOIN ecminvoiceouts AS i ON i.id = ii.ecminvoiceout_id + WHERE i.register_date >= %s + AND i.register_date < %s + AND i.type = %s + """, + (period_from, period_to, invoice_type), + ) + else: + cur.execute( + """ + SELECT i.document_no, + i.parent_name, + DATE(i.register_date) AS register_date, + ii.code, + ii.name, + ii.quantity, + ii.total_netto + FROM ecminvoiceoutitems AS ii + JOIN ecminvoiceouts AS i ON i.id = ii.ecminvoiceout_id + WHERE i.register_date >= %s + AND i.register_date < %s + """, + (period_from, period_to), + ) + rows = cur.fetchall() + cur.close() + cnx.close() + except Exception as e: + html_fatal(str(e), title="Błąd połączenia/zapytania MySQL") + + # --- LICZ TYLKO WYBRANE PREAGREGATY (w tym pseudo 'kpis') --- + results = {} + serialized = {} + if metric_names: + results = compute_selected_preaggs(rows, metric_names) + serialized = serialize_for_ai(results) + serialized = sanitize_serialized(serialized) # usuń ewentualne _error -> traktuj jako puste + else: + serialized = {} + + # --- ZAPIS do reporting (tylko to, co faktycznie policzyłeś) --- + try: + if serialized: + 
rep_cfg = { + "host": "host.docker.internal", + "port": 3307, + "user": "remote", + "password": os.environ.get("REPORTING_PASSWORD", "areiufh*&^yhdua"), + "database": "ai", + } + if os.environ.get("REPORTING_SSL_CA"): + rep_cfg["ssl_ca"] = os.environ["REPORTING_SSL_CA"] + if os.environ.get("REPORTING_SSL_CERT"): + rep_cfg["ssl_cert"] = os.environ["REPORTING_SSL_CERT"] + if os.environ.get("REPORTING_SSL_KEY"): + rep_cfg["ssl_key"] = os.environ["REPORTING_SSL_KEY"] + + cnx2 = connect_html_or_die(rep_cfg, label="ReportingDB") + cur2 = cnx2.cursor() + + if "daily_sales" in serialized: + upsert_daily_sales(cur2, serialized.get("daily_sales") or []) + if "product_summary" in serialized: + upsert_product_summary(cur2, serialized.get("product_summary") or [], period_from, period_to) + if "customer_summary" in serialized: + upsert_customer_summary(cur2, serialized.get("customer_summary") or [], period_from, period_to) + if "product_daily" in serialized: + upsert_product_daily(cur2, serialized.get("product_daily") or []) + + cnx2.commit() + cur2.close(); cnx2.close() + except Exception as e: + sys.stderr.write(f"[reporting] ERROR: {e}\n") + + # --- KPI: jeśli wybrano 'kpis' -> bierz z wyników; w przeciwnym razie spróbuj z daily_sales; inaczej zera --- + kpis = [] + if "kpis" in results and isinstance(results["kpis"], pd.DataFrame) and not results["kpis"].empty: + r = results["kpis"].iloc[0] + total_sales = r.get("total_sales") or 0 + total_qty = r.get("total_qty") or 0 + total_docs = r.get("total_docs") or 0 + asp = r.get("asp") + else: + daily = serialized.get("daily_sales") or [] + total_sales = sum((x.get("sales") or 0) for x in daily) if daily else 0 + total_qty = sum((x.get("qty") or 0) for x in daily) if daily else 0 + total_docs = sum((x.get("docs") or 0) for x in daily) if daily else 0 + asp = (total_sales / total_qty) if total_qty else None + + kpis = [ + ("Sprzedaż (PLN)", fmt_money(total_sales)), + ("Ilość (szt.)", "{:,.0f}".format(total_qty).replace(",", " ")), + ("Dokumenty", "{:,.0f}".format(total_docs).replace(",", " ")), + ("ASP (PLN/szt.)", fmt_money(asp) if asp is not None else "—"), + ] + + # --- Sekcje HTML: renderuj tylko te, które policzyłeś --- + parts = [] + if "top10_products_by_sales" in serialized: + parts.append(html_table(serialized.get("top10_products_by_sales") or [], title="Top 10 produktów (po sprzedaży)", max_rows=10)) + if "top10_customers_by_sales" in serialized: + parts.append(html_table(serialized.get("top10_customers_by_sales") or [], title="Top 10 klientów (po sprzedaży)", max_rows=10)) + if "daily_sales" in serialized: + parts.append(html_table(serialized.get("daily_sales") or [], title="Sprzedaż dzienna (skrót)", max_rows=30)) + if "product_summary" in serialized: + parts.append(html_table(serialized.get("product_summary") or [], title="Podsumowanie produktów (skrót)", max_rows=30)) + if "customer_summary" in serialized: + parts.append(html_table(serialized.get("customer_summary") or [], title="Podsumowanie klientów (skrót)", max_rows=30)) + if "product_daily" in serialized: + parts.append(html_table(serialized.get("product_daily") or [], title="Produkt × Dzień (próbka)", max_rows=30)) + + # --- AI tylko gdy: --ai true ORAZ jest co najmniej jeden rekord w którymś z wybranych agregatów --- + api_key = API_KEY_HARDCODE or getenv("OPENAI_API_KEY", "") + model = getenv("OPENAI_MODEL", "gpt-4.1") + MODEL_ALIAS = { + "gpt-4.1": "GPT-4.1", + "gpt-4.1-mini": "GPT-4.1-mini", + "gpt-4o": "GPT-4o", + "gpt-4o-mini": "GPT-4o-mini", + } + model_alias = 
MODEL_ALIAS.get(model, model) + + ai_section = "" + if with_ai and has_any_rows(serialized): + try: + ai_data = {"kpis_hint": {"period_label": shown_label}} + for name, records in serialized.items(): + ai_data[name] = compact_table(records, 100) + ai_json = json.dumps(ai_data, ensure_ascii=False, separators=(",", ":"), default=str) + + ai_section = call_openai_chat( + api_key=(api_key or ""), + model=model, + system_prompt=("Jesteś analitykiem sprzedaży. Zwróć TYLKO jedną sekcję HTML (bez //). " + "Streszcz kluczowe trendy i daj 3–6 zaleceń. Po polsku."), + user_payload_json=ai_json, + temperature=0.3, + connect_timeout=10, + read_timeout=90, + max_retries=3, + ) + except Exception as e: + err = str(e) + if "insufficient_quota" in err or "You exceeded your current quota" in err: + try: + ai_section = call_openai_chat( + api_key=(api_key or ""), + model="gpt-4.1-mini", + system_prompt=("Jesteś analitykiem sprzedaży. Zwróć TYLKO jedną sekcję HTML (bez //). " + "Streszcz kluczowe trendy i daj 3–6 zaleceń. Po polsku."), + user_payload_json=ai_json, + temperature=0.3, + connect_timeout=10, + read_timeout=90, + max_retries=2, + ) + model_alias = "GPT-4.1-mini" + except Exception as ee: + ai_section = ( + '
<div>Brak dostępnego limitu API. {}</div>'.format(str(ee))
+                    )
+            else:
+                ai_section = (
+                    '<div>Błąd wywołania AI: {}</div>'.format(err)
+                )
+    else:
+        ai_section = '<div>Analiza AI wyłączona lub brak wybranych danych.</div>
' + model_alias = "" + + # --- Finalny HTML --- + report_html = render_report_html( + period_label=shown_label, + kpis=kpis, + parts=parts, + ai_section=ai_section, + model_alias=(model_alias if (with_ai and has_any_rows(serialized)) else "") + ) + sys.stdout.write(report_html) + +if __name__ == "__main__": + main() diff --git a/modules/EcmInvoiceOuts/ai/enqueue.php b/modules/EcmInvoiceOuts/ai/enqueue.php deleted file mode 100644 index d8b10d73..00000000 --- a/modules/EcmInvoiceOuts/ai/enqueue.php +++ /dev/null @@ -1,21 +0,0 @@ - $id]); diff --git a/modules/EcmInvoiceOuts/ai/preaggregates.py b/modules/EcmInvoiceOuts/ai/preaggregates.py new file mode 100644 index 00000000..b000f06b --- /dev/null +++ b/modules/EcmInvoiceOuts/ai/preaggregates.py @@ -0,0 +1,150 @@ +# --- preagg.py --------------------------------------------------------------- +from __future__ import annotations +import pandas as pd +import numpy as np +from typing import Callable, Dict, List + +# Rejestr agregatorów: name -> funkcja(df) -> DataFrame +# AGGREGATORS: Dict[str, Callable[[pd.DataFrame], pd.DataFrame]] = {} +AGGREGATORS = {} +def aggregator(name: str): + """Dekorator do łatwego rejestrowania nowych agregatorów.""" + def _wrap(func: Callable[[pd.DataFrame], pd.DataFrame]): + AGGREGATORS[name] = func + return func + return _wrap + +def to_df(rows: List[tuple]) -> pd.DataFrame: + """Konwersja rows -> DataFrame (dopasuj nazwy kolumn do SELECT-a).""" + cols = [ + "document_no", + "customer_name", # i.parent_name + "register_date", # DATE(i.register_date) + "product_code", # ii.code + "product_name", # ii.name + "quantity", # ii.quantity + "total_netto", # ii.total_netto (wartość sprzedaży netto) + ] + df = pd.DataFrame(rows, columns=cols) + if df.empty: + return df + # Typy + df["register_date"] = pd.to_datetime(df["register_date"]) + df["quantity"] = pd.to_numeric(df["quantity"], errors="coerce").fillna(0.0) + df["total_netto"] = pd.to_numeric(df["total_netto"], errors="coerce").fillna(0.0) + # ASP (Average Selling Price) – średnia cena pozycji + # Uwaga: ASP lepiej liczyć ważoną średnią w agregatach; tu to „unit price” na pozycji. 
+ df["unit_price"] = np.where(df["quantity"] != 0, df["total_netto"] / df["quantity"], np.nan) + return df + +# ------------------- Wbudowane agregatory (możesz dopisywać kolejne) ------------------- + +@aggregator("daily_sales") +def daily_sales(df: pd.DataFrame) -> pd.DataFrame: + """Dzienna sprzedaż: ilość, wartość, liczba dokumentów, ASP ważony.""" + if df.empty: + return df + g = df.groupby(pd.Grouper(key="register_date", freq="D")) + out = g.agg( + qty=("quantity", "sum"), + sales=("total_netto", "sum"), + docs=("document_no", "nunique"), + ).reset_index() + # ASP ważony (sales / qty) + out["asp"] = np.where(out["qty"] != 0, out["sales"] / out["qty"], np.nan) + # Zmiana d/d + out["sales_pct_change_dod"] = out["sales"].pct_change() + # Rolling 7 + out["sales_rolling7"] = out["sales"].rolling(7, min_periods=1).mean() + return out + +@aggregator("product_summary") +def product_summary(df: pd.DataFrame) -> pd.DataFrame: + """Podsumowanie po produkcie.""" + if df.empty: + return df + g = df.groupby(["product_code", "product_name"], as_index=False).agg( + qty=("quantity", "sum"), + sales=("total_netto", "sum"), + docs=("document_no", "nunique"), + ) + g["asp_weighted"] = np.where(g["qty"] != 0, g["sales"] / g["qty"], np.nan) + # Udział w koszyku (mix % po wartości) + total_sales = g["sales"].sum() + g["mix_share_sales"] = np.where(total_sales > 0, g["sales"] / total_sales, 0.0) + return g.sort_values("sales", ascending=False) + +@aggregator("customer_summary") +def customer_summary(df: pd.DataFrame) -> pd.DataFrame: + """Podsumowanie po kliencie.""" + if df.empty: + return df + g = df.groupby(["customer_name"], as_index=False).agg( + qty=("quantity", "sum"), + sales=("total_netto", "sum"), + docs=("document_no", "nunique"), + distinct_products=("product_code", "nunique"), + ) + g["asp_weighted"] = np.where(g["qty"] != 0, g["sales"] / g["qty"], np.nan) + return g.sort_values("sales", ascending=False) + +@aggregator("product_daily") +def product_daily(df: pd.DataFrame) -> pd.DataFrame: + """Dzienna sprzedaż per produkt (przydatne do trendów/rollingów w AI).""" + if df.empty: + return df + g = (df + .groupby([pd.Grouper(key="register_date", freq="D"), "product_code", "product_name"], as_index=False) + .agg(qty=("quantity", "sum"), + sales=("total_netto", "sum"))) + # Rolling 7 per produkt + g = g.sort_values(["product_code", "register_date"]) + g["sales_rolling7"] = g.groupby("product_code")["sales"].transform(lambda s: s.rolling(7, min_periods=1).mean()) + g["sales_pct_change_dod"] = g.groupby("product_code")["sales"].pct_change() + return g + +@aggregator("top10_products_by_sales") +def top10_products_by_sales(df: pd.DataFrame) -> pd.DataFrame: + """Top 10 produktów po wartości sprzedaży (okres z wejścia).""" + base = AGGREGATORS["product_summary"](df) + return base.nlargest(10, "sales") + +@aggregator("top10_customers_by_sales") +def top10_customers_by_sales(df: pd.DataFrame) -> pd.DataFrame: + """Top 10 klientów po wartości sprzedaży.""" + base = AGGREGATORS["customer_summary"](df) + return base.nlargest(10, "sales") + +# ------------------- Runner ------------------- + +def compute_preaggregates(rows: List[tuple]) -> dict[str, pd.DataFrame]: +#def compute_preaggregates(rows): + """Główny punkt wejścia: rows -> df -> uruchom wszystkie agregatory.""" + df = to_df(rows) +# results: dict[str, pd.DataFrame] = {} + results = {} + for name, fn in AGGREGATORS.items(): + try: + results[name] = fn(df).copy() + except Exception as e: + # Niech agregat nie wysadza całości – zapisz pusty DF + info + 
results[name] = pd.DataFrame({"_error": [str(e)], "_aggregator": [name]}) + return results + +def serialize_for_ai(results: dict[str, pd.DataFrame]) -> dict[str, list[dict]]: + """ + Konwersja wyników do lekkiego JSON-a (listy rekordów), + który łatwo przekazać do modelu AI lub zapisać do pliku. + """ +# out: dict[str, list[dict]] = {} + out = {} + for name, df in results.items(): + if df is None or df.empty: + out[name] = [] + else: + # zaokrąglij liczby dla czytelności (opcjonalnie) + df2 = df.copy() + for c in df2.select_dtypes(include=[np.number]).columns: + df2[c] = df2[c].round(6) + out[name] = df2.to_dict(orient="records") + return out diff --git a/modules/EcmInvoiceOuts/ai/result.php b/modules/EcmInvoiceOuts/ai/result.php deleted file mode 100644 index 81a59317..00000000 --- a/modules/EcmInvoiceOuts/ai/result.php +++ /dev/null @@ -1,12 +0,0 @@ - pl.DataFrame: - conn = pymysql.connect(**MYSQL_CONF) - try: - with conn.cursor() as cur: - cur.execute(sql, params) - rows = cur.fetchall() - finally: - conn.close() - return pl.from_dicts(rows) - -def to_csv(df: pl.DataFrame) -> str: - buf = io.StringIO() - df.write_csv(buf) - return buf.getvalue() - -SQL_KPIS_DAILY = """ -SELECT DATE(invoice_date) AS d, - SUM(net_amount) AS revenue, - SUM(quantity) AS qty, - ROUND(100*SUM(net_amount - cost_amount)/NULLIF(SUM(net_amount),0), 2) AS gross_margin_pct, - ROUND(100*SUM(discount_amount)/NULLIF(SUM(gross_amount),0), 2) AS discount_pct -FROM fact_invoices -WHERE invoice_date BETWEEN %s AND %s -GROUP BY 1 -ORDER BY 1; -""" - -SQL_TOP_SEGMENTS = """ -SELECT {axis} AS key, - ANY_VALUE({label}) AS label, - SUM(net_amount) AS revenue, - SUM(quantity) AS qty, - ROUND(100*SUM(net_amount - cost_amount)/NULLIF(SUM(net_amount),0), 2) AS gross_margin_pct, - ROUND(100*(SUM(net_amount) - LAG(SUM(net_amount)) OVER(ORDER BY 1))/ - NULLIF(LAG(SUM(net_amount)) OVER(ORDER BY 1),0), 2) AS trend_30d -FROM fact_invoices -WHERE invoice_date BETWEEN DATE_SUB(%s, INTERVAL 60 DAY) AND %s -GROUP BY 1 -ORDER BY revenue DESC -LIMIT %s; -""" - -class AIClient: - def __init__(self, api_key: str): self.api_key = api_key - @retry(wait=wait_exponential(multiplier=1, min=1, max=20), stop=stop_after_attempt(6)) - def structured_analysis(self, prompt: str, schema: Dict[str, Any]) -> Dict[str, Any]: - # TODO: PODMIEŃ na realne wywołanie modelu z "Structured Outputs" - raise NotImplementedError("Wire your model SDK here") - - @retry(wait=wait_exponential(multiplier=1, min=1, max=20), stop=stop_after_attempt(6)) - def batch_submit(self, ndjson_lines: List[str]) -> str: - # TODO: PODMIEŃ na rzeczywiste Batch API - raise NotImplementedError - -def run_online(from_date: str, to_date: str, currency: str, axis: str, label: str, top_n: int, goal: str) -> Dict[str, Any]: - kpis = mysql_query(SQL_KPIS_DAILY, (from_date, to_date)) - top = mysql_query(SQL_TOP_SEGMENTS.format(axis=axis, label=label), (from_date, to_date, top_n)) - - csv_blocks = ("## kpis_daily\n" + to_csv(kpis) + "\n\n" + - "## top_segments\n" + to_csv(top)) - - with open(os.path.join(os.path.dirname(__file__), "sales-analysis.schema.json"), "r", encoding="utf-8") as f: - schema = json.load(f) - - prompt = f""" -Jesteś analitykiem sprzedaży. Otrzymasz: (a) kontekst, (b) dane. -Zwróć **wyłącznie** JSON zgodny ze schema. - -Kontekst: -- Waluta: {currency} -- Zakres: {from_date} → {to_date} -- Cel: {goal} -- Poziom segmentacji: {axis} - -Dane (CSV): -{csv_blocks} - -Wskazówki: -- Użyj danych jak są (nie wymyślaj liczb). -- W meta.scope wpisz opis zakresu i segmentacji. 
-- Jeśli brak anomalii – anomalies: []. -- Kwoty do 2 miejsc, procenty do 1. -""" - - ai = AIClient(AI_API_KEY) - result = ai.structured_analysis(prompt, schema) - - out_dir = os.path.join(os.path.dirname(__file__), "out") - os.makedirs(out_dir, exist_ok=True) - out_path = os.path.join(out_dir, f"{uuid.uuid4()}.json") - with open(out_path, "w", encoding="utf-8") as f: - json.dump(result, f, ensure_ascii=False) - return {"status": "ok", "path": out_path} - -def run_batch(from_date: str, to_date: str, axis: str, label: str): - # Zgodnie z blueprintem – generujemy linie NDJSON (skrót; pełny wariant w PDF) - # TODO: dodać realne wywołania batch_submit i zapisać ID/stan - raise NotImplementedError("Implement batch per blueprint") - -if __name__ == "__main__": - import argparse - p = argparse.ArgumentParser() - sub = p.add_subparsers(dest="cmd") - o = sub.add_parser("online") - o.add_argument("from_date"); o.add_argument("to_date"); o.add_argument("currency") - o.add_argument("axis", choices=["sku_id","client_id","region_code"]) - o.add_argument("label"); o.add_argument("top_n", type=int, nargs="?", default=50) - o.add_argument("goal") - b = sub.add_parser("batch") - b.add_argument("from_date"); b.add_argument("to_date"); b.add_argument("axis"); b.add_argument("label") - args = p.parse_args() - - if args.cmd == "online": - print(run_online(args.from_date, args.to_date, args.currency, args.axis, args.label, args.top_n, args.goal)) - elif args.cmd == "batch": - print(run_batch(args.from_date, args.to_date, args.axis, args.label)) - else: - p.print_help() diff --git a/modules/EcmInvoiceOuts/report_form.php b/modules/EcmInvoiceOuts/report_form.php new file mode 100644 index 00000000..7697d5b4 --- /dev/null +++ b/modules/EcmInvoiceOuts/report_form.php @@ -0,0 +1,177 @@ +&1', $output, $returnVar); + + $ran = true; + $rc = $returnVar; + $out = implode("\n", $output); + $ok = ($returnVar === 0); + + if (!$ok && $err === '') { + $err = "Błąd uruchamiania skryptu Python (kod: " . $rc . "):\n" . $out; + } + } +} +?> + + + + + Generator raportu sprzedaży + + + + + +

Raport sprzedaży — parametry

+ +
+ +
+ Zakres dat +
+
+ +
+
+ +
+
+
+ + +
+ Preagregaty do analizy + + + + + + +
+ + +
+ Analiza AI + +
+ + +
+ + +
+

Użyte parametry

+

+ Od: + Do: + AI: +

+

Preagregaty: + '.h($p).''; + } + } else { + echo 'brak'; + } + ?> +

+ +

Wynik analizy

+ +
+ + +
+
+
+
+
+
diff --git a/modules/EcmInvoiceOuts/test.php b/modules/EcmInvoiceOuts/test.php
index fc8dc317..d3ba879e 100644
--- a/modules/EcmInvoiceOuts/test.php
+++ b/modules/EcmInvoiceOuts/test.php
@@ -1,16 +1,28 @@
 &1', $output, $returnVar);
+$body = implode("\n", $output);
+// Python error
 if ($returnVar !== 0) {
+    // show the error as plain text
+    while (ob_get_level()) { ob_end_clean(); }
+    header_remove();
+    header('Content-Type: text/plain; charset=utf-8');
     http_response_code(500);
-    echo "Error running Python script:\n" . implode("\n", $output);
+    echo "Error running Python script:\n".$body;
     exit;
 }
-// Expect a single line with the count
-echo trim(implode("\n", $output));
+// --- FORCE HTML RENDERING ---
+while (ob_get_level()) { ob_end_clean(); } // clear all output buffers
+header_remove(); // drop headers set earlier by the framework
+header('Content-Type: text/html; charset=utf-8');
+echo $body;
+exit; // STOP the framework (e.g. SugarCRM) so it appends nothing further
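For reference, a minimal sketch of how the new preaggregates.py entry points are meant to be driven; the sample rows and values below are hypothetical, they follow the column order documented in to_df() (the SELECT used by analysisAI.py), and pandas/numpy are assumed to be installed:

    from preaggregates import compute_preaggregates, serialize_for_ai

    # Hypothetical rows in the order expected by to_df():
    # (document_no, customer_name, register_date, product_code, product_name, quantity, total_netto)
    rows = [
        ("FV/1/2024", "ACME Sp. z o.o.", "2024-05-02", "P-001", "Widget", 10, 50.0),
        ("FV/2/2024", "Beta S.A.", "2024-05-03", "P-001", "Widget", 1, 20.0),
    ]

    results = compute_preaggregates(rows)   # aggregator name -> pandas DataFrame
    payload = serialize_for_ai(results)     # aggregator name -> list of JSON-friendly dicts
    print(payload["daily_sales"])

analysisAI.py itself is invoked by the PHP form with the CLI documented in its header, roughly: python3 analysisAI.py --date-from 2024-05-01 --date-to 2024-05-31 --metric kpis --metric daily_sales --ai false (dates here are hypothetical; --date-to is inclusive and is shifted by one day internally for the SQL '<' bound).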