• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Java MessageStoreConfig类的典型用法和代码示例

java 1次浏览

本文整理汇总了Java中com.alibaba.rocketmq.store.config.MessageStoreConfig的典型用法代码示例。如果您正苦于以下问题:Java MessageStoreConfig类的具体用法?Java MessageStoreConfig怎么用?Java MessageStoreConfig使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。

MessageStoreConfig类属于com.alibaba.rocketmq.store.config包,在下文中一共展示了MessageStoreConfig类的18个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: DefaultMessageStore

点赞 3

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Creates the message store: records the supplied collaborators, instantiates the
 * store's internal services, and starts the mapped-file allocation service and the
 * index service.
 *
 * <p>Several collaborators receive {@code this} while construction is still in
 * progress, so the order of the statements below should not be changed casually.
 *
 * @param messageStoreConfig store configuration kept for later use
 * @param brokerStatsManager statistics manager kept for later use
 * @param messageArrivingListener listener kept for later use
 * @param brokerConfig broker configuration kept for later use
 * @throws IOException declared by the signature; which of the calls below raises it
 *         is not visible from this snippet
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
                           final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    // Store the externally supplied collaborators first; the objects created below
    // are handed `this` and may read these fields (NOTE(review): confirm in their constructors).
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMapedFileService = new AllocateMapedFileService(this);
    this.commitLog = new CommitLog(this);
    // Outer key: topic; inner key: queueId; initial capacity 32.
    this.consumeQueueTable = new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();
    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    this.haService = new HAService(this);

    this.reputMessageService = new ReputMessageService();
    this.scheduleMessageService = new ScheduleMessageService(this);

    // Only these two services are started here; the others are merely instantiated.
    this.allocateMapedFileService.start();
    this.indexService.start();
}
 

开发者ID:medusar,
项目名称:rocketmq-commet,
代码行数:24,
代码来源:DefaultMessageStore.java

示例2: main

点赞 3

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Entry point that brings up a test message store configured with the
 * {@code SLAVE} broker role.
 *
 * <p>The store is pointed at a hard-coded master address, loaded and started.
 * A failed load or any exception terminates the JVM with exit code {@code -1}.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    try {
        MessageStoreConfig slaveConfig = new MessageStoreConfig();
        slaveConfig.setBrokerRole(BrokerRole.SLAVE);

        final MessageStoreTestObject slaveStore = new MessageStoreTestObject(slaveConfig);
        slaveStore.updateMasterAddress("10.235.170.21:10912");

        // Loading must succeed before the store is started.
        final boolean loaded = slaveStore.load();
        if (!loaded) {
            System.out.println("load store failed");
            System.exit(-1);
        }

        slaveStore.start();
        System.out.println("start OK, " + slaveConfig.getBrokerRole());
    }
    catch (Exception failure) {
        failure.printStackTrace();
        System.exit(-1);
    }
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:24,
代码来源:BrokerSlave.java

示例3: BrokerController

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Wires up a broker controller: keeps the four configuration objects, instantiates
 * the controller's managers, processors and services, creates the send/pull
 * thread-pool queues, the stats manager, and finally sets the store host.
 *
 * <p>Many collaborators receive {@code this} during construction and some are built
 * from previously assigned fields, so the statement order matters.
 *
 * @param brokerConfig broker configuration (name server address, queue capacities,
 *        cluster name and broker IP are read from it below)
 * @param nettyServerConfig Netty server configuration (its listen port is read below)
 * @param nettyClientConfig Netty client configuration, also passed to {@code BrokerOuterAPI}
 * @param messageStoreConfig message store configuration kept for later use
 */
public BrokerController(//
        final BrokerConfig brokerConfig, //
        final NettyServerConfig nettyServerConfig, //
        final NettyClientConfig nettyClientConfig, //
        final MessageStoreConfig messageStoreConfig //
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    this.topicConfigManager = new TopicConfigManager(this);
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // Depends on pullRequestHoldService, which therefore has to be created first.
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // Depends on consumerIdsChangeListener created on the previous line.
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    this.producerManager = new ProducerManager();
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    this.filterServerManager = new FilterServerManager(this);

    // Only push a name server list to the outer API when one was configured explicitly.
    if (this.brokerConfig.getNamesrvAddr() != null) {
        this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
        log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
    }

    this.slaveSynchronize = new SlaveSynchronize(this);

    // Work queues bounded by the capacities taken from the broker configuration.
    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());

    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    // Store host = broker IP1 plus the Netty server listen port.
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:39,
代码来源:BrokerController.java

示例4: test_flushConsumerOffset

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Starts a broker with default configuration, commits random consumer offsets for
 * 100 groups x 16 queues x 3 topics, persists them, and shuts the broker down.
 *
 * <p>Offsets are drawn uniformly from {@code [0, 1 GiB)}. The earlier expression
 * {@code random.nextLong() % 1024 * 1024 * 1024} was evaluated left to right as
 * {@code (random.nextLong() % 1024) * 1024 * 1024}, which produced only multiples
 * of 1 MiB and could yield negative offsets.
 *
 * @throws Exception if the broker fails to start or an offset operation fails
 */
@Test
public void test_flushConsumerOffset() throws Exception {
    BrokerController brokerController = new BrokerController(//
        new BrokerConfig(), //
        new NettyServerConfig(), //
        new NettyClientConfig(), //
        new MessageStoreConfig());
    boolean initResult = brokerController.initialize();
    System.out.println("initialize " + initResult);
    brokerController.start();

    ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);

    Random random = new Random();

    // Exclusive upper bound for generated offsets (1 GiB), computed in long arithmetic.
    final long offsetBound = 1024L * 1024 * 1024;
    final String[] topics = { "TOPIC_A", "TOPIC_B", "TOPIC_C" };

    for (int i = 0; i < 100; i++) {
        String group = "DIANPU_GROUP_" + i;
        for (int id = 0; id < 16; id++) {
            for (String topic : topics) {
                // |x % bound| < bound, so Math.abs cannot overflow here and the
                // committed offset is always non-negative.
                long offset = Math.abs(random.nextLong() % offsetBound);
                consumerOffsetManager.commitOffset(group, topic, id, offset);
            }
        }
    }

    consumerOffsetManager.persist();

    brokerController.shutdown();
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:32,
代码来源:ConsumerOffsetManagerTest.java

示例5: test_flushTopicConfig

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Starts a broker with default configuration, creates one named topic plus ten
 * numbered topics through {@code createTopicInSendMessageMethod}, asserts that each
 * call returns a non-null configuration, persists the topic table, and shuts the
 * broker down.
 *
 * @throws Exception if the broker or the topic manager fails
 */
@Test
public void test_flushTopicConfig() throws Exception {
    final BrokerController controller = new BrokerController(//
        new BrokerConfig(), //
        new NettyServerConfig(), //
        new NettyClientConfig(), //
        new MessageStoreConfig());
    final boolean initialized = controller.initialize();
    System.out.println("initialize " + initialized);
    controller.start();

    final TopicConfigManager manager = new TopicConfigManager(controller);

    final TopicConfig sendTopicConfig =
            manager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC, null, 4, 0);
    assertTrue(null != sendTopicConfig);

    System.out.println(sendTopicConfig);

    // Ten additional topics named UNITTEST-0 .. UNITTEST-9.
    int index = 0;
    while (index < 10) {
        final TopicConfig created =
                manager.createTopicInSendMessageMethod("UNITTEST-" + index, MixAll.DEFAULT_TOPIC, null, 4, 0);
        assertTrue(null != created);
        index++;
    }

    manager.persist();

    controller.shutdown();
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:33,
代码来源:TopicConfigManagerTest.java

示例6: DefaultMessageStore

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Creates the message store: records the supplied collaborators, instantiates the
 * store's internal services, and starts the mapped-file allocation service and the
 * index service.
 *
 * <p>Several collaborators receive {@code this} while construction is still in
 * progress, so the order of the statements below should not be changed casually.
 *
 * @param messageStoreConfig store configuration kept for later use
 * @param brokerStatsManager statistics manager kept for later use
 * @param messageArrivingListener listener kept for later use
 * @param brokerConfig broker configuration kept for later use
 * @throws IOException declared by the signature; which of the calls below raises it
 *         is not visible from this snippet
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
        final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
    this.messageArrivingListener = messageArrivingListener;
    this.brokerConfig = brokerConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.brokerStatsManager = brokerStatsManager;
    this.allocateMapedFileService = new AllocateMapedFileService(this);
    this.commitLog = new CommitLog(this);
    // Outer key: topic; inner key: queueId; initial capacity 32.
    this.consumeQueueTable = new ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>>(32);

    this.flushConsumeQueueService = new FlushConsumeQueueService();

    // Cleanup performed when disk usage exceeds the watermark.
    this.cleanCommitLogService = new CleanCommitLogService();
    this.cleanConsumeQueueService = new CleanConsumeQueueService();

    this.storeStatsService = new StoreStatsService();
    this.indexService = new IndexService(this);
    this.haService = new HAService(this);

    // Service that asynchronously builds the consume queue (CQ) and index files.
    this.reputMessageService = new ReputMessageService();
    // Delayed-message delivery service.
    this.scheduleMessageService = new ScheduleMessageService(this);

    // Only these two services are started here; the others are merely instantiated.
    this.allocateMapedFileService.start();
    this.indexService.start();
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:29,
代码来源:DefaultMessageStore.java

示例7: writeMessage

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Writes 1000 messages into a freshly created {@code DefaultMessageStore} that uses
 * small mapped-file sizes and has the message index disabled.
 *
 * @param normal when {@code true} the store is shut down after writing; when
 *        {@code false} it is left running
 * @param first when {@code true} the store is kept in {@code storeWrite1},
 *        otherwise in {@code storeWrite2}
 * @throws Exception if the store cannot be created, loaded, started or written to
 */
public void writeMessage(boolean normal, boolean first) throws Exception {
    System.out.println("================================================================");
    QUEUE_TOTAL = 3;
    MessageBody = StoreMessage.getBytes();
    final long messageCount = 1000;

    final MessageStoreConfig config = new MessageStoreConfig();
    config.setMapedFileSizeCommitLog(1024 * 32);
    config.setMapedFileSizeConsumeQueue(100 * 20);
    config.setMessageIndexEnable(false);

    final MessageStore store = new DefaultMessageStore(config, null, null, null);
    // Remember the store in the slot selected by the caller.
    if (!first) {
        this.storeWrite2 = store;
    }
    else {
        this.storeWrite1 = store;
    }

    assertTrue(store.load());

    store.start();

    long sent = 0;
    while (sent < messageCount) {
        final PutMessageResult putResult = store.putMessage(buildMessage());
        System.out.println(sent + "\t" + putResult.getAppendMessageResult().getMsgId());
        sent++;
    }

    if (normal) {
        store.shutdown();
    }

    System.out.println("========================writeMessage OK========================================");
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:39,
代码来源:RecoverTest.java

示例8: readMessage

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Reads back every message of {@code TOPIC_A} for group {@code GROUP_A} from a newly
 * loaded {@code DefaultMessageStore}, queue by queue, and asserts that the number of
 * messages read equals {@code msgCnt}.
 *
 * @param msgCnt the total number of messages expected across all queues
 * @throws Exception if the store cannot be created, loaded, started or read
 */
public void readMessage(final long msgCnt) throws Exception {
    System.out.println("================================================================");
    QUEUE_TOTAL = 3;

    MessageBody = StoreMessage.getBytes();

    final MessageStoreConfig config = new MessageStoreConfig();
    config.setMapedFileSizeCommitLog(1024 * 32);
    config.setMapedFileSizeConsumeQueue(100 * 20);
    config.setMessageIndexEnable(false);

    storeRead = new DefaultMessageStore(config, null, null, null);
    assertTrue(storeRead.load());

    storeRead.start();

    long totalRead = 0;
    for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) {
        long nextOffset = 0;
        // Keep pulling from this queue until the store stops reporting FOUND.
        while (true) {
            final GetMessageResult batch =
                    storeRead.getMessage("GROUP_A", "TOPIC_A", queueId, nextOffset, 1024 * 1024, null);
            if (batch.getStatus() != GetMessageStatus.FOUND) {
                break;
            }

            System.out.println(queueId + "\t" + batch.getMessageCount());
            this.veryReadMessage(queueId, nextOffset, batch.getMessageBufferList());
            nextOffset += batch.getMessageCount();
            totalRead += batch.getMessageCount();
            batch.release();
        }
    }

    System.out.println("readCnt = " + totalRead);
    assertTrue(totalRead == msgCnt);

    System.out.println("========================readMessage OK========================================");
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:40,
代码来源:RecoverTest.java

示例9: BrokerController

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Wires up a broker controller: keeps the four configuration objects, instantiates
 * the controller's managers, processors and services, creates the send/pull
 * thread-pool queues and the stats manager.
 *
 * <p>Many collaborators receive {@code this} during construction and some are built
 * from previously assigned fields, so the statement order matters.
 *
 * @param brokerConfig broker configuration (name server address and queue
 *        capacities are read from it below)
 * @param nettyServerConfig Netty server configuration kept for later use
 * @param nettyClientConfig Netty client configuration, also passed to {@code BrokerOuterAPI}
 * @param messageStoreConfig message store configuration kept for later use
 */
public BrokerController(//
        final BrokerConfig brokerConfig, //
        final NettyServerConfig nettyServerConfig, //
        final NettyClientConfig nettyClientConfig, //
        final MessageStoreConfig messageStoreConfig //
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    this.topicConfigManager = new TopicConfigManager(this);
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // Depends on consumerIdsChangeListener created on the previous line.
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    this.producerManager = new ProducerManager();
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    this.defaultTransactionCheckExecuter = new DefaultTransactionCheckExecuter(this);
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    this.filterServerManager = new FilterServerManager(this);

    // Only push a name server list to the outer API when one was configured explicitly.
    if (this.brokerConfig.getNamesrvAddr() != null) {
        this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
        log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
    }

    this.slaveSynchronize = new SlaveSynchronize(this);

    // Work queues bounded by the capacities taken from the broker configuration.
    this.sendThreadPoolQueue =
            new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

    this.pullThreadPoolQueue =
            new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());

    this.brokerStatsManager = new BrokerStatsManager();
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:40,
代码来源:BrokerController.java

示例10: test_flushTopicConfig

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Starts a broker with default configuration, creates one named topic plus ten
 * numbered topics through {@code createTopicInSendMessageMethod}, asserts that each
 * call returns a non-null configuration, persists the topic table, and shuts the
 * broker down.
 *
 * @throws Exception if the broker or the topic manager fails
 */
@Test
public void test_flushTopicConfig() throws Exception {
    final BrokerController controller = new BrokerController(//
        new BrokerConfig(), //
        new NettyServerConfig(), //
        new NettyClientConfig(), //
        new MessageStoreConfig());
    final boolean initialized = controller.initialize();
    System.out.println("initialize " + initialized);
    controller.start();

    final TopicConfigManager manager = new TopicConfigManager(controller);

    final TopicConfig sendTopicConfig =
            manager.createTopicInSendMessageMethod("TestTopic_SEND", MixAll.DEFAULT_TOPIC, null, 4);
    assertTrue(null != sendTopicConfig);

    System.out.println(sendTopicConfig);

    // Ten additional topics named UNITTEST-0 .. UNITTEST-9.
    int index = 0;
    while (index < 10) {
        final TopicConfig created =
                manager.createTopicInSendMessageMethod("UNITTEST-" + index, MixAll.DEFAULT_TOPIC, null, 4);
        assertTrue(null != created);
        index++;
    }

    manager.persist();

    controller.shutdown();
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:32,
代码来源:TopicConfigManagerTest.java

示例11: BrokerController

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Wires up a broker controller: keeps the four configuration objects, instantiates
 * the controller's managers, processors and services, and creates the send/pull
 * thread-pool queues.
 *
 * <p>Many collaborators receive {@code this} during construction and some are built
 * from previously assigned fields, so the statement order matters.
 *
 * @param brokerConfig broker configuration (name server address and queue
 *        capacities are read from it below)
 * @param nettyServerConfig Netty server configuration kept for later use
 * @param nettyClientConfig Netty client configuration, also passed to {@code BrokerOuterAPI}
 * @param messageStoreConfig message store configuration kept for later use
 */
public BrokerController(//
        final BrokerConfig brokerConfig, //
        final NettyServerConfig nettyServerConfig, //
        final NettyClientConfig nettyClientConfig, //
        final MessageStoreConfig messageStoreConfig //
) {
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    this.topicConfigManager = new TopicConfigManager(this);
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    // Depends on consumerIdsChangeListener created on the previous line.
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    this.producerManager = new ProducerManager();
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    this.defaultTransactionCheckExecuter = new DefaultTransactionCheckExecuter(this);
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);

    // Only push a name server list to the outer API when one was configured explicitly.
    if (this.brokerConfig.getNamesrvAddr() != null) {
        this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
        log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
    }

    this.slaveSynchronize = new SlaveSynchronize(this);
    this.digestLogManager = new DigestLogManager(this);

    // Work queues bounded by the capacities taken from the broker configuration.
    this.sendThreadPoolQueue =
            new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());

    this.pullThreadPoolQueue =
            new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
}
 

开发者ID:brucechan0921,
项目名称:RocketMQ-3.0.8,
代码行数:38,
代码来源:BrokerController.java

示例12: getMessageStoreConfig

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Returns the message store configuration held by this object.
 *
 * @return the {@code messageStoreConfig} field, as is (no copy is made)
 */
public MessageStoreConfig getMessageStoreConfig() {
    return this.messageStoreConfig;
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:4,
代码来源:BrokerController.java

示例13: test_sendMessage

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Starts a broker with default configuration and synchronously sends 100000 delayed
 * messages to it through {@code MQClientAPIImpl}, printing each send result.
 *
 * <p>A failure of an individual send is printed and does not stop the loop.
 *
 * @throws Exception if the broker or the client cannot be started or shut down
 */
@Test
public void test_sendMessage() throws Exception {
    final BrokerController controller = new BrokerController(//
        new BrokerConfig(), //
        new NettyServerConfig(), //
        new NettyClientConfig(), //
        new MessageStoreConfig());
    final boolean initialized = controller.initialize();
    System.out.println("initialize " + initialized);

    controller.start();

    final MQClientAPIImpl apiClient = new MQClientAPIImpl(new NettyClientConfig(), null, null, null);
    apiClient.start();

    for (int seq = 0; seq < 100000; seq++) {
        // Messages rotate over three topics and three delay levels (1..3).
        final String payload = "Hello, Nice world\t" + seq;
        final Message message =
                new Message("UnitTestTopic_" + (seq % 3), "TAG1 TAG2", "100200300", payload.getBytes());
        message.setDelayTimeLevel(seq % 3 + 1);

        try {
            final SendMessageRequestHeader header = new SendMessageRequestHeader();
            header.setProducerGroup("abc");
            header.setTopic(message.getTopic());
            header.setDefaultTopic(MixAll.DEFAULT_TOPIC);
            header.setDefaultTopicQueueNums(4);
            header.setQueueId(seq % 4);
            header.setSysFlag(0);
            header.setBornTimestamp(System.currentTimeMillis());
            header.setFlag(message.getFlag());
            header.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));

            final SendResult sendResult = apiClient.sendMessage(
                "127.0.0.1:10911", "brokerName", message, header, 1000 * 5, CommunicationMode.SYNC, null);
            System.out.println(seq + "\t" + sendResult);
        }
        catch (Exception sendFailure) {
            sendFailure.printStackTrace();
        }
    }

    apiClient.shutdown();

    controller.shutdown();
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:46,
代码来源:SendMessageTest.java

示例14: MessageStoreTestObject

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
public MessageStoreTestObject(final MessageStoreConfig messageStoreConfig) throws IOException {
    this.storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
    this.bornHost = new InetSocketAddress(InetAddress.getByName("10.232.102.184"), 0);
    this.messageStore = new DefaultMessageStore(messageStoreConfig);
    this.messageBody = this.buildMessageBody();
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:7,
代码来源:MessageStoreTestObject.java

示例15: test_sendMessage

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Starts a broker with default configuration and synchronously sends 100000 delayed
 * messages to it through {@code MQClientAPIImpl}, printing each send result.
 *
 * <p>A failure of an individual send is printed and does not stop the loop.
 *
 * @throws Exception if the broker or the client cannot be started or shut down
 */
@Test
public void test_sendMessage() throws Exception {
    final BrokerController controller = new BrokerController(//
        new BrokerConfig(), //
        new NettyServerConfig(), //
        new NettyClientConfig(), //
        new MessageStoreConfig());
    final boolean initialized = controller.initialize();
    System.out.println("initialize " + initialized);

    controller.start();

    final MQClientAPIImpl apiClient = new MQClientAPIImpl(new NettyClientConfig(), null);
    apiClient.start();

    for (int seq = 0; seq < 100000; seq++) {
        // Messages rotate over three topics and three delay levels (1..3).
        final String payload = "Hello, Nice world\t" + seq;
        final Message message =
                new Message("UnitTestTopic_" + (seq % 3), "TAG1 TAG2", "100200300", payload.getBytes());
        message.setDelayTimeLevel(seq % 3 + 1);

        try {
            final SendMessageRequestHeader header = new SendMessageRequestHeader();
            header.setProducerGroup("abc");
            header.setTopic(message.getTopic());
            header.setDefaultTopic(MixAll.DEFAULT_TOPIC);
            header.setDefaultTopicQueueNums(4);
            header.setQueueId(seq % 4);
            header.setSysFlag(0);
            header.setBornTimestamp(System.currentTimeMillis());
            header.setFlag(message.getFlag());
            header.setProperties(MessageDecoder.messageProperties2String(message.getProperties()));

            final SendResult sendResult = apiClient.sendMessage(
                "127.0.0.1:10911", "brokerName", message, header, 1000 * 5, CommunicationMode.SYNC, null);
            System.out.println(seq + "\t" + sendResult);
        }
        catch (Exception sendFailure) {
            sendFailure.printStackTrace();
        }
    }

    apiClient.shutdown();

    controller.shutdown();
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:48,
代码来源:SendMessageTest.java

示例16: DefaultMessageStore

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Convenience constructor that delegates to the two-argument constructor, passing
 * {@code null} for its second argument.
 *
 * @param messageStoreConfig store configuration forwarded to the delegate constructor
 * @throws IOException propagated from the delegate constructor
 */
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig) throws IOException {
    // NOTE(review): the type and meaning of the second parameter are not visible here.
    this(messageStoreConfig, null);
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:4,
代码来源:DefaultMessageStore.java

示例17: writeMessage

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Writes 1000 messages into a freshly created {@code DefaultMessageStore} that uses
 * small mapped-file sizes and has the message index disabled.
 *
 * @param normal when {@code true} the store is shut down after writing; when
 *        {@code false} it is left running
 * @param first when {@code true} the store is kept in {@code storeWrite1},
 *        otherwise in {@code storeWrite2}
 * @throws Exception if the store cannot be created, loaded, started or written to
 */
public void writeMessage(boolean normal, boolean first) throws Exception {
    System.out.println("================================================================");
    QUEUE_TOTAL = 3;
    // Build the message body.
    MessageBody = StoreMessage.getBytes();
    final long messageCount = 1000;

    final MessageStoreConfig config = new MessageStoreConfig();
    // Size of each physical (commit log) mapped file.
    config.setMapedFileSizeCommitLog(1024 * 32);
    // Size of each logical (consume queue) mapped file.
    config.setMapedFileSizeConsumeQueue(100 * 20);
    config.setMessageIndexEnable(false);

    final MessageStore store = new DefaultMessageStore(config);
    // Remember the store in the slot selected by the caller.
    if (!first) {
        this.storeWrite2 = store;
    }
    else {
        this.storeWrite1 = store;
    }

    // Step 1: load existing data.
    assertTrue(store.load());

    // Step 2: start the service.
    store.start();

    // Step 3: send the messages.
    long sent = 0;
    while (sent < messageCount) {
        final PutMessageResult putResult = store.putMessage(buildMessage());
        System.out.println(sent + "\t" + putResult.getAppendMessageResult().getMsgId());
        sent++;
    }

    if (normal) {
        // Shut down the store service.
        store.shutdown();
    }

    System.out.println("========================writeMessage OK========================================");
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:46,
代码来源:RecoverTest.java

示例18: readMessage

点赞 2

import com.alibaba.rocketmq.store.config.MessageStoreConfig; //导入依赖的package包/类
/**
 * Reads back every message of {@code TOPIC_A} from a newly loaded
 * {@code DefaultMessageStore}, queue by queue, and asserts that the number of
 * messages read equals {@code msgCnt}.
 *
 * @param msgCnt the total number of messages expected across all queues
 * @throws Exception if the store cannot be created, loaded, started or read
 */
public void readMessage(final long msgCnt) throws Exception {
    System.out.println("================================================================");
    QUEUE_TOTAL = 3;

    // Build the message body.
    MessageBody = StoreMessage.getBytes();

    final MessageStoreConfig config = new MessageStoreConfig();
    // Size of each physical (commit log) mapped file.
    config.setMapedFileSizeCommitLog(1024 * 32);
    // Size of each logical (consume queue) mapped file.
    config.setMapedFileSizeConsumeQueue(100 * 20);
    config.setMessageIndexEnable(false);

    storeRead = new DefaultMessageStore(config);
    // Step 1: load existing data.
    assertTrue(storeRead.load());

    // Step 2: start the service.
    storeRead.start();

    // Step 3: receive the messages.
    long totalRead = 0;
    for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) {
        long nextOffset = 0;
        // Keep pulling from this queue until the store stops reporting FOUND.
        while (true) {
            final GetMessageResult batch =
                    storeRead.getMessage("TOPIC_A", queueId, nextOffset, 1024 * 1024, null);
            if (batch.getStatus() != GetMessageStatus.FOUND) {
                break;
            }

            System.out.println(queueId + "\t" + batch.getMessageCount());
            this.veryReadMessage(queueId, nextOffset, batch.getMessageBufferList());
            nextOffset += batch.getMessageCount();
            totalRead += batch.getMessageCount();
            batch.release();
        }
    }

    System.out.println("readCnt = " + totalRead);
    assertTrue(totalRead == msgCnt);

    System.out.println("========================readMessage OK========================================");
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:46,
代码来源:RecoverTest.java


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)