Version 1.0
struct el {
        struct el *next;
        long key;
        long data;
};
struct el head = { .next = &head };     /* empty circular list */
spinlock_t mutex;

struct el *insert(long key, long data)
{
        struct el *p;

        p = kmalloc(sizeof(*p), GFP_ATOMIC);
        spin_lock(&mutex);
        p->next = head.next;
        p->key = key;
        p->data = data;
        head.next = p;
        spin_unlock(&mutex);
        return (p);
}

struct el *search(long key)
{
        struct el *p;

        p = head.next;
        while (p != &head) {
                if (p->key == key) {
                        return (p);
                }
                p = p->next;
        }
        return (NULL);
}

/* Example use. */

spin_lock(&mutex);
p = search(key);
if (p != NULL) {
        /* do stuff with p */
}
spin_unlock(&mutex);

These implementations are quite straightforward, but are subject to locking bottlenecks.
The searches can be made lock-free by inserting memory barriers, as follows:

struct el *insert(long key, long data)
{
        struct el *p;

        p = kmalloc(sizeof(*p), GFP_ATOMIC);
        spin_lock(&mutex);
        p->next = head.next;
        p->key = key;
        p->data = data;
        wmb();
        head.next = p;
        spin_unlock(&mutex);
        return (p);
}

struct el *search(long key)
{
        struct el *p;

        p = head.next;
        while (p != &head) {
                rmb();
                if (p->key == key) {
                        return (p);
                }
                p = p->next;
        }
        return (NULL);
}

(Note: see read-copy update for information on how to delete elements from this list while still permitting lock-free searches.)
The rmb()s in search() cause unnecessary performance degradation on CPUs (such as the i386, IA64, PPC, and SPARC) where data dependencies result in an implied rmb(). In addition, code that traverses a chain of pointers would have to be broken up in order to insert the needed rmb()s. For example:
d = p->next->data;

would have to be rewritten as:

q = p->next;
rmb();
d = q->data;

One could imagine much uglier code expansion where there are more dereferences in a single expression. The inefficiencies and code bloat could be avoided if there were a primitive like wmb() that allowed read-side data dependencies to act as implicit rmb() invocations.
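For instance, a two-level dependent dereference would need to be split twice; the fields a and b below are hypothetical, introduced only for this illustration:

/* One expression containing two dependent dereferences... */
d = p->a->b->data;

/* ...would need an rmb() after each pointer fetch: */
q = p->a;
rmb();          /* make q's fields visible before they are read */
r = q->b;
rmb();          /* make r's fields visible before they are read */
d = r->data;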
However, some CPUs have no such instruction, the Alpha being a case in point. On these CPUs, a wmb() only guarantees that the invalidate requests corresponding to the writes will be emitted in order. The wmb() does not guarantee that the reading CPU will process these invalidates in order.
For example, consider a CPU with a partitioned cache, as shown in the following diagram:
Here, even-numbered cache lines are maintained in cache bank 0, and odd-numbered cache lines are maintained in cache bank 1. Suppose that head was maintained in cache bank 0, and that a newly allocated element was maintained in cache bank 1. The insert() code's wmb() would guarantee that the invalidates corresponding to the writes to the next, key, and data fields would appear on the bus before the invalidate corresponding to the write to head.next.
But suppose that the reading CPU's cache bank 1 was extremely busy, with lots of pending invalidates and outstanding accesses, and that the reading CPU's cache bank 0 was idle. The invalidate corresponding to head.next could then be processed before those of the three fields. If search() were executing just at that time, it would pick up the new value of head.next, but, since the invalidates corresponding to the three fields had not yet been processed, it could pick up the old (garbage!) values of those fields, possibly resulting in an oops or worse.
Placing an rmb() between the access to head.next and the accesses to the three fields fixes this problem. The rmb() forces all outstanding invalidates to be processed before any subsequent reads are allowed to proceed. Since the invalidates corresponding to the three fields arrived before the one for head.next, this guarantees that if the new value of head.next was read, then the new values of the three fields will also be read. No oopses (or worse).
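The required pairing of barriers can be seen in isolation in the following sketch, which simply extracts the relevant lines from insert() and search() above:

/* Writer (under the lock): initialize the element, then publish it. */
p->key = key;
p->data = data;
wmb();                  /* field invalidates are emitted before head.next's */
head.next = p;

/* Reader (lock-free): fetch the pointer, then drain pending invalidates. */
q = head.next;
rmb();                  /* process outstanding invalidates before reading fields */
d = q->data;            /* sees the initialized value if the new head.next was seen */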
However, all the rmb()s add complexity, are easy to leave out, and hurt performance on all architectures. And if you forget a needed rmb(), you end up with very intermittent and difficult-to-diagnose memory-corruption errors. Just what we don't need in Linux!
So, there is strong motivation for a way of eliminating the need for these rmb()s.
Suppose instead that there were a wmbdd() primitive, a form of wmb() strong enough that read-side data dependencies act as implicit rmb()s. The code could then be written as follows:

struct el *insert(long key, long data)
{
        struct el *p;

        p = kmalloc(sizeof(*p), GFP_ATOMIC);
        spin_lock(&mutex);
        p->next = head.next;
        p->key = key;
        p->data = data;
        wmbdd();
        head.next = p;
        spin_unlock(&mutex);
        return (p);
}

struct el *search(long key)
{
        struct el *p;

        p = head.next;
        while (p != &head) {
                if (p->key == key) {
                        return (p);
                }
                p = p->next;
        }
        return (NULL);
}

This code is much nicer: the read-side code needs no memory barriers at all, and pointer traversals need not be broken up.
After discussions on LKML about this, it was decided that putting an explicit read_barrier_depends() in the read-side code is the appropriate thing to do in the Linux kernel. Linus also suggested that the barrier names be made more explicit. With such primitives, the code looks as follows:
struct el *insert(long key, long data)
{
        struct el *p;

        p = kmalloc(sizeof(*p), GFP_ATOMIC);
        spin_lock(&mutex);
        p->next = head.next;
        p->key = key;
        p->data = data;
        write_barrier();
        head.next = p;
        spin_unlock(&mutex);
        return (p);
}

struct el *search(long key)
{
        struct el *p;

        p = head.next;
        while (p != &head) {
                read_barrier_depends();
                if (p->key == key) {
                        return (p);
                }
                p = p->next;
        }
        return (NULL);
}

A preliminary patch for this is barriers-2.5.7-1.patch. Future releases of this patch can be found along with the RCU package here.
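To see why this costs nothing on most CPUs, note that read_barrier_depends() can expand to nothing wherever data dependencies already imply the needed ordering, and to a real read barrier only on the Alpha. The following is only an illustrative sketch under that assumption, not the actual contents of the patch:

/* Illustrative sketch only; the real definitions are in the patch above. */
#ifdef __alpha__
#define read_barrier_depends()  rmb()               /* force pending invalidates to be processed */
#else
#define read_barrier_depends()  do { } while (0)    /* data dependency provides the ordering */
#endif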
Besides, we cannot be 100% certain that there won't be some other CPU lacking a synchronous invalidation instruction...