本文整理汇总了Java中akka.cluster.singleton.ClusterSingletonManager类的典型用法代码示例。如果您正苦于以下问题:Java ClusterSingletonManager类的具体用法?Java ClusterSingletonManager怎么用?Java ClusterSingletonManager使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
ClusterSingletonManager类属于akka.cluster.singleton包,在下文中一共展示了ClusterSingletonManager类的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: startAll
点赞 3
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
/**
* Starts a DataCenterForwarder for each of the known data centers in the {@link DataCenterRepository}.
* @param system Actor system to create the DataCenterForwarder actors in
* @param dataRepo Repository that knows about all data centers
* @param materializer Akka streams materializer to use
* @param visibilityRepo Repository that stores the current visiblity of aggregates
* @param eventRepo Classifier that determines which additional datacenters an event should trigger replication for
* @param eventsByTagQuery Query to use to find a continuous stream of all events
* @param tag Tag to pass to {@link EventsByTagQuery} (all events must be tagged by this)
* @param currentEventsByPersistenceIdQuery Query to find all current events for a specific persistenceId
*/
public static <E> void startAll(ActorSystem system, Materializer materializer, DataCenterRepository dataRepo, VisibilityRepository visibilityRepo, Class<E> eventType,
EventsByTagQuery eventsByTagQuery, CurrentEventsByPersistenceIdQuery currentEventsByPersistenceIdQuery) {
String tag = Replication.get(system).getEventTag(eventType);
for (DataCenter dataCenter: dataRepo.getRemotes().values()) {
system.actorOf(ClusterSingletonManager.props(
BackoffSupervisor.props(
Backoff.onFailure(
Props.create(DataCenterForwarder.class, () -> new DataCenterForwarder<>(materializer, dataCenter, visibilityRepo, eventType,
eventsByTagQuery, currentEventsByPersistenceIdQuery)),
"f",
Duration.create(1, TimeUnit.SECONDS),
Duration.create(1, TimeUnit.SECONDS), // TODO make these 3 configurable
0.2)
),
Done.getInstance(),
ClusterSingletonManagerSettings.create(system).withSingletonName("s")), "forwarder_" + dataCenter.getName() + "_" + tag);
}
}
开发者ID:Tradeshift,
项目名称:ts-reaktive,
代码行数:32,
代码来源:DataCenterForwarder.java
示例2: startBackend
点赞 3
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
public static void startBackend(int port, String role) {
Config conf = ConfigFactory.parseString("akka.cluster.roles=[" + role + "]").
withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
withFallback(ConfigFactory.load());
ActorSystem system = ActorSystem.create("ClusterSystem", conf);
startupSharedJournal(system, (port == 2551),
ActorPaths.fromString("akka.tcp://[email protected]:2551/user/store"));
system.actorOf(
ClusterSingletonManager.props(
Master.props(workTimeout),
PoisonPill.getInstance(),
ClusterSingletonManagerSettings.create(system).withRole(role)
),
"master");
}
开发者ID:typesafehub,
项目名称:activator-akka-distributed-workers-java,
代码行数:19,
代码来源:Main.java
示例3: produceRouter
点赞 3
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
@Override
public void produceRouter(ActorSystem system, String role) {
ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create( system ).withRole( "io" );
system.actorOf( ClusterSingletonManager.props(
Props.create( GuiceActorProducer.class, QueueActorRouter.class ),
PoisonPill.getInstance(), settings ), "queueActorRouter" );
ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create( system ).withRole( role );
system.actorOf(
ClusterSingletonProxy.props( "/user/queueActorRouter", proxySettings ), "queueActorRouterProxy" );
}
开发者ID:apache,
项目名称:usergrid,
代码行数:17,
代码来源:QueueActorRouterProducer.java
示例4: produceRouter
点赞 3
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
@Override
public void produceRouter(ActorSystem system, String role) {
ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create( system ).withRole( "io" );
system.actorOf( ClusterSingletonManager.props(
Props.create( GuiceActorProducer.class, QueueWriterRouter.class ),
PoisonPill.getInstance(), settings ), "queueWriterRouter" );
ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create( system ).withRole( role );
system.actorOf(
ClusterSingletonProxy.props( "/user/queueWriterRouter", proxySettings ), "queueWriterRouterProxy" );
}
开发者ID:apache,
项目名称:usergrid,
代码行数:17,
代码来源:QueueWriterRouterProducer.java
示例5: produceRouter
点赞 3
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
@Override
public void produceRouter(ActorSystem system, String role) {
ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create( system ).withRole( "io" );
system.actorOf( ClusterSingletonManager.props(
Props.create( GuiceActorProducer.class, QueueSenderRouter.class ),
PoisonPill.getInstance(), settings ), "queueSenderRouter" );
ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create( system ).withRole( role );
system.actorOf(
ClusterSingletonProxy.props( "/user/queueSenderRouter", proxySettings ), "queueSenderRouterProxy" );
}
开发者ID:apache,
项目名称:usergrid,
代码行数:17,
代码来源:QueueSenderRouterProducer.java
示例6: produceRouter
点赞 3
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
@Override
public void produceRouter( ActorSystem system, String role ) {
ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create( system ).withRole("io");
system.actorOf( ClusterSingletonManager.props(
Props.create( GuiceActorProducer.class, UniqueValuesRouter.class ),
PoisonPill.getInstance(), settings ), "uvRouter" );
ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create( system ).withRole( role );
system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" );
subscribeToReservations( system );
}
开发者ID:apache,
项目名称:usergrid,
代码行数:18,
代码来源:UniqueValuesServiceImpl.java
示例7: getMaster
点赞 2
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
/**
* Creates the master actor using cluster singleton manager in the specified actor system
* @param port
* @param role
* @param isPrimary
* @param system
* @param agentConfig
* @param ip
* @return
*/
public static ActorRef getMaster(int port,String role, boolean isPrimary, ActorSystem system, AgentConfig agentConfig,String ip) {
String journalPath = String.format("akka.tcp://%[email protected]%s:%s/user/store", Constants.PerformanceSystem, ip ,port);
startupSharedJournal(system, isPrimary, ActorPath$.MODULE$.fromString(journalPath));
FiniteDuration workTimeout = Duration.create(120, "seconds");
final ClusterSingletonManagerSettings settings =
ClusterSingletonManagerSettings.create(system).withRole(role);
ActorRef ref = system.actorOf(
ClusterSingletonManager.props(Master.props(workTimeout,agentConfig), PoisonPill.getInstance(), settings),
"master");
return ref;
}
开发者ID:Abiy,
项目名称:distGatling,
代码行数:23,
代码来源:ClusterFactory.java
示例8: start
点赞 2
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
/**
* Starts a ClusterSingletonManager + BackoffSupervisor that manages a continous backup to S3, restarting
* and resuming automatically on crash or failure.
*
* @param system The actor system to create the singleton for
* @param query The akka persistence query journal to read events from
* @param tag Tag to pass to the above query
* @param s3 Service interface to communicate with S3
*/
public static void start(ActorSystem system, EventsByTagQuery query, String tag, S3 s3) {
system.actorOf(ClusterSingletonManager.props(
BackoffSupervisor.props(
Backoff.onFailure(
Props.create(S3Backup.class, () -> new S3Backup(query, tag, s3)),
"a",
Duration.create(1, TimeUnit.SECONDS),
Duration.create(1, TimeUnit.SECONDS), // TODO make these 3 configurable
0.2)
),
Done.getInstance(),
ClusterSingletonManagerSettings.create(system).withSingletonName("s")), "s3backup");
}
开发者ID:Tradeshift,
项目名称:ts-reaktive,
代码行数:23,
代码来源:S3Backup.java
示例9: initialize
点赞 2
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
@Override
public void initialize() {
ActorSystem actorSystem = actorSystemManager.retrieveActorSystem();
//create cluster singleton actor of {@link AkkaEventSchedulerService}
Props actorProps = Props.create(AkkaEventSchedulerService.class, eventSchedulerService);
ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(actorSystem);
actorSystem.actorOf(ClusterSingletonManager.props(actorProps, PoisonPill.getInstance(), settings), "eventSchedulerServiceActor");
}
开发者ID:flipkart-incubator,
项目名称:flux,
代码行数:10,
代码来源:EventSchedulerRegistryImpl.java
示例10: initialize
点赞 2
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
/**
* Initialize the singleton actor wrapping redriverService.
* @see AkkaRedriverService
*/
@Override
public void initialize() {
ActorSystem actorSystem = actorSystemManager.retrieveActorSystem();
Props actorProps = Props.create(AkkaRedriverService.class, redriverService);
ClusterSingletonManagerSettings settings = ClusterSingletonManagerSettings.create(actorSystem);
actorSystem.actorOf(ClusterSingletonManager.props(actorProps, PoisonPill.getInstance(), settings), "redriverServiceActor");
}
开发者ID:flipkart-incubator,
项目名称:flux,
代码行数:12,
代码来源:RedriverRegistryImpl.java
示例11: testWorkers
点赞 2
import akka.cluster.singleton.ClusterSingletonManager; //导入依赖的package包/类
@Test
public void testWorkers() throws Exception {
new JavaTestKit(system) {{
TestProbe clusterProbe = new TestProbe(system);
Cluster.get(system).subscribe(clusterProbe.ref(), ClusterEvent.MemberUp.class);
clusterProbe.expectMsgClass(ClusterEvent.CurrentClusterState.class);
Address clusterAddress = Cluster.get(system).selfAddress();
Cluster.get(system).join(clusterAddress);
clusterProbe.expectMsgClass(ClusterEvent.MemberUp.class);
system.actorOf(
ClusterSingletonManager.props(
Master.props(workTimeout),
PoisonPill.getInstance(),
ClusterSingletonManagerSettings.create(system).withRole("backend")
),
"master");
Set<ActorPath> initialContacts = new HashSet<>();
initialContacts.add(ActorPaths.fromString(clusterAddress + "/system/receptionist"));
ActorRef clusterClient = system.actorOf(
ClusterClient.props(ClusterClientSettings.create(system).withInitialContacts(initialContacts)),
"clusterClient");
for (int n = 1; n <= 3; n += 1) {
system.actorOf(Worker.props(clusterClient,
Props.create(WorkExecutor.class), registerInterval), "worker-" + n);
}
ActorRef flakyWorker = system.actorOf(Worker.props(clusterClient,
Props.create(FlakyWorkExecutor.class), registerInterval), "flaky-worker");
final ActorRef frontend = system.actorOf(Props.create(Frontend.class), "frontend");
final JavaTestKit results = new JavaTestKit(system);
DistributedPubSub.get(system).mediator().tell(
new DistributedPubSubMediator.Subscribe(Master.ResultsTopic, results.getRef()),
getRef());
expectMsgClass(DistributedPubSubMediator.SubscribeAck.class);
// might take a while for things to get connected
new AwaitAssert(duration("10 seconds")) {
protected void check() {
frontend.tell(new Master.Work("1", 1), getRef());
expectMsgEquals(Frontend.Ok.getInstance());
}
};
assertEquals(results.expectMsgClass(Master.WorkResult.class).workId, "1");
for (int n = 2; n <= 100; n += 1) {
frontend.tell(new Master.Work(Integer.toString(n), n), getRef());
expectMsgEquals(Frontend.Ok.getInstance());
}
results.new Within(duration("10 seconds")) {
public void run() {
Object[] messages = results.receiveN(99);
SortedSet<Integer> set = new TreeSet<Integer>();
for (Object m: messages) {
set.add(Integer.parseInt(((Master.WorkResult) m).workId));
}
// nothing lost, and no duplicates
Iterator<Integer> iterator = set.iterator();
for (int n = 2; n <= 100; n += 1) {
assertEquals(n, iterator.next().intValue());
}
}
};
}};
}
开发者ID:typesafehub,
项目名称:activator-akka-distributed-workers-java,
代码行数:79,
代码来源:DistributedWorkerTest.java