本文整理汇总了Java中org.apache.samza.config.JobConfig类的典型用法代码示例。如果您正苦于以下问题:Java JobConfig类的具体用法?Java JobConfig怎么用?Java JobConfig使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
JobConfig类属于org.apache.samza.config包,在下文中一共展示了JobConfig类的38个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testReadFailsOnSerdeExceptions
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Expects {@code readLastCheckpoint} to surface a {@link SamzaException} when the checkpoint
 * message serde throws and the manager is constructed with its boolean flag set to {@code true}
 * (presumably "validate checkpoints" -- confirm against KafkaCheckpointManager).
 */
@Test(expected = SamzaException.class)
public void testReadFailsOnSerdeExceptions() throws Exception {
  Config stubbedConfig = mock(Config.class);
  when(stubbedConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // the consumer hands back exactly one poll result holding one checkpoint envelope
  SystemStreamPartition inputSsp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<IncomingMessageEnvelope> singlePoll = ImmutableList.of(newCheckpointEnvelope(TASK1, inputSsp, "0"));
  List<List<IncomingMessageEnvelope>> pollResults = ImmutableList.of(singlePoll);

  SystemConsumer consumer = newConsumer(pollResults);
  SystemAdmin admin = newAdmin("0", "1");
  SystemFactory systemFactory = newFactory(mock(SystemProducer.class), consumer, admin);

  // the checkpoint manager is given a message serde that always throws
  KafkaStreamSpec spec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);
  KafkaCheckpointManager manager = new KafkaCheckpointManager(
      spec,
      systemFactory,
      true,
      stubbedConfig,
      mock(MetricsRegistry.class),
      new ExceptionThrowingCheckpointSerde(),
      new KafkaCheckpointLogKeySerde());
  manager.register(TASK1);
  manager.start();

  // the serde failure is expected to propagate out of this call
  manager.readLastCheckpoint(TASK1);
}
开发者ID:apache,
项目名称:samza,
代码行数:26,
代码来源:TestKafkaCheckpointManagerJava.java
示例2: testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * With the manager's boolean flag set to {@code false} (validation disabled, per the test name),
 * {@code readLastCheckpoint} must complete without throwing even though both the message serde
 * and the key serde supplied here throw.
 */
@Test
public void testReadSucceedsOnKeySerdeExceptionsWhenValidationIsDisabled() throws Exception {
  Config stubbedConfig = mock(Config.class);
  when(stubbedConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // the consumer hands back exactly one poll result holding one checkpoint envelope
  SystemStreamPartition inputSsp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  List<IncomingMessageEnvelope> singlePoll = ImmutableList.of(newCheckpointEnvelope(TASK1, inputSsp, "0"));
  List<List<IncomingMessageEnvelope>> pollResults = ImmutableList.of(singlePoll);

  SystemConsumer consumer = newConsumer(pollResults);
  SystemAdmin admin = newAdmin("0", "1");
  SystemFactory systemFactory = newFactory(mock(SystemProducer.class), consumer, admin);

  // both serdes given to the checkpoint manager throw
  KafkaStreamSpec spec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);
  KafkaCheckpointManager manager = new KafkaCheckpointManager(
      spec,
      systemFactory,
      false,
      stubbedConfig,
      mock(MetricsRegistry.class),
      new ExceptionThrowingCheckpointSerde(),
      new ExceptionThrowingCheckpointKeySerde());
  manager.register(TASK1);
  manager.start();

  // the read is expected to succeed in spite of the throwing serdes
  manager.readLastCheckpoint(TASK1);
}
开发者ID:apache,
项目名称:samza,
代码行数:27,
代码来源:TestKafkaCheckpointManagerJava.java
示例3: ZkJobCoordinator
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Builds the coordinator: stores its collaborators, subscribes to ZooKeeper session state
 * changes, and creates the leader elector, controller, version-upgrade barrier, metrics
 * reporters and debounce timer.
 *
 * @param config job configuration
 * @param metricsRegistry registry handed to {@link ZkJobCoordinatorMetrics}
 * @param zkUtils ZooKeeper helper used for the client, key builder, elector and barrier
 */
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
  this.config = config;
  this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
  this.processorId = createProcessorId(config);
  this.zkUtils = zkUtils;

  // Listen for session state changes; the events of interest are
  // "session closed" and "new session created".
  zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());

  LeaderElector elector = new ZkLeaderElector(processorId, zkUtils);
  elector.setLeaderElectorListener(new LeaderElectorListenerImpl());
  this.zkController = new ZkControllerImpl(processorId, zkUtils, this, elector);

  this.barrier = new ZkBarrierForVersionUpgrade(
      zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl());

  JobConfig jobConfig = new JobConfig(config);
  this.debounceTimeMs = jobConfig.getDebounceTimeMs();
  MetricsConfig metricsConfig = new MetricsConfig(config);
  this.reporters = MetricsReporterLoader.getMetricsReporters(metricsConfig, processorId);

  // Any exception raised by a scheduled task is logged and stops the coordinator.
  this.debounceTimer = new ScheduleAfterDebounceTime();
  this.debounceTimer.setScheduledTaskCallback(failure -> {
    LOG.error("Received exception from in JobCoordinator Processing!", failure);
    stop();
  });
}
开发者ID:apache,
项目名称:samza,
代码行数:26,
代码来源:ZkJobCoordinator.java
示例4: calculateIntStreamPartitions
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Assigns a partition count to every intermediate stream edge whose count is not yet positive.
 * <p>
 * The count comes from {@code JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS()} when configured.
 * When the configured value is negative (the default is {@code StreamEdge.PARTITIONS_UNKNOWN}),
 * it is inferred as the larger of the maximum source and maximum sink partition counts, capped
 * at {@code MAX_INFERRED_PARTITIONS}.
 *
 * @param jobGraph graph whose intermediate stream edges are updated in place
 * @param config job configuration that may carry an explicit partition count
 */
private static void calculateIntStreamPartitions(JobGraph jobGraph, Config config) {
  int partitions = config.getInt(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), StreamEdge.PARTITIONS_UNKNOWN);
  if (partitions < 0) {
    // use the following simple algo to figure out the partitions
    // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
    // partition will be further bounded by MAX_INFERRED_PARTITIONS.
    // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
    int maxInPartitions = maxPartition(jobGraph.getSources());
    int maxOutPartitions = maxPartition(jobGraph.getSinks());
    partitions = Math.max(maxInPartitions, maxOutPartitions);
    if (partitions > MAX_INFERRED_PARTITIONS) {
      // Log before clamping so the message reports the inferred count, not the cap twice.
      log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
          partitions, MAX_INFERRED_PARTITIONS));
      partitions = MAX_INFERRED_PARTITIONS;
    }
  }
  for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
    // leave edges that already have a positive partition count untouched
    if (edge.getPartitionCount() <= 0) {
      edge.setPartitionCount(partitions);
    }
  }
}
开发者ID:apache,
项目名称:samza,
代码行数:24,
代码来源:ExecutionPlanner.java
示例5: ContainerProcessManager
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Creates the manager: reads cluster-manager and job config, obtains the cluster resource
 * manager from the configured factory, sets up metrics, and picks the container allocator
 * based on whether host affinity is enabled. The allocator thread is created but not started
 * here.
 *
 * @param config job configuration
 * @param state shared application state
 * @param registry metrics registry passed to {@link ContainerProcessManagerMetrics}
 */
public ContainerProcessManager(Config config,
                               SamzaApplicationState state,
                               MetricsRegistryMap registry) {
  this.state = state;
  this.clusterManagerConfig = new ClusterManagerConfig(config);
  this.jobConfig = new JobConfig(config);
  this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();

  ResourceManagerFactory resourceManagerFactory = getContainerProcessManagerFactory(clusterManagerConfig);
  this.clusterResourceManager = checkNotNull(resourceManagerFactory.getClusterResourceManager(this, state));
  this.metrics = new ContainerProcessManagerMetrics(config, state, registry);

  if (!this.hostAffinityEnabled) {
    this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
  } else {
    this.containerAllocator = new HostAwareContainerAllocator(
        clusterResourceManager, clusterManagerConfig.getContainerRequestTimeout(), config, state);
  }

  this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
  log.info("finished initialization of samza task manager");
}
开发者ID:apache,
项目名称:samza,
代码行数:24,
代码来源:ContainerProcessManager.java
示例6: getPartitionCountMonitor
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Builds a {@link StreamPartitionCountMonitor} over all input streams of the job.
 * <p>
 * When the monitor reports changed streams, the callback records a
 * {@link PartitionChangeException} in {@code coordinatorException}; if the job has durable
 * stores it additionally marks the application state as FAILED.
 *
 * @param config job configuration
 * @return the configured monitor
 * @throws SamzaException if the job has no input streams
 */
private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
  Map<String, SystemAdmin> admins = new JavaSystemConfig(config).getSystemAdmins();
  StreamMetadataCache metadataCache =
      new StreamMetadataCache(Util.javaMapAsScalaMap(admins), 0, SystemClock.instance());

  Set<SystemStream> monitoredStreams = new TaskConfigJava(config).getAllInputStreams();
  if (monitoredStreams.isEmpty()) {
    throw new SamzaException("Input streams to a job can not be empty.");
  }

  return new StreamPartitionCountMonitor(
      monitoredStreams,
      metadataCache,
      metrics,
      new JobConfig(config).getMonitorPartitionChangeFrequency(),
      changedStreams -> {
        // Only jobs with durable state are failed outright. Otherwise state.status stays
        // UNDEFINED so that the YARN job will be restarted.
        if (hasDurableStores) {
          log.error("Input topic partition count changed in a job with durable state. Failing the job.");
          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
        }
        coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
      });
}
开发者ID:apache,
项目名称:samza,
代码行数:23,
代码来源:ClusterBasedJobCoordinator.java
示例7: getNextOpId
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Gets the unique ID for the next operator in the graph. The ID is of the following format:
 * jobName-jobId-opCode-(userDefinedId|nextOpNum);
 * <p>
 * The op code is lower-cased with {@link java.util.Locale#ROOT} so the generated ID does not
 * depend on the JVM's default locale. The operator counter is advanced on every successful
 * call, including calls that supply a user-defined ID.
 *
 * @param opCode the {@link OpCode} of the next operator
 * @param userDefinedId the optional user-provided name of the next operator or null
 * @return the unique ID for the next operator in the graph
 * @throws SamzaException if a non-blank {@code userDefinedId} does not match the allowed
 *         pattern, or if the resulting ID has already been handed out
 */
/* package private */ String getNextOpId(OpCode opCode, String userDefinedId) {
  if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
    throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
  }
  String nextOpId = String.format("%s-%s-%s-%s",
      config.get(JobConfig.JOB_NAME()),
      config.get(JobConfig.JOB_ID(), "1"),
      // Locale.ROOT: a locale-sensitive lower-casing (e.g. Turkish dotless i) would change the ID
      opCode.name().toLowerCase(java.util.Locale.ROOT),
      StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
  if (!operatorIds.add(nextOpId)) {
    throw new SamzaException(
        String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId));
  }
  nextOpNum++;
  return nextOpId;
}
开发者ID:apache,
项目名称:samza,
代码行数:26,
代码来源:StreamGraphImpl.java
示例8: runTask
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Runs a single task-based job locally: checks that a task class is configured, creates one
 * {@link StreamProcessor} with a life-cycle listener attached, and starts it.
 *
 * @throws SamzaException if no task class is configured
 */
@Override
public void runTask() {
  JobConfig jobConfig = new JobConfig(this.config);
  // validation
  String taskName = new TaskConfig(config).getTaskClass().getOrElse(null);
  if (taskName == null) {
    // message previously repeated the word "defined"
    throw new SamzaException("Neither APP nor task.class are defined");
  }
  LOG.info("LocalApplicationRunner will run " + taskName);
  LocalStreamProcessorLifeCycleListener listener = new LocalStreamProcessorLifeCycleListener();
  StreamProcessor processor = createStreamProcessor(jobConfig, null, listener);
  // exactly one processor is started for a task-based job
  numProcessorsToStart.set(1);
  listener.setProcessor(processor);
  processor.start();
}
开发者ID:apache,
项目名称:samza,
代码行数:19,
代码来源:LocalApplicationRunner.java
示例9: testDefaultPartitions
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * When the intermediate-stream partition count is configured explicitly, every intermediate
 * stream in the job graph must end up with that count.
 */
@Test
public void testDefaultPartitions() {
  Map<String, String> overrides = new HashMap<>(config);
  overrides.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config plannerConfig = new MapConfig(overrides);

  ExecutionPlanner planner = new ExecutionPlanner(plannerConfig, streamManager);
  StreamGraphImpl graph = createSimpleGraph();
  JobGraph jobGraph = planner.createJobGraph(graph);
  planner.calculatePartitions(graph, jobGraph);

  // each intermediate stream should carry the configured partition count
  jobGraph.getIntermediateStreams().forEach(
      stream -> assertTrue(stream.getPartitionCount() == DEFAULT_PARTITIONS));
}
开发者ID:apache,
项目名称:samza,
代码行数:17,
代码来源:TestExecutionPlanner.java
示例10: testTriggerIntervalWithInvalidWindowMs
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * With {@code task.window.ms} configured as "-1", the planned job config is expected to carry
 * a window interval of "4" for the join-and-window graph.
 */
@Test
public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
  Map<String, String> overrides = new HashMap<>(config);
  overrides.put(TaskConfig.WINDOW_MS(), "-1");
  overrides.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config plannerConfig = new MapConfig(overrides);

  ExecutionPlan plan = new ExecutionPlanner(plannerConfig, streamManager)
      .plan(createStreamGraphWithJoinAndWindow());

  List<JobConfig> plannedConfigs = plan.getJobConfigs();
  assertEquals(1, plannedConfigs.size());

  // GCD of 8, 16, 1600 and 252 is 4
  JobConfig onlyJob = plannedConfigs.get(0);
  assertEquals("4", onlyJob.get(TaskConfig.WINDOW_MS()));
}
开发者ID:apache,
项目名称:samza,
代码行数:17,
代码来源:TestExecutionPlanner.java
示例11: createStreamOperatorTask
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Creates and initializes a {@link StreamOperatorTask} for the given application, backed by
 * mocks: a runner that resolves the two input streams, a task context exposing two partitions,
 * a metrics registry and separate in-memory stores for the two join sides, and a config that
 * reports job name "jobName" and job id "jobId".
 *
 * @param clock clock handed to the task
 * @param app application under test
 * @return the initialized task
 */
private StreamOperatorTask createStreamOperatorTask(Clock clock, StreamApplication app) throws Exception {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  when(mockRunner.getStreamSpec("instream"))
      .thenReturn(new StreamSpec("instream", "instream", "insystem"));
  when(mockRunner.getStreamSpec("instream2"))
      .thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));

  TaskContextImpl mockContext = mock(TaskContextImpl.class);
  SystemStreamPartition firstInput = new SystemStreamPartition("insystem", "instream", new Partition(0));
  SystemStreamPartition secondInput = new SystemStreamPartition("insystem2", "instream2", new Partition(0));
  when(mockContext.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(firstInput, secondInput));
  when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());

  // the left and right side of the join each need their own store instance
  IntegerSerde intSerde = new IntegerSerde();
  TimestampedValueSerde valueSerde = new TimestampedValueSerde(new KVSerde(intSerde, intSerde));
  when(mockContext.getStore(eq("jobName-jobId-join-j1-L")))
      .thenReturn(new TestInMemoryStore(intSerde, valueSerde));
  when(mockContext.getStore(eq("jobName-jobId-join-j1-R")))
      .thenReturn(new TestInMemoryStore(intSerde, valueSerde));

  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
  when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");

  StreamOperatorTask task = new StreamOperatorTask(app, mockRunner, clock);
  task.init(mockConfig, mockContext);
  return task;
}
开发者ID:apache,
项目名称:samza,
代码行数:27,
代码来源:TestJoinOperator.java
示例12: setup
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Prepares the shared mocks before each test: a config reporting job name "jobName" and job id
 * "jobId", a task context with one "kafka"/"integers" partition, a metrics registry and an
 * in-memory window store, and a runner that resolves the "integers" stream.
 */
@Before
public void setup() throws Exception {
  config = mock(Config.class);
  when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
  when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");

  taskContext = mock(TaskContextImpl.class);
  runner = mock(ApplicationRunner.class);

  Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
  Serde storeValSerde = new IntegerEnvelopeSerde();

  SystemStreamPartition integersPartition = new SystemStreamPartition("kafka", "integers", new Partition(0));
  when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(integersPartition));
  when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
  when(taskContext.getStore("jobName-jobId-window-w1"))
      .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));

  when(runner.getStreamSpec("integers"))
      .thenReturn(new StreamSpec("integers", "integers", "kafka"));
}
开发者ID:apache,
项目名称:samza,
代码行数:19,
代码来源:TestWindowOperator.java
示例13: buildStreamApplicationConfigMap
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Assembles the configuration for a standalone stream application test: the fixed container
 * settings (input stream, default system, groupers, coordinator factory, app/job identity,
 * shutdown and debounce timings) overlaid with the Kafka system configs for {@code systemName}.
 *
 * @param systemName name of the system used as default system and for the system factory key
 * @param inputTopic value for the task input streams setting
 * @param appName application name, also used as the job name
 * @param appId application id, also used as the job id
 * @return a mutable map holding the combined configuration
 */
private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
    String appName, String appId) {
  ImmutableMap.Builder<String, String> containerConfig = ImmutableMap.builder();
  containerConfig.put(TaskConfig.INPUT_STREAMS(), inputTopic);
  containerConfig.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
  containerConfig.put(TaskConfig.IGNORED_EXCEPTIONS(), "*");
  containerConfig.put(ZkConfig.ZK_CONNECT, zkConnect());
  containerConfig.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY);
  containerConfig.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY);
  containerConfig.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
  containerConfig.put(ApplicationConfig.APP_NAME, appName);
  containerConfig.put(ApplicationConfig.APP_ID, appId);
  containerConfig.put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY);
  containerConfig.put(JobConfig.JOB_NAME(), appName);
  containerConfig.put(JobConfig.JOB_ID(), appId);
  containerConfig.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
  containerConfig.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);

  // copy into a mutable map so the Kafka system settings can be layered on top
  Map<String, String> combined = Maps.newHashMap(containerConfig.build());
  combined.putAll(StandaloneTestUtils.getKafkaSystemConfigs(
      systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
  return combined;
}
开发者ID:apache,
项目名称:samza,
代码行数:23,
代码来源:TestZkLocalApplicationRunner.java
示例14: getBaseJobConfig
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Returns the base configuration shared by the table tests: a "test" system backed by
 * {@link ArraySystemFactory}, passthrough coordination with a single-container grouper, and a
 * "kafka" default system (int key serde, json message serde) used for intermediate streams.
 *
 * @return a fresh mutable configuration map
 */
private Map<String, String> getBaseJobConfig() {
  Map<String, String> base = new HashMap<>();

  // job identity, input system and coordination
  base.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
  base.put(JobConfig.JOB_NAME(), "test-table-job");
  base.put(JobConfig.PROCESSOR_ID(), "1");
  base.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY,
      PassthroughCoordinationUtilsFactory.class.getName());
  base.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
      PassthroughJobCoordinatorFactory.class.getName());
  base.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());

  // Kafka system, needed for intermediate streams
  base.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
  base.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
  base.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
  base.put("systems.kafka.samza.key.serde", "int");
  base.put("systems.kafka.samza.msg.serde", "json");
  base.put("systems.kafka.default.stream.replication.factor", "1");
  base.put("job.default.system", "kafka");

  // serde registrations referenced by the Kafka system above
  base.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
  base.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
  return base;
}
开发者ID:apache,
项目名称:samza,
代码行数:25,
代码来源:TestLocalTable.java
示例15: main
点赞 3
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Command-line entry point: parses the arguments (including the optional
 * {@code metrics-validator} class name), loads the job config, prepares a YARN configuration
 * that maps the http and https schemes to {@link HttpFileSystem}, and runs the validation tool.
 *
 * @param args command-line arguments
 */
public static void main(String [] args) throws Exception {
  CommandLine cmdline = new CommandLine();
  OptionParser parser = cmdline.parser();
  OptionSpec<String> validatorOpt = parser
      .accepts("metrics-validator", "The metrics validator class.")
      .withOptionalArg()
      .ofType(String.class)
      .describedAs("com.foo.bar.ClassName");
  OptionSet options = cmdline.parser().parse(args);
  Config config = cmdline.loadConfig(options);

  // the validator stays null unless a class name was supplied on the command line
  MetricsValidator validator = options.has(validatorOpt)
      ? ClassLoaderHelper.<MetricsValidator>fromClassName(options.valueOf(validatorOpt))
      : null;

  YarnConfiguration hadoopConfig = new YarnConfiguration();
  String httpFileSystem = HttpFileSystem.class.getName();
  hadoopConfig.set("fs.http.impl", httpFileSystem);
  hadoopConfig.set("fs.https.impl", httpFileSystem);

  ClientHelper clientHelper = new ClientHelper(hadoopConfig);
  YarnJobValidationTool tool =
      new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator);
  tool.run();
}
开发者ID:apache,
项目名称:samza,
代码行数:22,
代码来源:YarnJobValidationTool.java
示例16: testCheckpointsAreReadFromOldestOffset
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Verifies that the checkpoint consumer is registered exactly once, at the oldest offset, and
 * only when the checkpoint manager is started.
 */
@Test
public void testCheckpointsAreReadFromOldestOffset() throws Exception {
  final String oldestOffset = "0";

  Config stubbedConfig = mock(Config.class);
  when(stubbedConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);

  // the consumer hands back exactly one poll result holding one checkpoint envelope
  SystemStreamPartition inputSsp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  SystemConsumer consumer = newConsumer(ImmutableList.of(
      ImmutableList.of(newCheckpointEnvelope(TASK1, inputSsp, "0"))));
  SystemAdmin admin = newAdmin(oldestOffset, "1");
  SystemFactory systemFactory = newFactory(mock(SystemProducer.class), consumer, admin);

  KafkaStreamSpec spec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC, CHECKPOINT_SYSTEM, 1);
  KafkaCheckpointManager manager = new KafkaCheckpointManager(
      spec,
      systemFactory,
      true,
      stubbedConfig,
      mock(MetricsRegistry.class),
      new CheckpointSerde(),
      new KafkaCheckpointLogKeySerde());
  manager.register(TASK1);

  // Before start(): the consumer must not have been registered yet.
  verify(consumer, times(0)).register(CHECKPOINT_SSP, oldestOffset);

  // start() registers the consumer once, with the oldest offset.
  manager.start();
  verify(consumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);

  // Later operations must not register again, since start offsets are fixed at registration.
  manager.readLastCheckpoint(TASK1);
  verify(consumer, times(1)).register(CHECKPOINT_SSP, oldestOffset);
}
开发者ID:apache,
项目名称:samza,
代码行数:31,
代码来源:TestKafkaCheckpointManagerJava.java
示例17: testAllMessagesInTheLogAreRead
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Verifies that the checkpoint manager reads every message in the checkpoint log, one per poll,
 * and that the checkpoint it returns carries the newest offset.
 */
@Test
public void testAllMessagesInTheLogAreRead() throws Exception {
  KafkaStreamSpec checkpointSpec = new KafkaStreamSpec(CHECKPOINT_TOPIC, CHECKPOINT_TOPIC,
      CHECKPOINT_SYSTEM, 1);
  Config mockConfig = mock(Config.class);
  when(mockConfig.get(JobConfig.SSP_GROUPER_FACTORY())).thenReturn(GROUPER_FACTORY_CLASS);
  SystemStreamPartition ssp = new SystemStreamPartition("system-1", "input-topic", new Partition(0));
  int oldestOffset = 0;
  int newestOffset = 10;
  // mock out a consumer that returns one checkpoint IME per offset in
  // [oldestOffset, newestOffset] (eleven in total) for the same ssp
  List<List<IncomingMessageEnvelope>> pollOutputs = new ArrayList<>();
  for (int offset = oldestOffset; offset <= newestOffset; offset++) {
    pollOutputs.add(ImmutableList.of(newCheckpointEnvelope(TASK1, ssp, Integer.toString(offset))));
  }
  // return one message at a time from each poll simulating a KafkaConsumer with max.poll.records = 1
  SystemConsumer mockConsumer = newConsumer(pollOutputs);
  SystemAdmin mockAdmin = newAdmin(Integer.toString(oldestOffset), Integer.toString(newestOffset));
  SystemFactory factory = newFactory(mock(SystemProducer.class), mockConsumer, mockAdmin);
  KafkaCheckpointManager checkpointManager = new KafkaCheckpointManager(checkpointSpec, factory,
      true, mockConfig, mock(MetricsRegistry.class), new CheckpointSerde(), new KafkaCheckpointLogKeySerde());
  checkpointManager.register(TASK1);
  checkpointManager.start();
  // check that all messages are read, and the checkpoint is the newest message
  Checkpoint checkpoint = checkpointManager.readLastCheckpoint(TASK1);
  // JUnit order is (expected, actual); a failure message now labels the two values correctly
  Assert.assertEquals(ImmutableMap.of(ssp, Integer.toString(newestOffset)), checkpoint.getOffsets());
}
开发者ID:apache,
项目名称:samza,
代码行数:33,
代码来源:TestKafkaCheckpointManagerJava.java
示例18: getSystemStreamPartitionGrouper
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Instantiates the grouper factory named in the job config and asks it for a
 * {@link SystemStreamPartitionGrouper}.
 *
 * @return the grouper produced by the configured factory
 */
private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
  JobConfig jobConfig = new JobConfig(config);
  SystemStreamPartitionGrouperFactory grouperFactory =
      Util.<SystemStreamPartitionGrouperFactory>getObj(jobConfig.getSystemStreamPartitionGrouperFactory());
  return grouperFactory.getSystemStreamPartitionGrouper(jobConfig);
}
开发者ID:apache,
项目名称:samza,
代码行数:10,
代码来源:AzureJobCoordinator.java
示例19: ConfigManager
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Creates a config manager from the given config.
 * <p>
 * The config must contain the resource manager address and port and the job name. The job id
 * defaults to 1. The polling interval is taken from the config when present and must be
 * strictly positive; otherwise {@code defaultPollingInterval} is used.
 *
 * @param config configuration to read from; also used to create the coordinator stream consumer
 * @throws IllegalArgumentException if the rm address/port or the job name is missing, or the
 *         configured polling interval is zero or negative
 */
public ConfigManager(Config config) {
  //get rm address and port
  if (!config.containsKey(rmAddressOpt) || !config.containsKey(rmPortOpt)) {
    throw new IllegalArgumentException("Missing config: the config file does not contain the rm host or port.");
  }
  String rmAddress = config.get(rmAddressOpt);
  int rmPort = config.getInt(rmPortOpt);
  //get job name and id;
  if (!config.containsKey(JobConfig.JOB_NAME())) {
    throw new IllegalArgumentException("Missing config: the config does not contain the job name");
  }
  jobName = config.get(JobConfig.JOB_NAME());
  jobID = config.getInt(JobConfig.JOB_ID(), 1);
  //set polling interval
  if (config.containsKey(pollingIntervalOpt)) {
    long pollingInterval = config.getLong(pollingIntervalOpt);
    if (pollingInterval <= 0) {
      // zero is rejected as well, so the message must not claim only negatives are invalid
      throw new IllegalArgumentException("polling interval must be a positive value");
    }
    this.interval = pollingInterval;
  } else {
    this.interval = defaultPollingInterval;
  }
  this.config = config;
  CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
  this.coordinatorStreamConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
  this.yarnUtil = new YarnUtil(rmAddress, rmPort);
}
开发者ID:apache,
项目名称:samza,
代码行数:33,
代码来源:ConfigManager.java
示例20: getJobCoordinationZkPath
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Builds the ZooKeeper path used for job coordination from the global app id, the job name and
 * the job id, falling back to {@code DEFAULT_JOB_NAME} / {@code DEFAULT_JOB_ID} when the
 * respective value is not defined in the config.
 *
 * @param config job configuration
 * @return the path formatted with {@code JOB_COORDINATOR_ZK_PATH_FORMAT}
 */
public static String getJobCoordinationZkPath(Config config) {
  JobConfig jobConfig = new JobConfig(config);
  String appId = new ApplicationConfig(config).getGlobalAppId();

  String jobName = DEFAULT_JOB_NAME;
  if (jobConfig.getName().isDefined()) {
    jobName = jobConfig.getName().get();
  }

  String jobId = DEFAULT_JOB_ID;
  if (jobConfig.getJobId().isDefined()) {
    jobId = jobConfig.getJobId().get();
  }

  return String.format(JOB_COORDINATOR_ZK_PATH_FORMAT, appId, jobName, jobId);
}
开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:ZkJobCoordinatorFactory.java
示例21: getJobConfigs
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Generates one {@link JobConfig} per job node, passing each node the JSON form of the plan.
 * If the plan JSON cannot be produced, a warning is logged and an empty string is passed
 * instead.
 *
 * @return the generated job configs, in job-node order
 */
@Override
public List<JobConfig> getJobConfigs() {
  String json;
  try {
    json = getPlanAsJson();
  } catch (Exception e) {
    // best effort: config generation proceeds without the plan JSON
    log.warn("Failed to generate plan JSON", e);
    json = "";
  }

  // lambdas can only capture effectively-final locals
  final String planJson = json;
  return getJobNodes()
      .stream()
      .map(jobNode -> jobNode.generateConfig(planJson))
      .collect(Collectors.toList());
}
开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:JobGraph.java
示例22: createJobGraph
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Creates the physical job graph from the stream graph.
 * <p>
 * Streams that appear both as input and as output are treated as intermediate streams and are
 * removed from the source and sink sets. A single job node, named from the configured job name
 * and job id (default "1"), owns all sources, sinks, intermediate streams and tables. The graph
 * is validated before it is returned.
 *
 * @param streamGraph the logical stream graph
 * @return the validated job graph
 */
/* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
  JobGraph graph = new JobGraph(config);

  Set<StreamSpec> sources = new HashSet<>(streamGraph.getInputOperators().keySet());
  Set<StreamSpec> sinks = new HashSet<>(streamGraph.getOutputStreams().keySet());
  Set<StreamSpec> intermediates = new HashSet<>(sources);
  Set<TableSpec> tableSpecs = new HashSet<>(streamGraph.getTables().keySet());

  // intermediate = sources ∩ sinks; what remains on either side is a pure source or pure sink
  intermediates.retainAll(sinks);
  sources.removeAll(intermediates);
  sinks.removeAll(intermediates);

  // For this phase, we have a single job node for the whole dag
  String jobName = config.get(JobConfig.JOB_NAME());
  String jobId = config.get(JobConfig.JOB_ID(), "1");
  JobNode jobNode = graph.getOrCreateJobNode(jobName, jobId, streamGraph);

  for (StreamSpec source : sources) {
    graph.addSource(source, jobNode);
  }
  for (StreamSpec sink : sinks) {
    graph.addSink(sink, jobNode);
  }
  // an intermediate stream is both written and read by the single node
  for (StreamSpec intermediate : intermediates) {
    graph.addIntermediateStream(intermediate, jobNode, jobNode);
  }
  for (TableSpec table : tableSpecs) {
    graph.addTable(table, jobNode);
  }

  graph.validate();
  return graph;
}
开发者ID:apache,
项目名称:samza,
代码行数:35,
代码来源:ExecutionPlanner.java
示例23: getJobModel
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Builds the job model for this coordinator: creates a system admin for every configured
 * system, wraps them in a stream metadata cache, and reads the job model for the single
 * container identified by the configured processor id.
 *
 * @return the job model
 * @throws SamzaException if a configured system has no system factory class
 */
@Override
public JobModel getJobModel() {
  JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);

  Map<String, SystemAdmin> adminsBySystem = new HashMap<>();
  for (String systemName : systemConfig.getSystemNames()) {
    String factoryClassName = systemConfig.getSystemFactory(systemName);
    if (factoryClassName == null) {
      String message =
          String.format("A stream uses system %s, which is missing from the configuration.", systemName);
      LOGGER.error(message);
      throw new SamzaException(message);
    }
    SystemFactory systemFactory = Util.<SystemFactory>getObj(factoryClassName);
    adminsBySystem.put(systemName, systemFactory.getAdmin(systemName, this.config));
  }

  StreamMetadataCache metadataCache = new StreamMetadataCache(
      Util.<String, SystemAdmin>javaMapAsScalaMap(adminsBySystem), 5000, SystemClock.instance());

  String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));

  /* TODO:
     Locality Manager seems to be required in JC for reading locality info and grouping tasks intelligently and also,
     in SamzaContainer for writing locality info to the coordinator stream. This closely couples together
     TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
     (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
   */
  return JobModelManager.readJobModel(
      this.config,
      Collections.emptyMap(),
      null,
      metadataCache,
      Collections.singletonList(containerId));
}
开发者ID:apache,
项目名称:samza,
代码行数:29,
代码来源:PassthroughJobCoordinator.java
示例24: main
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Container entry point: installs an uncaught-exception handler that exits the process, reads
 * the container id and coordinator URL from the environment, fetches the job model, populates
 * the logging MDC with container and job identity, and runs the stream application.
 *
 * @param args unused
 * @throws SamzaException if the job model's config has no job name
 */
public static void main(String[] args) throws Exception {
  Thread.setDefaultUncaughtExceptionHandler(
      new SamzaContainerExceptionHandler(() -> {
        log.info("Exiting process now.");
        System.exit(1);
      }));

  final String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
  log.info(String.format("Got container ID: %s", containerId));
  final String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
  log.info(String.format("Got coordinator URL: %s", coordinatorUrl));

  // random delay between 1 and DEFAULT_READ_JOBMODEL_DELAY_MS, inclusive
  int readDelayMs = 1 + new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS());
  JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, readDelayMs);

  Config config = jobModel.getConfig();
  JobConfig jobConfig = new JobConfig(config);
  if (jobConfig.getName().isEmpty()) {
    throw new SamzaException("can not find the job name");
  }
  String jobName = jobConfig.getName().get();
  String jobId = jobConfig.getJobId().getOrElse(ScalaToJavaUtils.defaultValue("1"));

  MDC.put("containerName", "samza-container-" + containerId);
  MDC.put("jobName", jobName);
  MDC.put("jobId", jobId);

  StreamApplication application = TaskFactoryUtil.createStreamApplication(config);
  LocalContainerRunner containerRunner = new LocalContainerRunner(jobModel, containerId);
  containerRunner.run(application);
}
开发者ID:apache,
项目名称:samza,
代码行数:28,
代码来源:LocalContainerRunner.java
示例25: getSystemStreamPartitionGrouper
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Creates an {@link AllSspToSingleTaskGrouper} for the processors listed (comma-separated) in
 * the job config.
 *
 * @param config job configuration
 * @return a grouper over the configured processor list
 * @throws ConfigException if the job is configured with broadcast streams
 * @throws SamzaException if the processor list is missing, blank, or contains no entries
 */
@Override
public SystemStreamPartitionGrouper getSystemStreamPartitionGrouper(Config config) {
  if (!(new TaskConfigJava(config).getBroadcastSystemStreams().isEmpty())) {
    throw new ConfigException("The job configured with AllSspToSingleTaskGrouper cannot have broadcast streams.");
  }
  String processors = config.get(JobConfig.PROCESSOR_LIST());
  // Reject a missing or blank value up front: null would fail with a NullPointerException on
  // split, and "".split(",") yields [""], which the emptiness check below cannot detect.
  if (processors == null || processors.trim().isEmpty()) {
    throw new SamzaException("processor list cannot be empty!");
  }
  List<String> processorList = Arrays.asList(processors.split(","));
  // still reachable for input made only of separators (e.g. ","), which splits to zero tokens
  if (processorList.isEmpty()) {
    throw new SamzaException("processor list cannot be empty!");
  }
  return new AllSspToSingleTaskGrouper(processorList);
}
开发者ID:apache,
项目名称:samza,
代码行数:14,
代码来源:AllSspToSingleTaskGrouperFactory.java
示例26: testTriggerIntervalForWindowsAndJoins
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * For the join-and-window graph with no window interval configured, the planned job config is
 * expected to carry a window interval of "4".
 */
@Test
public void testTriggerIntervalForWindowsAndJoins() throws Exception {
  Map<String, String> overrides = new HashMap<>(config);
  overrides.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config plannerConfig = new MapConfig(overrides);

  ExecutionPlan plan = new ExecutionPlanner(plannerConfig, streamManager)
      .plan(createStreamGraphWithJoinAndWindow());

  List<JobConfig> plannedConfigs = plan.getJobConfigs();
  assertEquals(1, plannedConfigs.size());

  // GCD of 8, 16, 1600 and 252 is 4
  JobConfig onlyJob = plannedConfigs.get(0);
  assertEquals("4", onlyJob.get(TaskConfig.WINDOW_MS()));
}
开发者ID:apache,
项目名称:samza,
代码行数:16,
代码来源:TestExecutionPlanner.java
示例27: testTriggerIntervalForStatelessOperators
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * For the simple graph with no window interval configured, the planned job config must not
 * contain a window interval at all.
 */
@Test
public void testTriggerIntervalForStatelessOperators() throws Exception {
  Map<String, String> overrides = new HashMap<>(config);
  overrides.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config plannerConfig = new MapConfig(overrides);

  ExecutionPlan plan = new ExecutionPlanner(plannerConfig, streamManager)
      .plan(createSimpleGraph());

  List<JobConfig> plannedConfigs = plan.getJobConfigs();
  assertEquals(1, plannedConfigs.size());

  JobConfig onlyJob = plannedConfigs.get(0);
  assertFalse(onlyJob.containsKey(TaskConfig.WINDOW_MS()));
}
开发者ID:apache,
项目名称:samza,
代码行数:14,
代码来源:TestExecutionPlanner.java
示例28: testTriggerIntervalWhenWindowMsIsConfigured
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * When a window interval of "2000" is configured explicitly, the planned job config must keep
 * that value.
 */
@Test
public void testTriggerIntervalWhenWindowMsIsConfigured() throws Exception {
  Map<String, String> overrides = new HashMap<>(config);
  overrides.put(TaskConfig.WINDOW_MS(), "2000");
  overrides.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config plannerConfig = new MapConfig(overrides);

  ExecutionPlan plan = new ExecutionPlanner(plannerConfig, streamManager)
      .plan(createSimpleGraph());

  List<JobConfig> plannedConfigs = plan.getJobConfigs();
  assertEquals(1, plannedConfigs.size());

  JobConfig onlyJob = plannedConfigs.get(0);
  assertEquals("2000", onlyJob.get(TaskConfig.WINDOW_MS()));
}
开发者ID:apache,
项目名称:samza,
代码行数:15,
代码来源:TestExecutionPlanner.java
示例29: testAppMasterWithFwk
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Starts a {@code ContainerProcessManager} and hands it an allocated resource twice: first with
 * the config returned by {@code getConfig()}, then with a config that additionally sets
 * {@code JobConfig.SAMZA_FWK_PATH()}.
 *
 * <p>The test has no assertions on the second pass; it passes as long as start-up and allocation
 * complete without throwing.
 */
@Test
public void testAppMasterWithFwk() {
  Config conf = getConfig();
  state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
  ContainerProcessManager taskManager = new ContainerProcessManager(
      new MapConfig(conf),
      state,
      new MetricsRegistryMap(),
      clusterResourceManager
  );
  taskManager.start();
  SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
  assertFalse(taskManager.shouldShutdown());
  taskManager.onResourceAllocated(container2);

  // Second pass: same flow, but with the framework path present in the config.
  configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever");
  Config fwkConfig = new MapConfig(configVals);
  // The original built this config and then ignored it, passing the pre-existing `config`
  // instead, so the framework-path setting never reached the manager under test.
  // NOTE(review): assumes `config` was a snapshot taken before the put above - confirm.
  ContainerProcessManager taskManager1 = new ContainerProcessManager(
      new MapConfig(fwkConfig),
      state,
      new MetricsRegistryMap(),
      clusterResourceManager
  );
  taskManager1.start();
  taskManager1.onResourceAllocated(container2);
}
开发者ID:apache,
项目名称:samza,
代码行数:29,
代码来源:TestContainerProcessManager.java
示例30: testGetNextOpIdIncrementsId
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Checks the operator IDs returned by {@code StreamGraphImpl.getNextOpId} for three consecutive
 * calls against a config mocked to report job name "jobName" and job id "1234".
 */
@Test
public void testGetNextOpIdIncrementsId() {
  // Stub the two config lookups the expected IDs are built from.
  Config stubbedConfig = mock(Config.class);
  when(stubbedConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
  when(stubbedConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
  ApplicationRunner stubbedRunner = mock(ApplicationRunner.class);

  StreamGraphImpl streamGraph = new StreamGraphImpl(stubbedRunner, stubbedConfig);

  // Call order matters: the expected numeric suffixes are 0 for the first call and 2 for the third.
  assertEquals("jobName-1234-merge-0", streamGraph.getNextOpId(OpCode.MERGE, null));
  assertEquals("jobName-1234-join-customName", streamGraph.getNextOpId(OpCode.JOIN, "customName"));
  assertEquals("jobName-1234-map-2", streamGraph.getNextOpId(OpCode.MAP, null));
}
开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:TestStreamGraphImpl.java
示例31: testGetNextOpIdRejectsDuplicates
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Requests the same user-defined operator ID twice and expects the second request to fail with a
 * {@code SamzaException}.
 */
@Test(expected = SamzaException.class)
public void testGetNextOpIdRejectsDuplicates() {
  // Stub the two config lookups the expected ID is built from.
  Config stubbedConfig = mock(Config.class);
  when(stubbedConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
  when(stubbedConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
  ApplicationRunner stubbedRunner = mock(ApplicationRunner.class);

  StreamGraphImpl streamGraph = new StreamGraphImpl(stubbedRunner, stubbedConfig);

  // First use of the custom name is accepted.
  assertEquals("jobName-1234-join-customName", streamGraph.getNextOpId(OpCode.JOIN, "customName"));
  // Second use of the same custom name should throw.
  streamGraph.getNextOpId(OpCode.JOIN, "customName");
}
开发者ID:apache,
项目名称:samza,
代码行数:12,
代码来源:TestStreamGraphImpl.java
示例32: testRunComplete
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Runs a mocked application through a spied {@code LocalApplicationRunner} whose stream processor
 * reports {@code onStart()} followed by {@code onShutdown()}, and checks that the runner's status
 * ends up as {@code ApplicationStatus.SuccessfulFinish}.
 */
@Test
public void testRunComplete()
    throws Exception {
  final Map<String, String> config = new HashMap<>();
  config.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
  LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
  StreamApplication app = mock(StreamApplication.class);
  doNothing().when(app).init(anyObject(), anyObject());

  // Execution plan with no intermediate streams and a single job built from the same config.
  ExecutionPlan plan = mock(ExecutionPlan.class);
  when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
  when(plan.getPlanAsJson()).thenReturn("");
  when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
  doReturn(plan).when(runner).getExecutionPlan(any(), any());

  StreamProcessor sp = mock(StreamProcessor.class);
  ArgumentCaptor<StreamProcessorLifecycleListener> captor =
      ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
  // When the mocked processor is started, replay a start/shutdown sequence on the listener
  // that was captured when the runner created the processor.
  doAnswer(i -> {
    StreamProcessorLifecycleListener listener = captor.getValue();
    listener.onStart();
    listener.onShutdown();
    return null;
  }).when(sp).start();
  doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());

  runner.run(app);

  // Expected value first, so a failure reports expected/actual the right way round.
  assertEquals(ApplicationStatus.SuccessfulFinish, runner.status(app));
}
开发者ID:apache,
项目名称:samza,
代码行数:34,
代码来源:TestLocalApplicationRunner.java
示例33: testRunFailure
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Runs a mocked application through a spied {@code LocalApplicationRunner} whose stream processor
 * reports {@code onFailure(...)} when started, and checks that the runner's status ends up as
 * {@code ApplicationStatus.UnsuccessfulFinish}.
 */
@Test
public void testRunFailure()
    throws Exception {
  final Map<String, String> config = new HashMap<>();
  config.put(ApplicationConfig.PROCESSOR_ID, "0");
  LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
  StreamApplication app = mock(StreamApplication.class);
  doNothing().when(app).init(anyObject(), anyObject());

  // Execution plan with no intermediate streams and a single job built from the same config.
  ExecutionPlan plan = mock(ExecutionPlan.class);
  when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
  when(plan.getPlanAsJson()).thenReturn("");
  when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
  doReturn(plan).when(runner).getExecutionPlan(any(), any());

  Throwable t = new Throwable("test failure");
  StreamProcessor sp = mock(StreamProcessor.class);
  ArgumentCaptor<StreamProcessorLifecycleListener> captor =
      ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
  // When the mocked processor is started, report a failure on the listener that was captured
  // when the runner created the processor.
  doAnswer(i -> {
    StreamProcessorLifecycleListener listener = captor.getValue();
    listener.onFailure(t);
    return null;
  }).when(sp).start();
  doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());

  try {
    runner.run(app);
  } catch (Throwable ignored) {
    // Deliberately tolerated: the test accepts run() either throwing or returning normally and
    // only checks the final status below. (The original asserted the caught throwable was
    // non-null, which can never fail.)
  }

  // Expected value first, so a failure reports expected/actual the right way round.
  assertEquals(ApplicationStatus.UnsuccessfulFinish, runner.status(app));
}
开发者ID:apache,
项目名称:samza,
代码行数:38,
代码来源:TestLocalApplicationRunner.java
示例34: testgetStreamWithSystemAtDefaultScopeInConfig
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Configures a stream with only a physical name plus a job-level default system, and checks that
 * the resulting stream spec reports the default system as its system name.
 */
@Test
public void testgetStreamWithSystemAtDefaultScopeInConfig() {
  // Stream-level config has no system entry; only the job default system is set.
  Config streamAndJobConfig = addConfigs(
      buildStreamConfig(STREAM_ID, StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
      JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);

  AbstractApplicationRunner appRunner = new TestAbstractApplicationRunnerImpl(streamAndJobConfig);
  StreamSpec streamSpec = appRunner.getStreamSpec(STREAM_ID);

  String resolvedSystem = streamSpec.getSystemName();
  assertEquals(TEST_DEFAULT_SYSTEM, resolvedSystem);
}
开发者ID:apache,
项目名称:samza,
代码行数:12,
代码来源:TestAbstractApplicationRunner.java
示例35: testgetStreamWithSystemAtBothScopesInConfig
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Configures a stream with both a stream-level system and a job-level default system, and checks
 * that the resulting stream spec reports the stream-level system as its system name.
 */
@Test
public void testgetStreamWithSystemAtBothScopesInConfig() {
  // Both scopes define a system; the assertion below expects the stream-level one.
  Config streamAndJobConfig = addConfigs(
      buildStreamConfig(STREAM_ID,
          StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
          StreamConfig.SYSTEM(), TEST_SYSTEM),
      JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);

  AbstractApplicationRunner appRunner = new TestAbstractApplicationRunnerImpl(streamAndJobConfig);
  StreamSpec streamSpec = appRunner.getStreamSpec(STREAM_ID);

  String resolvedSystem = streamSpec.getSystemName();
  assertEquals(TEST_SYSTEM, resolvedSystem);
}
开发者ID:apache,
项目名称:samza,
代码行数:13,
代码来源:TestAbstractApplicationRunner.java
示例36: findJobInstances
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Finds all the job instances in the specified path and adds a corresponding {@link JobInstance} and
 * {@link InstallationRecord} for each instance.
 *
 * <p>Only regular files directly inside the {@code CFG_SUBPATH} directory of the install path are
 * examined. A file contributes a job instance only if its config contains both a job name and a
 * stream job factory class; a missing job id defaults to "1". If the config directory does not
 * exist, the method logs at debug level and returns without modifying {@code jobs}.
 *
 * @param jobInstallPath  the path to search for job instances.
 * @param jobs            the map to which the job instances will be added.
 * @throws SamzaException if the directory cannot be listed, a config cannot be loaded, or more than
 *                        one config resolves to the same job name and job id. The underlying
 *                        failure is attached as the cause.
 */
private void findJobInstances(final File jobInstallPath, final Map<JobInstance, InstallationRecord> jobs) {
  try {
    String jobInstallCanonPath = jobInstallPath.getCanonicalPath();
    File configPath = Paths.get(jobInstallCanonPath, CFG_SUBPATH).toFile();
    if (!(configPath.exists() && configPath.isDirectory())) {
      log.debug("Config path not found: " + configPath);
      return;
    }

    // File.listFiles() returns null (rather than throwing) when the directory cannot be read.
    // Fail with an explicit message instead of an opaque NullPointerException from the loop.
    File[] configFiles = configPath.listFiles();
    if (configFiles == null) {
      throw new IllegalStateException("Unable to list config files in: " + configPath);
    }

    for (File configFile : configFiles) {
      if (!configFile.isFile()) {
        continue;
      }

      String configFilePath = configFile.getCanonicalPath();
      // NOTE(review): building the URI by concatenation fails for paths containing characters
      // that are illegal in a URI (e.g. spaces); left unchanged to keep the URI form the
      // config factory currently receives.
      Config config = jobConfigFactory.getConfig(new URI("file://" + configFilePath));
      if (!(config.containsKey(JobConfig.JOB_NAME()) && config.containsKey(JobConfig.STREAM_JOB_FACTORY_CLASS()))) {
        // Not a job config; skip it.
        continue;
      }

      String jobName = config.get(JobConfig.JOB_NAME());
      String jobId = config.get(JobConfig.JOB_ID(), "1");
      JobInstance jobInstance = new JobInstance(jobName, jobId);
      if (jobs.containsKey(jobInstance)) {
        throw new IllegalStateException(
            String.format("Found more than one job config with jobName:%s and jobId:%s", jobName, jobId));
      }

      InstallationRecord jobInstall =
          new InstallationRecord(jobName, jobId, jobInstallCanonPath, configFilePath, getBinPath(jobInstallCanonPath));
      jobs.put(jobInstance, jobInstall);
    }
  } catch (Exception e) {
    throw new SamzaException("Exception finding job instance in path: " + jobInstallPath, e);
  }
}
开发者ID:apache,
项目名称:samza,
代码行数:44,
代码来源:SimpleInstallationFinder.java
示例37: getCoordinatorSystemConfig
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Builds coordinator system config for the {@code jobInstance}.
 *
 * <p>The job's installed config file is loaded through the configured job config factory, then
 * overlaid with the job id and job name taken from {@code jobInstance} before being passed to
 * {@code Util.buildCoordinatorStreamConfig}.
 *
 * @param jobInstance the job instance to get the jobModel for.
 * @return the constructed coordinator system config.
 * @throws SamzaException if the job instance has no installation record or the config cannot be
 *                        loaded or built; the failure is logged and attached as the cause.
 */
private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
  try {
    InstallationRecord installation = installFinder.getAllInstalledJobs().get(jobInstance);
    if (installation == null) {
      // A lookup miss would otherwise surface as a NullPointerException with no hint of the cause.
      throw new IllegalStateException(
          String.format("No installation record found for job : %s", jobInstance));
    }

    ConfigFactory configFactory = ClassLoaderHelper.fromClassName(taskResourceConfig.getJobConfigFactory());
    Config config = configFactory.getConfig(new URI(String.format("file://%s", installation.getConfigFilePath())));

    // Job id and name from the requested instance are listed after the file-based config.
    Map<String, String> configMap = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(),
        JobConfig.JOB_NAME(), jobInstance.getJobName());
    return Util.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
  } catch (Exception e) {
    LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
    throw new SamzaException(e);
  }
}
开发者ID:apache,
项目名称:samza,
代码行数:19,
代码来源:SamzaTaskProxy.java
示例38: createConfigs
点赞 2
import org.apache.samza.config.JobConfig; //导入依赖的package包/类
/**
 * Assembles the config map for a standalone test job.
 *
 * <p>The map starts from the standalone and Kafka system configs supplied by
 * {@code StandaloneTestUtils}, then adds the task input, application-level settings, ZooKeeper
 * connection and timeouts, grouper factories, the job coordinator factory, and the task shutdown
 * and debounce timings.
 *
 * @param testSystem   name of the system used for both the input and the output
 * @param inputTopic   topic the task reads from
 * @param outputTopic  topic stored under {@code app.outputTopic}
 * @param messageCount value stored under {@code app.messageCount}
 * @return a new mutable map holding the assembled configs
 */
protected Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic,
    int messageCount) {
  Map<String, String> settings = new HashMap<>();

  // Base configs; later entries override earlier ones on key collisions.
  settings.putAll(StandaloneTestUtils.getStandaloneConfigs(
      "test-job", "org.apache.samza.processor.TestZkStreamProcessor.TestStreamTask"));
  settings.putAll(StandaloneTestUtils.getKafkaSystemConfigs(
      testSystem, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));

  // Task input and application-level settings.
  settings.put("task.inputs", testSystem + "." + inputTopic);
  settings.put("app.messageCount", String.valueOf(messageCount));
  settings.put("app.outputTopic", outputTopic);
  settings.put("app.outputSystem", testSystem);

  settings.put(ZkConfig.ZK_CONNECT, zkConnect());

  // Grouping and coordination.
  settings.put("job.systemstreampartition.grouper.factory",
      "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
  settings.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
  settings.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.zk.ZkJobCoordinatorFactory");

  // Timings.
  settings.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
  settings.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
  settings.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
  settings.put(ZkConfig.ZK_SESSION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS);
  settings.put(ZkConfig.ZK_CONNECTION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS);

  return settings;
}
开发者ID:apache,
项目名称:samza,
代码行数:28,
代码来源:TestZkStreamProcessorBase.java