@@ -1312,3 +1312,266 @@ type errorHeaderWriter struct{}
13121312func (* errorHeaderWriter ) Write (p []byte ) (int , error ) {
13131313 return 0 , errors .New ("test" )
13141314}
1315+
1316+ // TestEncoder_Reset tests that Reset allows reusing encoder for multiple files.
1317+ func TestEncoder_Reset (t * testing.T ) {
1318+ record1 := FullRecord {
1319+ Strings : []string {"first" , "record" },
1320+ Longs : []int64 {},
1321+ Enum : "A" ,
1322+ Map : map [string ]int {},
1323+ Record : & TestRecord {Long : 1 },
1324+ }
1325+ record2 := FullRecord {
1326+ Strings : []string {"second" , "record" },
1327+ Longs : []int64 {},
1328+ Enum : "B" ,
1329+ Map : map [string ]int {},
1330+ Record : & TestRecord {Long : 2 },
1331+ }
1332+
1333+ // Create first file
1334+ buf1 := & bytes.Buffer {}
1335+ enc , err := ocf .NewEncoder (schema , buf1 )
1336+ require .NoError (t , err )
1337+
1338+ err = enc .Encode (record1 )
1339+ require .NoError (t , err )
1340+
1341+ err = enc .Close ()
1342+ require .NoError (t , err )
1343+
1344+ // Reset to write to second file
1345+ buf2 := & bytes.Buffer {}
1346+ err = enc .Reset (buf2 )
1347+ require .NoError (t , err )
1348+
1349+ err = enc .Encode (record2 )
1350+ require .NoError (t , err )
1351+
1352+ err = enc .Close ()
1353+ require .NoError (t , err )
1354+
1355+ // Verify first file
1356+ dec1 , err := ocf .NewDecoder (buf1 )
1357+ require .NoError (t , err )
1358+
1359+ require .True (t , dec1 .HasNext ())
1360+ var got1 FullRecord
1361+ err = dec1 .Decode (& got1 )
1362+ require .NoError (t , err )
1363+ assert .Equal (t , record1 , got1 )
1364+ require .False (t , dec1 .HasNext ())
1365+
1366+ // Verify second file
1367+ dec2 , err := ocf .NewDecoder (buf2 )
1368+ require .NoError (t , err )
1369+
1370+ require .True (t , dec2 .HasNext ())
1371+ var got2 FullRecord
1372+ err = dec2 .Decode (& got2 )
1373+ require .NoError (t , err )
1374+ assert .Equal (t , record2 , got2 )
1375+ require .False (t , dec2 .HasNext ())
1376+ }
1377+
1378+ // TestEncoder_ResetWithPendingData tests Reset flushes pending data.
1379+ func TestEncoder_ResetWithPendingData (t * testing.T ) {
1380+ buf1 := & bytes.Buffer {}
1381+ enc , err := ocf .NewEncoder (`"long"` , buf1 , ocf .WithBlockLength (10 ))
1382+ require .NoError (t , err )
1383+
1384+ // Write data but don't close (pending data)
1385+ err = enc .Encode (int64 (42 ))
1386+ require .NoError (t , err )
1387+
1388+ // Reset should flush the pending data
1389+ buf2 := & bytes.Buffer {}
1390+ err = enc .Reset (buf2 )
1391+ require .NoError (t , err )
1392+
1393+ // Verify first file has the data
1394+ dec1 , err := ocf .NewDecoder (buf1 )
1395+ require .NoError (t , err )
1396+
1397+ require .True (t , dec1 .HasNext ())
1398+ var got int64
1399+ err = dec1 .Decode (& got )
1400+ require .NoError (t , err )
1401+ assert .Equal (t , int64 (42 ), got )
1402+ }
1403+
1404+ // TestEncoder_ResetGeneratesNewSyncMarker tests that each reset creates a new sync marker.
1405+ func TestEncoder_ResetGeneratesNewSyncMarker (t * testing.T ) {
1406+ buf1 := & bytes.Buffer {}
1407+ enc , err := ocf .NewEncoder (`"long"` , buf1 )
1408+ require .NoError (t , err )
1409+
1410+ err = enc .Encode (int64 (1 ))
1411+ require .NoError (t , err )
1412+ err = enc .Close ()
1413+ require .NoError (t , err )
1414+
1415+ // Get sync marker from first file
1416+ dec1 , err := ocf .NewDecoder (bytes .NewReader (buf1 .Bytes ()))
1417+ require .NoError (t , err )
1418+
1419+ reader1 := avro .NewReader (bytes .NewReader (buf1 .Bytes ()), 1024 )
1420+ var h1 ocf.Header
1421+ reader1 .ReadVal (ocf .HeaderSchema , & h1 )
1422+ require .NoError (t , reader1 .Error )
1423+ sync1 := h1 .Sync
1424+
1425+ // Reset to second buffer
1426+ buf2 := & bytes.Buffer {}
1427+ err = enc .Reset (buf2 )
1428+ require .NoError (t , err )
1429+
1430+ err = enc .Encode (int64 (2 ))
1431+ require .NoError (t , err )
1432+ err = enc .Close ()
1433+ require .NoError (t , err )
1434+
1435+ // Get sync marker from second file
1436+ reader2 := avro .NewReader (bytes .NewReader (buf2 .Bytes ()), 1024 )
1437+ var h2 ocf.Header
1438+ reader2 .ReadVal (ocf .HeaderSchema , & h2 )
1439+ require .NoError (t , reader2 .Error )
1440+ sync2 := h2 .Sync
1441+
1442+ // Sync markers should be different
1443+ assert .NotEqual (t , sync1 , sync2 , "each file should have a unique sync marker" )
1444+
1445+ // But both files should be readable
1446+ _ = dec1
1447+ dec2 , err := ocf .NewDecoder (buf2 )
1448+ require .NoError (t , err )
1449+ require .True (t , dec2 .HasNext ())
1450+ }
1451+
1452+ // TestEncoder_ResetMultipleTimes tests multiple sequential resets.
1453+ func TestEncoder_ResetMultipleTimes (t * testing.T ) {
1454+ buffers := make ([]* bytes.Buffer , 5 )
1455+ for i := range buffers {
1456+ buffers [i ] = & bytes.Buffer {}
1457+ }
1458+
1459+ enc , err := ocf .NewEncoder (`"long"` , buffers [0 ])
1460+ require .NoError (t , err )
1461+
1462+ for i := 0 ; i < 5 ; i ++ {
1463+ if i > 0 {
1464+ err = enc .Reset (buffers [i ])
1465+ require .NoError (t , err )
1466+ }
1467+
1468+ err = enc .Encode (int64 (i * 10 ))
1469+ require .NoError (t , err )
1470+
1471+ err = enc .Close ()
1472+ require .NoError (t , err )
1473+ }
1474+
1475+ // Verify all files
1476+ for i := 0 ; i < 5 ; i ++ {
1477+ dec , err := ocf .NewDecoder (buffers [i ])
1478+ require .NoError (t , err , "file %d" , i )
1479+
1480+ require .True (t , dec .HasNext (), "file %d" , i )
1481+ var got int64
1482+ err = dec .Decode (& got )
1483+ require .NoError (t , err , "file %d" , i )
1484+ assert .Equal (t , int64 (i * 10 ), got , "file %d" , i )
1485+ }
1486+ }
1487+
1488+ // TestEncoder_AppendToExistingFile tests appending records to an existing OCF file.
1489+ func TestEncoder_AppendToExistingFile (t * testing.T ) {
1490+ type SimpleRecord struct {
1491+ Name string `avro:"name"`
1492+ ID int64 `avro:"id"`
1493+ }
1494+ simpleSchema := `{"type":"record","name":"SimpleRecord","fields":[{"name":"name","type":"string"},{"name":"id","type":"long"}]}`
1495+
1496+ record1 := SimpleRecord {Name : "first" , ID : 1 }
1497+ record2 := SimpleRecord {Name : "second" , ID : 2 }
1498+
1499+ tmpFile , err := os .CreateTemp ("" , "append-test-*.avro" )
1500+ require .NoError (t , err )
1501+ tmpName := tmpFile .Name ()
1502+ t .Cleanup (func () { _ = os .Remove (tmpName ) })
1503+
1504+ // Write first record
1505+ enc , err := ocf .NewEncoder (simpleSchema , tmpFile )
1506+ require .NoError (t , err )
1507+ err = enc .Encode (record1 )
1508+ require .NoError (t , err )
1509+ err = enc .Close ()
1510+ require .NoError (t , err )
1511+ err = tmpFile .Close ()
1512+ require .NoError (t , err )
1513+
1514+ // Reopen file and append second record
1515+ file , err := os .OpenFile (tmpName , os .O_RDWR , 0o644 )
1516+ require .NoError (t , err )
1517+
1518+ enc2 , err := ocf .NewEncoder (simpleSchema , file )
1519+ require .NoError (t , err )
1520+ err = enc2 .Encode (record2 )
1521+ require .NoError (t , err )
1522+ err = enc2 .Close ()
1523+ require .NoError (t , err )
1524+ err = file .Close ()
1525+ require .NoError (t , err )
1526+
1527+ // Read back and verify both records
1528+ file , err = os .Open (tmpName )
1529+ require .NoError (t , err )
1530+ defer file .Close ()
1531+
1532+ dec , err := ocf .NewDecoder (file )
1533+ require .NoError (t , err )
1534+
1535+ var records []SimpleRecord
1536+ for dec .HasNext () {
1537+ var r SimpleRecord
1538+ err = dec .Decode (& r )
1539+ require .NoError (t , err )
1540+ records = append (records , r )
1541+ }
1542+ require .NoError (t , dec .Error ())
1543+
1544+ require .Len (t , records , 2 )
1545+ assert .Equal (t , record1 , records [0 ])
1546+ assert .Equal (t , record2 , records [1 ])
1547+ }
1548+
1549+ // TestEncoder_ResetPreservesCodec tests that codec is preserved across reset.
1550+ func TestEncoder_ResetPreservesCodec (t * testing.T ) {
1551+ buf1 := & bytes.Buffer {}
1552+ enc , err := ocf .NewEncoder (`"long"` , buf1 , ocf .WithCodec (ocf .Deflate ))
1553+ require .NoError (t , err )
1554+
1555+ err = enc .Encode (int64 (1 ))
1556+ require .NoError (t , err )
1557+ err = enc .Close ()
1558+ require .NoError (t , err )
1559+
1560+ buf2 := & bytes.Buffer {}
1561+ err = enc .Reset (buf2 )
1562+ require .NoError (t , err )
1563+
1564+ err = enc .Encode (int64 (2 ))
1565+ require .NoError (t , err )
1566+ err = enc .Close ()
1567+ require .NoError (t , err )
1568+
1569+ // Both files should use deflate codec
1570+ dec1 , err := ocf .NewDecoder (buf1 )
1571+ require .NoError (t , err )
1572+ assert .Equal (t , []byte ("deflate" ), dec1 .Metadata ()["avro.codec" ])
1573+
1574+ dec2 , err := ocf .NewDecoder (buf2 )
1575+ require .NoError (t , err )
1576+ assert .Equal (t , []byte ("deflate" ), dec2 .Metadata ()["avro.codec" ])
1577+ }
0 commit comments