UDS: check connection state better

Select(2)ing on UNIX domain sockets was not working properly because
connection state wasn't properly checked/propagated. So selecting for
a read descriptor and closing the write descriptor on the other end
didn't cause select to return. Similarly, read(2) kept blocking while
it should return an error when the other end closed the socket.

Change-Id: I3f5bb52af1a6b03313d508bf915fc838357ba450
This commit is contained in:
Thomas Veerman 2013-03-05 10:10:41 +00:00
parent 449ed17833
commit 1ba514e19c
3 changed files with 326 additions and 68 deletions

View file

@ -237,17 +237,16 @@ int uds_close(message *dev_m_in, message *dev_m_out)
/* if the socket is connected, disconnect it */
if (uds_fd_table[minor].peer != -1) {
int peer = uds_fd_table[minor].peer;
/* set peer of this peer to -1 */
uds_fd_table[uds_fd_table[minor].peer].peer = -1;
uds_fd_table[peer].peer = -1;
/* error to pass to peer */
uds_fd_table[uds_fd_table[minor].peer].err = ECONNRESET;
uds_fd_table[peer].err = ECONNRESET;
/* if peer was blocked on I/O revive peer */
if (uds_fd_table[uds_fd_table[minor].peer].suspended) {
int peer = uds_fd_table[minor].peer;
if (uds_fd_table[peer].suspended) {
uds_fd_table[peer].ready_to_revive = 1;
uds_unsuspend(dev_m_in->m_source, peer);
}
@ -334,11 +333,14 @@ int uds_select(message *dev_m_in, message *dev_m_out)
break;
}
}
} else if (bytes != SUSPEND) {
uds_fd_table[minor].sel_ops_out |= SEL_RD;
}
/* check if we can write without blocking */
bytes = uds_perform_write(minor, dev_m_in->m_source, PIPE_BUF, 1);
if (bytes > 0) {
if (bytes != 0 && bytes != SUSPEND) {
/* There is room to write or there is an error condition */
uds_fd_table[minor].sel_ops_out |= SEL_WR;
}
@ -352,7 +354,7 @@ int uds_select(message *dev_m_in, message *dev_m_out)
static int uds_perform_read(int minor, endpoint_t m_source,
size_t size, int pretend)
{
int rc;
int rc, peer;
message fs_m_in;
message fs_m_out;
@ -362,7 +364,8 @@ static int uds_perform_read(int minor, endpoint_t m_source,
++call_count);
#endif
/* skip reads and writes of 0 (or less!) bytes */
peer = uds_fd_table[minor].peer;
if (size <= 0) {
return 0;
}
@ -374,8 +377,30 @@ static int uds_perform_read(int minor, endpoint_t m_source,
return EPIPE;
}
/* skip reads and writes of 0 (or less!) bytes */
if (uds_fd_table[minor].size == 0) {
if (peer == -1) {
/* We're not connected. That's only a problem when this
* socket is connection oriented. */
if (uds_fd_table[minor].type == SOCK_STREAM ||
uds_fd_table[minor].type == SOCK_SEQPACKET) {
if (uds_fd_table[minor].err == ECONNRESET) {
uds_fd_table[minor].err = 0;
return ECONNRESET;
} else {
return ENOTCONN;
}
}
}
/* Check if process is reading from a closed pipe */
if (peer != -1 && !(uds_fd_table[peer].mode & S_IWUSR) &&
uds_fd_table[minor].size == 0) {
return 0;
}
if (pretend) {
return SUSPEND;
}
@ -383,10 +408,7 @@ static int uds_perform_read(int minor, endpoint_t m_source,
/* maybe a process is blocked waiting to write? if
* needed revive the writer
*/
if (uds_fd_table[minor].peer != -1 &&
uds_fd_table[uds_fd_table[minor].peer].suspended) {
int peer = uds_fd_table[minor].peer;
if (peer != -1 && uds_fd_table[peer].suspended) {
uds_fd_table[peer].ready_to_revive = 1;
uds_unsuspend(m_source, peer);
}
@ -394,7 +416,6 @@ static int uds_perform_read(int minor, endpoint_t m_source,
#if DEBUG == 1
printf("(uds) [%d] suspending read request\n", minor);
#endif
/* Process is reading from an empty pipe,
* suspend it so some bytes can be written
*/
@ -448,10 +469,7 @@ static int uds_perform_read(int minor, endpoint_t m_source,
/* maybe a big write was waiting for us to read some data, if
* needed revive the writer
*/
if (uds_fd_table[minor].peer != -1 &&
uds_fd_table[uds_fd_table[minor].peer].suspended) {
int peer = uds_fd_table[minor].peer;
if (peer != -1 && uds_fd_table[peer].suspended) {
uds_fd_table[peer].ready_to_revive = 1;
uds_unsuspend(m_source, peer);
}
@ -459,18 +477,14 @@ static int uds_perform_read(int minor, endpoint_t m_source,
/* see if peer is blocked on select() and a write is possible
* (from peer to minor)
*/
if (uds_fd_table[minor].peer != -1 &&
uds_fd_table[uds_fd_table[minor].peer].selecting == 1 &&
(uds_fd_table[minor].size + uds_fd_table[minor].pos + 1
< PIPE_BUF)) {
int peer = uds_fd_table[minor].peer;
if (peer != -1 && uds_fd_table[peer].selecting == 1 &&
(uds_fd_table[minor].size+uds_fd_table[minor].pos + 1 < PIPE_BUF)){
/* if the peer wants to know about write being possible
* and it doesn't know about it already, then let the peer know.
*/
if ((uds_fd_table[peer].sel_ops_in & SEL_WR) &&
!(uds_fd_table[peer].sel_ops_out & SEL_WR)) {
if (peer != -1 && (uds_fd_table[peer].sel_ops_in & SEL_WR) &&
!(uds_fd_table[peer].sel_ops_out & SEL_WR)) {
/* a write on peer is possible now */
uds_fd_table[peer].sel_ops_out |= SEL_WR;
@ -560,6 +574,11 @@ static int uds_perform_write(int minor, endpoint_t m_source,
return ENOENT;
}
/* check if we write to a closed pipe */
if (!(uds_fd_table[peer].mode & S_IRUSR)) {
return EPIPE;
}
/* check if write would overrun buffer. check if message
* boundry preserving types (SEQPACKET and DGRAM) wouldn't write
* to an empty buffer. check if connectionless sockets have a
@ -718,7 +737,7 @@ int uds_write(message *dev_m_in, message *dev_m_out)
if (uds_fd_table[minor].state != UDS_INUSE) {
/* attempted to close a socket that hasn't been opened --
/* attempted to write to a socket that hasn't been opened --
* something is very wrong :(
*/
uds_set_reply(dev_m_out, DEV_REVIVE, dev_m_in->USER_ENDPT,

View file

@ -857,14 +857,12 @@ int do_shutdown(message *dev_m_in, message *dev_m_out)
switch (how) {
case SHUT_RD:
/* take away read permission */
uds_fd_table[minor].mode =
uds_fd_table[minor].mode ^ S_IRUSR;
uds_fd_table[minor].mode &= ~S_IRUSR;
break;
case SHUT_WR:
/* take away write permission */
uds_fd_table[minor].mode =
uds_fd_table[minor].mode ^ S_IWUSR;
uds_fd_table[minor].mode &= ~S_IWUSR;
break;
case SHUT_RDWR:

View file

@ -83,6 +83,9 @@
int types[3] = {SOCK_STREAM, SOCK_SEQPACKET, SOCK_DGRAM};
char sock_fullpath[PATH_MAX + 1];
void test_abort_client_server(int abort_type);
void test_abort_client(int abort_type);
void test_abort_server(pid_t pid, int abort_type);
/* timestamps for debug and error logs */
char *get_timestamp(void)
@ -115,26 +118,27 @@ char *get_timestamp(void)
}
/* macro to display information about a failed test and increment the errct */
#define test_fail(msg) \
do { \
char *timestamp; \
timestamp = get_timestamp(); \
if (errct == 0) fprintf(stderr, "\n"); \
fprintf(stderr, "[ERROR][%s] (%s Line %d) %s [pid=%d:errno=%d:%s]\n", \
timestamp, __FILE__, __LINE__, msg, getpid(), \
errno, strerror(errno)); \
fflush(stderr); \
if (timestamp != NULL) { \
free(timestamp); \
timestamp = NULL; \
} \
errct++; \
if (errct++ > MAX_ERROR) { \
printf("Too many errors; test aborted\n"); \
quit(); \
exit(1); \
} \
} while (0)
void test_fail_fl(char *msg, char *file, int line)
{
char *timestamp;
timestamp = get_timestamp();
if (errct == 0) fprintf(stderr, "\n");
fprintf(stderr, "[ERROR][%s] (%s Line %d) %s [pid=%d:errno=%d:%s]\n",
timestamp, file, line, msg, getpid(),
errno, strerror(errno));
fflush(stderr);
if (timestamp != NULL) {
free(timestamp);
timestamp = NULL;
}
errct++;
if (errct++ > MAX_ERROR) {
printf("Too many errors; test aborted\n");
quit();
exit(1);
}
}
#define test_fail(msg) test_fail_fl(msg, __FILE__, __LINE__)
/* Convert name to the full path of the socket. Assumes name is in cwd. */
char *fullpath(char *name)
@ -151,21 +155,21 @@ char *fullpath(char *name)
#if DEBUG == 1
/* macros to display debugging information */
#define debug(msg) \
do { \
char *timestamp; \
timestamp = get_timestamp(); \
fprintf(stdout,"[DEBUG][%s] (%s:%d) %s [pid=%d]\n", \
timestamp, __FILE__, __LINE__, msg, getpid()); \
fflush(stdout); \
if (timestamp != NULL) { \
free(timestamp); \
timestamp = NULL; \
} \
} while (0)
void debug_fl(char *msg, char *file, int line)
{
char *timestamp;
timestamp = get_timestamp();
fprintf(stdout,"[DEBUG][%s] (%s:%d) %s [pid=%d]\n",
timestamp, __FILE__, __LINE__, msg, getpid());
fflush(stdout);
if (timestamp != NULL) {
free(timestamp);
timestamp = NULL;
}
}
#define debug(msg) debug_fl(msg, __FILE__, __LINE__)
#else
#define debug(msg) \
do { /* nothing */; } while (0)
#define debug(msg)
#endif
#define SOCKET(sd,domain,type,protocol) \
@ -1455,6 +1459,7 @@ void test_simple_client(int type)
}
rc = write(sd, buf, strlen(buf) + 1);
if (rc == -1) {
test_fail("write");
}
@ -1548,12 +1553,10 @@ void test_simple_server(int type, pid_t pid)
if (rc == -1) {
test_fail("write");
}
rc = close(client_sd);
if (rc == -1) {
test_fail("close");
}
}
rc = close(sd);
@ -1571,10 +1574,164 @@ void test_simple_server(int type, pid_t pid)
errct += WEXITSTATUS(status);
}
void test_abort_client_server(int abort_type)
{
pid_t pid;
debug("test_simple_client_server()");
UNLINK(TEST_SUN_PATH);
/* the signal handler is only used by the client, but we have to
* install it now. if we don't the server may signal the client
* before the handler is installed.
*/
debug("installing signal handler");
if (signal(SIGUSR1, test_xfer_sighdlr) == SIG_ERR) {
test_fail("signal(SIGUSR1, test_xfer_sighdlr) failed");
}
debug("signal handler installed");
server_ready = 0;
pid = fork();
if (pid == -1) {
test_fail("fork() failed");
return;
} else if (pid == 0) {
debug("child");
test_abort_client(abort_type);
test_fail("we should never get here");
exit(1);
} else {
debug("parent");
test_abort_server(pid, abort_type);
debug("parent done");
}
UNLINK(TEST_SUN_PATH);
}
void test_abort_client(int abort_type)
{
char buf[BUFSIZE];
int sd, rc;
struct sockaddr_un addr;
sd = socket(PF_UNIX, SOCK_STREAM, 0);
if (sd == -1) {
test_fail("socket");
exit(errct);
}
while (server_ready == 0) {
debug("[client] waiting for the server");
sleep(1);
}
strncpy(addr.sun_path, TEST_SUN_PATH, sizeof(addr.sun_path) - 1);
addr.sun_family = AF_UNIX;
bzero(buf, BUFSIZE);
snprintf(buf, BUFSIZE-1, "Hello, My Name is Client.");
rc = connect(sd, (struct sockaddr *) &addr, sizeof(struct sockaddr_un));
if (rc == -1) {
test_fail("connect");
exit(errct);
}
if (abort_type == 2) {
/* Give server a chance to close connection */
sleep(2);
rc = write(sd, buf, strlen(buf) + 1);
if (rc != -1) {
test_fail("write should have failed\n");
}
if (errno != ECONNRESET) {
test_fail("errno should've been ECONNRESET\n");
}
}
rc = close(sd);
if (rc == -1) {
test_fail("close");
}
exit(errct);
}
void test_abort_server(pid_t pid, int abort_type)
{
char buf[BUFSIZE];
int sd, rc, client_sd, status;
struct sockaddr_un addr;
socklen_t addr_len;
addr_len = sizeof(struct sockaddr_un);
sd = socket(PF_UNIX, SOCK_STREAM, 0);
if (sd == -1) {
test_fail("socket");
}
strncpy(addr.sun_path, TEST_SUN_PATH, sizeof(addr.sun_path) - 1);
addr.sun_family = AF_UNIX;
rc = bind(sd, (struct sockaddr *) &addr, sizeof(struct sockaddr_un));
if (rc == -1) {
test_fail("bind");
}
rc = listen(sd, 5);
if (rc == -1) {
test_fail("listen");
}
/* we're ready for connections, time to tell the client
* to start the test
*/
kill(pid, SIGUSR1);
client_sd = accept(sd, (struct sockaddr *) &addr, &addr_len);
if (client_sd == -1) {
test_fail("accept");
}
if (abort_type == 1) {
memset(buf, '\0', BUFSIZE);
rc = read(client_sd, buf, BUFSIZE);
if (rc != -1) {
test_fail("read should've failed\n");
}
if (errno != ECONNRESET) {
test_fail("errno should've been ECONNRESET\n");
}
} /* else if (abort_type == 2) { */
rc = close(client_sd);
if (rc == -1) {
test_fail("close");
}
/* } */
rc = close(sd);
if (rc == -1) {
test_fail("close");
}
/* wait for client to exit */
do {
errno = 0;
rc = waitpid(pid, &status, 0);
} while (rc == -1 && errno == EINTR);
/* we use the exit status to get its error count */
errct += WEXITSTATUS(status);
}
void test_simple_client_server(int type)
{
pid_t pid;
debug("test_simple_client_server()");
UNLINK(TEST_SUN_PATH);
@ -2602,6 +2759,88 @@ void test_fd_passing(void) {
}
}
void test_select()
{
int i, nfds = -1;
int socks[2];
fd_set readfds, writefds;
struct timeval tv;
int res = 0;
char buf[1];
for (i = 0; i < OPEN_MAX; i++) {
FD_CLR(i, &readfds);
FD_CLR(i, &writefds);
}
tv.tv_sec = 2;
tv.tv_usec = 0; /* 2 sec time out */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) < 0) {
test_fail("Can't open socket pair.");
}
FD_SET(socks[0], &readfds);
nfds = socks[1] + 1;
/* Close the write end of the socket to generate EOF on read end */
if ((res = shutdown(socks[1], SHUT_WR)) != 0) {
test_fail("shutdown failed\n");
}
res = select(nfds, &readfds, NULL, NULL, &tv);
if (res != 1) {
test_fail("select should've returned 1 ready fd\n");
}
if (!(FD_ISSET(socks[0], &readfds))) {
test_fail("The server didn't respond within 2 seconds");
}
/* Now try to read from empty, closed pipe */
if (read(socks[0], buf, sizeof(buf)) != 0) {
test_fail("reading from empty, closed pipe should return EOF");
}
close(socks[0]);
/* Try again the other way around: create a socketpair, close the
* read end, and try to write. This should cause an EPIPE */
tv.tv_sec = 2;
tv.tv_usec = 0; /* 2 sec time out */
if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) < 0) {
test_fail("Can't open socket pair.");
}
FD_SET(socks[1], &writefds);
nfds = socks[1] + 1;
/* kill the read end of the socket to generate EPIPE on write end */
if ((res = shutdown(socks[0], SHUT_RD)) != 0) {
test_fail("shutdown failed\n");
}
res = select(nfds, NULL, &writefds, NULL, &tv);
if (res != 1) {
test_fail("select should've returned 1 ready fd\n");
}
if (!(FD_ISSET(socks[1], &writefds))) {
test_fail("The server didn't respond within 2 seconds");
}
/* Now try to write to closed pipe */
errno = 0;
if ((res = write(socks[1], buf, sizeof(buf))) != -1) {
printf("write res = %d\n", res);
test_fail("writing to empty, closed pipe should fail");
}
if (errno != EPIPE) {
printf("errno = %d\n", errno);
test_fail("writing to closed pipe should return EPIPE\n");
}
close(socks[1]);
}
int main(int argc, char *argv[])
{
int i;
@ -2633,13 +2872,15 @@ int main(int argc, char *argv[])
if (types[i] != SOCK_DGRAM) test_vectorio(types[i]);
if (types[i] != SOCK_DGRAM) test_msg(types[i]);
}
test_abort_client_server(1);
test_abort_client_server(2);
test_msg_dgram();
test_connect();
test_multiproc_read();
test_multiproc_write();
test_scm_credentials();
test_fd_passing();
test_select();
quit();
return -1; /* we should never get here */