diff --git a/dump/src/util/dump/Dump.java b/dump/src/util/dump/Dump.java index 9228f87..8d0a998 100644 --- a/dump/src/util/dump/Dump.java +++ b/dump/src/util/dump/Dump.java @@ -49,7 +49,7 @@ import gnu.trove.list.array.TByteArrayList; import gnu.trove.set.hash.TLongHashSet; import util.dump.ExternalizableBean.externalizationVersion; -import util.dump.UniqueIndex.DuplicateKeyException; +import util.dump.UniqueConstraint.DuplicateKeyException; import util.dump.cache.SoftLRUCache; import util.dump.io.IOUtils; import util.dump.sort.InfiniteSorter; diff --git a/dump/src/util/dump/ExternalizableBean.java b/dump/src/util/dump/ExternalizableBean.java index 536646a..129b8e2 100644 --- a/dump/src/util/dump/ExternalizableBean.java +++ b/dump/src/util/dump/ExternalizableBean.java @@ -792,7 +792,7 @@ default void readExternal( ObjectInput in ) throws IOException, ClassNotFoundExc if ( f != null ) { f.set(this, o); } - // throw new IllegalArgumentException("The field type " + fieldTypes[i] + " in class " + getClass() + // throw new IllegalArgumentException("The field type " + fieldTypes[i] + " in " + getClass() // + " is unsupported by util.dump.ExternalizableBean."); } } @@ -802,7 +802,7 @@ default void readExternal( ObjectInput in ) throws IOException, ClassNotFoundExc throw e; } catch ( Throwable e ) { - throw new RuntimeException("Failed to read externalized instance. Maybe the field order was changed? class " + getClass(), e); + throw new RuntimeException("Failed to read externalized instance. Maybe the field order was changed? 
" + getClass(), e); } } diff --git a/dump/src/util/dump/GroupedIndex.java b/dump/src/util/dump/GroupedIndex.java index 9931471..1502b7b 100644 --- a/dump/src/util/dump/GroupedIndex.java +++ b/dump/src/util/dump/GroupedIndex.java @@ -3,7 +3,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; -import util.dump.UniqueIndex.DuplicateKeyException; +import util.dump.UniqueConstraint.DuplicateKeyException; import util.dump.reflection.FieldAccessor; diff --git a/dump/src/util/dump/MmapLongIdIndex.java b/dump/src/util/dump/MmapLongIdIndex.java new file mode 100644 index 0000000..4a0cb09 --- /dev/null +++ b/dump/src/util/dump/MmapLongIdIndex.java @@ -0,0 +1,1207 @@ +package util.dump; + +import static java.lang.foreign.MemoryLayout.PathElement.groupElement; +import static java.lang.foreign.MemoryLayout.PathElement.sequenceElement; +import static java.lang.foreign.MemoryLayout.sequenceLayout; +import static java.lang.foreign.MemoryLayout.structLayout; +import static java.lang.foreign.ValueLayout.JAVA_LONG; +import static java.nio.channels.FileChannel.MapMode.READ_ONLY; +import static java.nio.channels.FileChannel.MapMode.READ_WRITE; +import static java.nio.file.StandardOpenOption.CREATE_NEW; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.SPARSE; +import static java.nio.file.StandardOpenOption.WRITE; + +import java.io.File; +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.GroupLayout; +import java.lang.foreign.MemoryLayout; +import java.lang.foreign.MemorySegment; +import java.lang.invoke.VarHandle; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.EnumSet; +import java.util.Map; +import java.util.function.ToLongFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import gnu.trove.list.TLongList; +import 
gnu.trove.list.array.TLongArrayList; +import gnu.trove.map.TLongLongMap; +import gnu.trove.map.hash.TLongLongHashMap; +import util.dump.reflection.FieldAccessor; +import util.dump.reflection.FieldFieldAccessor; +import util.dump.reflection.Reflection; + + +/** + * Special-purpose fixed-size off-heap id-to-pos lookup (not only) for sharded dumps. + *

+ * Employs a simple array-like approach, based on the assumption that IDs either start at a known minimum key and keep incrementing, or are constrained to a closed range + * defined a priori. In the former case, the backing file keeps growing as higher IDs are added, while in the latter, the bounds are fixed at initialization. + *

+ * Requires compiler and runtime parameter --add-modules=jdk.incubator.foreign in order to work. + */ +public abstract class MmapLongIdIndex extends DumpIndex implements UniqueConstraint { + + private static final Logger _log = LoggerFactory.getLogger(MmapLongIdIndex.class); + + private static final boolean PARANOIA_MODE = true; + + public static MmapLongIdIndex forClosedRange( Dump dump, String fieldName, long minKey, long maxKey ) throws NoSuchFieldException { + return new ClosedRangeMmapLongIdIndex<>(dump, fieldName, minKey, maxKey); + } + + public static MmapLongIdIndex forClosedRange( Dump dump, FieldAccessor fieldAccessor, long minKey, long maxKey ) { + return new ClosedRangeMmapLongIdIndex<>(dump, fieldAccessor, minKey, maxKey); + } + + public static MmapLongIdIndex forOpenRange( Dump dump, String fieldName, long minKey ) throws NoSuchFieldException { + return new OpenRangeMmapLongIdIndex<>(dump, fieldName, minKey); + } + + public static MmapLongIdIndex forOpenRange( Dump dump, FieldAccessor fieldAccessor, long minKey ) { + return new OpenRangeMmapLongIdIndex<>(dump, fieldAccessor, minKey); + } + + private static boolean isPowerOfTwo( long n ) { + return n > 0L && (n & n - 1L) == 0L; + } + + protected final ToLongFunction _getKey; + + protected final Path _lookupPath; + + protected final long _minKey; + protected final long _maxKey; + + private final IndexCorrections _indexCorrections = new IndexCorrections(); + + protected FileLayout _fileLayout; + private FileChannel _readWriteFileChannel; + private Header _header; + + private MemorySegment _tableSegment; + private long _tableCapacity; + private volatile VarHandle _longArrayAccess; + + private MmapLongIdIndex( Dump dump, String fieldName, long minKey, long maxKey ) throws NoSuchFieldException { + this(dump, new FieldFieldAccessor(Reflection.getField(dump._beanClass, fieldName)), minKey, maxKey); + } + + private MmapLongIdIndex( Dump dump, FieldAccessor fieldAccessor, long minKey, long maxKey ) { + 
super(dump, fieldAccessor, new File(dump.getDumpFile().getParentFile(), dump.getDumpFile().getName() + "." + fieldAccessor.getName() + ".mmap.lookup")); + + _lookupPath = Paths.get(getLookupFile().getPath()); + + _minKey = minKey; + _maxKey = maxKey; + + if ( _fieldIsLong ) { + if ( _fieldIsLongObject ) { + _getKey = o -> { + try { + return (Long)_fieldAccessor.get(o); + } + catch ( Exception e ) { + throw new RuntimeException(e); + } + }; + } else { + _getKey = o -> { + try { + return _fieldAccessor.getLong(o); + } + catch ( Exception e ) { + throw new RuntimeException(e); + } + }; + } + } else if ( _fieldIsInt ) { + if ( _fieldIsIntObject ) { + _getKey = o -> { + try { + return (Integer)_fieldAccessor.get(o); + } + catch ( Exception e ) { + throw new RuntimeException(e); + } + }; + } else { + _getKey = o -> { + try { + return _fieldAccessor.getInt(o); + } + catch ( Exception e ) { + throw new RuntimeException(e); + } + }; + } + } else { + throw new IllegalStateException("only long (and int) fields implemented"); + } + } + + @Override + public void close() throws IOException { + _log.info("{} closing...", _lookupPath.getFileName()); + flushTable(); + closeHeader(); + + if ( _readWriteFileChannel != null ) { + _readWriteFileChannel.close(); + } + + super.close(); + _log.info("{} closed.", _lookupPath.getFileName()); + } + + @Override + public boolean contains( int key ) { + return contains((long)key); + } + + @Override + public boolean contains( long key ) { + long index = indexFor(key); + long pos = getPosAt(index); + + if ( pos < 0 ) { + return false; + } + + synchronized ( _dump ) { + return !_dump._deletedPositions.contains(pos); + } + } + + @Override + public boolean contains( Object key ) { + if ( key instanceof Long l ) { + return contains(l.longValue()); + } + if ( key instanceof Integer i ) { + return contains(i.intValue()); + } + + throw new IllegalArgumentException("only long (and int) fields implemented"); + } + + @Override + public void flush() throws 
IOException { + // We don't have to do anything to push data out-of-process, it's already in the page cache. Hammering the file to disk all the time gets us nowhere. + } + + @Override + public long[] getAllLongKeys() { + TLongList keys = new TLongArrayList(getNumKeys(), -1); + MemorySegment table = _tableSegment.asReadOnly(); + + for ( long index = 0, n = capacity(table); index < n; ++index ) { + // not used during live operation, hence concurrency is not an issue + if ( getPosAt(table, index) >= 0 ) { + keys.add(keyOffsetRevert(index)); + } + } + + return keys.toArray(); + } + + @Override + public TLongList getAllPositions() { + throw new UnsupportedOperationException(); + } + + @Override + public Object getKey( E o ) { + return keyFor(o); + } + + @Override + public int getNumKeys() { + return (int)_header.getNumKeys(); + } + + @Override + public E lookup( int key ) { + return lookup((long)key); + } + + @Override + public E lookup( long key ) { + long index = indexFor(key); + long pos = getPosAt(index); + + if ( pos < 0 ) { + return null; + } + + synchronized ( _dump ) { + return !_dump._deletedPositions.contains(pos) ? 
_dump.get(pos) : null; + } + } + + @Override + public E lookup( Object key ) { + if ( key instanceof Long l ) { + return lookup(l.longValue()); + } + if ( key instanceof Integer i ) { + return lookup(i.longValue()); + } + + throw new IllegalArgumentException("only long (and int) fields implemented"); + } + + protected long capacity( MemorySegment segment ) { + return segment.byteSize() / JAVA_LONG.byteSize(); + } + + @Override + protected boolean checkMeta() { + return super.checkMeta() && checkHeader(); + } + + protected boolean checkNumKeys( Header header ) { + if ( header.getNumKeys() < 0 ) { + return false; + } + if ( header.getNumKeys() == 0 && _dump.getDumpSize() > 0 ) { + return false; + } + + return header.getNumKeys() <= header.getTableBytes() / Long.BYTES; + } + + @Override + protected String getIndexType() { + return MmapLongIdIndex.class.getSimpleName(); + } + + protected void growTableSegment( long minTableSize ) { + long tableOffset = _header.getTableOffset(); + long tableSize = _header.getTableBytes(); + + long currentFileSize = tableOffset + tableSize; + long minFileSize = tableOffset + minTableSize; + + if ( minFileSize <= currentFileSize ) { + throw new IllegalArgumentException("no need to grow table if min size <= current size"); + } + + try { + long alignedFileSize = minFileSize + (-minFileSize & (_fileLayout.blockSize() - 1)); + + _header.setTableBytes(alignedFileSize - tableOffset); + mapTableSegment(); + + _log.info("{} grew from {} to {} bytes", _lookupPath.getFileName(), currentFileSize, alignedFileSize); + } + catch ( IOException e ) { + throw new RuntimeException(e); + } + } + + @Override + protected void initLookupMap() { + // does nothing + } + + @Override + protected void initLookupOutputStream() { + try { + if ( Files.exists(_lookupPath) ) { + openExisting(); + } else { + createNew(); + } + } + catch ( IOException e ) { + throw new RuntimeException(e); + } + } + + protected abstract long initialTableSize(); + + @Override + protected 
void load() { + // does nothing, we only need to open the mmapped file + } + + protected abstract void setPosAt( long index, long pos ); + + protected final void setPosAtVolatile( long index, long pos ) { + longArraySetVolatile(_tableSegment, index, posOffsetApply(pos)); + } + + protected long tableCapacity() { + return _tableCapacity; + } + + @Override + void add( E elem, long pos ) { + if ( add0(elem, pos) ) { + _header.incrementNumKeys(1); + } + } + + @Override + void delete( E elem, long pos ) { + if ( delete0(elem) ) { + _header.incrementNumKeys(-1); + } + } + + @Override + boolean isUpdatable( E oldItem, E newItem ) { + return true; + } + + @Override + void update( long pos, E oldElem, E newElem ) { + if ( keyFor(oldElem) == keyFor(newElem) ) { + return; // pos and key are identical => no change + } + + boolean deleted = delete0(oldElem); + boolean added = add0(newElem, pos); + + if ( added == deleted ) { + return; // key count unchanged + } + + if ( added ) { + _header.incrementNumKeys(1); + } else { // deleted + _header.incrementNumKeys(-1); + } + } + + private boolean add0( E elem, long pos ) { + long index = indexFor(elem); + boolean unique = getPosAt(index) < 0; + if ( !unique ) { + throw new DuplicateKeyException("Dump already contains an instance with the key " + keyFor(elem)); + } + + setPosAt(index, pos); + return true; + } + + private void applyHeaderCorrections() { + if ( _indexCorrections.numKeys != null ) { + _log.info("{} fixing header...", _lookupPath.getFileName()); + _header.setNumKeys(_indexCorrections.numKeys); + } + } + + private void applyTableCorrections() { + if ( _indexCorrections.rawContentCorrections != null ) { + _log.info("{} fixing table...", _lookupPath.getFileName()); + _indexCorrections.rawContentCorrections.forEachEntry(( index, posInfo ) -> { + longArraySet(_tableSegment, index, posInfo); + return true; + }); + + if ( _header.getNumKeys() != getAllLongKeys().length ) { + throw new 
IllegalStateException(_lookupPath.getFileName() + " inconsistencies post-fixup, wtf"); + } + } + } + + private boolean checkConsistency( Header header, MemorySegment tableSegment ) throws IOException { + return new ConsistencyCheck(header, tableSegment).perform(); + } + + private boolean checkHeader() { + if ( !Files.exists(_lookupPath) ) { + return false; + } + + try (Arena arena = Arena.ofConfined()) { + long fileSize = Files.size(_lookupPath); + if ( fileSize < LeadIn.byteSize() ) { + _log.warn("{} file is too small to contain even the header. Will rebuild index.", _lookupPath.getFileName()); + return false; + } + + try (FileChannel readonlyFileChannel = FileChannel.open(_lookupPath, READ)) { + LeadIn leadIn = new LeadIn(readonlyFileChannel.map(READ_ONLY, 0, LeadIn.byteSize(), arena)); + + if ( leadIn.getFileMagic() != LeadIn.FILE_MAGIC ) { + _log.warn("{} has wrong file magic. Will rebuild index.", _lookupPath.getFileName()); + return false; + } + + boolean checkRequired = false; + + // version or size unset, unknown, or mismatching + if ( !Header.isReadable(leadIn.getLayoutVersion()) ) { + _log.warn("{} has unknown header version {}. Will rebuild index.", _lookupPath.getFileName(), leadIn.getLayoutVersion()); + return false; + } + + // pin everything to current one-and-only version + if ( leadIn.getLayoutVersion() != Header.getCurrentLayout().layoutVersion() ) { + _log.warn("{} has mismatching header version. Will rebuild index.", _lookupPath.getFileName()); + return false; + } + + FileLayout fileLayout = Header.layoutByVersion(leadIn.getLayoutVersion()); + + if ( leadIn.getHeaderBytes() < fileLayout.headerBytes() ) { + _log.warn("{} has mismatching header size information. 
Will rebuild index.", _lookupPath.getFileName()); + return false; + } + + Header header = new Header(fileLayout, readonlyFileChannel.map(READ_ONLY, 0, leadIn.getHeaderBytes(), arena)); + + if ( header.getTableOffset() <= 0 || !isPowerOfTwo(header.getTableOffset()) ) { + _log.warn("{} has inconsistent alignment information. Will rebuild index.", _lookupPath.getFileName()); + return false; + } + // file size mismatch + if ( header.getTableOffset() + header.getTableBytes() != fileSize ) { + _log.warn("{} has mismatching file size information. Will rebuild index.", _lookupPath.getFileName()); + return false; + } + + // configuration changes + if ( header.getMinKey() != _minKey || header.getMaxKey() != _maxKey ) { + _log.warn("{} has mismatching key bounds. Will rebuild index.", _lookupPath.getFileName()); + return false; + } + + // not closed properly, or inconsistency between dump file and header state + if ( header.getOpenedTimestamp() >= header.getClosedTimestamp() ) { + _log.info("{} was not closed properly, checking consistency...", _lookupPath.getFileName()); + checkRequired = true; + } + + // plausibility checks + if ( !checkNumKeys(header) ) { + _log.info("{} has stored implausible numKeys, checking consistency...", _lookupPath.getFileName()); + checkRequired = true; + } + + if ( PARANOIA_MODE && !checkRequired ) { + _log.info("{} hardcoded paranoia mode enabled, checking consistency...", _lookupPath.getFileName()); + checkRequired = true; + } + + if ( checkRequired ) { + MemorySegment tableSegment = readonlyFileChannel.map(READ_ONLY, header.getTableOffset(), header.getTableBytes(), arena); + if ( !checkConsistency(header, tableSegment) ) { + return false; + } + } + + _fileLayout = fileLayout; // store this, so we won't have to restart at the lead-in + + return true; + } + } + catch ( Exception argh ) { + throw new RuntimeException(argh); + } + + } + + private void closeHeader() { + if ( _header != null ) { + 
_header.setClosedTimestamp(System.currentTimeMillis()); + _header.flush(); + } + } + + private void createNew() throws IOException { + Files.newByteChannel(_lookupPath, EnumSet.of(CREATE_NEW, SPARSE, WRITE)).close(); // just create sparse file; MemorySegment API takes a Path as of java 17 + _fileLayout = Header.getCurrentLayout(); + + initHeader(mapHeaderSegment()); + + mapTableSegment(); + } + + private boolean delete0( E elem ) { + long index = indexFor(elem); + boolean deleted = getPosAt(index) >= 0; + setPosAt(index, -1); + return deleted; + } + + private void flushTable() { + if ( _tableSegment != null ) { + _tableSegment.force(); + } + } + + private long getPosAt( long index ) { + if ( index < 0 ) { + throw new IndexOutOfBoundsException("index out of bounds"); + } + + if ( index < tableCapacity() ) { + return getPosAtVolatile(_tableSegment, index); + } else { + return -1; + } + } + + private long getPosAt( MemorySegment tableSegment, long index ) { + return posOffsetRevert(longArrayGet(tableSegment, index)); + } + + private long getPosAtVolatile( MemorySegment tableSegment, long index ) { + return posOffsetRevert(longArrayGetVolatile(tableSegment, index)); + } + + private long indexFor( long key ) { + if ( _minKey <= key && key <= _maxKey ) { + return keyOffsetApply(key); + } + + throw new IndexOutOfBoundsException("key out of bounds"); + } + + private long indexFor( E elem ) { + long key = keyFor(elem); + return indexFor(key); + } + + private void initHeader( MemorySegment headerSegment ) { + _header = new Header(_fileLayout, headerSegment); + + _header.setFileMagic(LeadIn.FILE_MAGIC); + _header.setLayoutVersion(_fileLayout.layoutVersion()); + _header.setHeaderBytes(_fileLayout.headerBytes()); + + _header.setTableOffset(_fileLayout.tableOffset()); + + _header.setMinKey(_minKey); + _header.setMaxKey(_maxKey); + + _header.setOpenedTimestamp(System.currentTimeMillis()); + _header.setClosedTimestamp(0L); + + _header.setTableBytes(initialTableSize()); + + 
_header.setNumKeys(0); + } + + private long keyFor( E elem ) { + return _getKey.applyAsLong(elem); + } + + /** + * Key offset shifts the start of the array closer to the start of the actually used key range. With sharded dumps in particular, the exact key range is + * known a priori and can be neatly catered for. + * + * @param realKey the actual key identifying the instance in the dump + * @return the index into the array where the corresponding dump position is to be stored + */ + private long keyOffsetApply( long realKey ) { + return realKey - _minKey; // => array index + } + + /** + * @param arrayIndex the index into the array where the corresponding dump position is stored + * @return the actual key identifying the instance in the dump + * @see #keyOffsetApply(long) + */ + private long keyOffsetRevert( long arrayIndex ) { + return arrayIndex + _minKey; // real key + } + + private long longArrayGet( MemorySegment array, long index ) { + return (long)_longArrayAccess.get(array, 0L, index); + } + + private long longArrayGetVolatile( MemorySegment array, long index ) { + return (long)_longArrayAccess.getVolatile(array, 0L, index); + } + + private void longArraySet( MemorySegment array, long index, long pos ) { + _longArrayAccess.set(array, 0L, index, pos); + } + + private void longArraySetVolatile( MemorySegment array, long index, long pos ) { + _longArrayAccess.setVolatile(array, 0L, index, pos); + } + + private MemorySegment mapHeaderSegment() throws IOException { + _readWriteFileChannel = FileChannel.open(_lookupPath, READ, WRITE); + return _readWriteFileChannel.map(READ_WRITE, 0, _fileLayout.headerBytes(), Arena.ofAuto()); + } + + private void mapTableSegment() throws IOException { + _tableSegment = _readWriteFileChannel.map(READ_WRITE, _header.getTableOffset(), _header.getTableBytes(), Arena.ofAuto()); + _tableCapacity = capacity(_tableSegment); + _longArrayAccess = sequenceLayout(_tableCapacity, JAVA_LONG).varHandle(sequenceElement()); + } + + private void 
openExisting() throws IOException { + openHeader(mapHeaderSegment()); + + mapTableSegment(); + + applyTableCorrections(); + } + + private void openHeader( MemorySegment headerSegment ) { + _header = new Header(_fileLayout, headerSegment); + _header.setOpenedTimestamp(System.currentTimeMillis()); + + applyHeaderCorrections(); + } + + /** + * Pos offset masks the fact that valid dump positions start at 0, which would collide with "empty value". Pre-filling with -1 is detrimental to + * performance, causing needless write load on initialization / segment growth, and conflicting with the idea of sparse files. + * + * @param realDumpPosition the actual position in the dump + * @return the position info to be stored in the index + */ + private long posOffsetApply( long realDumpPosition ) { + return realDumpPosition + 1; // => stored position info + } + + /** + * @param storedPositioninfo the position info that is stored in the index + * @return the actual position in the dump + * @see #posOffsetApply(long) + */ + private long posOffsetRevert( long storedPositioninfo ) { + return storedPositioninfo - 1; // => real dump position + } + + public interface Arch { + + long cacheLineBytes(); + + long pageSizeBytes(); + + final class AmdZen implements Arch { + + public static final Arch INSTANCE = new AmdZen(); + + @Override + public long cacheLineBytes() { + return 64; + } + + @Override + public long pageSizeBytes() { + return 4096; + } + } + } + + + protected record FileLayout(long layoutVersion, Arch arch, long tableOffsetInPages, long blockSizeInPages, GroupLayout headerLayout) { + + public FileLayout sanityCheck() { + if ( tableOffset() < headerBytes() ) { + throw new IllegalStateException("table overlaps header"); + } + if ( blockSize() < tableOffset() ) { + throw new IllegalStateException("table offset well exceeds first block"); + } + return this; + } + + long blockSize() { + return blockSizeInPages * arch.pageSizeBytes(); + } + + long headerBytes() { + return 
headerLayout.byteSize(); + } + + long tableOffset() { + return tableOffsetInPages * arch.pageSizeBytes(); + } + } + + + protected static final class Header { + + private static final Map FILE_LAYOUT_BY_VERSION; + + private static final FileLayout FILE_LAYOUT_V1 = new FileLayout(1, Arch.AmdZen.INSTANCE, 1, 1024, structLayout( // + + // these are mostly constant, might change when file layout version is updated / migrated + + LeadIn.LAYOUT, // + + JAVA_LONG.withName("tableOffset"), // + + MemoryLayout.paddingLayout(Arch.AmdZen.INSTANCE.cacheLineBytes()), // keep things cache-line-aligned + + // these change during open/close + + JAVA_LONG.withName("openedTimestamp"), // + JAVA_LONG.withName("closedTimestamp"), // + + MemoryLayout.paddingLayout(Arch.AmdZen.INSTANCE.cacheLineBytes()), // keep things cache-line-aligned + + // changes whenever file needs to grow + JAVA_LONG.withName("tableBytes"), // + + MemoryLayout.paddingLayout(Arch.AmdZen.INSTANCE.cacheLineBytes()), // keep things cache-line-aligned + + // changes whenever keys are added/removed + JAVA_LONG.withName("numKeys"), // + + JAVA_LONG.withName("minKey"), // + JAVA_LONG.withName("maxKey"), // + + MemoryLayout.paddingLayout(Arch.AmdZen.INSTANCE.cacheLineBytes()) // keep things cache-line-aligned + ).withByteAlignment(Arch.AmdZen.INSTANCE.cacheLineBytes()).withName("headerLayoutV1")) // + .sanityCheck(); + + static { + FILE_LAYOUT_BY_VERSION = Map.of(1L, FILE_LAYOUT_V1); + } + + public static FileLayout getCurrentLayout() { + return Header.layoutByVersion(1L); + } + + public static boolean isReadable( long version ) { + return FILE_LAYOUT_BY_VERSION.containsKey(version); + + } + + public static FileLayout layoutByVersion( long version ) { + return FILE_LAYOUT_BY_VERSION.get(version); + } + + private final MemorySegment _memorySegment; + + private final LeadIn _leadIn; + + private final VarHandle _tableOffset; + + private final VarHandle _minKey; + private final VarHandle _maxKey; + + private final VarHandle 
_openedTimestamp; + private final VarHandle _closedTimestamp; + + private final VarHandle _tableBytes; + + private final VarHandle _numKeys; + + public Header( FileLayout fileLayout, MemorySegment memorySegment ) { + + GroupLayout layout = fileLayout.headerLayout(); + + _memorySegment = memorySegment; + + _leadIn = new LeadIn(memorySegment); + + _tableOffset = layout.varHandle(groupElement("tableOffset")); + + _minKey = layout.varHandle(groupElement("minKey")); + _maxKey = layout.varHandle(groupElement("maxKey")); + + _openedTimestamp = layout.varHandle(groupElement("openedTimestamp")); + _closedTimestamp = layout.varHandle(groupElement("closedTimestamp")); + + _tableBytes = layout.varHandle(groupElement("tableBytes")); + + _numKeys = layout.varHandle(groupElement("numKeys")); + } + + public void flush() { + _memorySegment.force(); + } + + public long getClosedTimestamp() { + return getVolatile(_closedTimestamp); + } + + public long getMaxKey() { + return getVolatile(_maxKey); + } + + public long getMinKey() { + return getVolatile(_minKey); + } + + public long getNumKeys() { + return getVolatile(_numKeys); + } + + public long getOpenedTimestamp() { + return getVolatile(_openedTimestamp); + } + + public long getTableBytes() { + return getVolatile(_tableBytes); + } + + public long getTableOffset() { + return getVolatile(_tableOffset); + } + + public void incrementNumKeys( long difference ) { + _numKeys.getAndAdd(_memorySegment, 0L, difference); + } + + public void setClosedTimestamp( long closedTimestamp ) { + setVolatile(_closedTimestamp, closedTimestamp); + } + + public void setFileMagic( long fileMagic ) { + _leadIn.setFileMagic(fileMagic); + } + + public void setHeaderBytes( long headerBytes ) { + _leadIn.setHeaderBytes(headerBytes); + } + + public void setLayoutVersion( long layoutVersion ) { + _leadIn.setLayoutVersion(layoutVersion); + } + + public void setMaxKey( long maxKey ) { + setVolatile(_maxKey, maxKey); + } + + public void setMinKey( long minKey ) { + 
setVolatile(_minKey, minKey); + } + + public void setNumKeys( long numKeys ) { + setVolatile(_numKeys, numKeys); + } + + public void setOpenedTimestamp( long openedTimestamp ) { + setVolatile(_openedTimestamp, openedTimestamp); + } + + public void setTableBytes( long tableBytes ) { + setVolatile(_tableBytes, tableBytes); + } + + public void setTableOffset( long tableOffset ) { + setVolatile(_tableOffset, tableOffset); + } + + private long getVolatile( VarHandle varHandle ) { + return (long)varHandle.getVolatile(_memorySegment, 0L); + } + + private void setVolatile( VarHandle varHandle, long value ) { + varHandle.setVolatile(_memorySegment, 0L, value); + } + } + + + static final class ClosedRangeMmapLongIdIndex extends MmapLongIdIndex { + + private static long maxNumKeys( long minKey, long maxKey ) { + return maxKey - minKey + 1; + } + + private ClosedRangeMmapLongIdIndex( Dump dump, String fieldName, long minKey, long maxKey ) throws NoSuchFieldException { + super(dump, fieldName, minKey, maxKey); + init(); + } + + private ClosedRangeMmapLongIdIndex( Dump dump, FieldAccessor fieldAccessor, long minKey, long maxKey ) { + super(dump, fieldAccessor, minKey, maxKey); + init(); + } + + @Override + protected long initialTableSize() { + return maxNumKeys(_minKey, _maxKey) * Long.BYTES; + } + + @Override + protected void setPosAt( long index, long pos ) { + if ( index < 0 || index >= tableCapacity() ) { + throw new IndexOutOfBoundsException("index out of bounds"); + } + + setPosAtVolatile(index, pos); + } + } + + + static final class OpenRangeMmapLongIdIndex extends MmapLongIdIndex { + + private static long deriveMaxKeyFrom( long minKey ) { + // this basically rotates the zero-index of the array, just like simple index addition/subtraction does during offset compensation + // return Long.MAX_VALUE + minKey; // this totally screws up the < comparison + return minKey < 0 ? 
Long.MAX_VALUE + minKey : Long.MAX_VALUE; + } + + private final Object _growLock = new Object(); + + private OpenRangeMmapLongIdIndex( Dump dump, String fieldName, long minKey ) throws NoSuchFieldException { + super(dump, fieldName, minKey, deriveMaxKeyFrom(minKey)); + init(); + } + + private OpenRangeMmapLongIdIndex( Dump dump, FieldAccessor fieldAccessor, long minKey ) { + super(dump, fieldAccessor, minKey, deriveMaxKeyFrom(minKey)); + init(); + } + + protected long initialTableSize() { + return _fileLayout.blockSize() - _fileLayout.tableOffset(); // align everything to block boundaries + } + + @Override + protected void setPosAt( long index, long pos ) { + if ( index < 0 ) { + throw new IndexOutOfBoundsException("index out of bounds"); + } + + ensureTableContains(index); + setPosAtVolatile(index, pos); + } + + private void ensureTableContains( long index ) { + if ( index >= tableCapacity() ) { + synchronized ( _growLock ) { + if ( index >= tableCapacity() ) { + // fence post problem: we index the start, but segments need to cover everything to the end; hence +1 + growTableSegment((index + 1) * Long.BYTES); + } + } + } + } + } + + + private static final class IndexCorrections { + + Long numKeys; + + TLongLongMap rawContentCorrections; // no key or pos offsets here, just write values to array indexes verbatim + + TLongLongMap rawContentCorrections() { + if ( rawContentCorrections == null ) { + rawContentCorrections = new TLongLongHashMap(); + } + return rawContentCorrections; + } + } + + + /** + * Helper for reading in the bare minimum from existing files. 
+ */ + private record LeadIn(MemorySegment _memorySegment) { + + public static final long FILE_MAGIC = 0x6861696C756C6672L; + + private static final GroupLayout LAYOUT = structLayout( // + + JAVA_LONG.withName("fileMagic"), // + JAVA_LONG.withName("layoutVersion"), // + JAVA_LONG.withName("headerBytes") // + + ).withByteAlignment(Arch.AmdZen.INSTANCE.cacheLineBytes()).withName("leadIn"); + + private static final VarHandle _fileMagic = LAYOUT.varHandle(groupElement("fileMagic")); + private static final VarHandle _layoutVersion = LAYOUT.varHandle(groupElement("layoutVersion")); + private static final VarHandle _headerBytes = LAYOUT.varHandle(groupElement("headerBytes")); + + public static long byteSize() { + return LAYOUT.byteSize(); // minimum read length: header version and bytes + } + + public long getFileMagic() { + return get(_fileMagic); + } + + public long getHeaderBytes() { + return get(_headerBytes); + } + + public long getLayoutVersion() { + return get(_layoutVersion); + } + + public void setFileMagic( long fileMagic ) { + setVolatile(_fileMagic, fileMagic); + } + + public void setHeaderBytes( long headerBytes ) { + setVolatile(_headerBytes, headerBytes); + } + + public void setLayoutVersion( long layoutVersion ) { + setVolatile(_layoutVersion, layoutVersion); + } + + private long get( VarHandle varHandle ) { + return (long)varHandle.get(_memorySegment, 0L); + } + + private void setVolatile( VarHandle varHandle, long value ) { + varHandle.setVolatile(_memorySegment, 0L, value); + } + } + + + private final class ConsistencyCheck { + + private final Header _header; + private final MemorySegment _tableSegment; + private final long _segmentCapacity; + + private boolean _consistent = true; + + private long _numKeysInIndex; + + public ConsistencyCheck( Header header, MemorySegment tableSegment ) { + _header = header; + _tableSegment = tableSegment; + _segmentCapacity = capacity(_tableSegment); + + _longArrayAccess = sequenceLayout(_segmentCapacity, 
JAVA_LONG).varHandle(sequenceElement()); + } + + public boolean perform() throws IOException { + long start = System.nanoTime(); + + preloadSegment(); + checkIndexContents(); + + if ( PARANOIA_MODE ) { + try (Arena arena = Arena.ofConfined()) { + MemorySegment dumpSegment = preloadDump(_dump, arena); + + checkDumpContents(); + + _log.info("{} releasing preload mapping of size {}", _lookupPath.getFileName(), dumpSegment.byteSize()); + } + } + + Duration duration = Duration.ofNanos(System.nanoTime() - start); + if ( _consistent ) { + _log.info("{} was checked and found to be {} in {}", _lookupPath.getFileName(), "consistent", duration); + } else { + _log.warn("{} was checked and found to be {} in {}", _lookupPath.getFileName(), "inconsistent", duration); + } + + return _consistent; + } + + private void checkDumpContents() { + long start = System.nanoTime(); + long numKeysInDump = 0; + + long maxKey = _header.getMaxKey(); + DumpIterator iterator = _dump.iterator(); + while ( iterator.hasNext() ) { + ++numKeysInDump; + + E element = iterator.next(); + + long elementKey = keyFor(element); + long elementIndex = keyOffsetApply(elementKey); // intentionally not bounds-checked + + // not used during live operation, hence concurrency is not an issue + long posInIndex = getPosAt(_tableSegment, elementIndex); + long posInDump = iterator.getPosition(); + + if ( elementKey > maxKey ) { + _consistent = false; + _log.warn("{} Dump contains element with key {}, but upper bound for key is {}! Will rebuild index.", _lookupPath.getFileName(), elementKey, + maxKey); + } else if ( elementIndex >= _segmentCapacity ) { + _log.info( + "{} Dump contains element with key {}, but segment capacity is {}. 
Has probably been added to dump already, but not yet to index; fixing.", + _lookupPath.getFileName(), elementKey, _segmentCapacity); + _indexCorrections.rawContentCorrections().put(keyOffsetApply(elementKey), posOffsetApply(posInDump)); + } + + if ( posInIndex != posInDump ) { + _consistent = false; + _log.warn("{} Index claims element with key {} to be at position {}, but dump insists on {}! Will rebuild index.", _lookupPath.getFileName(), + elementKey, posInIndex, posInDump); + } + } + if ( _numKeysInIndex != numKeysInDump ) { + _log.warn("{} numKeys differ between index ({}) and dump ({}), fixing.", _lookupPath.getFileName(), _numKeysInIndex, numKeysInDump); + _indexCorrections.numKeys = numKeysInDump; + } + + Duration duration = Duration.ofNanos(System.nanoTime() - start); + _log.info("{} was checked against dump iteration in {}", _lookupPath.getFileName(), duration); + } + + private void checkIndexContents() { + long start = System.nanoTime(); + long numKeysInIndex = 0; + + long dumpSize = _dump.getDumpSize(); + for ( long arrayIndex = 0; arrayIndex < _segmentCapacity; ++arrayIndex ) { + long position = getPosAt(_tableSegment, arrayIndex); + if ( position >= 0 ) { + ++numKeysInIndex; + + if ( position >= dumpSize ) { + _consistent = false; + _log.warn("{} Found position {} beyond the end of the dump file with size {}! Will rebuild index.", _lookupPath.getFileName(), position, + dumpSize); + } + + if ( _dump._deletedPositions.contains(position) ) { + _log.warn("{} Found deleted position {} at index {}. 
Has probably been deleted from dump already, but not yet from index; fixing.", + _lookupPath.getFileName(), position, arrayIndex); + _indexCorrections.rawContentCorrections().put(arrayIndex, 0); + } + } + } + if ( _header.getNumKeys() != numKeysInIndex ) { + _log.info("{} numKeys differ between caching header ({}) and bare count in actual table ({}), fixing.", _lookupPath.getFileName(), + _header.getNumKeys(), numKeysInIndex); + _indexCorrections.numKeys = numKeysInIndex; + } + + _numKeysInIndex = numKeysInIndex; + Duration duration = Duration.ofNanos(System.nanoTime() - start); + _log.info("{} had its contents checked in {}", _lookupPath.getFileName(), duration); + } + + private MemorySegment preloadDump( Dump dump, Arena arena ) throws IOException { + long start = System.nanoTime(); + + Path dumpPath = Paths.get(dump._dumpFile.getPath()); + long dumpSize = dump.getDumpSize(); + + try (FileChannel readOnlyFileChannel = FileChannel.open(dumpPath, READ)) { + MemorySegment dumpSegment = readOnlyFileChannel.map(READ_ONLY, 0, dumpSize, arena); + dumpSegment.load(); // force-fetch into memory + + Duration duration = Duration.ofNanos(System.nanoTime() - start); + _log.info("{} was mapped and preloaded in {}", dumpPath.getFileName(), duration); + return dumpSegment; + } + } + + private void preloadSegment() { + long start = System.nanoTime(); + + _tableSegment.load(); // force-fetch into memory + + Duration duration = Duration.ofNanos(System.nanoTime() - start); + _log.info("{} was preloaded in {}", _lookupPath.getFileName(), duration); + } + } +} diff --git a/dump/src/util/dump/UniqueConstraint.java b/dump/src/util/dump/UniqueConstraint.java new file mode 100644 index 0000000..6f24be4 --- /dev/null +++ b/dump/src/util/dump/UniqueConstraint.java @@ -0,0 +1,30 @@ +package util.dump; + +public interface UniqueConstraint { + + boolean contains( int key ); + boolean contains( long key ); + boolean contains( Object key ); + + long[] getAllLongKeys(); + + Object getKey( E o ); 
+ + int getNumKeys(); + + E lookup( int key ); + E lookup( long key ); + E lookup( Object key ); + + /** + * This Exception is thrown, when trying to add a non-unique index-value to a dump. + */ + class DuplicateKeyException extends RuntimeException { + + private static final long serialVersionUID = -7959993269514169802L; + + public DuplicateKeyException( String message ) { + super(message); + } + } +} diff --git a/dump/src/util/dump/UniqueIndex.java b/dump/src/util/dump/UniqueIndex.java index e706f03..7b8edca 100644 --- a/dump/src/util/dump/UniqueIndex.java +++ b/dump/src/util/dump/UniqueIndex.java @@ -27,7 +27,7 @@ import util.dump.stream.SingleTypeObjectInputStream; -public class UniqueIndex extends DumpIndex { +public class UniqueIndex extends DumpIndex implements UniqueConstraint { protected TObjectLongHashMap _lookupObject; protected TLongLongHashMap _lookupLong; @@ -156,6 +156,7 @@ public int[] getAllIntKeys() { return _lookupInt.keys(); } + @Override public long[] getAllLongKeys() { return _lookupLong.keys(); } @@ -178,6 +179,7 @@ public TLongList getAllPositions() { return pos; } + @Override public Object getKey( E o ) { if ( _fieldIsInt ) { return getIntKey(o); @@ -202,6 +204,7 @@ public int getNumKeys() { throw new IllegalStateException("weird, all lookup maps are null"); } + @Override public E lookup( int key ) { synchronized ( _dump ) { if ( !_fieldIsInt ) { @@ -210,12 +213,13 @@ public E lookup( int key ) { } long pos = getPosition(key); if ( pos < 0 ) { - return (E)null; + return null; } return _dump.get(pos); } } + @Override public E lookup( long key ) { synchronized ( _dump ) { if ( !_fieldIsLong ) { @@ -224,12 +228,13 @@ public E lookup( long key ) { } long pos = getPosition(key); if ( pos < 0 ) { - return (E)null; + return null; } return _dump.get(pos); } } + @Override public E lookup( Object key ) { synchronized ( _dump ) { if ( (_fieldIsLong || _fieldIsLongObject) && key instanceof Long ) { @@ -244,7 +249,7 @@ public E lookup( Object key ) { } 
long pos = getPosition(key); if ( pos < 0 ) { - return (E)null; + return null; } return _dump.get(pos); } @@ -680,16 +685,4 @@ private void addToIgnoredPositions( long pos ) { } } - /** - * This Exception is thrown, when trying to add a non-unique index-value to a dump. - */ - public static class DuplicateKeyException extends RuntimeException { - - private static final long serialVersionUID = -7959993269514169802L; - - public DuplicateKeyException( String message ) { - super(message); - } - } - } diff --git a/dump/test/util/dump/MmapLongIdIndexTest.java b/dump/test/util/dump/MmapLongIdIndexTest.java new file mode 100644 index 0000000..53fff83 --- /dev/null +++ b/dump/test/util/dump/MmapLongIdIndexTest.java @@ -0,0 +1,620 @@ +package util.dump; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.function.BiFunction; + +import org.assertj.core.util.Arrays; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import gnu.trove.set.TIntSet; +import gnu.trove.set.hash.TIntHashSet; +import junit.framework.Assert; +import util.dump.reflection.FieldFieldAccessor; +import util.dump.reflection.Reflection; + + +@RunWith(Parameterized.class) +public class MmapLongIdIndexTest { + + private static final String DUMP_FILENAME = "DumpTest.dmp"; + private static final int READ_NUMBER = 1000; + private static final int BEAN_SIZE = 10; + private static File _tmpdir; + + @Parameters + public static Collection getDumpSizesToTestFor() { + List parameters = new ArrayList<>(); + parameters.add(Arrays.array(10)); + parameters.add(Arrays.array(1000)); + 
parameters.add(Arrays.array(100000)); + return parameters; + } + + @BeforeClass + public static void setUpTmpdir() throws IOException { + _tmpdir = new File("target", "tmp"); + _tmpdir.mkdirs(); + if ( !_tmpdir.isDirectory() ) { + throw new IOException("unable to create temporary directory: " + _tmpdir.getAbsolutePath()); + } + System.setProperty("java.io.tmpdir", _tmpdir.getAbsolutePath()); + } + + private Random _random; + + private final int _dumpSize; + private final long _negativeOffset; + + private final BiFunction, Field, MmapLongIdIndex> _createOpenRangeIndex = // + ( dump, field ) -> MmapLongIdIndex.forOpenRange(dump, new FieldFieldAccessor(field), minId()); + private final BiFunction, Field, MmapLongIdIndex> _createClosedRangeIndex = // + ( dump, field ) -> MmapLongIdIndex.forClosedRange(dump, new FieldFieldAccessor(field), minId(), maxId()); + + public MmapLongIdIndexTest( Integer dumpSize ) { + _dumpSize = dumpSize; + _negativeOffset = -_dumpSize; // need to cater to generated negative indexed + } + + @Before + @After + public void deleteOldTestDumps() { + File[] dumpFile = _tmpdir.listFiles(new FileFilter() { + + @Override + public boolean accept( File f ) { + return f.getName().startsWith("DumpTest."); + } + }); + for ( File df : dumpFile ) { + if ( !df.delete() ) { + System.out.println("Failed to delete old dump file " + df); + } + } + } + + @Before + public void initRandom() { + long seed = System.currentTimeMillis(); + _random = new Random(seed); + System.out.println("Seed used for this DumpTest run: " + seed); + } + + @After + public void printMemory() { + System.gc(); + long mem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.println(mem / (1024 * 1024) + " MB used after test run"); + } + + @Test + public void testGetNumKeys() throws Exception { + File dumpFile = new File(_tmpdir, DUMP_FILENAME); + Dump dump = new Dump<>(Bean.class, dumpFile); + try { + int numBeansToAddForTest = 500; + for ( int i = 0; i < 
numBeansToAddForTest; i++ ) { + dump.add(new Bean(i, null)); + } + + // reopen dump + dump.close(); + dump = new Dump<>(Bean.class, dumpFile); + + DumpIndex intIndex = MmapLongIdIndex.forOpenRange(dump, "_idInt", _negativeOffset); + DumpIndex longIndex = MmapLongIdIndex.forOpenRange(dump, "_idLong", _negativeOffset); + + assertThat(longIndex.getNumKeys()).isEqualTo(numBeansToAddForTest); + assertThat(intIndex.getNumKeys()).isEqualTo(numBeansToAddForTest); + + int deleted = 0; + for ( Bean bean : dump ) { + if ( bean._idInt % 2 == 0 ) { + dump.deleteLast(); + deleted++; + } + } + + assertThat(longIndex.getNumKeys()).isEqualTo(numBeansToAddForTest - deleted); + assertThat(intIndex.getNumKeys()).isEqualTo(numBeansToAddForTest - deleted); + } + finally { + dump.close(); + } + } + + + @Test + public void testIntKey_OpenRangeIndex() throws Exception { + testIntKeyIndex(_createOpenRangeIndex); + } + + @Test + public void testIntKey_ClosedRangeIndex() throws Exception { + testIntKeyIndex(_createClosedRangeIndex); + } + + private void testIntKeyIndex(BiFunction, Field, MmapLongIdIndex> createIndex) throws Exception { + testIndex("_idInt", new TestConfiguration() { + + @Override + public Object createKey( int id ) { + return id; + } + }, createIndex); + } + + @Test + public void testLongKey_OpenRangeIndex() throws Exception { + testLongKeyIndex(_createOpenRangeIndex); + } + + @Test + public void testLongKey_ClosedRangeIndex() throws Exception { + testLongKeyIndex(_createClosedRangeIndex); + } + + private void testLongKeyIndex(BiFunction, Field, MmapLongIdIndex> createIndex) throws Exception { + testIndex("_idLong", new TestConfiguration() { + + @Override + public Object createKey( int id ) { + return (long)id; + } + }, createIndex); + } + + @Test + public void testLongObjectKey_OpenRangeIndex() throws Exception { + testLongObjectKeyIndex(_createOpenRangeIndex); + } + + @Test + public void testLongObjectKey_ClosedRangeIndex() throws Exception { + 
testLongObjectKeyIndex(_createClosedRangeIndex); + } + + private void testLongObjectKeyIndex(BiFunction, Field, MmapLongIdIndex> createIndex) throws Exception { + testIndex("_idLongObject", new TestConfiguration() { + + @Override + public Object createKey( int id ) { + return (long)id; + } + }, createIndex); + } + + @Test + public void testRecreateIndex_ClosedRange() throws IOException, NoSuchFieldException { + testRecreateIndex(_createClosedRangeIndex); + } + + @Test + public void testRecreateIndex_OpenRange() throws IOException, NoSuchFieldException { + testRecreateIndex(_createOpenRangeIndex); + } + + protected void testIndex( String fieldName, TestConfiguration config, BiFunction, Field, MmapLongIdIndex> createIndex) throws Exception { + + testLateOpenIndex(fieldName, config, createIndex); + + File dumpFile = new File(_tmpdir, DUMP_FILENAME); + + deleteOldTestDumps(); + + /* create dump and index */ + Dump dump = new Dump<>(Bean.class, dumpFile); + try { + Field field = Reflection.getField(Bean.class, fieldName); + UniqueConstraint index = MmapLongIdIndex.forOpenRange(dump, new FieldFieldAccessor(field), _negativeOffset); + + fillDump(dump); + + validateNumKeys(dump, index); + + testLookup(config, field, index); + + dump.close(); + + System.out.println("Closing and re-opening dump"); + + dump = new Dump<>(Bean.class, dumpFile); + index = MmapLongIdIndex.forOpenRange(dump, new FieldFieldAccessor(field), _negativeOffset); + + validateNumKeys(dump, index); + + testLookup(config, field, index); + + /* test lookup of non-existing key */ + Object k = config.createKey(_dumpSize + 1); + Bean nonExistingBean = index.lookup(k); + Assert.assertNull(nonExistingBean); + + /* iterate dump and delete half of it */ + long t = System.currentTimeMillis(); + int id = 0; + int deletions = 0; + for ( Bean bean : dump ) { + Assert.assertEquals(config.createKey(id), field.get(bean)); + Assert.assertTrue("unexpected bean data", bean._data.startsWith("" + id)); + if ( id % 2 == 0 ) { + 
Bean deleted = dump.deleteLast(); + Assert.assertEquals("deleted bean != iterated bean", deleted, bean); + deletions++; + } + id++; + } + System.out.println("Iterated the whole dump. Deleted " + deletions + " items. Needed " + (System.currentTimeMillis() - t) + " ms."); + + /* lookup and assert deletions */ + t = System.currentTimeMillis(); + for ( int j = 0; j < READ_NUMBER; j++ ) { + id = _random.nextInt(_dumpSize); + k = config.createKey(id); + Bean bean = index.lookup(k); + if ( id % 2 == 0 ) { + Assert.assertNull("deleted Bean with index " + k + " is still accessable", bean); + } else { + Assert.assertNotNull("no Bean for index " + k, bean); + Assert.assertEquals(k, field.get(bean)); + Assert.assertTrue(bean._data.startsWith(id + "-")); + } + } + System.out.println("Read " + READ_NUMBER + " instances from dump. Needed " + (System.currentTimeMillis() - t) / (float)READ_NUMBER + " ms/instance."); + + /* iterate dump and update beans */ + t = System.currentTimeMillis(); + id = 1; + int updates = 0; + for ( Bean bean : dump ) { + Assert.assertEquals(config.createKey(id), field.get(bean)); + Assert.assertTrue("unexpected bean data", bean._data.startsWith("" + id)); + if ( id % 3 == 0 ) { + /* update without changing externalization size of bean */ + long oldDumpSize = dump._outputStream._n; + Bean updatedBean = new Bean(-bean._idInt, bean._data); + Bean oldVersion = dump.updateLast(updatedBean); + Assert.assertEquals("old bean != iterated bean", oldVersion, bean); + Assert.assertEquals("dump has grown, even though the update was overwrite compatible", oldDumpSize, dump._outputStream._n); + updates++; + } else { + /* update and change externalization size of bean */ + Bean updatedBean = new Bean(bean._idInt, bean._data.replaceFirst("-", "++")); + Bean oldVersion = dump.updateLast(updatedBean); + Assert.assertEquals("old bean != iterated bean", oldVersion, bean); + updates++; + } + id += 2; + } + for ( Bean bean : dump ) { + id = bean._idInt; + if ( id > 0 && id % 7 
== 0 ) { + /* update and change externalization size of bean */ + Bean updatedBean = new Bean(-bean._idInt, bean._data.replaceFirst("-", "++")); + Bean oldVersion = dump.updateLast(updatedBean); + Assert.assertEquals("old bean != iterated bean", oldVersion, bean); + updates++; + } + } + System.out.println("Iterated the whole dump. Updated " + updates + " items. Needed " + (System.currentTimeMillis() - t) + " ms."); + + validateNumKeys(dump, index); + + testLookupAfterUpdates(config, field, index); + + dump.close(); + + System.out.println("Closing and re-opening dump"); + + dump = new Dump<>(Bean.class, dumpFile); + index = MmapLongIdIndex.forOpenRange(dump, new FieldFieldAccessor(field), _negativeOffset); + + validateNumKeys(dump, index); + + testLookupAfterUpdates(config, field, index); + + dump.close(); + + /* delete index meta file to invalidate the index */ + File[] metaFiles = _tmpdir.listFiles(new FileFilter() { + + @Override + public boolean accept( File f ) { + return f.getName().startsWith("DumpTest.") && f.getName().endsWith("meta"); + } + }); + for ( File df : metaFiles ) { + Assert.assertTrue("Failed to delete meta file " + df, df.delete()); + } + /* re-open, enforcing the index to be re-created */ + dump = new Dump<>(Bean.class, dumpFile); + index = MmapLongIdIndex.forOpenRange(dump, new FieldFieldAccessor(field), _negativeOffset); + + validateNumKeys(dump, index); + + /* after having re-created the index, repeat last test */ + testLookupAfterUpdates(config, field, index); + + } + finally { + dump.close(); + } + } + + private void fillDump( Dump dump ) throws IOException { + StringBuilder sb = new StringBuilder("-"); + for ( int i = 0; i < BEAN_SIZE - 15; i++ ) { // 15 is an estimation for the size of the Bean instance without this padding + sb.append('0'); + } + String padding = sb.toString(); + + /* add some elements */ + long t = System.currentTimeMillis(); + for ( int i = 0; i < _dumpSize; i++ ) { + dump.add(new Bean(i, i + padding)); + } + 
System.out.println("Written " + _dumpSize + " instances to dump. Needed " + (System.currentTimeMillis() - t) / (float)_dumpSize + " ms/instance."); + } + + private long maxId() { + return _dumpSize; + } + + private long minId() { + return _negativeOffset; + } + + private void testLateOpenIndex( String fieldName, TestConfiguration config, BiFunction, Field, MmapLongIdIndex> createIndex ) throws Exception { + File dumpFile = new File(_tmpdir, DUMP_FILENAME); + + /* create dump and index */ + Dump dump = new Dump<>(Bean.class, dumpFile); + try { + Field field = Reflection.getField(Bean.class, fieldName); + + fillDump(dump); + UniqueConstraint index = createIndex.apply(dump, field); + + testLookup(config, field, index); + } + finally { + dump.close(); + } + } + + private void testLookup( TestConfiguration config, Field field, UniqueConstraint index ) throws IllegalAccessException { + long t; + t = System.currentTimeMillis(); + for ( int j = 0; j < READ_NUMBER; j++ ) { + int id = _random.nextInt(_dumpSize); + Object k = config.createKey(id); + Bean bean = index.lookup(k); + Assert.assertNotNull("no Bean for index " + k, bean); + Assert.assertEquals(k, field.get(bean)); + Assert.assertTrue(bean._data.startsWith(id + "-")); + } + System.out.println("Read " + READ_NUMBER + " instances from dump. 
Needed " + (System.currentTimeMillis() - t) / (float)READ_NUMBER + " ms/instance."); + } + + private void testLookupAfterUpdates( TestConfiguration config, Field field, UniqueConstraint index ) throws IllegalAccessException { + long t; + Object k; + int id; + t = System.currentTimeMillis(); + for ( int j = 0; j < READ_NUMBER; j++ ) { + id = _random.nextInt(_dumpSize); + if ( id % 3 == 0 || id % 7 == 0 ) { + id = -id; + } + k = config.createKey(id); + Bean bean = index.lookup(k); + if ( id % 2 == 0 ) { + Assert.assertNull("deleted Bean with index " + k + " is still accessible", bean); + } else if ( id % 7 == 0 && id % 3 != 0 ) { + Assert.assertNotNull("no Bean for index " + k, bean); + Assert.assertEquals(config.createKey(id), field.get(bean)); + Assert.assertTrue("bean data wrong: id=" + id + ", data=" + bean._data, bean._data.startsWith(-id + "++")); + } else if ( Math.abs(id) % 3 == 0 ) { + Assert.assertNotNull("no Bean for index " + k, bean); + Assert.assertEquals(config.createKey(id), field.get(bean)); + Assert.assertTrue("bean data wrong: id=" + id + ", data=" + bean._data, bean._data.startsWith((-id) + "-")); + } else { + Assert.assertNotNull("no Bean for index " + k, bean); + Assert.assertEquals(k, field.get(bean)); + Assert.assertTrue(bean._data.startsWith(id + "++")); + } + } + System.out.println("Read " + READ_NUMBER + " instances from dump. 
Needed " + (System.currentTimeMillis() - t) / (float)READ_NUMBER + " ms/instance."); + } + + private void testRecreateIndex( BiFunction, Field, MmapLongIdIndex> createIndex ) throws NoSuchFieldException, IOException { + File dumpFile = new File(_tmpdir, DUMP_FILENAME); + + Dump dump = new Dump<>(Bean.class, dumpFile); + try { + Field field = Reflection.getField(Bean.class, "_idInt"); + UniqueConstraint index = createIndex.apply(dump, field); + + validateNumKeys(dump, index); + + fillDump(dump); + + validateNumKeys(dump, index); + + dump.close(); + + System.out.println("Closing and re-opening dump, deleting index"); + Assert.assertTrue("Failed to delete index", + new File(_tmpdir, DUMP_FILENAME + "._idInt.mmap.lookup").delete() && !new File(_tmpdir, DUMP_FILENAME + "._idInt.mmap.lookup").exists()); + + dump = new Dump<>(Bean.class, dumpFile); + index = MmapLongIdIndex.forOpenRange(dump, new FieldFieldAccessor(field), _negativeOffset); + + long t = System.currentTimeMillis(); + for ( int j = 0; j < READ_NUMBER; j++ ) { + int i = _random.nextInt(_dumpSize); + Bean bean = index.lookup(i); + Assert.assertNotNull("no Bean for index " + i, bean); + Assert.assertEquals(i, bean._idInt); + Assert.assertTrue(bean._data.startsWith(i + "-")); + } + System.out.println("Read " + READ_NUMBER + " instances from dump. 
Needed " + (System.currentTimeMillis() - t) / (float)READ_NUMBER + " ms/instance."); + + Bean nonExistingBean = index.lookup(_dumpSize + 1); + Assert.assertNull(nonExistingBean); + } + finally { + dump.close(); + } + } + + private void validateNumKeys( Dump dump, UniqueConstraint index ) { + // count keys + TIntSet keys = new TIntHashSet(); + for ( Bean bean : dump ) { + keys.add(bean._idInt); + } + + int numKeys = index.getNumKeys(); + assertThat(numKeys).isEqualTo(keys.size()); + } + + public static class Bean implements ExternalizableBean { + + @externalize(1) + long _idLong; + @externalize(2) + int _idInt; + @externalize(3) + String _idString; + @externalize(4) + Long _idLongObject; + @externalize(5) + ExternalizableId _idExternalizable; + @externalize(10) + String _data; + + public Bean() { + // for Externalization + } + + public Bean( int id, String data ) { + _idLong = id; + _idInt = id; + _idString = (id < 0 ? "" : "+") + id; + _idLongObject = (long)id; + _idExternalizable = new ExternalizableId(id); + _data = data; + } + + @Override + public boolean equals( Object obj ) { + if ( this == obj ) { + return true; + } + if ( obj == null ) { + return false; + } + if ( getClass() != obj.getClass() ) { + return false; + } + Bean other = (Bean)obj; + if ( _data == null ) { + if ( other._data != null ) { + return false; + } + } else if ( !_data.equals(other._data) ) { + return false; + } + if ( _idExternalizable == null ) { + if ( other._idExternalizable != null ) { + return false; + } + } else if ( !_idExternalizable.equals(other._idExternalizable) ) { + return false; + } + if ( _idInt != other._idInt ) { + return false; + } + if ( _idLong != other._idLong ) { + return false; + } + if ( _idLongObject == null ) { + if ( other._idLongObject != null ) { + return false; + } + } else if ( !_idLongObject.equals(other._idLongObject) ) { + return false; + } + if ( _idString == null ) { + return other._idString == null; + } else { + return _idString.equals(other._idString); 
+ } + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException("hashCode() not needed."); + } + } + + + public static class ExternalizableId implements ExternalizableBean { + + @externalize(1) + long _id; + + public ExternalizableId() { + // for Externalization + } + + public ExternalizableId( long id ) { + _id = id; + } + + @Override + public boolean equals( Object obj ) { + if ( this == obj ) { + return true; + } + if ( obj == null ) { + return false; + } + if ( getClass() != obj.getClass() ) { + return false; + } + ExternalizableId other = (ExternalizableId)obj; + return _id == other._id; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int)(_id ^ (_id >>> 32)); + return result; + } + } + + + protected static abstract class TestConfiguration { + + public abstract Object createKey( int id ); + } + +} diff --git a/dump/test/util/dump/UniqueIndexTest.java b/dump/test/util/dump/UniqueIndexTest.java index 4746e85..8e4103c 100644 --- a/dump/test/util/dump/UniqueIndexTest.java +++ b/dump/test/util/dump/UniqueIndexTest.java @@ -382,11 +382,12 @@ private void fillDump( Dump dump ) throws IOException { for ( int i = 0; i < BEAN_SIZE - 15; i++ ) { // 15 is an estimation for the size of the Bean instance without this padding sb.append('0'); } + String padding = sb.toString(); /* add some elements */ long t = System.currentTimeMillis(); for ( int i = 0; i < _dumpSize; i++ ) { - dump.add(new Bean(i, i + sb.toString())); + dump.add(new Bean(i, i + padding)); } System.out.println("Written " + _dumpSize + " instances to dump. 
Needed " + (System.currentTimeMillis() - t) / (float)_dumpSize + " ms/instance."); } diff --git a/pom.xml b/pom.xml index b1dcd71..f7401a1 100644 --- a/pom.xml +++ b/pom.xml @@ -25,8 +25,8 @@ maven-compiler-plugin UTF-8 - 17 - 17 + 22 + 22 @@ -45,7 +45,9 @@ maven-surefire-plugin 3.3.1 - --add-opens java.base/java.util=ALL-UNNAMED + + --add-opens java.base/java.util=ALL-UNNAMED +