mirror of
https://github.com/kennethreitz/kennethreitz.org.git
synced 2026-06-05 06:46:13 +00:00
f4730575d5
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1641 lines
52 KiB
Python
1641 lines
52 KiB
Python
"""Responder-based engine for kennethreitz.org."""
|
|
|
|
import responder
|
|
|
|
from tuftecms.core.markdown import render_markdown_file
|
|
from tuftecms.core.cache import (
|
|
get_blog_cache,
|
|
clear_all_caches,
|
|
get_sidenotes_cache,
|
|
get_outlines_cache,
|
|
get_quotes_cache,
|
|
get_connections_cache,
|
|
get_terms_cache,
|
|
get_themes_cache,
|
|
)
|
|
from tuftecms.utils.content import (
|
|
get_directory_structure,
|
|
extract_intelligent_date,
|
|
generate_folder_icon,
|
|
get_cached_markdown_title,
|
|
)
|
|
from tuftecms.utils.svg_icons import generate_unique_svg_icon
|
|
from tuftecms.blueprints.content import extract_exif_data
|
|
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
import hashlib
|
|
import html
|
|
import io
|
|
import logging
|
|
import random
|
|
import re
|
|
import textwrap
|
|
import os
|
|
|
|
# Root directory of the markdown content tree (a relative path).
DATA_DIR = Path("data")

# --- Search index (built once at startup) ---
# List of per-document dicts; rebound wholesale by _build_search_index().
_search_index = []
|
|
|
|
|
|
def _build_search_index():
    """Scan all markdown files once and build a cached search index.

    Walks ``DATA_DIR`` recursively and rebinds the module-level
    ``_search_index`` to a new list holding one dict per readable ``*.md``
    file, with the keys:

    * ``title``    -- text of the first ``# `` heading, or a title-cased
      version of the file stem when the file has no such heading.
    * ``url``      -- site URL; an ``index.md`` maps to its directory.
    * ``raw_text`` -- lower-cased file contents (used for matching).
    * ``raw``      -- unmodified file contents (used for snippets).
    * ``section``  -- first path component below ``DATA_DIR``, or ``""``
      for files that sit directly in ``DATA_DIR``.
    * ``icon``     -- icon markup from the icon helpers.

    Files that cannot be read or decoded as UTF-8 are skipped.
    """
    global _search_index
    index = []

    for md_file in DATA_DIR.rglob("*.md"):
        rel_path = md_file.relative_to(DATA_DIR)
        is_index = md_file.name == "index.md"

        # as_posix() keeps URLs slash-separated on every platform.
        if is_index:
            parent = rel_path.parent.as_posix()
            # The top-level index.md has parent ".", which must map to "/"
            # rather than "/.".
            url = "/" if parent == "." else "/" + parent
        else:
            url = "/" + rel_path.with_suffix("").as_posix()

        try:
            # Decode explicitly so the result does not depend on the locale.
            raw = md_file.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue

        # Extract title from first heading, falling back to the file name.
        title = md_file.stem.replace("-", " ").replace("_", " ").title()
        for line in raw.split("\n"):
            line_stripped = line.strip()
            if line_stripped.startswith("# "):
                title = line_stripped[2:].strip()
                break

        # Determine section from path
        parts = rel_path.parts
        section = parts[0] if len(parts) > 1 else ""

        # Directory landing pages get the folder icon.
        if is_index:
            icon = generate_folder_icon(title, size=18)
        else:
            icon = generate_unique_svg_icon(title, size=18)

        index.append({
            "title": title,
            "url": url,
            "raw_text": raw.lower(),
            "raw": raw,
            "section": section,
            "icon": icon,
        })

    _search_index = index
|
|
|
|
|
|
# The Responder application object; templates and static files are served
# from the tuftecms package directories.
api = responder.API(
    templates_dir="tuftecms/templates",
    static_dir="tuftecms/static",
    static_route="/static",
    enable_logging=True,
    title="kennethreitz.org",
    description="API for kennethreitz.org",
)

# --- Rate limiting ---
from responder.ext.ratelimit import RateLimiter

# 600 requests per period of 60 -- presumably seconds; confirm against the
# RateLimiter documentation.
RateLimiter(requests=600, period=60).install(api)
|
|
|
|
# Suppress access logging for static assets.
|
|
class _StaticFilter(logging.Filter):
    """Logging filter that rejects records about static-asset requests."""

    # Message endings that mark a record as an asset request.
    _ASSET_SUFFIXES = (".js", ".css", ".ico", ".png", ".woff2")

    def filter(self, record):
        """Return False for static-asset records and True for all others.

        A record counts as a static-asset record when its formatted message
        contains ``/static/`` or ends with one of the asset suffixes.
        """
        message = record.getMessage()
        if "/static/" in message:
            return False
        if message.endswith(self._ASSET_SUFFIXES):
            return False
        return True
|
|
|
|
# Attach the static-asset filter to the "responder.access" logger.
logging.getLogger("responder.access").addFilter(_StaticFilter())
# Only let WARNING and above through from the fontTools subsetter logger.
logging.getLogger("fontTools.subset").setLevel(logging.WARNING)
|
|
|
|
# --- Markdown render cache ---
|
|
# Rendered markdown keyed by the string form of the file path.
_render_cache = {}


def _cached_render(file_path):
    """Return the rendered form of *file_path*, rendering it at most once.

    The result of ``render_markdown_file`` is stored in ``_render_cache``
    under ``str(file_path)`` and returned from there on later calls.
    Nothing in this function evicts or refreshes an entry.
    """
    cache_key = str(file_path)
    try:
        return _render_cache[cache_key]
    except KeyError:
        rendered = render_markdown_file(file_path)
        _render_cache[cache_key] = rendered
        return rendered
|
|
|
|
|
|
# Custom template filters
|
|
# Default "current_year" template value; evaluated once, when this module
# is imported.
api.templates.context["current_year"] = datetime.now().year
|
|
|
|
|
|
def strftime_filter(value, fmt="%B %d, %Y"):
    """Template filter that formats a date-like value with ``strftime``.

    * The literal string ``"now"`` is replaced by the current local time.
    * Any object exposing ``strftime`` is formatted with *fmt*.
    * Every other value is returned unchanged.
    """
    target = datetime.now() if value == "now" else value
    try:
        formatter = target.strftime
    except AttributeError:
        # Not date-like: hand the input back untouched.
        return value
    return formatter(fmt)
|
|
|
|
|
|
# Register strftime_filter in the template environment's filter table under
# the name "strftime".
api.templates._env.filters["strftime"] = strftime_filter
|
|
|
|
|
|
class FakeConfig(dict):
    """Minimal config stand-in for Flask template compatibility.

    Only ``get`` is overridden: it reads from the process environment
    instead of from the dict's own contents.
    """

    def get(self, key, default=None):
        """Return the environment variable *key*, or *default* if unset."""
        return os.getenv(key, default)
|
|
|
|
|
|
class RequestWrapper:
    """Wraps Responder request to provide Flask-like interface for templates.

    ``path`` and ``environ`` live on the wrapper itself; any other attribute
    is looked up on the wrapped request.
    """

    def __init__(self, req, path):
        self.environ = {}
        self.path = path
        self._req = req

    def __getattr__(self, name):
        # Reached only when normal lookup on the wrapper fails, so the
        # attributes set in __init__ take precedence over the request's.
        wrapped = self._req
        return getattr(wrapped, name)
|
|
|
|
|
|
# Single shared config object; render() passes it to templates as "config".
_config = FakeConfig()

# PDF support is optional: record whether WeasyPrint could be imported.
# OSError is caught as well -- presumably raised when its native libraries
# are missing; confirm.
try:
    from weasyprint import HTML
    _pdf_available = True
except (ImportError, OSError):
    _pdf_available = False
|
|
|
|
|
|
def render(template, req, path="/", **kwargs):
    """Render *template* with the context shared by every page.

    ``current_year``, ``current_path`` and ``pdf_available`` are defaults
    that a caller may override through *kwargs*. ``request`` (a
    ``RequestWrapper`` around *req*) and ``config`` are always set here and
    replace any caller-supplied value of the same name.
    """
    defaults = {
        "current_year": datetime.now().year,
        "current_path": path,
        "pdf_available": _pdf_available,
    }
    forced = {
        "request": RequestWrapper(req, path),
        "config": _config,
    }
    # Later mappings win: caller kwargs beat defaults, forced beats both.
    context = {**defaults, **kwargs, **forced}
    return api.templates.render(template, **context)
|
|
|
|
|
|
# --- Cache for OG images ---
|
|
# Rendered PNG bytes keyed by the requested og-image path.
_og_image_cache = {}
# Entry limit; og_image() drops the oldest-inserted entry once this is hit.
_OG_CACHE_MAX = 256
|
|
|
|
|
|
# --- Bot detection ---
|
|
|
|
# Case-insensitive alternation of user-agent substrings treated as bots,
# crawlers, scanners or scripted HTTP clients.
_BOT_PATTERNS = re.compile(
    r"(?i)(googlebot|bingbot|slurp|duckduckbot|baiduspider|yandexbot|sogou|"
    r"exabot|facebookexternalhit|facebot|ia_archiver|alexa|msnbot|"
    r"semrushbot|ahrefsbot|dotbot|petalbot|mj12bot|bytespider|"
    r"gptbot|chatgpt|claudebot|anthropic|ccbot|commoncrawl|"
    r"scrapy|python-requests|httpx|curl|wget|go-http-client|"
    r"applebot|twitterbot|linkedinbot|whatsapp|telegrambot|"
    r"dataprovider|censys|zgrab|masscan|nuclei|httpie)"
)


def _detect_bot(req):
    """Classify a request by its ``User-Agent`` header.

    Returns:
        ``"unknown (no user-agent)"`` when the header is missing or empty,
        the matched substring (as it appears in the header) when the header
        matches ``_BOT_PATTERNS``, and ``None`` otherwise.
    """
    agent = req.headers.get("User-Agent", "")
    if not agent:
        return "unknown (no user-agent)"
    hit = _BOT_PATTERNS.search(agent)
    return hit.group(1) if hit else None
|
|
|
|
|
|
# --- Routes ---
|
|
# IMPORTANT: All specific routes must be defined BEFORE the catch-all route.
|
|
|
|
|
|
@api.route("/")
async def homepage(req, resp):
    """Render the home page with the first five posts of the blog cache."""
    all_posts = get_blog_cache().get("posts", [])
    resp.html = render(
        "homepage.html",
        req,
        "/",
        title="Home",
        recent_posts=all_posts[:5],
    )
|
|
|
|
|
|
@api.route("/health")
async def health(req, resp):
    """Report liveness as JSON together with the current local timestamp."""
    stamp = datetime.now().isoformat()
    resp.media = dict(status="healthy", timestamp=stamp)
|
|
|
|
|
|
@api.route("/random")
async def random_post(req, resp):
    """Redirect to a randomly chosen post, or answer 404 if there are none."""
    candidates = get_blog_cache().get("posts", [])
    if not candidates:
        resp.status_code = 404
        return

    pick = random.choice(candidates)
    destination = pick["url"]
    api.log.info("Redirecting to random post: %s", destination)
    # Each visit must pick again, so forbid caching of the redirect.
    resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    api.redirect(resp, pick["url"])
|
|
|
|
|
|
@api.route("/search")
async def search_page(req, resp):
    """Render the search page shell."""
    page = render("search.html", req, "/search", title="Search")
    resp.html = page
|
|
|
|
|
|
# --- Feed routes ---
|
|
|
|
|
|
@api.route("/robots.txt")
async def robots_txt(req, resp):
    """Serve robots.txt and log who asked for it.

    Every request is logged with the detected bot name (``"unknown"`` when
    ``_detect_bot`` finds none) and the raw user-agent string.
    """
    bot_name = _detect_bot(req) or "unknown"
    user_agent = req.headers.get("User-Agent", "no user-agent")
    api.log.info("Bot detected: %s (%s)", bot_name, user_agent)

    body_lines = (
        "# Robots.txt for kennethreitz.org",
        "# Welcome, friendly crawlers!",
        "",
        "User-agent: *",
        "Allow: /",
        "",
        "# Sitemap location",
        "Sitemap: https://kennethreitz.org/sitemap.xml",
        "",
        "# Crawl-delay suggestion (be gentle)",
        "Crawl-delay: 1",
        "",
        "# Disallow admin/internal paths (none currently, but good practice)",
        "# Disallow: /admin/",
    )
    resp.text = "\n".join(body_lines).strip()
    resp.headers["Content-Type"] = "text/plain"
|
|
|
|
|
|
@api.route("/sitemap")
async def sitemap_page(req, resp):
    """Render the human-readable sitemap page.

    The template receives ``sitemap_data`` with three groups ("homepage",
    "directory", "article") and ``total_items``, the combined entry count.
    """
    posts = get_blog_cache().get("posts", [])

    # (url, title) pairs for the fixed section pages.
    section_links = (
        ("/archive", "Archive"),
        ("/sidenotes", "Sidenotes"),
        ("/outlines", "Outlines"),
        ("/quotes", "Quotes"),
        ("/connections", "Connections"),
        ("/terms", "Terms"),
        ("/graph", "Graph"),
        ("/search", "Search"),
    )

    sitemap_data = {
        "homepage": [{"url": "/", "title": "Home", "modified": None}],
        "directory": [
            {"url": url, "title": label, "modified": None}
            for url, label in section_links
        ],
        "article": [
            {"url": post["url"], "title": post["title"], "modified": post["pub_date"]}
            for post in posts
        ],
    }
    total_items = sum(len(entries) for entries in sitemap_data.values())

    resp.html = render(
        "sitemap.html",
        req,
        "/sitemap",
        sitemap_data=sitemap_data,
        total_items=total_items,
        title="Sitemap",
    )
|
|
|
|
|
|
@api.route("/sitemap.xml")
async def sitemap_xml(req, resp):
    """Serve the XML sitemap.

    Lists the home page (priority 1.0), a fixed set of section and docs
    pages (0.8) and every post in the blog cache (0.9). The home page and
    the fixed pages use today's date as ``lastmod``; posts use their
    ``date_str``. ``<loc>`` values are XML-escaped so that a URL containing
    ``&`` or ``<`` cannot produce a malformed document.
    """
    posts = get_blog_cache().get("posts", [])
    site_root = "https://kennethreitz.org"
    # Computed once so every generated entry carries the same date.
    today = datetime.now().strftime("%Y-%m-%d")

    # NOTE(review): the section routes visible in this file live under
    # /archive/...; confirm these shorter paths resolve as well.
    static_pages = [
        "/archive",
        "/sidenotes",
        "/outlines",
        "/quotes",
        "/connections",
        "/terms",
        "/graph",
        "/search",
        "/docs",
        "/docs/getting-started",
        "/docs/content-structure",
        "/docs/sidenotes",
        "/docs/customization",
        "/docs/deployment",
    ]

    urls = [{"url": f"{site_root}/", "lastmod": today, "priority": "1.0"}]
    urls.extend(
        {"url": f"{site_root}{page}", "lastmod": today, "priority": "0.8"}
        for page in static_pages
    )
    urls.extend(
        {
            "url": f"{site_root}{post['url']}",
            "lastmod": post["date_str"],
            "priority": "0.9",
        }
        for post in posts
    )

    # Generate XML
    xml_lines = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">',
    ]
    for url_data in urls:
        xml_lines.append(" <url>")
        xml_lines.append(f" <loc>{html.escape(url_data['url'])}</loc>")
        xml_lines.append(f" <lastmod>{url_data['lastmod']}</lastmod>")
        xml_lines.append(f" <priority>{url_data['priority']}</priority>")
        xml_lines.append(" </url>")
    xml_lines.append("</urlset>")

    resp.text = "\n".join(xml_lines)
    resp.headers["Content-Type"] = "application/xml"
|
|
|
|
|
|
@api.route("/feed.xml")
async def feed_xml(req, resp):
    """Serve the RSS feed under ``/feed.xml`` (same output as ``/rss.xml``)."""
    await _rss_feed(req=req, resp=resp)
|
|
|
|
|
|
@api.route("/rss.xml")
async def rss_xml(req, resp):
    """Serve the RSS feed under ``/rss.xml`` (same output as ``/feed.xml``)."""
    await _rss_feed(req=req, resp=resp)
|
|
|
|
|
|
def _rss_date(moment=None):
    """Format *moment* as an RFC 822 date string with a numeric zone.

    ``None`` means the current time in UTC. A plain ``date`` is treated as
    midnight of that day. A naive value is labelled as UTC -- an assumption,
    since the stored post dates carry no zone of their own.
    """
    from datetime import timezone
    from email.utils import format_datetime

    if moment is None:
        moment = datetime.now(timezone.utc)
    elif not isinstance(moment, datetime):
        moment = datetime(moment.year, moment.month, moment.day)
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return format_datetime(moment)


async def _rss_feed(req, resp):
    """Generate the RSS 2.0 feed for the 20 first posts of the blog cache.

    Dates are written in RFC 822 form with an explicit zone; formatting a
    naive datetime with ``%z`` would leave the zone empty. Titles,
    descriptions, links and guids are escaped before being embedded in the
    XML. Posts whose ``pub_date`` is falsy get no ``<pubDate>`` element.
    """
    recent_posts = get_blog_cache().get("posts", [])[:20]
    site_root = "https://kennethreitz.org"

    rss_lines = [
        '<?xml version="1.0" encoding="UTF-8"?>',
        '<rss version="2.0">',
        " <channel>",
        " <title>Kenneth Reitz</title>",
        " <link>https://kennethreitz.org</link>",
        " <description>Creator of Requests, Pipenv, and other tools. Writing about technology, consciousness, and human-centered design.</description>",
        " <language>en-us</language>",
        f" <lastBuildDate>{_rss_date()}</lastBuildDate>",
    ]

    for post in recent_posts:
        permalink = html.escape(f"{site_root}{post['url']}")
        summary = post.get("description", post.get("excerpt", ""))

        rss_lines.append(" <item>")
        rss_lines.append(f' <title>{html.escape(post["title"])}</title>')
        rss_lines.append(f" <link>{permalink}</link>")
        rss_lines.append(f" <guid>{permalink}</guid>")
        rss_lines.append(f" <description>{html.escape(summary)}</description>")

        pub_date = post["pub_date"]
        if pub_date:
            rss_lines.append(f" <pubDate>{_rss_date(pub_date)}</pubDate>")

        rss_lines.append(" </item>")

    rss_lines.append(" </channel>")
    rss_lines.append("</rss>")

    resp.text = "\n".join(rss_lines)
    resp.headers["Content-Type"] = "application/xml"
|
|
|
|
|
|
# --- OG image route ---
|
|
|
|
|
|
def _og_title_and_subtitle(markdown_text, fallback_title):
    """Pick the card title and subtitle out of a markdown document.

    The title is the first ``# `` heading, or *fallback_title* if there is
    none. The subtitle is the first line shorter than 60 characters that
    both starts and ends with ``*``, with the asterisks stripped, or
    ``None`` if no line qualifies.
    """
    lines = [line.strip() for line in markdown_text.split("\n")]

    title = fallback_title
    for line in lines:
        if line.startswith("# "):
            title = line[2:].strip()
            break

    subtitle = None
    for line in lines:
        if line.startswith("*") and line.endswith("*") and len(line) < 60:
            subtitle = line.strip("*")
            break

    return title, subtitle


def _render_og_png(title, subtitle):
    """Draw the 1200x630 Open Graph card and return it as PNG bytes."""
    # Imported lazily so the module still loads when Pillow is absent.
    from PIL import Image, ImageDraw, ImageFont

    # Image dimensions (standard OG)
    width, height = 1200, 630

    # Create image with Tufte cream background
    img = Image.new("RGB", (width, height), "#fffff8")
    draw = ImageDraw.Draw(img)

    # Draw subtle bottom gradient over the last 80 rows.
    for y in range(height - 80, height):
        alpha = (y - (height - 80)) / 80
        r = int(255 * (1 - alpha) + 245 * alpha)
        g = int(255 * (1 - alpha) + 245 * alpha)
        b = int(248 * (1 - alpha) + 232 * alpha)
        draw.line([(0, y), (width, y)], fill=(r, g, b))

    # Load fonts (bundled et-book); fall back to Pillow's default font when
    # the files cannot be opened.
    font_dir = Path(__file__).parent / "tuftecms" / "static" / "tufte" / "et-book"
    italic_file = (
        font_dir
        / "et-book-display-italic-old-style-figures"
        / "et-book-display-italic-old-style-figures.ttf"
    )
    roman_file = (
        font_dir / "et-book-roman-line-figures" / "et-book-roman-line-figures.ttf"
    )
    try:
        font_italic = ImageFont.truetype(str(italic_file), 62)
        font_roman = ImageFont.truetype(str(roman_file), 26)
        font_small = ImageFont.truetype(str(roman_file), 22)
    except OSError:
        font_italic = ImageFont.load_default()
        font_roman = ImageFont.load_default()
        font_small = ImageFont.load_default()

    # Draw accent line
    draw.rectangle([80, 180, 200, 184], fill="#333333")

    # Word-wrap and draw title (at most three lines); longer titles wrap
    # at a narrower width.
    max_chars = 28 if len(title) > 28 else 40
    y_pos = 210
    for line in textwrap.wrap(title, width=max_chars)[:3]:
        draw.text((80, y_pos), line, font=font_italic, fill="#111111")
        y_pos += 72

    # Draw subtitle/date if available
    if subtitle:
        draw.text((80, y_pos + 20), subtitle, font=font_roman, fill="#666666")

    # Draw separator line
    draw.rectangle([80, height - 110, width - 80, height - 108], fill="#dddddd")

    # Draw site URL
    draw.text((80, height - 85), "kennethreitz.org", font=font_small, fill="#999999")

    # Draw author name, right-aligned to the 80px margin.
    author_text = "Kenneth Reitz"
    bbox = draw.textbbox((0, 0), author_text, font=font_small)
    author_width = bbox[2] - bbox[0]
    draw.text(
        (width - 80 - author_width, height - 85),
        author_text,
        font=font_small,
        fill="#999999",
    )

    # Draw decorative circles
    cx, cy = 1060, 300
    for r_val, c in [(50, "#dddddd"), (35, "#cccccc"), (20, "#bbbbbb")]:
        draw.ellipse(
            [cx - r_val, cy - r_val, cx + r_val, cy + r_val], outline=c, width=2
        )
    draw.ellipse([cx - 5, cy - 5, cx + 5, cy + 5], fill="#999999")

    # Export to PNG bytes
    buf = io.BytesIO()
    img.save(buf, format="PNG", optimize=True)
    return buf.getvalue()


@api.route("/og-image/{path:path}.png")
async def og_image(req, resp, *, path):
    """Generate a dynamic Open Graph image for a post.

    Responds with a cached PNG when one exists for *path*. Otherwise the
    title and subtitle are read from ``DATA_DIR/<path>.md`` (falling back
    to a title derived from the last path segment when the file does not
    exist), the card is drawn, stored in the bounded cache and returned.

    Answers 403 without a body when the resolved markdown path lies outside
    the data directory.
    """
    api.log.info("Generating OG image for /%s", path)

    # Check cache
    cache_key = path
    if cache_key in _og_image_cache:
        resp.content = _og_image_cache[cache_key]
        resp.headers["Content-Type"] = "image/png"
        resp.headers["Cache-Control"] = "public, max-age=86400"
        return

    # Validate path stays within data directory. Compare path components,
    # not string prefixes: a prefix test would also accept a sibling such
    # as "<data>-private".
    data_root = DATA_DIR.resolve()
    file_path = (DATA_DIR / f"{path}.md").resolve()
    try:
        file_path.relative_to(data_root)
    except ValueError:
        resp.status_code = 403
        return

    # Resolve the post title from the markdown file
    title = path.split("/")[-1].replace("-", " ").replace("_", " ").title()
    subtitle = None
    if file_path.exists():
        content = file_path.read_text(encoding="utf-8")
        title, subtitle = _og_title_and_subtitle(content, title)

    png_bytes = _render_og_png(title, subtitle)

    # Cache it (bounded): drop the oldest-inserted entry when full.
    if len(_og_image_cache) >= _OG_CACHE_MAX:
        _og_image_cache.pop(next(iter(_og_image_cache)))
    _og_image_cache[cache_key] = png_bytes

    resp.content = png_bytes
    resp.headers["Content-Type"] = "image/png"
    resp.headers["Cache-Control"] = "public, max-age=86400"
|
|
|
|
|
|
# --- Archive routes ---
|
|
|
|
|
|
@api.route("/archive")
async def archive(req, resp):
    """Render the complete archive, with posts grouped by publication year.

    ``grouped_posts`` maps each year to its posts, newest year first; posts
    keep the order they have in the blog cache.
    """
    posts = get_blog_cache().get("posts", [])

    # Bucket posts by year.
    by_year = {}
    for post in posts:
        by_year.setdefault(post["pub_date"].year, []).append(post)

    # Rebuild the mapping with the years in descending order.
    grouped_posts = {year: by_year[year] for year in sorted(by_year, reverse=True)}

    resp.html = render(
        "archive.html",
        req,
        "/archive",
        posts=posts,
        grouped_posts=grouped_posts,
        archive_title="Complete Archive",
        title="Archive",
    )
|
|
|
|
|
|
@api.route("/archive/by-length")
async def archive_by_length(req, resp):
    """Render the archive grouped into reading-time buckets.

    Each post dict from the blog cache gets a ``reading_time`` key written
    onto it (minutes at 200 words per minute, at least 1). Buckets are
    listed shortest first and only when non-empty; within a bucket, posts
    are ordered by word count, longest first.
    """
    posts = get_blog_cache().get("posts", [])

    # Calculate reading time (average 200 words per minute)
    for post in posts:
        minutes = round(post.get("word_count", 0) / 200)
        post["reading_time"] = max(1, minutes)

    # One stable sort up front; filtering it afterwards yields each bucket
    # already in its final order.
    longest_first = sorted(
        posts, key=lambda p: p.get("word_count", 0), reverse=True
    )

    buckets = (
        ("Quick Reads (1-3 min)", lambda m: m <= 3),
        ("Short Reads (4-7 min)", lambda m: 4 <= m <= 7),
        ("Medium Reads (8-15 min)", lambda m: 8 <= m <= 15),
        ("Long Reads (15+ min)", lambda m: m > 15),
    )

    grouped_posts = {}
    for label, belongs in buckets:
        members = [p for p in longest_first if belongs(p["reading_time"])]
        if members:
            grouped_posts[label] = members

    resp.html = render(
        "archive-by-length.html",
        req,
        "/archive/by-length",
        posts=posts,
        grouped_posts=grouped_posts,
        archive_title="By Reading Time",
        title="Archive by Reading Time",
    )
|
|
|
|
|
|
@api.route("/archive/sidenotes")
async def sidenotes(req, resp):
    """Render every cached sidenote, grouped by the article it belongs to.

    Article title and URL come from the first sidenote of each group; date
    and icon come from the matching blog post when one is found. Articles
    are ordered newest first, undated ones last.
    """
    cache_data = get_sidenotes_cache()
    blog_posts = get_blog_cache().get("posts", [])

    # Create lookup for post metadata, keyed by "/essays/<last path part>".
    post_lookup = {
        f"/essays/{post['path'].split('/')[-1]}": post for post in blog_posts
    }

    # Convert cache format to template format
    articles = []
    total_sidenotes = 0
    for sidenotes_list in cache_data.get("sidenotes", {}).values():
        if not sidenotes_list:
            continue

        lead = sidenotes_list[0]
        article_url = lead.get("url", "#")
        post_data = post_lookup.get(article_url, {})

        articles.append({
            "title": lead.get("title", "Unknown"),
            "url": article_url,
            "sidenotes": sidenotes_list,
            "category": "essays",
            "date": post_data.get("pub_date"),
            "unique_icon": post_data.get("unique_icon"),
        })
        total_sidenotes += len(sidenotes_list)

    articles = sorted(
        articles, key=lambda a: a.get("date") or datetime.min, reverse=True
    )

    resp.html = render(
        "sidenotes.html",
        req,
        "/archive/sidenotes",
        articles=articles,
        total_count=total_sidenotes,
        title="Sidenotes",
    )
|
|
|
|
|
|
@api.route("/archive/outlines")
async def outlines(req, resp):
    """Render the heading outline of every article in the outlines cache.

    Article title and URL come from the first heading of each group; date
    and icon come from the matching blog post when one is found. Articles
    are ordered newest first, undated ones last.
    """
    cache_data = get_outlines_cache()
    blog_posts = get_blog_cache().get("posts", [])

    # Post metadata keyed by "/essays/<last path part>".
    post_lookup = {
        f"/essays/{post['path'].split('/')[-1]}": post for post in blog_posts
    }

    articles = []
    total_headings = 0
    for headings_list in cache_data.get("outlines", {}).values():
        if not headings_list:
            continue

        lead = headings_list[0]
        article_url = lead.get("url", "#")
        post_data = post_lookup.get(article_url, {})

        articles.append({
            "title": lead.get("title", "Unknown"),
            "url": article_url,
            "headings": headings_list,
            "category": "essays",
            "date": post_data.get("pub_date"),
            "unique_icon": post_data.get("unique_icon"),
        })
        total_headings += len(headings_list)

    articles = sorted(
        articles, key=lambda a: a.get("date") or datetime.min, reverse=True
    )

    resp.html = render(
        "outlines.html",
        req,
        "/archive/outlines",
        articles=articles,
        total_count=total_headings,
        title="Outlines",
    )
|
|
|
|
|
|
@api.route("/archive/quotes")
async def quotes(req, resp):
    """Render every cached quote, grouped by the article it was found in.

    Article title and URL come from the first quote of each group; date and
    icon come from the matching blog post when one is found. Articles are
    ordered newest first, undated ones last.
    """
    cache_data = get_quotes_cache()
    blog_posts = get_blog_cache().get("posts", [])

    # Post metadata keyed by "/essays/<last path part>".
    post_lookup = {
        f"/essays/{post['path'].split('/')[-1]}": post for post in blog_posts
    }

    articles = []
    total_quotes = 0
    for quotes_list in cache_data.get("quotes", {}).values():
        if not quotes_list:
            continue

        lead = quotes_list[0]
        article_url = lead.get("url", "#")
        post_data = post_lookup.get(article_url, {})

        articles.append({
            "title": lead.get("title", "Unknown"),
            "url": article_url,
            "quotes": quotes_list,
            "category": "essays",
            "date": post_data.get("pub_date"),
            "unique_icon": post_data.get("unique_icon"),
        })
        total_quotes += len(quotes_list)

    articles = sorted(
        articles, key=lambda a: a.get("date") or datetime.min, reverse=True
    )

    resp.html = render(
        "quotes.html",
        req,
        "/archive/quotes",
        articles=articles,
        total_count=total_quotes,
        title="Quotes",
    )
|
|
|
|
|
|
@api.route("/archive/connections")
async def connections(req, resp):
    """Render the outgoing and incoming links of every linking article.

    One entry is built per source file that has at least one outgoing
    connection. Its URL is ``/essays/<file stem>``; incoming connections are
    looked up in the cache under that URL. Articles are ordered newest
    first, undated ones last. Totals are taken from the cache's ``stats``.
    """
    cache_data = get_connections_cache()
    blog_posts = get_blog_cache().get("posts", [])

    # Post metadata keyed by "/essays/<last path part>".
    post_lookup = {
        f"/essays/{post['path'].split('/')[-1]}": post for post in blog_posts
    }

    outgoing_by_file = cache_data.get("outgoing", {})
    incoming_by_url = cache_data.get("incoming", {})
    stats = cache_data.get("stats", {})

    articles = []
    for file_path, outgoing in outgoing_by_file.items():
        if not outgoing:
            continue

        slug = Path(file_path).stem
        article_url = f"/essays/{slug}"
        post_data = post_lookup.get(article_url, {})

        outgoing_links = [
            {"link_text": link["text"], "target_url": link["url"]}
            for link in outgoing
        ]
        incoming_links = [
            {
                "link_text": link["text"],
                "source_url": link.get("source_url", "#"),
                "context": link.get("context", ""),
            }
            for link in incoming_by_url.get(article_url, [])
        ]

        articles.append({
            "title": post_data.get("title", slug.replace("-", " ").title()),
            "url": article_url,
            "outgoing_connections": outgoing_links,
            "incoming_connections": incoming_links,
            "category": "essays",
            "date": post_data.get("pub_date"),
            "unique_icon": post_data.get("unique_icon"),
        })

    articles = sorted(
        articles, key=lambda a: a.get("date") or datetime.min, reverse=True
    )

    resp.html = render(
        "connections.html",
        req,
        "/archive/connections",
        articles=articles,
        total_outgoing=stats.get("total_outgoing", 0),
        total_incoming=stats.get("total_incoming", 0),
        title="Connections",
    )
|
|
|
|
|
|
@api.route("/archive/terms")
async def terms(req, resp):
    """Render the index of terms from the terms cache."""
    terms_data = get_terms_cache()
    stats = terms_data.get("stats", {})

    context = {
        "terms": terms_data.get("terms", {}),
        "total_terms": stats.get("total_terms", 0),
        # The cache calls this figure "total_references".
        "total_occurrences": stats.get("total_references", 0),
        "title": "Index of Terms",
    }
    resp.html = render("terms.html", req, "/archive/terms", **context)
|
|
|
|
|
|
@api.route("/archive/themes")
async def themes_archive(req, resp):
    """Render the themes page with themes sorted by their key."""
    themes_data = get_themes_cache()
    themes = themes_data.get("themes", {})
    stats = themes_data.get("stats", {})

    # Rebuild the mapping so iteration follows sorted key order.
    ordered_themes = {name: themes[name] for name in sorted(themes)}

    resp.html = render(
        "themes.html",
        req,
        "/archive/themes",
        themes=ordered_themes,
        total_themes=stats.get("total_themes", 0),
        total_occurrences=stats.get("total_occurrences", 0),
        title="Themes",
    )
|
|
|
|
|
|
@api.route("/archive/graph")
async def graph(req, resp):
    """Render the knowledge-graph page shell."""
    page = render("graph.html", req, "/archive/graph", title="Knowledge Graph")
    resp.html = page
|
|
|
|
|
|
def _graph_node_id(url):
    """Return the graph node id for a site URL ("/a/b" becomes "a_b")."""
    return url.strip("/").replace("/", "_")


def _graph_node(node_id, url, content_path, post_data):
    """Build one node dict for the graph payload.

    *content_path* is the slash-separated path without a leading slash. Its
    last segment supplies the fallback title and its first segment the
    category ("root" when there is only one segment). ``label`` mirrors
    ``title`` and ``group`` mirrors ``category``.
    """
    fallback_title = content_path.split("/")[-1].replace("-", " ").title()
    title = post_data.get("title", fallback_title)
    category = content_path.split("/")[0] if "/" in content_path else "root"
    return {
        "id": node_id,
        "title": title,
        "label": title,
        "url": url,
        "category": category,
        "group": category,
    }


@api.route("/archive/graph/data")
async def graph_data(req, resp):
    """Return the link graph as JSON: ``{"nodes": [...], "edges": [...]}``.

    One node is emitted per distinct page and one edge per outgoing link in
    the connections cache. Node ids are derived from the page URL for
    sources and targets alike, so a page that both links out and is linked
    to is a single node, and edges attach to it from either side.
    """
    connections_data = get_connections_cache()
    blog_posts = get_blog_cache().get("posts", [])

    # Post metadata keyed by "/essays/<last path part>".
    post_lookup = {
        f"/essays/{post['path'].split('/')[-1]}": post for post in blog_posts
    }

    nodes = []
    edges = []
    node_ids = set()

    for source, targets in connections_data.get("outgoing", {}).items():
        # Cache keys are file paths; drop the data prefix and the extension.
        source_path = source.replace("data/", "").replace(".md", "")
        if "essays" in source_path:
            source_url = f"/essays/{source_path.split('/')[-1]}"
        else:
            source_url = f"/{source_path}"
        # Same id scheme as for targets below.
        source_id = _graph_node_id(source_url)

        if source_id not in node_ids:
            post_data = post_lookup.get(source_url, {})
            nodes.append(_graph_node(source_id, source_url, source_path, post_data))
            node_ids.add(source_id)

        for target_info in targets:
            target_path = target_info["url"].strip("/")
            if not target_path:
                # A link to the site root yields neither a node nor an edge.
                continue

            target_url = f"/{target_path}"
            target_id = _graph_node_id(target_url)

            if target_id not in node_ids:
                target_post = post_lookup.get(target_url, {})
                nodes.append(
                    _graph_node(target_id, target_url, target_path, target_post)
                )
                node_ids.add(target_id)

            edges.append({
                "source": source_id,
                "target": target_id,
                "label": target_info["text"],
                "link_text": target_info["text"],
            })

    resp.media = {"nodes": nodes, "edges": edges}
|
|
|
|
|
|
# --- Directory route ---
|
|
|
|
|
|
@api.route("/directory")
async def directory(req, resp):
    """Render the top-level listing of the data directory."""
    context = {
        "items": get_directory_structure(DATA_DIR),
        # Passed explicitly, so it replaces render()'s "/directory" default.
        "current_path": "/",
        "breadcrumb": [],
        "title": "Directory",
    }
    resp.html = render("directory.html", req, "/directory", **context)
|
|
|
|
|
|
# --- API routes ---
|
|
|
|
|
|
@api.route("/api/blog")
async def api_blog(req, resp):
    """Return title, URL and icon of every cached post, plus their count."""
    wanted_fields = ("title", "url", "unique_icon")
    posts_data = [
        {field: post.get(field, "") for field in wanted_fields}
        for post in get_blog_cache().get("posts", [])
    ]
    resp.media = {"posts": posts_data, "total": len(posts_data)}
|
|
|
|
|
|
@api.route("/api/search")
async def api_search(req, resp):
    """Case-insensitive substring search over the prebuilt search index.

    Query parameter ``q`` must be at least two characters after stripping;
    otherwise an empty result list with an ``error`` message is returned.
    A title hit scores 10 and a content hit 5. Results are ordered by score
    (highest first), capped at 50, and ``total`` counts the capped list.
    Content hits carry a snippet of roughly 100 characters either side of
    the first occurrence.
    """
    query = req.params.get("q", "").strip()
    api.log.info("Search query: %s", query)

    if len(query) < 2:
        resp.media = {
            "query": query,
            "results": [],
            "error": "Query must be at least 2 characters long",
        }
        return

    needle = query.lower()
    hits = []

    for entry in _search_index:
        in_title = needle in entry["title"].lower()
        in_content = needle in entry["raw_text"]
        if not (in_title or in_content):
            continue

        matches = [
            label
            for label, found in (("title", in_title), ("content", in_content))
            if found
        ]
        score = (10 if in_title else 0) + (5 if in_content else 0)

        snippet = ""
        if in_content:
            raw = entry["raw"]
            # NOTE(review): the offset is found in the lower-cased text but
            # applied to the original; this assumes lower() keeps length.
            position = entry["raw_text"].find(needle)
            start = max(0, position - 100)
            end = min(len(raw), position + len(query) + 100)
            snippet = raw[start:end].strip()
            if start > 0:
                snippet = "..." + snippet
            if end < len(raw):
                snippet = snippet + "..."

        hits.append({
            "title": entry["title"],
            "url": entry["url"],
            "snippet": snippet,
            "section": entry["section"],
            "score": score,
            "matches": matches,
            "unique_icon": entry["icon"],
        })

    results = sorted(hits, key=lambda hit: hit["score"], reverse=True)[:50]

    resp.media = {"query": query, "results": results, "total": len(results)}
|
|
|
|
|
|
@api.route("/api/search/autocomplete")
async def api_search_autocomplete(req, resp):
    """Return up to eight index entries whose title contains the query.

    Matching is a case-insensitive substring test on the title; entries are
    reported in index order. An empty query yields an empty result list.
    """
    query = req.params.get("q", "").strip()
    if not query:
        resp.media = {"results": []}
        return

    needle = query.lower()
    suggestions = []
    for entry in _search_index:
        if needle not in entry["title"].lower():
            continue
        suggestions.append({
            "title": entry["title"],
            "url": entry["url"],
            "icon": entry["icon"],
        })
        if len(suggestions) >= 8:
            break

    resp.media = {"results": suggestions}
|
|
|
|
|
|
@api.route("/api/icon/{article_path:path}")
async def api_icon(req, resp, *, article_path):
    """Generate and return an SVG icon for a given article.

    Title resolution:

    * ``DATA_DIR/<article_path>.md`` exists: its cached title, or the raw
      *article_path* when no title is cached.
    * ``DATA_DIR/<article_path>/index.md`` exists: the folder icon is used,
      titled with the cached index title or the prettified last segment.
    * otherwise: the prettified last path segment.

    Answers 403 with ``success: False`` when *article_path* resolves
    outside the data directory, so the filesystem is never probed there.
    """
    data_root = DATA_DIR.resolve()
    md_path = DATA_DIR / f"{article_path}.md"
    dir_path = DATA_DIR / article_path

    # Reject anything that escapes the data directory (e.g. via "..").
    for candidate in (md_path, dir_path):
        try:
            candidate.resolve().relative_to(data_root)
        except ValueError:
            resp.status_code = 403
            resp.media = {"success": False, "path": article_path, "icon": None}
            return

    pretty_name = (
        article_path.split("/")[-1].replace("-", " ").replace("_", " ").title()
    )
    is_folder = False

    if md_path.exists():
        # fallback to path
        title = get_cached_markdown_title(md_path) or article_path
    else:
        title = pretty_name
        index_file = dir_path / "index.md"
        if dir_path.is_dir() and index_file.exists():
            is_folder = True
            title = get_cached_markdown_title(index_file) or pretty_name

    if is_folder:
        icon_data = generate_folder_icon(title, size=24)
    else:
        icon_data = generate_unique_svg_icon(title, size=24)

    resp.media = {"success": True, "path": article_path, "icon": icon_data}
|
|
|
|
|
|
@api.route("/api/debug-cache")
async def api_debug_cache(req, resp):
    """Report the stats and cache counters of the content caches as JSON.

    ``cache_stats`` holds each cache's own ``stats`` entry. ``lru_cache_info``
    holds hits, misses, maxsize and currsize from each getter's
    ``cache_info()``, keyed as ``<name>_cache``.
    """
    tracked = (
        ("blog", get_blog_cache),
        ("sidenotes", get_sidenotes_cache),
        ("outlines", get_outlines_cache),
        ("quotes", get_quotes_cache),
        ("connections", get_connections_cache),
        ("terms", get_terms_cache),
    )

    # Load every cache first; the counters below are read afterwards.
    cache_info = {name: getter()["stats"] for name, getter in tracked}

    lru_cache_info = {}
    for name, getter in tracked:
        counters = getter.cache_info()
        lru_cache_info[f"{name}_cache"] = {
            "hits": counters.hits,
            "misses": counters.misses,
            "maxsize": counters.maxsize,
            "currsize": counters.currsize,
        }

    resp.media = {
        "status": "Cache data loaded successfully",
        "cache_stats": cache_info,
        "lru_cache_info": lru_cache_info,
    }
|
|
|
|
|
|
@api.route("/api/directory-tree")
async def api_directory_tree(req, resp):
    """List the top level of the data directory as JSON.

    Responds with ``{"items": [...]}`` where folders come first, then markdown
    files (``index.md`` excluded). Each item has ``name``, ``path``, ``is_dir``
    and ``icon``. Hidden entries, ``__pycache__`` and ``node_modules`` are
    skipped. Listing is best-effort: on a filesystem error whatever was
    collected so far is returned.
    """
    folders = []
    files = []

    try:
        for item in sorted(DATA_DIR.iterdir()):
            if item.name.startswith(".") or item.name in ("__pycache__", "node_modules"):
                continue

            relative_path = item.relative_to(DATA_DIR)

            if item.is_dir():
                display_name = item.name.replace("-", " ").replace("_", " ").title()
                index_file = item / "index.md"
                if index_file.exists():
                    title = get_cached_markdown_title(index_file)
                    if title:
                        display_name = title

                folders.append({
                    "name": item.name,
                    "path": "/" + str(relative_path),
                    "is_dir": True,
                    "icon": generate_folder_icon(display_name, size=18),
                })
            elif item.suffix == ".md" and item.name != "index.md":
                display_name = get_cached_markdown_title(item) or item.stem

                files.append({
                    "name": item.stem,
                    # Drop only the trailing ".md"; str.replace would also eat
                    # any ".md" occurring earlier in the file name.
                    "path": "/" + str(relative_path.with_suffix("")),
                    "is_dir": False,
                    "icon": generate_unique_svg_icon(display_name, size=18),
                })
    except OSError as exc:
        # PermissionError is a subclass of OSError. Stay best-effort, but
        # leave a trace instead of failing silently.
        api.log.warning("Could not list %s: %s", DATA_DIR, exc)

    resp.media = {"items": folders + files}
|
|
|
|
|
|
@api.route("/api/themes")
async def api_themes(req, resp):
    """List the theme pages under ``data/themes`` as JSON.

    Every ``*.md`` file except ``index.md`` becomes one entry with ``name``
    (its cached markdown title, or a prettified file stem), ``path`` and
    ``icon``. A missing themes directory yields an empty list.
    """
    theme_root = DATA_DIR / "themes"
    theme_files = sorted(theme_root.glob("*.md")) if theme_root.is_dir() else []

    entries = []
    for md_file in theme_files:
        if md_file.name == "index.md":
            continue
        label = get_cached_markdown_title(md_file)
        if not label:
            label = md_file.stem.replace("-", " ").replace("_", " ").title()
        entries.append({
            "name": label,
            "path": f"/themes/{md_file.stem}",
            "icon": generate_unique_svg_icon(label, size=18),
        })

    resp.media = {"themes": entries}
|
|
|
|
|
|
@api.route("/api/schema")
async def api_schema(req, resp):
    """Serve auto-generated OpenAPI schema from registered routes.

    Only routes whose pattern starts with ``/api`` are listed, and each is
    described as a single ``GET`` operation. The summary is the first line of
    the handler's docstring, or a title-cased form of its function name.
    """
    paths = {}
    skip = {"catch_all", "serve_static", "serve_data_file", "api_schema", "api_docs"}
    for r in api.router.routes:
        fn = getattr(r, "endpoint", None)
        route = getattr(r, "route", None)
        if not fn or not route or fn.__name__ in skip:
            continue
        if not route.startswith("/api"):
            continue

        doc = (fn.__doc__ or fn.__name__.replace("_", " ").title()).split("\n")[0].strip()

        params = []
        segments = []
        for part in route.split("/"):
            if part.startswith("{") and part.endswith("}"):
                name = part.strip("{}").split(":")[0]
                params.append({"name": name, "in": "path", "required": True, "schema": {"type": "string"}})
                # OpenAPI template expressions must match the parameter name,
                # so drop the router's converter suffix
                # ("{article_path:path}" -> "{article_path}").
                part = "{" + name + "}"
            segments.append(part)

        op = {"summary": doc, "responses": {"200": {"description": "OK"}}}
        if params:
            op["parameters"] = params
        # Detect query params from docstring hints or known routes
        if "search" in route and "q" not in [p["name"] for p in params]:
            op.setdefault("parameters", []).append(
                {"name": "q", "in": "query", "required": True, "schema": {"type": "string"}}
            )
        paths["/".join(segments)] = {"get": op}

    resp.media = {
        "openapi": "3.0.0",
        "info": {"title": "kennethreitz.org", "version": "1.0"},
        "paths": paths,
    }
|
|
|
|
|
|
@api.route("/api")
async def api_docs(req, resp):
    """Serve the Swagger UI page that renders the schema from ``/api/schema``."""
    # Static page: Swagger UI assets come from unpkg, and the bundle is
    # pointed at this site's own schema endpoint.
    swagger_page = """<!DOCTYPE html>
<html><head>
<title>API — kennethreitz.org</title>
<meta charset="utf-8">
<link rel="stylesheet" href="https://unpkg.com/swagger-ui-dist/swagger-ui.css">
<script src="https://unpkg.com/swagger-ui-dist/swagger-ui-bundle.js"></script>
</head><body>
<div id="swagger-ui"></div>
<script>SwaggerUIBundle({url: "/api/schema", dom_id: "#swagger-ui", deepLinking: true, presets: [SwaggerUIBundle.presets.apis, SwaggerUIBundle.SwaggerUIStandalonePreset], layout: "BaseLayout"});</script>
</body></html>"""
    resp.html = swagger_page
|
|
|
|
|
|
@api.route("/api/oembed")
async def api_oembed(req, resp):
    """Proxy oEmbed discovery and fetch to avoid CORS issues.

    Query parameter ``url`` must be an ``https://`` page URL. The page is
    fetched and searched for a ``<link type="application/json+oembed">`` tag,
    then the JSON document behind that link is returned.

    Status codes:
        400 -- ``url`` is missing or not https.
        404 -- the page advertises no usable oEmbed endpoint.
        502 -- any network, decoding or JSON error while talking upstream.
    """
    import html
    import json
    import re
    import urllib.request
    from urllib.parse import urljoin, urlsplit

    url = req.params.get("url", "").strip()
    if not url or not url.startswith("https://"):
        resp.status_code = 400
        resp.media = {"error": "Missing or invalid url parameter"}
        return

    request_headers = {"User-Agent": "kennethreitz.org"}
    # Upper bound on how much of an upstream response is read into memory.
    max_bytes = 2 * 1024 * 1024
    # The two attribute orders in which a <link> may declare its oEmbed href.
    link_patterns = (
        r'<link[^>]+type=["\']application/json\+oembed["\'][^>]+href=["\']([^"\']+)["\']',
        r'<link[^>]+href=["\']([^"\']+)["\'][^>]+type=["\']application/json\+oembed["\']',
    )

    # NOTE(review): urlopen is a blocking call inside an async handler --
    # confirm this is acceptable for how the app is deployed.
    try:
        # Step 1: Discover oEmbed endpoint from the target page.
        page_request = urllib.request.Request(url, headers=request_headers)
        with urllib.request.urlopen(page_request, timeout=5) as res:
            page_html = res.read(max_bytes).decode("utf-8", errors="replace")

        endpoint = None
        for pattern in link_patterns:
            match = re.search(pattern, page_html)
            if match:
                # Attribute values are HTML-escaped ("&amp;") and may be
                # relative to the page they were found on.
                endpoint = urljoin(url, html.unescape(match.group(1)))
                break

        # The endpoint is attacker-controlled (it comes from the remote page);
        # never let it select urllib's file:// or ftp:// handlers.
        if not endpoint or urlsplit(endpoint).scheme not in ("http", "https"):
            resp.status_code = 404
            resp.media = {"error": "No oEmbed endpoint found"}
            return

        # Step 2: Fetch oEmbed data from the discovered endpoint.
        endpoint_request = urllib.request.Request(endpoint, headers=request_headers)
        with urllib.request.urlopen(endpoint_request, timeout=5) as res:
            data = json.loads(res.read(max_bytes))

        resp.media = data
    except Exception as exc:
        # Boundary handler: every upstream failure becomes a 502, but is logged.
        api.log.warning("oEmbed proxy failed for %s: %s", url, exc)
        resp.status_code = 502
        resp.media = {"error": "oEmbed discovery failed"}
|
|
|
|
|
|
# --- Data file serving ---
|
|
|
|
|
|
@api.route("/data/{path:path}")
async def serve_data_file(req, resp, *, path):
    """Serve static files from the data directory.

    Responds 403 when the resolved path lies outside the data directory and
    404 when it is not an existing regular file.
    """
    data_root = DATA_DIR.resolve()
    try:
        file_path = (DATA_DIR / path).resolve()
        # Component-wise containment test. A plain string-prefix comparison
        # would also accept sibling directories such as "data-backup/".
        file_path.relative_to(data_root)
    except ValueError:
        resp.status_code = 403
        return

    if file_path.is_file():
        resp.file(str(file_path))
    else:
        resp.status_code = 404
|
|
|
|
|
|
# --- PDF export ---
|
|
|
|
|
|
def _generate_pdf(md_path):
    """Generate PDF bytes from a markdown file. Returns bytes or raises.

    The markdown is rendered, placed into the ``pdf.html`` template and
    converted with WeasyPrint. WeasyPrint is imported lazily here, so a
    missing installation raises from this function rather than at import time.
    """
    from weasyprint import HTML
    from weasyprint.text.fonts import FontConfiguration

    rendered = render_markdown_file(md_path)
    template_context = {
        "content": rendered["content"],
        "title": rendered["title"],
        "metadata": rendered.get("metadata", {}),
        "current_year": datetime.now().year,
        "reading_time": rendered.get("reading_time"),
        "word_count": rendered.get("word_count"),
        "unique_icon": rendered.get("unique_icon"),
    }
    markup = api.templates.render("pdf.html", **template_context)

    fonts = FontConfiguration()
    sink = io.BytesIO()
    document = HTML(string=markup, base_url="https://kennethreitz.org/")
    document.write_pdf(sink, font_config=fonts)
    return sink.getvalue()
|
|
|
|
|
|
@api.route("/{path:path}.pdf")
async def serve_pdf(req, resp, *, path):
    """Generate and serve a PDF version of content.

    Responds 404 when ``data/<path>.md`` does not exist, and 503 when PDF
    generation raises ImportError or OSError.
    """
    markdown_source = DATA_DIR / f"{path}.md"
    if not markdown_source.exists():
        resp.status_code = 404
        return

    # The download name is the last URL segment plus the .pdf extension.
    leaf = path.rsplit("/", 1)[-1]

    try:
        api.log.info("Generating PDF for /%s", path)
        resp.content = _generate_pdf(markdown_source)
        resp.headers["Content-Type"] = "application/pdf"
        resp.headers["Content-Disposition"] = f'inline; filename="{leaf}.pdf"'
    except (ImportError, OSError):
        resp.status_code = 503
        resp.text = "PDF generation requires WeasyPrint"
|
|
|
|
|
|
# --- Legacy URL resolver ---
|
|
|
|
|
|
# Pre-built lookup tables for legacy URL resolution (built once at import time).
# Both are filled by _build_legacy_index() and read by _resolve_legacy_url().
# _legacy_dirs: name of a directory that contains an index.md -> its site URL.
_legacy_dirs = {}  # slug -> url
# _legacy_files: file stem, and the stem without a leading "YYYY-MM-" -> its site URL.
_legacy_files = {}  # stripped_slug -> url
|
|
|
|
|
|
def _build_legacy_index():
    """Build lookup tables from the data directory for fast legacy URL resolution.

    Fills ``_legacy_dirs`` with every directory that has an ``index.md``, keyed
    by directory name. Fills ``_legacy_files`` with every other markdown file,
    keyed by its stem and by the stem with a leading ``YYYY-MM-`` removed.
    When two entries share a key, the one visited last wins.
    """
    for folder in DATA_DIR.rglob("*/"):
        if not (folder / "index.md").exists():
            continue
        _legacy_dirs[folder.name] = f"/{folder.relative_to(DATA_DIR)}"

    date_prefix = re.compile(r"^\d{4}-\d{2}-")
    for md_file in DATA_DIR.rglob("*.md"):
        if md_file.name == "index.md":
            continue
        target = f"/{md_file.relative_to(DATA_DIR).with_suffix('')}"
        # Register the date-less slug first, then the full stem.
        for key in (date_prefix.sub("", md_file.stem), md_file.stem):
            _legacy_files[key] = target
|
|
|
|
|
|
# Populate the legacy lookup tables when this module is imported.
_build_legacy_index()
|
|
|
|
|
|
def _resolve_legacy_url(path):
    """Try to find current content matching a legacy URL pattern.

    Handles:
    - Date-path URLs: essays/2013/01/27/slug -> essays/2013-01-slug
    - Bare slugs: /tattoos -> /photography/tattoos, /slug -> /essays/...-slug
    - Hyphen/underscore normalization

    Returns the current URL as a string, or None when nothing matches.
    """
    # Pattern 1: /essays/YYYY/MM/DD/slug or /essays/YYYY/MM/slug
    dated = re.match(r"essays/(\d{4})/(\d{2})(?:/\d{2})?/(.+)$", path)
    if dated:
        year, month, tail = dated.groups()
        wanted = tail.replace("-", "_")
        essays_dir = DATA_DIR / "essays"
        exact = essays_dir / f"{year}-{month}-{wanted}.md"
        if exact.exists():
            return f"/essays/{year}-{month}-{wanted}"
        # No exact file: accept any essay from that year and month whose
        # stem contains the slug.
        for candidate in essays_dir.glob(f"{year}-{month}-*"):
            if wanted in candidate.stem:
                return f"/essays/{candidate.stem}"

    # Pattern 2: bare slug -- fast lookup in pre-built index
    slug = path.strip("/").split("/")[-1]
    normalized = slug.replace("-", "_")

    # Directories take priority over files; the order below is significant.
    lookups = (
        (_legacy_dirs, slug),
        (_legacy_dirs, normalized),
        (_legacy_files, normalized),
        (_legacy_files, slug),
    )
    for table, key in lookups:
        if key in table:
            return table[key]

    return None
|
|
|
|
|
|
# --- Static files and catch-all route (the catch-all MUST be registered last) ---
|
|
|
|
|
|
@api.route("/static/{path:path}")
async def serve_static(req, resp, *, path):
    """Serve static files explicitly.

    Responds 403 when the resolved path lies outside ``tuftecms/static`` and
    404 when it is not an existing regular file.
    """
    static_root = Path("tuftecms/static").resolve()
    try:
        static_path = (static_root / path).resolve()
        # Reject ".." tricks: the resolved target must stay inside the
        # static directory, the same policy serve_data_file applies.
        static_path.relative_to(static_root)
    except ValueError:
        resp.status_code = 403
        return

    if static_path.is_file():
        resp.file(str(static_path))
    else:
        resp.status_code = 404
|
|
|
|
|
|
def _catch_all_breadcrumb(path):
    """Return one ``{"name", "path"}`` breadcrumb entry per segment of *path*."""
    crumbs = []
    current = ""
    for part in path.split("/"):
        current = f"{current}/{part}" if current else part
        crumbs.append({
            "name": part.replace("-", " ").replace("_", " ").title(),
            "path": f"/{current}",
        })
    return crumbs


@api.route("/{path:path}")
async def catch_all(req, resp, *, path):
    """Main content route -- serves markdown files or directories.

    Resolution order for ``/<path>``:

    1. ``*.md``  -- raw markdown source as text/plain (404 if missing).
    2. ``*.pdf`` -- PDF export of the matching markdown file (404 if missing,
       503 if PDF generation raises ImportError/OSError).
    3. an existing file under the data directory -- served as-is.
    4. ``data/<path>.md`` -- rendered with ``post.html``.
    5. a directory with ``index.md`` -- image gallery (``directory.html``) when
       it contains images, otherwise ``post.html``.
    6. a directory without ``index.md`` -- plain listing.
    7. a legacy URL -- 301 redirect to the current location.
    8. otherwise the 404 error page.

    Paths that resolve outside the data directory get the 404 error page.
    """

    def _not_found():
        # Shared 404 response: status plus the rendered error page.
        resp.status_code = 404
        resp.html = render("error.html", req, f"/{path}",
            title="Not Found",
            current_year=datetime.now().year,
            error_code=404,
            error_message="Page not found.",
        )

    bot = _detect_bot(req)
    if bot:
        api.log.info("Bot detected: %s crawling /%s", bot, path)

    # Refuse anything that escapes the data directory (e.g. via ".."
    # segments) before touching the filesystem on the caller's behalf.
    # Same policy as serve_data_file.
    try:
        (DATA_DIR / path).resolve().relative_to(DATA_DIR.resolve())
    except ValueError:
        api.log.warning("Rejected path outside data directory: /%s", path)
        _not_found()
        return

    file_path = DATA_DIR / f"{path}.md"
    dir_path = DATA_DIR / path
    index_path = dir_path / "index.md"
    raw_path = dir_path  # same location, viewed as a plain file

    # Serve raw markdown source
    if path.endswith(".md"):
        # is_file() rather than exists(): a directory named "*.md" cannot be read.
        if raw_path.is_file():
            # Decode as UTF-8 explicitly so the text matches the declared charset.
            resp.text = raw_path.read_text(encoding="utf-8")
            resp.headers["Content-Type"] = "text/plain; charset=utf-8"
            return
        resp.status_code = 404
        return

    # Serve PDF export
    if path.endswith(".pdf"):
        md_path = DATA_DIR / f"{path[:-4]}.md"
        if md_path.exists():
            api.log.info("Generating PDF for /%s", path[:-4])
            try:
                resp.content = _generate_pdf(md_path)
                resp.headers["Content-Type"] = "application/pdf"
                resp.headers["Content-Disposition"] = f'inline; filename="{path.split("/")[-1]}"'
            except (ImportError, OSError):
                resp.status_code = 503
                resp.text = "PDF generation requires WeasyPrint"
            return
        resp.status_code = 404
        return

    # Serve raw files (images, etc.) directly from data directory
    if raw_path.is_file():
        resp.file(str(raw_path))
        return

    # Serve markdown file
    if file_path.exists():
        api.log.info("Serving content: /%s", path)
        content_data = _cached_render(file_path)

        # Detect themes for essays
        article_themes = []
        if path.startswith("essays/"):
            themes_data = get_themes_cache().get("themes", {})
            essay_url = f"/essays/{file_path.stem}"
            for theme_name, theme_info in themes_data.items():
                # Record each theme at most once, on its first matching article.
                if any(
                    article.get("url") == essay_url
                    for article in theme_info.get("articles", [])
                ):
                    article_themes.append({"name": theme_name})

        resp.html = render("post.html", req, f"/{path}",
            content=content_data["content"],
            title=content_data["title"],
            metadata=content_data.get("metadata", {}),
            github_file=f"data/{path}.md",
            reading_time=content_data.get("reading_time"),
            word_count=content_data.get("word_count"),
            tags=content_data.get("tags", []),
            series_posts=content_data.get("series_posts", []),
            series_name=content_data.get("series_name"),
            unique_icon=content_data.get("unique_icon"),
            article_themes=article_themes,
        )
        return

    # Serve directory with index.md
    if dir_path.is_dir() and index_path.exists():
        api.log.info("Serving directory: /%s", path)
        content_data = _cached_render(index_path)
        items = get_directory_structure(dir_path)

        # Check for image gallery
        image_extensions = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".svg"}
        image_items = []
        for item in items:
            item["is_image"] = False
            if item["is_dir"]:
                continue
            item_path = DATA_DIR / item["url_path"].lstrip("/")
            suffix = item_path.suffix.lower()
            if suffix not in image_extensions:
                continue
            item["is_image"] = True
            item["static_path"] = item["url_path"]
            item["is_data_file"] = True
            # EXIF metadata is only extracted from JPEG files.
            if suffix in {".jpg", ".jpeg"}:
                item["exif"] = extract_exif_data(item_path)
            else:
                item["exif"] = {}
            image_items.append(item)

        if image_items:
            folder_icon = generate_folder_icon(content_data["title"], size=32)

            resp.html = render("directory.html", req, f"/{path}",
                items=items,
                image_items=image_items,
                is_image_gallery=True,
                title=content_data["title"],
                current_path=f"/{path}",
                breadcrumb=_catch_all_breadcrumb(path),
                current_year=datetime.now().year,
                folder_icon=folder_icon,
                index_content={"content": content_data["content"]},
                content_position="top",
            )
        else:
            resp.html = render("post.html", req, f"/{path}",
                content=content_data["content"],
                title=content_data["title"],
                metadata=content_data.get("metadata", {}),
                items=items,
                current_path=f"/{path}",
                github_file=f"data/{path}/index.md",
                current_year=datetime.now().year,
                reading_time=content_data.get("reading_time"),
                word_count=content_data.get("word_count"),
                unique_icon=content_data.get("unique_icon"),
            )
        return

    # Serve directory listing (no index.md)
    if dir_path.is_dir():
        api.log.info("Serving directory listing: /%s", path)
        items = get_directory_structure(dir_path)
        title = path.split("/")[-1].replace("-", " ").replace("_", " ").title()
        folder_icon = generate_folder_icon(title, size=32)

        resp.html = render("directory.html", req, f"/{path}",
            items=items,
            title=title,
            current_path=f"/{path}",
            breadcrumb=_catch_all_breadcrumb(path),
            current_year=datetime.now().year,
            folder_icon=folder_icon,
        )
        return

    # Try to resolve legacy URLs before giving up
    redirect_to = _resolve_legacy_url(path)
    if redirect_to:
        api.log.info("Legacy redirect: /%s -> %s", path, redirect_to)
        resp.status_code = 301
        resp.headers["Location"] = redirect_to
        return

    # Nothing found
    api.log.warning("Not found: /%s", path)
    _not_found()
|
|
|
|
|
|
# Warm caches on startup
|
|
@api.on_event("startup")
async def warm_caches():
    """Start a daemon thread that pre-populates the caches and the search index.

    The work runs in the background so startup is not held up by it. Any
    exception raised while warming is logged and otherwise ignored.
    """
    import threading

    def _warm():
        api.log.info("Starting background cache warming...")
        try:
            get_blog_cache()
            from tuftecms.core.cache import (
                get_sidenotes_cache, get_outlines_cache,
                get_quotes_cache, get_connections_cache,
                get_terms_cache, get_themes_cache,
            )
            # Remaining warm-up steps, executed in this exact order.
            steps = (
                get_sidenotes_cache,
                get_outlines_cache,
                get_quotes_cache,
                get_connections_cache,
                get_terms_cache,
                get_themes_cache,
                _build_search_index,
            )
            for step in steps:
                step()
            api.log.info("Cache warming complete!")
        except Exception as e:
            api.log.error("Cache warming failed: %s", e)

    worker = threading.Thread(target=_warm, daemon=True)
    worker.start()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the app directly; the listening port comes from $PORT (default 8000).
    api.run(port=int(os.environ.get("PORT", 8000)))
|