Lab 13 - Event-driven Concurrency

Lab goal:


Event-Based Concurrent I/O

The lectures have given a very high-level overview of the concept of event-driven concurrent I/O operations. That overview included a basic treatment of the select() system call. A server that is not event-driven performs each I/O operation and blocks until it completes. So the only way to handle concurrent I/O operations in such a server is by using multiple threads and/or multiple processes. Instead, an event-driven server waits on multiple I/O operations concurrently, allowing it to multiplex these different operations.

The select() system call is widely used to implement event-driven servers. In particular, calling select() allows a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become ready for some class of I/O operation (e.g., input is now possible):

int select(int nfds, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout);

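Three of the arguments to the select() system call each specify an independent set of file descriptors to be watched. The nfds argument should be the highest-numbered file descriptor across all three of these sets, plus 1; because file descriptor numbers begin at 0, nfds is the count of low-numbered file descriptors that select() examines. The three sets of file descriptors are as follows:

readset: file descriptors watched to see whether input is possible (i.e., a read will not block).
writeset: file descriptors watched to see whether output is possible (i.e., a write will not block).
exceptset: file descriptors watched for exceptional conditions.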

The first nfds file descriptors (i.e., file descriptor number 0 through number nfds-1, inclusive) are checked in each file descriptor set. If any of the three file descriptor sets are specified as a NULL pointer, that file descriptor set is ignored (i.e., no file descriptors are of interest for that type of operation).

A call to select() normally blocks the calling process and does not return until at least one of the specified file descriptors meets the condition for which it is being watched. However, the timeout argument to select() specifies an upper bound on how long the call may block: once that time has elapsed, select() returns even if none of the watched file descriptors is ready. If timeout is NULL, select() can block indefinitely.

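Specifically, select() will block the calling process and not return until at least one of the following conditions is satisfied:

At least one of the watched file descriptors becomes ready for its respective operation.
The specified timeout expires.
The call is interrupted by a signal handler.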

The value returned by a call to select() is the total number of file descriptors that are ready (from all three of the given file descriptor sets). In addition, on return, select() modifies each of the given file descriptor sets in place to be a subset consisting of only those file descriptors that are ready for the given respective operation. Because the sets are overwritten, a program that calls select() in a loop must pass in a fresh copy of each set on every call. If, instead, the specified timeout expired before any of the file descriptors became ready, then select() returns 0.
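The following is a minimal sketch of one such call, written against a single file descriptor. The helper name wait_readable() and its millisecond timeout parameter are assumptions made for illustration; they are not part of the lab's starter code.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Hypothetical helper: waits up to timeout_ms milliseconds for fd to become
 * readable. Returns whatever select() returns: 1 if fd is ready, 0 if the
 * timeout expired first, or -1 on error.
 */
int
wait_readable(int fd, int timeout_ms)
{
        fd_set readset;
        struct timeval timeout;

        assert(fd >= 0 && fd < FD_SETSIZE);

        /* select() overwrites the set, so build it fresh for each call. */
        FD_ZERO(&readset);
        FD_SET(fd, &readset);

        timeout.tv_sec = timeout_ms / 1000;
        timeout.tv_usec = (timeout_ms % 1000) * 1000;

        /* nfds is the highest-numbered watched descriptor, plus 1. */
        return (select(fd + 1, &readset, NULL, NULL, &timeout));
}
```

Passing NULL for writeset and exceptset tells select() that no file descriptors are of interest for those operations.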

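Each file descriptor set is stored as an fd_set, a bit field in an array of integers. The following macros are provided for manipulating such file descriptor sets:

FD_ZERO(fd_set *set): removes all file descriptors from set.
FD_SET(int fd, fd_set *set): adds fd to set.
FD_CLR(int fd, fd_set *set): removes fd from set.
FD_ISSET(int fd, fd_set *set): returns nonzero if fd is a member of set, and 0 otherwise.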
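As a small illustration of the fd_set macros (FD_ZERO, FD_SET, FD_CLR, and FD_ISSET), the sketch below scans a set for its highest-numbered member, which is the value needed to compute nfds. The helper name highest_fd() is hypothetical.

```c
#include <assert.h>
#include <sys/select.h>

/*
 * Hypothetical helper: returns the highest-numbered file descriptor below
 * nfds that is a member of set, or -1 if no such descriptor is a member.
 */
int
highest_fd(fd_set *set, int nfds)
{
        int fd;

        assert(nfds >= 0 && nfds <= FD_SETSIZE);

        /* Scan downward so that the first member found is the largest. */
        for (fd = nfds - 1; fd >= 0; fd--) {
                if (FD_ISSET(fd, set))
                        return (fd);
        }
        return (-1);
}
```

For example, after FD_ZERO(&set), FD_SET(3, &set), and FD_SET(7, &set), the call highest_fd(&set, FD_SETSIZE) returns 7; after FD_CLR(7, &set), it returns 3.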

The lectures showed an event-driven concurrent echo server using select(). But, as pointed out in lecture, that solution suffers from certain concurrency limitations. In particular, it uses Rio_readlineb(), which is not designed to be used in a non-blocking manner: the function does not return until a complete line has been read, so a client that sends only part of a line can stall the entire server. Further, the writes done in that server are performed without first using select() to check that the socket is ready for writing.
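One way to avoid waiting for a complete line is to read whatever bytes are available and then test the buffer for a newline yourself. The sketch below shows only that test; the helper name complete_line_length() is hypothetical and is not taken from the lecture code or the starter code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: if the first len bytes of buf contain a complete
 * line, returns the length of that line including its newline. Returns 0
 * if no newline has arrived yet, meaning more data must be read later.
 */
size_t
complete_line_length(const char *buf, size_t len)
{
        const char *newline;

        assert(buf != NULL || len == 0);

        if (len == 0)
                return (0);
        newline = memchr(buf, '\n', len);
        if (newline == NULL)
                return (0);
        return ((size_t)(newline - buf) + 1);
}
```

A return value of 0 is the signal to go back to select() rather than block waiting for the rest of the line.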

In this lab, you will learn to implement an entirely event-driven server using select().


Chat Switchboard

The entirely event-driven server you will implement in this lab is a chat switchboard. The function of a chat switchboard is to forward each incoming message over all client connections (i.e., to all clients) except for the client connection over which the message was received. The challenge in such a server lies in implementing this behavior in an entirely event-driven manner. In implementing this chat switchboard, you will also revisit topics such as doubly-linked lists.

Message Handling

Client messages are stored within struct msg message objects until they have been written out to all clients. The fields within a struct msg are described below:

/*
 * Data structure to keep track of messages. Each message object holds one
 * complete line of message from a client.
 *
 * The message objects are maintained in a global doubly-linked list. There is a
 * dummy message head at the beginning of the list and a sentinel message at the
 * end. The sentinel is an empty message. A message is added to the list by
 * transferring the contents of the message to the existing sentinel and adding
 * the now empty message as the new sentinel.
 *
 * A message is added to the list only when a complete line has been read from
 * the client. Its reference count is set to the number of currently active
 * clients. The reference count is decremented each time the message is written
 * out to a client. The message is removed from the list and deallocated
 * (garbage collected) when its reference count reaches zero.
 */
struct msg {
        /* Points to the previous message object in the doubly-linked list. */
        struct msg *prev;
        /* Points to the next message object in the doubly-linked list. */
        struct msg *next;
        /* Points to a dynamically allocated buffer holding the message. */
        char *message;
        /*
         * Current size of the message. This can grow if the message is read in
         * multiple stages.
         */
        int size;
        /*
         * Reference count to keep track of number of times a message should be
         * written out. This is decremented each time the message is written
         * out to another client. The message object is garbage collected
         * when the reference count reaches zero.
         */
        int refcount;
        /*
         * File descriptor of the connection on which the message arrived. This is
         * used to ensure that we don't write the message back on that connection
         * again.
         */
        int fd;
};
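The sentinel transfer described in the comment above can be sketched as follows. This is an illustration under stated assumptions, not the starter code: the helper name append_via_sentinel() and its parameters are hypothetical, the list is assumed to be NULL-terminated, and the struct is repeated here only so that the sketch is self-contained.

```c
#include <assert.h>
#include <stddef.h>

/* Copy of struct msg, repeated so that this sketch compiles on its own. */
struct msg {
        struct msg *prev;
        struct msg *next;
        char *message;
        int size;
        int refcount;
        int fd;
};

/*
 * Hypothetical helper: adds the contents of m to the list whose current
 * sentinel is *sentinel. The contents move into the old sentinel, and m,
 * now empty, is linked in after it as the new sentinel.
 */
void
append_via_sentinel(struct msg **sentinel, struct msg *m, int nr_clients)
{
        struct msg *old = *sentinel;

        assert(old != NULL && m != NULL && old->next == NULL);

        /* Transfer the message buffer and its metadata to the old sentinel. */
        old->message = m->message;
        old->size = m->size;
        old->fd = m->fd;
        old->refcount = nr_clients;

        /* Empty m and link it at the tail as the new sentinel. */
        m->message = NULL;
        m->size = 0;
        m->refcount = 0;
        m->fd = -1;
        m->prev = old;
        m->next = NULL;
        old->next = m;
        *sentinel = m;
}
```

One consequence of moving the contents rather than the node is that any pointer that referred to the old sentinel now refers to the newly added message, without that pointer having to be updated.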

Connection Handling

Your chat switchboard server will listen at a specified port number for incoming connections. When a new connection request arrives from some client, the server accepts the connection and creates a new struct conn connection object for that client. The fields within a struct conn are described below:

/*
 * Data structure to keep track of client connection state.
 *
 * The connection objects are also maintained in a global doubly-linked list.
 * There is a dummy connection head at the beginning of the list.
 */
struct conn {
        /* Points to the previous connection object in the doubly-linked list. */
        struct conn *prev;
        /* Points to the next connection object in the doubly-linked list. */
        struct conn *next;
        /* File descriptor associated with this connection. */
        int fd;
        /* Internal buffer to temporarily store the contents of a read. */
        char buffer[BUF_SIZE];
        /* Size of the data stored in the buffer. */
        size_t size;
        /*
         * Message being currently read from this connection.  This message has not
         * been added to the doubly-linked list yet.
         */
        struct msg *read_msg;
        /*
         * This message and the ones following it on the doubly-linked list
         * have to be written out on this connection.
         */
        struct msg *write_msg;
        /* Number of bytes of the current message (write_msg) written. */
        int written_bytes;
};
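The written_bytes field exists because a single write() may accept only part of a message. A minimal sketch of one write attempt that records its progress is shown below; the helper name write_some() and its return convention are assumptions made for illustration.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

/*
 * Hypothetical helper: makes one write() attempt for the unwritten tail of
 * a message. *written is the number of bytes already sent and is advanced
 * by however many bytes this attempt sends. Returns 1 once the whole
 * message has been written, 0 if more remains, and -1 on a write error.
 */
int
write_some(int fd, const char *message, int size, int *written)
{
        ssize_t n;

        assert(*written >= 0 && *written <= size);

        if (*written == size)
                return (1);

        n = write(fd, message + *written, (size_t)(size - *written));
        if (n < 0) {
                /* Nothing was sent; try again after a later select(). */
                if (errno == EINTR || errno == EAGAIN)
                        return (0);
                return (-1);
        }
        *written += (int)n;
        return (*written == size);
}
```

When the helper returns 1, the caller would move on to the next message and reset the byte count to zero.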

The procedure handle_new_connection() accepts new connections, the procedure add_client() allocates a new connection object and initializes the associated state, and the procedure remove_client() closes the client connection and cleans up the associated state. The procedures add_conn_list() and remove_conn_list() add a connection to and remove a connection from, respectively, the doubly-linked list of struct conn connection objects.
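As a refresher on doubly-linked lists with a dummy head, here is a minimal sketch of insertion and removal. The helper names are hypothetical, the struct is reduced to the fields that matter here, and the list is assumed to be NULL-terminated rather than circular, which the starter code may or may not do.

```c
#include <assert.h>
#include <stddef.h>

/* Reduced version of struct conn: only the list links and the descriptor. */
struct conn {
        struct conn *prev;
        struct conn *next;
        int fd;
};

/* Hypothetical helper: inserts c immediately after the dummy head. */
void
list_insert_after_head(struct conn *head, struct conn *c)
{
        c->prev = head;
        c->next = head->next;
        if (head->next != NULL)
                head->next->prev = c;
        head->next = c;
}

/*
 * Hypothetical helper: unlinks c from its list. Because the dummy head
 * always precedes every real element, c->prev is never NULL.
 */
void
list_unlink(struct conn *c)
{
        assert(c->prev != NULL);

        c->prev->next = c->next;
        if (c->next != NULL)
                c->next->prev = c->prev;
        c->prev = NULL;
        c->next = NULL;
}
```

The dummy head is what lets list_unlink() avoid a special case for removing the first real element.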

Example of Chat Switchboard Operation

First, for this example, assume that there are two client connections, C1 and C2, over which two outstanding messages, M1 and M2, have to be written:

Now, suppose a new third client connection, C3, is accepted by the server:

Then, a new third message, M3, is finalized:

The new M3 message is added by the server to the end of the list:

And lastly, the contents of this new message are then transferred to the sentinel message. The third client now starts writing from the third message:

The procedures alloc_msg() and free_msg() allocate and free message objects, respectively. The procedures add_msg_list() and remove_msg_list() manipulate the doubly-linked list of message objects. The procedure finalize_msg() transfers the contents of the current message to the sentinel message and then adds the now-empty message object to the list as the new sentinel message. The procedure update_read_msg() scans an input buffer and breaks down its contents into one or more message objects. The procedure decrement_refcount_and_gc() decrements the reference count of a message and, if the reference count reaches zero, garbage collects the message.
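The reference-counting step can be sketched as follows. This is not the starter code's decrement_refcount_and_gc(); the helper name release_msg() and its return value are hypothetical, and the sketch assumes that the message sits between the dummy head and the sentinel, so that both of its neighbors exist.

```c
#include <assert.h>
#include <stdlib.h>

/* Copy of struct msg, repeated so that this sketch compiles on its own. */
struct msg {
        struct msg *prev;
        struct msg *next;
        char *message;
        int size;
        int refcount;
        int fd;
};

/*
 * Hypothetical helper: drops one reference to m. When the count reaches
 * zero, unlinks m from the list and frees both its buffer and the object.
 * Returns 1 if m was freed and 0 otherwise.
 */
int
release_msg(struct msg *m)
{
        assert(m->refcount > 0 && m->prev != NULL && m->next != NULL);

        if (--m->refcount > 0)
                return (0);

        /* No client still needs this message: unlink it and free it. */
        m->prev->next = m->next;
        m->next->prev = m->prev;
        free(m->message);
        free(m);
        return (1);
}
```

After a call that returns 1, the caller must not touch m again, so any pointer that should advance to the following message has to be advanced before the call.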

The Connection Pool

Currently active clients are managed by the server using a struct conn_pool connection pool structure. This structure also holds the read and write file descriptor sets. The fields within a struct conn_pool are described below:

/*
 * Data structure to keep track of active client connections.
 */
struct conn_pool {
        /* Largest file descriptor in this pool. */
        int maxfd;
        /* Number of ready descriptors returned by select. */
        int nready;
        /* Set of all active descriptors for reading. */
        fd_set read_set;
        /* Subset of descriptors ready for reading. */
        fd_set ready_read_set;
        /* Set of all active descriptors for writing. */
        fd_set write_set;
        /* Subset of descriptors ready for writing.  */
        fd_set ready_write_set;
        /* Doubly-linked list of active client connection objects. */
        struct conn *conn_head;
        /* Number of active client connections. */
        unsigned int nr_conns;
        /* Doubly-linked list of outstanding message objects. */
        struct msg *msg_head;
        /* Sentinel message of the doubly-linked list. */
        struct msg *sentinel_msg;
};

The procedure init_pool() initializes an empty connection pool. Initially, the read set has only the listening file descriptor in it and the write set is empty. The procedure read_messages() reads from each ready file descriptor in the read set and handles the incoming messages appropriately. The procedure write_messages() writes the appropriate message to each ready file descriptor in the write set. Note that a message is written by the server to a file descriptor only if it was not received from that connection.
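The initial state of the pool, and the bookkeeping needed when a client is added, can be sketched as follows. The helper names pool_init() and pool_watch_client() are hypothetical, the struct is reduced to the fields this sketch touches, and the decision to add a new client only to the read set is an assumption; the starter code may manage the write set differently.

```c
#include <assert.h>
#include <sys/select.h>

/* Reduced version of struct conn_pool: only the fields used below. */
struct conn_pool {
        int maxfd;
        fd_set read_set;
        fd_set write_set;
        unsigned int nr_conns;
};

/* Hypothetical helper: an empty pool watches only the listening socket. */
void
pool_init(struct conn_pool *p, int listenfd)
{
        assert(listenfd >= 0 && listenfd < FD_SETSIZE);

        FD_ZERO(&p->read_set);
        FD_ZERO(&p->write_set);
        FD_SET(listenfd, &p->read_set);
        p->maxfd = listenfd;
        p->nr_conns = 0;
}

/* Hypothetical helper: starts watching a newly accepted client for input. */
void
pool_watch_client(struct conn_pool *p, int connfd)
{
        assert(connfd >= 0 && connfd < FD_SETSIZE);

        FD_SET(connfd, &p->read_set);
        /* Keep maxfd current so that select() examines the new descriptor. */
        if (connfd > p->maxfd)
                p->maxfd = connfd;
        p->nr_conns++;
}
```

Keeping maxfd up to date matters because select() examines only file descriptors numbered below its nfds argument.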

The Main Function

The main() function in the server first sets up a listening socket at the specified port and then initializes an empty connection pool. Then, main() enters a while loop, where it performs a select(). When this select() call returns, main() handles each of the ready socket file descriptors. Note that this call to select() passes NULL for the timeout, so it waits indefinitely until at least one file descriptor is ready for reading (if it is in the read set) or for writing (if it is in the write set):

int
main(int argc, char **argv)
{
        ....

        while (true) {
                /*
                 * Wait until:
                 * 1. New connection is requested.
                 * 2. Data is available to be read from a socket.
                 * 3. Socket is ready for data to be written.
                 */
                pool.ready_read_set = pool.read_set;
                pool.ready_write_set = pool.write_set;

                pool.nready = Select(pool.maxfd + 1, &pool.ready_read_set,
                    &pool.ready_write_set, NULL, NULL);

                /* Check for new connection requests. */
                if (FD_ISSET(listenfd, &pool.ready_read_set))
                        handle_new_connection(listenfd, &pool);

                /* Check for sockets ready for writing. */
                write_messages(&pool);

                /* Check for sockets with new data to be read. */
                read_messages(&pool);
        }

        ....
}

GitHub Repository for This Lab

To obtain your private repo for this lab, please point your browser to this link for the starter code for the lab. Follow the same steps as for previous labs and assignments to create your repository on GitHub and then to clone it onto CLEAR. The directory for your repository for this lab will be

lab-13-event-based-concurrent-i-o-name

where name is your GitHub userid.


Submission

Be sure to git push the appropriate C source files for this lab before 11:55 PM tonight to receive credit for this lab.