Dashboard v2: live stats, JSON parsing, all three pipelines

- Parse static pipeline JSON step/batch/perf lines for real-time updates
- Running elapsed time, ms/step from wall-clock timestamps, steps/sec
- Compute ANE + Total TFLOPS from FLOPs/step when not reported directly
- Support --ane (train_large_ane) and --no-ane-extras flags
- Dynamic pipeline timing breakdown + CKPT_PATH per mode
This commit is contained in:
maderix 2026-03-03 05:24:35 -08:00 committed by maderix
parent 3c1aae65d7
commit 443194bca4
1 changed files with 130 additions and 18 deletions

View File

@ -1,6 +1,6 @@
"""TUI dashboard for ANE training (train_large). Uses blessed for terminal UI."""
import argparse, fcntl, math, os, re, select, signal, struct, subprocess, sys, time, threading
import argparse, fcntl, json, math, os, re, select, signal, struct, subprocess, sys, time, threading
from collections import deque
from pathlib import Path
@ -20,7 +20,9 @@ except ImportError:
DIM, HIDDEN, HEADS, SEQ, VOCAB, NLAYERS = 768, 2048, 12, 256, 32000, 12
HD = DIM // HEADS
CKPT_PATH = 'ane_stories110M_ckpt.bin'
CKPT_PATH_STATIC = 'ane_stories110M_ckpt.bin'
CKPT_PATH_DYNAMIC = 'training_dynamic/ane_stories110M_dyn_ckpt.bin'
CKPT_PATH = CKPT_PATH_STATIC # set in main() based on --dynamic
TOKENIZER_PATH = str(Path(__file__).resolve().parent.parent.parent / 'assets' / 'models' / 'tokenizer.bin')
@ -56,6 +58,9 @@ class State:
self.mem_mb_history = deque(maxlen=300)
self.proc_mem_mb_history = deque(maxlen=300)
self.train_pid = None
self.step_timestamps = [] # (step, time.monotonic()) for running ms/step
self.train_start = None # wall clock when first step seen
self.compile_ms = 0.0 # total compile time
S = State()
@ -278,23 +283,69 @@ def sysmetrics_thread():
RE_CONFIG = re.compile(r'dim=(\d+) hidden=(\d+) heads=(\d+) seq=(\d+) vocab=(\d+) layers=(\d+)')
RE_PARAMS = re.compile(r'Params: ([\d.]+)M \(transformer ([\d.]+)M \+ embed ([\d.]+)M\)')
RE_KERNELS = re.compile(r'Kernels: (\d+).*?(\d+) weight-bearing')
RE_KERNELS_DYN = re.compile(r'Kernels: (\d+) compiled, (\d+) weight-bearing')
RE_ACCUM = re.compile(r'Accum (\d+).*LR=([\d.e+-]+)')
RE_STEP = re.compile(r'step\s+(\d+)\s+loss=([\d.]+)(?:\s+lr=([\d.e+-]+))?(?:\s+([\d.]+)ms/step)?')
RE_BATCH = re.compile(r'\[batch (\d+): compile=([\d.]+)ms train=([\d.]+)ms \(([\d.]+)ms/step\) compiles=(\d+)\]')
RE_TIMING = re.compile(r'ane=([\d.]+) io=([\d.]+) cls=([\d.]+) elem=([\d.]+) rms=([\d.]+) cblas_wait=([\d.]+)')
RE_TIMING_DYN = re.compile(r'ane_fwd=([\d.]+) io_fwd=([\d.]+) rms=([\d.]+) ane_bwd=([\d.]+) io_bwd=([\d.]+) silu=([\d.]+) rms_bwd=([\d.]+) cls=([\d.]+) cblas_wait=([\d.]+) dw_copy=([\d.]+)')
RE_RESTART = re.compile(r'\[exec\(\) restart step (\d+)')
RE_RESUME = re.compile(r'\[RESUMED step (\d+), loss=([\d.]+)\]')
RE_FLOPS = re.compile(r'FLOPs/step: fwd=([\d.]+)M bwd_dx=([\d.]+)M bwd_dW=([\d.]+)M sdpa_bwd=([\d.]+)M total=([\d.]+)M')
RE_ANE_FLOPS = re.compile(r'ANE FLOPs/step: ([\d.]+)M')
RE_ANE_TFLOPS = re.compile(r'ANE TFLOPS:\s+([\d.]+)')
RE_ANE_UTIL = re.compile(r'ANE utilization:\s+([\d.]+)%')
RE_EFFICIENCY = re.compile(r'(Total steps|Wall time|Compile time|Train time|Avg compile|Avg train|ANE TFLOPS|Total TFLOPS|ANE utilization):?\s+(.+)')
RE_EFFICIENCY = re.compile(r'(Total steps|Wall time|Compile time|Compile|Train time|Avg compile|Avg train|ANE TFLOPS|Total TFLOPS|ANE utilization):?\s+(.+)')
RE_COMPILED = re.compile(r'Compiled (\d+) kernels in (\d+)ms')
RE_ANE_POWER = re.compile(r'ANE Power:\s+([\d.]+)\s*mW')
RE_CPU_POWER = re.compile(r'CPU Power:\s+([\d.]+)\s*mW')
RE_GPU_POWER = re.compile(r'GPU Power:\s+([\d.]+)\s*mW')
def parse_line(line):
S.logs.append(line)
# Parse JSON lines from static pipeline ({"type":"step",...} or {"type":"batch",...})
stripped = line.strip()
if stripped.startswith('{'):
try:
j = json.loads(stripped)
jt = j.get('type')
if jt == 'step':
S.step, S.loss = j['step'], j['loss']
S.loss_history.append((S.step, S.loss))
S.best_loss = min(S.best_loss, S.loss)
S.compiles = j.get('compiles', S.compiles)
now = time.monotonic()
if S.train_start is None:
S.train_start = now
S.step_timestamps.append((S.step, now))
if len(S.step_timestamps) >= 2:
dt = S.step_timestamps[-1][1] - S.step_timestamps[-2][1]
if dt > 0:
S.ms_per_step = dt * 1000
# Extract component timing from JSON
ct = {}
for k in ('t_ane', 't_io', 't_cls', 't_elem', 't_rms', 't_cblas_wait'):
if k in j:
ct[k[2:]] = j[k] # strip 't_' prefix
if ct:
S.component_timing = ct
return
elif jt == 'batch':
S.batch_num = j.get('batch', S.batch_num)
compile_ms = j.get('compile_ms', 0)
train_ms = j.get('train_ms', 0)
S.ms_per_step = j.get('ms_per_step', S.ms_per_step)
S.compile_ms += compile_ms
S.compile_pct = 100 * S.compile_ms / (S.compile_ms + train_ms) if S.compile_ms + train_ms > 0 else 0
return
elif jt == 'perf':
if 'ane_tflops' in j:
S.flops['ane_tflops'] = j['ane_tflops']
if 'ane_util_pct' in j:
S.flops['ane_util'] = j['ane_util_pct']
return
except (json.JSONDecodeError, KeyError):
pass
m = RE_CONFIG.search(line)
if m:
S.model_config = dict(zip(['dim', 'hidden', 'heads', 'seq', 'vocab', 'layers'], map(int, m.groups())))
@ -303,7 +354,7 @@ def parse_line(line):
if m:
S.params = {'total': float(m[1]), 'transformer': float(m[2]), 'embed': float(m[3])}
return
m = RE_KERNELS.search(line)
m = RE_KERNELS_DYN.search(line) or RE_KERNELS.search(line)
if m:
S.kernels = {'total': int(m[1]), 'weight_bearing': int(m[2])}
return
@ -327,6 +378,14 @@ def parse_line(line):
S.training['lr'] = m[3]
if m[4]:
S.ms_per_step = float(m[4])
now = time.monotonic()
if S.train_start is None:
S.train_start = now
S.step_timestamps.append((S.step, now))
if not m[4] and len(S.step_timestamps) >= 2:
dt = S.step_timestamps[-1][1] - S.step_timestamps[-2][1]
if dt > 0:
S.ms_per_step = dt * 1000
S.loss_history.append((S.step, S.loss))
S.best_loss = min(S.best_loss, S.loss)
return
@ -338,6 +397,16 @@ def parse_line(line):
S.compiles = int(m[5])
S.compile_pct = 100 * compile_ms / (compile_ms + train_ms) if compile_ms + train_ms > 0 else 0
return
m = RE_TIMING_DYN.search(line)
if m:
vals = list(map(float, m.groups()))
S.component_timing = {
'ane_fwd': vals[0], 'io_fwd': vals[1], 'rms': vals[2],
'ane_bwd': vals[3], 'io_bwd': vals[4], 'silu': vals[5],
'rms_bwd': vals[6], 'cls': vals[7], 'cblas_wait': vals[8], 'dw_copy': vals[9],
'_dynamic': True
}
return
m = RE_TIMING.search(line)
if m:
S.component_timing = dict(zip(['ane', 'io', 'cls', 'elem', 'rms', 'cblas_wait'], map(float, m.groups())))
@ -350,6 +419,11 @@ def parse_line(line):
if m:
S.flops['ane_util'] = float(m[1])
return
m = RE_COMPILED.search(line)
if m:
S.compiles = int(m[1])
S.compile_ms += float(m[2])
return
m = RE_EFFICIENCY.search(line)
if m:
S.efficiency[m[1].strip()] = m[2].strip()
@ -518,23 +592,49 @@ def draw(term):
# Training stats (right panel)
sr = row
step_str = f'{S.step}' + (f'/{S.total_steps}' if S.total_steps and S.total_steps < 999999 else '')
put(sr, mid_x + 1, f' Step: {step_str} Loss: {S.loss:.4f}' if S.loss else ' Step: --', term.yellow)
# Elapsed time
elapsed = 0.0
if S.train_start:
elapsed = time.monotonic() - S.train_start
elapsed_str = f'{elapsed:.1f}s' if elapsed < 60 else f'{elapsed/60:.1f}m'
put(sr, mid_x + 1, f' Step: {step_str} Loss: {S.loss:.4f} [{elapsed_str}]' if S.loss else ' Step: --', term.yellow)
sr += 1
put(sr, mid_x + 1, f' Best: {S.best_loss:.4f} ms/step: {S.ms_per_step:.1f}' if S.best_loss < float('inf') else ' Best: --')
# ms/step + steps/sec
sps = 1000.0 / S.ms_per_step if S.ms_per_step > 0 else 0
put(sr, mid_x + 1, f' Best: {S.best_loss:.4f} {S.ms_per_step:.1f}ms/step ({sps:.1f} steps/s)' if S.best_loss < float('inf') else ' Best: --')
sr += 1
# TFLOPS
ane_tflops = S.flops.get('ane_tflops', 0)
ane_util = S.flops.get('ane_util', 0)
total_tflops = 0
if S.ms_per_step > 0 and S.flops.get('ane', 0) > 0:
if not ane_tflops:
ane_tflops = (S.flops['ane'] * 1e6) / (S.ms_per_step * 1e-3) / 1e12
total_tflops = (S.flops.get('total', 0) * 1e6) / (S.ms_per_step * 1e-3) / 1e12
if not ane_util and ane_tflops:
ane_util = 100.0 * ane_tflops / 15.8
compile_str = f' Compile: {S.compile_ms/1000:.1f}s' if S.compile_ms > 0 else ''
if ane_tflops:
put(sr, mid_x + 1, f' ANE: {ane_tflops:.2f}T Compile: {S.compile_pct:.0f}% Util: {ane_util:.1f}%')
else:
put(sr, mid_x + 1, f' Compile: {S.compile_pct:.0f}%')
tflops_str = f' ANE: {ane_tflops:.2f}T'
if total_tflops:
tflops_str += f' Total: {total_tflops:.2f}T'
tflops_str += f' Util: {ane_util:.1f}%{compile_str}'
put(sr, mid_x + 1, tflops_str)
elif compile_str:
put(sr, mid_x + 1, f'{compile_str}')
sr += 1
ct = S.component_timing
if ct:
put(sr, mid_x + 1, f' ane={ct.get("ane", 0):.1f} io={ct.get("io", 0):.1f} cls={ct.get("cls", 0):.1f} elem={ct.get("elem", 0):.1f}')
sr += 1
put(sr, mid_x + 1, f' rms={ct.get("rms", 0):.1f} cblas_wait={ct.get("cblas_wait", 0):.1f} ms/step')
sr += 1
if ct.get('_dynamic'):
put(sr, mid_x + 1, f' fwd={ct.get("ane_fwd",0):.1f} bwd={ct.get("ane_bwd",0):.1f} io={ct.get("io_fwd",0)+ct.get("io_bwd",0):.1f} silu={ct.get("silu",0):.1f}')
sr += 1
put(sr, mid_x + 1, f' cls={ct.get("cls",0):.1f} rms={ct.get("rms",0)+ct.get("rms_bwd",0):.1f} dw={ct.get("dw_copy",0):.1f} ms/step')
sr += 1
else:
put(sr, mid_x + 1, f' ane={ct.get("ane", 0):.1f} io={ct.get("io", 0):.1f} cls={ct.get("cls", 0):.1f} elem={ct.get("elem", 0):.1f}')
sr += 1
put(sr, mid_x + 1, f' rms={ct.get("rms", 0):.1f} cblas_wait={ct.get("cblas_wait", 0):.1f} ms/step')
sr += 1
pw = S.power
if any(pw.values()):
put(sr, mid_x + 1, '\u2500 Power ' + '\u2500' * max(0, right_w - 9), term.cyan)
@ -663,9 +763,12 @@ def set_nonblock(fd):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
def spawn_training(resume=False, steps=10000, dynamic=False, scratch=False, lr=None, accum=None):
def spawn_training(resume=False, steps=10000, dynamic=False, ane=False, scratch=False,
lr=None, accum=None, no_ane_extras=False):
if dynamic:
cmd = 'cd training_dynamic && make 2>&1 && ./train'
elif ane:
cmd = 'make train_large_ane 2>&1 && ./train_large_ane'
else:
cmd = 'make train_large 2>&1 && ./train_large'
if resume:
@ -674,8 +777,10 @@ def spawn_training(resume=False, steps=10000, dynamic=False, scratch=False, lr=N
cmd += ' --scratch'
if lr is not None:
cmd += f' --lr {lr}'
if accum is not None:
if accum is not None and dynamic:
cmd += f' --accum {accum}'
if no_ane_extras and ane:
cmd += ' --no-ane-extras'
cmd += f' --steps {steps}'
proc = subprocess.Popen(
['bash', '-c', cmd],
@ -697,7 +802,9 @@ def spawn_powermetrics():
def main():
parser = argparse.ArgumentParser(description='ANE Training Dashboard (stories110M)')
parser.add_argument('--resume', action='store_true', help='Resume from checkpoint')
parser.add_argument('--dynamic', action='store_true', help='Use v2 dynamic weight pipeline (training_dynamic/)')
parser.add_argument('--dynamic', action='store_true', help='Dynamic weight pipeline (training_dynamic/)')
parser.add_argument('--ane', action='store_true', help='PR#19: ANE-offloaded classifier/softmax/rmsnorm_bwd')
parser.add_argument('--no-ane-extras', action='store_true', help='Disable ANE extras (use with --ane)')
parser.add_argument('--scratch', action='store_true', help='Train from scratch (random init)')
parser.add_argument('--lr', type=float, default=None, help='Learning rate')
parser.add_argument('--accum', type=int, default=None, help='Gradient accumulation steps')
@ -711,11 +818,15 @@ def main():
args.steps = 999999999
S.total_steps = args.steps
global CKPT_PATH
CKPT_PATH = CKPT_PATH_DYNAMIC if args.dynamic else CKPT_PATH_STATIC
term = Terminal()
procs = []
train_proc = spawn_training(resume=args.resume, steps=args.steps, dynamic=args.dynamic,
scratch=args.scratch, lr=args.lr, accum=args.accum)
scratch=args.scratch, lr=args.lr, accum=args.accum,
ane=args.ane, no_ane_extras=args.no_ane_extras)
S.train_pid = train_proc.pid
procs.append(train_proc)
@ -856,7 +967,8 @@ def main():
train_proc.terminate()
train_proc.wait()
train_proc = spawn_training(resume=True, steps=args.steps, dynamic=args.dynamic,
lr=args.lr, accum=args.accum)
lr=args.lr, accum=args.accum,
ane=args.ane, no_ane_extras=args.no_ane_extras)
S.train_pid = train_proc.pid
procs = [p for p in procs if p.poll() is None]
procs.append(train_proc)