# worker.py
"""Sales-analysis worker.

Pulls daily KPIs and top sales segments from MySQL, asks an AI model for a
structured (JSON-schema constrained) analysis, and writes the resulting JSON
to an ``out/`` directory next to this file.
"""
import io
import json
import os
import re
import uuid
import datetime as dt
from typing import Any, Dict, List

import polars as pl
import pymysql
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential

load_dotenv()

AI_MODEL = os.getenv("AI_MODEL", "gpt-5-pro")
AI_API_KEY = os.getenv("AI_API_KEY")

MYSQL_CONF = dict(
    host=os.getenv("MYSQL_HOST", "localhost"),
    user=os.getenv("MYSQL_USER", "root"),
    password=os.getenv("MYSQL_PASSWORD", ""),
    database=os.getenv("MYSQL_DB", "sales"),
    cursorclass=pymysql.cursors.DictCursor,
)

# SQL identifiers (column names) cannot be bound as %s parameters, so any
# value interpolated into query text via str.format must match this pattern.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*\Z")


def _safe_identifier(name: str) -> str:
    """Return *name* unchanged if it is a safe SQL identifier.

    Guards SQL_TOP_SEGMENTS against SQL injection: ``axis``/``label`` come
    from the CLI and are spliced into the query text, not bound as
    parameters.

    Raises:
        ValueError: if *name* is not a plain ``[A-Za-z_][A-Za-z0-9_]*`` token.
    """
    if not _IDENTIFIER_RE.match(name):
        raise ValueError(f"unsafe SQL identifier: {name!r}")
    return name


def mysql_query(sql: str, params: tuple = ()) -> pl.DataFrame:
    """Execute *sql* with *params* and return the result set as a DataFrame.

    A fresh connection is opened per call and always closed, even on error.
    Returns an empty DataFrame when the query yields no rows
    (``pl.from_dicts`` would raise on an empty sequence).
    """
    conn = pymysql.connect(**MYSQL_CONF)
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            rows = cur.fetchall()
    finally:
        conn.close()
    return pl.from_dicts(rows) if rows else pl.DataFrame()


def to_csv(df: pl.DataFrame) -> str:
    """Serialize *df* to a CSV string (header row included)."""
    buf = io.StringIO()
    df.write_csv(buf)
    return buf.getvalue()


SQL_KPIS_DAILY = """
SELECT DATE(invoice_date) AS d,
       SUM(net_amount) AS revenue,
       SUM(quantity) AS qty,
       ROUND(100*SUM(net_amount - cost_amount)/NULLIF(SUM(net_amount),0), 2) AS gross_margin_pct,
       ROUND(100*SUM(discount_amount)/NULLIF(SUM(gross_amount),0), 2) AS discount_pct
FROM fact_invoices
WHERE invoice_date BETWEEN %s AND %s
GROUP BY 1
ORDER BY 1;
"""

# `key` is backtick-quoted because KEY is a reserved word in MySQL; the bare
# alias `AS key` is a syntax error.
# NOTE(review): LAG(...) OVER (ORDER BY 1) yields the delta between adjacent
# segments ordered by their key, not a per-segment 30-day trend — the
# `trend_30d` name looks wrong; confirm intent against the data model.
SQL_TOP_SEGMENTS = """
SELECT {axis} AS `key`,
       ANY_VALUE({label}) AS label,
       SUM(net_amount) AS revenue,
       SUM(quantity) AS qty,
       ROUND(100*SUM(net_amount - cost_amount)/NULLIF(SUM(net_amount),0), 2) AS gross_margin_pct,
       ROUND(100*(SUM(net_amount) - LAG(SUM(net_amount)) OVER(ORDER BY 1))/
             NULLIF(LAG(SUM(net_amount)) OVER(ORDER BY 1),0), 2) AS trend_30d
FROM fact_invoices
WHERE invoice_date BETWEEN DATE_SUB(%s, INTERVAL 60 DAY) AND %s
GROUP BY 1
ORDER BY revenue DESC
LIMIT %s;
"""


class AIClient:
    """Thin wrapper around the AI vendor SDK with exponential-backoff retry."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    @retry(wait=wait_exponential(multiplier=1, min=1, max=20), stop=stop_after_attempt(6))
    def structured_analysis(self, prompt: str, schema: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the model for a JSON response conforming to *schema*."""
        # TODO: replace with a real model call using "Structured Outputs"
        raise NotImplementedError("Wire your model SDK here")

    @retry(wait=wait_exponential(multiplier=1, min=1, max=20), stop=stop_after_attempt(6))
    def batch_submit(self, ndjson_lines: List[str]) -> str:
        """Submit NDJSON request lines as a batch job; return the job id."""
        # TODO: replace with the real Batch API call
        raise NotImplementedError


def run_online(from_date: str, to_date: str, currency: str, axis: str,
               label: str, top_n: int, goal: str) -> Dict[str, Any]:
    """Fetch KPI + segment data, run the AI analysis and persist the JSON.

    Args:
        from_date/to_date: inclusive date range (passed to MySQL as-is).
        currency: display currency name, forwarded into the prompt context.
        axis: segmentation column (validated as a safe SQL identifier).
        label: human-readable label column (validated likewise).
        top_n: LIMIT for the top-segments query.
        goal: analysis goal, forwarded into the prompt context.

    Returns:
        ``{"status": "ok", "path": <output file>}``.

    Raises:
        ValueError: if *axis* or *label* is not a safe SQL identifier.
    """
    kpis = mysql_query(SQL_KPIS_DAILY, (from_date, to_date))
    top = mysql_query(
        SQL_TOP_SEGMENTS.format(axis=_safe_identifier(axis),
                                label=_safe_identifier(label)),
        (from_date, to_date, top_n),
    )
    csv_blocks = ("## kpis_daily\n" + to_csv(kpis) + "\n\n" +
                  "## top_segments\n" + to_csv(top))
    schema_path = os.path.join(os.path.dirname(__file__), "sales-analysis.schema.json")
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)
    # The prompt is intentionally in Polish (matches downstream consumers).
    prompt = f"""
Jesteś analitykiem sprzedaży. Otrzymasz: (a) kontekst, (b) dane.
Zwróć **wyłącznie** JSON zgodny ze schema.

Kontekst:
- Waluta: {currency}
- Zakres: {from_date} → {to_date}
- Cel: {goal}
- Poziom segmentacji: {axis}

Dane (CSV):
{csv_blocks}

Wskazówki:
- Użyj danych jak są (nie wymyślaj liczb).
- W meta.scope wpisz opis zakresu i segmentacji.
- Jeśli brak anomalii – anomalies: [].
- Kwoty do 2 miejsc, procenty do 1.
"""
    ai = AIClient(AI_API_KEY)
    result = ai.structured_analysis(prompt, schema)
    out_dir = os.path.join(os.path.dirname(__file__), "out")
    os.makedirs(out_dir, exist_ok=True)
    out_path = os.path.join(out_dir, f"{uuid.uuid4()}.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False)
    return {"status": "ok", "path": out_path}


def run_batch(from_date: str, to_date: str, axis: str, label: str):
    """Run the analysis through the Batch API (not implemented yet)."""
    # Per the blueprint we generate NDJSON lines (abridged; full variant in the PDF).
    # TODO: add the real batch_submit calls and persist the job ID/state.
    raise NotImplementedError("Implement batch per blueprint")


if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser()
    sub = p.add_subparsers(dest="cmd")

    o = sub.add_parser("online")
    o.add_argument("from_date")
    o.add_argument("to_date")
    o.add_argument("currency")
    o.add_argument("axis", choices=["sku_id", "client_id", "region_code"])
    o.add_argument("label")
    o.add_argument("top_n", type=int, nargs="?", default=50)
    o.add_argument("goal")

    b = sub.add_parser("batch")
    b.add_argument("from_date")
    b.add_argument("to_date")
    b.add_argument("axis")
    b.add_argument("label")

    args = p.parse_args()
    if args.cmd == "online":
        print(run_online(args.from_date, args.to_date, args.currency,
                         args.axis, args.label, args.top_n, args.goal))
    elif args.cmd == "batch":
        print(run_batch(args.from_date, args.to_date, args.axis, args.label))
    else:
        p.print_help()