@@ -126,16 +126,184 @@ void L3MulticastManager::enqueue(const std::string& table_name,
126126}
127127
128128ReturnCode L3MulticastManager::drain () {
129- // Have it return success to avoid problems in p4orch unit tests.
130- return ReturnCode (StatusCode::SWSS_RC_SUCCESS)
131- << " L3MulticastManager::drain is not implemented" ;
129+ SWSS_LOG_ENTER ();
130+ // This manager subscribes to two tables. We will drain pending entries
131+ // based on the table. A table-specific drain function will process entries
132+ // associated with each table.
133+ std::string prev_table;
134+
135+ // Pending tuples (unverified) to process for a given table.
136+ std::deque<swss::KeyOpFieldsValuesTuple> tuple_list;
137+ ReturnCode status;
138+
139+ while (!m_entries.empty ()) {
140+ auto key_op_fvs_tuple = m_entries.front ();
141+ m_entries.pop_front ();
142+ std::string table_name;
143+ std::string key;
144+ parseP4RTKey (kfvKey (key_op_fvs_tuple), &table_name, &key);
145+
146+ if (prev_table == " " ) {
147+ prev_table = table_name;
148+ }
149+
150+ // We have moved on to a different table, so drain the previous entries.
151+ if (table_name != prev_table) {
152+ if (prev_table == APP_P4RT_MULTICAST_ROUTER_INTERFACE_TABLE_NAME) {
153+ // This drain function will drain unexecuted entries upon failure.
154+ status = drainMulticastRouterInterfaceEntries (tuple_list);
155+ } else if (prev_table == APP_P4RT_REPLICATION_L2_MULTICAST_TABLE_NAME) {
156+ // This drain function will drain unexecuted entries upon failure.
157+ status = drainMulticastReplicationEntries (tuple_list);
158+ } else {
159+ status = ReturnCode (StatusCode::SWSS_RC_NOT_EXECUTED)
160+ << " Unexpected table " << QuotedVar (prev_table);
161+ // Drain tuples associated with unknown table as not executed.
162+ drainMgmtWithNotExecuted (tuple_list, m_publisher);
163+ }
164+ prev_table = table_name;
165+ }
166+ if (!status.ok ()) {
167+ // The entry we popped has not been processed yet.
168+ // Return SWSS_RC_NOT_EXECUTED.
169+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
170+ kfvFieldsValues (key_op_fvs_tuple),
171+ ReturnCode (StatusCode::SWSS_RC_NOT_EXECUTED),
172+ /* replace=*/ true );
173+ break ;
174+ } else {
175+ tuple_list.push_back (key_op_fvs_tuple);
176+ }
177+ } // while
178+
179+ // If no failure, process any pending entries associated with the table.
180+ if (status.ok () && !tuple_list.empty ()) {
181+ if (prev_table == APP_P4RT_MULTICAST_ROUTER_INTERFACE_TABLE_NAME) {
182+ status = drainMulticastRouterInterfaceEntries (tuple_list);
183+ } else if (prev_table == APP_P4RT_REPLICATION_L2_MULTICAST_TABLE_NAME) {
184+ status = drainMulticastReplicationEntries (tuple_list);
185+ } else {
186+ status = ReturnCode (StatusCode::SWSS_RC_NOT_EXECUTED)
187+ << " Unexpected table " << QuotedVar (prev_table);
188+ // Drain tuples associated with unknown table as not executed.
189+ drainMgmtWithNotExecuted (tuple_list, m_publisher);
190+ }
191+ }
192+ drainWithNotExecuted (); // drain the main queue
193+ return status;
194+ }
195+
196+ // Drain entries associated with the multicast router interface table.
197+ ReturnCode L3MulticastManager::drainMulticastRouterInterfaceEntries (
198+ std::deque<swss::KeyOpFieldsValuesTuple>& router_interface_tuples) {
199+ SWSS_LOG_ENTER ();
200+
201+ ReturnCode status;
202+ std::vector<P4MulticastRouterInterfaceEntry>
203+ multicast_router_interface_entry_list;
204+ std::deque<swss::KeyOpFieldsValuesTuple> tuple_list;
205+
206+ std::string prev_op;
207+ bool prev_update = false ;
208+
209+ while (!router_interface_tuples.empty ()) {
210+ auto key_op_fvs_tuple = router_interface_tuples.front ();
211+ router_interface_tuples.pop_front ();
212+ std::string table_name;
213+ std::string key;
214+ parseP4RTKey (kfvKey (key_op_fvs_tuple), &table_name, &key);
215+ const std::vector<swss::FieldValueTuple>& attributes =
216+ kfvFieldsValues (key_op_fvs_tuple);
217+
218+ // Form entry object
219+ auto router_interface_entry_or =
220+ deserializeMulticastRouterInterfaceEntry (key, attributes);
221+
222+ if (!router_interface_entry_or.ok ()) {
223+ status = router_interface_entry_or.status ();
224+ SWSS_LOG_ERROR (" Unable to deserialize APP DB entry with key %s: %s" ,
225+ QuotedVar (table_name + " :" + key).c_str (),
226+ status.message ().c_str ());
227+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
228+ kfvFieldsValues (key_op_fvs_tuple), status,
229+ /* replace=*/ true );
230+ break ;
231+ }
232+ auto & router_interface_entry = *router_interface_entry_or;
233+
234+ // Validate entry
235+ const std::string& operation = kfvOp (key_op_fvs_tuple);
236+ status = validateMulticastRouterInterfaceEntry (router_interface_entry,
237+ operation);
238+ if (!status.ok ()) {
239+ SWSS_LOG_ERROR (
240+ " Validation failed for router interface APP DB entry with key %s: %s" ,
241+ QuotedVar (table_name + " :" + key).c_str (), status.message ().c_str ());
242+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
243+ kfvFieldsValues (key_op_fvs_tuple), status,
244+ /* replace=*/ true );
245+ break ;
246+ }
247+
248+ // Now, start processing batch of entries.
249+ auto * router_interface_entry_ptr = getMulticastRouterInterfaceEntry (
250+ router_interface_entry.multicast_router_interface_entry_key );
251+ bool update = router_interface_entry_ptr != nullptr ;
252+
253+ if (prev_op == " " ) {
254+ prev_op = operation;
255+ prev_update = update;
256+ }
257+ // Process the entries if the operation type changes.
258+ if (operation != prev_op || update != prev_update) {
259+ status = processMulticastRouterInterfaceEntries (
260+ multicast_router_interface_entry_list, tuple_list, prev_op,
261+ prev_update);
262+ multicast_router_interface_entry_list.clear ();
263+ tuple_list.clear ();
264+ prev_op = operation;
265+ prev_update = update;
266+ }
267+
268+ if (!status.ok ()) {
269+ // Return SWSS_RC_NOT_EXECUTED if failure has occured.
270+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (key_op_fvs_tuple),
271+ kfvFieldsValues (key_op_fvs_tuple),
272+ ReturnCode (StatusCode::SWSS_RC_NOT_EXECUTED),
273+ /* replace=*/ true );
274+ break ;
275+ } else {
276+ multicast_router_interface_entry_list.push_back (router_interface_entry);
277+ tuple_list.push_back (key_op_fvs_tuple);
278+ }
279+ } // while
280+
281+ // Process any pending entries.
282+ if (!multicast_router_interface_entry_list.empty ()) {
283+ auto rc = processMulticastRouterInterfaceEntries (
284+ multicast_router_interface_entry_list, tuple_list, prev_op,
285+ prev_update);
286+ if (!rc.ok ()) {
287+ status = rc;
288+ }
289+ }
290+ drainMgmtWithNotExecuted (router_interface_tuples, m_publisher);
291+ return status;
292+ }
293+
294+ // Drain entries associated with the multicast replication table, and those
295+ // only.
296+ ReturnCode L3MulticastManager::drainMulticastReplicationEntries (
297+ std::deque<swss::KeyOpFieldsValuesTuple>& replication_tuples) {
298+ return ReturnCode (StatusCode::SWSS_RC_UNIMPLEMENTED)
299+ << " L3MulticastManager::drainMulticastReplicationEntries is not "
300+ << " implemented yet" ;
132301}
133302
134303ReturnCodeOr<P4MulticastRouterInterfaceEntry>
135304L3MulticastManager::deserializeMulticastRouterInterfaceEntry (
136305 const std::string& key,
137- const std::vector<swss::FieldValueTuple>& attributes,
138- const std::string& table_name) {
306+ const std::vector<swss::FieldValueTuple>& attributes) {
139307 SWSS_LOG_ENTER ();
140308
141309 P4MulticastRouterInterfaceEntry router_interface_entry = {};
@@ -162,15 +330,16 @@ L3MulticastManager::deserializeMulticastRouterInterfaceEntry(
162330 if (value != p4orch::kSetSrcMac ) {
163331 return ReturnCode (StatusCode::SWSS_RC_INVALID_PARAM)
164332 << " Unexpected action " << QuotedVar (value) << " in "
165- << table_name ;
333+ << APP_P4RT_MULTICAST_ROUTER_INTERFACE_TABLE_NAME ;
166334 }
167335 } else if (field == prependParamField (p4orch::kSrcMac )) {
168336 router_interface_entry.src_mac = swss::MacAddress (value);
169337 } else if (field == prependParamField (p4orch::kMulticastMetadata )) {
170338 router_interface_entry.multicast_metadata = value;
171339 } else if (field != p4orch::kControllerMetadata ) {
172340 return ReturnCode (StatusCode::SWSS_RC_INVALID_PARAM)
173- << " Unexpected field " << QuotedVar (field) << " in " << table_name;
341+ << " Unexpected field " << QuotedVar (field) << " in "
342+ << APP_P4RT_MULTICAST_ROUTER_INTERFACE_TABLE_NAME;
174343 }
175344 }
176345 return router_interface_entry;
@@ -179,8 +348,7 @@ L3MulticastManager::deserializeMulticastRouterInterfaceEntry(
179348ReturnCodeOr<P4MulticastReplicationEntry>
180349L3MulticastManager::deserializeMulticastReplicationEntry (
181350 const std::string& key,
182- const std::vector<swss::FieldValueTuple>& attributes,
183- const std::string& table_name) {
351+ const std::vector<swss::FieldValueTuple>& attributes) {
184352 SWSS_LOG_ENTER ();
185353 P4MulticastReplicationEntry replication_entry = {};
186354 try {
@@ -211,7 +379,8 @@ L3MulticastManager::deserializeMulticastReplicationEntry(
211379 replication_entry.multicast_metadata = value;
212380 } else if (field != p4orch::kControllerMetadata ) {
213381 return ReturnCode (StatusCode::SWSS_RC_INVALID_PARAM)
214- << " Unexpected field " << QuotedVar (field) << " in " << table_name;
382+ << " Unexpected field " << QuotedVar (field) << " in "
383+ << APP_P4RT_REPLICATION_L2_MULTICAST_TABLE_NAME;
215384 }
216385 }
217386 return replication_entry;
@@ -245,9 +414,25 @@ std::string L3MulticastManager::verifyMulticastReplicationState(
245414ReturnCode L3MulticastManager::validateMulticastRouterInterfaceEntry (
246415 const P4MulticastRouterInterfaceEntry& multicast_router_interface_entry,
247416 const std::string& operation) {
248- return ReturnCode (StatusCode::SWSS_RC_UNIMPLEMENTED)
249- << " L3MulticastManager::verifyMulticastRouterInterfaceState is not "
250- << " implemented" ;
417+ // Confirm match fields are populated.
418+ if (multicast_router_interface_entry.multicast_replica_port .empty ()) {
419+ return ReturnCode (StatusCode::SWSS_RC_INVALID_PARAM)
420+ << " No match field entry multicast_replica_port provided" ;
421+ }
422+ if (multicast_router_interface_entry.multicast_replica_instance .empty ()) {
423+ return ReturnCode (StatusCode::SWSS_RC_INVALID_PARAM)
424+ << " No match field entry multicast_replica_instance provided" ;
425+ }
426+
427+ if (operation == SET_COMMAND) {
428+ return validateSetMulticastRouterInterfaceEntry (
429+ multicast_router_interface_entry);
430+ } else if (operation == DEL_COMMAND) {
431+ return validateDelMulticastRouterInterfaceEntry (
432+ multicast_router_interface_entry);
433+ }
434+ return ReturnCode (StatusCode::SWSS_RC_INVALID_PARAM)
435+ << " Unknown operation type " << QuotedVar (operation);
251436}
252437
253438ReturnCode L3MulticastManager::validateMulticastReplicationEntry (
@@ -258,13 +443,112 @@ ReturnCode L3MulticastManager::validateMulticastReplicationEntry(
258443 << " implemented" ;
259444}
260445
446+ ReturnCode L3MulticastManager::validateSetMulticastRouterInterfaceEntry (
447+ const P4MulticastRouterInterfaceEntry& multicast_router_interface_entry) {
448+ auto * router_interface_entry_ptr = getMulticastRouterInterfaceEntry (
449+ multicast_router_interface_entry.multicast_router_interface_entry_key );
450+
451+ bool is_update_operation = router_interface_entry_ptr != nullptr ;
452+ if (is_update_operation) {
453+ // Confirm RIF had SAI object ID.
454+ if (router_interface_entry_ptr->router_interface_oid ==
455+ SAI_OBJECT_TYPE_NULL) {
456+ return ReturnCode (StatusCode::SWSS_RC_NOT_FOUND)
457+ << " RIF was not assigned before updating multicast router "
458+ " interface "
459+ " entry with keys "
460+ << QuotedVar (
461+ multicast_router_interface_entry.multicast_replica_port )
462+ << " and "
463+ << QuotedVar (multicast_router_interface_entry
464+ .multicast_replica_instance );
465+ }
466+
467+ // Confirm we have a reference to the RIF object ID.
468+ if (m_rifOidToRouterInterfaceEntries.find (
469+ router_interface_entry_ptr->router_interface_oid ) ==
470+ m_rifOidToRouterInterfaceEntries.end ()) {
471+ return ReturnCode (StatusCode::SWSS_RC_NOT_FOUND)
472+ << " Expected RIF OID is missing from map: "
473+ << router_interface_entry_ptr->router_interface_oid ;
474+ }
475+
476+ // Confirm the RIF object ID exists in central mapper.
477+ std::string rif_key = KeyGenerator::generateMulticastRouterInterfaceRifKey (
478+ router_interface_entry_ptr->multicast_replica_port ,
479+ router_interface_entry_ptr->src_mac );
480+ bool exist_in_mapper =
481+ m_p4OidMapper->existsOID (SAI_OBJECT_TYPE_ROUTER_INTERFACE, rif_key);
482+ if (!exist_in_mapper) {
483+ return ReturnCode (StatusCode::SWSS_RC_NOT_FOUND)
484+ << " Multicast router interface entry exists in manager but RIF "
485+ " does "
486+ " not exist in the centralized map" ;
487+ }
488+ }
489+ // No additional validation required for add operation.
490+ return ReturnCode ();
491+ }
492+
493+ ReturnCode L3MulticastManager::validateDelMulticastRouterInterfaceEntry (
494+ const P4MulticastRouterInterfaceEntry& multicast_router_interface_entry) {
495+ auto * router_interface_entry_ptr = getMulticastRouterInterfaceEntry (
496+ multicast_router_interface_entry.multicast_router_interface_entry_key );
497+
498+ // Can't delete what isn't there.
499+ if (router_interface_entry_ptr == nullptr ) {
500+ return ReturnCode (StatusCode::SWSS_RC_NOT_FOUND)
501+ << " Multicast router interface entry exists does not exist" ;
502+ }
503+
504+ // Confirm we have a reference to the RIF object ID.
505+ if (m_rifOidToRouterInterfaceEntries.find (
506+ router_interface_entry_ptr->router_interface_oid ) ==
507+ m_rifOidToRouterInterfaceEntries.end ()) {
508+ return ReturnCode (StatusCode::SWSS_RC_NOT_FOUND)
509+ << " Expected RIF OID is missing from map: "
510+ << router_interface_entry_ptr->router_interface_oid ;
511+ }
512+
513+ // Confirm the RIF object ID exists in central mapper.
514+ std::string rif_key = KeyGenerator::generateMulticastRouterInterfaceRifKey (
515+ multicast_router_interface_entry.multicast_replica_port ,
516+ multicast_router_interface_entry.src_mac );
517+ if (!m_p4OidMapper->existsOID (SAI_OBJECT_TYPE_ROUTER_INTERFACE, rif_key)) {
518+ RETURN_INTERNAL_ERROR_AND_RAISE_CRITICAL (
519+ " Multicast router interface entry does not exist in the central map" );
520+ }
521+ return ReturnCode ();
522+ }
523+
261524ReturnCode L3MulticastManager::processMulticastRouterInterfaceEntries (
262525 std::vector<P4MulticastRouterInterfaceEntry>& entries,
263526 const std::deque<swss::KeyOpFieldsValuesTuple>& tuple_list,
264527 const std::string& op, bool update) {
265- return ReturnCode (StatusCode::SWSS_RC_UNIMPLEMENTED)
266- << " L3MulticastManager::processMulticastRouterInterfaceEntries is not "
267- << " implemented" ;
528+ SWSS_LOG_ENTER ();
529+
530+ ReturnCode status;
531+ std::vector<ReturnCode> statuses;
532+ // In syncd, bulk SAI calls use mode SAI_BULK_OP_ERROR_MODE_STOP_ON_ERROR.
533+ if (op == SET_COMMAND) {
534+ if (!update) {
535+ statuses = addMulticastRouterInterfaceEntries (entries);
536+ } else {
537+ statuses = updateMulticastRouterInterfaceEntries (entries);
538+ }
539+ } else {
540+ statuses = deleteMulticastRouterInterfaceEntries (entries);
541+ }
542+ // Check status of each entry.
543+ for (size_t i = 0 ; i < entries.size (); ++i) {
544+ m_publisher->publish (APP_P4RT_TABLE_NAME, kfvKey (tuple_list[i]),
545+ kfvFieldsValues (tuple_list[i]), statuses[i],
546+ /* replace=*/ true );
547+ if (status.ok () && !statuses[i].ok ()) {
548+ status = statuses[i];
549+ }
550+ }
551+ return status;
268552}
269553
270554ReturnCode L3MulticastManager::createRouterInterface (
0 commit comments