1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import junit.framework.TestCase;
23  
24  import java.util.Random;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  public class TestReadWriteConsistencyControl extends TestCase {
29    static class Writer implements Runnable {
30      final AtomicBoolean finished;
31      final ReadWriteConsistencyControl rwcc;
32      final AtomicBoolean status;
33  
34      Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
35        this.finished = finished;
36        this.rwcc = rwcc;
37        this.status = status;
38      }
39      private Random rnd = new Random();
40      public boolean failed = false;
41  
42      public void run() {
43        while (!finished.get()) {
44          ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
45  //        System.out.println("Begin write: " + e.getWriteNumber());
46          // 10 usec - 500usec (including 0)
47          int sleepTime = rnd.nextInt(500);
48          // 500 * 1000 = 500,000ns = 500 usec
49          // 1 * 100 = 100ns = 1usec
50          try {
51            if (sleepTime > 0)
52              Thread.sleep(0, sleepTime * 1000);
53          } catch (InterruptedException e1) {
54          }
55          try {
56            rwcc.completeMemstoreInsert(e);
57          } catch (RuntimeException ex) {
58            // got failure
59            System.out.println(ex.toString());
60            ex.printStackTrace();
61            status.set(false);
62            return;
63            // Report failure if possible.
64          }
65        }
66      }
67    }
68  
69    public void testParallelism() throws Exception {
70      final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
71  
72      final AtomicBoolean finished = new AtomicBoolean(false);
73  
74      // fail flag for the reader thread
75      final AtomicBoolean readerFailed = new AtomicBoolean(false);
76      final AtomicLong failedAt = new AtomicLong();
77      Runnable reader = new Runnable() {
78        public void run() {
79          long prev = rwcc.memstoreReadPoint();
80          while (!finished.get()) {
81            long newPrev = rwcc.memstoreReadPoint();
82            if (newPrev < prev) {
83              // serious problem.
84              System.out.println("Reader got out of order, prev: " +
85              prev + " next was: " + newPrev);
86              readerFailed.set(true);
87              // might as well give up
88              failedAt.set(newPrev);
89              return;
90            }
91          }
92        }
93      };
94  
95      // writer thread parallelism.
96      int n = 20;
97      Thread [] writers = new Thread[n];
98      AtomicBoolean [] statuses = new AtomicBoolean[n];
99      Thread readThread = new Thread(reader);
100 
101     for (int i = 0 ; i < n ; ++i ) {
102       statuses[i] = new AtomicBoolean(true);
103       writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
104       writers[i].start();
105     }
106     readThread.start();
107 
108     try {
109       Thread.sleep(10 * 1000);
110     } catch (InterruptedException ex) {
111     }
112 
113     finished.set(true);
114 
115     readThread.join();
116     for (int i = 0; i < n; ++i) {
117       writers[i].join();
118     }
119 
120     // check failure.
121     assertFalse(readerFailed.get());
122     for (int i = 0; i < n; ++i) {
123       assertTrue(statuses[i].get());
124     }
125 
126 
127   }
128 }