Accumulo supports the ability to import sorted files produced by an external process into an online table. Often, it is much faster to churn through large amounts of data using map/reduce to produce these files. The new files can be incorporated into Accumulo using bulk ingest.
To import sorted data, the following steps are taken:

1. Construct an org.apache.accumulo.core.client.Connector instance.
2. Call connector.tableOperations().getSplits().
3. Run a map/reduce job that uses a RangePartitioner, configured with the splits from the previous step, to write files with AccumuloFileOutputFormat.
4. Call connector.tableOperations().importDirectory(), passing the output directory of the MapReduce job.

Files can also be imported using the "importdirectory" shell command.
A complete example is available in README.bulkIngest
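For illustration, here is a minimal sketch of the client-side calls surrounding the map/reduce job. The instance name, ZooKeeper hosts, credentials, table name, and paths are all placeholders, and the four-argument importDirectory() shown here is the 1.4-era signature; consult the API documentation for your release.

    import java.util.Collection;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Instance;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.hadoop.io.Text;

    public class BulkImportSketch {
      public static void main(String[] args) throws Exception {
        // Placeholders: substitute your instance name, ZooKeeper hosts, and credentials.
        Instance instance = new ZooKeeperInstance("myinstance", "zkhost1:2181");
        Connector connector = instance.getConnector("username", "password".getBytes());

        // Step 2: fetch the table's current split points (at most 100 here;
        // see the discussion of the max splits argument below).
        Collection<Text> splits = connector.tableOperations().getSplits("mytable", 100);

        // Step 3 would run the map/reduce job that writes sorted files,
        // partitioned by these splits, into /tmp/bulk/output.

        // Step 4: import the job's output. Files that cannot be imported are
        // moved to the failure directory.
        connector.tableOperations().importDirectory("mytable", "/tmp/bulk/output",
            "/tmp/bulk/failures", false);
      }
    }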
Importing data using whole files of sorted data can be very efficient, but it differs from live ingest: how each file lines up with the table's tablets largely determines performance.
Consider two approaches to creating ingest files using map/reduce: in the first, each output file contains data for only a single tablet; in the second, each output file contains keys that span many tablets.
In the first case, adding the file requires telling a single tablet server about a single file. Even if the file is 20G in size, it is one call to the tablet server. The tablet server makes one extra file entry in the !METADATA table, and the data is now part of the tablet.
In the second case, a request must be made for each tablet for each file to be added. If there are 100 files and 100 tablets, this will be 10,000 requests, and the number of files that must be opened for scans on these tablets will be very large. Major compactions will most likely start, which will eventually fix the problem, but Accumulo must do a lot more work to read these files in the meantime.
Getting good, fast bulk import performance depends on creating files like those in the first case and avoiding files like those in the second.
For this reason, a RangePartitioner should be used to create files when writing with AccumuloFileOutputFormat.
Hash partitioning is not recommended because it puts keys in random groups, exactly like the second, bad approach.
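Below is a sketch of a job configuration that does this. It assumes the 1.4-era classes org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat and org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner; the output path and split-file path are placeholders.

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CreateBulkFiles {
      // numSplits is the number of split points written to the split file.
      public static Job configure(Configuration conf, int numSplits) throws Exception {
        Job job = new Job(conf, "bulk ingest file creation");

        // Reducers emit sorted Key/Value pairs in Accumulo's file format.
        job.setOutputFormatClass(AccumuloFileOutputFormat.class);
        job.setOutputKeyClass(Key.class);
        job.setOutputValueClass(Value.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/bulk/output"));

        // Route each key to the reducer responsible for its tablet's range,
        // so every output file covers a single tablet.
        job.setPartitionerClass(RangePartitioner.class);
        RangePartitioner.setSplitFile(job, "/tmp/bulk/splits.txt");
        job.setNumReduceTasks(numSplits + 1); // one reducer per range between split points

        return job;
      }
    }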
Any set of cut points for range partitioning can be used in a map/reduce job, but using Accumulo's current splits is usually best. However, in some cases there may be too many splits: if there are 2000 splits, for example, you would need to run 2001 reducers. To overcome this problem, use the connector.tableOperations().getSplits(<table name>, <max splits>) method. This method will not return more than <max splits> splits, but the splits it returns will optimally partition the data for Accumulo.
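As a concrete sketch (1.4-era API, hypothetical HDFS paths), the splits can be fetched, Base64-encoded one per line into the split file that RangePartitioner reads, and used to size the reducer count:

    import java.io.PrintStream;
    import java.util.Arrays;
    import java.util.Collection;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    public class SplitFileWriter {
      // Writes at most maxSplits split points and returns how many were written,
      // so the caller can set the reducer count to that value plus one.
      public static int writeSplits(Connector connector, String table, int maxSplits,
          FileSystem fs, Path splitFile) throws Exception {
        Collection<Text> splits = connector.tableOperations().getSplits(table, maxSplits);
        PrintStream out = new PrintStream(fs.create(splitFile));
        for (Text split : splits) {
          // RangePartitioner expects one Base64-encoded split point per line.
          byte[] bytes = Arrays.copyOf(split.getBytes(), split.getLength());
          out.println(new String(Base64.encodeBase64(bytes)));
        }
        out.close();
        return splits.size();
      }
    }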
Remember that Accumulo never splits rows across tablets. Therefore the range partitioner only considers rows when partitioning.
When bulk importing many files into a new table, it may help to pre-split the table so that more tablet servers participate in accepting the data. For example, if you know your data is keyed by date, pre-creating splits for each day will allow files to fall into natural splits, as sketched below. Having more tablets accept the new data means that more resources can be used to import the data right away.
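A minimal sketch of such pre-splitting, assuming a hypothetical row-key scheme that begins with a yyyyMMdd date prefix:

    import java.util.TreeSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.hadoop.io.Text;

    public class PreSplitByDay {
      // Adds one split per day of January 2014 so each day's data lands in its own tablet.
      public static void preSplit(Connector connector, String table) throws Exception {
        TreeSet<Text> splits = new TreeSet<Text>();
        for (int day = 1; day <= 31; day++) {
          splits.add(new Text(String.format("201401%02d", day)));
        }
        connector.tableOperations().addSplits(table, splits);
      }
    }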
An alternative to bulk ingest is to have a map/reduce job use AccumuloOutputFormat, which can support billions of inserts per hour, depending on the size of your cluster. This is sufficient for most users, but bulk ingest remains the fastest way to incorporate data into Accumulo. In addition, bulk ingest has one advantage over AccumuloOutputFormat: there is no duplicate data insertion. When map/reduce is used to output data to Accumulo, restarted jobs may re-enter data from previous failed attempts. Generally, this only matters when there are aggregators. With bulk ingest, reducers write to new map files, so duplication does not occur: if a reducer fails, you create a new map file, and when all reducers finish, you bulk ingest the map files into Accumulo. The disadvantage of bulk ingest compared to AccumuloOutputFormat is greater latency: the entire map/reduce job must complete before any data is available.
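For comparison, here is a minimal sketch of configuring a job to write through AccumuloOutputFormat. The static configuration methods shown are the 1.4-era ones (later releases changed them), and the instance, credentials, and table names are placeholders:

    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class LiveIngestJob {
      public static Job configure(Configuration conf) throws Exception {
        Job job = new Job(conf, "live ingest");
        job.setOutputFormatClass(AccumuloOutputFormat.class);

        // 1.4-era static configuration (replaced in later releases).
        AccumuloOutputFormat.setZooKeeperInstance(job.getConfiguration(),
            "myinstance", "zkhost1:2181");
        AccumuloOutputFormat.setOutputInfo(job.getConfiguration(), "username",
            "password".getBytes(), false /* createTables */, "mytable" /* default table */);

        // Mappers or reducers then emit mutations keyed by table name:
        //   context.write(new Text("mytable"), mutation);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Mutation.class);
        return job;
      }
    }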