diff --git a/10-epoll/epoll.c b/10-epoll/epoll.c
index 88d8ee2..b62e659 100644
--- a/10-epoll/epoll.c
+++ b/10-epoll/epoll.c
@@ -92,7 +92,67 @@ static int start_proc(struct proc *proc) {
}
-// FIXME: Implement a 'int copy_splice(int in_fd, int out_fd);'
+
+ // Registers fd on the epoll instance epoll_fd (EPOLL_CTL_ADD).
+ //// events - event mask to wait for (e.g. EPOLLIN)
+ //// data   - 64-bit value stored in the event's data.u64 field
+ // Calls die() when epoll_ctl(2) reports failure.
+ void epoll_add(int epoll_fd, int fd, int events, uint64_t data) {
+     struct epoll_event registration = {
+         .events = events,
+         .data.u64 = data,
+     };
+     int rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &registration);
+     if (rc == -1)
+         die("epoll_ctl: activate");
+ }
+
+ // Drops fd from the interest list of epoll_fd; die()s on failure.
+ void epoll_del(int epoll_fd, int fd) {
+     int rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
+     if (rc == -1)
+         die("epoll_ctl: reset");
+ }
+
+ // Moves (some) bytes from in_fd to out_fd as fast as possible:
+ // splice(2) is the fast path, read(2)/write(2) the slow path.
+ // Returns the number of bytes moved; 0 if splice would have blocked
+ // or nothing was transferred. Calls die() on any other I/O error.
+ int copy_splice(int in_fd, int out_fd) {
+     // Static buffer for the slow path
+     static char buf[4096];
+
+     // Fast path: let splice move as much data as possible (up to
+     // INT_MAX bytes) between the descriptors without blocking.
+     int len = splice(in_fd, 0, out_fd, 0, INT_MAX, SPLICE_F_NONBLOCK);
+     if (len >= 0) return len;
+
+     // Splice would have blocked, the caller tries again later on
+     if (errno == EAGAIN)
+         return 0;
+
+     // Splice fails with EINVAL if source or destination fd are not
+     // spliceable (e.g., the terminal); only then do we fall back to
+     // regular read/write I/O.
+     if (errno != EINVAL) die("splice");
+
+     // Slow path: read one buffer full of data
+     len = read(in_fd, buf, sizeof(buf));
+     if (len < 0) die("read");
+
+     // Writes can be short: keep writing until the buffer is drained,
+     // resuming after the bytes already written; a failed write dies.
+     int to_write = len;
+     char *ptr = buf;
+     while (to_write > 0) {
+         int written = write(out_fd, ptr, to_write);
+         if (written < 0) die("write");
+         to_write -= written;
+         ptr += written;
+     }
+
+     return len;
+ }
// This function prints an array of uint64_t (elements) as line with
// throughput measures. The function throttles its output to one line
@@ -140,8 +200,74 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "[%s] Started filter as pid %d\n", procs[i].cmd, procs[i].pid);
}
- // FIXME: Arrange file descriptors in pairs of input -> output
- // FIXME: Setup epoll to listen on the input descriptors
- // FIXME: Receive events and copy data around.
- // FIXME: call print_throughput from time to time.
+ // We create an epoll instance.
+ int epoll_fd = epoll_create1(0);
+ if (epoll_fd == -1)
+ die("epoll_create");
+
+ // For each fd->fd connection we have a read_fd and a write_fd.
+ // On a higher level, this program connects file descriptors as
+ // pairs like this:
+ //
+ // read_fds[i] ----> write_fds[i]
+ int pairs = 1 + nprocs;
+ int read_fds[pairs];
+ int write_fds[pairs];
+
+ // Arrange descriptors in read_fds/write_fds arrays as pairs
+ read_fds[0] = STDIN_FILENO;
+ for (int i = 0; i < nprocs; i++) {
+ write_fds[i] = procs[i].stdin;
+ read_fds[i+1] = procs[i].stdout;
+ }
+ write_fds[nprocs] = STDOUT_FILENO;
+
+ // We now set up the epoll device. We listen for EPOLLIN on the
+ // read_fds and set the pair index as event data.
+ for (int i = 0; i < 1 + nprocs; i++)
+ epoll_add(epoll_fd, read_fds[i], EPOLLIN, i);
+
+ // For the throughput measurements, we use this array to store how
+ // many bytes we have transferred for a given pair.
+ uint64_t bytes[pairs];
+ memset(bytes, 0, sizeof(bytes));
+
+
+ // As long as we have active pairs, we continue to listen for
+ // events on our epoll device.
+ while (pairs > 0) {
+ // We use epoll_wait(2) to wait for at least one event, but we
+ // can receive up to ten events. We do not set a timeout but
+ // wait forever if necessary.
+ struct epoll_event event[10];
+ int nfds = epoll_wait(epoll_fd, event, 10, -1);
+
+ // Consume the events.
+ for (int n = 0; n < nfds; n++) {
+ // The kernel has given us our pair identifier. This is
+ // the great benefit of the epoll interface over
+ // select(2). With this user_data tunneled through the
+ // kernel, we can directly identify the event in our
+ // user space logic.
+ uint64_t pair = event[n].data.u64;
+
+ // If new data arrived, we shuffle data between the paired fds.
+ if (event[n].events & EPOLLIN) {
+ bytes[pair] += copy_splice(read_fds[pair], write_fds[pair]);
+ }
+
+ // If the input was closed (the other end hung up), we
+ // delete the event, close the file descriptors of the
+ // pair and decrement the number of active pairs.
+ if (event[n].events & EPOLLHUP) {
+ epoll_del(epoll_fd, read_fds[pair]);
+ close(read_fds[pair]);
+ close(write_fds[pair]);
+ pairs --;
+ }
+ }
+
+ // We try to dump the throughput information
+ print_throughput(bytes, pairs);
+ }
}