cspresso/src/cspresso/crawl.py
Miguel Jacq fe58397da7
Some checks failed
CI / test (push) Failing after 1m20s
Lint / test (push) Failing after 28s
Trivy / test (push) Successful in 23s
Initial commit
2026-01-02 09:59:52 +11:00

620 lines
20 KiB
Python

from __future__ import annotations
import argparse
import asyncio
import base64
import hashlib
import json
import re
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from urllib.parse import urljoin, urldefrag, urlparse
from playwright.async_api import async_playwright
from .ensure_playwright import ensure_chromium_installed
# Maps a Playwright request `resource_type` to the CSP fetch directive that
# governs it. Resource types missing from this table are not attributed to any
# directive by a plain lookup.
RESOURCE_TO_DIRECTIVE = {
    "script": "script-src",
    "stylesheet": "style-src",
    "image": "img-src",
    "font": "font-src",
    "media": "media-src",
    # Every script-initiated connection type falls under connect-src.
    **dict.fromkeys(("xhr", "fetch", "websocket", "eventsource"), "connect-src"),
}
# Directives that every generated policy starts from, before any observed
# origins, nonces or hashes are merged in. Values are sets of CSP source
# expressions (quotes included, as they must appear in the header).
BASELINE_DIRECTIVES = {
    # Fallback for any fetch directive that is not emitted explicitly.
    "default-src": {"'self'"},
    # Restricts what a <base> element may point at.
    "base-uri": {"'self'"},
    # Plugins (<object>/<embed>) are disallowed outright.
    "object-src": {"'none'"},
    # Only same-origin documents may frame the site.
    "frame-ancestors": {"'self'"},
    # Forms may only submit back to the same origin.
    "form-action": {"'self'"},
}
def origin_of(url: str) -> str:
    """Return the ``scheme://host[:port]`` origin of *url*, or ``""``.

    The result is meant to be used both as a CSP host-source and as a key for
    same-origin comparisons, so it is normalised:

    * any ``user:password@`` userinfo is dropped (it is not part of an origin
      and must never end up in a published policy);
    * the host is lower-cased, because hosts compare case-insensitively and
      browsers report URLs with a lower-cased host.

    An empty string is returned when *url* has no scheme or no network
    location (relative URLs, ``data:`` URLs, ...) or cannot be parsed at all.
    """
    try:
        parts = urlparse(url)
    except ValueError:
        # e.g. an unbalanced "[" in the host ("Invalid IPv6 URL").
        return ""
    if not parts.scheme or not parts.netloc:
        return ""
    # netloc is "[userinfo@]host[:port]"; keep only what follows the last "@".
    host_port = parts.netloc.rpartition("@")[2].lower()
    if not host_port:
        return ""
    return f"{parts.scheme}://{host_port}"
def sha256_base64(s: str) -> str:
    """Return the base64-encoded SHA-256 digest of *s* encoded as UTF-8.

    The input is hashed exactly as given; no stripping or normalisation is
    applied.
    """
    digest = hashlib.sha256(s.encode("utf-8")).digest()
    return str(base64.standard_b64encode(digest), "ascii")
async def collect_inline(page, *, max_attr_hashes: int = 2000):
    """Gather nonces and CSP hash sources for the inline code on *page*.

    One script is evaluated in the page to read:

    * ``<script>`` elements without ``src`` and all ``<style>`` elements
      (their nonce, if any, and their text content);
    * ``style="..."`` attribute values;
    * values of attributes whose name starts with ``on`` (inline handlers);
    * whether any ``<img>`` has a ``data:`` src, and whether any font preload
      ``<link>`` has a ``data:`` href.

    At most *max_attr_hashes* style attributes and *max_attr_hashes* handler
    attributes are collected.

    Hashes are computed over the exact text: whitespace-only values are
    skipped, but nothing is stripped before hashing.

    Returns an 8-tuple:
    ``(script_nonces, style_nonces, script_hashes, style_hashes,
    style_attr_hashes, handler_attr_hashes, has_data_img, has_data_font)``
    where the first six items are sets of strings (hashes are already quoted
    ``'sha256-...'`` source expressions) and the last two are booleans.
    """
    data = await page.evaluate(
        """(maxAttr) => {
const inlineScripts = [...document.querySelectorAll('script:not([src])')]
.map(s => ({
nonce: s.nonce || s.getAttribute('nonce') || null,
text: s.textContent ?? ''
}));
const inlineStyles = [...document.querySelectorAll('style')]
.map(st => ({
nonce: st.nonce || st.getAttribute('nonce') || null,
text: st.textContent ?? ''
}));
const styleAttrs = [];
const handlerAttrs = [];
// style="..."
for (const el of document.querySelectorAll('[style]')) {
if (styleAttrs.length >= maxAttr) break;
const v = el.getAttribute('style');
if (v !== null) styleAttrs.push(v);
}
// inline event handlers: on*
// Iterate elements and look for attributes starting with "on"
const all = document.querySelectorAll('*');
for (let i = 0; i < all.length; i++) {
if (handlerAttrs.length >= maxAttr) break;
const el = all[i];
const names = el.getAttributeNames ? el.getAttributeNames() : [];
for (const name of names) {
if (handlerAttrs.length >= maxAttr) break;
if (name && name.toLowerCase().startsWith('on')) {
const v = el.getAttribute(name);
if (v !== null) handlerAttrs.push(v);
}
}
}
const dataImgs = [...document.querySelectorAll('img[src^="data:"]')].length > 0;
const dataFonts = [...document.querySelectorAll('link[rel="preload"][as="font"][href^="data:"]')].length > 0;
return { inlineScripts, inlineStyles, styleAttrs, handlerAttrs, dataImgs, dataFonts };
}""",
        max_attr_hashes,
    )

    def nonces_of(blocks):
        """Collect the non-empty nonce values of the given block records."""
        return {block["nonce"] for block in blocks if block.get("nonce")}

    def hash_sources(texts):
        """Hash every text that is not pure whitespace, without stripping it."""
        return {f"'sha256-{sha256_base64(text)}'" for text in texts if text.strip()}

    def block_texts(blocks):
        """Yield each block's text, treating a missing/None text as empty."""
        return (block.get("text") or "" for block in blocks)

    def attr_values(key):
        """Yield the string attribute values stored under *key*, if any."""
        return (value for value in data.get(key) or [] if isinstance(value, str))

    scripts = data["inlineScripts"]
    styles = data["inlineStyles"]

    return (
        nonces_of(scripts),
        nonces_of(styles),
        hash_sources(block_texts(scripts)),
        hash_sources(block_texts(styles)),
        hash_sources(attr_values("styleAttrs")),
        hash_sources(attr_values("handlerAttrs")),
        bool(data.get("dataImgs")),
        bool(data.get("dataFonts")),
    )
async def extract_links(page, base_origin: str) -> list[str]:
    """Return the distinct same-origin http(s) links found on *page*.

    Each ``<a href>`` is resolved in the browser against ``document.baseURI``,
    so relative links are interpreted relative to the current document (and
    any ``<base>`` element) rather than relative to the site root. Hrefs the
    browser cannot parse are dropped.

    Args:
        page: Object exposing an awaitable ``evaluate(script)`` (a Playwright
            page in this module).
        base_origin: Origin (``scheme://host[:port]``) links must belong to.

    Returns:
        Absolute URLs with the fragment removed, in document order, without
        duplicates.
    """
    hrefs = await page.evaluate(
        """() => [...document.querySelectorAll('a[href]')].map(a => {
            try {
                return new URL(a.getAttribute('href'), document.baseURI).href;
            } catch (e) {
                return null;
            }
        })"""
    )
    wanted_origin = base_origin.lower()
    # dict used as an insertion-ordered set.
    found: dict[str, None] = {}
    for href in hrefs or []:
        if not href:
            continue
        try:
            # The browser already returned absolute URLs; the join only
            # matters if a relative value slips through, and is otherwise a
            # no-op.
            abs_url, _frag = urldefrag(urljoin(base_origin + "/", href))
            scheme = urlparse(abs_url).scheme
            link_origin = origin_of(abs_url)
        except ValueError:
            # A single unparseable link must not abort the crawl.
            continue
        # Hosts compare case-insensitively; the browser lower-cases them.
        if scheme in ("http", "https") and link_origin.lower() == wanted_origin:
            found.setdefault(abs_url, None)
    return list(found)
def build_csp(
    directives: dict[str, set[str]],
    *,
    base_origin: str,
    nonce_detected: bool,
    script_hashes: set[str],
    style_hashes: set[str],
    style_attr_hashes: set[str],
    handler_attr_hashes: set[str],
    allow_data_img: bool,
    allow_data_font: bool,
    allow_blob: bool,
    allow_unsafe_eval: bool,
    upgrade_insecure_requests: bool,
) -> str:
    """Serialise a Content-Security-Policy value from the collected inputs.

    The policy starts as a copy of ``BASELINE_DIRECTIVES`` and is extended
    with, in order: the non-empty observed source sets in *directives*, a
    nonce placeholder, block hashes, attribute hashes (with
    ``'unsafe-hashes'``), and the opt-in ``'unsafe-eval'`` / ``data:`` /
    ``blob:`` sources.

    Args:
        directives: Observed sources per directive; empty sets are ignored.
        base_origin: Accepted for the caller's convenience; not read here.
        nonce_detected: Add the ``'nonce-{NONCE}'`` template to script-src
            and style-src. The placeholder must be replaced per response.
        script_hashes: Hash sources for inline ``<script>`` blocks.
        style_hashes: Hash sources for inline ``<style>`` blocks.
        style_attr_hashes: Hash sources for ``style="..."`` attributes.
        handler_attr_hashes: Hash sources for ``on*="..."`` attributes.
        allow_data_img: Add ``data:`` to img-src.
        allow_data_font: Add ``data:`` to font-src.
        allow_blob: Add ``blob:`` to img-src, media-src, worker-src and
            connect-src.
        allow_unsafe_eval: Add ``'unsafe-eval'`` to script-src.
        upgrade_insecure_requests: Emit the valueless
            ``upgrade-insecure-requests`` directive.

    Returns:
        Directives sorted by name, each with its sources sorted, joined by
        ``"; "`` and terminated by ``";"``.
    """
    self_source = "'self'"
    unsafe_hashes = "'unsafe-hashes'"
    nonce_template = "'nonce-{NONCE}'"

    # Copy the baseline sets so the module-level constant is never mutated.
    policy: dict[str, set[str]] = {
        name: set(sources) for name, sources in BASELINE_DIRECTIVES.items()
    }

    def sources_for(name: str) -> set[str]:
        """Return the source set of *name*, creating it as {'self'} if absent."""
        return policy.setdefault(name, {self_source})

    # Observed origins.
    for name, observed in directives.items():
        if observed:
            policy.setdefault(name, set()).update(observed)

    # Fetch directives that are present always allow the site itself.
    fetch_directives = (
        "script-src",
        "style-src",
        "img-src",
        "connect-src",
        "font-src",
        "media-src",
        "frame-src",
    )
    for name in fetch_directives:
        if name in policy:
            policy[name].add(self_source)

    # Nonce template for pages that already use nonce attributes.
    if nonce_detected:
        for name in ("script-src", "style-src"):
            sources_for(name).add(nonce_template)

    # Hashes of inline <script>/<style> blocks.
    if script_hashes:
        sources_for("script-src").update(script_hashes)
    if style_hashes:
        sources_for("style-src").update(style_hashes)

    # Attribute hashes go into both the base directive and its CSP3 *-attr
    # counterpart, each time together with 'unsafe-hashes'.
    attribute_groups = (
        ("script-src", "script-src-attr", handler_attr_hashes),
        ("style-src", "style-src-attr", style_attr_hashes),
    )
    for base_name, attr_name, hashes in attribute_groups:
        if not hashes:
            continue
        base_sources = sources_for(base_name)
        base_sources.add(unsafe_hashes)
        base_sources.update(hashes)
        # The *-attr directive is created empty (no implicit 'self').
        attr_sources = policy.setdefault(attr_name, set())
        attr_sources.add(unsafe_hashes)
        attr_sources.update(hashes)

    # Opt-in relaxations.
    if allow_unsafe_eval:
        sources_for("script-src").add("'unsafe-eval'")
    if allow_data_img:
        sources_for("img-src").add("data:")
    if allow_data_font:
        sources_for("font-src").add("data:")
    if allow_blob:
        for name in ("img-src", "media-src", "worker-src", "connect-src"):
            sources_for(name).add("blob:")
    if upgrade_insecure_requests:
        policy["upgrade-insecure-requests"] = set()

    # Serialise deterministically; a directive with no sources is emitted
    # as its bare name.
    rendered = [
        f"{name} {' '.join(sorted(policy[name]))}" if policy[name] else f"{name}"
        for name in sorted(policy)
    ]
    return "; ".join(rendered) + ";"
# Finds a `sourceMappingURL=<ref>` pointer (case-insensitive). Group 1 is the
# reference; it ends at the first whitespace or `*`, so the `*/` closing a CSS
# or block comment is not captured as part of the URL.
_SOURCEMAP_RE = re.compile(r"sourceMappingURL\s*=\s*([^\s*]+)", re.IGNORECASE)
def _looks_like_js_or_css(url: str) -> bool:
    """Return True when the path of *url* ends in ``.js`` or ``.css``.

    Only the path component is inspected (query string and fragment are
    ignored) and the comparison is case-insensitive.
    """
    lowered_path = (urlparse(url).path or "").lower()
    return lowered_path.endswith((".js", ".css"))
def _extract_sourcemap_origin(
    asset_url: str, body_bytes: bytes, headers: dict
) -> set[str]:
    """Return the origins of the source maps referenced by a JS/CSS asset.

    Two pointers are honoured:

    * the ``sourcemap`` / ``x-sourcemap`` response header (*headers* is
      expected to have lower-cased keys);
    * the ``sourceMappingURL=`` comment in the body. Only the last 200,000
      bytes are scanned, and when several pointers appear in that tail the
      last one is used, since earlier matches are typically string literals
      or leftovers from bundled modules.

    References are resolved relative to *asset_url*. ``data:`` references,
    references that cannot be parsed and references without an origin are
    ignored.

    Returns:
        A set of non-empty origin strings (possibly empty).
    """
    origins: set[str] = set()

    def add_origin_of(ref: str) -> None:
        """Resolve *ref* against the asset URL and record its origin."""
        try:
            origins.add(origin_of(urljoin(asset_url, ref)))
        except ValueError:
            # Malformed reference: skip it but keep whatever else was found.
            return

    # Header-based pointer.
    header_ref = headers.get("sourcemap") or headers.get("x-sourcemap")
    if header_ref:
        add_origin_of(header_ref)

    # Body-based pointer: the comment lives near the end, so only decode and
    # scan the tail instead of the whole (possibly multi-megabyte) asset.
    tail_text = (body_bytes or b"")[-200_000:].decode("utf-8", errors="ignore")
    refs = _SOURCEMAP_RE.findall(tail_text)
    if refs:
        ref = refs[-1].strip().strip('"').strip("'")
        if ref and not ref.startswith("data:"):
            add_origin_of(ref)

    # origin_of() yields "" for URLs without a usable origin.
    origins.discard("")
    return origins
@dataclass
class CrawlResult:
    """Outcome of one crawl: the pages visited and the CSP drafted from them."""

    # URLs of the pages that were opened during the crawl.
    visited: list[str]
    # Serialised Content-Security-Policy value.
    csp: str
    # True when at least one inline <script>/<style> carried a nonce.
    nonce_detected: bool
    # Observed external origins, keyed by directive name.
    directives: dict[str, list[str]]
    # Human-readable remarks about the generated policy.
    notes: list[str]
def _directive_for_request(req) -> str | None:
    """Return the CSP directive governing *req*, or None when unknown.

    Requests whose resource type is ``"other"`` are treated as connect-src
    when they carry ``sec-fetch-dest: empty``, the value browsers typically
    send for fetch/XHR/beacon/ping traffic.
    """
    rtype = req.resource_type
    directive = RESOURCE_TO_DIRECTIVE.get(rtype)
    if directive is None and rtype == "other":
        hdrs = {k.lower(): v for k, v in (req.headers or {}).items()}
        if (hdrs.get("sec-fetch-dest") or "").lower() == "empty":
            directive = "connect-src"
    return directive


async def crawl_and_generate_csp(
    start_url: str,
    *,
    max_pages: int = 10,
    timeout_ms: int = 20000,
    settle_ms: int = 1500,
    headless: bool = True,
    browsers_path: Path | None = None,
    auto_install: bool = True,
    with_deps: bool = False,
    allow_blob: bool = False,
    allow_unsafe_eval: bool = False,
    upgrade_insecure_requests: bool = False,
    include_sourcemaps: bool = False,
) -> CrawlResult:
    """Crawl same-origin pages from *start_url* and draft a CSP from them.

    Pages are opened breadth-first in one Chromium context. For every page
    the function records the origins of cross-origin requests, inline
    script/style nonces and hashes, cross-origin frame origins and, when
    requested, source-map origins; it then follows same-origin links.

    A page that times out waiting for ``networkidle`` after its navigation
    has committed is still analysed (a note is added). A page that cannot be
    loaded at all is skipped with a note, except for the start page, whose
    failure is re-raised because nothing useful could be generated.

    Args:
        start_url: First URL to open; its fragment is discarded.
        max_pages: Maximum number of pages to open.
        timeout_ms: Timeout passed to ``page.goto``.
        settle_ms: Extra wait after navigation for delayed requests; a value
            <= 0 disables it.
        headless: Launch Chromium headless.
        browsers_path: Forwarded to ``ensure_chromium_installed``.
        auto_install: Call ``ensure_chromium_installed`` before launching.
        with_deps: Forwarded to ``ensure_chromium_installed``.
        allow_blob: Forwarded to ``build_csp``.
        allow_unsafe_eval: Forwarded to ``build_csp``.
        upgrade_insecure_requests: Forwarded to ``build_csp``.
        include_sourcemaps: Inspect JS/CSS responses for source-map pointers
            and add their origins to connect-src.

    Returns:
        A ``CrawlResult`` with the opened URLs, the policy string, the
        observed per-directive origins and explanatory notes.

    Raises:
        ValueError: If *start_url* has no scheme/host.
        playwright.async_api.Error: If the start page cannot be loaded.
    """
    # Function-scope import: playwright.async_api is already a dependency of
    # this module; these two names are only needed here.
    from playwright.async_api import Error as PlaywrightError
    from playwright.async_api import TimeoutError as PlaywrightTimeoutError

    start_url, _ = urldefrag(start_url)
    base_origin = origin_of(start_url)
    if not base_origin:
        raise ValueError(f"Invalid start URL: {start_url}")

    if auto_install:
        await ensure_chromium_installed(
            browsers_path=browsers_path, with_deps=with_deps
        )

    visited: set[str] = set()
    q: deque[str] = deque([start_url])
    # Everything ever enqueued, for O(1) duplicate checks (the deque itself
    # would need a linear scan).
    queued: set[str] = {start_url}

    # Collected CSP ingredients.
    directives: dict[str, set[str]] = {
        d: set() for d in set(RESOURCE_TO_DIRECTIVE.values()) | {"frame-src"}
    }
    script_hashes: set[str] = set()
    style_hashes: set[str] = set()
    style_attr_hashes: set[str] = set()
    handler_attr_hashes: set[str] = set()
    nonce_detected = False
    allow_data_img = False
    allow_data_font = False
    notes: list[str] = []

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=headless)
        try:
            context = await browser.new_context()

            def on_request(req):
                """Record the origin of a cross-origin request."""
                try:
                    req_url = req.url
                    if urlparse(req_url).scheme not in ("http", "https", "ws", "wss"):
                        return
                    directive = _directive_for_request(req)
                    if directive is None:
                        return
                    req_origin = origin_of(req_url)
                    if req_origin and req_origin != base_origin:
                        directives.setdefault(directive, set()).add(req_origin)
                except Exception:
                    # Deliberately best-effort: an event callback must never
                    # break the crawl because of one odd request.
                    return

            context.on("request", on_request)

            # Bound the frontier so link-heavy sites cannot grow it unchecked.
            max_queue = max_pages * 20

            while q and len(visited) < max_pages:
                url = q.popleft()
                if url in visited:
                    continue
                visited.add(url)

                page = await context.new_page()
                pending: set[asyncio.Task] = set()

                if include_sourcemaps:

                    async def handle_response(resp):
                        """Add the source-map origins of one JS/CSS response."""
                        try:
                            resp_url = resp.url
                            if not _looks_like_js_or_css(resp_url):
                                return
                            headers = {
                                k.lower(): v for k, v in (resp.headers or {}).items()
                            }
                            # Read the bytes the browser actually received.
                            body = await resp.body()
                            for o in _extract_sourcemap_origin(resp_url, body, headers):
                                if o and o != base_origin:
                                    directives.setdefault("connect-src", set()).add(o)
                        except Exception:
                            # Best-effort: bodies may be unavailable (for
                            # example once the page has been closed).
                            return

                    def on_response(resp, _tasks=pending):
                        # _tasks pins this page's set; the enclosing
                        # `pending` name is rebound on the next iteration.
                        t = asyncio.create_task(handle_response(resp))
                        _tasks.add(t)
                        t.add_done_callback(_tasks.discard)

                    page.on("response", on_response)

                try:
                    try:
                        await page.goto(
                            url, wait_until="networkidle", timeout=timeout_ms
                        )
                    except PlaywrightTimeoutError:
                        # Still on the initial blank document: the navigation
                        # itself failed, so there is nothing to analyse.
                        if page.url in ("", "about:blank"):
                            raise
                        notes.append(
                            f"Timed out after {timeout_ms} ms waiting for network idle "
                            f"on {url}; analysed what had loaded."
                        )

                    # Give the page a moment to run hydration / delayed fetches.
                    if settle_ms > 0:
                        await page.wait_for_timeout(settle_ms)

                    (
                        s_nonces,
                        st_nonces,
                        s_hashes,
                        st_hashes,
                        st_attr_hashes,
                        h_attr_hashes,
                        has_data_img,
                        has_data_font,
                    ) = await collect_inline(page)

                    if include_sourcemaps and pending:
                        # Give the response handlers a moment to finish
                        # reading bodies.
                        await asyncio.wait(pending, timeout=5.0)

                    if s_nonces or st_nonces:
                        nonce_detected = True
                    script_hashes.update(s_hashes)
                    style_hashes.update(st_hashes)
                    style_attr_hashes.update(st_attr_hashes)
                    handler_attr_hashes.update(h_attr_hashes)
                    allow_data_img = allow_data_img or has_data_img
                    allow_data_font = allow_data_font or has_data_font

                    # Frame destinations.
                    for fr in page.frames:
                        if fr.url and fr.url != "about:blank":
                            fr_origin = origin_of(fr.url)
                            if fr_origin and fr_origin != base_origin:
                                directives["frame-src"].add(fr_origin)

                    # Enqueue same-origin links.
                    for link in await extract_links(page, base_origin):
                        if (
                            link not in visited
                            and link not in queued
                            and len(q) < max_queue
                        ):
                            q.append(link)
                            queued.add(link)
                except PlaywrightError as exc:
                    # The start page is the first (and so far only) entry in
                    # `visited`; without it there is nothing to report.
                    if len(visited) == 1:
                        raise
                    notes.append(f"Skipped {url}: {exc}")
                finally:
                    await page.close()
        finally:
            # Close the browser even when a page raised.
            await browser.close()

    csp = build_csp(
        directives=directives,
        base_origin=base_origin,
        nonce_detected=nonce_detected,
        script_hashes=script_hashes,
        style_hashes=style_hashes,
        style_attr_hashes=style_attr_hashes,
        handler_attr_hashes=handler_attr_hashes,
        allow_data_img=allow_data_img,
        allow_data_font=allow_data_font,
        allow_blob=allow_blob,
        allow_unsafe_eval=allow_unsafe_eval,
        upgrade_insecure_requests=upgrade_insecure_requests,
    )

    if style_attr_hashes or handler_attr_hashes:
        notes.append(
            'Detected inline attribute code (style="..." and/or on*="..."). '
            "Hashes for these require 'unsafe-hashes' (and modern browsers may use style-src-attr/script-src-attr)."
        )
    if nonce_detected:
        notes.append(
            "Nonce detected: replace {NONCE} per HTML response (server must generate and inject nonce)."
        )

    directives_out = {k: sorted(v) for k, v in directives.items() if v}
    return CrawlResult(
        visited=sorted(visited),
        csp=csp,
        nonce_detected=nonce_detected,
        directives=directives_out,
        notes=notes,
    )
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the ``csp-crawl`` command line.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``url``, the integer options ``max_pages``,
        ``timeout_ms`` and ``settle_ms``, the optional ``browsers_path``
        string, and one boolean per on/off switch.
    """
    parser = argparse.ArgumentParser(
        prog="csp-crawl",
        description="Crawl up to N pages (same-origin) with Playwright and generate a draft CSP.",
    )
    parser.add_argument("url", help="Start URL (e.g. https://example.com)")

    # Integer options: (flag, default, help).
    integer_options = (
        ("--max-pages", 10, "Maximum number of pages to visit (default: 10)"),
        ("--timeout-ms", 20000, "Navigation timeout in ms (default: 20000)"),
        (
            "--settle-ms",
            1500,
            "Extra time after networkidle to allow hydration/delayed requests (default: 1500)",
        ),
    )
    for flag, default, help_text in integer_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)

    def add_switches(*switches: tuple[str, str]) -> None:
        """Register on/off flags (False unless given), in the given order."""
        for flag, help_text in switches:
            parser.add_argument(flag, action="store_true", help=help_text)

    # Registration order is kept because it determines the --help listing.
    add_switches(
        ("--headed", "Run with a visible browser window (not headless)"),
        ("--no-install", "Do not auto-install Chromium if missing"),
        (
            "--with-deps",
            "When installing, include Playwright OS deps (Linux). May require elevated privileges.",
        ),
    )
    parser.add_argument(
        "--browsers-path",
        default=None,
        help="Directory to install/playwright browsers (default: ./.pw-browsers).",
    )
    add_switches(
        ("--allow-blob", "Include blob: in common directives (drafty)"),
        ("--unsafe-eval", "Include 'unsafe-eval' in script-src (not recommended)"),
        ("--upgrade-insecure-requests", "Add upgrade-insecure-requests directive"),
        (
            "--include-sourcemaps",
            "Analyze JS/CSS for sourceMappingURL and add map origins to connect-src",
        ),
        ("--json", "Output JSON instead of a header line"),
    )
    return parser.parse_args(argv)
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: crawl, then print the drafted policy.

    With ``--json`` the result is printed as one indented JSON object with
    sorted keys. Otherwise the visited URLs and the notes are printed as
    ``#`` comment lines, followed by a ``Content-Security-Policy:`` line.

    Args:
        argv: Arguments to parse, passed through to ``_parse_args``.
    """
    args = _parse_args(argv)

    browsers_path = None
    if args.browsers_path:
        browsers_path = Path(args.browsers_path).resolve()

    crawl = crawl_and_generate_csp(
        args.url,
        max_pages=args.max_pages,
        timeout_ms=args.timeout_ms,
        settle_ms=args.settle_ms,
        headless=not args.headed,
        browsers_path=browsers_path,
        auto_install=not args.no_install,
        with_deps=args.with_deps,
        allow_blob=args.allow_blob,
        allow_unsafe_eval=args.unsafe_eval,
        upgrade_insecure_requests=args.upgrade_insecure_requests,
        include_sourcemaps=args.include_sourcemaps,
    )
    result = asyncio.run(crawl)

    if not args.json:
        # Default: print header + visited pages as comments.
        for u in result.visited:
            print(f"# visited: {u}")
        for n in result.notes:
            print(f"# NOTE: {n}")
        print("Content-Security-Policy:", result.csp)
        return

    payload = {
        "visited": result.visited,
        "nonce_detected": result.nonce_detected,
        "csp": result.csp,
        "directives": result.directives,
        "notes": result.notes,
    }
    print(json.dumps(payload, indent=2, sort_keys=True))
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()