import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.shi.util.RabbitMqUtils;
* 通配符模式 - topic
* @author SHF
* @version 创建时间:2018年7月4日 下午5:53:29
public class TopicTest {
//交换机名称
private final static String EXCHANGE_NAME = "exchange_topic";
//路由 key
private final static String KEY_1 ="a.1";
private final static String KEY_2 ="key.*";
private final static String KEY_3 ="#.#";
//队列名称
private final static String QUEUE_1 ="queue_topic_1";
private final static String QUEUE_2 ="queue_topic_2";
* 生产者 - 路由模式
* KEY_1 ="a";
* @author SHF
* @version 创建时间:2018年7月4日 下午4:20:39
* @throws TimeoutException
* @throws IOException
@Test
public void send()throws IOException, TimeoutException {
//1 获取链接及mq 通道
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//2 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//3 消息内容
String message = " 施爷 通配符模式 topic 向你发送了一条消息....";
channel.basicPublish(EXCHANGE_NAME, KEY_1, null, message.getBytes());
System.out.println(" [x] sent:"+message);
//4关闭通道及连接
channel.close();
connection.close();
* 消费者1 - 路由模式
* KEY_2 ="b";
* @author SHF
* @version 创建时间:2018年7月4日 下午4:33:55
* @throws TimeoutException
* @throws IOException
* @throws InterruptedException
* @throws ConsumerCancelledException
* @throws ShutdownSignalException
@Test
public void reic1() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//1 获取连接 及 通道
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//2 声明队列
channel.queueDeclare(QUEUE_1, false, false, false, null);
//3 绑定交换机,指定路由
channel.queueBind(QUEUE_1, EXCHANGE_NAME, KEY_2);
//4 同一个服务器只会发送一条消息给消费者
channel.basicQos(1);
//5 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//6 监听队列,手动返回完成
channel.basicConsume(QUEUE_1, false,consumer);
//7 获取消息
while(true) {
Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println( "[x] reiv1 :" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
* 消费者2 - 路由模式
* KEY_3 ="a"
* @author SHF
* @version 创建时间:2018年7月4日 下午4:33:55
* @throws TimeoutException
* @throws IOException
* @throws InterruptedException
* @throws ConsumerCancelledException
* @throws ShutdownSignalException
@Test
public void reic2() throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//1 获取连接 及 通道
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//2 声明队列
channel.queueDeclare(QUEUE_2, false, false, false, null);
//3 绑定交换机,指定路由
channel.queueBind(QUEUE_2, EXCHANGE_NAME, KEY_3);
//4 同一个服务器只会发送一条消息给消费者
channel.basicQos(1);
//5 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
//6 监听队列,手动返回完成
channel.basicConsume(QUEUE_2, false,consumer);
//7 获取消息
while(true) {
Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println( "[x] reiv2 :" + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
2019独角兽企业重金招聘Python工程师标准>>> ...
flume使用
通配符
批量消费
kafka
的
Topic
#指定
kafka
topic
使用注释的这个
#
kafka
_
topic
s: "optics-production-data"
#flume使用
通配符
kafka
_
topic
s_regex: "optics-(.*)"
接到领导的一个需求,希望封装一下
kafka
的消费者,可以从配置读取
topic
进行消费;一开始首先想到的是用
java
kafka
的高阶api手工根据
topic
创建消费者,一个
topic
创建一个消费者,依赖zookeeper完成
kafka
内部的balance和其他管理。后来领导又提出不要依赖zookeeper,之前老是rebalance失败。
调研了一下,手工实现类似sp...
3.2
Topic
配置
与
topic
相关的配置,服务器的默认值,也可可选择的覆盖指定的
topic
。如果没有给出指定
topic
的配置,则将使用服务器默认值。 可以通过-config选项在
topic
创建时设置。此示例使用自定义最大消息的大小和刷新率,创建一个名为my-
topic
的
topic
:
> bin/
kafka
-...
提出了一个更普遍的问题, 带任意长度
通配符
的
模式
匹配问题(Pattern matching with
arbitrary-length wildcards, PMAW), 这里
模式
中不仅可以有多个
通配符
约束, 而且每个
通配符
的约束可以是两个整数, 也可
1.cleanup.policy
字符串要么是“delete”,要么是“compact”,或者两者都是。此配置指定在旧日志段上使用的保留策略。默认策略(“delete”)将在达到保留时间或大小限制时丢弃旧段。“compact”设置将启用
topic
上的日志压缩。服务器提供的默认配置是log.cleanup.policy。
2.compression.type
指定给定
topic
的最终压缩类型。该配置接受标准的压缩编解码器('gzip'、'
kafka
主题管理1.创建主题2.查询主题3.修改主题3.1 修改主题分区3.2修改主题级别参数3.3变更副本数3.4修改主题限速3.5主题分区迁移4.删除主题4.1主题删除失败5.常见问题5.1__consumer_offsets 占用太多的磁盘
kafka
主题(
topic
)的管理(增删改查),使用最多的便是
kafka
自带的脚本。
1.创建主题
kafka
提供的
kafka
-
topic
s 脚...
Kafka
topic
级别的配置参数。首先是可以在配置文件中设置全局性的
topic
配置参数,其次是可以在创建
topic
时使用 –confi设置一个或多个自定义的配置。自定义的配置项优先级别会高于全局配置。
这是在创建一个
topic
时自定义了最大的消息字节数和消息持久化参数:
> bin/
kafka
-
topic
s.sh –zookeeper localhost:2181 –create
### 回答1:
ultrareplace
通配符
是一种用于字符串替换的特殊
模式
。通常,在使用字符串替换功能时,我们会指定一个确定的字符串或
模式
进行替换。而使用 ultrareplace
通配符
,则可以更加强大且灵活地进行替换操作。
通配符
是一种可以代表一个或多个字符的符号。在 ultrareplace 中,我们可以使用以下几种
通配符
:
1. *:匹配任意数量的字符(包括零个字符)。
2. ?:匹配单个字符。
3. [abc]:匹配括号内任意一个字符。例如,[abc]xyz 可以匹配 axyz、bxyz 或 cxyz。
4. [a-z]:匹配指定范围内的字符。例如,[a-z]xyz 可以匹配任意小写字母开头的 xyz。
我们可以通过使用这些
通配符
,来实现更精确的字符串替换。例如,我们要将所有以 "ultra" 开头的字符串替换为 "super",可以使用 "ultra*" 的
通配符
模式
。
这种
通配符
的使用不仅能够简化替换操作的代码,还能够提高程序的灵活性和可扩展性。通过将
通配符
应用于字符串替换中,我们可以根据具体需求来指定替换规则,而不需要针对每个具体的字符串进行独立的替换操作。
总而言之,ultrareplace
通配符
是一种在字符串替换中使用的特殊
模式
,能够通过
通配符
来匹配指定的字符或字符序列,从而实现更强大和灵活的替换功能。通过合理使用
通配符
,我们可以简化代码,提高程序的灵活性和可扩展性。
### 回答2:
ultrareplace 是一种字符串操作
通配符
,可以在字符串中查找一定
模式
的字符或字符串,并进行替换操作。它的使用非常灵活,可以根据不同的需求进行定制化的替换。
ultrareplace
通配符
支持以下几种功能:
1. 替换单个字符:可以使用 "?" 代表单个任意字符,例如 "a?c" 可以匹配 "abc"、"arc" 等,然后进行相应的替换。
2. 替换多个字符:可以使用 "*" 代表任意数量的字符,例如 "ab*" 可以匹配 "abc"、"abcd"、"abcdef" 等,并进行相应的替换。
3. 替换字符串:可以使用 "{ }" 将字符串括起来,表示需要查找和替换的字符串。例如 "{apple}" 可以匹配字符串中的 "apple" 并进行替换操作。
4. 组合使用:可以将不同的
通配符
组合使用,以达到更精确的查找和替换效果。例如 "a?c*" 可以匹配 "abc"、"arc"、"acc" 等,并进行相应的替换。
通过灵活使用 ultrareplace
通配符
,可以方便快捷地实现字符串的替换操作,减少了繁琐的手动查找和替换过程,提高了工作效率。同时,它还可以应用于各种文本处理场景,如批量替换文件中的文字、规范文档中的格式等等。总之,ultrareplace
通配符
是一种方便实用的字符串操作工具。
[Microsoft.BizTalk.Deployment.DeploymentException] Assembly "AutoProcess, Version=1.0.0.0, Culture=n...
Calling an Application Engine from PeopleCode
[Microsoft.BizTalk.Deployment.DeploymentException] Assembly "AutoProcess, Version=1.0.0.0, Culture=n...
Calling an Application Engine from PeopleCode