Logo Search packages:      
Sourcecode: maria version File versions

void serve ( int  s,
class StateSetReporter reporter,
bool  breadth 
)

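Serve the clients: accept connections on the master socket, hand pending states out to the connected clients, and collect the states they report back.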

Parameters:
s: the master socket file descriptor
reporter: the state space interface
breadth: flag; when set, apply breadth-first search

Initial state

Active clients

Allocated size of clients[]

Queue of retired states

Number of pending states in the server queue

Total number of states being processed by the clients

copy fds for the select(2) call

number of unprocessed states per client

Definition at line 224 of file server.C.

References BytePacker::allocate(), cleanup(), close_socket(), BitPacker::deflate(), StateList::empty(), GlobalMarking::encode(), fds, BitPacker::getBuf(), Net::getInitMarking(), Client::getLength(), BitPacker::getNumBytes(), Client::hasUnsent(), interrupted, MSG_NOSIGNAL, StateReporter::net, nfd, numfd, StateList::pop(), StateSetReporter::pop(), Client::printTerminating(), Client::push(), Client::rbuf, StateSetReporter::reject(), StateSetReporter::report(), Client::retire(), Client::sendBuf(), and StateSetReporter::setOffset().

Referenced by safetyP().

{
  /*
   * Main loop of the state space server.
   *
   * Each round waits in select(2) on the master socket `s` and on all
   * client sockets, then
   *   1. accepts at most one new connection on `s`,
   *   2. reads and dispatches the requests of every readable client,
   *   3. distributes pending states to the writable clients.
   * The loop ends when `interrupted` is set, when no states are pending
   * either in the server or at the clients, when select() fails, or when a
   * client reports a fatal condition.  On exit every descriptor still in
   * `fds` is closed and the client table is freed.
   *
   * `fds`, `numfd` and `nfd` are declared outside this function; they are
   * reset here and updated when a client is accepted.
   * NOTE(review): close_socket() presumably removes the descriptor from
   * `fds` and decrements `numfd` -- confirm against its definition.
   */
  FD_ZERO (&fds);
  FD_SET (s, &fds);
  numfd = 0;
  nfd = s + 1;
  /** Initial state */
  class BitPacker initial;
  /** Active clients, indexed by socket descriptor */
  class Client** clients = 0;
  /** Allocated size of clients[] */
  unsigned numAllocClients = 0;
  /** Queue of retired states */
  class StateList retired;
  /** Number of pending states in the server queue
      (starts at 1; presumably this counts the initial state reported below) */
  unsigned numPending = 1;
  /** Total number of states being processed by the clients */
  unsigned numPendingC = 0;

  // The encode call is kept outside of assert() so that it is still
  // executed when assertions are compiled out.
  if (!reporter.net.getInitMarking ()->encode
      (initial, *reporter.net.getInitMarking (), 0))
    assert (false);
  // add the initial state to the set
  initial.deflate ();
  reporter.report (initial.getBuf (), initial.getNumBytes (), false, false, 0);

  for (;;) {
    extern volatile bool interrupted;
    int i, fd;
    if (interrupted || (!numPending && !numPendingC)) {
      // Also entered via "goto cleanup" on select() failure and on fatal
      // client reports.
    cleanup:
      for (i = 0; i < nfd; i++) {
        if (!FD_ISSET (i, &fds))
          continue;
        close (i);
        // the master socket has no Client object attached to it
        if (i != s)
          delete clients[i];
      }
      delete[] clients;
      return;
    }
    /** copy fds for the select(2) call */
    fd_set readfds, writefds;
    memcpy (&readfds, &fds, sizeof fds);
    // writefds is only initialized (and only passed to select) when there
    // is something to distribute; the master socket is never written to.
    if (numPending) {
      memcpy (&writefds, &fds, sizeof fds);
      FD_CLR (s, &writefds);
    }
    // From here on, i holds the number of ready descriptors that have not
    // been handled yet in this round.
    if ((i = select (nfd, &readfds, numPending ? &writefds : 0, 0, 0)) <= 0) {
      if (errno == EINTR)
        continue;
#if defined WIN32 || defined __WIN32
      fprintf (stderr, "select: WinSock error %d\n", WSAGetLastError ());
#else
      perror ("select");
#endif
      goto cleanup;
    }

    /* accept a new client connection, if one is waiting */
    if (FD_ISSET (s, &readfds)) {
      struct sockaddr_in addr;
#ifdef __hpux
      int addrlen = sizeof addr;
#else // __hpux
      socklen_t addrlen = sizeof addr;
#endif // __hpux
      fd = accept (s, reinterpret_cast<struct sockaddr*>(&addr), &addrlen);
      if (fd < 0) {
#if defined WIN32 || defined __WIN32
        fprintf (stderr, "accept: WinSock error %d\n", WSAGetLastError ());
#else
        perror ("accept");
#endif
      }
      else {
        // Make sure clients[fd] exists: the table size is kept at a power
        // of two greater than the largest descriptor seen, and new slots
        // are zero-filled.
        if (!numAllocClients) {
          for (numAllocClients = 1; numAllocClients <= unsigned (fd);
               numAllocClients <<= 1);
          memset (clients = new class Client*[numAllocClients],
                  0, numAllocClients * sizeof *clients);
        }
        else if (unsigned (fd) >= numAllocClients) {
          unsigned n = numAllocClients;
          do
            n <<= 1;
          while (unsigned (fd) >= n);
          class Client** c = new class Client*[n];
          memcpy (c, clients, numAllocClients * sizeof *clients);
          delete[] clients;
          clients = c;
          memset (clients + numAllocClients, 0,
                  (n - numAllocClients) * sizeof *clients);
          numAllocClients = n;
        }

        // Record the peer address for IPv4 peers; otherwise pass (0, 0).
        clients[fd] = addrlen == sizeof addr && addr.sin_family == AF_INET
          ? new class Client (ntohl (addr.sin_addr.s_addr),
                              ntohs (addr.sin_port))
          : new class Client (0, 0);

        FD_SET (fd, &fds);
        if (fd >= nfd)
          nfd = fd + 1;
        numfd++;
      }
      // the master socket has been handled; keep the read scan below from
      // treating it as a client
      FD_CLR (s, &readfds);
      i--;
    }

    /* process client requests until all clients terminate */
    for (fd = nfd;;) {
      // scan downwards for the next readable descriptor; the scan stops at
      // descriptor 0, which is never examined
      while (--fd && !FD_ISSET (fd, &readfds));
      if (!fd)
        break;
      i--;

      class Client& c = *clients[fd];
      assert (c.rbufu ? c.rbufu < c.rbuf.getLength () : !c.rbuf.getLength ());
      c.rbuf.allocate (2048);

      // append the incoming bytes after the data already buffered
      ssize_t rlen = recv (fd, reinterpret_cast<char*>
                           (c.rbuf.getBuf () + c.rbuf.getLength ()),
                           c.rbuf.getAllocated () - c.rbuf.getLength (),
                           MSG_NOSIGNAL);
      if (rlen < 0) {
#if defined WIN32 || defined __WIN32
        {
          int wsaerror = WSAGetLastError ();
          if (wsaerror != WSAECONNRESET)
            fprintf (stderr, "recv: WinSock error %d\n", wsaerror);
        }
#else
        if (errno != ECONNRESET)
          perror ("recv");
#endif
        // Drop this client: the states it was still working on are moved
        // back from the client count to the pending count and retired.
      cleanup_readfd:
        c.printTerminating ();
        assert (numPendingC >= c.getLength ());
        numPending += c.getLength ();
        numPendingC -= c.getLength ();
        c.retire (retired);
        delete clients[fd];
        clients[fd] = 0;
        close_socket (fd);
        continue;
      }
      else if (!rlen)
        // zero-length read: handled like an error, the client is dropped
        goto cleanup_readfd;

      c.rbuf.setLength (c.rbuf.getLength () + rlen);

      // Decode as many complete requests as the buffer holds.  c.rbufu is
      // the offset of the first request that has not been fully handled;
      // an incomplete request ("nodata") is left in the buffer and parsed
      // again from c.rbufu after the next recv().
      for (class ByteUnpacker u (c.rbuf.getBuf () + c.rbufu);;) {
        unsigned a;
        // everything consumed: reset the buffer and both offsets
        if ((c.rbufu = u.buf - c.rbuf.getBuf ()) == c.rbuf.getLength ())
          c.rbufu = c.rbufv = 0, c.rbuf.clear (), u.buf = c.rbuf.getBuf ();
        if (!u.extract (c.rbuf, a)) {
        nodata:
          break;
        }
        switch (a) {
        default:
          fprintf (stderr, "socket %d: unexpected request %u\n", fd, a);
          goto cleanup_readfd;
        case StateSet::initialState:
          // the client announces its initial state; it must be byte-wise
          // identical to the server's
          if (!u.extract (c.rbuf, a) ||
              !u.ensureData (c.rbuf, a))
            goto nodata;
          if (a != initial.getNumBytes () ||
              memcmp (u.buf, initial.getBuf (), a)) {
            fprintf (stderr, "socket %d: initial state mismatch\n", fd);
            goto cleanup_readfd;
          }
          u.buf += a;
          break;
        case StateSet::addStates:
          // only valid while the client has a state assigned to it
          if (!c.getLength ()) {
          nowork:
            fprintf (stderr, "socket %d: unexpected submission\n", fd);
            goto cleanup_readfd;
          }
          if (!u.extract (c.rbuf, a))
            goto nodata;
          if (a) {
            reporter.setOffset (c.getOffset ());
            while (a--) {
              unsigned numBytes;
              if (!u.extract (c.rbuf, numBytes) ||
                  !u.ensureData (c.rbuf, numBytes))
                goto nodata;
              // c.rbufv marks how far states have already been reported,
              // so that re-parsing a partially received request does not
              // report the same state twice
              if (c.rbuf.getBuf () + c.rbufv >= u.buf) {
                // the state has been read already
                u.buf += numBytes;
                continue;
              }
              if (reporter.report (u.buf, numBytes, false, false, numPendingC))
                numPending++;
              c.rbufv = (u.buf += numBytes) - c.rbuf.getBuf ();
            }
          }
          // remove the state, as the client has processed it
          numPendingC--, c.pop ();
          break;
        case StateSet::deadlockState:
        case StateSet::deadlockFatal:
        case StateSet::rejectState:
        case StateSet::rejectFatal:
        case StateSet::propertyError:
        case StateSet::inconsistent:
        case StateSet::deadlockState | 0x20:
        case StateSet::deadlockFatal | 0x20:
        case StateSet::rejectState | 0x20:
        case StateSet::rejectFatal | 0x20:
        case StateSet::propertyError | 0x20:
        case StateSet::inconsistent | 0x20:
          // Rejection reports.  Bit 0x20 of the code is passed to
          // reporter.reject() separately from the remaining bits.
          // NOTE(review): the meaning of the 0x20 flag is defined by
          // StateSetReporter::reject() -- confirm there.
          if (!c.getLength ())
            goto nowork;
          else {
            unsigned dlen;
            if (!u.extract (c.rbuf, dlen) ||
                !u.ensureData (c.rbuf, dlen))
              goto nodata;
            unsigned char* dstate = dlen ? new unsigned char[dlen] : 0;
            if (dlen)
              u.extract (dstate, dlen);
            reporter.reject (dstate, dlen, c.getOffset (),
                             a & 0x20, a & ~0x20);
            delete[] dstate;
          }
          // these codes terminate the whole server
          switch (a & ~0x20) {
          case StateSet::deadlockFatal:
          case StateSet::rejectFatal:
          case StateSet::propertyError:
          case StateSet::inconsistent:
            goto cleanup;
          default:
            break;
          }
          break;
        }
      }
    }

    assert (i >= 0);

    // nothing left that is ready for writing, or nobody to write to
    // (the latter also guards the division below)
    if (!i || !numfd)
      continue;

    /** number of unprocessed states per client
        (all outstanding states divided by the number of clients, rounded up) */
    const unsigned numStates = (numPending + numPendingC + numfd - 1) / numfd;

    /* distribute unprocessed states to clients until the queue is empty */
    for (fd = nfd; i-- && numPending; ) {
      while (fd--, !FD_ISSET (fd, &writefds))
        assert (fd > 0);
      // A client that was both readable and writable in this round may
      // have been dropped by the read phase above, which resets
      // clients[fd] to 0.  Its bit is still set in the local writefds
      // copy, so skip it here instead of dereferencing a null pointer.
      // The ready count for it has already been consumed by i-- above.
      if (!clients[fd])
        continue;
      class Client& c = *clients[fd];
      assert (numPendingC >= c.getLength ());
      // flush data left over from an earlier round before queueing more
      if (c.hasUnsent ()) {
        if (!c.sendBuf (fd)) {
          // Drop this client; same bookkeeping as in cleanup_readfd.
        cleanup_writefd:
          c.printTerminating ();
          assert (numPendingC >= c.getLength ());
          numPending += c.getLength ();
          numPendingC -= c.getLength ();
          c.retire (retired);
          delete clients[fd];
          clients[fd] = 0;
          close_socket (fd);
        }
        continue;
      }
      // this client already holds its share
      if (c.getLength () >= numStates)
        continue;
      // top the client up to numStates; retired states are handed out
      // before new ones are taken from the reporter
      for (unsigned j = numStates - c.getLength (); j--; ) {
        assert (numPending > 0);
        word_t* state;
        long offset;
        size_t numBytes;
        if (retired.empty ())
          state = reporter.pop (breadth, numBytes, offset);
        else
          state = retired.pop (true, offset, &numBytes);
        c.push (state, numBytes, offset);
        numPendingC++;
        if (!--numPending)
          break;
      }
      if (!c.sendBuf (fd))
        goto cleanup_writefd;
    }
  }
}


Generated by  Doxygen 1.6.0   Back to index