class: center, middle

# Event-driven Concurrency

## Alan Cox and Scott Rixner

---
layout: true

---

# Event-driven Concurrency

- Perform tasks in response to *events*
  - Events could include user input, network activity, timers, etc.
  - COMP 321 will focus on network I/O
- Do not perform an I/O operation until you know it will not block
- Network "events":
  - Data is available to read
  - Space is available to write
  - An error has occurred

---

# Example: `accept(2)`

```
while (true) {
    // Block waiting for a client to connect
    int connfd = accept(listenfd,
        (struct sockaddr *)&clientaddr, &clientlen);

    // Handle connection

    // Close the connection
    close(connfd);
}
```

- `accept(2)` blocks until a client connects
- While waiting, the server cannot handle other clients
- This is inherently sequential, processing one client at a time

---

# Non-blocking I/O

```
int flags = fcntl(fd, F_GETFL, 0);
// check for errors
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
// check for errors
```

- Set the `O_NONBLOCK` flag on a file descriptor
- Subsequent I/O operations on the file descriptor will not block
  - If an operation would block, it instead returns -1 immediately with `errno` set to `EWOULDBLOCK` (or `EAGAIN`)

---
class: line-numbers

# Back to `accept(2)`

.twocolumn[
.col[
```
int flags = fcntl(listenfd, F_GETFL, 0);
fcntl(listenfd, F_SETFL, flags | O_NONBLOCK);

while (true) {
    // Check if a client is trying to connect
    int connfd = accept(listenfd,
        (struct sockaddr *)&clientaddr, &clientlen);

    if (connfd < 0) {
        if (errno == EWOULDBLOCK || errno == EAGAIN) {
            // No client is trying to connect
        } else {
            // Handle error
        }
    } else {
        // Handle connection?
    }
}
```
]
.col[
- What if you get to line 11?
  - Process existing clients?
  - Call `accept(2)` again?
- What if you get to line 16?
  - Make `connfd` non-blocking
  - Process this client?
  - Process existing clients?
  - Call `accept(2)` again?
]
]

---

# `select(2)`

```
int select(int nfds, fd_set *restrict readfds,
    fd_set *restrict writefds, fd_set *restrict errorfds,
    struct timeval *restrict timeout);
```

- Wait for one (or more) of a set of file descriptors to become ready for reading/writing
  - `nfds` is the highest-numbered file descriptor in any of the sets, plus 1
  - `readfds`, `writefds`, and `errorfds` are the sets of file descriptors to check
    - The sets will be **modified** to indicate which file descriptors are ready
  - `timeout` is the maximum time to wait, or `NULL` to wait indefinitely
    - Set `timeout` to zero (0 seconds and 0 microseconds) to poll without blocking
- Returns the total number of ready descriptors, 0 if the timeout expired, or -1 on error

---

# Using `select(2)`

```
// Initialize the waiting read set of file descriptors with the listen socket
fd_set read_waiting;
FD_ZERO(&read_waiting);
FD_SET(listenfd, &read_waiting);
int maxfd = listenfd; // Highest-numbered descriptor in read_waiting

while (true) {
    // Wait for a file descriptor we care about to become ready for reading
    fd_set read_ready = read_waiting;
    int nready = select(maxfd + 1, &read_ready, NULL, NULL, NULL);
    if (nready < 0) {
        // Handle error
    } else {
        if (FD_ISSET(listenfd, &read_ready)) {
            // listenfd is ready for reading, so accept will not block!
            int connfd = accept(listenfd,
                (struct sockaddr *)&clientaddr, &clientlen);
            // Set connfd to be non-blocking, add it to the read_waiting
            // set, and update maxfd (if necessary)
        }
        // Handle (non-blocking) reading from ready existing connections
    }
}
```

---

# FD Sets

- Use `FD_ZERO`, `FD_SET`, `FD_CLR`, and `FD_ISSET` to manipulate the sets
- Remember that `select(2)` modifies the sets passed to it
  - Always copy the waiting sets before calling `select(2)`!
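
---

# Sketch: Non-blocking I/O and Polling with `select(2)`

A minimal sketch (not part of the provided course code) that combines the `fcntl(2)` and zero-timeout `select(2)` patterns from the previous slides; the helper names `set_nonblocking` and `ready_for_read` are hypothetical

```
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: set O_NONBLOCK on fd, keeping its other flags.
// Returns 0 on success, or -1 with errno set on failure.
int set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0)
        return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
        return -1;
    return 0;
}

// Hypothetical helper: check, without blocking, whether fd is ready
// for reading. Assumes fd < FD_SETSIZE. Returns 1 if ready, 0 if not,
// or -1 with errno set if select(2) fails.
int ready_for_read(int fd)
{
    fd_set read_ready;
    FD_ZERO(&read_ready);
    FD_SET(fd, &read_ready);

    // Zero timeout (0 seconds, 0 microseconds): return immediately
    struct timeval timeout = {0, 0};
    if (select(fd + 1, &read_ready, NULL, NULL, &timeout) < 0)
        return -1;
    return FD_ISSET(fd, &read_ready) ? 1 : 0;
}
```

- On an empty pipe whose read end is non-blocking, `read(2)` returns -1 with `errno` set to `EAGAIN`/`EWOULDBLOCK` and `ready_for_read` returns 0; after a byte is written to the other end, `ready_for_read` returns 1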
---

# Reading and Writing

- `select(2)` takes separate sets for reading and writing (and for checking for errors)
  - Register a file descriptor in the appropriate set(s)
  - If `select(2)` indicates a descriptor is ready for reading, that does *not* mean it is also ready for writing (and vice versa)
- You should still make all socket descriptors non-blocking
  - In case you accidentally perform multiple reads/writes after a single readiness report, or another thread performs the I/O first

---

# Limitations of `select(2)`

- The FD sets can only hold descriptors less than `FD_SETSIZE`
- You need to loop over all file descriptors in the ready sets to find the `nready` ready file descriptors

Newer alternatives exist, such as `epoll` (Linux), `kqueue`/`kevent` (BSD and macOS), and others

- They operate using the same principles as `select(2)`
  - Just with a better interface that enables better performance
- These are not cross-platform
  - People often use an event library that abstracts away the differences

---

# Exercise: `uthr_check_blocked`

Determine whether any threads that are blocked on I/O can be moved to the runnable state, based on the readiness of their file descriptors

- Use `select(2)` to determine which file descriptors are ready for reading/writing
- Unblock the appropriate threads

---
class: line-numbers

# Managing Blocked I/O

```
struct fd_state {
    int maxfd;
    fd_set read_set;
    fd_set write_set;
    struct {
        int readers;
        int writers;
    } blocked[FD_SETSIZE];
} fd_state;
```

- Lines 2-4: The maximum descriptor and the sets of sockets on which threads are blocked trying to read/write
- Lines 5-8: Number of threads blocked on each socket
  - `readers` should be >0 if and only if the descriptor is set in `read_set`
  - `writers` should be >0 if and only if the descriptor is set in `write_set`

---

# Using `select(2)` with `fd_state`

```
struct fd_state {
    int maxfd;
    fd_set read_set;
    fd_set write_set;
    struct {
        int readers;
        int writers;
    } blocked[FD_SETSIZE];
} fd_state;
```

```
// Timeout of 0 seconds
struct timeval timeout = {0, 0};

// Copy file descriptor sets
fd_set ready_for_read = fd_state.read_set;
fd_set ready_for_write = fd_state.write_set;

// Will not block because of 0 timeout
int nready = select(fd_state.maxfd + 1, &ready_for_read,
    &ready_for_write, NULL, &timeout);

// If nready > 0, figure out which file descriptors are ready
// and unblock the appropriate threads
// Note that a thread can only ever be blocked on one file descriptor at a time
```

---

# Thread Information

```
enum uthr_state {
    UTHR_FREE = 0,
    UTHR_RUNNABLE,
    UTHR_BLOCKED,
    UTHR_JOINING,
    UTHR_ZOMBIE
};

enum uthr_op {
    UTHR_OP_NOP = 0,
    UTHR_OP_READ,
    UTHR_OP_WRITE
};

struct uthr {
    // ...
    enum uthr_state state; // State of the thread
    int fd;                // File descriptor on which the thread is blocked
    enum uthr_op op;       // Operation the thread is blocked on
    struct uthr *prev;     // Previous thread in the queue this thread is on
    struct uthr *next;     // Next thread in the queue this thread is on
};
```

- `fd` and `op` are only valid if `state` is `UTHR_BLOCKED`
- Remove unblocked threads from `blockedq` and add them to `runq`
  - `prev` and `next` link the thread into whichever queue it is on
  - Don't forget to change the thread's state appropriately

---
class: center, middle

# Get Started!