#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
analysisAI.py — pobiera dane z MySQL, liczy preagregaty, renderuje HTML i dokłada analizę AI.
ZMIENNE ŚRODOWISKOWE (mają domyślne wartości):
OPENAI_API_KEY - klucz do OpenAI (gdy pusty -> skrypt pokaże wersję bez AI)
OPENAI_MODEL - np. gpt-4.1 (domyślnie)
MYSQL_HOST - host MySQL (domyślnie: localhost)
MYSQL_USER - użytkownik MySQL (domyślnie: root)
MYSQL_PASSWORD - hasło MySQL (domyślnie: rootpassword)
MYSQL_DATABASE - nazwa bazy (domyślnie: preDb_0dcc87940d3655fa574b253df04ca1c3)
MYSQL_PORT - port MySQL (domyślnie: 3306)
PERIOD_FROM - data od (YYYY-MM-DD); gdy brak -> poprzedni pełny miesiąc
PERIOD_TO - data do (YYYY-MM-DD, exclusive); gdy brak -> pierwszy dzień bieżącego miesiąca
INVOICE_TYPE - typ dokumentu (domyślnie: normal)
"""
import os, sys, json, math, time, warnings
from datetime import date, timedelta
API_KEY = "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"
# Wycisz ostrzeżenie urllib3 (LibreSSL na macOS itp.)
try:
from urllib3.exceptions import NotOpenSSLWarning
warnings.filterwarnings("ignore", category=NotOpenSSLWarning)
except Exception:
pass
import requests
import mysql.connector
from preaggregates import compute_preaggregates, serialize_for_ai
# --------- utils ---------
def getenv(k, d=None):
return os.environ.get(k, d)
def last_full_month_bounds():
"""Zwraca (from_iso, to_iso) dla poprzedniego pełnego miesiąca."""
today_first = date.today().replace(day=1)
to_dt = today_first
prev_last = today_first - timedelta(days=1)
from_dt = prev_last.replace(day=1)
return from_dt.isoformat(), to_dt.isoformat()
def compact_table(table, limit=30):
"""Przytnij listę rekordów i znormalizuj liczby (NaN/Inf -> None)."""
out = []
if not table:
return out
for i, row in enumerate(table):
if i >= int(limit): break
new = {}
for k, v in row.items():
if isinstance(v, float):
new[k] = round(v, 6) if math.isfinite(v) else None
else:
new[k] = v
out.append(new)
return out
def build_ai_payload(serialized, period_label):
"""Kompaktowy JSON do AI (ograniczone rozmiary)."""
return {
"kpis_hint": {"period_label": period_label},
"daily_sales": compact_table(serialized.get("daily_sales"), 30),
"product_summary": compact_table(serialized.get("product_summary"), 50),
"customer_summary": compact_table(serialized.get("customer_summary"), 50),
"top10_products_by_sales": compact_table(serialized.get("top10_products_by_sales"), 10),
"top10_customers_by_sales": compact_table(serialized.get("top10_customers_by_sales"), 10),
"product_daily_sample": compact_table(serialized.get("product_daily"), 40),
}
def call_openai_chat(api_key, model, system_prompt, user_payload_json,
temperature=0.3, connect_timeout=10, read_timeout=90, max_retries=3):
"""Wywołanie Chat Completions (retry + backoff). Zwraca HTML (sekcję) od AI."""
url = "https://api.openai.com/v1/chat/completions"
headers = {"Authorization": "Bearer " + api_key, "Content-Type": "application/json"}
body = {
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Dane (JSON):\n\n" + user_payload_json},
],
"temperature": temperature,
# "max_tokens": 1200, # możesz odkomentować, aby ograniczyć długość odpowiedzi
}
last_err = None
for attempt in range(1, int(max_retries) + 1):
try:
r = requests.post(url, headers=headers, json=body, timeout=(connect_timeout, read_timeout))
if 200 <= r.status_code < 300:
data = r.json()
return data.get("choices", [{}])[0].get("message", {}).get("content", "")
last_err = RuntimeError("OpenAI HTTP {}: {}".format(r.status_code, r.text))
except requests.exceptions.RequestException as e:
last_err = e
time.sleep(min(2 ** attempt, 10))
raise RuntimeError("OpenAI request failed: {}".format(last_err))
def fmt_money(v):
try:
return "{:,.2f}".format(float(v)).replace(",", " ").replace(".", ",")
except Exception:
return str(v)
def html_table(records, title=None, max_rows=20):
"""Proste generowanie tabeli HTML z listy dict-ów."""
if not records:
return '
Brak danych
'
cols = list(records[0].keys())
body_rows = records[:max_rows]
thead = "".join("{} | ".format(c) for c in cols)
trs = []
for r in body_rows:
tds = []
for c in cols:
val = r.get(c, "")
if isinstance(val, (int, float)):
# format dla kolumn „sales”, „qty”, „asp”, itp. – lekko ładniej
if "sales" in c or "total" in c or "netto" in c:
tds.append('{} | '.format(fmt_money(val)))
else:
tds.append('{} | '.format(val))
else:
tds.append('{} | '.format(val))
trs.append("{}
".format("".join(tds)))
cap = '{}
'.format(title) if title else ""
return (
cap +
''.format(thead, "".join(trs))
)
def render_report_html(period_label, kpis, parts, ai_section):
"""Składa finalny jeden z lekkim CSS inline."""
css = (
"font-family:system-ui,-apple-system,Segoe UI,Roboto,Arial,sans-serif;"
"max-width:1200px;margin:24px auto;padding:16px 20px;border:1px solid #e5e7eb;"
"border-radius:12px;background:#fff;color:#111827"
)
kpi_item = (
'
'
)
kpi_html = "".join(
kpi_item.format(label=lbl, value=val) for (lbl, val) in kpis
)
sections_html = "".join(parts)
# jeśli AI nie zwróciło
, owiń
if ai_section and not ai_section.lstrip().startswith("
Raport sprzedaży — {period_label}
{kpi_html}
{sections_html}
Analiza i rekomendacje (AI)
{ai_section if ai_section else '
Brak odpowiedzi AI (brak OPENAI_API_KEY)
'}
"""
# --------- main ---------
def main():
# Konfiguracja DB
cfg = {
"host": getenv("MYSQL_HOST", "twinpol-mysql56"),
# "host": getenv("MYSQL_HOST", "localhost"),
"user": getenv("MYSQL_USER", "root"),
"password": getenv("MYSQL_PASSWORD", "rootpassword"),
"database": getenv("MYSQL_DATABASE", "preDb_0dcc87940d3655fa574b253df04ca1c3"),
"port": int(getenv("MYSQL_PORT", "3306")),
}
# Zakres dat
period_from = getenv("PERIOD_FROM")
period_to = getenv("PERIOD_TO")
if not period_from or not period_to:
period_from, period_to = last_full_month_bounds()
period_label = "{} .. {}".format(period_from, period_to)
invoice_type = getenv("INVOICE_TYPE", "normal")
# Konfiguracja AI
#api_key = getenv("OPENAI_API_KEY", "")
api_key = API_KEY
model = getenv("OPENAI_MODEL", "gpt-4.1")
system_prompt = (
"Jesteś analitykiem sprzedaży. Zwróć TYLKO jedną sekcję HTML (bez //), "
"może być pojedynczy
z nagłówkami i listami. Podsumuj trendy, wskaż kluczowe produkty/klientów, "
"anomalia/odchylenia oraz daj 3–6 praktycznych rekomendacji. Krótko, konkretnie, po polsku."
)
# SQL -> rows
try:
cnx = mysql.connector.connect(**cfg)
cur = cnx.cursor()
cur.execute(
"""
SELECT i.document_no,
i.parent_name,
DATE(i.register_date) AS register_date,
ii.code,
ii.name,
ii.quantity,
ii.total_netto
FROM ecminvoiceoutitems AS ii
JOIN ecminvoiceouts AS i ON i.id = ii.ecminvoiceout_id
WHERE i.register_date >= %s
AND i.register_date < %s
AND i.type = %s
""",
(period_from, period_to, invoice_type),
)
rows = cur.fetchall()
cur.close()
cnx.close()
except Exception as e:
sys.stdout.write(
'
'
'
Błąd połączenia/zapytania MySQL
'
'
{}
'.format(str(e))
)
sys.exit(1)
# Preagregaty
try:
results = compute_preaggregates(rows)
serialized = serialize_for_ai(results)
except Exception as e:
sys.stdout.write(
'
'
'
Błąd preagregacji
'
'
{}
'.format(str(e))
)
sys.exit(1)
# KPI (na podstawie daily_sales)
daily = serialized.get("daily_sales") or []
total_sales = sum((r.get("sales") or 0) for r in daily)
total_qty = sum((r.get("qty") or 0) for r in daily)
total_docs = sum((r.get("docs") or 0) for r in daily)
asp = (total_sales / total_qty) if total_qty else None
kpis = [
("Sprzedaż (PLN)", fmt_money(total_sales)),
("Ilość (szt.)", "{:,.0f}".format(total_qty).replace(",", " ")),
("Dokumenty", "{:,.0f}".format(total_docs).replace(",", " ")),
("ASP (PLN/szt.)", fmt_money(asp) if asp is not None else "—"),
]
# Sekcje HTML z Twoich preagregatów
top_prod = serialized.get("top10_products_by_sales") or []
top_cli = serialized.get("top10_customers_by_sales") or []
prod_tbl = html_table(top_prod, title="Top 10 produktów (po sprzedaży)", max_rows=10)
cust_tbl = html_table(top_cli, title="Top 10 klientów (po sprzedaży)", max_rows=10)
# Dane do AI
ai_data = build_ai_payload(serialized, period_label)
ai_json = json.dumps(ai_data, ensure_ascii=False, separators=(",", ":"), default=str)
# Wołanie AI (opcjonalne)
ai_section = ""
if api_key:
try:
ai_section = call_openai_chat(
api_key=api_key,
model=model,
system_prompt=system_prompt,
user_payload_json=ai_json,
temperature=0.3,
connect_timeout=10,
read_timeout=90,
max_retries=3,
)
except Exception as e:
ai_section = (
'
Błąd wywołania AI: {}
'.format(str(e))
)
# Finalny HTML (jeden
)
report_html = render_report_html(
period_label=period_label,
kpis=kpis,
parts=[prod_tbl, cust_tbl],
ai_section=ai_section
)
sys.stdout.write(report_html)
if __name__ == "__main__":
main()