1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import java.util.Set;
23 import java.util.HashSet;
24
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28
29 public abstract class MultithreadedTestUtil {
30
31 public static final Log LOG =
32 LogFactory.getLog(MultithreadedTestUtil.class);
33
34 public static class TestContext {
35 private final Configuration conf;
36 private Throwable err = null;
37 private boolean stopped = false;
38 private int threadDoneCount = 0;
39 private Set<TestThread> testThreads = new HashSet<TestThread>();
40
41 public TestContext(Configuration configuration) {
42 this.conf = configuration;
43 }
44
45 protected Configuration getConf() {
46 return conf;
47 }
48
49 public synchronized boolean shouldRun() {
50 return !stopped && err == null;
51 }
52
53 public void addThread(TestThread t) {
54 testThreads.add(t);
55 }
56
57 public void startThreads() {
58 for (TestThread t : testThreads) {
59 t.start();
60 }
61 }
62
63 public void waitFor(long millis) throws Exception {
64 long endTime = System.currentTimeMillis() + millis;
65 while (!stopped) {
66 long left = endTime - System.currentTimeMillis();
67 if (left <= 0) break;
68 synchronized (this) {
69 checkException();
70 wait(left);
71 }
72 }
73 }
74 private synchronized void checkException() throws Exception {
75 if (err != null) {
76 throw new RuntimeException("Deferred", err);
77 }
78 }
79
80 public synchronized void threadFailed(Throwable t) {
81 if (err == null) err = t;
82 LOG.error("Failed!", err);
83 notify();
84 }
85
86 public synchronized void threadDone() {
87 threadDoneCount++;
88 }
89
90 public void stop() throws Exception {
91 synchronized (this) {
92 stopped = true;
93 }
94 for (TestThread t : testThreads) {
95 t.join();
96 }
97 checkException();
98 }
99 }
100
101
102
103
104
105 public static abstract class TestThread extends Thread {
106 protected final TestContext ctx;
107 protected boolean stopped;
108
109 public TestThread(TestContext ctx) {
110 this.ctx = ctx;
111 }
112
113 public void run() {
114 try {
115 doWork();
116 } catch (Throwable t) {
117 ctx.threadFailed(t);
118 }
119 ctx.threadDone();
120 }
121
122 public abstract void doWork() throws Exception;
123
124 protected void stopTestThread() {
125 this.stopped = true;
126 }
127 }
128
129
130
131
132 public static abstract class RepeatingTestThread extends TestThread {
133 public RepeatingTestThread(TestContext ctx) {
134 super(ctx);
135 }
136
137 public final void doWork() throws Exception {
138 while (ctx.shouldRun() && !stopped) {
139 doAnAction();
140 }
141 }
142
143 public abstract void doAnAction() throws Exception;
144 }
145 }