Typical usage and code examples of the Java POForEach class

This article collects typical usage examples of the Java class org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach. If you have been wondering what the POForEach class does, how to use it, or what real-world code that uses it looks like, the curated examples below should help.

The POForEach class belongs to the org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators package. Below are 27 code examples of the class, sorted by popularity by default. You can upvote the examples you like or find useful; your votes help the system recommend better Java code examples.
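Before the examples, here is a minimal sketch of the construction pattern that recurs throughout them. It is an illustrative composite, not code from any one project; the class and method names are my own, while the constructor signatures mirror Examples 1 and 22 below. A POForEach takes an OperatorKey, a requested parallelism (-1 for the default), one inner PhysicalPlan per output field, and a parallel list of flatten flags.

import java.util.ArrayList;
import java.util.List;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;

public class POForEachSketch {
    public static POForEach buildSimpleForEach(String scope) {
        NodeIdGenerator nig = NodeIdGenerator.getGenerator();

        // One inner plan that projects column 0 of each input tuple
        // (POProject(key, requestedParallelism, column), as in Example 22).
        PhysicalPlan inner = new PhysicalPlan();
        POProject proj = new POProject(
                new OperatorKey(scope, nig.getNextNodeId(scope)), -1, 0);
        inner.add(proj);

        List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
        innerPlans.add(inner);
        List<Boolean> flattenFlags = new ArrayList<Boolean>();
        flattenFlags.add(false); // do not flatten this field

        // -1 requests the default parallelism, as in Example 1 below.
        return new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)),
                -1, innerPlans, flattenFlags);
    }
}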

Example 1: getPlainForEachOP

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
static public POForEach getPlainForEachOP(String scope, NodeIdGenerator nig)
{
    List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
    List<Boolean> flat1 = new ArrayList<Boolean>();
    PhysicalPlan ep1 = new PhysicalPlan();
    POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
    prj1.setResultType(DataType.TUPLE);
    prj1.setStar(false);
    prj1.setColumn(1);
    prj1.setOverloaded(true);
    ep1.add(prj1);
    eps1.add(ep1);
    flat1.add(true);
    POForEach fe = new POForEach(new OperatorKey(scope, nig
            .getNextNodeId(scope)), -1, eps1, flat1);
    fe.setResultType(DataType.BAG);
    return fe;
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 19
Source file: MRUtil.java
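
As a usage note, a hypothetical call site for the helper above might look like this (assuming the static method is invoked through its source class, MRUtil):

NodeIdGenerator nig = NodeIdGenerator.getGenerator();
POForEach fe = MRUtil.getPlainForEachOP("1", nig);
// fe projects column 1 of each input tuple (typically the grouped bag),
// flattens it, and has result type BAG, as set inside the helper.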

Example 2: visitPOForEach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
    try{
        nonBlocking(op);
        List<PhysicalPlan> plans = op.getInputPlans();
        if(plans!=null)
            for (PhysicalPlan plan : plans) {
                processUDFs(plan);
            }
        phyToMROpMap.put(op, curMROp);
    }catch(Exception e){
        int errCode = 2034;
        String msg = "Error compiling operator " + op.getClass().getSimpleName();
        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
    }
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 17
Source file: MRCompiler.java

Example 3: createPartialAgg

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Translate POForEach in combiner into a POPartialAgg
 * @param combineFE
 * @return partial aggregate operator
 * @throws CloneNotSupportedException 
 */
private POPartialAgg createPartialAgg(POForEach combineFE)
        throws CloneNotSupportedException {
    String scope = combineFE.getOperatorKey().scope;
    POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope, 
            NodeIdGenerator.getGenerator().getNextNodeId(scope)));
    poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations());
    poAgg.setResultType(combineFE.getResultType());

    //first plan in combine foreach is the group key
    poAgg.setKeyPlan(combineFE.getInputPlans().get(0).clone());

    List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
    for(int i=1; i<combineFE.getInputPlans().size(); i++){
        valuePlans.add(combineFE.getInputPlans().get(i).clone());
    }
    poAgg.setValuePlans(valuePlans);
    return poAgg;
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 25
Source file: CombinerOptimizer.java

Example 4: createForEachWithGrpProj

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Create a new foreach with same scope,alias as given foreach
 * add an inner plan that projects the group column, which is going to be
 * the first input
 * @param foreach source foreach
 * @param keyType type for group-by key
 * @return new POForeach
 */
private POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
    String scope = foreach.getOperatorKey().scope;
    POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
    newFE.addOriginalLocation(foreach.getAlias(), foreach.getOriginalLocations());
    newFE.setResultType(foreach.getResultType());
    //create plan that projects the group column 
    PhysicalPlan grpProjPlan = new PhysicalPlan();
    //group by column is the first column
    POProject proj = new POProject(createOperatorKey(scope), 1, 0);
    proj.setResultType(keyType);
    grpProjPlan.add(proj);

    newFE.addInputPlan(grpProjPlan, false);
    return newFE;
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 24
Source file: CombinerOptimizer.java

Example 5: patchUpMap

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Replace old POLocalRearrange with new pre-combine LR,
 * add new map foreach, new map-local-rearrange, and connect them
 * 
 * @param mapPlan
 * @param preCombinerLR
 * @param mfe
 * @param mapAgg 
 * @param mlr
 * @throws PlanException 
 */
private void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
        POForEach mfe, POPartialAgg mapAgg, POLocalRearrange mlr)
                throws PlanException {

    POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
    mapPlan.replace(oldLR, preCombinerLR);

    mapPlan.add(mfe);
    mapPlan.connect(preCombinerLR, mfe);

    //the operator before local rearrange
    PhysicalOperator opBeforeLR = mfe;

    if(mapAgg != null){
        mapPlan.add(mapAgg);
        mapPlan.connect(mfe, mapAgg);
        opBeforeLR = mapAgg;
    }

    mapPlan.add(mlr);
    mapPlan.connect(opBeforeLR, mlr);
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 34
Source file: CombinerOptimizer.java

Example 6: visitPOForEach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
    if (revisit && nfe.getIllustrator() != null)
        return;
    List<PhysicalPlan> innerPlans = nfe.getInputPlans();
    for (PhysicalPlan innerPlan : innerPlans)
      innerPlanAttach(nfe, innerPlan);
    List<PhysicalOperator> preds = mPlan.getPredecessors(nfe);
    if (preds != null && preds.size() == 1 &&
        preds.get(0) instanceof POPackage &&
        !(preds.get(0) instanceof POPackageLite) &&
        ((POPackage) preds.get(0)).isDistinct()) {
        // equivalence class of POPackage for DISTINCT needs to be used
        //instead of the succeeding POForEach's equivalence class
        setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses());
        nfe.getIllustrator().setEqClassesShared();
    } else
        setIllustrator(nfe, 1);
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 20
Source file: IllustratorAttacher.java

Example 7: testSortedDistinctInForeach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Test that POSortedDistinct gets printed as POSortedDistinct
 * @throws Exception
 */
@Test
public void testSortedDistinctInForeach() throws Exception {
    PhysicalPlan php = new PhysicalPlan();
    PhysicalPlan grpChain1 = GenPhyOp.loadedGrpChain();
    php.merge(grpChain1);

    List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
    PhysicalPlan inplan = new PhysicalPlan();
    PODistinct op1 = new POSortedDistinct(new OperatorKey("", r.nextLong()),
            -1, null);
    inplan.addAsLeaf(op1);
    inputs.add(inplan);
    List<Boolean> toFlattens = new ArrayList<Boolean>();
    toFlattens.add(false);
    POForEach pofe = new POForEach(new OperatorKey("", r.nextLong()), 1,
            inputs, toFlattens);

    php.addAsLeaf(pofe);
    POStore st = GenPhyOp.topStoreOp();
    php.addAsLeaf(st);
    run(php, "test/org/apache/pig/test/data/GoldenFiles/MRC19.gld");
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 27
Source file: TestMRCompiler.java

Example 8: createPartialAgg

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Translate POForEach in combiner into a POPartialAgg
 * @param combineFE
 * @return partial aggregate operator
 * @throws CloneNotSupportedException
 */
private static POPartialAgg createPartialAgg(POForEach combineFE) throws CloneNotSupportedException {
    String scope = combineFE.getOperatorKey().scope;
    POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope,
            NodeIdGenerator.getGenerator().getNextNodeId(scope)));
    poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations());
    poAgg.setResultType(combineFE.getResultType());

    // first plan in combine foreach is the group key
    poAgg.setKeyPlan(combineFE.getInputPlans().get(0).clone());

    List<PhysicalPlan> valuePlans = Lists.newArrayList();
    for (int i=1; i<combineFE.getInputPlans().size(); i++) {
        valuePlans.add(combineFE.getInputPlans().get(i).clone());
    }
    poAgg.setValuePlans(valuePlans);
    return poAgg;
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 24
Source file: CombinerOptimizerUtil.java

Example 9: patchUpMap

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Replace old POLocalRearrange with new pre-combine LR,
 * add new map foreach, new map-local-rearrange, and connect them
 *
 * @param mapPlan
 * @param preCombinerLR
 * @param mfe
 * @param mapAgg
 * @param mlr
 * @throws PlanException
 */
private static void patchUpMap(PhysicalPlan mapPlan, POPreCombinerLocalRearrange preCombinerLR,
        POForEach mfe, POPartialAgg mapAgg, POLocalRearrange mlr) throws PlanException {
    POLocalRearrange oldLR = (POLocalRearrange)mapPlan.getLeaves().get(0);
    mapPlan.replace(oldLR, preCombinerLR);

    mapPlan.add(mfe);
    mapPlan.connect(preCombinerLR, mfe);

    // the operator before local rearrange
    PhysicalOperator opBeforeLR = mfe;

    if (mapAgg != null) {
        mapPlan.add(mapAgg);
        mapPlan.connect(mfe, mapAgg);
        opBeforeLR = mapAgg;
    }

    mapPlan.add(mlr);
    mapPlan.connect(opBeforeLR, mlr);
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 32
Source file: CombinerOptimizerUtil.java

Example 10: visitPOForEach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
    try{
        nonBlocking(op);
        List<PhysicalPlan> plans = op.getInputPlans();
        if (plans != null) {
            for (PhysicalPlan ep : plans) {
                processUDFs(ep);
            }
        }
        phyToTezOpMap.put(op, curTezOp);
    } catch (Exception e) {
        int errCode = 2034;
        String msg = "Error compiling operator " + op.getClass().getSimpleName();
        throw new TezCompilerException(msg, errCode, PigException.BUG, e);
    }
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 18
Source file: TezCompiler.java

Example 11: visitPOForEach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public void visitPOForEach(POForEach op) throws VisitorException{
    try{
        if (op.isMapSideOnly() && curMROp.isMapDone()) {
            FileSpec fSpec = getTempFileSpec();
            MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec);
            curMROp = startNew(fSpec, prevMROper);
            curMROp.mapPlan.addAsLeaf(op);
        } else {
            nonBlocking(op);
        }
        List<PhysicalPlan> plans = op.getInputPlans();
        if(plans!=null)
            for (PhysicalPlan plan : plans) {
                processUDFs(plan);
            }
        phyToMROpMap.put(op, curMROp);
    }catch(Exception e){
        int errCode = 2034;
        String msg = "Error compiling operator " + op.getClass().getSimpleName();
        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
    }
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 24
Source file: MRCompiler.java

Example 12: visitPOForEach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public void visitPOForEach(POForEach nfe) throws VisitorException {
    if (revisit && nfe.getIllustrator() != null)
        return;
    List<PhysicalPlan> innerPlans = nfe.getInputPlans();
    for (PhysicalPlan innerPlan : innerPlans)
      innerPlanAttach(nfe, innerPlan);
    List<PhysicalOperator> preds = mPlan.getPredecessors(nfe);
    if (preds != null && preds.size() == 1
            && preds.get(0) instanceof POPackage
            && ((POPackage) preds.get(0)).getPkgr().isDistinct()) {
        // equivalence class of POPackage for DISTINCT needs to be used
        //instead of the succeeding POForEach's equivalence class
        setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses());
        nfe.getIllustrator().setEqClassesShared();
    } else
        setIllustrator(nfe, 1);
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 19
Source file: IllustratorAttacher.java

Example 13: processForEach

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
public boolean processForEach(POForEach fe) throws FrontendException {
    if (fe.getInputPlans().size() > 1) {
        // We don't optimize the case when POForEach has more than 1 input plan
        return true;
    }
    boolean r = false;
    try {
        r = collectColumnChain(fe.getInputPlans().get(0),
                columnChainInfo);
    } catch (PlanException e) {
        int errorCode = 2205;
        throw new FrontendException("Error visiting POForEach inner plan",
                errorCode, e);
    }
    // See something other than POProject in POForEach, set the flag to stop further processing
    return r;
}
 

Developer: PonIC
Project: PonIC
Lines of code: 18
Source file: SecondaryKeyOptimizer.java

Example 14: createPartialAgg

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Translate POForEach in combiner into a POPartialAgg
 * @param combineFE
 * @return partial aggregate operator
 * @throws CloneNotSupportedException 
 */
private POPartialAgg createPartialAgg(POForEach combineFE)
        throws CloneNotSupportedException {
    String scope = combineFE.getOperatorKey().scope;
    POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope, 
            NodeIdGenerator.getGenerator().getNextNodeId(scope)));
    poAgg.setAlias(combineFE.getAlias());
    poAgg.setResultType(combineFE.getResultType());

    //first plan in combine foreach is the group key
    poAgg.setKeyPlan(combineFE.getInputPlans().get(0).clone());

    List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
    for(int i=1; i<combineFE.getInputPlans().size(); i++){
        valuePlans.add(combineFE.getInputPlans().get(i).clone());
    }
    poAgg.setValuePlans(valuePlans);
    return poAgg;
}
 

Developer: PonIC
Project: PonIC
Lines of code: 25
Source file: CombinerOptimizer.java

Example 15: createForEachWithGrpProj

Upvotes: 3

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * Create a new foreach with same scope,alias as given foreach
 * add an inner plan that projects the group column, which is going to be
 * the first input
 * @param foreach source foreach
 * @param keyType type for group-by key
 * @return new POForeach
 */
private POForEach createForEachWithGrpProj(POForEach foreach, byte keyType) {
    String scope = foreach.getOperatorKey().scope;
    POForEach newFE = new POForEach(createOperatorKey(scope), new ArrayList<PhysicalPlan>());
    newFE.setAlias(foreach.getAlias());
    newFE.setResultType(foreach.getResultType());
    //create plan that projects the group column 
    PhysicalPlan grpProjPlan = new PhysicalPlan();
    //group by column is the first column
    POProject proj = new POProject(createOperatorKey(scope), 1, 0);
    proj.setResultType(keyType);
    grpProjPlan.add(proj);

    newFE.addInputPlan(grpProjPlan, false);
    return newFE;
}
 

Developer: PonIC
Project: PonIC
Lines of code: 24
Source file: CombinerOptimizer.java

Example 16: convert

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public JavaDStream<Tuple> convert(List<JavaDStream<Tuple>> predecessors,
        POForEach physicalOperator) throws IOException {
    SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
    JavaDStream<Tuple> rdd = predecessors.get(0);
    ForEachFunction forEachFunction = new ForEachFunction(physicalOperator);
    return new JavaDStream<Tuple>(
            rdd.dstream().mapPartitions(forEachFunction, true, SparkUtil.getManifest(Tuple.class)),
            SparkUtil.getManifest(Tuple.class));
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 10
Source file: ForEachConverter.java

Example 17: processRoot

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
public void processRoot(PhysicalOperator root) throws FrontendException {
    PhysicalOperator currentNode = root;
    while (currentNode!=null) {
        boolean sawInvalidPhysicalOper = false;
        if (currentNode instanceof PODistinct)
            sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
        else if (currentNode instanceof POSort)
            sawInvalidPhysicalOper = processSort((POSort)currentNode);
        else if (currentNode instanceof POProject)
            sawInvalidPhysicalOper = processProject((POProject)currentNode);
        else if (currentNode instanceof POUserFunc ||
                 currentNode instanceof POUnion ||
                 // We don't process foreach, since foreach is too complex to get right
                 currentNode instanceof POForEach)
            break;
        
        if (sawInvalidPhysicalOper)
            break;
        
        List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
        if (succs==null)
            currentNode = null;
        else {
            if (succs.size()>1) {
                int errorCode = 2215;
                throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
                        errorCode);
            }
            currentNode = succs.get(0);
        }
    }
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 33
Source file: SecondaryKeyOptimizer.java

Example 18: scan

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
private void scan(MapReduceOper mr, PhysicalOperator op, String fileName) {

    if (op instanceof POUserFunc) {
        if (((POUserFunc)op).getFuncSpec().getClassName().equals(
                "org.apache.pig.impl.builtin.PartitionSkewedKeys")) {

            String[] ctorArgs = ((POUserFunc)op).getFuncSpec().getCtorArgs();
            ctorArgs[2] = fileName;
            return;
        }
    } else if (op instanceof POForEach) {
        List<PhysicalPlan> pl = ((POForEach)op).getInputPlans();
        for (PhysicalPlan plan : pl) {
            List<PhysicalOperator> list = plan.getLeaves();
            for (PhysicalOperator pp : list) {
                scan(mr, pp, fileName);
            }
        }
    } else {
        List<PhysicalOperator> preds = mr.reducePlan.getPredecessors(op);

        if (preds == null) {
            return;
        }

        for (PhysicalOperator p : preds) {
            scan(mr, p, fileName);
        }
    }
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 31
Source file: SampleOptimizer.java

Example 19: addAlgebraicFuncToCombineFE

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
/**
 * add algebraic functions with appropriate projection to new foreach in combiner
 * @param cfe - the new foreach in combiner 
 * @param op2newpos - mapping of physical operator to position in input
 * @throws CloneNotSupportedException
 * @throws PlanException
 */
private void addAlgebraicFuncToCombineFE(POForEach cfe, Map<PhysicalOperator, Integer> op2newpos)
throws CloneNotSupportedException, PlanException {

    // an array that we first populate with physical operators in order
    // of their position in the input. It is used while adding plans to the
    // combine foreach so that its output keeps the same positions as its
    // input; that way the same operator-to-position mapping can be used by reduce as well
    PhysicalOperator[] opsInOrder = new PhysicalOperator[op2newpos.size() + 1];
    for(Map.Entry<PhysicalOperator, Integer> op2pos : op2newpos.entrySet()){
        opsInOrder[op2pos.getValue()] = op2pos.getKey();
    }

    // first position is used by group column and a plan has been added for it,
    //so start with 1
    for(int i=1; i < opsInOrder.length; i++){
        //create new inner plan for foreach
        //add cloned copy of given physical operator and a new project.
        // Even if the udf in query takes multiple input, only one project
        // needs to be added because input to this udf
        //will be the INITIAL version of udf evaluated in map. 
        PhysicalPlan newPlan = new PhysicalPlan();
        PhysicalOperator newOp = opsInOrder[i].clone();
        newPlan.add(newOp);
        POProject proj = new POProject(
                createOperatorKey(cfe.getOperatorKey().getScope()),
                1, i
        );
        proj.setResultType(DataType.BAG);
        newPlan.add(proj);
        newPlan.connect(proj, newOp);
        cfe.addInputPlan(newPlan, false);
    }
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 41
Source file: CombinerOptimizer.java

Example 20: visitPOForEach

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public void visitPOForEach(POForEach fe) throws VisitorException {
    // we need to allow foreach as input for distinct
    // but don't want it for other things (why?). So lets
    // flag the presence of Foreach and if this is present
    // with a distinct agg, it will be allowed.
    sawForeach = true;
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 9
Source file: CombinerOptimizer.java

Example 21: isDiamondMROper

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
private boolean isDiamondMROper(MapReduceOper mr) {
    
    // We'll remove this mr as part of diamond query optimization
    // only if this mr is a trivial one, that is, its plan
    // has either two operators (load followed by store) or three operators 
    // (the operator between the load and store must be a foreach,
    // introduced by casting operation).
    // 
    // We won't optimize in other cases where there're more operators
    // in the plan. Otherwise those operators would run multiple times
    // in the successor MR operators which may not give better
    // performance.
    boolean rtn = false;
    if (isMapOnly(mr)) {
        PhysicalPlan pl = mr.mapPlan;
        if (pl.size() == 2 || pl.size() == 3) {               
            PhysicalOperator root = pl.getRoots().get(0);
            PhysicalOperator leaf = pl.getLeaves().get(0);
            if (root instanceof POLoad && leaf instanceof POStore) {
                if (pl.size() == 3) {
                    PhysicalOperator mid = pl.getSuccessors(root).get(0);
                    if (mid instanceof POForEach) {
                        rtn = true;
                    }                      
                } else {
                    rtn = true;
                }
            }
        }
    }
    return rtn;
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 33
Source file: MultiQueryOptimizer.java

Example 22: testJoin

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Test
public void testJoin() throws Exception {
    ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
    ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
    prj1.setResultType(DataType.BAG);
    prj2.setResultType(DataType.BAG);
    List<Boolean> toBeFlattened = new LinkedList<Boolean>();
    toBeFlattened.add(true);
    toBeFlattened.add(true);
    PhysicalPlan plan1 = new PhysicalPlan();
    plan1.add(prj1);
    PhysicalPlan plan2 = new PhysicalPlan();
    plan2.add(prj2);
    List<PhysicalPlan> inputs = new LinkedList<PhysicalPlan>();
    inputs.add(plan1);
    inputs.add(plan2);
    PhysicalOperator poGen = new POForEach(new OperatorKey("", r.nextLong()), 1, inputs, toBeFlattened);
    //DataBag obtained = bf.newDefaultBag();
    for (Tuple t : cogroup) {
        /*plan1.attachInput(t);
        plan2.attachInput(t);*/
        poGen.attachInput(t);
        Result output = poGen.getNextTuple();
        while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
            //System.out.println(output.result);
            Tuple tObtained = (Tuple) output.result;
            assertTrue(tObtained.get(0).toString().equals(tObtained.get(2).toString()));
            //obtained.add((Tuple) output.result);
            output = poGen.getNextTuple();
        }
    }

}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 34
Source file: TestPOGenerate.java

Example 23: testSingleNodePattern

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Test
public void testSingleNodePattern() throws PlanException{
    
    //create pattern 
    PatternPlan ptPlan = new PatternPlan();
    PatternNode ptFilNode = new PatternNode(ptPlan);
    ptFilNode.setClassName(POFilter.class);
    ptPlan.add(ptFilNode);
            
    //create plan with single ForEach node
    PhysicalPlan pplan = new PhysicalPlan();
    POForEach fe = new POForEach(getNewOpKey()); 
    pplan.add(fe);

    // verify that match is false
    boolean matched = ptPlan.match(pplan);
    assertFalse("plan not matched", matched);

    
    //add a filter to the plan (fe -> fil)
    POFilter fil = new POFilter(getNewOpKey()); 
    pplan.add(fil);
    pplan.connect(fe, fil);
    
    //verify that pattern matches
    matched = ptPlan.match(pplan);
    assertTrue("plan matched", matched);
    assertEquals(" class matched ", ptFilNode.getMatch(), fil);
    
    //test leaf/source settings in pattern node
    ptFilNode.setSourceNode(true);
    assertFalse("plan matched", ptPlan.match(pplan));
    
    ptFilNode.setSourceNode(false);
    ptFilNode.setLeafNode(true);
    assertTrue("plan matched", ptPlan.match(pplan));
    
    
    
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 41
Source file: TestPhyPatternMatch.java

Example 24: testThreeNodePatternTwoParents

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Test
public void testThreeNodePatternTwoParents() throws PlanException, ExecException{
    //create pattern fe   fil
    //                \   /
    //                frJoin
    PatternPlan ptPlan = new PatternPlan();
    PatternNode ptFilNode = createPtNode(ptPlan, POFilter.class);
    PatternNode ptFENode = createPtNode(ptPlan, POForEach.class);
    PatternNode ptJoinNode = createPtNode(ptPlan, POFRJoin.class);

    ptPlan.connect(ptFilNode, ptJoinNode);
    ptPlan.connect(ptFENode, ptJoinNode);
    
    //create plan 
    PhysicalPlan pplan = new PhysicalPlan();
    POFilter fil = new POFilter(getNewOpKey()); 
    pplan.add(fil);
    assertFalse("plan not matched", ptPlan.match(pplan));
    
    POForEach fe = new POForEach(getNewOpKey()); 
    pplan.add(fe);
    assertFalse("plan not matched", ptPlan.match(pplan));
    
    POFRJoin join = new POFRJoin(getNewOpKey(), 0, null,
            new ArrayList<List<PhysicalPlan>>(), null, null, 0, false,null); 
    pplan.add(join);
    pplan.connect(fil, join);
    pplan.connect(fe, join);
    assertTrue("plan matched", ptPlan.match(pplan));
    assertEquals("test match node", ptFilNode.getMatch(), fil);
    assertEquals("test match node", ptFENode.getMatch(), fe);
    assertEquals("test match node", ptJoinNode.getMatch(), join);
    
}
 

Developer: sigmoidanalytics
Project: spork-streaming
Lines of code: 35
Source file: TestPhyPatternMatch.java

Example 25: visitPOForEach

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
public void visitPOForEach(POForEach nfe) throws VisitorException {
    List<PhysicalPlan> inpPlans = nfe.getInputPlans();
    for (PhysicalPlan plan : inpPlans) {
        pushWalker(mCurrentWalker.spawnChildWalker(plan));
        visit();
        popWalker();
    }
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 9
Source file: PhyPlanVisitor.java

Example 26: convert

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
        POForEach physicalOperator) {
    SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
    RDD<Tuple> rdd = predecessors.get(0);
    ForEachFunction forEachFunction = new ForEachFunction(physicalOperator, this.confBytes);
    return rdd.toJavaRDD().mapPartitions(forEachFunction, true).rdd();
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 9
Source file: ForEachConverter.java

Example 27: processRoot

Upvotes: 2

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; // import the required package/class
public void processRoot(PhysicalOperator root) throws FrontendException {
    PhysicalOperator currentNode = root;
    while (currentNode!=null) {
        boolean sawInvalidPhysicalOper = false;
        if (currentNode instanceof PODistinct)
            sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
        else if (currentNode instanceof POSort)
            sawInvalidPhysicalOper = processSort((POSort)currentNode);
        else if (currentNode instanceof POProject)
            sawInvalidPhysicalOper = processProject((POProject)currentNode);
        else if (currentNode instanceof POUserFunc ||
                 currentNode instanceof POUnion ||
                 // We don't process foreach, since foreach is too complex to get right
                 currentNode instanceof POForEach)
            break;

        if (sawInvalidPhysicalOper)
            break;

        List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
        if (succs==null)
            currentNode = null;
        else {
            if (succs.size()>1) {
                int errorCode = 2215;
                throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
                        errorCode);
            }
            currentNode = succs.get(0);
        }
    }
}
 

Developer: sigmoidanalytics
Project: spork
Lines of code: 33
Source file: SecondaryKeyOptimizerUtil.java

