diff --git a/docs/web.rst b/docs/web.rst index 00066ccdf..3b8153b30 100644 --- a/docs/web.rst +++ b/docs/web.rst @@ -151,6 +151,7 @@ The `Application` object serving this request + .. automethod:: RequestHandler.check_allowed_origin .. automethod:: RequestHandler.check_etag_header .. automethod:: RequestHandler.check_xsrf_cookie .. automethod:: RequestHandler.compute_etag diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index 36049e26b..fa4ed119a 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -3195,6 +3195,44 @@ def test_xsrf_httponly(self): self.assertTrue(abs((expires - header_expires).total_seconds()) < 10) +class CheckSameOriginTest(SimpleHandlerTestCase): + class Handler(RequestHandler): + def post(self): + self.write("ok") + + def get_app_kwargs(self): + return dict(check_allowed_origin=True) + + def _post(self, headers): + return self.fetch("/", method="POST", body="x=1", headers=headers) + + def test_sec_fetch_site_success(self): + response = self._post({"Sec-Fetch-Site": "same-origin"}) + self.assertEqual(response.code, 200) + + def test_sec_fetch_site_fail(self): + with ExpectLog(gen_log, ".*Cross-origin request"): + response = self._post({"Sec-Fetch-Site": "cross-site"}) + self.assertEqual(response.code, 403) + + def test_fallback_success(self): + response = self._post({"Origin": self.get_url("")}) + self.assertEqual(response.code, 200) + + def test_fallback_referrer_success(self): + response = self._post({"Referrer": self.get_url("/foo/bar")}) + self.assertEqual(response.code, 200) + + def test_fallback_fail(self): + with ExpectLog(gen_log, ".*Cross-origin request"): + response = self._post({"Origin": "https://evil.example.com/"}) + self.assertEqual(response.code, 403) + + def test_fallback_no_origin(self): + response = self._post({}) + self.assertEqual(response.code, 200) + + class FinishExceptionTest(SimpleHandlerTestCase): class Handler(RequestHandler): def get(self): diff --git a/tornado/web.py b/tornado/web.py index 39a060f68..ff3d357ff 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -1692,6 +1692,40 @@ def xsrf_form_html(self) -> str: + '"/>' ) + def check_allowed_origin(self) -> None: + """Check if a request should be rejected as cross-origin. + + If the setting check_allowed_origin is True, this is called for non-safe + HTTP requests (i.e. not GET, HEAD or OPTIONS). It raises HTTPError 403 + to reject a request. + """ + origin = self.request.headers.get("Origin") + if origin is not None: + origin = origin.lower() + if origin in self.application.settings.get("allowed_origins", ()): + return # Origin in explicit allowlist + + if (sfs := self.request.headers.get("Sec-Fetch-Site")) is not None: + # All major browsers send the Sec-Fetch-Site header since ~2023 + # for 'potentially trustworthy' URLs (roughly, HTTPS or localhost) + if sfs in ("same-origin", "none"): + return # OK according to Sec-Fetch-Site + raise HTTPError( + 403, + f"Cross-origin request with unsafe method (Sec-Fetch-Site: {sfs})", + ) + + if origin is None: # Sec-Fetch-Site must also be missing to reach here + return # Probably a non-browser request + + host = self.request.headers.get("Host") + if urllib.parse.urlsplit(origin).netloc != host: + raise HTTPError( + 403, + f"Cross-origin request with unsafe method (Origin {origin!r} does not " + f"match Host {host!r} or allowed_origins)", + ) + def static_url( self, path: str, include_host: bool | None = None, **kwargs: Any ) -> str: @@ -1828,12 +1862,11 @@ async def _execute( } # If XSRF cookies are turned on, reject form submissions without # the proper cookie - if self.request.method not in ( - "GET", - "HEAD", - "OPTIONS", - ) and self.application.settings.get("xsrf_cookies"): - self.check_xsrf_cookie() + if self.request.method not in ("GET", "HEAD", "OPTIONS"): + if self.application.settings.get("xsrf_cookies"): + self.check_xsrf_cookie() + if self.application.settings.get("check_allowed_origin"): + self.check_allowed_origin() result = self.prepare() if result is not None: