Skip to content
This repository was archived by the owner on Jan 18, 2026. It is now read-only.
93 changes: 72 additions & 21 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ type decoderConfig struct {
CodecOptions codecOptions
}

// newDecoderConfig builds a decoderConfig pre-populated with the library
// defaults (default decoder config, default schema cache, default deflate
// compression level) and then applies the caller-supplied options in order.
func newDecoderConfig(opts ...DecoderFunc) *decoderConfig {
	cfg := &decoderConfig{
		DecoderConfig: avro.DefaultConfig,
		SchemaCache:   avro.DefaultSchemaCache,
		CodecOptions: codecOptions{
			DeflateCompressionLevel: flate.DefaultCompression,
		},
	}
	for _, apply := range opts {
		apply(cfg)
	}
	return cfg
}

Comment on lines +62 to +75
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that this change helped anything.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoiding duplicate code.

// DecoderFunc represents a configuration function for Decoder.
// Each option receives the decoderConfig and may mutate it before
// the Decoder is constructed.
type DecoderFunc func(cfg *decoderConfig)

Expand Down Expand Up @@ -96,23 +110,16 @@ type Decoder struct {
codec Codec

count int64
size int64
n int64
}

// NewDecoder returns a new decoder that reads from reader r.
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
cfg := decoderConfig{
DecoderConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
CodecOptions: codecOptions{
DeflateCompressionLevel: flate.DefaultCompression,
},
}
for _, opt := range opts {
opt(&cfg)
}

reader := avro.NewReader(r, 1024)

cfg := newDecoderConfig(opts...)

h, err := readHeader(reader, cfg.SchemaCache, cfg.CodecOptions)
if err != nil {
return nil, fmt.Errorf("decoder: %w", err)
Expand All @@ -131,6 +138,31 @@ func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
}, nil
}

// NewDecoderWithHeader returns a new decoder that reads from reader r using the provided header.
// The header (schema, codec, metadata and sync marker) is taken as-is, so a
// header decoded once via DecodeHeader can be shared across several decoders.
func NewDecoderWithHeader(r *avro.Reader, h *OCFHeader, opts ...DecoderFunc) (*Decoder, error) {
	cfg := newDecoderConfig(opts...)

	// The reset reader starts empty; it is refilled with each block's
	// decompressed data as blocks are read.
	blockReader := bytesx.NewResetReader([]byte{})

	dec := &Decoder{
		reader:      r,
		resetReader: blockReader,
		decoder:     cfg.DecoderConfig.NewDecoder(h.Schema, blockReader),
		schema:      h.Schema,
		codec:       h.Codec,
		meta:        h.Meta,
		sync:        h.Sync,
	}
	return dec, nil
}

// DecodeHeader reads and decodes the Avro container file header from r.
// It returns the parsed header, or an error if the header is malformed
// or uses an unsupported codec.
func DecodeHeader(r *avro.Reader, opts ...DecoderFunc) (*OCFHeader, error) {
	cfg := newDecoderConfig(opts...)

	hdr, err := readHeader(r, cfg.SchemaCache, cfg.CodecOptions)
	if err != nil {
		return nil, fmt.Errorf("decoder: %w", err)
	}
	return hdr, nil
}

// Metadata returns the header metadata.
func (d *Decoder) Metadata() map[string][]byte {
return d.meta
Expand All @@ -145,8 +177,8 @@ func (d *Decoder) Schema() avro.Schema {
// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
if d.count <= 0 {
count := d.readBlock()
d.count = count
d.count, d.size = d.readBlock()
d.n = d.count
}

if d.reader.Error != nil {
Expand Down Expand Up @@ -184,11 +216,29 @@ func (d *Decoder) Close() error {
return nil
}

func (d *Decoder) readBlock() int64 {
// BlockStatus represents the status of the current block.
type BlockStatus struct {
	// Current is the 1-based position within the current block of the
	// record most recently decoded.
	Current int64
	// Count is the total number of records in the current block.
	Count int64
	// Size is the block's byte size as reported by the block header
	// (NOTE(review): compressed vs. decompressed not visible here — confirm
	// against readBlock).
	Size int64
	// Offset is the underlying reader's current input offset in bytes.
	Offset int64
}

// BlockStatus returns the current block status.
// d.n holds the number of records in the block and d.count the number
// still unread, so the current record index is derived from their difference.
func (d *Decoder) BlockStatus() *BlockStatus {
	total := d.n
	remaining := d.count
	return &BlockStatus{
		Current: total - remaining + 1,
		Count:   total,
		Size:    d.size,
		Offset:  d.reader.InputOffset(),
	}
}

func (d *Decoder) readBlock() (int64, int64) {
_ = d.reader.Peek()
if errors.Is(d.reader.Error, io.EOF) {
// There is no next block
return 0
return 0, 0
}

count := d.reader.ReadLong()
Expand Down Expand Up @@ -220,7 +270,7 @@ func (d *Decoder) readBlock() int64 {
d.reader.Error = errors.New("decoder: invalid block")
}

return count
return count, size
}

type encoderConfig struct {
Expand Down Expand Up @@ -379,7 +429,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
return nil, err
}

writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
writer := avro.NewWriter(w, 512, avro.WithWriterConfig(avro.DefaultConfig))
Comment on lines -382 to +432
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix bug #590

buf := &bytes.Buffer{}
e := &Encoder{
writer: writer,
Expand Down Expand Up @@ -420,7 +470,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
return nil, err
}

writer := avro.NewWriter(w, 512, avro.WithWriterConfig(cfg.EncodingConfig))
writer := avro.NewWriter(w, 512, avro.WithWriterConfig(avro.DefaultConfig))
Comment on lines -423 to +473
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same above

writer.WriteVal(HeaderSchema, header)
if err = writer.Flush(); err != nil {
return nil, err
Expand Down Expand Up @@ -567,14 +617,15 @@ func (e *Encoder) writerBlock() error {
return e.writer.Flush()
}

type ocfHeader struct {
// OCFHeader represents the parsed header of an OCF file.
type OCFHeader struct { //nolint:revive
	// Schema is the writer schema resolved from the header metadata.
	Schema avro.Schema
	// Codec is the block compression codec resolved from the header
	// (see readHeader).
	Codec Codec
	// Meta holds the raw header metadata key/value pairs.
	Meta map[string][]byte
	// Sync is the 16-byte synchronization marker that terminates
	// each data block.
	Sync [16]byte
}

func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*ocfHeader, error) {
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts codecOptions) (*OCFHeader, error) {
var h Header
reader.ReadVal(HeaderSchema, &h)
if reader.Error != nil {
Expand All @@ -594,7 +645,7 @@ func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache, codecOpts co
return nil, err
}

return &ocfHeader{
return &OCFHeader{
Schema: schema,
Codec: codec,
Meta: h.Meta,
Expand Down
187 changes: 187 additions & 0 deletions ocf/ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"testing"

"github.com/hamba/avro/v2"
Expand Down Expand Up @@ -1274,6 +1277,7 @@ func TestWithSchemaMarshaler(t *testing.T) {

want, err := os.ReadFile("testdata/full-schema.json")
require.NoError(t, err)
want = bytes.ReplaceAll(want, []byte("\r\n"), []byte("\n"))
assert.Equal(t, want, got)
}

Expand Down Expand Up @@ -1313,6 +1317,134 @@ func (*errorHeaderWriter) Write(p []byte) (int, error) {
return 0, errors.New("test")
}

// TestConcurrentDecode exercises DecodeHeader + NewDecoderWithHeader by
// splitting one OCF file into byte ranges and decoding the ranges from
// multiple goroutines, then verifying every record is seen exactly once.
func TestConcurrentDecode(t *testing.T) {
	// build an in-memory OCF with many records
	unionStr := "union value"
	base := FullRecord{
		Strings: []string{"s1", "s2"},
		Longs: []int64{1, 2},
		Enum: "A",
		Map: map[string]int{"k": 1},
		Nullable: &unionStr,
		Fixed: [16]byte{0x01, 0x02, 0x03},
		Record: &TestRecord{Long: 1, String: "r", Int: 0, Float: 1.1, Double: 2.2, Bool: true},
	}

	// Small block length forces many blocks so partitions hit sync markers.
	const total = 200
	buf := &bytes.Buffer{}
	enc, err := ocf.NewEncoder(schema, buf, ocf.WithBlockLength(10))
	require.NoError(t, err)
	for i := 0; i < total; i++ {
		// Record.Int doubles as a unique record ID for later verification.
		base.Record.Int = int32(i)
		require.NoError(t, enc.Encode(base))
	}
	require.NoError(t, enc.Close())

	// decode header once
	data := buf.Bytes()
	r0 := avro.NewReader(bytes.NewReader(data), 1024)
	hdr, err := ocf.DecodeHeader(r0)
	require.NoError(t, err)

	// concurrency values to test; caller requirement: configurable concurrency
	concs := []int64{1, 2, 3, 5}

	// split file into parts by size and let workers align to sync using SkipTo
	headerEnd := r0.InputOffset()
	for _, conc := range concs {
		t.Run(fmt.Sprintf("concurrency=%d", conc), func(t *testing.T) {
			totalData := int64(len(data)) - headerEnd
			partSize := totalData / int64(conc)

			recCh := make(chan FullRecord, total)
			var wg sync.WaitGroup

			// Buffered size-1 channel keeps only the first worker error;
			// sendErr is non-blocking so workers never stall on reporting.
			errCh := make(chan error, 1)
			sendErr := func(err error) {
				select {
				case errCh <- err:
				default:
				}
			}

			for i := int64(0); i < conc; i++ {
				start := headerEnd + i*partSize
				end := headerEnd + (i+1)*partSize
				// Last partition absorbs the rounding remainder.
				if i == conc-1 {
					end = int64(len(data))
				}

				wg.Add(1)
				go func(start, end int64) {
					defer wg.Done()
					r := avro.NewReader(bytes.NewReader(data[start:]), 1024)
					skipped := int64(0)
					// align to next sync marker unless starting at header end
					if start != headerEnd {
						n, err := r.SkipTo(hdr.Sync[:])
						if err != nil && !errors.Is(err, io.EOF) {
							sendErr(err)
							return
						}
						// if SkipTo advanced past our partition end, nothing to do
						skipped = int64(n)
						if start+skipped >= end {
							return
						}
					}

					dec, err := ocf.NewDecoderWithHeader(r, hdr)
					if err != nil {
						sendErr(err)
						return
					}
					defer dec.Close()

					for dec.HasNext() {
						var rec FullRecord
						if err := dec.Decode(&rec); err != nil {
							sendErr(err)
							return
						}
						recCh <- rec
						// Stop once a finished block carried us past our
						// partition boundary; the next worker owns the rest.
						bs := dec.BlockStatus()
						if bs.Current > bs.Count {
							if start+bs.Offset > end {
								return
							}
						}
					}
					if err := dec.Error(); err != nil {
						sendErr(err)
						return
					}
				}(start, end)
			}

			// Close the record channel only after all workers are done.
			go func() { wg.Wait(); close(recCh) }()

			var got []int32
			for r := range recCh {
				got = append(got, r.Record.Int)
			}

			// Surface the first worker error, if any.
			select {
			case e := <-errCh:
				if e != nil {
					t.Fatalf("worker error: %v", e)
				}
			default:
			}

			// Every record must be decoded exactly once across partitions.
			require.Equal(t, total, len(got), "unexpected number of records read")
			sort.Slice(got, func(i, j int) bool { return got[i] < got[j] })
			for i := 0; i < total; i++ {
				require.Equal(t, int32(i), got[i])
			}
		})
	}
}

// TestEncoder_Reset tests that Reset allows reusing encoder for multiple files.
func TestEncoder_Reset(t *testing.T) {
record1 := FullRecord{
Expand Down Expand Up @@ -1575,3 +1707,58 @@ func TestEncoder_ResetPreservesCodec(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, []byte("deflate"), dec2.Metadata()["avro.codec"])
}

// CustomTagTestObject is a test fixture whose Avro field names are taken
// from its json struct tags, for use with avro.Config{TagKey: "json"}.
type CustomTagTestObject struct {
	StringField string `json:"string_field"`
	IntField int `json:"int_field"`
}

// TestCustomTagKey verifies that an OCF encode/decode round trip honors a
// custom struct-tag key ("json") supplied through avro.Config.TagKey.
func TestCustomTagKey(t *testing.T) {
	// Schema whose field names mirror the struct's json tags.
	schemaStr := `{
		"type": "record",
		"name": "CustomTagTestObject",
		"fields": [
			{"name": "string_field", "type": "string"},
			{"name": "int_field", "type": "int"}
		]
	}`

	// Read struct tags from the "json" key instead of the default "avro".
	cfg := avro.Config{
		TagKey: "json",
	}.Freeze()

	var out bytes.Buffer

	// Encode one record into the buffer using the custom encoding config.
	enc, err := ocf.NewEncoder(schemaStr, &out, ocf.WithEncodingConfig(cfg))
	require.NoError(t, err)

	in := CustomTagTestObject{
		StringField: "hello",
		IntField: 42,
	}
	require.NoError(t, enc.Encode(in))
	require.NoError(t, enc.Close())

	// Round-trip: decode with the same tag configuration and compare fields.
	dec, err := ocf.NewDecoder(&out, ocf.WithDecoderConfig(cfg))
	require.NoError(t, err)

	var got CustomTagTestObject
	require.True(t, dec.HasNext())
	require.NoError(t, dec.Decode(&got))

	assert.Equal(t, in.StringField, got.StringField)
	assert.Equal(t, in.IntField, got.IntField)
}
Loading