Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions disk_objectstore/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, overload
from typing import TYPE_CHECKING, BinaryIO, ContextManager, overload

from sqlalchemy.engine import Connection, Engine
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -576,7 +576,7 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too
# to order in python instead
session = self._get_operation_session()

obj_reader: StreamReadBytesType
obj_reader: StreamSeekBytesType

if len(hashkeys_set) <= self._MAX_CHUNK_ITERATE_LENGTH:
# Operate in chunks, due to the SQLite limits
Expand Down Expand Up @@ -1541,7 +1541,7 @@ def add_streamed_object_to_pack( # pylint: disable=too-many-arguments
)

# Close the callback so the bar doesn't remain open
streams[0].close_callback() # type: ignore[union-attr]
streams[0].close_callback() # type: ignore[attr-defined]

return retval[0]

Expand Down Expand Up @@ -1688,10 +1688,11 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b
# Get next stream, possibly preparing it to be open, or wrapping it
# if it is already open so it does not get open again
next_stream = working_stream_list.pop()
stream_context_manager: ContextManager[StreamSeekBytesType | BinaryIO]
if open_streams:
stream_context_manager = next_stream
stream_context_manager = next_stream # type: ignore[assignment]
else:
stream_context_manager = nullcontext(next_stream) # type: ignore[assignment]
stream_context_manager = nullcontext(next_stream) # type: ignore[arg-type]

if callback:
since_last_update += 1
Expand All @@ -1708,12 +1709,14 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b
obj_dict['compressed'] = compress
obj_dict['offset'] = pack_handle.tell()
with stream_context_manager as stream:
# BinaryIO from LazyOpener and StreamSeekBytesType both have all required methods
stream_typed: StreamSeekBytesType = stream # type: ignore[assignment]
if no_holes and no_holes_read_twice:
# Compute the hash key before writing (I just read once)
(
obj_dict['hashkey'],
obj_dict['size'],
) = compute_hash_and_size(stream, hash_type=self.hash_type)
) = compute_hash_and_size(stream_typed, hash_type=self.hash_type)
if obj_dict['hashkey'] in known_packed_hashkeys:
# I recomputed the hashkey and this was already there: I don't try to write on disk,
# but I just continue.
Expand All @@ -1724,14 +1727,14 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b
# I didn't continue. Then, I need to store on disk, as it is a new unknown object.
# I therefore need to seek back to zero, because the next line will read it again
# in _write_data_to_packfile.
stream.seek(0)
stream_typed.seek(0)

(
obj_dict['size'],
obj_dict['hashkey'],
) = self._write_data_to_packfile(
pack_handle=pack_handle,
read_handle=stream,
read_handle=stream_typed,
compress=compress,
hash_type=self.hash_type,
)
Expand Down
Loading
Loading