1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapreduce;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.hadoop.conf.Configuration;
29  import org.apache.hadoop.io.NullWritable;
30  import org.apache.hadoop.io.Writable;
31  import org.apache.hadoop.mapreduce.InputFormat;
32  import org.apache.hadoop.mapreduce.InputSplit;
33  import org.apache.hadoop.mapreduce.JobContext;
34  import org.apache.hadoop.mapreduce.RecordReader;
35  import org.apache.hadoop.mapreduce.TaskAttemptContext;
36  
37  /**
38   * Input format that creates a configurable number of map tasks
39   * each provided with a single row of NullWritables. This can be
40   * useful when trying to write mappers which don't have any real
41   * input (eg when the mapper is simply producing random data as output)
42   */
43  public class NMapInputFormat extends InputFormat<NullWritable, NullWritable> {
44    private static final String NMAPS_KEY = "nmapinputformat.num.maps";
45  
46    @Override
47    public RecordReader<NullWritable, NullWritable> createRecordReader(
48        InputSplit split,
49        TaskAttemptContext tac) throws IOException, InterruptedException {
50      return new SingleRecordReader<NullWritable, NullWritable>(
51          NullWritable.get(), NullWritable.get());
52    }
53  
54    @Override
55    public List<InputSplit> getSplits(JobContext context) throws IOException,
56        InterruptedException {
57      int count = getNumMapTasks(context.getConfiguration());
58      List<InputSplit> splits = new ArrayList<InputSplit>(count);
59      for (int i = 0; i < count; i++) {
60        splits.add(new NullInputSplit());
61      }
62      return splits;
63    }
64  
65    public static void setNumMapTasks(Configuration conf, int numTasks) {
66      conf.setInt(NMAPS_KEY, numTasks);
67    }
68  
69    public static int getNumMapTasks(Configuration conf) {
70      return conf.getInt(NMAPS_KEY, 1);
71    }
72  
73    private static class NullInputSplit extends InputSplit implements Writable {
74      @Override
75      public long getLength() throws IOException, InterruptedException {
76        return 0;
77      }
78  
79      @Override
80      public String[] getLocations() throws IOException, InterruptedException {
81        return new String[] {};
82      }
83  
84      @Override
85      public void readFields(DataInput in) throws IOException {
86      }
87  
88      @Override
89      public void write(DataOutput out) throws IOException {
90      }
91    }
92    
93    private static class SingleRecordReader<K, V>
94      extends RecordReader<K, V> {
95      
96      private final K key;
97      private final V value;
98      boolean providedKey = false;
99  
100     SingleRecordReader(K key, V value) {
101       this.key = key;
102       this.value = value;
103     }
104 
105     @Override
106     public void close() {
107     }
108 
109     @Override
110     public K getCurrentKey() {
111       return key;
112     }
113 
114     @Override
115     public V getCurrentValue(){
116       return value;
117     }
118 
119     @Override
120     public float getProgress() {
121       return 0;
122     }
123 
124     @Override
125     public void initialize(InputSplit split, TaskAttemptContext tac) {
126     }
127 
128     @Override
129     public boolean nextKeyValue() {
130       if (providedKey) return false;
131       providedKey = true;
132       return true;
133     }
134     
135   }
136 }