Skip to content
This repository was archived by the owner on Jan 18, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (e *Encoder) Encode(v any) error {
return e.w.Error
}

// Reset resets the encoder to write to a new io.Writer.
//
// It only forwards w to the underlying writer's Reset method; nothing else on
// the encoder is touched here.
// NOTE(review): whether a previously recorded e.w.Error is cleared depends on
// that Reset implementation, which is not visible here — confirm.
func (e *Encoder) Reset(w io.Writer) {
	e.w.Reset(w)
}

// Marshal returns the Avro encoding of v.
func Marshal(schema Schema, v any) ([]byte, error) {
return DefaultConfig.Marshal(schema, v)
Expand Down
32 changes: 32 additions & 0 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ type Encoder struct {
blockLength int
count int
blockSize int

// Stored for Reset.
header Header
}

// NewEncoder returns a new encoder that writes to w using schema s.
Expand Down Expand Up @@ -419,6 +422,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
codec: codec,
blockLength: cfg.BlockLength,
blockSize: cfg.BlockSize,
header: header,
}
return e, nil
}
Expand Down Expand Up @@ -501,6 +505,34 @@ func (e *Encoder) Close() error {
return e.Flush()
}

// Reset points the encoder at w so that it can begin a new container file
// while keeping its existing schema, codec and settings.
//
// The steps run in this order:
//  1. Pending data is flushed to the current destination. If that fails the
//     error is returned before the sync marker or writer are touched.
//  2. A new sync marker is generated into the stored header and copied to
//     the encoder.
//  3. The writer is reset to w, and the stored header is written and flushed.
//     A flush error is returned as-is.
//  4. The block buffer, the record encoder and the record count are reset.
func (e *Encoder) Reset(w io.Writer) error {
	err := e.Flush()
	if err != nil {
		return err
	}

	// Every file gets its own sync marker. The result of rand.Read is
	// deliberately discarded here.
	_, _ = rand.Read(e.header.Sync[:])
	e.sync = e.header.Sync

	// Switch the output first, then emit the header for the new file.
	e.writer.Reset(w)
	e.writer.WriteVal(HeaderSchema, e.header)
	err = e.writer.Flush()
	if err != nil {
		return err
	}

	// Start the new file with an empty block.
	e.buf.Reset()
	e.encoder.Reset(e.buf)
	e.count = 0
	return nil
}

func (e *Encoder) writerBlock() error {
e.writer.WriteLong(int64(e.count))

Expand Down
202 changes: 202 additions & 0 deletions ocf/ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,3 +1312,205 @@ type errorHeaderWriter struct{}
// Write has the io.Writer method signature and always fails: it reports zero
// bytes written and a fixed "test" error, whatever p contains.
func (*errorHeaderWriter) Write(p []byte) (int, error) {
	failure := errors.New("test")
	return 0, failure
}

// TestEncoder_Reset checks that a single encoder can produce two independent
// files: one record is written and closed into the first buffer, the encoder is
// Reset onto a second buffer and another record is written, and each buffer
// must then decode to exactly its own record and nothing more.
func TestEncoder_Reset(t *testing.T) {
	first := FullRecord{
		Strings: []string{"first", "record"},
		Longs:   []int64{},
		Enum:    "A",
		Map:     map[string]int{},
		Record:  &TestRecord{Long: 1},
	}
	second := FullRecord{
		Strings: []string{"second", "record"},
		Longs:   []int64{},
		Enum:    "B",
		Map:     map[string]int{},
		Record:  &TestRecord{Long: 2},
	}

	// First file.
	var out1 bytes.Buffer
	enc, err := ocf.NewEncoder(schema, &out1)
	require.NoError(t, err)
	require.NoError(t, enc.Encode(first))
	require.NoError(t, enc.Close())

	// Second file, written by the same encoder after Reset.
	var out2 bytes.Buffer
	require.NoError(t, enc.Reset(&out2))
	require.NoError(t, enc.Encode(second))
	require.NoError(t, enc.Close())

	// assertSingleRecord decodes src and requires it to hold exactly want.
	assertSingleRecord := func(src *bytes.Buffer, want FullRecord) {
		t.Helper()

		dec, err := ocf.NewDecoder(src)
		require.NoError(t, err)

		require.True(t, dec.HasNext())
		var got FullRecord
		require.NoError(t, dec.Decode(&got))
		assert.Equal(t, want, got)
		require.False(t, dec.HasNext())
	}

	assertSingleRecord(&out1, first)
	assertSingleRecord(&out2, second)
}

// TestEncoder_ResetWithPendingData checks that Reset flushes data that was
// encoded but never explicitly flushed or closed: the value must be readable
// from the original destination after the encoder has moved to a new one.
func TestEncoder_ResetWithPendingData(t *testing.T) {
	var original bytes.Buffer
	enc, err := ocf.NewEncoder(`"long"`, &original, ocf.WithBlockLength(10))
	require.NoError(t, err)

	// Encode one value and deliberately skip Close so it stays pending.
	require.NoError(t, enc.Encode(int64(42)))

	// Moving to a new destination must push the pending value out first.
	var next bytes.Buffer
	require.NoError(t, enc.Reset(&next))

	// The original destination must now contain the value.
	dec, err := ocf.NewDecoder(&original)
	require.NoError(t, err)
	require.True(t, dec.HasNext())

	var decoded int64
	require.NoError(t, dec.Decode(&decoded))
	assert.Equal(t, int64(42), decoded)
}

// TestEncoder_ResetGeneratesNewSyncMarker tests that each reset creates a new
// sync marker: the headers of two files written by the same encoder must carry
// different markers, and both files must still be readable by a decoder.
func TestEncoder_ResetGeneratesNewSyncMarker(t *testing.T) {
	buf1 := &bytes.Buffer{}
	enc, err := ocf.NewEncoder(`"long"`, buf1)
	require.NoError(t, err)

	err = enc.Encode(int64(1))
	require.NoError(t, err)
	err = enc.Close()
	require.NoError(t, err)

	// Get sync marker from first file. The header is read from a separate
	// reader over the buffer's bytes so that buf1 itself is not consumed.
	reader1 := avro.NewReader(bytes.NewReader(buf1.Bytes()), 1024)
	var h1 ocf.Header
	reader1.ReadVal(ocf.HeaderSchema, &h1)
	require.NoError(t, reader1.Error)
	sync1 := h1.Sync

	// Reset to second buffer
	buf2 := &bytes.Buffer{}
	err = enc.Reset(buf2)
	require.NoError(t, err)

	err = enc.Encode(int64(2))
	require.NoError(t, err)
	err = enc.Close()
	require.NoError(t, err)

	// Get sync marker from second file
	reader2 := avro.NewReader(bytes.NewReader(buf2.Bytes()), 1024)
	var h2 ocf.Header
	reader2.ReadVal(ocf.HeaderSchema, &h2)
	require.NoError(t, reader2.Error)
	sync2 := h2.Sync

	// Sync markers should be different
	assert.NotEqual(t, sync1, sync2, "each file should have a unique sync marker")

	// But both files should be readable. The first file is actually checked
	// here instead of creating its decoder and discarding it.
	dec1, err := ocf.NewDecoder(bytes.NewReader(buf1.Bytes()))
	require.NoError(t, err)
	require.True(t, dec1.HasNext())

	dec2, err := ocf.NewDecoder(buf2)
	require.NoError(t, err)
	require.True(t, dec2.HasNext())
}

// TestEncoder_ResetMultipleTimes drives one encoder through five destinations
// in a row. The encoder is created on the first buffer and Reset onto each
// later one; file i receives the single value i*10, which is then decoded back.
func TestEncoder_ResetMultipleTimes(t *testing.T) {
	buffers := make([]*bytes.Buffer, 5)
	for i := range buffers {
		buffers[i] = &bytes.Buffer{}
	}

	enc, err := ocf.NewEncoder(`"long"`, buffers[0])
	require.NoError(t, err)

	for i, dst := range buffers {
		// The first buffer is already attached by NewEncoder.
		if i > 0 {
			require.NoError(t, enc.Reset(dst))
		}

		require.NoError(t, enc.Encode(int64(i*10)))
		require.NoError(t, enc.Close())
	}

	// Verify all files
	for i, src := range buffers {
		dec, err := ocf.NewDecoder(src)
		require.NoError(t, err, "file %d", i)
		require.True(t, dec.HasNext(), "file %d", i)

		var value int64
		err = dec.Decode(&value)
		require.NoError(t, err, "file %d", i)
		assert.Equal(t, int64(i*10), value, "file %d", i)
	}
}

// TestEncoder_ResetPreservesCodec checks that the codec chosen at construction
// survives a Reset: both the file written before the reset and the one written
// after it must advertise "deflate" under the "avro.codec" metadata key.
func TestEncoder_ResetPreservesCodec(t *testing.T) {
	before := &bytes.Buffer{}
	enc, err := ocf.NewEncoder(`"long"`, before, ocf.WithCodec(ocf.Deflate))
	require.NoError(t, err)

	require.NoError(t, enc.Encode(int64(1)))
	require.NoError(t, enc.Close())

	after := &bytes.Buffer{}
	require.NoError(t, enc.Reset(after))

	require.NoError(t, enc.Encode(int64(2)))
	require.NoError(t, enc.Close())

	// Both files should use deflate codec
	for _, file := range []*bytes.Buffer{before, after} {
		dec, err := ocf.NewDecoder(file)
		require.NoError(t, err)
		assert.Equal(t, []byte("deflate"), dec.Metadata()["avro.codec"])
	}
}