Skip to content

Commit 2bffcc1

Browse files
authored
support reseting encoder to reduce load on GC. (hamba#587)
1 parent 97d31ef commit 2bffcc1

File tree

3 files changed

+305
-0
lines changed

3 files changed

+305
-0
lines changed

encoder.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ func (e *Encoder) Encode(v any) error {
3131
return e.w.Error
3232
}
3333

34+
// Reset resets the encoder to write to a new io.Writer.
35+
func (e *Encoder) Reset(w io.Writer) {
36+
e.w.Reset(w)
37+
}
38+
3439
// Marshal returns the Avro encoding of v.
3540
func Marshal(schema Schema, v any) ([]byte, error) {
3641
return DefaultConfig.Marshal(schema, v)

ocf/ocf.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,9 @@ type Encoder struct {
333333
blockLength int
334334
count int
335335
blockSize int
336+
337+
// Stored for Reset.
338+
header Header
336339
}
337340

338341
// NewEncoder returns a new encoder that writes to w using schema s.
@@ -386,6 +389,11 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
386389
codec: h.Codec,
387390
blockLength: cfg.BlockLength,
388391
blockSize: cfg.BlockSize,
392+
header: Header{
393+
Magic: magicBytes,
394+
Meta: h.Meta,
395+
Sync: h.Sync,
396+
},
389397
}
390398
return e, nil
391399
}
@@ -427,6 +435,7 @@ func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, e
427435
codec: codec,
428436
blockLength: cfg.BlockLength,
429437
blockSize: cfg.BlockSize,
438+
header: header,
430439
}
431440
return e, nil
432441
}
@@ -515,6 +524,34 @@ func (e *Encoder) Close() error {
515524
return err
516525
}
517526

527+
// Reset flushes any pending data, resets the encoder to write to a new io.Writer,
528+
// and writes a fresh header with a new sync marker. The schema, codec, and other
529+
// settings are preserved from the original encoder.
530+
// This allows reusing the encoder for multiple files without reallocating buffers.
531+
func (e *Encoder) Reset(w io.Writer) error {
532+
if err := e.Flush(); err != nil {
533+
return err
534+
}
535+
536+
// Generate new sync marker for the new file.
537+
_, _ = rand.Read(e.header.Sync[:])
538+
e.sync = e.header.Sync
539+
540+
// Reset writer to new output and write header.
541+
e.writer.Reset(w)
542+
e.writer.WriteVal(HeaderSchema, e.header)
543+
if err := e.writer.Flush(); err != nil {
544+
return err
545+
}
546+
547+
// Reset buffer and encoder.
548+
e.buf.Reset()
549+
e.encoder.Reset(e.buf)
550+
e.count = 0
551+
552+
return nil
553+
}
554+
518555
func (e *Encoder) writerBlock() error {
519556
e.writer.WriteLong(int64(e.count))
520557

ocf/ocf_test.go

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,3 +1312,266 @@ type errorHeaderWriter struct{}
13121312
func (*errorHeaderWriter) Write(p []byte) (int, error) {
13131313
return 0, errors.New("test")
13141314
}
1315+
1316+
// TestEncoder_Reset tests that Reset allows reusing encoder for multiple files.
1317+
func TestEncoder_Reset(t *testing.T) {
1318+
record1 := FullRecord{
1319+
Strings: []string{"first", "record"},
1320+
Longs: []int64{},
1321+
Enum: "A",
1322+
Map: map[string]int{},
1323+
Record: &TestRecord{Long: 1},
1324+
}
1325+
record2 := FullRecord{
1326+
Strings: []string{"second", "record"},
1327+
Longs: []int64{},
1328+
Enum: "B",
1329+
Map: map[string]int{},
1330+
Record: &TestRecord{Long: 2},
1331+
}
1332+
1333+
// Create first file
1334+
buf1 := &bytes.Buffer{}
1335+
enc, err := ocf.NewEncoder(schema, buf1)
1336+
require.NoError(t, err)
1337+
1338+
err = enc.Encode(record1)
1339+
require.NoError(t, err)
1340+
1341+
err = enc.Close()
1342+
require.NoError(t, err)
1343+
1344+
// Reset to write to second file
1345+
buf2 := &bytes.Buffer{}
1346+
err = enc.Reset(buf2)
1347+
require.NoError(t, err)
1348+
1349+
err = enc.Encode(record2)
1350+
require.NoError(t, err)
1351+
1352+
err = enc.Close()
1353+
require.NoError(t, err)
1354+
1355+
// Verify first file
1356+
dec1, err := ocf.NewDecoder(buf1)
1357+
require.NoError(t, err)
1358+
1359+
require.True(t, dec1.HasNext())
1360+
var got1 FullRecord
1361+
err = dec1.Decode(&got1)
1362+
require.NoError(t, err)
1363+
assert.Equal(t, record1, got1)
1364+
require.False(t, dec1.HasNext())
1365+
1366+
// Verify second file
1367+
dec2, err := ocf.NewDecoder(buf2)
1368+
require.NoError(t, err)
1369+
1370+
require.True(t, dec2.HasNext())
1371+
var got2 FullRecord
1372+
err = dec2.Decode(&got2)
1373+
require.NoError(t, err)
1374+
assert.Equal(t, record2, got2)
1375+
require.False(t, dec2.HasNext())
1376+
}
1377+
1378+
// TestEncoder_ResetWithPendingData tests Reset flushes pending data.
1379+
func TestEncoder_ResetWithPendingData(t *testing.T) {
1380+
buf1 := &bytes.Buffer{}
1381+
enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithBlockLength(10))
1382+
require.NoError(t, err)
1383+
1384+
// Write data but don't close (pending data)
1385+
err = enc.Encode(int64(42))
1386+
require.NoError(t, err)
1387+
1388+
// Reset should flush the pending data
1389+
buf2 := &bytes.Buffer{}
1390+
err = enc.Reset(buf2)
1391+
require.NoError(t, err)
1392+
1393+
// Verify first file has the data
1394+
dec1, err := ocf.NewDecoder(buf1)
1395+
require.NoError(t, err)
1396+
1397+
require.True(t, dec1.HasNext())
1398+
var got int64
1399+
err = dec1.Decode(&got)
1400+
require.NoError(t, err)
1401+
assert.Equal(t, int64(42), got)
1402+
}
1403+
1404+
// TestEncoder_ResetGeneratesNewSyncMarker tests that each reset creates a new sync marker.
1405+
func TestEncoder_ResetGeneratesNewSyncMarker(t *testing.T) {
1406+
buf1 := &bytes.Buffer{}
1407+
enc, err := ocf.NewEncoder(`"long"`, buf1)
1408+
require.NoError(t, err)
1409+
1410+
err = enc.Encode(int64(1))
1411+
require.NoError(t, err)
1412+
err = enc.Close()
1413+
require.NoError(t, err)
1414+
1415+
// Get sync marker from first file
1416+
dec1, err := ocf.NewDecoder(bytes.NewReader(buf1.Bytes()))
1417+
require.NoError(t, err)
1418+
1419+
reader1 := avro.NewReader(bytes.NewReader(buf1.Bytes()), 1024)
1420+
var h1 ocf.Header
1421+
reader1.ReadVal(ocf.HeaderSchema, &h1)
1422+
require.NoError(t, reader1.Error)
1423+
sync1 := h1.Sync
1424+
1425+
// Reset to second buffer
1426+
buf2 := &bytes.Buffer{}
1427+
err = enc.Reset(buf2)
1428+
require.NoError(t, err)
1429+
1430+
err = enc.Encode(int64(2))
1431+
require.NoError(t, err)
1432+
err = enc.Close()
1433+
require.NoError(t, err)
1434+
1435+
// Get sync marker from second file
1436+
reader2 := avro.NewReader(bytes.NewReader(buf2.Bytes()), 1024)
1437+
var h2 ocf.Header
1438+
reader2.ReadVal(ocf.HeaderSchema, &h2)
1439+
require.NoError(t, reader2.Error)
1440+
sync2 := h2.Sync
1441+
1442+
// Sync markers should be different
1443+
assert.NotEqual(t, sync1, sync2, "each file should have a unique sync marker")
1444+
1445+
// But both files should be readable
1446+
_ = dec1
1447+
dec2, err := ocf.NewDecoder(buf2)
1448+
require.NoError(t, err)
1449+
require.True(t, dec2.HasNext())
1450+
}
1451+
1452+
// TestEncoder_ResetMultipleTimes tests multiple sequential resets.
1453+
func TestEncoder_ResetMultipleTimes(t *testing.T) {
1454+
buffers := make([]*bytes.Buffer, 5)
1455+
for i := range buffers {
1456+
buffers[i] = &bytes.Buffer{}
1457+
}
1458+
1459+
enc, err := ocf.NewEncoder(`"long"`, buffers[0])
1460+
require.NoError(t, err)
1461+
1462+
for i := 0; i < 5; i++ {
1463+
if i > 0 {
1464+
err = enc.Reset(buffers[i])
1465+
require.NoError(t, err)
1466+
}
1467+
1468+
err = enc.Encode(int64(i * 10))
1469+
require.NoError(t, err)
1470+
1471+
err = enc.Close()
1472+
require.NoError(t, err)
1473+
}
1474+
1475+
// Verify all files
1476+
for i := 0; i < 5; i++ {
1477+
dec, err := ocf.NewDecoder(buffers[i])
1478+
require.NoError(t, err, "file %d", i)
1479+
1480+
require.True(t, dec.HasNext(), "file %d", i)
1481+
var got int64
1482+
err = dec.Decode(&got)
1483+
require.NoError(t, err, "file %d", i)
1484+
assert.Equal(t, int64(i*10), got, "file %d", i)
1485+
}
1486+
}
1487+
1488+
// TestEncoder_AppendToExistingFile tests appending records to an existing OCF file.
1489+
func TestEncoder_AppendToExistingFile(t *testing.T) {
1490+
type SimpleRecord struct {
1491+
Name string `avro:"name"`
1492+
ID int64 `avro:"id"`
1493+
}
1494+
simpleSchema := `{"type":"record","name":"SimpleRecord","fields":[{"name":"name","type":"string"},{"name":"id","type":"long"}]}`
1495+
1496+
record1 := SimpleRecord{Name: "first", ID: 1}
1497+
record2 := SimpleRecord{Name: "second", ID: 2}
1498+
1499+
tmpFile, err := os.CreateTemp("", "append-test-*.avro")
1500+
require.NoError(t, err)
1501+
tmpName := tmpFile.Name()
1502+
t.Cleanup(func() { _ = os.Remove(tmpName) })
1503+
1504+
// Write first record
1505+
enc, err := ocf.NewEncoder(simpleSchema, tmpFile)
1506+
require.NoError(t, err)
1507+
err = enc.Encode(record1)
1508+
require.NoError(t, err)
1509+
err = enc.Close()
1510+
require.NoError(t, err)
1511+
err = tmpFile.Close()
1512+
require.NoError(t, err)
1513+
1514+
// Reopen file and append second record
1515+
file, err := os.OpenFile(tmpName, os.O_RDWR, 0o644)
1516+
require.NoError(t, err)
1517+
1518+
enc2, err := ocf.NewEncoder(simpleSchema, file)
1519+
require.NoError(t, err)
1520+
err = enc2.Encode(record2)
1521+
require.NoError(t, err)
1522+
err = enc2.Close()
1523+
require.NoError(t, err)
1524+
err = file.Close()
1525+
require.NoError(t, err)
1526+
1527+
// Read back and verify both records
1528+
file, err = os.Open(tmpName)
1529+
require.NoError(t, err)
1530+
defer file.Close()
1531+
1532+
dec, err := ocf.NewDecoder(file)
1533+
require.NoError(t, err)
1534+
1535+
var records []SimpleRecord
1536+
for dec.HasNext() {
1537+
var r SimpleRecord
1538+
err = dec.Decode(&r)
1539+
require.NoError(t, err)
1540+
records = append(records, r)
1541+
}
1542+
require.NoError(t, dec.Error())
1543+
1544+
require.Len(t, records, 2)
1545+
assert.Equal(t, record1, records[0])
1546+
assert.Equal(t, record2, records[1])
1547+
}
1548+
1549+
// TestEncoder_ResetPreservesCodec tests that codec is preserved across reset.
1550+
func TestEncoder_ResetPreservesCodec(t *testing.T) {
1551+
buf1 := &bytes.Buffer{}
1552+
enc, err := ocf.NewEncoder(`"long"`, buf1, ocf.WithCodec(ocf.Deflate))
1553+
require.NoError(t, err)
1554+
1555+
err = enc.Encode(int64(1))
1556+
require.NoError(t, err)
1557+
err = enc.Close()
1558+
require.NoError(t, err)
1559+
1560+
buf2 := &bytes.Buffer{}
1561+
err = enc.Reset(buf2)
1562+
require.NoError(t, err)
1563+
1564+
err = enc.Encode(int64(2))
1565+
require.NoError(t, err)
1566+
err = enc.Close()
1567+
require.NoError(t, err)
1568+
1569+
// Both files should use deflate codec
1570+
dec1, err := ocf.NewDecoder(buf1)
1571+
require.NoError(t, err)
1572+
assert.Equal(t, []byte("deflate"), dec1.Metadata()["avro.codec"])
1573+
1574+
dec2, err := ocf.NewDecoder(buf2)
1575+
require.NoError(t, err)
1576+
assert.Equal(t, []byte("deflate"), dec2.Metadata()["avro.codec"])
1577+
}

0 commit comments

Comments
 (0)