From 443194bca4491fae4400bae9dad2a0470692bdbf Mon Sep 17 00:00:00 2001 From: maderix Date: Tue, 3 Mar 2026 05:24:35 -0800 Subject: [PATCH] Dashboard v2: live stats, JSON parsing, all three pipelines - Parse static pipeline JSON step/batch/perf lines for real-time updates - Running elapsed time, ms/step from wall-clock timestamps, steps/sec - Compute ANE + Total TFLOPS from FLOPs/step when not reported directly - Support --ane (train_large_ane) and --no-ane-extras flags - Dynamic pipeline timing breakdown + CKPT_PATH per mode --- training/dashboard.py | 148 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 130 insertions(+), 18 deletions(-) diff --git a/training/dashboard.py b/training/dashboard.py index 06d46a2..5926a8f 100644 --- a/training/dashboard.py +++ b/training/dashboard.py @@ -1,6 +1,6 @@ """TUI dashboard for ANE training (train_large). Uses blessed for terminal UI.""" -import argparse, fcntl, math, os, re, select, signal, struct, subprocess, sys, time, threading +import argparse, fcntl, json, math, os, re, select, signal, struct, subprocess, sys, time, threading from collections import deque from pathlib import Path @@ -20,7 +20,9 @@ except ImportError: DIM, HIDDEN, HEADS, SEQ, VOCAB, NLAYERS = 768, 2048, 12, 256, 32000, 12 HD = DIM // HEADS -CKPT_PATH = 'ane_stories110M_ckpt.bin' +CKPT_PATH_STATIC = 'ane_stories110M_ckpt.bin' +CKPT_PATH_DYNAMIC = 'training_dynamic/ane_stories110M_dyn_ckpt.bin' +CKPT_PATH = CKPT_PATH_STATIC # set in main() based on --dynamic TOKENIZER_PATH = str(Path(__file__).resolve().parent.parent.parent / 'assets' / 'models' / 'tokenizer.bin') @@ -56,6 +58,9 @@ class State: self.mem_mb_history = deque(maxlen=300) self.proc_mem_mb_history = deque(maxlen=300) self.train_pid = None + self.step_timestamps = [] # (step, time.monotonic()) for running ms/step + self.train_start = None # wall clock when first step seen + self.compile_ms = 0.0 # total compile time S = State() @@ -278,23 +283,69 @@ def sysmetrics_thread(): RE_CONFIG = re.compile(r'dim=(\d+) hidden=(\d+) heads=(\d+) seq=(\d+) vocab=(\d+) layers=(\d+)') RE_PARAMS = re.compile(r'Params: ([\d.]+)M \(transformer ([\d.]+)M \+ embed ([\d.]+)M\)') RE_KERNELS = re.compile(r'Kernels: (\d+).*?(\d+) weight-bearing') +RE_KERNELS_DYN = re.compile(r'Kernels: (\d+) compiled, (\d+) weight-bearing') RE_ACCUM = re.compile(r'Accum (\d+).*LR=([\d.e+-]+)') RE_STEP = re.compile(r'step\s+(\d+)\s+loss=([\d.]+)(?:\s+lr=([\d.e+-]+))?(?:\s+([\d.]+)ms/step)?') RE_BATCH = re.compile(r'\[batch (\d+): compile=([\d.]+)ms train=([\d.]+)ms \(([\d.]+)ms/step\) compiles=(\d+)\]') RE_TIMING = re.compile(r'ane=([\d.]+) io=([\d.]+) cls=([\d.]+) elem=([\d.]+) rms=([\d.]+) cblas_wait=([\d.]+)') +RE_TIMING_DYN = re.compile(r'ane_fwd=([\d.]+) io_fwd=([\d.]+) rms=([\d.]+) ane_bwd=([\d.]+) io_bwd=([\d.]+) silu=([\d.]+) rms_bwd=([\d.]+) cls=([\d.]+) cblas_wait=([\d.]+) dw_copy=([\d.]+)') RE_RESTART = re.compile(r'\[exec\(\) restart step (\d+)') RE_RESUME = re.compile(r'\[RESUMED step (\d+), loss=([\d.]+)\]') RE_FLOPS = re.compile(r'FLOPs/step: fwd=([\d.]+)M bwd_dx=([\d.]+)M bwd_dW=([\d.]+)M sdpa_bwd=([\d.]+)M total=([\d.]+)M') RE_ANE_FLOPS = re.compile(r'ANE FLOPs/step: ([\d.]+)M') RE_ANE_TFLOPS = re.compile(r'ANE TFLOPS:\s+([\d.]+)') RE_ANE_UTIL = re.compile(r'ANE utilization:\s+([\d.]+)%') -RE_EFFICIENCY = re.compile(r'(Total steps|Wall time|Compile time|Train time|Avg compile|Avg train|ANE TFLOPS|Total TFLOPS|ANE utilization):?\s+(.+)') +RE_EFFICIENCY = re.compile(r'(Total steps|Wall time|Compile time|Compile|Train time|Avg compile|Avg train|ANE TFLOPS|Total TFLOPS|ANE utilization):?\s+(.+)') +RE_COMPILED = re.compile(r'Compiled (\d+) kernels in (\d+)ms') RE_ANE_POWER = re.compile(r'ANE Power:\s+([\d.]+)\s*mW') RE_CPU_POWER = re.compile(r'CPU Power:\s+([\d.]+)\s*mW') RE_GPU_POWER = re.compile(r'GPU Power:\s+([\d.]+)\s*mW') def parse_line(line): S.logs.append(line) + # Parse JSON lines from static pipeline ({"type":"step",...} or {"type":"batch",...}) + stripped = line.strip() + if stripped.startswith('{'): + try: + j = json.loads(stripped) + jt = j.get('type') + if jt == 'step': + S.step, S.loss = j['step'], j['loss'] + S.loss_history.append((S.step, S.loss)) + S.best_loss = min(S.best_loss, S.loss) + S.compiles = j.get('compiles', S.compiles) + now = time.monotonic() + if S.train_start is None: + S.train_start = now + S.step_timestamps.append((S.step, now)) + if len(S.step_timestamps) >= 2: + dt = S.step_timestamps[-1][1] - S.step_timestamps[-2][1] + if dt > 0: + S.ms_per_step = dt * 1000 + # Extract component timing from JSON + ct = {} + for k in ('t_ane', 't_io', 't_cls', 't_elem', 't_rms', 't_cblas_wait'): + if k in j: + ct[k[2:]] = j[k] # strip 't_' prefix + if ct: + S.component_timing = ct + return + elif jt == 'batch': + S.batch_num = j.get('batch', S.batch_num) + compile_ms = j.get('compile_ms', 0) + train_ms = j.get('train_ms', 0) + S.ms_per_step = j.get('ms_per_step', S.ms_per_step) + S.compile_ms += compile_ms + S.compile_pct = 100 * S.compile_ms / (S.compile_ms + train_ms) if S.compile_ms + train_ms > 0 else 0 + return + elif jt == 'perf': + if 'ane_tflops' in j: + S.flops['ane_tflops'] = j['ane_tflops'] + if 'ane_util_pct' in j: + S.flops['ane_util'] = j['ane_util_pct'] + return + except (json.JSONDecodeError, KeyError): + pass m = RE_CONFIG.search(line) if m: S.model_config = dict(zip(['dim', 'hidden', 'heads', 'seq', 'vocab', 'layers'], map(int, m.groups()))) @@ -303,7 +354,7 @@ def parse_line(line): if m: S.params = {'total': float(m[1]), 'transformer': float(m[2]), 'embed': float(m[3])} return - m = RE_KERNELS.search(line) + m = RE_KERNELS_DYN.search(line) or RE_KERNELS.search(line) if m: S.kernels = {'total': int(m[1]), 'weight_bearing': int(m[2])} return @@ -327,6 +378,14 @@ def parse_line(line): S.training['lr'] = m[3] if m[4]: S.ms_per_step = float(m[4]) + now = time.monotonic() + if S.train_start is None: + S.train_start = now + S.step_timestamps.append((S.step, now)) + if not m[4] and len(S.step_timestamps) >= 2: + dt = S.step_timestamps[-1][1] - S.step_timestamps[-2][1] + if dt > 0: + S.ms_per_step = dt * 1000 S.loss_history.append((S.step, S.loss)) S.best_loss = min(S.best_loss, S.loss) return @@ -338,6 +397,16 @@ def parse_line(line): S.compiles = int(m[5]) S.compile_pct = 100 * compile_ms / (compile_ms + train_ms) if compile_ms + train_ms > 0 else 0 return + m = RE_TIMING_DYN.search(line) + if m: + vals = list(map(float, m.groups())) + S.component_timing = { + 'ane_fwd': vals[0], 'io_fwd': vals[1], 'rms': vals[2], + 'ane_bwd': vals[3], 'io_bwd': vals[4], 'silu': vals[5], + 'rms_bwd': vals[6], 'cls': vals[7], 'cblas_wait': vals[8], 'dw_copy': vals[9], + '_dynamic': True + } + return m = RE_TIMING.search(line) if m: S.component_timing = dict(zip(['ane', 'io', 'cls', 'elem', 'rms', 'cblas_wait'], map(float, m.groups()))) @@ -350,6 +419,11 @@ def parse_line(line): if m: S.flops['ane_util'] = float(m[1]) return + m = RE_COMPILED.search(line) + if m: + S.compiles = int(m[1]) + S.compile_ms += float(m[2]) + return m = RE_EFFICIENCY.search(line) if m: S.efficiency[m[1].strip()] = m[2].strip() @@ -518,23 +592,49 @@ def draw(term): # Training stats (right panel) sr = row step_str = f'{S.step}' + (f'/{S.total_steps}' if S.total_steps and S.total_steps < 999999 else '') - put(sr, mid_x + 1, f' Step: {step_str} Loss: {S.loss:.4f}' if S.loss else ' Step: --', term.yellow) + # Elapsed time + elapsed = 0.0 + if S.train_start: + elapsed = time.monotonic() - S.train_start + elapsed_str = f'{elapsed:.1f}s' if elapsed < 60 else f'{elapsed/60:.1f}m' + put(sr, mid_x + 1, f' Step: {step_str} Loss: {S.loss:.4f} [{elapsed_str}]' if S.loss else ' Step: --', term.yellow) sr += 1 - put(sr, mid_x + 1, f' Best: {S.best_loss:.4f} ms/step: {S.ms_per_step:.1f}' if S.best_loss < float('inf') else ' Best: --') + # ms/step + steps/sec + sps = 1000.0 / S.ms_per_step if S.ms_per_step > 0 else 0 + put(sr, mid_x + 1, f' Best: {S.best_loss:.4f} {S.ms_per_step:.1f}ms/step ({sps:.1f} steps/s)' if S.best_loss < float('inf') else ' Best: --') sr += 1 + # TFLOPS ane_tflops = S.flops.get('ane_tflops', 0) ane_util = S.flops.get('ane_util', 0) + total_tflops = 0 + if S.ms_per_step > 0 and S.flops.get('ane', 0) > 0: + if not ane_tflops: + ane_tflops = (S.flops['ane'] * 1e6) / (S.ms_per_step * 1e-3) / 1e12 + total_tflops = (S.flops.get('total', 0) * 1e6) / (S.ms_per_step * 1e-3) / 1e12 + if not ane_util and ane_tflops: + ane_util = 100.0 * ane_tflops / 15.8 + compile_str = f' Compile: {S.compile_ms/1000:.1f}s' if S.compile_ms > 0 else '' if ane_tflops: - put(sr, mid_x + 1, f' ANE: {ane_tflops:.2f}T Compile: {S.compile_pct:.0f}% Util: {ane_util:.1f}%') - else: - put(sr, mid_x + 1, f' Compile: {S.compile_pct:.0f}%') + tflops_str = f' ANE: {ane_tflops:.2f}T' + if total_tflops: + tflops_str += f' Total: {total_tflops:.2f}T' + tflops_str += f' Util: {ane_util:.1f}%{compile_str}' + put(sr, mid_x + 1, tflops_str) + elif compile_str: + put(sr, mid_x + 1, f'{compile_str}') sr += 1 ct = S.component_timing if ct: - put(sr, mid_x + 1, f' ane={ct.get("ane", 0):.1f} io={ct.get("io", 0):.1f} cls={ct.get("cls", 0):.1f} elem={ct.get("elem", 0):.1f}') - sr += 1 - put(sr, mid_x + 1, f' rms={ct.get("rms", 0):.1f} cblas_wait={ct.get("cblas_wait", 0):.1f} ms/step') - sr += 1 + if ct.get('_dynamic'): + put(sr, mid_x + 1, f' fwd={ct.get("ane_fwd",0):.1f} bwd={ct.get("ane_bwd",0):.1f} io={ct.get("io_fwd",0)+ct.get("io_bwd",0):.1f} silu={ct.get("silu",0):.1f}') + sr += 1 + put(sr, mid_x + 1, f' cls={ct.get("cls",0):.1f} rms={ct.get("rms",0)+ct.get("rms_bwd",0):.1f} dw={ct.get("dw_copy",0):.1f} ms/step') + sr += 1 + else: + put(sr, mid_x + 1, f' ane={ct.get("ane", 0):.1f} io={ct.get("io", 0):.1f} cls={ct.get("cls", 0):.1f} elem={ct.get("elem", 0):.1f}') + sr += 1 + put(sr, mid_x + 1, f' rms={ct.get("rms", 0):.1f} cblas_wait={ct.get("cblas_wait", 0):.1f} ms/step') + sr += 1 pw = S.power if any(pw.values()): put(sr, mid_x + 1, '\u2500 Power ' + '\u2500' * max(0, right_w - 9), term.cyan) @@ -663,9 +763,12 @@ def set_nonblock(fd): fl = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) -def spawn_training(resume=False, steps=10000, dynamic=False, scratch=False, lr=None, accum=None): +def spawn_training(resume=False, steps=10000, dynamic=False, ane=False, scratch=False, + lr=None, accum=None, no_ane_extras=False): if dynamic: cmd = 'cd training_dynamic && make 2>&1 && ./train' + elif ane: + cmd = 'make train_large_ane 2>&1 && ./train_large_ane' else: cmd = 'make train_large 2>&1 && ./train_large' if resume: @@ -674,8 +777,10 @@ def spawn_training(resume=False, steps=10000, dynamic=False, scratch=False, lr=N cmd += ' --scratch' if lr is not None: cmd += f' --lr {lr}' - if accum is not None: + if accum is not None and dynamic: cmd += f' --accum {accum}' + if no_ane_extras and ane: + cmd += ' --no-ane-extras' cmd += f' --steps {steps}' proc = subprocess.Popen( ['bash', '-c', cmd], @@ -697,7 +802,9 @@ def spawn_powermetrics(): def main(): parser = argparse.ArgumentParser(description='ANE Training Dashboard (stories110M)') parser.add_argument('--resume', action='store_true', help='Resume from checkpoint') - parser.add_argument('--dynamic', action='store_true', help='Use v2 dynamic weight pipeline (training_dynamic/)') + parser.add_argument('--dynamic', action='store_true', help='Dynamic weight pipeline (training_dynamic/)') + parser.add_argument('--ane', action='store_true', help='PR#19: ANE-offloaded classifier/softmax/rmsnorm_bwd') + parser.add_argument('--no-ane-extras', action='store_true', help='Disable ANE extras (use with --ane)') parser.add_argument('--scratch', action='store_true', help='Train from scratch (random init)') parser.add_argument('--lr', type=float, default=None, help='Learning rate') parser.add_argument('--accum', type=int, default=None, help='Gradient accumulation steps') @@ -711,11 +818,15 @@ def main(): args.steps = 999999999 S.total_steps = args.steps + global CKPT_PATH + CKPT_PATH = CKPT_PATH_DYNAMIC if args.dynamic else CKPT_PATH_STATIC + term = Terminal() procs = [] train_proc = spawn_training(resume=args.resume, steps=args.steps, dynamic=args.dynamic, - scratch=args.scratch, lr=args.lr, accum=args.accum) + scratch=args.scratch, lr=args.lr, accum=args.accum, + ane=args.ane, no_ane_extras=args.no_ane_extras) S.train_pid = train_proc.pid procs.append(train_proc) @@ -856,7 +967,8 @@ def main(): train_proc.terminate() train_proc.wait() train_proc = spawn_training(resume=True, steps=args.steps, dynamic=args.dynamic, - lr=args.lr, accum=args.accum) + lr=args.lr, accum=args.accum, + ane=args.ane, no_ane_extras=args.no_ane_extras) S.train_pid = train_proc.pid procs = [p for p in procs if p.poll() is None] procs.append(train_proc)