1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import junit.framework.TestCase;
23
24 import java.util.Random;
25 import java.util.concurrent.atomic.AtomicBoolean;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 public class TestReadWriteConsistencyControl extends TestCase {
29 static class Writer implements Runnable {
30 final AtomicBoolean finished;
31 final ReadWriteConsistencyControl rwcc;
32 final AtomicBoolean status;
33
34 Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
35 this.finished = finished;
36 this.rwcc = rwcc;
37 this.status = status;
38 }
39 private Random rnd = new Random();
40 public boolean failed = false;
41
42 public void run() {
43 while (!finished.get()) {
44 ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
45
46
47 int sleepTime = rnd.nextInt(500);
48
49
50 try {
51 if (sleepTime > 0)
52 Thread.sleep(0, sleepTime * 1000);
53 } catch (InterruptedException e1) {
54 }
55 try {
56 rwcc.completeMemstoreInsert(e);
57 } catch (RuntimeException ex) {
58
59 System.out.println(ex.toString());
60 ex.printStackTrace();
61 status.set(false);
62 return;
63
64 }
65 }
66 }
67 }
68
69 public void testParallelism() throws Exception {
70 final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
71
72 final AtomicBoolean finished = new AtomicBoolean(false);
73
74
75 final AtomicBoolean readerFailed = new AtomicBoolean(false);
76 final AtomicLong failedAt = new AtomicLong();
77 Runnable reader = new Runnable() {
78 public void run() {
79 long prev = rwcc.memstoreReadPoint();
80 while (!finished.get()) {
81 long newPrev = rwcc.memstoreReadPoint();
82 if (newPrev < prev) {
83
84 System.out.println("Reader got out of order, prev: " +
85 prev + " next was: " + newPrev);
86 readerFailed.set(true);
87
88 failedAt.set(newPrev);
89 return;
90 }
91 }
92 }
93 };
94
95
96 int n = 20;
97 Thread [] writers = new Thread[n];
98 AtomicBoolean [] statuses = new AtomicBoolean[n];
99 Thread readThread = new Thread(reader);
100
101 for (int i = 0 ; i < n ; ++i ) {
102 statuses[i] = new AtomicBoolean(true);
103 writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
104 writers[i].start();
105 }
106 readThread.start();
107
108 try {
109 Thread.sleep(10 * 1000);
110 } catch (InterruptedException ex) {
111 }
112
113 finished.set(true);
114
115 readThread.join();
116 for (int i = 0; i < n; ++i) {
117 writers[i].join();
118 }
119
120
121 assertFalse(readerFailed.get());
122 for (int i = 0; i < n; ++i) {
123 assertTrue(statuses[i].get());
124 }
125
126
127 }
128 }