Сервер очередей

В качестве сервера очередей (и получателя событий) используется Apache Kafka + Zookeeper.

Apache Kafka и Zookeeper уже установлены

Если у вас уже установлены Apache Kafka и Zookeeper (например, при развертывании Хранилища телеметрии), то дополнительные действия не требуются.

Apache Kafka и Zookeeper не установлены

Если у вас отсутствуют Apache Kafka и Zookeeper, выполните действия по установке и настройке ниже.

Порядок установки

Чтобы установить Apache Kafka и Zookeeper:

  1. Установите openjdk (минимум java-1.8.0-openjdk):

    • Пример для DEB:

      apt update apt -y install openjdk-8-jdk
    • Пример для RPM:

      yum update yum install java-1.8.0-openjdk
  2. Проверьте версию с помощью команды:

    java -version

    Результат выполнения:

    openjdk version "1.8.0_382"
  3. Создайте пользователя Apache Kafka:

    • Пример для DEB:

      useradd kafka -m passwd kafka usermod -a -G sudo kafka
    • Пример для RPM:

      useradd kafka -m passwd kafka usermod -aG wheel kafka
  4. Установите дистрибутив Apache Kafka в директорию.

    В примере ниже производится установка версии 3.8.1 в /opt/kafka:

    mkdir /opt/downloads curl "https://archive.apache.org/dist/kafka/3.8.1/kafka_2.13-3.8.1.tgz" -o /opt/downloads/kafka.tgz mkdir /opt/kafka && cd /opt/kafka tar -xvzf /opt/downloads/kafka.tgz --strip 1 chown -R kafka:kafka /opt/kafka
  5. На каждом сервере Kafka создайте каталоги для хранения данных:

    #Для хранения логов kafka можно использовать любую другую директорию mkdir -p /opt/kafka/kafka_log_dir chown -R kafka:kafka /opt/kafka/kafka_log_dir

    Примечание: на сервере Kafka должен быть создан каталог для хранения данных очередей kafka_log_dir. Свободное место на диске должно соответствовать параметрам политики хранения данных (см. log.retention.bytes в /opt/kafka/config/server.properties) — по умолчанию хранятся последние 10 ГБ .

  6. Откройте и заполните конфигурационный файл: /opt/kafka/config/server.properties.

    Используйте параметры:

    #Уникальный идентификатор брокера Kafka broker.id=0 #node_N — адресс сервера kafka advertised.listeners=PLAINTEXT://{{node_N}}:9092 #Количество сетевых потоков, используемых сервером Kafka для обработки запросов клиентов num.network.threads=3 #Количество потоков ввода-вывода, используемых сервером Kafka для обработки запросов клиентов num.io.threads=8 #Размер буфера для отправки данных через сокет socket.send.buffer.bytes=102400 #Размер буфера для получения данных через сокет socket.receive.buffer.bytes=102400 #Максимальный размер запроса, который может быть обработан сервером Kafka socket.request.max.bytes=104857600 #Путь директории логов, созданной на предыдущем шаге log.dirs=/opt/kafka/kafka_log_dir #Количество секций, которые будут созданы для новых топиков по умолчанию num.partitions=1 #Количество потоков, используемых для восстановления данных на диске для каждого каталога данных num.recovery.threads.per.data.dir=1 #Коэффициент репликации для топиков смещений offsets.topic.replication.factor=1 #Коэффициент репликации для журнала состояния транзакций transaction.state.log.replication.factor=1 #Минимальное количество реплик, которые должны быть в статусе "в сети" (ISR) для подтверждения транзакции transaction.state.log.min.isr=1 #Количество сообщений, которые должны быть записаны на диск перед принудительным сбросом #log.flush.interval.messages=1000 #Время в миллисекундах, после которого брокер Kafka будет принудительно записывать данные на диск, независимо от количества неотправленных сообщений #log.flush.interval.ms=100. #Время хранения сообщений в топике в часах log.retention.hours=168 #Максимальный размер логов в байтах log.retention.bytes=10737418240 #Размер сегмента логов в байтах log.segment.bytes=1073741824 #Интервал проверки времени хранения сообщений в топике в миллисекундах log.retention.check.interval.ms=300000 #node - адрес сервера zookeeper zookeeper.connect={{node}}:2181 #Время ожидания подключения к ZooKeeper в миллисекундах zookeeper.connection.timeout.ms=6000 #Определяет задержку в миллисекундах перед началом балансировки группы потребителей, если оставить 0, то задержки балансировщика после старта сервиса не будет group.initial.rebalance.delay.ms=0 #Флаг, указывающий, разрешено ли удаление топиков delete.topic.enable=true # Флаг, разрешающий создание топиков сторонними приложениями auto.create.topics.enable=true
  7. Создайте kafka service unit в файле: /etc/systemd/system/kafka.service.

    Используйте параметры:

    [Unit] After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties' ExecStop=/opt/kafka/bin/kafka-server-stop.sh Environment="KAFKA_HEAP_OPTS=-Xmx8192M -Xms8192M" #По умолчанию на каждый брокер Kafka выделяется 8 ГБ оперативной памяти. #В случае, если для обработки потока сообщений требуется увеличение (в логах встречаются ошибки `java.lang.OutOfMemoryError`) #Необходимо изменить параметры `KAFKA_HEAP_OPTS` (не более 32 ГБ) Restart=always RestartSec=3 LimitNOFILE=1024 [Install] WantedBy=multi-user.target
  8. Создайте каталоги для хранения данных Zookeeper:

    #Можно использовать любую другую директорию для хранения данных zookeeper mkdir /opt/kafka/zk_data_dir chown -R kafka:kafka /opt/kafka/zk_data_dir
  9. Заполните конфигурационный файл: /opt/kafka/config/zookeeper.properties.

    Используйте параметры:

    #Порт для подключения клиентов к zookeeper clientPort=2181 #Максимальное количество клиентских соединений, которые могут быть установлены с ZooKeeper сервером maxClientCnxns=0 #Интервал времени в миллисекундах, который используется ZooKeeper для выполнения различных фоновых задач, таких как синхронизация и выбор лидера в кластере tickTime=2000 #Количество тиков (tickTime), которое ZooKeeper будет ждать, пока клиент не подключится initLimit=5 #Количество тиков (tickTime), которое ZooKeeper будет ждать, пока клиент не будет синхронизирован с остальными клиентами. Если клиент не будет синхронизирован в течение этого времени, сессия будет считаться недействительной syncLimit=2 #Название каталога данных из предыдущего пункта dataDir=/opt/kafka/zk_data_dir #{{node}} — адрес сервера zookeeper server.1={{node}}:2888:3888 #Количество последних бэкапов(snapshot) для сохранения autopurge.snapRetainCount=3 #Интервал в часах для выполнения очистки бэкапов(snapshot) autopurge.purgeInterval=24
  10. Создайте /opt/kafka/zk_data_dir/myid:

    #`myid` файл — его содержимое определяет ID ноды echo '1' > /opt/kafka/zk_data_dir/myid
  11. Создайте zookeeper service unit на хосте в файле: /etc/systemd/system/zookeeper.service.

    Используйте параметры:

    [Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal RestartSec=3 [Install] WantedBy=multi-user.target
  12. Включите и запустите Zookeeper:

    systemctl daemon-reload systemctl enable zookeeper systemctl start zookeeper systemctl status zookeeper journalctl -fu zookeeper
  13. Включите и запустите Kafka:

    systemctl daemon-reload systemctl enable kafka systemctl start kafka systemctl status kafka journalctl -fu kafka

При возникновении дополнительных вопросов используйте документацию Apache Kafka (eng) c выбором необходимой версии.

RAM для брокера

По умолчанию на каждый брокер Kafka выделяется 8 ГБ оперативной памяти.

Измените параметры KAFKA_HEAP_OPTS (не более 32 ГБ) в /etc/systemd/system/kafka.service в случае, если:

  • После вызова команды:

    sudo journalctl -fu
  • Отобразится сообщение:

    OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000640000000, 6442450944, 0) failed; error='Cannot allocate memory'(errno=12)