方案概览
一个
Logstore
中包含多个
Shard,通过消费组消费数据就是将
Shard
分配给一个消费组中的消费者,分配方式遵循以下原则。
新的消费者加入消费组后,这个消费组下面的
Shard
从属关系会调整,以实现消费的负载均衡,但是仍遵循上述分配原则。
基本概念
|
名词
|
说明
|
|
消费组
|
日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组中的消费者共同消费一个
Logstore
中的数据,各个消费者不会重复消费数据。
重要
每个
Logstore
中,最多创建
30
个消费组。
|
|
消费者
|
消费组的构成单元,实际承担消费任务。
|
|
Logstore
|
数据采集、存储和查询单元。更多信息,请参见
日志库(Logstore)
。
|
|
Shard
|
用于控制
Logstore
的读写能力,数据必定保存在某一个
Shard
中。更多信息,请参见
分区(Shard)
。
|
|
Checkpoint
|
消费位点,是程序消费到的最新位置。程序重启后,可以通过
Checkpoint
恢复消费进度。
说明
通过消费组消费,程序发生故障时,会默认保存
Checkpoint。在程序故障恢复时,能够从断点处继续消费,从而保证数据不会被重复消费。
|
步骤一:创建消费组
下面分别介绍通过
SDK、API
和
CLI
方式创建消费组。
通过
SDK
创建消费组
创建消费组代码如下所示:
CreateConsumerGroup.java
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.exception.LogException;
public class CreateConsumerGroup {
public static void main(String[] args) throws LogException {
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。
String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// 输入Project名称。
String projectName = "ali-test-project";
// 输入Logstore名称。
String logstoreName = "ali-test-logstore";
// 设置日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写。
String host = "https://cn-hangzhou.log.aliyuncs.com";
// 创建日志服务Client。
Client client = new Client(host, accessId, accessKey);
try {
// 设置消费组名称。
String consumerGroupName = "ali-test-consumergroup2";
System.out.println("ready to create consumergroup");
ConsumerGroup consumerGroup = new ConsumerGroup(consumerGroupName, 300, true);
client.CreateConsumerGroup(projectName, logstoreName, consumerGroup);
System.out.println(String.format("create consumergroup %s success", consumerGroupName));
} catch (LogException e) {
System.out.println("LogException e :" + e.toString());
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
管理消费组的代码示例,请参见
使用
Java SDK
管理消费组
、
使用
Python SDK
管理消费组
。
步骤二:消费日志
消费原理
消费组
SDK
的消费者在首次启动时,当消费组不存在时会创建消费组。起始消费位点是指创建消费组时的数据起始消费位点,该消费位点仅在第一次创建时有效。后续重启消费者时,消费者会从上次服务端保存的消费位点处继续消费。以本示例为例:
消费示例
您可以通过
Java、C++、Python
及
Go SDK
实现消费组消费数据,此处以
Java SDK
为例。
示例一:SDK
消费
-
添加
Maven
依赖。
在
pom.xml
文件中,添加以下代码:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.50</version>
</dependency>
-
创建消费逻辑,代码如下所示:
SampleLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SampleLogHubProcessor implements ILogHubProcessor {
private int shardId;
// 记录上次持久化Checkpoint的时间。
private long mLastSaveTime = 0;
// initialize 方法会在 processor 对象初始化时被调用一次
public void initialize(int shardId) {
this.shardId = shardId;
// 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// 打印已获取的数据。
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
long curTime = System.currentTimeMillis();
// 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。
checkPointTracker.saveCheckPoint(true);
mLastSaveTime = curTime;
} else {
// 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。
checkPointTracker.saveCheckPoint(false);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
return null;
// 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
// 将Checkpoint立即保存到服务端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
更多示例代码,请参见
aliyun-log-consumer-java
、
Aliyun LOG Go Consumer
。
-
创建消费者实体,代码如下所示:
SampleLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SampleLogHubProcessor 对象。
return new SampleLogHubProcessor();
}
-
创建一个消费者并启动一个消费者线程,该消费者会从指定的
Logstore
中消费数据。代码如下所示:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// 日志服务的服务接入点,请您根据实际情况填写。
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。
private static String Project = "ali-test-project";
// 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。
private static String Logstore = "ali-test-logstore";
// 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。
private static String ConsumerGroup = "ali-test-consumergroup2";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。
private static String AccessKeyId= System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。
// maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR,1000);
ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
// 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
// ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。
Thread.sleep(30 * 1000);
}
-
运行
Main.java
。
以模拟消费
Nginx
日志为例,打印日志如下:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
__receive_time__ : 1635629877
--------
......
示例二:SDK
基于
SPL
消费
-
添加
Maven
依赖。
在
pom.xml
文件中,添加以下代码:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>loghub-client-lib</artifactId>
<version>0.6.47</version>
</dependency>
-
创建消费逻辑,代码如下所示:
SPLLogHubProcessor.java
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogContent;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.common.FastLogTag;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import java.util.List;
public class SPLLogHubProcessor implements ILogHubProcessor {
private int shardId;
// 记录上次持久化Checkpoint的时间。
private long mLastSaveTime = 0;
// initialize 方法会在 processor 对象初始化时被调用一次
public void initialize(int shardId) {
this.shardId = shardId;
// 消费数据的主逻辑,消费时的所有异常都需要处理,不能直接抛出。
public String process(List<LogGroupData> logGroups, ILogHubCheckPointTracker checkPointTracker) {
// 打印已获取的数据。
for (LogGroupData logGroup : logGroups) {
FastLogGroup fastLogGroup = logGroup.GetFastLogGroup();
System.out.println("Tags");
for (int i = 0; i < fastLogGroup.getLogTagsCount(); ++i) {
FastLogTag logTag = fastLogGroup.getLogTags(i);
System.out.printf("%s : %s\n", logTag.getKey(), logTag.getValue());
for (int i = 0; i < fastLogGroup.getLogsCount(); ++i) {
FastLog log = fastLogGroup.getLogs(i);
System.out.println("--------\nLog: " + i + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
for (int j = 0; j < log.getContentsCount(); ++j) {
FastLogContent content = log.getContents(j);
System.out.println(content.getKey() + "\t:\t" + content.getValue());
long curTime = System.currentTimeMillis();
// 每隔30秒,写一次Checkpoint到服务端。如果30秒内发生Worker异常终止,新启动的Worker会从上一个Checkpoint获取消费数据,可能存在少量的重复数据。
try {
if (curTime - mLastSaveTime > 30 * 1000) {
// 参数为true表示立即手动将Checkpoint更新到服务端。此外,默认每60秒会自动将内存中缓存的Checkpoint更新到服务端。
checkPointTracker.saveCheckPoint(true);
mLastSaveTime = curTime;
} else {
// 参数为false表示将Checkpoint缓存在本地,可被自动更新Checkpoint机制更新到服务端。
checkPointTracker.saveCheckPoint(false);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
return null;
// 当Worker退出时,会调用该函数,您可以在此处执行清理工作。
public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
// 将Checkpoint立即保存到服务端。
try {
checkPointTracker.saveCheckPoint(true);
} catch (LogHubCheckPointException e) {
e.printStackTrace();
}
-
创建消费者实体,代码如下所示:
SPLLogHubProcessorFactory.java
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
class SPLLogHubProcessorFactory implements ILogHubProcessorFactory {
public ILogHubProcessor generatorProcessor() {
// 生成一个消费实例。注意:每次调用 generatorProcessor 方法,都应该返回一个新的 SPLLogHubProcessor 对象。
return new SPLLogHubProcessor();
}
-
创建一个消费者并启动一个消费者线程,该消费者会从指定的
Logstore
中消费数据。代码如下所示:
Main.java
import com.aliyun.openservices.loghub.client.ClientWorker;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
public class Main {
// 日志服务的服务接入点,请您根据实际情况填写。
private static String Endpoint = "cn-hangzhou.log.aliyuncs.com";
// 日志服务项目名称,请您根据实际情况填写。请从已创建项目中获取项目名称。
private static String Project = "ali-test-project";
// 日志库名称,请您根据实际情况填写。请从已创建日志库中获取日志库名称。
private static String Logstore = "ali-test-logstore";
// 消费组名称,请您根据实际情况填写。您无需提前创建,该程序运行时会自动创建该消费组。
private static String ConsumerGroup = "ali-test-consumergroup2";
// 本示例从环境变量中获取AccessKey ID和AccessKey Secret。。
private static String AccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String AccessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
// consumer_1是消费者名称,同一个消费组下面的消费者名称必须不同。不同消费者在多台机器上启动多个进程,均衡消费一个Logstore时,消费者名称可以使用机器IP地址来区分。
// maxFetchLogGroupSize用于设置每次从服务端获取的LogGroup最大数目,使用默认值即可。您可以使用config.setMaxFetchLogGroupSize(100);调整,取值范围为(0,1000]。
LogHubConfig config = new LogHubConfig(ConsumerGroup, "consumer_1", Endpoint, Project, Logstore, AccessKeyId, AccessKeySecret, LogHubConfig.ConsumePosition.BEGIN_CURSOR, 1000);
// setQuery可以设置消费过程中的SLS SPL语句
config.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
ClientWorker worker = new ClientWorker(new SPLLogHubProcessorFactory(), config);
Thread thread = new Thread(worker);
// Thread运行之后,ClientWorker会自动运行,ClientWorker扩展了Runnable接口。
thread.start();
Thread.sleep(60 * 60 * 1000);
// 调用Worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
worker.shutdown();
// ClientWorker运行过程中会生成多个异步的任务。Shutdown完成后,请等待还在执行的任务安全退出。建议设置sleep为30秒。
Thread.sleep(30 * 1000);
}
-
运行
Main.java
。
以模拟消费
Nginx
日志为例,打印日志如下:
: GET
request_uri : /request/path-3/file-7
status : 200
body_bytes_sent : 3820
host : www.example.com
request_time : 43
request_length : 1987
http_user_agent : Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36
http_referer : www.example.com
http_x_forwarded_for : 192.168.10.196
upstream_response_time : 0.02
--------
Log: 158, time: 1635629778, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
__receive_time__ : 1635629815
--------
Log: 0, time: 1635629788, GetContentCount: 14
......
category : null
source : 127.0.0.1
topic : nginx_access_log
machineUUID : null
__receive_time__ : 1635629877
--------
......
步骤三:查看消费组状态
下面介绍两种查看消费组状态方式:
通过
Java SDK
查看消费组状态
-
查看每个
Shard
消费数据的进度。代码如下所示:
ConsumerGroupTest.java
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
static String endpoint = "cn-hangzhou.log.aliyuncs.com";
static String project = "ali-test-project";
static String logstore = "ali-test-logstore";
static String accesskeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accesskey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
public static void main(String[] args) throws LogException {
Client client = new Client(endpoint, accesskeyId, accesskey);
// 获取Logstore下的所有消费组。如果消费组不存在,则长度为0。
List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
for(ConsumerGroup c: consumerGroups){
// 打印消费组的属性,包括名称、心跳超时时间、是否按序消费。
System.out.println("名称: " + c.getConsumerGroupName());
System.out.println("心跳超时时间: " + c.getTimeout());
System.out.println("按序消费: " + c.isInOrder());
for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
System.out.println("shard: " + cp.getShard());
// 该时间精确到微秒,类型为长整型。
System.out.println("最后一次更新消费进度的时间: " + cp.getUpdateTime());
System.out.println("消费者名称: " + cp.getConsumer());
String consumerPrg = "";
if(cp.getCheckPoint().isEmpty())
consumerPrg = "尚未开始消费";
else{
// Unix时间戳,单位是秒,输出时请注意格式化。
int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
consumerPrg = "" + prg;
catch(LogException e){
if(e.GetErrorCode() == "InvalidCursor")
consumerPrg = "非法,前一次消费时刻已经超出了Logstore中数据的生命周期";
else{
// internal server error
throw e;
System.out.println("消费进度: " + consumerPrg);
String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
int endPrg = 0;
endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
catch(LogException e){
// do nothing
//Unix时间戳,单位:秒。输出时,请注意格式化。
System.out.println("最后一条数据到达时刻: " + endPrg);
}
-
返回结果如下所示:
名称: ali-test-consumergroup2
心跳超时时间: 60
按序消费: false
shard: 0
最后一次更新消费进度的时间: 0
消费者名称: consumer_1
消费进度: 尚未开始消费
最后一条数据到达时刻: 1729583617
shard: 1
最后一次更新消费进度的时间: 0
消费者名称: consumer_1
消费进度: 尚未开始消费
最后一条数据到达时刻: 1729583738
Process finished with exit code 0
在控制台查看消费组状态
-
登录
日志服务控制台
。
-
在
Project
列表区域,单击目标
Project。
-
在
页签中,单击目标
Logstore
左侧的
图标,然后单击
数据消费
左侧的
图标。
-
在消费组列表中,单击目标消费组。
-
在
Consumer Group
状态
页面,查看每个
Shard
消费数据的进度。
相关操作
-
RAM
用户授权
使用
RAM
用户操作时,需授予
RAM
用户操作消费组的相关权限。具体操作,请参见
创建
RAM
用户及授权
。
授权的动作(Action)如下表所示。
|
动作(Action)
|
说明
|
授权策略中的资源描述方式(Resource)
|
|
log:GetCursorOrData(
GetCursor - 通过时间查询
Cursor
)
|
根据时间获取游标(cursor)。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
|
|
log:CreateConsumerGroup(
CreateConsumerGroup - 创建消费组
)
|
在指定的
Logstore
上创建一个消费组。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
/consumergroup/
${consumerGroupName}
|
|
log:ListConsumerGroup(
ListConsumerGroup - 查询消费组
)
|
查询指定
Logstore
的所有消费组。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
/consumergroup/*
|
|
log:ConsumerGroupUpdateCheckPoint(
ConsumerGroupUpdateCheckPoint - 更新消费进度
)
|
更新指定消费组的某个
Shard
的
Checkpoint。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
/consumergroup/
${consumerGroupName}
|
|
log:ConsumerGroupHeartBeat(
ConsumerGroupHeartBeat - 消费者发送心跳到服务端
)
|
为指定消费者发送心跳到服务端。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
/consumergroup/
${consumerGroupName}
|
|
log:UpdateConsumerGroup(
UpdateConsumerGroup - 更新消费者组
)
|
修改指定消费组属性。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
/consumergroup/
${consumerGroupName}
|
|
log:GetConsumerGroupCheckPoint(
GetCheckPoint - 获取指定消费组的消费点
)
|
获取指定消费组消费的某个或者所有
Shard
的
Checkpoint。
|
acs:log:
${regionName}
:
${projectOwnerAliUid}
:project/
${projectName}
/logstore/
${logstoreName}
/consumergroup/
${consumerGroupName}
|
例如,消费组的相关资源信息如下所示,您要通过
RAM
用户操作该消费组,则需为
RAM
用户授予以下权限。
-
Project
所属的阿里云账号:174649****602745。
-
Project
所在地域
ID:cn-hangzhou。
-
Project
名称:project-test。
-
Logstore
名称:logstore-test。
-
消费组名称:consumergroup-test。
授权代码如下所示:
{
"Version": "1",
"Statement": [
"Effect": "Allow",
"Action": [
"log:GetCursorOrData"
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
"Effect": "Allow",
"Action": [
"log:CreateConsumerGroup",
"log:ListConsumerGroup"
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
"Effect": "Allow",
"Action": [
"log:ConsumerGroupUpdateCheckPoint",
"log:ConsumerGroupHeartBeat",
"log:UpdateConsumerGroup",
"log:GetConsumerGroupCheckPoint"
"Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
}
-
异常诊断
建议您为消费者程序配置
Log4j,将消费组内部遇到的异常信息打印出来,便于定位。log4j.properties
典型配置:
log4j.rootLogger = info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
配置
Log4j
后,执行消费者程序可以看到类似如下异常信息:
[WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
-
通过消费组消费从某个时间开始的数据
// consumerStartTimeInSeconds表示消费这个时间点之后的数据。
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
int consumerStartTimeInSeconds);
// position是个枚举变量,LogHubConfig.ConsumePosition.BEGIN_CURSOR表示从最老的数据开始消费,LogHubConfig.ConsumePosition.END_CURSOR表示从最新的数据开始消费。
public LogHubConfig(String consumerGroupName,
String consumerName,
String loghubEndPoint,
String project, String logStore,
String accessId, String accessKey,
ConsumePosition position);
-
重置
Checkpoint
public static void updateCheckpoint() throws Exception {
Client client = new Client(host, accessId, accessKey);
// 这里 timestamp 需要是以秒为单位的 unix timestamp,如果您的时间戳以毫秒为单位,需要如下所示除以1000
long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
ListShardResponse response = client.ListShard(new ListShardRequest(project, logStore));
for (Shard shard : response.GetShards()) {
int shardId = shard.GetShardId();
String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
}