From: Suparna Bhattacharya <suparna@in.ibm.com>

There was a correctness issue with AIO O_SYNC writes which could potentially
cause i/o completion notifications before the entire writeback actually
completes for a given write request.  To fix this, AIO retries in this case
need to check whether writeback has completed for the page for which the
wakeup was just issued, before moving on to the next page.

Simple as it sounds, this turned out to be non-trivial: operate_on_page_range()
issues a lock_page() and unlock_page() for every such check, and the
unlock_page() could trigger a cycle of spurious wakeups — and hence very large
numbers of retries — in certain circumstances involving a stream of AIO writes
where pairs of pages hash to the same wait queue, or where a second random
write has just been issued to the same page.

The solution implemented in this patch avoids the lock_page()/unlock_page()
calls in operate_on_page_range() for the page_waiter case.  Instead, it
provides modified versions of the gang page lookup routines
(find_get_pages_next() and pagevec_lookup_next()) which also return the next
index to be looked up, thus avoiding the need to hold the page lock just
for index stabilization for the next pagevec lookup.


 mm/page-writeback.c |   36 ++++++++++++++++++++----------------
 1 files changed, 20 insertions(+), 16 deletions(-)

diff -puN mm/page-writeback.c~aio-gang_lookup-fix mm/page-writeback.c
--- 25/mm/page-writeback.c~aio-gang_lookup-fix	2004-01-04 17:39:51.000000000 -0800
+++ 25-akpm/mm/page-writeback.c	2004-01-04 17:39:51.000000000 -0800
@@ -640,7 +640,7 @@ static ssize_t operate_on_page_range(str
 {
 	pgoff_t first = pos >> PAGE_CACHE_SHIFT;
 	pgoff_t last = (pos + count - 1) >> PAGE_CACHE_SHIFT;	/* inclusive */
-	pgoff_t next = first;
+	pgoff_t next = first, curr = first;
 	struct pagevec pvec;
 	ssize_t ret = 0, bytes = 0;
 	int i;
@@ -649,25 +649,25 @@ static ssize_t operate_on_page_range(str
 		return 0;
 
 	pagevec_init(&pvec, 0);
-	while (pagevec_lookup(&pvec, mapping, next,
+	while (pagevec_lookup(&pvec, mapping, &next,
 				min((pgoff_t)PAGEVEC_SIZE, last - next + 1))) {
 		for (i = 0; i < pagevec_count(&pvec); i++) {
 			struct page *page = pvec.pages[i];
 
-			lock_page(page);	/* stabilise ->index */
-			if (!page->mapping) {	/* truncated */
-				unlock_page(page);
-				next++;
-				continue;
-			}
-			next = page->index + 1;
-			ret = (*operator)(page);
-			if (ret == -EIOCBRETRY)
+			curr = page->index;
+			if ((curr > next) || !page->mapping) /* truncated ?*/ {
+				curr = next;
 				break;
-			if (PageError(page)) {
-				if (!ret)
-					ret = -EIO;
+			} else {
+				ret = (*operator)(page);
+				if (ret == -EIOCBRETRY)
+					break;
+				if (PageError(page)) {
+					if (!ret)
+						ret = -EIO;
+				}
 			}
+			curr++;
 			if (next > last)
 				break;
 		}
@@ -675,7 +675,7 @@ static ssize_t operate_on_page_range(str
 		if ((next > last) || (ret == -EIOCBRETRY))
 			break;
 	}
-	bytes = (next << PAGE_CACHE_SHIFT) - pos;
+	bytes = (curr << PAGE_CACHE_SHIFT) - pos;
 	if (bytes > count)
 		bytes = count;
 	return (bytes && (!ret || (ret == -EIOCBRETRY))) ? bytes : ret;
@@ -683,7 +683,6 @@ static ssize_t operate_on_page_range(str
 
 static int page_waiter(struct page *page)
 {
-	unlock_page(page);
 	return wait_on_page_writeback_wq(page, current->io_wait);
 }
 
@@ -700,6 +699,11 @@ static int page_writer(struct page *page
 		.nr_to_write	= 1,
 	};
 
+	lock_page(page);
+	if (!page->mapping) {	/* truncated */
+		unlock_page(page);
+		return 0;
+	}
 	if (!test_clear_page_dirty(page)) {
 		unlock_page(page);
 		return 0;

_