1. Create a delay queue
package com.example.demo.utils;
import lombok.Data;
import java.util.concurrent.DelayQueue;
/**
 * Process-wide holder for the single {@link DelayQueue} of {@link DelayTask}s.
 *
 * <p>Producers and the consumer must all obtain the queue through
 * {@link #getInstance()} so that they operate on the same instance.
 */
@Data
public class DelayTaskQueue {

    /**
     * Initialization-on-demand holder: the JVM initializes this nested class
     * (and therefore creates the queue) the first time {@link #getInstance()}
     * touches it, and class initialization is thread-safe.
     */
    private static class Holder {
        // final so the shared queue can never be swapped out after creation;
        // diamond operator instead of the raw type to keep the generic type checked.
        static final DelayQueue<DelayTask> instance = new DelayQueue<>();
    }

    /**
     * Returns the shared delay queue.
     *
     * @return the single {@link DelayQueue} instance used by this application
     */
    public static DelayQueue<DelayTask> getInstance() {
        return Holder.instance;
    }
}
2. Create the message task (adapt the fields to your own business needs)
package com.example.demo.utils;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * A task that can be placed in a {@link java.util.concurrent.DelayQueue}.
 *
 * <p>Implements {@link Delayed}: {@link #getDelay(TimeUnit)} reports how long
 * remains until the task expires, and {@link #compareTo(Delayed)} orders tasks
 * so that the one expiring soonest sits at the head of the queue.
 */
@Data
@Accessors(chain = true) // setters return this, so calls can be chained
public class DelayTask implements Delayed {

    /** Business id. */
    private String id;

    /**
     * Expiry time in milliseconds, on the same scale as
     * {@link System#currentTimeMillis()} (it is subtracted from it in
     * {@link #getDelay(TimeUnit)}). Must be set before the task is queued.
     */
    private Long time;

    /** Business type; the consumer switches on this value. */
    private Integer type;

    /**
     * Returns the time remaining until this task expires.
     *
     * @param unit the unit in which to express the result
     * @return the remaining delay; zero or negative once the task has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remaining = time - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by expiry, earliest first.
     *
     * <p>Uses {@link Long#compare(long, long)} rather than casting a
     * {@code long} difference to {@code int}: the cast truncates, so a
     * difference larger than {@link Integer#MAX_VALUE} milliseconds
     * (about 24.8 days) could yield the wrong sign and corrupt the queue order.
     *
     * @param o the other delayed element
     * @return a negative value, zero, or a positive value as this task expires
     *         before, at the same time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof DelayTask) {
            // Compare the fixed deadlines directly: no clock reads, so the
            // result is stable no matter when the comparison happens.
            return Long.compare(this.time, ((DelayTask) o).time);
        }
        // Unknown Delayed implementation: fall back to comparing remaining delays.
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}
3. Message producer (delivers messages into the queue according to business needs)
package com.example.demo.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
/**
 * Entry point for services that need to schedule a delayed task (message):
 * builds a {@link DelayTask} and offers it to the shared delay queue.
 */
@Slf4j
public class DelayTaskProducer {

    /**
     * Builds a task from the given values and puts it on the shared delay queue,
     * logging the attempt and its outcome.
     *
     * @param id   business id
     * @param time consumption time, in milliseconds
     * @param type business type
     */
    public static void delayTask(String id, Long time, Integer type) {
        // Shared queue: the same instance the consumer takes from.
        DelayQueue<DelayTask> queue = DelayTaskQueue.getInstance();

        // Chained setters return the task itself, so it can be built in one expression.
        DelayTask task = new DelayTask().setId(id).setTime(time).setType(type);
        log.info("===================== Enter the delay queue,{}", task);

        boolean accepted = queue.offer(task);
        if (!accepted) {
            log.info("=================== Failed to enter the delay queue");
            return;
        }
        log.info("===================== Entry to the delay queue successfully, {}", queue);
    }
}
4. Message consumer (performs the corresponding business operation for each message)
package com.example.demo.utils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
@Data
@Slf4j
@Component
public class DelayTaskConsumer implements CommandLineRunner {
/* @Autowired
private IProPatMeetingService meetingService;
@Autowired
private ParActivityService activityService;*/
@Override
public void run(String ...args) throws Exception {
DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();//Get the queue of the task with the same put
new Thread(() -> {
while (true) {
try {
// Get expired messages from the head of the delay queue
// If there is no expired message or the queue is empty, the take() method will be blocked until there is an expired message
DelayTask delayTask = delayQueue.take();//block
switch (delayTask.getType()) {//Judge business type and perform corresponding business operations
case 1:
log.info("================================Conference consumption,{}",delayTask.getType());
//ParMeeting meeting = (());
//(meeting,true,null);
break;
case 2:
log.info("=============================== Event registration and consumption, {}",delayTask.getType());
//ParActivity activityApply = (());
//(activityApply, PartyActivityPushMessageType.apply_activity_type.getCode());
break;
case 3:
log.info("=================================== The activity starts to consume, {}",delayTask.getType());
//ParActivity activityStart = (());
//(activityStart,PartyActivityPushMessageType.activity_start_type.getCode());
break;
default:
log.info("================================= No consumption type found, {}",delayTask.getType());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}