# Source listing banner: 819 lines, 28 KiB, Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import re
|
|
from collections import deque
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin, urldefrag, urlparse
|
|
|
|
from playwright.async_api import (
|
|
async_playwright,
|
|
TimeoutError as PlaywrightTimeoutError,
|
|
)
|
|
|
|
from .ensure_playwright import ensure_chromium_installed
|
|
|
|
# Maps a request's ``resource_type`` to the CSP fetch directive its origin is
# recorded under. Resource types missing from this table are not attributed
# through it.
RESOURCE_TO_DIRECTIVE = {
    "script": "script-src",
    "stylesheet": "style-src",
    "image": "img-src",
    "font": "font-src",
    "media": "media-src",
    # All programmatic network channels share connect-src.
    **dict.fromkeys(("xhr", "fetch", "websocket", "eventsource"), "connect-src"),
}
|
|
|
|
# Directives every generated policy starts from, each with a single source.
# Every value is its own set object so callers can copy them independently.
BASELINE_DIRECTIVES = {
    directive: {source}
    for directive, source in (
        ("default-src", "'self'"),
        ("base-uri", "'self'"),
        ("object-src", "'none'"),
        ("frame-ancestors", "'self'"),
        ("form-action", "'self'"),
    )
}
|
|
|
|
|
|
def origin_of(url: str) -> str:
    """Return the ``scheme://host[:port]`` origin of *url*, or ``""``.

    The empty string is returned when *url* has no scheme or no host (for
    example a relative path), so callers can treat a falsy result as
    "no usable origin".

    Two normalisations are applied so that origins compare reliably and are
    safe to place in a policy:

    * any ``user:password@`` prefix is removed -- credentials are not part of
      an origin and must not end up in a generated CSP;
    * the host is lower-cased, because host names are case-insensitive while
      plain string comparison is not (``urlparse`` already lower-cases the
      scheme).
    """
    parts = urlparse(url)
    # rpartition keeps the whole netloc when there is no "@" at all.
    host = parts.netloc.rpartition("@")[2].lower()
    if not parts.scheme or not host:
        return ""
    return f"{parts.scheme}://{host}"
|
|
|
|
|
|
def sha256_base64(s: str) -> str:
    """Return the Base64 text of the SHA-256 digest of *s* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(s.encode("utf-8"))
    encoded = base64.standard_b64encode(hasher.digest())
    return str(encoded, "ascii")
|
|
|
|
|
|
def normalize_csp_string(csp: str) -> str:
    """Trim *csp* and make sure a non-empty policy ends with ``;``.

    A falsy or whitespace-only input yields the empty string.
    """
    policy = (csp or "").strip()
    if policy and not policy.endswith(";"):
        policy += ";"
    return policy
|
|
|
|
|
|
async def collect_inline(page, *, max_attr_hashes: int = 2000):
    """Gather nonces and CSP hash sources for the inline code found in *page*.

    One ``page.evaluate`` call collects, from the current DOM:

    - ``<script>`` elements without ``src`` and all ``<style>`` elements
      (their nonce, if any, and their text content);
    - the values of ``style`` attributes (CSP3 style-src-attr / unsafe-hashes);
    - the values of attributes whose name starts with ``on``
      (CSP3 script-src-attr / unsafe-hashes);
    - whether any ``img`` has a ``data:`` src, and whether any font preload
      ``link`` has a ``data:`` href.

    At most *max_attr_hashes* values are collected for each of the two
    attribute kinds.

    Hashes are computed over the text exactly as read from the DOM; it is
    never trimmed. Whitespace-only text is skipped entirely.

    Returns an 8-tuple:
    ``(script_nonces, style_nonces, script_hashes, style_hashes,
    style_attr_hashes, handler_attr_hashes, has_data_img, has_data_font)``
    where the first six items are sets of strings and the last two are bools.
    """
    collector_js = """(maxAttr) => {
      const inlineScripts = [...document.querySelectorAll('script:not([src])')]
        .map(s => ({
          nonce: s.nonce || s.getAttribute('nonce') || null,
          text: s.textContent ?? ''
        }));

      const inlineStyles = [...document.querySelectorAll('style')]
        .map(st => ({
          nonce: st.nonce || st.getAttribute('nonce') || null,
          text: st.textContent ?? ''
        }));

      const styleAttrs = [];
      const handlerAttrs = [];

      // style="..."
      for (const el of document.querySelectorAll('[style]')) {
        if (styleAttrs.length >= maxAttr) break;
        const v = el.getAttribute('style');
        if (v !== null) styleAttrs.push(v);
      }

      // inline event handlers: on*
      // Iterate elements and look for attributes starting with "on"
      const all = document.querySelectorAll('*');
      for (let i = 0; i < all.length; i++) {
        if (handlerAttrs.length >= maxAttr) break;
        const el = all[i];
        const names = el.getAttributeNames ? el.getAttributeNames() : [];
        for (const name of names) {
          if (handlerAttrs.length >= maxAttr) break;
          if (name && name.toLowerCase().startsWith('on')) {
            const v = el.getAttribute(name);
            if (v !== null) handlerAttrs.push(v);
          }
        }
      }

      const dataImgs = [...document.querySelectorAll('img[src^="data:"]')].length > 0;
      const dataFonts = [...document.querySelectorAll('link[rel="preload"][as="font"][href^="data:"]')].length > 0;

      return { inlineScripts, inlineStyles, styleAttrs, handlerAttrs, dataImgs, dataFonts };
    }"""
    found = await page.evaluate(collector_js, max_attr_hashes)

    def _hash_source(text: str) -> str:
        # Digest of the untouched text -- stripping would change the hash.
        return f"'sha256-{sha256_base64(text)}'"

    def _nonces(blocks) -> set:
        return {block["nonce"] for block in blocks if block.get("nonce")}

    def _block_hashes(blocks) -> set:
        texts = (block.get("text") or "" for block in blocks)
        # Whitespace-only blocks are ignored; everything else is hashed as-is.
        return {_hash_source(text) for text in texts if text.strip()}

    def _attr_hashes(values) -> set:
        return {
            _hash_source(value)
            for value in (values or [])
            if isinstance(value, str) and value.strip()
        }

    scripts = found["inlineScripts"]
    styles = found["inlineStyles"]

    return (
        _nonces(scripts),
        _nonces(styles),
        _block_hashes(scripts),
        _block_hashes(styles),
        _attr_hashes(found.get("styleAttrs")),
        _attr_hashes(found.get("handlerAttrs")),
        bool(found.get("dataImgs")),
        bool(found.get("dataFonts")),
    )
|
|
|
|
|
|
async def extract_links(page, base_origin: str) -> list[str]:
    """Return absolute, fragment-free same-origin links found in *page*.

    The raw ``href`` attribute of every ``a[href]`` element is read from the
    DOM and resolved to an absolute URL. Only ``http``/``https`` URLs whose
    origin equals *base_origin* are returned, in document order (duplicates
    are not removed).

    Relative hrefs are resolved against the URL of the page they were found
    on, so that ``href="b.html"`` on ``/docs/a/`` yields ``/docs/a/b.html``.
    Resolving against the site root instead would produce ``/b.html``, which
    typically does not exist. When the page URL is not an http(s) URL the site
    root ``base_origin + "/"`` is used as the base.
    """
    hrefs = await page.evaluate(
        """() => [...document.querySelectorAll('a[href]')].map(a => a.getAttribute('href'))"""
    )

    page_url = getattr(page, "url", "") or ""
    if urlparse(page_url).scheme in ("http", "https"):
        base_url = page_url
    else:
        base_url = base_origin + "/"

    out: list[str] = []
    for href in hrefs or []:
        if not href:
            continue
        abs_url, _frag = urldefrag(urljoin(base_url, href))
        if (
            urlparse(abs_url).scheme in ("http", "https")
            and origin_of(abs_url) == base_origin
        ):
            out.append(abs_url)
    return out
|
|
|
|
|
|
def build_csp(
    directives: dict[str, set[str]],
    *,
    base_origin: str,
    nonce_detected: bool,
    script_hashes: set[str],
    style_hashes: set[str],
    style_attr_hashes: set[str],
    handler_attr_hashes: set[str],
    allow_data_img: bool,
    allow_data_font: bool,
    allow_blob: bool,
    allow_unsafe_eval: bool,
    upgrade_insecure_requests: bool,
) -> str:
    """Serialise a Content-Security-Policy value from the collected evidence.

    The policy starts as a copy of ``BASELINE_DIRECTIVES``; non-empty entries
    of *directives* are merged in, and the remaining arguments add sources:

    - *nonce_detected*: adds the ``'nonce-{NONCE}'`` template to script-src
      and style-src. ``{NONCE}`` must be replaced per response.
    - *script_hashes* / *style_hashes*: added to script-src / style-src.
    - *handler_attr_hashes* / *style_attr_hashes*: added, together with
      ``'unsafe-hashes'``, both to the base directive and to the matching
      CSP3 ``*-attr`` directive.
    - *allow_unsafe_eval*: adds ``'unsafe-eval'`` to script-src.
    - *allow_data_img* / *allow_data_font*: add ``data:`` to img-src / font-src.
    - *allow_blob*: adds ``blob:`` to img-src, media-src, worker-src and
      connect-src.
    - *upgrade_insecure_requests*: adds the valueless directive of that name.

    *base_origin* is accepted but not used by this function.

    Returns the directives sorted by name, each with its sources sorted,
    joined by ``"; "`` and terminated with ``";"``.
    """
    policy: dict[str, set[str]] = {
        name: set(sources) for name, sources in BASELINE_DIRECTIVES.items()
    }

    def sources_of(name: str) -> set[str]:
        # A directive created on demand always starts out allowing 'self'.
        return policy.setdefault(name, {"'self'"})

    # Merge observed origins into directives.
    for name, observed in directives.items():
        if observed:
            policy.setdefault(name, set()).update(observed)

    # Always keep 'self' on these directives if present.
    keep_self = (
        "script-src",
        "style-src",
        "img-src",
        "connect-src",
        "font-src",
        "media-src",
        "frame-src",
    )
    for name in keep_self:
        if name in policy:
            policy[name].add("'self'")

    # Nonce *template*: the server must substitute {NONCE} on every response.
    if nonce_detected:
        for name in ("script-src", "style-src"):
            sources_of(name).add("'nonce-{NONCE}'")

    # Hashes for inline <script>/<style> blocks.
    if script_hashes:
        sources_of("script-src").update(script_hashes)
    if style_hashes:
        sources_of("style-src").update(style_hashes)

    # Attribute hashes need 'unsafe-hashes'. They are listed in the base
    # directive AND in the CSP3 *-attr directive for best compatibility.
    attribute_hashes = (
        ("script-src", handler_attr_hashes),
        ("style-src", style_attr_hashes),
    )
    for base_name, hashes in attribute_hashes:
        if not hashes:
            continue
        targets = (
            sources_of(base_name),
            policy.setdefault(f"{base_name}-attr", set()),
        )
        for target in targets:
            target.add("'unsafe-hashes'")
            target.update(hashes)

    if allow_unsafe_eval:
        sources_of("script-src").add("'unsafe-eval'")

    if allow_data_img:
        sources_of("img-src").add("data:")
    if allow_data_font:
        sources_of("font-src").add("data:")

    if allow_blob:
        for name in ("img-src", "media-src", "worker-src", "connect-src"):
            sources_of(name).add("blob:")

    if upgrade_insecure_requests:
        policy["upgrade-insecure-requests"] = set()

    # Serialize: a directive without sources is emitted as its bare name.
    rendered = [" ".join([name, *sorted(policy[name])]) for name in sorted(policy)]
    return "; ".join(rendered) + ";"
|
|
|
|
|
|
# Matches a ``sourceMappingURL=<ref>`` annotation (case-insensitive) and
# captures <ref>. The capture stops at whitespace or ``*`` so that the ``*/``
# closing a block comment is not swallowed into the reference.
_SOURCEMAP_RE = re.compile(r"sourceMappingURL\s*=\s*([^\s*]+)", re.IGNORECASE)
|
|
|
|
|
|
def _looks_like_js_or_css(url: str) -> bool:
|
|
p = urlparse(url)
|
|
path = (p.path or "").lower()
|
|
return path.endswith(".js") or path.endswith(".css")
|
|
|
|
|
|
def _extract_sourcemap_origin(
    asset_url: str, body_bytes: bytes, headers: dict
) -> set[str]:
    """Return the origin(s) a JS/CSS asset's source map would be fetched from.

    Two pointers are considered:

    - the ``sourcemap`` / ``x-sourcemap`` entry of *headers* (keys are
      expected in lower case);
    - the ``sourceMappingURL=`` annotation in the asset body.

    References are resolved relative to *asset_url*. ``data:`` references in
    the body and anything that does not resolve to an origin are dropped, so
    the result may be empty.
    """
    references: list[str] = []

    # Header-based pointer.
    header_ref = headers.get("sourcemap") or headers.get("x-sourcemap")
    if header_ref:
        references.append(header_ref)

    # Body-based pointer: the annotation sits near the end of the file, so
    # only the tail is decoded and scanned.
    tail = body_bytes[-200_000:]
    text = tail.decode("utf-8", errors="ignore")

    # When several annotations are present the LAST one is the effective one;
    # earlier ones are usually left-overs from files concatenated into a
    # bundle and would point at the wrong place.
    found = _SOURCEMAP_RE.findall(text)
    if found:
        ref = found[-1].strip().strip('"').strip("'")
        if ref and not ref.startswith("data:"):
            references.append(ref)

    origins = {origin_of(urljoin(asset_url, ref)) for ref in references}
    origins.discard("")  # unresolvable references contribute nothing
    return origins
|
|
|
|
|
|
@dataclass
class CrawlResult:
    """Result of a crawl: the drafted policy and the data it was built from."""

    # URLs the crawler attempted to open.
    visited: list[str]
    # Serialised Content-Security-Policy value.
    csp: str
    # True when at least one inline <script>/<style> carried a nonce.
    nonce_detected: bool
    # Directive name -> origins observed for it.
    directives: dict[str, list[str]]
    # Human-readable remarks about the crawl / the generated policy.
    notes: list[str]
    # Report-Only violation records captured while evaluating a policy.
    violations: list[dict]
|
|
|
|
|
|
async def crawl_and_generate_csp(
    start_url: str,
    *,
    max_pages: int = 10,
    timeout_ms: int = 20000,
    settle_ms: int = 1500,
    headless: bool = True,
    browsers_path: Path | None = None,
    auto_install: bool = True,
    with_deps: bool = False,
    allow_blob: bool = False,
    allow_unsafe_eval: bool = False,
    upgrade_insecure_requests: bool = False,
    include_sourcemaps: bool = False,
    ignore_non_html: bool = False,
    bypass_csp: bool = False,
    evaluate: str | None = None,  # CSP string to inject as Report-Only and evaluate
) -> CrawlResult:
    """Crawl same-origin pages from *start_url* and draft a CSP from what loads.

    Pages are opened one at a time in Chromium. For each page the function
    records the origins of sub-requests, hashes/nonces of inline code, frame
    origins and same-origin links (which are queued for later visits).

    Args:
        start_url: First page to open; its origin defines "same-origin".
        max_pages: Upper bound on the number of URLs opened. The link queue is
            capped at ``max_pages * 20`` entries.
        timeout_ms: Navigation timeout. The follow-up "networkidle" wait uses
            ``min(5000, timeout_ms)``.
        settle_ms: Extra wait after load before the DOM is inspected.
        headless: Passed to the Chromium launcher.
        browsers_path, with_deps: Passed to ``ensure_chromium_installed``.
        auto_install: Call ``ensure_chromium_installed`` before crawling
            (presumably installs Chromium when missing -- confirm in helper).
        allow_blob, allow_unsafe_eval, upgrade_insecure_requests: Passed to
            ``build_csp``.
        include_sourcemaps: Also add source-map origins of JS/CSS responses to
            connect-src.
        ignore_non_html: Skip inline/link/frame collection on pages whose
            content type is not HTML (they still count as visited).
        bypass_csp: Strip CSP and CSP-Report-Only headers from same-origin
            HTML document responses.
        evaluate: Policy injected as ``Content-Security-Policy-Report-Only``
            on same-origin HTML documents; resulting violations are collected.

    Returns:
        A ``CrawlResult``.

    Raises:
        ValueError: *start_url* has no scheme/host.
        playwright.async_api.Error: navigating to *start_url* itself failed.
            Failures on later pages are recorded in ``notes`` instead.
    """
    # Base class of Playwright's errors (its TimeoutError derives from it).
    from playwright.async_api import Error as PlaywrightError

    start_url, _ = urldefrag(start_url)
    base_origin = origin_of(start_url)
    if not base_origin:
        raise ValueError(f"Invalid start URL: {start_url}")

    if auto_install:
        await ensure_chromium_installed(
            browsers_path=browsers_path, with_deps=with_deps
        )

    visited: set[str] = set()
    q: deque[str] = deque([start_url])

    # Collect CSP ingredients
    directives: dict[str, set[str]] = {
        d: set() for d in set(RESOURCE_TO_DIRECTIVE.values()) | {"frame-src"}
    }
    script_hashes: set[str] = set()
    style_hashes: set[str] = set()
    style_attr_hashes: set[str] = set()
    handler_attr_hashes: set[str] = set()
    nonce_detected = False
    allow_data_img = False
    allow_data_font = False
    notes: list[str] = []

    evaluate_policy = normalize_csp_string(evaluate) if evaluate else None
    # Captured CSP violations (Report-Only) when --evaluate is used.
    violations: list[dict] = []

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=headless)
        try:
            context = await browser.new_context()

            # Optionally strip any existing CSP headers, and/or inject a Report-Only CSP for evaluation.
            # NOTE: This operates on *document response headers* only.
            if bypass_csp or evaluate_policy:

                async def _route_handler(route, request):
                    try:
                        if request.resource_type != "document":
                            return await route.continue_()

                        # IMPORTANT: Don't rewrite CSP on third-party iframe/object documents.
                        # Otherwise --evaluate / --bypass-csp will mutate embedded origins
                        # (e.g. asciinema.org) and produce bogus frame-ancestors violations.
                        req_origin = origin_of(request.url)
                        if not req_origin or req_origin != base_origin:
                            return await route.continue_()

                        resp = await route.fetch()
                        hdrs = {k.lower(): v for k, v in (resp.headers or {}).items()}

                        # Only treat actual HTML documents as candidates for CSP header rewriting.
                        # (Playwright classifies iframe navigations as "document" even when non-HTML.)
                        ct = (hdrs.get("content-type") or "").lower()
                        is_html = ("text/html" in ct) or ("application/xhtml+xml" in ct)
                        if not is_html:
                            return await route.fulfill(response=resp)

                        if bypass_csp:
                            hdrs.pop("content-security-policy", None)
                            hdrs.pop("content-security-policy-report-only", None)

                        if evaluate_policy:
                            hdrs["content-security-policy-report-only"] = evaluate_policy

                        try:
                            return await route.fulfill(response=resp, headers=hdrs)
                        except TypeError:
                            # Fallback for a fulfill() that rejects these keyword arguments.
                            body = await resp.body()
                            return await route.fulfill(
                                status=resp.status, headers=hdrs, body=body
                            )
                    except Exception:
                        # Best effort: never let header rewriting break the page load.
                        try:
                            return await route.continue_()
                        except Exception:
                            return

                await context.route("**/*", _route_handler)

            def on_request(req):
                """
                Playwright sometimes classifies "connect-like" activity as resource_type == "other".
                Heuristic: treat resource_type=="other" with sec-fetch-dest=="empty" as connect-src.
                """
                try:
                    url = req.url
                    parsed = urlparse(url)
                    if parsed.scheme not in ("http", "https", "ws", "wss"):
                        return

                    rtype = req.resource_type
                    directive = RESOURCE_TO_DIRECTIVE.get(rtype)

                    if directive is None and rtype == "other":
                        hdrs = {k.lower(): v for k, v in (req.headers or {}).items()}
                        # For fetch/xhr/beacon/pings, browsers typically send: sec-fetch-dest: empty
                        if (hdrs.get("sec-fetch-dest") or "").lower() == "empty":
                            directive = "connect-src"

                    if directive is None:
                        return

                    req_origin = origin_of(url)
                    if req_origin and req_origin != base_origin:
                        directives.setdefault(directive, set()).add(req_origin)
                except Exception:
                    return

            context.on("request", on_request)

            max_queue = max_pages * 20

            while q and len(visited) < max_pages:
                url = q.popleft()
                if url in visited:
                    continue
                visited.add(url)

                page = await context.new_page()

                # If evaluating a candidate CSP, capture Report-Only violations.
                if evaluate_policy:

                    def _record_violation(_source, payload):
                        try:
                            if (
                                isinstance(payload, dict)
                                and payload.get("disposition") == "report"
                            ):
                                violations.append(payload)
                        except Exception:
                            return

                    try:
                        await page.expose_binding("__cspresso_violation", _record_violation)
                        # The script text is evaluated as-is, so the function has
                        # to be *invoked* (IIFE). A bare "() => {...}" expression
                        # would be created and discarded without ever registering
                        # the listener.
                        await page.add_init_script(
                            "(() => { try { window.addEventListener('securitypolicyviolation', (e) => { "
                            "const payload = {documentURI:e.documentURI, referrer:e.referrer, blockedURI:e.blockedURI, "
                            "violatedDirective:e.violatedDirective, effectiveDirective:e.effectiveDirective, originalPolicy:e.originalPolicy, "
                            "disposition:e.disposition, sourceFile:e.sourceFile, lineNumber:e.lineNumber, columnNumber:e.columnNumber, "
                            "statusCode:e.statusCode, sample:e.sample}; "
                            "if (typeof window.__cspresso_violation === 'function') { window.__cspresso_violation(payload); }"
                            "}, true); } catch(_){} })();"
                        )
                    except Exception:
                        pass  # nosec

                    # Fallback channel: CSP report-only messages printed to the console.
                    # ``_page`` is bound now so the record names the page that
                    # emitted the message, not whatever ``page`` refers to later.
                    def _on_console(msg, _page=page):
                        try:
                            t = msg.text or ""
                            tl = t.lower()
                            if (
                                "content security policy" in tl
                                or "content-security-policy" in tl
                            ) and (
                                "would violate" in tl
                                or "report-only" in tl
                                or "report only" in tl
                            ):
                                violations.append(
                                    {
                                        "console": True,
                                        "type": msg.type,
                                        "text": t,
                                        "documentURI": _page.url,
                                        "disposition": "report",
                                    }
                                )
                        except Exception:
                            return

                    page.on("console", _on_console)

                pending: set[asyncio.Task] = set()

                if include_sourcemaps:

                    async def handle_response(resp):
                        try:
                            url = resp.url
                            if not _looks_like_js_or_css(url):
                                return

                            headers = {
                                k.lower(): v for k, v in (resp.headers or {}).items()
                            }

                            # Read the *actual* bytes the browser received
                            body = await resp.body()
                            origins = _extract_sourcemap_origin(url, body, headers)

                            for o in origins:
                                if o and o != base_origin:
                                    directives.setdefault("connect-src", set()).add(o)

                        except Exception:
                            return

                    def on_response(resp, _pending=pending, _handle=handle_response):
                        t = asyncio.create_task(_handle(resp))
                        # Keep a reference until done so the task is not garbage-collected.
                        _pending.add(t)
                        t.add_done_callback(lambda _t: _pending.discard(_t))

                    page.on("response", on_response)

                try:
                    try:
                        resp = await page.goto(url, wait_until="load", timeout=timeout_ms)
                    except PlaywrightError as exc:
                        # A broken start URL is a caller error and still raises.
                        # A broken discovered link (dead page, download, timeout)
                        # must not throw away everything learned so far.
                        if url == start_url:
                            raise
                        reason = (str(exc).strip().splitlines() or [type(exc).__name__])[0]
                        notes.append(f"Navigation to {url} failed ({reason}); page skipped.")
                        continue

                    try:
                        await page.wait_for_load_state(
                            "networkidle", timeout=min(5000, timeout_ms)
                        )
                    except PlaywrightTimeoutError:
                        notes.append(
                            f"Timed out waiting for networkidle on {url}; continuing after load anyway."
                        )

                    ct = ""
                    if resp is not None:
                        ct = (await resp.header_value("content-type") or "").lower()

                    is_html = ("text/html" in ct) or ("application/xhtml+xml" in ct)
                    if not is_html and ignore_non_html:
                        # Still count as visited, but don't hash inline attrs / don't extract links.
                        continue

                    # Give the page a moment to run hydration / delayed fetches.
                    if settle_ms > 0:
                        await page.wait_for_timeout(settle_ms)

                    (
                        s_nonces,
                        st_nonces,
                        s_hashes,
                        st_hashes,
                        st_attr_hashes,
                        h_attr_hashes,
                        has_data_img,
                        has_data_font,
                    ) = await collect_inline(page)

                    if include_sourcemaps and pending:
                        # Give the handler a moment to finish reading bodies
                        await asyncio.wait(pending, timeout=5.0)

                    if s_nonces or st_nonces:
                        nonce_detected = True
                    script_hashes.update(s_hashes)
                    style_hashes.update(st_hashes)
                    style_attr_hashes.update(st_attr_hashes)
                    handler_attr_hashes.update(h_attr_hashes)

                    allow_data_img = allow_data_img or has_data_img
                    allow_data_font = allow_data_font or has_data_font

                    # Frame destinations
                    for fr in page.frames:
                        if fr.url and fr.url != "about:blank":
                            fr_origin = origin_of(fr.url)
                            if fr_origin and fr_origin != base_origin:
                                directives["frame-src"].add(fr_origin)

                    # Enqueue same-origin links
                    links = await extract_links(page, base_origin)
                    for link in links:
                        if link not in visited and link not in q and len(q) < max_queue:
                            q.append(link)

                finally:
                    await page.close()
        finally:
            # Close the browser on every exit path, including errors.
            await browser.close()

    csp = build_csp(
        directives=directives,
        base_origin=base_origin,
        nonce_detected=nonce_detected,
        script_hashes=script_hashes,
        style_hashes=style_hashes,
        style_attr_hashes=style_attr_hashes,
        handler_attr_hashes=handler_attr_hashes,
        allow_data_img=allow_data_img,
        allow_data_font=allow_data_font,
        allow_blob=allow_blob,
        allow_unsafe_eval=allow_unsafe_eval,
        upgrade_insecure_requests=upgrade_insecure_requests,
    )

    if style_attr_hashes or handler_attr_hashes:
        notes.append(
            'Detected inline attribute code (style="..." and/or on*="..."). '
            "Hashes for these require 'unsafe-hashes' (and modern browsers may use style-src-attr/script-src-attr)."
        )
    if nonce_detected:
        notes.append(
            "Nonce detected: replace {NONCE} per HTML response (server must generate and inject nonce)."
        )

    directives_out = {k: sorted(v) for k, v in directives.items() if v}

    # De-duplicate violations (same doc+directive+blocked URI) to keep output stable.
    if violations:
        seen = set()
        uniq: list[dict] = []
        for v in violations:
            if not isinstance(v, dict):
                continue
            key = (
                v.get("documentURI"),
                v.get("effectiveDirective") or v.get("violatedDirective"),
                v.get("blockedURI"),
                v.get("sourceFile"),
                v.get("lineNumber"),
                v.get("columnNumber"),
            )
            if key in seen:
                continue
            seen.add(key)
            uniq.append(v)
        violations = uniq

    return CrawlResult(
        visited=sorted(visited),
        csp=csp,
        nonce_detected=nonce_detected,
        directives=directives_out,
        notes=notes,
        violations=violations,
    )
|
|
|
|
|
|
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|
ap = argparse.ArgumentParser(
|
|
prog="cspresso",
|
|
description="Crawl up to N pages (same-origin) with Playwright and generate a draft CSP.",
|
|
)
|
|
ap.add_argument("url", help="Start URL (e.g. https://example.com)")
|
|
ap.add_argument(
|
|
"--max-pages",
|
|
type=int,
|
|
default=10,
|
|
help="Maximum number of pages to visit (default: 10)",
|
|
)
|
|
ap.add_argument(
|
|
"--timeout-ms",
|
|
type=int,
|
|
default=20000,
|
|
help="Navigation timeout in ms (default: 20000)",
|
|
)
|
|
ap.add_argument(
|
|
"--settle-ms",
|
|
type=int,
|
|
default=1500,
|
|
help="Extra time after networkidle to allow hydration/delayed requests (default: 1500)",
|
|
)
|
|
|
|
ap.add_argument(
|
|
"--headed",
|
|
action="store_true",
|
|
help="Run with a visible browser window (not headless)",
|
|
)
|
|
|
|
ap.add_argument(
|
|
"--no-install",
|
|
action="store_true",
|
|
help="Do not auto-install Chromium if missing",
|
|
)
|
|
ap.add_argument(
|
|
"--with-deps",
|
|
action="store_true",
|
|
help="When installing, include Playwright OS deps (Linux). May require elevated privileges.",
|
|
)
|
|
ap.add_argument(
|
|
"--browsers-path",
|
|
default=None,
|
|
help="Directory to install/playwright browsers (default: ./.pw-browsers).",
|
|
)
|
|
|
|
ap.add_argument(
|
|
"--allow-blob",
|
|
action="store_true",
|
|
help="Include blob: in common directives (drafty)",
|
|
)
|
|
ap.add_argument(
|
|
"--unsafe-eval",
|
|
action="store_true",
|
|
help="Include 'unsafe-eval' in script-src (not recommended)",
|
|
)
|
|
ap.add_argument(
|
|
"--upgrade-insecure-requests",
|
|
action="store_true",
|
|
help="Add upgrade-insecure-requests directive",
|
|
)
|
|
ap.add_argument(
|
|
"--include-sourcemaps",
|
|
action="store_true",
|
|
default=False,
|
|
help="Analyze JS/CSS for sourceMappingURL and add map origins to connect-src",
|
|
)
|
|
|
|
ap.add_argument(
|
|
"--bypass-csp",
|
|
action="store_true",
|
|
help="Strip any existing CSP/CSP-Report-Only response headers from HTML documents (useful for discovery or evaluation).",
|
|
)
|
|
ap.add_argument(
|
|
"--evaluate",
|
|
metavar="CSP",
|
|
default=None,
|
|
help="Inject the provided CSP string as Content-Security-Policy-Report-Only on HTML documents and exit 1 if any Report-Only violations are detected. Quote the value.",
|
|
)
|
|
ap.add_argument(
|
|
"--ignore-non-html",
|
|
action="store_true",
|
|
default=False,
|
|
help="Ignore non-HTML pages that get crawled (which might trigger Chromium's word-wrap hash: https://stackoverflow.com/a/69838710)",
|
|
)
|
|
ap.add_argument(
|
|
"--json", action="store_true", help="Output JSON instead of a header line"
|
|
)
|
|
return ap.parse_args(argv)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run the CLI and return the process exit status.

    The crawl is executed with the parsed options and the result is printed
    either as JSON (``--json``) or as ``# visited`` / ``# NOTE`` comment lines
    followed by a ``Content-Security-Policy:`` line.

    Returns 1 when ``--evaluate`` was given and violations were recorded,
    otherwise 0.
    """
    args = _parse_args(argv)

    browsers_path = None
    if args.browsers_path:
        browsers_path = Path(args.browsers_path).resolve()

    crawl = crawl_and_generate_csp(
        args.url,
        max_pages=args.max_pages,
        timeout_ms=args.timeout_ms,
        settle_ms=args.settle_ms,
        headless=not args.headed,
        browsers_path=browsers_path,
        auto_install=not args.no_install,
        with_deps=args.with_deps,
        allow_blob=args.allow_blob,
        allow_unsafe_eval=args.unsafe_eval,
        upgrade_insecure_requests=args.upgrade_insecure_requests,
        include_sourcemaps=args.include_sourcemaps,
        bypass_csp=args.bypass_csp,
        evaluate=args.evaluate,
        ignore_non_html=args.ignore_non_html,
    )
    result = asyncio.run(crawl)

    # Violations only affect the exit status when a policy was being evaluated.
    exit_status = 1 if (args.evaluate and result.violations) else 0

    if args.json:
        report = {
            "visited": result.visited,
            "nonce_detected": result.nonce_detected,
            "csp": result.csp,
            "directives": result.directives,
            "notes": result.notes,
            "violations": result.violations,
            "evaluated_policy": args.evaluate,
        }
        print(json.dumps(report, indent=2, sort_keys=True))
        return exit_status

    # Default: print header + visited pages as comments.
    for visited_url in result.visited:
        print(f"# visited: {visited_url}")
    for note in result.notes:
        print(f"# NOTE: {note}")
    print("Content-Security-Policy:", result.csp)

    if exit_status:
        print("# CSP Report-Only violations detected:")
        for v in result.violations:
            try:
                blocked = v.get("blockedURI")
                eff = v.get("effectiveDirective") or v.get("violatedDirective")
                doc = v.get("documentURI")
                print(f"# - {eff} blocked={blocked} on {doc}")
            except Exception:
                # Fall back to the raw record if it is not dict-like.
                print(f"# - {v}")

    return exit_status
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    raise SystemExit(main())
|