From 0ee917e7b7ce42d2f46ab9339a211b98bf708aeb Mon Sep 17 00:00:00 2001
From: Ved Sharma
Date: Wed, 1 Apr 2026 13:15:32 -0500
Subject: [PATCH] test: Add missing coverage for chunking module edge cases

Adds tests for untested guard clauses (IndexError/ValueError/TokenUtilError
error paths), ChunkIterator constructor fallbacks, TextChunk property
raises and caching behavior, _sanitize, make_batches_of_textchunk batch
sizes, and the broken_sentence flag reset logic.

Co-Authored-By: Claude Sonnet 4.6
---
 tests/chunking_test.py | 275 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 275 insertions(+)

diff --git a/tests/chunking_test.py b/tests/chunking_test.py
index 75e84157..a2ba8ae1 100644
--- a/tests/chunking_test.py
+++ b/tests/chunking_test.py
@@ -561,5 +561,280 @@ def test_text_chunk_properties(
     self.assertEqual(chunk.additional_context, expected_context)
 
 
+class SentenceIteratorEdgeCasesTest(absltest.TestCase):
+
+  def test_negative_curr_token_pos_raises_index_error(self):
+    tokenized_text = tokenizer.tokenize("Hello world.")
+    with self.assertRaises(IndexError):
+      chunking.SentenceIterator(tokenized_text, curr_token_pos=-1)
+
+  def test_curr_token_pos_beyond_length_raises_index_error(self):
+    tokenized_text = tokenizer.tokenize("Hello world.")
+    with self.assertRaises(IndexError):
+      chunking.SentenceIterator(
+          tokenized_text,
+          curr_token_pos=len(tokenized_text.tokens) + 1,
+      )
+
+  def test_curr_token_pos_at_length_raises_stop_iteration(self):
+    tokenized_text = tokenizer.tokenize("Hello world.")
+    sentence_iter = chunking.SentenceIterator(
+        tokenized_text,
+        curr_token_pos=len(tokenized_text.tokens),
+    )
+    with self.assertRaises(StopIteration):
+      next(sentence_iter)
+
+  def test_mid_document_start(self):
+    # "First sentence." = [First, sentence, .] = 3 tokens (indices 0-2).
+    # "Second sentence." starts at index 3.
+    text = "First sentence. Second sentence."
+    tokenized_text = tokenizer.tokenize(text)
+    sentence_iter = chunking.SentenceIterator(tokenized_text, curr_token_pos=3)
+    sentence_interval = next(sentence_iter)
+    self.assertEqual(sentence_interval.start_index, 3)
+    self.assertEqual(
+        chunking.get_token_interval_text(tokenized_text, sentence_interval),
+        "Second sentence.",
+    )
+
+  def test_text_without_punctuation_is_one_sentence(self):
+    text = "This text has no punctuation at all"
+    tokenized_text = tokenizer.tokenize(text)
+    sentence_iter = chunking.SentenceIterator(tokenized_text)
+    sentence_interval = next(sentence_iter)
+    self.assertEqual(
+        chunking.get_token_interval_text(tokenized_text, sentence_interval),
+        text,
+    )
+    with self.assertRaises(StopIteration):
+      next(sentence_iter)
+
+
+class ChunkIteratorConstructorTest(absltest.TestCase):
+
+  def test_no_text_and_no_document_raises_value_error(self):
+    with self.assertRaises(ValueError):
+      chunking.ChunkIterator(
+          text=None,
+          max_char_buffer=100,
+          tokenizer_impl=tokenizer.RegexTokenizer(),
+      )
+
+  def test_none_text_uses_document_text(self):
+    document = data.Document(text="Hello world.", document_id="doc1")
+    chunk_iter = chunking.ChunkIterator(
+        text=None,
+        max_char_buffer=100,
+        document=document,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+    chunk = next(chunk_iter)
+    self.assertEqual(chunk.chunk_text, "Hello world.")
+
+  def test_empty_tokenized_text_retokenizes_from_document(self):
+    # TokenizedText with no tokens should trigger re-tokenization using
+    # document.text as fallback.
+    document = data.Document(text="Hello world.")
+    empty_tokenized = tokenizer.TokenizedText(text="", tokens=[])
+    chunk_iter = chunking.ChunkIterator(
+        text=empty_tokenized,
+        max_char_buffer=100,
+        document=document,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+    chunk = next(chunk_iter)
+    self.assertEqual(chunk.chunk_text, "Hello world.")
+
+  def test_exact_buffer_size_fits_in_one_chunk(self):
+    # "Hello world." is 12 chars; max_char_buffer=12 uses > (not >=),
+    # so the text should fit in a single chunk.
+    text = "Hello world."
+    tokenized_text = tokenizer.tokenize(text)
+    chunk_iter = chunking.ChunkIterator(
+        tokenized_text,
+        max_char_buffer=12,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+    chunks = list(chunk_iter)
+    self.assertLen(chunks, 1)
+    self.assertEqual(chunks[0].chunk_text, text)
+
+
+class CreateTokenIntervalTest(absltest.TestCase):
+
+  def test_negative_start_index_raises(self):
+    with self.assertRaises(ValueError):
+      chunking.create_token_interval(-1, 5)
+
+  def test_equal_indices_raises(self):
+    with self.assertRaises(ValueError):
+      chunking.create_token_interval(3, 3)
+
+  def test_start_greater_than_end_raises(self):
+    with self.assertRaises(ValueError):
+      chunking.create_token_interval(5, 3)
+
+
+class GetTokenIntervalTextTest(absltest.TestCase):
+
+  def test_invalid_interval_raises_value_error(self):
+    tokenized_text = tokenizer.tokenize("Hello world.")
+    invalid_interval = tokenizer.TokenInterval(start_index=2, end_index=2)
+    with self.assertRaises(ValueError):
+      chunking.get_token_interval_text(tokenized_text, invalid_interval)
+
+  def test_token_util_error_on_empty_return(self):
+    tokenized_text = tokenizer.tokenize("Hello world.")
+    valid_interval = tokenizer.TokenInterval(start_index=0, end_index=2)
+    with mock.patch("langextract.core.tokenizer.tokens_text", return_value=""):
+      with self.assertRaises(chunking.TokenUtilError):
+        chunking.get_token_interval_text(tokenized_text, valid_interval)
+
+
+class GetCharIntervalTest(absltest.TestCase):
+
+  def test_invalid_interval_raises_value_error(self):
+    tokenized_text = tokenizer.tokenize("Hello world.")
+    invalid_interval = tokenizer.TokenInterval(start_index=2, end_index=2)
+    with self.assertRaises(ValueError):
+      chunking.get_char_interval(tokenized_text, invalid_interval)
+
+
+class TextChunkMissingDocumentTest(absltest.TestCase):
+
+  def test_chunk_text_raises_when_no_document(self):
+    chunk = chunking.TextChunk(
+        token_interval=tokenizer.TokenInterval(start_index=0, end_index=1),
+        document=None,
+    )
+    with self.assertRaises(ValueError):
+      _ = chunk.chunk_text
+
+  def test_char_interval_raises_when_no_document(self):
+    chunk = chunking.TextChunk(
+        token_interval=tokenizer.TokenInterval(start_index=0, end_index=1),
+        document=None,
+    )
+    with self.assertRaises(ValueError):
+      _ = chunk.char_interval
+
+  def test_str_shows_unavailable_when_no_document(self):
+    chunk = chunking.TextChunk(
+        token_interval=tokenizer.TokenInterval(start_index=0, end_index=1),
+        document=None,
+    )
+    # str() must not raise without a document; it renders a placeholder.
+    # NOTE(review): confirm the exact placeholder text in TextChunk.__str__.
+    self.assertIn("unavailable", str(chunk))
+
+
+class SanitizeTest(absltest.TestCase):
+
+  def test_whitespace_only_raises_value_error(self):
+    with self.assertRaises(ValueError):
+      chunking._sanitize(" \n\t ")
+
+  def test_mixed_whitespace_collapsed_to_single_space(self):
+    result = chunking._sanitize("hello\n\t world")
+    self.assertEqual(result, "hello world")
+
+  def test_leading_trailing_whitespace_stripped(self):
+    result = chunking._sanitize(" hello world ")
+    self.assertEqual(result, "hello world")
+
+
+class SanitizedChunkTextTest(absltest.TestCase):
+
+  def test_sanitized_chunk_text_collapses_whitespace(self):
+    text = "Hello\n world."
+    document = data.Document(text=text)
+    chunk_iter = chunking.ChunkIterator(
+        text=document.tokenized_text,
+        max_char_buffer=200,
+        document=document,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+    chunk = next(chunk_iter)
+    self.assertEqual(chunk.sanitized_chunk_text, "Hello world.")
+
+
+class ChunkCachingTest(absltest.TestCase):
+
+  def _make_chunk(self) -> chunking.TextChunk:
+    text = "Hello world."
+    document = data.Document(text=text)
+    chunk_iter = chunking.ChunkIterator(
+        text=document.tokenized_text,
+        max_char_buffer=200,
+        document=document,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+    return next(chunk_iter)
+
+  def test_chunk_text_is_cached(self):
+    chunk = self._make_chunk()
+    with mock.patch(
+        "langextract.chunking.get_token_interval_text",
+        wraps=chunking.get_token_interval_text,
+    ) as mock_fn:
+      _ = chunk.chunk_text
+      _ = chunk.chunk_text
+      mock_fn.assert_called_once()
+
+  def test_char_interval_is_cached(self):
+    chunk = self._make_chunk()
+    first_call = chunk.char_interval
+    second_call = chunk.char_interval
+    self.assertIs(first_call, second_call)
+
+
+class MakeBatchesAdditionalTest(absltest.TestCase):
+
+  def _make_chunk_iter(self, text, max_char_buffer):
+    document = data.Document(text=text)
+    return chunking.ChunkIterator(
+        text=document.tokenized_text,
+        max_char_buffer=max_char_buffer,
+        document=document,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+
+  def test_batch_length_one_puts_each_chunk_in_own_batch(self):
+    chunk_iter = self._make_chunk_iter("One. Two. Three.", max_char_buffer=6)
+    batches = [
+        list(b) for b in chunking.make_batches_of_textchunk(chunk_iter, 1)
+    ]
+    for batch in batches:
+      self.assertLen(batch, 1)
+    self.assertGreater(len(batches), 1)
+
+  def test_batch_length_larger_than_chunks_produces_one_batch(self):
+    chunk_iter = self._make_chunk_iter("Hello.", max_char_buffer=100)
+    batches = [
+        list(b) for b in chunking.make_batches_of_textchunk(chunk_iter, 1000)
+    ]
+    self.assertLen(batches, 1)
+
+
+class BrokenSentenceResetTest(absltest.TestCase):
+
+  def test_merging_resumes_after_broken_sentence(self):
+    # "Word word word word." (20 chars) exceeds max_char_buffer=15 and is
+    # broken across chunks. Afterwards, "Hi." and "Bye." are each short enough
+    # to merge and should appear together in a single final chunk.
+    text = "Word word word word. Hi. Bye."
+    tokenized_text = tokenizer.tokenize(text)
+    chunk_iter = chunking.ChunkIterator(
+        tokenized_text,
+        max_char_buffer=15,
+        tokenizer_impl=tokenizer.RegexTokenizer(),
+    )
+    chunks = list(chunk_iter)
+    last_chunk_text = chunks[-1].chunk_text
+    self.assertIn("Hi.", last_chunk_text)
+    self.assertIn("Bye.", last_chunk_text)
+
+
 if __name__ == "__main__":
   absltest.main()