diff --git a/16-iouring/iouring.c b/16-iouring/iouring.c
index 97a34cb..c182764 100644
--- a/16-iouring/iouring.c
+++ b/16-iouring/iouring.c
@@ -152,9 +152,36 @@ struct ring ring_map(int ring_fd, struct io_uring_params p) {
.params = p,
.in_flight = 0
};
- // HINT: Use PROT_READ, PROT_WRITE, MAP_SHARED, and MAP_POPULATE
- // HINT: The three mappings use three magic mmap offsets: IORING_OFF_SQ_RING, IORING_OFF_SQES, IORING_OFF_CQ_RING
- // HINT: Dereference the {sring,cring}_mask directly
+
+ // The Submission Ring (mapping 1)
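+    // This mapping contains the SQ head/tail indices, the ring mask,
+    // and the index array; their byte offsets within the mapping are
+    // provided by the kernel in p.sq_off.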
+ int sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
+ void *sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE,
+ ring_fd, IORING_OFF_SQ_RING);
+ if (sq_ptr == MAP_FAILED) die("mmap");
+
+ ring.sring_tail = sq_ptr + p.sq_off.tail;
+ ring.sring_mask = *(unsigned *)(sq_ptr + p.sq_off.ring_mask);
+ ring.sring = sq_ptr + p.sq_off.array;
+
+ // The SQE array (mapping 2)
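+    // The SQEs live in their own mapping; the sring index array from
+    // mapping 1 refers to entries of this array.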
+ int sqe_sz = p.sq_entries * sizeof(struct io_uring_sqe);
+ void *sqes = mmap(0, sqe_sz, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE,
+ ring_fd, IORING_OFF_SQES);
+ if (sqes == MAP_FAILED) die("mmap");
+ ring.sqes = sqes;
+
+ // The Completion queue (mapping 3)
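+    // Unlike the SQ, the CQEs are stored directly inside the CQ-ring
+    // mapping, starting at offset p.cq_off.cqes.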
+ int cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
+ void *cq_ptr = mmap(0, cring_sz, PROT_READ | PROT_WRITE,
+ MAP_SHARED | MAP_POPULATE,
+ ring_fd, IORING_OFF_CQ_RING);
+ if (cq_ptr == MAP_FAILED) die("mmap");
+ ring.cring_head = cq_ptr + p.cq_off.head;
+ ring.cring_tail = cq_ptr + p.cq_off.tail;
+ ring.cring_mask = *(unsigned *)(cq_ptr + p.cq_off.ring_mask);
+ ring.cqes = cq_ptr + p.cq_off.cqes;
return ring;
}
@@ -162,20 +189,107 @@ struct ring ring_map(int ring_fd, struct io_uring_params p) {
// _single_ system call. The function returns the number of actually
// submitted random reads.
unsigned submit_random_read(struct ring *R, int fd, ssize_t fsize, unsigned count) {
- // FIXME: Prepare up to count IORING_OP_READ operations
- // HINT: Set the sqe->user_data to the address of the used destination buffer
- // FIXME: Issue a single io_ring_enter() command to submit those SQEs
- return 0;
+ // With a single io_uring_enter(2), we can submit only as many
+ // SQEs as there are sq_entries in the mmap'ed ring.
+ if (count > R->params.sq_entries)
+ count = R->params.sq_entries;
+
+    // Read the submission tail index with a load acquire. Since this
+    // index is only written by user space, and only from our thread,
+    // this load acquire is *OPTIONAL*.
+ unsigned tail = load_aquire(R->sring_tail);
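+    // (Assumption: load_aquire() and store_release() are helpers
+    // defined earlier in this file, conceptually something like
+    //   #define load_aquire(p)      __atomic_load_n((p), __ATOMIC_ACQUIRE)
+    //   #define store_release(p, v) __atomic_store_n((p), (v), __ATOMIC_RELEASE)
+    // built on the GCC/Clang __atomic builtins.)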
+
+    // Submit count SQEs. For each SQE, we advance the tail index by
+    // one element.
+    for (unsigned i = 0; i < count; i++) {
+        // As we only increment the tail index (and never wrap it
+        // around), we use the kernel-provided mask to do the 'modulo'
+        // operation. For a 32-entry sring, the sring_mask is 31.
+        unsigned index = tail & R->sring_mask;
+        tail++;
+
+        // We use a 1:1 mapping between sring entries and SQE
+        // entries. However, this is not mandatory, and users can
+        // implement their own SQE management between SQE preparation
+        // and submission. Please note that the kernel copies each SQE
+        // out of user space on submit, whereby the SQE slot becomes
+        // immediately reusable.
+ struct io_uring_sqe *sqe = &R->sqes[index];
+
+ // Reinitialize all SQEs with zeros
+ memset(sqe, 0, sizeof(struct io_uring_sqe));
+
+        // Allocate a destination buffer and roll the dice to decide
+        // which block of the file should be read from disk.
+ struct buffer *buffer = alloc_buffer();
+ unsigned block_size = sizeof(buffer->data);
+ unsigned block = random() % (fsize / block_size);
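+        // (This assumes fsize >= block_size; for a file smaller than
+        // one block, the division above would be zero and the modulo
+        // would fault.)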
+
+ // Prepare an SQE
+ sqe->opcode = IORING_OP_READ;
+        // The source block (fd, offset)
+ sqe->fd = fd;
+ sqe->off = block * block_size;
+        // The destination buffer
+        sqe->addr = (unsigned long) buffer->data;
+ sqe->len = block_size;
+ // We set the pointer to our destination buffer as the
+ // user_data field, which will be returned in the CQE. With
+ // that information, we can free that buffer again.
+ sqe->user_data = (unsigned long) buffer;
+
+ // printf("SQE[%d]: opcode=%d, fd=%d, addr=%p, len=%x off=%llx user_data=%p\n",
+ // index, sqe->opcode, sqe->fd, (void*)sqe->addr, sqe->len, sqe->off, (void*)sqe->user_data);
+
+ // Insert the prepared SQE into the submission queue.
+ R->sring[index] = index;
+ }
+
+    // With a single store-release, we publish the new tail index,
+    // whereby the prepared SQEs become visible to the kernel. After
+    // this, the kernel could in principle already process the SQEs,
+    // but in our setup we still have to inform it about the update
+    // with io_uring_enter().
+ store_release(R->sring_tail, tail);
+
+ // We use io_uring_enter (to_submit=count, min_completions=0)
+    int submitted = sys_io_uring_enter(R->ring_fd, count, 0, 0);
+    if (submitted < 0)
+        die("io_uring_enter");
+ assert(submitted == count);
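+    // In this setup the kernel is expected to consume all prepared
+    // SQEs in one go; the assert above documents that expectation.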
+
+ // Record that we now have more requests in flight.
+ R->in_flight += submitted;
+
+ return submitted;
}
// Reap one CQE from the completion ring and copy the CQE to *cqe. If
// no CQEs are available (*cring_head == *cring_tail), this function
// returns 0.
int reap_cqe(struct ring *R, struct io_uring_cqe *cqe) {
- // FIXME: Check that the cring contains an CQE, if not return 0
- // FIXME: Extract the CQE into *cqe
- // FIXME: Forward cring_head by 1 and return 1
- return 0;
+    // Read our own head index of the completion queue. As the head
+    // is only written from this thread, a plain load would suffice;
+    // we use the load-acquire helper for consistency.
+    unsigned head = load_aquire(R->cring_head);
+
+    // Read the tail index, which is written by the kernel, with a
+    // load-acquire. On weakly-ordered machines, this ensures that we
+    // also see the CQE contents that the kernel wrote before bumping
+    // the tail. If head == tail, the completion ring is empty.
+    if (head == load_aquire(R->cring_tail))
+ return 0;
+
+ // Extract the CQE from the completion queue by copying
+ *cqe = R->cqes[head & R->cring_mask];
+
+ // Forward the head pointer with a store-release
+ store_release(R->cring_head, head+1);
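+    // From this point on, the kernel may reuse the released CQ slot
+    // for a new completion.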
+
+ // printf("CQE[%d]: res: %d user_data: %lld, flags=%d\n",
+ // index, cqe->res, cqe->user_data, cqe->flags);
+
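+    // On failure, the kernel stores the negated errno value in
+    // cqe->res.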
+ if (cqe->res < 0) {
+ errno = -1 * cqe->res;
+ die("CQE/res");
+ }
+ return 1;
}
// This function uses reap_cqe() to extract a filled buffer from the
@@ -183,10 +297,28 @@ int reap_cqe(struct ring *R, struct io_uring_cqe *cqe) {
// io_uring_enter(min_completions=1, IORING_ENTER_GETEVENTS) if
// necessary.
struct buffer * receive_random_read(struct ring *R, bool wait) {
- // FIXME: Step 1: optimistic reap
- // FIXME: Step 2: io_uring_enter()
- // FIXME: Step 3: reap again.
+ struct buffer *buf;
+ struct io_uring_cqe cqe;
+ // Optimistic reap without issuing a system call first.
+ if (reap_cqe(R, &cqe) > 0) {
+ goto extract_buffer;
+    } else if (wait) {
+        // If we are allowed to wait, we block until at least one CQE
+        // is available.
+        if (sys_io_uring_enter(R->ring_fd, 0, 1, IORING_ENTER_GETEVENTS) < 0)
+            die("io_uring_enter");
+
+ // This reap should always succeed as we have waited
+ if (reap_cqe(R, &cqe) > 0)
+ goto extract_buffer;
+ }
return NULL;
+ extract_buffer:
+    // Derive a buffer pointer from the cqe.user_data field, perform
+    // some safety checks, and decrement the in_flight counter.
+    buf = (struct buffer*) cqe.user_data;
+    assert(buf != NULL && "Invalid User Data");
+    assert(cqe.res == sizeof(buf->data) && "Short Read");
+ R->in_flight--;
+ return buf;
}
int main(int argc, char *argv[]) {
@@ -215,17 +347,42 @@ int main(int argc, char *argv[]) {
ssize_t fsize = s.st_size;
struct ring R = {0};
- (void) sq_size; (void) fsize;
- // FIXME: Create an io_uring with io_uring_setup(2)
- // FIXME: Map the ring with: R = ring_map(ring_fd, params);
+    // Create an io_uring with default parameters and sq_size entries
+ struct io_uring_params params = { 0 };
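+    // A zeroed io_uring_params selects the default interrupt-driven
+    // mode; on return, the kernel has filled in the entry counts and
+    // ring offsets that ring_map() relies on.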
+    int ring_fd = sys_io_uring_setup(sq_size, &params);
+    if (ring_fd < 0) die("io_uring_setup");
+    // Map the ring data structures into user space
+ R = ring_map(ring_fd, params);
+
+ // Some debug output
+ printf("init_ring: sq_size=%d\n", sq_size);
+ printf("SQ: %d entries (%p), ring: %p\n", R.params.sq_entries, R.sqes, R.sring);
+ printf("CQ: %d entries ring: %p\n", R.params.cq_entries, R.cqes);
// A per-second statistic about the performed I/O
unsigned read_blocks = 0; // Number of read blocks
    ssize_t read_bytes = 0;  // How many bytes were read
while(1) {
- // FIXME: Submit SQEs up to a certain threshold (e.g. R.params.sq_entries)
- // FIXME: Reap as many CQEs as possible with waiting exactly once
- // FIXME: Keep track of the total in_flight requests (submitted - completed)
+
+        // Only the first reap of each round is allowed to wait for a CQE.
+ bool wait = true;
+ while(R.in_flight > 0) {
+ struct buffer *buf = receive_random_read(&R, wait);
+ assert(buf != NULL || !wait);
+ if (!buf) break;
+
+ // Update the statistic
+ read_blocks += 1;
+ read_bytes += sizeof(buf->data);
+
+ // Free the buffer again
+ free_buffer(buf);
+
+ wait = false;
+ }
+
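+        // Top up the submission queue so that roughly sq_size random
+        // reads are in flight at any time.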
+ unsigned to_submit = sq_size - R.in_flight;
+ submit_random_read(&R, fd, fsize, to_submit);
+
        // Every second, we output a line of statistics
struct timeval now2;