Files
ietf-draft-analyzer/src/webui/data/analysis.py
Christian Nennemann d11e980a6a
Some checks failed
CI / test (3.11) (push) Failing after 9s
CI / test (3.12) (push) Failing after 9s
fix(webui): timeline opens on full landscape, drop undated points
After the chronological-order fix the page opened on the earliest month
(1995-12), showing only a handful of same-colored dots while the legend
listed every category — looked broken.

- Initialise the plot on the LAST frame (full landscape); Play now replays
  the build-up from the start (fromcurrent: false), slider starts at the end.
- Make _extract_month robust: year-only / junk dates (ISO/ETSI 'time' like
  '2015/CD Amd 2', bare '2023', '') no longer yield malformed month labels
  ('2015-/C') or a garbled 'unknown' frame badge.
- Drop undated docs from the temporal animation (they remain on /landscape).

At the full initial frame every category has points, so the legend matches
what is drawn.
2026-05-23 12:41:06 +02:00

2053 lines
79 KiB
Python

"""Analysis, visualization, and complex computation data access functions."""
from __future__ import annotations
import json
import re
from collections import Counter, defaultdict
from typing import TypedDict
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.manifold import TSNE
from sklearn.preprocessing import normalize as sk_normalize
from ietf_analyzer.config import Config
from ietf_analyzer.db import Database
# Rating-category labels split into two groups, as the constant names
# indicate: safety-oriented and capability-oriented.
SAFETY_CATEGORIES = {
    "AI safety/alignment",
    "Agent identity/auth",
    "Policy/governance",
}
CAPABILITY_CATEGORIES = {
    "A2A protocols",
    "Agent discovery/reg",
    "Autonomous netops",
    "Data formats/interop",
    "Human-agent interaction",
    "Model serving/inference",
}
from webui.data._shared import _cached, _extract_month
from webui.data.drafts import get_draft_detail
# Architectural layers in ascending ``order``.  Each entry carries the keyword
# vocabulary that ``_classify_to_layer`` scores free text against.
_ARCH_LAYERS = [
    {
        "id": "transport",
        "label": "Transport & Networking",
        "order": 0,
        "keywords": {
            "transport", "network", "routing", "tunnel", "packet", "flow",
            "traffic", "qos", "sdwan", "mpls", "bgp", "ospf", "segment",
            "srv6", "quic", "http", "grpc", "mqtt", "yang", "snmp",
            "netconf", "restconf",
        },
    },
    {
        "id": "identity",
        "label": "Identity & Trust",
        "order": 1,
        "keywords": {
            "identity", "auth", "authentication", "authorization",
            "credential", "certificate", "trust", "attestation", "oauth",
            "token", "signing", "verification", "verifiable", "did", "vc",
            "pki", "spiffe", "acl",
        },
    },
    {
        "id": "discovery",
        "label": "Discovery & Registration",
        "order": 2,
        "keywords": {
            "discovery", "registration", "registry", "catalog",
            "advertisement", "announce", "capability", "service", "lookup",
            "resolution", "dns", "directory",
        },
    },
    {
        "id": "communication",
        "label": "Agent Communication",
        "order": 3,
        "keywords": {
            "a2a", "agent", "communication", "message", "messaging",
            "protocol", "exchange", "negotiation", "handshake", "session",
            "dialogue", "interaction", "mcp", "interop",
        },
    },
    {
        "id": "coordination",
        "label": "Task & Coordination",
        "order": 4,
        "keywords": {
            "task", "delegation", "orchestration", "workflow", "planning",
            "coordination", "consensus", "collaboration", "multi-agent",
            "swarm", "composition", "scheduling",
        },
    },
    {
        "id": "intelligence",
        "label": "AI & Inference",
        "order": 5,
        "keywords": {
            "model", "inference", "learning", "training", "ml", "neural",
            "llm", "embedding", "reasoning", "decision", "prediction",
            "classification", "generative", "rag", "fine-tuning",
        },
    },
    {
        "id": "safety",
        "label": "Safety & Governance",
        "order": 6,
        "keywords": {
            "safety", "ethical", "governance", "policy", "audit",
            "explainability", "transparency", "accountability", "bias",
            "fairness", "compliance", "regulation", "risk", "shutdown",
            "alignment", "adversarial", "privacy", "consent",
        },
    },
    {
        "id": "application",
        "label": "Application Domains",
        "order": 7,
        "keywords": {
            "healthcare", "autonomous", "vehicle", "robotics", "iot",
            "digital twin", "supply chain", "finance", "manufacturing",
            "energy", "smart", "edge", "cloud", "sensing",
        },
    },
]
# Layer id -> keyword set (the very same set objects as in ``_ARCH_LAYERS``).
_LAYER_KEYWORDS = {layer["id"]: layer["keywords"] for layer in _ARCH_LAYERS}
class TimelineData(TypedDict):
    """Monthly category counts from :func:`get_timeline_data`."""
    # Sorted month labels (as produced by ``_extract_month``), one per bucket.
    months: list[str]
    # Category name -> per-month counts, aligned index-for-index with ``months``.
    series: dict[str, list[int]]
    # The keys of ``series``: at most ten categories, most frequent first.
    categories: list[str]
class SimilarityGraphStats(TypedDict):
    """Stats sub-dict in similarity graph."""
    # Number of entries in the graph's ``nodes`` list.
    node_count: int
    # Number of entries in the graph's ``edges`` list.
    edge_count: int
    # Mean of the edge similarities, rounded to 4 places; 0 when there are no edges.
    avg_similarity: float
class SimilarityGraph(TypedDict):
    """Draft similarity network from :func:`get_similarity_graph`."""
    # One dict per draft that has at least one edge: name, title, category, score.
    nodes: list[dict]
    # One dict per draft pair at or above the threshold: source, target, similarity.
    edges: list[dict]
    # Summary counts for the two lists above.
    stats: SimilarityGraphStats
class CitationGraphStats(TypedDict):
    """Stats sub-dict in citation graph."""
    # Number of entries in the graph's ``nodes`` list.
    node_count: int
    # Number of entries in the graph's ``edges`` list.
    edge_count: int
    # Nodes whose ``type`` is "rfc".
    rfc_count: int
    # Nodes whose ``type`` is "draft".
    draft_count: int
class CitationGraph(TypedDict):
    """Citation network from :func:`get_citation_graph`."""
    # Reference nodes (id, type, title, influence, ref_id) and draft nodes
    # (id, type, title, influence, category).
    nodes: list[dict]
    # One dict per kept citation: source (draft name) -> target ("<ref_type>:<ref_id>").
    edges: list[dict]
    # Summary counts for the two lists above.
    stats: CitationGraphStats
class MonitorCost(TypedDict):
    """Cost sub-dict in monitor status."""
    # Token totals as reported by ``db.total_tokens_used()``.
    input_tokens: int
    output_tokens: int
    # Estimate at $3 per million input tokens and $15 per million output
    # tokens, rounded to 2 decimals.
    estimated_usd: float
class MonitorPipeline(TypedDict):
    """Pipeline sub-dict in monitor status."""
    # ``db.count_drafts()``.
    total_drafts: int
    # Number of (draft, rating) pairs returned by ``db.drafts_with_ratings``.
    rated: int
    # ``total_drafts`` minus the number of drafts without embeddings.
    embedded: int
    # ``total_drafts`` minus the number of drafts without ideas.
    with_ideas: int
    # ``db.idea_count()``.
    idea_total: int
    # Number of entries returned by ``db.all_gaps()``.
    gap_count: int
class MonitorStatus(TypedDict):
    """Monitor status from :func:`get_monitor_status`."""
    # First entry of ``runs``, or None when there are no runs.
    last_run: dict | None
    # Runs fetched with ``db.get_monitor_runs(limit=20)``.
    runs: list[dict]
    # Backlog sizes keyed by "unrated", "unembedded" and "no_ideas".
    unprocessed: dict[str, int]
    # ``len(runs)`` -- therefore bounded by the fetch limit above.
    total_runs: int
    pipeline: MonitorPipeline
    cost: MonitorCost
def get_ideas_by_type(db: Database) -> dict:
    """Summarise every stored idea by its ``type``.

    Returns a dict with ``total`` (number of ideas), ``by_type`` (type ->
    count, most common first) and ``ideas`` (the list returned by
    ``db.all_ideas()``).  A missing or falsy type is counted as ``"other"``.
    """
    ideas = db.all_ideas()
    tally: Counter = Counter()
    for entry in ideas:
        tally[entry.get("type", "other") or "other"] += 1
    return {
        "total": len(ideas),
        "by_type": dict(tally.most_common()),
        "ideas": ideas,
    }
def get_idea_detail(db: Database, idea_id: int) -> dict | None:
    """Fetch one idea and enrich it with draft metadata and nearest neighbours.

    Returns ``None`` when no idea has the given id.  Otherwise the result
    holds the idea's own columns, ``draft_title`` / ``draft_date`` (and
    ``categories`` when a rating exists) if the source draft is found, and
    ``similar``: up to five other ideas ranked by cosine similarity of their
    embeddings (empty when this idea has no stored embedding).
    """
    record = db.conn.execute("SELECT * FROM ideas WHERE id = ?", (idea_id,)).fetchone()
    if not record:
        return None

    source_name = record["draft_name"]
    idea = {
        "id": record["id"],
        "title": record["title"],
        "description": record["description"],
        "type": record["idea_type"],
        "draft_name": source_name,
        "novelty_score": record["novelty_score"],
    }

    # Source draft metadata, plus the categories from its rating if it has one.
    source_draft = db.get_draft(source_name)
    if source_draft:
        idea["draft_title"] = source_draft.title
        idea["draft_date"] = source_draft.date
        for rated_draft, rating in db.drafts_with_ratings(limit=2000):
            if rated_draft.name == source_name:
                idea["categories"] = rating.categories
                break

    # Nearest neighbours by cosine similarity over the stored idea embeddings.
    neighbours = []
    stored = db.conn.execute(
        "SELECT vector FROM idea_embeddings WHERE idea_id = ?", (idea_id,)
    ).fetchone()
    if stored:
        query_vec = np.frombuffer(stored["vector"], dtype=np.float32)
        query_len = np.linalg.norm(query_vec)
        ranked = sorted(
            (
                # The 1e-9 keeps the division defined for zero-length vectors.
                (other_id, float(np.dot(query_vec, vec) / (query_len * np.linalg.norm(vec) + 1e-9)))
                for other_id, vec in db.all_idea_embeddings().items()
                if other_id != idea_id
            ),
            key=lambda pair: pair[1],
            reverse=True,
        )
        best = ranked[:5]
        if best:
            wanted = [other_id for other_id, _ in best]
            marks = ",".join("?" * len(wanted))
            found = {
                hit["id"]: hit
                for hit in db.conn.execute(
                    f"SELECT id, title, idea_type, draft_name FROM ideas WHERE id IN ({marks})",
                    wanted,
                ).fetchall()
            }
            # Walk ``best`` (not ``found``) so the ranking order is kept.
            for other_id, score in best:
                hit = found.get(other_id)
                if hit:
                    neighbours.append({
                        "id": hit["id"],
                        "title": hit["title"],
                        "type": hit["idea_type"],
                        "draft_name": hit["draft_name"],
                        "similarity": round(score, 3),
                    })

    idea["similar"] = neighbours
    return idea
def get_timeline_data(db: Database) -> TimelineData:
    """Count rated drafts per month and primary category for the timeline chart.

    Each rated draft is attributed to the month label ``_extract_month``
    derives from its ``time`` and to its first category (``"Other"`` when it
    has none).  Only the ten categories with the highest overall counts are
    returned; every series is aligned with the sorted ``months`` list.
    """
    ratings_by_name = {draft.name: rating for draft, rating in db.drafts_with_ratings(limit=1000)}
    drafts = db.list_drafts(limit=1000, order_by="time ASC")

    per_month: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for draft in drafts:
        bucket = _extract_month(draft.time)
        rating = ratings_by_name.get(draft.name)
        if not rating:
            continue  # unrated drafts do not appear on the chart
        primary = rating.categories[0] if rating.categories else "Other"
        per_month[bucket][primary] += 1

    totals: Counter = Counter()
    for counts in per_month.values():
        totals.update(counts)

    months = sorted(per_month)
    top_cats = [name for name, _ in totals.most_common(10)]
    series = {cat: [per_month[m].get(cat, 0) for m in months] for cat in top_cats}
    return {"months": months, "series": series, "categories": top_cats}
def get_similarity_graph(db: Database, threshold: float = 0.75) -> SimilarityGraph:
    """Return the draft similarity network, memoised per threshold.

    Delegates to :func:`_compute_similarity_graph` through ``_cached`` using
    a cache key that embeds ``threshold``.
    """
    cache_key = f"similarity_{threshold}"

    def _build() -> SimilarityGraph:
        return _compute_similarity_graph(db, threshold)

    return _cached(cache_key, _build)
def _compute_similarity_graph(db: Database, threshold: float = 0.75) -> SimilarityGraph:
    """Build the draft similarity network for the force-directed graph.

    Only drafts that have both an embedding and a rating take part.  Every
    pair whose cosine similarity is at least ``threshold`` becomes an edge,
    and only drafts touched by an edge become nodes.

    Returns {nodes: [{name, title, category, score}],
             edges: [{source, target, similarity}],
             stats: {node_count, edge_count, avg_similarity}}
    """
    def _no_graph() -> SimilarityGraph:
        return {"nodes": [], "edges": [], "stats": {"node_count": 0, "edge_count": 0, "avg_similarity": 0}}

    vectors = db.all_embeddings()
    if len(vectors) < 2:
        return _no_graph()

    rated_pairs = db.drafts_with_ratings(limit=1000)
    ratings = {draft.name: rating for draft, rating in rated_pairs}
    drafts = {draft.name: draft for draft, _ in rated_pairs}

    names = [name for name in vectors if name in ratings]
    if len(names) < 2:
        return _no_graph()

    # Cosine similarity = dot product of the L2-normalised rows.
    stacked = np.array([vectors[name] for name in names])
    lengths = np.linalg.norm(stacked, axis=1, keepdims=True)
    lengths[lengths == 0] = 1.0  # leave all-zero rows untouched instead of dividing by 0
    unit = stacked / lengths
    cosine = unit @ unit.T

    # Upper triangle only: each unordered pair is considered once.
    edges = []
    connected = set()
    for row, left in enumerate(names):
        for col in range(row + 1, len(names)):
            score = float(cosine[row, col])
            if score >= threshold:
                right = names[col]
                edges.append({"source": left, "target": right, "similarity": round(score, 4)})
                connected.update((left, right))

    nodes = []
    for name in names:
        if name in connected:
            rating = ratings[name]
            draft = drafts.get(name)
            nodes.append({
                "name": name,
                "title": draft.title if draft else name,
                "category": rating.categories[0] if rating.categories else "Other",
                "score": round(rating.composite_score, 2),
            })

    mean_similarity = round(sum(edge["similarity"] for edge in edges) / max(len(edges), 1), 4)
    stats = {"node_count": len(nodes), "edge_count": len(edges), "avg_similarity": mean_similarity}
    return {"nodes": nodes, "edges": edges, "stats": stats}
def get_idea_clusters(db: Database) -> dict:
    """Return the idea-cluster payload, memoised under ``"idea_clusters"``.

    Delegates to :func:`_compute_idea_clusters` through ``_cached``.
    """
    def _build() -> dict:
        return _compute_idea_clusters(db)

    return _cached("idea_clusters", _build)
def _compute_idea_clusters(db: Database) -> dict:
    """Cluster ideas by embedding similarity; return clusters, scatter and links.

    Ideas that belong to false-positive drafts are dropped.  The remaining
    embeddings are L2-normalised (so Ward linkage approximates cosine) and
    grouped with agglomerative clustering; the target cluster count is
    ``len(ideas) // 12`` clamped to the range 10..40.  Clusters with fewer
    than two ideas are discarded.

    Returns a dict with:
        clusters: kept clusters, largest first.  ``id`` equals the position
            in this list.  Each entry has theme, size, ideas (first 20),
            drafts, wgs, categories, cross_wg and wg_count.
        scatter: one t-SNE point per idea (empty if t-SNE fails).
            ``cluster_id`` is the ``id`` of the kept cluster containing the
            idea, or ``-1`` when the idea's cluster was discarded.
        links: up to 50 pairs of kept clusters with centroid similarity
            >= 0.45 whose best cross-cluster idea pair scores >= 0.5.
        stats: total ideas, clustered ideas, number of kept clusters.
        empty: True when nothing could be clustered.
    """
    def _empty(total: int) -> dict:
        return {
            "clusters": [],
            "scatter": [],
            "links": [],
            "stats": {"total": total, "clustered": 0, "num_clusters": 0},
            "empty": True,
        }

    embeddings = db.all_idea_embeddings()
    if not embeddings:
        return _empty(0)

    # Idea metadata, excluding ideas that come from false-positive drafts.
    fp_names = db.false_positive_names()
    rows = db.conn.execute("SELECT id, title, description, idea_type, draft_name FROM ideas").fetchall()
    idea_map = {
        r["id"]: {"title": r["title"], "description": r["description"],
                  "type": r["idea_type"], "draft_name": r["draft_name"]}
        for r in rows if r["draft_name"] not in fp_names
    }
    embeddings = {k: v for k, v in embeddings.items() if k in idea_map}

    # Draft -> working group and draft -> categories lookups.
    draft_rows = db.conn.execute('SELECT name, "group", title FROM drafts').fetchall()
    draft_wg = {r["name"]: r["group"] or "none" for r in draft_rows}
    rating_rows = db.conn.execute(
        "SELECT draft_name, categories FROM ratings WHERE COALESCE(false_positive, 0) = 0"
    ).fetchall()
    draft_cats: dict[str, list[str]] = {}
    for r in rating_rows:
        try:
            draft_cats[r["draft_name"]] = json.loads(r["categories"]) if r["categories"] else []
        except (json.JSONDecodeError, TypeError):
            draft_cats[r["draft_name"]] = []

    # Row i of the matrix belongs to idea_ids[i].
    idea_ids = list(embeddings)
    if len(idea_ids) < 5:
        return _empty(len(idea_ids))
    matrix = np.array([embeddings[iid] for iid in idea_ids])
    matrix_norm = sk_normalize(matrix)

    n_target = max(10, min(40, len(idea_ids) // 12))
    try:
        labels = AgglomerativeClustering(n_clusters=n_target, linkage="ward").fit_predict(matrix_norm)
    except Exception:
        # Best effort: e.g. fewer samples than requested clusters.
        return _empty(len(idea_ids))

    # Group matrix rows by cluster label.  Plain ints keep the payload
    # JSON-serialisable (the labels are NumPy integers).
    rows_by_label: dict[int, list[int]] = defaultdict(list)
    for row_idx, label in enumerate(labels):
        rows_by_label[int(label)].append(row_idx)

    stop = {"a", "an", "the", "of", "for", "in", "to", "and", "or", "with",
            "on", "by", "is", "as", "at", "from", "that", "this", "it",
            "based", "using", "protocol", "mechanism", "framework", "system",
            "network", "agent", "agents"}

    # Each kept cluster travels together with its member rows, so the
    # renumbering after the size sort needs no guesswork.
    kept: list[tuple[dict, list[int]]] = []
    for label in sorted(rows_by_label):
        member_rows = rows_by_label[label]
        if len(member_rows) < 2:
            continue
        ideas_in_cluster = [idea_map[idea_ids[i]] for i in member_rows]

        # Theme: the most common significant words in the idea titles.
        words: Counter = Counter()
        for idea in ideas_in_cluster:
            for w in (idea["title"] or "").lower().split():
                w_clean = w.strip("()[].,;:-\"'")
                if len(w_clean) > 2 and w_clean not in stop:
                    words[w_clean] += 1
        top_words = [w for w, _ in words.most_common(4)]
        theme = " ".join(top_words).title() if top_words else f"Cluster {label}"

        # Distinct drafts in first-seen order, so the output is deterministic.
        drafts = list(dict.fromkeys(idea["draft_name"] for idea in ideas_in_cluster))

        wg_counts: Counter = Counter()
        cat_counts: Counter = Counter()
        for dname in drafts:
            wg_counts[draft_wg.get(dname, "none")] += 1
            for cat in draft_cats.get(dname, []):
                cat_counts[cat] += 1

        kept.append((
            {
                "id": len(kept),  # provisional; reassigned after sorting
                "theme": theme,
                "size": len(ideas_in_cluster),
                "ideas": ideas_in_cluster[:20],
                "drafts": drafts,
                "wgs": [{"wg": wg, "count": cnt} for wg, cnt in wg_counts.most_common(5)],
                "categories": [{"cat": cat, "count": cnt} for cat, cnt in cat_counts.most_common(3)],
                "cross_wg": len([w for w in wg_counts if w != "none"]) >= 2,
                "wg_count": len(wg_counts),
            },
            member_rows,
        ))

    # Largest clusters first; the final id is the position in the sorted list.
    kept.sort(key=lambda item: item[0]["size"], reverse=True)
    clusters: list[dict] = []
    cluster_rows: list[list[int]] = []
    row_cluster = [-1] * len(idea_ids)  # -1 marks ideas whose cluster was discarded
    for new_id, (cluster, member_rows) in enumerate(kept):
        cluster["id"] = new_id
        clusters.append(cluster)
        cluster_rows.append(member_rows)
        for row_idx in member_rows:
            row_cluster[row_idx] = new_id

    # t-SNE for the scatter plot; a failure leaves the scatter empty.
    scatter = []
    try:
        perp = min(30, len(idea_ids) - 1)
        coords = TSNE(n_components=2, perplexity=perp, random_state=42, max_iter=500).fit_transform(matrix_norm)
    except Exception:
        coords = None
    if coords is not None:
        for row_idx, iid in enumerate(idea_ids):
            info = idea_map[iid]
            scatter.append({
                "x": round(float(coords[row_idx, 0]), 3),
                "y": round(float(coords[row_idx, 1]), 3),
                "cluster_id": row_cluster[row_idx],
                "title": info["title"],
                "draft_name": info["draft_name"],
                "wg": draft_wg.get(info["draft_name"], ""),
            })

    # Cross-cluster links between kept clusters: centroid similarity first,
    # then the single best idea pair across the two clusters.
    links = []
    if len(clusters) >= 2:
        centroids = {}
        for cid, member_rows in enumerate(cluster_rows):
            centroid = matrix_norm[member_rows].mean(axis=0)
            norm = np.linalg.norm(centroid)
            if norm > 0:
                centroids[cid] = centroid / norm

        cids = sorted(centroids)
        for pos, ci in enumerate(cids):
            for cj in cids[pos + 1:]:
                sim = float(np.dot(centroids[ci], centroids[cj]))
                if sim < 0.45:
                    continue
                # Sample up to 20 ideas per cluster to keep this fast.
                rows_i = cluster_rows[ci][:20]
                rows_j = cluster_rows[cj][:20]
                pair_sims = matrix_norm[rows_i] @ matrix_norm[rows_j].T
                a_pos, b_pos = divmod(int(np.argmax(pair_sims)), len(rows_j))
                best_sim = float(pair_sims[a_pos, b_pos])
                if best_sim < 0.5:
                    continue
                idea_a = idea_map[idea_ids[rows_i[a_pos]]]
                idea_b = idea_map[idea_ids[rows_j[b_pos]]]
                links.append({
                    "source": ci,
                    "target": cj,
                    "source_theme": clusters[ci]["theme"],
                    "target_theme": clusters[cj]["theme"],
                    "similarity": round(sim, 3),
                    "best_pair_sim": round(best_sim, 3),
                    "idea_a": idea_a["title"],
                    "idea_a_draft": idea_a["draft_name"],
                    "idea_b": idea_b["title"],
                    "idea_b_draft": idea_b["draft_name"],
                })
        links.sort(key=lambda link: link["best_pair_sim"], reverse=True)
        links = links[:50]  # cap at top 50 links

    return {
        "clusters": clusters,
        "scatter": scatter,
        "links": links,
        "stats": {
            "total": len(idea_ids),
            "clustered": sum(c["size"] for c in clusters),
            "num_clusters": len(clusters),
        },
        "empty": False,
    }
def get_timeline_animation_data(db: Database) -> dict:
    """Return the timeline-animation payload, memoised under ``"timeline_animation"``.

    Delegates to :func:`_compute_timeline_animation_data` through ``_cached``.
    """
    def _build() -> dict:
        return _compute_timeline_animation_data(db)

    return _cached("timeline_animation", _build)
def _compute_timeline_animation_data(db: Database) -> dict:
    """Project all rated drafts with t-SNE and tag each point with its month.

    t-SNE runs once over every draft that has both an embedding and a
    rating, so coordinates stay stable across animation frames.  Each point
    carries a ``month`` label the front-end uses to build cumulative frames;
    drafts whose month is ``"unknown"`` are left out.  Returns ``points``,
    the sorted ``months`` and ``category_monthly`` (month -> category ->
    count); all three are empty when fewer than five drafts qualify or
    t-SNE fails.
    """
    def _blank() -> dict:
        return {"points": [], "months": [], "category_monthly": {}}

    vectors = db.all_embeddings()
    if len(vectors) < 5:
        return _blank()

    rated_pairs = db.drafts_with_ratings(limit=1000)
    ratings = {draft.name: rating for draft, rating in rated_pairs}
    drafts = {draft.name: draft for draft, _ in rated_pairs}

    names = [name for name in vectors if name in ratings]
    if len(names) < 5:
        return _blank()

    matrix = np.array([vectors[name] for name in names])
    try:
        projector = TSNE(n_components=2, perplexity=min(30, len(names) - 1),
                         random_state=42, max_iter=500)
        layout = projector.fit_transform(matrix)
    except Exception:
        return _blank()

    unordered = []
    category_monthly: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for name, xy in zip(names, layout):
        rating = ratings[name]
        draft = drafts.get(name)
        month = _extract_month(draft.time if draft else None)
        if month == "unknown":
            # Undated docs (e.g. ISO/ETSI) can't be placed on a temporal animation.
            continue
        category = rating.categories[0] if rating.categories else "Other"
        category_monthly[month][category] += 1
        unordered.append({
            "name": name,
            "title": draft.title if draft else name,
            "x": round(float(xy[0]), 3),
            "y": round(float(xy[1]), 3),
            "category": category,
            "score": round(rating.composite_score, 2),
            "month": month,
        })

    # Chronological delivery keeps the front-end's cumulative filter
    # (p.month <= frame) append-only.  Otherwise new points land mid-array
    # and Plotly's index-based frame transition animates existing markers
    # flying to other drafts' coordinates ("jumping points").
    points = sorted(unordered, key=lambda point: (point["month"], point["name"]))

    return {
        "points": points,
        # Every kept point registered its month above, so the keys are the months.
        "months": sorted(category_monthly),
        # Plain dicts instead of defaultdicts for JSON.
        "category_monthly": {month: dict(cats) for month, cats in category_monthly.items()},
    }
def get_monitor_status(db: Database) -> MonitorStatus:
    """Collect run history, backlog sizes, pipeline totals and token cost.

    ``last_run`` is the first entry of the fetched runs (``None`` when there
    are none).  ``embedded`` and ``with_ideas`` are derived as the draft
    total minus the respective backlog.  The cost estimate uses Sonnet
    pricing: $3 per million input tokens and $15 per million output tokens.
    """
    recent_runs = db.get_monitor_runs(limit=20)
    draft_total = db.count_drafts()
    rated_total = len(db.drafts_with_ratings(limit=10000))
    backlog = {
        "unrated": len(db.unrated_drafts(limit=9999)),
        "unembedded": len(db.drafts_without_embeddings(limit=9999)),
        "no_ideas": len(db.drafts_without_ideas(limit=9999)),
    }
    ideas_total = db.idea_count()
    gaps_total = len(db.all_gaps())
    tokens_in, tokens_out = db.total_tokens_used()
    usd = (tokens_in * 3.0 / 1_000_000) + (tokens_out * 15.0 / 1_000_000)

    pipeline = {
        "total_drafts": draft_total,
        "rated": rated_total,
        "embedded": draft_total - backlog["unembedded"],
        "with_ideas": draft_total - backlog["no_ideas"],
        "idea_total": ideas_total,
        "gap_count": gaps_total,
    }
    cost = {
        "input_tokens": tokens_in,
        "output_tokens": tokens_out,
        "estimated_usd": round(usd, 2),
    }
    return {
        "last_run": recent_runs[0] if recent_runs else None,
        "runs": recent_runs,
        "unprocessed": backlog,
        "total_runs": len(recent_runs),
        "pipeline": pipeline,
        "cost": cost,
    }
def get_citation_graph(db: Database, min_refs: int = 2) -> CitationGraph:
    """Return the citation network, memoised per ``min_refs`` value.

    Delegates to :func:`_compute_citation_graph` through ``_cached`` using a
    cache key that embeds ``min_refs``.
    """
    cache_key = f"citation_graph_{min_refs}"

    def _build() -> CitationGraph:
        return _compute_citation_graph(db, min_refs)

    return _cached(cache_key, _build)
def _compute_citation_graph(db: Database, min_refs: int = 2) -> CitationGraph:
    """Build the citation network for the force-directed graph.

    A reference (keyed ``"<ref_type>:<ref_id>"``) is kept when at least
    ``min_refs`` rows of ``draft_refs`` point at it; every document citing a
    kept reference becomes a draft node.  If that yields more than 250
    nodes, only the 80 most-cited references (and their citing documents)
    are kept.

    Nodes are classified by whether their id is a reference key, not by the
    shape of the id string, so source documents whose names contain a colon
    (e.g. ISO document names) are still draft nodes.

    Returns {nodes: [{id, type, title, influence, ref_id} for references,
                     {id, type, title, influence, category} for drafts],
             edges: [{source, target}],
             stats: {node_count, edge_count, rfc_count, draft_count}}
    """
    ref_rows = db.conn.execute(
        "SELECT draft_name, ref_type, ref_id FROM draft_refs"
    ).fetchall()

    in_degree: Counter = Counter()   # reference key -> number of citing rows
    draft_out: Counter = Counter()   # citing document -> number of references
    edges_raw: list[tuple[str, str]] = []
    for row in ref_rows:
        ref_key = f"{row['ref_type']}:{row['ref_id']}"
        in_degree[ref_key] += 1
        draft_out[row["draft_name"]] += 1
        edges_raw.append((row["draft_name"], ref_key))

    # Titles for labelling and primary categories for colouring draft nodes.
    draft_titles = {
        row["name"]: row["title"]
        for row in db.conn.execute("SELECT name, title FROM drafts").fetchall()
    }
    draft_cats: dict[str, str] = {}
    for row in db.conn.execute("SELECT draft_name, categories FROM ratings").fetchall():
        try:
            cats = json.loads(row["categories"]) if row["categories"] else []
            draft_cats[row["draft_name"]] = cats[0] if cats else "Other"
        except (ValueError, TypeError, KeyError, IndexError):
            # Malformed or unexpectedly shaped JSON: fall back to the default.
            draft_cats[row["draft_name"]] = "Other"

    def _edges_into(allowed) -> list[tuple[str, str]]:
        """Keep the raw edges whose target is one of ``allowed``."""
        return [(src, key) for src, key in edges_raw if key in allowed]

    top_refs = {key: count for key, count in in_degree.items() if count >= min_refs}
    kept = _edges_into(top_refs)
    if len({name for edge in kept for name in edge}) > 250:
        # Too crowded to read: restrict to the 80 most-cited references.
        busiest = sorted(top_refs.items(), key=lambda item: item[1], reverse=True)[:80]
        kept = _edges_into({key for key, _ in busiest})

    # First-seen order makes the node list deterministic.
    ref_ids = dict.fromkeys(key for _, key in kept)
    draft_ids = dict.fromkeys(src for src, _ in kept if src not in ref_ids)

    nodes = []
    for ref_key in ref_ids:
        ref_type, ref_id = ref_key.split(":", 1)
        if ref_type == "rfc":
            try:
                title = f"RFC {int(ref_id)}"  # normalises e.g. leading zeros
            except ValueError:
                title = f"RFC {ref_id}"
        else:
            title = f"{ref_type.upper()} {ref_id}"
        nodes.append({
            "id": ref_key,
            "type": ref_type,
            "title": title,
            "influence": in_degree.get(ref_key, 0),
            "ref_id": ref_id,
        })
    for name in draft_ids:
        nodes.append({
            "id": name,
            "type": "draft",
            "title": draft_titles.get(name, name),
            "influence": draft_out.get(name, 0),
            "category": draft_cats.get(name, "Other"),
        })

    filtered_edges = [{"source": src, "target": key} for src, key in kept]
    return {
        "nodes": nodes,
        "edges": filtered_edges,
        "stats": {
            "node_count": len(nodes),
            "edge_count": len(filtered_edges),
            "rfc_count": sum(1 for node in nodes if node["type"] == "rfc"),
            "draft_count": sum(1 for node in nodes if node["type"] == "draft"),
        },
    }
def get_landscape_tsne(db: Database) -> list[dict]:
    """Return the landscape t-SNE points, memoised under ``"landscape_tsne"``.

    Delegates to :func:`_compute_landscape_tsne` through ``_cached``.
    """
    def _build() -> list[dict]:
        return _compute_landscape_tsne(db)

    return _cached("landscape_tsne", _build)
def _compute_landscape_tsne(db: Database) -> list[dict]:
    """Project rated drafts to 2-D with t-SNE.

    Only drafts with both an embedding and a rating are projected.  Returns
    ``[{name, title, x, y, category, score}]``, or an empty list when fewer
    than five drafts qualify or t-SNE fails.
    """
    vectors = db.all_embeddings()
    if len(vectors) < 5:
        return []

    rated_pairs = db.drafts_with_ratings(limit=1000)
    ratings = {draft.name: rating for draft, rating in rated_pairs}
    drafts = {draft.name: draft for draft, _ in rated_pairs}

    names = [name for name in vectors if name in ratings]
    if len(names) < 5:
        return []

    matrix = np.array([vectors[name] for name in names])
    try:
        projector = TSNE(n_components=2, perplexity=min(30, len(names) - 1),
                         random_state=42, max_iter=500)
        layout = projector.fit_transform(matrix)
    except Exception:
        return []

    landscape = []
    for name, xy in zip(names, layout):
        rating = ratings[name]
        draft = drafts.get(name)
        landscape.append({
            "name": name,
            "title": draft.title if draft else name,
            "x": round(float(xy[0]), 3),
            "y": round(float(xy[1]), 3),
            "category": rating.categories[0] if rating.categories else "Other",
            "score": round(rating.composite_score, 2),
        })
    return landscape
def get_comparison_data(db: Database, names: list[str]) -> dict | None:
    """Compare several drafts by ideas, references and embedding similarity.

    Names for which ``get_draft_detail`` returns nothing are skipped; if
    fewer than two drafts remain the result is ``None``.  Ideas are matched
    across drafts by lower-cased, stripped title; references by their
    ``(type, id)`` pair.

    Returns {
        drafts: [detail dicts from get_draft_detail],
        shared_ideas: [{title, drafts: [name,...]}],
        unique_ideas: {name: [{title, description}]},
        shared_refs: [{type, id, drafts: [name,...]}],
        unique_refs: {name: [{type, id}]},
        similarities: [{a, b, similarity}],
        comparison_text: None,
    }
    """
    details: list[dict] = []
    ideas_by_draft: dict[str, list[dict]] = {}
    refs_by_draft: dict[str, list[tuple[str, str]]] = {}
    for draft_name in names:
        found = get_draft_detail(db, draft_name)
        if found:
            details.append(found)
            ideas_by_draft[draft_name] = found.get("ideas", [])
            refs_by_draft[draft_name] = [(ref["type"], ref["id"]) for ref in found.get("refs", [])]
    if len(details) < 2:
        return None

    def _title_key(idea: dict) -> str:
        return idea["title"].lower().strip()

    # Ideas: which drafts mention each normalised title.
    title_owners: dict[str, list[str]] = {}
    for draft_name, ideas in ideas_by_draft.items():
        for idea in ideas:
            title_owners.setdefault(_title_key(idea), []).append(draft_name)
    shared_ideas = [
        {"title": title, "drafts": owners}
        for title, owners in title_owners.items()
        if len(set(owners)) > 1
    ]
    unique_ideas: dict[str, list[dict]] = {
        draft_name: [
            {"title": idea["title"], "description": idea.get("description", "")}
            for idea in ideas
            if len(set(title_owners.get(_title_key(idea), []))) <= 1
        ]
        for draft_name, ideas in ideas_by_draft.items()
    }

    # References: which drafts cite each (type, id) pair.
    ref_owners: dict[tuple[str, str], list[str]] = {}
    for draft_name, refs in refs_by_draft.items():
        for ref in refs:
            ref_owners.setdefault(ref, []).append(draft_name)
    shared_refs = [
        {"type": ref[0], "id": ref[1], "drafts": owners}
        for ref, owners in ref_owners.items()
        if len(set(owners)) > 1
    ]
    unique_refs: dict[str, list[dict]] = {
        draft_name: [
            {"type": ref[0], "id": ref[1]}
            for ref in refs
            if len(set(ref_owners.get(ref, []))) <= 1
        ]
        for draft_name, refs in refs_by_draft.items()
    }

    # Pairwise cosine similarity for every pair where both embeddings exist.
    vectors = db.all_embeddings()
    compared = [detail["name"] for detail in details]
    similarities = []
    for pos, first in enumerate(compared):
        for second in compared[pos + 1:]:
            if first not in vectors or second not in vectors:
                continue
            vec_a = vectors[first]
            vec_b = vectors[second]
            dot = np.dot(vec_a, vec_b)
            norm = np.linalg.norm(vec_a) * np.linalg.norm(vec_b)
            if norm > 0:
                sim = float(dot / norm)
            else:
                sim = 0.0
            similarities.append({"a": first, "b": second, "similarity": round(sim, 4)})

    return {
        "drafts": details,
        "shared_ideas": shared_ideas,
        "unique_ideas": unique_ideas,
        "shared_refs": shared_refs,
        "unique_refs": unique_refs,
        "similarities": similarities,
        "comparison_text": None,
    }
def _classify_to_layer(text: str) -> str:
    """Pick the architectural layer whose keywords best match ``text``.

    A layer scores one point per keyword that appears as a whole token and
    one more per keyword longer than four characters that occurs anywhere
    in the lower-cased text (this is what catches multi-word keywords).
    Ties go to the layer listed first; with no match at all the result is
    ``"communication"``.
    """
    lowered = text.lower()
    tokens = set(re.findall(r"[a-z][a-z0-9-]+", lowered))
    tally: dict[str, int] = {
        layer_id: len(tokens & vocab) + sum(1 for kw in vocab if len(kw) > 4 and kw in lowered)
        for layer_id, vocab in _LAYER_KEYWORDS.items()
    }
    winner = max(tally, key=tally.get)
    if tally[winner] > 0:
        return winner
    return "communication"  # default
def get_architecture(db: Database) -> dict:
    """Return the system-of-systems architecture view, memoised under ``"architecture"``.

    Delegates to :func:`_compute_architecture` through ``_cached`` with
    ``ttl=600``.
    """
    def _build() -> dict:
        return _compute_architecture(db)

    return _cached("architecture", _build, ttl=600)
def _compute_architecture(db: Database) -> dict:
    """Compute the architecture view from idea clusters, gaps and draft sources.

    Idea clusters with at least three ideas become components, cross-cluster
    links between two kept components become dependencies, and every gap and
    draft is assigned to a layer by ``_classify_to_layer``.

    Returns:
        {
            "components": [...],       # architectural building blocks
            "dependencies": [...],     # edges between components
            "gaps": [...],             # gaps mapped to layers
            "layers": [...],           # per-layer summary incl. coverage
            "source_coverage": {...},  # layer id -> {source: draft count}
            "stats": {...}
        }
    """
    # --- Gather raw data ---
    cluster_data = get_idea_clusters(db)
    clusters = cluster_data.get("clusters", [])
    links = cluster_data.get("links", [])
    all_gaps = db.all_gaps()

    # Drafts not flagged as false positives; feeds both the per-component
    # source breakdown and the per-layer source coverage.
    draft_rows = db.conn.execute(
        "SELECT d.name, d.title, d.abstract, d.source, r.categories "
        "FROM drafts d LEFT JOIN ratings r ON d.name = r.draft_name "
        "WHERE COALESCE(r.false_positive, 0) = 0"
    ).fetchall()

    # --- Build components from idea clusters ---
    components: list[dict] = []
    cluster_to_component: dict[int, int] = {}  # cluster_id -> component index
    for cl in clusters:
        if cl["size"] < 3:
            continue  # skip tiny clusters
        ideas = cl.get("ideas", [])

        # Layer comes from the theme plus the first ten ideas. Missing (None)
        # fields count as empty text instead of raising TypeError.
        text_parts = [cl.get("theme") or ""]
        for idea in ideas[:10]:
            text_parts.append(idea.get("title") or "")
            text_parts.append(idea.get("description") or "")
        layer = _classify_to_layer(" ".join(text_parts))

        # Source breakdown for this component's drafts
        draft_names = set(cl.get("drafts", []))
        sources: Counter = Counter()
        comp_drafts: list[dict] = []
        for dr in draft_rows:
            if dr["name"] not in draft_names:
                continue
            src = dr["source"] or "ietf"
            sources[src] += 1
            comp_drafts.append({
                "name": dr["name"],
                "title": (dr["title"] or dr["name"])[:80],
                "source": src,
            })

        # Idea type breakdown (ideas without a type are ignored)
        type_counts: Counter = Counter(
            t for t in (idea.get("type", "") for idea in ideas) if t
        )

        # Maturity: rough 1-5 proxy from source diversity, size and cross-WG reach
        maturity = min(
            5,
            1
            + len(sources)
            + (1 if cl["size"] >= 10 else 0)
            + (1 if cl.get("cross_wg") else 0),
        )

        comp = {
            "id": len(components),
            "cluster_id": cl["id"],
            "name": cl.get("theme", f"Component {cl['id']}"),
            "layer": layer,
            "size": cl["size"],
            "draft_count": len(draft_names),
            "drafts": comp_drafts[:20],
            "sources": dict(sources.most_common()),
            "type_breakdown": dict(type_counts.most_common(5)),
            "maturity": maturity,
            "wgs": cl.get("wgs", [])[:3],
            "top_ideas": [
                {
                    "title": i["title"],
                    "type": i.get("type", ""),
                    "draft_name": i.get("draft_name", ""),
                }
                for i in ideas[:5]
            ],
            "categories": cl.get("categories", []),
        }
        cluster_to_component[cl["id"]] = comp["id"]
        components.append(comp)

    # --- Dependencies: links whose two ends both survived as components ---
    dependencies = []
    for link in links:
        src_comp = cluster_to_component.get(link["source"])
        tgt_comp = cluster_to_component.get(link["target"])
        if src_comp is None or tgt_comp is None or src_comp == tgt_comp:
            continue
        dependencies.append({
            "source": src_comp,
            "target": tgt_comp,
            "similarity": link.get("best_pair_sim", link.get("similarity", 0)),
            "idea_a": link.get("idea_a", ""),
            "idea_b": link.get("idea_b", ""),
        })

    # --- Map gaps to layers ---
    gap_items = []
    for gap in all_gaps:
        text = " ".join([
            gap["topic"] or "",
            gap.get("description") or "",
            gap.get("category") or "",
        ])
        gap_items.append({
            "id": gap["id"],
            "topic": gap["topic"],
            "description": gap.get("description", ""),
            "evidence": gap.get("evidence", ""),
            "severity": gap.get("severity", "medium"),
            "category": gap.get("category", ""),
            "layer": _classify_to_layer(text),
        })

    # --- Source coverage per layer ---
    coverage_counts: dict[str, Counter] = {
        layer_def["id"]: Counter() for layer_def in _ARCH_LAYERS
    }
    for dr in draft_rows:
        text = (dr["title"] or "") + " " + (dr["abstract"] or "")[:200]
        layer = _classify_to_layer(text)
        # setdefault: tolerate a classifier result that is not a declared layer
        coverage_counts.setdefault(layer, Counter())[dr["source"] or "ietf"] += 1
    # Plain dicts so the result carries no Counter instances
    source_coverage: dict[str, dict[str, int]] = {
        layer_id: dict(counts) for layer_id, counts in coverage_counts.items()
    }

    # --- Layer summary stats ---
    layer_info = []
    for layer_def in _ARCH_LAYERS:
        lid = layer_def["id"]
        layer_components = [c for c in components if c["layer"] == lid]
        layer_coverage = source_coverage.get(lid, {})
        layer_info.append({
            "id": lid,
            "label": layer_def["label"],
            "order": layer_def["order"],
            "component_count": len(layer_components),
            "idea_count": sum(c["size"] for c in layer_components),
            "gap_count": sum(1 for g in gap_items if g["layer"] == lid),
            "coverage": layer_coverage,
            "total_drafts": sum(layer_coverage.values()),
        })

    return {
        "components": components,
        "dependencies": dependencies,
        "gaps": gap_items,
        "layers": layer_info,
        # Documented in the contract above but previously never returned
        "source_coverage": source_coverage,
        "stats": {
            "total_components": len(components),
            "total_dependencies": len(dependencies),
            "total_gaps": len(gap_items),
            "layers_with_gaps": len({g["layer"] for g in gap_items}),
        },
    }
def get_idea_analysis(db: Database) -> dict:
    """Return comprehensive idea analysis data for the idea-analysis page.

    Includes novelty distribution, type breakdown with avg novelty,
    top novel ideas, ideas-per-draft distribution, cross-tab of type x source,
    shared ideas across drafts, and idea novelty vs draft rating correlation.

    Ideas whose ``idea_type`` is NULL or empty are reported under the type
    ``"other"`` in every section of the result.

    Returns:
        dict with the keys ``total``, ``scored``, ``unscored``,
        ``avg_novelty``, ``embed_count``, ``embed_pct``, ``type_count``,
        ``novelty_histogram``, ``by_type``, ``top_novel``,
        ``ideas_per_draft_hist``, ``top_idea_drafts``, ``cross_tab``,
        ``sources``, ``shared_ideas``, ``scatter_data`` and ``sunburst``.
    """
    from difflib import SequenceMatcher

    def _idea_type(idea: dict) -> str:
        """Normalise a missing/NULL idea type to ``"other"``."""
        return idea.get("idea_type") or "other"

    def _source(info: dict) -> str:
        """Normalise a missing/NULL draft source to ``"ietf"``."""
        return info.get("source", "ietf") or "ietf"

    # Fetch raw data (highest novelty first, unscored ideas last)
    all_ideas = [
        dict(r)
        for r in db.conn.execute(
            """SELECT i.id, i.draft_name, i.title, i.description, i.idea_type,
                      i.novelty_score
               FROM ideas i ORDER BY i.novelty_score DESC NULLS LAST"""
        ).fetchall()
    ]

    # Draft ratings lookup
    ratings_rows = db.conn.execute(
        """SELECT d.name, d.title as draft_title, d.source,
                  r.novelty AS r_novelty, r.maturity, r.overlap, r.momentum, r.relevance
           FROM drafts d LEFT JOIN ratings r ON d.name = r.draft_name"""
    ).fetchall()
    draft_info: dict[str, dict] = {}
    for r in ratings_rows:
        row = dict(r)
        # Composite score: mean of whichever of the 5 dimensions are present
        dims = [row.get("r_novelty"), row.get("maturity"), row.get("overlap"),
                row.get("momentum"), row.get("relevance")]
        valid = [d for d in dims if d is not None]
        row["composite_score"] = sum(valid) / len(valid) if valid else None
        draft_info[row["name"]] = row

    total = len(all_ideas)
    scored = [i for i in all_ideas if i.get("novelty_score") is not None]
    unscored = total - len(scored)
    avg_novelty = sum(i["novelty_score"] for i in scored) / len(scored) if scored else 0

    # Embedding coverage
    embed_count = db.conn.execute("SELECT COUNT(*) FROM idea_embeddings").fetchone()[0]

    # --- Novelty score distribution (histogram) ---
    novelty_dist = Counter(i["novelty_score"] for i in scored)
    novelty_histogram = {
        "labels": [1, 2, 3, 4, 5],
        "values": [novelty_dist.get(s, 0) for s in [1, 2, 3, 4, 5]],
    }

    # --- Ideas by type with counts and avg novelty ---
    # Grouped once here; the sunburst below reuses the same grouping.
    ideas_by_type: dict[str, list[dict]] = defaultdict(list)
    for idea in all_ideas:
        ideas_by_type[_idea_type(idea)].append(idea)
    by_type = []
    for t, members in sorted(ideas_by_type.items(), key=lambda kv: len(kv[1]), reverse=True):
        novelty = [m["novelty_score"] for m in members if m.get("novelty_score") is not None]
        avg = sum(novelty) / len(novelty) if novelty else 0
        by_type.append({"type": t, "count": len(members), "avg_novelty": round(avg, 2)})
    type_names = [t["type"] for t in by_type]

    # --- Top 20 most novel ideas (score 4-5) ---
    top_novel = []
    for idea in scored:
        if idea["novelty_score"] < 4:
            continue
        di = draft_info.get(idea["draft_name"], {})
        top_novel.append({
            "title": idea["title"],
            "description": idea["description"],
            "type": _idea_type(idea),
            "novelty_score": idea["novelty_score"],
            "draft_name": idea["draft_name"],
            "draft_title": di.get("draft_title", ""),
            "draft_score": di.get("composite_score"),
        })
    top_novel.sort(key=lambda x: (x["novelty_score"], x.get("draft_score") or 0), reverse=True)
    top_novel = top_novel[:20]

    # --- Ideas per draft distribution ---
    ideas_per_draft = Counter(i["draft_name"] for i in all_ideas)
    ipd_dist = Counter(ideas_per_draft.values())
    ipd_labels = sorted(ipd_dist)
    ideas_per_draft_hist = {
        "labels": ipd_labels,
        "values": [ipd_dist[k] for k in ipd_labels],
    }

    # Also top drafts by idea count
    top_idea_drafts = []
    for name, count in ideas_per_draft.most_common(10):
        di = draft_info.get(name, {})
        top_idea_drafts.append({
            "name": name,
            "draft_title": di.get("draft_title", ""),
            "idea_count": count,
            "score": di.get("composite_score"),
        })

    # --- Cross-tabulation: idea_type x source ---
    type_source: dict[str, Counter] = defaultdict(Counter)
    for idea in all_ideas:
        type_source[_idea_type(idea)][_source(draft_info.get(idea["draft_name"], {}))] += 1
    sources = sorted({_source(di) for di in draft_info.values()})
    cross_tab = []
    for t in type_names:
        row = {"type": t}
        for s in sources:
            row[s] = type_source[t].get(s, 0)
        cross_tab.append(row)

    # --- Shared ideas across drafts ---
    # Greedy fuzzy grouping: an idea joins the first existing group whose
    # canonical (lower-cased) title is at least 75% similar.
    idea_groups: list[dict] = []
    for idea in all_ideas:
        title_lower = (idea["title"] or "").lower().strip()
        for group in idea_groups:
            if SequenceMatcher(None, title_lower, group["canonical"]).ratio() >= 0.75:
                group["ideas"].append(idea)
                group["drafts"].add(idea["draft_name"])
                break
        else:
            idea_groups.append({
                "canonical": title_lower,
                "title": idea["title"],
                "ideas": [idea],
                "drafts": {idea["draft_name"]},
            })

    shared_ideas = []
    for g in sorted(idea_groups, key=lambda x: len(x["drafts"]), reverse=True):
        if len(g["drafts"]) < 2:
            break  # sorted descending: nothing shared remains
        shared_ideas.append({
            "title": g["title"],
            "appearances": len(g["drafts"]),
            "drafts": sorted(g["drafts"])[:8],
            # sorted() keeps the output stable between calls
            "types": sorted({_idea_type(i) for i in g["ideas"]}),
        })

    # --- Scatter: draft avg idea novelty vs draft relevance ---
    draft_idea_novelty: dict[str, list] = defaultdict(list)
    for idea in scored:
        draft_idea_novelty[idea["draft_name"]].append(idea["novelty_score"])
    scatter_data = []
    for name, scores in draft_idea_novelty.items():
        di = draft_info.get(name, {})
        if di.get("relevance") is None or di.get("composite_score") is None:
            continue
        scatter_data.append({
            "name": name,
            "avg_idea_novelty": round(sum(scores) / len(scores), 2),
            "relevance": di["relevance"],
            "score": di["composite_score"],
            "idea_count": len(scores),
            "source": _source(di),
        })

    # --- Sunburst data: type -> novelty band ---
    sunburst_labels = ["All Ideas"]  # root
    sunburst_parents = [""]
    sunburst_values = [total]
    novelty_bands = {"High (4-5)": lambda s: s is not None and s >= 4,
                     "Medium (3)": lambda s: s is not None and s == 3,
                     "Low (1-2)": lambda s: s is not None and s <= 2,
                     "Unscored": lambda s: s is None}
    for t_info in by_type:
        t = t_info["type"]
        sunburst_labels.append(t)
        sunburst_parents.append("All Ideas")
        sunburst_values.append(t_info["count"])
        # Sub-bands (empty bands are left out)
        for band, in_band in novelty_bands.items():
            cnt = sum(1 for i in ideas_by_type[t] if in_band(i.get("novelty_score")))
            if cnt > 0:
                sunburst_labels.append(f"{t} - {band}")
                sunburst_parents.append(t)
                sunburst_values.append(cnt)

    return {
        "total": total,
        "scored": len(scored),
        "unscored": unscored,
        "avg_novelty": round(avg_novelty, 2),
        "embed_count": embed_count,
        "embed_pct": round(embed_count / total * 100, 1) if total > 0 else 0,
        "type_count": len(by_type),
        "novelty_histogram": novelty_histogram,
        "by_type": by_type,
        "top_novel": top_novel,
        "ideas_per_draft_hist": ideas_per_draft_hist,
        "top_idea_drafts": top_idea_drafts,
        "cross_tab": cross_tab,
        "sources": sources,
        "shared_ideas": shared_ideas,
        "scatter_data": scatter_data,
        "sunburst": {
            "labels": sunburst_labels,
            "parents": sunburst_parents,
            "values": sunburst_values,
        },
    }
def get_trends_data(db: Database) -> dict:
    """Return temporal evolution data for the /trends page.

    A "month" is the first seven characters of ``drafts.time``. The month
    axis used for categories, safety ratio, new authors and the monthly
    table is the set of months in which at least one non-false-positive
    rating carries a category.

    Returns dict with:
        - monthly_submissions: [{month, source, count}, ...]
        - monthly_ratings: [{month, novelty, maturity, overlap, momentum,
          relevance, count}, ...] (a dimension is None when its AVG is NULL)
        - monthly_categories: [{month, category, count}, ...]
        - safety_ratio: [{month, safety, capability, ratio}, ...]
        - cumulative_ideas: [{month, total}, ...]
        - monthly_new_authors: [{month, count}, ...]
        - top_categories: the (up to) 8 most frequent categories
        - months: sorted month axis
        - stats: {fastest_growing, newest_active}
        - monthly_table: [{month, total, sources: {}, avg_score}, ...]
    """
    conn = db.conn

    def _round2(value):
        """Round to two decimals; SQL NULL (None) passes through unchanged."""
        return None if value is None else round(value, 2)

    # 1. Monthly submissions by source
    rows = conn.execute("""
        SELECT substr(time, 1, 7) AS month, source, COUNT(*) AS cnt
        FROM drafts
        WHERE time IS NOT NULL AND time != ''
        GROUP BY month, source
        ORDER BY month
    """).fetchall()
    monthly_submissions = [
        {"month": r["month"], "source": r["source"], "count": r["cnt"]} for r in rows
    ]
    # Indexed once so the monthly table does not rescan this list per month
    sources_by_month: dict[str, dict] = defaultdict(dict)
    totals_by_month: Counter = Counter()
    for sub in monthly_submissions:
        sources_by_month[sub["month"]][sub["source"]] = sub["count"]
        totals_by_month[sub["month"]] += sub["count"]

    # 2. Monthly average ratings (all 5 dimensions)
    rows = conn.execute("""
        SELECT substr(d.time, 1, 7) AS month,
               AVG(r.novelty) AS novelty, AVG(r.maturity) AS maturity,
               AVG(r.overlap) AS overlap, AVG(r.momentum) AS momentum,
               AVG(r.relevance) AS relevance,
               COUNT(*) AS cnt
        FROM drafts d
        JOIN ratings r ON d.name = r.draft_name
        WHERE d.time IS NOT NULL AND d.time != '' AND r.false_positive = 0
        GROUP BY month
        ORDER BY month
    """).fetchall()
    monthly_ratings = [{
        "month": r["month"],
        "novelty": _round2(r["novelty"]),
        "maturity": _round2(r["maturity"]),
        "overlap": _round2(r["overlap"]),
        "momentum": _round2(r["momentum"]),
        "relevance": _round2(r["relevance"]),
        "count": r["cnt"],
    } for r in rows]

    # 3. Monthly category distribution
    rows = conn.execute("""
        SELECT substr(d.time, 1, 7) AS month, r.categories
        FROM drafts d
        JOIN ratings r ON d.name = r.draft_name
        WHERE d.time IS NOT NULL AND d.time != '' AND r.false_positive = 0
    """).fetchall()
    cat_monthly: dict[str, Counter] = defaultdict(Counter)
    all_cats: Counter = Counter()
    for r in rows:
        try:
            cats = json.loads(r["categories"]) if r["categories"] else []
        except (json.JSONDecodeError, TypeError):
            cats = []
        for c in cats:
            cat_monthly[r["month"]][c] += 1
            all_cats[c] += 1

    # Top 8 categories
    top_cats = [c for c, _ in all_cats.most_common(8)]
    months_sorted = sorted(cat_monthly.keys())
    monthly_categories = [
        {"month": month, "category": cat, "count": cat_monthly[month].get(cat, 0)}
        for month in months_sorted
        for cat in top_cats
    ]

    # 4. Safety ratio over time
    safety_ratio = []
    for month in months_sorted:
        safety = sum(cat_monthly[month].get(c, 0) for c in SAFETY_CATEGORIES)
        capability = sum(cat_monthly[month].get(c, 0) for c in CAPABILITY_CATEGORIES)
        safety_ratio.append({
            "month": month,
            "safety": safety,
            "capability": capability,
            "ratio": round(safety / capability, 2) if capability > 0 else 0,
        })

    # 5. Cumulative idea count over time
    rows = conn.execute("""
        SELECT substr(d.time, 1, 7) AS month, COUNT(i.id) AS cnt
        FROM ideas i
        JOIN drafts d ON i.draft_name = d.name
        WHERE d.time IS NOT NULL AND d.time != ''
        GROUP BY month
        ORDER BY month
    """).fetchall()
    cumulative = 0
    cumulative_ideas = []
    for r in rows:
        cumulative += r["cnt"]
        cumulative_ideas.append({"month": r["month"], "total": cumulative})

    # 6. Monthly new author count (first-time contributors)
    rows = conn.execute("""
        SELECT da.person_id, MIN(substr(d.time, 1, 7)) AS first_month
        FROM draft_authors da
        JOIN drafts d ON da.draft_name = d.name
        WHERE d.time IS NOT NULL AND d.time != ''
        GROUP BY da.person_id
    """).fetchall()
    new_author_monthly: Counter = Counter(
        r["first_month"] for r in rows if r["first_month"]
    )
    monthly_new_authors = [
        {"month": m, "count": new_author_monthly.get(m, 0)} for m in months_sorted
    ]

    # 7. Stats: fastest growing category, newest active category
    fastest_growing = ""
    newest_active = ""
    if len(months_sorted) >= 4:
        # Growth compares the later half of the month axis with the earlier half
        mid = len(months_sorted) // 2
        early_months = months_sorted[:mid]
        late_months = months_sorted[mid:]
        best_growth = -999
        for cat in top_cats:
            early = sum(cat_monthly[m].get(cat, 0) for m in early_months)
            late = sum(cat_monthly[m].get(cat, 0) for m in late_months)
            if early > 0:
                growth = (late - early) / early
            elif late > 0:
                growth = float("inf")  # appeared only in the later half
            else:
                growth = 0
            if growth > best_growth:
                best_growth = growth
                fastest_growing = cat

        # Newest active: category with latest first appearance
        cat_first_month: dict[str, str] = {}
        for month in months_sorted:
            for cat in all_cats:
                if cat not in cat_first_month and cat_monthly[month].get(cat, 0) > 0:
                    cat_first_month[cat] = month
        if cat_first_month:
            newest_active = max(cat_first_month, key=lambda c: cat_first_month[c])

    # 8. Monthly breakdown table
    # One grouped query replaces the former per-month AVG query.
    avg_rows = conn.execute("""
        SELECT substr(d.time, 1, 7) AS month,
               AVG((r.novelty + r.maturity + r.overlap + r.momentum + r.relevance) / 5.0) AS avg_score
        FROM drafts d JOIN ratings r ON d.name = r.draft_name
        WHERE r.false_positive = 0
        GROUP BY month
    """).fetchall()
    avg_score_by_month = {r["month"]: r["avg_score"] for r in avg_rows}
    monthly_table = []
    for month in months_sorted:
        raw_avg = avg_score_by_month.get(month)
        monthly_table.append({
            "month": month,
            "total": totals_by_month.get(month, 0),
            "sources": dict(sources_by_month.get(month, {})),
            "avg_score": round(raw_avg, 2) if raw_avg else 0,
        })

    return {
        "monthly_submissions": monthly_submissions,
        "monthly_ratings": monthly_ratings,
        "monthly_categories": monthly_categories,
        "safety_ratio": safety_ratio,
        "cumulative_ideas": cumulative_ideas,
        "monthly_new_authors": monthly_new_authors,
        "top_categories": top_cats,
        "months": months_sorted,
        "stats": {
            "fastest_growing": fastest_growing,
            "newest_active": newest_active,
        },
        "monthly_table": monthly_table,
    }
def get_complexity_data(db: Database) -> dict:
    """Return draft complexity analysis data for the /complexity page.

    For each rated, non-false-positive draft, compute structural complexity
    metrics and correlate them with the rating dimensions.

    Returns dict with:
        - drafts: [{name, title, pages, source, author_count, citation_count,
          idea_count, category_count, categories, novelty, maturity, overlap,
          momentum, relevance, score, composite_complexity, efficiency}, ...]
        - correlations: {metric: {dimension: r_value}}
        - metrics / dimensions: the names used as keys in ``correlations``
        - top_complex: top 10 most complex drafts
        - top_efficient: top 10 high-rating low-complexity drafts
        - stats: {avg_pages, avg_authors, avg_citations, pages_coverage_pct,
          total_drafts}
        - category_complexity: [{category, avg_pages, avg_authors,
          avg_citations, avg_score, count}, ...]
        - source_complexity: [{source, avg_pages, avg_authors, avg_citations,
          avg_score, count}, ...]
    """
    conn = db.conn

    def _count_map(sql: str) -> dict[str, int]:
        """Run a ``draft_name, cnt`` query and return it as a dict."""
        return {row["draft_name"]: row["cnt"] for row in conn.execute(sql).fetchall()}

    def _mean(values: list, digits: int):
        """Rounded arithmetic mean, or 0 when there is nothing to average."""
        return round(sum(values) / len(values), digits) if values else 0

    def _pearson(xs: list[float], ys: list[float]) -> float:
        """Compute Pearson correlation coefficient (0.0 if undefined)."""
        n = len(xs)
        if n < 3:
            return 0.0
        mean_x = sum(xs) / n
        mean_y = sum(ys) / n
        cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
        std_x = (sum((x - mean_x) ** 2 for x in xs)) ** 0.5
        std_y = (sum((y - mean_y) ** 2 for y in ys)) ** 0.5
        if std_x == 0 or std_y == 0:
            return 0.0
        return round(cov / (std_x * std_y), 3)

    def _summarize(label: str, groups: dict[str, list[dict]]) -> list[dict]:
        """Average the complexity metrics of each group, largest group first."""
        summary = []
        for key, members in sorted(groups.items(), key=lambda kv: -len(kv[1])):
            summary.append({
                label: key,
                "avg_pages": _mean([m["pages"] for m in members if m["pages"] is not None], 1),
                "avg_authors": _mean([m["author_count"] for m in members], 1),
                "avg_citations": _mean([m["citation_count"] for m in members], 1),
                "avg_score": _mean([m["score"] for m in members], 2),
                "count": len(members),
            })
        return summary

    # Build per-draft complexity data
    rows = conn.execute("""
        SELECT d.name, d.title, d.pages, d.source,
               r.novelty, r.maturity, r.overlap, r.momentum, r.relevance,
               r.categories,
               (r.novelty + r.maturity + r.overlap + r.momentum + r.relevance) / 5.0 AS score
        FROM drafts d
        JOIN ratings r ON d.name = r.draft_name
        WHERE r.false_positive = 0
    """).fetchall()

    author_counts = db.draft_author_count_map()
    # Citation counts (outgoing refs)
    citation_counts = _count_map("""
        SELECT draft_name, COUNT(*) AS cnt FROM draft_refs GROUP BY draft_name
    """)
    idea_counts = _count_map("""
        SELECT draft_name, COUNT(*) AS cnt FROM ideas GROUP BY draft_name
    """)

    drafts_data = []
    for r in rows:
        try:
            cats = json.loads(r["categories"]) if r["categories"] else []
        except (json.JSONDecodeError, TypeError):
            cats = []
        # Raw values only; the composite needs the global maxima computed below
        drafts_data.append({
            "name": r["name"],
            "title": r["title"],
            "pages": r["pages"],
            "source": r["source"] or "ietf",
            "author_count": author_counts.get(r["name"], 0),
            "citation_count": citation_counts.get(r["name"], 0),
            "idea_count": idea_counts.get(r["name"], 0),
            "category_count": len(cats),
            "categories": cats,
            "novelty": r["novelty"],
            "maturity": r["maturity"],
            "overlap": r["overlap"],
            "momentum": r["momentum"],
            "relevance": r["relevance"],
            "score": round(r["score"], 2),
        })
    total_drafts = len(drafts_data)
    pages_vals = [d["pages"] for d in drafts_data if d["pages"] is not None]

    # Composite complexity: each metric scaled to 0-1 by its maximum, then averaged.
    # "or 1" guards against an all-zero column producing a zero divisor.
    max_pages = max(pages_vals, default=1) or 1
    max_authors = max((d["author_count"] for d in drafts_data), default=1) or 1
    max_citations = max((d["citation_count"] for d in drafts_data), default=1) or 1
    max_ideas = max((d["idea_count"] for d in drafts_data), default=1) or 1
    for d in drafts_data:
        # Drafts without a page count get a fixed 0.3 ("median-ish") share
        p = (d["pages"] / max_pages) if d["pages"] is not None else 0.3
        a = d["author_count"] / max_authors
        c = d["citation_count"] / max_citations
        i = d["idea_count"] / max_ideas
        d["composite_complexity"] = round((p + a + c + i) / 4, 3)

    # Correlation matrix: complexity metrics vs rating dimensions
    metrics = ["pages", "author_count", "citation_count", "idea_count", "category_count"]
    dimensions = ["novelty", "maturity", "overlap", "momentum", "relevance"]
    correlations: dict[str, dict[str, float]] = {}
    for metric in metrics:
        # Only "pages" can be missing; the other metrics default to 0
        usable = [d for d in drafts_data if metric != "pages" or d[metric] is not None]
        xs = [d[metric] for d in usable]
        correlations[metric] = {
            dim: _pearson(xs, [d[dim] for d in usable]) for dim in dimensions
        }

    # Top 10 most complex
    top_complex = sorted(
        drafts_data, key=lambda d: d["composite_complexity"], reverse=True
    )[:10]

    # Top 10 efficient: high score but low complexity
    # Efficiency = score / (composite_complexity + 0.1)  (avoid div by zero)
    for d in drafts_data:
        d["efficiency"] = round(d["score"] / (d["composite_complexity"] + 0.1), 2)
    top_efficient = sorted(drafts_data, key=lambda d: d["efficiency"], reverse=True)[:10]

    # Category complexity averages (a draft counts once per category it carries)
    cat_data: dict[str, list[dict]] = defaultdict(list)
    for d in drafts_data:
        for cat in d.get("categories", []):
            cat_data[cat].append(d)

    # Source complexity
    source_data: dict[str, list[dict]] = defaultdict(list)
    for d in drafts_data:
        source_data[d["source"]].append(d)

    return {
        "drafts": drafts_data,
        "correlations": correlations,
        "metrics": metrics,
        "dimensions": dimensions,
        "top_complex": top_complex,
        "top_efficient": top_efficient,
        "stats": {
            "avg_pages": _mean(pages_vals, 1),
            "avg_authors": _mean([d["author_count"] for d in drafts_data], 1),
            "avg_citations": _mean([d["citation_count"] for d in drafts_data], 1),
            "pages_coverage_pct": round(len(pages_vals) / total_drafts * 100, 1) if total_drafts else 0,
            "total_drafts": total_drafts,
        },
        "category_complexity": _summarize("category", cat_data),
        "source_complexity": _summarize("source", source_data),
    }
def get_source_comparison(db: Database) -> dict:
    """Cross-source comparison: ratings, categories, counts by standards body.

    A missing or empty draft source is reported as ``"ietf"`` throughout,
    including in the author and idea counts.

    Returns dict with:
        - summary: [{source, drafts, rated, authors, ideas, avg_score,
          top_category}, ...] sorted by source
        - radar: {source: {novelty, maturity, overlap, momentum, relevance,
          count}} with per-source averages
        - heatmap: {sources, categories, values} category counts per source
        - unique_categories: {source: [categories seen in that source only]}
        - shared_categories: categories seen in more than one source
    """
    import logging

    log = logging.getLogger(__name__)
    dims = ("novelty", "maturity", "overlap", "momentum", "relevance")

    def _count_by_source(sql: str, what: str) -> dict[str, int]:
        """Run a ``src, n`` query; best effort, an error yields no counts."""
        try:
            return {r["src"]: r["n"] for r in db.conn.execute(sql).fetchall()}
        except Exception:
            # Deliberately non-fatal: the page still renders without these
            # counts, but the failure is no longer invisible.
            log.warning("source comparison: %s unavailable", what, exc_info=True)
            return {}

    # Per-source rating samples and category tallies
    source_categories: dict[str, Counter] = defaultdict(Counter)
    source_ratings: dict[str, dict[str, list]] = defaultdict(
        lambda: {**{dim: [] for dim in dims}, "scores": []}
    )
    for draft, rating in db.drafts_with_ratings(limit=2000):
        src = getattr(draft, "source", "ietf") or "ietf"
        bucket = source_ratings[src]
        for dim in dims:
            bucket[dim].append(getattr(rating, dim))
        bucket["scores"].append(round(rating.composite_score, 2))
        for cat in rating.categories:
            source_categories[src][cat] += 1

    # Get all drafts (including unrated) for draft counts
    source_draft_counts: Counter = Counter(
        getattr(d, "source", "ietf") or "ietf" for d in db.list_drafts(limit=5000)
    )

    # The source is normalised inside SQL so that NULL/'' and 'ietf' rows land
    # in one group; normalising after grouping made them overwrite each other.
    authors_by_source = _count_by_source(
        """SELECT COALESCE(NULLIF(d.source, ''), 'ietf') AS src,
                  COUNT(DISTINCT da.person_id) AS n
           FROM drafts d
           JOIN draft_authors da ON d.name = da.draft_name
           GROUP BY COALESCE(NULLIF(d.source, ''), 'ietf')""",
        "author counts",
    )
    ideas_by_source = _count_by_source(
        """SELECT COALESCE(NULLIF(d.source, ''), 'ietf') AS src,
                  COUNT(*) AS n
           FROM ideas i
           JOIN drafts d ON i.draft_name = d.name
           GROUP BY COALESCE(NULLIF(d.source, ''), 'ietf')""",
        "idea counts",
    )

    # Build summary table
    summary = []
    for src in sorted(set(source_draft_counts) | set(source_ratings)):
        scores = source_ratings[src]["scores"] if src in source_ratings else []
        cats = source_categories.get(src, Counter())
        summary.append({
            "source": src,
            "drafts": source_draft_counts.get(src, 0),
            "rated": len(scores),
            "authors": authors_by_source.get(src, 0),
            "ideas": ideas_by_source.get(src, 0),
            "avg_score": round(sum(scores) / len(scores), 2) if scores else 0.0,
            "top_category": cats.most_common(1)[0][0] if cats else "N/A",
        })

    # Radar data: average of each dimension per source
    radar = {}
    for src, rats in source_ratings.items():
        n = len(rats["scores"])
        if not n:
            continue
        radar[src] = {dim: round(sum(rats[dim]) / n, 2) for dim in dims}
        radar[src]["count"] = n

    # Category distribution by source (for stacked bar / heatmap)
    all_cats = sorted({cat for cats in source_categories.values() for cat in cats})
    heatmap_sources = list(source_categories.keys())
    heatmap = {
        "sources": heatmap_sources,
        "categories": all_cats,
        "values": [
            [source_categories[src].get(cat, 0) for cat in all_cats]
            for src in heatmap_sources
        ],
    }

    # Unique/shared categories: count in how many sources each category occurs
    sources_per_cat: Counter = Counter(
        cat for cats in source_categories.values() for cat in cats
    )
    unique_cats = {
        src: sorted(cat for cat in cats if sources_per_cat[cat] == 1)
        for src, cats in source_categories.items()
    }
    shared_cats = sorted(cat for cat, n in sources_per_cat.items() if n > 1)

    return {
        "summary": summary,
        "radar": radar,
        "heatmap": heatmap,
        "unique_categories": unique_cats,
        "shared_categories": shared_cats,
    }
def get_citation_influence(db: Database) -> dict:
    """Return the citation influence analysis, served through ``_cached``.

    The result of ``_compute_citation_influence`` is stored under the
    ``"citation_influence"`` cache key; no explicit TTL is passed, so the
    lifetime is whatever ``_cached`` uses by default.
    """
    def _build() -> dict:
        return _compute_citation_influence(db)

    return _cached("citation_influence", _build)
def _compute_citation_influence(db: Database) -> dict:
    """Compute citation influence metrics from the draft_refs table.

    Returns dict with:
        - top_cited_rfcs: top 20 most-cited RFCs with citation counts and
          (up to 10) citing drafts
        - top_citing_drafts: top 20 drafts that cite the most references
        - top_pagerank: top 20 drafts by a simple influence score (sum of the
          citation counts of the RFCs the draft cites)
        - citations_by_category: average citations per category
        - draft_network: first 200 draft-to-draft citation edges
        - stats: total citations, unique RFCs, drafts with refs, avg refs per draft
    """
    def _primary_category(raw) -> str:
        """First category of a JSON-encoded list, or "Other" if unusable."""
        try:
            cats = json.loads(raw) if raw else []
        except (ValueError, TypeError):
            return "Other"
        # A non-list value (e.g. a bare JSON string) has no "first category"
        return cats[0] if isinstance(cats, list) and cats else "Other"

    # Get all references
    rows = db.conn.execute(
        "SELECT draft_name, ref_type, ref_id FROM draft_refs"
    ).fetchall()

    # Get draft titles and categories
    draft_titles = {
        r["name"]: r["title"]
        for r in db.conn.execute("SELECT name, title FROM drafts").fetchall()
    }
    draft_cats: dict[str, str] = {
        r["draft_name"]: _primary_category(r["categories"])
        for r in db.conn.execute("SELECT draft_name, categories FROM ratings").fetchall()
    }

    # Well-known RFC names
    rfc_names = {
        "2119": "Key words (MUST/SHALL/MAY)", "8174": "Key words update",
        "8259": "JSON", "7519": "JWT", "6749": "OAuth 2.0",
        "7540": "HTTP/2", "9110": "HTTP Semantics", "7525": "TLS Recommendations",
        "8446": "TLS 1.3", "3986": "URIs", "7230": "HTTP/1.1 Syntax",
        "7231": "HTTP/1.1 Semantics", "8288": "Web Linking", "6125": "TLS Server Identity",
        "7515": "JWS", "7516": "JWE", "7517": "JWK", "7518": "JWA",
        "9449": "DPoP", "6750": "OAuth Bearer", "8725": "JWT Best Practices",
        "9396": "Rich Authorization Requests", "9101": "JAR",
        "8414": "OAuth Server Metadata", "7591": "Dynamic Client Registration",
        "8705": "mTLS for OAuth", "9068": "JWT Access Tokens",
        "6819": "OAuth Threat Model", "9200": "ACE-OAuth", "9052": "COSE",
        "8392": "CWT", "7252": "CoAP",
    }

    # In-degree per RFC, out-degree per draft, and draft-to-draft edges
    rfc_citations: dict[str, list[str]] = defaultdict(list)
    draft_out_count: dict[str, int] = Counter()
    draft_to_draft_edges = []
    for r in rows:
        draft_name, ref_type, ref_id = r["draft_name"], r["ref_type"], r["ref_id"]
        draft_out_count[draft_name] += 1
        if ref_type == "rfc":
            rfc_citations[ref_id].append(draft_name)
        elif ref_type == "draft":
            draft_to_draft_edges.append({
                "source": draft_name,
                "target": ref_id,
                "source_title": draft_titles.get(draft_name, draft_name),
                "target_title": draft_titles.get(ref_id, ref_id),
            })
    total_citations = len(rows)

    # Top 20 most-cited RFCs
    rfc_sorted = sorted(rfc_citations.items(), key=lambda kv: len(kv[1]), reverse=True)
    top_cited_rfcs = [
        {
            "rfc_id": ref_id,
            "name": rfc_names.get(ref_id, ""),
            "count": len(citing_drafts),
            "drafts": citing_drafts[:10],  # Limit to first 10 for display
            "total_drafts": len(citing_drafts),
        }
        for ref_id, citing_drafts in rfc_sorted[:20]
    ]

    # Top 20 most-citing drafts (out-degree)
    draft_sorted = sorted(draft_out_count.items(), key=lambda kv: kv[1], reverse=True)
    top_citing_drafts = [
        {
            "name": draft_name,
            "title": draft_titles.get(draft_name, draft_name),
            "count": count,
            "category": draft_cats.get(draft_name, "Other"),
        }
        for draft_name, count in draft_sorted[:20]
    ]

    # Citation density by category
    cat_totals: dict[str, int] = Counter()
    cat_counts: dict[str, int] = Counter()
    for draft_name, count in draft_out_count.items():
        cat = draft_cats.get(draft_name, "Other")
        cat_totals[cat] += count
        cat_counts[cat] += 1
    citations_by_category = [
        {
            "category": cat,
            "total_citations": cat_totals[cat],
            "draft_count": cat_counts[cat],
            "avg_citations": round(cat_totals[cat] / cat_counts[cat], 1),
        }
        for cat in sorted(cat_totals)
    ]
    citations_by_category.sort(key=lambda x: x["avg_citations"], reverse=True)

    # Influence score ("PageRank-style" in name only): for every RFC a draft
    # cites, add that RFC's total citation count, so citing widely-cited RFCs
    # scores higher. Iterating rows keeps first-seen order for tied scores.
    draft_pagerank: dict[str, float] = Counter()
    for r in rows:
        if r["ref_type"] == "rfc":
            draft_pagerank[r["draft_name"]] += len(rfc_citations[r["ref_id"]])
    pagerank_sorted = sorted(draft_pagerank.items(), key=lambda kv: kv[1], reverse=True)
    top_pagerank = [
        {
            "name": draft_name,
            "title": draft_titles.get(draft_name, draft_name),
            "score": round(score, 1),
            "category": draft_cats.get(draft_name, "Other"),
            "out_degree": draft_out_count.get(draft_name, 0),
        }
        for draft_name, score in pagerank_sorted[:20]
    ]

    # Stats
    drafts_with_refs = len(draft_out_count)
    avg_refs = total_citations / drafts_with_refs if drafts_with_refs > 0 else 0
    return {
        "top_cited_rfcs": top_cited_rfcs,
        "top_citing_drafts": top_citing_drafts,
        "top_pagerank": top_pagerank,
        "citations_by_category": citations_by_category,
        "draft_network": draft_to_draft_edges[:200],  # Limit for perf
        "stats": {
            "total_citations": total_citations,
            "unique_rfcs": len(rfc_citations),
            "drafts_with_refs": drafts_with_refs,
            "avg_refs_per_draft": round(avg_refs, 1),
        },
    }
def get_bcp_analysis(db: Database) -> dict:
    """Return the BCP dependency analysis, going through the shared cache.

    The result is looked up (or computed and stored) by ``_cached`` under the
    key ``"bcp_analysis"``.  The previous note on this function stated a
    5 minute cache lifetime; the actual lifetime is governed by ``_cached``.

    Args:
        db: Database handle passed on to ``_compute_bcp_analysis``.

    Returns:
        The dict produced by ``_compute_bcp_analysis``.
    """
    def _build() -> dict:
        # Deferred so the computation only runs when ``_cached`` invokes it.
        return _compute_bcp_analysis(db)

    return _cached("bcp_analysis", _build)
def _compute_bcp_analysis(db: Database) -> dict:
"""Compute BCP dependency analysis.
Returns dict with:
- bcps: all BCPs with citation counts and citing drafts
- co_citation: which BCPs tend to be co-cited
- by_category: BCP citation patterns by category
- coverage: what % of drafts cite at least one BCP
"""
# Get all BCP references
bcp_rows = db.conn.execute(
"SELECT draft_name, ref_id FROM draft_refs WHERE ref_type = 'bcp'"
).fetchall()
# Get draft titles and categories
draft_rows = db.conn.execute("SELECT name, title FROM drafts").fetchall()
draft_titles = {r["name"]: r["title"] for r in draft_rows}
total_drafts = len(draft_titles)
rating_rows = db.conn.execute("SELECT draft_name, categories FROM ratings").fetchall()
draft_cats: dict[str, str] = {}
for r in rating_rows:
try:
cats = json.loads(r["categories"]) if r["categories"] else []
draft_cats[r["draft_name"]] = cats[0] if cats else "Other"
except Exception:
draft_cats[r["draft_name"]] = "Other"
# BCP citation counts
bcp_citations: dict[str, list[str]] = defaultdict(list)
draft_bcps: dict[str, list[str]] = defaultdict(list)
for r in bcp_rows:
bcp_citations[r["ref_id"]].append(r["draft_name"])
draft_bcps[r["draft_name"]].append(r["ref_id"])
# All BCPs with counts
bcps = []
for bcp_id, citing_drafts in sorted(bcp_citations.items(),
key=lambda x: len(x[1]), reverse=True):
bcps.append({
"bcp_id": bcp_id,
"count": len(citing_drafts),
"drafts": citing_drafts[:10],
"total_drafts": len(citing_drafts),
})
# Co-citation matrix: which BCPs appear together in the same draft
bcp_ids = sorted(bcp_citations.keys())
co_citation = []
for i, bcp_a in enumerate(bcp_ids):
drafts_a = set(bcp_citations[bcp_a])
for j, bcp_b in enumerate(bcp_ids):
if j <= i:
continue
drafts_b = set(bcp_citations[bcp_b])
shared = len(drafts_a & drafts_b)
if shared > 0:
co_citation.append({
"bcp_a": bcp_a,
"bcp_b": bcp_b,
"count": shared,
})
# Heatmap data: full matrix for all BCPs (top 20 by citation count)
top_bcp_ids = [b["bcp_id"] for b in bcps[:20]]
heatmap_matrix = []
for bcp_a in top_bcp_ids:
row = []
drafts_a = set(bcp_citations.get(bcp_a, []))
for bcp_b in top_bcp_ids:
drafts_b = set(bcp_citations.get(bcp_b, []))
shared = len(drafts_a & drafts_b)
row.append(shared)
heatmap_matrix.append(row)
# BCP citations by category
cat_bcp_count: dict[str, Counter] = defaultdict(Counter)
for draft_name, bcp_list in draft_bcps.items():
cat = draft_cats.get(draft_name, "Other")
for bcp_id in bcp_list:
cat_bcp_count[cat][bcp_id] += 1
by_category = []
for cat in sorted(cat_bcp_count.keys()):
top_bcps = cat_bcp_count[cat].most_common(5)
by_category.append({
"category": cat,
"total_bcp_refs": sum(cat_bcp_count[cat].values()),
"unique_bcps": len(cat_bcp_count[cat]),
"top_bcps": [{"bcp_id": bid, "count": c} for bid, c in top_bcps],
})
by_category.sort(key=lambda x: x["total_bcp_refs"], reverse=True)
# Coverage
drafts_with_bcp = len(draft_bcps)
coverage_pct = (drafts_with_bcp / total_drafts * 100) if total_drafts > 0 else 0
return {
"bcps": bcps,
"co_citation": co_citation,
"heatmap_labels": top_bcp_ids,
"heatmap_matrix": heatmap_matrix,
"by_category": by_category,
"coverage": {
"total_drafts": total_drafts,
"drafts_with_bcp": drafts_with_bcp,
"coverage_pct": round(coverage_pct, 1),
"unique_bcps": len(bcp_citations),
"total_bcp_refs": len(bcp_rows),
},
}