Initial commit
Some checks failed
CI / test (push) Failing after 1m20s
Lint / test (push) Failing after 28s
Trivy / test (push) Successful in 23s

Miguel Jacq 2026-01-02 09:59:52 +11:00
commit fe58397da7
Signed by: mig5
GPG key ID: 59B3F0C24135C6A9
17 changed files with 2547 additions and 0 deletions

2
src/cspresso/__init__.py Normal file

@@ -0,0 +1,2 @@
__all__ = ["__version__"]
__version__ = "0.1.0"

4
src/cspresso/__main__.py Normal file

@@ -0,0 +1,4 @@
from .crawl import main
if __name__ == "__main__":
main()

620
src/cspresso/crawl.py Normal file

@@ -0,0 +1,620 @@
from __future__ import annotations
import argparse
import asyncio
import base64
import hashlib
import json
import re
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urljoin, urldefrag, urlparse
from playwright.async_api import async_playwright
from .ensure_playwright import ensure_chromium_installed
RESOURCE_TO_DIRECTIVE = {
"script": "script-src",
"stylesheet": "style-src",
"image": "img-src",
"font": "font-src",
"media": "media-src",
"xhr": "connect-src",
"fetch": "connect-src",
"websocket": "connect-src",
"eventsource": "connect-src",
}
BASELINE_DIRECTIVES = {
"default-src": {"'self'"},
"base-uri": {"'self'"},
"object-src": {"'none'"},
"frame-ancestors": {"'self'"},
"form-action": {"'self'"},
}
def origin_of(url: str) -> str:
p = urlparse(url)
if not p.scheme or not p.netloc:
return ""
return f"{p.scheme}://{p.netloc}"
def sha256_base64(s: str) -> str:
h = hashlib.sha256(s.encode("utf-8")).digest()
return base64.b64encode(h).decode("ascii")
async def collect_inline(page, *, max_attr_hashes: int = 2000):
"""
Collect inline <script> (no src), <style> blocks, plus:
- style="..." attributes (CSP3 style-src-attr / unsafe-hashes)
    - inline event handler attributes (onclick="...", onload="...", etc.) (CSP3 script-src-attr / unsafe-hashes)
IMPORTANT: Hashes must be computed over the EXACT string bytes. Do NOT strip.
"""
data = await page.evaluate(
"""(maxAttr) => {
const inlineScripts = [...document.querySelectorAll('script:not([src])')]
.map(s => ({
nonce: s.nonce || s.getAttribute('nonce') || null,
text: s.textContent ?? ''
}));
const inlineStyles = [...document.querySelectorAll('style')]
.map(st => ({
nonce: st.nonce || st.getAttribute('nonce') || null,
text: st.textContent ?? ''
}));
const styleAttrs = [];
const handlerAttrs = [];
// style="..."
for (const el of document.querySelectorAll('[style]')) {
if (styleAttrs.length >= maxAttr) break;
const v = el.getAttribute('style');
if (v !== null) styleAttrs.push(v);
}
// inline event handlers: on*
// Iterate elements and look for attributes starting with "on"
const all = document.querySelectorAll('*');
for (let i = 0; i < all.length; i++) {
if (handlerAttrs.length >= maxAttr) break;
const el = all[i];
const names = el.getAttributeNames ? el.getAttributeNames() : [];
for (const name of names) {
if (handlerAttrs.length >= maxAttr) break;
if (name && name.toLowerCase().startsWith('on')) {
const v = el.getAttribute(name);
if (v !== null) handlerAttrs.push(v);
}
}
}
const dataImgs = [...document.querySelectorAll('img[src^="data:"]')].length > 0;
const dataFonts = [...document.querySelectorAll('link[rel="preload"][as="font"][href^="data:"]')].length > 0;
return { inlineScripts, inlineStyles, styleAttrs, handlerAttrs, dataImgs, dataFonts };
}""",
max_attr_hashes,
)
script_nonces = {x["nonce"] for x in data["inlineScripts"] if x.get("nonce")}
style_nonces = {x["nonce"] for x in data["inlineStyles"] if x.get("nonce")}
script_hashes = set()
for x in data["inlineScripts"]:
raw = x.get("text") or ""
if raw.strip(): # skip pure-whitespace blocks, but DO NOT strip for hashing
script_hashes.add(f"'sha256-{sha256_base64(raw)}'")
style_hashes = set()
for x in data["inlineStyles"]:
raw = x.get("text") or ""
if raw.strip():
style_hashes.add(f"'sha256-{sha256_base64(raw)}'")
# style="..." attribute hashes
style_attr_hashes = set()
for v in data.get("styleAttrs") or []:
if isinstance(v, str) and v.strip():
style_attr_hashes.add(f"'sha256-{sha256_base64(v)}'")
# on*="..." handler hashes
handler_attr_hashes = set()
for v in data.get("handlerAttrs") or []:
if isinstance(v, str) and v.strip():
handler_attr_hashes.add(f"'sha256-{sha256_base64(v)}'")
return (
script_nonces,
style_nonces,
script_hashes,
style_hashes,
style_attr_hashes,
handler_attr_hashes,
bool(data.get("dataImgs")),
bool(data.get("dataFonts")),
)
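# Illustrative note (an assumption about typical markup, not part of the crawl logic):
# an inline block such as <script>console.log("hi")</script> contributes
# 'sha256-<base64 digest of the exact text console.log("hi")>' to script-src.
# Whitespace and newlines inside the element are significant, which is why the
# text is hashed verbatim above rather than stripped first.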
async def extract_links(page, base_origin: str) -> list[str]:
hrefs = await page.evaluate(
"""() => [...document.querySelectorAll('a[href]')].map(a => a.getAttribute('href'))"""
)
out: list[str] = []
for href in hrefs or []:
if not href:
continue
        abs_url = urljoin(page.url, href)  # resolve relative hrefs against the current page, not just the origin root
abs_url, _frag = urldefrag(abs_url)
p = urlparse(abs_url)
if p.scheme in ("http", "https") and origin_of(abs_url) == base_origin:
out.append(abs_url)
return out
def build_csp(
directives: dict[str, set[str]],
*,
base_origin: str,
nonce_detected: bool,
script_hashes: set[str],
style_hashes: set[str],
style_attr_hashes: set[str],
handler_attr_hashes: set[str],
allow_data_img: bool,
allow_data_font: bool,
allow_blob: bool,
allow_unsafe_eval: bool,
upgrade_insecure_requests: bool,
) -> str:
csp: dict[str, set[str]] = {k: set(v) for k, v in BASELINE_DIRECTIVES.items()}
# Merge observed origins into directives.
for d, vals in directives.items():
if vals:
csp.setdefault(d, set()).update(vals)
# Always keep 'self' on these directives if present.
for d in (
"script-src",
"style-src",
"img-src",
"connect-src",
"font-src",
"media-src",
"frame-src",
):
if d in csp:
csp[d].add("'self'")
# Inline handling:
# - If we detected nonce attributes, emit nonce *template*. You must replace {NONCE} per response.
if nonce_detected:
csp.setdefault("script-src", {"'self'"}).add("'nonce-{NONCE}'")
csp.setdefault("style-src", {"'self'"}).add("'nonce-{NONCE}'")
# Hashes for inline <script>/<style> blocks
if script_hashes:
csp.setdefault("script-src", {"'self'"}).update(script_hashes)
if style_hashes:
csp.setdefault("style-src", {"'self'"}).update(style_hashes)
# unsafe-hashes: needed for style="" and on*="" attribute hashes (CSP3 behavior)
# We include hashes BOTH in the base directives and the CSP3 *-attr directives for best compatibility.
if handler_attr_hashes:
csp.setdefault("script-src", {"'self'"}).add("'unsafe-hashes'")
csp["script-src"].update(handler_attr_hashes)
csp.setdefault("script-src-attr", set()).update({"'unsafe-hashes'"})
csp["script-src-attr"].update(handler_attr_hashes)
if style_attr_hashes:
csp.setdefault("style-src", {"'self'"}).add("'unsafe-hashes'")
csp["style-src"].update(style_attr_hashes)
csp.setdefault("style-src-attr", set()).update({"'unsafe-hashes'"})
csp["style-src-attr"].update(style_attr_hashes)
if allow_unsafe_eval:
csp.setdefault("script-src", {"'self'"}).add("'unsafe-eval'")
if allow_data_img:
csp.setdefault("img-src", {"'self'"}).add("data:")
if allow_data_font:
csp.setdefault("font-src", {"'self'"}).add("data:")
if allow_blob:
for d in ("img-src", "media-src", "worker-src", "connect-src"):
csp.setdefault(d, {"'self'"}).add("blob:")
if upgrade_insecure_requests:
csp["upgrade-insecure-requests"] = set()
# Serialize
parts: list[str] = []
for k in sorted(csp.keys()):
vals = csp[k]
if vals:
parts.append(f"{k} {' '.join(sorted(vals))}")
else:
parts.append(f"{k}")
return "; ".join(parts) + ";"
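# Illustrative example of the serialized output (origins are placeholders, not taken from a real crawl):
#   base-uri 'self'; default-src 'self'; form-action 'self'; frame-ancestors 'self';
#   img-src 'self' data: https://cdn.example.net; object-src 'none';
#   script-src 'nonce-{NONCE}' 'self'; style-src 'self';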
_SOURCEMAP_RE = re.compile(r"sourceMappingURL\s*=\s*([^\s*]+)", re.IGNORECASE)
def _looks_like_js_or_css(url: str) -> bool:
p = urlparse(url)
path = (p.path or "").lower()
return path.endswith(".js") or path.endswith(".css")
def _extract_sourcemap_origin(
asset_url: str, body_bytes: bytes, headers: dict
) -> set[str]:
out: set[str] = set()
# Header-based pointers
sm = headers.get("sourcemap") or headers.get("x-sourcemap")
if sm:
map_url = urljoin(asset_url, sm)
out.add(origin_of(map_url))
# Body-based pointer: map comment is usually near end, so just scan the tail
tail = body_bytes[
-200_000:
] # big enough to survive minification/compression quirks
text = tail.decode("utf-8", errors="ignore")
m = _SOURCEMAP_RE.search(text)
if not m:
return {o for o in out if o}
ref = m.group(1).strip().strip('"').strip("'")
if ref and not ref.startswith("data:"):
map_url = urljoin(asset_url, ref)
out.add(origin_of(map_url))
return {o for o in out if o}
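# Illustrative: a bundle ending in "//# sourceMappingURL=https://maps.example.net/app.js.map"
# (or carrying a SourceMap / X-SourceMap response header) would contribute
# https://maps.example.net to connect-src when --include-sourcemaps is enabled.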
@dataclass
class CrawlResult:
visited: list[str]
csp: str
nonce_detected: bool
directives: dict[str, list[str]]
notes: list[str]
async def crawl_and_generate_csp(
start_url: str,
*,
max_pages: int = 10,
timeout_ms: int = 20000,
settle_ms: int = 1500,
headless: bool = True,
browsers_path: Path | None = None,
auto_install: bool = True,
with_deps: bool = False,
allow_blob: bool = False,
allow_unsafe_eval: bool = False,
upgrade_insecure_requests: bool = False,
include_sourcemaps: bool = False,
) -> CrawlResult:
start_url, _ = urldefrag(start_url)
base_origin = origin_of(start_url)
if not base_origin:
raise ValueError(f"Invalid start URL: {start_url}")
if auto_install:
await ensure_chromium_installed(
browsers_path=browsers_path, with_deps=with_deps
)
visited: set[str] = set()
q: deque[str] = deque([start_url])
# Collect CSP ingredients
directives: dict[str, set[str]] = {
d: set() for d in set(RESOURCE_TO_DIRECTIVE.values()) | {"frame-src"}
}
script_hashes: set[str] = set()
style_hashes: set[str] = set()
style_attr_hashes: set[str] = set()
handler_attr_hashes: set[str] = set()
nonce_detected = False
allow_data_img = False
allow_data_font = False
notes: list[str] = []
async with async_playwright() as p:
browser = await p.chromium.launch(headless=headless)
context = await browser.new_context()
def on_request(req):
"""
Playwright sometimes classifies "connect-like" activity as resource_type == "other".
Heuristic: treat resource_type=="other" with sec-fetch-dest=="empty" as connect-src.
"""
try:
url = req.url
parsed = urlparse(url)
if parsed.scheme not in ("http", "https", "ws", "wss"):
return
rtype = req.resource_type
directive = RESOURCE_TO_DIRECTIVE.get(rtype)
if directive is None and rtype == "other":
hdrs = {k.lower(): v for k, v in (req.headers or {}).items()}
# For fetch/xhr/beacon/pings, browsers typically send: sec-fetch-dest: empty
if (hdrs.get("sec-fetch-dest") or "").lower() == "empty":
directive = "connect-src"
if directive is None:
return
req_origin = origin_of(url)
if req_origin and req_origin != base_origin:
directives.setdefault(directive, set()).add(req_origin)
except Exception:
return
context.on("request", on_request)
max_queue = max_pages * 20
while q and len(visited) < max_pages:
url = q.popleft()
if url in visited:
continue
visited.add(url)
page = await context.new_page()
pending: set[asyncio.Task] = set()
if include_sourcemaps:
async def handle_response(resp):
try:
url = resp.url
if not _looks_like_js_or_css(url):
return
headers = {
k.lower(): v for k, v in (resp.headers or {}).items()
}
# Read the *actual* bytes the browser received
body = await resp.body()
origins = _extract_sourcemap_origin(url, body, headers)
for o in origins:
if o and o != base_origin:
directives.setdefault("connect-src", set()).add(o)
except Exception:
# If you want to debug failures, print(traceback.format_exc())
return
def on_response(resp):
t = asyncio.create_task(handle_response(resp))
pending.add(t)
t.add_done_callback(lambda _t: pending.discard(_t))
page.on("response", on_response)
try:
await page.goto(url, wait_until="networkidle", timeout=timeout_ms)
# Give the page a moment to run hydration / delayed fetches.
if settle_ms > 0:
await page.wait_for_timeout(settle_ms)
(
s_nonces,
st_nonces,
s_hashes,
st_hashes,
st_attr_hashes,
h_attr_hashes,
has_data_img,
has_data_font,
) = await collect_inline(page)
if include_sourcemaps and pending:
# Give the handler a moment to finish reading bodies
await asyncio.wait(pending, timeout=5.0)
if s_nonces or st_nonces:
nonce_detected = True
script_hashes.update(s_hashes)
style_hashes.update(st_hashes)
style_attr_hashes.update(st_attr_hashes)
handler_attr_hashes.update(h_attr_hashes)
allow_data_img = allow_data_img or has_data_img
allow_data_font = allow_data_font or has_data_font
# Frame destinations
for fr in page.frames:
if fr.url and fr.url != "about:blank":
fr_origin = origin_of(fr.url)
if fr_origin and fr_origin != base_origin:
directives["frame-src"].add(fr_origin)
# Enqueue same-origin links
links = await extract_links(page, base_origin)
for link in links:
if link not in visited and link not in q and len(q) < max_queue:
q.append(link)
finally:
await page.close()
await browser.close()
csp = build_csp(
directives=directives,
base_origin=base_origin,
nonce_detected=nonce_detected,
script_hashes=script_hashes,
style_hashes=style_hashes,
style_attr_hashes=style_attr_hashes,
handler_attr_hashes=handler_attr_hashes,
allow_data_img=allow_data_img,
allow_data_font=allow_data_font,
allow_blob=allow_blob,
allow_unsafe_eval=allow_unsafe_eval,
upgrade_insecure_requests=upgrade_insecure_requests,
)
if style_attr_hashes or handler_attr_hashes:
notes.append(
'Detected inline attribute code (style="..." and/or on*="..."). '
"Hashes for these require 'unsafe-hashes' (and modern browsers may use style-src-attr/script-src-attr)."
)
if nonce_detected:
notes.append(
"Nonce detected: replace {NONCE} per HTML response (server must generate and inject nonce)."
)
directives_out = {k: sorted(v) for k, v in directives.items() if v}
return CrawlResult(
visited=sorted(visited),
csp=csp,
nonce_detected=nonce_detected,
directives=directives_out,
notes=notes,
)
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
ap = argparse.ArgumentParser(
prog="csp-crawl",
description="Crawl up to N pages (same-origin) with Playwright and generate a draft CSP.",
)
ap.add_argument("url", help="Start URL (e.g. https://example.com)")
ap.add_argument(
"--max-pages",
type=int,
default=10,
help="Maximum number of pages to visit (default: 10)",
)
ap.add_argument(
"--timeout-ms",
type=int,
default=20000,
help="Navigation timeout in ms (default: 20000)",
)
ap.add_argument(
"--settle-ms",
type=int,
default=1500,
help="Extra time after networkidle to allow hydration/delayed requests (default: 1500)",
)
ap.add_argument(
"--headed",
action="store_true",
help="Run with a visible browser window (not headless)",
)
ap.add_argument(
"--no-install",
action="store_true",
help="Do not auto-install Chromium if missing",
)
ap.add_argument(
"--with-deps",
action="store_true",
help="When installing, include Playwright OS deps (Linux). May require elevated privileges.",
)
ap.add_argument(
"--browsers-path",
default=None,
help="Directory to install/playwright browsers (default: ./.pw-browsers).",
)
ap.add_argument(
"--allow-blob",
action="store_true",
help="Include blob: in common directives (drafty)",
)
ap.add_argument(
"--unsafe-eval",
action="store_true",
help="Include 'unsafe-eval' in script-src (not recommended)",
)
ap.add_argument(
"--upgrade-insecure-requests",
action="store_true",
help="Add upgrade-insecure-requests directive",
)
ap.add_argument(
"--include-sourcemaps",
action="store_true",
default=False,
help="Analyze JS/CSS for sourceMappingURL and add map origins to connect-src",
)
ap.add_argument(
"--json", action="store_true", help="Output JSON instead of a header line"
)
return ap.parse_args(argv)
def main(argv: list[str] | None = None) -> None:
args = _parse_args(argv)
browsers_path = Path(args.browsers_path).resolve() if args.browsers_path else None
result = asyncio.run(
crawl_and_generate_csp(
args.url,
max_pages=args.max_pages,
timeout_ms=args.timeout_ms,
settle_ms=args.settle_ms,
headless=not args.headed,
browsers_path=browsers_path,
auto_install=not args.no_install,
with_deps=args.with_deps,
allow_blob=args.allow_blob,
allow_unsafe_eval=args.unsafe_eval,
upgrade_insecure_requests=args.upgrade_insecure_requests,
include_sourcemaps=args.include_sourcemaps,
)
)
if args.json:
print(
json.dumps(
{
"visited": result.visited,
"nonce_detected": result.nonce_detected,
"csp": result.csp,
"directives": result.directives,
"notes": result.notes,
},
indent=2,
sort_keys=True,
)
)
return
# Default: print header + visited pages as comments.
for u in result.visited:
print(f"# visited: {u}")
for n in result.notes:
print(f"# NOTE: {n}")
print("Content-Security-Policy:", result.csp)
if __name__ == "__main__":
main()
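
A minimal usage sketch (illustrative; the target URL and options are placeholders, and the module-form invocation assumes the package is importable as cspresso, as __main__.py above implies):

    python -m cspresso https://example.com --max-pages 5 --json

Programmatic use of the coroutine defined above looks like this:

    import asyncio
    from cspresso.crawl import crawl_and_generate_csp

    async def demo():
        # Crawl a handful of same-origin pages and print the draft policy.
        result = await crawl_and_generate_csp("https://example.com", max_pages=5)
        print("Content-Security-Policy:", result.csp)
        for note in result.notes:
            print("# NOTE:", note)

    asyncio.run(demo())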

115
src/cspresso/ensure_playwright.py Normal file

@@ -0,0 +1,115 @@
from __future__ import annotations
import os
import sys
import time
import subprocess
from dataclasses import dataclass
from pathlib import Path
from playwright.async_api import async_playwright, Error as PlaywrightError
@dataclass(frozen=True)
class EnsureResult:
browsers_path: Path
installed: bool
def _default_browsers_path() -> Path:
# Project-local by default. Override with PLAYWRIGHT_BROWSERS_PATH or CLI flag.
return Path(__file__).resolve().parents[2] / ".pw-browsers"
def _env_with_browsers_path(browsers_path: Path) -> dict[str, str]:
env = os.environ.copy()
env["PLAYWRIGHT_BROWSERS_PATH"] = str(browsers_path)
return env
def _acquire_install_lock(
lock_path: Path, timeout_s: float = 120.0, poll_s: float = 0.2
) -> None:
"""Very small cross-platform lock using atomic file creation.
Avoids concurrent Playwright installs when multiple processes start at once.
Not perfect, but good enough for most CLI usage.
"""
start = time.time()
while True:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
os.close(fd)
return
except FileExistsError:
if time.time() - start > timeout_s:
raise TimeoutError(f"Timed out waiting for install lock: {lock_path}")
time.sleep(poll_s)
def _release_install_lock(lock_path: Path) -> None:
try:
lock_path.unlink(missing_ok=True) # Python 3.8+
except Exception:
pass
def _install_chromium(browsers_path: Path, with_deps: bool = False) -> None:
env = _env_with_browsers_path(browsers_path)
cmd = [sys.executable, "-m", "playwright", "install"]
if with_deps:
cmd.append("--with-deps")
cmd.append("chromium")
subprocess.run(cmd, check=True, env=env)
async def _can_launch_chromium(browsers_path: Path) -> bool:
# Ensure this process uses the same path too.
os.environ["PLAYWRIGHT_BROWSERS_PATH"] = str(browsers_path)
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
await browser.close()
return True
except PlaywrightError:
return False
async def ensure_chromium_installed(
browsers_path: Path | None = None,
*,
with_deps: bool = False,
lock_timeout_s: float = 120.0,
) -> EnsureResult:
"""Ensure Playwright's Chromium is installed and launchable.
Strategy:
- Attempt a tiny headless launch.
- If it fails, acquire a lock and run `python -m playwright install chromium` (optionally --with-deps).
- Retry launch once.
"""
bp = browsers_path or _default_browsers_path()
bp.mkdir(parents=True, exist_ok=True)
if await _can_launch_chromium(bp):
return EnsureResult(browsers_path=bp, installed=False)
lock_path = bp / ".install.lock"
_acquire_install_lock(lock_path, timeout_s=lock_timeout_s)
try:
# Another process might have installed while we waited; check again.
if await _can_launch_chromium(bp):
return EnsureResult(browsers_path=bp, installed=False)
_install_chromium(bp, with_deps=with_deps)
if not await _can_launch_chromium(bp):
raise RuntimeError(
"Playwright Chromium install completed, but Chromium still failed to launch. "
"On Linux, you may need additional system dependencies."
)
return EnsureResult(browsers_path=bp, installed=True)
finally:
_release_install_lock(lock_path)
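
A minimal standalone sketch of the installer helper (assuming this file is src/cspresso/ensure_playwright.py, as the relative import in crawl.py suggests; the browsers path is a placeholder and defaults to a project-local ./.pw-browsers when omitted):

    import asyncio
    from pathlib import Path
    from cspresso.ensure_playwright import ensure_chromium_installed

    async def demo():
        # Install Chromium on first run, then confirm it launches.
        res = await ensure_chromium_installed(Path("./.pw-browsers"), with_deps=False)
        print("browsers path:", res.browsers_path)
        print("freshly installed:", res.installed)

    asyncio.run(demo())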