
## 描述
    具有优先级的请求被服务优先处理。

## 背景和问题
    消息队列有时候要求某些紧急请求需要快速处理。

## 解决方案
    基于优先队列的消息队列。

1. 具有优先级的独立队列模式。
    应用程序发布一条具有优先级的消息，队列中的消息都会自动重新排序，高优先级的消息会自动排在前面，会被优先消费。要注意的是，具有非常低的优先级的消息可能永远不会被处理。
    ![liaoliaophp 优先级独立队列模式](https://cdn.learnku.com/uploads/images/202202/02/25292/uk9ddlUTEB.png!large)

2. 每个优先级拥有至少一个独立队列的多队列模式。
        每个队列都有单独的消费者池（多个消费者）。高优先级队列比低优先级队列有更好的硬件，更多的消费者。在这个模式下，低优先级的请求总会被处理，只是比高优先级的处理速度慢而已。
    ![liaoliaophp 多队列模式](https://cdn.learnku.com/uploads/images/202202/02/25292/8L3Yk3E1eE.png!large)



## 优点
1. 提供了优先业务，可以对不同客户提供不同级别的业务。
2. 最大限度降低运营成本。
3. 最大限度地提高应用程序的性能和可扩展性。重要的任务可以优先运行，而不太重要的可以延后执行。


## 注意事项
1. 先定义好什么是优先级的标准。例如信息需要在10秒内处理完毕。基于这个标准再做资源的规划和分配。
2. 确定好任务机制。那些请求是最高级，那些请求是最低级，有没有可抢先或者可暂停的任务类型。
3. 监控所有队列的速度，确保队列的速度是符合预期的。
4. 消息的优先级是由系统业务决定的。
5. 查询队列中的消息，要控制成本，尤其是查询多个队列的消息的时候。
6. 在多队列模式中，所有的队列都是需要监控的。
7. 在设计消费者的消费模式的时候，一定要支持动态调整(增加或者减少消费者)。


## 何时使用
1. 不同用户对应不同的优先级任务。
2. 系统中具有不同侧重点的任务。


## 可用到的设计模式思维
    一个队列里面有多个元素，在形式上符合迭代器模式，但是实际业务上不适用使用。



## 结构中包含的角色
1. AbstractQueue 抽象队列
2. ConcreteQueue 具体队列
3. Message       消息
4. Consumer      消费者
 
## 最小可表达代码
    // 抽象队列
    abstract class AbstractQueue
    {
        protected $initMaxPriority = 100; // 初始化最大优先级
        protected $initMinPriority = 0; // 初始化最小优先级
        
        protected $currentOffsetPriority; // 当前偏移的优先级
        protected $counter; // 计数器
        protected $messages = [];

        public function __construct()
        {
            $this->currentOffsetPriority = $this->initMaxPriority;

            $this->counter = $this->initMinPriority;
        }

        // 消息入队列
        public function push(Message $message)
        {
            $priority = $this->rewritePriority($message->getPriority());

            $this->reloadOffsetPriority($priority);

            $this->messages[$priority][] = $message;
        }

        // 消息出队列
        public function pop()
        {
            for ($priority = $this->getCurrentOffsetPriority(); $priority >= 0; $priority--) {
                if (empty($this->messages[$priority])) {
                    continue;
                }
                
                $message = array_shift($this->messages[$priority]);

                if (empty($this->messages[$priority])) {
                    unset($this->messages[$priority]);
                }

                return $message;   
            }

            return null;
        }

        protected function rewritePriority($priority)
        {
            $priority = $priority < $this->initMinPriority ? $this->initMinPriority : $priority;

            $priority = $priority > $this->initMaxPriority ? $this->initMaxPriority : $priority;

            return $priority;
        }

        // 这里线程不安全，需要自行加锁
        protected function reloadOffsetPriority(int $priority)
        {
            // 判断当前插入的优先级是否比现在执行的优先级要高
            if ($priority > $this->currentOffsetPriority) {
                $this->currentOffsetPriority = $priority;
            }

            // 设置计数器，防止并发引起的优先级错乱 
            $this->counter++;
            if ($this->initMaxPriority == $this->counter) {
                $this->currentOffsetPriority = $this->initMaxPriority;
                $this->counter = $this->initMinPriority;;
            }
        }

        protected function getCurrentOffsetPriority() : int 
        {
            return $this->currentOffsetPriority;
        }
    }

    // 消息
    class Message
    {
        private $priority;

        public function __construct(int $priority)
        {
            $this->priority = $priority;
        }

        public function getPriority()
        {
            return $this->priority;
        }
    }

    // 具体队列
    class ConcreteQueue extends AbstractQueue {}

    // 消费者
    class Consumer
    {
        protected $queue;

        public function __construct(AbstractQueue $queue)
        {
            $this->queue = $queue;
        }

        public function execute()
        {
            while($message = $this->queue->pop()) {
                var_dump($message);
            }
        }
    }

    $queue = new ConcreteQueue();
    $queue->push(new Message(88));
    $queue->push(new Message(11));
    $queue->push(new Message(66));
    (new Consumer($queue))->execute();