高并发实现之BlockingQueue阻塞队列+@PostConstruct注解

基于典型的生产者-消费者方案。 (BlockingQueue)可以安全地与多个生产者和多个消费者一起使用 生产者负责数据的生产,而消费者则 数据的消费。

如图所示 工人负责生产数据,  超市就像是消费模式   消费者从工厂获取商品  生产者和消费者之间存在一定比例。 它另外支持以下操作:在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用。但还支持{@link java.util.Collection}接口。

作者重点使用 LinkedBlockingQueue

1.基于链表的阻塞队列,主要的核心方法如下,这些方法都支持中断,其中put方法和take方法我们都很熟悉的了,另外offer和poll两个方法其实对应的是put和take两个方法提供了阻塞超时机制

2.LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量

@PostConstruct

PostConstruct批注用于需要依赖注入完成以执行任何初始化之后要执行的方法上。在类投入使用之前必须调用此方法。所有支持依赖注入的类都必须支持该释。即使该类不要求注入任何资源,也必须调用用PostConstruct注释的方法。此注释只能注释一种方法。应用PostConstruct批注的方法必须满足以下所有条件:

1.除了拦截器的情况外,该方法不得具有任何参数,在这种情况下,该方法将采用Interceptors规范定义的InvocationContext对象。2在拦截器类上定义的方法必须具有以下签名之一

2.1void <METHOD>(InvocationContext) 2.2 Object <METHOD>(InvocationContext) throws Exception

注意:PostConstruct拦截器方法不得引发应用程序异常,但如果除生命周期事件之外,同一拦截器方法还对业务或超时方法进行了干预,则可以声明它引发已检查的异常,包括java.lang.Exception。如果PostConstruct拦截器方法返回值,则容器将忽略它

3.在非拦截器类上定义的方法必须具有以下签名:

void <METHOD&#062

4.应用PostConstruct的方法可以是公共的,受保护的,私有的或私有的

5.除了应用程序客户端,该方法一定不能是静态的

6.方法可能是最终的

7.如果该方法抛出未经检查的异常,则该类不得投入使用,除非是在EJB可以处理异常甚至从异常中恢复的EJB的情况下。

以上是源码的翻译

总结

1.因为当调用构造函数时,bean还没有初始化-即没有注入依赖项。在@PostConstruct方法完全初始化bean,您可以使用依赖项。因为这是保证在bean生命周期中只调用一次此方法的契约。一个bean可能会在其内部工作过程中被容器多次实例化,但它保证@PostConstruct只会被调用一次。

2.要在依赖加载后,对象使用前执行,而且只执行一次

实现来了~~~~ 上代码

在Service编写

void recordJob(User job);

void delJob(User job);

@PostConstruct

void init();

@PostConstruct

void delInit();


在ServiceImpl中写

//定义一个容量为10000的阻塞队列,BlockingQueue线程安全可以多个生产者同时put

private BlockingQueuedataQueue =new LinkedBlockingQueue(10000);

//定义一个容量为10000的阻塞队列,BlockingQueue线程安全可以多个生产者同时put

private BlockingQueuedelDataQueue =new LinkedBlockingQueue(10000);

private Listlist =new ArrayList();

private ListdelList =new ArrayList();

//put任务的方法,供生产者调用

@Override

public void recordJob(User job) {

try {

dataQueue.put(job);

}catch (InterruptedException e) {

log.info("dataQueue,批量更新Job入队列异常");

Thread.currentThread().interrupt();

@Override

public void delJob(User job) {

try {

log.info("delDataQueue,批量删除Job入队列成功!Job -->  Id  :" + job.getUserId());

delDataQueue.put(job.getUserId().longValue());

}catch (InterruptedException e) {

log.info("delDataQueue,批量删除Job入队列异常");

Thread.currentThread().interrupt();

@PostConstruct

@Override

public void init() {

Thread thread =new Thread(() -> {

log.info("启动批量(新增或者更新)守护线程,启动时间{}", new Date(System.currentTimeMillis()));

while (Boolean.TRUE) {

User poll =null;

boolean pollTimeOut =false;

long startTime;

long endTime;

try {

// poll时设置超时时间为2秒

poll =dataQueue.poll(2, TimeUnit.SECONDS);

}catch (InterruptedException e) {

log.info("批量更新Job异常");

Thread.currentThread().interrupt();

if (null != poll) {

// poll到任务添加到List中

list.add(poll);

}else {

// poll超时,设置超时标志位

pollTimeOut =true;

// 如果任务List等于5000或poll超时且List中还有任务就批量更新

if (list.size() ==300 ||

(pollTimeOut && !CollectionUtils.isEmpty(list))) {

startTime = System.currentTimeMillis();

saveOrUpdateBatch(list);

log.info("Job任务批量更新{}条任务,耗时{}毫秒", list.size(),

System.currentTimeMillis() - startTime);

list.clear();

@PostConstruct

@Override

public void delInit() {

Thread thread =new Thread(() -> {

log.info("启动批量删除守护线程,启动时间{}", new Date(System.currentTimeMillis()));

while (Boolean.TRUE) {

Long poll =null;

boolean pollTimeOut =false;

long startTime;

long endTime;

try {

// poll时设置超时时间为2秒

poll =delDataQueue.poll(2, TimeUnit.SECONDS);

}catch (InterruptedException e) {

log.info("批量删除Job异常");

Thread.currentThread().interrupt();

if (null != poll) {

// poll到任务添加到List中

delList.add(poll);

}else {

// poll超时,设置超时标志位

pollTimeOut =true;

// 如果任务List等于5000或poll超时且List中还有任务就批量更新

if (delList.size() ==300 ||

(pollTimeOut && !CollectionUtils.isEmpty(delList))) {

startTime = System.currentTimeMillis();

removeByIds(delList);

log.info("Job任务批量删除{}条任务,耗时{}毫秒", delList.size(),

System.currentTimeMillis() - startTime);

delList.clear();