diff --git a/include/bm/bm_sim/P4Objects.h b/include/bm/bm_sim/P4Objects.h index b2720fe2f..f4fe34f75 100644 --- a/include/bm/bm_sim/P4Objects.h +++ b/include/bm/bm_sim/P4Objects.h @@ -545,6 +545,8 @@ class P4Objects { void enable_arith(header_id_t header_id, int field_offset); void enable_arith(header_id_t header_id); + bool is_selector_fanout( + const Json::Value &cfg_next_nodes) const; std::unique_ptr process_cfg_selector( const Json::Value &cfg_selector) const; }; diff --git a/include/bm/bm_sim/action_profile.h b/include/bm/bm_sim/action_profile.h index 46fa34c21..a7bb807fe 100644 --- a/include/bm/bm_sim/action_profile.h +++ b/include/bm/bm_sim/action_profile.h @@ -115,7 +115,7 @@ class ActionProfile : public NamedP4Object { }; ActionProfile(const std::string &name, p4object_id_t id, bool with_selection); - + bool has_selection() const; void set_hash(std::unique_ptr h) { @@ -165,6 +165,11 @@ class ActionProfile : public NamedP4Object { void serialize(std::ostream *out) const; void deserialize(std::istream *in, const P4Objects &objs); + void set_selector_fanout(); + + std::vector get_all_mbrs_from_grp(grp_hdl_t grp) const; + std::vector get_entries_with_mbrs(const std::vector &mbrs) const; + private: using ReadLock = boost::shared_lock; using WriteLock = boost::unique_lock; @@ -291,6 +296,10 @@ class ActionProfile : public NamedP4Object { bool group_is_empty(grp_hdl_t grp) const; + bool is_selector_fanout_enabled() const { + return selector_fanout_enabled; + } + const ActionEntry &lookup(const Packet &pkt, const IndirectIndex &index) const; @@ -307,6 +316,7 @@ class ActionProfile : public NamedP4Object { std::shared_ptr grp_selector_{nullptr}; GroupSelectionIface *grp_selector{&grp_mgr}; std::unique_ptr hash{nullptr}; + bool selector_fanout_enabled{false}; }; } // namespace bm diff --git a/include/bm/bm_sim/event_logger.h b/include/bm/bm_sim/event_logger.h index 10622d193..8a454b0a5 100644 --- a/include/bm/bm_sim/event_logger.h +++ b/include/bm/bm_sim/event_logger.h @@ -99,7 +99,8 @@ class EventLogger { void action_execute(const Packet &packet, const ActionFn &action_fn, const ActionData &action_data); - + void fanout_gen(const Packet &packet, uint64_t table_id, + uint64_t parent_pkt_copy_id); void config_change(); static EventLogger *get() { diff --git a/include/bm/bm_sim/fanout_pkt_mgr.h b/include/bm/bm_sim/fanout_pkt_mgr.h new file mode 100644 index 000000000..383120bbd --- /dev/null +++ b/include/bm/bm_sim/fanout_pkt_mgr.h @@ -0,0 +1,98 @@ +/* Copyright 2013-present Contributors to the P4 Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BM_BM_SIM_FANOUT_PKT_MGR_H_ +#define BM_BM_SIM_FANOUT_PKT_MGR_H_ + +#include "logger.h" +#include "packet.h" +#include "match_tables.h" +#include "action_profile.h" +#include + +namespace bm { +class MatchTableIndirect; +using bm::ActionProfile; +using EntryVec = const std::vector; +using SelectorIface = ActionProfile::GroupSelectionIface; + +struct FanoutCtx { + bool hit{false}; + std::vector fanout_pkts; + const Packet * cur_pkt{nullptr}; + ActionProfile *action_profile{nullptr}; + MatchTableIndirect *cur_table{nullptr}; +}; + +class FanoutPktSelection: public SelectorIface{ + public: + using grp_hdl_t = ActionProfile::grp_hdl_t; + using mbr_hdl_t = ActionProfile::mbr_hdl_t; + using hash_t = ActionProfile::hash_t; + using MatchErrorCode = bm::MatchErrorCode; + + FanoutPktSelection() = default; + + // callbacks after member op, not actual member/group ops + void add_member_to_group(grp_hdl_t grp, mbr_hdl_t mbr) override; + + void remove_member_from_group(grp_hdl_t grp, mbr_hdl_t mbr) override; + + mbr_hdl_t get_from_hash(grp_hdl_t grp, hash_t h) const override; + + void reset() override {} + + private: + std::unordered_map> groups; +}; + +class FanoutPktMgr { +public: + FanoutPktMgr(const FanoutPktMgr&) = delete; + FanoutPktMgr& operator=(const FanoutPktMgr&) = delete; + static FanoutPktMgr& instance() { + static FanoutPktMgr instance_; + return instance_; + } + + inline void register_thread(std::thread::id thread_id) { + BMLOG_DEBUG("Registering thread {}", thread_id); + fanout_ctx_map.emplace(thread_id, FanoutCtx()); + } + inline SelectorIface* get_grp_selector() { + return grp_selector; + } + + + + std::vector& get_fanout_pkts(); + FanoutCtx& get_fanout_ctx(); + void set_ctx(MatchTableIndirect *table, const Packet &pkt, ActionProfile *action_profile, bool hit); + void reset_ctx(); + void replicate_for_entries(const std::vector &entries); + + std::mutex fanout_pkt_mutex; + // TODO(Hao): deduplicate packets fanout, optional + + +private: + FanoutPktMgr() = default; + std::unordered_map fanout_ctx_map; + FanoutPktSelection fanout_selection; + SelectorIface* grp_selector{&fanout_selection}; +}; + +} // namespace bm + +#endif // BM_BM_SIM_FANOUT_PKT_MGR_H_ \ No newline at end of file diff --git a/include/bm/bm_sim/logger.h b/include/bm/bm_sim/logger.h index 831d10025..04e2eaa45 100644 --- a/include/bm/bm_sim/logger.h +++ b/include/bm/bm_sim/logger.h @@ -155,4 +155,10 @@ class Logger { bm::Logger::get()->error("[{}] [cxt {}] " s, (pkt).get_unique_id(), \ (pkt).get_context(), ##__VA_ARGS__) +#define BMLOG_WARN(...) bm::Logger::get()->warn(__VA_ARGS__) + +#define BMLOG_WARN_PKT(pkt, s, ...) \ + bm::Logger::get()->warn("[{}] [cxt {}] " s, (pkt).get_unique_id(), \ + (pkt).get_context(), ##__VA_ARGS__) + #endif // BM_BM_SIM_LOGGER_H_ diff --git a/include/bm/bm_sim/match_tables.h b/include/bm/bm_sim/match_tables.h index 6348c1170..cd699fb5a 100644 --- a/include/bm/bm_sim/match_tables.h +++ b/include/bm/bm_sim/match_tables.h @@ -39,6 +39,7 @@ #include "lookup_structures.h" #include "action_entry.h" #include "action_profile.h" +#include "fanout_pkt_mgr.h" namespace bm { @@ -385,6 +386,7 @@ class MatchTable : public MatchTableAbstract { class MatchTableIndirect : public MatchTableAbstract { public: + friend class FanoutPktMgr; using mbr_hdl_t = ActionProfile::mbr_hdl_t; using IndirectIndex = ActionProfile::IndirectIndex; diff --git a/include/bm/bm_sim/packet.h b/include/bm/bm_sim/packet.h index 0da8c2146..bcae47545 100644 --- a/include/bm/bm_sim/packet.h +++ b/include/bm/bm_sim/packet.h @@ -32,6 +32,7 @@ #include #include // for std::min #include +#include #include @@ -40,6 +41,7 @@ #include "parser_error.h" #include "phv_source.h" #include "phv_forward.h" +#include "control_flow.h" namespace bm { @@ -301,6 +303,11 @@ class Packet final { //! @copydoc clone_with_phv_reset_metadata std::unique_ptr clone_with_phv_reset_metadata_ptr() const; + //! Clone the current packet, along with its PHV and registers. + Packet clone_with_phv_and_registers() const; + //! @copydoc clone_with_phv_and_registers + std::unique_ptr clone_with_phv_and_registers_ptr() const; + //! Clone the current packet, without the PHV. The value of the fields in the //! clone will be undefined and should not be accessed before setting it //! first. @@ -315,6 +322,17 @@ class Packet final { //! @copydoc clone_choose_context std::unique_ptr clone_choose_context_ptr(cxt_id_t new_cxt) const; + // Packet fanout related methods + //! Returns true if the packet has a next node set + bool has_next_node() const; + //! Get the next node, if it exists + const ControlFlowNode *get_next_node() const; + //! Set the next node, which is used to next the packet processing + void set_next_node(const ControlFlowNode *node); + //! Reset the next node + void reset_next_node(); + + //! Deleted copy constructor Packet(const Packet &other) = delete; //! Deleted copy assignment operator @@ -385,6 +403,7 @@ class Packet final { bool checksum_error{false}; + std::optional next_node{std::nullopt}; private: static CopyIdGenerator *copy_id_gen; }; diff --git a/include/bm/bm_sim/pipeline.h b/include/bm/bm_sim/pipeline.h index 39d628950..9326cc551 100644 --- a/include/bm/bm_sim/pipeline.h +++ b/include/bm/bm_sim/pipeline.h @@ -40,10 +40,12 @@ class Pipeline : public NamedP4Object { : NamedP4Object(name, id), first_node(first_node) {} //! Sends the \p pkt through the correct match-action tables and - //! condiitons. Each step is determined based on the result of the previous + //! conditions. Each step is determined based on the result of the previous //! step (table lookup or condition evaluation), according to the P4 control //! flow graph. void apply(Packet *pkt); + // Start from next_node instead of first_node + void apply_from_next_node(Packet *pkt); //! Deleted copy constructor Pipeline(const Pipeline &other) = delete; diff --git a/src/bm_sim/CMakeLists.txt b/src/bm_sim/CMakeLists.txt index a5fd07924..bc5c0b9ee 100644 --- a/src/bm_sim/CMakeLists.txt +++ b/src/bm_sim/CMakeLists.txt @@ -28,6 +28,7 @@ add_library(bmsim OBJECT event_logger.cpp expressions.cpp extern.cpp + fanout_pkt_mgr.cpp fields.cpp headers.cpp header_unions.cpp diff --git a/src/bm_sim/Makefile.am b/src/bm_sim/Makefile.am index 56a809e55..b20cda636 100644 --- a/src/bm_sim/Makefile.am +++ b/src/bm_sim/Makefile.am @@ -37,6 +37,7 @@ event_logger.cpp \ expressions.cpp \ extern.cpp \ extract.h \ +fanout_pkt_mgr.cpp \ fields.cpp \ headers.cpp \ header_unions.cpp \ diff --git a/src/bm_sim/P4Objects.cpp b/src/bm_sim/P4Objects.cpp index 2cf9ffd18..8460b6a0c 100644 --- a/src/bm_sim/P4Objects.cpp +++ b/src/bm_sim/P4Objects.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include @@ -1655,8 +1657,20 @@ P4Objects::init_pipelines(const Json::Value &cfg_root, std::unique_ptr action_profile( new ActionProfile(act_prof_name, act_prof_id, with_selection)); if (with_selection) { - auto calc = process_cfg_selector(cfg_act_prof["selector"]); - action_profile->set_hash(std::move(calc)); + // TODO(Hao): clean debug logs + BMLOG_DEBUG( + "Action profile '{}' with id {} has a selector", + act_prof_name, act_prof_id); + + if(is_selector_fanout(cfg_act_prof["selector"])){ + BMLOG_DEBUG( + "Action profile '{}' with id {} enabled selector fanout", + act_prof_name, act_prof_id); + action_profile->set_selector_fanout(); + }else{ + auto calc = process_cfg_selector(cfg_act_prof["selector"]); + action_profile->set_hash(std::move(calc)); + } } add_action_profile(act_prof_name, std::move(action_profile)); } @@ -2499,6 +2513,11 @@ P4Objects::check_hash(const std::string &name) const { return nullptr; } +bool P4Objects::is_selector_fanout(const Json::Value &cfg_selector) const { + return cfg_selector.isMember("algo") && // not to hardcode names, fix later + cfg_selector["algo"].asString() == "selector_fanout"; +} + std::unique_ptr P4Objects::process_cfg_selector(const Json::Value &cfg_selector) const { const auto selector_algo = cfg_selector["algo"].asString(); diff --git a/src/bm_sim/action_profile.cpp b/src/bm_sim/action_profile.cpp index 2bc3dab82..b622e043d 100644 --- a/src/bm_sim/action_profile.cpp +++ b/src/bm_sim/action_profile.cpp @@ -22,12 +22,14 @@ #include #include #include +#include #include #include #include #include +#include // remove namespace bm { void @@ -126,6 +128,8 @@ ActionProfile::mbr_hdl_t ActionProfile::GroupMgr::get_from_hash(grp_hdl_t grp, hash_t h) const { const auto &group_info = groups.at(grp); auto s = group_info.size(); + BMLOG_DEBUG("Choosing member from group {} with hash {} (size {})", + grp, h, s); return group_info.get_nth(h % s); } @@ -184,6 +188,28 @@ ActionProfile::lookup(const Packet &pkt, const IndirectIndex &index) const { return action_entries[mbr]; } + +std::vector +ActionProfile::get_all_mbrs_from_grp(grp_hdl_t grp) const { + assert(is_valid_grp(grp)); + Group group; + MatchErrorCode rc = get_group(grp, &group); + _BM_UNUSED(rc); + assert(rc == MatchErrorCode::SUCCESS); + return group.mbr_handles; +} + +std::vector +ActionProfile::get_entries_with_mbrs(const std::vector &mbrs) const { + std::vector entries; + entries.reserve(mbrs.size()); + for (auto m : mbrs) { + assert(is_valid_mbr(m)); + entries.push_back(&action_entries[m]); + } + return entries; +} + bool ActionProfile::has_selection() const { return with_selection; } @@ -524,6 +550,10 @@ void ActionProfile::set_group_selector( std::shared_ptr selector) { WriteLock lock = lock_write(); + BMLOG_DEBUG("Setting group selector for action profile '{}'", + get_name()); + // output stack trace, remove later + BMLOG_DEBUG("Stack trace: {}", boost::stacktrace::stacktrace()); grp_selector_ = selector; grp_selector = grp_selector_.get(); } @@ -612,9 +642,18 @@ ActionProfile::ref_count_decrease(const IndirectIndex &index) { ActionProfile::mbr_hdl_t ActionProfile::choose_from_group(grp_hdl_t grp, const Packet &pkt) const { + // TODO(Hao): PI resets to it own grp_selector, might be a bug, so I just sets + // here for now.. + if(selector_fanout_enabled) { + return FanoutPktMgr::instance().get_grp_selector()->get_from_hash(grp, 0); + } if (!hash) return grp_selector->get_from_hash(grp, 0); hash_t h = static_cast(hash->output(pkt)); return grp_selector->get_from_hash(grp, h); } +void ActionProfile::set_selector_fanout(){ + selector_fanout_enabled = true; +} + } // namespace bm diff --git a/src/bm_sim/event_logger.cpp b/src/bm_sim/event_logger.cpp index 3de163f7e..9534f54ce 100644 --- a/src/bm_sim/event_logger.cpp +++ b/src/bm_sim/event_logger.cpp @@ -40,6 +40,7 @@ enum EventType { PIPELINE_START, PIPELINE_DONE, CONDITION_EVAL, TABLE_HIT, TABLE_MISS, ACTION_EXECUTE, + FANOUT_GEN, CONFIG_CHANGE = 999 }; @@ -52,6 +53,7 @@ struct msg_hdr_t { uint64_t copy_id; } __attribute__((packed)); + namespace { void @@ -267,6 +269,24 @@ EventLogger::action_execute(const Packet &packet, (void) action_data; } + +void +EventLogger::fanout_gen(const Packet &packet, uint64_t table_id, + uint64_t parent_pkt_copy_id) { + struct msg_t : msg_hdr_t { + uint64_t table_id; + uint64_t parent_packet_copy_id; + } __attribute__((packed)); + + msg_t msg; + fill_msg_hdr(EventType::FANOUT_GEN, device_id, packet, &msg); + msg.table_id = table_id; + msg.parent_packet_copy_id = parent_pkt_copy_id; + transport_instance->send(reinterpret_cast(&msg), sizeof(msg)); +} + + + void EventLogger::config_change() { msg_hdr_t msg; diff --git a/src/bm_sim/fanout_pkt_mgr.cpp b/src/bm_sim/fanout_pkt_mgr.cpp new file mode 100644 index 000000000..7352e41f3 --- /dev/null +++ b/src/bm_sim/fanout_pkt_mgr.cpp @@ -0,0 +1,139 @@ +/* Copyright 2013-present Contributors to the P4 Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace bm { + + +void FanoutPktSelection::add_member_to_group(grp_hdl_t grp, mbr_hdl_t mbr) { + (void) grp; + (void) mbr; +} + +void FanoutPktSelection::remove_member_from_group(grp_hdl_t grp, mbr_hdl_t mbr) { + (void) grp; + (void) mbr; +} + +FanoutPktSelection::mbr_hdl_t FanoutPktSelection::get_from_hash(grp_hdl_t grp, hash_t h) const { + (void)h; + auto &ctx = FanoutPktMgr::instance().get_fanout_ctx(); + auto *action_profile = ctx.action_profile; + if (!action_profile) { + BMLOG_ERROR("No action profile set for fanout packet selection"); + throw std::runtime_error("No action profile set for fanout packet selection"); + } + + std::vector mbrs = action_profile->get_all_mbrs_from_grp(grp); + mbr_hdl_t selected_mbr = mbrs.back(); + mbrs.pop_back(); + + auto entries = action_profile->get_entries_with_mbrs(mbrs); + BMLOG_DEBUG("Fanout Selected member {} from group {} with hash {} with number of entries {}", selected_mbr, grp, h, entries.size()); + FanoutPktMgr::instance().replicate_for_entries(entries); + return selected_mbr; +} + + +std::vector& FanoutPktMgr::get_fanout_pkts() { + std::thread::id thread_id = std::this_thread::get_id(); + BMLOG_DEBUG("Getting fanout packets for thread {}", thread_id); + + std::lock_guard lock(fanout_pkt_mutex); + auto it = fanout_ctx_map.find(thread_id); + if (it == fanout_ctx_map.end()) { + BMLOG_ERROR("No fanout vector registered for thread {}", thread_id); + throw std::runtime_error("Fanout vector not found for thread"); + } + return it->second.fanout_pkts; +} + +FanoutCtx& FanoutPktMgr::get_fanout_ctx() { + std::thread::id thread_id = std::this_thread::get_id(); + auto it = fanout_ctx_map.find(thread_id); + + std::lock_guard lock(fanout_pkt_mutex); + if (it == fanout_ctx_map.end()) { + BMLOG_ERROR("No fanout context registered for thread {}", thread_id); + throw std::runtime_error("Fanout context not found for thread"); + } + return it->second; +} +void FanoutPktMgr::set_ctx(MatchTableIndirect *table, const Packet &pkt, ActionProfile *action_profile, bool hit) { + auto &ctx = get_fanout_ctx(); + ctx.cur_table = table; + ctx.cur_pkt = &pkt; + ctx.action_profile = action_profile; + ctx.hit = hit; +} + +void FanoutPktMgr::reset_ctx() { + auto &ctx = get_fanout_ctx(); + ctx.cur_table = nullptr; + ctx.cur_pkt = nullptr; + ctx.action_profile = nullptr; +} + +void FanoutPktMgr::replicate_for_entries(const std::vector &entries) { + auto &fanout_pkts = get_fanout_pkts(); + auto &ctx = get_fanout_ctx(); + auto *match_table = ctx.cur_table; + const Packet &pkt = *ctx.cur_pkt; + bool hit = ctx.hit; + + // for event logger + uint64_t parent_pkt_copy_id = pkt.get_copy_id(); + uint64_t table_id = match_table->get_id(); + for(auto entry : entries) { + // TODO(Hao): apply_action in match_tables has a full procedure, + // need to make sure that directly applying the func does not + // cause any issues + //Things to consider: + // 1. set the entry index + // 2. set meters + // 3. set counters + // 4. incorporate with the debugger? + Packet* rep_pkt = pkt.clone_with_phv_and_registers_ptr().release(); + // why egress is not copied directly? i have to set it here + rep_pkt->set_egress_port(pkt.get_egress_port()); + + entry->action_fn(rep_pkt); + BMLOG_DEBUG_PKT(*rep_pkt, "Action {} applied to fanout packet", + *entry); + + auto act_id = entry->action_fn.get_action_id(); + const ControlFlowNode *next_node = hit ? + match_table->get_next_node(act_id) : + match_table->get_next_node_default(act_id); + if (next_node == nullptr) { + BMLOG_DEBUG_PKT(*rep_pkt, "No next node for action id {}", act_id); + } else { + BMLOG_DEBUG_PKT(*rep_pkt, "Next node for action id {}: {}", act_id, next_node->get_name()); + } + rep_pkt->set_next_node(next_node); + fanout_pkts.push_back(rep_pkt); + + BMELOG(fanout_gen, *rep_pkt, table_id, parent_pkt_copy_id); + } + + reset_ctx(); +} + + + + +} \ No newline at end of file diff --git a/src/bm_sim/match_tables.cpp b/src/bm_sim/match_tables.cpp index bd1064449..1ec68b7e0 100644 --- a/src/bm_sim/match_tables.cpp +++ b/src/bm_sim/match_tables.cpp @@ -134,7 +134,6 @@ MatchTableAbstract::apply_action(Packet *pkt) { Debugger::FIELD_ACTION, action_entry.action_fn.get_action_id()); BMLOG_DEBUG_PKT(*pkt, "Action entry is {}", action_entry); - action_entry.action_fn(pkt); return next_node; @@ -715,8 +714,12 @@ MatchTableIndirect::lookup(const Packet &pkt, pkt, "Lookup in table '{}' yielded empty group", get_name()); return empty_action; } - + // A bit hacky, in order to get the next_table for fanout. + if(action_profile->is_selector_fanout_enabled()){ + FanoutPktMgr::instance().set_ctx(this, pkt, action_profile, *hit); + } const auto &entry = action_profile->lookup(pkt, index); + // Unfortunately this has to be done at this stage and cannot be done when // inserting a member because for 2 match tables sharing the same action // profile (and therefore the same members), the next node mapping can vary diff --git a/src/bm_sim/packet.cpp b/src/bm_sim/packet.cpp index a88302db3..06479762f 100644 --- a/src/bm_sim/packet.cpp +++ b/src/bm_sim/packet.cpp @@ -164,6 +164,21 @@ Packet::clone_with_phv_ptr() const { return std::unique_ptr(new Packet(clone_with_phv())); } +Packet +Packet::clone_with_phv_and_registers() const { + copy_id_t new_copy_id = copy_id_gen->add_one(packet_id); + Packet pkt(cxt_id, ingress_port, packet_id, new_copy_id, ingress_length, + buffer.clone(buffer.get_data_size()), phv_source); + pkt.phv->copy_headers(*phv); + pkt.registers = registers; + return pkt; +} + +std::unique_ptr +Packet::clone_with_phv_and_registers_ptr() const { + return std::unique_ptr(new Packet(clone_with_phv_and_registers())); +} + Packet Packet::clone_with_phv_reset_metadata() const { copy_id_t new_copy_id = copy_id_gen->add_one(packet_id); @@ -207,6 +222,20 @@ Packet::clone_no_phv_ptr() const { return clone_choose_context_ptr(cxt_id); } +bool Packet::has_next_node() const { + return next_node.has_value(); +} +const ControlFlowNode *Packet::get_next_node() const { + return next_node.value_or(nullptr); +} +void Packet::set_next_node(const ControlFlowNode *node) { + next_node = node; +} + +void Packet::reset_next_node() { + next_node.reset(); +} + /* Cannot get away with defaults here, we need to swap the phvs, otherwise we could "leak" the old phv (i.e. not put it back into the pool) */ diff --git a/src/bm_sim/pipeline.cpp b/src/bm_sim/pipeline.cpp index a0bb4864e..575911a2f 100644 --- a/src/bm_sim/pipeline.cpp +++ b/src/bm_sim/pipeline.cpp @@ -37,11 +37,15 @@ Pipeline::apply(Packet *pkt) { BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': start", get_name()); const ControlFlowNode *node = first_node; while (node) { + BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': after applying node '{}'", + get_name(), node->get_name()); if (pkt->is_marked_for_exit()) { BMLOG_DEBUG_PKT(*pkt, "Packet is marked for exit, interrupting pipeline"); break; } node = (*node)(pkt); + BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': before applying node '{}'", + get_name(), node ? node->get_name() : "None"); } BMELOG(pipeline_done, *pkt, *this); DEBUGGER_NOTIFY_CTR( @@ -50,4 +54,22 @@ Pipeline::apply(Packet *pkt) { BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': end", get_name()); } +// TODO(Hao): +// Snapshot of metadata/reg states before calling this +// Support for debugger notify and event logging +void Pipeline::apply_from_next_node(Packet *pkt) { + const ControlFlowNode *node = pkt->get_next_node(); + BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': packet fanout from node '{}'", + get_name(), node? node->get_name() : "None"); + while (node) { + if (pkt->is_marked_for_exit()) { + BMLOG_DEBUG_PKT(*pkt, "Packet is marked for exit, interrupting pipeline"); + break; + } + node = (*node)(pkt); + } + pkt->reset_next_node(); + BMLOG_DEBUG_PKT(*pkt, "Pipeline '{}': fanout end", get_name()); +} + } // namespace bm diff --git a/src/bm_sim/tables.cpp b/src/bm_sim/tables.cpp index 1d55f4d5a..2c77e216b 100644 --- a/src/bm_sim/tables.cpp +++ b/src/bm_sim/tables.cpp @@ -43,6 +43,9 @@ MatchActionTable::operator()(Packet *pkt) const { DEBUGGER_NOTIFY_CTR( Debugger::PacketId::make(pkt->get_packet_id(), pkt->get_copy_id()), DBG_CTR_EXIT(DBG_CTR_TABLE) | get_id()); + BMLOG_TRACE_PKT(*pkt, "Next node after table '{}': {}", + get_name(), + next ? next->get_name() : "None"); return next; } diff --git a/targets/simple_switch/simple_switch.cpp b/targets/simple_switch/simple_switch.cpp index ec998fce9..9df9122ae 100644 --- a/targets/simple_switch/simple_switch.cpp +++ b/targets/simple_switch/simple_switch.cpp @@ -24,6 +24,7 @@ #include #include + #include #include @@ -128,6 +129,7 @@ class SimpleSwitch::InputBuffer { NORMAL, RESUBMIT, RECIRCULATE, + SELECTOR_FANOUT, SENTINEL // signal for the ingress thread to terminate }; @@ -141,6 +143,7 @@ class SimpleSwitch::InputBuffer { std::move(item), true); case PacketType::RESUBMIT: case PacketType::RECIRCULATE: + case PacketType::SELECTOR_FANOUT: return push_front(&queue_hi, capacity_hi, &cvar_can_push_hi, std::move(item), false); case PacketType::SENTINEL: @@ -155,7 +158,7 @@ class SimpleSwitch::InputBuffer { Lock lock(mutex); cvar_can_pop.wait( lock, [this] { return (queue_hi.size() + queue_lo.size()) > 0; }); - // give higher priority to resubmit/recirculate queue + // give higher priority to resubmit/recirculate/selector-fanout queue if (queue_hi.size() > 0) { *pItem = std::move(queue_hi.back()); queue_hi.pop_back(); @@ -179,7 +182,10 @@ class SimpleSwitch::InputBuffer { std::unique_ptr &&item, bool blocking) { Lock lock(mutex); while (queue->size() == capacity) { - if (!blocking) return 0; + if (!blocking){ + BMLOG_WARN_PKT(*item, "Input buffer is full, dropping packet"); + return 0; + } cvar->wait(lock); } queue->push_front(std::move(item)); @@ -274,9 +280,15 @@ void SimpleSwitch::start_and_return_() { check_queueing_metadata(); - threads_.push_back(std::thread(&SimpleSwitch::ingress_thread, this)); + auto ingress_thread = std::thread(&SimpleSwitch::ingress_thread, this); + FanoutPktMgr::instance().register_thread( + ingress_thread.get_id()); + threads_.push_back(std::move(ingress_thread)); for (size_t i = 0; i < nb_egress_threads; i++) { - threads_.push_back(std::thread(&SimpleSwitch::egress_thread, this, i)); + auto egress_thread = std::thread(&SimpleSwitch::egress_thread, this, i); + FanoutPktMgr::instance().register_thread( + egress_thread.get_id()); + threads_.push_back(std::move(egress_thread)); } threads_.push_back(std::thread(&SimpleSwitch::transmit_thread, this)); } @@ -479,20 +491,24 @@ void SimpleSwitch::ingress_thread() { PHV *phv; + while (1) { std::unique_ptr packet; + input_buffer->pop_back(&packet); if (packet == nullptr) break; // TODO(antonin): only update these if swapping actually happened? Parser *parser = this->get_parser("parser"); + + // TODO(antonin): only update these if swapping actually happened? Pipeline *ingress_mau = this->get_pipeline("ingress"); phv = packet->get_phv(); port_t ingress_port = packet->get_ingress_port(); (void) ingress_port; - BMLOG_DEBUG_PKT(*packet, "Processing packet received on port {}", + BMLOG_DEBUG_PKT(*packet, "Processing packet received on ingress port {}", ingress_port); auto ingress_packet_size = @@ -506,19 +522,36 @@ SimpleSwitch::ingress_thread() { parser leave the buffer unchanged, and move the pop logic to the deparser. TODO? */ const Packet::buffer_state_t packet_in_state = packet->save_buffer_state(); - parser->parse(packet.get()); - if (phv->has_field("standard_metadata.parser_error")) { - phv->get_field("standard_metadata.parser_error").set( - packet->get_error_code().get()); - } + // Check if the packet has an optional continue node + // TODO(Hao): update the doc/simple_switch.md + if(packet->has_next_node()){ + ingress_mau->apply_from_next_node(packet.get()); + } else { + parser->parse(packet.get()); + if (phv->has_field("standard_metadata.parser_error")) { + phv->get_field("standard_metadata.parser_error").set( + packet->get_error_code().get()); + } + + if (phv->has_field("standard_metadata.checksum_error")) { + phv->get_field("standard_metadata.checksum_error").set( + packet->get_checksum_error() ? 1 : 0); + } - if (phv->has_field("standard_metadata.checksum_error")) { - phv->get_field("standard_metadata.checksum_error").set( - packet->get_checksum_error() ? 1 : 0); + ingress_mau->apply(packet.get()); } - ingress_mau->apply(packet.get()); + // SELECTOR_FANOUT + { + auto &fanout_pkts = FanoutPktMgr::instance().get_fanout_pkts(); + for(auto pkt: fanout_pkts){ + input_buffer->push_front(InputBuffer::PacketType::SELECTOR_FANOUT, + std::unique_ptr(pkt)); + BMLOG_DEBUG_PKT(*pkt, "SELECTOR_FANOUT packet pushed to ingress_buffer"); + } + fanout_pkts.clear(); + } packet->reset_exit(); @@ -570,6 +603,7 @@ SimpleSwitch::ingress_thread() { ->get_field("standard_metadata.ingress_port") .set(ingress_port); parser->parse(packet_copy.get()); + copy_field_list_and_set_type(packet, packet_copy, PKT_INSTANCE_TYPE_INGRESS_CLONE, field_list_id); @@ -612,7 +646,7 @@ SimpleSwitch::ingress_thread() { ingress_packet_size); phv_copy->get_field("standard_metadata.packet_length") .set(ingress_packet_size); - input_buffer->push_front( + input_buffer->push_front( InputBuffer::PacketType::RESUBMIT, std::move(packet_copy)); continue; } @@ -638,6 +672,7 @@ SimpleSwitch::ingress_thread() { f_instance_type.set(PKT_INSTANCE_TYPE_NORMAL); enqueue(egress_port, std::move(packet)); + } } @@ -652,40 +687,58 @@ SimpleSwitch::egress_thread(size_t worker_id) { egress_buffers.pop_back(worker_id, &port, &priority, &packet); if (packet == nullptr) break; + BMLOG_DEBUG_PKT(*packet, "Processing packet in egress port {}", + worker_id); Deparser *deparser = this->get_deparser("deparser"); Pipeline *egress_mau = this->get_pipeline("egress"); phv = packet->get_phv(); + // TODO(Hao): why egress_spec is cleared when passed to egress? + // should bypass the metadata setting here for the replicated pkts + Field &f_egress_spec = phv->get_field("standard_metadata.egress_spec"); + + if(packet->has_next_node()){ + egress_mau->apply_from_next_node(packet.get()); + } else { + if (phv->has_field("intrinsic_metadata.egress_global_timestamp")) { + phv->get_field("intrinsic_metadata.egress_global_timestamp") + .set(get_ts().count()); + } - if (phv->has_field("intrinsic_metadata.egress_global_timestamp")) { - phv->get_field("intrinsic_metadata.egress_global_timestamp") - .set(get_ts().count()); - } - - if (with_queueing_metadata) { - auto enq_timestamp = - phv->get_field("queueing_metadata.enq_timestamp").get(); - phv->get_field("queueing_metadata.deq_timedelta").set( - get_ts().count() - enq_timestamp); - phv->get_field("queueing_metadata.deq_qdepth").set( - egress_buffers.size(port, priority)); - if (phv->has_field("queueing_metadata.qid")) { - auto &qid_f = phv->get_field("queueing_metadata.qid"); - qid_f.set(priority); + if (with_queueing_metadata) { + auto enq_timestamp = + phv->get_field("queueing_metadata.enq_timestamp").get(); + phv->get_field("queueing_metadata.deq_timedelta").set( + get_ts().count() - enq_timestamp); + phv->get_field("queueing_metadata.deq_qdepth").set( + egress_buffers.size(port, priority)); + if (phv->has_field("queueing_metadata.qid")) { + auto &qid_f = phv->get_field("queueing_metadata.qid"); + qid_f.set(priority); + } } - } - phv->get_field("standard_metadata.egress_port").set(port); + phv->get_field("standard_metadata.egress_port").set(port); - Field &f_egress_spec = phv->get_field("standard_metadata.egress_spec"); - // When egress_spec == drop_port the packet will be dropped, thus - // here we initialize egress_spec to a value different from drop_port. - f_egress_spec.set(drop_port + 1); + // When egress_spec == drop_port the packet will be dropped, thus + // here we initialize egress_spec to a value different from drop_port. + f_egress_spec.set(drop_port + 1); + + phv->get_field("standard_metadata.packet_length").set( + packet->get_register(RegisterAccess::PACKET_LENGTH_REG_IDX)); - phv->get_field("standard_metadata.packet_length").set( - packet->get_register(RegisterAccess::PACKET_LENGTH_REG_IDX)); + egress_mau->apply(packet.get()); + } - egress_mau->apply(packet.get()); + // SELECTOR_FANOUT + { + auto &fanout_pkts = FanoutPktMgr::instance().get_fanout_pkts(); + for(auto pkt: fanout_pkts){ + egress_buffers.push_front(worker_id, 0, std::unique_ptr(pkt)); + BMLOG_DEBUG_PKT(*pkt, "SELECTOR_FANOUT packet pushed to egress_buffer"); + } + fanout_pkts.clear(); + } auto clone_mirror_session_id = RegisterAccess::get_clone_mirror_session_id(packet.get()); diff --git a/targets/simple_switch/simple_switch.h b/targets/simple_switch/simple_switch.h index cdc751bd0..80f5a2ae8 100644 --- a/targets/simple_switch/simple_switch.h +++ b/targets/simple_switch/simple_switch.h @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include #include @@ -56,6 +58,7 @@ using bm::Pipeline; using bm::McSimplePreLAG; using bm::Field; using bm::FieldList; +using bm::FanoutPktMgr; using bm::packet_id_t; using bm::p4object_id_t; diff --git a/targets/simple_switch_grpc/configure.ac b/targets/simple_switch_grpc/configure.ac index a2e37cf79..c993a2560 100644 --- a/targets/simple_switch_grpc/configure.ac +++ b/targets/simple_switch_grpc/configure.ac @@ -69,7 +69,8 @@ AM_CONDITIONAL([WITH_THRIFT], [test "$with_thrift" = yes]) # Generate makefiles AC_CONFIG_FILES([Makefile - tests/Makefile]) + tests/Makefile + tests/test_selector_fanout/Makefile]) AC_CONFIG_FILES([tests/example.run], [chmod +x tests/example.run]) diff --git a/targets/simple_switch_grpc/tests/test_selector_fanout/Makefile.am b/targets/simple_switch_grpc/tests/test_selector_fanout/Makefile.am new file mode 100644 index 000000000..c20de6226 --- /dev/null +++ b/targets/simple_switch_grpc/tests/test_selector_fanout/Makefile.am @@ -0,0 +1,44 @@ +#TODO(Hao) merge this with the outer Makefile.am + +print-top-srcdir: + @echo "top_srcdir = $(top_srcdir)" \ + @echo "top_builddir = $(top_builddir)" \ + @echo "abs_srcdir = $(abs_srcdir)" + + +ACLOCAL_AMFLAGS = ${ACLOCAL_FLAGS} -I m4 + +# Ensures that gtest has been built +SUBDIRS = ../../../../third_party + +# for deprecated Protobuf fields +AM_CXXFLAGS += -Wno-error=deprecated-declarations + +AM_CPPFLAGS += \ +-isystem $(top_srcdir)/../../third_party/gtest/include \ +-I$(top_srcdir) \ +-I$(top_srcdir)/tests \ +-I$(top_srcdir)/../../include \ +-isystem $(top_srcdir)/../../third_party/spdlog \ +-I$(top_builddir)/../../services/cpp_out -I$(top_builddir)/../../services/grpc_out \ +-DTESTDATADIR=\"$(abs_srcdir)/testdata\" + +TESTS = test_selector_fanout + +check_PROGRAMS = test_selector_fanout + +common_source = ../utils.h ../utils.cpp ../base_test.h ../base_test.cpp + + +# TODO(Hao): a new suite for selector fanout tests, be carefull with the conflicts +test_selector_fanout_SOURCES = \ +$(common_source) ../main.cpp \ +test_ingress_fanout.cpp + +test_selector_fanout_LDADD = \ +$(top_builddir)/libsimple_switch_grpc.la \ +$(top_builddir)/../simple_switch/libsimpleswitch.la \ +-lpiprotogrpc -lpiprotobuf \ +$(PROTOBUF_LIBS) $(GRPC_LIBS) \ +$(top_builddir)/../../third_party/gtest/libgtest.la + diff --git a/targets/simple_switch_grpc/tests/test_selector_fanout/test_ingress_fanout.cpp b/targets/simple_switch_grpc/tests/test_selector_fanout/test_ingress_fanout.cpp new file mode 100644 index 000000000..72a2a50ef --- /dev/null +++ b/targets/simple_switch_grpc/tests/test_selector_fanout/test_ingress_fanout.cpp @@ -0,0 +1,578 @@ +/* Copyright 2025 Contributors to the P4 Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include "base_test.h" +#include "utils.h" + +namespace p4v1 = ::p4::v1; + +namespace sswitch_grpc { + +namespace testing { + +namespace { + + +class OutputPkt{ +public: + uint32_t eg_port; + unsigned char in_; + unsigned char hash_val; + unsigned char f1; + unsigned char f2; + unsigned char f3; + OutputPkt(uint32_t eg_port, const char* header) + : eg_port(eg_port), in_(header[0]), hash_val(header[1]), + f1(header[2]), f2(header[3]), f3(header[4]) {} + + bool operator<(const OutputPkt& other) const { + return std::tie(eg_port, in_, hash_val, f1, f2, f3) < + std::tie(other.eg_port, other.in_, other.hash_val, + other.f1, other.f2, other.f3); + } + bool operator==(const OutputPkt& other) const { + return std::tie(eg_port, in_, hash_val, f1, f2, f3) == + std::tie(other.eg_port, other.in_, other.hash_val, + other.f1, other.f2, other.f3); + } +}; + +class ActionInfo{ +public: + int action_id; + std::string action_name; + std::vector> params; // pair of param id and value + + ActionInfo(int action_id, const std::string &action_name, + const std::vector> ¶ms) + : action_id(action_id), action_name(action_name), params(params) {} +}; + +class SSGrpcFanoutTest_FanoutBase : public SimpleSwitchGrpcBaseTest { + protected: + using StreamType = grpc::ClientReaderWriter; + using GroupEntry = ::p4v1::MulticastGroupEntry; + + SSGrpcFanoutTest_FanoutBase(const char* proto, const char* json) + : SimpleSwitchGrpcBaseTest(proto), + p4json_file(json), + dataplane_channel(grpc::CreateChannel( + dp_grpc_server_addr, grpc::InsecureChannelCredentials())), + dataplane_stub(p4::bm::DataplaneInterface::NewStub( + dataplane_channel)) { } + + void SetUp() override { + SimpleSwitchGrpcBaseTest::SetUp(); + update_json(p4json_file); + } + + void send_packet(StreamType* stream, const char& in_, const int& ig_port) { + // The testing header is + // header hdr_t { + // bit<8> in_; + // bit<8> hash_val; + // bit<8> f1; + // bit<8> f2; + // bit<8> f3; + // } + std::string pkt(5, '\0'); + pkt[0] = in_; + p4::bm::PacketStreamRequest request; + request.set_device_id(device_id); + request.set_port(ig_port); + request.set_packet(pkt); + stream->Write(request); + } + + bool receive_packets(StreamReceiver &stream_receiver, const uint32_t& count, + std::multiset &received) { + const std::chrono::milliseconds timeout(500); + for (size_t i = 0; i < count; i++) { + auto msg = stream_receiver.get( + [](const p4::bm::PacketStreamResponse &) { return true; }, timeout); + if (msg == nullptr) return false; + auto port = msg->port(); + if (msg->packet().size() < 5) { + return false; + } + char header[5]; + memcpy(header, msg->packet().data(), 5); + received.emplace(port, header); + } + return true; + } + + // use the receive in pre + grpc::Status send_and_receive(const char* in_, const int& ig_port, + const uint32_t& count, std::multiset &received) { + ClientContext context; + auto dp_stream = dataplane_stub->PacketStream(&context); + StreamReceiver + stream_receiver(dp_stream.get()); + send_packet(dp_stream.get(), in_[0], ig_port); + + auto close_stream = [&dp_stream]() { + dp_stream->WritesDone(); + dp_stream->Finish(); + }; + + if (!receive_packets(stream_receiver, count, received)) { + close_stream(); + return grpc::Status(grpc::StatusCode::UNKNOWN, "Failed to receive packets"); + } + close_stream(); + return grpc::Status::OK; + } + + Status add_ap_table_entry( + const int &table_id, const std::string & tbl_name, + const std::string &key_field_name, + const std::string &key_value, const std::vector &actions) { + p4v1::Entity entity; + auto *entry = entity.mutable_table_entry(); + entry->set_table_id(table_id); + auto *mf = entry->add_match(); + mf->set_field_id(get_mf_id(p4info, tbl_name, key_field_name)); + mf->mutable_exact()->set_value(key_value); + auto act_set = entry->mutable_action()->mutable_action_profile_action_set(); + // Add actions as members of the selector's group + for (const auto &act_info : actions) { + auto *action_entry = act_set->add_action_profile_actions(); + auto *act = action_entry->mutable_action(); + act->set_action_id(act_info.action_id); + + for(auto param: act_info.params) { + auto *param_entry = act->add_params(); + param_entry->set_param_id(param.first); + param_entry->set_value(std::string(1, static_cast(param.second))); + } + + action_entry->set_weight(1); + } + return write(entity, p4v1::Update::INSERT); + } + + Status add_table_entry( + const int &table_id, const std::string & tbl_name, + const std::string &key_field_name, + const std::string &key_value, const ActionInfo &action) { + p4v1::Entity entity; + auto *entry = entity.mutable_table_entry(); + entry->set_table_id(table_id); + auto *mf = entry->add_match(); + mf->set_field_id(get_mf_id(p4info, tbl_name, key_field_name)); + mf->mutable_exact()->set_value(key_value); + auto act = entry->mutable_action()->mutable_action(); + act->set_action_id(action.action_id); + for (auto param : action.params) { + auto *param_entry = act->add_params(); + param_entry->set_param_id(param.first); + param_entry->set_value(std::string(1, static_cast(param.second))); + } + return write(entity, p4v1::Update::INSERT); + } + + Status create_mc_group(const GroupEntry &entry) { + p4v1::WriteRequest request; + request.set_device_id(device_id); + auto *update = request.add_updates(); + update->set_type(p4v1::Update::INSERT); + auto *entity = update->mutable_entity(); + auto *pre_entry = entity->mutable_packet_replication_engine_entry(); + auto *mc_entry = pre_entry->mutable_multicast_group_entry(); + mc_entry->CopyFrom(entry); + p4v1::WriteResponse response; + ClientContext context; + return Write(&context, request, &response); + } + + const char *p4json_file = nullptr; + std::shared_ptr dataplane_channel{nullptr}; + std::unique_ptr dataplane_stub{nullptr}; +}; + +constexpr char ingress_single_selector_json[] = TESTDATADIR "/ingress_single_selector_test.json"; +constexpr char ingress_single_selector_proto[] = TESTDATADIR "/ingress_single_selector_test.proto.txtpb"; + +class SSGrpcFanoutTest_SingleSelector : public SSGrpcFanoutTest_FanoutBase { + protected: + using StreamType = grpc::ClientReaderWriter; + + SSGrpcFanoutTest_SingleSelector() + : SSGrpcFanoutTest_FanoutBase(ingress_single_selector_proto, ingress_single_selector_json) + { } + + void SetUp() override { + SSGrpcFanoutTest_FanoutBase::SetUp(); + table_id = get_table_id(p4info, "selector_tbl"); + action_id_1 = get_action_id(p4info, "foo1"); + action_1_param_ids.push_back(get_param_id(p4info, "foo1", "val")); + action_1_param_ids.push_back(get_param_id(p4info, "foo1", "port")); + + action_id_2 = get_action_id(p4info, "foo2"); + action_2_param_ids.push_back(get_param_id(p4info, "foo2", "val")); + action_2_param_ids.push_back(get_param_id(p4info, "foo2", "port")); + + action_id_3 = get_action_id(p4info, "foo3"); + action_3_param_ids.push_back(get_param_id(p4info, "foo3", "val")); + action_3_param_ids.push_back(get_param_id(p4info, "foo3", "port")); + } + + const char *p4json_file = nullptr; + std::shared_ptr dataplane_channel{nullptr}; + std::unique_ptr dataplane_stub{nullptr}; + int table_id; + int action_id_1; + std::vector action_1_param_ids; + int action_id_2; + std::vector action_2_param_ids; + int action_id_3; + std::vector action_3_param_ids; +}; + + + +TEST_F(SSGrpcFanoutTest_SingleSelector, SingleGroup) { + + std::vector actions = { + {action_id_1, "foo1", {{action_1_param_ids[0], 1}, + {action_1_param_ids[1], 1}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 2}, + {action_2_param_ids[1], 2}}}, + {action_id_3, "foo3", {{action_3_param_ids[0], 3}, + {action_3_param_ids[1], 3}}} + }; + + EXPECT_TRUE(add_ap_table_entry(table_id, "selector_tbl", "h.hdr.in_", "\xab", actions).ok()); + + OutputPkt expected_1(1, "\xab\x00\x01\x00\x00"); + OutputPkt expected_2(2, "\xab\x00\x00\x02\x00"); + OutputPkt expected_3(3, "\xab\x00\x00\x00\x03"); + std::multiset expected_set = {expected_1, expected_2, expected_3}; + std::multiset received_set; + + EXPECT_TRUE(send_and_receive("\xab",1,3,received_set).ok()); + EXPECT_TRUE(received_set == expected_set); +} + +TEST_F(SSGrpcFanoutTest_SingleSelector, TwoGroups) { + + std::vector actions_1 = { + {action_id_1, "foo1", {{action_1_param_ids[0], 1}, + {action_1_param_ids[1], 1}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 2}, + {action_2_param_ids[1], 2}}}, + {action_id_3, "foo3", {{action_3_param_ids[0], 3}, + {action_3_param_ids[1], 3}}} + }; + + std::vector actions_2 = { + {action_id_1, "foo1", {{action_1_param_ids[0], 4}, + {action_1_param_ids[1], 1}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 5}, + {action_2_param_ids[1], 2}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 6}, + {action_2_param_ids[1], 3}}} + }; + EXPECT_TRUE(add_ap_table_entry(table_id, "selector_tbl", "h.hdr.in_", "\xab", actions_1).ok()); + EXPECT_TRUE(add_ap_table_entry(table_id, "selector_tbl", "h.hdr.in_", "\xaa", actions_2).ok()); + + + OutputPkt expected_1_1(1, "\xab\x00\x01\x00\x00"); + OutputPkt expected_1_2(2, "\xab\x00\x00\x02\x00"); + OutputPkt expected_1_3(3, "\xab\x00\x00\x00\x03"); + std::multiset expected_set_1 = {expected_1_1, expected_1_2, expected_1_3}; + std::multiset received_set_1; + + EXPECT_TRUE(send_and_receive("\xab",1,3,received_set_1).ok()); + EXPECT_TRUE(received_set_1 == expected_set_1); + + OutputPkt expected_2_1(1, "\xaa\x00\x04\x00\x00"); + OutputPkt expected_2_2(2, "\xaa\x00\x00\x05\x00"); + OutputPkt expected_2_3(3, "\xaa\x00\x00\x06\x00"); + std::multiset expected_set_2 = {expected_2_1, expected_2_2, expected_2_3}; + std::multiset received_set_2; + + EXPECT_TRUE(send_and_receive("\xaa",1,3,received_set_2).ok()); + EXPECT_TRUE(received_set_2 == expected_set_2); +} + + + + +constexpr char ingress_single_selector_mc_json[] = TESTDATADIR "/ingress_single_selector_mc_test.json"; +constexpr char ingress_single_selector_mc_proto[] = TESTDATADIR "/ingress_single_selector_mc_test.proto.txtpb"; + +class SSGrpcFanoutTest_SingleSelector_Multicast : public SSGrpcFanoutTest_FanoutBase{ +protected: + SSGrpcFanoutTest_SingleSelector_Multicast() + : SSGrpcFanoutTest_FanoutBase(ingress_single_selector_mc_proto, ingress_single_selector_mc_json) { } + + void SetUp() override { + SSGrpcFanoutTest_FanoutBase::SetUp(); + selector_table_id = get_table_id(p4info, "selector_tbl"); + selector_action_id_1 = get_action_id(p4info, "foo1"); + selector_action_1_param_ids.push_back(get_param_id(p4info, "foo1", "val")); + selector_action_1_param_ids.push_back(get_param_id(p4info, "foo1", "port")); + + selector_action_id_2 = get_action_id(p4info, "foo2"); + selector_action_2_param_ids.push_back(get_param_id(p4info, "foo2", "val")); + selector_action_2_param_ids.push_back(get_param_id(p4info, "foo2", "port")); + + mc_table_id = get_table_id(p4info, "multicast_tbl"); + mc_multicast_action_id = get_action_id(p4info, "multicast"); + mc_multicast_action_param_ids.push_back(get_param_id(p4info, "multicast", "mc_grp")); + mc_forward_action_id_2 = get_action_id(p4info, "forward"); + mc_forward_action_param_ids_2.push_back(get_param_id(p4info, "forward", "port")); + } + + int selector_table_id; + int selector_action_id_1; + std::vector selector_action_1_param_ids; + int selector_action_id_2; + std::vector selector_action_2_param_ids; + + int mc_table_id; + int mc_multicast_action_id; + std::vector mc_multicast_action_param_ids; + int mc_forward_action_id_2; + std::vector mc_forward_action_param_ids_2; + + +}; + + +TEST_F(SSGrpcFanoutTest_SingleSelector_Multicast, SingleMCGroup) { + int16_t group_id = 1; + GroupEntry group; + group.set_multicast_group_id(group_id); + // Set group members for group 1 + std::vector group_replicas = {1, 2, 3}; + for (const auto &r : group_replicas) { + auto *replica = group.add_replicas(); + replica->set_instance(r); + replica->set_egress_port(r); + } + EXPECT_TRUE(create_mc_group(group).ok()); + + std::vector selector_actions = { + {selector_action_id_1, "foo1", {{selector_action_1_param_ids[0], 1}, + {selector_action_1_param_ids[1], 1}}}, + {selector_action_id_2, "foo2", {{selector_action_2_param_ids[0], 2}, + {selector_action_2_param_ids[1], 2}}}}; + EXPECT_TRUE(add_ap_table_entry(selector_table_id, "selector_tbl", "h.hdr.in_", "\xab", selector_actions).ok()); + + ActionInfo multicast_action(mc_multicast_action_id, "multicast", + {{mc_multicast_action_param_ids[0], group_id}}); + EXPECT_TRUE(add_table_entry(mc_table_id, "multicast_tbl", "h.hdr.f1", "\xac", multicast_action).ok()); + + OutputPkt expected_1(1, "\xab\x00\x01\x00\x00"); + OutputPkt expected_2(2, "\xab\x00\x01\x00\x00"); + OutputPkt expected_3(3, "\xab\x00\x01\x00\x00"); + OutputPkt expected_4(1, "\xab\x00\x00\x02\x00"); + OutputPkt expected_5(2, "\xab\x00\x00\x02\x00"); + OutputPkt expected_6(3, "\xab\x00\x00\x02\x00"); + std::multiset expected_set = {expected_1, expected_2, expected_3, expected_4, expected_5, expected_6}; + std::multiset received_set; + + EXPECT_TRUE(send_and_receive("\xab",1,6,received_set).ok()); + EXPECT_TRUE(received_set == expected_set); +} + + + +constexpr char ingress_two_selectors_json[] = TESTDATADIR "/ingress_two_selectors_test.json"; +constexpr char ingress_two_selectors_proto[] = TESTDATADIR "/ingress_two_selectors_test.proto.txtpb"; + +class SSGrpcFanoutTest_TwoSelectors : public SSGrpcFanoutTest_FanoutBase { + protected: + using StreamType = grpc::ClientReaderWriter; + + SSGrpcFanoutTest_TwoSelectors() + : SSGrpcFanoutTest_FanoutBase(ingress_two_selectors_proto, ingress_two_selectors_json) + { } + + void SetUp() override { + SSGrpcFanoutTest_FanoutBase::SetUp(); + selector_table_id_1 = get_table_id(p4info, "selector_tbl_1"); + selector_table_id_2 = get_table_id(p4info, "selector_tbl_2"); + action_id_1 = get_action_id(p4info, "foo1"); + action_1_param_ids.push_back(get_param_id(p4info, "foo1", "val")); + action_1_param_ids.push_back(get_param_id(p4info, "foo1", "port")); + + action_id_2 = get_action_id(p4info, "foo2"); + action_2_param_ids.push_back(get_param_id(p4info, "foo2", "val")); + action_2_param_ids.push_back(get_param_id(p4info, "foo2", "port")); + + action_id_3 = get_action_id(p4info, "foo3"); + action_3_param_ids.push_back(get_param_id(p4info, "foo3", "val")); + action_3_param_ids.push_back(get_param_id(p4info, "foo3", "port")); + } + + const char *p4json_file = nullptr; + std::shared_ptr dataplane_channel{nullptr}; + std::unique_ptr dataplane_stub{nullptr}; + int selector_table_id_1; + int selector_table_id_2; + int action_id_1; + std::vector action_1_param_ids; + int action_id_2; + std::vector action_2_param_ids; + int action_id_3; + std::vector action_3_param_ids; +}; + +TEST_F(SSGrpcFanoutTest_TwoSelectors, SingleGroup) { + // Round robin selector mode would not be able to handle this case + std::vector actions_1 = { + {action_id_1, "foo1", {{action_1_param_ids[0], 1}, + {action_1_param_ids[1], 1}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 2}, + {action_2_param_ids[1], 2}}}, + {action_id_3, "foo3", {{action_3_param_ids[0], 3}, + {action_3_param_ids[1], 3}}} + }; + + std::vector actions_2 = { + {action_id_1, "foo1", {{action_1_param_ids[0], 4}, + {action_1_param_ids[1], 2}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 5}, + {action_2_param_ids[1], 3}}}, + {action_id_3, "foo3", {{action_3_param_ids[0], 6}, + {action_3_param_ids[1], 4}}} + + }; + + EXPECT_TRUE(add_ap_table_entry(selector_table_id_1, "selector_tbl_1", "h.hdr.in_", "\xab", actions_1).ok()); + EXPECT_TRUE(add_ap_table_entry(selector_table_id_2, "selector_tbl_2", "h.hdr.in_", "\xab", actions_2).ok()); + + OutputPkt expected_1(2, "\xab\x00\x04\x00\x00"); + OutputPkt expected_2(3, "\xab\x00\x01\x05\x00"); + OutputPkt expected_3(4, "\xab\x00\x01\x00\x06"); + + OutputPkt expected_4(2, "\xab\x00\x04\x02\x00"); + OutputPkt expected_5(3, "\xab\x00\x00\x05\x00"); + OutputPkt expected_6(4, "\xab\x00\x00\x02\x06"); + + OutputPkt expected_7(2, "\xab\x00\x04\x00\x03"); + OutputPkt expected_8(3, "\xab\x00\x00\x05\x03"); + OutputPkt expected_9(4, "\xab\x00\x00\x00\x06"); + + std::multiset expected_set = {expected_1, expected_2, expected_3, expected_4, expected_5, expected_6, expected_7, expected_8, expected_9}; + std::multiset received_set; + + EXPECT_TRUE(send_and_receive("\xab",1,9,received_set).ok()); + EXPECT_TRUE(received_set == expected_set); +} + + + + +constexpr char egress_single_selector_json[] = TESTDATADIR "/egress_single_selector_test.json"; +constexpr char egress_single_selector_proto[] = TESTDATADIR "/egress_single_selector_test.proto.txtpb"; + +class SSGrpcFanoutTest_EgressSingleSelector : public SSGrpcFanoutTest_FanoutBase { + protected: + using StreamType = grpc::ClientReaderWriter; + + SSGrpcFanoutTest_EgressSingleSelector() + : SSGrpcFanoutTest_FanoutBase(egress_single_selector_proto, egress_single_selector_json) + { } + + void SetUp() override { + SSGrpcFanoutTest_FanoutBase::SetUp(); + table_id = get_table_id(p4info, "selector_tbl"); + action_id_1 = get_action_id(p4info, "foo1"); + action_1_param_ids.push_back(get_param_id(p4info, "foo1", "val")); + + action_id_2 = get_action_id(p4info, "foo2"); + action_2_param_ids.push_back(get_param_id(p4info, "foo2", "val")); + + action_id_3 = get_action_id(p4info, "foo3"); + action_3_param_ids.push_back(get_param_id(p4info, "foo3", "val")); + } + + const char *p4json_file = nullptr; + std::shared_ptr dataplane_channel{nullptr}; + std::unique_ptr dataplane_stub{nullptr}; + int table_id; + int action_id_1; + std::vector action_1_param_ids; + int action_id_2; + std::vector action_2_param_ids; + int action_id_3; + std::vector action_3_param_ids; +}; + + + +TEST_F(SSGrpcFanoutTest_EgressSingleSelector, SingleGroup) { + int16_t group_id = 2; + GroupEntry group; + group.set_multicast_group_id(group_id); + // Set group members for group 2 + std::vector group_replicas = {1,2}; + for (const auto &r : group_replicas) { + auto *replica = group.add_replicas(); + replica->set_instance(r); + replica->set_egress_port(r); + } + EXPECT_TRUE(create_mc_group(group).ok()); + + std::vector actions = { + {action_id_1, "foo1", {{action_1_param_ids[0], 1}}}, + {action_id_2, "foo2", {{action_2_param_ids[0], 2}}}, + {action_id_3, "foo3", {{action_3_param_ids[0], 3}}} + }; + + EXPECT_TRUE(add_ap_table_entry(table_id, "selector_tbl", "h.hdr.in_", "\xab", actions).ok()); + + OutputPkt expected_1(1, "\xab\x00\x01\x00\x00"); + OutputPkt expected_2(1, "\xab\x00\x00\x02\x00"); + OutputPkt expected_3(1, "\xab\x00\x00\x00\x03"); + OutputPkt expected_4(2, "\xab\x00\x01\x00\x00"); + OutputPkt expected_5(2, "\xab\x00\x00\x02\x00"); + OutputPkt expected_6(2, "\xab\x00\x00\x00\x03"); + std::multiset expected_set = {expected_1, expected_2, expected_3, expected_4, expected_5, expected_6}; + std::multiset received_set; + + EXPECT_TRUE(send_and_receive("\xab",1,6,received_set).ok()); + EXPECT_TRUE(received_set == expected_set); +} + + +} // namespace + +} // namespace testing + +} // namespace sswitch_grpc diff --git a/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.json b/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.json new file mode 100644 index 000000000..ab589388a --- /dev/null +++ b/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.json @@ -0,0 +1,175 @@ +{ + "program" : "loopback.p4", + "__meta__" : { + "version" : [2, 18], + "compiler" : "https://github.com/p4lang/p4c" + }, + "header_types" : [ + { + "name" : "scalars_0", + "id" : 0, + "fields" : [] + }, + { + "name" : "standard_metadata", + "id" : 1, + "fields" : [ + ["ingress_port", 9, false], + ["egress_spec", 9, false], + ["egress_port", 9, false], + ["clone_spec", 32, false], + ["instance_type", 32, false], + ["drop", 1, false], + ["recirculate_port", 16, false], + ["packet_length", 32, false], + ["checksum_error", 1, false], + ["_padding", 3, false] + ] + } + ], + "headers" : [ + { + "name" : "scalars", + "id" : 0, + "header_type" : "scalars_0", + "metadata" : true, + "pi_omit" : true + }, + { + "name" : "standard_metadata", + "id" : 1, + "header_type" : "standard_metadata", + "metadata" : true, + "pi_omit" : true + } + ], + "header_stacks" : [], + "header_union_types" : [], + "header_unions" : [], + "header_union_stacks" : [], + "field_lists" : [], + "errors" : [ + ["NoError", 1], + ["PacketTooShort", 2], + ["NoMatch", 3], + ["StackOutOfBounds", 4], + ["HeaderTooShort", 5], + ["ParserTimeout", 6] + ], + "enums" : [], + "parsers" : [ + { + "name" : "parser", + "id" : 0, + "init_state" : "start", + "parse_states" : [ + { + "name" : "start", + "id" : 0, + "parser_ops" : [], + "transitions" : [ + { + "value" : "default", + "mask" : null, + "next_state" : null + } + ], + "transition_key" : [] + } + ] + } + ], + "parse_vsets" : [], + "deparsers" : [ + { + "name" : "deparser", + "id" : 0, + "order" : [] + } + ], + "meter_arrays" : [], + "counter_arrays" : [], + "register_arrays" : [], + "calculations" : [], + "learn_lists" : [], + "actions" : [ + { + "name" : "redirect", + "id" : 0, + "runtime_data" : [], + "primitives" : [ + { + "op" : "assign", + "parameters" : [ + { + "type" : "field", + "value" : ["standard_metadata", "egress_spec"] + }, + { + "type" : "field", + "value" : ["standard_metadata", "ingress_port"] + } + ], + "source_info" : { + "filename" : "loopback.p4", + "line" : 21, + "column" : 4, + "source_fragment" : "modify_field(standard_metadata.egress_spec, standard_metadata.ingress_port)" + } + } + ] + } + ], + "pipelines" : [ + { + "name" : "ingress", + "id" : 0, + "init_table" : "t_redirect", + "tables" : [ + { + "name" : "t_redirect", + "id" : 0, + "source_info" : { + "filename" : "loopback.p4", + "line" : 24, + "column" : 0, + "source_fragment" : "table t_redirect { ..." + }, + "key" : [], + "match_type" : "exact", + "type" : "simple", + "max_size" : 1024, + "with_counters" : false, + "support_timeout" : false, + "direct_meters" : null, + "action_ids" : [0], + "actions" : ["redirect"], + "base_default_next" : null, + "next_tables" : { + "redirect" : null + }, + "default_entry" : { + "action_id" : 0, + "action_const" : false, + "action_data" : [], + "action_entry_const" : false + } + } + ], + "action_profiles" : [], + "conditionals" : [] + }, + { + "name" : "egress", + "id" : 1, + "init_table" : null, + "tables" : [], + "action_profiles" : [], + "conditionals" : [] + } + ], + "checksums" : [], + "force_arith" : [], + "extern_instances" : [], + "field_aliases" : [] +} \ No newline at end of file diff --git a/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.p4 b/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.p4 new file mode 100644 index 000000000..4fe80ad32 --- /dev/null +++ b/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.p4 @@ -0,0 +1,33 @@ +/* Copyright 2013-present Barefoot Networks, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +parser start { + return ingress; +} + +action redirect() { + modify_field(standard_metadata.egress_spec, standard_metadata.ingress_port); +} + +table t_redirect { + actions { redirect; } + default_action: redirect(); +} + +control ingress { + apply(t_redirect); +} + +control egress { } diff --git a/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.proto.txt b/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.proto.txt new file mode 100644 index 000000000..113a090c3 --- /dev/null +++ b/targets/simple_switch_grpc/tests/test_selector_fanout/testdata/loopback.proto.txt @@ -0,0 +1,18 @@ +tables { + preamble { + id: 33591959 + name: "t_redirect" + alias: "t_redirect" + } + action_refs { + id: 16794474 + } + size: 1024 +} +actions { + preamble { + id: 16794474 + name: "redirect" + alias: "redirect" + } +} diff --git a/tools/nanomsg_client.py b/tools/nanomsg_client.py index 20e1ce33f..b20b4b097 100755 --- a/tools/nanomsg_client.py +++ b/tools/nanomsg_client.py @@ -83,7 +83,8 @@ class MSG_TYPES: CHECKSUM_UPDATE, PIPELINE_START, PIPELINE_DONE, CONDITION_EVAL, TABLE_HIT, TABLE_MISS, - ACTION_EXECUTE) = list(range(15)) + ACTION_EXECUTE, + FANOUT_GEN) = list(range(16)) CONFIG_CHANGE = 999 @staticmethod @@ -104,6 +105,7 @@ def get_msg_class(type_): MSG_TYPES.TABLE_HIT: TableHit, MSG_TYPES.TABLE_MISS: TableMiss, MSG_TYPES.ACTION_EXECUTE: ActionExecute, + MSG_TYPES.FANOUT_GEN: FanoutGen, MSG_TYPES.CONFIG_CHANGE: ConfigChange, } return classes[type_] @@ -126,6 +128,7 @@ def get_str(type_): MSG_TYPES.TABLE_HIT: "TABLE_HIT", MSG_TYPES.TABLE_MISS: "TABLE_MISS", MSG_TYPES.ACTION_EXECUTE: "ACTION_EXECUTE", + MSG_TYPES.FANOUT_GEN: "FANOUT_GEN", MSG_TYPES.CONFIG_CHANGE: "CONFIG_CHANGE", } return strs[type_] @@ -433,6 +436,24 @@ def __str__(self): s += " (" + name + ")" return s +class FanoutGen(Msg): + def __init__(self, msg): + super(FanoutGen, self).__init__(msg) + self.type_ = MSG_TYPES.FANOUT_GEN + self.type_str = MSG_TYPES.get_str(self.type_) + self.struct_ = struct.Struct("QQ") + + def extract(self): + self.table_id, self.parent_packet_copy_id = super(FanoutGen, self).extract() + + def __str__(self): + s = super(FanoutGen, self).__str__() + s += ", table_id: " + str(self.table_id) + name = name_lookup("table", self.table_id) + if name: + s += " (" + name + ")" + s += ", parent_packet_copy_id: " + str(self.parent_packet_copy_id) + return s class ConfigChange(Msg): def __init__(self, msg): @@ -472,8 +493,9 @@ def get_msg_type(msg): try: p = MSG_TYPES.get_msg_class(msg_type)(msg) - except: + except Exception as e: print("Unknown msg type", msg_type) + print("Error:", e) continue p.extract() print(p) diff --git a/tools/runtime_CLI.py b/tools/runtime_CLI.py index 700ccafda..cbcc5b27b 100755 --- a/tools/runtime_CLI.py +++ b/tools/runtime_CLI.py @@ -1536,7 +1536,7 @@ def complete_table_indirect_set_default(self, text, line, start_index, end_index @handle_bad_input def do_table_indirect_set_default_with_group(self, line): - "Set default group for indirect match table: table_indirect_set_default " + "Set default group for indirect match table: table_indirect_set_default_with_group
" table_name, handle = self.indirect_set_default_common(line, ws=True)