相关文章推荐
英勇无比的柿子  ·  Microsoft Entra ...·  1 年前    · 
强悍的红烧肉  ·  C# ...·  1 年前    · 
霸气的毛衣  ·  vue学习 五 ...·  2 年前    · 
通过消费组消费日志

通过消费组消费日志

当您使用第三方软件、多语言应用、云产品、流式计算框架等通过 SDK 实时消费日志服务的数据时,SDK 消费无法满足日志服务的实现细节及消费者之间的负载均衡、故障转移(Failover)等,您可以通过消费组(ConsumerGroup)消费日志,消费组(ConsumerGroup)消费的实时性较强,通常为秒级。本文为您介绍通过消费组消费数据的操作步骤。

方案概览

一个 Logstore 中包含多个 Shard,通过消费组消费数据就是将 Shard 分配给一个消费组中的消费者,分配方式遵循以下原则。

  • 在一个消费组中,一个 Shard 只会分配到一个消费者。

  • 在一个消费组中,一个消费者可以被分配多个 Shard。

新的消费者加入消费组后,这个消费组下面的 Shard 从属关系会调整,以实现消费的负载均衡,但是仍遵循上述分配原则。

image

基本概念

名词

说明

消费组

日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组中的消费者共同消费一个 Logstore 中的数据,各个消费者不会重复消费数据。

重要

每个 Logstore 中,最多创建 30 个消费组。

消费者

消费组的构成单元,实际承担消费任务。

重要

同一个消费组中的消费者名称必须不同。

Logstore

数据采集、存储和查询单元。更多信息,请参见 日志库(Logstore)

Shard

用于控制 Logstore 的读写能力,数据必定保存在某一个 Shard 中。更多信息,请参见 分区(Shard)

Checkpoint

消费位点,是程序消费到的最新位置。程序重启后,可以通过 Checkpoint 恢复消费进度。

说明

通过消费组消费,程序发生故障时,会默认保存 Checkpoint。在程序故障恢复时,能够从断点处继续消费,从而保证数据不会被重复消费。

前提条件

步骤一:创建消费组

下面分别介绍通过 SDK、API CLI 方式创建消费组。

通过 SDK 创建消费组

创建消费组代码如下所示:

CreateConsumerGroup.java

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
    public static void main(String[] args) throws LogException {
         // 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
        String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        // 输入Project名称。
        String projectName = "ali-test-project";
        // 输入Logstore名称。
        String logstoreName = "ali-test-logstore";
        // 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
        String host = "https://cn-hangzhou.log.aliyuncs.com";
        // 创建日志服务Client。
        Client client = new Client(host, accessId, accessKey);
        try {
            // 设置消费组名称。
            String consumerGroupName = "ali-test-consumergroup2";
            System.out.println("ready to create consumergroup");
            ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
            client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
            System.out.println(String.format("create consumergroup %s success", consumerGroupName));
        } catch (LogException e) {
            System.out.println("LogException e :" + e.toString());
            System.out.println("error code :" + e.GetErrorCode());
            System.out.println("error message :" + e.GetErrorMessage());
            throw e;
}

管理消费组的代码示例,请参见 使用 Java SDK 管理消费组 使用 Python SDK 管理消费组

通过 API 创建消费组

API 创建消费组,请参见 CreateConsumerGroup - 创建消费组

查询消费组是否创建成功,请参见 ListConsumerGroup - 查询消费组

通过 CLI 创建消费组

CLI 创建消费组,请参见 create_consumer_group

查询消费组是否创建成功,请参见 list_consumer_group

步骤二:消费日志

消费原理

消费组 SDK 的消费者在首次启动时,当消费组不存在时会创建消费组。起始消费位点是指创建消费组时的数据起始消费位点,该消费位点仅在第一次创建时有效。后续重启消费者时,消费者会从上次服务端保存的消费位点处继续消费。以本示例为例:

  • LogHubConfig.ConsumePosition.BEGIN_CURSOR :消费组从头开始消费日志,起始消费位点为 Logstore 中的第一条日志。

  • LogHubConfig.ConsumePosition.END_CURSOR :此消费位点记录 Logstore 日志的最后一条日志之后。

消费示例

您可以通过 Java、C++、Python Go SDK 实现消费组消费数据,此处以 Java SDK 为例。

示例一:SDK 消费

  1. 添加 Maven 依赖。

    pom.xml 文件中,添加以下代码:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.50</version>
    </dependency>
  2. 创建消费逻辑,代码如下所示:

    SampleLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import java.util.List;
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 记录上次持久化Checkpoint的时间。
        private long mLastSaveTime = 0;
        // initialize 方法会在 processor 对象初始化时被调用一次
        public void initialize(int shardId) {
            this.shardId = shardId;
        // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // 打印已获取的数据。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
            long curTime = System.currentTimeMillis();
            // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。
                    checkPointTracker.saveCheckPoint(false);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            return null;
        // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // 将Checkpoint立即保存到服务端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
    }

    更多示例代码,请参见 aliyun-log-consumer-java Aliyun LOG Go Consumer

  3. 创建消费者实体,代码如下所示:

    SampleLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SampleLogHubProcessor 对象。
            return new SampleLogHubProcessor();
    }
  4. 创建一个消费者并启动一个消费者线程,该消费者会从指定的 Logstore 中消费数据。代码如下所示:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    public class Main {
        // 日志服务的服务接入点,请您根据实际情况填写。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。
        private static String Project = "ali-test-project";
        // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。
        private static String Logstore = "ali-test-logstore";
        // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // 本示例从环境变量中获取AccessKey IDAccessKey Secret。。
        private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。
            // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // 调用WorkerShutdown函数,退出消费实例,关联的线程也会自动停止。
            worker.shutdown();
            // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep30秒。
            Thread.sleep(30 * 1000);
    }
  5. 运行 Main.java

    以模拟消费 Nginx 日志为例,打印日志如下:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
        __receive_time__    :    1635629877
    --------
    ......

示例二:SDK 基于 SPL 消费

  1. 添加 Maven 依赖。

    pom.xml 文件中,添加以下代码:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.47</version>
    </dependency>
  2. 创建消费逻辑,代码如下所示:

    SPLLogHubProcessor.java

    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import java.util.List;
    public class SPLLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // 记录上次持久化Checkpoint的时间。
        private long mLastSaveTime = 0;
        // initialize 方法会在 processor 对象初始化时被调用一次
        public void initialize(int shardId) {
            this.shardId = shardId;
        // 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
        public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
            // 打印已获取的数据。
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
                System.out.println("Tags");
                for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
                    FastLogTag logTag = fastLogGroup.getLogTags(i);
                    System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
                for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
                    FastLog log = fastLogGroup.getLogs(i);
                    System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int j = 0; j < log.getContentsCount(); ++j) {
                        FastLogContent content = log.getContents(j);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
            long curTime = System.currentTimeMillis();
            // 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
            try {
                if (curTime - mLastSaveTime > 30 * 1000) {
                    // 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。
                    checkPointTracker.saveCheckPoint(true);
                    mLastSaveTime = curTime;
                } else {
                    // 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。
                    checkPointTracker.saveCheckPoint(false);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            return null;
        // 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // 将Checkpoint立即保存到服务端。
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
    }
  3. 创建消费者实体,代码如下所示:

    SPLLogHubProcessorFactory.java

    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SPLLogHubProcessor 对象。
            return new SPLLogHubProcessor();
    }
  4. 创建一个消费者并启动一个消费者线程,该消费者会从指定的 Logstore 中消费数据。代码如下所示:

    Main.java

    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    public class Main {
        // 日志服务的服务接入点,请您根据实际情况填写。
        private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
        // 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。
        private static String Project = "ali-test-project";
        // 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。
        private static String Logstore = "ali-test-logstore";
        // 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。
        private static String ConsumerGroup = "ali-test-consumergroup2";
        // 本示例从环境变量中获取AccessKey IDAccessKey Secret。。
        private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。
            // maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。
            LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
            // setQuery可以设置消费过程中的SLS SPL语句
            config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
            ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
            thread.start();
            Thread.sleep(60 * 60 * 1000);
            // 调用WorkerShutdown函数,退出消费实例,关联的线程也会自动停止。
            worker.shutdown();
            // ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep30秒。
            Thread.sleep(30 * 1000);
    }
  5. 运行 Main.java

    以模拟消费 Nginx 日志为例,打印日志如下:

    :    GET
    request_uri    :    /request/path-3/file-7
    status    :    200
    body_bytes_sent    :    3820
    host    :    www.example.com
    request_time    :    43
    request_length    :    1987
    http_user_agent    :    Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
    http_referer    :    www.example.com
    http_x_forwarded_for    :    192.168.10.196
    upstream_response_time    :    0.02
    --------
    Log: 158, time: 1635629778, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
        __receive_time__    :    1635629815
    --------
    Log: 0, time: 1635629788, GetContentCount: 14
    ......
        category    :    null
        source    :    127.0.0.1
        topic    :    nginx_access_log
        machineUUID    :    null
        __receive_time__    :    1635629877
    --------
    ......

步骤三:查看消费组状态

下面介绍两种查看消费组状态方式:

通过 Java SDK 查看消费组状态

  1. 查看每个 Shard 消费数据的进度。代码如下所示:

    ConsumerGroupTest.java

    import java.util.List;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Consts.CursorMode;
    import com.aliyun.openservices.log.common.ConsumerGroup;
    import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
    import com.aliyun.openservices.log.exception.LogException;
    public class ConsumerGroupTest {
        static String endpoint = "cn-hangzhou.log.aliyuncs.com";
        static String project = "ali-test-project";
        static String logstore = "ali-test-logstore";
        static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static void main(String[] args) throws LogException {
            Client client = new Client(endpoint, accesskeyId, accesskey);
            // 获取Logstore下的所有消费组。如果消费组不存在,则长度为0。
            List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
            for(ConsumerGroup c: consumerGroups){
                // 打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
                System.out.println("名称: " + c.getConsumerGroupName());
                System.out.println("心跳超时时间: " + c.getTimeout());
                System.out.println("按序消费: " + c.isInOrder());
                for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                    System.out.println("shard: " + cp.getShard());
                    // 该时间精确到微秒,类型为长整型。
                    System.out.println("最后一次更新消费进度的时间: " + cp.getUpdateTime());
                    System.out.println("消费者名称: " + cp.getConsumer());
                    String consumerPrg = "";
                    if(cp.getCheckPoint().isEmpty())
                        consumerPrg = "尚未开始消费";
                    else{
                        // Unix时间戳,单位是秒,输出时请注意格式化。
                            int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                            consumerPrg = "" + prg;
                        catch(LogException e){
                            if(e.GetErrorCode() == "InvalidCursor")
                                consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
                            else{
                                // internal server error
                                throw e;
                    System.out.println("消费进度: " + consumerPrg);
                    String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                    int endPrg = 0;
                        endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                    catch(LogException e){
                        // do nothing
                    //Unix时间戳,单位:秒。输出时,请注意格式化。
                    System.out.println("最后一条数据到达时刻: " + endPrg);
    }
  2. 返回结果如下所示:

    名称: ali-test-consumergroup2
    心跳超时时间: 60
    按序消费: false
    shard: 0
    最后一次更新消费进度的时间: 0
    消费者名称: consumer_1
    消费进度: 尚未开始消费
    最后一条数据到达时刻: 1729583617
    shard: 1
    最后一次更新消费进度的时间: 0
    消费者名称: consumer_1
    消费进度: 尚未开始消费
    最后一条数据到达时刻: 1729583738
    Process finished with exit code 0

在控制台查看消费组状态

  1. 登录 日志服务控制台

  2. Project 列表区域,单击目标 Project。

    image

  3. 日志存储 > 日志库 页签中,单击目标 Logstore 左侧的 展开节点 图标,然后单击 数据消费 左侧的 展开节点 图标。

  4. 在消费组列表中,单击目标消费组。

  5. Consumer Group 状态 页面,查看每个 Shard 消费数据的进度。 image

相关操作

  • RAM 用户授权

    使用 RAM 用户操作时,需授予 RAM 用户操作消费组的相关权限。具体操作,请参见 创建 RAM 用户及授权

    授权的动作(Action)如下表所示。

    动作(Action)

    说明

    授权策略中的资源描述方式(Resource)

    log:GetCursorOrData( GetCursor - 通过时间查询 Cursor

    根据时间获取游标(cursor)。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName}

    log:CreateConsumerGroup( CreateConsumerGroup - 创建消费组

    在指定的 Logstore 上创建一个消费组。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    log:ListConsumerGroup( ListConsumerGroup - 查询消费组

    查询指定 Logstore 的所有消费组。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/*

    log:ConsumerGroupUpdateCheckPoint( ConsumerGroupUpdateCheckPoint - 更新消费进度

    更新指定消费组的某个 Shard Checkpoint。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    log:ConsumerGroupHeartBeat( ConsumerGroupHeartBeat - 消费者发送心跳到服务端

    为指定消费者发送心跳到服务端。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    log:UpdateConsumerGroup( UpdateConsumerGroup - 更新消费者组

    修改指定消费组属性。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    log:GetConsumerGroupCheckPoint( GetCheckPoint - 获取指定消费组的消费点

    获取指定消费组消费的某个或者所有 Shard Checkpoint。

    acs:log: ${regionName} : ${projectOwnerAliUid} :project/ ${projectName} /logstore/ ${logstoreName} /consumergroup/ ${consumerGroupName}

    例如,消费组的相关资源信息如下所示,您要通过 RAM 用户操作该消费组,则需为 RAM 用户授予以下权限。

    • Project 所属的阿里云账号:174649****602745。

    • Project 所在地域 ID:cn-hangzhou。

    • Project 名称:project-test。

    • Logstore 名称:logstore-test。

    • 消费组名称:consumergroup-test。

    授权代码如下所示:

    {
      "Version": "1",
      "Statement": [
          "Effect": "Allow",
          "Action": [
            "log:GetCursorOrData"
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
          "Effect": "Allow",
          "Action": [
            "log:CreateConsumerGroup",
            "log:ListConsumerGroup"
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
          "Effect": "Allow",
          "Action": [
            "log:ConsumerGroupUpdateCheckPoint",
            "log:ConsumerGroupHeartBeat",
            "log:UpdateConsumerGroup",
            "log:GetConsumerGroupCheckPoint"
          "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  • 异常诊断

    建议您为消费者程序配置 Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties 典型配置:

    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

    配置 Log4j 后,执行消费者程序可以看到类似如下异常信息:

    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • 通过消费组消费从某个时间开始的数据

    // consumerStartTimeInSeconds表示消费这个时间点之后的数据。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    // position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    说明
    • 按照消费需求,请您使用不同的构造方法。

    • 当服务端已保存 Checkpoint,则开始消费位置以服务端保存的 Checkpoint 为准。

    • 日志服务消费数据时,默认优先使用 Checkpoint 作为消费点。当您指定从固定时间点开始消费数据时,必须保证 consumerStartTimeInSeconds 时间点落到 TTL 周期内,否则会造成消费不生效。

  • 重置 Checkpoint

    public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // 这里 timestamp 需要是以秒为单位的 unix timestamp,如果您的时间戳以毫秒为单位,需要如下所示除以1000
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
            for (Shard shard : response.GetShards()) {
                int shardId = shard.GetShardId();
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
        }

相关文档