From 0b2cb9f8ac12ab9eb6d20653d1e1df59f4c6599e Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 9 Dec 2021 22:02:33 +0000 Subject: [PATCH 01/21] Initial commit for Zip-compatibility support --- disk_objectstore/container.py | 149 +++++++++++++++++++++-- disk_objectstore/utils.py | 15 ++- disk_objectstore/zipsupport.py | 210 +++++++++++++++++++++++++++++++++ 3 files changed, 364 insertions(+), 10 deletions(-) create mode 100644 disk_objectstore/zipsupport.py diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index e34fc7d2..a30a2d1d 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -6,8 +6,10 @@ import json import os import shutil +import struct import uuid import warnings +import zipfile from collections import defaultdict, namedtuple from contextlib import contextmanager from enum import Enum @@ -26,6 +28,14 @@ Union, overload, ) +from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipInfo +from zlib import crc32 + +from disk_objectstore.zipsupport import ( + write_end_record, + write_file_describer, + write_zip_header, +) try: from typing import Literal @@ -35,7 +45,7 @@ from sqlalchemy.orm.session import Session from sqlalchemy.sql import func -from sqlalchemy.sql.expression import delete, select, text, update +from sqlalchemy.sql.expression import delete, select, text, update, values from .database import Obj, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised @@ -337,6 +347,9 @@ def init_container( if not is_known_hash(hash_type): raise ValueError(f'Unknown hash type "{hash_type}"') + # Place holder for unknown hash + self._hash_place_holder = get_hash(self.hash_type)(b"").hexdigest() + if clear: if os.path.exists(self._folder): shutil.rmtree(self._folder) @@ -1134,7 +1147,10 @@ def get_total_size(self) -> Dict[str, int]: @contextmanager def lock_pack( - self, pack_id: str, allow_repack_pack: bool = False + self, + pack_id: str, + allow_repack_pack: bool = False, + mode="ab", ) -> Iterator[StreamWriteBytesType]: """Lock the given pack id. Use as a context manager. @@ -1154,7 +1170,7 @@ def lock_pack( ) try: with open(lock_file, "x"): - with open(pack_file, "ab") as pack_handle: + with open(pack_file, mode) as pack_handle: yield pack_handle finally: # Release resource (I check if it exists in case there was an exception) @@ -1248,7 +1264,7 @@ def _write_data_to_packfile( read_handle: StreamReadBytesType, compress: bool, hash_type: Optional[str] = None, - ) -> Union[Tuple[int, None], Tuple[int, str]]: + ) -> Union[Tuple[int, None, int], Tuple[int, str, int]]: """Append data, read from read_handle until it ends, to the correct packfile. Return the number of bytes READ (note that this will be different @@ -1280,6 +1296,7 @@ def _write_data_to_packfile( compressobj = self._get_compressobj_instance() count_read_bytes = 0 + crc_value = 0 while True: chunk = read_handle.read(self._CHUNKSIZE) if chunk == b"": @@ -1290,6 +1307,7 @@ def _write_data_to_packfile( count_read_bytes += len(chunk) if hash_type: hasher.update(chunk) + crc_value = crc32(chunk, crc_value) if compress: pack_handle.write(compressobj.compress(chunk)) else: @@ -1300,7 +1318,7 @@ def _write_data_to_packfile( # compressobj pack_handle.write(compressobj.flush()) - return (count_read_bytes, hasher.hexdigest() if hash_type else None) + return (count_read_bytes, hasher.hexdigest() if hash_type else None, crc_value) def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches self, @@ -1396,6 +1414,11 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches obj_dict["hashkey"] = loose_hashkey obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress + # Write the header + write_zip_header( + pack_handle, loose_hashkey, None if not compress else "zlib" + ) + # Record the start of the actual data obj_dict["offset"] = pack_handle.tell() try: with open( @@ -1406,6 +1429,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches ( obj_dict["size"], new_hashkey, + crc_value, ) = self._write_data_to_packfile( pack_handle=pack_handle, read_handle=loose_handle, @@ -1424,6 +1448,11 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] + # Write the tailing marker + write_file_describer( + pack_handle, crc_value, obj_dict["length"], obj_dict["size"] + ) + # Appending for later bulk commit - see comments in add_streamed_objects_to_pack obj_dicts.append(obj_dict) @@ -1670,6 +1699,11 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b obj_dict: Dict[str, Any] = {} obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress + write_zip_header( + pack_handle, + self._hash_place_holder, + "zlib" if compress else None, + ) obj_dict["offset"] = pack_handle.tell() with stream_context_manager as stream: if no_holes and no_holes_read_twice: @@ -1693,6 +1727,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b ( obj_dict["size"], obj_dict["hashkey"], + crc_value, ) = self._write_data_to_packfile( pack_handle=pack_handle, read_handle=stream, @@ -1700,6 +1735,10 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b hash_type=self.hash_type, ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] + write_file_describer( + pack_handle, crc_value, obj_dict["length"], obj_dict["size"] + ) + # Here, we have appended the object to the pack file. # And now that we are done, we know the hash key. # However, we have to cope with the fact that an object with the same hash key @@ -2579,9 +2618,14 @@ def repack_pack( .where(Obj.pack_id == pack_id) .order_by(Obj.offset) ) - for rowid, hashkey, size, offset, length, compressed in session.execute( - stmt - ): + for ( + rowid, + hashkey, + size, + ofggfset, + length, + compressed, + ) in session.execute(stmt): # Since I am assuming above that the method is `KEEP`, I will just transfer # the bytes. Otherwise I have to properly take into account compression in the # source and in the destination. @@ -2593,7 +2637,13 @@ def repack_pack( obj_dict["pack_id"] = self._REPACK_PACK_ID obj_dict["compressed"] = compressed obj_dict["size"] = size + write_zip_header( + write_pack_handle, + obj_dict["hashkey"], + "zlib" if compressed else None, + ) obj_dict["offset"] = write_pack_handle.tell() + crc_value = 0 # Transfer data in chunks. # No need to rehash - it's the same container so the same hash. @@ -2605,7 +2655,15 @@ def repack_pack( # Returns an empty bytes object on EOF. break write_pack_handle.write(chunk) + crc_value = crc32(chunk, crc_value) obj_dict["length"] = write_pack_handle.tell() - obj_dict["offset"] + # Write trailing file describer + write_file_describer( + write_pack_handle, + crc_value, + obj_dict["length"], + obj_dict["size"], + ) # Appending for later bulk commit # I will assume that all objects of a single pack fit in memory @@ -2671,3 +2729,78 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. + + @property + def _zip_header_size(self): + """Return the expected size of the zip header if written by this package""" + return 30 + len(self._hash_place_holder) + 20 + + def seal_pack(self, pack_id, dryrun=False): + """ + Seal a pack by adding a central directory in the end, making the file a fully functional + ZIP file. + """ + session = self._get_cached_session() + all_zipinfo = [] + header_size = self._zip_header_size + # Size of the file local header + # the extra field takes 20 bytes with format " Optional[str]: class HashWriterWrapper: """A class that gets a stream open in write mode and wraps it in a new class that computes a hash while writing.""" - def __init__(self, write_stream: BinaryIO, hash_type: str) -> None: + def __init__(self, write_stream: BinaryIO, hash_type: str, do_crc=True) -> None: """Create the class from a given compressed bytestream. :param write_stream: an open bytes stream that supports the .write() method. @@ -971,6 +971,8 @@ def __init__(self, write_stream: BinaryIO, hash_type: str) -> None: self._write_stream = write_stream assert "b" in self._write_stream.mode self._hash_type = hash_type + self._do_crc = do_crc + self._crc = 0 self._hash = get_hash(self._hash_type)() self._position = self._write_stream.tell() @@ -1008,12 +1010,21 @@ def write(self, data: bytes) -> int: # Update the hash information self._hash.update(data) + + # Update CRC + if self._do_crc: + self._crc = crc32(data, self._crc) + return self._position def hexdigest(self) -> str: """Return the hexdigest of the hash computed until now.""" return self._hash.hexdigest() + @property + def crc32(self) -> int: + return self._crc + @property def closed(self) -> bool: """Return True if the underlying file is closed.""" diff --git a/disk_objectstore/zipsupport.py b/disk_objectstore/zipsupport.py new file mode 100644 index 00000000..96cb32f3 --- /dev/null +++ b/disk_objectstore/zipsupport.py @@ -0,0 +1,210 @@ +import struct +import sys +import zipfile +from zipfile import ( + ZIP64_LIMIT, + ZIP64_VERSION, + ZIP_DEFLATED, + ZIP_FILECOUNT_LIMIT, + ZIP_STORED, + ZipInfo, +) + +_DD_SIGNATURE = 0x08074B50 +_EXTRA_FIELD_STRUCT = struct.Struct(" ZIP64_LIMIT or zinfo.compress_size > ZIP64_LIMIT: + extra.append(zinfo.file_size) + extra.append(zinfo.compress_size) + file_size = 0xFFFFFFFF + compress_size = 0xFFFFFFFF + else: + file_size = zinfo.file_size + compress_size = zinfo.compress_size + + if zinfo.header_offset > ZIP64_LIMIT: + extra.append(zinfo.header_offset) + header_offset = 0xFFFFFFFF + else: + header_offset = zinfo.header_offset + + extra_data = zinfo.extra + min_version = 0 + if extra: + # Append a ZIP64 field to the extra's + extra_data = _strip_extra(extra_data, (1,)) + extra_data = ( + struct.pack(" ZIP_FILECOUNT_LIMIT: + requires_zip64 = "Files count" + elif centDirOffset > ZIP64_LIMIT: + requires_zip64 = "Central directory offset" + elif centDirSize > ZIP64_LIMIT: + requires_zip64 = "Central directory size" + if requires_zip64: + # Need to write the ZIP64 end-of-archive records + zip64endrec = struct.pack( + zipfile.structEndArchive64, + zipfile.stringEndArchive64, + 44, + 45, + 45, + 0, + 0, + centDirCount, + centDirCount, + centDirSize, + centDirOffset, + ) + fhandle.write(zip64endrec) + + zip64locrec = struct.pack( + zipfile.structEndArchive64Locator, + zipfile.stringEndArchive64Locator, + 0, + pos2, + 1, + ) + fhandle.write(zip64locrec) + centDirCount = min(centDirCount, 0xFFFF) + centDirSize = min(centDirSize, 0xFFFFFFFF) + centDirOffset = min(centDirOffset, 0xFFFFFFFF) + + endrec = struct.pack( + zipfile.structEndArchive, + zipfile.stringEndArchive, + 0, + 0, + centDirCount, + centDirCount, + centDirSize, + centDirOffset, + len(COMMENT), + ) + fhandle.write(endrec) + fhandle.write(COMMENT) + fhandle.flush() + + +def _strip_extra(extra, xids): + # Remove Extra Fields with specified IDs. + unpack = _EXTRA_FIELD_STRUCT.unpack + modified = False + buffer = [] + start = i = 0 + while i + 4 <= len(extra): + xid, xlen = unpack(extra[i : i + 4]) + j = i + 4 + xlen + if xid in xids: + if i != start: + buffer.append(extra[start:i]) + start = j + modified = True + i = j + if not modified: + return extra + return b"".join(buffer) From 4e200f55c12beace4d0c474d9319f4dd2a4e6dba Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 9 Dec 2021 22:27:46 +0000 Subject: [PATCH 02/21] Fix a bug and tests --- disk_objectstore/container.py | 14 ++++++++------ tests/test_container.py | 17 +++++++++++++---- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index a30a2d1d..a823b9c2 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -348,7 +348,14 @@ def init_container( raise ValueError(f'Unknown hash type "{hash_type}"') # Place holder for unknown hash - self._hash_place_holder = get_hash(self.hash_type)(b"").hexdigest() + hasher = get_hash(hash_type)() + hasher.update(b"") + self._hash_place_holder = hasher.hexdigest() + + # Size of the ZIP local file header + self._zip_header_size = 30 + len(self._hash_place_holder) + 20 + # Size of the Data descriptor + self._zip_dd_size = 24 # struct.calcsize(" Date: Thu, 9 Dec 2021 23:24:02 +0000 Subject: [PATCH 03/21] Added test for sealing pack files --- disk_objectstore/container.py | 15 +++++++++++---- disk_objectstore/zipsupport.py | 5 ++++- tests/test_container.py | 27 +++++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index a823b9c2..a482fa38 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -32,6 +32,8 @@ from zlib import crc32 from disk_objectstore.zipsupport import ( + _DD_SIGNATURE, + _DD_SIZE, write_end_record, write_file_describer, write_zip_header, @@ -2773,7 +2775,7 @@ def seal_pack(self, pack_id, dryrun=False): all_zipinfo.sort(key=lambda x: x.header_offset) # We write the end of file table with self.lock_pack( - str(self._REPACK_PACK_ID), allow_repack_pack=False, mode="r+b" + str(pack_id), allow_repack_pack=False, mode="r+b" ) as write_pack_handle: # Update the hashes as filenames @@ -2797,12 +2799,17 @@ def seal_pack(self, pack_id, dryrun=False): write_pack_handle.seek(-local_header[-2], 1) write_pack_handle.write(zipinfo.filename.encode("ascii")) + # skip to the end of the record and Find the CRC + offset = local_header[-1] + zipinfo.compress_size + write_pack_handle.seek(offset, 1) + data = write_pack_handle.read(_DD_SIZE) + dd_sign, zipinfo.CRC, _, _ = struct.unpack(" Date: Thu, 9 Dec 2021 23:33:45 +0000 Subject: [PATCH 04/21] Added is_zip function --- disk_objectstore/container.py | 4 ++++ disk_objectstore/zipsupport.py | 44 ++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index a482fa38..a4d6df67 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2813,3 +2813,7 @@ def seal_pack(self, pack_id, dryrun=False): with self.lock_pack(str(pack_id), allow_repack_pack=False) as write_pack_handle: write_end_record(write_pack_handle, all_zipinfo) + + def _is_pack_sealed(self, pack_id): + """Check if a pack is sealed""" + pack_loc = self._get_pack_path_from_pack_id(str(pack_id)) diff --git a/disk_objectstore/zipsupport.py b/disk_objectstore/zipsupport.py index 6f2ae798..4ad2d71d 100644 --- a/disk_objectstore/zipsupport.py +++ b/disk_objectstore/zipsupport.py @@ -211,3 +211,47 @@ def _strip_extra(extra, xids): if not modified: return extra return b"".join(buffer) + + +# Copied from zipfile.py +def is_zip(fpin): + """This if a file is a ZIP archive""" + # Determine file size + fpin.seek(0, 2) + filesize = fpin.tell() + + # Check to see if this is ZIP file with no archive comment (the + # "end of central directory" structure should be the last item in the + # file if this is the case). + try: + fpin.seek(-zipfile.sizeEndCentDir, 2) + except OSError: + return False + data = fpin.read() + if ( + len(data) == zipfile.sizeEndCentDir + and data[0:4] == zipfile.stringEndArchive + and data[-2:] == b"\000\000" + ): + # the signature is correct and there's no comment, unpack structure + return True + + # Either this is not a ZIP file, or it is a ZIP file with an archive + # comment. Search the end of the file for the "end of central directory" + # record signature. The comment is the last item in the ZIP file and may be + # up to 64K long. It is assumed that the "end of central directory" magic + # number does not appear in the comment. + maxCommentStart = max(filesize - (1 << 16) - zipfile.sizeEndCentDir, 0) + fpin.seek(maxCommentStart, 0) + data = fpin.read() + start = data.rfind(zipfile.stringEndArchive) + if start >= 0: + # found the magic number; attempt to unpack and interpret + recData = data[start : start + zipfile.sizeEndCentDir] + if len(recData) != zipfile.sizeEndCentDir: + # Zip file is corrupted. + return False + return True + + # Unable to find a valid end of central directory structure + return False From 831171fa3b8f09ac4539ec9a41e1f657619fccf6 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Fri, 10 Dec 2021 10:22:08 +0000 Subject: [PATCH 05/21] only include part of the hashkey as filename The full hashkey is 64 bytes long, that is a bit too much. The need here is to ensure each file haev different names. If there are two files with the same name the ZIP file is still valid, but requires careful handling when extracting the file. Optionally, we can verify filename uniqueness when sealing the pack. --- disk_objectstore/container.py | 14 +++++++++----- tests/test_container.py | 17 +++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index a4d6df67..0e2b6703 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -79,6 +79,8 @@ "ObjQueryResults", ["hashkey", "offset", "length", "compressed", "size"] ) +FN_SIZE = 16 # Size for the file name used in LOCAL header for each object + class ObjectType(Enum): """Enum that describes the various types of an objec (as returned in ``meta['type']``).""" @@ -352,7 +354,7 @@ def init_container( # Place holder for unknown hash hasher = get_hash(hash_type)() hasher.update(b"") - self._hash_place_holder = hasher.hexdigest() + self._hash_place_holder = hasher.hexdigest()[:FN_SIZE] # Size of the ZIP local file header self._zip_header_size = 30 + len(self._hash_place_holder) + 20 @@ -1425,7 +1427,9 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches obj_dict["compressed"] = compress # Write the header write_zip_header( - pack_handle, loose_hashkey, None if not compress else "zlib" + pack_handle, + loose_hashkey[:FN_SIZE], + None if not compress else "zlib", ) # Record the start of the actual data obj_dict["offset"] = pack_handle.tell() @@ -2631,7 +2635,7 @@ def repack_pack( rowid, hashkey, size, - ofggfset, + offset, length, compressed, ) in session.execute(stmt): @@ -2648,7 +2652,7 @@ def repack_pack( obj_dict["size"] = size write_zip_header( write_pack_handle, - obj_dict["hashkey"], + obj_dict["hashkey"][:FN_SIZE], "zlib" if compressed else None, ) obj_dict["offset"] = write_pack_handle.tell() @@ -2763,7 +2767,7 @@ def seal_pack(self, pack_id, dryrun=False): ) for rowid, hashkey, size, offset, length, compressed in session.execute(stmt): - zipinfo = ZipInfo(filename=hashkey) + zipinfo = ZipInfo(filename=hashkey[:FN_SIZE]) zipinfo.compress_type = ZIP_DEFLATED if compressed else ZIP_STORED zipinfo.file_size = size zipinfo.compress_size = length diff --git a/tests/test_container.py b/tests/test_container.py index 9ce58729..86baebcb 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -9,12 +9,14 @@ import shutil import stat import tempfile +from zipfile import ZipFile import psutil import pytest import disk_objectstore.exceptions as exc from disk_objectstore import CompressMode, Container, ObjectType, database, utils +from disk_objectstore.container import FN_SIZE COMPRESSION_ALGORITHMS_TO_TEST = ["zlib+1", "zlib+9"] @@ -3305,7 +3307,6 @@ def test_repack(temp_dir): def test_sealing(temp_dir): """Test the repacking functionality.""" - from zipfile import ZipFile temp_container = Container(temp_dir) temp_container.init_container(clear=True) @@ -3321,13 +3322,13 @@ def test_sealing(temp_dir): temp_container.seal_pack("0") temp_container.list_all_objects() - zif = ZipFile(temp_container._get_pack_path_from_pack_id("0")) - for i in range(len(data)): - with zif.open(hexkeys[i], mode="r") as fh: - zdata = fh.read() - assert zdata == data[i] - # Check the zipfile - zif.testzip() + with ZipFile(temp_container._get_pack_path_from_pack_id("0")) as zif: + for idx, expected in enumerate(data): + with zif.open(hexkeys[idx][:FN_SIZE], mode="r") as fhandle: + zdata = fhandle.read() + assert zdata == expected + # Check the zipfile + zif.testzip() def test_not_implemented_repacks(temp_container): From 515e61b5ac4fa4e44e61d35779dcf08757908b70 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Fri, 17 Dec 2021 17:45:39 +0000 Subject: [PATCH 06/21] Remove the writting of data descriptor The data descriptor is designed for non-seekable stream. Previously, it was used to store the file/compressed lengths. However, since the pack file is only made to be valid ZIP file at the time of sealing we can skip the datadescriptor and instead update the CRC, file_size and compressed_size at the time of sealling, togther with the filename. This save the 20 bytes data descriptor for each record. Note that the CRC still needs to be stored. I make it store in the SQLite database for now. This means that old SQLite database may need to be migrated. Alternatiely, I can have it in a different table. It can be decided later. --- disk_objectstore/container.py | 66 +++++++++++++++++++++++------------ disk_objectstore/database.py | 5 +++ tests/test_container.py | 4 +-- 3 files changed, 49 insertions(+), 26 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 0e2b6703..9e7c18ee 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1460,11 +1460,12 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches ) ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] + obj_dict["crc"] = crc_value # Write the tailing marker - write_file_describer( - pack_handle, crc_value, obj_dict["length"], obj_dict["size"] - ) + # write_file_describer( + # pack_handle, crc_value, obj_dict["length"], obj_dict["size"] + # ) # Appending for later bulk commit - see comments in add_streamed_objects_to_pack obj_dicts.append(obj_dict) @@ -1748,9 +1749,11 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b hash_type=self.hash_type, ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] - write_file_describer( - pack_handle, crc_value, obj_dict["length"], obj_dict["size"] - ) + obj_dict["crc"] = crc_value + + # write_file_describer( + # pack_handle, crc_value, obj_dict["length"], obj_dict["size"] + # ) # Here, we have appended the object to the pack file. # And now that we are done, we know the hash key. @@ -2627,6 +2630,7 @@ def repack_pack( Obj.offset, Obj.length, Obj.compressed, + Obj.crc, ) .where(Obj.pack_id == pack_id) .order_by(Obj.offset) @@ -2638,6 +2642,7 @@ def repack_pack( offset, length, compressed, + crc, ) in session.execute(stmt): # Since I am assuming above that the method is `KEEP`, I will just transfer # the bytes. Otherwise I have to properly take into account compression in the @@ -2650,13 +2655,13 @@ def repack_pack( obj_dict["pack_id"] = self._REPACK_PACK_ID obj_dict["compressed"] = compressed obj_dict["size"] = size + obj_dict["crc"] = crc write_zip_header( write_pack_handle, obj_dict["hashkey"][:FN_SIZE], "zlib" if compressed else None, ) obj_dict["offset"] = write_pack_handle.tell() - crc_value = 0 # Transfer data in chunks. # No need to rehash - it's the same container so the same hash. @@ -2668,15 +2673,15 @@ def repack_pack( # Returns an empty bytes object on EOF. break write_pack_handle.write(chunk) - crc_value = crc32(chunk, crc_value) obj_dict["length"] = write_pack_handle.tell() - obj_dict["offset"] + # Write trailing file describer - write_file_describer( - write_pack_handle, - crc_value, - obj_dict["length"], - obj_dict["size"], - ) + # write_file_describer( + # write_pack_handle, + # crc_value, + # obj_dict["length"], + # obj_dict["size"], + # ) # Appending for later bulk commit # I will assume that all objects of a single pack fit in memory @@ -2761,16 +2766,20 @@ def seal_pack(self, pack_id, dryrun=False): Obj.offset, Obj.length, Obj.compressed, + Obj.crc, ) .where(Obj.pack_id == pack_id) .order_by(Obj.offset) ) - for rowid, hashkey, size, offset, length, compressed in session.execute(stmt): + for rowid, hashkey, size, offset, length, compressed, crc in session.execute( + stmt + ): zipinfo = ZipInfo(filename=hashkey[:FN_SIZE]) zipinfo.compress_type = ZIP_DEFLATED if compressed else ZIP_STORED zipinfo.file_size = size zipinfo.compress_size = length + zipinfo.CRC = crc # Offset for the header zipinfo.header_offset = offset - header_size all_zipinfo.append(zipinfo) @@ -2787,11 +2796,12 @@ def seal_pack(self, pack_id, dryrun=False): write_pack_handle.seek(zipinfo.header_offset, 0) header_data = write_pack_handle.read(zipfile.sizeFileHeader) local_header = struct.unpack(zipfile.structFileHeader, header_data) + fnlen = local_header[-2] # Check the magic if local_header[0] != zipfile.stringFileHeader: raise ValueError(f"Cannot find ZIP header for record {zipinfo}") # Read the filename - fname = write_pack_handle.read(local_header[-2]).decode("ascii") + fname = write_pack_handle.read(fnlen).decode("ascii") # Seek back and rewrite the file name if fname != zipinfo.filename: @@ -2800,15 +2810,25 @@ def seal_pack(self, pack_id, dryrun=False): f"Dryrun - changing filename {fname} to {zipinfo.filename}" ) else: - write_pack_handle.seek(-local_header[-2], 1) + write_pack_handle.seek(-fnlen, 1) write_pack_handle.write(zipinfo.filename.encode("ascii")) - # skip to the end of the record and Find the CRC - offset = local_header[-1] + zipinfo.compress_size - write_pack_handle.seek(offset, 1) - data = write_pack_handle.read(_DD_SIZE) - dd_sign, zipinfo.CRC, _, _ = struct.unpack(" Date: Fri, 17 Dec 2021 23:02:29 +0000 Subject: [PATCH 07/21] Move writing header inside _write_data_to_packfile The return values have been updated to contain the actual offset of the data, not including the header. --- disk_objectstore/container.py | 31 +++++++++++++++---------------- tests/test_container.py | 7 ++++--- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 9e7c18ee..7235230c 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1305,9 +1305,14 @@ def _write_data_to_packfile( if compress: compressobj = self._get_compressobj_instance() - + write_zip_header( + pack_handle, + self._hash_place_holder, + "zlib" if compress else None, + ) count_read_bytes = 0 crc_value = 0 + offset = pack_handle.tell() while True: chunk = read_handle.read(self._CHUNKSIZE) if chunk == b"": @@ -1329,7 +1334,12 @@ def _write_data_to_packfile( # compressobj pack_handle.write(compressobj.flush()) - return (count_read_bytes, hasher.hexdigest() if hash_type else None, crc_value) + return ( + offset, + count_read_bytes, + hasher.hexdigest() if hash_type else None, + crc_value, + ) def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches self, @@ -1425,14 +1435,6 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches obj_dict["hashkey"] = loose_hashkey obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress - # Write the header - write_zip_header( - pack_handle, - loose_hashkey[:FN_SIZE], - None if not compress else "zlib", - ) - # Record the start of the actual data - obj_dict["offset"] = pack_handle.tell() try: with open( self._get_loose_path_from_hashkey(loose_hashkey), "rb" @@ -1440,6 +1442,7 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches # The second parameter is `None` since we are not computing the hash # We can instead pass the hash algorithm and assert that it is correct ( + obj_dict["offset"], obj_dict["size"], new_hashkey, crc_value, @@ -1713,11 +1716,7 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b obj_dict: Dict[str, Any] = {} obj_dict["pack_id"] = pack_int_id obj_dict["compressed"] = compress - write_zip_header( - pack_handle, - self._hash_place_holder, - "zlib" if compress else None, - ) + obj_dict["offset"] = pack_handle.tell() with stream_context_manager as stream: if no_holes and no_holes_read_twice: @@ -1737,8 +1736,8 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b # I therefore need to seek back to zero, because the next line will read it again # in _write_data_to_packfile. stream.seek(0) - ( + obj_dict["offset"], obj_dict["size"], obj_dict["hashkey"], crc_value, diff --git a/tests/test_container.py b/tests/test_container.py index 3733be43..9069a8f3 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3046,16 +3046,17 @@ def test_packs_no_holes( sizes = temp_container.get_total_size() assert sizes["total_size_packed"] == len(content1) + len(content2) + len(content3) - + header_size = temp_container._zip_header_size if no_holes: assert ( - sizes["total_size_packed_on_disk"] == sizes["total_size_packfiles_on_disk"] + sizes["total_size_packed_on_disk"] + header_size * 3 + == sizes["total_size_packfiles_on_disk"] ) else: # We have added twice each object. Note: we cannot use total_size_packed because this would be # before compression assert ( - 2 * sizes["total_size_packed_on_disk"] + 2 * (sizes["total_size_packed_on_disk"] + header_size * 3) == sizes["total_size_packfiles_on_disk"] ) From 75ef344bbf2d14b23c41c4ccc17047b97e60ec83 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 07:08:22 +0000 Subject: [PATCH 08/21] Remove on-the-fly ucomputation of CRC --- disk_objectstore/container.py | 33 +-------------------------------- disk_objectstore/database.py | 5 ----- 2 files changed, 1 insertion(+), 37 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 7235230c..449d8d7e 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1311,7 +1311,6 @@ def _write_data_to_packfile( "zlib" if compress else None, ) count_read_bytes = 0 - crc_value = 0 offset = pack_handle.tell() while True: chunk = read_handle.read(self._CHUNKSIZE) @@ -1323,7 +1322,6 @@ def _write_data_to_packfile( count_read_bytes += len(chunk) if hash_type: hasher.update(chunk) - crc_value = crc32(chunk, crc_value) if compress: pack_handle.write(compressobj.compress(chunk)) else: @@ -1338,7 +1336,6 @@ def _write_data_to_packfile( offset, count_read_bytes, hasher.hexdigest() if hash_type else None, - crc_value, ) def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches @@ -1445,7 +1442,6 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches obj_dict["offset"], obj_dict["size"], new_hashkey, - crc_value, ) = self._write_data_to_packfile( pack_handle=pack_handle, read_handle=loose_handle, @@ -1463,12 +1459,6 @@ def pack_all_loose( # pylint: disable=too-many-locals,too-many-branches ) ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] - obj_dict["crc"] = crc_value - - # Write the tailing marker - # write_file_describer( - # pack_handle, crc_value, obj_dict["length"], obj_dict["size"] - # ) # Appending for later bulk commit - see comments in add_streamed_objects_to_pack obj_dicts.append(obj_dict) @@ -1740,7 +1730,6 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b obj_dict["offset"], obj_dict["size"], obj_dict["hashkey"], - crc_value, ) = self._write_data_to_packfile( pack_handle=pack_handle, read_handle=stream, @@ -1748,11 +1737,6 @@ def add_streamed_objects_to_pack( # pylint: disable=too-many-locals, too-many-b hash_type=self.hash_type, ) obj_dict["length"] = pack_handle.tell() - obj_dict["offset"] - obj_dict["crc"] = crc_value - - # write_file_describer( - # pack_handle, crc_value, obj_dict["length"], obj_dict["size"] - # ) # Here, we have appended the object to the pack file. # And now that we are done, we know the hash key. @@ -2629,7 +2613,6 @@ def repack_pack( Obj.offset, Obj.length, Obj.compressed, - Obj.crc, ) .where(Obj.pack_id == pack_id) .order_by(Obj.offset) @@ -2641,7 +2624,6 @@ def repack_pack( offset, length, compressed, - crc, ) in session.execute(stmt): # Since I am assuming above that the method is `KEEP`, I will just transfer # the bytes. Otherwise I have to properly take into account compression in the @@ -2654,7 +2636,6 @@ def repack_pack( obj_dict["pack_id"] = self._REPACK_PACK_ID obj_dict["compressed"] = compressed obj_dict["size"] = size - obj_dict["crc"] = crc write_zip_header( write_pack_handle, obj_dict["hashkey"][:FN_SIZE], @@ -2674,14 +2655,6 @@ def repack_pack( write_pack_handle.write(chunk) obj_dict["length"] = write_pack_handle.tell() - obj_dict["offset"] - # Write trailing file describer - # write_file_describer( - # write_pack_handle, - # crc_value, - # obj_dict["length"], - # obj_dict["size"], - # ) - # Appending for later bulk commit # I will assume that all objects of a single pack fit in memory obj_dicts.append(obj_dict) @@ -2765,20 +2738,16 @@ def seal_pack(self, pack_id, dryrun=False): Obj.offset, Obj.length, Obj.compressed, - Obj.crc, ) .where(Obj.pack_id == pack_id) .order_by(Obj.offset) ) - for rowid, hashkey, size, offset, length, compressed, crc in session.execute( - stmt - ): + for rowid, hashkey, size, offset, length, compressed in session.execute(stmt): zipinfo = ZipInfo(filename=hashkey[:FN_SIZE]) zipinfo.compress_type = ZIP_DEFLATED if compressed else ZIP_STORED zipinfo.file_size = size zipinfo.compress_size = length - zipinfo.CRC = crc # Offset for the header zipinfo.header_offset = offset - header_size all_zipinfo.append(zipinfo) diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index 28c5ca22..73acc06b 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -29,11 +29,6 @@ class Obj(Base): # pylint: disable=too-few-public-methods pack_id = Column( Integer, nullable=False ) # integer ID of the pack in which this entry is stored - # CRC of the object - used for sealing the pack and making it ZIP compatible - crc = Column( - Integer, - nullable=True, - ) def get_session( From b7052979fade4f6b2dc617be84a07fd6b90e9f3b Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 08:09:37 +0000 Subject: [PATCH 09/21] Added pack table to the database The Pack table contains a list of sealed/archived packs. --- disk_objectstore/container.py | 99 ++++++++++++++++++++++++++++------- disk_objectstore/database.py | 12 +++++ disk_objectstore/utils.py | 40 ++++++++++++++ 3 files changed, 131 insertions(+), 20 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 449d8d7e..78b88d0b 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -34,6 +34,7 @@ from disk_objectstore.zipsupport import ( _DD_SIGNATURE, _DD_SIZE, + is_zip, write_end_record, write_file_describer, write_zip_header, @@ -49,7 +50,7 @@ from sqlalchemy.sql import func from sqlalchemy.sql.expression import delete, select, text, update, values -from .database import Obj, get_session +from .database import Obj, Pack, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised from .utils import ( CallbackStreamWrapper, @@ -63,9 +64,11 @@ ZlibStreamDecompresser, chunk_iterator, compute_hash_and_size, + compute_hash_crc_and_size, detect_where_sorted, get_compressobj_instance, get_hash, + get_md5, get_stream_decompresser, is_known_hash, merge_sorted, @@ -2288,7 +2291,10 @@ def export( # Let us also compute the hash def _validate_hashkeys_pack( - self, pack_id: int, callback: Optional[Callable] = None + self, + pack_id: int, + callback: Optional[Callable] = None, + include_crc: bool = False, ) -> Dict[ str, Union[List[Union[str, Any]], List[Any]] ]: # pylint: disable=too-many-locals @@ -2319,6 +2325,8 @@ def _validate_hashkeys_pack( - after every object has been processed, with ``action=='update'`` and value equal to the number of newly processed entries since the last call. - at the end, with ``action=='close'`` and value equal to ``None``. + :param include_crc: If set to ``True``, compute the CRC32 value while validating the pack. The CRC32 is needed + for sealing the pack. Here is a minimal example of progress bar using the ``tqdm`` library: @@ -2369,6 +2377,8 @@ def callback(self, action, value): # Open the pack only once, read it in order pack_path = self._get_pack_path_from_pack_id(str(pack_id)) current_pos = 0 + if include_crc: + computed_crc = {} with open(pack_path, mode="rb") as pack_handle: stmt = ( select(Obj.hashkey, Obj.size, Obj.offset, Obj.length, Obj.compressed) @@ -2382,9 +2392,16 @@ def callback(self, action, value): if compressed: obj_reader = self._get_stream_decompresser()(obj_reader) - computed_hash, computed_size = compute_hash_and_size( - obj_reader, self.hash_type - ) + if include_crc: + ( + computed_hash, + computed_crc[hashkey], + computed_size, + ) = compute_hash_crc_and_size(obj_reader, self.hash_type) + else: + computed_hash, computed_size = compute_hash_and_size( + obj_reader, self.hash_type + ) # Check object correctness if computed_hash != hashkey: @@ -2410,11 +2427,14 @@ def callback(self, action, value): # Perform any wrap-up, if needed callback(action="close", value=None) - return { + output = { "invalid_hashes_packed": invalid_hashes, "invalid_sizes_packed": invalid_sizes, "overlapping_packed": overlapping, } + if include_crc: + output["crc_of_hashes"] = computed_crc + return output def validate( self, callback: Optional[Callable] = None @@ -2449,7 +2469,7 @@ def validate( for pack_id in all_pack_ids: pack_errors = self._validate_hashkeys_pack( - pack_id=pack_id, callback=callback + pack_id=pack_id, callback=callback, include_crc=False ) for error_type, problematic_objects in pack_errors.items(): all_errors[error_type] += problematic_objects @@ -2720,11 +2740,46 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. - def seal_pack(self, pack_id, dryrun=False): + def seal_pack(self, pack_id): """ Seal a pack by adding a central directory in the end, making the file a fully functional ZIP file. + + The sealing process consists of following steps: + + 1. Validate the pack and compute the CRC32 for each key on-the-fly + 2. Gather the information for each record as ZIP record + 3. Go through each record and update the local header, include the CRC32, filename and file lengths fields. + 4. Write the central directory file header and end of centra directory record + 5. Update the SQLite database to mark that the pack is sealed. """ + # Validate if the pack is sealable + # Trying to seal a pack file that was not written in ZIP compatible format can result in + # serious data corrutpion + with self.lock_pack( + str(pack_id), allow_repack_pack=False, mode="rb" + ) as pack_handle: + # Validate the existence of ZIP local header for the first record + header_data = pack_handle.read(zipfile.sizeFileHeader) + local_header = struct.unpack(zipfile.structFileHeader, header_data) + if local_header[0] != zipfile.stringFileHeader: + raise ValueError( + f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealled." + ) + + if is_zip(pack_handle): + raise ValueError(f"Pack file {pack_id} has already been sealed!") + + # Valid the pack conetents + errors_and_crc = self._validate_hashkeys_pack(pack_id, include_crc=True) + crc_of_hashes = errors_and_crc.pop("crc_of_hashes") + for error_type, problematic_objects in errors_and_crc.items(): + if problematic_objects: + raise ValueError( + f"Detected problematic objects: {problematic_objects} with error '{error_type}'" + ) + + # Gather information for each object session = self._get_cached_session() all_zipinfo = [] header_size = self._zip_header_size @@ -2750,6 +2805,7 @@ def seal_pack(self, pack_id, dryrun=False): zipinfo.compress_size = length # Offset for the header zipinfo.header_offset = offset - header_size + zipinfo.CRC = crc_of_hashes[hashkey] all_zipinfo.append(zipinfo) # Sort the zipinfo data so they are ordered by the header offset value @@ -2772,14 +2828,8 @@ def seal_pack(self, pack_id, dryrun=False): fname = write_pack_handle.read(fnlen).decode("ascii") # Seek back and rewrite the file name - if fname != zipinfo.filename: - if dryrun: - print( - f"Dryrun - changing filename {fname} to {zipinfo.filename}" - ) - else: - write_pack_handle.seek(-fnlen, 1) - write_pack_handle.write(zipinfo.filename.encode("ascii")) + write_pack_handle.seek(-fnlen, 1) + write_pack_handle.write(zipinfo.filename.encode("ascii")) # Update the CRC field write_pack_handle.seek(zipinfo.header_offset + 14, 0) @@ -2798,14 +2848,23 @@ def seal_pack(self, pack_id, dryrun=False): # Update the file size and the compressed size in the zip header write_pack_handle.write(bytes_to_write) - if dryrun: - return - # Finally, write the final central directory - with self.lock_pack(str(pack_id), allow_repack_pack=False) as write_pack_handle: + with self.lock_pack( + str(pack_id), allow_repack_pack=False, mode="ab" + ) as write_pack_handle: write_end_record(write_pack_handle, all_zipinfo) + with self.lock_pack( + str(pack_id), allow_repack_pack=False, mode="rb" + ) as read_pack_handle: + md5_obj = get_md5(read_pack_handle) + + # Update the SQLite database + pack = Pack(id=int(pack_id), state="Sealed", md5=md5_obj.hexdigest()) + session.add(pack) + session.commit() + def _is_pack_sealed(self, pack_id): """Check if a pack is sealed""" pack_loc = self._get_pack_path_from_pack_id(str(pack_id)) diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index 73acc06b..92033055 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -1,5 +1,6 @@ """Models for the container index file (SQLite DB).""" import os +from enum import unique from typing import Optional from sqlalchemy import Boolean, Column, Integer, String, create_engine, event @@ -31,6 +32,17 @@ class Obj(Base): # pylint: disable=too-few-public-methods ) # integer ID of the pack in which this entry is stored +class Pack(Base): # pylint: disable=too-few-public-methods + """The table for storing the state of pack files. If missing, it means that the pack is currently active""" + + __tablename__ = "db_pack" + + id = Column(Integer, primary_key=True) + state = Column(String, nullable=False, unique=False) + md5 = Column(String, unique=True, nullable=False) + location = Column(String, nullable=True) + + def get_session( path: str, create: bool = False, raise_if_missing: bool = False ) -> Optional[Session]: diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index b0b963b7..940fb8a1 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -1142,6 +1142,34 @@ def compute_hash_and_size( return hasher.hexdigest(), size +def compute_hash_crc_and_size( + stream: StreamReadBytesType, + hash_type: str, +) -> Tuple[str, int, int]: + """Given a stream and a hash type, return the hash key (hexdigest) and the total size. + + :param stream: an open stream + :param hash_type: the string with a name of a valid hash type + :return: a tuple with ``(hash, size)`` where ``hash`` is the hexdigest and ``size`` is the size in bytes + """ + _hash_chunksize = 524288 + hasher = get_hash(hash_type)() + + # Read and hash all content + size = 0 + crc_value = 0 + while True: + next_chunk = stream.read(_hash_chunksize) + if not next_chunk: + # Empty returned value: EOF + break + hasher.update(next_chunk) + size += len(next_chunk) + crc_value += crc32(next_chunk, crc_value) + + return hasher.hexdigest(), crc_value, size + + def detect_where_sorted( # pylint: disable=too-many-branches, too-many-statements left_iterator: Iterable[Any], right_iterator: Iterable[Any], @@ -1286,3 +1314,15 @@ def merge_sorted(iterator1: Iterable[Any], iterator2: Iterable[Any]) -> Iterator for item, _ in detect_where_sorted(iterator1, iterator2): # Whereever it is (only left, only right, on both) I return the object. yield item + + +def get_md5(handle): + """Compute MD5 of a stream""" + md5_object = hashlib.md5() + block_size = 128 * md5_object.block_size + + chunk = handle.read(block_size) + while chunk: + md5_object.update(chunk) + chunk = handle.read(block_size) + return md5_object From 00af11b5a1effa07a6f4ef4296673e3192d26a32 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 08:48:03 +0000 Subject: [PATCH 10/21] Added check for zip compatibility Previous pack format should be validated differently compared with the zip-compatible format, due to the existence of local headers. --- disk_objectstore/container.py | 18 +++++++++++++++--- tests/test_container.py | 1 + 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 78b88d0b..fec4750d 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2379,6 +2379,7 @@ def callback(self, action, value): current_pos = 0 if include_crc: computed_crc = {} + zip_compatible = self._is_zip_compatible(pack_id) with open(pack_path, mode="rb") as pack_handle: stmt = ( select(Obj.hashkey, Obj.size, Obj.offset, Obj.length, Obj.compressed) @@ -2410,8 +2411,12 @@ def callback(self, action, value): invalid_sizes.append(hashkey) # Check that there are no overlapping objects - if offset < current_pos: + + if ( + zip_compatible and (offset < current_pos + self._zip_header_size) + ) or (not zip_compatible and (offset < current_pos)): overlapping.append(hashkey) + current_pos = offset + length if callback: @@ -2865,6 +2870,13 @@ def seal_pack(self, pack_id): session.add(pack) session.commit() - def _is_pack_sealed(self, pack_id): + def _is_zip_compatible(self, pack_id): """Check if a pack is sealed""" - pack_loc = self._get_pack_path_from_pack_id(str(pack_id)) + pack_path = self._get_pack_path_from_pack_id(str(pack_id)) + with open(pack_path, mode="rb") as pack_handle: + # Validate the existence of ZIP local header for the first record + header_data = pack_handle.read(zipfile.sizeFileHeader) + local_header = struct.unpack(zipfile.structFileHeader, header_data) + if local_header[0] == zipfile.stringFileHeader: + return True + return False diff --git a/tests/test_container.py b/tests/test_container.py index 9069a8f3..eba9a0bf 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -2235,6 +2235,7 @@ def test_validate_corrupt_packed(temp_container): with open( temp_container._get_pack_path_from_pack_id(str(meta["pack_id"])), "wb" ) as fhandle: + fhandle.seek(temp_container._zip_header_size, 0) fhandle.write(b"CORRU890890890809PT") errors = temp_container.validate() From fb1e871e27eaab03195c6eb2ad0c897e6be080e3 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 08:52:05 +0000 Subject: [PATCH 11/21] Refectoring --- disk_objectstore/container.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index fec4750d..9f7a76c6 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2379,7 +2379,7 @@ def callback(self, action, value): current_pos = 0 if include_crc: computed_crc = {} - zip_compatible = self._is_zip_compatible(pack_id) + zip_compatible = self._is_pack_zip_compatible(pack_id) with open(pack_path, mode="rb") as pack_handle: stmt = ( select(Obj.hashkey, Obj.size, Obj.offset, Obj.length, Obj.compressed) @@ -2745,7 +2745,7 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. - def seal_pack(self, pack_id): + def seal_pack(self, pack_id: str): """ Seal a pack by adding a central directory in the end, making the file a fully functional ZIP file. @@ -2761,17 +2761,13 @@ def seal_pack(self, pack_id): # Validate if the pack is sealable # Trying to seal a pack file that was not written in ZIP compatible format can result in # serious data corrutpion + if not self._is_pack_zip_compatible(pack_id): + raise ValueError( + f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealled." + ) with self.lock_pack( str(pack_id), allow_repack_pack=False, mode="rb" ) as pack_handle: - # Validate the existence of ZIP local header for the first record - header_data = pack_handle.read(zipfile.sizeFileHeader) - local_header = struct.unpack(zipfile.structFileHeader, header_data) - if local_header[0] != zipfile.stringFileHeader: - raise ValueError( - f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealled." - ) - if is_zip(pack_handle): raise ValueError(f"Pack file {pack_id} has already been sealed!") @@ -2870,8 +2866,8 @@ def seal_pack(self, pack_id): session.add(pack) session.commit() - def _is_zip_compatible(self, pack_id): - """Check if a pack is sealed""" + def _is_pack_zip_compatible(self, pack_id: str) -> bool: + """Check if a pack is written in zip compatible format""" pack_path = self._get_pack_path_from_pack_id(str(pack_id)) with open(pack_path, mode="rb") as pack_handle: # Validate the existence of ZIP local header for the first record From da2d705f0867699558fe99c8e1c02a7da0bccb7f Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 09:01:10 +0000 Subject: [PATCH 12/21] Update the tests --- disk_objectstore/container.py | 12 +++++------- tests/test_container.py | 30 +++++++++++++++++++----------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 9f7a76c6..73414e52 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -136,6 +136,9 @@ class Container: # pylint: disable=too-many-public-methods # (after VACUUMing, as mentioned above). _MAX_CHUNK_ITERATE_LENGTH = 9500 + # Size of the ZIP local file header + _ZIP_HEADER_SIZE = 30 + FN_SIZE + 20 + def __init__(self, folder: Union[str, Path]) -> None: """Create the class that represents the container. @@ -359,11 +362,6 @@ def init_container( hasher.update(b"") self._hash_place_holder = hasher.hexdigest()[:FN_SIZE] - # Size of the ZIP local file header - self._zip_header_size = 30 + len(self._hash_place_holder) + 20 - # Size of the Data descriptor - self._zip_dd_size = 24 # struct.calcsize(" Date: Thu, 23 Dec 2021 14:55:59 +0000 Subject: [PATCH 13/21] Fix compatibility check --- disk_objectstore/container.py | 11 ++++------- tests/test_container.py | 1 + 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 73414e52..7f7b0617 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -152,6 +152,7 @@ def __init__(self, folder: Union[str, Path]) -> None: # IMPORANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`! self._current_pack_id: Optional[int] = None self._config: Optional[dict] = None + self._hash_place_holder = "0" * FN_SIZE def get_folder(self) -> str: """Return the path to the folder that will host the object-store container.""" @@ -357,11 +358,6 @@ def init_container( if not is_known_hash(hash_type): raise ValueError(f'Unknown hash type "{hash_type}"') - # Place holder for unknown hash - hasher = get_hash(hash_type)() - hasher.update(b"") - self._hash_place_holder = hasher.hexdigest()[:FN_SIZE] - if clear: if os.path.exists(self._folder): shutil.rmtree(self._folder) @@ -2869,8 +2865,9 @@ def _is_pack_zip_compatible(self, pack_id: str) -> bool: pack_path = self._get_pack_path_from_pack_id(str(pack_id)) with open(pack_path, mode="rb") as pack_handle: # Validate the existence of ZIP local header for the first record - header_data = pack_handle.read(zipfile.sizeFileHeader) - local_header = struct.unpack(zipfile.structFileHeader, header_data) + local_header = struct.unpack( + "<4s", pack_handle.read(4) + ) # Read the local header signature if local_header[0] == zipfile.stringFileHeader: return True return False diff --git a/tests/test_container.py b/tests/test_container.py index f17b1e65..991320b3 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -2300,6 +2300,7 @@ def test_validate_corrupt_packed_size(temp_container): # pylint: disable=invali temp_container._get_pack_path_from_pack_id(str(meta["pack_id"])), "wb" ) as fhandle: # Short corrupted string so also the size is wrong + fhandle.write(b" " * temp_container._ZIP_HEADER_SIZE) # Write a dummy header fhandle.write(b"COR") errors = temp_container.validate() From 269661d969dfcd94b5d71758d329768cef59a6e9 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 16:07:57 +0000 Subject: [PATCH 14/21] Added method to seal all eligible packs The method to get the pack id to write to is also updated such that it only return unsealed packs --- disk_objectstore/container.py | 109 ++++++++++++++++++++++++++++++++-- disk_objectstore/database.py | 9 +++ tests/test_container.py | 11 ++++ 3 files changed, 125 insertions(+), 4 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 7f7b0617..f9acd515 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -50,7 +50,7 @@ from sqlalchemy.sql import func from sqlalchemy.sql.expression import delete, select, text, update, values -from .database import Obj, Pack, get_session +from .database import Obj, Pack, PackState, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised from .utils import ( CallbackStreamWrapper, @@ -288,7 +288,11 @@ def _get_pack_id_to_write_to(self) -> int: """ # Default to zero if not set (e.g. if it's None) pack_id = self._current_pack_id or 0 + sealed_packs = self.get_sealed_packs(include_archived=True) while True: + if pack_id in sealed_packs: + pack_id += 1 + continue pack_path = self._get_pack_path_from_pack_id(pack_id) if not os.path.exists(pack_path): # Use this ID - the pack file does not exist yet @@ -2739,7 +2743,7 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. - def seal_pack(self, pack_id: str): + def seal_pack(self, pack_id: int): """ Seal a pack by adding a central directory in the end, making the file a fully functional ZIP file. @@ -2793,7 +2797,7 @@ def seal_pack(self, pack_id: str): .order_by(Obj.offset) ) - for rowid, hashkey, size, offset, length, compressed in session.execute(stmt): + for _, hashkey, size, offset, length, compressed in session.execute(stmt): zipinfo = ZipInfo(filename=hashkey[:FN_SIZE]) zipinfo.compress_type = ZIP_DEFLATED if compressed else ZIP_STORED zipinfo.file_size = size @@ -2856,7 +2860,9 @@ def seal_pack(self, pack_id: str): md5_obj = get_md5(read_pack_handle) # Update the SQLite database - pack = Pack(id=int(pack_id), state="Sealed", md5=md5_obj.hexdigest()) + pack = Pack( + id=int(pack_id), state=PackState.Sealed.value, md5=md5_obj.hexdigest() + ) session.add(pack) session.commit() @@ -2871,3 +2877,98 @@ def _is_pack_zip_compatible(self, pack_id: str) -> bool: if local_header[0] == zipfile.stringFileHeader: return True return False + + def get_sealed_packs(self, include_archived=True) -> Dict[int, str]: + """ + Get a dictionary of sealed packs and their MD5 + + :param include_archived: Also included archived packs. These packs have been sealed previously. + """ + session = self._get_cached_session() + if include_archived: + stmt = ( + select(Pack.id, Pack.md5) + .where( + (Pack.state == PackState.Sealed.value) + | (PackState == PackState.Archived.value) + ) + .order_by(Pack.id) + ) + else: + stmt = ( + select(Pack.id, Pack.md5) + .where(Pack.state == PackState.Sealed.value) + .order_by(Pack.id) + ) + + results = {} + for pack_id, pack_md5 in session.execute(stmt): + results[pack_id] = pack_md5 + return results + + def get_all_pack_info(self): + """Get the status of the packs""" + session = self._get_cached_session() + stmt = select( + Obj.pack_id, + ).distinct() + pack_ids = sorted(x[0] for x in session.execute(stmt)) + + # Locate those stored in Pack table + stmt = select( + Pack.id, + Pack.md5, + Pack.state, + Pack.location, + ) + entry_in_pack_table = {} + for pack_id, md5_value, state, location in session.execute(stmt): + entry_in_pack_table[pack_id] = { + "pack_id": pack_id, + "md5": md5_value, + "state": state, + "location": location, + } + # Combine the two + info = [] + for pack_id in pack_ids: + if pack_id in entry_in_pack_table: + info.append(entry_in_pack_table[pack_id]) + else: + info.append( + { + "pack_id": pack_id, + "md5": None, + "state": PackState.Unsealed.value, + "location": None, + } + ) + return info + + def seal_all_sealable_packs(self): + """ + Find packs that are no longer active for writing and seal them. + """ + + # Find which packs are eligible for sealing + pack_id = 0 + eligible = [] + sealed = self.get_sealed_packs(include_archived=True) + # Go through all paths + while True: + pack_path = self._get_pack_path_from_pack_id(pack_id) + if not os.path.exists(pack_path): + # Use this ID - the pack file does not exist yet + break + if ( + os.path.getsize(pack_path) >= self.pack_size_target + and pack_id not in sealed + ): + # Use this ID - the pack file is not "full" yet + eligible.append(pack_id) + # Try the next pack + pack_id += 1 + + # Seal the packs one by one + for pack_id in eligible: + self.seal_pack(pack_id) diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index 92033055..bd90cb36 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -1,4 +1,5 @@ """Models for the container index file (SQLite DB).""" +import enum import os from enum import unique from typing import Optional @@ -11,6 +12,14 @@ Base = declarative_base() # pylint: disable=invalid-name,useless-suppression +class PackState(enum.Enum): + """Enum for valid sate of seal packs""" + + Sealed = "Sealed" + Archived = "Archived" + Unsealed = "Unsealed" + + class Obj(Base): # pylint: disable=too-few-public-methods """The main (and only) table to store object metadata (hashkey, offset, length, ...).""" diff --git a/tests/test_container.py b/tests/test_container.py index 991320b3..87a85423 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3339,6 +3339,17 @@ def test_sealing(temp_dir): # Check the zipfile zif.testzip() + sealed = temp_container.get_sealed_packs() + assert 0 in sealed + + infos = temp_container.get_all_pack_info() + assert infos[0]["state"] == "Sealed" + assert infos[0]["location"] is None + assert infos[0]["pack_id"] == 0 + + # Since pack 0 is seal, new data should be written to pack 1 + assert temp_container._get_pack_id_to_write_to() == 1 + def test_not_implemented_repacks(temp_container): """Check the error for not implemented repack methods.""" From 404544211d5cd56f10a45c849f037aa4917756d7 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Thu, 23 Dec 2021 16:16:37 +0000 Subject: [PATCH 15/21] ensure sealed packs will not be opened for writing --- disk_objectstore/container.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index f9acd515..1d040e7e 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1175,6 +1175,10 @@ def lock_pack( """ assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack) + # For sealed pack, we only allow reading + if pack_id in self.get_sealed_packs(): + assert mode.startswith("r") and "+" not in mode + # Open file in exclusive mode lock_file = os.path.join(self._get_pack_folder(), f"{pack_id}.lock") pack_file = self._get_pack_path_from_pack_id( @@ -2823,11 +2827,8 @@ def seal_pack(self, pack_id: int): # Check the magic if local_header[0] != zipfile.stringFileHeader: raise ValueError(f"Cannot find ZIP header for record {zipinfo}") - # Read the filename - fname = write_pack_handle.read(fnlen).decode("ascii") - # Seek back and rewrite the file name - write_pack_handle.seek(-fnlen, 1) + # rewrite the file name write_pack_handle.write(zipinfo.filename.encode("ascii")) # Update the CRC field From 1e2c92b153303bf97fcdb9f385a7c9d5bb2be5eb Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Fri, 24 Dec 2021 13:18:07 +0000 Subject: [PATCH 16/21] Pre-commit fix Fix and supress various pre-commit messages. --- disk_objectstore/container.py | 76 ++++++++++++++-------------------- disk_objectstore/database.py | 7 ++-- disk_objectstore/zipsupport.py | 54 +++++++++++------------- tests/test_container.py | 29 ++++++++++++- 4 files changed, 85 insertions(+), 81 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 1d040e7e..fdc21266 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -29,26 +29,16 @@ overload, ) from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipInfo -from zlib import crc32 - -from disk_objectstore.zipsupport import ( - _DD_SIGNATURE, - _DD_SIZE, - is_zip, - write_end_record, - write_file_describer, - write_zip_header, -) try: - from typing import Literal + from typing import Literal # pylint: disable=ungrouped-imports except ImportError: # Python <3.8 backport from typing_extensions import Literal # type: ignore from sqlalchemy.orm.session import Session from sqlalchemy.sql import func -from sqlalchemy.sql.expression import delete, select, text, update, values +from sqlalchemy.sql.expression import delete, select, text, update from .database import Obj, Pack, PackState, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised @@ -77,6 +67,7 @@ safe_flush_to_disk, yield_first_element, ) +from .zipsupport import is_zip, write_end_record, write_zip_header ObjQueryResults = namedtuple( "ObjQueryResults", ["hashkey", "offset", "length", "compressed", "size"] @@ -1280,7 +1271,7 @@ def _write_data_to_packfile( read_handle: StreamReadBytesType, compress: bool, hash_type: Optional[str] = None, - ) -> Union[Tuple[int, None, int], Tuple[int, str, int]]: + ) -> Union[Tuple[int, int, None], Tuple[int, int, str]]: """Append data, read from read_handle until it ends, to the correct packfile. Return the number of bytes READ (note that this will be different @@ -2292,14 +2283,12 @@ def export( ) # Let us also compute the hash - def _validate_hashkeys_pack( + def _validate_hashkeys_pack( # pylint: disable=too-many-branches, too-many-locals self, pack_id: int, callback: Optional[Callable] = None, include_crc: bool = False, - ) -> Dict[ - str, Union[List[Union[str, Any]], List[Any]] - ]: # pylint: disable=too-many-locals + ) -> Dict[str, Any]: """Validate all hashkeys and returns a dictionary of problematic entries. The keys are the problem type, the values are a list of hashkeys of problematic objects. @@ -2438,7 +2427,7 @@ def callback(self, action, value): "invalid_hashes_packed": invalid_hashes, "invalid_sizes_packed": invalid_sizes, "overlapping_packed": overlapping, - } + } # type: Dict[str, Any] if include_crc: output["crc_of_hashes"] = computed_crc return output @@ -2747,7 +2736,7 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. - def seal_pack(self, pack_id: int): + def seal_pack(self, pack_id: int): # pylint: disable=too-many-locals """ Seal a pack by adding a central directory in the end, making the file a fully functional ZIP file. @@ -2765,7 +2754,7 @@ def seal_pack(self, pack_id: int): # serious data corrutpion if not self._is_pack_zip_compatible(pack_id): raise ValueError( - f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealled." + f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealed." ) with self.lock_pack( str(pack_id), allow_repack_pack=False, mode="rb" @@ -2785,9 +2774,6 @@ def seal_pack(self, pack_id: int): # Gather information for each object session = self._get_cached_session() all_zipinfo = [] - header_size = self._ZIP_HEADER_SIZE - # Size of the file local header - # the extra field takes 20 bytes with format " bool: + def _is_pack_zip_compatible(self, pack_id: int) -> bool: """Check if a pack is written in zip compatible format""" pack_path = self._get_pack_path_from_pack_id(str(pack_id)) with open(pack_path, mode="rb") as pack_handle: @@ -2875,7 +2859,7 @@ def _is_pack_zip_compatible(self, pack_id: str) -> bool: local_header = struct.unpack( "<4s", pack_handle.read(4) ) # Read the local header signature - if local_header[0] == zipfile.stringFileHeader: + if local_header[0] == zipfile.stringFileHeader: # type: ignore return True return False @@ -2890,15 +2874,15 @@ def get_sealed_packs(self, include_archived=True) -> Dict[int, str]: stmt = ( select(Pack.id, Pack.md5) .where( - (Pack.state == PackState.Sealed.value) - | (PackState == PackState.Archived.value) + (Pack.state == PackState.SEALED.value) + | (PackState == PackState.ARCHIVED.value) ) .order_by(Pack.id) ) else: stmt = ( select(Pack.id, Pack.md5) - .where(Pack.state == PackState.Sealed.value) + .where(Pack.state == PackState.SEALED.value) .order_by(Pack.id) ) @@ -2940,7 +2924,7 @@ def get_all_pack_info(self): { "pack_id": pack_id, "md5": None, - "state": PackState.Unsealed.value, + "state": PackState.UNSEALED.value, "location": None, } ) diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index bd90cb36..79e9aa99 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -1,7 +1,6 @@ """Models for the container index file (SQLite DB).""" import enum import os -from enum import unique from typing import Optional from sqlalchemy import Boolean, Column, Integer, String, create_engine, event @@ -15,9 +14,9 @@ class PackState(enum.Enum): """Enum for valid sate of seal packs""" - Sealed = "Sealed" - Archived = "Archived" - Unsealed = "Unsealed" + SEALED = "Sealed" + ARCHIVED = "Archived" + UNSEALED = "Unsealed" class Obj(Base): # pylint: disable=too-few-public-methods diff --git a/disk_objectstore/zipsupport.py b/disk_objectstore/zipsupport.py index 4ad2d71d..660c3b6f 100644 --- a/disk_objectstore/zipsupport.py +++ b/disk_objectstore/zipsupport.py @@ -1,15 +1,14 @@ +""" +Utility module providing ZIP format support + +Many of the functions are adapted from the zipfile stdlib +""" import struct import sys import zipfile -from zipfile import ( - ZIP64_LIMIT, - ZIP64_VERSION, - ZIP_DEFLATED, - ZIP_FILECOUNT_LIMIT, - ZIP_STORED, - ZipInfo, -) +from zipfile import ZIP64_LIMIT, ZIP_DEFLATED, ZIP_FILECOUNT_LIMIT, ZIP_STORED, ZipInfo +# pylint: disable=too-many-locals, invalid-name, protected-access, too-many-statements _DD_SIGNATURE = 0x08074B50 _EXTRA_FIELD_STRUCT = struct.Struct("= 0: # found the magic number; attempt to unpack and interpret - recData = data[start : start + zipfile.sizeEndCentDir] - if len(recData) != zipfile.sizeEndCentDir: + recData = data[start : start + zipfile.sizeEndCentDir] # type: ignore + if len(recData) != zipfile.sizeEndCentDir: # type: ignore # Zip file is corrupted. return False return True diff --git a/tests/test_container.py b/tests/test_container.py index 87a85423..f6f32014 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3010,7 +3010,7 @@ def test_clean_storage_with_duplicates_original_deleted( ) @pytest.mark.parametrize("use_streams", [True, False]) @pytest.mark.parametrize("compress", [True, False]) -def test_packs_no_holes( +def test_packs_no_holes( # pylint: disable=too-many-locals temp_container, no_holes, no_holes_read_twice, use_streams, compress, monkeypatch ): """Test what happens when writing directly to packs and asking not to leave back holes.""" @@ -3320,7 +3320,7 @@ def test_sealing(temp_dir): temp_container = Container(temp_dir) temp_container.init_container(clear=True) - # data of 10 bytes each. Will fill two packs. + # data of 10 bytes each. data = [ b"-123456789", b"a123456789", @@ -3350,6 +3350,31 @@ def test_sealing(temp_dir): # Since pack 0 is seal, new data should be written to pack 1 assert temp_container._get_pack_id_to_write_to() == 1 + # Sealed pack cannot be sealed again + with pytest.raises(ValueError, match="has already been sealed"): + temp_container.seal_pack(0) + + # Some new data + data = [ + b"-123456788", + b"a123456788", + ] + hexkeys = [] + for value in data: + hexkeys.append(temp_container.add_objects_to_pack([value])[0]) + + # Destroy header - the file is no longer ZIP compatible, e.g. similar to the old + # format + with open(f"{temp_dir}/packs/1", "r+b") as fhandle: + fhandle.write(b"0000") + + # Sealed pack cannot be sealed again + with pytest.raises(ValueError, match=r"ZIP compatible"): + temp_container.seal_pack(1) + + # Important before exiting from the tests + temp_container.close() + def test_not_implemented_repacks(temp_container): """Check the error for not implemented repack methods.""" From 15a9de05dff60c15c9601b4218f5b68f5828aaaa Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 26 Dec 2021 10:38:03 +0000 Subject: [PATCH 17/21] Update the readme about ZIP compatible pack format --- README.md | 23 +++++++++++++++++++++++ disk_objectstore/container.py | 7 ++++--- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 22de537c..3557b4bf 100644 --- a/README.md +++ b/README.md @@ -448,6 +448,29 @@ This implementation, in particular, addresses the following aspects: - A number of streamins APIs are exposed to the users, who are encouraged to use this if they are not sure of the size of the objects and want to avoid out-of-memory crashes. +## ZIP Compatible pack format + +The pack file contains the raw binary data of each record, concatenated together. +While this format if efficient on space, it heavily relies on the SQLite index database. +A recent improvement has made the pack file include headers that are similar to the ZIP archive format. +In this format, a local header is written before each record, containing ZIP signature, compressed and uncompressed size of the record, and the CRC32 check sum. +When the pack file is no longer active, e.g. it has exceeded the target size, it can be *sealed* such that it +becomes a valid ZIP archive and can be extracted and validated, using a wide array of software packages. + +The changes are mainly for improving resilience in the case of catastrophic data lost - if the index database is lost or corrupted, the data can still be extracted from the pack files themselves. In the pack files are also damaged, the information in local and central header may be still be used to recover as much data as possible. + +The sealing operation involves checking the integrity of the pack file, calculating the CRC32 checksum needed, and update the local headers. This operation can take a while to complete, but will not cause any down time of the container. Finally, a central header directory of all records to the file is appended. + +Once, a pack file is *sealed*, its status is added to a table of the index database (`Pack` table). If a pack file is missing from this table, it simply means it is not *sealed* yet. Packs that are *sealed* will not be selected for writing new data, even if their sizes are below the target pack sizes. + +Pack files generated prior to this format change will continue to work, but they cannot be *sealed* due to the lack of local header. Repacking is necessary for regerating sealable files. + +The inclusion of local and central directory headers means there is a storage overhead for each record. +The estimated size of each record is 66 bytes for the local header, and 62 + (20, if exceeding 4GB) bytes for the central directory, e.g. 128 + (20) bytes in total. +Hence, storing many small files can be inefficient, however, it is expected that the majority of the records will be much larger, making this overhead insignificant. + +When extracted, each record will result in a single, uncompressed file named with the first 16 characters of its hash. + ## Further design choices In addition, the following design choices have been made: diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index fdc21266..37c9590f 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2756,11 +2756,12 @@ def seal_pack(self, pack_id: int): # pylint: disable=too-many-locals raise ValueError( f"Pack file {pack_id} is not written in ZIP compatible format and cannot be sealed." ) - with self.lock_pack( - str(pack_id), allow_repack_pack=False, mode="rb" + + with open( + self._get_pack_path_from_pack_id(pack_id, allow_repack_pack=False) ) as pack_handle: if is_zip(pack_handle): - raise ValueError(f"Pack file {pack_id} has already been sealed!") + raise ValueError(f"Pack file {pack_id} has been sealed already!") # Valid the pack conetents errors_and_crc = self._validate_hashkeys_pack(pack_id, include_crc=True) From c6fa56c0253ea15ff49ea7c6ccdd854b77dac36e Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 26 Dec 2021 11:45:25 +0000 Subject: [PATCH 18/21] Fix mode keyword argument --- disk_objectstore/container.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 37c9590f..5f0f29b5 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2758,7 +2758,8 @@ def seal_pack(self, pack_id: int): # pylint: disable=too-many-locals ) with open( - self._get_pack_path_from_pack_id(pack_id, allow_repack_pack=False) + self._get_pack_path_from_pack_id(pack_id, allow_repack_pack=False), + mode="rb", ) as pack_handle: if is_zip(pack_handle): raise ValueError(f"Pack file {pack_id} has been sealed already!") From 47c34ffea9a724a8f571c1a519236883d8ce64fb Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 26 Dec 2021 14:56:11 +0000 Subject: [PATCH 19/21] Fix test --- tests/test_container.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_container.py b/tests/test_container.py index f6f32014..e7f2fa09 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3351,7 +3351,7 @@ def test_sealing(temp_dir): assert temp_container._get_pack_id_to_write_to() == 1 # Sealed pack cannot be sealed again - with pytest.raises(ValueError, match="has already been sealed"): + with pytest.raises(ValueError, match="been sealed already"): temp_container.seal_pack(0) # Some new data From b99b84d75b1ee957efdf68cb0639efd2ee5bee23 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Fri, 21 Jan 2022 15:23:59 +0000 Subject: [PATCH 20/21] Use the full hash for the filename Now the file name will take 64 characters (bytes) long, including the full hexdigest of the SHA256 hash. --- disk_objectstore/container.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 5f0f29b5..5d5b164e 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -73,7 +73,7 @@ "ObjQueryResults", ["hashkey", "offset", "length", "compressed", "size"] ) -FN_SIZE = 16 # Size for the file name used in LOCAL header for each object +FN_SIZE = 64 # Size for the file name used in LOCAL header for each object class ObjectType(Enum): @@ -143,7 +143,7 @@ def __init__(self, folder: Union[str, Path]) -> None: # IMPORANT! IF YOU ADD MORE, REMEMBER TO CLEAR THEM IN `init_container()`! self._current_pack_id: Optional[int] = None self._config: Optional[dict] = None - self._hash_place_holder = "0" * FN_SIZE + self._hash_place_holder = " " * FN_SIZE def get_folder(self) -> str: """Return the path to the folder that will host the object-store container.""" @@ -2654,7 +2654,7 @@ def repack_pack( obj_dict["size"] = size write_zip_header( write_pack_handle, - obj_dict["hashkey"][:FN_SIZE], + obj_dict["hashkey"].ljust(FN_SIZE), # Ensure constant size "zlib" if compressed else None, ) obj_dict["offset"] = write_pack_handle.tell() @@ -2790,7 +2790,7 @@ def seal_pack(self, pack_id: int): # pylint: disable=too-many-locals ) for _, hashkey, size, offset, length, compressed in session.execute(stmt): - zipinfo = ZipInfo(filename=hashkey[:FN_SIZE]) + zipinfo = ZipInfo(filename=hashkey.ljust(FN_SIZE)) zipinfo.compress_type = ZIP_DEFLATED if compressed else ZIP_STORED zipinfo.file_size = size zipinfo.compress_size = length From 3ccd60a2357894c6aeca3a121b5cd6ad28eef958 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 11 May 2022 13:40:14 +0000 Subject: [PATCH 21/21] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/pages/design.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/pages/design.md b/docs/pages/design.md index 2a639086..7769253b 100644 --- a/docs/pages/design.md +++ b/docs/pages/design.md @@ -214,4 +214,4 @@ The inclusion of local and central directory headers means there is a storage ov The estimated size of each record is 66 bytes for the local header, and 62 + (20, if exceeding 4GB) bytes for the central directory, e.g. 128 + (20) bytes in total. Hence, storing many small files can be inefficient, however, it is expected that the majority of the records will be much larger, making this overhead insignificant. -When extracted, each record will result in a single, uncompressed file named with the first 16 characters of its hash. \ No newline at end of file +When extracted, each record will result in a single, uncompressed file named with the first 16 characters of its hash.