diff --git a/README.md b/README.md index 6cf5e54..a37cfb2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # db_log_exporter -从 MySQL / PostgreSQL 数据库表中**定时拉取日志**,写入标准 **syslog 格式**(RFC 5424 变体)文本文件。 +从 MySQL / PostgreSQL 数据库表中**定时拉取日志**,写入标准 **syslog 格式**(RFC 5424 变体)文本文件。 --- @@ -22,20 +22,24 @@ ## 输出格式 -每行一条日志,格式如下(RFC 5424 变体): +每行一条日志,格式如下(RFC 5424 变体): ``` -1 2026-05-12T15:30:00.123456+08:00 hostname app_name[12345]: [trace=abc123] [span=def456] 日志内容 +1 2026-05-12T15:30:00.123456+08:00 hostname app_name[12345]: [user=张三] [action=删除订单] [target=订单系统] [result=成功] [trace=abc123] [span=def456] 日志内容 ``` | 字段 | 说明 | |------|------| | `` | syslog 优先级,`<6>`=INFO `<4>`=WARN `<3>`=ERROR 等 | | `version` | RFC 5424 版本号,始终为 `1` | -| `timestamp` | ISO 8601 时间(含微秒和时区) | -| `hostname` | 配置文件中的 hostname(默认系统 hostname) | +| `timestamp` | ISO 8601 时间(含微秒和时区) | +| `hostname` | 配置文件中的 hostname(默认系统 hostname) | | `app_name[pid]` | 配置的 app_name + 本进程 PID | -| `[trace=...] [span=...]` | 结构化数据(trace_id / span_id 等) | +| `[user=...]` | 操作人用户名 | +| `[action=...]` | 操作动作,如:删除/创建/更新/登录等 | +| `[target=...]` | 操作对象,如:订单系统/用户管理/配置项等 | +| `[result=...]` | 操作结果,如:成功/失败/超时等 | +| `[trace=...] [span=...]` | 结构化数据(trace_id / span_id 等) | | `message` | 日志正文 | --- @@ -70,7 +74,7 @@ sudo mkdir -p /etc/db_log_exporter sudo nano /etc/db_log_exporter/config.yaml ``` -参考 `config.yaml.example`(本仓库根目录),主要填: +参考 `config.yaml.example`(本仓库根目录),主要填: ```yaml databases: @@ -93,7 +97,7 @@ sources: message: message ``` -### 4. 测试连接(Dry-run) +### 4. 测试连接(Dry-run) ```bash python3 /opt/db_log_exporter/db_log_exporter.py \ @@ -103,7 +107,7 @@ python3 /opt/db_log_exporter/db_log_exporter.py \ ### 5. 运行方式 -**方式 A:systemd 守护进程(推荐)** +**方式 A:systemd 守护进程(推荐)** ```bash # 复制 service 文件 @@ -113,7 +117,7 @@ sudo systemctl enable --now db_log_exporter sudo journalctl -u db_log_exporter -f # 实时查看日志 ``` -**方式 B:cron 定时任务(单次模式)** +**方式 B:cron 定时任务(单次模式)** ```bash # crontab -e @@ -121,7 +125,7 @@ sudo journalctl -u db_log_exporter -f # 实时查看日志 -c /etc/db_log_exporter/config.yaml --once ``` -**方式 C:直接运行(前台守护)** +**方式 C:直接运行(前台守护)** ```bash python3 /opt/db_log_exporter/db_log_exporter.py \ @@ -139,7 +143,7 @@ global: output_dir: /var/log/db_exporter # 日志输出目录 checkpoint_dir: /var/lib/... # 断点文件目录 hostname: myserver # syslog hostname - interval: 30 # 默认轮询间隔(秒) + interval: 30 # 默认轮询间隔(秒) batch_size: 1000 # 默认每次最多读条数 databases: @@ -168,9 +172,14 @@ sources: message: message trace_id: trace_id # 可选 span_id: span_id # 可选 + # 操作审计字段 + user: user # 操作人 + action: action # 操作动作 + target: target # 操作对象 + result: result # 操作结果 ``` -### 数据库用户权限(最小权限) +### 数据库用户权限(最小权限) **MySQL:** ```sql @@ -192,7 +201,7 @@ GRANT SELECT ON audit_log TO log_reader; ## 与日志采集系统集成 -### rsyslog(服务器接收) +### rsyslog(服务器接收) ```conf # /etc/rsyslog.d/60-db-exporter.conf @@ -204,7 +213,7 @@ input(type="imfile" Facility="local0") ``` -### Filebeat(采集到 Elasticsearch) +### Filebeat(采集到 Elasticsearch) ```yaml # filebeat.yml @@ -217,7 +226,7 @@ filebeat.inputs: fields_under_root: true ``` -### Promtail(Loki 采集) +### Promtail(Loki 采集) ```yaml # promtail.yml @@ -240,7 +249,7 @@ scrape_configs: | 连接被拒绝 | 确认数据库允许该 IP 连接,检查防火墙/安全组 | | 权限不足 | 确认运行用户对 `output_dir`、`checkpoint_dir` 有写权限 | | 日志重复 | 删除对应断点文件并重启,程序会从头拉取 | -| 中文乱码 | 确认数据库字符集为 `utf8mb4`(MySQL)或 `UTF8`(PG) | +| 中文乱码 | 确认数据库字符集为 `utf8mb4`(MySQL)或 `UTF8`(PG) | | 连接超时 | 在 `databases` 中加 `connect_timeout: 10` | --- @@ -249,7 +258,7 @@ scrape_configs: ``` db_log_exporter/ -├── db_log_exporter.py # 主程序(Python 守护进程) +├── db_log_exporter.py # 主程序(Python 守护进程) ├── config.yaml.example # 配置文件示例 ├── requirements.txt # Python 依赖 ├── db_log_exporter.service # systemd 服务文件 @@ -262,11 +271,11 @@ db_log_exporter/ ## 命令行参数 ``` --c, --config YAML 配置文件路径(必填) ---once 仅执行一次轮询后退出(适合 cron) +-c, --config YAML 配置文件路径(必填) +--once 仅执行一次轮询后退出(适合 cron) --dry-run 仅测试数据库连接,不写文件 ---log-level 日志级别: DEBUG|INFO|WARNING|ERROR(默认 INFO) ---log-file 本程序日志输出文件(默认 stdout) +--log-level 日志级别: DEBUG|INFO|WARNING|ERROR(默认 INFO) +--log-file 本程序日志输出文件(默认 stdout) ``` --- diff --git a/config.yaml b/config.yaml index 59a3011..9bbcbad 100644 --- a/config.yaml +++ b/config.yaml @@ -13,18 +13,18 @@ # 全局配置 # --------------------------------------------------------------------------- global: - # 日志输出目录(需有写入权限) + # 日志输出目录(需有写入权限) output_dir: /var/log/db_exporter - # 断点存放目录(需有写入权限) + # 断点存放目录(需有写入权限) # 每次拉取后保存最后一条记录的 ID,实现断点续传 checkpoint_dir: /var/lib/db_exporter/checkpoints - # 本程序写入日志时使用的 hostname(出现在 syslog 行中) + # 本程序写入日志时使用的 hostname(出现在 syslog 行中) # 不填则自动取系统 hostname hostname: "" - # 全局默认轮询间隔(秒),单个 source 可单独覆盖 + # 全局默认轮询间隔(秒),单个 source 可单独覆盖 interval: 30 # 全局默认每次最多读取条数,单个 source 可单独覆盖 @@ -56,7 +56,7 @@ databases: # --------------------------------------------------------------------------- -# 日志源定义(每个 source = 一个数据库表) +# 日志源定义(每个 source = 一个数据库表) # --------------------------------------------------------------------------- sources: @@ -68,24 +68,29 @@ sources: database: mysql_prod # 要查询的表名 table: access_log - # 输出到 output_dir 中的文件名(支持子目录,如 "subdir/app.log") + # 输出到 output_dir 中的文件名(支持子目录,如 "subdir/app.log") log_file: mysql_access.log - # 此数据源的 app_name(出现在 syslog 行 []) + # 此数据源的 app_name(出现在 syslog 行 []) app_name: access-log - # 此数据源的轮询间隔(秒),覆盖全局配置 + # 此数据源的轮询间隔(秒),覆盖全局配置 interval: 15 # 每次最多读取条数,覆盖全局配置 batch_size: 500 - # 列名映射(当数据库列名与默认值不同时使用) + # 列名映射(当数据库列名与默认值不同时使用) columns: - id: id # 主键/自增列(必填,用于断点跟踪) - timestamp: created_at # 时间戳列(必填) - level: log_level # 日志级别列(必填,值如 INFO/ERROR/WARN) - message: msg # 日志内容列(必填) + id: id # 主键/自增列(必填,用于断点跟踪) + timestamp: created_at # 时间戳列(必填) + level: log_level # 日志级别列(必填,值如 INFO/ERROR/WARN) + message: msg # 日志内容列(必填) # 以下为可选扩展字段,可在 syslog structured data 中体现 trace_id: trace_id span_id: span_id extra: extra_data + # 操作审计字段 + user: user + action: action + target: target + result: result # ------------------------------- # MySQL 日志表 — 错误日志 diff --git a/db_log_exporter.py b/db_log_exporter.py index 32e892f..81ae9fc 100755 --- a/db_log_exporter.py +++ b/db_log_exporter.py @@ -4,9 +4,9 @@ db_log_exporter — 数据库日志导出守护进程 从 MySQL / PostgreSQL 数据库表中定时提取日志,写入标准 syslog 格式文本文件。 -支持每个数据源独立跟踪偏移量(断点续传),线程安全,无重复日志。 +支持每个数据源独立跟踪偏移量(断点续传),线程安全,无重复日志。 -标准输出格式(RFC 5424 变体,每行一条): +标准输出格式(RFC 5424 变体,每行一条): []: 作者:QClaw @@ -68,7 +68,7 @@ SYSLOG_PRIORITY = { "FATAL": "<2>", } -# 默认字段映射(当配置中未指定时使用) +# 默认字段映射(当配置中未指定时使用) DEFAULT_COLUMN_MAP = { "id": "id", "timestamp": "created_at", @@ -80,9 +80,14 @@ DEFAULT_COLUMN_MAP = { "trace_id": "trace_id", "span_id": "span_id", "extra": "extra", + # operation audit fields + "user": "user", + "action": "action", + "target": "target", + "result": "result", } -DEFAULT_INTERVAL = 30 # 默认轮询间隔(秒) +DEFAULT_INTERVAL = 30 # 默认轮询间隔(秒) DEFAULT_BATCH = 1000 # 默认每次最多读取条数 DEFAULT_HOSTNAME = socket.gethostname() @@ -104,7 +109,7 @@ def setup_logging(level: str = "INFO", log_file: Optional[str] = None) -> loggin ch.setFormatter(logging.Formatter(fmt, datefmt)) logger.addHandler(ch) - # 文件(可选) + # 文件(可选) if log_file: fh = logging.FileHandler(log_file, encoding="utf-8") fh.setFormatter(logging.Formatter(fmt, datefmt)) @@ -134,22 +139,29 @@ def format_syslog_line( trace_id: str = "", span_id: str = "", extra: str = "", + user: str = "", + action: str = "", + target: str = "", + result: str = "", ) -> str: """ 格式化为 RFC 5424 变体 syslog 行。 格式: version timestamp hostname appname pid msgid structured_data message - 示例(完整一行): - <6>1 2026-01-01T12:00:00.123456+08:00 myhost myapp[12345]: [trace=abc] message text + 示例(完整一行): + <6>1 2026-01-01T12:00:00.123456+08:00 myhost myapp[12345]: [user=张三] [action=删除订单] [target=订单系统] [result=成功] message text """ pri = get_syslog_priority(level) - # 去掉 level 字符串两端方括号(如果有) clean_msg = message.strip() parts = [f"[{k}={v}]" for k, v in [ ("trace", trace_id), ("span", span_id), ("extra", extra), + ("user", user), + ("action", action), + ("target", target), + ("result", result), ] if v] structured = " ".join(parts) if structured: @@ -167,14 +179,14 @@ def acquire_lock_file(lock_path: str) -> int: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: os.close(fd) - raise RuntimeError(f"另一个 {APP_NAME} 实例已在运行(锁文件:{lock_path}),请先停止。") + raise RuntimeError(f"另一个 {APP_NAME} 实例已在运行(锁文件:{lock_path}),请先停止。") return fd def load_checkpoint(checkpoint_dir: str, source_name: str) -> Optional[Any]: """ - 加载断点文件,返回 last_id(可以是 int、datetime 或任意可序列化值)。 - 若文件不存在,返回 None(从头开始拉取)。 + 加载断点文件,返回 last_id(可以是 int、datetime 或任意可序列化值)。 + 若文件不存在,返回 None(从头开始拉取)。 """ path = Path(checkpoint_dir) / f"{source_name}.json" if not path.exists(): @@ -229,7 +241,7 @@ def build_mysql_query(cfg: Dict[str, Any], col_map: Dict[str, str], ts_col = col_map["timestamp"] lvl_col = col_map["level"] msg_col = col_map["message"] - # 构造 SELECT 列(处理别名) + # 构造 SELECT 列(处理别名) extra_cols = [] for alias, db_col in col_map.items(): if alias not in ("id", "timestamp", "level", "message") and db_col: @@ -262,7 +274,7 @@ def build_mysql_query(cfg: Dict[str, Any], col_map: Dict[str, str], def build_pg_query(cfg: Dict[str, Any], col_map: Dict[str, str], last_id: Optional[int], batch: int) -> Tuple[str, Tuple]: - """构建 PostgreSQL 查询语句(语法与 MySQL 基本兼容,%s 替换)""" + """构建 PostgreSQL 查询语句(语法与 MySQL 基本兼容,%s 替换)""" return build_mysql_query(cfg, col_map, last_id, batch) # 参数风格相同 @@ -366,7 +378,7 @@ class LogSource: self.app_name = table_config.get("app_name", name) self.pid_str = str(os.getpid()) self.col_map = merge_column_map(table_config.get("columns")) - self.filter_query = table_config.get("filter", "") # WHERE 子句(不含 WHERE) + self.filter_query = table_config.get("filter", "") # WHERE 子句(不含 WHERE) self.log_file_pattern = table_config.get("log_file", f"{name}.log") self._last_id: Optional[Any] = None @@ -420,7 +432,7 @@ class LogSource: # 有新日志,短暂 sleep 避免空转 self._stop_evt.wait(min(self.interval, 2.0)) except DatabaseError as e: - logger.error("[%s] 数据库错误(%s),%ds 后重试: %s", self.name, self.db_type, self.interval, e) + logger.error("[%s] 数据库错误(%s),%ds 后重试: %s", self.name, self.db_type, self.interval, e) self._stop_evt.wait(self.interval) except Exception as e: logger.exception("[%s] 未知异常,%ds 后重试: %s", self.name, self.interval, e) @@ -450,7 +462,7 @@ class LogSource: elif self.db_type == "postgresql" or self.db_type == "postgres": return get_rows_pg(self.db_config, self.col_map, self._last_id, self.batch_size) else: - raise ValueError(f"不支持的数据库类型: {self.db_type}(仅支持 mysql/postgresql)") + raise ValueError(f"不支持的数据库类型: {self.db_type}(仅支持 mysql/postgresql)") def _write_rows(self, rows: List[Dict]) -> None: """将行数据写入日志文件""" @@ -483,6 +495,10 @@ class LogSource: tid_val = str(row.get(self.col_map.get("trace_id", "")) or "") sid_val = str(row.get(self.col_map.get("span_id", "")) or "") ext_val = str(row.get(self.col_map.get("extra", "")) or "") + usr_val = str(row.get(self.col_map.get("user", "") or "") or "") + act_val = str(row.get(self.col_map.get("action", "") or "") or "") + tgt_val = str(row.get(self.col_map.get("target", "") or "") or "") + res_val = str(row.get(self.col_map.get("result", "") or "") or "") # 处理时间戳格式 timestamp = self._format_timestamp(ts_val) @@ -497,11 +513,15 @@ class LogSource: trace_id = tid_val, span_id = sid_val, extra = ext_val, + user = usr_val, + action = act_val, + target = tgt_val, + result = res_val, ) def _format_timestamp(self, val: Any) -> str: """ - 将任意时间格式转为 ISO 8601 格式(带时区)。 + 将任意时间格式转为 ISO 8601 格式(带时区)。 支持: datetime / date / 字符串 / unix timestamp(float/int) / None """ if val is None: @@ -580,7 +600,7 @@ def create_parser() -> argparse.ArgumentParser: epilog=""" 示例: %(prog)s -c /etc/db_log_exporter/config.yaml - %(prog)s -c config.yaml --once # 运行一次(不守护)并退出 + %(prog)s -c config.yaml --once # 运行一次(不守护)并退出 %(prog)s -c config.yaml --dry-run # 仅连接测试,不写入文件 %(prog)s -c config.yaml --log-level DEBUG """, @@ -588,14 +608,14 @@ def create_parser() -> argparse.ArgumentParser: parser.add_argument("-c", "--config", required=True, help="YAML 配置文件路径") parser.add_argument("--once", action="store_true", - help="仅执行一次轮询后退出(不守护)") + help="仅执行一次轮询后退出(不守护)") parser.add_argument("--dry-run", action="store_true", help="仅测试数据库连接,不写入日志文件") parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR"], - help="本程序的日志级别(默认 INFO)") + help="本程序的日志级别(默认 INFO)") parser.add_argument("--log-file", - help="本程序的日志输出文件(默认仅输出到 stdout)") + help="本程序的日志输出文件(默认仅输出到 stdout)") return parser @@ -628,7 +648,7 @@ def main() -> int: logger.warning("配置中没有发现任何数据源,请检查配置文件。") return 0 - # 确保必要目录存在(如果以 root 运行时) + # 确保必要目录存在(如果以 root 运行时) for d in [config.get("global", {}).get("output_dir", "/var/log/db_exporter"), config.get("global", {}).get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")]: try: @@ -643,20 +663,20 @@ def main() -> int: for src in sources: try: rows, _ = src._fetch() - logger.info("[%s] ✅ 连接成功,当前有新日志 %d 条(未写入)", src.name, len(rows)) + logger.info("[%s] ✅ 连接成功,当前有新日志 %d 条(未写入)", src.name, len(rows)) except Exception as e: logger.error("[%s] ❌ 连接失败: %s", src.name, e) ok = False return 0 if ok else 1 - # 获取锁文件(防止重复启动) + # 获取锁文件(防止重复启动) lock_file = f"/var/run/{APP_NAME}/{APP_NAME}.lock" lock_fd = None try: Path(lock_file).parent.mkdir(parents=True, exist_ok=True) lock_fd = acquire_lock_file(lock_file) except PermissionError: - logger.warning("无权限创建锁文件 %s,跳过锁定检测(可能重复启动)", lock_file) + logger.warning("无权限创建锁文件 %s,跳过锁定检测(可能重复启动)", lock_file) except RuntimeError as e: logger.error("%s", e) return 1 @@ -689,7 +709,7 @@ def main() -> int: src.stop(timeout=5.0) logger.info("=== 单次轮询完成,退出 ===") else: - # 守护进程主循环(防止主线程退出) + # 守护进程主循环(防止主线程退出) while True: time.sleep(3600) diff --git a/db_log_exporter.service b/db_log_exporter.service index aa30bde..cdbd2e1 100644 --- a/db_log_exporter.service +++ b/db_log_exporter.service @@ -5,7 +5,7 @@ After=network.target mysql.service postgresql.service Wants=mysql.service postgresql.service [Service] -# 以专门的用户/组运行(推荐先创建,示例:useradd -r -s /sbin/nologin db_exporter) +# 以专门的用户/组运行(推荐先创建,示例:useradd -r -s /sbin/nologin db_exporter) User=root Group=root @@ -26,10 +26,10 @@ ProtectSystem=strict ProtectHome=true ReadWritePaths=/var/log/db_exporter /var/lib/db_exporter -# 环境变量(可选,敏感信息可用 systemd secret 或环境文件) +# 环境变量(可选,敏感信息可用 systemd secret 或环境文件) # EnvironmentFile=/etc/db_log_exporter/env -# 日志输出(systemd journal) +# 日志输出(systemd journal) StandardOutput=journal StandardError=journal SyslogIdentifier=db_log_exporter diff --git a/setup.sh b/setup.sh index 9a7e3a3..e6bbcf9 100755 --- a/setup.sh +++ b/setup.sh @@ -52,11 +52,11 @@ check_deps() { error "缺少依赖: ${missing[*]} \n请先安装: pip3 install PyMySQL psycopg2-binary PyYAML" fi - # 检查 MySQL / PostgreSQL 客户端(仅检查命令是否存在,不强制要求服务运行) + # 检查 MySQL / PostgreSQL 客户端(仅检查命令是否存在,不强制要求服务运行) if command -v mysql &>/dev/null || command -v psql &>/dev/null; then info "数据库客户端已找到" else - warn "未找到 mysql/psql 客户端(程序使用 Python DB 驱动,不影响运行)" + warn "未找到 mysql/psql 客户端(程序使用 Python DB 驱动,不影响运行)" fi info "依赖检查通过 ✓" @@ -85,7 +85,7 @@ do_install() { info "文件已复制到 ${INSTALL_DIR} 和 ${CONFIG_DIR}" - # 3. 创建 config.yaml(如果不存在) + # 3. 创建 config.yaml(如果不存在) if [ ! -f "${CONFIG_DIR}/config.yaml" ]; then if [ -f "${CONFIG_DIR}/config.yaml.example" ]; then cp "${CONFIG_DIR}/config.yaml.example" "${CONFIG_DIR}/config.yaml" @@ -126,7 +126,7 @@ do_uninstall() { rm -f "/etc/systemd/system/${SYSTEMD_UNIT}" systemctl daemon-reload - info "删除安装目录(确认不再需要)..." + info "删除安装目录(确认不再需要)..." read -rp "是否删除 ${INSTALL_DIR} 和 ${CONFIG_DIR}?[y/N]: " confirm if [[ "$confirm" =~ ^[Yy]$ ]]; then rm -rf "${INSTALL_DIR}" "${CONFIG_DIR}" "${LOG_DIR}" "${DATA_DIR}" "${RUN_DIR}" @@ -144,7 +144,7 @@ do_status() { info "服务状态: 运行中 ✓" systemctl status "${APP_NAME}" --no-pager elif systemctl is-enabled --quiet "${APP_NAME}"; then - warn "服务状态: 未运行(已开机自启)" + warn "服务状态: 未运行(已开机自启)" else warn "服务状态: 未安装或未启用" fi @@ -171,5 +171,5 @@ case "$ACTION" in install) do_install "$@" ;; uninstall) do_uninstall "$@" ;; status) do_status "$@" ;; - *) error "未知操作: $ACTION(支持: install | uninstall | status)" ;; + *) error "未知操作: $ACTION(支持: install | uninstall | status)" ;; esac