PHP实现延时队列和处理超时订单
今天小编就为大家带来一篇有关PHP实现延时队列和处理超时订单的文章。小编觉得挺实用的,为此分享给大家做个参考。一起跟随小编过来看看吧。

我们提供的服务有:网站建设、成都网站制作、微信公众号开发、网站优化、网站认证、赫章ssl等。为上千家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的赫章网站制作公司
延时队列
Delayproducer.Php
Amqpbuilder.Php
AmqpBuilder.php
arguments = array_merge($this->arguments, $arguments);
return $this;
}
/**
* 设置延时队列相关参数
*
* @param string $queueName
* @param int $xMessageTtl
* @param string $xDeadLetterExchange
* @param string $xDeadLetterRoutingKey
*
* @return $this
*/
public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
{
$this->setArguments([
'x-message-ttl' => ['I', $xMessageTtl * 1000], // 毫秒
'x-dead-letter-exchange' => ['S', $xDeadLetterExchange],
'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
]);
$this->setQueue($queueName);
return $this;
}
}DelayProducer.php
produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
});
}
/**
* @param ProducerMessageInterface $producerMessage
* @param AmqpBuilder $queueBuilder
* @param bool $confirm
* @param int $timeout
*
* @return bool
* @throws \Throwable
*/
private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
{
$result = false;
$this->injectMessageProperty($producerMessage);
$message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
$pool = $this->getConnectionPool($producerMessage->getPoolName());
/** @var \Hyperf\Amqp\Connection $connection */
$connection = $pool->get();
if ($confirm) {
$channel = $connection->getConfirmChannel();
} else {
$channel = $connection->getChannel();
}
$channel->set_ack_handler(function () use (&$result)
{
$result = true;
});
try {
// 处理延时队列
$exchangeBuilder = $producerMessage->getExchangeBuilder();
// 队列定义
$channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
// 路由定义
$channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
// 队列绑定
$channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
// 消息发送
$channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
$channel->wait_for_pending_acks_returns($timeout);
} catch (Throwable $exception) {
// Reconnect the connection before release.
$connection->reconnect();
throw $exception;
}
finally {
$connection->release();
}
return $confirm ? $result : true;
}
/**
* @param ProducerMessageInterface $producerMessage
*/
private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
{
if (class_exists(AnnotationCollector::class)) {
/** @var \Hyperf\Amqp\Annotation\Producer $annotation */
$annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
if ($annotation) {
$annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
$annotation->exchange && $producerMessage->setExchange($annotation->exchange);
}
}
}
}处理超时订单
Orderqueueconsumer.Php
Orderqueueproducer.Php
Orderqueueproducer.php
payload = $data;
}
public function getExchangeBuilder() : ExchangeBuilder
{
return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
}
}Orderqueueconsumer.php
Demo
$builder = new AmqpBuilder(); $builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route'); $que = ApplicationContext::getContainer()->get(DelayProducer::class); var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string)mt_rand(10000, 90000)]), $builder))以上就是PHP实现延时队列和处理超时订单的代码展示,如果在日常工作遇到这个问题,希望你能通过这篇文章解决问题。如果想了解更多相关内容,欢迎关注创新互联行业资讯频道!
网站标题:PHP实现延时队列和处理超时订单
网址分享:http://www.jxjierui.cn/article/gcpdge.html


咨询
建站咨询
