@@ -4,11 +4,13 @@ import (
44 "bufio"
55 "bytes"
66 "context"
7+ "errors"
78 "fmt"
89 "io"
910 "os"
1011 "path/filepath"
1112 "strings"
13+ "sync"
1214 "sync/atomic"
1315 "testing"
1416 "text/tabwriter"
@@ -244,6 +246,77 @@ func TestFollowLogsProducerGone(t *testing.T) {
244246 }
245247}
246248
249+ type lineDecoder struct {
250+ r * bufio.Reader
251+ resetCount int
252+ }
253+
254+ func (d * lineDecoder ) Decode () (* logger.Message , error ) {
255+ line , err := d .r .ReadString ('\n' )
256+ if err != nil {
257+ return nil , err
258+ }
259+ m := logger .NewMessage ()
260+ m .Line = []byte (line )
261+ return m , nil
262+ }
263+
264+ func (d * lineDecoder ) Reset (r io.Reader ) {
265+ d .r = bufio .NewReader (r )
266+ d .resetCount ++
267+ }
268+
269+ func (d * lineDecoder ) Close () {
270+ }
271+
272+ func TestFollowLogsHandleDecodeErr (t * testing.T ) {
273+ lw := logger .NewLogWatcher ()
274+ defer lw .ConsumerGone ()
275+
276+ fw , err := os .CreateTemp ("" , t .Name ())
277+ assert .NilError (t , err )
278+ defer os .Remove (fw .Name ())
279+
280+ fr , err := os .Open (fw .Name ())
281+ assert .NilError (t , err )
282+
283+ dec := & lineDecoder {}
284+ dec .Reset (fr )
285+
286+ var since , until time.Time
287+ rotate := make (chan interface {})
288+ evict := make (chan interface {})
289+
290+ var wg sync.WaitGroup
291+ wg .Add (1 )
292+ go func () {
293+ defer wg .Done ()
294+ followLogs (fr , lw , rotate , evict , dec , since , until )
295+ }()
296+
297+ sendReceive := func (f io.Writer , message string ) {
298+ _ , err = f .Write ([]byte (message ))
299+ assert .NilError (t , err )
300+ m := <- lw .Msg
301+ assert .Equal (t , message , string (m .Line ))
302+ }
303+
304+ sendReceive (fw , "log1\n " )
305+ sendReceive (fw , "log2\n " )
306+
307+ ft , err := os .OpenFile (fw .Name (), os .O_WRONLY | os .O_TRUNC , 0600 )
308+ assert .NilError (t , err )
309+
310+ sendReceive (ft , "log3\n " )
311+
312+ evict <- errors .New ("stop followLogs" )
313+ wg .Wait ()
314+
315+ // followLogs calls Reset() in the beginning,
316+ // each 3 writes result Reset(), then handleDecodeErr() calles Reset().
317+ assert .Equal (t , 5 , dec .resetCount )
318+ }
319+
247320func TestCheckCapacityAndRotate (t * testing.T ) {
248321 dir , err := os .MkdirTemp ("" , t .Name ())
249322 assert .NilError (t , err )
0 commit comments