diff --git a/07-select/select.c b/07-select/select.c
index 6715537..5bceae3 100644
--- a/07-select/select.c
+++ b/07-select/select.c
@@ -101,6 +101,70 @@ static int start_proc(struct proc *proc) {
+// Thread entry point: reads from our stdin and pushes the data to the
+// stdin of every running filter. This runs in a separate thread to
+// avoid deadlocks where we want to push data to a filter while the
+// filter waits for us to read from its stdout. Alternatively, select
+// could also be used to wait for write-ready file descriptors.
+// `data` is unused; the thread always returns NULL.
+void* stdin_thread(void* data) {
+    // Scratch buffer for one chunk of input. The size is arbitrary.
+    enum { BUFLEN = 4096 };
+    char *buf = malloc(BUFLEN);
+    if (!buf) die("malloc");
+
+    while (true) {
+        // Read at most BUFLEN bytes from _our_ stdin.
+        ssize_t len = read(STDIN_FILENO, buf, BUFLEN);
+
+        // An error occurred or our stdin was closed. In this case, we
+        // release the buffer, close the stdin of all running filters
+        // and terminate the stdin thread.
+        if (len <= 0) {
+            free(buf);
+            for (int i = 0; i < nprocs; i++) {
+                if (procs[i].pid != 0)
+                    close(procs[i].stdin);
+            }
+            return NULL;
+        }
+
+        // We write the chunk to the stdin of all running processes.
+        // Write errors are deliberately ignored (best effort).
+        for (int i = 0; i < nprocs; i++) {
+            if (procs[i].pid != 0)
+                write(procs[i].stdin, buf, len);
+        }
+    }
+}
+
+// Called when a filter's stdout is readable: reads one chunk into buf
+// and copies it to our stdout, printing a "[cmd] " banner whenever the
+// previous character was a newline. On EOF the filter is waited for
+// and marked dead (pid = 0). Returns the byte count from read (0 on EOF).
+int drain_proc(struct proc *proc, char *buf, size_t buflen) {
+    int len = read(proc->stdout, buf, buflen-1);
+    if (len < 0) {
+        die("read from filter");
+    } else if (len == 0) { // EOF. process closed stdout.
+        int state = 0;
+        pid_t reaped = waitpid(proc->pid, &state, 0);
+        // -1 is reported when waitpid failed or the exit was not normal.
+        int code = (reaped != -1 && WIFEXITED(state)) ? WEXITSTATUS(state) : -1;
+        fprintf(stderr, "[%s] filter exited. exitcode=%d\n", proc->cmd, code);
+        proc->pid = 0;
+    }
+    // Byte-wise copy; last_char is kept in proc across calls. Not efficient.
+    for (int i = 0; i < len; i++) {
+        if (proc->last_char == '\n')
+            printf("[%s] ", proc->cmd);
+        putchar(buf[i]);
+        proc->last_char = buf[i];
+    }
+    return len;
+}
+
+
int main(int argc, char *argv[]) {
if (argc <= 1) {
fprintf(stderr, "usage: %s [CMD-1] (<CMD-2> <CMD-3> ...)", argv[0]);
@@ -122,7 +186,50 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "[%s] Started filter as pid %d\n", procs[i].cmd, procs[i].pid);
}
- // FIXME: Read from stdin and push data to the proc[*].stdin
- // FIXME: Use select(2)
- // FIXME: Read from proc[*].stdout and push data to stdout
+    // Start the thread that forwards our stdin to the filters.
+    // pthread_create returns an error number (not -1) on failure.
+    pthread_t handle;
+    int rc = pthread_create(&handle, NULL, stdin_thread, NULL);
+    if (rc != 0) die("pthread_create");
+
+    // Allocate a buffer that is shared by all filters when draining.
+    const int MAX_LINE = 4096;
+    char *buf = malloc(MAX_LINE);
+    if (! buf) die("malloc");
+
+    // In this loop, we use select(2) to wait for the proc[*].stdout
+    // to become ready. If so, we read the data from that pipe and
+    // print it to our stdout.
+    while (true) {
+        // First, we create a file descriptor set, which contains all
+        // stdout descriptors of the running filter processes.
+        // Thereby, we also track the highest descriptor number;
+        // -1 means that no filter is running anymore.
+        int maxfd = -1;
+        fd_set readfds;
+        FD_ZERO(&readfds);
+        for (int i = 0; i < nprocs; i++) {
+            if (procs[i].pid != 0) {
+                FD_SET(procs[i].stdout, &readfds);
+                if (maxfd < procs[i].stdout)
+                    maxfd = procs[i].stdout;
+            }
+        }
+
+        // No process is still alive, we can exit the program
+        if (maxfd < 0)
+            break;
+
+        // Use select(2) to wait for any proc[*].stdout to become ready
+        int ready = select(maxfd + 1, &readfds, NULL, NULL, NULL);
+        if (ready == -1) die("select");
+
+        // Drain every running filter whose stdout became ready. Dead
+        // filters are skipped; their descriptor was not put in the set.
+        for (int i = 0; i < nprocs; i++) {
+            if (procs[i].pid != 0 && FD_ISSET(procs[i].stdout, &readfds)) {
+                drain_proc(&procs[i], buf, MAX_LINE);
+            }
+        }
+    }
}