Как использовать группы потребителей в Redis Streams

Рошан Кумар - старший менеджер по продукту в Redis Labs.

Redis Streams - это новая структура данных, представленная в Redis 5.0, которая позволяет создавать потоки данных и управлять ими. В предыдущей статье я показал, как добавлять данные в поток и как читать данные разными способами. В этой статье я объясню, как использовать группы потребителей в Redis Streams. Группа потребителей - это способ разделить поток сообщений между несколькими клиентами для ускорения обработки или облегчения нагрузки для более медленных потребителей.

В идеальном мире и производители, и потребители данных работают в одном темпе, и нет потери данных или накопления данных. К сожалению, в реальном мире это не так. Практически во всех случаях использования обработки потоков данных в реальном времени производители и потребители работают с разной скоростью. Кроме того, существует более одного типа потребителей, каждый со своими требованиями и темпами обработки. Redis Streams удовлетворяет эту потребность с помощью набора функций, который в значительной степени ориентирован на поддержку потребителей. Одна из его важнейших характеристик - группа потребителей.

Когда использовать группу потребителей Redis Streams

Цель групп потребителей - масштабировать процесс потребления данных. Рассмотрим один пример - приложение для обработки изображений. Решение требует трех основных компонентов:

  1. Производитель (возможно, одна или несколько камер), который фиксирует и хранит изображения;
  2. Redis Stream, который сохраняет изображения (в хранилище потоковых данных) в порядке их поступления; и
  3. Процессор изображений, обрабатывающий каждое изображение. 
Redis Labs

Предположим, ваш производитель сохраняет 500 изображений в секунду, а процессор изображений обрабатывает только 100 изображений в секунду на полной мощности. Эта разница в скорости создаст отставание, и ваш процессор изображений никогда не сможет его наверстать. Простой способ решить эту проблему - запустить пять процессоров изображений (как показано на рисунке 2), каждый из которых обрабатывает взаимоисключающий набор изображений. Этого можно добиться с помощью группы потребителей, которая позволяет разделить рабочие нагрузки и направить их разным потребителям.

Redis Labs

Группа потребителей делает больше, чем просто разбиение данных - она ​​обеспечивает безопасность данных и обеспечивает аварийное восстановление.

Как работает группа потребителей Redis Streams

Группа потребителей - это структура данных в Redis Stream. Как показано на рисунке 3, группу потребителей можно рассматривать как набор списков. Еще одна вещь, которую следует представить, - это список предметов, которые не потребляются никакими потребителями - для нашего обсуждения, давайте назовем это «неиспользованным списком». Когда данные поступают в поток, они немедленно помещаются в список неиспользованных.

Redis Labs

Группа потребителей ведет отдельный список для каждого потребителя, обычно с прикрепленным приложением. На рисунке 3 наше решение имеет N идентичных приложений (Приложение 1, Приложение 2,…. Приложение n), которые считывают данные через Потребителя 1, Потребителя 2,… Потребителя n соответственно.

Когда приложение считывает данные с помощью команды XREADGROUP, определенные записи данных удаляются из списка неиспользованных и помещаются в список ожидающих записей, принадлежащий соответствующему потребителю. Таким образом, никакие два потребителя не будут потреблять одни и те же данные.

Наконец, когда приложение уведомляет поток с помощью команды XACK, оно удаляет элемент из списка ожидающих записей потребителя.

Теперь, когда я объяснил основы групп потребителей, давайте глубже рассмотрим, как работает этот жизненный цикл данных.

Создание группы потребителей Redis Streams

Вы можете создать новую группу потребителей, используя команду XGROUP CREATE, как показано ниже.

XGROUP СОЗДАТЬ mystream mygroup $ MKSTREAM

Как и в случае с XREAD, знак $ в конце команды указывает потоку доставлять только новые данные с этого момента времени вперед. Альтернативный вариант - 0 или другой идентификатор из записи потока. При использовании 0 поток будет доставить все данные с начала потока.

MKSTREAM создает новый поток, в данном случае mystream, если он еще не существует.

Чтение и управление данными Redis Stream

Предположим, у вас есть поток Redis (mystream), и вы уже создали группу потребителей (mygroup), как показано выше. Теперь вы можете добавлять элементы с именами a, b, c, d, e, как в следующем примере.

XADD mystream * имя a

Выполнение этой команды для имен от a до e заполнит Redis Stream, mystream и неиспользованный список группы потребителей mystream. Это показано на рисунке 4.

Redis Labs

Здесь вы можете видеть, что потребители Алиса и Боб еще не приступили к работе. Приложение A потребляет данные через Алису-потребителя, а приложение B - через Боба.

Использование данных Redis Streams

Команда для чтения данных из группы - XREADGROUP. В нашем примере, когда приложение A начинает обработку данных, оно вызывает потребителя (Алису) для выборки данных, как в:

XREADGROUP GROUP mygroup COUNT 2 Alice STREAMS mystream>

Точно так же приложение B считывает данные через Боба следующим образом:

ГРУППА XREADGROUP mygroup COUNT 2 Боб СТРИМИТ mystream>

Специальный символ> в конце указывает Redis Streams извлекать только те записи данных, которые не доставляются другим потребителям. Также обратите внимание, что никакие два потребителя не будут потреблять одни и те же данные, что приведет к перемещению данных из неиспользованного списка Алисе и Бобу, как показано на рисунке 5.

Redis Labs

Удаление обработанных сообщений из списков ожидающих записей

Данные в списках ожидающих записей ваших потребителей будут оставаться там до тех пор, пока приложение A и приложение B не подтвердят Redis Streams, что они успешно использовали данные. Это делается с помощью команды XACK. Например, приложение A подтвердит следующее после использования d и e с идентификаторами 1526569411111-0 и 1526569411112-0.

XACK mystream mygroup 1526569411111-0 1526569411112-0

Комбинация XREADGROUP и XACK аналогична запуску транзакции и ее фиксации, что обеспечивает безопасность данных. 

После запуска XACK предположим, что приложение A выполнило XREADGROUP, как показано ниже. Теперь структура данных выглядит как на рисунке 6.

XREADGROUP GROUP mygroup COUNT 2 Alice STREAMS mystream>
Redis Labs

Восстановление после сбоев

Если приложение B завершилось из-за сбоя при обработке b и c, то структура данных будет выглядеть, как на рисунке 7.

Redis Labs

Осталось два варианта:

1. Перезапустите приложение B и перезагрузите данные от потребителя (Боба).

В этом случае приложение B должно считывать данные от вашего потребителя (Боба) с помощью команды XREADGROUP, но с одним отличием. Вместо> в конце приложение B передаст 0 (или идентификатор ниже, чем у предыдущей обработанной записи данных). Помните, что> отправляет потребителю новые данные из списка неиспользованных.

ГРУППА XREADGROUP mygroup COUNT 2 Боб ПОТОКИ mystream 0

Приведенная выше команда получит записи данных, которые уже сохранены в списке для потребителя Боба. Он не будет получать новые данные из неиспользованного списка. Приложение B могло перебирать все данные в потребителе Бобе перед тем, как получить новые данные.

2. Заставьте Алису потребовать все данные у Боба и обработать их через приложение A.

Это особенно полезно, если вы не можете восстановить приложение B из-за сбоя узла, диска или сети. В таких случаях любой другой потребитель (например, Алиса) может потребовать данные Боба и продолжить обработку этих данных, предотвращая, таким образом, простои службы. Чтобы запросить данные Боба, вы должны выполнить два набора команд:

XPENDING mystream mygroup - +10 Боб

Это приведет к получению всех ожидающих записей данных для Боба. Опции - и + выбирают весь диапазон. Если b и c имели идентификаторы 1526569411113-0 и 1526569411114-0 соответственно, команда, которая переместит данные Боба в Алису, выглядит следующим образом:

XCLAIM mystream mygroup Алиса 0 1526569411113-0 1526569411114-0

Группы потребителей поддерживают текущие часы для данных в списке потребления. Например, когда приложение B читает b, часы включаются, пока Боб не получит ACK. С помощью параметра времени в команде XCLAIM вы можете указать группе потребителей перемещать только данные, которые находятся в режиме ожидания дольше указанного времени. Вы также можете игнорировать это, передав 0, как показано в примере выше. Результат этих команд показан на рисунке 8. XCLAIM также может пригодиться, когда один из ваших процессоров-потребителей работает медленно, что приводит к накоплению необработанных данных.

Redis Labs

В предыдущей статье мы рассмотрели основы использования Redis Streams. В этой статье мы немного углубились и объяснили, когда использовать группы потребителей и как они работают. Группы потребителей в Redis Streams уменьшают вашу нагрузку, когда дело доходит до управления разделами данных, их жизненными циклами и безопасностью данных. Кроме того, возможности горизонтального масштабирования групп потребителей могут принести пользу многим приложениям реального времени.

В предстоящей третьей статье о Redis Streams я продемонстрирую, как разработать приложение классификации в реальном времени с использованием Redis Streams и Lettuce, библиотеки с открытым исходным кодом на основе Java для Redis. Между тем, вы можете узнать больше, изучив руководство по Redis Streams на веб-сайте проекта Redis. 

Рошан Кумар - старший менеджер по продукту в  Redis Labs . Он имеет большой опыт в разработке программного обеспечения и технологическом маркетинге. Рошан работал в Hewlett-Packard и во многих успешных стартапах Кремниевой долины, включая ZillionTV, Salorix, Alopa и ActiveVideo. Как увлеченный программист, он спроектировал и разработал mindzeal.com, онлайн-платформу, на которой проходят курсы компьютерного программирования для молодых студентов. Рошан имеет степень бакалавра компьютерных наук и степень магистра делового администрирования Университета Санта-Клары.

-

Форум новых технологий предоставляет площадку для изучения и обсуждения новых корпоративных технологий с беспрецедентной глубины и широты. Выбор носит субъективный характер и основан на нашем выборе технологий, которые мы считаем важными и представляющими наибольший интерес для читателей. не принимает маркетинговые материалы для публикации и оставляет за собой право редактировать весь предоставленный контент. Все запросы отправляйте на  [email protected] .