Сервер очередей
В качестве сервера очередей (и получателя событий) используется Apache Kafka + Zookeeper.
Apache Kafka и Zookeeper уже установлены
Если у вас уже установлены Apache Kafka и Zookeeper (например, при развертывании Хранилища телеметрии), то дополнительные действия не требуются.
Apache Kafka и Zookeeper не установлены
Если у вас отсутствуют Apache Kafka и Zookeeper, выполните действия по установке и настройке ниже.
Порядок установки
Чтобы установить Apache Kafka и Zookeeper:
Установите openjdk (минимум java-1.8.0-openjdk):
Пример для DEB:
apt update apt -y install openjdk-8-jdkПример для RPM:
yum update yum install java-1.8.0-openjdk
Проверьте версию с помощью команды:
java -versionРезультат выполнения:
openjdk version "1.8.0_382"Создайте пользователя Apache Kafka:
Пример для DEB:
useradd kafka -m passwd kafka usermod -a -G sudo kafkaПример для RPM:
useradd kafka -m passwd kafka usermod -aG wheel kafka
Установите дистрибутив Apache Kafka в директорию.
В примере ниже производится установка версии 3.8.1 в /opt/kafka:
mkdir /opt/downloads curl "https://archive.apache.org/dist/kafka/3.8.1/kafka_2.13-3.8.1.tgz" -o /opt/downloads/kafka.tgz mkdir /opt/kafka && cd /opt/kafka tar -xvzf /opt/downloads/kafka.tgz --strip 1 chown -R kafka:kafka /opt/kafkaНа каждом сервере Kafka создайте каталоги для хранения данных:
#Для хранения логов kafka можно использовать любую другую директорию mkdir -p /opt/kafka/kafka_log_dir chown -R kafka:kafka /opt/kafka/kafka_log_dirПримечание: на сервере Kafka должен быть создан каталог для хранения данных очередей
kafka_log_dir. Свободное место на диске должно соответствовать параметрам политики хранения данных (см.log.retention.bytesв/opt/kafka/config/server.properties) — по умолчанию хранятся последние 10 ГБ .Откройте и заполните конфигурационный файл:
/opt/kafka/config/server.properties.Используйте параметры:
#Уникальный идентификатор брокера Kafka broker.id=0 #node_N — адресс сервера kafka advertised.listeners=PLAINTEXT://{{node_N}}:9092 #Количество сетевых потоков, используемых сервером Kafka для обработки запросов клиентов num.network.threads=3 #Количество потоков ввода-вывода, используемых сервером Kafka для обработки запросов клиентов num.io.threads=8 #Размер буфера для отправки данных через сокет socket.send.buffer.bytes=102400 #Размер буфера для получения данных через сокет socket.receive.buffer.bytes=102400 #Максимальный размер запроса, который может быть обработан сервером Kafka socket.request.max.bytes=104857600 #Путь директории логов, созданной на предыдущем шаге log.dirs=/opt/kafka/kafka_log_dir #Количество секций, которые будут созданы для новых топиков по умолчанию num.partitions=1 #Количество потоков, используемых для восстановления данных на диске для каждого каталога данных num.recovery.threads.per.data.dir=1 #Коэффициент репликации для топиков смещений offsets.topic.replication.factor=1 #Коэффициент репликации для журнала состояния транзакций transaction.state.log.replication.factor=1 #Минимальное количество реплик, которые должны быть в статусе "в сети" (ISR) для подтверждения транзакции transaction.state.log.min.isr=1 #Количество сообщений, которые должны быть записаны на диск перед принудительным сбросом #log.flush.interval.messages=1000 #Время в миллисекундах, после которого брокер Kafka будет принудительно записывать данные на диск, независимо от количества неотправленных сообщений #log.flush.interval.ms=100. #Время хранения сообщений в топике в часах log.retention.hours=168 #Максимальный размер логов в байтах log.retention.bytes=10737418240 #Размер сегмента логов в байтах log.segment.bytes=1073741824 #Интервал проверки времени хранения сообщений в топике в миллисекундах log.retention.check.interval.ms=300000 #node - адрес сервера zookeeper zookeeper.connect={{node}}:2181 #Время ожидания подключения к ZooKeeper в миллисекундах zookeeper.connection.timeout.ms=6000 #Определяет задержку в миллисекундах перед началом балансировки группы потребителей, если оставить 0, то задержки балансировщика после старта сервиса не будет group.initial.rebalance.delay.ms=0 #Флаг, указывающий, разрешено ли удаление топиков delete.topic.enable=true # Флаг, разрешающий создание топиков сторонними приложениями auto.create.topics.enable=trueСоздайте kafka service unit в файле:
/etc/systemd/system/kafka.service.Используйте параметры:
[Unit] After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties' ExecStop=/opt/kafka/bin/kafka-server-stop.sh Environment="KAFKA_HEAP_OPTS=-Xmx8192M -Xms8192M" #По умолчанию на каждый брокер Kafka выделяется 8 ГБ оперативной памяти. #В случае, если для обработки потока сообщений требуется увеличение (в логах встречаются ошибки `java.lang.OutOfMemoryError`) #Необходимо изменить параметры `KAFKA_HEAP_OPTS` (не более 32 ГБ) Restart=always RestartSec=3 LimitNOFILE=1024 [Install] WantedBy=multi-user.targetСоздайте каталоги для хранения данных Zookeeper:
#Можно использовать любую другую директорию для хранения данных zookeeper mkdir /opt/kafka/zk_data_dir chown -R kafka:kafka /opt/kafka/zk_data_dirЗаполните конфигурационный файл:
/opt/kafka/config/zookeeper.properties.Используйте параметры:
#Порт для подключения клиентов к zookeeper clientPort=2181 #Максимальное количество клиентских соединений, которые могут быть установлены с ZooKeeper сервером maxClientCnxns=0 #Интервал времени в миллисекундах, который используется ZooKeeper для выполнения различных фоновых задач, таких как синхронизация и выбор лидера в кластере tickTime=2000 #Количество тиков (tickTime), которое ZooKeeper будет ждать, пока клиент не подключится initLimit=5 #Количество тиков (tickTime), которое ZooKeeper будет ждать, пока клиент не будет синхронизирован с остальными клиентами. Если клиент не будет синхронизирован в течение этого времени, сессия будет считаться недействительной syncLimit=2 #Название каталога данных из предыдущего пункта dataDir=/opt/kafka/zk_data_dir #{{node}} — адрес сервера zookeeper server.1={{node}}:2888:3888 #Количество последних бэкапов(snapshot) для сохранения autopurge.snapRetainCount=3 #Интервал в часах для выполнения очистки бэкапов(snapshot) autopurge.purgeInterval=24Создайте
/opt/kafka/zk_data_dir/myid:#`myid` файл — его содержимое определяет ID ноды echo '1' > /opt/kafka/zk_data_dir/myidСоздайте zookeeper service unit на хосте в файле:
/etc/systemd/system/zookeeper.service.Используйте параметры:
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal RestartSec=3 [Install] WantedBy=multi-user.targetВключите и запустите Zookeeper:
systemctl daemon-reload systemctl enable zookeeper systemctl start zookeeper systemctl status zookeeper journalctl -fu zookeeperВключите и запустите Kafka:
systemctl daemon-reload systemctl enable kafka systemctl start kafka systemctl status kafka journalctl -fu kafka
При возникновении дополнительных вопросов используйте документацию Apache Kafka (eng) c выбором необходимой версии.
RAM для брокера
По умолчанию на каждый брокер Kafka выделяется 8 ГБ оперативной памяти.
Измените параметры KAFKA_HEAP_OPTS (не более 32 ГБ) в /etc/systemd/system/kafka.service в случае, если:
После вызова команды:
sudo journalctl -fuОтобразится сообщение:
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000640000000, 6442450944, 0) failed; error='Cannot allocate memory'(errno=12)