dev: Distributed Ethernet link for distributed gem5 simulations

Distributed gem5 (abbreviated dist-gem5) is the result of the
convergence effort between multi-gem5 and pd-gem5 (from Univ. of
Wisconsin). It relies on the base multi-gem5 infrastructure for packet
forwarding, synchronisation and checkpointing but combines those with
the elaborated network switch model from pd-gem5.

--HG--
rename : src/dev/net/multi_etherlink.cc => src/dev/net/dist_etherlink.cc
rename : src/dev/net/multi_etherlink.hh => src/dev/net/dist_etherlink.hh
rename : src/dev/net/multi_iface.cc => src/dev/net/dist_iface.cc
rename : src/dev/net/multi_iface.hh => src/dev/net/dist_iface.hh
rename : src/dev/net/multi_packet.hh => src/dev/net/dist_packet.hh
This commit is contained in:
Gabor Dozsa 2016-01-07 16:33:47 -06:00
parent e677494260
commit 5dec4e07b8
19 changed files with 1880 additions and 2227 deletions

View file

@ -58,19 +58,22 @@ class EtherLink(EtherObject):
speed = Param.NetworkBandwidth('1Gbps', "link speed")
dump = Param.EtherDump(NULL, "dump object")
class MultiEtherLink(EtherObject):
type = 'MultiEtherLink'
cxx_header = "dev/net/multi_etherlink.hh"
class DistEtherLink(EtherObject):
type = 'DistEtherLink'
cxx_header = "dev/net/dist_etherlink.hh"
int0 = SlavePort("interface 0")
delay = Param.Latency('0us', "packet transmit delay")
delay_var = Param.Latency('0ns', "packet transmit delay variability")
speed = Param.NetworkBandwidth('1Gbps', "link speed")
dump = Param.EtherDump(NULL, "dump object")
multi_rank = Param.UInt32('0', "Rank of the this gem5 process (multi run)")
sync_start = Param.Latency('5200000000000t', "first multi sync barrier")
sync_repeat = Param.Latency('10us', "multi sync barrier repeat")
dist_rank = Param.UInt32('0', "Rank of this gem5 process (dist run)")
dist_size = Param.UInt32('1', "Number of gem5 processes (dist run)")
sync_start = Param.Latency('5200000000000t', "first dist sync barrier")
sync_repeat = Param.Latency('10us', "dist sync barrier repeat")
server_name = Param.String('localhost', "Message server name")
server_port = Param.UInt32('2200', "Message server port")
is_switch = Param.Bool(False, "true if this a link in etherswitch")
num_nodes = Param.UInt32('2', "Number of simulate nodes")
class EtherBus(EtherObject):
type = 'EtherBus'

View file

@ -70,14 +70,14 @@ DebugFlag('EthernetIntr')
DebugFlag('EthernetPIO')
DebugFlag('EthernetSM')
# Multi gem5
Source('multi_packet.cc')
Source('multi_iface.cc')
Source('multi_etherlink.cc')
# Dist gem5
Source('dist_iface.cc')
Source('dist_etherlink.cc')
Source('tcp_iface.cc')
DebugFlag('MultiEthernet')
DebugFlag('MultiEthernetPkt')
DebugFlag('DistEthernet')
DebugFlag('DistEthernetPkt')
DebugFlag('DistEthernetCmd')
# Ethernet controllers
Source('i8254xGBe.cc')

View file

@ -38,10 +38,10 @@
*/
/* @file
* Device module for a full duplex ethernet link for multi gem5 simulations.
* Device module for a full duplex ethernet link for dist gem5 simulations.
*/
#include "dev/net/multi_etherlink.hh"
#include "dev/net/dist_etherlink.hh"
#include <arpa/inet.h>
#include <sys/socket.h>
@ -54,15 +54,15 @@
#include "base/random.hh"
#include "base/trace.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "debug/EthernetData.hh"
#include "debug/MultiEthernet.hh"
#include "debug/MultiEthernetPkt.hh"
#include "dev/net/dist_iface.hh"
#include "dev/net/etherdump.hh"
#include "dev/net/etherint.hh"
#include "dev/net/etherlink.hh"
#include "dev/net/etherobject.hh"
#include "dev/net/etherpkt.hh"
#include "dev/net/multi_iface.hh"
#include "dev/net/tcp_iface.hh"
#include "params/EtherLink.hh"
#include "sim/core.hh"
@ -71,33 +71,45 @@
using namespace std;
MultiEtherLink::MultiEtherLink(const Params *p)
: EtherObject(p)
DistEtherLink::DistEtherLink(const Params *p)
: EtherObject(p), linkDelay(p->delay)
{
DPRINTF(MultiEthernet,"MultiEtherLink::MultiEtherLink() "
"link delay:%llu\n", p->delay);
DPRINTF(DistEthernet,"DistEtherLink::DistEtherLink() "
"link delay:%llu ticksPerByte:%f\n", p->delay, p->speed);
txLink = new TxLink(name() + ".link0", this, p->speed, p->delay_var,
p->dump);
rxLink = new RxLink(name() + ".link1", this, p->delay, p->dump);
// create the multi (TCP) interface to talk to the peer gem5 processes.
multiIface = new TCPIface(p->server_name, p->server_port, p->multi_rank,
p->sync_start, p->sync_repeat, this);
localIface = new LocalIface(name() + ".int0", txLink, rxLink, multiIface);
Tick sync_repeat;
if (p->sync_repeat != 0) {
if (p->sync_repeat != p->delay)
warn("DistEtherLink(): sync_repeat is %lu and linkdelay is %lu",
p->sync_repeat, p->delay);
sync_repeat = p->sync_repeat;
} else {
sync_repeat = p->delay;
}
MultiEtherLink::~MultiEtherLink()
// create the dist (TCP) interface to talk to the peer gem5 processes.
distIface = new TCPIface(p->server_name, p->server_port,
p->dist_rank, p->dist_size,
p->sync_start, sync_repeat, this, p->is_switch,
p->num_nodes);
localIface = new LocalIface(name() + ".int0", txLink, rxLink, distIface);
}
DistEtherLink::~DistEtherLink()
{
delete txLink;
delete rxLink;
delete localIface;
delete multiIface;
delete distIface;
}
EtherInt*
MultiEtherLink::getEthPort(const std::string &if_name, int idx)
DistEtherLink::getEthPort(const std::string &if_name, int idx)
{
if (if_name != "int0") {
return nullptr;
@ -107,66 +119,55 @@ MultiEtherLink::getEthPort(const std::string &if_name, int idx)
return localIface;
}
void MultiEtherLink::memWriteback()
void
DistEtherLink::serialize(CheckpointOut &cp) const
{
DPRINTF(MultiEthernet,"MultiEtherLink::memWriteback() called\n");
multiIface->drainDone();
distIface->serializeSection(cp, "distIface");
txLink->serializeSection(cp, "txLink");
rxLink->serializeSection(cp, "rxLink");
}
void
MultiEtherLink::serialize(CheckpointOut &cp) const
DistEtherLink::unserialize(CheckpointIn &cp)
{
multiIface->serialize("multiIface", cp);
txLink->serialize("txLink", cp);
rxLink->serialize("rxLink", cp);
distIface->unserializeSection(cp, "distIface");
txLink->unserializeSection(cp, "txLink");
rxLink->unserializeSection(cp, "rxLink");
}
void
MultiEtherLink::unserialize(CheckpointIn &cp)
DistEtherLink::init()
{
multiIface->unserialize("multiIface", cp);
txLink->unserialize("txLink", cp);
rxLink->unserialize("rxLink", cp);
DPRINTF(DistEthernet,"DistEtherLink::init() called\n");
distIface->init(rxLink->doneEvent(), linkDelay);
}
void
MultiEtherLink::init()
DistEtherLink::startup()
{
DPRINTF(MultiEthernet,"MultiEtherLink::init() called\n");
multiIface->initRandom();
DPRINTF(DistEthernet,"DistEtherLink::startup() called\n");
distIface->startup();
}
void
MultiEtherLink::startup()
DistEtherLink::RxLink::setDistInt(DistIface *m)
{
DPRINTF(MultiEthernet,"MultiEtherLink::startup() called\n");
multiIface->startPeriodicSync();
assert(!distIface);
distIface = m;
}
void
MultiEtherLink::RxLink::setMultiInt(MultiIface *m)
{
assert(!multiIface);
multiIface = m;
// Spawn a new receiver thread that will process messages
// coming in from peer gem5 processes.
// The receive thread will also schedule a (receive) doneEvent
// for each incoming data packet.
multiIface->spawnRecvThread(&doneEvent, linkDelay);
}
void
MultiEtherLink::RxLink::rxDone()
DistEtherLink::RxLink::rxDone()
{
assert(!busy());
// retrieve the packet that triggered the receive done event
packet = multiIface->packetIn();
packet = distIface->packetIn();
if (dump)
dump->dump(packet);
DPRINTF(MultiEthernetPkt, "MultiEtherLink::MultiLink::rxDone() "
DPRINTF(DistEthernetPkt, "DistEtherLink::DistLink::rxDone() "
"packet received: len=%d\n", packet->length);
DDUMP(EthernetData, packet->data, packet->length);
@ -176,7 +177,7 @@ MultiEtherLink::RxLink::rxDone()
}
void
MultiEtherLink::TxLink::txDone()
DistEtherLink::TxLink::txDone()
{
if (dump)
dump->dump(packet);
@ -188,10 +189,10 @@ MultiEtherLink::TxLink::txDone()
}
bool
MultiEtherLink::TxLink::transmit(EthPacketPtr pkt)
DistEtherLink::TxLink::transmit(EthPacketPtr pkt)
{
if (busy()) {
DPRINTF(MultiEthernet, "packet not sent, link busy\n");
DPRINTF(DistEthernet, "packet not sent, link busy\n");
return false;
}
@ -201,8 +202,8 @@ MultiEtherLink::TxLink::transmit(EthPacketPtr pkt)
delay += random_mt.random<Tick>(0, delayVar);
// send the packet to the peers
assert(multiIface);
multiIface->packetOut(pkt, delay);
assert(distIface);
distIface->packetOut(pkt, delay);
// schedule the send done event
parent->schedule(doneEvent, curTick() + delay);
@ -211,56 +212,56 @@ MultiEtherLink::TxLink::transmit(EthPacketPtr pkt)
}
void
MultiEtherLink::Link::serialize(const string &base, CheckpointOut &cp) const
DistEtherLink::Link::serialize(CheckpointOut &cp) const
{
bool packet_exists = (packet != nullptr);
paramOut(cp, base + ".packet_exists", packet_exists);
SERIALIZE_SCALAR(packet_exists);
if (packet_exists)
packet->serialize(base + ".packet", cp);
packet->serialize("packet", cp);
bool event_scheduled = event->scheduled();
paramOut(cp, base + ".event_scheduled", event_scheduled);
SERIALIZE_SCALAR(event_scheduled);
if (event_scheduled) {
Tick event_time = event->when();
paramOut(cp, base + ".event_time", event_time);
SERIALIZE_SCALAR(event_time);
}
}
void
MultiEtherLink::Link::unserialize(const string &base, CheckpointIn &cp)
DistEtherLink::Link::unserialize(CheckpointIn &cp)
{
bool packet_exists;
paramIn(cp, base + ".packet_exists", packet_exists);
UNSERIALIZE_SCALAR(packet_exists);
if (packet_exists) {
packet = make_shared<EthPacketData>(16384);
packet->unserialize(base + ".packet", cp);
packet->unserialize("packet", cp);
}
bool event_scheduled;
paramIn(cp, base + ".event_scheduled", event_scheduled);
UNSERIALIZE_SCALAR(event_scheduled);
if (event_scheduled) {
Tick event_time;
paramIn(cp, base + ".event_time", event_time);
UNSERIALIZE_SCALAR(event_time);
parent->schedule(*event, event_time);
}
}
MultiEtherLink::LocalIface::LocalIface(const std::string &name,
DistEtherLink::LocalIface::LocalIface(const std::string &name,
TxLink *tx,
RxLink *rx,
MultiIface *m) :
DistIface *m) :
EtherInt(name), txLink(tx)
{
tx->setLocalInt(this);
rx->setLocalInt(this);
tx->setMultiInt(m);
rx->setMultiInt(m);
tx->setDistInt(m);
rx->setDistInt(m);
}
MultiEtherLink *
MultiEtherLinkParams::create()
DistEtherLink *
DistEtherLinkParams::create()
{
return new MultiEtherLink(this);
return new DistEtherLink(this);
}

View file

@ -38,30 +38,30 @@
*/
/* @file
* Device module for a full duplex ethernet link for multi gem5 simulations.
* Device module for a full duplex ethernet link for dist gem5 simulations.
*
* See comments in dev/multi_iface.hh for a generic description of multi
* See comments in dev/net/dist_iface.hh for a generic description of dist
* gem5 simulations.
*
* This class is meant to be a drop in replacement for the EtherLink class for
* multi gem5 runs.
* dist gem5 runs.
*
*/
#ifndef __DEV_NET_MULTIETHERLINK_HH__
#define __DEV_NET_MULTIETHERLINK_HH__
#ifndef __DEV_DIST_ETHERLINK_HH__
#define __DEV_DIST_ETHERLINK_HH__
#include <iostream>
#include "dev/net/etherlink.hh"
#include "params/MultiEtherLink.hh"
#include "params/DistEtherLink.hh"
class MultiIface;
class DistIface;
class EthPacketData;
/**
* Model for a fixed bandwidth full duplex ethernet link.
*/
class MultiEtherLink : public EtherObject
class DistEtherLink : public EtherObject
{
protected:
class LocalIface;
@ -72,22 +72,22 @@ class MultiEtherLink : public EtherObject
* The link will encapsulate and transfer Ethernet packets to/from
* the message server.
*/
class Link
class Link : public Serializable
{
protected:
std::string objName;
MultiEtherLink *parent;
DistEtherLink *parent;
LocalIface *localIface;
EtherDump *dump;
MultiIface *multiIface;
DistIface *distIface;
Event *event;
EthPacketPtr packet;
public:
Link(const std::string &name, MultiEtherLink *p,
Link(const std::string &name, DistEtherLink *p,
EtherDump *d, Event *e) :
objName(name), parent(p), localIface(nullptr), dump(d),
multiIface(nullptr), event(e) {}
distIface(nullptr), event(e) {}
~Link() {}
@ -95,8 +95,8 @@ class MultiEtherLink : public EtherObject
bool busy() const { return (bool)packet; }
void setLocalInt(LocalIface *i) { assert(!localIface); localIface=i; }
void serialize(const std::string &base, CheckpointOut &cp) const;
void unserialize(const std::string &base, CheckpointIn &cp);
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
};
/**
@ -123,17 +123,17 @@ class MultiEtherLink : public EtherObject
DoneEvent doneEvent;
public:
TxLink(const std::string &name, MultiEtherLink *p,
TxLink(const std::string &name, DistEtherLink *p,
double invBW, Tick delay_var, EtherDump *d) :
Link(name, p, d, &doneEvent), ticksPerByte(invBW),
delayVar(delay_var), doneEvent(this) {}
~TxLink() {}
/**
* Register the multi interface to be used to talk to the
* Register the dist interface to be used to talk to the
* peer gem5 processes.
*/
void setMultiInt(MultiIface *m) { assert(!multiIface); multiIface=m; }
void setDistInt(DistIface *m) { assert(!distIface); distIface=m; }
/**
* Initiate sending of a packet via this link.
@ -161,20 +161,24 @@ class MultiEtherLink : public EtherObject
void rxDone();
typedef EventWrapper<RxLink, &RxLink::rxDone> DoneEvent;
friend void DoneEvent::process();
DoneEvent doneEvent;
DoneEvent _doneEvent;
public:
RxLink(const std::string &name, MultiEtherLink *p,
RxLink(const std::string &name, DistEtherLink *p,
Tick delay, EtherDump *d) :
Link(name, p, d, &doneEvent),
linkDelay(delay), doneEvent(this) {}
Link(name, p, d, &_doneEvent),
linkDelay(delay), _doneEvent(this) {}
~RxLink() {}
/**
* Register our multi interface to talk to the peer gem5 processes.
* Register our dist interface to talk to the peer gem5 processes.
*/
void setMultiInt(MultiIface *m);
void setDistInt(DistIface *m);
/**
* Done events will be scheduled by DistIface (so we need the accessor)
*/
const DoneEvent *doneEvent() const { return &_doneEvent; }
};
/**
@ -187,7 +191,7 @@ class MultiEtherLink : public EtherObject
public:
LocalIface(const std::string &name, TxLink *tx, RxLink *rx,
MultiIface *m);
DistIface *m);
bool recvPacket(EthPacketPtr pkt) { return txLink->transmit(pkt); }
void sendDone() { peer->sendDone(); }
@ -199,7 +203,7 @@ class MultiEtherLink : public EtherObject
/**
* Interface to talk to the peer gem5 processes.
*/
MultiIface *multiIface;
DistIface *distIface;
/**
* Send link
*/
@ -210,10 +214,12 @@ class MultiEtherLink : public EtherObject
RxLink *rxLink;
LocalIface *localIface;
Tick linkDelay;
public:
typedef MultiEtherLinkParams Params;
MultiEtherLink(const Params *p);
~MultiEtherLink();
typedef DistEtherLinkParams Params;
DistEtherLink(const Params *p);
~DistEtherLink();
const Params *
params() const
@ -224,12 +230,11 @@ class MultiEtherLink : public EtherObject
virtual EtherInt *getEthPort(const std::string &if_name,
int idx) override;
void memWriteback() override;
void init() override;
void startup() override;
virtual void init() override;
virtual void startup() override;
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
};
#endif // __DEV_NET_MULTIETHERLINK_HH__
#endif // __DEV_DIST_ETHERLINK_HH__

808
src/dev/net/dist_iface.cc Normal file
View file

@ -0,0 +1,808 @@
/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* The interface class for dist-gem5 simulations.
*/
#include "dev/net/dist_iface.hh"
#include <queue>
#include <thread>
#include "base/random.hh"
#include "base/trace.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"
using namespace std;
DistIface::Sync *DistIface::sync = nullptr;
DistIface::SyncEvent *DistIface::syncEvent = nullptr;
unsigned DistIface::distIfaceNum = 0;
unsigned DistIface::recvThreadsNum = 0;
DistIface *DistIface::master = nullptr;
void
DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
{
if (start_tick < firstAt) {
firstAt = start_tick;
inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
}
if (repeat_tick == 0)
panic("Dist synchronisation interval must be greater than zero");
if (repeat_tick < nextRepeat) {
nextRepeat = repeat_tick;
inform("Dist synchronisation interval is changed to %lu.\n",
nextRepeat);
}
}
DistIface::SyncSwitch::SyncSwitch(int num_nodes)
{
numNodes = num_nodes;
waitNum = num_nodes;
numExitReq = 0;
numCkptReq = 0;
doExit = false;
doCkpt = false;
firstAt = std::numeric_limits<Tick>::max();
nextAt = 0;
nextRepeat = std::numeric_limits<Tick>::max();
}
DistIface::SyncNode::SyncNode()
{
waitNum = 0;
needExit = ReqType::none;
needCkpt = ReqType::none;
doExit = false;
doCkpt = false;
firstAt = std::numeric_limits<Tick>::max();
nextAt = 0;
nextRepeat = std::numeric_limits<Tick>::max();
}
void
DistIface::SyncNode::run(bool same_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
Header header;
assert(waitNum == 0);
waitNum = DistIface::recvThreadsNum;
// initiate the global synchronisation
header.msgType = MsgType::cmdSyncReq;
header.sendTick = curTick();
header.syncRepeat = nextRepeat;
header.needCkpt = needCkpt;
if (needCkpt != ReqType::none)
needCkpt = ReqType::pending;
header.needExit = needExit;
if (needExit != ReqType::none)
needExit = ReqType::pending;
DistIface::master->sendCmd(header);
// now wait until all receiver threads complete the synchronisation
auto lf = [this]{ return waitNum == 0; };
cv.wait(sync_lock, lf);
// global synchronisation is done
assert(!same_tick || (nextAt == curTick()));
}
void
DistIface::SyncSwitch::run(bool same_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
Header header;
// Wait for the sync requests from the nodes
if (waitNum > 0) {
auto lf = [this]{ return waitNum == 0; };
cv.wait(sync_lock, lf);
}
assert(waitNum == 0);
assert(!same_tick || (nextAt == curTick()));
waitNum = numNodes;
// Complete the global synchronisation
header.msgType = MsgType::cmdSyncAck;
header.sendTick = nextAt;
header.syncRepeat = nextRepeat;
if (doCkpt || numCkptReq == numNodes) {
doCkpt = true;
header.needCkpt = ReqType::immediate;
numCkptReq = 0;
} else {
header.needCkpt = ReqType::none;
}
if (doExit || numExitReq == numNodes) {
doExit = true;
header.needExit = ReqType::immediate;
} else {
header.needExit = ReqType::none;
}
DistIface::master->sendCmd(header);
}
void
DistIface::SyncSwitch::progress(Tick send_tick,
Tick sync_repeat,
ReqType need_ckpt,
ReqType need_exit)
{
std::unique_lock<std::mutex> sync_lock(lock);
assert(waitNum > 0);
if (send_tick > nextAt)
nextAt = send_tick;
if (nextRepeat > sync_repeat)
nextRepeat = sync_repeat;
if (need_ckpt == ReqType::collective)
numCkptReq++;
else if (need_ckpt == ReqType::immediate)
doCkpt = true;
if (need_exit == ReqType::collective)
numExitReq++;
else if (need_exit == ReqType::immediate)
doExit = true;
waitNum--;
// Notify the simulation thread if the on-going sync is complete
if (waitNum == 0) {
sync_lock.unlock();
cv.notify_one();
}
}
void
DistIface::SyncNode::progress(Tick max_send_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit)
{
std::unique_lock<std::mutex> sync_lock(lock);
assert(waitNum > 0);
nextAt = max_send_tick;
nextRepeat = next_repeat;
doCkpt = (do_ckpt != ReqType::none);
doExit = (do_exit != ReqType::none);
waitNum--;
// Notify the simulation thread if the on-going sync is complete
if (waitNum == 0) {
sync_lock.unlock();
cv.notify_one();
}
}
void
DistIface::SyncNode::requestCkpt(ReqType req)
{
std::lock_guard<std::mutex> sync_lock(lock);
assert(req != ReqType::none);
if (needCkpt != ReqType::none)
warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
if (needCkpt == ReqType::none || req == ReqType::immediate)
needCkpt = req;
}
void
DistIface::SyncNode::requestExit(ReqType req)
{
std::lock_guard<std::mutex> sync_lock(lock);
assert(req != ReqType::none);
if (needExit != ReqType::none)
warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
if (needExit == ReqType::none || req == ReqType::immediate)
needExit = req;
}
void
DistIface::Sync::drainComplete()
{
if (doCkpt) {
// The first DistIface object called this right before writing the
// checkpoint. We need to drain the underlying physical network here.
// Note that other gem5 peers may enter this barrier at different
// ticks due to draining.
run(false);
// Only the "first" DistIface object has to perform the sync
doCkpt = false;
}
}
void
DistIface::SyncNode::serialize(CheckpointOut &cp) const
{
int need_exit = static_cast<int>(needExit);
SERIALIZE_SCALAR(need_exit);
}
void
DistIface::SyncNode::unserialize(CheckpointIn &cp)
{
int need_exit;
UNSERIALIZE_SCALAR(need_exit);
needExit = static_cast<ReqType>(need_exit);
}
void
DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
{
SERIALIZE_SCALAR(numExitReq);
}
void
DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
{
UNSERIALIZE_SCALAR(numExitReq);
}
void
DistIface::SyncEvent::start()
{
// Note that this may be called either from startup() or drainResume()
// At this point, all DistIface objects has already called Sync::init() so
// we have a local minimum of the start tick and repeat for the periodic
// sync.
Tick firstAt = DistIface::sync->firstAt;
repeat = DistIface::sync->nextRepeat;
// Do a global barrier to agree on a common repeat value (the smallest
// one from all participating nodes
DistIface::sync->run(curTick() == 0);
assert(!DistIface::sync->doCkpt);
assert(!DistIface::sync->doExit);
assert(DistIface::sync->nextAt >= curTick());
assert(DistIface::sync->nextRepeat <= repeat);
// if this is called at tick 0 then we use the config start param otherwise
// the maximum of the current tick of all participating nodes
if (curTick() == 0) {
assert(!scheduled());
assert(DistIface::sync->nextAt == 0);
schedule(firstAt);
} else {
if (scheduled())
reschedule(DistIface::sync->nextAt);
else
schedule(DistIface::sync->nextAt);
}
inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
DistIface::sync->nextRepeat);
}
void
DistIface::SyncEvent::process()
{
// We may not start a global periodic sync while draining before taking a
// checkpoint. This is due to the possibility that peer gem5 processes
// may not hit the same periodic sync before they complete draining and
// that would make this periodic sync clash with sync called from
// DistIface::serialize() by other gem5 processes.
// We would need a 'distributed drain' solution to eliminate this
// restriction.
// Note that if draining was not triggered by checkpointing then we are
// fine since no extra global sync will happen (i.e. all peer gem5 will
// hit this periodic sync eventually).
panic_if(_draining && DistIface::sync->doCkpt,
"Distributed sync is hit while draining");
/*
* Note that this is a global event so this process method will be called
* by only exactly one thread.
*/
/*
* We hold the eventq lock at this point but the receiver thread may
* need the lock to schedule new recv events while waiting for the
* dist sync to complete.
* Note that the other simulation threads also release their eventq
* locks while waiting for us due to the global event semantics.
*/
{
EventQueue::ScopedRelease sr(curEventQueue());
// we do a global sync here that is supposed to happen at the same
// tick in all gem5 peers
DistIface::sync->run(true);
// global sync completed
}
if (DistIface::sync->doCkpt)
exitSimLoop("checkpoint");
if (DistIface::sync->doExit)
exitSimLoop("exit request from gem5 peers");
// schedule the next periodic sync
repeat = DistIface::sync->nextRepeat;
schedule(curTick() + repeat);
}
void
DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
{
// This is called from the receiver thread when it starts running. The new
// receiver thread shares the event queue with the simulation thread
// (associated with the simulated Ethernet link).
curEventQueue(eventManager->eventQueue());
recvDone = recv_done;
linkDelay = link_delay;
}
Tick
DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
Tick send_delay,
Tick prev_recv_tick)
{
Tick recv_tick = send_tick + send_delay + linkDelay;
// sanity check (we need atleast a send delay long window)
assert(recv_tick >= prev_recv_tick + send_delay);
panic_if(prev_recv_tick + send_delay > recv_tick,
"Receive window is smaller than send delay");
panic_if(recv_tick <= curTick(),
"Simulators out of sync - missed packet receive by %llu ticks"
"(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
"linkDelay: %lu )",
curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
linkDelay);
return recv_tick;
}
void
DistIface::RecvScheduler::resumeRecvTicks()
{
// Schedule pending packets asap in case link speed/delay changed when
// restoring from the checkpoint.
// This may be done during unserialize except that curTick() is unknown
// so we call this during drainResume().
// If we are not restoring from a checkppint then link latency could not
// change so we just return.
if (!ckptRestore)
return;
std::vector<Desc> v;
while (!descQueue.empty()) {
Desc d = descQueue.front();
descQueue.pop();
d.sendTick = curTick();
d.sendDelay = d.packet->size(); // assume 1 tick/byte max link speed
v.push_back(d);
}
for (auto &d : v)
descQueue.push(d);
if (recvDone->scheduled()) {
assert(!descQueue.empty());
eventManager->reschedule(recvDone, curTick());
} else {
assert(descQueue.empty() && v.empty());
}
ckptRestore = false;
}
void
DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
Tick send_tick,
Tick send_delay)
{
// Note : this is called from the receiver thread
curEventQueue()->lock();
Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
"send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
send_tick, send_delay, linkDelay, recv_tick);
// Every packet must be sent and arrive in the same quantum
assert(send_tick > master->syncEvent->when() -
master->syncEvent->repeat);
// No packet may be scheduled for receive in the arrival quantum
assert(send_tick + send_delay + linkDelay > master->syncEvent->when());
// Now we are about to schedule a recvDone event for the new data packet.
// We use the same recvDone object for all incoming data packets. Packet
// descriptors are saved in the ordered queue. The currently scheduled
// packet is always on the top of the queue.
// NOTE: we use the event queue lock to protect the receive desc queue,
// too, which is accessed both by the receiver thread and the simulation
// thread.
descQueue.emplace(new_packet, send_tick, send_delay);
if (descQueue.size() == 1) {
assert(!recvDone->scheduled());
eventManager->schedule(recvDone, recv_tick);
} else {
assert(recvDone->scheduled());
panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
"Out of order packet received (recv_tick: %lu top(): %lu\n",
recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
}
curEventQueue()->unlock();
}
EthPacketPtr
DistIface::RecvScheduler::popPacket()
{
// Note : this is called from the simulation thread when a receive done
// event is being processed for the link. We assume that the thread holds
// the event queue queue lock when this is called!
EthPacketPtr next_packet = descQueue.front().packet;
descQueue.pop();
if (descQueue.size() > 0) {
Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
descQueue.front().sendDelay,
curTick());
eventManager->schedule(recvDone, recv_tick);
}
prevRecvTick = curTick();
return next_packet;
}
void
DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
{
SERIALIZE_SCALAR(sendTick);
SERIALIZE_SCALAR(sendDelay);
packet->serialize("rxPacket", cp);
}
void
DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
{
UNSERIALIZE_SCALAR(sendTick);
UNSERIALIZE_SCALAR(sendDelay);
packet = std::make_shared<EthPacketData>(16384);
packet->unserialize("rxPacket", cp);
}
void
DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
{
SERIALIZE_SCALAR(prevRecvTick);
// serialize the receive desc queue
std::queue<Desc> tmp_queue(descQueue);
unsigned n_desc_queue = descQueue.size();
assert(tmp_queue.size() == descQueue.size());
SERIALIZE_SCALAR(n_desc_queue);
for (int i = 0; i < n_desc_queue; i++) {
tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
tmp_queue.pop();
}
assert(tmp_queue.empty());
}
void
DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
{
assert(descQueue.size() == 0);
assert(recvDone->scheduled() == false);
assert(ckptRestore == false);
UNSERIALIZE_SCALAR(prevRecvTick);
// unserialize the receive desc queue
unsigned n_desc_queue;
UNSERIALIZE_SCALAR(n_desc_queue);
for (int i = 0; i < n_desc_queue; i++) {
Desc recv_desc;
recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
descQueue.push(recv_desc);
}
ckptRestore = true;
}
DistIface::DistIface(unsigned dist_rank,
unsigned dist_size,
Tick sync_start,
Tick sync_repeat,
EventManager *em,
bool is_switch, int num_nodes) :
syncStart(sync_start), syncRepeat(sync_repeat),
recvThread(nullptr), recvScheduler(em),
rank(dist_rank), size(dist_size)
{
DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
isMaster = false;
if (master == nullptr) {
assert(sync == nullptr);
assert(syncEvent == nullptr);
if (is_switch)
sync = new SyncSwitch(num_nodes);
else
sync = new SyncNode();
syncEvent = new SyncEvent();
master = this;
isMaster = true;
}
distIfaceId = distIfaceNum;
distIfaceNum++;
}
DistIface::~DistIface()
{
assert(recvThread);
delete recvThread;
if (this == master) {
assert(syncEvent);
delete syncEvent;
assert(sync);
delete sync;
master = nullptr;
}
}
void
DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
Header header;
// Prepare a dist header packet for the Ethernet packet we want to
// send out.
header.msgType = MsgType::dataDescriptor;
header.sendTick = curTick();
header.sendDelay = send_delay;
header.dataPacketLength = pkt->size();
// Send out the packet and the meta info.
sendPacket(header, pkt);
DPRINTF(DistEthernetPkt,
"DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
pkt->size(), send_delay);
}
void
DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
{
EthPacketPtr new_packet;
DistHeaderPkt::Header header;
// Initialize receive scheduler parameters
recvScheduler.init(recv_done, link_delay);
// Main loop to wait for and process any incoming message.
for (;;) {
// recvHeader() blocks until the next dist header packet comes in.
if (!recvHeader(header)) {
// We lost connection to the peer gem5 processes most likely
// because one of them called m5 exit. So we stop here.
// Grab the eventq lock to stop the simulation thread
curEventQueue()->lock();
exit_message("info",
0,
"Message server closed connection, "
"simulation is exiting");
}
// We got a valid dist header packet, let's process it
if (header.msgType == MsgType::dataDescriptor) {
recvPacket(header, new_packet);
recvScheduler.pushPacket(new_packet,
header.sendTick,
header.sendDelay);
} else {
// everything else must be synchronisation related command
sync->progress(header.sendTick,
header.syncRepeat,
header.needCkpt,
header.needExit);
}
}
}
void
DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
{
assert(recvThread == nullptr);
recvThread = new std::thread(&DistIface::recvThreadFunc,
this,
const_cast<Event *>(recv_done),
link_delay);
recvThreadsNum++;
}
DrainState
DistIface::drain()
{
DPRINTF(DistEthernet,"DistIFace::drain() called\n");
// This can be called multiple times in the same drain cycle.
if (this == master)
syncEvent->draining(true);
return DrainState::Drained;
}
void
DistIface::drainResume() {
DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
if (this == master)
syncEvent->draining(false);
recvScheduler.resumeRecvTicks();
}
void
DistIface::serialize(CheckpointOut &cp) const
{
// Drain the dist interface before the checkpoint is taken. We cannot call
// this as part of the normal drain cycle because this dist sync has to be
// called exactly once after the system is fully drained.
sync->drainComplete();
unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
SERIALIZE_SCALAR(rank_orig);
SERIALIZE_SCALAR(dist_iface_id_orig);
recvScheduler.serializeSection(cp, "recvScheduler");
if (this == master) {
sync->serializeSection(cp, "Sync");
}
}
void
DistIface::unserialize(CheckpointIn &cp)
{
unsigned rank_orig, dist_iface_id_orig;
UNSERIALIZE_SCALAR(rank_orig);
UNSERIALIZE_SCALAR(dist_iface_id_orig);
panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
rank, rank_orig);
panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
"at resume (distIfaceId=%d, orig=%d)", distIfaceId,
dist_iface_id_orig);
recvScheduler.unserializeSection(cp, "recvScheduler");
if (this == master) {
sync->unserializeSection(cp, "Sync");
}
}
void
DistIface::init(const Event *done_event, Tick link_delay)
{
// Init hook for the underlaying message transport to setup/finalize
// communication channels
initTransport();
// Spawn a new receiver thread that will process messages
// coming in from peer gem5 processes.
// The receive thread will also schedule a (receive) doneEvent
// for each incoming data packet.
spawnRecvThread(done_event, link_delay);
// Adjust the periodic sync start and interval. Different DistIface
// might have different requirements. The singleton sync object
// will select the minimum values for both params.
assert(sync != nullptr);
sync->init(syncStart, syncRepeat);
// Initialize the seed for random generator to avoid the same sequence
// in all gem5 peer processes
assert(master != nullptr);
if (this == master)
random_mt.init(5489 * (rank+1) + 257);
}
void
DistIface::startup()
{
DPRINTF(DistEthernet, "DistIface::startup() started\n");
if (this == master)
syncEvent->start();
DPRINTF(DistEthernet, "DistIface::startup() done\n");
}
bool
DistIface::readyToCkpt(Tick delay, Tick period)
{
bool ret = true;
DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
"period:%lu\n", delay, period);
if (master) {
if (delay == 0) {
inform("m5 checkpoint called with zero delay => triggering collaborative "
"checkpoint\n");
sync->requestCkpt(ReqType::collective);
} else {
inform("m5 checkpoint called with non-zero delay => triggering immediate "
"checkpoint (at the next sync)\n");
sync->requestCkpt(ReqType::immediate);
}
if (period != 0)
inform("Non-zero period for m5_ckpt is ignored in "
"distributed gem5 runs\n");
ret = false;
}
return ret;
}
bool
DistIface::readyToExit(Tick delay)
{
bool ret = true;
DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
delay);
if (master) {
if (delay == 0) {
inform("m5 exit called with zero delay => triggering collaborative "
"exit\n");
sync->requestExit(ReqType::collective);
} else {
inform("m5 exit called with non-zero delay => triggering immediate "
"exit (at the next sync)\n");
sync->requestExit(ReqType::immediate);
}
ret = false;
}
return ret;
}
uint64_t
DistIface::rankParam()
{
uint64_t val;
if (master) {
val = master->rank;
} else {
warn("Dist-rank parameter is queried in single gem5 simulation.");
val = 0;
}
return val;
}
uint64_t
DistIface::sizeParam()
{
uint64_t val;
if (master) {
val = master->size;
} else {
warn("Dist-size parameter is queried in single gem5 simulation.");
val = 1;
}
return val;
}

595
src/dev/net/dist_iface.hh Normal file
View file

@ -0,0 +1,595 @@
/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* The interface class for dist gem5 simulations.
*
* dist-gem5 is an extension to gem5 to enable parallel simulation of a
* distributed system (e.g. simulation of a pool of machines
* connected by Ethernet links). A dist gem5 run consists of seperate gem5
* processes running in parallel. Each gem5 process executes
* the simulation of a component of the simulated distributed system.
* (An example component can be a dist-core board with an Ethernet NIC.)
* The DistIface class below provides services to transfer data and
* control messages among the gem5 processes. The main such services are
* as follows.
*
* 1. Send a data packet coming from a simulated Ethernet link. The packet
* will be transferred to (all) the target(s) gem5 processes. The send
* operation is always performed by the simulation thread, i.e. the gem5
* thread that is processing the event queue associated with the simulated
* Ethernet link.
*
* 2. Spawn a receiver thread to process messages coming in from the
* from other gem5 processes. Each simulated Ethernet link has its own
* associated receiver thread. The receiver thread saves the incoming packet
* and schedule an appropriate receive event in the event queue.
*
* 3. Schedule a global barrier event periodically to keep the gem5
* processes in sync.
* Periodic barrier event to keep peer gem5 processes in sync. The basic idea
* is that no gem5 process can go ahead further than the simulated link
* transmission delay to ensure that a corresponding receive event can always
* be scheduled for any message coming in from a peer gem5 process.
*
*
*
* This interface is an abstract class. It can work with various low level
* send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP
* stream socket version is implemented in src/dev/net/tcp_iface.[hh,cc].
*/
#ifndef __DEV_DIST_IFACE_HH__
#define __DEV_DIST_IFACE_HH__
#include <array>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include "dev/net/dist_packet.hh"
#include "dev/net/etherpkt.hh"
#include "sim/core.hh"
#include "sim/drain.hh"
#include "sim/global_event.hh"
#include "sim/serialize.hh"
class EventManager;
/**
* The interface class to talk to peer gem5 processes.
*/
class DistIface : public Drainable, public Serializable
{
public:
typedef DistHeaderPkt::Header Header;
protected:
typedef DistHeaderPkt::MsgType MsgType;
typedef DistHeaderPkt::ReqType ReqType;
private:
class SyncEvent;
/** @class Sync
* This class implements global sync operations among gem5 peer processes.
*
* @note This class is used as a singleton object (shared by all DistIface
* objects).
*/
class Sync : public Serializable
{
protected:
/**
* The lock to protect access to the Sync object.
*/
std::mutex lock;
/**
* Condition variable for the simulation thread to wait on
* until all receiver threads completes the current global
* synchronisation.
*/
std::condition_variable cv;
/**
* Number of receiver threads that not yet completed the current global
* synchronisation.
*/
unsigned waitNum;
/**
* Flag is set if exit is permitted upon sync completion
*/
bool doExit;
/**
* Flag is set if taking a ckpt is permitted upon sync completion
*/
bool doCkpt;
/**
* The repeat value for the next periodic sync
*/
Tick nextRepeat;
/**
* Tick for the very first periodic sync
*/
Tick firstAt;
/**
* Tick for the next periodic sync (if the event is not scheduled yet)
*/
Tick nextAt;
friend class SyncEvent;
public:
/**
* Initialize periodic sync params.
*
* @param start Start tick for dist synchronisation
* @param repeat Frequency of dist synchronisation
*
*/
void init(Tick start, Tick repeat);
/**
* Core method to perform a full dist sync.
*/
virtual void run(bool same_tick) = 0;
/**
* Callback when the receiver thread gets a sync ack message.
*/
virtual void progress(Tick send_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit) = 0;
virtual void requestCkpt(ReqType req) = 0;
virtual void requestExit(ReqType req) = 0;
void drainComplete();
virtual void serialize(CheckpointOut &cp) const override = 0;
virtual void unserialize(CheckpointIn &cp) override = 0;
};
class SyncNode: public Sync
{
private:
/**
* Exit requested
*/
ReqType needExit;
/**
* Ckpt requested
*/
ReqType needCkpt;
public:
SyncNode();
~SyncNode() {}
void run(bool same_tick) override;
void progress(Tick max_req_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit) override;
void requestCkpt(ReqType req) override;
void requestExit(ReqType req) override;
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
};
class SyncSwitch: public Sync
{
private:
/**
* Counter for recording exit requests
*/
unsigned numExitReq;
/**
* Counter for recording ckpt requests
*/
unsigned numCkptReq;
/**
* Number of connected simulated nodes
*/
unsigned numNodes;
public:
SyncSwitch(int num_nodes);
~SyncSwitch() {}
void run(bool same_tick) override;
void progress(Tick max_req_tick,
Tick next_repeat,
ReqType do_ckpt,
ReqType do_exit) override;
void requestCkpt(ReqType) override {
panic("Switch requested checkpoint");
}
void requestExit(ReqType) override {
panic("Switch requested exit");
}
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
};
/**
* The global event to schedule periodic dist sync. It is used as a
* singleton object.
*
* The periodic synchronisation works as follows.
* 1. A SyncEvent is scheduled as a global event when startup() is
* called.
* 2. The process() method of the SyncEvent initiates a new barrier
* for each simulated Ethernet link.
* 3. Simulation thread(s) then waits until all receiver threads
* complete the ongoing barrier. The global sync event is done.
*/
class SyncEvent : public GlobalSyncEvent
{
private:
/**
* Flag to set when the system is draining
*/
bool _draining;
public:
/**
* Only the firstly instantiated DistIface object will
* call this constructor.
*/
SyncEvent() : GlobalSyncEvent(Sim_Exit_Pri, 0), _draining(false) {}
~SyncEvent() {}
/**
* Schedule the first periodic sync event.
*/
void start();
/**
* This is a global event so process() will only be called by
* exactly one simulation thread. (See further comments in the .cc
* file.)
*/
void process() override;
bool draining() const { return _draining; }
void draining(bool fl) { _draining = fl; }
};
/**
* Class to encapsulate information about data packets received.
* @note The main purpose of the class to take care of scheduling receive
* done events for the simulated network link and store incoming packets
* until they can be received by the simulated network link.
*/
class RecvScheduler : public Serializable
{
private:
/**
* Received packet descriptor. This information is used by the receive
* thread to schedule receive events and by the simulation thread to
* process those events.
*/
struct Desc : public Serializable
{
EthPacketPtr packet;
Tick sendTick;
Tick sendDelay;
Desc() : sendTick(0), sendDelay(0) {}
Desc(EthPacketPtr p, Tick s, Tick d) :
packet(p), sendTick(s), sendDelay(d) {}
Desc(const Desc &d) :
packet(d.packet), sendTick(d.sendTick), sendDelay(d.sendDelay) {}
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
};
/**
* The queue to store the receive descriptors.
*/
std::queue<Desc> descQueue;
/**
* The tick when the most recent receive event was processed.
*
* @note This information is necessary to simulate possible receiver
* link contention when calculating the receive tick for the next
* incoming data packet (see the calcReceiveTick() method)
*/
Tick prevRecvTick;
/**
* The receive done event for the simulated Ethernet link.
*
* @note This object is constructed by the simulated network link. We
* schedule this object for each incoming data packet.
*/
Event *recvDone;
/**
* The link delay in ticks for the simulated Ethernet link.
*
* @note This value is used for calculating the receive ticks for
* incoming data packets.
*/
Tick linkDelay;
/**
* The event manager associated with the simulated Ethernet link.
*
* @note It is used to access the event queue for scheduling receive
* done events for the link.
*/
EventManager *eventManager;
/**
* Calculate the tick to schedule the next receive done event.
*
* @param send_tick The tick the packet was sent.
* @param send_delay The simulated delay at the sender side.
* @param prev_recv_tick Tick when the last receive event was
* processed.
*
* @note This method tries to take into account possible receiver link
* contention and adjust receive tick for the incoming packets
* accordingly.
*/
Tick calcReceiveTick(Tick send_tick,
Tick send_delay,
Tick prev_recv_tick);
/**
* Flag to set if receive ticks for pending packets need to be
* recalculated due to changed link latencies at a resume
*/
bool ckptRestore;
public:
/**
* Scheduler for the incoming data packets.
*
* @param em The event manager associated with the simulated Ethernet
* link.
*/
RecvScheduler(EventManager *em) :
prevRecvTick(0), recvDone(nullptr), linkDelay(0),
eventManager(em), ckptRestore(false) {}
/**
* Initialize network link parameters.
*
* @note This method is called from the receiver thread (see
* recvThreadFunc()).
*/
void init(Event *recv_done, Tick link_delay);
/**
* Fetch the next packet that is to be received by the simulated network
* link.
*
* @note This method is called from the process() method of the receive
* done event associated with the network link.
*/
EthPacketPtr popPacket();
/**
* Push a newly arrived packet into the desc queue.
*/
void pushPacket(EthPacketPtr new_packet,
Tick send_tick,
Tick send_delay);
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
/**
* Adjust receive ticks for pending packets when restoring from a
* checkpoint
*
* @note Link speed and delay parameters may change at resume.
*/
void resumeRecvTicks();
};
/**
* Tick to schedule the first dist sync event.
* This is just as optimization : we do not need any dist sync
* event until the simulated NIC is brought up by the OS.
*/
Tick syncStart;
/**
* Frequency of dist sync events in ticks.
*/
Tick syncRepeat;
/**
* Receiver thread pointer.
* Each DistIface object must have exactly one receiver thread.
*/
std::thread *recvThread;
/**
* Meta information about data packets received.
*/
RecvScheduler recvScheduler;
protected:
/**
* The rank of this process among the gem5 peers.
*/
unsigned rank;
/**
* The number of gem5 processes comprising this dist simulation.
*/
unsigned size;
/**
* Number of DistIface objects (i.e. dist links in this gem5 process)
*/
static unsigned distIfaceNum;
/**
* Unique id for the dist link
*/
unsigned distIfaceId;
bool isMaster;
private:
/**
* Number of receiver threads (in this gem5 process)
*/
static unsigned recvThreadsNum;
/**
* The singleton Sync object to perform dist synchronisation.
*/
static Sync *sync;
/**
* The singleton SyncEvent object to schedule periodic dist sync.
*/
static SyncEvent *syncEvent;
/**
* The very first DistIface object created becomes the master. We need
* a master to co-ordinate the global synchronisation.
*/
static DistIface *master;
private:
/**
* Send out a data packet to the remote end.
* @param header Meta info about the packet (which needs to be transferred
* to the destination alongside the packet).
* @param packet Pointer to the packet to send.
*/
virtual void sendPacket(const Header &header, const EthPacketPtr &packet) = 0;
/**
* Send out a control command to the remote end.
* @param header Meta info describing the command (e.g. sync request)
*/
virtual void sendCmd(const Header &header) = 0;
/**
* Receive a header (i.e. meta info describing a data packet or a control command)
* from the remote end.
* @param header The meta info structure to store the incoming header.
*/
virtual bool recvHeader(Header &header) = 0;
/**
* Receive a packet from the remote end.
* @param header Meta info about the incoming packet (obtanied by a previous
* call to the recvHedaer() method).
* @param Pointer to packet received.
*/
virtual void recvPacket(const Header &header, EthPacketPtr &packet) = 0;
/**
* Init hook for the underlaying transport
*/
virtual void initTransport() = 0;
/**
* spawn the receiver thread.
* @param recv_done The receive done event associated with the simulated
* Ethernet link.
* @param link_delay The link delay for the simulated Ethernet link.
*/
void spawnRecvThread(const Event *recv_done, Tick link_delay);
/**
* The function executed by a receiver thread.
*/
void recvThreadFunc(Event *recv_done, Tick link_delay);
public:
/**
* ctor
* @param dist_rank Rank of this gem5 process within the dist run
* @param sync_start Start tick for dist synchronisation
* @param sync_repeat Frequency for dist synchronisation
* @param em The event manager associated with the simulated Ethernet link
*/
DistIface(unsigned dist_rank,
unsigned dist_size,
Tick sync_start,
Tick sync_repeat,
EventManager *em,
bool is_switch,
int num_nodes);
virtual ~DistIface();
/**
* Send out an Ethernet packet.
* @param pkt The Ethernet packet to send.
* @param send_delay The delay in ticks for the send completion event.
*/
void packetOut(EthPacketPtr pkt, Tick send_delay);
/**
* Fetch the packet scheduled to be received next by the simulated
* network link.
*
* @note This method is called within the process() method of the link
* receive done event. It also schedules the next receive event if the
* receive queue is not empty.
*/
EthPacketPtr packetIn() { return recvScheduler.popPacket(); }
DrainState drain() override;
void drainResume() override;
void init(const Event *e, Tick link_delay);
void startup();
void serialize(CheckpointOut &cp) const override;
void unserialize(CheckpointIn &cp) override;
/**
* Initiate the exit from the simulation.
* @param delay Delay param from the m5 exit command. If Delay is zero
* then a collaborative exit is requested (i.e. all nodes have to call
* this method before the distributed simulation can exit). If Delay is
* not zero then exit is requested asap (and it will happen at the next
* sync tick).
* @return False if we are in distributed mode (i.e. exit can happen only
* at sync), True otherwise.
*/
static bool readyToExit(Tick delay);
/**
* Initiate taking a checkpoint
* @param delay Delay param from the m5 checkpoint command. If Delay is
* zero then a collaborative checkpoint is requested (i.e. all nodes have
* to call this method before the checkpoint can be taken). If Delay is
* not zero then a checkpoint is requested asap (and it will happen at the
* next sync tick).
* @return False if we are in dist mode (i.e. exit can happen only at
* sync), True otherwise.
*/
static bool readyToCkpt(Tick delay, Tick period);
/**
* Getter for the dist rank param.
*/
static uint64_t rankParam();
/**
* Getter for the dist size param.
*/
static uint64_t sizeParam();
};
#endif

View file

@ -38,93 +38,69 @@
*/
/* @file
* Header packet class for multi gem5 runs.
* Header packet class for dist-gem5 runs.
*
* For a high level description about multi gem5 see comments in
* header file multi_iface.hh.
* For a high level description about dist-gem5 see comments in
* header file dist_iface.hh.
*
* The MultiHeaderPkt class defines the format of message headers
* sent among gem5 processes during a multi gem5 simulation. A header packet
* The DistHeaderPkt class defines the format of message headers
* sent among gem5 processes during a dist gem5 simulation. A header packet
* can either carry the description of data packet (i.e. a simulated Ethernet
* packet) or a synchronisation related control command. In case of
* data packet description, the corresponding data packet always follows
* the header packet back-to-back.
*/
#ifndef __DEV_NET_MULTI_PACKET_HH__
#define __DEV_NET_MULTI_PACKET_HH__
#ifndef __DEV_DIST_PACKET_HH__
#define __DEV_DIST_PACKET_HH__
#include <cstring>
#include "base/types.hh"
class MultiHeaderPkt
class DistHeaderPkt
{
private:
MultiHeaderPkt() {}
~MultiHeaderPkt() {}
DistHeaderPkt() {}
~DistHeaderPkt() {}
public:
enum class ReqType { immediate, collective, pending, none };
/**
* Simply type to help with calculating space requirements for
* the corresponding header field.
*/
typedef uint8_t AddressType[6];
/**
* The msg type defines what informarion a multi header packet carries.
* The msg type defines what information a dist header packet carries.
*/
enum class MsgType
{
dataDescriptor,
cmdPeriodicSyncReq,
cmdPeriodicSyncAck,
cmdCkptSyncReq,
cmdCkptSyncAck,
cmdAtomicSyncReq,
cmdAtomicSyncAck,
cmdSyncReq,
cmdSyncAck,
unknown
};
struct Header
{
/**
* The msg type field is valid for all header packets. In case of
* a synchronisation control command this is the only valid field.
* The msg type field is valid for all header packets.
*
* @note senderRank is used with data packets while collFlags are used
* by sync ack messages to trigger collective ckpt or exit events.
*/
MsgType msgType;
Tick sendTick;
union {
Tick sendDelay;
Tick syncRepeat;
};
union {
/**
* Actual length of the simulated Ethernet packet.
*/
unsigned dataPacketLength;
/**
* Source MAC address.
*/
AddressType srcAddress;
/**
* Destination MAC address.
*/
AddressType dstAddress;
struct {
ReqType needCkpt;
ReqType needExit;
};
};
};
};
static unsigned maxAddressLength();
/**
* Static functions for manipulating and comparing MAC addresses.
*/
static void clearAddress(AddressType &addr);
static bool isAddressEqual(const AddressType &addr1,
const AddressType &addr2);
static bool isAddressLess(const AddressType &addr1,
const AddressType &addr2);
static void copyAddress(AddressType &dest,
const AddressType &src);
static bool isUnicastAddress(const AddressType &addr);
static bool isMulticastAddress(const AddressType &addr);
static bool isBroadcastAddress(const AddressType &addr);
};
#endif // __DEV_NET_MULTI_PACKET_HH__
#endif

View file

@ -53,19 +53,3 @@ EthPacketData::unserialize(const string &base, CheckpointIn &cp)
arrayParamIn(cp, base + ".data", data, length);
}
void
EthPacketData::packAddress(uint8_t *src_addr,
uint8_t *dst_addr,
unsigned &nbytes)
{
Net::EthHdr *hdr = (Net::EthHdr *)data;
assert(hdr->src().size() == hdr->dst().size());
if (nbytes < hdr->src().size())
panic("EthPacketData::packAddress() Buffer overflow");
memcpy(dst_addr, hdr->dst().bytes(), hdr->dst().size());
memcpy(src_addr, hdr->src().bytes(), hdr->src().size());
nbytes = hdr->src().size();
}

View file

@ -71,18 +71,6 @@ class EthPacketData
~EthPacketData() { if (data) delete [] data; }
public:
/**
* This function pulls out the MAC source and destination addresses from
* the packet data and stores them in the caller specified buffers.
*
* @param src_addr The buffer to store the source MAC address.
* @param dst_addr The buffer to store the destination MAC address.
* @param length This is an inout parameter. The caller stores in this
* the size of the address buffers. On return, this will contain the
* actual address size stored in the buffers. (We assume that source
* address size is equal to that of the destination address.)
*/
void packAddress(uint8_t *src_addr, uint8_t *dst_addr, unsigned &length);
void serialize(const std::string &base, CheckpointOut &cp) const;
void unserialize(const std::string &base, CheckpointIn &cp);

View file

@ -1,622 +0,0 @@
/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* The interface class for multi gem5 simulations.
*/
#include "dev/net/multi_iface.hh"
#include <queue>
#include <thread>
#include "base/random.hh"
#include "base/trace.hh"
#include "debug/MultiEthernet.hh"
#include "debug/MultiEthernetPkt.hh"
#include "dev/net/etherpkt.hh"
#include "sim/sim_exit.hh"
#include "sim/sim_object.hh"
MultiIface::Sync *MultiIface::sync = nullptr;
MultiIface::SyncEvent *MultiIface::syncEvent = nullptr;
unsigned MultiIface::recvThreadsNum = 0;
MultiIface *MultiIface::master = nullptr;
bool
MultiIface::Sync::run(SyncTrigger t, Tick sync_tick)
{
std::unique_lock<std::mutex> sync_lock(lock);
trigger = t;
if (trigger != SyncTrigger::periodic) {
DPRINTF(MultiEthernet,"MultiIface::Sync::run() trigger:%d\n",
(unsigned)trigger);
}
switch (state) {
case SyncState::asyncCkpt:
switch (trigger) {
case SyncTrigger::ckpt:
assert(MultiIface::syncEvent->interrupted == false);
state = SyncState::busy;
break;
case SyncTrigger::periodic:
if (waitNum == 0) {
// So all recv threads got an async checkpoint request already
// and a simExit is scheduled at the end of the current tick
// (i.e. it is a periodic sync scheduled at the same tick as
// the simExit).
state = SyncState::idle;
DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
"due to async ckpt scheduled\n");
return false;
} else {
// we still need to wait for some receiver thread to get the
// aysnc ckpt request. We are going to proceed as 'interrupted'
// periodic sync.
state = SyncState::interrupted;
DPRINTF(MultiEthernet,"MultiIface::Sync::run() interrupted "
"due to ckpt request is coming in\n");
}
break;
case SyncTrigger::atomic:
assert(trigger != SyncTrigger::atomic);
}
break;
case SyncState::idle:
state = SyncState::busy;
break;
// Only one sync can be active at any time
case SyncState::interrupted:
case SyncState::busy:
assert(state != SyncState::interrupted);
assert(state != SyncState::busy);
break;
}
// Kick-off the sync unless we are in the middle of an interrupted
// periodic sync
if (state != SyncState::interrupted) {
assert(waitNum == 0);
waitNum = MultiIface::recvThreadsNum;
// initiate the global synchronisation
assert(MultiIface::master != nullptr);
MultiIface::master->syncRaw(triggerToMsg[(unsigned)trigger], sync_tick);
}
// now wait until all receiver threads complete the synchronisation
auto lf = [this]{ return waitNum == 0; };
cv.wait(sync_lock, lf);
// we are done
assert(state == SyncState::busy || state == SyncState::interrupted);
bool ret = (state != SyncState::interrupted);
state = SyncState::idle;
return ret;
}
void
MultiIface::Sync::progress(MsgType msg)
{
std::unique_lock<std::mutex> sync_lock(lock);
switch (msg) {
case MsgType::cmdAtomicSyncAck:
assert(state == SyncState::busy && trigger == SyncTrigger::atomic);
break;
case MsgType::cmdPeriodicSyncAck:
assert(state == SyncState::busy && trigger == SyncTrigger::periodic);
break;
case MsgType::cmdCkptSyncAck:
assert(state == SyncState::busy && trigger == SyncTrigger::ckpt);
break;
case MsgType::cmdCkptSyncReq:
switch (state) {
case SyncState::busy:
if (trigger == SyncTrigger::ckpt) {
// We are already in a checkpoint sync but got another ckpt
// sync request. This may happen if two (or more) peer gem5
// processes try to start a ckpt nearly at the same time.
// Incrementing waitNum here (before decrementing it below)
// effectively results in ignoring this new ckpt sync request.
waitNum++;
break;
}
assert (waitNum == recvThreadsNum);
state = SyncState::interrupted;
// we need to fall over here to handle "recvThreadsNum == 1" case
case SyncState::interrupted:
assert(trigger == SyncTrigger::periodic);
assert(waitNum >= 1);
if (waitNum == 1) {
exitSimLoop("checkpoint");
}
break;
case SyncState::idle:
// There is no on-going sync so we got an async ckpt request. If we
// are the only receiver thread then we need to schedule the
// checkpoint. Otherwise, only change the state to 'asyncCkpt' and
// let the last receiver thread to schedule the checkpoint at the
// 'asyncCkpt' case.
// Note that a periodic or resume sync may start later and that can
// trigger a state change to 'interrupted' (so the checkpoint may
// get scheduled at 'interrupted' case finally).
assert(waitNum == 0);
state = SyncState::asyncCkpt;
waitNum = MultiIface::recvThreadsNum;
// we need to fall over here to handle "recvThreadsNum == 1" case
case SyncState::asyncCkpt:
assert(waitNum >= 1);
if (waitNum == 1)
exitSimLoop("checkpoint");
break;
default:
panic("Unexpected state for checkpoint request message");
break;
}
break;
default:
panic("Unknown msg type");
break;
}
waitNum--;
assert(state != SyncState::idle);
// Notify the simultaion thread if there is an on-going sync.
if (state != SyncState::asyncCkpt) {
sync_lock.unlock();
cv.notify_one();
}
}
void MultiIface::SyncEvent::start(Tick start, Tick interval)
{
assert(!scheduled());
if (interval == 0)
panic("Multi synchronisation period must be greater than zero");
repeat = interval;
schedule(start);
}
void
MultiIface::SyncEvent::adjust(Tick start_tick, Tick repeat_tick)
{
// The new multi interface may require earlier start of the
// synchronisation.
assert(scheduled() == true);
if (start_tick < when())
reschedule(start_tick);
// The new multi interface may require more frequent synchronisation.
if (repeat == 0)
panic("Multi synchronisation period must be greater than zero");
if (repeat < repeat_tick)
repeat = repeat_tick;
}
void
MultiIface::SyncEvent::process()
{
/*
* Note that this is a global event so this process method will be called
* by only exactly one thread.
*/
// if we are draining the system then we must not start a periodic sync (as
// it is not sure that all peer gem5 will reach this tick before taking
// the checkpoint).
if (isDraining == true) {
assert(interrupted == false);
interrupted = true;
DPRINTF(MultiEthernet,"MultiIface::SyncEvent::process() interrupted "
"due to draining\n");
return;
}
if (interrupted == false)
scheduledAt = curTick();
/*
* We hold the eventq lock at this point but the receiver thread may
* need the lock to schedule new recv events while waiting for the
* multi sync to complete.
* Note that the other simulation threads also release their eventq
* locks while waiting for us due to the global event semantics.
*/
curEventQueue()->unlock();
// we do a global sync here
interrupted = !MultiIface::sync->run(SyncTrigger::periodic, scheduledAt);
// Global sync completed or got interrupted.
// we are expected to exit with the eventq lock held
curEventQueue()->lock();
// schedule the next global sync event if this one completed. Otherwise
// (i.e. this one was interrupted by a checkpoint request), we will
// reschedule this one after the draining is complete.
if (!interrupted)
schedule(scheduledAt + repeat);
}
void MultiIface::SyncEvent::resume()
{
Tick sync_tick;
assert(!scheduled());
if (interrupted) {
assert(curTick() >= scheduledAt);
// We have to complete the interrupted periodic sync asap.
// Note that this sync might be interrupted now again with a checkpoint
// request from a peer gem5...
sync_tick = curTick();
schedule(sync_tick);
} else {
// So we completed the last periodic sync, let's find out the tick for
// next one
assert(curTick() > scheduledAt);
sync_tick = scheduledAt + repeat;
if (sync_tick < curTick())
panic("Cannot resume periodic synchronisation");
schedule(sync_tick);
}
DPRINTF(MultiEthernet,
"MultiIface::SyncEvent periodic sync resumed at %lld "
"(curTick:%lld)\n", sync_tick, curTick());
}
void MultiIface::SyncEvent::serialize(const std::string &base,
CheckpointOut &cp) const
{
// Save the periodic multi sync schedule information
paramOut(cp, base + ".periodicSyncRepeat", repeat);
paramOut(cp, base + ".periodicSyncInterrupted", interrupted);
paramOut(cp, base + ".periodicSyncAt", scheduledAt);
}
void MultiIface::SyncEvent::unserialize(const std::string &base,
CheckpointIn &cp)
{
paramIn(cp, base + ".periodicSyncRepeat", repeat);
paramIn(cp, base + ".periodicSyncInterrupted", interrupted);
paramIn(cp, base + ".periodicSyncAt", scheduledAt);
}
MultiIface::MultiIface(unsigned multi_rank,
Tick sync_start,
Tick sync_repeat,
EventManager *em) :
syncStart(sync_start), syncRepeat(sync_repeat),
recvThread(nullptr), eventManager(em), recvDone(nullptr),
scheduledRecvPacket(nullptr), linkDelay(0), rank(multi_rank)
{
DPRINTF(MultiEthernet, "MultiIface() ctor rank:%d\n",multi_rank);
if (master == nullptr) {
assert(sync == nullptr);
assert(syncEvent == nullptr);
sync = new Sync();
syncEvent = new SyncEvent();
master = this;
}
}
MultiIface::~MultiIface()
{
assert(recvThread);
delete recvThread;
if (this == master) {
assert(syncEvent);
delete syncEvent;
assert(sync);
delete sync;
}
}
void
MultiIface::packetOut(EthPacketPtr pkt, Tick send_delay)
{
MultiHeaderPkt::Header header_pkt;
unsigned address_length = MultiHeaderPkt::maxAddressLength();
// Prepare a multi header packet for the Ethernet packet we want to
// send out.
header_pkt.msgType = MsgType::dataDescriptor;
header_pkt.sendTick = curTick();
header_pkt.sendDelay = send_delay;
// Store also the source and destination addresses.
pkt->packAddress(header_pkt.srcAddress, header_pkt.dstAddress,
address_length);
header_pkt.dataPacketLength = pkt->size();
// Send out the multi hedare packet followed by the Ethernet packet.
sendRaw(&header_pkt, sizeof(header_pkt), header_pkt.dstAddress);
sendRaw(pkt->data, pkt->size(), header_pkt.dstAddress);
DPRINTF(MultiEthernetPkt,
"MultiIface::sendDataPacket() done size:%d send_delay:%llu "
"src:0x%02x%02x%02x%02x%02x%02x "
"dst:0x%02x%02x%02x%02x%02x%02x\n",
pkt->size(), send_delay,
header_pkt.srcAddress[0], header_pkt.srcAddress[1],
header_pkt.srcAddress[2], header_pkt.srcAddress[3],
header_pkt.srcAddress[4], header_pkt.srcAddress[5],
header_pkt.dstAddress[0], header_pkt.dstAddress[1],
header_pkt.dstAddress[2], header_pkt.dstAddress[3],
header_pkt.dstAddress[4], header_pkt.dstAddress[5]);
}
bool
MultiIface::recvHeader(MultiHeaderPkt::Header &header_pkt)
{
// Blocking receive of an incoming multi header packet.
return recvRaw((void *)&header_pkt, sizeof(header_pkt));
}
void
MultiIface::recvData(const MultiHeaderPkt::Header &header_pkt)
{
// We are here beacuse a header packet has been received implying
// that an Ethernet (data) packet is coming in next.
assert(header_pkt.msgType == MsgType::dataDescriptor);
// Allocate storage for the incoming Ethernet packet.
EthPacketPtr new_packet(new EthPacketData(header_pkt.dataPacketLength));
// Now execute the blocking receive and store the incoming data directly
// in the new EthPacketData object.
if (! recvRaw((void *)(new_packet->data), header_pkt.dataPacketLength))
panic("Missing data packet");
new_packet->length = header_pkt.dataPacketLength;
// Grab the event queue lock to schedule a new receive event for the
// data packet.
curEventQueue()->lock();
// Compute the receive tick. It includes the send delay and the
// simulated link delay.
Tick recv_tick = header_pkt.sendTick + header_pkt.sendDelay + linkDelay;
DPRINTF(MultiEthernetPkt, "MultiIface::recvThread() packet receive, "
"send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
header_pkt.sendTick, header_pkt.sendDelay, linkDelay, recv_tick);
if (recv_tick <= curTick()) {
panic("Simulators out of sync - missed packet receive by %llu ticks",
curTick() - recv_tick);
}
// Now we are about to schedule a recvDone event for the new data packet.
// We use the same recvDone object for all incoming data packets. If
// that is already scheduled - i.e. a receive event for a previous
// data packet is already pending - then we have to check whether the
// receive tick for the new packet is earlier than that of the currently
// pending event. Packets may arrive out-of-order with respect to
// simulated receive time. If that is the case, we need to re-schedule the
// recvDone event for the new packet. Otherwise, we save the packet
// pointer and the recv tick for the new packet in the recvQueue. See
// the implementation of the packetIn() method for comments on how this
// information is retrieved from the recvQueue by the simulation thread.
if (!recvDone->scheduled()) {
assert(recvQueue.size() == 0);
assert(scheduledRecvPacket == nullptr);
scheduledRecvPacket = new_packet;
eventManager->schedule(recvDone, recv_tick);
} else if (recvDone->when() > recv_tick) {
recvQueue.emplace(scheduledRecvPacket, recvDone->when());
eventManager->reschedule(recvDone, recv_tick);
scheduledRecvPacket = new_packet;
} else {
recvQueue.emplace(new_packet, recv_tick);
}
curEventQueue()->unlock();
}
void
MultiIface::recvThreadFunc()
{
EthPacketPtr new_packet;
MultiHeaderPkt::Header header;
// The new receiver thread shares the event queue with the simulation
// thread (associated with the simulated Ethernet link).
curEventQueue(eventManager->eventQueue());
// Main loop to wait for and process any incoming message.
for (;;) {
// recvHeader() blocks until the next multi header packet comes in.
if (!recvHeader(header)) {
// We lost connection to the peer gem5 processes most likely
// because one of them called m5 exit. So we stop here.
exit_message("info", 0, "Message server closed connection, "
"simulation is exiting");
}
// We got a valid multi header packet, let's process it
if (header.msgType == MsgType::dataDescriptor) {
recvData(header);
} else {
// everything else must be synchronisation related command
sync->progress(header.msgType);
}
}
}
EthPacketPtr
MultiIface::packetIn()
{
// We are called within the process() method of the recvDone event. We
// return the packet that triggered the current receive event.
// If there is further packets in the recvQueue, we also have to schedule
// the recvEvent for the next packet with the smallest receive tick.
// The priority queue container ensures that smallest receive tick is
// always on the top of the queue.
assert(scheduledRecvPacket != nullptr);
EthPacketPtr next_packet = scheduledRecvPacket;
if (! recvQueue.empty()) {
eventManager->schedule(recvDone, recvQueue.top().second);
scheduledRecvPacket = recvQueue.top().first;
recvQueue.pop();
} else {
scheduledRecvPacket = nullptr;
}
return next_packet;
}
void
MultiIface::spawnRecvThread(Event *recv_done, Tick link_delay)
{
assert(recvThread == nullptr);
// all receive thread must be spawned before simulation starts
assert(eventManager->eventQueue()->getCurTick() == 0);
recvDone = recv_done;
linkDelay = link_delay;
recvThread = new std::thread(&MultiIface::recvThreadFunc, this);
recvThreadsNum++;
}
DrainState
MultiIface::drain()
{
DPRINTF(MultiEthernet,"MultiIFace::drain() called\n");
// This can be called multiple times in the same drain cycle.
if (master == this) {
syncEvent->isDraining = true;
}
return DrainState::Drained;
}
void MultiIface::drainDone() {
if (master == this) {
assert(syncEvent->isDraining == true);
syncEvent->isDraining = false;
// We need to resume the interrupted periodic sync here now that the
// draining is done. If the last periodic sync completed before the
// checkpoint then the next one is already scheduled.
if (syncEvent->interrupted)
syncEvent->resume();
}
}
void MultiIface::serialize(const std::string &base, CheckpointOut &cp) const
{
// Drain the multi interface before the checkpoint is taken. We cannot call
// this as part of the normal drain cycle because this multi sync has to be
// called exactly once after the system is fully drained.
// Note that every peer will take a checkpoint but they may take it at
// different ticks.
// This sync request may interrupt an on-going periodic sync in some peers.
sync->run(SyncTrigger::ckpt, curTick());
// Save the periodic multi sync status
syncEvent->serialize(base, cp);
unsigned n_rx_packets = recvQueue.size();
if (scheduledRecvPacket != nullptr)
n_rx_packets++;
paramOut(cp, base + ".nRxPackets", n_rx_packets);
if (n_rx_packets > 0) {
assert(recvDone->scheduled());
scheduledRecvPacket->serialize(base + ".rxPacket[0]", cp);
}
for (unsigned i=1; i < n_rx_packets; i++) {
const RecvInfo recv_info = recvQueue.impl().at(i-1);
recv_info.first->serialize(base + csprintf(".rxPacket[%d]", i), cp);
Tick rx_tick = recv_info.second;
paramOut(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
}
}
void MultiIface::unserialize(const std::string &base, CheckpointIn &cp)
{
assert(recvQueue.size() == 0);
assert(scheduledRecvPacket == nullptr);
assert(recvDone->scheduled() == false);
// restore periodic sync info
syncEvent->unserialize(base, cp);
unsigned n_rx_packets;
paramIn(cp, base + ".nRxPackets", n_rx_packets);
if (n_rx_packets > 0) {
scheduledRecvPacket = std::make_shared<EthPacketData>(16384);
scheduledRecvPacket->unserialize(base + ".rxPacket[0]", cp);
// Note: receive event will be scheduled when the link is unserialized
}
for (unsigned i=1; i < n_rx_packets; i++) {
EthPacketPtr rx_packet = std::make_shared<EthPacketData>(16384);
rx_packet->unserialize(base + csprintf(".rxPacket[%d]", i), cp);
Tick rx_tick = 0;
paramIn(cp, base + csprintf(".rxTick[%d]", i), rx_tick);
assert(rx_tick > 0);
recvQueue.emplace(rx_packet,rx_tick);
}
}
void MultiIface::initRandom()
{
// Initialize the seed for random generator to avoid the same sequence
// in all gem5 peer processes
assert(master != nullptr);
if (this == master)
random_mt.init(5489 * (rank+1) + 257);
}
void MultiIface::startPeriodicSync()
{
DPRINTF(MultiEthernet, "MultiIface:::initPeriodicSync started\n");
// Do a global sync here to ensure that peer gem5 processes are around
// (actually this may not be needed...)
sync->run(SyncTrigger::atomic, curTick());
// Start the periodic sync if it is a fresh simulation from scratch
if (curTick() == 0) {
if (this == master) {
syncEvent->start(syncStart, syncRepeat);
inform("Multi synchronisation activated: start at %lld, "
"repeat at every %lld ticks.\n",
syncStart, syncRepeat);
} else {
// In case another multiIface object requires different schedule
// for periodic sync than the master does.
syncEvent->adjust(syncStart, syncRepeat);
}
} else {
// Schedule the next periodic sync if resuming from a checkpoint
if (this == master)
syncEvent->resume();
}
DPRINTF(MultiEthernet, "MultiIface::initPeriodicSync done\n");
}

View file

@ -1,492 +0,0 @@
/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* The interface class for multi gem5 simulations.
*
* Multi gem5 is an extension to gem5 to enable parallel simulation of a
* distributed system (e.g. simulation of a pool of machines
* connected by Ethernet links). A multi gem5 run consists of seperate gem5
* processes running in parallel. Each gem5 process executes
* the simulation of a component of the simulated distributed system.
* (An example component can be a multi-core board with an Ethernet NIC.)
* The MultiIface class below provides services to transfer data and
* control messages among the gem5 processes. The main such services are
* as follows.
*
* 1. Send a data packet coming from a simulated Ethernet link. The packet
* will be transferred to (all) the target(s) gem5 processes. The send
* operation is always performed by the simulation thread, i.e. the gem5
* thread that is processing the event queue associated with the simulated
* Ethernet link.
*
* 2. Spawn a receiver thread to process messages coming in from the
* from other gem5 processes. Each simulated Ethernet link has its own
* associated receiver thread. The receiver thread saves the incoming packet
* and schedule an appropriate receive event in the event queue.
*
* 3. Schedule a global barrier event periodically to keep the gem5
* processes in sync.
* Periodic barrier event to keep peer gem5 processes in sync. The basic idea
* is that no gem5 process can go ahead further than the simulated link
* transmission delay to ensure that a corresponding receive event can always
* be scheduled for any message coming in from a peer gem5 process.
*
*
*
* This interface is an abstract class (sendRaw() and recvRaw()
* methods are pure virtual). It can work with various low level
* send/receive service implementations (e.g. TCP/IP, MPI,...). A TCP
* stream socket version is implemented in dev/src/tcp_iface.[hh,cc].
*/
#ifndef __DEV_NET_MULTI_IFACE_HH__
#define __DEV_NET_MULTI_IFACE_HH__
#include <array>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include "dev/net/etherpkt.hh"
#include "dev/net/multi_packet.hh"
#include "sim/core.hh"
#include "sim/drain.hh"
#include "sim/global_event.hh"
class EventManager;
/**
* The interface class to talk to peer gem5 processes.
*/
class MultiIface : public Drainable
{
public:
/*!
* The possible reasons a multi sync among gem5 peers is needed for.
*/
enum
class SyncTrigger {
periodic, /*!< Regular periodic sync. This can be interrupted by a
checkpoint sync request */
ckpt, /*!< sync before taking a checkpoint */
atomic /*!< sync that cannot be interrupted (e.g. sync at startup) */
};
private:
typedef MultiHeaderPkt::MsgType MsgType;
/** Sync State-Machine
\dot
digraph Sync {
node [shape=box, fontsize=10];
idle -> busy
[ label="new trigger\n by run()" fontsize=8 ];
busy -> busy
[ label="new message by progress():\n(msg == SyncAck &&\nwaitNum > 1) || \n(msg==CkptSyncReq &&\ntrigger == ckpt)" fontsize=8 ];
busy -> idle
[ label="new message by progress():\n(msg == SyncAck &&\nwaitNum == 1)" fontsize=8 ];
busy -> interrupted
[ label="new message by progress():\n(msg == CkptSyncReq &&\ntrigger == periodic)" fontsize=8 ];
idle -> asyncCkpt
[ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ];
asyncCkpt -> asyncCkpt
[ label="new message by progress():\nmsg == CkptSyncReq" fontsize=8 ];
asyncCkpt -> busy
[ label="new trigger by run():\ntrigger == ckpt" fontsize=8 ];
asyncCkpt -> idle
[ label="new trigger by run():\n(trigger == periodic &&\nwaitNum == 0) " fontsize=8 ];
asyncCkpt -> interrupted
[ label="new trigger by run():\n(trigger == periodic &&\nwaitNum > 0) " fontsize=8 ];
interrupted -> interrupted
[ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum > 1)" fontsize=8 ];
interrupted -> idle
[ label="new message by progress():\n(msg == CkptSyncReq &&\nwaitNum == 1)" fontsize=8 ];
}
\enddot
*/
/** @class Sync
* This class implements global sync operations among gem5 peer processes.
*
* @note This class is used as a singleton object (shared by all MultiIface
* objects).
*/
class Sync
{
private:
/*!
* Internal state of the sync singleton object.
*/
enum class SyncState {
busy, /*!< There is an on-going sync. */
interrupted, /*!< An on-going periodic sync was interrupted. */
asyncCkpt, /*!< A checkpoint (sim_exit) is already scheduled */
idle /*!< There is no active sync. */
};
/**
* The lock to protect access to the MultiSync object.
*/
std::mutex lock;
/**
* Condition variable for the simulation thread to wait on
* until all receiver threads completes the current global
* synchronisation.
*/
std::condition_variable cv;
/**
* Number of receiver threads that not yet completed the current global
* synchronisation.
*/
unsigned waitNum;
/**
* The trigger for the most recent sync.
*/
SyncTrigger trigger;
/**
* Map sync triggers to request messages.
*/
std::array<MsgType, 3> triggerToMsg = {{
MsgType::cmdPeriodicSyncReq,
MsgType::cmdCkptSyncReq,
MsgType::cmdAtomicSyncReq
}};
/**
* Current sync state.
*/
SyncState state;
public:
/**
* Core method to perform a full multi sync.
*
* @param t Sync trigger.
* @param sync_tick The tick the sync was expected to happen at.
* @return true if the sync completed, false if it was interrupted.
*
* @note In case of an interrupted periodic sync, sync_tick can be less
* than curTick() when we resume (i.e. re-run) it
*/
bool run(SyncTrigger t, Tick sync_tick);
/**
* Callback when the receiver thread gets a sync message.
*/
void progress(MsgType m);
Sync() : waitNum(0), state(SyncState::idle) {}
~Sync() {}
};
/**
* The global event to schedule peridic multi sync. It is used as a
* singleton object.
*
* The periodic synchronisation works as follows.
* 1. A MultisyncEvent is scheduled as a global event when startup() is
* called.
* 2. The progress() method of the MultisyncEvent initiates a new barrier
* for each simulated Ethernet links.
* 3. Simulation thread(s) then waits until all receiver threads
* completes the ongoing barrier. The global sync event is done.
*/
class SyncEvent : public GlobalSyncEvent
{
public:
/**
* Flag to indicate that the most recent periodic sync was interrupted
* (by a checkpoint request).
*/
bool interrupted;
/**
* The tick when the most recent periodic synchronisation was scheduled
* at.
*/
Tick scheduledAt;
/**
* Flag to indicate an on-going drain cycle.
*/
bool isDraining;
public:
/**
* Only the firstly instanstiated MultiIface object will
* call this constructor.
*/
SyncEvent() : GlobalSyncEvent(Default_Pri, 0), interrupted(false),
scheduledAt(0), isDraining(false) {}
~SyncEvent() { assert (scheduled() == false); }
/**
* Schedule the first periodic sync event.
*
* @param start Start tick for multi synchronisation
* @param repeat Frequency of multi synchronisation
*
*/
void start(Tick start, Tick repeat);
/**
* Reschedule (if necessary) the periodic sync event.
*
* @param start Start tick for multi synchronisation
* @param repeat Frequency of multi synchronisation
*
* @note Useful if we have multiple MultiIface objects with
* different 'start' and 'repeat' values for global sync.
*/
void adjust(Tick start, Tick repeat);
/**
* This is a global event so process() will be called by each
* simulation threads. (See further comments in the .cc file.)
*/
void process() override;
/**
* Schedule periodic sync when resuming from a checkpoint.
*/
void resume();
void serialize(const std::string &base, CheckpointOut &cp) const;
void unserialize(const std::string &base, CheckpointIn &cp);
};
/**
* The receive thread needs to store the packet pointer and the computed
* receive tick for each incoming data packet. This information is used
* by the simulation thread when it processes the corresponding receive
* event. (See more comments at the implemetation of the recvThreadFunc()
* and RecvPacketIn() methods.)
*/
typedef std::pair<EthPacketPtr, Tick> RecvInfo;
/**
* Comparison predicate for RecvInfo, needed by the recvQueue.
*/
struct RecvInfoCompare {
bool operator()(const RecvInfo &lhs, const RecvInfo &rhs)
{
return lhs.second > rhs.second;
}
};
/**
* Customized priority queue used to store incoming data packets info by
* the receiver thread. We need to expose the underlying container to
* enable iterator access for serializing.
*/
class RecvQueue : public std::priority_queue<RecvInfo,
std::vector<RecvInfo>,
RecvInfoCompare>
{
public:
std::vector<RecvInfo> &impl() { return c; }
const std::vector<RecvInfo> &impl() const { return c; }
};
/*
* The priority queue to store RecvInfo items ordered by receive ticks.
*/
RecvQueue recvQueue;
/**
* The singleton Sync object to perform multi synchronisation.
*/
static Sync *sync;
/**
* The singleton SyncEvent object to schedule periodic multi sync.
*/
static SyncEvent *syncEvent;
/**
* Tick to schedule the first multi sync event.
* This is just as optimization : we do not need any multi sync
* event until the simulated NIC is brought up by the OS.
*/
Tick syncStart;
/**
* Frequency of multi sync events in ticks.
*/
Tick syncRepeat;
/**
* Receiver thread pointer.
* Each MultiIface object must have exactly one receiver thread.
*/
std::thread *recvThread;
/**
* The event manager associated with the MultiIface object.
*/
EventManager *eventManager;
/**
* The receive done event for the simulated Ethernet link.
* It is scheduled by the receiver thread for each incoming data
* packet.
*/
Event *recvDone;
/**
* The packet that belongs to the currently scheduled recvDone event.
*/
EthPacketPtr scheduledRecvPacket;
/**
* The link delay in ticks for the simulated Ethernet link.
*/
Tick linkDelay;
/**
* The rank of this process among the gem5 peers.
*/
unsigned rank;
/**
* Total number of receiver threads (in this gem5 process).
* During the simulation it should be constant and equal to the
* number of MultiIface objects (i.e. simulated Ethernet
* links).
*/
static unsigned recvThreadsNum;
/**
* The very first MultiIface object created becomes the master. We need
* a master to co-ordinate the global synchronisation.
*/
static MultiIface *master;
protected:
/**
* Low level generic send routine.
* @param buf buffer that holds the data to send out
* @param length number of bytes to send
* @param dest_addr address of the target (simulated NIC). This may be
* used by a subclass for optimization (e.g. optimize broadcast)
*/
virtual void sendRaw(void *buf,
unsigned length,
const MultiHeaderPkt::AddressType dest_addr) = 0;
/**
* Low level generic receive routine.
* @param buf the buffer to store the incoming message
* @param length buffer size (in bytes)
*/
virtual bool recvRaw(void *buf, unsigned length) = 0;
/**
* Low level request for synchronisation among gem5 processes. Only one
* MultiIface object needs to call this (in each gem5 process) to trigger
* a multi sync.
*
* @param sync_req Sync request command.
* @param sync_tick The tick when sync is expected to happen in the sender.
*/
virtual void syncRaw(MsgType sync_req, Tick sync_tick) = 0;
/**
* The function executed by a receiver thread.
*/
void recvThreadFunc();
/**
* Receive a multi header packet. Called by the receiver thread.
* @param header the structure to store the incoming header packet.
* @return false if any error occured during the receive, true otherwise
*
* A header packet can carry a control command (e.g. 'barrier leave') or
* information about a data packet that is following the header packet
* back to back.
*/
bool recvHeader(MultiHeaderPkt::Header &header);
/**
* Receive a data packet. Called by the receiver thread.
* @param data_header The packet descriptor for the expected incoming data
* packet.
*/
void recvData(const MultiHeaderPkt::Header &data_header);
public:
/**
* ctor
* @param multi_rank Rank of this gem5 process within the multi run
* @param sync_start Start tick for multi synchronisation
* @param sync_repeat Frequency for multi synchronisation
* @param em The event manager associated with the simulated Ethernet link
*/
MultiIface(unsigned multi_rank,
Tick sync_start,
Tick sync_repeat,
EventManager *em);
virtual ~MultiIface();
/**
* Send out an Ethernet packet.
* @param pkt The Ethernet packet to send.
* @param send_delay The delay in ticks for the send completion event.
*/
void packetOut(EthPacketPtr pkt, Tick send_delay);
/**
* Fetch the next packet from the receive queue.
*/
EthPacketPtr packetIn();
/**
* spawn the receiver thread.
* @param recv_done The receive done event associated with the simulated
* Ethernet link.
* @param link_delay The link delay for the simulated Ethernet link.
*/
void spawnRecvThread(Event *recv_done,
Tick link_delay);
/**
* Initialize the random number generator with a different seed in each
* peer gem5 process.
*/
void initRandom();
DrainState drain() override;
/**
* Callback when draining is complete.
*/
void drainDone();
/**
* Initialize the periodic synchronisation among peer gem5 processes.
*/
void startPeriodicSync();
void serialize(const std::string &base, CheckpointOut &cp) const;
void unserialize(const std::string &base, CheckpointIn &cp);
};
#endif // __DEV_NET_MULTI_IFACE_HH__

View file

@ -35,25 +35,35 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
* Mohammad Alian
*/
/* @file
* TCP stream socket based interface class implementation for multi gem5 runs.
* TCP stream socket based interface class implementation for dist-gem5 runs.
*/
#include "dev/net/tcp_iface.hh"
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <vector>
#include "base/types.hh"
#include "debug/MultiEthernet.hh"
#include "debug/DistEthernet.hh"
#include "debug/DistEthernetCmd.hh"
#include "sim/sim_exit.hh"
#if defined(__FreeBSD__)
#include <netinet/in.h>
#endif
// MSG_NOSIGNAL does not exists on OS X
#if defined(__APPLE__) || defined(__MACH__)
@ -64,42 +74,181 @@
using namespace std;
std::vector<std::pair<TCPIface::NodeInfo, int> > TCPIface::nodes;
vector<int> TCPIface::sockRegistry;
int TCPIface::fdStatic = -1;
bool TCPIface::anyListening = false;
TCPIface::TCPIface(string server_name, unsigned server_port,
unsigned multi_rank, Tick sync_start, Tick sync_repeat,
EventManager *em) :
MultiIface(multi_rank, sync_start, sync_repeat, em)
unsigned dist_rank, unsigned dist_size,
Tick sync_start, Tick sync_repeat,
EventManager *em, bool is_switch, int num_nodes) :
DistIface(dist_rank, dist_size, sync_start, sync_repeat, em,
is_switch, num_nodes), serverName(server_name),
serverPort(server_port), isSwitch(is_switch), listening(false)
{
if (is_switch && isMaster) {
while (!listen(serverPort)) {
DPRINTF(DistEthernet, "TCPIface(listen): Can't bind port %d\n",
serverPort);
serverPort++;
}
inform("tcp_iface listening on port %d", serverPort);
// Now accept the first connection requests from each compute node and
// store the node info. The compute nodes will then wait for ack
// messages. Ack messages will be sent by initTransport() in the
// appropriate order to make sure that every compute node is always
// connected to the same switch port.
NodeInfo ni;
for (int i = 0; i < size; i++) {
accept();
DPRINTF(DistEthernet, "First connection, waiting for link info\n");
if (!recvTCP(sock, &ni, sizeof(ni)))
panic("Failed to receive link info");
nodes.push_back(make_pair(ni, sock));
}
}
}
bool
TCPIface::listen(int port)
{
if (listening)
panic("Socket already listening!");
struct sockaddr_in sockaddr;
int ret;
fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
panic_if(fdStatic < 0, "socket() failed: %s", strerror(errno));
sockaddr.sin_family = PF_INET;
sockaddr.sin_addr.s_addr = INADDR_ANY;
sockaddr.sin_port = htons(port);
// finally clear sin_zero
memset(&sockaddr.sin_zero, 0, sizeof(sockaddr.sin_zero));
ret = ::bind(fdStatic, (struct sockaddr *)&sockaddr, sizeof (sockaddr));
if (ret != 0) {
if (ret == -1 && errno != EADDRINUSE)
panic("ListenSocket(listen): bind() failed!");
return false;
}
if (::listen(fdStatic, 24) == -1) {
if (errno != EADDRINUSE)
panic("ListenSocket(listen): listen() failed!");
return false;
}
listening = true;
anyListening = true;
return true;
}
void
TCPIface::establishConnection()
{
static unsigned cur_rank = 0;
static unsigned cur_id = 0;
NodeInfo ni;
if (isSwitch) {
if (cur_id == 0) { // first connection accepted in the ctor already
auto const &iface0 =
find_if(nodes.begin(), nodes.end(),
[](const pair<NodeInfo, int> &cn) -> bool {
return cn.first.rank == cur_rank;
});
assert(iface0 != nodes.end());
assert(iface0->first.distIfaceId == 0);
sock = iface0->second;
ni = iface0->first;
} else { // additional connections from the same compute node
accept();
DPRINTF(DistEthernet, "Next connection, waiting for link info\n");
if (!recvTCP(sock, &ni, sizeof(ni)))
panic("Failed to receive link info");
assert(ni.rank == cur_rank);
assert(ni.distIfaceId == cur_id);
}
inform("Link okay (iface:%d -> (node:%d, iface:%d))",
distIfaceId, ni.rank, ni.distIfaceId);
if (ni.distIfaceId < ni.distIfaceNum - 1) {
cur_id++;
} else {
cur_rank++;
cur_id = 0;
}
// send ack
ni.distIfaceId = distIfaceId;
ni.distIfaceNum = distIfaceNum;
sendTCP(sock, &ni, sizeof(ni));
} else { // this is not a switch
connect();
// send link info
ni.rank = rank;
ni.distIfaceId = distIfaceId;
ni.distIfaceNum = distIfaceNum;
sendTCP(sock, &ni, sizeof(ni));
DPRINTF(DistEthernet, "Connected, waiting for ack (distIfaceId:%d\n",
distIfaceId);
if (!recvTCP(sock, &ni, sizeof(ni)))
panic("Failed to receive ack");
assert(ni.rank == rank);
inform("Link okay (iface:%d -> switch iface:%d)", distIfaceId,
ni.distIfaceId);
}
sockRegistry.push_back(sock);
}
void
TCPIface::accept()
{
struct sockaddr_in sockaddr;
socklen_t slen = sizeof (sockaddr);
sock = ::accept(fdStatic, (struct sockaddr *)&sockaddr, &slen);
if (sock != -1) {
int i = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&i,
sizeof(i)) < 0)
warn("ListenSocket(accept): setsockopt() TCP_NODELAY failed!");
}
}
void
TCPIface::connect()
{
struct addrinfo addr_hint, *addr_results;
int ret;
string port_str = to_string(server_port);
string port_str = to_string(serverPort);
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
panic_if(sock < 0, "socket() failed: %s", strerror(errno));
int fl = 1;
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&fl, sizeof(fl)) < 0)
warn("ConnectSocket(connect): setsockopt() TCP_NODELAY failed!");
bzero(&addr_hint, sizeof(addr_hint));
addr_hint.ai_family = AF_INET;
addr_hint.ai_socktype = SOCK_STREAM;
addr_hint.ai_protocol = IPPROTO_TCP;
ret = getaddrinfo(server_name.c_str(), port_str.c_str(),
ret = getaddrinfo(serverName.c_str(), port_str.c_str(),
&addr_hint, &addr_results);
panic_if(ret < 0, "getaddrinf() failed: %s", strerror(errno));
DPRINTF(MultiEthernet, "Connecting to %s:%u\n",
server_name.c_str(), port_str.c_str());
DPRINTF(DistEthernet, "Connecting to %s:%s\n",
serverName.c_str(), port_str.c_str());
ret = ::connect(sock, (struct sockaddr *)(addr_results->ai_addr),
addr_results->ai_addrlen);
panic_if(ret < 0, "connect() failed: %s", strerror(errno));
freeaddrinfo(addr_results);
// add our socket to the static registry
sockRegistry.push_back(sock);
// let the server know who we are
sendTCP(sock, &multi_rank, sizeof(multi_rank));
}
TCPIface::~TCPIface()
@ -111,12 +260,20 @@ TCPIface::~TCPIface()
}
void
TCPIface::sendTCP(int sock, void *buf, unsigned length)
TCPIface::sendTCP(int sock, const void *buf, unsigned length)
{
ssize_t ret;
ret = ::send(sock, buf, length, MSG_NOSIGNAL);
panic_if(ret < 0, "send() failed: %s", strerror(errno));
if (ret < 0) {
if (errno == ECONNRESET || errno == EPIPE) {
inform("send(): %s", strerror(errno));
exit_message("info", 0, "Message server closed connection, "
"simulation is exiting");
} else {
panic("send() failed: %s", strerror(errno));
}
}
panic_if(ret != length, "send() failed");
}
@ -140,19 +297,47 @@ TCPIface::recvTCP(int sock, void *buf, unsigned length)
}
void
TCPIface::syncRaw(MultiHeaderPkt::MsgType sync_req, Tick sync_tick)
TCPIface::sendPacket(const Header &header, const EthPacketPtr &packet)
{
/*
* Barrier is simply implemented by point-to-point messages to the server
* for now. This method is called by only one TCPIface object.
* The server will send back an 'ack' message when it gets the
* sync request from all clients.
*/
MultiHeaderPkt::Header header_pkt;
header_pkt.msgType = sync_req;
header_pkt.sendTick = sync_tick;
for (auto s : sockRegistry)
sendTCP(s, (void *)&header_pkt, sizeof(header_pkt));
sendTCP(sock, &header, sizeof(header));
sendTCP(sock, packet->data, packet->length);
}
void
TCPIface::sendCmd(const Header &header)
{
DPRINTF(DistEthernetCmd, "TCPIface::sendCmd() type: %d\n",
static_cast<int>(header.msgType));
// Global commands (i.e. sync request) are always sent by the master
// DistIface. The transfer method is simply implemented as point-to-point
// messages for now
for (auto s: sockRegistry)
sendTCP(s, (void*)&header, sizeof(header));
}
bool
TCPIface::recvHeader(Header &header)
{
bool ret = recvTCP(sock, &header, sizeof(header));
DPRINTF(DistEthernetCmd, "TCPIface::recvHeader() type: %d ret: %d\n",
static_cast<int>(header.msgType), ret);
return ret;
}
void
TCPIface::recvPacket(const Header &header, EthPacketPtr &packet)
{
packet = make_shared<EthPacketData>(header.dataPacketLength);
bool ret = recvTCP(sock, packet->data, header.dataPacketLength);
panic_if(!ret, "Error while reading socket");
packet->length = header.dataPacketLength;
}
void
TCPIface::initTransport()
{
// We cannot setup the conections in the constructor because the number
// of dist interfaces (per process) is unknown until the (simobject) init
// phase. That information is necessary for global connection ordering.
establishConnection();
}

View file

@ -35,17 +35,17 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
* Mohammad Alian
*/
/* @file
* TCP stream socket based interface class for multi gem5 runs.
* TCP stream socket based interface class for dist-gem5 runs.
*
* For a high level description about multi gem5 see comments in
* header file multi_iface.hh.
* For a high level description about dist-gem5 see comments in
* header file dist_iface.hh.
*
* The TCP subclass of MultiIface uses a separate server process
* (see tcp_server.[hh,cc] under directory gem5/util/multi). Each gem5
* process connects to the server via a stream socket. The server process
* Each gem5 process connects to the server (another gem5 process which
* simulates a switch box) via a stream socket. The server process
* transfers messages and co-ordinates the synchronisation among the gem5
* peers.
*/
@ -55,11 +55,11 @@
#include <string>
#include "dev/net/multi_iface.hh"
#include "dev/net/dist_iface.hh"
class EventManager;
class TCPIface : public MultiIface
class TCPIface : public DistIface
{
private:
/**
@ -67,8 +67,28 @@ class TCPIface : public MultiIface
*/
int sock;
std::string serverName;
int serverPort;
bool isSwitch;
bool listening;
static bool anyListening;
static int fdStatic;
/**
* Registry for all sockets to the server opened by this gem5 process.
* Compute node info and storage for the very first connection from each
* node (used by the switch)
*/
struct NodeInfo
{
unsigned rank;
unsigned distIfaceId;
unsigned distIfaceNum;
};
static std::vector<std::pair<NodeInfo, int> > nodes;
/**
* Storage for all opened sockets
*/
static std::vector<int> sockRegistry;
@ -82,7 +102,7 @@ class TCPIface : public MultiIface
* @param length Size of the message in bytes.
*/
void
sendTCP(int sock, void *buf, unsigned length);
sendTCP(int sock, const void *buf, unsigned length);
/**
* Receive the next incoming message through a TCP stream socket.
@ -92,24 +112,26 @@ class TCPIface : public MultiIface
* @param length Exact size of the expected message in bytes.
*/
bool recvTCP(int sock, void *buf, unsigned length);
bool listen(int port);
void accept();
void connect();
int getfdStatic() const { return fdStatic; }
bool islistening() const { return listening; }
bool anyislistening() const { return anyListening; }
void establishConnection();
protected:
virtual void
sendRaw(void *buf, unsigned length,
const MultiHeaderPkt::AddressType dest_addr=nullptr) override
{
sendTCP(sock, buf, length);
}
void sendPacket(const Header &header,
const EthPacketPtr &packet) override;
virtual bool recvRaw(void *buf, unsigned length) override
{
return recvTCP(sock, buf, length);
}
void sendCmd(const Header &header) override;
virtual void syncRaw(MultiHeaderPkt::MsgType sync_req,
Tick sync_tick) override;
bool recvHeader(Header &header) override;
void recvPacket(const Header &header, EthPacketPtr &packet) override;
void initTransport() override;
public:
/**
@ -118,14 +140,15 @@ class TCPIface : public MultiIface
* server process.
* @param server_port The port number the server listening for new
* connections.
* @param sync_start The tick for the first multi synchronisation.
* @param sync_repeat The frequency of multi synchronisation.
* @param sync_start The tick for the first dist synchronisation.
* @param sync_repeat The frequency of dist synchronisation.
* @param em The EventManager object associated with the simulated
* Ethernet link.
*/
TCPIface(std::string server_name, unsigned server_port,
unsigned multi_rank, Tick sync_start, Tick sync_repeat,
EventManager *em);
unsigned dist_rank, unsigned dist_size,
Tick sync_start, Tick sync_repeat, EventManager *em,
bool is_switch, int num_nodes);
~TCPIface() override;
};

View file

@ -219,7 +219,7 @@ class GlobalSyncEvent : public BaseGlobalEventTemplate<GlobalSyncEvent>
};
GlobalSyncEvent(Priority p, Flags f)
: Base(p, f)
: Base(p, f), repeat(0)
{ }
GlobalSyncEvent(Tick when, Tick _repeat, Priority p, Flags f)

View file

@ -38,63 +38,33 @@
*/
/* @file
* MultiHeaderPkt class to encapsulate multi-gem5 header packets
*
* Magic key definitions for the InitParam pseudo inst
*/
#ifndef ___SIM_INITPARAM_KEYS_HH__
#define ___SIM_INITPARAM_KEYS_HH__
#include "dev/net/multi_packet.hh"
#include <cstdint>
#include <cstring>
#include "base/inet.hh"
unsigned
MultiHeaderPkt::maxAddressLength()
namespace PseudoInst {
/**
* Unique keys to retrieve various params by the initParam pseudo inst.
*
* @note Each key must be shorter than 16 characters (because we use
* two 64-bit registers two pass in the key to the initparam function)
*/
struct InitParamKey
{
return sizeof(AddressType);
}
/**
* The default key (empty string)
*/
static constexpr const char *DEFAULT = "";
/**
* Unique key for "rank" param (distributed gem5 runs)
*/
static constexpr const char *DIST_RANK = "dist-rank";
/**
* Unique key for "size" param (distributed gem5 runs)
*/
static constexpr const char *DIST_SIZE = "dist-size";
};
} // namespace PseudoInst
void
MultiHeaderPkt::clearAddress(AddressType &addr)
{
std::memset(addr, 0, sizeof(addr));
}
bool
MultiHeaderPkt::isAddressEqual(const AddressType &addr1,
const AddressType &addr2)
{
return (std::memcmp(addr1, addr2, sizeof(addr1)) == 0);
}
bool
MultiHeaderPkt::isAddressLess(const AddressType &addr1,
const AddressType &addr2)
{
return (std::memcmp(addr1, addr2, sizeof(addr1)) < 0);
}
void
MultiHeaderPkt::copyAddress(AddressType &dest, const AddressType &src)
{
std::memcpy(dest, src, sizeof(dest));
}
bool
MultiHeaderPkt::isBroadcastAddress(const AddressType &addr)
{
return ((Net::EthAddr *)&addr)->broadcast();
}
bool
MultiHeaderPkt::isMulticastAddress(const AddressType &addr)
{
return ((Net::EthAddr *)&addr)->multicast();
}
bool
MultiHeaderPkt::isUnicastAddress(const AddressType &addr)
{
return ((Net::EthAddr *)&addr)->unicast();
}
#endif

View file

@ -63,8 +63,10 @@
#include "debug/PseudoInst.hh"
#include "debug/Quiesce.hh"
#include "debug/WorkItems.hh"
#include "dev/net/dist_iface.hh"
#include "params/BaseCPU.hh"
#include "sim/full_system.hh"
#include "sim/initparam_keys.hh"
#include "sim/process.hh"
#include "sim/pseudo_inst.hh"
#include "sim/serialize.hh"
@ -357,9 +359,11 @@ void
m5exit(ThreadContext *tc, Tick delay)
{
DPRINTF(PseudoInst, "PseudoInst::m5exit(%i)\n", delay);
if (DistIface::readyToExit(delay)) {
Tick when = curTick() + delay * SimClock::Int::ns;
exitSimLoop("m5_exit instruction encountered", 0, when, 0, true);
}
}
void
m5fail(ThreadContext *tc, Tick delay, uint64_t code)
@ -471,10 +475,14 @@ initParam(ThreadContext *tc, uint64_t key_str1, uint64_t key_str2)
// Compare the key parameter with the known values to select the return
// value
uint64_t val;
if (strlen(key_str) == 0) {
if (strcmp(key_str, InitParamKey::DEFAULT) == 0) {
val = tc->getCpuPtr()->system->init_param;
} else if (strcmp(key_str, InitParamKey::DIST_RANK) == 0) {
val = DistIface::rankParam();
} else if (strcmp(key_str, InitParamKey::DIST_SIZE) == 0) {
val = DistIface::sizeParam();
} else {
panic("Unknown key for initparam pseudo instruction");
panic("Unknown key for initparam pseudo instruction:\"%s\"", key_str);
}
return val;
}
@ -529,11 +537,12 @@ m5checkpoint(ThreadContext *tc, Tick delay, Tick period)
if (!tc->getCpuPtr()->params()->do_checkpoint_insts)
return;
if (DistIface::readyToCkpt(delay, period)) {
Tick when = curTick() + delay * SimClock::Int::ns;
Tick repeat = period * SimClock::Int::ns;
exitSimLoop("checkpoint", 0, when, repeat);
}
}
uint64_t
readfile(ThreadContext *tc, Addr vaddr, uint64_t len, uint64_t offset)

View file

@ -1,63 +0,0 @@
#
# Copyright (c) 2015 ARM Limited
# All rights reserved
#
# The license below extends only to copyright in the software and shall
# not be construed as granting a license to any other intellectual
# property including but not limited to intellectual property relating
# to a hardware implementation of the functionality of the software
# licensed hereunder. You may use the software subject to the license
# terms below provided that you ensure that this notice is replicated
# unmodified and in its entirety in all distributions of the software,
# modified or unmodified, in source code or in binary form.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Gabor Dozsa
CXX= g++
DEBUG= -DDEBUG
M5_ARCH?= ARM
M5_DIR?= ../..
vpath % $(M5_DIR)/build/$(M5_ARCH)/dev
INCDIRS= -I. -I$(M5_DIR)/build/$(M5_ARCH) -I$(M5_DIR)/ext
CCFLAGS= -g -Wall -O3 $(DEBUG) -std=c++11 -MMD $(INCDIRS)
default: tcp_server
clean:
@rm -f tcp_server *.o *.d *~
tcp_server: tcp_server.o multi_packet.o
$(CXX) $(LFLAGS) -o $@ $^
%.o: %.cc
@echo '$(CXX) $(CCFLAGS) -c $(notdir $<) -o $@'
@$(CXX) $(CCFLAGS) -c $< -o $@
-include *.d

View file

@ -1,463 +0,0 @@
/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Copyright (c) 2008 The Regents of The University of Michigan
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* Message server implementation using TCP stream sockets for parallel gem5
* runs.
*/
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include "tcp_server.hh"
using namespace std;
// Some basic macros for information and error reporting.
#define PRINTF(...) fprintf(stderr, __VA_ARGS__)
#ifdef DEBUG
static bool debugSetup = true;
static bool debugPeriodic = false;
static bool debugSync = true;
static bool debugPkt = false;
#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__)
#else
#define DPRINTF(v,...)
#endif
#define inform(...) do { PRINTF("info: "); \
PRINTF(__VA_ARGS__); } while(0)
#define panic(...) do { PRINTF("panic: "); \
PRINTF(__VA_ARGS__); \
PRINTF("\n[%s:%s], line %d\n", \
__FUNCTION__, __FILE__, __LINE__); \
exit(-1); } while(0)
TCPServer *TCPServer::instance = nullptr;
TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle)
{
MultiHeaderPkt::clearAddress(address);
}
unsigned
TCPServer::Channel::recvRaw(void *buf, unsigned size) const
{
ssize_t n;
// This is a blocking receive.
n = recv(fd, buf, size, MSG_WAITALL);
if (n < 0)
panic("read() failed:%s", strerror(errno));
else if (n > 0 && n < size)
// the recv() call should wait for the full message
panic("read() failed");
return n;
}
void
TCPServer::Channel::sendRaw(const void *buf, unsigned size) const
{
ssize_t n;
n = send(fd, buf, size, MSG_NOSIGNAL);
if (n < 0)
panic("write() failed:%s", strerror(errno));
else if (n != size)
panic("write() failed");
}
void TCPServer::Channel::updateAddress(const AddressType &new_address)
{
// check if the known address has changed (e.g. the client reconfigured
// its Ethernet NIC)
if (MultiHeaderPkt::isAddressEqual(address, new_address))
return;
// So we have to update the address. Note that we always
// store the same address as key in the map but the ordering
// may change so we need to erase and re-insert it again.
auto info = TCPServer::instance->addressMap.find(&address);
if (info != TCPServer::instance->addressMap.end()) {
TCPServer::instance->addressMap.erase(info);
}
MultiHeaderPkt::copyAddress(address, new_address);
TCPServer::instance->addressMap[&address] = this;
}
void
TCPServer::Channel::headerPktIn()
{
ssize_t n;
Header hdr_pkt;
n = recvRaw(&hdr_pkt, sizeof(hdr_pkt));
if (n == 0) {
// EOF - nothing to do here, we will handle this as a POLLRDHUP event
// in the main loop.
return;
}
if (hdr_pkt.msgType == MsgType::dataDescriptor) {
updateAddress(hdr_pkt.srcAddress);
TCPServer::instance->xferData(hdr_pkt, *this);
} else {
processCmd(hdr_pkt.msgType, hdr_pkt.sendTick);
}
}
void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick)
{
switch (cmd) {
case MsgType::cmdAtomicSyncReq:
DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank);
assert(state == SyncState::idle);
state = SyncState::atomic;
TCPServer::instance->syncTryComplete(SyncState::atomic,
MsgType::cmdAtomicSyncAck);
break;
case MsgType::cmdPeriodicSyncReq:
DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick);
// sanity check
if (TCPServer::instance->periodicSyncTick() == 0) {
TCPServer::instance->periodicSyncTick(send_tick);
} else if ( TCPServer::instance->periodicSyncTick() != send_tick) {
panic("Out of order periodic sync request - rank:%d "
"(send_tick:%ld ongoing:%ld)", rank, send_tick,
TCPServer::instance->periodicSyncTick());
}
switch (state) {
case SyncState::idle:
state = SyncState::periodic;
TCPServer::instance->syncTryComplete(SyncState::periodic,
MsgType::cmdPeriodicSyncAck);
break;
case SyncState::asyncCkpt:
// An async ckpt request has already been sent to this client and
// that will interrupt this periodic sync. We can simply drop this
// message.
break;
default:
panic("Unexpected state for periodic sync request (rank:%d)",
rank);
break;
}
break;
case MsgType::cmdCkptSyncReq:
DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank);
switch (state) {
case SyncState::idle:
TCPServer::instance->ckptPropagate(*this);
// we fall through here to complete #clients==1 case
case SyncState::asyncCkpt:
state = SyncState::ckpt;
TCPServer::instance->syncTryComplete(SyncState::ckpt,
MsgType::cmdCkptSyncAck);
break;
default:
panic("Unexpected state for ckpt sync request (rank:%d)", rank);
break;
}
break;
default:
panic("Unexpected header packet (rank:%d)",rank);
break;
}
}
TCPServer::TCPServer(unsigned clients_num,
unsigned listen_port,
int timeout_in_sec)
{
assert(instance == nullptr);
construct(clients_num, listen_port, timeout_in_sec);
instance = this;
}
TCPServer::~TCPServer()
{
for (auto &c : clientsPollFd)
close(c.fd);
}
void
TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec)
{
int listen_sock, new_sock, ret;
unsigned client_len;
struct sockaddr_in server_addr, client_addr;
struct pollfd new_pollfd;
Channel new_channel;
DPRINTF(debugSetup, "Start listening on port %u ...\n", port);
listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock < 0)
panic("socket() failed:%s", strerror(errno));
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
if (bind(listen_sock, (struct sockaddr *) &server_addr,
sizeof(server_addr)) < 0)
panic("bind() failed:%s", strerror(errno));
listen(listen_sock, 10);
clientsPollFd.reserve(clients_num);
clientsChannel.reserve(clients_num);
new_pollfd.events = POLLIN | POLLRDHUP;
new_pollfd.revents = 0;
while (clientsPollFd.size() < clients_num) {
new_pollfd.fd = listen_sock;
ret = poll(&new_pollfd, 1, timeout_in_sec*1000);
if (ret == 0)
panic("Timeout while waiting for clients to connect");
assert(ret == 1 && new_pollfd.revents == POLLIN);
client_len = sizeof(client_addr);
new_sock = accept(listen_sock,
(struct sockaddr *) &client_addr,
&client_len);
if (new_sock < 0)
panic("accept() failed:%s", strerror(errno));
new_pollfd.fd = new_sock;
new_pollfd.revents = 0;
clientsPollFd.push_back(new_pollfd);
new_channel.fd = new_sock;
new_channel.isAlive = true;
new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank));
clientsChannel.push_back(new_channel);
DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n",
client_addr.sin_addr.s_addr, client_addr.sin_port,
new_channel.rank);
}
ret = close(listen_sock);
assert(ret == 0);
DPRINTF(debugSetup, "Setup complete\n");
}
void
TCPServer::run()
{
int nfd;
unsigned num_active_clients = clientsPollFd.size();
DPRINTF(debugSetup, "Entering run() loop\n");
while (num_active_clients == clientsPollFd.size()) {
nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1);
if (nfd == -1)
panic("poll() failed:%s", strerror(errno));
for (unsigned i = 0, n = 0;
i < clientsPollFd.size() && (signed)n < nfd;
i++) {
struct pollfd &pfd = clientsPollFd[i];
if (pfd.revents) {
if (pfd.revents & POLLERR)
panic("poll() returned POLLERR");
if (pfd.revents & POLLIN) {
clientsChannel[i].headerPktIn();
}
if (pfd.revents & POLLRDHUP) {
// One gem5 process exited or aborted. Either way, we
// assume the full simulation should stop now (either
// because m5 exit was called or a serious error
// occurred.) So we quit the run loop here and close all
// sockets to notify the remaining peer gem5 processes.
pfd.events = 0;
clientsChannel[i].isAlive = false;
num_active_clients--;
DPRINTF(debugSetup, "POLLRDHUP event");
}
n++;
if ((signed)n == nfd)
break;
}
}
}
DPRINTF(debugSetup, "Exiting run() loop\n");
}
void
TCPServer::xferData(const Header &hdr_pkt, const Channel &src)
{
unsigned n;
assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer));
n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength);
if (n == 0)
panic("recvRaw() failed");
DPRINTF(debugPkt, "Incoming data packet (from rank %d) "
"src:0x%02x%02x%02x%02x%02x%02x "
"dst:0x%02x%02x%02x%02x%02x%02x\n",
src.rank,
hdr_pkt.srcAddress[0],
hdr_pkt.srcAddress[1],
hdr_pkt.srcAddress[2],
hdr_pkt.srcAddress[3],
hdr_pkt.srcAddress[4],
hdr_pkt.srcAddress[5],
hdr_pkt.dstAddress[0],
hdr_pkt.dstAddress[1],
hdr_pkt.dstAddress[2],
hdr_pkt.dstAddress[3],
hdr_pkt.dstAddress[4],
hdr_pkt.dstAddress[5]);
// Now try to figure out the destination client(s).
auto dst_info = addressMap.find(&hdr_pkt.dstAddress);
// First handle the multicast/broadcast or unknonw destination case. These
// all trigger a broadcast of the packet to all clients.
if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false ||
dst_info == addressMap.end()) {
unsigned n = 0;
for (auto const &c: clientsChannel) {
if (c.isAlive && &c!=&src) {
c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
n++;
}
}
if (n == 0) {
inform("Broadcast/multicast packet dropped\n");
}
} else {
// It is a unicast address with a known destination
Channel *dst = dst_info->second;
if (dst->isAlive) {
dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt));
dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank);
} else {
inform("Unicast packet dropped (destination exited)\n");
}
}
}
void
TCPServer::syncTryComplete(SyncState st, MsgType ack)
{
// Check if the barrieris complete. If so then notify all the clients.
for (auto &c : clientsChannel) {
if (c.isAlive && (c.state != st)) {
// sync not complete yet, stop here
return;
}
}
// Sync complete, send out the acks
MultiHeaderPkt::Header hdr_pkt;
hdr_pkt.msgType = ack;
for (auto &c : clientsChannel) {
if (c.isAlive) {
c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
c.state = SyncState::idle;
}
}
// Reset periodic send tick
_periodicSyncTick = 0;
DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync,
"Sync COMPLETE\n");
}
void
TCPServer::ckptPropagate(Channel &ch)
{
// Channel ch got a ckpt request that needs to be propagated to the other
// clients
MultiHeaderPkt::Header hdr_pkt;
hdr_pkt.msgType = MsgType::cmdCkptSyncReq;
for (auto &c : clientsChannel) {
if (c.isAlive && (&c != &ch)) {
switch (c.state) {
case SyncState::idle:
case SyncState::periodic:
c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
c.state = SyncState::asyncCkpt;
break;
default:
panic("Unexpected state for ckpt sync request propagation "
"(rank:%d)\n",c.rank);
break;
}
}
}
}
int main(int argc, char *argv[])
{
TCPServer *server;
int clients_num = -1, listen_port = -1;
int first_arg = 1, timeout_in_sec = 60;
if (argc > 1 && string(argv[1]).compare("-debug") == 0) {
timeout_in_sec = -1;
first_arg++;
argc--;
}
if (argc != 3)
panic("We need two command line args (number of clients and tcp listen"
" port");
clients_num = atoi(argv[first_arg]);
listen_port = atoi(argv[first_arg + 1]);
server = new TCPServer(clients_num, listen_port, timeout_in_sec);
server->run();
delete server;
return 0;
}

View file

@ -1,254 +0,0 @@
/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Copyright (c) 2008 The Regents of The University of Michigan
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* Message server using TCP stream sockets for parallel gem5 runs.
*
* For a high level description about multi gem5 see comments in
* header files src/dev/multi_iface.hh and src/dev/tcp_iface.hh.
*
* This file implements the central message server process for multi gem5.
* The server is responsible the following tasks.
* 1. Establishing a TCP socket connection for each gem5 process (clients).
*
* 2. Process data messages coming in from clients. The server checks
* the MAC addresses in the header message and transfers the message
* to the target(s) client(s).
*
* 3. Processing synchronisation related control messages. Synchronisation
* is performed as follows. The server waits for a 'barrier enter' message
* from all the clients. When the last such control message arrives, the
* server sends out a 'barrier leave' control message to all the clients.
*
* 4. Triggers complete termination in case a client exits. A client may
* exit either by calling 'm5 exit' pseudo instruction or due to a fatal
* error. In either case, we assume that the entire multi simulation needs to
* terminate. The server triggers full termination by tearing down the
* open TCP sockets.
*
* The TCPServer class is instantiated as a singleton object.
*
* The server can be built independently from the rest of gem5 (and it is
* architecture agnostic). See the Makefile in the same directory.
*
*/
#include <poll.h>
#include <map>
#include <vector>
#include "dev/etherpkt.hh"
#include "dev/multi_packet.hh"
/**
* The maximum length of an Ethernet packet (allowing Jumbo frames).
*/
#define MAX_ETH_PACKET_LENGTH 9014
class TCPServer
{
public:
typedef MultiHeaderPkt::AddressType AddressType;
typedef MultiHeaderPkt::Header Header;
typedef MultiHeaderPkt::MsgType MsgType;
private:
enum
class SyncState { periodic, ckpt, asyncCkpt, atomic, idle };
/**
* The Channel class encapsulates all the information about a client
* and its current status.
*/
class Channel
{
private:
/**
* The MAC address of the client.
*/
AddressType address;
/**
* Update the client MAC address. It is called every time a new data
* packet is to come in.
*/
void updateAddress(const AddressType &new_addr);
/**
* Process an incoming command message.
*/
void processCmd(MultiHeaderPkt::MsgType cmd, Tick send_tick);
public:
/**
* TCP stream socket.
*/
int fd;
/**
* Is client connected?
*/
bool isAlive;
/**
* Current state of the channel wrt. multi synchronisation.
*/
SyncState state;
/**
* Multi rank of the client
*/
unsigned rank;
public:
Channel();
~Channel () {}
/**
* Receive and process the next incoming header packet.
*/
void headerPktIn();
/**
* Send raw data to the connected client.
*
* @param data The data to send.
* @param size Size of the data (in bytes).
*/
void sendRaw(const void *data, unsigned size) const;
/**
* Receive raw data from the connected client.
*
* @param buf The buffer to store the incoming data into.
* @param size Size of data to receive (in bytes).
* @return In case of success, it returns size. Zero is returned
* if the socket is already closed by the client.
*/
unsigned recvRaw(void *buf, unsigned size) const;
};
/**
* The array of socket descriptors needed by the poll() system call.
*/
std::vector<struct pollfd> clientsPollFd;
/**
* Array holding all clients info.
*/
std::vector<Channel> clientsChannel;
/**
* We use a map to select the target client based on the destination
* MAC address.
*/
struct AddressCompare
{
bool operator()(const AddressType *a1, const AddressType *a2)
{
return MultiHeaderPkt::isAddressLess(*a1, *a2);
}
};
std::map<const AddressType *, Channel *, AddressCompare> addressMap;
/**
* As we dealt with only one message at a time, we can allocate and re-use
* a single packet buffer (to hold any incoming data packet).
*/
uint8_t packetBuffer[MAX_ETH_PACKET_LENGTH];
/**
* Send tick of the current periodic sync. It is used for sanity check.
*/
Tick _periodicSyncTick;
/**
* The singleton server object.
*/
static TCPServer *instance;
/**
* Set up the socket connections to all the clients.
*
* @param listen_port The port we are listening on for new client
* connection requests.
* @param nclients The number of clients to connect to.
* @param timeout Timeout in sec to complete the setup phase
* (i.e. all gem5 establish socket connections)
*/
void construct(unsigned listen_port, unsigned nclients, int timeout);
/**
* Transfer the header and the follow up data packet to the target(s)
* clients.
*
* @param hdr The header message structure.
* @param ch The source channel for the message.
*/
void xferData(const Header &hdr, const Channel &ch);
/**
* Check if the current round of a synchronisation is completed and notify
* the clients if it is so.
*
* @param st The state all channels should have if sync is complete.
* @param ack The type of ack message to send out if the sync is compete.
*/
void syncTryComplete(SyncState st, MultiHeaderPkt::MsgType ack);
/**
* Broadcast a request for checkpoint sync.
*
* @param ch The source channel of the checkpoint sync request.
*/
void ckptPropagate(Channel &ch);
/**
* Setter for current periodic send tick.
*/
void periodicSyncTick(Tick t) { _periodicSyncTick = t; }
/**
* Getter for current periodic send tick.
*/
Tick periodicSyncTick() { return _periodicSyncTick; }
public:
TCPServer(unsigned clients_num, unsigned listen_port, int timeout_in_sec);
~TCPServer();
/**
* The main server loop that waits for and processes incoming messages.
*/
void run();
};