@@ -43,6 +43,11 @@ func TestWatch_out_of_order(t *testing.T) {
4343 watch , cancel := nDB .Watch ("table1" , "network1" )
4444 defer cancel ()
4545
46+ got := drainChannel (watch .C )
47+ assert .Check (t , is .DeepEqual (got , []events.Event {
48+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "key1" , Value : []byte ("value1" )}),
49+ }))
50+
4651 // Receive events from node1, with events not received or received out of order
4752 // Create, (hidden update), delete
4853 appendTableEvent (4 , TableEventTypeCreate , "key2" , []byte ("a" ))
@@ -69,7 +74,7 @@ func TestWatch_out_of_order(t *testing.T) {
6974 d .NotifyMsg (msgs .Compound ())
7075 msgs .Reset ()
7176
72- got : = drainChannel (watch .C )
77+ got = drainChannel (watch .C )
7378 assert .Check (t , is .DeepEqual (got , []events.Event {
7479 CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "key2" , Value : []byte ("a" )}),
7580 // Delete value should match last observed value,
@@ -87,6 +92,137 @@ func TestWatch_out_of_order(t *testing.T) {
8792 }))
8893}
8994
95+ func TestWatch_filters (t * testing.T ) {
96+ nDB := new (DefaultConfig ())
97+ nDB.networkBroadcasts = & memberlist.TransmitLimitedQueue {}
98+ nDB .nodeBroadcasts = & memberlist.TransmitLimitedQueue {}
99+ assert .Assert (t , nDB .JoinNetwork ("network1" ))
100+ assert .Assert (t , nDB .JoinNetwork ("network2" ))
101+
102+ (& eventDelegate {nDB }).NotifyJoin (& memberlist.Node {
103+ Name : "node1" ,
104+ Addr : net .IPv4 (1 , 2 , 3 , 4 ),
105+ })
106+
107+ var ltime serf.LamportClock
108+ msgs := messageBuffer {t : t }
109+ msgs .Append (MessageTypeNetworkEvent , & NetworkEvent {
110+ Type : NetworkEventTypeJoin ,
111+ LTime : ltime .Increment (),
112+ NodeName : "node1" ,
113+ NetworkID : "network1" ,
114+ })
115+ msgs .Append (MessageTypeNetworkEvent , & NetworkEvent {
116+ Type : NetworkEventTypeJoin ,
117+ LTime : ltime .Increment (),
118+ NodeName : "node1" ,
119+ NetworkID : "network2" ,
120+ })
121+ for _ , nid := range []string {"network1" , "network2" } {
122+ for _ , tname := range []string {"table1" , "table2" } {
123+ msgs .Append (MessageTypeTableEvent , & TableEvent {
124+ Type : TableEventTypeCreate ,
125+ LTime : ltime .Increment (),
126+ NodeName : "node1" ,
127+ NetworkID : nid ,
128+ TableName : tname ,
129+ Key : nid + "." + tname + ".dead" ,
130+ Value : []byte ("deaddead" ),
131+ })
132+ msgs .Append (MessageTypeTableEvent , & TableEvent {
133+ Type : TableEventTypeDelete ,
134+ LTime : ltime .Increment (),
135+ NodeName : "node1" ,
136+ NetworkID : nid ,
137+ TableName : tname ,
138+ Key : nid + "." + tname + ".dead" ,
139+ Value : []byte ("deaddead" ),
140+ })
141+ msgs .Append (MessageTypeTableEvent , & TableEvent {
142+ Type : TableEventTypeCreate ,
143+ LTime : ltime .Increment (),
144+ NodeName : "node1" ,
145+ NetworkID : nid ,
146+ TableName : tname ,
147+ Key : nid + "." + tname + ".update" ,
148+ Value : []byte ("initial" ),
149+ })
150+ msgs .Append (MessageTypeTableEvent , & TableEvent {
151+ Type : TableEventTypeCreate ,
152+ LTime : ltime .Increment (),
153+ NodeName : "node1" ,
154+ NetworkID : nid ,
155+ TableName : tname ,
156+ Key : nid + "." + tname ,
157+ Value : []byte ("a" ),
158+ })
159+ msgs .Append (MessageTypeTableEvent , & TableEvent {
160+ Type : TableEventTypeUpdate ,
161+ LTime : ltime .Increment (),
162+ NodeName : "node1" ,
163+ NetworkID : nid ,
164+ TableName : tname ,
165+ Key : nid + "." + tname + ".update" ,
166+ Value : []byte ("updated" ),
167+ })
168+ }
169+ }
170+ (& delegate {nDB }).NotifyMsg (msgs .Compound ())
171+
172+ watchAll , cancel := nDB .Watch ("" , "" )
173+ defer cancel ()
174+ watchNetwork1Tables , cancel := nDB .Watch ("" , "network1" )
175+ defer cancel ()
176+ watchTable1AllNetworks , cancel := nDB .Watch ("table1" , "" )
177+ defer cancel ()
178+ watchTable1Network1 , cancel := nDB .Watch ("table1" , "network1" )
179+ defer cancel ()
180+
181+ var gotAll , gotNetwork1Tables , gotTable1AllNetworks , gotTable1Network1 []events.Event
182+ L:
183+ for {
184+ select {
185+ case ev := <- watchAll .C :
186+ gotAll = append (gotAll , ev )
187+ case ev := <- watchNetwork1Tables .C :
188+ gotNetwork1Tables = append (gotNetwork1Tables , ev )
189+ case ev := <- watchTable1AllNetworks .C :
190+ gotTable1AllNetworks = append (gotTable1AllNetworks , ev )
191+ case ev := <- watchTable1Network1 .C :
192+ gotTable1Network1 = append (gotTable1Network1 , ev )
193+ case <- time .After (time .Second ):
194+ break L
195+ }
196+ }
197+
198+ assert .Check (t , is .DeepEqual (gotAll , []events.Event {
199+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1" , Value : []byte ("a" )}),
200+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1.update" , Value : []byte ("updated" )}),
201+ CreateEvent (event {Table : "table2" , NetworkID : "network1" , Key : "network1.table2" , Value : []byte ("a" )}),
202+ CreateEvent (event {Table : "table2" , NetworkID : "network1" , Key : "network1.table2.update" , Value : []byte ("updated" )}),
203+ CreateEvent (event {Table : "table1" , NetworkID : "network2" , Key : "network2.table1" , Value : []byte ("a" )}),
204+ CreateEvent (event {Table : "table1" , NetworkID : "network2" , Key : "network2.table1.update" , Value : []byte ("updated" )}),
205+ CreateEvent (event {Table : "table2" , NetworkID : "network2" , Key : "network2.table2" , Value : []byte ("a" )}),
206+ CreateEvent (event {Table : "table2" , NetworkID : "network2" , Key : "network2.table2.update" , Value : []byte ("updated" )}),
207+ }))
208+ assert .Check (t , is .DeepEqual (gotNetwork1Tables , []events.Event {
209+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1" , Value : []byte ("a" )}),
210+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1.update" , Value : []byte ("updated" )}),
211+ CreateEvent (event {Table : "table2" , NetworkID : "network1" , Key : "network1.table2" , Value : []byte ("a" )}),
212+ CreateEvent (event {Table : "table2" , NetworkID : "network1" , Key : "network1.table2.update" , Value : []byte ("updated" )}),
213+ }))
214+ assert .Check (t , is .DeepEqual (gotTable1AllNetworks , []events.Event {
215+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1" , Value : []byte ("a" )}),
216+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1.update" , Value : []byte ("updated" )}),
217+ CreateEvent (event {Table : "table1" , NetworkID : "network2" , Key : "network2.table1" , Value : []byte ("a" )}),
218+ CreateEvent (event {Table : "table1" , NetworkID : "network2" , Key : "network2.table1.update" , Value : []byte ("updated" )}),
219+ }))
220+ assert .Check (t , is .DeepEqual (gotTable1Network1 , []events.Event {
221+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1" , Value : []byte ("a" )}),
222+ CreateEvent (event {Table : "table1" , NetworkID : "network1" , Key : "network1.table1.update" , Value : []byte ("updated" )}),
223+ }))
224+ }
225+
90226func drainChannel (ch <- chan events.Event ) []events.Event {
91227 var events []events.Event
92228 for {
0 commit comments