1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase;
21  
22  import java.util.Set;
23  import java.util.HashSet;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.conf.Configuration;
28  
29  public abstract class MultithreadedTestUtil {
30  
31    public static final Log LOG =
32      LogFactory.getLog(MultithreadedTestUtil.class);
33  
34    public static class TestContext {
35      private final Configuration conf;
36      private Throwable err = null;
37      private boolean stopped = false;
38      private int threadDoneCount = 0;
39      private Set<TestThread> testThreads = new HashSet<TestThread>();
40  
41      public TestContext(Configuration configuration) {
42        this.conf = configuration;
43      }
44  
45      protected Configuration getConf() {
46        return conf;
47      }
48  
49      public synchronized boolean shouldRun()  {
50        return !stopped && err == null;
51      }
52  
53      public void addThread(TestThread t) {
54        testThreads.add(t);
55      }
56  
57      public void startThreads() {
58        for (TestThread t : testThreads) {
59          t.start();
60        }
61      }
62  
63      public void waitFor(long millis) throws Exception {
64        long endTime = System.currentTimeMillis() + millis;
65        while (!stopped) {
66          long left = endTime - System.currentTimeMillis();
67          if (left <= 0) break;
68          synchronized (this) {
69            checkException();
70            wait(left);
71          }
72        }
73      }
74      private synchronized void checkException() throws Exception {
75        if (err != null) {
76          throw new RuntimeException("Deferred", err);
77        }
78      }
79  
80      public synchronized void threadFailed(Throwable t) {
81        if (err == null) err = t;
82        LOG.error("Failed!", err);
83        notify();
84      }
85  
86      public synchronized void threadDone() {
87        threadDoneCount++;
88      }
89  
90      public void stop() throws Exception {
91        synchronized (this) {
92          stopped = true;
93        }
94        for (TestThread t : testThreads) {
95          t.join();
96        }
97        checkException();
98      }
99    }
100 
101   /**
102    * A thread that can be added to a test context, and properly
103    * passes exceptions through.
104    */
105   public static abstract class TestThread extends Thread {
106     protected final TestContext ctx;
107     protected boolean stopped;
108 
109     public TestThread(TestContext ctx) {
110       this.ctx = ctx;
111     }
112 
113     public void run() {
114       try {
115         doWork();
116       } catch (Throwable t) {
117         ctx.threadFailed(t);
118       }
119       ctx.threadDone();
120     }
121 
122     public abstract void doWork() throws Exception;
123 
124     protected void stopTestThread() {
125       this.stopped = true;
126     }
127   }
128   
129   /**
130    * A test thread that performs a repeating operation.
131    */
132   public static abstract class RepeatingTestThread extends TestThread {
133     public RepeatingTestThread(TestContext ctx) {
134       super(ctx);
135     }
136     
137     public final void doWork() throws Exception {
138       while (ctx.shouldRun() && !stopped) {
139         doAnAction();
140       }
141     }
142     
143     public abstract void doAnAction() throws Exception;
144   }
145 }