Files
crm.twinpol.com/modules/EcmInvoiceOuts/ai/analysisAI.py
2025-09-07 20:41:59 +02:00

355 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
analysisAI.py — pobiera dane z MySQL, liczy preagregaty, renderuje HTML i dodaje analizę AI.
ZMIENNE ŚRODOWISKOWE (wszystkie mają domyślne wartości):
OPENAI_API_KEY - klucz do OpenAI (gdy pusty -> fallback bez AI)
OPENAI_MODEL - np. gpt-4.1 (domyślnie), alternatywnie gpt-4.1-mini
MYSQL_HOST - host MySQL (domyślnie: twinpol-mysql56 lub localhost)
MYSQL_USER - użytkownik MySQL (domyślnie: root)
MYSQL_PASSWORD - hasło MySQL (domyślnie: rootpassword)
MYSQL_DATABASE - nazwa bazy (domyślnie: preDb_0dcc87940d3655fa574b253df04ca1c3)
MYSQL_PORT - port MySQL (domyślnie: 3306)
PERIOD_FROM - data od (YYYY-MM-DD); gdy brak -> poprzedni pełny miesiąc
PERIOD_TO - data do (YYYY-MM-DD, exclusive); gdy brak -> 1. dzień bieżącego miesiąca
INVOICE_TYPE - typ dokumentu (domyślnie: normal)
"""
import html
import json
import math
import os
import sys
import time
import warnings
from datetime import date, timedelta
# SECURITY: OpenAI service-account keys used to be committed on these lines in
# plain text (one live, one commented out). Anything that has been in version
# control must be treated as leaked: revoke those keys and issue new ones.
# The key is now taken from the environment only; an empty value disables the
# AI section of the report.
API_KEY = os.environ.get("OPENAI_API_KEY", "")
# ──(1) Silence urllib3 warnings (LibreSSL / old OpenSSL) ───────────────────────
try:
    from urllib3.exceptions import NotOpenSSLWarning
except ImportError:
    # urllib3 is missing or does not define this warning class: there is
    # nothing to silence, so carry on without the filter.
    pass
else:
    warnings.filterwarnings("ignore", category=NotOpenSSLWarning)
# ──(2) Importy zewnętrzne ──────────────────────────────────────────────────────
import requests
import mysql.connector
# Twoje preagregaty (muszą być w tym samym katalogu / PYTHONPATH)
from preaggregates import compute_preaggregates, serialize_for_ai
# ──(3) AI key configuration ────────────────────────────────────────────────────
# Put the key here if you want it hard-coded, otherwise leave it empty:
API_KEY_HARDCODE = API_KEY # e.g. "sk-xxxx..." (NOT RECOMMENDED in production)
# ──(4) Utils ───────────────────────────────────────────────────────────────────
def getenv(k, d=None):
    """Return the value of environment variable *k*, or *d* when it is unset."""
    try:
        return os.environ[k]
    except KeyError:
        return d
def last_full_month_bounds():
    """Return ``(from_iso, to_iso)`` covering the previous full calendar month.

    ``from_iso`` is the first day of the previous month and ``to_iso`` is the
    first day of the current month, both as ISO ``YYYY-MM-DD`` strings.
    """
    current_month_start = date.today().replace(day=1)
    # Stepping back one day lands in the previous month; snap to its first day.
    previous_month_start = (current_month_start - timedelta(days=1)).replace(day=1)
    return previous_month_start.isoformat(), current_month_start.isoformat()
def compact_table(table, limit=30):
    """Truncate a list of record dicts and normalise their float values.

    At most ``int(limit)`` leading records are kept. Finite floats are rounded
    to 6 decimal places; NaN and infinities become ``None``. Every other value
    is passed through unchanged. A falsy *table* yields an empty list.
    """
    if not table:
        return []
    max_rows = int(limit)
    trimmed = []
    for index, record in enumerate(table):
        if index >= max_rows:
            break
        trimmed.append({
            key: (
                (round(value, 6) if math.isfinite(value) else None)
                if isinstance(value, float)
                else value
            )
            for key, value in record.items()
        })
    return trimmed
def build_ai_payload(serialized, period_label):
    """Build the compact JSON-ready dict that is sent to the AI model.

    Each table is read from *serialized* and trimmed with ``compact_table`` to
    a per-table row limit; the period label is passed along as a hint.
    """
    # (payload key, key in `serialized`, row limit)
    table_specs = (
        ("daily_sales", "daily_sales", 60),
        ("product_summary", "product_summary", 100),
        ("customer_summary", "customer_summary", 100),
        ("top10_products_by_sales", "top10_products_by_sales", 10),
        ("top10_customers_by_sales", "top10_customers_by_sales", 10),
        ("product_daily_sample", "product_daily", 100),
    )
    payload = {"kpis_hint": {"period_label": period_label}}
    for payload_key, source_key, row_limit in table_specs:
        payload[payload_key] = compact_table(serialized.get(source_key), row_limit)
    return payload
def call_openai_chat(api_key, model, system_prompt, user_payload_json,
                     temperature=0.3, connect_timeout=10, read_timeout=90, max_retries=3):
    """Call the OpenAI Chat Completions endpoint with retry and backoff.

    Args:
        api_key: Bearer token sent in the ``Authorization`` header.
        model: Model identifier placed in the request body.
        system_prompt: Content of the system message.
        user_payload_json: JSON text appended to the user message.
        temperature: Sampling temperature placed in the request body.
        connect_timeout: Connect timeout in seconds passed to ``requests``.
        read_timeout: Read timeout in seconds passed to ``requests``.
        max_retries: Total number of attempts.

    Returns:
        The ``content`` of the first choice's message, or ``""`` when the
        response carries no usable content.

    Raises:
        RuntimeError: when every attempt failed; the message embeds the last
            error (including the HTTP status and response body, if any).
    """
    url = "https://api.openai.com/v1/chat/completions"
    headers = {"Authorization": "Bearer " + api_key, "Content-Type": "application/json"}
    body = {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": "Dane (JSON):\n\n" + user_payload_json},
        ],
        "temperature": temperature,
        # "max_tokens": 1200,  # optionally cap the response length
    }
    attempts = int(max_retries)
    last_err = None
    for attempt in range(1, attempts + 1):
        try:
            r = requests.post(url, headers=headers, json=body,
                              timeout=(connect_timeout, read_timeout))
            if 200 <= r.status_code < 300:
                # `or` fallbacks cover a missing key, an empty list and an
                # explicit JSON null alike, so no IndexError/AttributeError.
                choices = r.json().get("choices") or [{}]
                message = choices[0].get("message") or {}
                return message.get("content") or ""
            last_err = RuntimeError("OpenAI HTTP {}: {}".format(r.status_code, r.text))
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError: a 2xx body that is not valid JSON (older `requests`
            # versions raise it outside the RequestException hierarchy).
            last_err = e
        # Back off only when another attempt will follow; sleeping right
        # before raising would just delay the error.
        if attempt < attempts:
            time.sleep(min(2 ** attempt, 10))
    raise RuntimeError("OpenAI request failed: {}".format(last_err))
def fmt_money(v):
    """Format *v* as an amount with 2 decimals, space thousands separator and
    comma decimal separator (e.g. ``1 234,50``).

    Values that cannot be converted to ``float`` are returned as ``str(v)``.
    """
    try:
        amount = float(v)
    except Exception:
        return str(v)
    grouped = "{:,.2f}".format(amount)
    # Order matters: turn the thousands commas into spaces first, then the
    # decimal point into a comma.
    return grouped.replace(",", " ").replace(".", ",")
def html_table(records, title=None, max_rows=20):
    """Render a list of dicts as an HTML table fragment.

    Column order comes from the keys of the first record. Numeric cells get
    the ``num`` CSS class; numeric cells in columns whose name contains
    ``sales``, ``total`` or ``netto`` are formatted with ``fmt_money``.
    Column names, cell values and the title are HTML-escaped, because they
    may contain characters such as ``<`` or ``&`` that would otherwise break
    the markup or inject HTML.

    Args:
        records: Sequence of dicts; a falsy value yields a "no data" div.
        title: Optional caption rendered above the table.
        max_rows: Maximum number of body rows to render.

    Returns:
        The HTML fragment as a string.
    """
    if not records:
        return '<div class="empty">Brak danych</div>'
    cols = list(records[0].keys())
    money_cols = {
        c for c in cols
        if any(marker in str(c).lower() for marker in ("sales", "total", "netto"))
    }
    thead = "".join("<th>{}</th>".format(html.escape(str(c))) for c in cols)
    trs = []
    for r in records[:max_rows]:
        tds = []
        for c in cols:
            val = r.get(c, "")
            if isinstance(val, (int, float)):
                text = fmt_money(val) if c in money_cols else str(val)
                tds.append('<td class="num">{}</td>'.format(html.escape(text)))
            else:
                tds.append("<td>{}</td>".format(html.escape(str(val))))
        trs.append("<tr>{}</tr>".format("".join(tds)))
    cap = '<div class="tbl-title">{}</div>'.format(html.escape(str(title))) if title else ""
    return (
        cap +
        '<div class="tbl-wrap"><table class="tbl">'
        '<thead><tr>{}</tr></thead><tbody>{}</tbody></table></div>'.format(thead, "".join(trs))
    )
def render_report_html(period_label, kpis, parts, ai_section, model_alias):
    """Assemble the final report markup: one styled <div> plus a <style> block.

    Args:
        period_label: Text shown in the report heading.
        kpis: Iterable of ``(label, value)`` pairs rendered as KPI cards.
        parts: Iterable of ready-made HTML fragments, concatenated in order.
        ai_section: HTML returned by the AI; wrapped in an ``ai-section`` div
            unless it already starts with ``<div``. A falsy value renders a
            "no AI answer" placeholder instead.
        model_alias: Model name shown in the AI heading; omitted when falsy.

    Returns:
        The complete HTML string.
    """
    css = ";".join([
        "font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,sans-serif",
        "max-width:1200px",
        "margin:24px auto",
        "padding:16px 20px",
        "border:1px solid #e5e7eb",
        "border-radius:12px",
        "background:#fff",
        "color:#111827",
    ])

    kpi_template = (
        '<div class="kpi"><div class="kpi-label">{label}</div>'
        '<div class="kpi-value">{value}</div></div>'
    )
    kpi_cards = [kpi_template.format(label=label, value=value) for label, value in kpis]
    kpi_html = "".join(kpi_cards)
    sections_html = "".join(parts)

    # Decide what goes into the AI panel before building the template.
    if not ai_section:
        ai_block = '<div style="color:#6b7280">Brak odpowiedzi AI</div>'
    elif ai_section.lstrip().startswith("<div"):
        ai_block = ai_section
    else:
        ai_block = '<div class="ai-section">{}</div>'.format(ai_section)
    alias_suffix = (" · " + model_alias) if model_alias else ""

    return f"""
<div style="{css}">
<h2 style="margin:0 0 12px;font-size:22px;">Raport sprzedaży — {period_label}</h2>
<div style="display:grid;grid-template-columns:repeat(4,minmax(0,1fr));gap:12px;margin:12px 0 20px;">
{kpi_html}
</div>
{sections_html}
<div style="margin-top:20px;border-top:1px solid #e5e7eb;padding-top:16px;">
<h3 style="margin:0 0 8px;font-size:18px;">Analiza i rekomendacje (AI{alias_suffix})</h3>
{ai_block}
</div>
</div>
<style>
.kpi {{background:#f8fafc;border:1px solid #e5e7eb;border-radius:10px;padding:12px;}}
.kpi-label {{font-size:12px;color:#6b7280;margin-bottom:4px;}}
.kpi-value {{font-size:18px;font-weight:700;}}
.tbl-title {{font-weight:600;margin:16px 0 8px;font-size:15px;}}
.tbl-wrap {{overflow-x:auto;border:1px solid #e5e7eb;border-radius:8px;}}
.tbl {{border-collapse:collapse;width:100%;font-size:14px;}}
.tbl thead th {{text-align:left;background:#f3f4f6;padding:8px;border-bottom:1px solid #e5e7eb;white-space:nowrap;}}
.tbl tbody td {{padding:8px;border-bottom:1px solid #f3f4f6;vertical-align:top;}}
.tbl td.num {{text-align:right;white-space:nowrap;}}
.empty {{color:#6b7280;font-style:italic;margin:8px 0;}}
.ai-section {{background:#f8fafc;border:1px solid #e5e7eb;border-radius:10px;padding:12px;}}
</style>
"""
# ──(5) Główna logika ───────────────────────────────────────────────────────────
def _error_box(title, message):
    """Return a full-width red HTML error box; *message* is HTML-escaped."""
    return (
        '<div style="font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,sans-serif;'
        'max-width:900px;margin:24px auto;padding:16px 20px;border:1px solid #fecaca;'
        'border-radius:12px;background:#fff5f5;color:#991b1b;">'
        '<h3 style="margin:0 0 8px;font-size:18px;">{}</h3>'
        '<p style="margin:0;">{}</p></div>'
    ).format(title, html.escape(str(message)))


def _ai_error_box(prefix, message):
    """Return the inline red box shown in the AI panel; *message* is HTML-escaped."""
    return (
        '<div style="color:#991b1b;background:#fff5f5;border:1px solid #fecaca;'
        'padding:10px;border-radius:8px;">'
        '{}{}</div>'
    ).format(prefix, html.escape(str(message)))


def _fetch_invoice_rows(cfg, period_from, period_to, invoice_type):
    """Run the invoice-items query and return all rows; always closes cursor and connection."""
    cnx = mysql.connector.connect(**cfg)
    try:
        cur = cnx.cursor()
        try:
            cur.execute(
                """
                SELECT i.document_no,
                       i.parent_name,
                       DATE(i.register_date) AS register_date,
                       ii.code,
                       ii.name,
                       ii.quantity,
                       ii.total_netto
                FROM ecminvoiceoutitems AS ii
                JOIN ecminvoiceouts AS i ON i.id = ii.ecminvoiceout_id
                WHERE i.register_date >= %s
                  AND i.register_date < %s
                  AND i.type = %s
                """,
                (period_from, period_to, invoice_type),
            )
            return cur.fetchall()
        finally:
            cur.close()
    finally:
        cnx.close()


def _run_ai_analysis(api_key, model, model_alias, fallback_model, fallback_alias,
                     system_prompt, ai_json):
    """Ask the AI for the analysis section; return ``(ai_section_html, alias_used)``.

    On a quota error the cheaper *fallback_model* is tried once more. Any
    failure is turned into an inline HTML error box instead of an exception.
    """
    try:
        section = call_openai_chat(
            api_key=api_key,
            model=model,
            system_prompt=system_prompt,
            user_payload_json=ai_json,
            temperature=0.3,
            connect_timeout=10,
            read_timeout=90,
            max_retries=3,
        )
        return section, model_alias
    except Exception as e:
        err = str(e)

    if "insufficient_quota" not in err and "You exceeded your current quota" not in err:
        return _ai_error_box("Błąd wywołania AI: ", err), model_alias

    # Quota exhausted: try the cheaper model.
    try:
        section = call_openai_chat(
            api_key=api_key,
            model=fallback_model,
            system_prompt=system_prompt,
            user_payload_json=ai_json,
            temperature=0.3,
            connect_timeout=10,
            read_timeout=90,
            max_retries=2,
        )
        # Label the report with the model that actually produced the text.
        return section, fallback_alias
    except Exception as ee:
        return _ai_error_box("Brak dostępnego limitu API. ", ee), model_alias


def main():
    """Build the sales report and write it to stdout as HTML.

    Steps: read configuration from the environment, fetch invoice items from
    MySQL for the requested period, compute pre-aggregates, render the KPI
    cards and tables, optionally ask OpenAI for an analysis section, and
    write the assembled HTML to stdout. On a database or pre-aggregation
    failure an HTML error box is written and the process exits with status 1.
    """
    # Database configuration
    cfg = {
        "host": getenv("MYSQL_HOST", "twinpol-mysql56"),
        "user": getenv("MYSQL_USER", "root"),
        "password": getenv("MYSQL_PASSWORD", "rootpassword"),
        "database": getenv("MYSQL_DATABASE", "preDb_0dcc87940d3655fa574b253df04ca1c3"),
        "port": int(getenv("MYSQL_PORT", "3306")),
    }

    # Date range: each bound falls back to its own default independently, so
    # supplying only one of PERIOD_FROM / PERIOD_TO no longer discards it.
    default_from, default_to = last_full_month_bounds()
    period_from = getenv("PERIOD_FROM") or default_from
    period_to = getenv("PERIOD_TO") or default_to
    period_label = "{} .. {}".format(period_from, period_to)
    invoice_type = getenv("INVOICE_TYPE", "normal")

    # AI configuration (API model name + display alias). The environment
    # variable takes precedence over any value baked into the source.
    api_key = getenv("OPENAI_API_KEY", "") or API_KEY_HARDCODE
    model = getenv("OPENAI_MODEL", "gpt-4.1")
    MODEL_ALIAS = {
        "gpt-4.1": "GPT-4.1",
        "gpt-4.1-mini": "GPT-4.1-mini",
        "gpt-4o": "GPT-4o",
        "gpt-4o-mini": "GPT-4o-mini",
    }
    model_alias = MODEL_ALIAS.get(model, model)
    fallback_model = "gpt-4.1-mini"
    fallback_alias = MODEL_ALIAS.get(fallback_model, fallback_model)
    # NOTE(review): the prompt used to ask for "36" recommendations, which
    # looks like a garbled "3–6" range — confirm the intended count.
    system_prompt = (
        "Jesteś analitykiem sprzedaży. Zwróć TYLKO jedną sekcję HTML (bez <html>/<head>/<body>), "
        "może być pojedynczy <div> z nagłówkami i listami. Podsumuj kluczowe trendy (dzień, mix), wskaż top produkty/klientów, "
        "anomalia/odchylenia oraz daj 3–6 praktycznych rekomendacji dla sprzedaży/zaopatrzenia/marketingu. Krótko i konkretnie, po polsku."
    )

    # SQL -> rows
    try:
        rows = _fetch_invoice_rows(cfg, period_from, period_to, invoice_type)
    except Exception as e:
        sys.stdout.write(_error_box("Błąd połączenia/zapytania MySQL", e))
        sys.exit(1)

    # Pre-aggregates
    try:
        results = compute_preaggregates(rows)
        serialized = serialize_for_ai(results)
    except Exception as e:
        sys.stdout.write(_error_box("Błąd preagregacji", e))
        sys.exit(1)

    # KPIs (derived from daily_sales)
    daily = serialized.get("daily_sales") or []
    total_sales = sum((r.get("sales") or 0) for r in daily)
    total_qty = sum((r.get("qty") or 0) for r in daily)
    total_docs = sum((r.get("docs") or 0) for r in daily)
    asp = (total_sales / total_qty) if total_qty else None
    kpis = [
        ("Sprzedaż (PLN)", fmt_money(total_sales)),
        ("Ilość (szt.)", "{:,.0f}".format(total_qty).replace(",", " ")),
        ("Dokumenty", "{:,.0f}".format(total_docs).replace(",", " ")),
        ("ASP (PLN/szt.)", fmt_money(asp) if asp is not None else ""),
    ]

    # HTML sections: every key pre-aggregate is displayed
    top_prod = serialized.get("top10_products_by_sales") or []
    top_cli = serialized.get("top10_customers_by_sales") or []
    daily_tbl = html_table(daily, title="Sprzedaż dzienna (skrót)", max_rows=30)
    prod_sum_tbl = html_table(serialized.get("product_summary") or [], title="Podsumowanie produktów (skrót)", max_rows=30)
    cust_sum_tbl = html_table(serialized.get("customer_summary") or [], title="Podsumowanie klientów (skrót)", max_rows=30)
    prod_daily_tbl = html_table(serialized.get("product_daily") or [], title="Produkt × Dzień (próbka)", max_rows=30)
    prod_tbl = html_table(top_prod, title="Top 10 produktów (po sprzedaży)", max_rows=10)
    cust_tbl = html_table(top_cli, title="Top 10 klientów (po sprzedaży)", max_rows=10)

    # Data for the AI
    ai_data = build_ai_payload(serialized, period_label)
    ai_json = json.dumps(ai_data, ensure_ascii=False, separators=(",", ":"), default=str)

    # AI call (falls back to the mini model on a quota error)
    ai_section = ""
    if api_key:
        ai_section, model_alias = _run_ai_analysis(
            api_key, model, model_alias, fallback_model, fallback_alias,
            system_prompt, ai_json,
        )

    # Final HTML
    report_html = render_report_html(
        period_label=period_label,
        kpis=kpis,
        parts=[prod_tbl, cust_tbl, daily_tbl, prod_sum_tbl, cust_sum_tbl, prod_daily_tbl],
        ai_section=ai_section,
        model_alias=model_alias if api_key else "",
    )
    sys.stdout.write(report_html)
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()