Logo Search packages:      
Sourcecode: maria version File versions

server.C

Go to the documentation of this file.
// Parallelized safety property analysis -*- c++ -*-

#if defined WIN32 || defined __WIN32
# undef __STRICT_ANSI__
# include <sys/types.h>
# include <io.h>
# include <winsock.h>
typedef long ssize_t;
typedef int socklen_t;
#else
# include <sys/types.h>
# include <unistd.h>
# include <sys/socket.h>
# include <sys/un.h>
# ifndef __hpux
#  include <sys/select.h>
# endif // !__hpux
# include <netinet/in.h>
# include <netdb.h>
# ifdef _AIX
#  include <strings.h> // for bzero, used by FD_ZERO
# endif // _AIX
#endif

#if defined __CYGWIN__ || defined __sgi || defined __APPLE__
typedef int socklen_t;
#endif // __CYGWIN__ || __sgi || __APPLE__

#ifndef __WIN32
# include <sys/errno.h>
# include <sys/signal.h>
#endif // !__WIN32
#include <errno.h>
#include <signal.h>

#include <stdlib.h>

#include "ByteBuffer.h"
#include "StateList.h"
#include "StateSet.h"
#include "StateSetReporter.h"

// for computing the initial state
#include "Net.h"
#include "GlobalMarking.h"

#ifndef MSG_NOSIGNAL
/** A flag for send(2) and recv(2) not to send signals on failure */
00049 # define MSG_NOSIGNAL 0
#endif // MSG_NOSIGNAL

/**
 * @file server.C
 * Parallelized safety property analysis
 */

/* Copyright © 2002-2003 Marko Mäkelä (msmakela@tcs.hut.fi).

   This file is part of MARIA, a reachability analyzer and model checker
   for high-level Petri nets.

   MARIA is free software; you can redistribute it and/or modify it
   under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   MARIA is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   The GNU General Public License is often shipped with GNU software, and
   is generally kept in a file called COPYING or LICENSE.  If you do not
   have a copy of the license, write to the Free Software Foundation,
   59 Temple Place, Suite 330, Boston, MA 02111 USA. */

/** active file descriptors */
00078 static fd_set fds;
/** the highest active file descriptor */
00080 static int nfd;
/** number of active and idle client connections */
00082 static unsigned numfd;

/** Close a socket.
 * @param fd      socket to be closed
 */
static void
00088 close_socket (int fd)
{
  close (fd);
  assert (FD_ISSET (fd, &fds));
  FD_CLR (fd, &fds);
  numfd--;
  if (nfd == fd + 1) {
    while (--fd && !FD_ISSET (fd, &fds));
    nfd = fd + 1;
  }
}

/** Client status */
00101 class Client
{
public:
  /** Constructor
   * @param addr_ IP address of the client (0=not connected via IP)
   * @param port_ TCP port of the client
   */
00108   Client (unsigned long addr_, unsigned short port_) :
    addr (addr_), port (port_), rbuf (), rbufu (0), rbufv (0),
    length (0), states (), sbuf (), sbufu (0) {}
private:
  /** Copy constructor */
  Client (const class Client& old);
  /** Assignment operator */
  class Client& operator= (const class Client& other);
public:
  /** Destructor */
00118   ~Client () {}
  /** Determine the current length of the queue */
00120   unsigned getLength () const { return length; }
  /** Send a state to the client
   * @param buf         the encoded state (deflated)
   * @param size  length of the state in bytes
   * @param offset      file offset in the counterexample trace
   */
00126   void push (word_t* buf, size_t size, long offset) {
    length++, states.push_allocated (buf, size, offset);
    sbuf.append (size), sbuf.append (buf, size);
  }
  /** Remove a state when the client has processed it
   * @param item  (output) the dequeued item
   */
00133   void pop (struct StateList::item& item) {
    assert (length > 0);
    length--;
    states.pop (true, &item);
  }
  /** Remove a state when the client has processed it */
00139   void pop () {
    struct StateList::item item;
    assert (length > 0);
    length--;
    states.pop (true, &item);
    delete[] item.data;
  }
  /** Append the queue of a retired client to another queue
   * @param q           the queue to receive the states
   */
00149   void retire (class StateList& q) {
    q.push (states);
    length = 0;
  }
  /** Get the file offset in the counterexample trace for the topmost state */
00154   long getOffset () const {
    assert (length > 0);
    StateList::const_iterator i = states.end ();
    return (--i)->offset;
  }
  /** Determine whether there is pending data for the client */
00160   bool hasUnsent () const {
    assert (sbufu ? sbufu < sbuf.getLength () : !sbuf.getLength ());
    return sbufu > 0;
  }
  /** Send pending data
   * @param fd          the socket to send the data to
   * @return            true on success; false on failure
   */
00168   bool sendBuf (int fd) {
    assert (sbuf.getLength () > sbufu);
    const char* buf = reinterpret_cast<const char*>(sbuf.getBuf ());
    ssize_t i = send (fd, buf + sbufu, sbuf.getLength () - sbufu,
                  MSG_NOSIGNAL);
    if (i < 0) {
#if defined WIN32 || defined __WIN32
      fprintf (stderr, "send: WinSock error %d\n", WSAGetLastError ());
#else
      perror ("send");
#endif
      return false;
    }
    if ((sbufu += i) == sbuf.getLength ())
      sbuf.clear (), sbufu = 0;
    return true;
  }

  /** Notify that the client is terminating */
00187   void printTerminating () const {
    if (addr)
      fprintf (stderr, "client %u.%u.%u.%u:%u terminating\n",
             unsigned (addr >> 24) & 0xff,
             unsigned (addr >> 16) & 0xff,
             unsigned (addr >> 8) & 0xff,
             unsigned (addr) & 0xff,
             port);
  }

  /** the IP address of the client (0=not connected via IP) */
00198   const unsigned long addr;
  /** the TCP port of the client */
00200   const unsigned short port;
  /** Buffer for inbound data */
00202   class BytePacker rbuf;
  /** Index of the first unread byte in rbuf */
00204   unsigned rbufu;
  /** Index of the first unvisited byte in rbuf (for addStates packets) */
00206   unsigned rbufv;
private:
  /** Length of the state queue */
00209   unsigned length;
  /** States that have been sent to the client */
00211   class StateList states;
  /** Buffer for outbound data */
00213   class BytePacker sbuf;
  /** Index of the first unsent byte in sbuf */
00215   unsigned sbufu;
};

/** serve the clients
 * @param s       the master socket file descriptor
 * @param reporter      the state space interface
 * @param breadth flag: apply breadth-first search
 */
void
00224 serve (int s,
       class StateSetReporter& reporter,
       bool breadth)
{
  FD_ZERO (&fds);
  FD_SET (s, &fds);
  numfd = 0;
  nfd = s + 1;
  /** Initial state */
  class BitPacker initial;
  /** Active clients */
  class Client** clients = 0;
  /** Allocated size of clients[] */
  unsigned numAllocClients = 0;
  /** Queue of retired states */
  class StateList retired;
  /** Number of pending states in the server queue */
  unsigned numPending = 1;
  /** Total number of states being processed by the clients */
  unsigned numPendingC = 0;

  if (!reporter.net.getInitMarking ()->encode
      (initial, *reporter.net.getInitMarking (), 0))
    assert (false);
  // add the initial state to the set
  initial.deflate ();
  reporter.report (initial.getBuf (), initial.getNumBytes (), false, false, 0);

  for (;;) {
    extern volatile bool interrupted;
    int i, fd;
    if (interrupted || (!numPending && !numPendingC)) {
    cleanup:
      for (i = 0; i < nfd; i++) {
      if (!FD_ISSET (i, &fds))
        continue;
      close (i);
      if (i != s)
        delete clients[i];
      }
      delete[] clients;
      return;
    }
    /** copy fds for the select(2) call */
    fd_set readfds, writefds;
    memcpy (&readfds, &fds, sizeof fds);
    if (numPending) {
      memcpy (&writefds, &fds, sizeof fds);
      FD_CLR (s, &writefds);
    }
    if ((i = select (nfd, &readfds, numPending ? &writefds : 0, 0, 0)) <= 0) {
      if (errno == EINTR)
      continue;
#if defined WIN32 || defined __WIN32
      fprintf (stderr, "select: WinSock error %d\n", WSAGetLastError ());
#else
      perror ("select");
#endif
      goto cleanup;
    }

    if (FD_ISSET (s, &readfds)) {
      struct sockaddr_in addr;
#ifdef __hpux
      int addrlen = sizeof addr;
#else // __hpux
      socklen_t addrlen = sizeof addr;
#endif // __hpux
      fd = accept (s, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
      if (fd < 0) {
#if defined WIN32 || defined __WIN32
      fprintf (stderr, "accept: WinSock error %d\n", WSAGetLastError ());
#else
      perror ("accept");
#endif
      }
      else {
      if (!numAllocClients) {
        for (numAllocClients = 1; numAllocClients <= unsigned (fd);
             numAllocClients <<= 1);
        memset (clients = new class Client*[numAllocClients],
              0, numAllocClients * sizeof *clients);
      }
      else if (unsigned (fd) >= numAllocClients) {
        unsigned n = numAllocClients;
        do
          n <<= 1;
        while (unsigned (fd) >= n);
        class Client** c = new class Client*[n];
        memcpy (c, clients, numAllocClients * sizeof *clients);
        delete[] clients;
        clients = c;
        memset (clients + numAllocClients, 0,
              (n - numAllocClients) * sizeof *clients);
        numAllocClients = n;
      }

      clients[fd] = addrlen == sizeof addr && addr.sin_family == AF_INET
        ? new class Client (ntohl (addr.sin_addr.s_addr),
                        ntohs (addr.sin_port))
        : new class Client (0, 0);

      FD_SET (fd, &fds);
      if (fd >= nfd)
        nfd = fd + 1;
      numfd++;
      }
      FD_CLR (s, &readfds);
      i--;
    }

    /* process client requests until all clients terminate */
    for (fd = nfd;;) {
      while (--fd && !FD_ISSET (fd, &readfds));
      if (!fd)
      break;
      i--;

      class Client& c = *clients[fd];
      assert (c.rbufu ? c.rbufu < c.rbuf.getLength () : !c.rbuf.getLength ());
      c.rbuf.allocate (2048);

      ssize_t rlen = recv (fd, reinterpret_cast<char*>
                     (c.rbuf.getBuf () + c.rbuf.getLength ()),
                     c.rbuf.getAllocated () - c.rbuf.getLength (),
                     MSG_NOSIGNAL);
      if (rlen < 0) {
#if defined WIN32 || defined __WIN32
      {
        int wsaerror = WSAGetLastError ();
        if (wsaerror != WSAECONNRESET)
          fprintf (stderr, "recv: WinSock error %d\n", wsaerror);
      }
#else
      if (errno != ECONNRESET)
        perror ("recv");
#endif
      cleanup_readfd:
      c.printTerminating ();
      assert (numPendingC >= c.getLength ());
      numPending += c.getLength ();
      numPendingC -= c.getLength ();
      c.retire (retired);
      delete clients[fd];
      clients[fd] = 0;
      close_socket (fd);
      continue;
      }
      else if (!rlen)
      goto cleanup_readfd;

      c.rbuf.setLength (c.rbuf.getLength () + rlen);

      for (class ByteUnpacker u (c.rbuf.getBuf () + c.rbufu);;) {
      unsigned a;
      if ((c.rbufu = u.buf - c.rbuf.getBuf ()) == c.rbuf.getLength ())
        c.rbufu = c.rbufv = 0, c.rbuf.clear (), u.buf = c.rbuf.getBuf ();
      if (!u.extract (c.rbuf, a)) {
      nodata:
        break;
      }
      switch (a) {
      default:
        fprintf (stderr, "socket %d: unexpected request %u\n", fd, a);
        goto cleanup_readfd;
      case StateSet::initialState:
        if (!u.extract (c.rbuf, a) ||
            !u.ensureData (c.rbuf, a))
          goto nodata;
        if (a != initial.getNumBytes () ||
            memcmp (u.buf, initial.getBuf (), a)) {
          fprintf (stderr, "socket %d: initial state mismatch\n", fd);
          goto cleanup_readfd;
        }
        u.buf += a;
        break;
      case StateSet::addStates:
        if (!c.getLength ()) {
        nowork:
          fprintf (stderr, "socket %d: unexpected submission\n", fd);
          goto cleanup_readfd;
        }
        if (!u.extract (c.rbuf, a))
          goto nodata;
        if (a) {
          reporter.setOffset (c.getOffset ());
          while (a--) {
            unsigned numBytes;
            if (!u.extract (c.rbuf, numBytes) ||
              !u.ensureData (c.rbuf, numBytes))
            goto nodata;
            if (c.rbuf.getBuf () + c.rbufv >= u.buf) {
            // the state has been read already
            u.buf += numBytes;
            continue;
            }
            if (reporter.report (u.buf, numBytes, false, false, numPendingC))
            numPending++;
            c.rbufv = (u.buf += numBytes) - c.rbuf.getBuf ();
          }
        }
        // remove the state, as the client has processed it
        numPendingC--, c.pop ();
        break;
      case StateSet::deadlockState:
      case StateSet::deadlockFatal:
      case StateSet::rejectState:
      case StateSet::rejectFatal:
      case StateSet::propertyError:
      case StateSet::inconsistent:
      case StateSet::deadlockState | 0x20:
      case StateSet::deadlockFatal | 0x20:
      case StateSet::rejectState | 0x20:
      case StateSet::rejectFatal | 0x20:
      case StateSet::propertyError | 0x20:
      case StateSet::inconsistent | 0x20:
        if (!c.getLength ())
          goto nowork;
        else {
          unsigned dlen;
          if (!u.extract (c.rbuf, dlen) ||
            !u.ensureData (c.rbuf, dlen))
            goto nodata;
          unsigned char* dstate = dlen ? new unsigned char[dlen] : 0;
          if (dlen)
            u.extract (dstate, dlen);
          reporter.reject (dstate, dlen, c.getOffset (),
                       a & 0x20, a & ~0x20);
          delete[] dstate;
        }
        switch (a & ~0x20) {
        case StateSet::deadlockFatal:
        case StateSet::rejectFatal:
        case StateSet::propertyError:
        case StateSet::inconsistent:
          goto cleanup;
        default:
          break;
        }
        break;
      }
      }
    }

    assert (i >= 0);

    if (!i || !numfd)
      continue;

    /** number of unprocessed states per client */
    const unsigned numStates = (numPending + numPendingC + numfd - 1) / numfd;

    /* distribute unprocessed states to clients until the queue is empty */
    for (fd = nfd; i-- && numPending; ) {
      while (fd--, !FD_ISSET (fd, &writefds))
      assert (fd > 0);
      class Client& c = *clients[fd];
      assert (numPendingC >= c.getLength ());
      if (c.hasUnsent ()) {
      if (!c.sendBuf (fd)) {
      cleanup_writefd:
        c.printTerminating ();
        assert (numPendingC >= c.getLength ());
        numPending += c.getLength ();
        numPendingC -= c.getLength ();
        c.retire (retired);
        delete clients[fd];
        clients[fd] = 0;
        close_socket (fd);
      }
      continue;
      }
      if (c.getLength () >= numStates)
      continue;
      for (unsigned j = numStates - c.getLength (); j--; ) {
      assert (numPending > 0);
      word_t* state;
      long offset;
      size_t numBytes;
      if (retired.empty ())
        state = reporter.pop (breadth, numBytes, offset);
      else
        state = retired.pop (true, offset, &numBytes);
      c.push (state, numBytes, offset);
      numPendingC++;
      if (!--numPending)
        break;
      }
      if (!c.sendBuf (fd))
      goto cleanup_writefd;
    }
  }
}

static struct sockaddr_in addr_in;

/** fork client processes
 * @param num           number of clients to fork
 * @param server  (output) flag: is this the server process?
 * @return        socket number, or <0 on error
 */
int
00526 createJobs (unsigned num,
          bool& server)
{
  assert (!addr_in.sin_family || addr_in.sin_family == AF_INET);
  assert (addr_in.sin_family == AF_INET || num);
  server = true;
#if defined __WIN32 || defined WIN32
  int s;
  if (0 > (s = socket (PF_INET, SOCK_STREAM, 0))) {
    fprintf (stderr, "socket (PF_INET, SOCK_STREAM, 0): WinSock error %d",
           WSAGetLastError ());
    return -1;
  }
  if (bind (s, reinterpret_cast<const struct sockaddr*>(&addr_in),
          sizeof addr_in))
    fprintf (stderr, "bind: WinSock error %d", WSAGetLastError ());
  else if (listen (s, num))
    fprintf (stderr, "listen: WinSock error %d", WSAGetLastError ());
  else
    return s;
#else // __WIN32 || WIN32
  struct sockaddr_un addr;
  if (!addr_in.sin_family) {
    char* sname = tmpnam (0);
    unsigned len = strlen (sname);
    addr.sun_family = AF_UNIX;
    if (len >= sizeof addr.sun_path)
      len = sizeof addr.sun_path - 1;
    memcpy (addr.sun_path, sname, len);
    addr.sun_path[len] = 0;
  }
  int s;
  if (0 > (s = socket (addr_in.sin_family ? PF_INET : PF_UNIX,
                   SOCK_STREAM, 0))) {
    perror (addr_in.sin_family
          ? "socket (PF_INET, SOCK_STREAM, 0)"
          : "socket (PF_UNIX, SOCK_STREAM, 0)");
    return -1;
  }
  if (bind (s, addr_in.sin_family
          ? reinterpret_cast<const struct sockaddr*>(&addr_in)
          : reinterpret_cast<const struct sockaddr*>(&addr),
          addr_in.sin_family ? sizeof addr_in : sizeof addr))
    perror ("bind");
  else if (listen (s, num))
    perror ("listen");
  else if (addr_in.sin_family)
    return s;
  else {
    while (num--) {
      pid_t pid = fork ();
      if (!pid) {
      server = false;
      close (s);
      if (0 > (s = socket (PF_UNIX, SOCK_STREAM, 0)))
        perror ("socket (PF_UNIX, SOCK_STREAM, 0)");
      else if (connect (s, reinterpret_cast<const struct sockaddr*>(&addr),
                    sizeof addr)) {
        fputs (addr.sun_path, stderr);
        perror (": connect");
        close (s);
      }
      else
        return s;
      return -1;
      }
      else if (pid < 0) {
      perror ("fork");
      close (s);
      return -1;
      }
    }

    return s;
  }
  unlink (addr.sun_path);
#endif // __WIN32 || WIN32
  return -1;
}

#if defined WIN32 || defined __WIN32
/** flag: has the Windows socket layer (winsock) been initialized */
static bool called_WSAStartup = false;
/** data for windows socket layer */
static WSADATA wsadata;
#endif

/** Resolve a TCP/IP port/address string.
 * @param address the address to be resolved
 * @return        pointer to the corresponding structure, or 0 on error
 */
struct sockaddr_in*
00618 resolve (const char* address)
{
  char* endp;
  unsigned num = strtoul (address, &endp, 0);
  if (!*address || (*endp && *endp != '/') || !num || num > 65535) {
    fprintf (stderr, "error in connection string: `%s'\n", address);
    return 0;
  }
#if defined WIN32 || defined __WIN32
  if (!called_WSAStartup) {
    called_WSAStartup = true;
    if (int st = WSAStartup (MAKEWORD (1, 1), &wsadata)) {
      fprintf (stderr, "WSAStartup returned %d", st);
      return 0;
    }
  }
#endif
  addr_in.sin_family = 0;
  addr_in.sin_port = htons (num);
  if (*endp++ == '/') {
    const struct hostent* h = gethostbyname (endp);
    if (!h) {
      fputs ("gethostbyname: ", stderr);
      fputs (endp, stderr);
      fputs (": ", stderr);
      switch (h_errno) {
      case HOST_NOT_FOUND:
      fputs ("host not found\n", stderr);
      break;
      case NO_ADDRESS:
      fputs ("no address\n", stderr);
      break;
      case NO_RECOVERY:
      fputs ("non-recoverable error\n", stderr);
      break;
      case TRY_AGAIN:
      fputs ("try again later\n", stderr);
      break;
      default:
      fprintf (stderr, "unknown error %d\n", h_errno);
#if defined WIN32 || defined __WIN32
      fprintf (stderr, "WinSock error %d\n", WSAGetLastError ());
#endif
      break;
      }
      return 0;
    }
    if (h->h_addrtype != AF_INET ||
      h->h_length != sizeof addr_in.sin_addr) {
      fputs ("gethostbyname ", stderr);
      fputs (endp, stderr);
      fputs (": not an IPv4 address\n", stderr);
    }
    else
      memcpy (&addr_in.sin_addr, h->h_addr, sizeof addr_in.sin_addr);
  }
  else
    addr_in.sin_addr.s_addr = INADDR_ANY;

  addr_in.sin_family = AF_INET;
  return &addr_in;
}

/** Connect to the server process specified by resolve ().
 * @return        socket number for client; 0 for the server; <0 on error
 */
int
00685 client_connect (void)
{
  int s;
  assert (addr_in.sin_family == AF_INET);
  if (addr_in.sin_addr.s_addr == INADDR_ANY)
    return 0;
  if (0 > (s = socket (PF_INET, SOCK_STREAM, 0))) {
#if defined WIN32 || defined __WIN32
    fprintf (stderr, "socket (PF_INET, SOCK_STREAM, 0): WinSock error %d",
           WSAGetLastError ());
#else
    perror ("socket (PF_INET, SOCK_STREAM, 0)");
#endif
  }
  else if (connect (s, reinterpret_cast<const struct sockaddr*>(&addr_in),
                sizeof addr_in)) {
#if defined WIN32 || defined __WIN32
    int wsaerror = WSAGetLastError ();
    switch (wsaerror) {
    case WSAECONNREFUSED:
      fputs ("connect: Connection refused\n", stderr);
      break;
    case WSAENETUNREACH:
      fputs ("connect: Network unreachable\n", stderr);
      break;
    case WSAEHOSTUNREACH:
      fputs ("connect: Host unreachable\n", stderr);
      break;
    default:
      fprintf (stderr, "connect: WinSock error %d\n", wsaerror);
      break;
    }
#else
    perror ("connect");
#endif
    close (s);
  }
  else
    return s;
  return -1;
}

/** Clean up the communications structures */
void
00729 comm_cleanup (void)
{
#if defined WIN32 || defined __WIN32
  if (called_WSAStartup) {
    called_WSAStartup = false;
    WSACleanup ();
  }
#endif
}

Generated by  Doxygen 1.6.0   Back to index