Настройка событий аудита
Чтобы обеспечить трассировку событий PostgreSQL, Keycloak и Vault, настройте сбор и обработку событий аудита в PAM. События будут экспортироваться в Kafka для дальнейшего анализа и хранения.
Шаг 1. Подготовка PostgreSQL к логической репликации
Чтобы Debezium мог считывать изменения из PostgreSQL, включите логическую репликацию для CDC (Change Data Capture).
В
postgresql.confвключите логическую репликацию:############ REPLICATION ############## wal_level = logical max_wal_senders = 10 # минимум 1 (по количеству подключений репликаторов) max_replication_slots = 10 # минимум 1 на каждый коннектор DebeziumПерезапустите PostgreSQL после изменения конфигурации.
Создайте пользователя с правами репликации:
REATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'password';Разрешите репликацию в
pg_hba.conf:local replication replicator trust host replication replicator 127.0.0.1/32 trust host replication replicator ::1/128 trust
После включения репликации, переходите к настройке таблиц.
Шаг 2. Подготовка таблиц PAM к отслеживанию изменений
Чтобы Debezium мог отслеживать старые изменения, включите REPLICA IDENTITY FULL для таблиц PAM. При включенном режиме PostgreSQL будет возвращать все значения строки UPDATE/DELETE.
Установите режим
REPLICA IDENTITY FULLдля каждой таблицы:ALTER TABLE pam.container REPLICA IDENTITY FULL; ALTER TABLE pam.credential REPLICA IDENTITY FULL; ALTER TABLE pam.account REPLICA IDENTITY FULL; ALTER TABLE pam.application REPLICA IDENTITY FULL; ALTER TABLE pam.target REPLICA IDENTITY FULL; ALTER TABLE pam.session_connection REPLICA IDENTITY FULL; ALTER TABLE pam_auth.event_entity REPLICA IDENTITY FULL; ALTER TABLE pam_auth.user_role_mapping REPLICA IDENTITY FULL; ALTER TABLE pam_auth.user_entity REPLICA IDENTITY FULL;Проверьте, что для всех перечисленных таблиц установлен режим f — FULL:
SELECT relname, relreplident FROM pg_class WHERE relname IN ('container','credential','account', 'application','target','session_connection', 'event_entity','user_role_mapping','user_entity');Возможные значения
relreplident:d — DEFAULT, только PRIMARY KEY.
n — NOTHING, изменения игнорируются.
f — FULL все поля.
i — USING INDEX, по указанному уникальному индексу.
После проверки таблиц, переходите к настройке публикаций и слотов публикации.
Шаг 3. Настройка публикаций и слотов репликации
Чтобы определить, какие таблицы отслеживать и через какой канал передавать события в Debezium, создайте публикации и слоты для логической репликации.
Создайте публикации:
CREATE PUBLICATION container FOR TABLE pam.container; CREATE PUBLICATION credential FOR TABLE pam.credential; CREATE PUBLICATION account FOR TABLE pam.account; CREATE PUBLICATION application FOR TABLE pam.application; CREATE PUBLICATION target FOR TABLE pam.target; CREATE PUBLICATION session_connection FOR TABLE pam.session_connection; CREATE PUBLICATION event_entity FOR TABLE pam_auth.event_entity; CREATE PUBLICATION user_role_mapping FOR TABLE pam_auth.user_role_mapping; CREATE PUBLICATION user_entity FOR TABLE pam_auth.user_entity;Проверьте, что публикации созданы:
SELECT * FROM pg_publication_tables;Если публикации нужно пересоздать, удалите их:
DROP PUBLICATION pub_name;Создайте логические слоты репликации:
SELECT pg_create_logical_replication_slot('container', 'pgoutput'); SELECT pg_create_logical_replication_slot('credential', 'pgoutput'); SELECT pg_create_logical_replication_slot('account', 'pgoutput'); SELECT pg_create_logical_replication_slot('application', 'pgoutput'); SELECT pg_create_logical_replication_slot('target', 'pgoutput'); SELECT pg_create_logical_replication_slot('session_connection', 'pgoutput'); SELECT pg_create_logical_replication_slot('event_entity', 'pgoutput'); SELECT pg_create_logical_replication_slot('user_role_mapping', 'pgoutput'); SELECT pg_create_logical_replication_slot('user_entity', 'pgoutput');Проверьте, что слоты репликации созданы:
SELECT * FROM pg_replication_slots;Если слоты нужно пересоздать, удалите их:
SELECT pg_drop_replication_slot('slot_name');
После создания публикаций и слотов репликации, переходите к настройке коннектора Debezium.
Шаг 4. Настройка коннекторов Debezium для Kafka Connect
Чтобы события, которые происходят в PAM попадали в Kafka в реальном времени, подключите Kafka к PostgreSQL через коннекторы Debezium. Debezium будет потреблять изменения через созданные публикации и слоты репликации.
Предварительная настройка
Прежде чем настраивать коннекторы убедитесь, что:
Все предыдущие шаги выполнены корректно.
Пользователь
replicatorимеет необходимые для выполнения запросов права.Kafka, Kafka Connect и Kafka-брокер работают корректно.
Настройка коннекторов Debezium
Чтобы подключить Kafka к PostgreSQL:
Cоздайте Debezium-коннекторы для каждого события через curl-запросы.
В общем виде, запрос выглядит следующим образом:curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "connector_name", // Уникальное имя коннектора Kafka Connect "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // Класс коннектора "plugin.name": "pgoutput", // Логический плагин PostgreSQL // Подключение к PostgreSQL "database.hostname": "{{ _pam_postgres_host }}", // Хост БД "database.port": "{{ _pam_postgres_port }}", // Порт БД "database.user": "{{ _pam_postgres_user }}", // Пользователь с правами REPLICATION "database.password": "{{ _pam_postgres_password }}",// Пароль пользователя "database.dbname": "{{ _pam_postgres_db }}", // Название базы данных // CDC область покрытия "table.include.list": "schema.table_name", // Таблица, которую нужно отслеживать "publication.name": "pub_name", // Имя публикации PostgreSQL "slot.name": "slot_name", // Имя слота репликации // Настройки Kafka топиков "topic.prefix": "check_table_name", // Префикс имени выходного топика "topic.creation.default.replication.factor": "1", // Фактор репликации топика "topic.creation.default.partitions": "1", // Количество партиций по умолчанию "topic.creation.enable": "true", // Автоматическое создание топиков "offset.storage.replication.factor": "1", // Фактор репликации системного топика offset'ов // Трансформация имени топика "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", // Тип трансформации "transforms.routeToCustomTopic.regex": ".*", // Что нужно изменить "transforms.routeToCustomTopic.replacement": "topic_name" // Итоговое имя топика } }'Ниже — список запросов для каждого из событий.
curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "container", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.container", "publication.name": "container", "slot.name": "container", "topic.prefix": "check_container", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "container" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "credential", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.credential", "publication.name": "credential", "slot.name": "credential", "topic.prefix": "check_credential", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "credential" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "account", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.account", "publication.name": "account", "slot.name": "account", "topic.prefix": "check_account", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "account" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "application", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.application", "publication.name": "application", "slot.name": "application", "topic.prefix": "check_application", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "application" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "target", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.target", "publication.name": "target", "slot.name": "target", "topic.prefix": "check_target", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "target" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "session_connection", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.session_connection", "publication.name": "session_connection", "slot.name": "session_connection", "topic.prefix": "check_session_connection", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "session_connection" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "event_entity", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam_auth.event_entity", "publication.name": "event_entity", "slot.name": "event_entity", "topic.prefix": "check_event_entity", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "event_entity" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "user_role_mapping", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam_auth.user_role_mapping", "publication.name": "user_role_mapping", "slot.name": "user_role_mapping", "topic.prefix": "check_user_role_mapping", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "user_role_mapping" } }'curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "user_entity", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam_auth.user_entity", "publication.name": "user_entity", "slot.name": "user_entity", "topic.prefix": "check_user_entity", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "user_entity" } }'Чтобы просмотреть статус коннектора, выполните запрос:
curl http://localhost:8083/connectors/connector_name/statusЕсли коннектор создан с ошибкой, то удалите его с помощью запроса:
curl -X DELETE http://localhost:8083/connectors/connector_nameВ интерфейсе Kafka проверьте, что события связанные с таблицами корректно приходят в топики:
containercredentialaccountapplicationtargetsession_connectionevent_entityuser_role_mappinguser_entity
Для событий, которые связаны с Sorg Adapter, в интерфейсе Kafka вручную создайте топик
ext_request, в который будут приходить события.
Подробнее о настройке топиков в интерфейсе Kafka — в настройке топиков Kafka.
После настройки коннекторов, переходите к настройке событий в Keycloak.
Шаг 5. Настройка событий в Keycloak
Чтобы Keycloak мог отправлять аудит в Kafka, включите сбор пользовательских и административных событий.
Откройте Keycloak, в списке окружений выберите pam.
Перейдите в раздел Clients и нажмите Create client.
На шаге General settings:
Client ID — укажите "event-sender".
Name — укажите "pam-event-sender".
На шаге Capability config:
Client authentication — выберите On.
Authorization — выберите On.
Отметьте Direct access grants. Остальные поля оставьте без изменений.
В созданном клиенте event-sender перейдите на вкладу Service accounts roles.
Отметьте роли view-events, view-users, view-groups и нажмите Assign role.
Перейдите в раздел в Configure → Realm settings.
Откройте Events.
На вкладке User events settings:
Save events — выберите On.
Expiration — проверьте, что указано минимум 1 Days.
Нажмите Add saved types.
В открывшемся окне отметьте все типы событий и нажмите Add.
На вкладке Admin events settings:
Save events — выберите On.
Include representation — выберите On.
Expiration — проверьте, что указано минимум 1 Days.
Сохраните изменения.
После настройки аудита событий в Keycloak, переходите к настройке аудита Vault.
Шаг 6. Настройка аудита в Vault
Чтобы хранить и анализировать активности в Vault, включите логирование событий в Vault и их пересылку в Kafka.
Включите логирование аудита Vault в файл:
vault audit enable file file_path=/var/log/vault/vault_audit.logНастройте стандартное логирование Vault в файл. Для этого добавьте в конфигурационный файл Vault:
log_file = "/var/log/vault/vault.log"Скопируйте ваши PEM-сертификаты в каталог
/etc/ssl/certs/rsyslog/:sudo mkdir -p /etc/ssl/certs/rsyslog/ sudo cp client.cer.pem client.key.pem ca.cer.pem /etc/ssl/certs/rsyslog/ sudo chmod 600 /etc/ssl/certs/rsyslog/client.key.pemУстановите пакет
rsyslog-kafka:sudo apt-get update sudo apt-get install rsyslog-kafkaНастройте
rsyslogна чтения логов Vault и отправку в Kafka.
Для этого создайте файл конфигурации:sudo nano /etc/rsyslog.d/00-kafka.confНастройте файл конфигурации:
module(load="imfile") module(load="omkafka") ############################################# # Обработка файла аудита Vault (vault_audit.log) ############################################# input( type="imfile" File="/var/log/vault/vault_audit.log" Tag="vault-audit-log" ) template(name="VaultAuditLog" type="string" string="%msg%") if ($syslogtag == 'vault-audit-log') then { # Фильтрация незначимых событий if ($msg contains '"operation":"help"') then { stop } if ($msg contains '"operation":"list"') then { stop } if ($msg contains '"path":"sys/internal/ui/') then { stop } if ($msg contains '"path":"sys/capabilities-self"') then { stop } if ($msg contains '"path":"sys/license/features"') then { stop } if ($msg contains '"path":"sys/auth"' and $msg contains '"operation":"read"') then { stop } if ($msg contains '"mount_type":"identity"') then { stop } if ($msg contains '"path":"sys/leases/lookup"') then { stop } # Отправка события в Kafka action( type="omkafka" template="VaultAuditLog" topic="vault-audit-log" broker=["sorg.demopam.lcl:9093"] confParam=[ "security.protocol=ssl", "ssl.ca.location=/etc/ssl/certs/rsyslog/ca.cer.pem", "ssl.certificate.location=/etc/ssl/certs/rsyslog/client.cer.pem", "ssl.key.location=/etc/ssl/certs/rsyslog/client.key.pem" ] ) stop } ############################################# # Обработка основного лога Vault (vault.log) ############################################# input( type="imfile" File="/var/log/vault/vault.log" Tag="vault-log" ) template(name="vaultLogJsonFormat" type="list") { constant(value="{") constant(value="\"time\":\"") property(name="timereported" dateFormat="rfc3339") constant(value="\",\"operation\":\"update\",") constant(value="\"path\":\"sys/unseal\",") constant(value="\"event\":\"") property(name="msg") constant(value="\"}") } if ($syslogtag == 'vault-log') then { if ($msg contains "core: vault is unsealed") then { action( type="omkafka" template="vaultLogJsonFormat" topic="vault-audit-log" broker=["sorg.demopam.lcl:9093"] confParam=[ "security.protocol=ssl", "ssl.ca.location=/etc/ssl/certs/rsyslog/ca.cer.pem", "ssl.certificate.location=/etc/ssl/certs/rsyslog/client.cer.pem", "ssl.key.location=/etc/ssl/certs/rsyslog/client.key.pem" ] ) stop } } # Остановка дальнейшей обработки vault.log if ($syslogtag == 'vault-log') then { stop }Проверьте конфигурационный файл:
rsyslogd -N1Перезапустите службу и проверьте статус
rsyslog:sudo systemctl restart rsyslog sudo systemctl status rsyslogЕсли статус failed, проверьте журнал на наличие ошибок:
sudo journalctl -u rsyslog -eУбедитесь, что Vault имеет права на создание и запись файлов
/var/log/vault/vault_audit.logи
/var/log/vault/vault.log, аrsyslogимеет права на чтение этих файлов:sudo chown vault:vault /var/log/vault/ sudo chmod 640 /var/log/vault/vault_audit.log /var/log/vault/vault.log
После завершения настройки, создайте топик vault-audit-log в интерфейсе Kafka, а также остальные топики event-sender.
Шаг 7. Настройка топиков в Kafka
В интерфейсе Kafka вручную создайте топики, в которые event-sender будет отправлять обработанные сообщения. Список этих топиков определяется в конфигурационном файле event-sender.
Откройте интерфейс управления Kafka.
Перейдите в раздел pkb > producers.
Из конфигурационного файла
event-senderполучите список отправителей (producers) и соответствующих им топиков.Для каждого отправителя:
Создайте новый топик вручную.
Укажите
retention.ms(время хранения сообщений) на1 день(86400000 мс).Убедитесь, что количество партиций и реплика-фактор соответствуют требованиям производительности и отказоустойчивости вашей системы.
Укажите названия топиков, созданных через Debezium-коннекторы:
containercredentialaccountapplicationtargetsession_connectionevent_entityuser_role_mappinguser_entity
Укажите названия топиков, созданных вручную:
для Vault —
vault-audit-log.для Sorg Adapter —
ext_request.
Для корректной работы всех компонентов укажите:
Адреса Kafka-брокеров, например
broker1:9092,broker2:9092.Сертификаты безопасности — путь к truststore/keystore и пароли, если требуются.
Параметры подключения для Keycloak и Web API.
Готово. Проверьте, что события из PAM экспортируются в Kafka.