• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

Java QueryConsumerOffsetRequestHeader类的典型用法和代码示例

java 2次浏览

本文整理汇总了Java中com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader的典型用法代码示例。如果您正苦于以下问题:Java QueryConsumerOffsetRequestHeader类的具体用法?Java QueryConsumerOffsetRequestHeader怎么用?Java QueryConsumerOffsetRequestHeader使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。

QueryConsumerOffsetRequestHeader类属于com.alibaba.rocketmq.common.protocol.header包,在下文中一共展示了QueryConsumerOffsetRequestHeader类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: fetchConsumeOffsetFromBroker

点赞 3

import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; //导入依赖的package包/类
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        // TODO Here may be heavily overhead for Name Server,need tuning
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());

        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
 

开发者ID:y123456yz,
项目名称:reading-and-annotate-rocketmq-3.4.6,
代码行数:22,
代码来源:RemoteBrokerOffsetStore.java

示例2: fetchConsumeOffsetFromBroker

点赞 3

import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; //导入依赖的package包/类
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
        InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        // TODO 此处可能对Name Server压力过大,需要调优
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());

        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    }
    else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:23,
代码来源:RemoteBrokerOffsetStore.java

示例3: queryConsumerOffset

点赞 2

import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; //导入依赖的package包/类
/**
 * 查询Consumer消费进度
 */
public long queryConsumerOffset(//
        final String addr,//
        final QueryConsumerOffsetRequestHeader requestHeader,//
        final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
    // 添加虚拟运行环境相关的projectGroupPrefix
    if (!UtilAll.isBlank(projectGroupPrefix)) {
        requestHeader.setConsumerGroup(VirtualEnvUtil.buildWithProjectGroup(
            requestHeader.getConsumerGroup(), projectGroupPrefix));
        requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
            projectGroupPrefix));
    }

    RemotingCommand request =
            RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
    case ResponseCode.SUCCESS: {
        QueryConsumerOffsetResponseHeader responseHeader =
                (QueryConsumerOffsetResponseHeader) response
                    .decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);

        return responseHeader.getOffset();
    }
    default:
        break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:36,
代码来源:MQClientAPIImpl.java

示例4: queryConsumerOffset

点赞 2

import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; //导入依赖的package包/类
/**
 * 查询Consumer消费进度
 */
public long queryConsumerOffset(//
        final String addr,//
        final QueryConsumerOffsetRequestHeader requestHeader,//
        final long timeoutMillis//
) throws RemotingException, MQBrokerException, InterruptedException {
    // 添加虚拟运行环境相关的projectGroupPrefix
    if (!UtilAll.isBlank(projectGroupPrefix)) {
        requestHeader.setConsumerGroup(VirtualEnvUtil.buildWithProjectGroup(
            requestHeader.getConsumerGroup(), projectGroupPrefix));
        requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
            projectGroupPrefix));
    }

    RemotingCommand request =
            RemotingCommand
                .createRequestCommand(MQRequestCode.QUERY_CONSUMER_OFFSET_VALUE, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
    case ResponseCode.SUCCESS_VALUE: {
        QueryConsumerOffsetResponseHeader responseHeader =
                (QueryConsumerOffsetResponseHeader) response
                    .decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);

        return responseHeader.getOffset();
    }
    default:
        break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}
 

开发者ID:brucechan0921,
项目名称:RocketMQ-3.0.8,
代码行数:36,
代码来源:MQClientAPIImpl.java

示例5: queryConsumerOffset

点赞 2

import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; //导入依赖的package包/类
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
    final RemotingCommand response =
            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
            (QueryConsumerOffsetResponseHeader) response.getCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
            (QueryConsumerOffsetRequestHeader) request
                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    long offset =
            this.brokerController.getConsumerOffsetManager().queryOffset(
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

    // 订阅组存在
    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    }
    // 订阅组不存在
    else {
        long minOffset =
                this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
                    requestHeader.getQueueId());
        // 订阅组不存在情况下,如果这个队列的消息最小Offset是0,则表示这个Topic上线时间不长,服务器堆积的数据也不多,那么这个订阅组就从0开始消费。
        // 尤其对于Topic队列数动态扩容时,必须要从0开始消费。
        if (minOffset <= 0) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        }
        // 新版本服务器不做消费进度纠正
        else {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }

    return response;
}
 

开发者ID:diwayou,
项目名称:rocketmq-all-trans,
代码行数:42,
代码来源:AdminBrokerProcessor.java


版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)