#!/usr/bin/env python3 from __future__ import annotations import importlib import json import os import re import shutil import subprocess import sys import textwrap import traceback from pathlib import Path from typing import Any, Callable from unittest.mock import patch from ansi2html import Ansi2HTMLConverter from devtools import PrettyFormat THIS_DIR = Path(__file__).parent DOCS_DIR = (THIS_DIR / '..').resolve() EXAMPLES_DIR = DOCS_DIR / 'examples' TMP_EXAMPLES_DIR = DOCS_DIR / '.tmp_examples' UPGRADED_TMP_EXAMPLES_DIR = TMP_EXAMPLES_DIR / 'upgraded' MAX_LINE_LENGTH = int( re.search(r'max_line_length = (\d+)', (EXAMPLES_DIR / '.editorconfig').read_text()).group(1) # type: ignore ) LONG_LINE = 50 LOWEST_VERSION = (3, 7) HIGHEST_VERSION = (3, 10) pformat = PrettyFormat(simple_cutoff=LONG_LINE) Error = Callable[..., None] Version = tuple[int, int] PYTHON_CODE_MD_TMPL = """ === "Python {version} and above" ```py {code} ``` """.strip() JSON_OUTPUT_MD_TMPL = """ Outputs: ```json {output} ``` """ def to_string(value: Any) -> str: # attempt to build a pretty equivalent of the print output if isinstance(value, (dict, list, tuple, set)): return pformat(value) elif isinstance(value, str) and any(re.fullmatch(r, value, flags=re.DOTALL) for r in ['{".+}', r'\[.+\]']): try: obj = json.loads(value) except ValueError: # not JSON, not a problem pass else: s = json.dumps(obj) if len(s) > LONG_LINE: json.dumps(obj, indent=2) else: return s return str(value) class MockPrint: def __init__(self, file: Path) -> None: self.file = file self.statements: list[tuple[int, str]] = [] def __call__(self, *args: Any, sep: str = ' ', **kwargs: Any) -> None: frame = sys._getframe(4) if sys.version_info >= (3, 8) else sys._getframe(3) if not self.file.samefile(frame.f_code.co_filename): # happens when index_error.py imports index_main.py return s = sep.join(map(to_string, args)) self.statements.append((frame.f_lineno, s)) class MockPath: def read_text(self, *args: Any, **kwargs: Any) -> str: return '{"foobar": "spam"}' def build_print_lines(s: str, max_len_reduction: int = 0) -> list[str]: print_lines = [] max_len = MAX_LINE_LENGTH - 3 - max_len_reduction for line in s.split('\n'): if len(line) > max_len: print_lines += textwrap.wrap(line, width=max_len) else: print_lines.append(line) return print_lines def build_print_statement(line_no: int, s: str, lines: list[str]) -> None: indent = '' for back in range(1, 100): m = re.search(r'^( *)print\(', lines[line_no - back]) if m: indent = m.group(1) break print_lines = build_print_lines(s, len(indent)) if len(print_lines) > 2: text = textwrap.indent('"""\n{}\n"""'.format('\n'.join(print_lines)), indent) else: text = '\n'.join(f'{indent}#> {line}' for line in print_lines) lines.insert(line_no, text) def all_md_contents() -> str: file_contents = [] for f in DOCS_DIR.glob('**/*.md'): file_contents.append(f.read_text()) return '\n\n\n'.join(file_contents) def gen_ansi_output() -> None: conv = Ansi2HTMLConverter() input_file = EXAMPLES_DIR / 'devtools_main.py' os.environ['PY_DEVTOOLS_HIGHLIGHT'] = 'true' p = subprocess.run((sys.executable, str(input_file)), stdout=subprocess.PIPE, check=True, encoding='utf8') html = conv.convert(p.stdout, full=False).strip('\r\n') full_html = f'
\n
\n{html}\n
\n
' path = TMP_EXAMPLES_DIR / f'{input_file.stem}.html' path.write_text(full_html) print(f'generated ansi output to {path}') dont_execute_re = re.compile(r'^# dont-execute\n', flags=re.M | re.I) dont_upgrade_re = re.compile(r'^# dont-upgrade\n', flags=re.M | re.I) requires_re = re.compile(r'^# requires: *(.+)\n', flags=re.M | re.I) required_py_re = re.compile(r'^# *requires *python *(\d+).(\d+)', flags=re.M) def should_execute(file_name: str, file_text: str) -> tuple[str, bool, Version]: m = required_py_re.search(file_text) if m: lowest_version = (int(m.groups()[0]), int(m.groups()[1])) if sys.version_info >= lowest_version: return required_py_re.sub('', file_text), True, lowest_version else: v = '.'.join(m.groups()) print(f'WARNING: {file_name} requires python {v}, not running') return ( required_py_re.sub(f'# requires python {v}, NOT EXECUTED!', file_text), False, lowest_version, ) elif dont_execute_re.search(file_text): return dont_execute_re.sub('', file_text), False, LOWEST_VERSION return file_text, True, LOWEST_VERSION def should_upgrade(file_text: str) -> tuple[str, bool]: if dont_upgrade_re.search(file_text): return dont_upgrade_re.sub('', file_text), False return file_text, True def get_requirements(file_text: str) -> tuple[str, str | None]: m = requires_re.search(file_text) if m: return requires_re.sub('', file_text), m.groups()[0] return file_text, None def exec_file(file: Path, file_text: str, error: Error) -> tuple[list[str], str | None]: no_print_intercept_re = re.compile(r'^# no-print-intercept\n', flags=re.M) print_intercept = not bool(no_print_intercept_re.search(file_text)) if not print_intercept: file_text = no_print_intercept_re.sub('', file_text) if file.stem in sys.modules: del sys.modules[file.stem] mp = MockPrint(file) mod = None with patch.object(Path, 'read_text', MockPath.read_text), patch('builtins.print') as patch_print: if print_intercept: patch_print.side_effect = mp try: mod = importlib.import_module(file.stem) except Exception: tb = traceback.format_exception(*sys.exc_info()) error(''.join(e for e in tb if '/pydantic/docs/examples/' in e or not e.startswith(' File '))) if mod and mod.__file__ != str(file): error(f'module path "{mod.__file__}" is not same as "{file}", name may shadow another module?') lines = file_text.split('\n') to_json_line = '# output-json' if to_json_line in lines: lines = [line for line in lines if line != to_json_line] if len(mp.statements) != 1: error('should have exactly one print statement') print_lines = build_print_lines(mp.statements[0][1]) return lines, '\n'.join(print_lines) + '\n' else: for line_no, print_string in reversed(mp.statements): build_print_statement(line_no, print_string, lines) return lines, None def filter_lines(lines: list[str], error: Any) -> tuple[list[str], bool]: ignored_above = False try: ignore_above = lines.index('# ignore-above') except ValueError: pass else: ignored_above = True lines = lines[ignore_above + 1 :] try: ignore_below = lines.index('# ignore-below') except ValueError: pass else: lines = lines[:ignore_below] lines = '\n'.join(lines).split('\n') if any(len(line) > MAX_LINE_LENGTH for line in lines): error(f'lines longer than {MAX_LINE_LENGTH} characters') return lines, ignored_above def upgrade_code(content: str, min_version: Version = HIGHEST_VERSION) -> str: import pyupgrade._main # type: ignore import autoflake # type: ignore upgraded = pyupgrade._main._fix_plugins( content, settings=pyupgrade._main.Settings( min_version=min_version, keep_percent_format=True, keep_mock=False, keep_runtime_typing=True, ), ) upgraded = autoflake.fix_code(upgraded, remove_all_unused_imports=True) return upgraded def ensure_used(file: Path, all_md: str, error: Error) -> None: """Ensures that example is used appropriately""" file_tmpl = '{{!.tmp_examples/{}!}}' md_name = file.stem + '.md' if file_tmpl.format(md_name) not in all_md: if file_tmpl.format(file.name) in all_md: error( f'incorrect usage, change filename to {md_name!r} in docs.' "make sure you don't specify ```py code blocks around examples," 'they are automatically generated now.' ) else: error( 'file not used anywhere. correct usage:', file_tmpl.format(md_name), ) def check_style(file_text: str, error: Error) -> None: if '\n\n\n\n' in file_text: error('too many new lines') if not file_text.endswith('\n'): error('no trailing new line') if re.search('^ *# *>', file_text, flags=re.M): error('contains comments with print output, please remove') def populate_upgraded_versions(file: Path, file_text: str, lowest_version: Version) -> list[tuple[Path, str, Version]]: versions = [] major, minor = lowest_version assert major == HIGHEST_VERSION[0], 'Wow, Python 4 is out? Congrats!' upgraded_file_text = file_text while minor < HIGHEST_VERSION[1]: minor += 1 new_file_text = upgrade_code(file_text, min_version=(major, minor)) if upgraded_file_text != new_file_text: upgraded_file_text = new_file_text new_file = UPGRADED_TMP_EXAMPLES_DIR / (file.stem + f'_{major}_{minor}' + file.suffix) new_file.write_text(upgraded_file_text) versions.append((new_file, upgraded_file_text, (major, minor))) return versions def v2_hack() -> bool: """ Temporary hack while working on V2. TODO remove once tests are working again (although test generation needs to be completely rewritten to match watchfiles et. al.) """ return True def exec_examples() -> int: # noqa: C901 (I really don't want to decompose it any further) errors = [] all_md = all_md_contents() new_files = {} os.environ.update( { 'my_auth_key': 'xxx', 'my_api_key': 'xxx', 'database_dsn': 'postgres://postgres@localhost:5432/env_db', 'v0': '0', 'sub_model': '{"v1": "json-1", "v2": "json-2"}', 'sub_model__v2': 'nested-2', 'sub_model__v3': '3', 'sub_model__deep__v4': 'v4', } ) sys.path.append(str(EXAMPLES_DIR)) if sys.version_info < HIGHEST_VERSION: print("WARNING: examples for 3.10+ requires python 3.10. They won't be executed") else: UPGRADED_TMP_EXAMPLES_DIR.mkdir(parents=True, exist_ok=True) sys.path.append(str(UPGRADED_TMP_EXAMPLES_DIR)) for file in sorted(EXAMPLES_DIR.iterdir()): markdown_name = file.stem + '.md' def error(*desc: str) -> None: errors.append((file, desc)) previous_frame = sys._getframe(1) filename = Path(previous_frame.f_globals['__file__']).relative_to(Path.cwd()) location = f'{filename}:{previous_frame.f_lineno}' sys.stderr.write(f'{location}: error in {file.relative_to(Path.cwd())}:\n{" ".join(desc)}\n') if not file.is_file(): # __pycache__, maybe others continue if file.suffix != '.py': # just copy new_files[file.name] = file.read_text() continue file_text = file.read_text('utf-8') ensure_used(file, all_md, error) check_style(file_text, error) if v2_hack(): new_files[markdown_name] = f'```py\n{file_text}\n```\n' continue file_text, execute, lowest_version = should_execute(file.name, file_text) file_text, upgrade = should_upgrade(file_text) file_text, requirements = get_requirements(file_text) if upgrade and upgrade_code(file_text, min_version=lowest_version) != file_text: error("pyupgrade would upgrade file. If it's not desired, add '# dont-upgrade' line at the top of the file") versions: list[tuple[Path, str, Version]] = [(file, file_text, lowest_version)] if upgrade: versions.extend(populate_upgraded_versions(file, file_text, lowest_version)) json_outputs: set[str | None] = set() should_run_as_is = not requirements final_content: list[str] = [] for file, file_text, lowest_version in versions: if execute and sys.version_info >= lowest_version: lines, json_output = exec_file(file, file_text, error) json_outputs.add(json_output) else: lines = file_text.split('\n') lines, ignored_lines_before_script = filter_lines(lines, error) should_run_as_is = should_run_as_is and not ignored_lines_before_script final_content.append( PYTHON_CODE_MD_TMPL.format( version='.'.join(map(str, lowest_version)), code=textwrap.indent('\n'.join(lines), ' '), ) ) if should_run_as_is: final_content.append('_(This script is complete, it should run "as is")_') elif requirements: final_content.append(f'_(This script requires {requirements})_') else: error( 'script may not run as is, but requirements were not specified.', 'specify `# requires: ` in the end of the script', ) if len(json_outputs) > 1: error('json output should not differ between versions') if json_outputs: json_output, *_ = json_outputs if json_output: final_content.append(JSON_OUTPUT_MD_TMPL.format(output=json_output)) new_files[markdown_name] = '\n'.join(final_content) if errors: print(f'\n{len(errors)} errors, not writing files\n') return 1 if TMP_EXAMPLES_DIR.exists(): shutil.rmtree(TMP_EXAMPLES_DIR) print(f'writing {len(new_files)} example files to {TMP_EXAMPLES_DIR}') TMP_EXAMPLES_DIR.mkdir() for file_name, content in new_files.items(): (TMP_EXAMPLES_DIR / file_name).write_text(content, 'utf-8') gen_ansi_output() return 0 if __name__ == '__main__': sys.exit(exec_examples())