diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 63f9b8f0..7fdb663e 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -6,8 +6,10 @@ import json import os import shutil +import struct import uuid import warnings +import zipfile from collections import defaultdict, namedtuple from contextlib import contextmanager from enum import Enum @@ -26,9 +28,10 @@ Union, overload, ) +from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipInfo try: - from typing import Literal + from typing import Literal # pylint: disable=ungrouped-imports except ImportError: # Python <3.8 backport from typing_extensions import Literal # type: ignore @@ -37,7 +40,7 @@ from sqlalchemy.sql import func from sqlalchemy.sql.expression import delete, select, text, update -from .database import Obj, get_session +from .database import Obj, Pack, PackState, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised from .utils import ( CallbackStreamWrapper, @@ -51,9 +54,11 @@ ZlibStreamDecompresser, chunk_iterator, compute_hash_and_size, + compute_hash_crc_and_size, detect_where_sorted, get_compressobj_instance, get_hash, + get_md5, get_stream_decompresser, is_known_hash, merge_sorted, @@ -62,11 +67,14 @@ safe_flush_to_disk, yield_first_element, ) +from .zipsupport import is_zip, write_end_record, write_zip_header ObjQueryResults = namedtuple( "ObjQueryResults", ["hashkey", "offset", "length", "compressed", "size"] ) +FN_SIZE = 64 # Size for the file name used in LOCAL header for each object + class ObjectType(Enum): """Enum that describes the various types of an objec (as returned in ``meta['type']``).""" @@ -119,6 +127,9 @@ class Container: # pylint: disable=too-many-public-methods # (after VACUUMing, as mentioned above). _MAX_CHUNK_ITERATE_LENGTH = 9500 + # Size of the ZIP local file header + _ZIP_HEADER_SIZE = 30 + FN_SIZE + 20 + def __init__(self, folder: Union[str, Path]) -> None: """Create the class that represents the container. @@ -132,6 +143,7 @@ def __init__(self, folder: Union[str, Path]) -> None: # IMPORANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`! self._current_pack_id: Optional[int] = None self._config: Optional[dict] = None + self._hash_place_holder = " " * FN_SIZE def get_folder(self) -> str: """Return the path to the folder that will host the object-store container.""" @@ -267,7 +279,11 @@ def _get_pack_id_to_write_to(self) -> int: """ # Default to zero if not set (e.g. if it's None) pack_id = self._current_pack_id or 0 + sealed_packs = self.get_sealed_packs(include_archived=True) while True: + if pack_id in sealed_packs: + pack_id += 1 + continue pack_path = self._get_pack_path_from_pack_id(pack_id) if not os.path.exists(pack_path): # Use this ID - the pack file does not exist yet @@ -1134,7 +1150,10 @@ def get_total_size(self) -> Dict[str, int]: @contextmanager def lock_pack( - self, pack_id: str, allow_repack_pack: bool = False + self, + pack_id: str, + allow_repack_pack: bool = False, + mode="ab", ) -> Iterator[StreamWriteBytesType]: """Lock the given pack id. Use as a context manager. @@ -1147,6 +1166,10 @@ def lock_pack( """ assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack) + # For sealed pack, we only allow reading + if pack_id in self.get_sealed_packs(): + assert mode.startswith("r") and "+" not in mode + # Open file in exclusive mode lock_file = os.path.join(self._get_pack_folder(), f"{pack_id}.lock") pack_file = self._get_pack_path_from_pack_id( @@ -1154,7 +1177,7 @@ def lock_pack( ) try: with open(lock_file, "x"): - with open(pack_file, "ab") as pack_handle: + with open(pack_file, mode) as pack_handle: yield pack_handle finally: # Release resource (I check if it exists in case there was an exception) @@ -1248,7 +1271,7 @@ def _write_data_to_packfile( read_handle: StreamReadBytesType, compress: bool, hash_type: Optional[str] = None, - ) -> Union[Tuple[int, None], Tuple[int, str]]: + ) -> Union[Tuple[int, int, None], Tuple[int, int, str]]: """Append data, read from read_handle until it ends, to the correct packfile. Return the number of bytes READ (note that this will be different @@ -1278,8 +1301,13 @@ def _write_data_to_packfile( if compress: compressobj = self._get_compressobj_instance() - + write_zip_header( + pack_handle, + self._hash_place_holder, + "zlib" if compress else None, + ) count_read_bytes = 0 + offset = pack_handle.tell() while True: chunk = read_handle.read(self._CHUNKSIZE) if chunk == b"": @@ -1300,7 +1328,11 @@ def _write_data_to_packfile( # compressobj pack_handle.write(compressobj.flush()) - return (count_read_bytes, hasher.hexdigest() if hash_type else None) + return ( + offset, + count_read_bytes, + hasher.hexdigest() if hash_type else None, + ) def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches self, @@ -1396,7 +1428,6 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches obj_dict["hashkey"] = loose_hashkey obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress - obj_dict["offset"] = pack_handle.tell() try: with open( self._get_loose_path_from_hashkey(loose_hashkey), "rb" @@ -1404,6 +1435,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches # The second parameter is `None` since we are not computing the hash # We can instead pass the hash algorithm and assert that it is correct ( + obj_dict["offset"], obj_dict["size"], new_hashkey, ) = self._write_data_to_packfile( @@ -1669,6 +1701,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b obj_dict: Dict[str, Any] = {} obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress + obj_dict["offset"] = pack_handle.tell() with stream_context_manager as stream: if no_holes and no_holes_read_twice: @@ -1688,8 +1721,8 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # I therefore need to seek back to zero, because the next line will read it again # in _write_data_to_packfile. stream.seek(0) - ( + obj_dict["offset"], obj_dict["size"], obj_dict["hashkey"], ) = self._write_data_to_packfile( @@ -1699,6 +1732,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b hash_type=self.hash_type, ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] + # Here, we have appended the object to the pack file. # And now that we are done, we know the hash key. # However, we have to cope with the fact that an object with the same hash key @@ -2246,11 +2280,12 @@ def export( ) # Let us also compute the hash - def _validate_hashkeys_pack( - self, pack_id: int, callback: Optional[Callable] = None - ) -> Dict[ - str, Union[List[Union[str, Any]], List[Any]] - ]: # pylint: disable=too-many-locals + def _validate_hashkeys_pack( # pylint: disable=too-many-branches, too-many-locals + self, + pack_id: int, + callback: Optional[Callable] = None, + include_crc: bool = False, + ) -> Dict[str, Any]: """Validate all hashkeys and returns a dictionary of problematic entries. The keys are the problem type, the values are a list of hashkeys of problematic objects. @@ -2278,6 +2313,8 @@ def _validate_hashkeys_pack( - after every object has been processed, with ``action=='update'`` and value equal to the number of newly processed entries since the last call. - at the end, with ``action=='close'`` and value equal to ``None``. + :param include_crc: If set to ``True``, compute the CRC32 value while validating the pack. The CRC32 is needed + for sealing the pack. Here is a minimal example of progress bar using the ``tqdm`` library: @@ -2328,6 +2365,9 @@ def callback(self, action, value): # Open the pack only once, read it in order pack_path = self._get_pack_path_from_pack_id(str(pack_id)) current_pos = 0 + if include_crc: + computed_crc = {} + zip_compatible = self._is_pack_zip_compatible(pack_id) with open(pack_path, mode="rb") as pack_handle: stmt = ( select(Obj.hashkey, Obj.size, Obj.offset, Obj.length, Obj.compressed) @@ -2341,9 +2381,16 @@ def callback(self, action, value): if compressed: obj_reader = self._get_stream_decompresser()(obj_reader) - computed_hash, computed_size = compute_hash_and_size( - obj_reader, self.hash_type - ) + if include_crc: + ( + computed_hash, + computed_crc[hashkey], + computed_size, + ) = compute_hash_crc_and_size(obj_reader, self.hash_type) + else: + computed_hash, computed_size = compute_hash_and_size( + obj_reader, self.hash_type + ) # Check object correctness if computed_hash != hashkey: @@ -2352,8 +2399,12 @@ def callback(self, action, value): invalid_sizes.append(hashkey) # Check that there are no overlapping objects - if offset < current_pos: + + if ( + zip_compatible and (offset < current_pos + self._ZIP_HEADER_SIZE) + ) or (not zip_compatible and (offset < current_pos)): overlapping.append(hashkey) + current_pos = offset + length if callback: @@ -2369,11 +2420,14 @@ def callback(self, action, value): # Perform any wrap-up, if needed callback(action="close", value=None) - return { + output = { "invalid_hashes_packed": invalid_hashes, "invalid_sizes_packed": invalid_sizes, "overlapping_packed": overlapping, - } + } # type: Dict[str, Any] + if include_crc: + output["crc_of_hashes"] = computed_crc + return output def validate( self, callback: Optional[Callable] = None @@ -2408,7 +2462,7 @@ def validate( for pack_id in all_pack_ids: pack_errors = self._validate_hashkeys_pack( - pack_id=pack_id, callback=callback + pack_id=pack_id, callback=callback, include_crc=False ) for error_type, problematic_objects in pack_errors.items(): all_errors[error_type] += problematic_objects @@ -2572,9 +2626,14 @@ def repack_pack( .where(Obj.pack_id == pack_id) .order_by(Obj.offset) ) - for rowid, hashkey, size, offset, length, compressed in session.execute( - stmt - ): + for ( + rowid, + hashkey, + size, + offset, + length, + compressed, + ) in session.execute(stmt): # Since I am assuming above that the method is `KEEP`, I will just transfer # the bytes. Otherwise I have to properly take into account compression in the # source and in the destination. @@ -2586,6 +2645,11 @@ def repack_pack( obj_dict["pack_id"] = self._REPACK_PACK_ID obj_dict["compressed"] = compressed obj_dict["size"] = size + write_zip_header( + write_pack_handle, + obj_dict["hashkey"].ljust(FN_SIZE), # Ensure constant size + "zlib" if compressed else None, + ) obj_dict["offset"] = write_pack_handle.tell() # Transfer data in chunks. @@ -2662,3 +2726,227 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. + + def seal_pack(self, pack_id: int): # pylint: disable=too-many-locals + """ + Seal a pack by adding a central directory in the end, making the file a fully functional + ZIP file. + + The sealing process consists of following steps: + + 1. Validate the pack and compute the CRC32 for each key on-the-fly + 2. Gather the information for each record as ZIP record + 3. Go through each record and update the local header, include the CRC32, filename and file lengths fields. + 4. Write the central directory file header and end of centra directory record + 5. Update the SQLite database to mark that the pack is sealed. + """ + # Validate if the pack is sealable + # Trying to seal a pack file that was not written in ZIP compatible format can result in + # serious data corrutpion + if not self._is_pack_zip_compatible(pack_id): + raise ValueError( + f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealed." + ) + + with open( + self._get_pack_path_from_pack_id(pack_id, allow_repack_pack=False), + mode="rb", + ) as pack_handle: + if is_zip(pack_handle): + raise ValueError(f"Pack file {pack_id} has been sealed already!") + + # Valid the pack conetents + errors_and_crc = self._validate_hashkeys_pack(pack_id, include_crc=True) + crc_of_hashes = errors_and_crc.pop("crc_of_hashes") + for error_type, problematic_objects in errors_and_crc.items(): + if problematic_objects: + raise ValueError( + f"Detected problematic objects: {problematic_objects} with error '{error_type}'" + ) + + # Gather information for each object + session = self._get_cached_session() + all_zipinfo = [] + stmt = ( + select( + Obj.id, + Obj.hashkey, + Obj.size, + Obj.offset, + Obj.length, + Obj.compressed, + ) + .where(Obj.pack_id == pack_id) + .order_by(Obj.offset) + ) + + for _, hashkey, size, offset, length, compressed in session.execute(stmt): + zipinfo = ZipInfo(filename=hashkey.ljust(FN_SIZE)) + zipinfo.compress_type = ZIP_DEFLATED if compressed else ZIP_STORED + zipinfo.file_size = size + zipinfo.compress_size = length + # Offset for the header + zipinfo.header_offset = offset - self._ZIP_HEADER_SIZE + zipinfo.CRC = crc_of_hashes[hashkey] + all_zipinfo.append(zipinfo) + + # We write the end of file table + with self.lock_pack( + str(pack_id), allow_repack_pack=False, mode="r+b" + ) as write_pack_handle: + + # Update the hashes as filenames + for zipinfo in all_zipinfo: + write_pack_handle.seek(zipinfo.header_offset, 0) + local_header = struct.unpack( + zipfile.structFileHeader, # type: ignore + write_pack_handle.read(zipfile.sizeFileHeader), # type: ignore + ) + # Check the magic + if local_header[0] != zipfile.stringFileHeader: # type: ignore + raise ValueError(f"Cannot find ZIP header for record {zipinfo}") + + # rewrite the file name + write_pack_handle.write(zipinfo.filename.encode("ascii")) + + # Update the CRC field + write_pack_handle.seek(zipinfo.header_offset + 14, 0) + write_pack_handle.write(struct.pack(" bool: + """Check if a pack is written in zip compatible format""" + pack_path = self._get_pack_path_from_pack_id(str(pack_id)) + with open(pack_path, mode="rb") as pack_handle: + # Validate the existence of ZIP local header for the first record + local_header = struct.unpack( + "<4s", pack_handle.read(4) + ) # Read the local header signature + if local_header[0] == zipfile.stringFileHeader: # type: ignore + return True + return False + + def get_sealed_packs(self, include_archived=True) -> Dict[int, str]: + """ + Get a dictionary of sealed packs and their MD5 + + :param include_archived: Also included archived packs. These packs have been sealed previously. + """ + session = self._get_cached_session() + if include_archived: + stmt = ( + select(Pack.id, Pack.md5) + .where( + (Pack.state == PackState.SEALED.value) + | (PackState == PackState.ARCHIVED.value) + ) + .order_by(Pack.id) + ) + else: + stmt = ( + select(Pack.id, Pack.md5) + .where(Pack.state == PackState.SEALED.value) + .order_by(Pack.id) + ) + + results = {} + for pack_id, pack_md5 in session.execute(stmt): + results[pack_id] = pack_md5 + return results + + def get_all_pack_info(self): + """Get the status of the packs""" + session = self._get_cached_session() + stmt = select( + Obj.pack_id, + ).distinct() + pack_ids = sorted(x[0] for x in session.execute(stmt)) + + # Locate those stored in Pack table + stmt = select( + Pack.id, + Pack.md5, + Pack.state, + Pack.location, + ) + entry_in_pack_table = {} + for pack_id, md5_value, state, location in session.execute(stmt): + entry_in_pack_table[pack_id] = { + "pack_id": pack_id, + "md5": md5_value, + "state": state, + "location": location, + } + # Combine the two + info = [] + for pack_id in pack_ids: + if pack_id in entry_in_pack_table: + info.append(entry_in_pack_table[pack_id]) + else: + info.append( + { + "pack_id": pack_id, + "md5": None, + "state": PackState.UNSEALED.value, + "location": None, + } + ) + return info + + def seal_all_sealable_packs(self): + """ + Find packs that are no longer active for writing and seal them. + """ + + # Find which packs are eligible for sealing + pack_id = 0 + eligible = [] + sealed = self.get_sealed_packs(include_archived=True) + # Go through all paths + while True: + pack_path = self._get_pack_path_from_pack_id(pack_id) + if not os.path.exists(pack_path): + # Use this ID - the pack file does not exist yet + break + if ( + os.path.getsize(pack_path) >= self.pack_size_target + and pack_id not in sealed + ): + # Use this ID - the pack file is not "full" yet + eligible.append(pack_id) + # Try the next pack + pack_id += 1 + + # Seal the packs one by one + for pack_id in eligible: + self.seal_pack(pack_id) diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index 73acc06b..79e9aa99 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -1,4 +1,5 @@ """Models for the container index file (SQLite DB).""" +import enum import os from typing import Optional @@ -10,6 +11,14 @@ Base = declarative_base() # pylint: disable=invalid-name,useless-suppression +class PackState(enum.Enum): + """Enum for valid sate of seal packs""" + + SEALED = "Sealed" + ARCHIVED = "Archived" + UNSEALED = "Unsealed" + + class Obj(Base): # pylint: disable=too-few-public-methods """The main (and only) table to store object metadata (hashkey, offset, length, ...).""" @@ -31,6 +40,17 @@ class Obj(Base): # pylint: disable=too-few-public-methods ) # integer ID of the pack in which this entry is stored +class Pack(Base): # pylint: disable=too-few-public-methods + """The table for storing the state of pack files. If missing, it means that the pack is currently active""" + + __tablename__ = "db_pack" + + id = Column(Integer, primary_key=True) + state = Column(String, nullable=False, unique=False) + md5 = Column(String, unique=True, nullable=False) + location = Column(String, nullable=True) + + def get_session( path: str, create: bool = False, raise_if_missing: bool = False ) -> Optional[Session]: diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 5622aa02..f9aa945e 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -24,7 +24,7 @@ Type, Union, ) -from zlib import error +from zlib import crc32, error from .exceptions import ClosingNotAllowed, ModificationNotAllowed @@ -960,7 +960,7 @@ def _compute_hash_for_filename(filename: str, hash_type: str) -> Optional[str]: class HashWriterWrapper: """A class that gets a stream open in write mode and wraps it in a new class that computes a hash while writing.""" - def __init__(self, write_stream: BinaryIO, hash_type: str) -> None: + def __init__(self, write_stream: BinaryIO, hash_type: str, do_crc=True) -> None: """Create the class from a given compressed bytestream. :param write_stream: an open bytes stream that supports the .write() method. @@ -971,6 +971,8 @@ def __init__(self, write_stream: BinaryIO, hash_type: str) -> None: self._write_stream = write_stream assert "b" in self._write_stream.mode self._hash_type = hash_type + self._do_crc = do_crc + self._crc = 0 self._hash = get_hash(self._hash_type)() self._position = self._write_stream.tell() @@ -1007,12 +1009,21 @@ def write(self, data: bytes) -> int: # Update the hash information self._hash.update(data) + + # Update CRC + if self._do_crc: + self._crc = crc32(data, self._crc) + return self._position def hexdigest(self) -> str: """Return the hexdigest of the hash computed until now.""" return self._hash.hexdigest() + @property + def crc32(self) -> int: + return self._crc + @property def closed(self) -> bool: """Return True if the underlying file is closed.""" @@ -1130,6 +1141,34 @@ def compute_hash_and_size( return hasher.hexdigest(), size +def compute_hash_crc_and_size( + stream: StreamReadBytesType, + hash_type: str, +) -> Tuple[str, int, int]: + """Given a stream and a hash type, return the hash key (hexdigest) and the total size. + + :param stream: an open stream + :param hash_type: the string with a name of a valid hash type + :return: a tuple with ``(hash, size)`` where ``hash`` is the hexdigest and ``size`` is the size in bytes + """ + _hash_chunksize = 524288 + hasher = get_hash(hash_type)() + + # Read and hash all content + size = 0 + crc_value = 0 + while True: + next_chunk = stream.read(_hash_chunksize) + if not next_chunk: + # Empty returned value: EOF + break + hasher.update(next_chunk) + size += len(next_chunk) + crc_value += crc32(next_chunk, crc_value) + + return hasher.hexdigest(), crc_value, size + + def detect_where_sorted( # pylint: disable=too-many-branches, too-many-statements left_iterator: Iterable[Any], right_iterator: Iterable[Any], @@ -1271,3 +1310,15 @@ def merge_sorted(iterator1: Iterable[Any], iterator2: Iterable[Any]) -> Iterator for item, _ in detect_where_sorted(iterator1, iterator2): # Whereever it is (only left, only right, on both) I return the object. yield item + + +def get_md5(handle): + """Compute MD5 of a stream""" + md5_object = hashlib.md5() + block_size = 128 * md5_object.block_size + + chunk = handle.read(block_size) + while chunk: + md5_object.update(chunk) + chunk = handle.read(block_size) + return md5_object diff --git a/disk_objectstore/zipsupport.py b/disk_objectstore/zipsupport.py new file mode 100644 index 00000000..660c3b6f --- /dev/null +++ b/disk_objectstore/zipsupport.py @@ -0,0 +1,253 @@ +""" +Utility module providing ZIP format support + +Many of the functions are adapted from the zipfile stdlib +""" +import struct +import sys +import zipfile +from zipfile import ZIP64_LIMIT, ZIP_DEFLATED, ZIP_FILECOUNT_LIMIT, ZIP_STORED, ZipInfo + +# pylint: disable=too-many-locals, invalid-name, protected-access, too-many-statements +_DD_SIGNATURE = 0x08074B50 +_EXTRA_FIELD_STRUCT = struct.Struct(" ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT: + extra.append(zinfo.file_size) + extra.append(zinfo.compress_size) + file_size = 0xFFFFFFFF + compress_size = 0xFFFFFFFF + else: + file_size = zinfo.file_size + compress_size = zinfo.compress_size + + if zinfo.header_offset > ZIP64_LIMIT: + extra.append(zinfo.header_offset) + header_offset = 0xFFFFFFFF + else: + header_offset = zinfo.header_offset + + extra_data = zinfo.extra + min_version = 0 + if extra: + # Append a ZIP64 field to the extra's + extra_data = _strip_extra(extra_data, (1,)) + extra_data = ( + struct.pack(" ZIP_FILECOUNT_LIMIT: + requires_zip64 = "Files count" + elif centDirOffset > ZIP64_LIMIT: + requires_zip64 = "Central directory offset" + elif centDirSize > ZIP64_LIMIT: + requires_zip64 = "Central directory size" + if requires_zip64: + # Need to write the ZIP64 end-of-archive records + zip64endrec = struct.pack( + zipfile.structEndArchive64, # type: ignore + zipfile.stringEndArchive64, # type: ignore + 44, + 45, + 45, + 0, + 0, + centDirCount, + centDirCount, + centDirSize, + centDirOffset, + ) + fhandle.write(zip64endrec) + + zip64locrec = struct.pack( + zipfile.structEndArchive64Locator, # type: ignore + zipfile.stringEndArchive64Locator, # type: ignore + 0, + pos2, + 1, + ) + fhandle.write(zip64locrec) + centDirCount = min(centDirCount, 0xFFFF) + centDirSize = min(centDirSize, 0xFFFFFFFF) + centDirOffset = min(centDirOffset, 0xFFFFFFFF) + + endrec = struct.pack( + zipfile.structEndArchive, # type: ignore + zipfile.stringEndArchive, # type: ignore + 0, + 0, + centDirCount, + centDirCount, + centDirSize, + centDirOffset, + len(COMMENT), + ) + fhandle.write(endrec) + fhandle.write(COMMENT.encode()) + fhandle.flush() + + +def _strip_extra(extra, xids): + # Remove Extra Fields with specified IDs. + unpack = _EXTRA_FIELD_STRUCT.unpack + modified = False + buffer = [] + start = i = 0 + while i + 4 <= len(extra): + xid, xlen = unpack(extra[i : i + 4]) + j = i + 4 + xlen + if xid in xids: + if i != start: + buffer.append(extra[start:i]) + start = j + modified = True + i = j + if not modified: + return extra + return b"".join(buffer) + + +# Copied from zipfile.py +def is_zip(fpin): + """This if a file is a ZIP archive""" + # Determine file size + fpin.seek(0, 2) + filesize = fpin.tell() + + # Check to see if this is ZIP file with no archive comment (the + # "end of central directory" structure should be the last item in the + # file if this is the case). + try: + fpin.seek(-zipfile.sizeEndCentDir, 2) # type: ignore + except OSError: + return False + data = fpin.read() + if ( + len(data) == zipfile.sizeEndCentDir # type: ignore + and data[0:4] == zipfile.stringEndArchive # type: ignore + and data[-2:] == b"\000\000" + ): + # the signature is correct and there's no comment, unpack structure + return True + + # Either this is not a ZIP file, or it is a ZIP file with an archive + # comment. Search the end of the file for the "end of central directory" + # record signature. The comment is the last item in the ZIP file and may be + # up to 64K long. It is assumed that the "end of central directory" magic + # number does not appear in the comment. + maxCommentStart = max(filesize - (1 << 16) - zipfile.sizeEndCentDir, 0) # type: ignore + fpin.seek(maxCommentStart, 0) + data = fpin.read() + start = data.rfind(zipfile.stringEndArchive) # type: ignore + if start >= 0: + # found the magic number; attempt to unpack and interpret + recData = data[start : start + zipfile.sizeEndCentDir] # type: ignore + if len(recData) != zipfile.sizeEndCentDir: # type: ignore + # Zip file is corrupted. + return False + return True + + # Unable to find a valid end of central directory structure + return False diff --git a/docs/pages/design.md b/docs/pages/design.md index f7eac8c7..7769253b 100644 --- a/docs/pages/design.md +++ b/docs/pages/design.md @@ -192,3 +192,26 @@ In addition, the following design choices have been made: Therefore, caches are per blocks/pages in linux, not per file. Concatenating files does not impact performance on cache efficiency. + +## ZIP Compatible pack format + +The pack file contains the raw binary data of each record, concatenated together. +While this format if efficient on space, it heavily relies on the SQLite index database. +A recent improvement has made the pack file include headers that are similar to the ZIP archive format. +In this format, a local header is written before each record, containing ZIP signature, compressed and uncompressed size of the record, and the CRC32 check sum. +When the pack file is no longer active, e.g. it has exceeded the target size, it can be *sealed* such that it +becomes a valid ZIP archive and can be extracted and validated, using a wide array of software packages. + +The changes are mainly for improving resilience in the case of catastrophic data lost - if the index database is lost or corrupted, the data can still be extracted from the pack files themselves. In the pack files are also damaged, the information in local and central header may be still be used to recover as much data as possible. + +The sealing operation involves checking the integrity of the pack file, calculating the CRC32 checksum needed, and update the local headers. This operation can take a while to complete, but will not cause any down time of the container. Finally, a central header directory of all records to the file is appended. + +Once, a pack file is *sealed*, its status is added to a table of the index database (`Pack` table). If a pack file is missing from this table, it simply means it is not *sealed* yet. Packs that are *sealed* will not be selected for writing new data, even if their sizes are below the target pack sizes. + +Pack files generated prior to this format change will continue to work, but they cannot be *sealed* due to the lack of local header. Repacking is necessary for regerating sealable files. + +The inclusion of local and central directory headers means there is a storage overhead for each record. +The estimated size of each record is 66 bytes for the local header, and 62 + (20, if exceeding 4GB) bytes for the central directory, e.g. 128 + (20) bytes in total. +Hence, storing many small files can be inefficient, however, it is expected that the majority of the records will be much larger, making this overhead insignificant. + +When extracted, each record will result in a single, uncompressed file named with the first 16 characters of its hash. diff --git a/tests/test_container.py b/tests/test_container.py index da1b1df3..0ce060d7 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -9,12 +9,14 @@ import shutil import stat import tempfile +from zipfile import ZipFile import psutil import pytest import disk_objectstore.exceptions as exc from disk_objectstore import CompressMode, Container, ObjectType, database, utils +from disk_objectstore.container import FN_SIZE COMPRESSION_ALGORITHMS_TO_TEST = ["zlib+1", "zlib+9"] @@ -1039,6 +1041,7 @@ def test_sizes( == total_object_size ) + total_header_size = (temp_container._ZIP_HEADER_SIZE) * len(data) if compress_packs: compressed_data = {} for key, val in data.items(): @@ -1051,7 +1054,10 @@ def test_sizes( size_info = temp_container.get_total_size() assert size_info["total_size_packed"] == total_object_size assert size_info["total_size_packed_on_disk"] == total_compressed_size - assert size_info["total_size_packfiles_on_disk"] == total_compressed_size + assert ( + size_info["total_size_packfiles_on_disk"] + == total_compressed_size + total_header_size + ) assert size_info["total_size_packindexes_on_disk"] == os.path.getsize( temp_container._get_pack_index_path() ) @@ -1060,7 +1066,10 @@ def test_sizes( size_info = temp_container.get_total_size() assert size_info["total_size_packed"] == total_object_size assert size_info["total_size_packed_on_disk"] == total_object_size - assert size_info["total_size_packfiles_on_disk"] == total_object_size + assert ( + size_info["total_size_packfiles_on_disk"] + == total_object_size + total_header_size + ) assert size_info["total_size_packindexes_on_disk"] == os.path.getsize( temp_container._get_pack_index_path() ) @@ -1307,7 +1316,7 @@ def test_stream_meta( # pylint: disable=too-many-locals "size": len(content_packed), "pack_id": 0, # First pack, it's a new container "pack_compressed": compress, - "pack_offset": 0, # Only one object in the pack, must start from zero + "pack_offset": temp_container._ZIP_HEADER_SIZE, # Only one object in the pack, must start from zero "pack_length": object_pack_length, }, }, @@ -1423,7 +1432,7 @@ def test_stream_meta_single(temp_container, compress, compression_algorithm): "size": len(content_packed), "pack_id": 0, # First pack, it's a new container "pack_compressed": compress, - "pack_offset": 0, # Only one object in the pack, must start from zero + "pack_offset": temp_container._ZIP_HEADER_SIZE, # Only one object in the pack, must start from zero "pack_length": object_pack_length, }, }, @@ -2199,6 +2208,7 @@ def test_validate_corrupt_packed(temp_container): with open( temp_container._get_pack_path_from_pack_id(str(meta["pack_id"])), "wb" ) as fhandle: + fhandle.seek(temp_container._ZIP_HEADER_SIZE, 0) fhandle.write(b"CORRU890890890809PT") errors = temp_container.validate() @@ -2263,6 +2273,7 @@ def test_validate_corrupt_packed_size(temp_container): # pylint: disable=invali temp_container._get_pack_path_from_pack_id(str(meta["pack_id"])), "wb" ) as fhandle: # Short corrupted string so also the size is wrong + fhandle.write(b" " * temp_container._ZIP_HEADER_SIZE) # Write a dummy header fhandle.write(b"COR") errors = temp_container.validate() @@ -2972,7 +2983,7 @@ def test_clean_storage_with_duplicates_original_deleted( ) @pytest.mark.parametrize("use_streams", [True, False]) @pytest.mark.parametrize("compress", [True, False]) -def test_packs_no_holes( +def test_packs_no_holes( # pylint: disable=too-many-locals temp_container, no_holes, no_holes_read_twice, use_streams, compress, monkeypatch ): """Test what happens when writing directly to packs and asking not to leave back holes.""" @@ -3010,16 +3021,17 @@ def test_packs_no_holes( sizes = temp_container.get_total_size() assert sizes["total_size_packed"] == len(content1) + len(content2) + len(content3) - + header_size = temp_container._ZIP_HEADER_SIZE if no_holes: assert ( - sizes["total_size_packed_on_disk"] == sizes["total_size_packfiles_on_disk"] + sizes["total_size_packed_on_disk"] + header_size * 3 + == sizes["total_size_packfiles_on_disk"] ) else: # We have added twice each object. Note: we cannot use total_size_packed because this would be # before compression assert ( - 2 * sizes["total_size_packed_on_disk"] + 2 * (sizes["total_size_packed_on_disk"] + header_size * 3) == sizes["total_size_packfiles_on_disk"] ) @@ -3178,7 +3190,9 @@ def test_packs_read_in_order(temp_dir): def test_repack(temp_dir): """Test the repacking functionality.""" temp_container = Container(temp_dir) - temp_container.init_container(clear=True, pack_size_target=39) + temp_container.init_container( + clear=True, pack_size_target=39 + 4 * temp_container._ZIP_HEADER_SIZE + ) # data of 10 bytes each. Will fill two packs. data = [ @@ -3219,7 +3233,9 @@ def test_repack(temp_dir): assert counts["packed"] == len(data) size = temp_container.get_total_size() assert size["total_size_packed"] == 10 * len(data) - assert size["total_size_packfiles_on_disk"] == 10 * len(data) + assert size["total_size_packfiles_on_disk"] == ( + 10 + temp_container._ZIP_HEADER_SIZE + ) * len(data) # I delete an object in the middle, an object at the end of a pack, and an object at the beginning. # I also delete the only object @@ -3239,7 +3255,9 @@ def test_repack(temp_dir): # I deleted 4 objects assert size["total_size_packed"] == 10 * (len(data) - len(to_delete)) # Still full size on disk - assert size["total_size_packfiles_on_disk"] == 10 * len(data) + assert size["total_size_packfiles_on_disk"] == ( + 10 + temp_container._ZIP_HEADER_SIZE + ) * len(data) # I now repack temp_container.repack(compress_mode=CompressMode.KEEP) @@ -3255,7 +3273,9 @@ def test_repack(temp_dir): size = temp_container.get_total_size() assert size["total_size_packed"] == 10 * (len(data) - len(to_delete)) # This time also the size on disk should be adapted (it's the main goal of repacking) - assert size["total_size_packfiles_on_disk"] == 10 * (len(data) - len(to_delete)) + assert size["total_size_packfiles_on_disk"] == ( + 10 + temp_container._ZIP_HEADER_SIZE + ) * (len(data) - len(to_delete)) # Check that the content is still correct # Should not raise @@ -3266,6 +3286,68 @@ def test_repack(temp_dir): temp_container.close() +def test_sealing(temp_dir): + """Test the repacking functionality.""" + + temp_container = Container(temp_dir) + temp_container.init_container(clear=True) + + # data of 10 bytes each. + data = [ + b"-123456789", + b"a123456789", + ] + hexkeys = [] + for value in data: + hexkeys.append(temp_container.add_objects_to_pack([value])[0]) + temp_container.seal_pack("0") + temp_container.list_all_objects() + + with ZipFile(temp_container._get_pack_path_from_pack_id("0")) as zif: + for idx, expected in enumerate(data): + with zif.open(hexkeys[idx][:FN_SIZE], mode="r") as fhandle: + zdata = fhandle.read() + assert zdata == expected + # Check the zipfile + zif.testzip() + + sealed = temp_container.get_sealed_packs() + assert 0 in sealed + + infos = temp_container.get_all_pack_info() + assert infos[0]["state"] == "Sealed" + assert infos[0]["location"] is None + assert infos[0]["pack_id"] == 0 + + # Since pack 0 is seal, new data should be written to pack 1 + assert temp_container._get_pack_id_to_write_to() == 1 + + # Sealed pack cannot be sealed again + with pytest.raises(ValueError, match="been sealed already"): + temp_container.seal_pack(0) + + # Some new data + data = [ + b"-123456788", + b"a123456788", + ] + hexkeys = [] + for value in data: + hexkeys.append(temp_container.add_objects_to_pack([value])[0]) + + # Destroy header - the file is no longer ZIP compatible, e.g. similar to the old + # format + with open(f"{temp_dir}/packs/1", "r+b") as fhandle: + fhandle.write(b"0000") + + # Sealed pack cannot be sealed again + with pytest.raises(ValueError, match=r"ZIP compatible"): + temp_container.seal_pack(1) + + # Important before exiting from the tests + temp_container.close() + + def test_not_implemented_repacks(temp_container): """Check the error for not implemented repack methods.""" # We need to have at least one pack