相关文章推荐
骑白马的小蝌蚪  ·  网络伦理缺失类型三:网络暴力-华中师范大学新 ...·  2 年前    · 
求醉的胡萝卜  ·  解析路虎卫士PHEV,5.6秒破百,_懂车帝·  2 年前    · 
天涯  ·  中国人很聪明,为什么现代科学却起源于西方? ...·  2 年前    · 
天涯  ·  生产力与生产关系- 知乎·  3 年前    · 
中国一点蓝  ·  两个眼睛看东西重影,但是遮挡任何一个,只用一 ...·  5 年前    · 
Code  ›  kafka删除topic消息的四种方式[通俗易懂]开发者社区
delete kafka命令 kafka topic
https://cloud.tencent.com/developer/article/2150430
腼腆的卡布奇诺
2 年前
作者头像
全栈程序员站长
0 篇文章

kafka删除topic消息的四种方式[通俗易懂]

前往专栏
腾讯云
开发者社区
文档 意见反馈 控制台
首页
学习
活动
专区
工具
TVP
文章/答案/技术大牛
发布
首页
学习
活动
专区
工具
TVP
返回腾讯云官网
社区首页 > 专栏 > 全栈程序员必看 > kafka删除topic消息的四种方式[通俗易懂]

kafka删除topic消息的四种方式[通俗易懂]

作者头像
全栈程序员站长
发布 于 2022-11-03 15:54:53
6.7K 0
发布 于 2022-11-03 15:54:53
举报

方法一:快速配置删除法(简单粗暴,如果这个主题有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置delete.topic.enable=true

2.执行命令bin/kafka-topics.sh –delete –topic test –zookeeper zk:2181或者使用 kafka-manager 集群管理工具删除

注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记为marked for deletion,加上配置,重启kafka,之前的topick就真正删除了。

方法二:设置删除策略(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

1.kafka启动之前,在server.properties配置

#日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete
# 注意:下面有两种配置,一种是基于时间的策略,另种是基于日志文件大小的策略,两种策略同是配置的话,只要满足其中种策略,则触发Log删除的操作。删除操作总是先删除最旧的日志
# 消息在Kafka中保存的时间,168小时之前的1og, 可以被删除掉,根据policy处理数据。
log.retention.hours=4
# 当剩余空间低于log.retention.bytes字节,则开始删除1og
log.retention.bytes=37580963840
# 每隔300000ms, logcleaner线程将检查一次,看是否符合上述保留策略的消息可以被删除
log.retention.check.interval.ms=1000

方法三:手动删除法(不推荐)(简单粗暴,如果这个消息有程序还在消费者,此时KAFKA就game over)

前提:不允许更改server.properties配置

1.删除zk下面topic(test)

启动bin/zkCli.sh ls /brokers/topics rmr /brokers/topics/test ls /brokers/topics 查topic是否删除:bin/kafka-topics.sh –list –zookeeper zk:2181

2.删除各broker下topic数据,默认目录为/tmp/kafka-logs

方法四:偏移量(看起来你最友好,会程序的你推荐)

package com.censoft.kafkaAdmin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import java.sql.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
* @author zy Zhang
* @version : 1.0
* @Description
* @since 2020/7/13 16:02
public class DeleteReordsByOffset {
public static void main(String[] args) throws ClassNotFoundException {
// 1.创建kafkaAdminClient
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.27.111:9092");
AdminClient kafkaAdminClient = KafkaAdminClient.create(properties);
// 2.从数据库获取需要删除的消息
Class.forName("com.mysql.jdbc.Driver");
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
String url  = "jdbc:mysql://localhost:3306/test?useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF-8";
String user = "root";
String password = "123456";
Connection conn = null;
Statement statement = null;
ResultSet res = null;
String sql = "SELECT Topic, KafkaPartition, UntilOffset FROM Kafka_Offset;";
try {
conn = DriverManager.getConnection(url, user, password);
statement = conn.createStatement();
res = statement.executeQuery(sql);
if (res != null) {
while (res.next()) {
String topic = res.getString("Topic");
Integer partition = res.getInt("KafkaPartition");
Long offset = res.getLong("UntilOffset");
TopicPartition topicPartition = new TopicPartition(topic, partition);
RecordsToDelete recordsToDelete1 = RecordsToDelete.beforeOffset(offset);
recordsToDelete.put(topicPartition, recordsToDelete1);
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
e.printStackTrace();
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
// 3.执行删除
DeleteRecordsResult result = kafkaAdminClient.deleteRecords(recordsToDelete);
Map<TopicPartition, KafkaFuture<DeletedRecords>> lowWatermarks = result.lowWatermarks();
try {
for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry : lowWatermarks.entrySet()) {
System.out.println(entry.getKey().topic() + " " + entry.getKey().partition() + " " + entry.getValue().get().lowWatermark());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
kafkaAdminClient.close();
}

2020-11-27 补充说明:

		目前发现通过这种方法起到的效果是:
topic的起始偏移量会被定位到传入的recordsToDelete指定的位置
但是并没有将磁盘中存储的数据删除
如果我找到在磁盘删除的方法会继续更新,看下面

2020-11-30 补充说明:

 
推荐文章
骑白马的小蝌蚪  ·  网络伦理缺失类型三:网络暴力-华中师范大学新闻传播学院案例库
2 年前
求醉的胡萝卜  ·  解析路虎卫士PHEV,5.6秒破百,_懂车帝
2 年前
天涯  ·  中国人很聪明,为什么现代科学却起源于西方? - 知乎
2 年前
天涯  ·  生产力与生产关系- 知乎
3 年前
中国一点蓝  ·  两个眼睛看东西重影,但是遮挡任何一个,只用一只眼看会很清晰,之前以为是散光,换了眼镜后还不行,求解答? - 知乎
5 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号