diff --git a/pypdf/constants.py b/pypdf/constants.py
index c1069b69ab..e294c5fa46 100644
--- a/pypdf/constants.py
+++ b/pypdf/constants.py
@@ -236,6 +236,7 @@ class FilterTypes(StrEnum):
     CCITT_FAX_DECODE = "/CCITTFaxDecode"  # abbreviation: CCF
     DCT_DECODE = "/DCTDecode"  # abbreviation: DCT
     JPX_DECODE = "/JPXDecode"
+    BROTLI_DECODE = "/BrotliDecode"  # abbreviation: Br, PDF 2.0
     JBIG2_DECODE = "/JBIG2Decode"
 
 
@@ -249,6 +250,7 @@ class FilterTypeAbbreviations:
     RL = "/RL"
     CCF = "/CCF"
     DCT = "/DCT"
+    BR = "/Br"
 
 
 class LzwFilterParameters:
diff --git a/pypdf/filters.py b/pypdf/filters.py
index 39abc8c988..4ec8828731 100644
--- a/pypdf/filters.py
+++ b/pypdf/filters.py
@@ -72,9 +72,15 @@
     is_null_or_none,
 )
 
+try:
+    import brotli
+except ImportError:
+    brotli = None
+
 
 MAX_DECLARED_STREAM_LENGTH = 75_000_000
 MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH = 75_000_000
+BROTLI_MAX_OUTPUT_LENGTH = 75_000_000
 JBIG2_MAX_OUTPUT_LENGTH = 75_000_000
 LZW_MAX_OUTPUT_LENGTH = 75_000_000
 RUN_LENGTH_MAX_OUTPUT_LENGTH = 75_000_000
@@ -557,6 +563,75 @@ def decode(
         return data
 
 
+class BrotliDecode:
+    @staticmethod
+    def decode(
+        data: bytes,
+        decode_parms: Optional[DictionaryObject] = None,
+        **kwargs: Any,
+    ) -> bytes:
+        """
+        Decompresses data encoded using the Brotli compression method,
+        reproducing the original data; §7.4.11, ISO 32000-2:2020.
+
+        Please note that the output length is limited to avoid memory
+        issues. If you need to process larger content streams, consider
+        adapting ``pypdf.filters.BROTLI_MAX_OUTPUT_LENGTH``. In case you
+        are only dealing with trusted inputs and/or want to disable these
+        limits, set the value to ``0``.
+
+        Args:
+            data: Brotli-compressed bytes to decode.
+            decode_parms: this filter does not use parameters.
+
+        Returns:
+            decoded data.
+
+        Raises:
+            DependencyError: If the ``brotli`` package is not installed.
+
+        """
+        if brotli is None:
+            raise DependencyError("brotli is required for BrotliDecode. Install it with: pip install pypdf[brotli]")
+        if not BROTLI_MAX_OUTPUT_LENGTH:
+            return bytes(brotli.decompress(data))
+        decompressor = brotli.Decompressor()
+        chunks: list[bytes] = []
+        total_length = 0
+        chunk_size = 65536
+        # NOTE(review): the limit is checked only after each chunk has been fully
+        # decompressed, so one highly compressible chunk can still exceed it.
+        for i in range(0, len(data), chunk_size):
+            output: bytes = decompressor.process(data[i : i + chunk_size])
+            chunks.append(output)
+            total_length += len(output)
+            if total_length > BROTLI_MAX_OUTPUT_LENGTH:
+                raise LimitReachedError(
+                    f"Limit reached while decompressing. {max(0, len(data) - i - chunk_size)} bytes remaining."
+                )
+        return b"".join(chunks)
+
+    @staticmethod
+    def encode(data: bytes, **kwargs: Any) -> bytes:
+        """
+        Compresses data using the Brotli compression method;
+        §7.4.11, ISO 32000-2:2020.
+
+        Args:
+            data: The data to be compressed.
+
+        Returns:
+            The compressed data.
+
+        Raises:
+            DependencyError: If the ``brotli`` package is not installed.
+
+        """
+        if brotli is None:
+            raise DependencyError("brotli is required for BrotliDecode. Install it with: pip install pypdf[brotli]")
+        return bytes(brotli.compress(data))
+
+
 @dataclass
 class CCITTParameters:
     """§7.4.6, optional parameters for the CCITTFaxDecode filter."""
@@ -827,6 +902,9 @@ def decode_stream_data(stream: StreamObject) -> bytes:
             data = DCTDecode.decode(data)
         elif filter_name == FT.JPX_DECODE:
             data = JPXDecode.decode(data)
+        elif filter_name in (FT.BROTLI_DECODE, FTA.BR):
+            _deprecate_inline_image_filters(filter_name=filter_name, old_name=FTA.BR, new_name=FT.BROTLI_DECODE)
+            data = BrotliDecode.decode(data)
         elif filter_name == FT.JBIG2_DECODE:
             data = JBIG2Decode.decode(data, params)
         elif filter_name == "/Crypt":
diff --git a/pyproject.toml b/pyproject.toml
index bcb45daa94..d9f8aa923b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,9 +42,11 @@ Source = "https://github.com/py-pdf/pypdf"
 crypto = ["cryptography"]
 cryptodome = ["PyCryptodome"]
 image = ["Pillow>=8.0.0"]
+brotli = ["brotli"]
 full = [
     "cryptography",
-    "Pillow>=8.0.0"
+    "Pillow>=8.0.0",
+    "brotli",
 ]
 dev = [
     "flit",
diff --git a/tests/test_filters.py b/tests/test_filters.py
index dcc41f0a3e..9a1fdc9eb8 100644
--- a/tests/test_filters.py
+++ b/tests/test_filters.py
@@ -18,6 +18,7 @@
 from pypdf.filters import (
     ASCII85Decode,
     ASCIIHexDecode,
+    BrotliDecode,
     CCITParameters,
     CCITTFaxDecode,
     CCITTParameters,
@@ -27,6 +28,13 @@
     decode_stream_data,
     decompress,
 )
+
+try:
+    import brotli
+
+    HAS_BROTLI = True
+except ImportError:
+    HAS_BROTLI = False
 from pypdf.generic import (
     ArrayObject,
     BooleanObject,
@@ -869,6 +877,85 @@ def test_rle_decode_exception_with_corrupted_stream(caplog):
     assert caplog.messages == ["Early EOD in RunLengthDecode, check if output is OK"]
 
 
+@pytest.mark.parametrize("s", filter_inputs)
+@pytest.mark.skipif(not HAS_BROTLI, reason="brotli not installed")
+def test_brotli_decode_encode(s):
+    """BrotliDecode encode() and decode() methods work as expected."""
+    s_bytes = s.encode()
+    encoded = BrotliDecode.encode(s_bytes)
+    assert BrotliDecode.decode(encoded) == s_bytes
+
+
+@mock.patch("pypdf.filters.brotli", None)
+def test_brotli_missing_installation():
+    """BrotliDecode raises DependencyError when brotli is not installed."""
+    with pytest.raises(DependencyError):
+        BrotliDecode.decode(b"test data")
+
+    with pytest.raises(DependencyError):
+        BrotliDecode.encode(b"test data")
+
+
+@pytest.mark.skipif(not HAS_BROTLI, reason="brotli not installed")
+def test_brotli_decode_output_limit():
+    """BrotliDecode raises LimitReachedError when output exceeds limit."""
+    large_data = b"A" * 1000
+    compressed = BrotliDecode.encode(large_data)
+    with mock.patch("pypdf.filters.BROTLI_MAX_OUTPUT_LENGTH", 100), \
+         pytest.raises(LimitReachedError, match=r"decompressing\. 0 bytes remaining\."):
+        BrotliDecode.decode(compressed)
+
+
+@pytest.mark.skipif(not HAS_BROTLI, reason="brotli not installed")
+def test_brotli_decode_stream_data():
+    """BrotliDecode works correctly through decode_stream_data."""
+    original = b"Hello, Brotli!"
+    compressed = BrotliDecode.encode(original)
+    stream = DictionaryObject()
+    stream[NameObject("/Filter")] = NameObject("/BrotliDecode")
+    stream._data = compressed  # type: ignore[attr-defined]
+    assert decode_stream_data(stream) == original
+
+
+@mock.patch("pypdf.filters.brotli", None)
+def test_brotli_decode_stream_data_missing():
+    """decode_stream_data raises DependencyError for BrotliDecode when brotli is missing."""
+    stream = DictionaryObject()
+    stream[NameObject("/Filter")] = NameObject("/BrotliDecode")
+    stream._data = b"dummy"  # type: ignore[attr-defined]
+    with pytest.raises(DependencyError):
+        decode_stream_data(stream)
+
+
+@pytest.mark.skipif(not HAS_BROTLI, reason="brotli not installed")
+def test_brotli_pdf_roundtrip():
+    """A PDF with BrotliDecode-compressed content stream can be read back."""
+    original_text = "Hello, Brotli PDF!"
+    writer = PdfWriter()
+    writer.add_blank_page(width=200, height=200)
+    page = writer.pages[0]
+
+    # Build a minimal content stream with BrotliDecode filter
+    content = f"BT /F1 12 Tf 50 150 Td ({original_text}) Tj ET".encode()
+    compressed = brotli.compress(content)
+
+    stream = StreamObject()
+    stream[NameObject("/Filter")] = NameObject("/BrotliDecode")
+    stream._data = compressed
+
+    page[NameObject("/Contents")] = writer._add_object(stream)
+
+    buf = BytesIO()
+    writer.write(buf)
+
+    buf.seek(0)
+    reader = PdfReader(buf)
+    page_content = reader.pages[0].get_contents()
+    assert page_content is not None
+    raw_data = page_content.get_data()
+    assert original_text.encode() in raw_data
+
+
 def test_decompress():
     data = string.printable.encode("utf-8") + string.printable[::-1].encode("utf-8")
     compressed = FlateDecode.encode(data)