diff --git a/src/dev/Ethernet.py b/src/dev/Ethernet.py index 147ad156c..84fa0ae00 100644 --- a/src/dev/Ethernet.py +++ b/src/dev/Ethernet.py @@ -1,3 +1,15 @@ +# Copyright (c) 2015 ARM Limited +# All rights reserved. +# +# The license below extends only to copyright in the software and shall +# not be construed as granting a license to any other intellectual +# property including but not limited to intellectual property relating +# to a hardware implementation of the functionality of the software +# licensed hereunder. You may use the software subject to the license +# terms below provided that you ensure that this notice is replicated +# unmodified and in its entirety in all distributions of the software, +# modified or unmodified, in source code or in binary form. +# # Copyright (c) 2005-2007 The Regents of The University of Michigan # All rights reserved. # @@ -46,6 +58,20 @@ class EtherLink(EtherObject): speed = Param.NetworkBandwidth('1Gbps', "link speed") dump = Param.EtherDump(NULL, "dump object") +class MultiEtherLink(EtherObject): + type = 'MultiEtherLink' + cxx_header = "dev/multi_etherlink.hh" + int0 = SlavePort("interface 0") + delay = Param.Latency('0us', "packet transmit delay") + delay_var = Param.Latency('0ns', "packet transmit delay variability") + speed = Param.NetworkBandwidth('1Gbps', "link speed") + dump = Param.EtherDump(NULL, "dump object") + multi_rank = Param.UInt32('0', "Rank of the this gem5 process (multi run)") + sync_start = Param.Latency('5200000000000t', "first multi sync barrier") + sync_repeat = Param.Latency('10us', "multi sync barrier repeat") + server_name = Param.String('localhost', "Message server name") + server_port = Param.UInt32('2200', "Message server port") + class EtherBus(EtherObject): type = 'EtherBus' cxx_header = "dev/etherbus.hh" diff --git a/src/dev/SConscript b/src/dev/SConscript index 416e5052f..3936210a6 100644 --- a/src/dev/SConscript +++ b/src/dev/SConscript @@ -60,6 +60,10 @@ Source('etherdevice.cc') Source('etherdump.cc') Source('etherint.cc') Source('etherlink.cc') +Source('multi_packet.cc') +Source('multi_iface.cc') +Source('multi_etherlink.cc') +Source('tcp_iface.cc') Source('etherpkt.cc') Source('ethertap.cc') Source('i2cbus.cc') @@ -85,6 +89,8 @@ DebugFlag('DiskImageWrite') DebugFlag('DMA') DebugFlag('DMACopyEngine') DebugFlag('Ethernet') +DebugFlag('MultiEthernet') +DebugFlag('MultiEthernetPkt') DebugFlag('EthernetCksum') DebugFlag('EthernetDMA') DebugFlag('EthernetData') diff --git a/src/dev/etherpkt.cc b/src/dev/etherpkt.cc index 548a1f179..58b53b5e6 100644 --- a/src/dev/etherpkt.cc +++ b/src/dev/etherpkt.cc @@ -31,6 +31,7 @@ #include #include "base/misc.hh" +#include "base/inet.hh" #include "dev/etherpkt.hh" #include "sim/serialize.hh" @@ -50,3 +51,20 @@ EthPacketData::unserialize(const string &base, CheckpointIn &cp) if (length) arrayParamIn(cp, base + ".data", data, length); } + +void +EthPacketData::packAddress(uint8_t *src_addr, + uint8_t *dst_addr, + unsigned &nbytes) +{ + Net::EthHdr *hdr = (Net::EthHdr *)data; + assert(hdr->src().size() == hdr->dst().size()); + if (nbytes < hdr->src().size()) + panic("EthPacketData::packAddress() Buffer overflow"); + + memcpy(dst_addr, hdr->dst().bytes(), hdr->dst().size()); + memcpy(src_addr, hdr->src().bytes(), hdr->src().size()); + + nbytes = hdr->src().size(); +} + diff --git a/src/dev/etherpkt.hh b/src/dev/etherpkt.hh index acda9fb47..0d8998b1d 100644 --- a/src/dev/etherpkt.hh +++ b/src/dev/etherpkt.hh @@ -71,8 +71,23 @@ class EthPacketData ~EthPacketData() { if (data) delete [] data; } public: + /** + * This function pulls out the MAC source and destination addresses from + * the packet data and stores them in the caller specified buffers. + * + * @param src_addr The buffer to store the source MAC address. + * @param dst_addr The buffer to store the destination MAC address. + * @param length This is an inout parameter. The caller stores in this + * the size of the address buffers. On return, this will contain the + * actual address size stored in the buffers. (We assume that source + * address size is equal to that of the destination address.) + */ + void packAddress(uint8_t *src_addr, uint8_t *dst_addr, unsigned &length); + void serialize(const std::string &base, CheckpointOut &cp) const; void unserialize(const std::string &base, CheckpointIn &cp); + + unsigned size() const { return length; } }; typedef std::shared_ptr EthPacketPtr; diff --git a/src/dev/multi_etherlink.cc b/src/dev/multi_etherlink.cc new file mode 100644 index 000000000..b7d411ac3 --- /dev/null +++ b/src/dev/multi_etherlink.cc @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * Device module for a full duplex ethernet link for multi gem5 simulations. + */ + +#include "dev/multi_etherlink.hh" + +#include +#include +#include + +#include +#include +#include +#include + +#include "base/random.hh" +#include "base/trace.hh" +#include "debug/EthernetData.hh" +#include "debug/MultiEthernet.hh" +#include "debug/MultiEthernetPkt.hh" +#include "dev/etherdump.hh" +#include "dev/etherint.hh" +#include "dev/etherlink.hh" +#include "dev/etherobject.hh" +#include "dev/etherpkt.hh" +#include "dev/multi_iface.hh" +#include "dev/tcp_iface.hh" +#include "params/EtherLink.hh" +#include "sim/core.hh" +#include "sim/serialize.hh" +#include "sim/system.hh" + +using namespace std; + +MultiEtherLink::MultiEtherLink(const Params *p) + : EtherObject(p) +{ + DPRINTF(MultiEthernet,"MultiEtherLink::MultiEtherLink() " + "link delay:%llu\n", p->delay); + + txLink = new TxLink(name() + ".link0", this, p->speed, p->delay_var, + p->dump); + rxLink = new RxLink(name() + ".link1", this, p->delay, p->dump); + + // create the multi (TCP) interface to talk to the peer gem5 processes. + multiIface = new TCPIface(p->server_name, p->server_port, p->multi_rank, + p->sync_start, p->sync_repeat, this); + + localIface = new LocalIface(name() + ".int0", txLink, rxLink, multiIface); +} + +MultiEtherLink::~MultiEtherLink() +{ + delete txLink; + delete rxLink; + delete localIface; + delete multiIface; +} + +EtherInt* +MultiEtherLink::getEthPort(const std::string &if_name, int idx) +{ + if (if_name != "int0") { + return nullptr; + } else { + panic_if(localIface->getPeer(), "interface already connected to"); + } + return localIface; +} + +void MultiEtherLink::memWriteback() +{ + DPRINTF(MultiEthernet,"MultiEtherLink::memWriteback() called\n"); + multiIface->drainDone(); +} + +void +MultiEtherLink::serialize(CheckpointOut &cp) const +{ + multiIface->serialize("multiIface", cp); + txLink->serialize("txLink", cp); + rxLink->serialize("rxLink", cp); +} + +void +MultiEtherLink::unserialize(CheckpointIn &cp) +{ + multiIface->unserialize("multiIface", cp); + txLink->unserialize("txLink", cp); + rxLink->unserialize("rxLink", cp); +} + +void +MultiEtherLink::init() +{ + DPRINTF(MultiEthernet,"MultiEtherLink::init() called\n"); + multiIface->initRandom(); +} + +void +MultiEtherLink::startup() +{ + DPRINTF(MultiEthernet,"MultiEtherLink::startup() called\n"); + multiIface->startPeriodicSync(); +} + +void +MultiEtherLink::RxLink::setMultiInt(MultiIface *m) +{ + assert(!multiIface); + multiIface = m; + // Spawn a new receiver thread that will process messages + // coming in from peer gem5 processes. + // The receive thread will also schedule a (receive) doneEvent + // for each incoming data packet. + multiIface->spawnRecvThread(&doneEvent, linkDelay); +} + +void +MultiEtherLink::RxLink::rxDone() +{ + assert(!busy()); + + // retrieve the packet that triggered the receive done event + packet = multiIface->packetIn(); + + if (dump) + dump->dump(packet); + + DPRINTF(MultiEthernetPkt, "MultiEtherLink::MultiLink::rxDone() " + "packet received: len=%d\n", packet->length); + DDUMP(EthernetData, packet->data, packet->length); + + localIface->sendPacket(packet); + + packet = nullptr; +} + +void +MultiEtherLink::TxLink::txDone() +{ + if (dump) + dump->dump(packet); + + packet = nullptr; + assert(!busy()); + + localIface->sendDone(); +} + +bool +MultiEtherLink::TxLink::transmit(EthPacketPtr pkt) +{ + if (busy()) { + DPRINTF(MultiEthernet, "packet not sent, link busy\n"); + return false; + } + + packet = pkt; + Tick delay = (Tick)ceil(((double)pkt->length * ticksPerByte) + 1.0); + if (delayVar != 0) + delay += random_mt.random(0, delayVar); + + // send the packet to the peers + assert(multiIface); + multiIface->packetOut(pkt, delay); + + // schedule the send done event + parent->schedule(doneEvent, curTick() + delay); + + return true; +} + +void +MultiEtherLink::Link::serialize(const string &base, CheckpointOut &cp) const +{ + bool packet_exists = (packet != nullptr); + paramOut(cp, base + ".packet_exists", packet_exists); + if (packet_exists) + packet->serialize(base + ".packet", cp); + + bool event_scheduled = event->scheduled(); + paramOut(cp, base + ".event_scheduled", event_scheduled); + if (event_scheduled) { + Tick event_time = event->when(); + paramOut(cp, base + ".event_time", event_time); + } +} + +void +MultiEtherLink::Link::unserialize(const string &base, CheckpointIn &cp) +{ + bool packet_exists; + paramIn(cp, base + ".packet_exists", packet_exists); + if (packet_exists) { + packet = make_shared(16384); + packet->unserialize(base + ".packet", cp); + } + + bool event_scheduled; + paramIn(cp, base + ".event_scheduled", event_scheduled); + if (event_scheduled) { + Tick event_time; + paramIn(cp, base + ".event_time", event_time); + parent->schedule(*event, event_time); + } +} + +MultiEtherLink::LocalIface::LocalIface(const std::string &name, + TxLink *tx, + RxLink *rx, + MultiIface *m) : + EtherInt(name), txLink(tx) +{ + tx->setLocalInt(this); + rx->setLocalInt(this); + tx->setMultiInt(m); + rx->setMultiInt(m); +} + +MultiEtherLink * +MultiEtherLinkParams::create() +{ + return new MultiEtherLink(this); +} + + diff --git a/src/dev/multi_etherlink.hh b/src/dev/multi_etherlink.hh new file mode 100644 index 000000000..7d1352d60 --- /dev/null +++ b/src/dev/multi_etherlink.hh @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * Device module for a full duplex ethernet link for multi gem5 simulations. + * + * See comments in dev/multi_iface.hh for a generic description of multi + * gem5 simulations. + * + * This class is meant to be a drop in replacement for the EtherLink class for + * multi gem5 runs. + * + */ +#ifndef __DEV_MULTIETHERLINK_HH__ +#define __DEV_MULTIETHERLINK_HH__ + +#include + +#include "dev/etherlink.hh" +#include "params/MultiEtherLink.hh" + +class MultiIface; +class EthPacketData; + +/** + * Model for a fixed bandwidth full duplex ethernet link. + */ +class MultiEtherLink : public EtherObject +{ + protected: + class LocalIface; + + /** + * Model base class for a single uni-directional link. + * + * The link will encapsulate and transfer Ethernet packets to/from + * the message server. + */ + class Link + { + protected: + std::string objName; + MultiEtherLink *parent; + LocalIface *localIface; + EtherDump *dump; + MultiIface *multiIface; + Event *event; + EthPacketPtr packet; + + public: + Link(const std::string &name, MultiEtherLink *p, + EtherDump *d, Event *e) : + objName(name), parent(p), localIface(nullptr), dump(d), + multiIface(nullptr), event(e) {} + + ~Link() {} + + const std::string name() const { return objName; } + bool busy() const { return (bool)packet; } + void setLocalInt(LocalIface *i) { assert(!localIface); localIface=i; } + + void serialize(const std::string &base, CheckpointOut &cp) const; + void unserialize(const std::string &base, CheckpointIn &cp); + }; + + /** + * Model for a send link. + */ + class TxLink : public Link + { + protected: + /** + * Per byte send delay + */ + double ticksPerByte; + /** + * Random component of the send delay + */ + Tick delayVar; + + /** + * Send done callback. Called from doneEvent. + */ + void txDone(); + typedef EventWrapper DoneEvent; + friend void DoneEvent::process(); + DoneEvent doneEvent; + + public: + TxLink(const std::string &name, MultiEtherLink *p, + double invBW, Tick delay_var, EtherDump *d) : + Link(name, p, d, &doneEvent), ticksPerByte(invBW), + delayVar(delay_var), doneEvent(this) {} + ~TxLink() {} + + /** + * Register the multi interface to be used to talk to the + * peer gem5 processes. + */ + void setMultiInt(MultiIface *m) { assert(!multiIface); multiIface=m; } + + /** + * Initiate sending of a packet via this link. + * + * @param packet Ethernet packet to send + */ + bool transmit(EthPacketPtr packet); + }; + + /** + * Model for a receive link. + */ + class RxLink : public Link + { + protected: + + /** + * Transmission delay for the simulated Ethernet link. + */ + Tick linkDelay; + + /** + * Receive done callback method. Called from doneEvent. + */ + void rxDone() ; + typedef EventWrapper DoneEvent; + friend void DoneEvent::process(); + DoneEvent doneEvent; + + public: + + RxLink(const std::string &name, MultiEtherLink *p, + Tick delay, EtherDump *d) : + Link(name, p, d, &doneEvent), + linkDelay(delay), doneEvent(this) {} + ~RxLink() {} + + /** + * Register our multi interface to talk to the peer gem5 processes. + */ + void setMultiInt(MultiIface *m); + }; + + /** + * Interface to the local simulated system + */ + class LocalIface : public EtherInt + { + private: + TxLink *txLink; + + public: + LocalIface(const std::string &name, TxLink *tx, RxLink *rx, + MultiIface *m); + + bool recvPacket(EthPacketPtr pkt) { return txLink->transmit(pkt); } + void sendDone() { peer->sendDone(); } + bool isBusy() { return txLink->busy(); } + }; + + + protected: + /** + * Interface to talk to the peer gem5 processes. + */ + MultiIface *multiIface; + /** + * Send link + */ + TxLink *txLink; + /** + * Receive link + */ + RxLink *rxLink; + LocalIface *localIface; + + public: + typedef MultiEtherLinkParams Params; + MultiEtherLink(const Params *p); + ~MultiEtherLink(); + + const Params * + params() const + { + return dynamic_cast(_params); + } + + virtual EtherInt *getEthPort(const std::string &if_name, + int idx) M5_ATTR_OVERRIDE; + + void memWriteback() M5_ATTR_OVERRIDE; + void init() M5_ATTR_OVERRIDE; + void startup() M5_ATTR_OVERRIDE; + + void serialize(CheckpointOut &cp) const M5_ATTR_OVERRIDE; + void unserialize(CheckpointIn &cp) M5_ATTR_OVERRIDE; +}; + +#endif // __DEV_MULTIETHERLINK_HH__ diff --git a/src/dev/multi_iface.cc b/src/dev/multi_iface.cc new file mode 100644 index 000000000..c924c1a17 --- /dev/null +++ b/src/dev/multi_iface.cc @@ -0,0 +1,622 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * The interface class for multi gem5 simulations. + */ + +#include "dev/multi_iface.hh" + +#include +#include + +#include "base/random.hh" +#include "base/trace.hh" +#include "debug/MultiEthernet.hh" +#include "debug/MultiEthernetPkt.hh" +#include "dev/etherpkt.hh" +#include "sim/sim_exit.hh" +#include "sim/sim_object.hh" + + +MultiIface::Sync *MultiIface::sync = nullptr; +MultiIface::SyncEvent *MultiIface::syncEvent = nullptr; +unsigned MultiIface::recvThreadsNum = 0; +MultiIface *MultiIface::master = nullptr; + +bool +MultiIface::Sync::run(SyncTrigger t, Tick sync_tick) +{ + std::unique_lock sync_lock(lock); + + trigger = t; + if (trigger != SyncTrigger::periodic) { + DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n", + (unsigned)trigger); + } + + switch (state) { + case SyncState::asyncCkpt: + switch (trigger) { + case SyncTrigger::ckpt: + assert(MultiIface::syncEvent->interrupted == false); + state = SyncState::busy; + break; + case SyncTrigger::periodic: + if (waitNum == 0) { + // So all recv threads got an async checkpoint request already + // and a simExit is scheduled at the end of the current tick + // (i.e. it is a periodic sync scheduled at the same tick as + // the simExit). + state = SyncState::idle; + DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " + "due to async ckpt scheduled\n"); + return false; + } else { + // we still need to wait for some receiver thread to get the + // aysnc ckpt request. We are going to proceed as 'interrupted' + // periodic sync. + state = SyncState::interrupted; + DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted " + "due to ckpt request is coming in\n"); + } + break; + case SyncTrigger::atomic: + assert(trigger != SyncTrigger::atomic); + } + break; + case SyncState::idle: + state = SyncState::busy; + break; + // Only one sync can be active at any time + case SyncState::interrupted: + case SyncState::busy: + assert(state != SyncState::interrupted); + assert(state != SyncState::busy); + break; + } + // Kick-off the sync unless we are in the middle of an interrupted + // periodic sync + if (state != SyncState::interrupted) { + assert(waitNum == 0); + waitNum = MultiIface::recvThreadsNum; + // initiate the global synchronisation + assert(MultiIface::master != nullptr); + MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick); + } + // now wait until all receiver threads complete the synchronisation + auto lf = [this]{ return waitNum == 0; }; + cv.wait(sync_lock, lf); + + // we are done + assert(state == SyncState::busy || state == SyncState::interrupted); + bool ret = (state != SyncState::interrupted); + state = SyncState::idle; + return ret; +} + +void +MultiIface::Sync::progress(MsgType msg) +{ + std::unique_lock sync_lock(lock); + + switch (msg) { + case MsgType::cmdAtomicSyncAck: + assert(state == SyncState::busy && trigger == SyncTrigger::atomic); + break; + case MsgType::cmdPeriodicSyncAck: + assert(state == SyncState::busy && trigger == SyncTrigger::periodic); + break; + case MsgType::cmdCkptSyncAck: + assert(state == SyncState::busy && trigger == SyncTrigger::ckpt); + break; + case MsgType::cmdCkptSyncReq: + switch (state) { + case SyncState::busy: + if (trigger == SyncTrigger::ckpt) { + // We are already in a checkpoint sync but got another ckpt + // sync request. This may happen if two (or more) peer gem5 + // processes try to start a ckpt nearly at the same time. + // Incrementing waitNum here (before decrementing it below) + // effectively results in ignoring this new ckpt sync request. + waitNum++; + break; + } + assert (waitNum == recvThreadsNum); + state = SyncState::interrupted; + // we need to fall over here to handle "recvThreadsNum == 1" case + case SyncState::interrupted: + assert(trigger == SyncTrigger::periodic); + assert(waitNum >= 1); + if (waitNum == 1) { + exitSimLoop("checkpoint"); + } + break; + case SyncState::idle: + // There is no on-going sync so we got an async ckpt request. If we + // are the only receiver thread then we need to schedule the + // checkpoint. Otherwise, only change the state to 'asyncCkpt' and + // let the last receiver thread to schedule the checkpoint at the + // 'asyncCkpt' case. + // Note that a periodic or resume sync may start later and that can + // trigger a state change to 'interrupted' (so the checkpoint may + // get scheduled at 'interrupted' case finally). + assert(waitNum == 0); + state = SyncState::asyncCkpt; + waitNum = MultiIface::recvThreadsNum; + // we need to fall over here to handle "recvThreadsNum == 1" case + case SyncState::asyncCkpt: + assert(waitNum >= 1); + if (waitNum == 1) + exitSimLoop("checkpoint"); + break; + default: + panic("Unexpected state for checkpoint request message"); + break; + } + break; + default: + panic("Unknown msg type"); + break; + } + waitNum--; + assert(state != SyncState::idle); + // Notify the simultaion thread if there is an on-going sync. + if (state != SyncState::asyncCkpt) { + sync_lock.unlock(); + cv.notify_one(); + } +} + +void MultiIface::SyncEvent::start(Tick start, Tick interval) +{ + assert(!scheduled()); + if (interval == 0) + panic("Multi synchronisation period must be greater than zero"); + repeat = interval; + schedule(start); +} + +void +MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick) +{ + // The new multi interface may require earlier start of the + // synchronisation. + assert(scheduled() == true); + if (start_tick < when()) + reschedule(start_tick); + // The new multi interface may require more frequent synchronisation. + if (repeat == 0) + panic("Multi synchronisation period must be greater than zero"); + if (repeat < repeat_tick) + repeat = repeat_tick; +} + +void +MultiIface::SyncEvent::process() +{ + /* + * Note that this is a global event so this process method will be called + * by only exactly one thread. + */ + // if we are draining the system then we must not start a periodic sync (as + // it is not sure that all peer gem5 will reach this tick before taking + // the checkpoint). + if (isDraining == true) { + assert(interrupted == false); + interrupted = true; + DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted " + "due to draining\n"); + return; + } + if (interrupted == false) + scheduledAt = curTick(); + /* + * We hold the eventq lock at this point but the receiver thread may + * need the lock to schedule new recv events while waiting for the + * multi sync to complete. + * Note that the other simulation threads also release their eventq + * locks while waiting for us due to the global event semantics. + */ + curEventQueue()->unlock(); + // we do a global sync here + interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt); + // Global sync completed or got interrupted. + // we are expected to exit with the eventq lock held + curEventQueue()->lock(); + // schedule the next global sync event if this one completed. Otherwise + // (i.e. this one was interrupted by a checkpoint request), we will + // reschedule this one after the draining is complete. + if (!interrupted) + schedule(scheduledAt + repeat); +} + +void MultiIface::SyncEvent::resume() +{ + Tick sync_tick; + assert(!scheduled()); + if (interrupted) { + assert(curTick() >= scheduledAt); + // We have to complete the interrupted periodic sync asap. + // Note that this sync might be interrupted now again with a checkpoint + // request from a peer gem5... + sync_tick = curTick(); + schedule(sync_tick); + } else { + // So we completed the last periodic sync, let's find out the tick for + // next one + assert(curTick() > scheduledAt); + sync_tick = scheduledAt + repeat; + if (sync_tick < curTick()) + panic("Cannot resume periodic synchronisation"); + schedule(sync_tick); + } + DPRINTF(MultiEthernet, + "MultiIface::SyncEvent periodic sync resumed at %lld " + "(curTick:%lld)\n", sync_tick, curTick()); +} + +void MultiIface::SyncEvent::serialize(const std::string &base, + CheckpointOut &cp) const +{ + // Save the periodic multi sync schedule information + paramOut(cp, base + ".periodicSyncRepeat", repeat); + paramOut(cp, base + ".periodicSyncInterrupted", interrupted); + paramOut(cp, base + ".periodicSyncAt", scheduledAt); +} + +void MultiIface::SyncEvent::unserialize(const std::string &base, + CheckpointIn &cp) +{ + paramIn(cp, base + ".periodicSyncRepeat", repeat); + paramIn(cp, base + ".periodicSyncInterrupted", interrupted); + paramIn(cp, base + ".periodicSyncAt", scheduledAt); +} + +MultiIface::MultiIface(unsigned multi_rank, + Tick sync_start, + Tick sync_repeat, + EventManager *em) : + syncStart(sync_start), syncRepeat(sync_repeat), + recvThread(nullptr), eventManager(em), recvDone(nullptr), + scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank) +{ + DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank); + if (master == nullptr) { + assert(sync == nullptr); + assert(syncEvent == nullptr); + sync = new Sync(); + syncEvent = new SyncEvent(); + master = this; + } +} + +MultiIface::~MultiIface() +{ + assert(recvThread); + delete recvThread; + if (this == master) { + assert(syncEvent); + delete syncEvent; + assert(sync); + delete sync; + } +} + +void +MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay) +{ + MultiHeaderPkt::Header header_pkt; + unsigned address_length = MultiHeaderPkt::maxAddressLength(); + + // Prepare a multi header packet for the Ethernet packet we want to + // send out. + header_pkt.msgType = MsgType::dataDescriptor; + header_pkt.sendTick = curTick(); + header_pkt.sendDelay = send_delay; + + // Store also the source and destination addresses. + pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress, + address_length); + + header_pkt.dataPacketLength = pkt->size(); + + // Send out the multi hedare packet followed by the Ethernet packet. + sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress); + sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress); + DPRINTF(MultiEthernetPkt, + "MultiIface::sendDataPacket() done size:%d send_delay:%llu " + "src:0x%02x%02x%02x%02x%02x%02x " + "dst:0x%02x%02x%02x%02x%02x%02x\n", + pkt->size(), send_delay, + header_pkt.srcAddress[0], header_pkt.srcAddress[1], + header_pkt.srcAddress[2], header_pkt.srcAddress[3], + header_pkt.srcAddress[4], header_pkt.srcAddress[5], + header_pkt.dstAddress[0], header_pkt.dstAddress[1], + header_pkt.dstAddress[2], header_pkt.dstAddress[3], + header_pkt.dstAddress[4], header_pkt.dstAddress[5]); +} + +bool +MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt) +{ + // Blocking receive of an incoming multi header packet. + return recvRaw((void *)&header_pkt, sizeof(header_pkt)); +} + +void +MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt) +{ + // We are here beacuse a header packet has been received implying + // that an Ethernet (data) packet is coming in next. + assert(header_pkt.msgType == MsgType::dataDescriptor); + // Allocate storage for the incoming Ethernet packet. + EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength)); + // Now execute the blocking receive and store the incoming data directly + // in the new EthPacketData object. + if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength)) + panic("Missing data packet"); + + new_packet->length = header_pkt.dataPacketLength; + // Grab the event queue lock to schedule a new receive event for the + // data packet. + curEventQueue()->lock(); + // Compute the receive tick. It includes the send delay and the + // simulated link delay. + Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay; + DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, " + "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n", + header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick); + + if (recv_tick <= curTick()) { + panic("Simulators out of sync - missed packet receive by %llu ticks", + curTick() - recv_tick); + } + // Now we are about to schedule a recvDone event for the new data packet. + // We use the same recvDone object for all incoming data packets. If + // that is already scheduled - i.e. a receive event for a previous + // data packet is already pending - then we have to check whether the + // receive tick for the new packet is earlier than that of the currently + // pending event. Packets may arrive out-of-order with respect to + // simulated receive time. If that is the case, we need to re-schedule the + // recvDone event for the new packet. Otherwise, we save the packet + // pointer and the recv tick for the new packet in the recvQueue. See + // the implementation of the packetIn() method for comments on how this + // information is retrieved from the recvQueue by the simulation thread. + if (!recvDone->scheduled()) { + assert(recvQueue.size() == 0); + assert(scheduledRecvPacket == nullptr); + scheduledRecvPacket = new_packet; + eventManager->schedule(recvDone, recv_tick); + } else if (recvDone->when() > recv_tick) { + recvQueue.emplace(scheduledRecvPacket, recvDone->when()); + eventManager->reschedule(recvDone, recv_tick); + scheduledRecvPacket = new_packet; + } else { + recvQueue.emplace(new_packet, recv_tick); + } + curEventQueue()->unlock(); +} + +void +MultiIface::recvThreadFunc() +{ + EthPacketPtr new_packet; + MultiHeaderPkt::Header header; + + // The new receiver thread shares the event queue with the simulation + // thread (associated with the simulated Ethernet link). + curEventQueue(eventManager->eventQueue()); + // Main loop to wait for and process any incoming message. + for (;;) { + // recvHeader() blocks until the next multi header packet comes in. + if (!recvHeader(header)) { + // We lost connection to the peer gem5 processes most likely + // because one of them called m5 exit. So we stop here. + exit_message("info", 0, "Message server closed connection, " + "simulation is exiting"); + } + // We got a valid multi header packet, let's process it + if (header.msgType == MsgType::dataDescriptor) { + recvData(header); + } else { + // everything else must be synchronisation related command + sync->progress(header.msgType); + } + } +} + +EthPacketPtr +MultiIface::packetIn() +{ + // We are called within the process() method of the recvDone event. We + // return the packet that triggered the current receive event. + // If there is further packets in the recvQueue, we also have to schedule + // the recvEvent for the next packet with the smallest receive tick. + // The priority queue container ensures that smallest receive tick is + // always on the top of the queue. + assert(scheduledRecvPacket != nullptr); + EthPacketPtr next_packet = scheduledRecvPacket; + + if (! recvQueue.empty()) { + eventManager->schedule(recvDone, recvQueue.top().second); + scheduledRecvPacket = recvQueue.top().first; + recvQueue.pop(); + } else { + scheduledRecvPacket = nullptr; + } + + return next_packet; +} + +void +MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay) +{ + assert(recvThread == nullptr); + // all receive thread must be spawned before simulation starts + assert(eventManager->eventQueue()->getCurTick() == 0); + + recvDone = recv_done; + linkDelay = link_delay; + + recvThread = new std::thread(&MultiIface::recvThreadFunc, this); + + recvThreadsNum++; +} + +DrainState +MultiIface::drain() +{ + DPRINTF(MultiEthernet,"MultiIFace::drain() called\n"); + + // This can be called multiple times in the same drain cycle. + if (master == this) { + syncEvent->isDraining = true; + } + + return DrainState::Drained; +} + +void MultiIface::drainDone() { + if (master == this) { + assert(syncEvent->isDraining == true); + syncEvent->isDraining = false; + // We need to resume the interrupted periodic sync here now that the + // draining is done. If the last periodic sync completed before the + // checkpoint then the next one is already scheduled. + if (syncEvent->interrupted) + syncEvent->resume(); + } +} + +void MultiIface::serialize(const std::string &base, CheckpointOut &cp) const +{ + // Drain the multi interface before the checkpoint is taken. We cannot call + // this as part of the normal drain cycle because this multi sync has to be + // called exactly once after the system is fully drained. + // Note that every peer will take a checkpoint but they may take it at + // different ticks. + // This sync request may interrupt an on-going periodic sync in some peers. + sync->run(SyncTrigger::ckpt, curTick()); + + // Save the periodic multi sync status + syncEvent->serialize(base, cp); + + unsigned n_rx_packets = recvQueue.size(); + if (scheduledRecvPacket != nullptr) + n_rx_packets++; + + paramOut(cp, base + ".nRxPackets", n_rx_packets); + + if (n_rx_packets > 0) { + assert(recvDone->scheduled()); + scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp); + } + + for (unsigned i=1; i < n_rx_packets; i++) { + const RecvInfo recv_info = recvQueue.impl().at(i-1); + recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp); + Tick rx_tick = recv_info.second; + paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick); + } +} + +void MultiIface::unserialize(const std::string &base, CheckpointIn &cp) +{ + assert(recvQueue.size() == 0); + assert(scheduledRecvPacket == nullptr); + assert(recvDone->scheduled() == false); + + // restore periodic sync info + syncEvent->unserialize(base, cp); + + unsigned n_rx_packets; + paramIn(cp, base + ".nRxPackets", n_rx_packets); + + if (n_rx_packets > 0) { + scheduledRecvPacket = std::make_shared(16384); + scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp); + // Note: receive event will be scheduled when the link is unserialized + } + + for (unsigned i=1; i < n_rx_packets; i++) { + EthPacketPtr rx_packet = std::make_shared(16384); + rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp); + Tick rx_tick = 0; + paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick); + assert(rx_tick > 0); + recvQueue.emplace(rx_packet,rx_tick); + } +} + +void MultiIface::initRandom() +{ + // Initialize the seed for random generator to avoid the same sequence + // in all gem5 peer processes + assert(master != nullptr); + if (this == master) + random_mt.init(5489 * (rank+1) + 257); +} + +void MultiIface::startPeriodicSync() +{ + DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n"); + // Do a global sync here to ensure that peer gem5 processes are around + // (actually this may not be needed...) + sync->run(SyncTrigger::atomic, curTick()); + + // Start the periodic sync if it is a fresh simulation from scratch + if (curTick() == 0) { + if (this == master) { + syncEvent->start(syncStart, syncRepeat); + inform("Multi synchronisation activated: start at %lld, " + "repeat at every %lld ticks.\n", + syncStart, syncRepeat); + } else { + // In case another multiIface object requires different schedule + // for periodic sync than the master does. + syncEvent->adjust(syncStart, syncRepeat); + } + } else { + // Schedule the next periodic sync if resuming from a checkpoint + if (this == master) + syncEvent->resume(); + } + DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n"); +} diff --git a/src/dev/multi_iface.hh b/src/dev/multi_iface.hh new file mode 100644 index 000000000..0e4859ecd --- /dev/null +++ b/src/dev/multi_iface.hh @@ -0,0 +1,492 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * The interface class for multi gem5 simulations. + * + * Multi gem5 is an extension to gem5 to enable parallel simulation of a + * distributed system (e.g. simulation of a pool of machines + * connected by Ethernet links). A multi gem5 run consists of seperate gem5 + * processes running in parallel. Each gem5 process executes + * the simulation of a component of the simulated distributed system. + * (An example component can be a multi-core board with an Ethernet NIC.) + * The MultiIface class below provides services to transfer data and + * control messages among the gem5 processes. The main such services are + * as follows. + * + * 1. Send a data packet coming from a simulated Ethernet link. The packet + * will be transferred to (all) the target(s) gem5 processes. The send + * operation is always performed by the simulation thread, i.e. the gem5 + * thread that is processing the event queue associated with the simulated + * Ethernet link. + * + * 2. Spawn a receiver thread to process messages coming in from the + * from other gem5 processes. Each simulated Ethernet link has its own + * associated receiver thread. The receiver thread saves the incoming packet + * and schedule an appropriate receive event in the event queue. + * + * 3. Schedule a global barrier event periodically to keep the gem5 + * processes in sync. + * Periodic barrier event to keep peer gem5 processes in sync. The basic idea + * is that no gem5 process can go ahead further than the simulated link + * transmission delay to ensure that a corresponding receive event can always + * be scheduled for any message coming in from a peer gem5 process. + * + * + * + * This interface is an abstract class (sendRaw() and recvRaw() + * methods are pure virtual). It can work with various low level + * send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP + * stream socket version is implemented in dev/src/tcp_iface.[hh,cc]. + */ +#ifndef __DEV_MULTI_IFACE_HH__ +#define __DEV_MULTI_IFACE_HH__ + +#include +#include +#include +#include +#include + +#include "dev/etherpkt.hh" +#include "dev/multi_packet.hh" +#include "sim/core.hh" +#include "sim/drain.hh" +#include "sim/global_event.hh" + +class EventManager; + +/** + * The interface class to talk to peer gem5 processes. + */ +class MultiIface : public Drainable +{ + public: + /*! + * The possible reasons a multi sync among gem5 peers is needed for. + */ + enum + class SyncTrigger { + periodic, /*!< Regular periodic sync. This can be interrupted by a + checkpoint sync request */ + ckpt, /*!< sync before taking a checkpoint */ + atomic /*!< sync that cannot be interrupted (e.g. sync at startup) */ + }; + + private: + typedef MultiHeaderPkt::MsgType MsgType; + + /** Sync State-Machine + \dot + digraph Sync { + node [shape=box, fontsize=10]; + idle -> busy + [ label="new trigger\n by run()" fontsize=8 ]; + busy -> busy + [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum > 1) || \n(msg==CkptSyncReq &&\ntrigger == ckpt)" fontsize=8 ]; + busy -> idle + [ label="new message by progress():\n(msg == SyncAck &&\nwaitNum == 1)" fontsize=8 ]; + busy -> interrupted + [ label="new message by progress():\n(msg == CkptSyncReq &&\ntrigger == periodic)" fontsize=8 ]; + idle -> asyncCkpt + [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; + asyncCkpt -> asyncCkpt + [ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ]; + asyncCkpt -> busy + [ label="new trigger by run():\ntrigger == ckpt" fontsize=8 ]; + asyncCkpt -> idle + [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum == 0) " fontsize=8 ]; + asyncCkpt -> interrupted + [ label="new trigger by run():\n(trigger == periodic &&\nwaitNum > 0) " fontsize=8 ]; + interrupted -> interrupted + [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum > 1)" fontsize=8 ]; + interrupted -> idle + [ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum == 1)" fontsize=8 ]; + } + \enddot + */ + /** @class Sync + * This class implements global sync operations among gem5 peer processes. + * + * @note This class is used as a singleton object (shared by all MultiIface + * objects). + */ + class Sync + { + private: + /*! + * Internal state of the sync singleton object. + */ + enum class SyncState { + busy, /*!< There is an on-going sync. */ + interrupted, /*!< An on-going periodic sync was interrupted. */ + asyncCkpt, /*!< A checkpoint (sim_exit) is already scheduled */ + idle /*!< There is no active sync. */ + }; + /** + * The lock to protect access to the MultiSync object. + */ + std::mutex lock; + /** + * Condition variable for the simulation thread to wait on + * until all receiver threads completes the current global + * synchronisation. + */ + std::condition_variable cv; + /** + * Number of receiver threads that not yet completed the current global + * synchronisation. + */ + unsigned waitNum; + /** + * The trigger for the most recent sync. + */ + SyncTrigger trigger; + /** + * Map sync triggers to request messages. + */ + std::array triggerToMsg = {{ + MsgType::cmdPeriodicSyncReq, + MsgType::cmdCkptSyncReq, + MsgType::cmdAtomicSyncReq + }}; + + /** + * Current sync state. + */ + SyncState state; + + public: + /** + * Core method to perform a full multi sync. + * + * @param t Sync trigger. + * @param sync_tick The tick the sync was expected to happen at. + * @return true if the sync completed, false if it was interrupted. + * + * @note In case of an interrupted periodic sync, sync_tick can be less + * than curTick() when we resume (i.e. re-run) it + */ + bool run(SyncTrigger t, Tick sync_tick); + /** + * Callback when the receiver thread gets a sync message. + */ + void progress(MsgType m); + + Sync() : waitNum(0), state(SyncState::idle) {} + ~Sync() {} + }; + + + /** + * The global event to schedule peridic multi sync. It is used as a + * singleton object. + * + * The periodic synchronisation works as follows. + * 1. A MultisyncEvent is scheduled as a global event when startup() is + * called. + * 2. The progress() method of the MultisyncEvent initiates a new barrier + * for each simulated Ethernet links. + * 3. Simulation thread(s) then waits until all receiver threads + * completes the ongoing barrier. The global sync event is done. + */ + class SyncEvent : public GlobalSyncEvent + { + public: + /** + * Flag to indicate that the most recent periodic sync was interrupted + * (by a checkpoint request). + */ + bool interrupted; + /** + * The tick when the most recent periodic synchronisation was scheduled + * at. + */ + Tick scheduledAt; + /** + * Flag to indicate an on-going drain cycle. + */ + bool isDraining; + + public: + /** + * Only the firstly instanstiated MultiIface object will + * call this constructor. + */ + SyncEvent() : GlobalSyncEvent(Default_Pri, 0), interrupted(false), + scheduledAt(0), isDraining(false) {} + + ~SyncEvent() { assert (scheduled() == false); } + /** + * Schedule the first periodic sync event. + * + * @param start Start tick for multi synchronisation + * @param repeat Frequency of multi synchronisation + * + */ + void start(Tick start, Tick repeat); + /** + * Reschedule (if necessary) the periodic sync event. + * + * @param start Start tick for multi synchronisation + * @param repeat Frequency of multi synchronisation + * + * @note Useful if we have multiple MultiIface objects with + * different 'start' and 'repeat' values for global sync. + */ + void adjust(Tick start, Tick repeat); + /** + * This is a global event so process() will be called by each + * simulation threads. (See further comments in the .cc file.) + */ + void process() M5_ATTR_OVERRIDE; + /** + * Schedule periodic sync when resuming from a checkpoint. + */ + void resume(); + + void serialize(const std::string &base, CheckpointOut &cp) const; + void unserialize(const std::string &base, CheckpointIn &cp); + }; + + /** + * The receive thread needs to store the packet pointer and the computed + * receive tick for each incoming data packet. This information is used + * by the simulation thread when it processes the corresponding receive + * event. (See more comments at the implemetation of the recvThreadFunc() + * and RecvPacketIn() methods.) + */ + typedef std::pair RecvInfo; + + /** + * Comparison predicate for RecvInfo, needed by the recvQueue. + */ + struct RecvInfoCompare { + bool operator()(const RecvInfo &lhs, const RecvInfo &rhs) + { + return lhs.second > rhs.second; + } + }; + + /** + * Customized priority queue used to store incoming data packets info by + * the receiver thread. We need to expose the underlying container to + * enable iterator access for serializing. + */ + class RecvQueue : public std::priority_queue, + RecvInfoCompare> + { + public: + std::vector &impl() { return c; } + const std::vector &impl() const { return c; } + }; + + /* + * The priority queue to store RecvInfo items ordered by receive ticks. + */ + RecvQueue recvQueue; + /** + * The singleton Sync object to perform multi synchronisation. + */ + static Sync *sync; + /** + * The singleton SyncEvent object to schedule periodic multi sync. + */ + static SyncEvent *syncEvent; + /** + * Tick to schedule the first multi sync event. + * This is just as optimization : we do not need any multi sync + * event until the simulated NIC is brought up by the OS. + */ + Tick syncStart; + /** + * Frequency of multi sync events in ticks. + */ + Tick syncRepeat; + /** + * Receiver thread pointer. + * Each MultiIface object must have exactly one receiver thread. + */ + std::thread *recvThread; + /** + * The event manager associated with the MultiIface object. + */ + EventManager *eventManager; + + /** + * The receive done event for the simulated Ethernet link. + * It is scheduled by the receiver thread for each incoming data + * packet. + */ + Event *recvDone; + + /** + * The packet that belongs to the currently scheduled recvDone event. + */ + EthPacketPtr scheduledRecvPacket; + + /** + * The link delay in ticks for the simulated Ethernet link. + */ + Tick linkDelay; + + /** + * The rank of this process among the gem5 peers. + */ + unsigned rank; + /** + * Total number of receiver threads (in this gem5 process). + * During the simulation it should be constant and equal to the + * number of MultiIface objects (i.e. simulated Ethernet + * links). + */ + static unsigned recvThreadsNum; + /** + * The very first MultiIface object created becomes the master. We need + * a master to co-ordinate the global synchronisation. + */ + static MultiIface *master; + + protected: + /** + * Low level generic send routine. + * @param buf buffer that holds the data to send out + * @param length number of bytes to send + * @param dest_addr address of the target (simulated NIC). This may be + * used by a subclass for optimization (e.g. optimize broadcast) + */ + virtual void sendRaw(void *buf, + unsigned length, + const MultiHeaderPkt::AddressType dest_addr) = 0; + /** + * Low level generic receive routine. + * @param buf the buffer to store the incoming message + * @param length buffer size (in bytes) + */ + virtual bool recvRaw(void *buf, unsigned length) = 0; + /** + * Low level request for synchronisation among gem5 processes. Only one + * MultiIface object needs to call this (in each gem5 process) to trigger + * a multi sync. + * + * @param sync_req Sync request command. + * @param sync_tick The tick when sync is expected to happen in the sender. + */ + virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0; + /** + * The function executed by a receiver thread. + */ + void recvThreadFunc(); + /** + * Receive a multi header packet. Called by the receiver thread. + * @param header the structure to store the incoming header packet. + * @return false if any error occured during the receive, true otherwise + * + * A header packet can carry a control command (e.g. 'barrier leave') or + * information about a data packet that is following the header packet + * back to back. + */ + bool recvHeader(MultiHeaderPkt::Header &header); + /** + * Receive a data packet. Called by the receiver thread. + * @param data_header The packet descriptor for the expected incoming data + * packet. + */ + void recvData(const MultiHeaderPkt::Header &data_header); + + public: + + /** + * ctor + * @param multi_rank Rank of this gem5 process within the multi run + * @param sync_start Start tick for multi synchronisation + * @param sync_repeat Frequency for multi synchronisation + * @param em The event manager associated with the simulated Ethernet link + */ + MultiIface(unsigned multi_rank, + Tick sync_start, + Tick sync_repeat, + EventManager *em); + + virtual ~MultiIface(); + /** + * Send out an Ethernet packet. + * @param pkt The Ethernet packet to send. + * @param send_delay The delay in ticks for the send completion event. + */ + void packetOut(EthPacketPtr pkt, Tick send_delay); + /** + * Fetch the next packet from the receive queue. + */ + EthPacketPtr packetIn(); + + /** + * spawn the receiver thread. + * @param recv_done The receive done event associated with the simulated + * Ethernet link. + * @param link_delay The link delay for the simulated Ethernet link. + */ + void spawnRecvThread(Event *recv_done, + Tick link_delay); + /** + * Initialize the random number generator with a different seed in each + * peer gem5 process. + */ + void initRandom(); + + DrainState drain() M5_ATTR_OVERRIDE; + + /** + * Callback when draining is complete. + */ + void drainDone(); + + /** + * Initialize the periodic synchronisation among peer gem5 processes. + */ + void startPeriodicSync(); + + void serialize(const std::string &base, CheckpointOut &cp) const; + void unserialize(const std::string &base, CheckpointIn &cp); + +}; + + +#endif diff --git a/src/dev/multi_packet.cc b/src/dev/multi_packet.cc new file mode 100644 index 000000000..1fd0c0e81 --- /dev/null +++ b/src/dev/multi_packet.cc @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * MultiHeaderPkt class to encapsulate multi-gem5 header packets + * + */ + +#include "dev/multi_packet.hh" + +#include +#include + +#include "base/inet.hh" + +unsigned +MultiHeaderPkt::maxAddressLength() +{ + return sizeof(AddressType); +} + +void +MultiHeaderPkt::clearAddress(AddressType &addr) +{ + std::memset(addr, 0, sizeof(addr)); +} + +bool +MultiHeaderPkt::isAddressEqual(const AddressType &addr1, + const AddressType &addr2) +{ + return (std::memcmp(addr1, addr2, sizeof(addr1)) == 0); +} + +bool +MultiHeaderPkt::isAddressLess(const AddressType &addr1, + const AddressType &addr2) +{ + return (std::memcmp(addr1, addr2, sizeof(addr1)) < 0); +} + +void +MultiHeaderPkt::copyAddress(AddressType &dest, const AddressType &src) +{ + std::memcpy(dest, src, sizeof(dest)); +} + +bool +MultiHeaderPkt::isBroadcastAddress(const AddressType &addr) +{ + return ((Net::EthAddr *)&addr)->broadcast(); +} + +bool +MultiHeaderPkt::isMulticastAddress(const AddressType &addr) +{ + return ((Net::EthAddr *)&addr)->multicast(); +} + +bool +MultiHeaderPkt::isUnicastAddress(const AddressType &addr) +{ + return ((Net::EthAddr *)&addr)->unicast(); +} diff --git a/src/dev/multi_packet.hh b/src/dev/multi_packet.hh new file mode 100644 index 000000000..4b270645d --- /dev/null +++ b/src/dev/multi_packet.hh @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * Header packet class for multi gem5 runs. + * + * For a high level description about multi gem5 see comments in + * header file multi_iface.hh. + * + * The MultiHeaderPkt class defines the format of message headers + * sent among gem5 processes during a multi gem5 simulation. A header packet + * can either carry the description of data packet (i.e. a simulated Ethernet + * packet) or a synchronisation related control command. In case of + * data packet description, the corresponding data packet always follows + * the header packet back-to-back. + */ +#ifndef __DEV_MULTI_PACKET_HH__ +#define __DEV_MULTI_PACKET_HH__ + +#include + +#include "base/types.hh" + +class MultiHeaderPkt +{ + private: + MultiHeaderPkt() {} + ~MultiHeaderPkt() {} + + public: + /** + * Simply type to help with calculating space requirements for + * the corresponding header field. + */ + typedef uint8_t AddressType[6]; + + /** + * The msg type defines what informarion a multi header packet carries. + */ + enum class MsgType + { + dataDescriptor, + cmdPeriodicSyncReq, + cmdPeriodicSyncAck, + cmdCkptSyncReq, + cmdCkptSyncAck, + cmdAtomicSyncReq, + cmdAtomicSyncAck, + unknown + }; + + struct Header + { + /** + * The msg type field is valid for all header packets. In case of + * a synchronisation control command this is the only valid field. + */ + MsgType msgType; + Tick sendTick; + Tick sendDelay; + /** + * Actual length of the simulated Ethernet packet. + */ + unsigned dataPacketLength; + /** + * Source MAC address. + */ + AddressType srcAddress; + /** + * Destination MAC address. + */ + AddressType dstAddress; + }; + + static unsigned maxAddressLength(); + + /** + * Static functions for manipulating and comparing MAC addresses. + */ + static void clearAddress(AddressType &addr); + static bool isAddressEqual(const AddressType &addr1, + const AddressType &addr2); + static bool isAddressLess(const AddressType &addr1, + const AddressType &addr2); + + static void copyAddress(AddressType &dest, + const AddressType &src); + + static bool isUnicastAddress(const AddressType &addr); + static bool isMulticastAddress(const AddressType &addr); + static bool isBroadcastAddress(const AddressType &addr); +}; + +#endif diff --git a/src/dev/tcp_iface.cc b/src/dev/tcp_iface.cc new file mode 100644 index 000000000..f18486fa3 --- /dev/null +++ b/src/dev/tcp_iface.cc @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * TCP stream socket based interface class implementation for multi gem5 runs. + */ + +#include "dev/tcp_iface.hh" + +#include +#include +#include +#include +#include + +#include +#include + +#include "base/types.hh" +#include "debug/MultiEthernet.hh" + +// MSG_NOSIGNAL does not exists on OS X +#if defined(__APPLE__) || defined(__MACH__) +#ifndef MSG_NOSIGNAL +#define MSG_NOSIGNAL SO_NOSIGPIPE +#endif +#endif + +using namespace std; + +vector TCPIface::sockRegistry; + +TCPIface::TCPIface(string server_name, unsigned server_port, + unsigned multi_rank, Tick sync_start, Tick sync_repeat, + EventManager *em) : + MultiIface(multi_rank, sync_start, sync_repeat, em) +{ + struct addrinfo addr_hint, *addr_results; + int ret; + + string port_str = to_string(server_port); + + sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + panic_if(sock < 0, "socket() failed: %s", strerror(errno)); + + bzero(&addr_hint, sizeof(addr_hint)); + addr_hint.ai_family = AF_INET; + addr_hint.ai_socktype = SOCK_STREAM; + addr_hint.ai_protocol = IPPROTO_TCP; + + ret = getaddrinfo(server_name.c_str(), port_str.c_str(), + &addr_hint, &addr_results); + panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno)); + + DPRINTF(MultiEthernet, "Connecting to %s:%u\n", + server_name.c_str(), port_str.c_str()); + + ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr), + addr_results->ai_addrlen); + panic_if(ret < 0, "connect() failed: %s", strerror(errno)); + + freeaddrinfo(addr_results); + // add our socket to the static registry + sockRegistry.push_back(sock); + // let the server know who we are + sendTCP(sock, &multi_rank, sizeof(multi_rank)); +} + +TCPIface::~TCPIface() +{ + int M5_VAR_USED ret; + + ret = close(sock); + assert(ret == 0); +} + +void +TCPIface::sendTCP(int sock, void *buf, unsigned length) +{ + ssize_t ret; + + ret = ::send(sock, buf, length, MSG_NOSIGNAL); + panic_if(ret < 0, "send() failed: %s", strerror(errno)); + panic_if(ret != length, "send() failed"); +} + +bool +TCPIface::recvTCP(int sock, void *buf, unsigned length) +{ + ssize_t ret; + + ret = ::recv(sock, buf, length, MSG_WAITALL ); + if (ret < 0) { + if (errno == ECONNRESET || errno == EPIPE) + inform("recv(): %s", strerror(errno)); + else if (ret < 0) + panic("recv() failed: %s", strerror(errno)); + } else if (ret == 0) { + inform("recv(): Connection closed"); + } else if (ret != length) + panic("recv() failed"); + + return (ret == length); +} + +void +TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick) +{ + /* + * Barrier is simply implemented by point-to-point messages to the server + * for now. This method is called by only one TCPIface object. + * The server will send back an 'ack' message when it gets the + * sync request from all clients. + */ + MultiHeaderPkt::Header header_pkt; + header_pkt.msgType = sync_req; + header_pkt.sendTick = sync_tick; + + for (auto s : sockRegistry) + sendTCP(s, (void *)&header_pkt, sizeof(header_pkt)); +} + diff --git a/src/dev/tcp_iface.hh b/src/dev/tcp_iface.hh new file mode 100644 index 000000000..d34d3d002 --- /dev/null +++ b/src/dev/tcp_iface.hh @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * TCP stream socket based interface class for multi gem5 runs. + * + * For a high level description about multi gem5 see comments in + * header file multi_iface.hh. + * + * The TCP subclass of MultiIface uses a separate server process + * (see tcp_server.[hh,cc] under directory gem5/util/multi). Each gem5 + * process connects to the server via a stream socket. The server process + * transfers messages and co-ordinates the synchronisation among the gem5 + * peers. + */ +#ifndef __DEV_TCP_IFACE_HH__ +#define __DEV_TCP_IFACE_HH__ + + +#include + +#include "dev/multi_iface.hh" + +class EventManager; + +class TCPIface : public MultiIface +{ + private: + /** + * The stream socket to connect to the server. + */ + int sock; + + /** + * Registry for all sockets to the server opened by this gem5 process. + */ + static std::vector sockRegistry; + + private: + + /** + * Send out a message through a TCP stream socket. + * + * @param sock TCP stream socket. + * @param buf Start address of the message. + * @param length Size of the message in bytes. + */ + void + sendTCP(int sock, void *buf, unsigned length); + + /** + * Receive the next incoming message through a TCP stream socket. + * + * @param sock TCP stream socket. + * @param buf Start address of buffer to store the message. + * @param length Exact size of the expected message in bytes. + */ + bool recvTCP(int sock, void *buf, unsigned length); + + + protected: + + virtual void + sendRaw(void *buf, unsigned length, + const MultiHeaderPkt::AddressType dest_addr=nullptr) + M5_ATTR_OVERRIDE + { + sendTCP(sock, buf, length); + } + + virtual bool recvRaw(void *buf, unsigned length) M5_ATTR_OVERRIDE + { + return recvTCP(sock, buf, length); + } + + virtual void syncRaw(MultiHeaderPkt::MsgType sync_req, + Tick sync_tick) M5_ATTR_OVERRIDE; + + public: + /** + * The ctor creates and connects the stream socket to the server. + * @param server_name The name (or IP address) of the host running the + * server process. + * @param server_port The port number the server listening for new + * connections. + * @param sync_start The tick for the first multi synchronisation. + * @param sync_repeat The frequency of multi synchronisation. + * @param em The EventManager object associated with the simulated + * Ethernet link. + */ + TCPIface(std::string server_name, unsigned server_port, + unsigned multi_rank, Tick sync_start, Tick sync_repeat, + EventManager *em); + + ~TCPIface() M5_ATTR_OVERRIDE; +}; + +#endif diff --git a/util/multi/Makefile b/util/multi/Makefile new file mode 100644 index 000000000..a58dd3307 --- /dev/null +++ b/util/multi/Makefile @@ -0,0 +1,63 @@ +# +# Copyright (c) 2015 ARM Limited +# All rights reserved +# +# The license below extends only to copyright in the software and shall +# not be construed as granting a license to any other intellectual +# property including but not limited to intellectual property relating +# to a hardware implementation of the functionality of the software +# licensed hereunder. You may use the software subject to the license +# terms below provided that you ensure that this notice is replicated +# unmodified and in its entirety in all distributions of the software, +# modified or unmodified, in source code or in binary form. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer; +# redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution; +# neither the name of the copyright holders nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Authors: Gabor Dozsa + +CXX= g++ + +DEBUG= -DDEBUG + +M5_ARCH?= ARM +M5_DIR?= ../.. + +vpath % $(M5_DIR)/build/$(M5_ARCH)/dev + +INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext +CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS) + +default: tcp_server + +clean: + @rm -f tcp_server *.o *.d *~ + +tcp_server: tcp_server.o multi_packet.o + $(CXX) $(LFLAGS) -o $@ $^ + +%.o: %.cc + @echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@' + @$(CXX) $(CCFLAGS) -c $< -o $@ + +-include *.d diff --git a/util/multi/bootscript.rcS b/util/multi/bootscript.rcS new file mode 100644 index 000000000..95736f4b7 --- /dev/null +++ b/util/multi/bootscript.rcS @@ -0,0 +1,122 @@ +#!/bin/bash + + +# +# Copyright (c) 2015 ARM Limited +# All rights reserved +# +# The license below extends only to copyright in the software and shall +# not be construed as granting a license to any other intellectual +# property including but not limited to intellectual property relating +# to a hardware implementation of the functionality of the software +# licensed hereunder. You may use the software subject to the license +# terms below provided that you ensure that this notice is replicated +# unmodified and in its entirety in all distributions of the software, +# modified or unmodified, in source code or in binary form. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer; +# redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution; +# neither the name of the copyright holders nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Authors: Gabor Dozsa +# +# +# This is an example boot script to use for muti gem5 runs. The important +# task here is to extract the rank and size information from the kernel +# boot args and use those to configure MAC/IP addresses and hostname. +# Then we can kick off our (parallel) workload ... +# +# You are expected to costumize this scipt for your needs (e.g. change +# the command at the end of the scipt to run your tests/workloads. + +source /root/.bashrc +echo "bootscript.rcS is running" + +m='GEM5\_RANK=([0-9]+) GEM5\_SIZE=([0-9]+)' +if [[ $(cat /proc/cmdline) =~ $m ]] +then + MY_RANK=${BASH_REMATCH[1]} + MY_SIZE=${BASH_REMATCH[2]} +else + echo "(E) GEM5_RANK/GEM5_SIZE was not defined in bootargs, exiting ..." + /sbin/m5 abort +fi + +/bin/hostname node${MY_RANK} + +# Keep MAC address assignment simple for now ... +(($MY_RANK>97)) && { echo "(E) Rank must be less than 98"; /sbin/m5 abort; } +((MY_ADDR=MY_RANK+2)) +if (($MY_ADDR<10)) +then + MY_ADDR_PADDED=0${MY_ADDR} +else + MY_ADDR_PADDED=${MY_ADDR} +fi + +/sbin/ifconfig eth0 hw ether 00:90:00:00:00:${MY_ADDR_PADDED} +/sbin/ifconfig eth0 192.168.0.${MY_ADDR} netmask 255.255.255.0 up + +/sbin/ifconfig -a + +# Prepare host lists for mpirun +MY_MPI_HOSTS="192.168.0.2" +for ((i=1; i/dev/null + [ "x$WATCHDOG_PID" != "x" ] && kill $WATCHDOG_PID 2>/dev/null + exit -1 +} + + +# We need a watchdog to trigger full clean up if a gem5 process dies +watchdog_func () +{ + while true + do + sleep 30 + ((NDEAD=0)) + for p in ${SSH_PIDS[*]} + do + kill -0 $p 2>/dev/null || ((NDEAD+=1)) + done + kill -0 $SERVER_PID || ((NDEAD+=1)) + if ((NDEAD>0)) + then + # we may be in the middle of an orderly termination, + # give it some time to complete before reporting abort + sleep 60 + echo -n "(I) (some) gem5 process(es) exited" + abort_func + fi + done +} + +# This function launches the gem5 processes. We use it only to allow launching +# gem5 processes under gdb control (in the foreground) for debugging +start_func () +{ + local N=$1 + local HOST=$2 + local ENV_ARGS=$3 + shift 3 + if [ "x$GEM5_DEBUG" != "x" ] + then + gdb --args "$@" + else + ssh $HOST $ENV_ARGS "$@" &>log.$N & + fi +} + + +# Trigger full clean up in case we are being killed by external signal +trap 'abort_func' INT TERM + +# env args to be passed explicitly to gem5 processes started via ssh +ENV_ARGS="LD_LIBRARY_PATH=$LD_LIBRARY_PATH M5_PATH=$M5_PATH" + +# launch the mesage server and check if it has started okay +$TCP_SERVER $GEM5_DEBUG $NNODES $SERVER_PORT &>log.server & +SERVER_PID=$! +sleep 2 +kill -0 $SERVER_PID || { echo "Failed to start message server"; exit -1; } + +# Now launch all the gem5 processes with ssh. +echo "START $(date)" +n=0 +for ((i=0; i < ${#HOSTS[@]}; i++)) +do + h=${HOSTS[$i]} + for ((j=0; j < ${NCORES[i]}; j++)) + do + echo "starting gem5 on $h ..." + start_func $n $h "$ENV_ARGS" $GEM5_EXE -d $(pwd)/m5out.$n $GEM5_ARGS \ + --multi \ + --multi-rank=$n \ + --multi-server-name=${HOSTS[0]} \ + --multi-server-port=$SERVER_PORT \ + --testsys-toplevel-LinuxArmSystem.boot_osflags="\"GEM5_RANK=$n GEM5_SIZE=$NNODES\"" + SSH_PIDS[$n]=$! + ((n+=1)) + done +done + +[ "x$GEM5_DEBUG" == "x" ] || { kill $SERVER_PID; echo "DEBUG exit"; exit -1; } + +# start watchdog to trigger complete abort (after a grace period) if any +# gem5 process dies +watchdog_func & +WATCHDOG_PID=$! + +# wait for exit statuses +((NFAIL=0)) +for p in ${SSH_PIDS[*]} +do + wait $p || ((NFAIL+=1)) +done +wait $SERVER_PID || ((NFAIL+=1)) + +# all done, let's terminate the watchdog +kill $WATCHDOG_PID 2>/dev/null + +if ((NFAIL==0)) +then + echo "EXIT $(date)" +else + echo "ABORT $(date)" +fi diff --git a/util/multi/tcp_server.cc b/util/multi/tcp_server.cc new file mode 100644 index 000000000..1ec4303d3 --- /dev/null +++ b/util/multi/tcp_server.cc @@ -0,0 +1,463 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Copyright (c) 2008 The Regents of The University of Michigan + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + + +/* @file + * Message server implementation using TCP stream sockets for parallel gem5 + * runs. + */ +#include +#include +#include +#include + +#include +#include + +#include "tcp_server.hh" + +using namespace std; + +// Some basic macros for information and error reporting. +#define PRINTF(...) fprintf(stderr, __VA_ARGS__) + +#ifdef DEBUG +static bool debugSetup = true; +static bool debugPeriodic = false; +static bool debugSync = true; +static bool debugPkt = false; +#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__) +#else +#define DPRINTF(v,...) +#endif + +#define inform(...) do { PRINTF("info: "); \ + PRINTF(__VA_ARGS__); } while(0) + +#define panic(...) do { PRINTF("panic: "); \ + PRINTF(__VA_ARGS__); \ + PRINTF("\n[%s:%s], line %d\n", \ + __FUNCTION__, __FILE__, __LINE__); \ + exit(-1); } while(0) + +TCPServer *TCPServer::instance = nullptr; + +TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle) +{ + MultiHeaderPkt::clearAddress(address); +} + +unsigned +TCPServer::Channel::recvRaw(void *buf, unsigned size) const +{ + ssize_t n; + // This is a blocking receive. + n = recv(fd, buf, size, MSG_WAITALL); + + if (n < 0) + panic("read() failed:%s", strerror(errno)); + else if (n > 0 && n < size) + // the recv() call should wait for the full message + panic("read() failed"); + + return n; +} + +void +TCPServer::Channel::sendRaw(const void *buf, unsigned size) const +{ + ssize_t n; + n = send(fd, buf, size, MSG_NOSIGNAL); + if (n < 0) + panic("write() failed:%s", strerror(errno)); + else if (n != size) + panic("write() failed"); +} + +void TCPServer::Channel::updateAddress(const AddressType &new_address) +{ + // check if the known address has changed (e.g. the client reconfigured + // its Ethernet NIC) + if (MultiHeaderPkt::isAddressEqual(address, new_address)) + return; + + // So we have to update the address. Note that we always + // store the same address as key in the map but the ordering + // may change so we need to erase and re-insert it again. + auto info = TCPServer::instance->addressMap.find(&address); + if (info != TCPServer::instance->addressMap.end()) { + TCPServer::instance->addressMap.erase(info); + } + + MultiHeaderPkt::copyAddress(address, new_address); + TCPServer::instance->addressMap[&address] = this; +} + +void +TCPServer::Channel::headerPktIn() +{ + ssize_t n; + Header hdr_pkt; + + n = recvRaw(&hdr_pkt, sizeof(hdr_pkt)); + + if (n == 0) { + // EOF - nothing to do here, we will handle this as a POLLRDHUP event + // in the main loop. + return; + } + + if (hdr_pkt.msgType == MsgType::dataDescriptor) { + updateAddress(hdr_pkt.srcAddress); + TCPServer::instance->xferData(hdr_pkt, *this); + } else { + processCmd(hdr_pkt.msgType, hdr_pkt.sendTick); + } +} + +void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick) +{ + switch (cmd) { + case MsgType::cmdAtomicSyncReq: + DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank); + assert(state == SyncState::idle); + state = SyncState::atomic; + TCPServer::instance->syncTryComplete(SyncState::atomic, + MsgType::cmdAtomicSyncAck); + break; + case MsgType::cmdPeriodicSyncReq: + DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick); + // sanity check + if (TCPServer::instance->periodicSyncTick() == 0) { + TCPServer::instance->periodicSyncTick(send_tick); + } else if ( TCPServer::instance->periodicSyncTick() != send_tick) { + panic("Out of order periodic sync request - rank:%d " + "(send_tick:%ld ongoing:%ld)", rank, send_tick, + TCPServer::instance->periodicSyncTick()); + } + switch (state) { + case SyncState::idle: + state = SyncState::periodic; + TCPServer::instance->syncTryComplete(SyncState::periodic, + MsgType::cmdPeriodicSyncAck); + break; + case SyncState::asyncCkpt: + // An async ckpt request has already been sent to this client and + // that will interrupt this periodic sync. We can simply drop this + // message. + break; + default: + panic("Unexpected state for periodic sync request (rank:%d)", + rank); + break; + } + break; + case MsgType::cmdCkptSyncReq: + DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank); + switch (state) { + case SyncState::idle: + TCPServer::instance->ckptPropagate(*this); + // we fall through here to complete #clients==1 case + case SyncState::asyncCkpt: + state = SyncState::ckpt; + TCPServer::instance->syncTryComplete(SyncState::ckpt, + MsgType::cmdCkptSyncAck); + break; + default: + panic("Unexpected state for ckpt sync request (rank:%d)", rank); + break; + } + break; + default: + panic("Unexpected header packet (rank:%d)",rank); + break; + } +} + +TCPServer::TCPServer(unsigned clients_num, + unsigned listen_port, + int timeout_in_sec) +{ + assert(instance == nullptr); + construct(clients_num, listen_port, timeout_in_sec); + instance = this; +} + +TCPServer::~TCPServer() +{ + for (auto &c : clientsPollFd) + close(c.fd); +} + +void +TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec) +{ + int listen_sock, new_sock, ret; + unsigned client_len; + struct sockaddr_in server_addr, client_addr; + struct pollfd new_pollfd; + Channel new_channel; + + DPRINTF(debugSetup, "Start listening on port %u ...\n", port); + + listen_sock = socket(AF_INET, SOCK_STREAM, 0); + if (listen_sock < 0) + panic("socket() failed:%s", strerror(errno)); + + bzero(&server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port); + if (bind(listen_sock, (struct sockaddr *) &server_addr, + sizeof(server_addr)) < 0) + panic("bind() failed:%s", strerror(errno)); + listen(listen_sock, 10); + + clientsPollFd.reserve(clients_num); + clientsChannel.reserve(clients_num); + + new_pollfd.events = POLLIN | POLLRDHUP; + new_pollfd.revents = 0; + while (clientsPollFd.size() < clients_num) { + new_pollfd.fd = listen_sock; + ret = poll(&new_pollfd, 1, timeout_in_sec*1000); + if (ret == 0) + panic("Timeout while waiting for clients to connect"); + assert(ret == 1 && new_pollfd.revents == POLLIN); + client_len = sizeof(client_addr); + new_sock = accept(listen_sock, + (struct sockaddr *) &client_addr, + &client_len); + if (new_sock < 0) + panic("accept() failed:%s", strerror(errno)); + new_pollfd.fd = new_sock; + new_pollfd.revents = 0; + clientsPollFd.push_back(new_pollfd); + new_channel.fd = new_sock; + new_channel.isAlive = true; + new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank)); + clientsChannel.push_back(new_channel); + + DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n", + client_addr.sin_addr.s_addr, client_addr.sin_port, + new_channel.rank); + } + ret = close(listen_sock); + assert(ret == 0); + + DPRINTF(debugSetup, "Setup complete\n"); +} + +void +TCPServer::run() +{ + int nfd; + unsigned num_active_clients = clientsPollFd.size(); + + DPRINTF(debugSetup, "Entering run() loop\n"); + while (num_active_clients == clientsPollFd.size()) { + nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1); + if (nfd == -1) + panic("poll() failed:%s", strerror(errno)); + + for (unsigned i = 0, n = 0; + i < clientsPollFd.size() && (signed)n < nfd; + i++) { + struct pollfd &pfd = clientsPollFd[i]; + if (pfd.revents) { + if (pfd.revents & POLLERR) + panic("poll() returned POLLERR"); + if (pfd.revents & POLLIN) { + clientsChannel[i].headerPktIn(); + } + if (pfd.revents & POLLRDHUP) { + // One gem5 process exited or aborted. Either way, we + // assume the full simulation should stop now (either + // because m5 exit was called or a serious error + // occurred.) So we quit the run loop here and close all + // sockets to notify the remaining peer gem5 processes. + pfd.events = 0; + clientsChannel[i].isAlive = false; + num_active_clients--; + DPRINTF(debugSetup, "POLLRDHUP event"); + } + n++; + if ((signed)n == nfd) + break; + } + } + } + DPRINTF(debugSetup, "Exiting run() loop\n"); +} + +void +TCPServer::xferData(const Header &hdr_pkt, const Channel &src) +{ + unsigned n; + assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer)); + n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength); + + if (n == 0) + panic("recvRaw() failed"); + DPRINTF(debugPkt, "Incoming data packet (from rank %d) " + "src:0x%02x%02x%02x%02x%02x%02x " + "dst:0x%02x%02x%02x%02x%02x%02x\n", + src.rank, + hdr_pkt.srcAddress[0], + hdr_pkt.srcAddress[1], + hdr_pkt.srcAddress[2], + hdr_pkt.srcAddress[3], + hdr_pkt.srcAddress[4], + hdr_pkt.srcAddress[5], + hdr_pkt.dstAddress[0], + hdr_pkt.dstAddress[1], + hdr_pkt.dstAddress[2], + hdr_pkt.dstAddress[3], + hdr_pkt.dstAddress[4], + hdr_pkt.dstAddress[5]); + // Now try to figure out the destination client(s). + auto dst_info = addressMap.find(&hdr_pkt.dstAddress); + + // First handle the multicast/broadcast or unknonw destination case. These + // all trigger a broadcast of the packet to all clients. + if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false || + dst_info == addressMap.end()) { + unsigned n = 0; + for (auto const &c: clientsChannel) { + if (c.isAlive && &c!=&src) { + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength); + n++; + } + } + if (n == 0) { + inform("Broadcast/multicast packet dropped\n"); + } + } else { + // It is a unicast address with a known destination + Channel *dst = dst_info->second; + if (dst->isAlive) { + dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength); + DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank); + } else { + inform("Unicast packet dropped (destination exited)\n"); + } + } +} + +void +TCPServer::syncTryComplete(SyncState st, MsgType ack) +{ + // Check if the barrieris complete. If so then notify all the clients. + for (auto &c : clientsChannel) { + if (c.isAlive && (c.state != st)) { + // sync not complete yet, stop here + return; + } + } + // Sync complete, send out the acks + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = ack; + for (auto &c : clientsChannel) { + if (c.isAlive) { + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.state = SyncState::idle; + } + } + // Reset periodic send tick + _periodicSyncTick = 0; + DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync, + "Sync COMPLETE\n"); +} + +void +TCPServer::ckptPropagate(Channel &ch) +{ + // Channel ch got a ckpt request that needs to be propagated to the other + // clients + MultiHeaderPkt::Header hdr_pkt; + hdr_pkt.msgType = MsgType::cmdCkptSyncReq; + for (auto &c : clientsChannel) { + if (c.isAlive && (&c != &ch)) { + switch (c.state) { + case SyncState::idle: + case SyncState::periodic: + c.sendRaw(&hdr_pkt, sizeof(hdr_pkt)); + c.state = SyncState::asyncCkpt; + break; + default: + panic("Unexpected state for ckpt sync request propagation " + "(rank:%d)\n",c.rank); + break; + } + } + } +} + +int main(int argc, char *argv[]) +{ + TCPServer *server; + int clients_num = -1, listen_port = -1; + int first_arg = 1, timeout_in_sec = 60; + + if (argc > 1 && string(argv[1]).compare("-debug") == 0) { + timeout_in_sec = -1; + first_arg++; + argc--; + } + + if (argc != 3) + panic("We need two command line args (number of clients and tcp listen" + " port"); + + clients_num = atoi(argv[first_arg]); + listen_port = atoi(argv[first_arg + 1]); + + server = new TCPServer(clients_num, listen_port, timeout_in_sec); + + server->run(); + + delete server; + + return 0; +} diff --git a/util/multi/tcp_server.hh b/util/multi/tcp_server.hh new file mode 100644 index 000000000..c5f2a9ce8 --- /dev/null +++ b/util/multi/tcp_server.hh @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2015 ARM Limited + * All rights reserved + * + * The license below extends only to copyright in the software and shall + * not be construed as granting a license to any other intellectual + * property including but not limited to intellectual property relating + * to a hardware implementation of the functionality of the software + * licensed hereunder. You may use the software subject to the license + * terms below provided that you ensure that this notice is replicated + * unmodified and in its entirety in all distributions of the software, + * modified or unmodified, in source code or in binary form. + * + * Copyright (c) 2008 The Regents of The University of Michigan + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer; + * redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution; + * neither the name of the copyright holders nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Authors: Gabor Dozsa + */ + +/* @file + * Message server using TCP stream sockets for parallel gem5 runs. + * + * For a high level description about multi gem5 see comments in + * header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh. + * + * This file implements the central message server process for multi gem5. + * The server is responsible the following tasks. + * 1. Establishing a TCP socket connection for each gem5 process (clients). + * + * 2. Process data messages coming in from clients. The server checks + * the MAC addresses in the header message and transfers the message + * to the target(s) client(s). + * + * 3. Processing synchronisation related control messages. Synchronisation + * is performed as follows. The server waits for a 'barrier enter' message + * from all the clients. When the last such control message arrives, the + * server sends out a 'barrier leave' control message to all the clients. + * + * 4. Triggers complete termination in case a client exits. A client may + * exit either by calling 'm5 exit' pseudo instruction or due to a fatal + * error. In either case, we assume that the entire multi simulation needs to + * terminate. The server triggers full termination by tearing down the + * open TCP sockets. + * + * The TCPServer class is instantiated as a singleton object. + * + * The server can be built independently from the rest of gem5 (and it is + * architecture agnostic). See the Makefile in the same directory. + * + */ + +#include + +#include +#include + +#include "dev/etherpkt.hh" +#include "dev/multi_packet.hh" + +/** + * The maximum length of an Ethernet packet (allowing Jumbo frames). + */ +#define MAX_ETH_PACKET_LENGTH 9014 + +class TCPServer +{ + public: + typedef MultiHeaderPkt::AddressType AddressType; + typedef MultiHeaderPkt::Header Header; + typedef MultiHeaderPkt::MsgType MsgType; + + private: + + enum + class SyncState { periodic, ckpt, asyncCkpt, atomic, idle }; + /** + * The Channel class encapsulates all the information about a client + * and its current status. + */ + class Channel + { + private: + /** + * The MAC address of the client. + */ + AddressType address; + + /** + * Update the client MAC address. It is called every time a new data + * packet is to come in. + */ + void updateAddress(const AddressType &new_addr); + /** + * Process an incoming command message. + */ + void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick); + + + public: + /** + * TCP stream socket. + */ + int fd; + /** + * Is client connected? + */ + bool isAlive; + /** + * Current state of the channel wrt. multi synchronisation. + */ + SyncState state; + /** + * Multi rank of the client + */ + unsigned rank; + + public: + Channel(); + ~Channel () {} + + + /** + * Receive and process the next incoming header packet. + */ + void headerPktIn(); + /** + * Send raw data to the connected client. + * + * @param data The data to send. + * @param size Size of the data (in bytes). + */ + void sendRaw(const void *data, unsigned size) const; + /** + * Receive raw data from the connected client. + * + * @param buf The buffer to store the incoming data into. + * @param size Size of data to receive (in bytes). + * @return In case of success, it returns size. Zero is returned + * if the socket is already closed by the client. + */ + unsigned recvRaw(void *buf, unsigned size) const; + }; + + /** + * The array of socket descriptors needed by the poll() system call. + */ + std::vector clientsPollFd; + /** + * Array holding all clients info. + */ + std::vector clientsChannel; + + + /** + * We use a map to select the target client based on the destination + * MAC address. + */ + struct AddressCompare + { + bool operator()(const AddressType *a1, const AddressType *a2) + { + return MultiHeaderPkt::isAddressLess(*a1, *a2); + } + }; + std::map addressMap; + + /** + * As we dealt with only one message at a time, we can allocate and re-use + * a single packet buffer (to hold any incoming data packet). + */ + uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH]; + /** + * Send tick of the current periodic sync. It is used for sanity check. + */ + Tick _periodicSyncTick; + /** + * The singleton server object. + */ + static TCPServer *instance; + + /** + * Set up the socket connections to all the clients. + * + * @param listen_port The port we are listening on for new client + * connection requests. + * @param nclients The number of clients to connect to. + * @param timeout Timeout in sec to complete the setup phase + * (i.e. all gem5 establish socket connections) + */ + void construct(unsigned listen_port, unsigned nclients, int timeout); + /** + * Transfer the header and the follow up data packet to the target(s) + * clients. + * + * @param hdr The header message structure. + * @param ch The source channel for the message. + */ + void xferData(const Header &hdr, const Channel &ch); + /** + * Check if the current round of a synchronisation is completed and notify + * the clients if it is so. + * + * @param st The state all channels should have if sync is complete. + * @param ack The type of ack message to send out if the sync is compete. + */ + void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack); + /** + * Broadcast a request for checkpoint sync. + * + * @param ch The source channel of the checkpoint sync request. + */ + void ckptPropagate(Channel &ch); + /** + * Setter for current periodic send tick. + */ + void periodicSyncTick(Tick t) { _periodicSyncTick = t; } + /** + * Getter for current periodic send tick. + */ + Tick periodicSyncTick() { return _periodicSyncTick; } + + public: + + TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec); + ~TCPServer(); + + /** + * The main server loop that waits for and processes incoming messages. + */ + void run(); +};