Настройка событий аудита

Чтобы обеспечить трассировку событий PostgreSQL, Keycloak и Vault, настройте сбор и обработку событий аудита в PAM. События будут экспортироваться в Kafka для дальнейшего анализа и хранения.

Шаг 1. Подготовка PostgreSQL к логической репликации

Чтобы Debezium мог считывать изменения из PostgreSQL, включите логическую репликацию для CDC (Change Data Capture).

  1. В postgresql.conf включите логическую репликацию:

    ############ REPLICATION ############## wal_level = logical max_wal_senders = 10 # минимум 1 (по количеству подключений репликаторов) max_replication_slots = 10 # минимум 1 на каждый коннектор Debezium
  2. Перезапустите PostgreSQL после изменения конфигурации.

  3. Создайте пользователя с правами репликации:

    REATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'password';
  4. Разрешите репликацию в pg_hba.conf:

    local   replication     replicator                  trust host    replication     replicator 127.0.0.1/32     trust host    replication     replicator ::1/128          trust

После включения репликации, переходите к настройке таблиц.

Шаг 2. Подготовка таблиц PAM к отслеживанию изменений

Чтобы Debezium мог отслеживать старые изменения, включите REPLICA IDENTITY FULL для таблиц PAM. При включенном режиме PostgreSQL будет возвращать все значения строки UPDATE/DELETE.

  1. Установите режим REPLICA IDENTITY FULL для каждой таблицы:

    ALTER TABLE  pam.container REPLICA IDENTITY FULL; ALTER TABLE  pam.credential REPLICA IDENTITY FULL; ALTER TABLE  pam.account REPLICA IDENTITY FULL; ALTER TABLE  pam.application REPLICA IDENTITY FULL; ALTER TABLE  pam.target REPLICA IDENTITY FULL; ALTER TABLE  pam.session_connection REPLICA IDENTITY FULL; ALTER TABLE  pam_auth.event_entity REPLICA IDENTITY FULL; ALTER TABLE  pam_auth.user_role_mapping REPLICA IDENTITY FULL; ALTER TABLE  pam_auth.user_entity REPLICA IDENTITY FULL;
  2. Проверьте, что для всех перечисленных таблиц установлен режим f — FULL:

    SELECT relname, relreplident FROM pg_class WHERE relname IN ('container','credential','account', 'application','target','session_connection', 'event_entity','user_role_mapping','user_entity');

    Возможные значения relreplident:

    • d — DEFAULT, только PRIMARY KEY.

    • n — NOTHING, изменения игнорируются.

    • f — FULL все поля.

    • i — USING INDEX, по указанному уникальному индексу.

После проверки таблиц, переходите к настройке публикаций и слотов публикации.

Шаг 3. Настройка публикаций и слотов репликации

Чтобы определить, какие таблицы отслеживать и через какой канал передавать события в Debezium, создайте публикации и слоты для логической репликации.

  1. Создайте публикации:

    CREATE PUBLICATION container FOR TABLE pam.container; CREATE PUBLICATION credential FOR TABLE pam.credential; CREATE PUBLICATION account FOR TABLE pam.account; CREATE PUBLICATION application FOR TABLE pam.application; CREATE PUBLICATION target FOR TABLE pam.target; CREATE PUBLICATION session_connection FOR TABLE pam.session_connection; CREATE PUBLICATION event_entity FOR TABLE pam_auth.event_entity; CREATE PUBLICATION user_role_mapping FOR TABLE pam_auth.user_role_mapping; CREATE PUBLICATION user_entity FOR TABLE pam_auth.user_entity;
  2. Проверьте, что публикации созданы:

    SELECT * FROM pg_publication_tables;

    Если публикации нужно пересоздать, удалите их:

    DROP PUBLICATION pub_name;
  3. Создайте логические слоты репликации:

    SELECT pg_create_logical_replication_slot('container', 'pgoutput'); SELECT pg_create_logical_replication_slot('credential', 'pgoutput'); SELECT pg_create_logical_replication_slot('account', 'pgoutput'); SELECT pg_create_logical_replication_slot('application', 'pgoutput'); SELECT pg_create_logical_replication_slot('target', 'pgoutput'); SELECT pg_create_logical_replication_slot('session_connection', 'pgoutput'); SELECT pg_create_logical_replication_slot('event_entity', 'pgoutput'); SELECT pg_create_logical_replication_slot('user_role_mapping', 'pgoutput'); SELECT pg_create_logical_replication_slot('user_entity', 'pgoutput');
  4. Проверьте, что слоты репликации созданы:

    SELECT * FROM pg_replication_slots;

    Если слоты нужно пересоздать, удалите их:

    SELECT pg_drop_replication_slot('slot_name');

После создания публикаций и слотов репликации, переходите к настройке коннектора Debezium.

Шаг 4. Настройка коннекторов Debezium для Kafka Connect

Чтобы события, которые происходят в PAM попадали в Kafka в реальном времени, подключите Kafka к PostgreSQL через коннекторы Debezium. Debezium будет потреблять изменения через созданные публикации и слоты репликации.

Предварительная настройка

Прежде чем настраивать коннекторы убедитесь, что:

  • Все предыдущие шаги выполнены корректно.

  • Пользователь replicator имеет необходимые для выполнения запросов права.

  • Kafka, Kafka Connect и Kafka-брокер работают корректно.

Настройка коннекторов Debezium

Чтобы подключить Kafka к PostgreSQL:

  1. Cоздайте Debezium-коннекторы для каждого события через curl-запросы.
    В общем виде, запрос выглядит следующим образом:

    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "connector_name", // Уникальное имя коннектора Kafka Connect "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", // Класс коннектора "plugin.name": "pgoutput", // Логический плагин PostgreSQL // Подключение к PostgreSQL "database.hostname": "{{ _pam_postgres_host }}", // Хост БД "database.port": "{{ _pam_postgres_port }}", // Порт БД "database.user": "{{ _pam_postgres_user }}", // Пользователь с правами REPLICATION "database.password": "{{ _pam_postgres_password }}",// Пароль пользователя "database.dbname": "{{ _pam_postgres_db }}", // Название базы данных // CDC область покрытия "table.include.list": "schema.table_name", // Таблица, которую нужно отслеживать "publication.name": "pub_name", // Имя публикации PostgreSQL "slot.name": "slot_name", // Имя слота репликации // Настройки Kafka топиков "topic.prefix": "check_table_name", // Префикс имени выходного топика "topic.creation.default.replication.factor": "1", // Фактор репликации топика "topic.creation.default.partitions": "1", // Количество партиций по умолчанию "topic.creation.enable": "true", // Автоматическое создание топиков "offset.storage.replication.factor": "1", // Фактор репликации системного топика offset'ов // Трансформация имени топика "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", // Тип трансформации "transforms.routeToCustomTopic.regex": ".*", // Что нужно изменить "transforms.routeToCustomTopic.replacement": "topic_name" // Итоговое имя топика } }'

    Ниже — список запросов для каждого из событий.

    container
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "container", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.container", "publication.name": "container", "slot.name": "container", "topic.prefix": "check_container", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "container" } }'
    credential
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "credential", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.credential", "publication.name": "credential", "slot.name": "credential", "topic.prefix": "check_credential", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "credential" } }'
    account
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "account", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.account", "publication.name": "account", "slot.name": "account", "topic.prefix": "check_account", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "account" } }'
    application
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "application", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.application", "publication.name": "application", "slot.name": "application", "topic.prefix": "check_application", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "application" } }'
    target
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "target", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.target", "publication.name": "target", "slot.name": "target", "topic.prefix": "check_target", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "target" } }'
    session_connection
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "session_connection", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam.session_connection", "publication.name": "session_connection", "slot.name": "session_connection", "topic.prefix": "check_session_connection", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "session_connection" } }'
    event_entity
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "event_entity", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam_auth.event_entity", "publication.name": "event_entity", "slot.name": "event_entity", "topic.prefix": "check_event_entity", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "event_entity" } }'
    user_role_mapping
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "user_role_mapping", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam_auth.user_role_mapping", "publication.name": "user_role_mapping", "slot.name": "user_role_mapping", "topic.prefix": "check_user_role_mapping", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "user_role_mapping" } }'
    user_entity
    curl -X POST http://localhost:8083/connectors \ -H "Content-Type: application/json" \ -d '{ "name": "user_entity", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "{{ _pam_postgres_host }}", "database.port": "{{ _pam_postgres_port }}", "database.user": "{{ _pam_postgres_user }}", "database.password": "{{ _pam_postgres_password }}", "database.dbname": "{{ _pam_postgres_db }}", "table.include.list": "pam_auth.user_entity", "publication.name": "user_entity", "slot.name": "user_entity", "topic.prefix": "check_user_entity", "topic.creation.default.replication.factor": "1", "topic.creation.default.partitions": "1", "topic.creation.enable": "true", "offset.storage.replication.factor": "1", "transforms": "routeToCustomTopic", "transforms.routeToCustomTopic.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.routeToCustomTopic.regex": ".*", "transforms.routeToCustomTopic.replacement": "user_entity" } }'
  2. Чтобы просмотреть статус коннектора, выполните запрос:

    curl http://localhost:8083/connectors/connector_name/status

    Если коннектор создан с ошибкой, то удалите его с помощью запроса:

    curl -X DELETE http://localhost:8083/connectors/connector_name
  3. В интерфейсе Kafka проверьте, что события связанные с таблицами корректно приходят в топики:

    • container

    • credential

    • account

    • application

    • target

    • session_connection

    • event_entity

    • user_role_mapping

    • user_entity

  4. Для событий, которые связаны с Sorg Adapter, в интерфейсе Kafka вручную создайте топик ext_request, в который будут приходить события.

Подробнее о настройке топиков в интерфейсе Kafka — в настройке топиков Kafka.

После настройки коннекторов, переходите к настройке событий в Keycloak.

Шаг 5. Настройка событий в Keycloak

Чтобы Keycloak мог отправлять аудит в Kafka, включите сбор пользовательских и административных событий.

  1. Откройте Keycloak, в списке окружений выберите pam.

  2. Перейдите в раздел Clients и нажмите Create client.

  3. На шаге General settings:

    • Client ID — укажите "event-sender".

    • Name — укажите "pam-event-sender".

  4. На шаге Capability config:

    • Client authentication — выберите On.

    • Authorization — выберите On.

    • Отметьте Direct access grants. Остальные поля оставьте без изменений.

  5. В созданном клиенте event-sender перейдите на вкладу Service accounts roles.

  6. Отметьте роли view-events, view-users, view-groups и нажмите Assign role.

  7. Перейдите в раздел в Configure → Realm settings.

  8. Откройте Events.

  9. На вкладке User events settings:

    • Save events — выберите On.

    • Expiration — проверьте, что указано минимум 1 Days.

  10. Нажмите Add saved types.

  11. В открывшемся окне отметьте все типы событий и нажмите Add.

  12. На вкладке Admin events settings:

    • Save events — выберите On.

    • Include representation — выберите On.

    • Expiration — проверьте, что указано минимум 1 Days.

  13. Сохраните изменения.

После настройки аудита событий в Keycloak, переходите к настройке аудита Vault.

Шаг 6. Настройка аудита в Vault

Чтобы хранить и анализировать активности в Vault, включите логирование событий в Vault и их пересылку в Kafka.

  1. Включите логирование аудита Vault в файл:

    vault audit enable file file_path=/var/log/vault/vault_audit.log
  2. Настройте стандартное логирование Vault в файл. Для этого добавьте в конфигурационный файл Vault:

    log_file = "/var/log/vault/vault.log"
  3. Скопируйте ваши PEM-сертификаты в каталог /etc/ssl/certs/rsyslog/:

    sudo mkdir -p /etc/ssl/certs/rsyslog/ sudo cp client.cer.pem client.key.pem ca.cer.pem /etc/ssl/certs/rsyslog/ sudo chmod 600 /etc/ssl/certs/rsyslog/client.key.pem
  4. Установите пакет rsyslog-kafka:

    sudo apt-get update sudo apt-get install rsyslog-kafka
  5. Настройте rsyslog на чтения логов Vault и отправку в Kafka.
    Для этого создайте файл конфигурации:

    sudo nano /etc/rsyslog.d/00-kafka.conf
  6. Настройте файл конфигурации:

    module(load="imfile") module(load="omkafka") ############################################# # Обработка файла аудита Vault (vault_audit.log) ############################################# input( type="imfile" File="/var/log/vault/vault_audit.log" Tag="vault-audit-log" ) template(name="VaultAuditLog" type="string" string="%msg%") if ($syslogtag == 'vault-audit-log') then { # Фильтрация незначимых событий if ($msg contains '"operation":"help"') then { stop } if ($msg contains '"operation":"list"') then { stop } if ($msg contains '"path":"sys/internal/ui/') then { stop } if ($msg contains '"path":"sys/capabilities-self"') then { stop } if ($msg contains '"path":"sys/license/features"') then { stop } if ($msg contains '"path":"sys/auth"' and $msg contains '"operation":"read"') then { stop } if ($msg contains '"mount_type":"identity"') then { stop } if ($msg contains '"path":"sys/leases/lookup"') then { stop } # Отправка события в Kafka action( type="omkafka" template="VaultAuditLog" topic="vault-audit-log" broker=["sorg.demopam.lcl:9093"] confParam=[ "security.protocol=ssl", "ssl.ca.location=/etc/ssl/certs/rsyslog/ca.cer.pem", "ssl.certificate.location=/etc/ssl/certs/rsyslog/client.cer.pem", "ssl.key.location=/etc/ssl/certs/rsyslog/client.key.pem" ] ) stop } ############################################# # Обработка основного лога Vault (vault.log) ############################################# input( type="imfile" File="/var/log/vault/vault.log" Tag="vault-log" ) template(name="vaultLogJsonFormat" type="list") { constant(value="{") constant(value="\"time\":\"") property(name="timereported" dateFormat="rfc3339") constant(value="\",\"operation\":\"update\",") constant(value="\"path\":\"sys/unseal\",") constant(value="\"event\":\"") property(name="msg") constant(value="\"}") } if ($syslogtag == 'vault-log') then { if ($msg contains "core: vault is unsealed") then { action( type="omkafka" template="vaultLogJsonFormat" topic="vault-audit-log" broker=["sorg.demopam.lcl:9093"] confParam=[ "security.protocol=ssl", "ssl.ca.location=/etc/ssl/certs/rsyslog/ca.cer.pem", "ssl.certificate.location=/etc/ssl/certs/rsyslog/client.cer.pem", "ssl.key.location=/etc/ssl/certs/rsyslog/client.key.pem" ] ) stop } } # Остановка дальнейшей обработки vault.log if ($syslogtag == 'vault-log') then { stop }
  7. Проверьте конфигурационный файл:

    rsyslogd -N1
  8. Перезапустите службу и проверьте статус rsyslog:

    sudo systemctl restart rsyslog sudo systemctl status rsyslog

    Если статус failed, проверьте журнал на наличие ошибок:

    sudo journalctl -u rsyslog -e
  9. Убедитесь, что Vault имеет права на создание и запись файлов /var/log/vault/vault_audit.log

    и /var/log/vault/vault.log, а rsyslog имеет права на чтение этих файлов:

    sudo chown vault:vault /var/log/vault/ sudo chmod 640 /var/log/vault/vault_audit.log /var/log/vault/vault.log

После завершения настройки, создайте топик vault-audit-log в интерфейсе Kafka, а также остальные топики event-sender.

Шаг 7. Настройка топиков в Kafka

В интерфейсе Kafka вручную создайте топики, в которые event-sender будет отправлять обработанные сообщения. Список этих топиков определяется в конфигурационном файле event-sender.

  1. Откройте интерфейс управления Kafka.

  2. Перейдите в раздел pkb > producers.

  3. Из конфигурационного файла event-sender получите список отправителей (producers) и соответствующих им топиков.

  4. Для каждого отправителя:

    • Создайте новый топик вручную.

    • Укажите retention.ms (время хранения сообщений) на 1 день (86400000 мс).

    • Убедитесь, что количество партиций и реплика-фактор соответствуют требованиям производительности и отказоустойчивости вашей системы.

  5. Укажите названия топиков, созданных через Debezium-коннекторы:

    • container

    • credential

    • account

    • application

    • target

    • session_connection

    • event_entity

    • user_role_mapping

    • user_entity

  6. Укажите названия топиков, созданных вручную:

    • для Vaultvault-audit-log.

    • для Sorg Adapterext_request.

  7. Для корректной работы всех компонентов укажите:

    • Адреса Kafka-брокеров, например broker1:9092,broker2:9092.

    • Сертификаты безопасности — путь к truststore/keystore и пароли, если требуются.

    • Параметры подключения для Keycloak и Web API.

Готово. Проверьте, что события из PAM экспортируются в Kafka.