gogoWebsite

Implementing a delay queue in Spring Boot

Updated 1 day ago

1. Create a delay queue

package com.example.demo.utils;
import lombok.Data;
import java.util.concurrent.DelayQueue;
/**
 * Holder for the single {@link DelayQueue} shared by everything that enqueues or
 * consumes {@link DelayTask}s.
 *
 * <p>The queue lives in the nested {@code Holder} class, so it is created the first
 * time {@link #getInstance()} is called, and class initialization ensures it is
 * created only once.
 */
@Data
public class DelayTaskQueue {
    private static class Holder {
        // final: the shared queue must never be replaced after it has been created.
        // The diamond operator replaces the previous raw-type construction.
        static final DelayQueue<DelayTask> INSTANCE = new DelayQueue<>();
    }

    /**
     * Returns the shared delay queue.
     *
     * @return the one queue instance; every call returns the same object
     */
    public static DelayQueue<DelayTask> getInstance() {
        return Holder.INSTANCE;
    }
}

2. Create the message task (adapt the fields to your own business needs)

package com.example.demo.utils;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * A task that can be stored in a {@code DelayQueue}.
 *
 * <p>Implements {@link Delayed}: {@link #getDelay(TimeUnit)} reports how long is left
 * until the task expires, and {@link #compareTo(Delayed)} orders tasks so that the one
 * expiring soonest sorts first.
 */
@Data
@Accessors(chain = true) // setters return this, so calls can be chained
public class DelayTask implements Delayed {
    /** Business id. */
    private String id;
    /**
     * Expiry time in milliseconds, on the same scale as
     * {@link System#currentTimeMillis()}. Must be set before the task is queued:
     * {@link #getDelay(TimeUnit)} unboxes it.
     */
    private Long time;
    /** Business type code. */
    private Integer type;

    /**
     * Returns the time remaining until this task expires.
     *
     * @param unit the unit in which to express the result
     * @return the remaining delay; zero or negative once the task has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remaining = time - System.currentTimeMillis();
        return unit.convert(remaining, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders tasks by remaining delay, shortest first.
     *
     * @param o the other delayed element
     * @return a negative, zero, or positive value as this task expires before,
     *         at the same time as, or after {@code o}
     */
    @Override
    public int compareTo(Delayed o) {
        if (o instanceof DelayTask) {
            // Compare the deadlines directly: this avoids reading the clock twice,
            // which could make two tasks with the same deadline compare as unequal.
            return Long.compare(this.time, ((DelayTask) o).time);
        }
        // Long.compare instead of casting a subtraction to int: the cast overflows
        // (and flips the sign) once the delays differ by more than Integer.MAX_VALUE ms.
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

3. Message producer (delivers messages into the queue according to business needs)

package com.example.demo.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.DelayQueue;
/**
 * Entry point for services that need to put a task (message) on the shared delay queue.
 */
@Slf4j
public class DelayTaskProducer {
    /**
     * Builds a {@link DelayTask} from the arguments and offers it to the shared delay queue.
     *
     * @param id   business id
     * @param time time at which the task should be consumed, in milliseconds on the
     *             {@link System#currentTimeMillis()} scale; must not be {@code null}
     * @param type business type; must not be {@code null}
     * @throws IllegalArgumentException if {@code time} or {@code type} is {@code null}
     */
    public static void delayTask(String id, Long time, Integer type) {
        // Reject incomplete tasks at the call site. A null time would only fail later,
        // when the queue asks the task for its delay, and a task without a type cannot
        // be matched to any business operation.
        if (time == null || type == null) {
            throw new IllegalArgumentException(
                    "time and type must not be null: id=" + id + ", time=" + time + ", type=" + type);
        }

        DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();
        DelayTask delayTask = new DelayTask()
                .setId(id)
                .setTime(time)
                .setType(type);
        log.info("===================== Enter the delay queue,{}", delayTask);

        if (delayQueue.offer(delayTask)) {
            // NOTE(review): this logs the whole queue on every insert; consider logging
            // only its size if the queue can grow large.
            log.info("===================== Entry to the delay queue successfully, {}", delayQueue);
        } else {
            log.warn("=================== Failed to enter the delay queue");
        }
    }
}

4. Message consumer (performs the business operation that corresponds to each message)

package com.example.demo.utils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
@Data
@Slf4j
@Component
public class DelayTaskConsumer implements CommandLineRunner {
  /*  @Autowired
    private IProPatMeetingService meetingService;
    @Autowired
    private ParActivityService activityService;*/
    @Override
    public void run(String ...args) throws Exception {
        DelayQueue<DelayTask> delayQueue = DelayTaskQueue.getInstance();//Get the queue of the task with the same put
        new Thread(() -> {
            while (true) {
                try {
                    // Get expired messages from the head of the delay queue
                    // If there is no expired message or the queue is empty, the take() method will be blocked until there is an expired message
                    DelayTask delayTask = delayQueue.take();//block
                    switch (delayTask.getType()) {//Judge business type and perform corresponding business operations
                        case 1:
                            log.info("================================Conference consumption,{}",delayTask.getType());
                            //ParMeeting meeting = (());
                            //(meeting,true,null);
                            break;
                        case 2:
                            log.info("=============================== Event registration and consumption, {}",delayTask.getType());
                            //ParActivity activityApply = (());
                            //(activityApply, PartyActivityPushMessageType.apply_activity_type.getCode());
                            break;
                        case 3:
                            log.info("=================================== The activity starts to consume, {}",delayTask.getType());
                            //ParActivity activityStart = (());
                            //(activityStart,PartyActivityPushMessageType.activity_start_type.getCode());
                            break;
                        default:
                            log.info("================================= No consumption type found, {}",delayTask.getType());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}