本文整理汇总了Java中org.apache.giraph.conf.GiraphConstants类的典型用法代码示例。如果您正苦于以下问题:Java GiraphConstants类的具体用法?Java GiraphConstants怎么用?Java GiraphConstants使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
GiraphConstants类属于org.apache.giraph.conf包,在下文中一共展示了GiraphConstants类的27个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: runZooKeeperAndJob
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Run the ZooKeeper in-process and the job.
*
* @param zookeeperConfig Quorum peer configuration
* @param giraphJob Giraph job to run
* @return True if successful, false otherwise
*/
private static boolean runZooKeeperAndJob(
final ZookeeperConfig zookeeperConfig,
GiraphJob giraphJob) throws IOException {
final InProcessZooKeeperRunner.ZooKeeperServerRunner zookeeper =
new InProcessZooKeeperRunner.ZooKeeperServerRunner();
int port = zookeeper.start(zookeeperConfig);
LOG.info("Started test zookeeper on port " + port);
GiraphConstants.ZOOKEEPER_LIST.set(giraphJob.getConfiguration(),
"localhost:" + port);
try {
return giraphJob.run(true);
} catch (InterruptedException |
ClassNotFoundException | IOException e) {
LOG.error("runZooKeeperAndJob: Got exception on running", e);
} finally {
zookeeper.stop();
}
return false;
}
开发者ID:rayokota,
项目名称:hgraphdb,
代码行数:30,
代码来源:InternalHBaseVertexRunner.java
示例2: GiraphGraphComputer
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
public GiraphGraphComputer(final HadoopGraph hadoopGraph) {
super(hadoopGraph);
final Configuration configuration = hadoopGraph.configuration();
configuration.getKeys().forEachRemaining(key -> this.giraphConfiguration.set(key, configuration.getProperty(key).toString()));
this.giraphConfiguration.setMasterComputeClass(GiraphMemory.class);
this.giraphConfiguration.setVertexClass(GiraphVertex.class);
this.giraphConfiguration.setComputationClass(GiraphComputation.class);
this.giraphConfiguration.setWorkerContextClass(GiraphWorkerContext.class);
this.giraphConfiguration.setOutEdgesClass(EmptyOutEdges.class);
this.giraphConfiguration.setClass(GiraphConstants.VERTEX_ID_CLASS.getKey(), ObjectWritable.class, ObjectWritable.class);
this.giraphConfiguration.setClass(GiraphConstants.VERTEX_VALUE_CLASS.getKey(), VertexWritable.class, VertexWritable.class);
this.giraphConfiguration.setBoolean(GiraphConstants.STATIC_GRAPH.getKey(), true);
this.giraphConfiguration.setVertexInputFormatClass(GiraphVertexInputFormat.class);
this.giraphConfiguration.setVertexOutputFormatClass(GiraphVertexOutputFormat.class);
this.useWorkerThreadsInConfiguration = this.giraphConfiguration.getInt(GiraphConstants.MAX_WORKERS, -666) != -666 || this.giraphConfiguration.getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), -666) != -666;
}
开发者ID:PKUSilvester,
项目名称:LiteGraph,
代码行数:17,
代码来源:GiraphGraphComputer.java
示例3: getBaseConfiguration
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Override
public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) {
final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);
config.put("mapreduce.job.reduces", 2);
/// giraph configuration
config.put(GiraphConstants.LOCAL_TEST_MODE.getKey(), true); // local testing can only spawn one worker
config.put(GiraphConstants.MIN_WORKERS, 1);
config.put(GiraphConstants.MAX_WORKERS, 1);
config.put(GiraphConstants.SPLIT_MASTER_WORKER.getKey(), false);
config.put(GiraphConstants.ZOOKEEPER_IS_EXTERNAL.getKey(), false);
config.put(GiraphConstants.NETTY_SERVER_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
config.put(GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER.getKey(), false); // this prevents so many integration tests running out of threads
config.put(GiraphConstants.NETTY_USE_DIRECT_MEMORY.getKey(), true);
config.put(GiraphConstants.NUM_INPUT_THREADS.getKey(), 2);
config.put(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 2);
config.put(GiraphConstants.MAX_MASTER_SUPERSTEP_WAIT_MSECS.getKey(), TimeUnit.MINUTES.toMillis(60L));
config.put(GiraphConstants.VERTEX_OUTPUT_FORMAT_THREAD_SAFE.getKey(), false);
config.put(GiraphConstants.NUM_OUTPUT_THREADS.getKey(), 1);
return config;
}
开发者ID:PKUSilvester,
项目名称:LiteGraph,
代码行数:21,
代码来源:GiraphHadoopGraphProvider.java
示例4: testComputationState
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testComputationState() throws IOException,
ClassNotFoundException, InterruptedException {
if (runningInDistributedMode()) {
System.out.println(
"testComputeContext: Ignore this test in distributed mode.");
return;
}
GiraphConfiguration conf = new GiraphConfiguration();
conf.setComputationClass(TestComputationStateComputation.class);
conf.setVertexInputFormatClass(
SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
conf.setWorkerContextClass(
TestComputationStateComputation.TestComputationStateWorkerContext.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
// Use multithreading
job.getConfiguration().setNumComputeThreads(
TestComputationStateComputation.NUM_COMPUTE_THREADS);
// Increase the number of vertices
GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(),
TestComputationStateComputation.NUM_VERTICES);
// Increase the number of partitions
GiraphConstants.USER_PARTITION_COUNT.set(job.getConfiguration(),
TestComputationStateComputation.NUM_PARTITIONS);
assertTrue(job.run(true));
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:27,
代码来源:TestComputationState.java
示例5: testMatchingType
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testMatchingType() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf();
GiraphConstants.COMPUTATION_CLASS.set(conf,
GeneratedComputationMatch.class);
GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
SimpleSuperstepVertexInputFormat.class);
GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
GeneratedVertexMatchCombiner.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
ImmutableClassesGiraphConfiguration gc = new
ImmutableClassesGiraphConfiguration(conf);
validator.validateConfiguration();
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:22,
代码来源:TestComputationTypes.java
示例6: testHiveMultithreadedOutput
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testHiveMultithreadedOutput() throws Exception
{
String tableName = "test1";
hiveServer.createTable("CREATE TABLE " + tableName +
" (i1 BIGINT, i2 BIGINT) ");
GiraphConfiguration conf = new GiraphConfiguration();
conf.setVertexOutputFormatThreadSafe(true);
conf.setNumOutputThreads(2);
GiraphConstants.USER_PARTITION_COUNT.set(conf, 4);
runJob(tableName, conf);
HiveInputDescription inputDesc = new HiveInputDescription();
inputDesc.getTableDesc().setTableName(tableName);
verifyRecords(inputDesc);
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:19,
代码来源:HiveOutputTest.java
示例7: prepareConfiguration
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
conf.setComputationClass(ShortestPathsComputation.class);
if (EDGES_CLASS.getOptionIntValue(cmd, 1) == 1) {
conf.setOutEdgesClass(ArrayListEdges.class);
} else {
conf.setOutEdgesClass(HashMapEdges.class);
}
LOG.info("Using class " + GiraphConstants.COMPUTATION_CLASS.get(conf));
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
if (!NO_COMBINER.optionTurnedOn(cmd)) {
conf.setCombinerClass(MinimumDoubleCombiner.class);
}
conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
BenchmarkOption.VERTICES.getOptionLongValue(cmd));
conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:20,
代码来源:ShortestPathsBenchmark.java
示例8: testGetBasePath
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testGetBasePath() {
Configuration conf = new Configuration();
// Default is empty, everything goes in root znode
assertEquals("Default value for base path should be empty",
"", ZooKeeperManager.getBasePath(conf));
conf.set(GiraphConstants.BASE_ZNODE_KEY, "/howdy");
assertEquals("Base path should reflect value of " +
GiraphConstants.BASE_ZNODE_KEY,
"/howdy", ZooKeeperManager.getBasePath(conf));
conf.set(GiraphConstants.BASE_ZNODE_KEY, "no_slash");
try {
ZooKeeperManager.getBasePath(conf);
fail("Should not have allowed path without starting slash");
} catch (IllegalArgumentException iae) {
assertTrue(iae.getMessage().contains(GiraphConstants.BASE_ZNODE_KEY));
}
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:22,
代码来源:TestZooKeeperManager.java
示例9: prepareConfiguration
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
conf.setVertexClass(RandomMessageVertex.class);
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
conf.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
conf.setMasterComputeClass(RandomMessageBenchmarkMasterCompute.class);
conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
BenchmarkOption.VERTICES.getOptionLongValue(cmd));
conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
conf.setInt(SUPERSTEP_COUNT,
BenchmarkOption.SUPERSTEPS.getOptionIntValue(cmd));
conf.setInt(RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
BYTES_PER_MESSAGE.getOptionIntValue(cmd));
conf.setInt(RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
MESSAGES_PER_EDGE.getOptionIntValue(cmd));
if (FLUSH_THREADS.optionTurnedOn(cmd)) {
conf.setInt(GiraphConstants.MSG_NUM_FLUSH_THREADS,
FLUSH_THREADS.getOptionIntValue(cmd));
}
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:23,
代码来源:RandomMessageBenchmark.java
示例10: Factory
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Constructor.
*
* @param config Hadoop configuration
*/
public Factory(ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
this.config = config;
String jobId = config.get("mapred.job.id", "Unknown Job");
int taskId = config.getTaskPartition();
List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
Collections.shuffle(userPaths);
directories = new String[userPaths.size()];
int i = 0;
for (String path : userPaths) {
String directory = path + File.separator + jobId + File.separator +
taskId + File.separator;
directories[i++] = directory;
if (!new File(directory).mkdirs()) {
LOG.error("SequentialFileMessageStore$Factory: Failed to create " +
directory);
}
}
this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
storeCounter = new AtomicInteger();
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:26,
代码来源:SequentialFileMessageStore.java
示例11: testGetsCalled
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testGetsCalled() throws Exception {
assertEquals(0, Obs.postApp);
String[] graph = new String[] { "1", "2", "3" };
String klasses[] = new String[] {
Obs.class.getName(),
Obs.class.getName()
};
GiraphConfiguration conf = new GiraphConfiguration();
conf.set(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
arrayToString(klasses));
conf.setVertexClass(NoOpVertex.class);
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setVertexInputFormatClass(IntNullNullTextInputFormat.class);
InternalVertexRunner.run(conf, graph);
assertEquals(2, Obs.preApp);
// 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
assertEquals(8, Obs.preSuperstep);
assertEquals(8, Obs.postSuperstep);
assertEquals(2, Obs.postApp);
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:26,
代码来源:TestMasterObserver.java
示例12: getMaxTasks
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Get the correct number of mappers based on the configuration
*
* @param conf Configuration to determine the number of mappers
* @return Maximum number of tasks
*/
public static int getMaxTasks(Configuration conf) {
int maxWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0);
boolean splitMasterWorker = GiraphConstants.SPLIT_MASTER_WORKER.get(conf);
int maxTasks = maxWorkers;
// if this is a YARN job, separate ZK should already be running
boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
if (splitMasterWorker && !isYarnJob) {
int zkServers = GiraphConstants.ZOOKEEPER_SERVER_COUNT.get(conf);
maxTasks += zkServers;
}
if (LOG.isDebugEnabled()) {
LOG.debug("getMaxTasks: Max workers = " + maxWorkers +
", split master/worker = " + splitMasterWorker +
", is YARN-only job = " + isYarnJob +
", total max tasks = " + maxTasks);
}
return maxTasks;
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:25,
代码来源:BspInputFormat.java
示例13: testDiskBackedPartitionStore
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testDiskBackedPartitionStore() throws IOException {
File directory = Files.createTempDir();
GiraphConstants.PARTITIONS_DIRECTORY.set(
conf, new File(directory, "giraph_partitions").toString());
GiraphConstants.USE_OUT_OF_CORE_GRAPH.set(conf, true);
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 1);
PartitionStore<IntWritable, IntWritable, NullWritable, IntWritable>
partitionStore = new DiskBackedPartitionStore<IntWritable,
IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
partitionStore.shutdown();
GiraphConstants.MAX_PARTITIONS_IN_MEMORY.set(conf, 2);
partitionStore = new DiskBackedPartitionStore<IntWritable,
IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
partitionStore.shutdown();
FileUtils.deleteDirectory(directory);
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:22,
代码来源:TestPartitionStores.java
示例14: addFsResourcesToMap
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Populates the LocalResources list with the HDFS paths listed in
* the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, and the
* GiraphConfiguration for this job. Also adds the Giraph default application
* jar as determined by GiraphYarnClient.GIRAPH_CLIENT_JAR constant.
* @param map the LocalResources list to populate.
* @param giraphConf the configuration to use to select jars to include.
* @param appId the ApplicationId, naming the the HDFS base dir for job jars.
*/
public static void addFsResourcesToMap(Map<String, LocalResource> map,
GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
FileSystem fs = FileSystem.get(giraphConf);
Path baseDir = YarnUtils.getFsCachePath(fs, appId);
boolean coreJarFound = false;
for (String fileName : giraphConf.getYarnLibJars().split(",")) {
if (fileName.length() > 0) {
Path filePath = new Path(baseDir, fileName);
LOG.info("Adding " + fileName + " to LocalResources for export.");
if (fileName.contains("giraph-core")) {
coreJarFound = true;
}
addFileToResourceMap(map, fs, filePath);
}
}
if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps
LOG.warn("Job jars (-yj option) didn't include giraph-core.");
}
Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
addFileToResourceMap(map, fs, confPath);
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:31,
代码来源:YarnUtils.java
示例15: exportGiraphConfiguration
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Export our populated GiraphConfiguration as an XML file to be used by the
* ApplicationMaster's exec container, and register it with LocalResources.
* @param giraphConf the current Configuration object to be published.
* @param appId the ApplicationId to stamp this app's base HDFS resources dir.
*/
public static void exportGiraphConfiguration(GiraphConfiguration giraphConf,
ApplicationId appId) throws IOException {
File confFile = new File(System.getProperty("java.io.tmpdir"),
GiraphConstants.GIRAPH_YARN_CONF_FILE);
if (confFile.exists()) {
confFile.delete();
}
String localConfPath = confFile.getAbsolutePath();
FileOutputStream fos = null;
try {
fos = new FileOutputStream(localConfPath);
giraphConf.writeXml(fos);
FileSystem fs = FileSystem.get(giraphConf);
Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
GiraphConstants.GIRAPH_YARN_CONF_FILE);
fos.flush();
fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
} finally {
if (null != fos) {
fos.close();
}
}
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:30,
代码来源:YarnUtils.java
示例16: prepareConfiguration
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
conf.setVertexClass(ShortestPathsVertex.class);
if (EDGES_CLASS.getOptionIntValue(cmd, 1) == 1) {
conf.setOutEdgesClass(ArrayListEdges.class);
} else {
conf.setOutEdgesClass(HashMapEdges.class);
}
LOG.info("Using class " + GiraphConstants.VERTEX_CLASS.get(conf));
conf.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
if (!NO_COMBINER.optionTurnedOn(cmd)) {
conf.setCombinerClass(MinimumDoubleCombiner.class);
}
conf.setLong(PseudoRandomInputFormatConstants.AGGREGATE_VERTICES,
BenchmarkOption.VERTICES.getOptionLongValue(cmd));
conf.setLong(PseudoRandomInputFormatConstants.EDGES_PER_VERTEX,
BenchmarkOption.EDGES_PER_VERTEX.getOptionLongValue(cmd));
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:20,
代码来源:ShortestPathsBenchmark.java
示例17: setUp
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Before
public void setUp() throws IOException {
// Setup the conf
GiraphConfiguration tmpConf = new GiraphConfiguration();
GiraphConstants.COMPUTATION_CLASS.set(tmpConf, IntNoOpComputation.class);
conf = new ImmutableClassesGiraphConfiguration(tmpConf);
@SuppressWarnings("rawtypes")
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
// Start the service
serverData = MockUtils.createNewServerData(conf, context);
serverData.prepareSuperstep();
workerInfo = new WorkerInfo();
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData), workerInfo,
context);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
client = new NettyClient(context, conf, new WorkerInfo());
client.connectAllAddresses(
Lists.<WorkerInfo>newArrayList(workerInfo));
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:25,
代码来源:RequestTest.java
示例18: prepare
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Before
public void prepare() throws IOException {
directory = Files.createTempDir();
Configuration.addDefaultResource("giraph-site.xml");
GiraphConfiguration initConfig = new GiraphConfiguration();
initConfig.setVertexClass(IntVertex.class);
GiraphConstants.MESSAGES_DIRECTORY.set(
initConfig, new File(directory, "giraph_messages").toString());
config = new ImmutableClassesGiraphConfiguration(initConfig);
testData = new TestData();
testData.maxId = 1000000;
testData.maxMessage = 1000000;
testData.maxNumberOfMessages = 100;
testData.numVertices = 50;
testData.numTimes = 10;
testData.numOfPartitions = 5;
testData.maxMessagesInMemory = 20;
service =
MockUtils.mockServiceGetVertexPartitionOwner(testData.numOfPartitions);
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:24,
代码来源:TestMessageStores.java
示例19: testGetsCalled
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testGetsCalled() throws Exception {
assertEquals(0, Obs.postApp);
String[] graph = new String[] { "1", "2", "3" };
String klasses[] = new String[] {
Obs.class.getName(),
Obs.class.getName()
};
GiraphConfiguration conf = new GiraphConfiguration();
conf.set(GiraphConstants.MASTER_OBSERVER_CLASSES.getKey(),
arrayToString(klasses));
conf.setComputationClass(SimpleComputation.class);
conf.setOutEdgesClass(ByteArrayEdges.class);
conf.setVertexInputFormatClass(InputFormat.class);
InternalVertexRunner.run(conf, graph);
assertEquals(2, Obs.preApp);
// 3 supersteps + 1 input superstep * 2 observers = 8 callbacks
assertEquals(8, Obs.preSuperstep);
assertEquals(8, Obs.postSuperstep);
assertEquals(2, Obs.postApp);
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:26,
代码来源:TestMasterObserver.java
示例20: testMatchingType
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Test
public void testMatchingType() throws SecurityException,
NoSuchMethodException, NoSuchFieldException {
Configuration conf = getDefaultTestConf();
GiraphConstants.VERTEX_CLASS.set(conf, GeneratedVertexMatch.class);
GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
SimpleSuperstepVertexInputFormat.class);
GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
GeneratedVertexMatchCombiner.class);
@SuppressWarnings("rawtypes")
GiraphConfigurationValidator<?, ?, ?, ?> validator =
new GiraphConfigurationValidator(conf);
ImmutableClassesGiraphConfiguration gc = new
ImmutableClassesGiraphConfiguration(conf);
validator.validateConfiguration();
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:21,
代码来源:TestVertexTypes.java
示例21: setUp
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
@Before
public void setUp() throws IOException {
// Setup the conf
GiraphConfiguration tmpConf = new GiraphConfiguration();
GiraphConstants.VERTEX_CLASS.set(tmpConf, TestVertex.class);
conf = new ImmutableClassesGiraphConfiguration(tmpConf);
@SuppressWarnings("rawtypes")
Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(conf);
// Start the service
serverData = MockUtils.createNewServerData(conf, context);
workerInfo = new WorkerInfo();
server = new NettyServer(conf,
new WorkerRequestServerHandler.Factory(serverData), workerInfo,
context);
server.start();
workerInfo.setInetSocketAddress(server.getMyAddress());
client = new NettyClient(context, conf, new WorkerInfo());
client.connectAllAddresses(
Lists.<WorkerInfo>newArrayList(workerInfo));
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:24,
代码来源:RequestTest.java
示例22: Factory
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Constructor.
*
* @param config Hadoop configuration
*/
public Factory(ImmutableClassesGiraphConfiguration config) {
this.config = config;
String jobId = config.get("mapred.job.id", "Unknown Job");
int taskId = config.getTaskPartition();
List<String> userPaths = MESSAGES_DIRECTORY.getList(config);
Collections.shuffle(userPaths);
directories = new String[userPaths.size()];
int i = 0;
for (String path : userPaths) {
String directory = path + File.separator + jobId + File.separator +
taskId + File.separator;
directories[i++] = directory;
new File(directory).mkdirs();
}
this.bufferSize = GiraphConstants.MESSAGES_BUFFER_SIZE.get(config);
storeCounter = new AtomicInteger();
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:23,
代码来源:SequentialFileMessageStore.java
示例23: ServerData
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Constructor.
*
* @param service Service worker
* @param conf Configuration
* @param messageStoreFactory Factory for message stores
* @param context Mapper context
*/
public ServerData(
CentralizedServiceWorker<I, V, E, M> service,
ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
MessageStoreFactory<I, M, MessageStoreByPartition<I, M>>
messageStoreFactory,
Mapper<?, ?, ?, ?>.Context context) {
this.messageStoreFactory = messageStoreFactory;
currentMessageStore = messageStoreFactory.newStore();
incomingMessageStore = messageStoreFactory.newStore();
if (GiraphConstants.USE_OUT_OF_CORE_GRAPH.get(conf)) {
partitionStore =
new DiskBackedPartitionStore<I, V, E, M>(conf, context);
} else {
partitionStore =
new SimplePartitionStore<I, V, E, M>(conf, context);
}
edgeStore = new EdgeStore<I, V, E, M>(service, conf, context);
ownerAggregatorData = new OwnerAggregatorServerData(context, conf);
allAggregatorData = new AllAggregatorServerData(context, conf);
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:30,
代码来源:ServerData.java
示例24: checkAndAdjustPerTaskHeapSize
点赞 3
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Adjust the user-supplied <code>-yh</code> and <code>-w</code>
* settings if they are too small or large for the current cluster,
* and re-record the new settings in the GiraphConfiguration for export.
* @param gnar the GetNewAppResponse from the YARN ResourceManager.
*/
private void checkAndAdjustPerTaskHeapSize(GetNewApplicationResponse gnar) {
// do we have the right heap size on these cluster nodes to run our job?
final int minCapacity = gnar.getMinimumResourceCapability().getMemory();
final int maxCapacity = gnar.getMaximumResourceCapability().getMemory();
// make sure heap size is OK for this cluster's available containers
int giraphMem = giraphConf.getYarnTaskHeapMb();
if (giraphMem == GiraphConstants.GIRAPH_YARN_TASK_HEAP_MB_DEFAULT) {
LOG.info("Defaulting per-task heap size to " + giraphMem + "MB.");
}
if (giraphMem > maxCapacity) {
LOG.info("Giraph's request of heap MB per-task is more than the " +
"minimum; downgrading Giraph to" + maxCapacity + "MB.");
giraphMem = maxCapacity;
}
if (giraphMem < minCapacity) {
LOG.info("Giraph's request of heap MB per-task is less than the " +
"minimum; upgrading Giraph to " + minCapacity + "MB.");
giraphMem = minCapacity;
}
giraphConf.setYarnTaskHeapMb(giraphMem); // record any changes made
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:28,
代码来源:GiraphYarnClient.java
示例25: preApplication
点赞 2
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
public void preApplication() throws InstantiationException, IllegalAccessException {
final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration());
KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
this.vertexProgramPool = new VertexProgramPool(vertexProgram, this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1));
this.memory = new GiraphMemory(this, vertexProgram);
}
开发者ID:PKUSilvester,
项目名称:LiteGraph,
代码行数:8,
代码来源:GiraphWorkerContext.java
示例26: buildResourceRequests
点赞 2
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Utility to construct the ResourceRequest for our resource ask: all the
* Giraph containers we need, and their memory/priority requirements.
* @return a list of ResourceRequests to send (just one, for Giraph tasks)
*/
private List<ResourceRequest> buildResourceRequests() {
// set up resource request for our Giraph BSP application
ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
resourceRequest.setHostName("*"); // hand pick our worker locality someday
Priority pri = Records.newRecord(Priority.class);
pri.setPriority(GiraphConstants.GIRAPH_YARN_PRIORITY);
resourceRequest.setPriority(pri);
Resource capability = Records.newRecord(Resource.class);
capability.setVirtualCores(1); // new YARN API, won't work version < 2.0.3
capability.setMemory(heapPerContainer);
resourceRequest.setCapability(capability);
resourceRequest.setNumContainers(containersToLaunch);
return ImmutableList.of(resourceRequest);
}
开发者ID:zfighter,
项目名称:giraph-research,
代码行数:20,
代码来源:GiraphApplicationMaster.java
示例27: getDefaultTestConf
点赞 2
import org.apache.giraph.conf.GiraphConstants; //导入依赖的package包/类
/**
* Just populate a conf with testing defaults that won't
* upset the GiraphConfigurationValidator.
* */
private Configuration getDefaultTestConf() {
Configuration conf = new Configuration();
conf.setInt(GiraphConstants.MAX_WORKERS, 1);
conf.setInt(GiraphConstants.MIN_WORKERS, 1);
conf.set(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.getKey(),
"org.apache.giraph.io.formats.DUMMY_TEST_VALUE");
return conf;
}
开发者ID:renato2099,
项目名称:giraph-gora,
代码行数:13,
代码来源:TestComputationTypes.java