Повторная обработка сообщений в RabbitMQ с различным временным интервалом ожидания между повторами

28 декабря 2017
RabbitMQ DLX

На практике бывают случаи, когда при сбоях выполнения задач, между попытками повторного выполнения, необходимо подождать какое-то время. Например, в процессе выполнения задачи внешняя система была недоступна. Так как количество повторов, как правило, лимитировано, то нет смысла тут же пробовать выполнить задачу еще раз. Внешняя система может стать доступной, например, через минуту. Я уже касался возможности установки временного интервала между попытками обработки сообщений в статье «Повторное выполнение задач в RabbitMQ». Однако, бывают случаи, когда необходимо после каждой неудачной попытки изменять временной интервал перед следующим повтором. В этой статье усложним функционал повторного выполнения задач, добавив возможность установки различного временного интервала между попытками обработки сообщений.

Попробуем модифицировать решение 4, которое обеспечивает следующая схема. На входе имеем точку обмена WorkExchange, которая маршрутизирует сообщения в очередь WorkQueue. Сообщения из этой очереди принимает и обрабатывает соответствующий подписчик. В случае успешной обработки, сообщение удаляется из очереди. Это реализуется за счет подтверждения подписчиком успешной обработки сообщения (ack). Если же в процессе обработки сообщения произошла ошибка, слушатель может отклонить (reject) сообщение с указанием requeue=false, в результате чего сообщение попадет в точку доступа RetryExchange, заданной в качестве DLX для очереди WorkQueue. Далее сообщение маршрутизируется в очередь RetryQueue, объявленную с параметром x-message-ttl. По достижению заданного в очереди временного лимита (TTL) сообщение, минуя исходную точку обмена WorkExchange, снова попадает в очередь WorkQueue, откуда его забирает слушатель, чтобы попробовать обработать сообщение еще раз. Этот цикл можно повторять до достижения заданного в слушателе лимита на количество повторов.

Первым делом в голову приходит мысль о создании нескольких очередей с различными параметрами x-message-ttl для получения сообщений из точки обмена RetryExchange. Для этого нужно как-то реализовать маршрутизацию в точке обмена RetryExchange, чтобы в зависимости от текущего количества неудачных попыток, сообщение попадало в очередь с соответствующим временным лимитом. Рассмотрим возможность использования различных типов точек обмена, существующих в RabbitMQ. Routing key у всех сообщений одинаковый, поэтому типы direct и topic для точки доступа RetryExchange не подойдут. При отклонении сообщения слушателем можно поменять routing key, но это будет происходить для всех отклоненных в этой очереди сообщений, что не позволит маршрутизировать сообщение в необходимую очередь. Очевидно, что тип fanout тоже не подходит, так как разветвление в данном случае не имеет смысла, нам же нужна маршрутизация в очередь, имеющую требуемый TTL. А вот на типе headers можно попробовать остановиться. Дело в том, что сообщения, попадающие в точку обмена DLX, имеют специальный заголовок x-death, содержащий в себе список словарей, которые содержат информацию о количестве попаданий в различные точки обмена DLX при разных условиях. Пример заголовка x-death показан ниже. Стоит обратить внимание на то, что свойство count появилось в версии 3.5.1. В более ранних версиях на каждый случай создается отдельная запись.

x-death:        count: 4
             exchange: RetryExchange
                queue: RetryQueue
               reason: expired
         routing-keys: WorkQueue
                 time: 1510583141
                 
                count: 3
             exchange: WorkExchange
                queue: WorkQueue
               reason: rejected
         routing-keys: WorkQueue
                 time: 1510583141

Видно, что вся информация имеется, но в RabbitMQ нет возможности связать очередь с точкой обмена по вложенным заголовкам, например, по правилу x-death[1].count = 3, как было бы удобно в нашем случае. Поэтому тип headers для точки обмена RetryExchange тоже не подходит. Таким образом, вынуждены отказаться от идеи добавления нескольких очередей для RetryExchange, потому что в RabbitMQ на данный момент отсутствует возможность реализации соответствующей маршрутизации сообщений в точке обмена существующих типов.

Идем дальше. Как вариант, можно попробовать создать цепочку dead-letter очередей с различными параметрами x-message-ttl, как показано на схеме ниже. В отличии от предыдущего случая очереди с разными временными лимитами расположены не параллельно, а последовательно. После каждой dead-letter очереди, сообщение, минуя соответствующую точку обмена DLX , попадает в новую очередь задач. Подписчик в данном случае берет сообщения из всех этих очередей.

Схема с подписчиком dead-letter очереди

Эта схема будет работать, но выглядит довольно громоздкой. Очевидно, что размер схемы зависит от количества попыток. Так, если лимит количества попыток равен 10, нужно создать 18 точек обмена и 19 очередей, а слушателю нужно быть подписанным на 10 очередей. Также к недостаткам этого решения можно отнести сложность переконфигурирования TTL для одной или нескольких очередей. Так как x-message-ttl задается при декларировании очереди, необходимо удалить и создать очереди заново. При этом нужно обеспечить сохранность сообщений, находящихся в них в данный момент времени.

Следующая мысль, которая приходит в голову — создание подписчика для dead-letter очереди, который периодически забирает сообщения и выполняет reject либо с requeue=true, либо с requeue=false. Если передать параметр requeue=true, сообщение останется в dead-letter очереди до момента, когда подписчик данной очереди не получит его в следующий раз. В случае же, когда передается параметр requeue=false, сообщение попадет в точку обмена WorkExchange, откуда снова направится в очередь WorkQueue для следующей попытки. Ниже представлена схема описанной реализации.

Схема с подписчиком dead-letter очереди

В данном случае подписчик сам понимает, нужно ли отправлять сообщение на повторное выполнение, например, в зависимости от параметров count и time соответствующего подзаголовка x-death сообщения. В отличие от предыдущего способа, эта схема не является громоздкой и ее размер не зависит от лимита на количество повторов. Также нет необходимости пересоздавать очереди при переконфигурировании времени ожидания. Само время ожидания между повторами может формироваться по определенному закону, например: 2 * count. Это может быть удобным в некоторых случаях.