背景:公司产品需要做微信服务,了解的人都知道微信粉丝有交互就会给第三方平台推送事件消息。也就是说产品管理的公众号越多,公众号的粉丝越多,那么平台需要处理的事件消息也就会越频繁。于是想到用MQ解决压力问题。自己搭建MQ维护成本又高,自然而然的想到阿里云。
具体的详细介绍,官方帮助都有说明:
https://help.aliyun.com/document_detail/29532.html?spm=5176.doc29532.6.539.rompTV
接入方式有三种:TCP、HTTP、MQTT。考虑到实时性,我采用的是TCP普通消息接入。
然后创建 Topic 创建、Producer ID、创建 Consumer ID、创建阿里云 AccessKey,SecretKey。
接下来就是按照DEMO引入JAR包,进行开发,测试,上线。(没有很好的测试条件,没有进行压测)
上线后发现过一段时间服务器CPU使用率就会暴增,直至无响应。重启后情况依旧,郁闷了。。。。。。
经过查看堆栈等一系列跟踪排错,发现有很多thread在进行空等待,类似于下面的信息
"ConsumeMessageThread_13" daemon prio=10 tid=0x00007ff57c018000 nid=0x1fdf waiting on condition [0x00007ff52d8d7000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007035c4f30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
利用Jmeter进行模拟发现,每一个消息生产、消费的过程,都要产生10多个thread,线程数在无限增长,JVM也不进行回收,直至CPU无响应。
产生原因:由于轻信了DEMO中的producer的shutdown()方法
// 在应用退出前,销毁Producer对象<br>
// 注意:如果不销毁也没有问题
producer.shutdown();
所以每当生产一个消息的时候,我就会新创建一个producer实例。而每个producer实例又会产生多个thread,数量在5-20左右(取决于自己的配置情况)。虽然调用了shutdown()方法,但是实际上,producer实例和其产生的thread并没有回收掉,thread都将进入上面描述的空等待状态。
解决方法:其实很简单,在创建Producer实例发送消息的时候,使用单例模式。(撞墙,万万没想到啊)
转载于:https://my.oschina.net/u/265896/blog/827307
背景:公司产品需要做微信服务,了解的人都知道微信粉丝有交互就会给第三方平台推送事件消息。也就是说产品管理的公众号越多,公众号的粉丝越多,那么平台需要处理的事件消息也就会越频繁。于是想到用MQ解决压力问题。自己搭建MQ维护成本又高,自然而然的想到阿里云。 具体的详细介绍,官方帮助都有说明: ...
1:问题现象:
运行的instance一段时间(20h)就下降,重启之后消费正常然后又不行了;原以为是ons版本1.2.7改成laest1.7.7.final;没效果;经验之觉:肯定是代码没优化好:
处理流程一:单纯以为应该是gc没做好;有big Object ;./jmap发现了MsgContent;查project
使用
ConcurrentHashMap<String ,MsgCon...
原文:https://www.
java
tang.com/archives/2017/10/26/08572060.html
我们在上篇文章中详细描述了Thread Dump中Native Thread和JVM Thread线程的各种状态及描述,今天总结分析的一些原则,并详细列举一些案例进行说明。
CPU占用率很高,响应很慢
按照《
Java
内存泄漏分析系列之一:
使用
jst...
坑
1:possibly caused by authentication failure rabbitmq错误
参考:Access Control (Authentication, Authorisation) in RabbitMQ
默认的Virtual Host和User
virtual host /
user guest pass...
如果发现大量的线程总是处于runnable状态,且堆栈信息中包含类似HttpClientUtil.doGet的信息,且有可能是因为http请求处理慢,导致大量线程被占用,消费能力不足导致消息堆积。解决思路,优优http请求,如设置较短的过期时间等。
如果发现大量的消费线程处于WAITING(parking)状态,
MQTT(Message Queuing Telemetry Transport)是一个轻量级的消息传输协议,它主要用于物联网和移动应用中的设备间通信。云
消息队列
(CMQ)是腾讯云提供的一种
消息队列
服务,支持MQTT协议。
下面是
使用
MQTT版云
消息队列
的教程:
1. 登录腾讯云控制台,进入CMQ服务页面。
2. 创建一个
消息队列
主题,选择MQTT作为协议类型,设置消息过期时间、消息最大长度等参数。
3. 开发MQTT客户端程序,连接到云
消息队列
主题。
4. 发布消息到云
消息队列
主题。
5. 订阅云
消息队列
主题,接收消息。
6. 处理接收到的消息。
下面是一个示例程序(
使用
Python语言):
```python
import paho.mqtt.client as mqtt
# 连接到云
消息队列
def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("topic/test")
# 接收到消息
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
# 发布消息
client = mqtt.Client()
client.connect("localhost", 1883, 60)
client.publish("topic/test", "Hello, world!")
# 订阅消息
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
在这个示例程序中,我们
使用
了paho-mqtt库来开发MQTT客户端程序。首先,我们连接到云
消息队列
主题,并订阅了一个名为“topic/test”的主题。然后,我们发布了一条消息到这个主题。最后,我们
使用
循环来接收和处理接收到的消息。
注意,在实际
使用
中,你需要将连接参数和主题名称替换为你自己的。另外,你还需要在腾讯云控制台中设置ACL(访问控制列表)来限制客户端的访问权限。
希望这个教程对你有帮助!