public final class GroupByJoinPlan extends Plan
    select r(kx,ky,c(z))
    from x in X, y in Y, z = mp(x,y)
    where jx(x) = jy(y)
    group by (kx,ky): (gx(x),gy(y));

where:
mp: map function,
r: reduce function,
c: combine function,
jx: left join key function,
jy: right join key function,
gx: left group-by function,
gy: right group-by function.
For example, matrix multiplication has this form:

    select ( sum(z), i, j )
    from (x,i,k) in X, (y,k,j) in Y, z = x*y
    group by (i,j);

It uses m*n partitions, chosen so that n/m = |X|/|Y| and a hash table of size |X|/n*|Y|/m can fit in memory M. That is, n = |X|/sqrt(M) and m = |Y|/sqrt(M). Each partition generates |X|/n*|Y|/m data. It replicates X n times and Y m times, and uses a hash table H of size |X|/n*|Y|/m.

MapReduce pseudo-code:
    mapX ( x )
      for i = 0,n-1
        emit ( ((hash(gx(x)) % m)+m*i, jx(x), 1), (1,x) )

    mapY ( y )
      for i = 0,m-1
        emit ( ((hash(gy(y)) % n)*m+i, jy(y), 2), (2,y) )

Mapper output key: (partition,joinkey,tag), value: (tag,data)
    reduce ( (p,_,_), s )
      if p != current_partition
        flush()
        current_partition = p
      read x from s first and store it to xs
      for each y from the rest of s
        for each x in xs
          H[(gx(x),gy(y))] = c( H[(gx(x),gy(y))], mp((x,y)) )

where flush() is: for each ((kx,ky),v) in H: emit r((kx,ky),v)
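The pseudo-code above can be tried out locally with a small in-memory simulation. The following is only a sketch for the matrix multiplication case, not the MRQL implementation: the class `GroupByJoinSketch`, the helper types `Entry` and `Emitted`, and the method `multiply` are hypothetical names, the shuffle is replaced by a sort on (partition, joinkey, tag), and `hash` is assumed to be the identity on integer indices.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupByJoinSketch {
    /** Hypothetical helper: a sparse matrix entry with value v at (row, col). */
    static final class Entry {
        final double v; final int row; final int col;
        Entry(double v, int row, int col) { this.v = v; this.row = row; this.col = col; }
    }

    /** Hypothetical helper: a mapper output with key (partition, joinKey, tag). */
    static final class Emitted {
        final int partition; final int joinKey; final int tag; final Entry data;
        Emitted(int partition, int joinKey, int tag, Entry data) {
            this.partition = partition; this.joinKey = joinKey; this.tag = tag; this.data = data;
        }
    }

    /** Simulates mapX, mapY, the shuffle, and reduce for z = x*y grouped by (i,j). */
    static Map<List<Integer>, Double> multiply(List<Entry> X, List<Entry> Y, int n, int m) {
        List<Emitted> shuffle = new ArrayList<>();
        // mapX: gx(x) is the row i, jx(x) is the column k; x is sent to n partitions.
        for (Entry x : X)
            for (int i = 0; i < n; i++)
                shuffle.add(new Emitted(Math.floorMod(x.row, m) + m * i, x.col, 1, x));
        // mapY: gy(y) is the column j, jy(y) is the row k; y is sent to m partitions.
        for (Entry y : Y)
            for (int i = 0; i < m; i++)
                shuffle.add(new Emitted(Math.floorMod(y.col, n) * m + i, y.row, 2, y));
        // Stand-in for the shuffle: order by (partition, joinkey, tag) so X comes before Y.
        shuffle.sort(Comparator.comparingInt((Emitted e) -> e.partition)
                .thenComparingInt(e -> e.joinKey)
                .thenComparingInt(e -> e.tag));

        Map<List<Integer>, Double> result = new HashMap<>();
        Map<List<Integer>, Double> H = new HashMap<>();   // per-partition hash table
        List<Entry> xs = new ArrayList<>();               // X values of the current join key
        int currentPartition = -1;
        int currentJoinKey = 0;
        for (Emitted e : shuffle) {
            boolean newPartition = e.partition != currentPartition;
            if (newPartition) {          // flush(): emit and reset the hash table
                result.putAll(H);
                H.clear();
            }
            if (newPartition || e.joinKey != currentJoinKey) xs.clear();
            currentPartition = e.partition;
            currentJoinKey = e.joinKey;
            if (e.tag == 1) {
                xs.add(e.data);
            } else {
                for (Entry x : xs)       // c is sum, mp is x*y
                    H.merge(Arrays.asList(x.row, e.data.col), x.v * e.data.v, Double::sum);
            }
        }
        result.putAll(H);                // final flush
        return result;
    }

    public static void main(String[] args) {
        List<Entry> X = Arrays.asList(new Entry(1, 0, 0), new Entry(2, 0, 1),
                                      new Entry(3, 1, 0), new Entry(4, 1, 1));
        List<Entry> Y = Arrays.asList(new Entry(5, 0, 0), new Entry(6, 0, 1),
                                      new Entry(7, 1, 0), new Entry(8, 1, 1));
        Map<List<Integer>, Double> product = multiply(X, Y, 2, 2);
        System.out.println(product.get(Arrays.asList(0, 0))); // prints 19.0
    }
}
```

In this sketch every (i,j) group lands in exactly one partition, because x fixes the partition index modulo m and y fixes the quotient, which is why the flush can emit each hash-table entry without merging across partitions.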
Plan.MRContainerKeyComparator
cache, conf, counter_container, counter_key, max_input_files, temporary_paths, value_container
Constructor and Description |
---|
GroupByJoinPlan() |
Modifier and Type | Method and Description |
---|---|
static DataSet | groupByJoin(Tree left_join_key_fnc, Tree right_join_key_fnc, Tree left_groupby_fnc, Tree right_groupby_fnc, Tree map_fnc, Tree combine_fnc, Tree reduce_fnc, DataSet X, DataSet Y, int num_reducers, int n, int m, String stop_counter) the GroupByJoin operation |
, binarySource, binarySource, clean, collect, collect, distribute_compiled_arguments, dump, fileCache, functional_argument, generator, generator, get_type, getCache, merge, merge, new_path, parsedSource, parsedSource, print_stream, setCache, size
public static final DataSet groupByJoin(Tree left_join_key_fnc, Tree right_join_key_fnc, Tree left_groupby_fnc, Tree right_groupby_fnc, Tree map_fnc, Tree combine_fnc, Tree reduce_fnc, DataSet X, DataSet Y, int num_reducers, int n, int m, String stop_counter) throws Exception
Parameters:
left_join_key_fnc - left join key function
right_join_key_fnc - right join key function
left_groupby_fnc - left group-by function
right_groupby_fnc - right group-by function
map_fnc - map function
combine_fnc - combine function
reduce_fnc - reduce function
X - left data set
Y - right data set
num_reducers - number of reducers
n - left dimension of the reducer grid
m - right dimension of the reducer grid
stop_counter - optional counter used in repeat operation
Throws:
Exception
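The class description gives the sizing rule n = |X|/sqrt(M) and m = |Y|/sqrt(M) for the grid dimensions. A caller might derive the n and m arguments along the following lines. This is a sketch under stated assumptions: `GridSizing` and `gridDimensions` are hypothetical names that are not part of MRQL, all three sizes are assumed to be in the same unit, and rounding up with a minimum of 1 is a choice made here, not something the documentation specifies.

```java
public class GridSizing {
    /**
     * Returns {n, m} following n = |X|/sqrt(M) and m = |Y|/sqrt(M).
     * Assumption: sizeX, sizeY, and memory use the same unit (e.g. records).
     */
    static int[] gridDimensions(long sizeX, long sizeY, long memory) {
        double side = Math.sqrt((double) memory);
        // Round up and keep at least one partition per dimension (assumed policy).
        int n = (int) Math.max(1, Math.ceil(sizeX / side));
        int m = (int) Math.max(1, Math.ceil(sizeY / side));
        return new int[] { n, m };
    }

    public static void main(String[] args) {
        int[] grid = gridDimensions(1_000_000L, 250_000L, 10_000L);
        System.out.println(grid[0] + " x " + grid[1]); // prints 10000 x 2500
    }
}
```

With these inputs the ratio n/m equals |X|/|Y| = 4, matching the constraint stated in the class description.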
Copyright © 2013 The Apache Software Foundation. All rights reserved.