How to Build Traceable and Evaluated LLM Workflows Using Promptflow, Prompty, and OpenAI


(WORK_DIR / "judge.prompty").write_text("""---
name: Judge
model:
 api: chat
 configuration:
   type: openai
   connection: open_ai_connection
   model: gpt-4o-mini
 parameters:
   temperature: 0
   max_tokens: 150
   response_format: {type: json_object}
inputs:
 question: {type: string}
 answer:   {type: string}
 expected: {type: string}
---
system:
You are an exacting grader. Decide whether the assistant's answer contains the expected fact (case-insensitive, allowing reasonable phrasing/synonyms). Reply ONLY as JSON: {"score": 0 or 1, "reason": "..."}.


user:
Question: {{question}}
Expected: {{expected}}
Answer:   {{answer}}
""")


(WORK_DIR / "eval_flow.py").write_text(textwrap.dedent('''
   import json
   from pathlib import Path
   from promptflow.tracing import trace
   from promptflow.core import Prompty


   BASE = Path(__file__).parent


   class Evaluator:
       def __init__(self):
           self.judge = Prompty.load(source=BASE / "judge.prompty")


       @trace
       def __call__(self, question: str, answer: str, expected: str) -> dict:
           raw = self.judge(question=question, answer=answer, expected=expected)
           if isinstance(raw, str):
               try: raw = json.loads(raw)
               except Exception: raw = {"score": 0, "reason": f"unparseable:{raw[:80]}"}
           return {"score": int(raw.get("score", 0)), "reason": str(raw.get("reason",""))}


       def __aggregate__(self, line_results):
           """Run-level aggregation. Whatever this returns shows up in pf.get_metrics()."""
           scores = [r["score"] for r in line_results if r]
           return {
               "accuracy": (sum(scores) / len(scores)) if scores else 0.0,
               "passed":   sum(scores),
               "total":    len(scores),
           }
'''))


(WORK_DIR / "eval.flex.yaml").write_text(
   "$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json\n"
   "entry: eval_flow:Evaluator\n"
)


print("\n=== Evaluation run ===")
eval_run = pf.run(
   flow=str(WORK_DIR / "eval.flex.yaml"),
   data=str(data_path),
   run=base_run,
   column_mapping={
       "question": "${data.question}",
       "expected": "${data.expected}",
       "answer":   "${run.outputs.answer}",
   },
   stream=True,
)


eval_details = pf.get_details(eval_run)
print(eval_details)


print("\n=== Aggregated metrics (from __aggregate__) ===")
print(json.dumps(pf.get_metrics(eval_run), indent=2))


import pandas as pd
if "outputs.score" in eval_details.columns:
   s = pd.to_numeric(eval_details["outputs.score"], errors="coerce").fillna(0)
   print(f"Manual accuracy: {s.mean():.2%}  ({int(s.sum())}/{len(s)})")