• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Java JobConfig类的典型用法和代码示例

java 1次浏览

本文整理汇总了Java中org.apache.samza.config.JobConfig的典型用法代码示例。如果您正苦于以下问题:Java JobConfig类的具体用法?Java JobConfig怎么用?Java JobConfig使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。

JobConfig类属于org.apache.samza.config包,在下文中一共展示了JobConfig类的38个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testReadFailsOnSerdeExceptions

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test(expected = SamzaException.class)
public void testReadFailsOnSerdeExceptions() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
  SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);

  SystemAdmin mockAdmin = newAdmin("0", "1");
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);

  // wire up an exception throwing serde with the checkpointmanager
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();

  // expect an exception from ExceptionThrowingSerde
  checkpointManager.readLastCheckpoint(TASK1);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:26,
代码来源:TestKafkaCheckpointManagerJava.java

示例2: testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<List<IncomingMessageEnvelope>> checkpointEnvelopes = ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0")));
  SystemConsumer mockConsumer = newConsumer(checkpointEnvelopes);

  SystemAdmin mockAdmin = newAdmin("0", "1");
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);

  // wire up an exception throwing serde with the checkpointmanager
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      false, mockConfig, mock(MetricsRegistry.class), new ExceptionThrowingCheckpointSerde(),
      new ExceptionThrowingCheckpointKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();

  // expect the read to succeed inspite of the exception from ExceptionThrowingSerde
  checkpointManager.readLastCheckpoint(TASK1);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:27,
代码来源:TestKafkaCheckpointManagerJava.java

示例3: ZkJobCoordinator

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
  this.config = config;

  this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);

  this.processorId = createProcessorId(config);
  this.zkUtils = zkUtils;
  // setup a listener for a session state change
  // we are mostly interested in "session closed" and "new session created" events
  zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
  LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
  leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
  this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
  this.barrier =  new ZkBarrierForVersionUpgrade(
      zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(),
      zkUtils,
      new ZkBarrierListenerImpl());
  this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
  this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
  debounceTimer = new ScheduleAfterDebounceTime();
  debounceTimer.setScheduledTaskCallback(throwable -> {
      LOG.error("Received exception from in JobCoordinator Processing!", throwable);
      stop();
    });
}
 

开发者ID:apache,
项目名称:samza,
代码行数:26,
代码来源:ZkJobCoordinator.java

示例4: calculateIntStreamPartitions

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
  int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
  if (partitions < 0) {
    // use the following simple algo to figure out the partitions
    // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
    // partition will be further bounded by MAX_INFERRED_PARTITIONS.
    // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
    int maxInPartitions = maxPartition(jobGraph.getSources());
    int maxOutPartitions = maxPartition(jobGraph.getSinks());
    partitions = Math.max(maxInPartitions, maxOutPartitions);

    if (partitions > MAX_INFERRED_PARTITIONS) {
      partitions = MAX_INFERRED_PARTITIONS;
      log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
          partitions, MAX_INFERRED_PARTITIONS));
    }
  }
  for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
    if (edge.getPartitionCount() <= 0) {
      edge.setPartitionCount(partitions);
    }
  }
}
 

开发者ID:apache,
项目名称:samza,
代码行数:24,
代码来源:ExecutionPlanner.java

示例5: ContainerProcessManager

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
public ContainerProcessManager(Config config,
                               SamzaApplicationState state,
                               MetricsRegistryMap registry) {
  this.state = state;
  this.clusterManagerConfig = new ClusterManagerConfig(config);
  this.jobConfig = new JobConfig(config);

  this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();

  ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
  this.clusterResourceManager = checkNotNull(factory.getClusterResourceManager(this, state));
  this.metrics = new ContainerProcessManagerMetrics(config, state, registry);

  if (this.hostAffinityEnabled) {
    this.containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
  } else {
    this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
  }

  this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
  log.info("finished initialization of samza task manager");

}
 

开发者ID:apache,
项目名称:samza,
代码行数:24,
代码来源:ContainerProcessManager.java

示例6: getPartitionCountMonitor

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
  Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins();
  StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance());
  Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
  if (inputStreamsToMonitor.isEmpty()) {
    throw new SamzaException("Input streams to a job can not be empty.");
  }

  return new StreamPartitionCountMonitor(
      inputStreamsToMonitor,
      streamMetadata,
      metrics,
      new JobConfig(config).getMonitorPartitionChangeFrequency(),
      streamsChanged -> {
      // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
      if (hasDurableStores) {
        log.error("Input topic partition count changed in a job with durable state. Failing the job.");
        state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
      }
      coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
    });
}
 

开发者ID:apache,
项目名称:samza,
代码行数:23,
代码来源:ClusterBasedJobCoordinator.java

示例7: if

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Gets the unique ID for the next operator in the graph. The ID is of the following format:
 * jobName-jobId-opCode-(userDefinedId|nextOpNum);
 *
 * @param opCode the {@link OpCode} of the next operator
 * @param userDefinedId the optional user-provided name of the next operator or null
 * @return the unique ID for the next operator in the graph
 */
/* package private */ String getNextOpId(OpCode opCode, String userDefinedId) {
  if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
    throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
  }

  String nextOpId = String.format("%s-%s-%s-%s",
      config.get(JobConfig.JOB_NAME()),
      config.get(JobConfig.JOB_ID(), "1"),
      opCode.name().toLowerCase(),
      StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
  if (!operatorIds.add(nextOpId)) {
    throw new SamzaException(
        String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
  }
  nextOpNum++;
  return nextOpId;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:26,
代码来源:StreamGraphImpl.java

示例8: runTask

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Override
public void runTask() {
  JobConfig jobConfig = new JobConfig(this.config);

  // validation
  String taskName = new TaskConfig(config).getTaskClass().getOrElse(null);
  if (taskName == null) {
    throw new SamzaException("Neither APP nor task.class are defined defined");
  }
  LOG.info("LocalApplicationRunner will run " + taskName);
  LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();

  StreamProcessor processor = createStreamProcessor(jobConfig, null, listener);

  numProcessorsToStart.set(1);
  listener.setProcessor(processor);
  processor.start();
}
 

开发者ID:apache,
项目名称:samza,
代码行数:19,
代码来源:LocalApplicationRunner.java

示例9: testDefaultPartitions

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testDefaultPartitions() {
  Map<String, String> map = new HashMap<>(config);
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createSimpleGraph();
  JobGraph jobGraph = planner.createJobGraph(streamGraph);
  planner.calculatePartitions(streamGraph, jobGraph);

  // the partitions should be the same as input1
  jobGraph.getIntermediateStreams().forEach(edge -> {
      assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS);
    });
}
 

开发者ID:apache,
项目名称:samza,
代码行数:17,
代码来源:TestExecutionPlanner.java

示例10: testTriggerIntervalWithInvalidWindowMs

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
  Map<String, String> map = new HashMap<>(config);
  map.put(TaskConfig.WINDOW_MS(), "-1");
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
  ExecutionPlan plan = planner.plan(streamGraph);
  List<JobConfig> jobConfigs = plan.getJobConfigs();
  assertEquals(1, jobConfigs.size());

  // GCD of 8, 16, 1600 and 252 is 4
  assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:17,
代码来源:TestExecutionPlanner.java

示例11: createStreamOperatorTask

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamApplication app) throws Exception {
  ApplicationRunner runner = mock(ApplicationRunner.class);
  when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
  when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));

  TaskContextImpl taskContext = mock(TaskContextImpl.class);
  when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
      .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
          new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
  when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
  // need to return different stores for left and right side
  IntegerSerde integerSerde = new IntegerSerde();
  TimestampedValueSerde timestampedValueSerde = new TimestampedValueSerde(new KVSerde(integerSerde, integerSerde));
  when(taskContext.getStore(eq("jobName-jobId-join-j1-L")))
      .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));
  when(taskContext.getStore(eq("jobName-jobId-join-j1-R")))
      .thenReturn(new TestInMemoryStore(integerSerde, timestampedValueSerde));

  Config config = mock(Config.class);
  when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
  when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");

  StreamOperatorTask sot = new StreamOperatorTask(app, runner, clock);
  sot.init(config, taskContext);
  return sot;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:27,
代码来源:TestJoinOperator.java

示例12: setup

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Before
public void setup() throws Exception {
  config = mock(Config.class);
  when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
  when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
  taskContext = mock(TaskContextImpl.class);
  runner = mock(ApplicationRunner.class);
  Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
  Serde storeValSerde = new IntegerEnvelopeSerde();

  when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
      .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
  when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());

  when(taskContext.getStore("jobName-jobId-window-w1"))
      .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
  when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:19,
代码来源:TestWindowOperator.java

示例13: buildStreamApplicationConfigMap

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
    String appName, String appId) {
  Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
      .put(TaskConfig.INPUT_STREAMS(), inputTopic)
      .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
      .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
      .put(ZkConfig.ZK_CONNECT, zkConnect())
      .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
      .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
      .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
      .put(ApplicationConfig.APP_NAME, appName)
      .put(ApplicationConfig.APP_ID, appId)
      .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
      .put(JobConfig.JOB_NAME(), appName)
      .put(JobConfig.JOB_ID(), appId)
      .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
      .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
      .build();
  Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
  applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
  return applicationConfig;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:23,
代码来源:TestZkLocalApplicationRunner.java

示例14: getBaseJobConfig

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
private Map<String, String> getBaseJobConfig() {
  Map<String, String> configs = new HashMap<>();
  configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());

  configs.put(JobConfig.JOB_NAME(), "test-table-job");
  configs.put(JobConfig.PROCESSOR_ID(), "1");
  configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
  configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
  configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

  // For intermediate streams
  configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
  configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
  configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
  configs.put("systems.kafka.samza.key.serde", "int");
  configs.put("systems.kafka.samza.msg.serde", "json");
  configs.put("systems.kafka.default.stream.replication.factor", "1");
  configs.put("job.default.system", "kafka");

  configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
  configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());

  return configs;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:25,
代码来源:TestLocalTable.java

示例15: main

点赞 3

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
public static void main(String [] args) throws Exception {
  CommandLine cmdline = new CommandLine();
  OptionParser parser = cmdline.parser();
  OptionSpec<String> validatorOpt = parser.accepts("metrics-validator", "The metrics validator class.")
                                          .withOptionalArg()
                                          .ofType(String.class).describedAs("com.foo.bar.ClassName");
  OptionSet options = cmdline.parser().parse(args);
  Config config = cmdline.loadConfig(options);
  MetricsValidator validator = null;
  if (options.has(validatorOpt)) {
    String validatorClass = options.valueOf(validatorOpt);
    validator = ClassLoaderHelper.<MetricsValidator>fromClassName(validatorClass);
  }

  YarnConfiguration hadoopConfig = new YarnConfiguration();
  hadoopConfig.set("fs.http.impl", HttpFileSystem.class.getName());
  hadoopConfig.set("fs.https.impl", HttpFileSystem.class.getName());
  ClientHelper clientHelper = new ClientHelper(hadoopConfig);

  new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator).run();
}
 

开发者ID:apache,
项目名称:samza,
代码行数:22,
代码来源:YarnJobValidationTool.java

示例16: testCheckpointsAreReadFromOldestOffset

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testCheckpointsAreReadFromOldestOffset() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // mock out a consumer that returns a single checkpoint IME
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  SystemConsumer mockConsumer = newConsumer(ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, "0"))));

  String oldestOffset = "0";
  SystemAdmin mockAdmin = newAdmin(oldestOffset, "1");
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);

  // 1. verify that consumer.register is called only during checkpointManager.start.
  // 2. verify that consumer.register is called with the oldest offset.
  // 3. verify that no other operation on the CheckpointManager re-invokes register since start offsets are set during
  // register
  verify(mockConsumer, times(0)).register(CHECKPOINT_SSP, oldestOffset);
  checkpointManager.start();
  verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);

  checkpointManager.readLastCheckpoint(TASK1);
  verify(mockConsumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:31,
代码来源:TestKafkaCheckpointManagerJava.java

示例17: testAllMessagesInTheLogAreRead

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testAllMessagesInTheLogAreRead() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));

  int oldestOffset = 0;
  int newestOffset = 10;

  // mock out a consumer that returns ten checkpoint IMEs for the same ssp
  List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
  for(int offset = oldestOffset; offset <= newestOffset; offset++) {
    pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
  }

  // return one message at a time from each poll simulating a KafkaConsumer with max.poll.records = 1
  SystemConsumer mockConsumer = newConsumer(pollOutputs);
  SystemAdmin mockAdmin = newAdmin(Integer.toString(oldestOffset), Integer.toString(newestOffset));
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);

  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();

  // check that all ten messages are read, and the checkpoint is the newest message
  Checkpoint checkpoint = checkpointManager.readLastCheckpoint(TASK1);
  Assert.assertEquals(checkpoint.getOffsets(), ImmutableMap.of(ssp, Integer.toString(newestOffset)));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:33,
代码来源:TestKafkaCheckpointManagerJava.java

示例18: getSystemStreamPartitionGrouper

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Gets a SystemStreamPartitionGrouper object from the configuration.
 */
private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
  JobConfig jobConfig = new JobConfig(config);
  String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory();
  SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig);
  return grouper;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:10,
代码来源:AzureJobCoordinator.java

示例19: ConfigManager

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
public ConfigManager(Config config) {

    //get rm address and port
    if (!config.containsKey(rmAddressOpt) || !config.containsKey(rmPortOpt)) {
      throw new IllegalArgumentException("Missing config: the config file does not contain the rm host or port.");
    }
    String rmAddress = config.get(rmAddressOpt);
    int rmPort = config.getInt(rmPortOpt);

    //get job name and id;
    if (!config.containsKey(JobConfig.JOB_NAME())) {
      throw new IllegalArgumentException("Missing config: the config does not contain the job name");
    }
    jobName = config.get(JobConfig.JOB_NAME());
    jobID = config.getInt(JobConfig.JOB_ID(), 1);

    //set polling interval
    if (config.containsKey(pollingIntervalOpt)) {
      long pollingInterval = config.getLong(pollingIntervalOpt);
      if (pollingInterval <= 0) {
        throw new IllegalArgumentException("polling interval cannot be a negative value");
      }
      this.interval = pollingInterval;
    } else {
      this.interval = defaultPollingInterval;
    }

    this.config = config;
    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
    this.coordinatorStreamConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
    this.yarnUtil = new YarnUtil(rmAddress, rmPort);
  }
 

开发者ID:apache,
项目名称:samza,
代码行数:33,
代码来源:ConfigManager.java

示例20: getJobCoordinationZkPath

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
public static String getJobCoordinationZkPath(Config config) {
  JobConfig jobConfig = new JobConfig(config);
  String appId = new ApplicationConfig(config).getGlobalAppId();
  String jobName = jobConfig.getName().isDefined()
      ? jobConfig.getName().get()
      : DEFAULT_JOB_NAME;
  String jobId = jobConfig.getJobId().isDefined()
      ? jobConfig.getJobId().get()
      : DEFAULT_JOB_ID;

  return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:ZkJobCoordinatorFactory.java

示例21: getJobConfigs

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Override
public List<JobConfig> getJobConfigs() {
  String json = "";
  try {
    json = getPlanAsJson();
  } catch (Exception e) {
    log.warn("Failed to generate plan JSON", e);
  }

  final String planJson = json;
  return getJobNodes().stream().map(n -> n.generateConfig(planJson)).collect(Collectors.toList());
}
 

开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:JobGraph.java

示例22: JobGraph

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Create the physical graph from StreamGraph
 */
/* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
  JobGraph jobGraph = new JobGraph(config);
  Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
  Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
  Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
  Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet());
  intStreams.retainAll(sinkStreams);
  sourceStreams.removeAll(intStreams);
  sinkStreams.removeAll(intStreams);

  // For this phase, we have a single job node for the whole dag
  String jobName = config.get(JobConfig.JOB_NAME());
  String jobId = config.get(JobConfig.JOB_ID(), "1");
  JobNode node = jobGraph.getOrCreateJobNode(jobName, jobId, streamGraph);

  // add sources
  sourceStreams.forEach(spec -> jobGraph.addSource(spec, node));

  // add sinks
  sinkStreams.forEach(spec -> jobGraph.addSink(spec, node));

  // add intermediate streams
  intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));

  // add tables
  tables.forEach(spec -> jobGraph.addTable(spec, node));

  jobGraph.validate();

  return jobGraph;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:35,
代码来源:ExecutionPlanner.java

示例23: getJobModel

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Override
public JobModel getJobModel() {
  JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
  Map<String, SystemAdmin> systemAdmins = new HashMap<>();
  for (String systemName: systemConfig.getSystemNames()) {
    String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
    if (systemFactoryClassName == null) {
      LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
      throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
    }
    SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
    systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
  }

  StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
      Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());

  String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));

  /** TODO:
   Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
   in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
   TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
   (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
   */
  return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
      Collections.singletonList(containerId));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:29,
代码来源:PassthroughJobCoordinator.java

示例24: main

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
  Thread.setDefaultUncaughtExceptionHandler(
      new SamzaContainerExceptionHandler(() -> {
        log.info("Exiting process now.");
        System.exit(1);
      }));
  String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
  log.info(String.format("Got container ID: %s", containerId));
  String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
  log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
  int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
  JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
  Config config = jobModel.getConfig();
  JobConfig jobConfig = new JobConfig(config);
  if (jobConfig.getName().isEmpty()) {
    throw new SamzaException("can not find the job name");
  }
  String jobName = jobConfig.getName().get();
  String jobId = jobConfig.getJobId().getOrElse(ScalaToJavaUtils.defaultValue("1"));
  MDC.put("containerName", "samza-container-" + containerId);
  MDC.put("jobName", jobName);
  MDC.put("jobId", jobId);

  StreamApplication streamApp = TaskFactoryUtil.createStreamApplication(config);
  LocalContainerRunner localContainerRunner = new LocalContainerRunner(jobModel, containerId);
  localContainerRunner.run(streamApp);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:28,
代码来源:LocalContainerRunner.java

示例25: getSystemStreamPartitionGrouper

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Override
public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
  if (!(new TaskConfigJava(config).getBroadcastSystemStreams().isEmpty())) {
    throw new ConfigException("The job configured with AllSspToSingleTaskGrouper cannot have broadcast streams.");
  }

  String processors = config.get(JobConfig.PROCESSOR_LIST());
  List<String> processorList = Arrays.asList(processors.split(","));
  if (processorList.isEmpty()) {
    throw new SamzaException("processor list cannot be empty!");
  }
  return new AllSspToSingleTaskGrouper(processorList);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:14,
代码来源:AllSspToSingleTaskGrouperFactory.java

示例26: testTriggerIntervalForWindowsAndJoins

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testTriggerIntervalForWindowsAndJoins() throws Exception {
  Map<String, String> map = new HashMap<>(config);
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
  ExecutionPlan plan = planner.plan(streamGraph);
  List<JobConfig> jobConfigs = plan.getJobConfigs();
  assertEquals(1, jobConfigs.size());

  // GCD of 8, 16, 1600 and 252 is 4
  assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:16,
代码来源:TestExecutionPlanner.java

示例27: testTriggerIntervalForStatelessOperators

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testTriggerIntervalForStatelessOperators() throws Exception {
  Map<String, String> map = new HashMap<>(config);
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createSimpleGraph();
  ExecutionPlan plan = planner.plan(streamGraph);
  List<JobConfig> jobConfigs = plan.getJobConfigs();
  assertEquals(1, jobConfigs.size());
  assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:14,
代码来源:TestExecutionPlanner.java

示例28: testTriggerIntervalWhenWindowMsIsConfigured

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception {
  Map<String, String> map = new HashMap<>(config);
  map.put(TaskConfig.WINDOW_MS(), "2000");
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createSimpleGraph();
  ExecutionPlan plan = planner.plan(streamGraph);
  List<JobConfig> jobConfigs = plan.getJobConfigs();
  assertEquals(1, jobConfigs.size());
  assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:15,
代码来源:TestExecutionPlanner.java

示例29: testAppMasterWithFwk

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testAppMasterWithFwk() {
  Config conf = getConfig();
  state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));

  ContainerProcessManager taskManager = new ContainerProcessManager(
      new MapConfig(conf),
      state,
      new MetricsRegistryMap(),
      clusterResourceManager
  );
  taskManager.start();
  SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
  assertFalse(taskManager.shouldShutdown());
  taskManager.onResourceAllocated(container2);

  configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever");
  Config config1 = new MapConfig(configVals);

  ContainerProcessManager taskManager1 = new ContainerProcessManager(
      new MapConfig(config),
      state,
      new MetricsRegistryMap(),
      clusterResourceManager
  );
  taskManager1.start();
  taskManager1.onResourceAllocated(container2);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:29,
代码来源:TestContainerProcessManager.java

示例30: testGetNextOpIdIncrementsId

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testGetNextOpIdIncrementsId() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
  when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
  assertEquals("jobName-1234-merge-0", graph.getNextOpId(OpCode.MERGE, null));
  assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName"));
  assertEquals("jobName-1234-map-2", graph.getNextOpId(OpCode.MAP, null));
}
 

开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:TestStreamGraphImpl.java

示例31: testGetNextOpIdRejectsDuplicates

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test(expected = SamzaException.class)
public void testGetNextOpIdRejectsDuplicates() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
  when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
  assertEquals("jobName-1234-join-customName", graph.getNextOpId(OpCode.JOIN, "customName"));
  graph.getNextOpId(OpCode.JOIN, "customName"); // should throw
}
 

开发者ID:apache,
项目名称:samza,
代码行数:12,
代码来源:TestStreamGraphImpl.java

示例32: testRunComplete

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testRunComplete()
    throws Exception {
  final Map<String, String> config = new HashMap<>();
  config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
  LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
  StreamApplication app = mock(StreamApplication.class);
  doNothing().when(app).init(anyObject(), anyObject());

  ExecutionPlan plan = mock(ExecutionPlan.class);
  when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
  when(plan.getPlanAsJson()).thenReturn("");
  when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
  doReturn(plan).when(runner).getExecutionPlan(any(), any());

  StreamProcessor sp = mock(StreamProcessor.class);
  ArgumentCaptor<StreamProcessorLifecycleListener> captor =
      ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);

  doAnswer(i ->
    {
      StreamProcessorLifecycleListener listener = captor.getValue();
      listener.onStart();
      listener.onShutdown();
      return null;
    }).when(sp).start();

  doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());

  runner.run(app);

  assertEquals(runner.status(app), ApplicationStatus.SuccessfulFinish);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:34,
代码来源:TestLocalApplicationRunner.java

示例33: testRunFailure

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testRunFailure()
    throws Exception {
  final Map<String, String> config = new HashMap<>();
  config.put(ApplicationConfig.PROCESSOR_ID, "0");
  LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
  StreamApplication app = mock(StreamApplication.class);
  doNothing().when(app).init(anyObject(), anyObject());

  ExecutionPlan plan = mock(ExecutionPlan.class);
  when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
  when(plan.getPlanAsJson()).thenReturn("");
  when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
  doReturn(plan).when(runner).getExecutionPlan(any(), any());

  Throwable t = new Throwable("test failure");
  StreamProcessor sp = mock(StreamProcessor.class);
  ArgumentCaptor<StreamProcessorLifecycleListener> captor =
      ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);

  doAnswer(i ->
    {
      StreamProcessorLifecycleListener listener = captor.getValue();
      listener.onFailure(t);
      return null;
    }).when(sp).start();

  doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());

  try {
    runner.run(app);
  } catch (Throwable th) {
    assertNotNull(th);
  }

  assertEquals(runner.status(app), ApplicationStatus.UnsuccessfulFinish);
}
 

开发者ID:apache,
项目名称:samza,
代码行数:38,
代码来源:TestLocalApplicationRunner.java

示例34: testgetStreamWithSystemAtDefaultScopeInConfig

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testgetStreamWithSystemAtDefaultScopeInConfig() {
  Config config = addConfigs(buildStreamConfig(STREAM_ID,
                                                StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
                              JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);

  AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
  StreamSpec spec = runner.getStreamSpec(STREAM_ID);

  assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
}
 

开发者ID:apache,
项目名称:samza,
代码行数:12,
代码来源:TestAbstractApplicationRunner.java

示例35: testgetStreamWithSystemAtBothScopesInConfig

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
@Test
public void testgetStreamWithSystemAtBothScopesInConfig() {
  Config config = addConfigs(buildStreamConfig(STREAM_ID,
                                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
                                              StreamConfig.SYSTEM(), TEST_SYSTEM),
                              JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);

  AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
  StreamSpec spec = runner.getStreamSpec(STREAM_ID);

  assertEquals(TEST_SYSTEM, spec.getSystemName());
}
 

开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:TestAbstractApplicationRunner.java

示例36: findJobInstances

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Finds all the job instances in the specified path and adds a corresponding {@link JobInstance} and
 * {@link InstallationRecord} for each instance.
 *
 * @param jobInstallPath  the path to search for job instances.
 * @param jobs            the map to which the job instances will be added.
 */
private void findJobInstances(final File jobInstallPath, final Map<JobInstance, InstallationRecord> jobs) {
  try {
    String jobInstallCanonPath = jobInstallPath.getCanonicalPath();
    File configPath = Paths.get(jobInstallCanonPath, CFG_SUBPATH).toFile();
    if (!(configPath.exists() && configPath.isDirectory())) {
      log.debug("Config path not found: " + configPath);
      return;
    }

    for (File configFile : configPath.listFiles()) {

      if (configFile.isFile()) {

        String configFilePath = configFile.getCanonicalPath();
        Config config = jobConfigFactory.getConfig(new URI("file://" + configFilePath));

        if (config.containsKey(JobConfig.JOB_NAME()) && config.containsKey(JobConfig.STREAM_JOB_FACTORY_CLASS())) {

          String jobName = config.get(JobConfig.JOB_NAME());
          String jobId = config.get(JobConfig.JOB_ID(), "1");
          JobInstance jobInstance = new JobInstance(jobName, jobId);

          if (jobs.containsKey(jobInstance)) {
            throw new IllegalStateException(
                String.format("Found more than one job config with jobName:%s and jobId:%s", jobName, jobId));
          }
          InstallationRecord jobInstall =
              new InstallationRecord(jobName, jobId, jobInstallCanonPath, configFilePath, getBinPath(jobInstallCanonPath));
          jobs.put(jobInstance, jobInstall);
        }
      }
    }
  } catch (Exception e) {
    throw new SamzaException("Exception finding job instance in path: " + jobInstallPath, e);
  }
}
 

开发者ID:apache,
项目名称:samza,
代码行数:44,
代码来源:SimpleInstallationFinder.java

示例37: getCoordinatorSystemConfig

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Builds coordinator system config for the {@param jobInstance}.
 * @param jobInstance the job instance to get the jobModel for.
 * @return the constructed coordinator system config.
 */
private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
  try {
    InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
    ConfigFactory configFactory =  ClassLoaderHelper.fromClassName(taskResourceConfig.getJobConfigFactory());
    Config config = configFactory.getConfig(new URI(String.format("file://%s", record.getConfigFilePath())));
    Map<String, String> configMap = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(),
                                                    JobConfig.JOB_NAME(), jobInstance.getJobName());
    return Util.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
  } catch (Exception e) {
    LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
    throw new SamzaException(e);
  }
}
 

开发者ID:apache,
项目名称:samza,
代码行数:19,
代码来源:SamzaTaskProxy.java

示例38: createConfigs

点赞 2

import org.apache.samza.config.JobConfig; //导入依赖的package包/类
protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
    int messageCount) {
  Map<String, String> configs = new HashMap<>();
  configs.putAll(StandaloneTestUtils
      .getStandaloneConfigs("test-job", "org.apache.samza.processor.TestZkStreamProcessor.TestStreamTask"));
  configs.putAll(StandaloneTestUtils
      .getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING,
          true));
  configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
  configs.put("app.messageCount", String.valueOf(messageCount));
  configs.put("app.outputTopic", outputTopic);
  configs.put("app.outputSystem", testSystem);
  configs.put(ZkConfig.ZK_CONNECT, zkConnect());

  configs.put("job.systemstreampartition.grouper.factory",
      "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
  configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");

  configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");
  configs.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
  configs.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
  configs.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
  configs.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS);
  configs.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS);

  return configs;
}
 

开发者ID:apache,
项目名称:samza,
代码行数:28,
代码来源:TestZkStreamProcessorBase.java


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)