* Add `--bypass-csp` option to strip any existing enforced CSP so that it does not skew the results
* Add `--evaluate` option to test a proposed CSP without needing to install it (best used in conjunction with `--bypass-csp`)
This commit is contained in:
parent
16cd1e4b40
commit
55a815564f
5 changed files with 234 additions and 8 deletions
|
|
@ -1,4 +1,5 @@
|
|||
import sys
|
||||
from .crawl import main
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
sys.exit(main())
|
||||
|
|
|
|||
|
|
@ -48,6 +48,13 @@ def sha256_base64(s: str) -> str:
|
|||
return base64.b64encode(h).decode("ascii")
|
||||
|
||||
|
||||
def normalize_csp_string(csp: str) -> str:
    """Return *csp* as a single-line policy string terminated by ``;``.

    Args:
        csp: A serialized Content-Security-Policy. ``None`` or an empty /
            whitespace-only string is accepted and yields ``""``.

    Returns:
        The policy with leading/trailing whitespace removed, every internal
        whitespace run (including line breaks and tabs) collapsed to one
        space, and a trailing ``;`` appended when it is missing. An empty
        policy is returned as ``""`` with no ``;`` added.
    """
    # str.split() with no argument splits on any whitespace run and drops
    # leading/trailing whitespace, so this both strips and flattens the value.
    # Flattening matters when the policy is later used as an HTTP header value:
    # a multi-line quoted CLI argument would otherwise carry CR/LF, which is
    # not valid inside a header field. CSP tokens are whitespace-separated, so
    # collapsing runs does not change the policy's meaning.
    s = " ".join((csp or "").split())
    if not s:
        return s
    return s if s.endswith(";") else s + ";"
|
||||
|
||||
|
||||
async def collect_inline(page, *, max_attr_hashes: int = 2000):
|
||||
"""
|
||||
Collect inline <script> (no src), <style> blocks, plus:
|
||||
|
|
@ -291,6 +298,7 @@ class CrawlResult:
|
|||
nonce_detected: bool
|
||||
directives: dict[str, list[str]]
|
||||
notes: list[str]
|
||||
violations: list[dict]
|
||||
|
||||
|
||||
async def crawl_and_generate_csp(
|
||||
|
|
@ -308,6 +316,8 @@ async def crawl_and_generate_csp(
|
|||
upgrade_insecure_requests: bool = False,
|
||||
include_sourcemaps: bool = False,
|
||||
ignore_non_html: bool = False,
|
||||
bypass_csp: bool = False,
|
||||
evaluate: str | None = None, # CSP string to inject as Report-Only and evaluate
|
||||
) -> CrawlResult:
|
||||
start_url, _ = urldefrag(start_url)
|
||||
base_origin = origin_of(start_url)
|
||||
|
|
@ -335,10 +345,48 @@ async def crawl_and_generate_csp(
|
|||
allow_data_font = False
|
||||
notes: list[str] = []
|
||||
|
||||
evaluate_policy = normalize_csp_string(evaluate) if evaluate else None
|
||||
# Captured CSP violations (Report-Only) when --evaluate is used.
|
||||
violations: list[dict] = []
|
||||
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=headless)
|
||||
context = await browser.new_context()
|
||||
|
||||
# Optionally strip any existing CSP headers, and/or inject a Report-Only CSP for evaluation.
|
||||
# NOTE: This operates on *document response headers* only.
|
||||
if bypass_csp or evaluate_policy:
|
||||
|
||||
async def _route_handler(route, request):
|
||||
try:
|
||||
if request.resource_type != "document":
|
||||
return await route.continue_()
|
||||
|
||||
resp = await route.fetch()
|
||||
hdrs = {k.lower(): v for k, v in (resp.headers or {}).items()}
|
||||
|
||||
if bypass_csp:
|
||||
hdrs.pop("content-security-policy", None)
|
||||
hdrs.pop("content-security-policy-report-only", None)
|
||||
|
||||
if evaluate_policy:
|
||||
hdrs["content-security-policy-report-only"] = evaluate_policy
|
||||
|
||||
try:
|
||||
return await route.fulfill(response=resp, headers=hdrs)
|
||||
except TypeError:
|
||||
body = await resp.body()
|
||||
return await route.fulfill(
|
||||
status=resp.status, headers=hdrs, body=body
|
||||
)
|
||||
except Exception:
|
||||
try:
|
||||
return await route.continue_()
|
||||
except Exception:
|
||||
return
|
||||
|
||||
await context.route("**/*", _route_handler)
|
||||
|
||||
def on_request(req):
|
||||
"""
|
||||
Playwright sometimes classifies "connect-like" activity as resource_type == "other".
|
||||
|
|
@ -380,6 +428,59 @@ async def crawl_and_generate_csp(
|
|||
|
||||
page = await context.new_page()
|
||||
|
||||
# If evaluating a candidate CSP, capture Report-Only violations.
|
||||
if evaluate_policy:
|
||||
|
||||
def _record_violation(_source, payload):
|
||||
try:
|
||||
if (
|
||||
isinstance(payload, dict)
|
||||
and payload.get("disposition") == "report"
|
||||
):
|
||||
violations.append(payload)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
try:
|
||||
await page.expose_binding("__cspresso_violation", _record_violation)
|
||||
await page.add_init_script(
|
||||
"() => { try { window.addEventListener('securitypolicyviolation', (e) => { "
|
||||
"const payload = {documentURI:e.documentURI, referrer:e.referrer, blockedURI:e.blockedURI, "
|
||||
"violatedDirective:e.violatedDirective, effectiveDirective:e.effectiveDirective, originalPolicy:e.originalPolicy, "
|
||||
"disposition:e.disposition, sourceFile:e.sourceFile, lineNumber:e.lineNumber, columnNumber:e.columnNumber, "
|
||||
"statusCode:e.statusCode, sample:e.sample}; "
|
||||
"if (typeof window.__cspresso_violation === 'function') { window.__cspresso_violation(payload); }"
|
||||
"}, true); } catch(_){} }"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _on_console(msg):
|
||||
try:
|
||||
t = msg.text or ""
|
||||
tl = t.lower()
|
||||
if (
|
||||
"content security policy" in tl
|
||||
or "content-security-policy" in tl
|
||||
) and (
|
||||
"would violate" in tl
|
||||
or "report-only" in tl
|
||||
or "report only" in tl
|
||||
):
|
||||
violations.append(
|
||||
{
|
||||
"console": True,
|
||||
"type": msg.type,
|
||||
"text": t,
|
||||
"documentURI": page.url,
|
||||
"disposition": "report",
|
||||
}
|
||||
)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
page.on("console", _on_console)
|
||||
|
||||
pending: set[asyncio.Task] = set()
|
||||
|
||||
if include_sourcemaps:
|
||||
|
|
@ -499,12 +600,35 @@ async def crawl_and_generate_csp(
|
|||
)
|
||||
|
||||
directives_out = {k: sorted(v) for k, v in directives.items() if v}
|
||||
|
||||
# De-duplicate violations (same doc+directive+blocked URI) to keep output stable.
|
||||
if violations:
|
||||
seen = set()
|
||||
uniq: list[dict] = []
|
||||
for v in violations:
|
||||
if not isinstance(v, dict):
|
||||
continue
|
||||
key = (
|
||||
v.get("documentURI"),
|
||||
v.get("effectiveDirective") or v.get("violatedDirective"),
|
||||
v.get("blockedURI"),
|
||||
v.get("sourceFile"),
|
||||
v.get("lineNumber"),
|
||||
v.get("columnNumber"),
|
||||
)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
uniq.append(v)
|
||||
violations = uniq
|
||||
|
||||
return CrawlResult(
|
||||
visited=sorted(visited),
|
||||
csp=csp,
|
||||
nonce_detected=nonce_detected,
|
||||
directives=directives_out,
|
||||
notes=notes,
|
||||
violations=violations,
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -576,6 +700,18 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|||
default=False,
|
||||
help="Analyze JS/CSS for sourceMappingURL and add map origins to connect-src",
|
||||
)
|
||||
|
||||
ap.add_argument(
|
||||
"--bypass-csp",
|
||||
action="store_true",
|
||||
help="Strip any existing CSP/CSP-Report-Only response headers from HTML documents (useful for discovery or evaluation).",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--evaluate",
|
||||
metavar="CSP",
|
||||
default=None,
|
||||
help="Inject the provided CSP string as Content-Security-Policy-Report-Only on HTML documents and exit 1 if any Report-Only violations are detected. Quote the value.",
|
||||
)
|
||||
ap.add_argument(
|
||||
"--ignore-non-html",
|
||||
action="store_true",
|
||||
|
|
@ -588,7 +724,7 @@ def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
|
|||
return ap.parse_args(argv)
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> None:
|
||||
def main(argv: list[str] | None = None) -> int:
|
||||
args = _parse_args(argv)
|
||||
browsers_path = Path(args.browsers_path).resolve() if args.browsers_path else None
|
||||
|
||||
|
|
@ -606,6 +742,8 @@ def main(argv: list[str] | None = None) -> None:
|
|||
allow_unsafe_eval=args.unsafe_eval,
|
||||
upgrade_insecure_requests=args.upgrade_insecure_requests,
|
||||
include_sourcemaps=args.include_sourcemaps,
|
||||
bypass_csp=args.bypass_csp,
|
||||
evaluate=args.evaluate,
|
||||
ignore_non_html=args.ignore_non_html,
|
||||
)
|
||||
)
|
||||
|
|
@ -619,12 +757,14 @@ def main(argv: list[str] | None = None) -> None:
|
|||
"csp": result.csp,
|
||||
"directives": result.directives,
|
||||
"notes": result.notes,
|
||||
"violations": result.violations,
|
||||
"evaluated_policy": args.evaluate,
|
||||
},
|
||||
indent=2,
|
||||
sort_keys=True,
|
||||
)
|
||||
)
|
||||
return
|
||||
return 1 if (args.evaluate and result.violations) else 0
|
||||
|
||||
# Default: print header + visited pages as comments.
|
||||
for u in result.visited:
|
||||
|
|
@ -633,6 +773,24 @@ def main(argv: list[str] | None = None) -> None:
|
|||
print(f"# NOTE: {n}")
|
||||
print("Content-Security-Policy:", result.csp)
|
||||
|
||||
if args.evaluate:
|
||||
if result.violations:
|
||||
print("# CSP Report-Only violations detected:")
|
||||
for v in result.violations:
|
||||
try:
|
||||
blocked = v.get("blockedURI")
|
||||
eff = v.get("effectiveDirective") or v.get("violatedDirective")
|
||||
doc = v.get("documentURI")
|
||||
print(f"# - {eff} blocked={blocked} on {doc}")
|
||||
except Exception:
|
||||
print(f"# - {v}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
import sys
|
||||
|
||||
sys.exit(main())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue