Lab 13 - Event-driven Concurrency
Lab goal:
- Gain experience using the
select()
system call. - Learn to implement an entirely event-driven
concurrent network server using the
select()
system call.
Event-Based Concurrent I/O
The lectures have given a very high-level overview of
the concept of event-driven concurrent I/O
operations. That overview included a basic treatment of the
select()
system call. A server that is
not event-driven performs each I/O operation and
blocks until it completes. So the only way to handle
concurrent I/O operations in such a server is by using
multiple threads and/or multiple processes. Instead, an
event-driven server waits on multiple I/O
operations concurrently, allowing it to multiplex
these different operations.
The select()
system call is widely used to
implement event-driven servers. In particular, calling
select()
allows a program to monitor multiple
file descriptors, waiting until one or more of the file
descriptors become ready
for some class of I/O
operation (e.g., input is now possible):
int select(int nfds, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout);
Three of the arguments to the select()
system call each specify an independent set of file
descriptors to be watched, and the nfds
argument should be the highest-numbered file descriptor
across all three of these sets, plus 1; in
other words, nfds
specifies the number of file
descriptors, since file descriptor numbers begin at 0.
The three sets of file descriptors are as follows:
readset
is a set of file descriptors to be watched to see if data becomes available for reading on that file descriptor (i.e., if a read from that file descriptor can now be completed without blocking).writeset
is a set of file descriptors to be watched to see if they are ready for writing on that file descriptor (i.e., if a write to that file descriptor can now be completed without blocking).exceptset
is a set of file descriptors to be watched for exceptions (i.e., if an exception occurred on that file descriptor). Theexceptset
set is not relevant to this lab and should be specified here as aNULL
pointer.
The first nfds
file descriptors (i.e., file
descriptor number 0 through number nfds
-1,
inclusive) are checked in each file descriptor set. If any
of the three file descriptor sets are specified as a
NULL
pointer, that file descriptor set is
ignored (i.e., no file descriptors are of interest for that
type of operation).
A call to select()
normally blocks the
calling process and does not return until at least
one of the specified file descriptors meets the condition
for which it is being watched. However, the
timeout
argument to select()
specifies an upper bound on the amount of time
before which the call to select()
returns,
even if none of the watched file descriptors is satisfied.
If timeout
is NULL
,
select
can block indefinitely.
Specifically, select()
will block the
calling process and not return until at least one
of the following conditions is satisfied:
- One or more file descriptors in the
readset
are ready for reading. - One or more file descriptors in the
writeset
are ready for writing. - One of more file descriptors in the
exceptset
incurred an exception. - The timeout given by
timeout
has expired.
The value returned by a call to select()
is
the total number of file descriptors that are ready (from
all three of the given file descriptor sets). In addition,
on return, select()
modifies each of the given
file descriptor sets in-place to be a subset consisting of
only those file descriptors that are ready for the given
respective operation. If, instead, the specified
timeout
expired before any of the file
descriptors became ready, then select()
returns 0.
Each file descriptor set is stored as an
fd_set
, a bit field in an array of integers.
The following macros are provided for manipulating such
file descriptor sets:
FD_ZERO(&fdset)
initializes the descriptor setfdset
to a null (i.e., empty) set.FD_SET(fd, &fdset)
adds the file descriptorfd
to the setfdset
.FD_CLR(fd, &fdset)
removes the file descriptorfd
from the setfdset
.FD_ISSET(fd, &fdset)
checks if the file descriptorfd
is a member of the setfdset
; the result is non-zero (true) if that file descriptor is in the set and is 0 (false) otherwise.
The lectures showed an event-driven concurrent echo
server using select()
. But as pointed out in
lecture, this solution suffers from certain concurrency
limitations. In particular, the solution uses
Rio_readlineb()
which is not designed to be
used in a non-blocking manner. In fact, this function does
not return until a complete line has been read. Further,
the writes done in that server are not performed using
select()
.
In this lab, you will learn to implement a
entirely event-driven server using
select()
.
Chat Switchboard
The entirely event-driven server you will implement in this lab is a chat switchboard. The function of a chat switchboard is to forward each incoming message over all client connections (i.e., to all clients) except for the client connection over which the message was received. The challenge in such a server lies in implementing this behavior in a entirely event-driven manner. You will also, in implementing this chat switchboard, revisit topics such as doubly-linked lists.
Message Handling
Client messages are stored within
message objects until they have been written
out to all clients. The fields within a
struct
msg
struct msg
are described below:
/*
* Data structure to keep track of messages. Each message object holds one
* complete line of message from a client.
*
* The message objects are maintained in a global doubly-linked list. There is a
* dummy message head at the beginning of the list and a sentinel message at the
* end. The sentinel is an empty message. A message is added to the list by
* transferring the contents of the message to the existing sentinel and adding
* the now empty message as the new sentinel.
*
* A message is added to the list only when a complete line has been read from
* the client. Its reference count is set to the number of currently active
* clients. The reference count is decremented each time the message is written
* out to a client. The message is removed from the list and deallocated
* (garbage collected) when its reference count reaches zero.
*/
struct msg {
/* Points to the previous message object in the doubly-linked list. */
struct msg *prev;
/* Points to the next message object in the doubly-linked list. */
struct msg *next;
/* Points to a dynamically allocated buffer holding the message. */
char *message;
/*
* Current size of the message. This can grow if the message is read in
* multiple stages.
*/
int size;
/*
* Reference count to keep track of number of times a message should be
* written out. This is decremented each time the message is written
* out to another client. The message object is garbage collected
* when the reference count reaches zero.
*/
int refcount;
/*
* File descriptor of the connection on which the message arrived. This is
* used to ensure that we don't write the message back on that connection
* again.
*/
int fd;
};
Connection Handling
Your chat switchboard server will listen at a specified
port number for incoming connections. When a new connection
request arrives from some client, the server accepts the
connection and creates a new
connection object for
that client. The fields within a
struct conn
struct conn
are described below:
/*
* Data structure to keep track of client connection state.
*
* The connection objects are also maintained in a global doubly-linked list.
* There is a dummy connection head at the beginning of the list.
*/
struct conn {
/* Points to the previous connection object in the doubly-linked list. */
struct conn *prev;
/* Points to the next connection object in the doubly-linked list. */
struct conn *next;
/* File descriptor associated with this connection. */
int fd;
/* Internal buffer to temporarily store the contents of a read. */
char buffer[BUF_SIZE];
/* Size of the data stored in the buffer. */
size_t size;
/*
* Message being currently read from this connection. This message has not
* been added to the doubly-linked list yet.
*/
struct msg *read_msg;
/*
* This message and the ones following it on the doubly-linked list
* have to be written out on this connection.
*/
struct msg *write_msg;
/* Number of bytes of the current message (write_msg) written. */
int written_bytes;
};
The procedure handle_new_connection()
accepts new connections, the procedure
add_client()
allocates a new connection object
and initializes the associated state, and the procedure
remove_client()
closes the client connection
and cleans up the associated state. The procedures
add_conn_list()
and
remove_conn_list()
add a connection to and
remove a connection from, respectively, the doubly-linked
list of struct conn
connection
objects.
Example of Chat Switchboard Operation
First, for this example, assume that there are two client connections, C1 and C2, over which two outstanding messages, M1 and M2, have to be written:
Now, suppose a new third client connection, C3, is accepted by the server:
Then, a new third message, M3, is finalized:
The new M3 message is added by the server to the end of the list:
And lastly, the contents of this new message are then transferred to the sentinel message. The third client now starts writing from the third message:
The procedures alloc_msg()
and
free_msg()
allocate and free message objects,
respectively. The procedures add_msg_list()
and remove_msg_list()
manipulate the
doubly-linked list of message objects. The procedure
finalize_msg()
transfers the contents of the
current message to the sentinel message and adds it as the
new sentinel message. The procedure
update_read_msg()
scans an input buffer and
breaks down its contents into one or more message objects.
The procedure decrement_refcount_and_gc()
decrements the reference count of a message, and if the
reference count goes to zero, it garbage collects the
message.
The Connection Pool
Currently active clients are managed by the server using
a
connection pool
structure. This structure also holds the read and write
file descriptor sets. The fields within a
struct conn_pool
struct conn_pool
are described below:
/*
* Data structure to keep track of active client connections.
*/
struct conn_pool {
/* Largest file descriptor in this pool. */
int maxfd;
/* Number of ready descriptors returned by select. */
int nready;
/* Set of all active descriptors for reading. */
fd_set read_set;
/* Subset of descriptors ready for reading. */
fd_set ready_read_set;
/* Set of all active descriptors for writing. */
fd_set write_set;
/* Subset of descriptors ready for writing. */
fd_set ready_write_set;
/* Doubly-linked list of active client connection objects. */
struct conn *conn_head;
/* Number of active client connections. */
unsigned int nr_conns;
/* Doubly-linked list of outstanding message objects. */
struct msg *msg_head;
/* Sentinel message of the doubly-linked list. */
struct msg *sentinel_msg;
};
The procedure init_pool()
initializes an
empty connection pool. Initially, the readset
has only the listen file descriptor in it and the
writeset
is empty. The procedure
read_messages()
reads from each ready file
descriptor in the readset
and handles the
incoming messages appropriately. The procedure
write_messages()
writes the appropriate
message to each writefile
descriptor in the
write set. Note that a message is written by the server to
a file descriptor only if it was not received from
that connection.
The Main Function
The main()
function in the server first
sets up a listening socket at the specified port and then
initializes an empty connection pool. Then,
main()
enters a
loop, where it performs a while
select()
. When this
select()
call returns, main()
handles each of the ready socket file descriptors. Note
that this call to select()
waits indefinitely
until at least one of the file descriptors is ready for the
given operation (respectively, here, reading or
writing):
int
main(int argc, char **argv)
{
....
while (true) {
/*
* Wait until:
* 1. New connection is requested.
* 2. Data is available to be read from a socket.
* 3. Socket is ready for data to be written.
*/
pool.ready_read_set = pool.read_set;
pool.ready_write_set = pool.write_set;
pool.nready = Select(pool.maxfd + 1, &pool.ready_read_set,
&pool.ready_write_set, NULL, NULL);
/* Check for new connection requests. */
if (FD_ISSET(listenfd, &pool.ready_read_set))
handle_new_connection(listenfd, &pool);
/* Check for sockets ready for writing. */
write_messages(&pool);
/* Check for sockets with new data to be read. */
read_messages(&pool);
}
....
}
GitHub Repository for This Lab
To obtain your private repo for this lab, please point your browser to this link for the starter code for the lab. Follow the same steps as for previous labs and assignments to create your repository on GitHub and then to clone it onto CLEAR. The directory for your repository for this lab will be
lab-13-event-based-concurrent-i-o-name
where name is your GitHub userid.
Submission
Be sure to git push the appropriate C source files for this lab before 11:55 PM tonight, to get credit for this lab.