View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.client;
22  
23  import java.io.IOException;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.hbase.DoNotRetryIOException;
28  import org.apache.hadoop.hbase.HRegionInfo;
29  import org.apache.hadoop.hbase.NotServingRegionException;
30  import org.apache.hadoop.hbase.RemoteExceptionHandler;
31  import org.apache.hadoop.ipc.RemoteException;
32  
33  
34  /**
35   * Retries scanner operations such as create, next, etc.
36   * Used by {@link ResultScanner}s made by {@link HTable}.
37   */
38  public class ScannerCallable extends ServerCallable<Result[]> {
39    private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
40    private long scannerId = -1L;
41    private boolean instantiated = false;
42    private boolean closed = false;
43    private Scan scan;
44    private int caching = 1;
45  
46    /**
47     * @param connection which connection
48     * @param tableName table callable is on
49     * @param scan the scan to execute
50     */
51    public ScannerCallable (HConnection connection, byte [] tableName, Scan scan) {
52      super(connection, tableName, scan.getStartRow());
53      this.scan = scan;
54    }
55  
56    /**
57     * @param reload force reload of server location
58     * @throws IOException
59     */
60    @Override
61    public void instantiateServer(boolean reload) throws IOException {
62      if (!instantiated || reload) {
63        super.instantiateServer(reload);
64        instantiated = true;
65      }
66    }
67  
68    /**
69     * @see java.util.concurrent.Callable#call()
70     */
71    public Result [] call() throws IOException {
72      if (scannerId != -1L && closed) {
73        close();
74      } else if (scannerId == -1L && !closed) {
75        this.scannerId = openScanner();
76      } else {
77        Result [] rrs = null;
78        try {
79          rrs = server.next(scannerId, caching);
80        } catch (IOException e) {
81          IOException ioe = null;
82          if (e instanceof RemoteException) {
83            ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
84          }
85          if (ioe == null) throw new IOException(e);
86          if (ioe instanceof NotServingRegionException) {
87            // Throw a DNRE so that we break out of cycle of calling NSRE
88            // when what we need is to open scanner against new location.
89            // Attach NSRE to signal client that it needs to resetup scanner.
90            throw new DoNotRetryIOException("Reset scanner", ioe);
91          } else {
92            // The outer layers will retry
93            throw ioe;
94          }
95        }
96        return rrs;
97      }
98      return null;
99    }
100 
101   private void close() {
102     if (this.scannerId == -1L) {
103       return;
104     }
105     try {
106       this.server.close(this.scannerId);
107     } catch (IOException e) {
108       LOG.warn("Ignore, probably already closed", e);
109     }
110     this.scannerId = -1L;
111   }
112 
113   protected long openScanner() throws IOException {
114     return this.server.openScanner(this.location.getRegionInfo().getRegionName(),
115       this.scan);
116   }
117 
118   protected Scan getScan() {
119     return scan;
120   }
121 
122   /**
123    * Call this when the next invocation of call should close the scanner
124    */
125   public void setClose() {
126     this.closed = true;
127   }
128 
129   /**
130    * @return the HRegionInfo for the current region
131    */
132   public HRegionInfo getHRegionInfo() {
133     if (!instantiated) {
134       return null;
135     }
136     return location.getRegionInfo();
137   }
138 
139   /**
140    * Get the number of rows that will be fetched on next
141    * @return the number of rows for caching
142    */
143   public int getCaching() {
144     return caching;
145   }
146 
147   /**
148    * Set the number of rows that will be fetched on next
149    * @param caching the number of rows for caching
150    */
151   public void setCaching(int caching) {
152     this.caching = caching;
153   }
154 }