Files
2026-04-12 17:56:44 -04:00

1641 lines
52 KiB
Python

"""Responder-based engine for kennethreitz.org."""
import responder
from tuftecms.core.markdown import render_markdown_file
from tuftecms.core.cache import (
get_blog_cache,
clear_all_caches,
get_sidenotes_cache,
get_outlines_cache,
get_quotes_cache,
get_connections_cache,
get_terms_cache,
get_themes_cache,
)
from tuftecms.utils.content import (
get_directory_structure,
extract_intelligent_date,
generate_folder_icon,
get_cached_markdown_title,
)
from tuftecms.utils.svg_icons import generate_unique_svg_icon
from tuftecms.blueprints.content import extract_exif_data
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import hashlib
import html
import io
import logging
import random
import re
import textwrap
import os
DATA_DIR = Path("data")
# --- Search index (built once at startup) ---
_search_index = []
def _build_search_index():
"""Scan all markdown files once and build a cached search index."""
global _search_index
index = []
for md_file in DATA_DIR.rglob("*.md"):
if md_file.name == "index.md":
url = "/" + str(md_file.parent.relative_to(DATA_DIR))
else:
url = "/" + str(md_file.relative_to(DATA_DIR).with_suffix(""))
try:
raw = md_file.read_text()
except (OSError, UnicodeDecodeError):
continue
# Extract title from first heading
title = md_file.stem.replace("-", " ").replace("_", " ").title()
for line in raw.split("\n"):
line_stripped = line.strip()
if line_stripped.startswith("# "):
title = line_stripped[2:].strip()
break
# Determine section from path
parts = md_file.relative_to(DATA_DIR).parts
section = parts[0] if len(parts) > 1 else ""
# Generate icon
if md_file.name == "index.md":
icon = generate_folder_icon(title, size=18)
else:
icon = generate_unique_svg_icon(title, size=18)
index.append({
"title": title,
"url": url,
"raw_text": raw.lower(),
"raw": raw,
"section": section,
"icon": icon,
})
_search_index = index
api = responder.API(
templates_dir="tuftecms/templates",
static_dir="tuftecms/static",
static_route="/static",
enable_logging=True,
title="kennethreitz.org",
description="API for kennethreitz.org",
)
# --- Rate limiting ---
from responder.ext.ratelimit import RateLimiter
RateLimiter(requests=600, period=60).install(api)
# Suppress access logging for static assets.
class _StaticFilter(logging.Filter):
def filter(self, record):
msg = record.getMessage()
return "/static/" not in msg and not msg.endswith((".js", ".css", ".ico", ".png", ".woff2"))
logging.getLogger("responder.access").addFilter(_StaticFilter())
logging.getLogger("fontTools.subset").setLevel(logging.WARNING)
# --- Markdown render cache ---
_render_cache = {}
def _cached_render(file_path):
"""Cache rendered markdown by file path."""
key = str(file_path)
if key not in _render_cache:
_render_cache[key] = render_markdown_file(file_path)
return _render_cache[key]
# Custom template filters
api.templates.context["current_year"] = datetime.now().year
def strftime_filter(value, fmt="%B %d, %Y"):
if value == "now":
return datetime.now().strftime(fmt)
if hasattr(value, "strftime"):
return value.strftime(fmt)
return value
api.templates._env.filters["strftime"] = strftime_filter
class FakeConfig(dict):
"""Minimal config stand-in for Flask template compatibility."""
def get(self, key, default=None):
return os.environ.get(key, default)
class RequestWrapper:
"""Wraps Responder request to provide Flask-like interface for templates."""
def __init__(self, req, path):
self._req = req
self.path = path
self.environ = {}
def __getattr__(self, name):
return getattr(self._req, name)
_config = FakeConfig()
try:
from weasyprint import HTML
_pdf_available = True
except (ImportError, OSError):
_pdf_available = False
def render(template, req, path="/", **kwargs):
"""Render a template with common context."""
kwargs.setdefault("current_year", datetime.now().year)
kwargs.setdefault("current_path", path)
kwargs["request"] = RequestWrapper(req, path)
kwargs["config"] = _config
kwargs.setdefault("pdf_available", _pdf_available)
return api.templates.render(template, **kwargs)
# --- Cache for OG images ---
_og_image_cache = {}
_OG_CACHE_MAX = 256
# --- Bot detection ---
_BOT_PATTERNS = re.compile(
r"(?i)(googlebot|bingbot|slurp|duckduckbot|baiduspider|yandexbot|sogou|"
r"exabot|facebookexternalhit|facebot|ia_archiver|alexa|msnbot|"
r"semrushbot|ahrefsbot|dotbot|petalbot|mj12bot|bytespider|"
r"gptbot|chatgpt|claudebot|anthropic|ccbot|commoncrawl|"
r"scrapy|python-requests|httpx|curl|wget|go-http-client|"
r"applebot|twitterbot|linkedinbot|whatsapp|telegrambot|"
r"dataprovider|censys|zgrab|masscan|nuclei|httpie)"
)
def _detect_bot(req):
"""Detect and log bot/scraper requests. Returns bot name or None."""
ua = req.headers.get("User-Agent", "")
if not ua:
return "unknown (no user-agent)"
m = _BOT_PATTERNS.search(ua)
if m:
return m.group(1)
return None
# --- Routes ---
# IMPORTANT: All specific routes must be defined BEFORE the catch-all route.
@api.route("/")
async def homepage(req, resp):
blog_data = get_blog_cache()
recent_posts = blog_data.get("posts", [])[:5]
resp.html = render("homepage.html", req, "/",
title="Home",
recent_posts=recent_posts,
)
@api.route("/health")
async def health(req, resp):
resp.media = {"status": "healthy", "timestamp": datetime.now().isoformat()}
@api.route("/random")
async def random_post(req, resp):
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
if posts:
chosen = random.choice(posts)
api.log.info("Redirecting to random post: %s", chosen["url"])
resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
api.redirect(resp, chosen["url"])
else:
resp.status_code = 404
@api.route("/search")
async def search_page(req, resp):
resp.html = render("search.html", req, "/search",
title="Search",
)
# --- Feed routes ---
@api.route("/robots.txt")
async def robots_txt(req, resp):
bot = _detect_bot(req)
ua = req.headers.get("User-Agent", "no user-agent")
api.log.info("Bot detected: %s (%s)", bot or "unknown", ua)
robots_content = """# Robots.txt for kennethreitz.org
# Welcome, friendly crawlers!
User-agent: *
Allow: /
# Sitemap location
Sitemap: https://kennethreitz.org/sitemap.xml
# Crawl-delay suggestion (be gentle)
Crawl-delay: 1
# Disallow admin/internal paths (none currently, but good practice)
# Disallow: /admin/"""
resp.text = robots_content.strip()
resp.headers["Content-Type"] = "text/plain"
@api.route("/sitemap")
async def sitemap_page(req, resp):
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
sitemap_data = {
"homepage": [{"url": "/", "title": "Home", "modified": None}],
"directory": [
{"url": "/archive", "title": "Archive", "modified": None},
{"url": "/sidenotes", "title": "Sidenotes", "modified": None},
{"url": "/outlines", "title": "Outlines", "modified": None},
{"url": "/quotes", "title": "Quotes", "modified": None},
{"url": "/connections", "title": "Connections", "modified": None},
{"url": "/terms", "title": "Terms", "modified": None},
{"url": "/graph", "title": "Graph", "modified": None},
{"url": "/search", "title": "Search", "modified": None},
],
"article": [
{"url": post["url"], "title": post["title"], "modified": post["pub_date"]}
for post in posts
],
}
total_items = (
len(sitemap_data["homepage"])
+ len(sitemap_data["directory"])
+ len(sitemap_data["article"])
)
resp.html = render("sitemap.html", req, "/sitemap",
sitemap_data=sitemap_data,
total_items=total_items,
title="Sitemap",
)
@api.route("/sitemap.xml")
async def sitemap_xml(req, resp):
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
urls = []
# Add homepage
urls.append({
"url": "https://kennethreitz.org/",
"lastmod": datetime.now().strftime("%Y-%m-%d"),
"priority": "1.0",
})
# Add static pages
static_pages = [
"/archive",
"/sidenotes",
"/outlines",
"/quotes",
"/connections",
"/terms",
"/graph",
"/search",
"/docs",
"/docs/getting-started",
"/docs/content-structure",
"/docs/sidenotes",
"/docs/customization",
"/docs/deployment",
]
for page in static_pages:
urls.append({
"url": f"https://kennethreitz.org{page}",
"lastmod": datetime.now().strftime("%Y-%m-%d"),
"priority": "0.8",
})
# Add blog posts
for post in posts:
urls.append({
"url": f"https://kennethreitz.org{post['url']}",
"lastmod": post["date_str"],
"priority": "0.9",
})
# Generate XML
xml_lines = ['<?xml version="1.0" encoding="UTF-8"?>']
xml_lines.append('<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">')
for url_data in urls:
xml_lines.append(" <url>")
xml_lines.append(f" <loc>{url_data['url']}</loc>")
xml_lines.append(f" <lastmod>{url_data['lastmod']}</lastmod>")
xml_lines.append(f" <priority>{url_data['priority']}</priority>")
xml_lines.append(" </url>")
xml_lines.append("</urlset>")
xml_content = "\n".join(xml_lines)
resp.text = xml_content
resp.headers["Content-Type"] = "application/xml"
@api.route("/feed.xml")
async def feed_xml(req, resp):
await _rss_feed(req, resp)
@api.route("/rss.xml")
async def rss_xml(req, resp):
await _rss_feed(req, resp)
async def _rss_feed(req, resp):
"""Generate RSS feed."""
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
recent_posts = posts[:20]
rss_lines = ['<?xml version="1.0" encoding="UTF-8"?>']
rss_lines.append('<rss version="2.0">')
rss_lines.append(" <channel>")
rss_lines.append(" <title>Kenneth Reitz</title>")
rss_lines.append(" <link>https://kennethreitz.org</link>")
rss_lines.append(
" <description>Creator of Requests, Pipenv, and other tools. Writing about technology, consciousness, and human-centered design.</description>"
)
rss_lines.append(" <language>en-us</language>")
rss_lines.append(
f' <lastBuildDate>{datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")}</lastBuildDate>'
)
for post in recent_posts:
rss_lines.append(" <item>")
rss_lines.append(f' <title>{html.escape(post["title"])}</title>')
rss_lines.append(
f' <link>https://kennethreitz.org{post["url"]}</link>'
)
rss_lines.append(
f' <guid>https://kennethreitz.org{post["url"]}</guid>'
)
rss_lines.append(
f' <description>{html.escape(post.get("description", post.get("excerpt", "")))}</description>'
)
pub_date = post["pub_date"]
rss_date = pub_date.strftime("%a, %d %b %Y %H:%M:%S %z") if pub_date else ""
if rss_date:
rss_lines.append(f" <pubDate>{rss_date}</pubDate>")
rss_lines.append(" </item>")
rss_lines.append(" </channel>")
rss_lines.append("</rss>")
rss_content = "\n".join(rss_lines)
resp.text = rss_content
resp.headers["Content-Type"] = "application/xml"
# --- OG image route ---
@api.route("/og-image/{path:path}.png")
async def og_image(req, resp, *, path):
"""Generate a dynamic Open Graph image for a post."""
from PIL import Image, ImageDraw, ImageFont
api.log.info("Generating OG image for /%s", path)
# Check cache
cache_key = path
if cache_key in _og_image_cache:
resp.content = _og_image_cache[cache_key]
resp.headers["Content-Type"] = "image/png"
resp.headers["Cache-Control"] = "public, max-age=86400"
return
# Validate path stays within data directory
file_path = (DATA_DIR / f"{path}.md").resolve()
if not str(file_path).startswith(str(DATA_DIR.resolve())):
resp.status_code = 403
return
# Resolve the post title from the markdown file
title = path.split("/")[-1].replace("-", " ").replace("_", " ").title()
subtitle = None
if file_path.exists():
content = file_path.read_text()
for line in content.split("\n"):
line = line.strip()
if line.startswith("# "):
title = line[2:].strip()
break
for line in content.split("\n"):
line = line.strip()
if line.startswith("*") and line.endswith("*") and len(line) < 60:
subtitle = line.strip("*")
break
# Image dimensions (standard OG)
width, height = 1200, 630
# Create image with Tufte cream background
img = Image.new("RGB", (width, height), "#fffff8")
draw = ImageDraw.Draw(img)
# Draw subtle bottom gradient
for y in range(height - 80, height):
alpha = (y - (height - 80)) / 80
r = int(255 * (1 - alpha) + 245 * alpha)
g = int(255 * (1 - alpha) + 245 * alpha)
b = int(248 * (1 - alpha) + 232 * alpha)
draw.line([(0, y), (width, y)], fill=(r, g, b))
# Load fonts (bundled et-book)
font_dir = Path(__file__).parent / "tuftecms" / "static" / "tufte" / "et-book"
try:
font_italic = ImageFont.truetype(
str(font_dir / "et-book-display-italic-old-style-figures" / "et-book-display-italic-old-style-figures.ttf"),
62,
)
font_roman = ImageFont.truetype(
str(font_dir / "et-book-roman-line-figures" / "et-book-roman-line-figures.ttf"),
26,
)
font_small = ImageFont.truetype(
str(font_dir / "et-book-roman-line-figures" / "et-book-roman-line-figures.ttf"),
22,
)
except (OSError, IOError):
font_italic = ImageFont.load_default()
font_roman = ImageFont.load_default()
font_small = ImageFont.load_default()
# Draw accent line
draw.rectangle([80, 180, 200, 184], fill="#333333")
# Word-wrap and draw title
max_chars = 28 if len(title) > 28 else 40
wrapped = textwrap.wrap(title, width=max_chars)
y_pos = 210
for line in wrapped[:3]:
draw.text((80, y_pos), line, font=font_italic, fill="#111111")
y_pos += 72
# Draw subtitle/date if available
if subtitle:
draw.text((80, y_pos + 20), subtitle, font=font_roman, fill="#666666")
# Draw separator line
draw.rectangle([80, height - 110, width - 80, height - 108], fill="#dddddd")
# Draw site URL
draw.text((80, height - 85), "kennethreitz.org", font=font_small, fill="#999999")
# Draw author name on right
author_text = "Kenneth Reitz"
bbox = draw.textbbox((0, 0), author_text, font=font_small)
author_width = bbox[2] - bbox[0]
draw.text((width - 80 - author_width, height - 85), author_text, font=font_small, fill="#999999")
# Draw decorative circles
cx, cy = 1060, 300
for r_val, c in [(50, "#dddddd"), (35, "#cccccc"), (20, "#bbbbbb")]:
draw.ellipse([cx - r_val, cy - r_val, cx + r_val, cy + r_val], outline=c, width=2)
draw.ellipse([cx - 5, cy - 5, cx + 5, cy + 5], fill="#999999")
# Export to PNG bytes
buf = io.BytesIO()
img.save(buf, format="PNG", optimize=True)
png_bytes = buf.getvalue()
# Cache it (bounded)
if len(_og_image_cache) >= _OG_CACHE_MAX:
_og_image_cache.pop(next(iter(_og_image_cache)))
_og_image_cache[cache_key] = png_bytes
resp.content = png_bytes
resp.headers["Content-Type"] = "image/png"
resp.headers["Cache-Control"] = "public, max-age=86400"
# --- Archive routes ---
@api.route("/archive")
async def archive(req, resp):
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
# Group posts by year
grouped_posts = defaultdict(list)
for post in posts:
year = post["pub_date"].year
grouped_posts[year].append(post)
# Sort years in descending order
grouped_posts = dict(sorted(grouped_posts.items(), reverse=True))
resp.html = render("archive.html", req, "/archive",
posts=posts,
grouped_posts=grouped_posts,
archive_title="Complete Archive",
title="Archive",
)
@api.route("/archive/by-length")
async def archive_by_length(req, resp):
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
# Calculate reading time (average 200 words per minute)
for post in posts:
word_count = post.get("word_count", 0)
post["reading_time"] = max(1, round(word_count / 200))
# Group by reading time ranges
quick_reads = [p for p in posts if p["reading_time"] <= 3]
short_reads = [p for p in posts if 4 <= p["reading_time"] <= 7]
medium_reads = [p for p in posts if 8 <= p["reading_time"] <= 15]
long_reads = [p for p in posts if p["reading_time"] > 15]
# Sort each group by word count descending
quick_reads.sort(key=lambda x: x.get("word_count", 0), reverse=True)
short_reads.sort(key=lambda x: x.get("word_count", 0), reverse=True)
medium_reads.sort(key=lambda x: x.get("word_count", 0), reverse=True)
long_reads.sort(key=lambda x: x.get("word_count", 0), reverse=True)
grouped_posts = {}
if quick_reads:
grouped_posts["Quick Reads (1-3 min)"] = quick_reads
if short_reads:
grouped_posts["Short Reads (4-7 min)"] = short_reads
if medium_reads:
grouped_posts["Medium Reads (8-15 min)"] = medium_reads
if long_reads:
grouped_posts["Long Reads (15+ min)"] = long_reads
resp.html = render("archive-by-length.html", req, "/archive/by-length",
posts=posts,
grouped_posts=grouped_posts,
archive_title="By Reading Time",
title="Archive by Reading Time",
)
@api.route("/archive/sidenotes")
async def sidenotes(req, resp):
cache_data = get_sidenotes_cache()
blog_data = get_blog_cache()
# Create lookup for post metadata
post_lookup = {
f"/essays/{post['path'].split('/')[-1]}": post
for post in blog_data.get("posts", [])
}
# Convert cache format to template format
articles = []
total_sidenotes = 0
for _, sidenotes_list in cache_data.get("sidenotes", {}).items():
if sidenotes_list:
first_sidenote = sidenotes_list[0]
article_title = first_sidenote.get("title", "Unknown")
article_url = first_sidenote.get("url", "#")
post_data = post_lookup.get(article_url, {})
articles.append({
"title": article_title,
"url": article_url,
"sidenotes": sidenotes_list,
"category": "essays",
"date": post_data.get("pub_date"),
"unique_icon": post_data.get("unique_icon"),
})
total_sidenotes += len(sidenotes_list)
articles.sort(key=lambda x: x.get("date") or datetime.min, reverse=True)
resp.html = render("sidenotes.html", req, "/archive/sidenotes",
articles=articles,
total_count=total_sidenotes,
title="Sidenotes",
)
@api.route("/archive/outlines")
async def outlines(req, resp):
cache_data = get_outlines_cache()
blog_data = get_blog_cache()
post_lookup = {
f"/essays/{post['path'].split('/')[-1]}": post
for post in blog_data.get("posts", [])
}
articles = []
total_headings = 0
for _, headings_list in cache_data.get("outlines", {}).items():
if headings_list:
first_heading = headings_list[0]
article_title = first_heading.get("title", "Unknown")
article_url = first_heading.get("url", "#")
post_data = post_lookup.get(article_url, {})
articles.append({
"title": article_title,
"url": article_url,
"headings": headings_list,
"category": "essays",
"date": post_data.get("pub_date"),
"unique_icon": post_data.get("unique_icon"),
})
total_headings += len(headings_list)
articles.sort(key=lambda x: x.get("date") or datetime.min, reverse=True)
resp.html = render("outlines.html", req, "/archive/outlines",
articles=articles,
total_count=total_headings,
title="Outlines",
)
@api.route("/archive/quotes")
async def quotes(req, resp):
cache_data = get_quotes_cache()
blog_data = get_blog_cache()
post_lookup = {
f"/essays/{post['path'].split('/')[-1]}": post
for post in blog_data.get("posts", [])
}
articles = []
total_quotes = 0
for _, quotes_list in cache_data.get("quotes", {}).items():
if quotes_list:
first_quote = quotes_list[0]
article_title = first_quote.get("title", "Unknown")
article_url = first_quote.get("url", "#")
post_data = post_lookup.get(article_url, {})
articles.append({
"title": article_title,
"url": article_url,
"quotes": quotes_list,
"category": "essays",
"date": post_data.get("pub_date"),
"unique_icon": post_data.get("unique_icon"),
})
total_quotes += len(quotes_list)
articles.sort(key=lambda x: x.get("date") or datetime.min, reverse=True)
resp.html = render("quotes.html", req, "/archive/quotes",
articles=articles,
total_count=total_quotes,
title="Quotes",
)
@api.route("/archive/connections")
async def connections(req, resp):
cache_data = get_connections_cache()
blog_data = get_blog_cache()
post_lookup = {
f"/essays/{post['path'].split('/')[-1]}": post
for post in blog_data.get("posts", [])
}
articles = []
outgoing_data = cache_data.get("outgoing", {})
incoming_data = cache_data.get("incoming", {})
stats = cache_data.get("stats", {})
for file_path, connections_list in outgoing_data.items():
if connections_list:
file_stem = Path(file_path).stem
article_url = f"/essays/{file_stem}"
post_data = post_lookup.get(article_url, {})
article_title = post_data.get("title", file_stem.replace("-", " ").title())
incoming_connections = incoming_data.get(article_url, [])
formatted_outgoing = []
for conn in connections_list:
formatted_outgoing.append(
{"link_text": conn["text"], "target_url": conn["url"]}
)
formatted_incoming = []
for conn in incoming_connections:
formatted_incoming.append({
"link_text": conn["text"],
"source_url": conn.get("source_url", "#"),
"context": conn.get("context", ""),
})
articles.append({
"title": article_title,
"url": article_url,
"outgoing_connections": formatted_outgoing,
"incoming_connections": formatted_incoming,
"category": "essays",
"date": post_data.get("pub_date"),
"unique_icon": post_data.get("unique_icon"),
})
articles.sort(key=lambda x: x.get("date") or datetime.min, reverse=True)
resp.html = render("connections.html", req, "/archive/connections",
articles=articles,
total_outgoing=stats.get("total_outgoing", 0),
total_incoming=stats.get("total_incoming", 0),
title="Connections",
)
@api.route("/archive/terms")
async def terms(req, resp):
terms_data = get_terms_cache()
stats = terms_data.get("stats", {})
resp.html = render("terms.html", req, "/archive/terms",
terms=terms_data.get("terms", {}),
total_terms=stats.get("total_terms", 0),
total_occurrences=stats.get("total_references", 0),
title="Index of Terms",
)
@api.route("/archive/themes")
async def themes_archive(req, resp):
themes_data = get_themes_cache()
themes = themes_data.get("themes", {})
stats = themes_data.get("stats", {})
sorted_themes = sorted(themes.items(), key=lambda x: x[0])
resp.html = render("themes.html", req, "/archive/themes",
themes=dict(sorted_themes),
total_themes=stats.get("total_themes", 0),
total_occurrences=stats.get("total_occurrences", 0),
title="Themes",
)
@api.route("/archive/graph")
async def graph(req, resp):
resp.html = render("graph.html", req, "/archive/graph",
title="Knowledge Graph",
)
@api.route("/archive/graph/data")
async def graph_data(req, resp):
connections_data = get_connections_cache()
blog_data = get_blog_cache()
post_lookup = {
f"/essays/{post['path'].split('/')[-1]}": post
for post in blog_data.get("posts", [])
}
nodes = []
edges = []
node_ids = set()
for source, targets in connections_data.get("outgoing", {}).items():
source_id = source.replace("data/", "").replace(".md", "")
source_url = (
f"/essays/{source_id.split('/')[-1]}"
if "essays" in source_id
else f"/{source_id}"
)
if source_id not in node_ids:
post_data = post_lookup.get(source_url, {})
nodes.append({
"id": source_id,
"title": post_data.get(
"title", source_id.split("/")[-1].replace("-", " ").title()
),
"label": post_data.get(
"title", source_id.split("/")[-1].replace("-", " ").title()
),
"url": source_url,
"category": source_id.split("/")[0] if "/" in source_id else "root",
"group": source_id.split("/")[0] if "/" in source_id else "root",
})
node_ids.add(source_id)
for target_info in targets:
target_url = target_info["url"].strip("/")
if target_url:
target_id = target_url.replace("/", "_")
if target_id not in node_ids:
target_post_data = post_lookup.get(f"/{target_url}", {})
nodes.append({
"id": target_id,
"title": target_post_data.get(
"title",
target_url.split("/")[-1].replace("-", " ").title(),
),
"label": target_post_data.get(
"title",
target_url.split("/")[-1].replace("-", " ").title(),
),
"url": f"/{target_url}",
"category": (
target_url.split("/")[0]
if "/" in target_url
else "root"
),
"group": (
target_url.split("/")[0]
if "/" in target_url
else "root"
),
})
node_ids.add(target_id)
edges.append({
"source": source_id,
"target": target_id,
"label": target_info["text"],
"link_text": target_info["text"],
})
resp.media = {"nodes": nodes, "edges": edges}
# --- Directory route ---
@api.route("/directory")
async def directory(req, resp):
items = get_directory_structure(DATA_DIR)
resp.html = render("directory.html", req, "/directory",
items=items,
current_path="/",
breadcrumb=[],
title="Directory",
)
# --- API routes ---
@api.route("/api/blog")
async def api_blog(req, resp):
blog_data = get_blog_cache()
posts = blog_data.get("posts", [])
posts_data = [
{
"title": post.get("title", ""),
"url": post.get("url", ""),
"unique_icon": post.get("unique_icon", ""),
}
for post in posts
]
resp.media = {"posts": posts_data, "total": len(posts_data)}
@api.route("/api/search")
async def api_search(req, resp):
query = req.params.get("q", "").strip()
api.log.info("Search query: %s", query)
if len(query) < 2:
resp.media = {
"query": query,
"results": [],
"error": "Query must be at least 2 characters long",
}
return
results = []
query_lower = query.lower()
for entry in _search_index:
score = 0
matches = []
# Search in title (highest weight)
if query_lower in entry["title"].lower():
score += 10
matches.append("title")
# Search in content (medium weight)
if query_lower in entry["raw_text"]:
score += 5
matches.append("content")
if score > 0:
snippet = ""
if "content" in matches:
query_index = entry["raw_text"].find(query_lower)
if query_index != -1:
raw = entry["raw"]
start = max(0, query_index - 100)
end = min(len(raw), query_index + len(query) + 100)
snippet = raw[start:end].strip()
if start > 0:
snippet = "..." + snippet
if end < len(raw):
snippet = snippet + "..."
results.append({
"title": entry["title"],
"url": entry["url"],
"snippet": snippet,
"section": entry["section"],
"score": score,
"matches": matches,
"unique_icon": entry["icon"],
})
results.sort(key=lambda x: x["score"], reverse=True)
results = results[:50]
resp.media = {"query": query, "results": results, "total": len(results)}
@api.route("/api/search/autocomplete")
async def api_search_autocomplete(req, resp):
query = req.params.get("q", "").strip()
if len(query) < 1:
resp.media = {"results": []}
return
query_lower = query.lower()
matches = []
for entry in _search_index:
if query_lower in entry["title"].lower():
matches.append({
"title": entry["title"],
"url": entry["url"],
"icon": entry["icon"],
})
if len(matches) >= 8:
break
resp.media = {"results": matches}
@api.route("/api/icon/{article_path:path}")
async def api_icon(req, resp, *, article_path):
"""Generate and return an SVG icon for a given article."""
title = article_path # fallback to path
is_folder = False
data_path = Path("data") / f"{article_path}.md"
if data_path.exists():
cached_title = get_cached_markdown_title(data_path)
if cached_title:
title = cached_title
else:
dir_path = Path("data") / article_path
if dir_path.is_dir():
index_file = dir_path / "index.md"
if index_file.exists():
is_folder = True
cached_title = get_cached_markdown_title(index_file)
if cached_title:
title = cached_title
else:
title = article_path.split("/")[-1].replace("-", " ").replace("_", " ").title()
else:
title = article_path.split("/")[-1].replace("-", " ").replace("_", " ").title()
if is_folder:
icon_data = generate_folder_icon(title, size=24)
else:
icon_data = generate_unique_svg_icon(title, size=24)
resp.media = {"success": True, "path": article_path, "icon": icon_data}
@api.route("/api/debug-cache")
async def api_debug_cache(req, resp):
cache_info = {
"blog": get_blog_cache()["stats"],
"sidenotes": get_sidenotes_cache()["stats"],
"outlines": get_outlines_cache()["stats"],
"quotes": get_quotes_cache()["stats"],
"connections": get_connections_cache()["stats"],
"terms": get_terms_cache()["stats"],
}
lru_cache_info = {
"blog_cache": {
"hits": get_blog_cache.cache_info().hits,
"misses": get_blog_cache.cache_info().misses,
"maxsize": get_blog_cache.cache_info().maxsize,
"currsize": get_blog_cache.cache_info().currsize,
},
"sidenotes_cache": {
"hits": get_sidenotes_cache.cache_info().hits,
"misses": get_sidenotes_cache.cache_info().misses,
"maxsize": get_sidenotes_cache.cache_info().maxsize,
"currsize": get_sidenotes_cache.cache_info().currsize,
},
"outlines_cache": {
"hits": get_outlines_cache.cache_info().hits,
"misses": get_outlines_cache.cache_info().misses,
"maxsize": get_outlines_cache.cache_info().maxsize,
"currsize": get_outlines_cache.cache_info().currsize,
},
"quotes_cache": {
"hits": get_quotes_cache.cache_info().hits,
"misses": get_quotes_cache.cache_info().misses,
"maxsize": get_quotes_cache.cache_info().maxsize,
"currsize": get_quotes_cache.cache_info().currsize,
},
"connections_cache": {
"hits": get_connections_cache.cache_info().hits,
"misses": get_connections_cache.cache_info().misses,
"maxsize": get_connections_cache.cache_info().maxsize,
"currsize": get_connections_cache.cache_info().currsize,
},
"terms_cache": {
"hits": get_terms_cache.cache_info().hits,
"misses": get_terms_cache.cache_info().misses,
"maxsize": get_terms_cache.cache_info().maxsize,
"currsize": get_terms_cache.cache_info().currsize,
},
}
resp.media = {
"status": "Cache data loaded successfully",
"cache_stats": cache_info,
"lru_cache_info": lru_cache_info,
}
@api.route("/api/directory-tree")
async def api_directory_tree(req, resp):
folders = []
files = []
try:
for item in sorted(DATA_DIR.iterdir()):
if item.name.startswith(".") or item.name in ["__pycache__", "node_modules"]:
continue
relative_path = item.relative_to(DATA_DIR)
url_path = "/" + str(relative_path)
if item.is_dir():
display_name = item.name.replace("-", " ").replace("_", " ").title()
index_file = item / "index.md"
if index_file.exists():
title = get_cached_markdown_title(index_file)
if title:
display_name = title
icon_svg = generate_folder_icon(display_name, size=18)
folders.append({
"name": item.name,
"path": url_path,
"is_dir": True,
"icon": icon_svg,
})
elif item.suffix == ".md" and item.name != "index.md":
display_name = item.stem
title = get_cached_markdown_title(item)
if title:
display_name = title
icon_svg = generate_unique_svg_icon(display_name, size=18)
files.append({
"name": item.stem,
"path": url_path.replace(".md", ""),
"is_dir": False,
"icon": icon_svg,
})
except (PermissionError, OSError):
pass
items = folders + files
resp.media = {"items": items}
@api.route("/api/themes")
async def api_themes(req, resp):
themes_dir = DATA_DIR / "themes"
items = []
if themes_dir.is_dir():
for f in sorted(themes_dir.glob("*.md")):
if f.name == "index.md":
continue
title = get_cached_markdown_title(f) or f.stem.replace("-", " ").replace("_", " ").title()
icon = generate_unique_svg_icon(title, size=18)
items.append({"name": title, "path": f"/themes/{f.stem}", "icon": icon})
resp.media = {"themes": items}
@api.route("/api/schema")
async def api_schema(req, resp):
"""Serve auto-generated OpenAPI schema from registered routes."""
paths = {}
skip = {"catch_all", "serve_static", "serve_data_file", "api_schema", "api_docs"}
for r in api.router.routes:
fn = getattr(r, "endpoint", None)
route = getattr(r, "route", None)
if not fn or not route or fn.__name__ in skip:
continue
if not route.startswith("/api"):
continue
doc = (fn.__doc__ or fn.__name__.replace("_", " ").title()).split("\n")[0].strip()
params = []
for part in route.split("/"):
if part.startswith("{") and part.endswith("}"):
name = part.strip("{}").split(":")[0]
params.append({"name": name, "in": "path", "required": True, "schema": {"type": "string"}})
op = {"summary": doc, "responses": {"200": {"description": "OK"}}}
if params:
op["parameters"] = params
# Detect query params from docstring hints or known routes
if "search" in route and "q" not in [p["name"] for p in params]:
op.setdefault("parameters", []).append(
{"name": "q", "in": "query", "required": True, "schema": {"type": "string"}}
)
paths[route] = {"get": op}
resp.media = {
"openapi": "3.0.0",
"info": {"title": "kennethreitz.org", "version": "1.0"},
"paths": paths,
}
@api.route("/api")
async def api_docs(req, resp):
"""Serve interactive API documentation."""
resp.html = """<!DOCTYPE html>
<html><head>
<title>API — kennethreitz.org</title>
<meta charset="utf-8">
<link rel="stylesheet" href="https://unpkg.com/swagger-ui-dist/swagger-ui.css">
<script src="https://unpkg.com/swagger-ui-dist/swagger-ui-bundle.js"></script>
</head><body>
<div id="swagger-ui"></div>
<script>SwaggerUIBundle({url: "/api/schema", dom_id: "#swagger-ui", deepLinking: true, presets: [SwaggerUIBundle.presets.apis, SwaggerUIBundle.SwaggerUIStandalonePreset], layout: "BaseLayout"});</script>
</body></html>"""
@api.route("/api/oembed")
async def api_oembed(req, resp):
"""Proxy oEmbed discovery and fetch to avoid CORS issues."""
import json
import re
import urllib.request
url = req.params.get("url", "").strip()
if not url or not url.startswith("https://"):
resp.status_code = 400
resp.media = {"error": "Missing or invalid url parameter"}
return
try:
# Step 1: Discover oEmbed endpoint from the target page.
r = urllib.request.Request(url, headers={"User-Agent": "kennethreitz.org"})
res = urllib.request.urlopen(r, timeout=5)
page_html = res.read().decode("utf-8", errors="replace")
endpoint = None
for pattern in [
r'<link[^>]+type=["\']application/json\+oembed["\'][^>]+href=["\']([^"\']+)["\']',
r'<link[^>]+href=["\']([^"\']+)["\'][^>]+type=["\']application/json\+oembed["\']',
]:
match = re.search(pattern, page_html)
if match:
endpoint = match.group(1)
break
if not endpoint:
resp.status_code = 404
resp.media = {"error": "No oEmbed endpoint found"}
return
# Step 2: Fetch oEmbed data from the discovered endpoint.
r = urllib.request.Request(
endpoint, headers={"User-Agent": "kennethreitz.org"}
)
res = urllib.request.urlopen(r, timeout=5)
data = json.loads(res.read())
resp.media = data
except Exception:
resp.status_code = 502
resp.media = {"error": "oEmbed discovery failed"}
# --- Data file serving ---
@api.route("/data/{path:path}")
async def serve_data_file(req, resp, *, path):
"""Serve static files from the data directory."""
file_path = (DATA_DIR / path).resolve()
if not str(file_path).startswith(str(DATA_DIR.resolve())):
resp.status_code = 403
return
if file_path.exists() and file_path.is_file():
resp.file(str(file_path))
else:
resp.status_code = 404
# --- PDF export ---
def _generate_pdf(md_path):
"""Generate PDF bytes from a markdown file. Returns bytes or raises."""
from weasyprint import HTML
from weasyprint.text.fonts import FontConfiguration
content_data = render_markdown_file(md_path)
pdf_html = api.templates.render(
"pdf.html",
content=content_data["content"],
title=content_data["title"],
metadata=content_data.get("metadata", {}),
current_year=datetime.now().year,
reading_time=content_data.get("reading_time"),
word_count=content_data.get("word_count"),
unique_icon=content_data.get("unique_icon"),
)
font_config = FontConfiguration()
pdf_buffer = io.BytesIO()
HTML(string=pdf_html, base_url="https://kennethreitz.org/").write_pdf(
pdf_buffer, font_config=font_config
)
pdf_buffer.seek(0)
return pdf_buffer.read()
@api.route("/{path:path}.pdf")
async def serve_pdf(req, resp, *, path):
"""Generate and serve a PDF version of content."""
file_path = DATA_DIR / f"{path}.md"
if not file_path.exists():
resp.status_code = 404
return
try:
api.log.info("Generating PDF for /%s", path)
resp.content = _generate_pdf(file_path)
resp.headers["Content-Type"] = "application/pdf"
resp.headers["Content-Disposition"] = f'inline; filename="{path.split("/")[-1]}.pdf"'
except (ImportError, OSError):
resp.status_code = 503
resp.text = "PDF generation requires WeasyPrint"
# --- Legacy URL resolver ---
# Pre-built lookup tables for legacy URL resolution (built once at import time).
_legacy_dirs = {} # slug -> url
_legacy_files = {} # stripped_slug -> url
def _build_legacy_index():
"""Build lookup tables from the data directory for fast legacy URL resolution."""
for match in DATA_DIR.rglob("*/"):
if (match / "index.md").exists():
rel = match.relative_to(DATA_DIR)
_legacy_dirs[match.name] = f"/{rel}"
for match in DATA_DIR.rglob("*.md"):
if match.name == "index.md":
continue
stem = match.stem
stripped = re.sub(r"^\d{4}-\d{2}-", "", stem)
rel = match.relative_to(DATA_DIR)
url = f"/{rel.with_suffix('')}"
_legacy_files[stripped] = url
_legacy_files[stem] = url
_build_legacy_index()
def _resolve_legacy_url(path):
"""Try to find current content matching a legacy URL pattern.
Handles:
- Date-path URLs: essays/2013/01/27/slug -> essays/2013-01-slug
- Bare slugs: /tattoos -> /photography/tattoos, /slug -> /essays/...-slug
- Hyphen/underscore normalization
"""
# Pattern 1: /essays/YYYY/MM/DD/slug or /essays/YYYY/MM/slug
m = re.match(r"essays/(\d{4})/(\d{2})(?:/\d{2})?/(.+)$", path)
if m:
year, month, slug = m.groups()
normalized = slug.replace("-", "_")
candidate = DATA_DIR / "essays" / f"{year}-{month}-{normalized}.md"
if candidate.exists():
return f"/essays/{year}-{month}-{normalized}"
# Try fuzzy match on just the slug within that year
for f in (DATA_DIR / "essays").glob(f"{year}-{month}-*"):
if normalized in f.stem:
return f"/essays/{f.stem}"
# Pattern 2: bare slug — fast lookup in pre-built index
slug = path.strip("/").split("/")[-1]
normalized = slug.replace("-", "_")
# Check directories first
if slug in _legacy_dirs:
return _legacy_dirs[slug]
if normalized in _legacy_dirs:
return _legacy_dirs[normalized]
# Check files
if normalized in _legacy_files:
return _legacy_files[normalized]
if slug in _legacy_files:
return _legacy_files[slug]
return None
# --- Catch-all route (MUST be last) ---
@api.route("/static/{path:path}")
async def serve_static(req, resp, *, path):
"""Serve static files explicitly."""
static_path = Path("tuftecms/static") / path
if static_path.exists() and static_path.is_file():
resp.file(str(static_path))
else:
resp.status_code = 404
@api.route("/{path:path}")
async def catch_all(req, resp, *, path):
"""Main content route -- serves markdown files or directories."""
bot = _detect_bot(req)
if bot:
api.log.info("Bot detected: %s crawling /%s", bot, path)
file_path = DATA_DIR / f"{path}.md"
dir_path = DATA_DIR / path
index_path = dir_path / "index.md"
raw_path = DATA_DIR / path
# Serve raw markdown source
if path.endswith(".md"):
source_path = DATA_DIR / path
if source_path.exists():
resp.text = source_path.read_text()
resp.headers["Content-Type"] = "text/plain; charset=utf-8"
return
resp.status_code = 404
return
# Serve PDF export
if path.endswith(".pdf"):
md_path = DATA_DIR / f"{path[:-4]}.md"
if md_path.exists():
api.log.info("Generating PDF for /%s", path[:-4])
try:
resp.content = _generate_pdf(md_path)
resp.headers["Content-Type"] = "application/pdf"
resp.headers["Content-Disposition"] = f'inline; filename="{path.split("/")[-1]}"'
except (ImportError, OSError):
resp.status_code = 503
resp.text = "PDF generation requires WeasyPrint"
return
resp.status_code = 404
return
# Serve raw files (images, etc.) directly from data directory
if raw_path.exists() and raw_path.is_file():
resp.file(str(raw_path))
return
# Serve markdown file
if file_path.exists():
api.log.info("Serving content: /%s", path)
content_data = _cached_render(file_path)
# Detect themes for essays
article_themes = []
if path.startswith("essays/"):
themes_data = get_themes_cache().get("themes", {})
essay_url = f"/essays/{file_path.stem}"
for theme_name, theme_info in themes_data.items():
for article in theme_info.get("articles", []):
if article.get("url") == essay_url:
article_themes.append({"name": theme_name})
break
resp.html = render("post.html", req, f"/{path}",
content=content_data["content"],
title=content_data["title"],
metadata=content_data.get("metadata", {}),
github_file=f"data/{path}.md",
reading_time=content_data.get("reading_time"),
word_count=content_data.get("word_count"),
tags=content_data.get("tags", []),
series_posts=content_data.get("series_posts", []),
series_name=content_data.get("series_name"),
unique_icon=content_data.get("unique_icon"),
article_themes=article_themes,
)
return
# Serve directory with index.md
if dir_path.is_dir() and index_path.exists():
api.log.info("Serving directory: /%s", path)
content_data = _cached_render(index_path)
items = get_directory_structure(dir_path)
# Check for image gallery
image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}
image_items = []
for item in items:
if not item["is_dir"]:
item_path = DATA_DIR / item["url_path"].lstrip("/")
if item_path.suffix.lower() in image_extensions:
item["is_image"] = True
item["static_path"] = item["url_path"]
item["is_data_file"] = True
if item_path.suffix.lower() in {".jpg", ".jpeg"}:
item["exif"] = extract_exif_data(item_path)
else:
item["exif"] = {}
image_items.append(item)
else:
item["is_image"] = False
else:
item["is_image"] = False
is_image_gallery = len(image_items) > 0
if is_image_gallery:
parts = path.split("/")
breadcrumb = []
current = ""
for part in parts:
current = f"{current}/{part}" if current else part
breadcrumb.append({
"name": part.replace("-", " ").replace("_", " ").title(),
"path": f"/{current}",
})
folder_icon = generate_folder_icon(content_data["title"], size=32)
resp.html = render("directory.html", req, f"/{path}",
items=items,
image_items=image_items,
is_image_gallery=True,
title=content_data["title"],
current_path=f"/{path}",
breadcrumb=breadcrumb,
current_year=datetime.now().year,
folder_icon=folder_icon,
index_content={"content": content_data["content"]},
content_position="top",
)
else:
resp.html = render("post.html", req, f"/{path}",
content=content_data["content"],
title=content_data["title"],
metadata=content_data.get("metadata", {}),
items=items,
current_path=f"/{path}",
github_file=f"data/{path}/index.md",
current_year=datetime.now().year,
reading_time=content_data.get("reading_time"),
word_count=content_data.get("word_count"),
unique_icon=content_data.get("unique_icon"),
)
return
# Serve directory listing (no index.md)
if dir_path.is_dir():
api.log.info("Serving directory listing: /%s", path)
items = get_directory_structure(dir_path)
title = path.split("/")[-1].replace("-", " ").replace("_", " ").title()
parts = path.split("/")
breadcrumb = []
current = ""
for part in parts:
current = f"{current}/{part}" if current else part
breadcrumb.append({
"name": part.replace("-", " ").replace("_", " ").title(),
"path": f"/{current}",
})
folder_icon = generate_folder_icon(title, size=32)
resp.html = render("directory.html", req, f"/{path}",
items=items,
title=title,
current_path=f"/{path}",
breadcrumb=breadcrumb,
current_year=datetime.now().year,
folder_icon=folder_icon,
)
return
# Try to resolve legacy URLs before giving up
redirect_to = _resolve_legacy_url(path)
if redirect_to:
api.log.info("Legacy redirect: /%s -> %s", path, redirect_to)
resp.status_code = 301
resp.headers["Location"] = redirect_to
return
# Nothing found
api.log.warning("Not found: /%s", path)
resp.status_code = 404
resp.html = render("error.html", req, f"/{path}",
title="Not Found",
current_year=datetime.now().year,
error_code=404,
error_message="Page not found.",
)
# Warm caches on startup
@api.on_event("startup")
async def warm_caches():
import threading
def _warm():
api.log.info("Starting background cache warming...")
try:
get_blog_cache()
from tuftecms.core.cache import (
get_sidenotes_cache, get_outlines_cache,
get_quotes_cache, get_connections_cache,
get_terms_cache, get_themes_cache,
)
get_sidenotes_cache()
get_outlines_cache()
get_quotes_cache()
get_connections_cache()
get_terms_cache()
get_themes_cache()
_build_search_index()
api.log.info("Cache warming complete!")
except Exception as e:
api.log.error("Cache warming failed: %s", e)
threading.Thread(target=_warm, daemon=True).start()
if __name__ == "__main__":
port = int(os.environ.get("PORT", 8000))
api.run(port=port)