gem5/util/multi/tcp_server.cc
Gabor Dozsa fc5bf6713f dev: add support for multi gem5 runs
Multi gem5 is an extension to gem5 to enable parallel simulation of a
distributed system (e.g. simulation of a pool of machines
connected by Ethernet links). A multi gem5 run consists of seperate gem5
processes running in parallel (potentially on different hosts/slots on
a cluster). Each gem5 process executes the simulation of a component of the
simulated distributed system (e.g. a multi-core board with an Ethernet NIC).

The patch implements the "distributed" Ethernet link device
(dev/src/multi_etherlink.[hh.cc]). This device will send/receive
(simulated) Ethernet packets to/from peer gem5 processes. The interface
to talk to the peer gem5 processes is defined in dev/src/multi_iface.hh and
in tcp_iface.hh.

There is also a central message server process (util/multi/tcp_server.[hh,cc])
which acts like an Ethernet switch and transfers messages among the gem5 peers.

A multi gem5 simulations can be kicked off by the util/multi/gem5-multi.sh
wrapper script.

Checkpoints are supported by multi-gem5. The checkpoint must be
initiated by a single gem5 process. E.g., the gem5 process with rank 0
can take a checkpoint from the bootscript just before it invokes
'mpirun' to launch an MPI test. The message server process will notify
all the other peer gem5 processes and make them take a checkpoint, too
(after completing a global synchronisation to ensure that there are no
inflight messages among gem5).
2015-07-15 19:53:50 -05:00

464 lines
15 KiB
C++

/*
* Copyright (c) 2015 ARM Limited
* All rights reserved
*
* The license below extends only to copyright in the software and shall
* not be construed as granting a license to any other intellectual
* property including but not limited to intellectual property relating
* to a hardware implementation of the functionality of the software
* licensed hereunder. You may use the software subject to the license
* terms below provided that you ensure that this notice is replicated
* unmodified and in its entirety in all distributions of the software,
* modified or unmodified, in source code or in binary form.
*
* Copyright (c) 2008 The Regents of The University of Michigan
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met: redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer;
* redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution;
* neither the name of the copyright holders nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* Authors: Gabor Dozsa
*/
/* @file
* Message server implementation using TCP stream sockets for parallel gem5
* runs.
*/
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include "tcp_server.hh"
using namespace std;
// Some basic macros for information and error reporting.
#define PRINTF(...) fprintf(stderr, __VA_ARGS__)
#ifdef DEBUG
static bool debugSetup = true;
static bool debugPeriodic = false;
static bool debugSync = true;
static bool debugPkt = false;
#define DPRINTF(v,...) if (v) PRINTF(__VA_ARGS__)
#else
#define DPRINTF(v,...)
#endif
#define inform(...) do { PRINTF("info: "); \
PRINTF(__VA_ARGS__); } while(0)
#define panic(...) do { PRINTF("panic: "); \
PRINTF(__VA_ARGS__); \
PRINTF("\n[%s:%s], line %d\n", \
__FUNCTION__, __FILE__, __LINE__); \
exit(-1); } while(0)
TCPServer *TCPServer::instance = nullptr;
TCPServer::Channel::Channel() : fd(-1), isAlive(false), state(SyncState::idle)
{
MultiHeaderPkt::clearAddress(address);
}
unsigned
TCPServer::Channel::recvRaw(void *buf, unsigned size) const
{
ssize_t n;
// This is a blocking receive.
n = recv(fd, buf, size, MSG_WAITALL);
if (n < 0)
panic("read() failed:%s", strerror(errno));
else if (n > 0 && n < size)
// the recv() call should wait for the full message
panic("read() failed");
return n;
}
void
TCPServer::Channel::sendRaw(const void *buf, unsigned size) const
{
ssize_t n;
n = send(fd, buf, size, MSG_NOSIGNAL);
if (n < 0)
panic("write() failed:%s", strerror(errno));
else if (n != size)
panic("write() failed");
}
void TCPServer::Channel::updateAddress(const AddressType &new_address)
{
// check if the known address has changed (e.g. the client reconfigured
// its Ethernet NIC)
if (MultiHeaderPkt::isAddressEqual(address, new_address))
return;
// So we have to update the address. Note that we always
// store the same address as key in the map but the ordering
// may change so we need to erase and re-insert it again.
auto info = TCPServer::instance->addressMap.find(&address);
if (info != TCPServer::instance->addressMap.end()) {
TCPServer::instance->addressMap.erase(info);
}
MultiHeaderPkt::copyAddress(address, new_address);
TCPServer::instance->addressMap[&address] = this;
}
void
TCPServer::Channel::headerPktIn()
{
ssize_t n;
Header hdr_pkt;
n = recvRaw(&hdr_pkt, sizeof(hdr_pkt));
if (n == 0) {
// EOF - nothing to do here, we will handle this as a POLLRDHUP event
// in the main loop.
return;
}
if (hdr_pkt.msgType == MsgType::dataDescriptor) {
updateAddress(hdr_pkt.srcAddress);
TCPServer::instance->xferData(hdr_pkt, *this);
} else {
processCmd(hdr_pkt.msgType, hdr_pkt.sendTick);
}
}
void TCPServer::Channel::processCmd(MsgType cmd, Tick send_tick)
{
switch (cmd) {
case MsgType::cmdAtomicSyncReq:
DPRINTF(debugSync,"Atomic sync request (rank:%d)\n",rank);
assert(state == SyncState::idle);
state = SyncState::atomic;
TCPServer::instance->syncTryComplete(SyncState::atomic,
MsgType::cmdAtomicSyncAck);
break;
case MsgType::cmdPeriodicSyncReq:
DPRINTF(debugPeriodic,"PERIODIC sync request (at %ld)\n",send_tick);
// sanity check
if (TCPServer::instance->periodicSyncTick() == 0) {
TCPServer::instance->periodicSyncTick(send_tick);
} else if ( TCPServer::instance->periodicSyncTick() != send_tick) {
panic("Out of order periodic sync request - rank:%d "
"(send_tick:%ld ongoing:%ld)", rank, send_tick,
TCPServer::instance->periodicSyncTick());
}
switch (state) {
case SyncState::idle:
state = SyncState::periodic;
TCPServer::instance->syncTryComplete(SyncState::periodic,
MsgType::cmdPeriodicSyncAck);
break;
case SyncState::asyncCkpt:
// An async ckpt request has already been sent to this client and
// that will interrupt this periodic sync. We can simply drop this
// message.
break;
default:
panic("Unexpected state for periodic sync request (rank:%d)",
rank);
break;
}
break;
case MsgType::cmdCkptSyncReq:
DPRINTF(debugSync, "CKPT sync request (rank:%d)\n",rank);
switch (state) {
case SyncState::idle:
TCPServer::instance->ckptPropagate(*this);
// we fall through here to complete #clients==1 case
case SyncState::asyncCkpt:
state = SyncState::ckpt;
TCPServer::instance->syncTryComplete(SyncState::ckpt,
MsgType::cmdCkptSyncAck);
break;
default:
panic("Unexpected state for ckpt sync request (rank:%d)", rank);
break;
}
break;
default:
panic("Unexpected header packet (rank:%d)",rank);
break;
}
}
TCPServer::TCPServer(unsigned clients_num,
unsigned listen_port,
int timeout_in_sec)
{
assert(instance == nullptr);
construct(clients_num, listen_port, timeout_in_sec);
instance = this;
}
TCPServer::~TCPServer()
{
for (auto &c : clientsPollFd)
close(c.fd);
}
void
TCPServer::construct(unsigned clients_num, unsigned port, int timeout_in_sec)
{
int listen_sock, new_sock, ret;
unsigned client_len;
struct sockaddr_in server_addr, client_addr;
struct pollfd new_pollfd;
Channel new_channel;
DPRINTF(debugSetup, "Start listening on port %u ...\n", port);
listen_sock = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sock < 0)
panic("socket() failed:%s", strerror(errno));
bzero(&server_addr, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(port);
if (bind(listen_sock, (struct sockaddr *) &server_addr,
sizeof(server_addr)) < 0)
panic("bind() failed:%s", strerror(errno));
listen(listen_sock, 10);
clientsPollFd.reserve(clients_num);
clientsChannel.reserve(clients_num);
new_pollfd.events = POLLIN | POLLRDHUP;
new_pollfd.revents = 0;
while (clientsPollFd.size() < clients_num) {
new_pollfd.fd = listen_sock;
ret = poll(&new_pollfd, 1, timeout_in_sec*1000);
if (ret == 0)
panic("Timeout while waiting for clients to connect");
assert(ret == 1 && new_pollfd.revents == POLLIN);
client_len = sizeof(client_addr);
new_sock = accept(listen_sock,
(struct sockaddr *) &client_addr,
&client_len);
if (new_sock < 0)
panic("accept() failed:%s", strerror(errno));
new_pollfd.fd = new_sock;
new_pollfd.revents = 0;
clientsPollFd.push_back(new_pollfd);
new_channel.fd = new_sock;
new_channel.isAlive = true;
new_channel.recvRaw(&new_channel.rank, sizeof(new_channel.rank));
clientsChannel.push_back(new_channel);
DPRINTF(debugSetup, "New client connection addr:%u port:%hu rank:%d\n",
client_addr.sin_addr.s_addr, client_addr.sin_port,
new_channel.rank);
}
ret = close(listen_sock);
assert(ret == 0);
DPRINTF(debugSetup, "Setup complete\n");
}
void
TCPServer::run()
{
int nfd;
unsigned num_active_clients = clientsPollFd.size();
DPRINTF(debugSetup, "Entering run() loop\n");
while (num_active_clients == clientsPollFd.size()) {
nfd = poll(&clientsPollFd[0], clientsPollFd.size(), -1);
if (nfd == -1)
panic("poll() failed:%s", strerror(errno));
for (unsigned i = 0, n = 0;
i < clientsPollFd.size() && (signed)n < nfd;
i++) {
struct pollfd &pfd = clientsPollFd[i];
if (pfd.revents) {
if (pfd.revents & POLLERR)
panic("poll() returned POLLERR");
if (pfd.revents & POLLIN) {
clientsChannel[i].headerPktIn();
}
if (pfd.revents & POLLRDHUP) {
// One gem5 process exited or aborted. Either way, we
// assume the full simulation should stop now (either
// because m5 exit was called or a serious error
// occurred.) So we quit the run loop here and close all
// sockets to notify the remaining peer gem5 processes.
pfd.events = 0;
clientsChannel[i].isAlive = false;
num_active_clients--;
DPRINTF(debugSetup, "POLLRDHUP event");
}
n++;
if ((signed)n == nfd)
break;
}
}
}
DPRINTF(debugSetup, "Exiting run() loop\n");
}
void
TCPServer::xferData(const Header &hdr_pkt, const Channel &src)
{
unsigned n;
assert(hdr_pkt.dataPacketLength <= sizeof(packetBuffer));
n = src.recvRaw(packetBuffer, hdr_pkt.dataPacketLength);
if (n == 0)
panic("recvRaw() failed");
DPRINTF(debugPkt, "Incoming data packet (from rank %d) "
"src:0x%02x%02x%02x%02x%02x%02x "
"dst:0x%02x%02x%02x%02x%02x%02x\n",
src.rank,
hdr_pkt.srcAddress[0],
hdr_pkt.srcAddress[1],
hdr_pkt.srcAddress[2],
hdr_pkt.srcAddress[3],
hdr_pkt.srcAddress[4],
hdr_pkt.srcAddress[5],
hdr_pkt.dstAddress[0],
hdr_pkt.dstAddress[1],
hdr_pkt.dstAddress[2],
hdr_pkt.dstAddress[3],
hdr_pkt.dstAddress[4],
hdr_pkt.dstAddress[5]);
// Now try to figure out the destination client(s).
auto dst_info = addressMap.find(&hdr_pkt.dstAddress);
// First handle the multicast/broadcast or unknonw destination case. These
// all trigger a broadcast of the packet to all clients.
if (MultiHeaderPkt::isUnicastAddress(hdr_pkt.dstAddress) == false ||
dst_info == addressMap.end()) {
unsigned n = 0;
for (auto const &c: clientsChannel) {
if (c.isAlive && &c!=&src) {
c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
c.sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
n++;
}
}
if (n == 0) {
inform("Broadcast/multicast packet dropped\n");
}
} else {
// It is a unicast address with a known destination
Channel *dst = dst_info->second;
if (dst->isAlive) {
dst->sendRaw(&hdr_pkt, sizeof(hdr_pkt));
dst->sendRaw(packetBuffer, hdr_pkt.dataPacketLength);
DPRINTF(debugPkt, "Unicast packet sent (to rank %d)\n",dst->rank);
} else {
inform("Unicast packet dropped (destination exited)\n");
}
}
}
void
TCPServer::syncTryComplete(SyncState st, MsgType ack)
{
// Check if the barrieris complete. If so then notify all the clients.
for (auto &c : clientsChannel) {
if (c.isAlive && (c.state != st)) {
// sync not complete yet, stop here
return;
}
}
// Sync complete, send out the acks
MultiHeaderPkt::Header hdr_pkt;
hdr_pkt.msgType = ack;
for (auto &c : clientsChannel) {
if (c.isAlive) {
c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
c.state = SyncState::idle;
}
}
// Reset periodic send tick
_periodicSyncTick = 0;
DPRINTF(st == SyncState::periodic ? debugPeriodic : debugSync,
"Sync COMPLETE\n");
}
void
TCPServer::ckptPropagate(Channel &ch)
{
// Channel ch got a ckpt request that needs to be propagated to the other
// clients
MultiHeaderPkt::Header hdr_pkt;
hdr_pkt.msgType = MsgType::cmdCkptSyncReq;
for (auto &c : clientsChannel) {
if (c.isAlive && (&c != &ch)) {
switch (c.state) {
case SyncState::idle:
case SyncState::periodic:
c.sendRaw(&hdr_pkt, sizeof(hdr_pkt));
c.state = SyncState::asyncCkpt;
break;
default:
panic("Unexpected state for ckpt sync request propagation "
"(rank:%d)\n",c.rank);
break;
}
}
}
}
int main(int argc, char *argv[])
{
TCPServer *server;
int clients_num = -1, listen_port = -1;
int first_arg = 1, timeout_in_sec = 60;
if (argc > 1 && string(argv[1]).compare("-debug") == 0) {
timeout_in_sec = -1;
first_arg++;
argc--;
}
if (argc != 3)
panic("We need two command line args (number of clients and tcp listen"
" port");
clients_num = atoi(argv[first_arg]);
listen_port = atoi(argv[first_arg + 1]);
server = new TCPServer(clients_num, listen_port, timeout_in_sec);
server->run();
delete server;
return 0;
}