diff --git a/Lib/http/server.py b/Lib/http/server.py index 160d3eefc7cbdf..bb1f1499ce5cc5 100644 --- a/Lib/http/server.py +++ b/Lib/http/server.py @@ -112,6 +112,11 @@ DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8" +def _validate_header_string(value): + """Validate header values preventing CRLF injection.""" + if isinstance(value, str) and ('\r' in value or '\n' in value): + raise ValueError('Invalid header name/value: contains CR or LF') + class HTTPServer(socketserver.TCPServer): allow_reuse_address = True # Seems to make sense in testing environment @@ -553,6 +558,8 @@ def send_response_only(self, code, message=None): def send_header(self, keyword, value): """Send a MIME header to the headers buffer.""" + _validate_header_string(keyword) + _validate_header_string(value) if self.request_version != 'HTTP/0.9': if not hasattr(self, '_headers_buffer'): self._headers_buffer = [] diff --git a/Lib/test/test_httpservers.py b/Lib/test/test_httpservers.py index 7da5e3a1957588..71f56e4f1dfe1b 100644 --- a/Lib/test/test_httpservers.py +++ b/Lib/test/test_httpservers.py @@ -1068,6 +1068,32 @@ def test_header_buffering_of_send_header(self): self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') self.assertEqual(output.numWrites, 1) + def test_crlf_injection_in_header_value(self): + input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') + output = AuditableBytesIO() + handler = SocketlessRequestHandler() + handler.rfile = input + handler.wfile = output + handler.request_version = 'HTTP/1.1' + + with self.assertRaises(ValueError) as ctx: + handler.send_header('X-Custom', 'value\r\nSet-Cookie: custom=true') + self.assertIn('Invalid header name/value: contains CR or LF', + str(ctx.exception)) + + def test_crlf_injection_in_header_name(self): + input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') + output = AuditableBytesIO() + handler = SocketlessRequestHandler() + handler.rfile = input + handler.wfile = output + handler.request_version = 'HTTP/1.1' + + with self.assertRaises(ValueError) as ctx: + handler.send_header('X-Inj\r\nSet-Cookie', 'value') + self.assertIn('Invalid header name/value: contains CR or LF', + str(ctx.exception)) + def test_header_unbuffered_when_continue(self): def _readAndReseek(f): diff --git a/Lib/test/test_wsgiref.py b/Lib/test/test_wsgiref.py index e04a4d2c2218a3..5c88dc989aa919 100644 --- a/Lib/test/test_wsgiref.py +++ b/Lib/test/test_wsgiref.py @@ -503,6 +503,39 @@ def testExtras(self): '\r\n' ) + def test_crlf_rejection_in_setitem(self): + h = Headers() + for crlf in ('\r\n', '\r', '\n'): + with self.subTest(crlf_repr=repr(crlf)): + with self.assertRaises(ValueError) as ctx: + h['X-Custom'] = f'value{crlf}Set-Cookie: test' + self.assertIn('CR or LF', str(ctx.exception)) + + def test_crlf_rejection_in_setdefault(self): + for crlf in ('\r\n', '\r', '\n'): + with self.subTest(crlf_repr=repr(crlf)): + h = Headers() + with self.assertRaises(ValueError) as ctx: + h.setdefault('X-Custom', f'value{crlf}Set-Cookie: test') + self.assertIn('CR or LF', str(ctx.exception)) + + def test_crlf_rejection_in_add_header(self): + for crlf in ('\r\n', '\r', '\n'): + with self.subTest(location='value', crlf_repr=repr(crlf)): + h = Headers() + with self.assertRaises(ValueError) as ctx: + h.add_header('X-Custom', f'value{crlf}Set-Cookie: test') + self.assertIn('CR or LF', str(ctx.exception)) + + with self.subTest(location='param', crlf_repr=repr(crlf)): + h = Headers() + with self.assertRaises(ValueError) as ctx: + h.add_header('Content-Disposition', + 'attachment', + filename=f'test{crlf}.txt') + self.assertIn('CR or LF', str(ctx.exception)) + + class ErrorHandler(BaseCGIHandler): """Simple handler subclass for testing BaseHandler""" diff --git a/Lib/wsgiref/headers.py b/Lib/wsgiref/headers.py index c78879f80c7df2..544558f082b4d8 100644 --- a/Lib/wsgiref/headers.py +++ b/Lib/wsgiref/headers.py @@ -33,17 +33,14 @@ def __init__(self, headers=None): if type(headers) is not list: raise TypeError("Headers must be a list of name/value tuples") self._headers = headers - if __debug__: - for k, v in headers: - self._convert_string_type(k) - self._convert_string_type(v) - - def _convert_string_type(self, value): - """Convert/check value type.""" - if type(value) is str: - return value - raise AssertionError("Header names/values must be" - " of type str (got {0})".format(repr(value))) + + def _validate_header_string(self, value): + """Validate header type and value.""" + if not isinstance(value, str): + raise AssertionError("Header names/values must be" + " of type str (got {0})".format(repr(value))) + if '\r' in value or '\n' in value: + raise ValueError('Invalid header name/value: contains CR or LF') def __len__(self): """Return the total number of headers, including duplicates.""" @@ -52,16 +49,18 @@ def __len__(self): def __setitem__(self, name, val): """Set the value of a header.""" del self[name] - self._headers.append( - (self._convert_string_type(name), self._convert_string_type(val))) + self._validate_header_string(name) + self._validate_header_string(val) + self._headers.append((name, val)) def __delitem__(self,name): """Delete all occurrences of a header, if present. Does *not* raise an exception if the header is missing. """ - name = self._convert_string_type(name.lower()) - self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name] + self._headers[:] = [ + kv for kv in self._headers if kv[0].lower() != name.lower() + ] def __getitem__(self,name): """Get the first header value for 'name' @@ -87,15 +86,13 @@ def get_all(self, name): fields deleted and re-inserted are always appended to the header list. If no fields exist with the given name, returns an empty list. """ - name = self._convert_string_type(name.lower()) - return [kv[1] for kv in self._headers if kv[0].lower()==name] + return [kv[1] for kv in self._headers if kv[0].lower()==name.lower()] def get(self,name,default=None): """Get the first header value for 'name', or return 'default'""" - name = self._convert_string_type(name.lower()) for k,v in self._headers: - if k.lower()==name: + if k.lower()==name.lower(): return v return default @@ -148,8 +145,9 @@ def setdefault(self,name,value): and value 'value'.""" result = self.get(name) if result is None: - self._headers.append((self._convert_string_type(name), - self._convert_string_type(value))) + self._validate_header_string(name) + self._validate_header_string(value) + self._headers.append((name, value)) return value else: return result @@ -170,15 +168,16 @@ def add_header(self, _name, _value, **_params): *not* handle '(charset, language, value)' tuples: all values must be strings or None. """ + self._validate_header_string(_name) parts = [] if _value is not None: - _value = self._convert_string_type(_value) + self._validate_header_string(_value) parts.append(_value) for k, v in _params.items(): - k = self._convert_string_type(k) + self._validate_header_string(k) if v is None: parts.append(k.replace('_', '-')) else: - v = self._convert_string_type(v) + self._validate_header_string(v) parts.append(_formatparam(k.replace('_', '-'), v)) - self._headers.append((self._convert_string_type(_name), "; ".join(parts))) + self._headers.append((_name, "; ".join(parts))) diff --git a/Misc/NEWS.d/next/Library/2025-12-11-22-28-21.gh-issue-142533.SN0f9_.rst b/Misc/NEWS.d/next/Library/2025-12-11-22-28-21.gh-issue-142533.SN0f9_.rst new file mode 100644 index 00000000000000..2b8f567d2d497b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-12-11-22-28-21.gh-issue-142533.SN0f9_.rst @@ -0,0 +1,2 @@ +Reject CR/LF in HTTP headers in ``http.server`` and ``wsgiref.headers`` to +prevent CRLF injection attacks.