commit 7cd8f7840443423c6377c8609980a690ebd2e32a
Author: QClaw Bot
Date:   Tue May 12 19:02:46 2026 +0800

    initial: db_log_exporter v1.0

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..76a962f
--- /dev/null
+++ b/README.md
@@ -0,0 +1,306 @@
+# db_log_exporter
+
+**Periodically pulls logs** from MySQL / PostgreSQL tables and writes them to text files in standard **syslog format** (an RFC 5424 variant).
+
+---
+
+## Features
+
+| Feature | Description |
+|------|------|
+| **Multi-database support** | MySQL and PostgreSQL run side by side without interfering with each other |
+| **Resumable checkpoints** | Each source records the last fetched ID independently, so a restart neither loses nor duplicates logs |
+| **Standard format** | Emits RFC 5424 syslog lines that rsyslog, Filebeat, Promtail, etc. can collect directly |
+| **Multiple sources** | One process manages several tables; each table can use its own database, polling interval, and output file |
+| **Thread-safe** | Each source runs in its own thread and fetches concurrently |
+| **Flexible column mapping** | Columns can be mapped in the config when the database uses different names |
+| **Dry-run** | `--dry-run` tests the connections without writing files |
+| **One-shot mode** | `--once` runs a single round and exits, which suits cron |
+| **systemd integration** | Ships a `.service` file with start-on-boot support |
+
+---
+
+## Output format
+
+One log entry per line, in the following format (RFC 5424 variant):
+
+```
+<6>1 2026-05-12T15:30:00.123456+08:00 hostname app_name[12345]: [trace=abc123] [span=def456] log message
+```
+
+| Field | Description |
+|------|------|
+| `<PRI>` | syslog priority: `<6>`=INFO, `<4>`=WARN, `<3>`=ERROR, etc. |
+| `version` | RFC 5424 version number, always `1` |
+| `timestamp` | ISO 8601 time (with microseconds and time zone) |
+| `hostname` | hostname from the config file (defaults to the system hostname) |
+| `app_name[pid]` | Configured app_name plus the PID of this process |
+| `[trace=...] [span=...]` | Structured data (trace_id / span_id, etc.) |
+| `message` | Log body |
+
+---
+
+## Quick start
+
+### 1. Install dependencies
+
+```bash
+# CentOS / RHEL / Fedora
+sudo yum install -y python3 python3-pip
+sudo pip3 install PyMySQL psycopg2-binary PyYAML
+
+# Debian / Ubuntu
+sudo apt-get install -y python3 python3-pip
+sudo pip3 install PyMySQL psycopg2-binary PyYAML
+```
+
+### 2. Download the program
+
+```bash
+sudo mkdir -p /opt/db_log_exporter
+sudo curl -L https://your-repo/db_log_exporter.py \
+  -o /opt/db_log_exporter/db_log_exporter.py
+sudo chmod +x /opt/db_log_exporter/db_log_exporter.py
+```
+
+### 3. 
编写配置 + +```bash +sudo mkdir -p /etc/db_log_exporter +sudo nano /etc/db_log_exporter/config.yaml +``` + +参考 `config.yaml.example`(本仓库根目录),主要填: + +```yaml +databases: + mysql_prod: + type: mysql + host: 192.168.1.100 + port: 3306 + user: log_reader + password: "your_password" + database: app_logs + +sources: + - name: access_log + database: mysql_prod + table: access_log + columns: + id: id + timestamp: created_at + level: log_level + message: message +``` + +### 4. 测试连接(Dry-run) + +```bash +python3 /opt/db_log_exporter/db_log_exporter.py \ + -c /etc/db_log_exporter/config.yaml \ + --dry-run +``` + +### 5. 运行方式 + +**方式 A:systemd 守护进程(推荐)** + +```bash +# 复制 service 文件 +sudo cp db_log_exporter.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable --now db_log_exporter +sudo journalctl -u db_log_exporter -f # 实时查看日志 +``` + +**方式 B:cron 定时任务(单次模式)** + +```bash +# crontab -e +* * * * * python3 /opt/db_log_exporter/db_log_exporter.py \ + -c /etc/db_log_exporter/config.yaml --once +``` + +**方式 C:直接运行(前台守护)** + +```bash +python3 /opt/db_log_exporter/db_log_exporter.py \ + -c /etc/db_log_exporter/config.yaml +``` + +--- + +## 配置说明 + +详细配置参考 `config.yaml.example`,核心字段: + +```yaml +global: + output_dir: /var/log/db_exporter # 日志输出目录 + checkpoint_dir: /var/lib/... # 断点文件目录 + hostname: myserver # syslog hostname + interval: 30 # 默认轮询间隔(秒) + batch_size: 1000 # 默认每次最多读条数 + +databases: + 别名: + type: mysql | postgresql + host: ... + port: ... + user: ... + password: ... + database: ... # MySQL: database 名 + dbname: ... 
# PostgreSQL: dbname + charset: utf8mb4 # MySQL 专用 + +sources: + - name: 源名称 # 唯一标识,用于断点文件名 + database: 引用上方别名 + table: 表名 + log_file: 输出文件名 # 相对于 output_dir + app_name: syslog程序名 + interval: 15 # 覆盖全局间隔 + batch_size: 500 # 覆盖全局批次大小 + columns: # 列名映射 + id: id + timestamp: created_at + level: log_level + message: message + trace_id: trace_id # 可选 + span_id: span_id # 可选 +``` + +### 数据库用户权限(最小权限) + +**MySQL:** +```sql +CREATE USER 'log_reader'@'%' IDENTIFIED BY 'password'; +GRANT SELECT ON app_logs.access_log TO 'log_reader'@'%'; +GRANT SELECT ON app_logs.error_log TO 'log_reader'@'%'; +FLUSH PRIVILEGES; +``` + +**PostgreSQL:** +```sql +CREATE USER log_reader WITH PASSWORD 'password'; +GRANT CONNECT ON DATABASE app_logs TO log_reader; +GRANT SELECT ON application_logs TO log_reader; +GRANT SELECT ON audit_log TO log_reader; +``` + +--- + +## 与日志采集系统集成 + +### rsyslog(服务器接收) + +```conf +# /etc/rsyslog.d/60-db-exporter.conf +module(load="imfile" PollingInterval="10") + +input(type="imfile" + File="/var/log/db_exporter/mysql_access.log" + Tag="db:access:" + Facility="local0") +``` + +### Filebeat(采集到 Elasticsearch) + +```yaml +# filebeat.yml +filebeat.inputs: + - type: log + paths: + - /var/log/db_exporter/*.log + fields: + log_type: db_exporter + fields_under_root: true +``` + +### Promtail(Loki 采集) + +```yaml +# promtail.yml +scrape_configs: + - job_name: db_exporter + static_configs: + - targets: [localhost] + labels: + job: db_exporter + __path__: /var/log/db_exporter/*.log +``` + +--- + +## 故障排查 + +| 问题 | 排查方法 | +|------|---------| +| 服务启动失败 | `journalctl -u db_log_exporter -e` 查看错误日志 | +| 连接被拒绝 | 确认数据库允许该 IP 连接,检查防火墙/安全组 | +| 权限不足 | 确认运行用户对 `output_dir`、`checkpoint_dir` 有写权限 | +| 日志重复 | 删除对应断点文件并重启,程序会从头拉取 | +| 中文乱码 | 确认数据库字符集为 `utf8mb4`(MySQL)或 `UTF8`(PG) | +| 连接超时 | 在 `databases` 中加 `connect_timeout: 10` | + +--- + +## 目录结构 + +``` +db_log_exporter/ +├── db_log_exporter.py # 主程序(Python 守护进程) +├── config.yaml.example # 配置文件示例 +├── requirements.txt # Python 依赖 +├── 
db_log_exporter.service # systemd 服务文件 +├── setup.sh # 自动化安装脚本 +└── README.md # 本文件 +``` + +--- + +## 命令行参数 + +``` +-c, --config YAML 配置文件路径(必填) +--once 仅执行一次轮询后退出(适合 cron) +--dry-run 仅测试数据库连接,不写文件 +--log-level 日志级别: DEBUG|INFO|WARNING|ERROR(默认 INFO) +--log-file 本程序日志输出文件(默认 stdout) +``` + +--- + +## 安全建议 + +1. **数据库密码不要明文写在配置中**,使用环境变量或 systemd secret: + ```bash + # /etc/db_log_exporter/env + DB_PASSWORD=your_password + ``` + 然后在 `config.yaml` 中用 `${DB_PASSWORD}` 引用 + +2. **以最小权限用户运行服务**,不要用 root: + ```bash + useradd -r -s /sbin/nologin db_exporter + chown -R db_exporter:db_exporter /var/log/db_exporter /var/lib/db_exporter + # 修改 service 文件中的 User=db_exporter + ``` + +3. **日志文件及时轮转**,防止磁盘爆满: + ```bash + # /etc/logrotate.d/db_log_exporter + /var/log/db_exporter/*.log { + daily + rotate 7 + compress + delaycompress + missingok + notifempty + create 0644 root root + sharedscripts + postrotate + systemctl reload db_log_exporter > /dev/null 2>&1 || true + endscript + } + ``` diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..59a3011 --- /dev/null +++ b/config.yaml @@ -0,0 +1,137 @@ +# ============================================================================== +# db_log_exporter — 配置文件示例 +# ============================================================================== +# 路径: /etc/db_log_exporter/config.yaml +# ============================================================================== +# 修改说明: +# 1. 在 databases 节点下定义各数据库连接信息 +# 2. 在 sources 节点下定义需要导出的日志表 +# 3. 
每个 source 通过 database 字段引用一个已定义的数据库 +# ============================================================================== + +# --------------------------------------------------------------------------- +# 全局配置 +# --------------------------------------------------------------------------- +global: + # 日志输出目录(需有写入权限) + output_dir: /var/log/db_exporter + + # 断点存放目录(需有写入权限) + # 每次拉取后保存最后一条记录的 ID,实现断点续传 + checkpoint_dir: /var/lib/db_exporter/checkpoints + + # 本程序写入日志时使用的 hostname(出现在 syslog 行中) + # 不填则自动取系统 hostname + hostname: "" + + # 全局默认轮询间隔(秒),单个 source 可单独覆盖 + interval: 30 + + # 全局默认每次最多读取条数,单个 source 可单独覆盖 + batch_size: 1000 + + +# --------------------------------------------------------------------------- +# 数据库连接定义 +# --------------------------------------------------------------------------- +databases: + # ---------- MySQL 示例 ---------- + mysql_prod: + type: mysql + host: 192.168.1.100 + port: 3306 + user: log_reader + password: "your_password_here" + database: app_logs + charset: utf8mb4 + + # ---------- PostgreSQL 示例 ---------- + pg_prod: + type: postgresql + host: 192.168.1.200 + port: 5432 + user: log_reader + password: "your_password_here" + dbname: app_logs + + +# --------------------------------------------------------------------------- +# 日志源定义(每个 source = 一个数据库表) +# --------------------------------------------------------------------------- +sources: + + # ------------------------------- + # MySQL 日志表 — 按 ID 自增主键 + # ------------------------------- + - name: mysql_access_log + # 引用上方定义的数据库 + database: mysql_prod + # 要查询的表名 + table: access_log + # 输出到 output_dir 中的文件名(支持子目录,如 "subdir/app.log") + log_file: mysql_access.log + # 此数据源的 app_name(出现在 syslog 行 []) + app_name: access-log + # 此数据源的轮询间隔(秒),覆盖全局配置 + interval: 15 + # 每次最多读取条数,覆盖全局配置 + batch_size: 500 + # 列名映射(当数据库列名与默认值不同时使用) + columns: + id: id # 主键/自增列(必填,用于断点跟踪) + timestamp: created_at # 时间戳列(必填) + level: log_level # 日志级别列(必填,值如 INFO/ERROR/WARN) + message: msg # 日志内容列(必填) + # 以下为可选扩展字段,可在 syslog 
structured data
+        trace_id: trace_id
+        span_id: span_id
+        extra: extra_data
+
+  # -------------------------------
+  # MySQL log table: error log
+  # -------------------------------
+  - name: mysql_error_log
+    database: mysql_prod
+    table: error_log
+    log_file: mysql_error.log
+    app_name: error-log
+    interval: 10
+    columns:
+      id: id
+      timestamp: created_at
+      level: level
+      message: message
+      trace_id: trace_id
+
+  # -------------------------------
+  # PostgreSQL log table: application log
+  # -------------------------------
+  - name: pg_app_log
+    database: pg_prod
+    table: application_logs
+    log_file: pg_app.log
+    app_name: pg-app
+    interval: 30
+    columns:
+      id: log_id
+      timestamp: logged_at
+      level: severity
+      message: content
+      logger: component
+      trace_id: trace_id
+      extra: metadata
+
+  # -------------------------------
+  # PostgreSQL log table: audit log
+  # -------------------------------
+  - name: pg_audit_log
+    database: pg_prod
+    table: audit_log
+    log_file: pg_audit.log
+    app_name: pg-audit
+    interval: 60
+    columns:
+      id: audit_id
+      timestamp: happened_at
+      level: event_type
+      message: description
diff --git a/db_log_exporter.py b/db_log_exporter.py
new file mode 100755
index 0000000..32e892f
--- /dev/null
+++ b/db_log_exporter.py
@@ -0,0 +1,702 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+db_log_exporter: database log export daemon
+
+Periodically extracts logs from MySQL / PostgreSQL tables and writes them to text files in standard syslog format.
+Each source tracks its own offset (resumable checkpoints); thread-safe, no duplicate logs.
+
+Standard output format (RFC 5424 variant, one entry per line):
+    <PRI><VERSION> <TIMESTAMP> <HOSTNAME> <APP_NAME>[<PID>]: <MESSAGE>
+
+Author: QClaw
+Dependencies: PyMySQL / psycopg2-binary / PyYAML
+"""
+
+import argparse
+import fcntl
+import fnmatch
+import json
+import logging
+import os
+import re
+import signal
+import socket
+import struct
+import sys
+import threading
+import time
+from collections import defaultdict
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+import yaml
+
+try:
+    import pymysql
+except ImportError:
+    pymysql = None
+
+try:
+    import psycopg2
+except ImportError:
+    psycopg2 = None
+
+
+# 
============================================================================== +# 全局配置 & 日志 +# ============================================================================== + +APP_NAME = "db_log_exporter" +LOGGER: Optional[logging.Logger] = None + +# Syslog 优先级映射 +SYSLOG_PRIORITY = { + "DEBUG": "<7>", + "TRACE": "<7>", + "INFO": "<6>", + "NOTICE": "<5>", + "WARN": "<4>", + "WARNING": "<4>", + "ERROR": "<3>", + "ERR": "<3>", + "CRITICAL": "<2>", + "CRIT": "<2>", + "ALERT": "<1>", + "EMERG": "<0>", + "FATAL": "<2>", +} + +# 默认字段映射(当配置中未指定时使用) +DEFAULT_COLUMN_MAP = { + "id": "id", + "timestamp": "created_at", + "level": "level", + "logger": "logger", + "message": "message", + "host": "host", + "source": "source", + "trace_id": "trace_id", + "span_id": "span_id", + "extra": "extra", +} + +DEFAULT_INTERVAL = 30 # 默认轮询间隔(秒) +DEFAULT_BATCH = 1000 # 默认每次最多读取条数 +DEFAULT_HOSTNAME = socket.gethostname() + + +# ============================================================================== +# 日志工具 +# ============================================================================== + +def setup_logging(level: str = "INFO", log_file: Optional[str] = None) -> logging.Logger: + """配置本程序自身的日志输出""" + global LOGGER + logger = logging.getLogger(APP_NAME) + logger.setLevel(getattr(logging, level.upper(), logging.INFO)) + fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" + datefmt = "%Y-%m-%d %H:%M:%S" + + # 控制台 + ch = logging.StreamHandler(sys.stdout) + ch.setFormatter(logging.Formatter(fmt, datefmt)) + logger.addHandler(ch) + + # 文件(可选) + if log_file: + fh = logging.FileHandler(log_file, encoding="utf-8") + fh.setFormatter(logging.Formatter(fmt, datefmt)) + logger.addHandler(fh) + + LOGGER = logger + return logger + + +# ============================================================================== +# 工具函数 +# ============================================================================== + +def get_syslog_priority(level_str: str) -> str: + """将日志级别字符串转为 syslog 优先级标记""" + 
normalized = level_str.strip().upper() + return SYSLOG_PRIORITY.get(normalized, "<6>") # 默认为 INFO + + +def format_syslog_line( + timestamp: str, + hostname: str, + app_name: str, + pid: str, + level: str, + message: str, + trace_id: str = "", + span_id: str = "", + extra: str = "", +) -> str: + """ + 格式化为 RFC 5424 变体 syslog 行。 + 格式: + version timestamp hostname appname pid msgid structured_data message + + 示例(完整一行): + <6>1 2026-01-01T12:00:00.123456+08:00 myhost myapp[12345]: [trace=abc] message text + """ + pri = get_syslog_priority(level) + # 去掉 level 字符串两端方括号(如果有) + clean_msg = message.strip() + parts = [f"[{k}={v}]" for k, v in [ + ("trace", trace_id), + ("span", span_id), + ("extra", extra), + ] if v] + structured = " ".join(parts) + if structured: + structured += " " + return f"{pri}1 {timestamp} {hostname} {app_name}[{pid}]: {structured}{clean_msg}" + + +def acquire_lock_file(lock_path: str) -> int: + """ + 使用 flock 机制获取文件锁,防止重复启动。 + 返回文件描述符,使用完毕后需 close。 + """ + fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + os.close(fd) + raise RuntimeError(f"另一个 {APP_NAME} 实例已在运行(锁文件:{lock_path}),请先停止。") + return fd + + +def load_checkpoint(checkpoint_dir: str, source_name: str) -> Optional[Any]: + """ + 加载断点文件,返回 last_id(可以是 int、datetime 或任意可序列化值)。 + 若文件不存在,返回 None(从头开始拉取)。 + """ + path = Path(checkpoint_dir) / f"{source_name}.json" + if not path.exists(): + return None + try: + with open(path, "r", encoding="utf-8") as f: + data = json.load(f) + return data.get("last_id") + except (json.JSONDecodeError, IOError) as e: + if LOGGER: + LOGGER.warning("断点文件损坏 [%s],将从头拉取: %s", path, e) + return None + + +def save_checkpoint(checkpoint_dir: str, source_name: str, last_id: Any) -> None: + """持久化断点""" + Path(checkpoint_dir).mkdir(parents=True, exist_ok=True) + path = Path(checkpoint_dir) / f"{source_name}.json" + tmp = path.with_suffix(".tmp") + with open(tmp, "w", encoding="utf-8") as 
f:
+        json.dump({"last_id": last_id, "saved_at": datetime.now().isoformat()}, f)
+    tmp.rename(path)  # atomic replace
+
+
+def load_config(config_path: str) -> Dict[str, Any]:
+    """Load the YAML config file"""
+    with open(config_path, "r", encoding="utf-8") as f:
+        return yaml.safe_load(f) or {}  # an empty file parses to None
+
+
+def merge_column_map(config_map: Optional[Dict[str, str]]) -> Dict[str, str]:
+    """Merge the configured column mapping over the defaults"""
+    result = dict(DEFAULT_COLUMN_MAP)
+    if config_map:
+        result.update(config_map)
+    return result
+
+
+# ==============================================================================
+# Database connection factory
+# ==============================================================================
+
+class DatabaseError(Exception):
+    """Database operation error"""
+    pass
+
+
+def build_mysql_query(cfg: Dict[str, Any], col_map: Dict[str, str],
+                      last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
+    """Build the MySQL query string and its parameters"""
+    id_col = col_map["id"]
+    ts_col = col_map["timestamp"]
+    lvl_col = col_map["level"]
+    msg_col = col_map["message"]
+    # Build the SELECT columns (optional columns are aliased)
+    extra_cols = []
+    for alias, db_col in col_map.items():
+        if alias not in ("id", "timestamp", "level", "message") and db_col:
+            extra_cols.append(f"{db_col} AS {alias}")
+
+    select_parts = [id_col, ts_col, lvl_col, msg_col] + extra_cols
+    # De-duplicate; seen starts empty so the four required columns stay in the SELECT list
+    seen = set()
+    deduped = []
+    for p in select_parts:
+        col = p.split(" AS ")[0].strip()
+        if col not in seen:
+            deduped.append(p)
+            seen.add(col)
+
+    select_clause = ", ".join(deduped) if deduped else "*"
+
+    if last_id is not None:
+        sql = (f"SELECT {select_clause} FROM {cfg['table']} "
+               f"WHERE {id_col} > %s "
+               f"ORDER BY {id_col} ASC LIMIT %s")
+        params: Tuple = (last_id, batch)
+    else:
+        sql = (f"SELECT {select_clause} FROM {cfg['table']} "
+               f"ORDER BY {id_col} ASC LIMIT %s")
+        params = (batch,)
+
+    return sql, params
+
+
+def build_pg_query(cfg: Dict[str, Any], col_map: Dict[str, str],
+                   last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
+    """Build the PostgreSQL query (same syntax as 
MySQL; uses %s placeholders)"""
+    return build_mysql_query(cfg, col_map, last_id, batch)  # same parameter style
+
+
+def get_rows_mysql(cfg: Dict[str, Any], col_map: Dict[str, str],
+                   last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
+    """Read log rows from MySQL; returns (rows, new_last_id)"""
+    if pymysql is None:
+        raise DatabaseError("PyMySQL is not installed; run: pip install pymysql")
+
+    conn_cfg = {
+        "host": cfg.get("host", "localhost"),
+        "port": cfg.get("port", 3306),
+        "user": cfg["user"],
+        "password": cfg["password"],
+        "database": cfg.get("database", ""),
+        "charset": cfg.get("charset", "utf8mb4"),
+        "cursorclass": pymysql.cursors.DictCursor,
+    }
+    # Connect without a default database when none is configured
+    if not cfg.get("database"):
+        conn_cfg.pop("database", None)
+
+    conn = pymysql.connect(**conn_cfg)
+    try:
+        with conn.cursor() as cur:
+            # Without "database", switch via the optional "database_name" key if present.
+            # Use .get() so that a config lacking both keys does not raise KeyError.
+            if not cfg.get("database") and cfg.get("database_name"):
+                cur.execute(f"USE {cfg['database_name']}")
+            sql, params = build_mysql_query(cfg, col_map, last_id, batch)
+            cur.execute(sql, params)
+            rows = cur.fetchall()
+    finally:
+        conn.close()
+
+    new_last = rows[-1][col_map["id"]] if rows else last_id
+    return rows, new_last
+
+
+def get_rows_pg(cfg: Dict[str, Any], col_map: Dict[str, str],
+                last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
+    """Read log rows from PostgreSQL; returns (rows, new_last_id)"""
+    if psycopg2 is None:
+        raise DatabaseError("psycopg2 is not installed; run: pip install psycopg2-binary")
+
+    # Pass keyword arguments instead of a hand-built DSN string: libpq only
+    # understands single-quoted values, so password="..." would send the
+    # double quotes as part of the password; kwargs need no escaping at all
+    conn_kwargs: Dict[str, Any] = {}
+    for key in ("host", "port", "dbname", "user", "password"):
+        val = cfg.get(key)
+        if val is not None:
+            conn_kwargs[key] = val
+
+    conn = psycopg2.connect(**conn_kwargs)
+    try:
+        with conn.cursor() as cur:
+            sql, params = build_pg_query(cfg, col_map, last_id, batch)
+            cur.execute(sql, params)
+            rows = cur.fetchall()
+            # psycopg2 returns plain tuples, so map the column names by hand
+            col_names = [desc[0] for desc in 
cur.description] + rows = [dict(zip(col_names, r)) for r in rows] + finally: + conn.close() + + new_last = rows[-1][col_map["id"]] if rows else last_id + return rows, new_last + + +# ============================================================================== +# 单个日志源处理器 +# ============================================================================== + +class LogSource: + """单个数据库日志源""" + + def __init__( + self, + name: str, + db_type: str, + db_config: Dict[str, Any], + table_config: Dict[str, Any], + output_dir: str, + checkpoint_dir: str, + interval: int, + batch_size: int, + hostname: str, + ): + self.name = name + self.db_type = db_type.lower() + self.db_config = db_config + self.table_config = table_config + self.output_dir = Path(output_dir) + self.checkpoint_dir = Path(checkpoint_dir) + self.interval = interval + self.batch_size = batch_size + self.hostname = hostname + self.app_name = table_config.get("app_name", name) + self.pid_str = str(os.getpid()) + self.col_map = merge_column_map(table_config.get("columns")) + self.filter_query = table_config.get("filter", "") # WHERE 子句(不含 WHERE) + self.log_file_pattern = table_config.get("log_file", f"{name}.log") + + self._last_id: Optional[Any] = None + self._running = False + self._thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + self._stop_evt = threading.Event() + + # ------------------------------------------------------------------ + # 公共控制 + # ------------------------------------------------------------------ + + def start(self) -> None: + with self._lock: + if self._running: + return + self._running = True + self._stop_evt.clear() + self._thread = threading.Thread(target=self._run_loop, daemon=True, name=f"logsrc-{self.name}") + self._thread.start() + + def stop(self, timeout: float = 10.0) -> None: + with self._lock: + if not self._running: + return + self._running = False + self._stop_evt.set() + if self._thread: + self._thread.join(timeout=timeout) + + def is_alive(self) -> 
bool:
+        with self._lock:
+            return self._running
+
+    # ------------------------------------------------------------------
+    # Internal loop
+    # ------------------------------------------------------------------
+
+    def _run_loop(self) -> None:
+        logger = LOGGER or logging.getLogger(APP_NAME)
+        # Load the checkpoint at startup
+        self._last_id = load_checkpoint(str(self.checkpoint_dir), self.name)
+
+        while not self._stop_evt.is_set():
+            try:
+                fetched = self._fetch_and_write()
+                if fetched == 0:
+                    # No new logs: wait until the next poll
+                    self._stop_evt.wait(self.interval)
+                else:
+                    # New logs: sleep briefly to avoid a busy loop
+                    self._stop_evt.wait(min(self.interval, 2.0))
+            except DatabaseError as e:
+                logger.error("[%s] database error (%s), retrying in %ds: %s", self.name, self.db_type, self.interval, e)
+                self._stop_evt.wait(self.interval)
+            except Exception as e:
+                logger.exception("[%s] unexpected error, retrying in %ds: %s", self.name, self.interval, e)
+                self._stop_evt.wait(self.interval)
+
+        logger.debug("[%s] thread stopped", self.name)
+
+    # ------------------------------------------------------------------
+    # Core logic
+    # ------------------------------------------------------------------
+
+    def _fetch_and_write(self) -> int:
+        """Fetch and write logs; returns the number of entries written this round"""
+        rows, new_last = self._fetch()
+        if not rows:
+            return 0
+
+        self._write_rows(rows)
+        self._last_id = new_last
+        save_checkpoint(str(self.checkpoint_dir), self.name, self._last_id)
+        return len(rows)
+
+    def _fetch(self) -> Tuple[List[Dict], Optional[int]]:
+        """Call the fetch function for the configured database type"""
+        # The query builders read cfg["table"], which is defined on the source, not in db_config
+        cfg = {**self.db_config, "table": self.table_config["table"]}
+        if self.db_type == "mysql":
+            return get_rows_mysql(cfg, self.col_map, self._last_id, self.batch_size)
+        elif self.db_type == "postgresql" or self.db_type == "postgres":
+            return get_rows_pg(cfg, self.col_map, self._last_id, self.batch_size)
+        else:
+            raise ValueError(f"Unsupported database type: {self.db_type} (only mysql/postgresql are supported)")
+
+    def _write_rows(self, rows: List[Dict]) -> None:
+        """Append the rows to the log file"""
+        out_path = self.output_dir / self.log_file_pattern
+        out_path.parent.mkdir(parents=True, exist_ok=True)  # log_file may include a subdirectory
+
+        lines: List[str] = []
+        for row 
in rows:
+            line = self._row_to_syslog(row)
+            lines.append(line)
+
+        with open(out_path, "a", encoding="utf-8", buffering=8192) as f:
+            for line in lines:
+                f.write(line + "\n")
+
+        logger = LOGGER
+        if logger:
+            logger.debug("[%s] wrote %d log entries -> %s", self.name, len(lines), out_path)
+
+    def _row_to_syslog(self, row: Dict[str, Any]) -> str:
+        """Convert one database row to a syslog-format string"""
+        id_col = self.col_map["id"]
+        ts_col = self.col_map["timestamp"]
+        lvl_col = self.col_map["level"]
+        msg_col = self.col_map["message"]
+
+        ts_val = row.get(ts_col)
+        lvl_val = str(row.get(lvl_col) or "INFO")  # NULL level falls back to INFO
+        msg_val = str(row.get(msg_col) or "")
+        # Optional columns are selected as "<db_col> AS <alias>", so look up the alias first
+        tid_val = str(row.get("trace_id") or row.get(self.col_map.get("trace_id", "")) or "")
+        sid_val = str(row.get("span_id") or row.get(self.col_map.get("span_id", "")) or "")
+        ext_val = str(row.get("extra") or row.get(self.col_map.get("extra", "")) or "")
+
+        # Normalize the timestamp format
+        timestamp = self._format_timestamp(ts_val)
+
+        return format_syslog_line(
+            timestamp=timestamp,
+            hostname=self.hostname,
+            app_name=self.app_name,
+            pid=self.pid_str,
+            level=lvl_val,
+            message=msg_val,
+            trace_id=tid_val,
+            span_id=sid_val,
+            extra=ext_val,
+        )
+
+    def _format_timestamp(self, val: Any) -> str:
+        """
+        Convert a timestamp value to ISO 8601 (the time zone is kept when the value carries one).
+        Handles: datetime / string / unix timestamp (float/int) / None
+        """
+        if val is None:
+            return datetime.utcnow().isoformat() + "Z"
+
+        if isinstance(val, datetime):
+            try:
+                return val.isoformat()
+            except Exception:
+                return str(val)
+
+        if isinstance(val, (int, float)):
+            try:
+                dt = datetime.fromtimestamp(val)
+                return dt.isoformat()
+            except Exception:
+                return str(val)
+
+        # String: try each known format against the whole value
+        s = str(val).strip()
+        for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S",
+                    "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S",
+                    "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
+            try:
+                dt = datetime.strptime(s, fmt)
+                return dt.isoformat()
+            except ValueError:
+                pass
+        return s  # unparseable: return the original string
+
+
+# 
==============================================================================
+# Main program
+# ==============================================================================
+
+def build_sources_from_config(config: Dict[str, Any]) -> List[LogSource]:
+    """Build all LogSource instances from the config dict"""
+    global_cfg = config.get("global", {})
+    db_configs = config.get("databases", {})
+    sources_cfg = config.get("sources", [])
+
+    output_dir = global_cfg.get("output_dir", "/var/log/db_exporter")
+    checkpoint_dir = global_cfg.get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")
+    hostname = global_cfg.get("hostname") or DEFAULT_HOSTNAME  # hostname: "" also falls back
+    default_interval = global_cfg.get("interval", DEFAULT_INTERVAL)
+    default_batch = global_cfg.get("batch_size", DEFAULT_BATCH)
+
+    sources: List[LogSource] = []
+    for src_cfg in sources_cfg:
+        db_ref = src_cfg.get("database")
+        if db_ref not in db_configs:
+            raise ValueError(f"Source [{src_cfg['name']}] references database '{db_ref}', which is not defined under databases")
+
+        db_type = db_configs[db_ref].get("type", "mysql").lower()
+        db_cfg = db_configs[db_ref]
+
+        sources.append(LogSource(
+            name=src_cfg["name"],
+            db_type=db_type,
+            db_config=db_cfg,
+            table_config=src_cfg,
+            output_dir=output_dir,
+            checkpoint_dir=checkpoint_dir,
+            interval=src_cfg.get("interval", default_interval),
+            batch_size=src_cfg.get("batch_size", default_batch),
+            hostname=hostname,
+        ))
+
+    return sources
+
+
+def create_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(
+        description="Database log export daemon: periodically pulls logs from MySQL / PostgreSQL and writes them to files in standard syslog format",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  %(prog)s -c /etc/db_log_exporter/config.yaml
+  %(prog)s -c config.yaml --once              # run once (no daemon) and exit
+  %(prog)s -c config.yaml --dry-run           # test connections only, write nothing
+  %(prog)s -c config.yaml --log-level DEBUG
+        """,
+    )
+    parser.add_argument("-c", "--config", required=True,
+                        help="Path to the YAML config file")
+    parser.add_argument("--once", action="store_true",
+                        help="Run a single polling round and exit (no daemon)")
+    
parser.add_argument("--dry-run", action="store_true", + help="仅测试数据库连接,不写入日志文件") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], + help="本程序的日志级别(默认 INFO)") + parser.add_argument("--log-file", + help="本程序的日志输出文件(默认仅输出到 stdout)") + return parser + + +def main() -> int: + parser = create_parser() + args = parser.parse_args() + + # 配置日志 + setup_logging(level=args.log_level, log_file=args.log_file) + logger = LOGGER + + # 加载配置 + try: + config = load_config(args.config) + except FileNotFoundError: + print(f"[ERROR] 配置文件不存在: {args.config}", file=sys.stderr) + return 1 + except yaml.YAMLError as e: + print(f"[ERROR] YAML 解析失败: {e}", file=sys.stderr) + return 1 + + # 构建数据源 + try: + sources = build_sources_from_config(config) + except Exception as e: + logger.error("配置解析失败: %s", e) + return 1 + + if not sources: + logger.warning("配置中没有发现任何数据源,请检查配置文件。") + return 0 + + # 确保必要目录存在(如果以 root 运行时) + for d in [config.get("global", {}).get("output_dir", "/var/log/db_exporter"), + config.get("global", {}).get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")]: + try: + Path(d).mkdir(parents=True, exist_ok=True) + except PermissionError: + logger.warning("无权限创建目录 %s,将使用当前目录下的子目录", d) + + # Dry-run:仅测试连接 + if args.dry_run: + logger.info("=== Dry-run 模式:仅测试数据库连接 ===") + ok = True + for src in sources: + try: + rows, _ = src._fetch() + logger.info("[%s] ✅ 连接成功,当前有新日志 %d 条(未写入)", src.name, len(rows)) + except Exception as e: + logger.error("[%s] ❌ 连接失败: %s", src.name, e) + ok = False + return 0 if ok else 1 + + # 获取锁文件(防止重复启动) + lock_file = f"/var/run/{APP_NAME}/{APP_NAME}.lock" + lock_fd = None + try: + Path(lock_file).parent.mkdir(parents=True, exist_ok=True) + lock_fd = acquire_lock_file(lock_file) + except PermissionError: + logger.warning("无权限创建锁文件 %s,跳过锁定检测(可能重复启动)", lock_file) + except RuntimeError as e: + logger.error("%s", e) + return 1 + + # 注册信号处理 + def signal_handler(signum, frame): + sig_name = 
signal.Signals(signum).name + logger.info("收到信号 %s,准备优雅退出...", sig_name) + for src in sources: + src.stop(timeout=5.0) + logger.info("所有数据源已停止,退出。") + sys.exit(0) + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + + # 启动所有数据源 + for src in sources: + src.start() + logger.info("已启动数据源: %s (db=%s, table=%s, interval=%ds)", + src.name, src.db_type, src.table_config["table"], src.interval) + + logger.info("=== %s 已启动,PID=%d,共 %d 个数据源 ===", + APP_NAME, os.getpid(), len(sources)) + + if args.once: + # --once:等待一轮轮询后退出 + time.sleep(max(s.interval for s in sources) + 2) + for src in sources: + src.stop(timeout=5.0) + logger.info("=== 单次轮询完成,退出 ===") + else: + # 守护进程主循环(防止主线程退出) + while True: + time.sleep(3600) + + if lock_fd is not None: + os.close(lock_fd) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/db_log_exporter.service b/db_log_exporter.service new file mode 100644 index 0000000..aa30bde --- /dev/null +++ b/db_log_exporter.service @@ -0,0 +1,41 @@ +[Unit] +Description=Database Log Exporter — 从 MySQL/PostgreSQL 拉取日志写入文本文件 +Documentation=https://github.com/example/db_log_exporter +After=network.target mysql.service postgresql.service +Wants=mysql.service postgresql.service + +[Service] +# 以专门的用户/组运行(推荐先创建,示例:useradd -r -s /sbin/nologin db_exporter) +User=root +Group=root + +# 程序路径 +ExecStart=/usr/bin/python3 /opt/db_log_exporter/db_log_exporter.py \ + --config /etc/db_log_exporter/config.yaml \ + --log-file /var/log/db_exporter/exporter.log \ + --log-level INFO + +# 重启策略 +Restart=on-failure +RestartSec=10 +StartLimitBurst=5 + +# 安全加固 +NoNewPrivileges=true +ProtectSystem=strict +ProtectHome=true +ReadWritePaths=/var/log/db_exporter /var/lib/db_exporter + +# 环境变量(可选,敏感信息可用 systemd secret 或环境文件) +# EnvironmentFile=/etc/db_log_exporter/env + +# 日志输出(systemd journal) +StandardOutput=journal +StandardError=journal +SyslogIdentifier=db_log_exporter + +# 优雅停止超时 +TimeoutStopSec=30 + +[Install] 
+WantedBy=multi-user.target
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..de0fd02
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+PyMySQL>=1.1.0
+psycopg2-binary>=2.9.0
+PyYAML>=6.0
diff --git a/setup.sh b/setup.sh
new file mode 100755
index 0000000..9a7e3a3
--- /dev/null
+++ b/setup.sh
@@ -0,0 +1,175 @@
+#!/usr/bin/env bash
+# ==============================================================================
+# db_log_exporter: install and deployment script
+# ==============================================================================
+# Usage:
+#   sudo ./setup.sh [install | uninstall | status]
+#
+# Supports: CentOS 7/8, RHEL, Fedora, Debian, Ubuntu
+# ==============================================================================
+
+set -euo pipefail
+
+APP_NAME="db_log_exporter"
+INSTALL_DIR="/opt/${APP_NAME}"
+CONFIG_DIR="/etc/${APP_NAME}"
+SYSTEMD_UNIT="${APP_NAME}.service"
+LOG_DIR="/var/log/db_exporter"    # matches output_dir in config.yaml and ReadWritePaths in the unit file
+DATA_DIR="/var/lib/db_exporter"   # matches checkpoint_dir in config.yaml and ReadWritePaths in the unit file
+RUN_DIR="/var/run/${APP_NAME}"
+CHECKPOINT_DIR="${DATA_DIR}/checkpoints"
+
+# Colors
+RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
+
+info()  { echo -e "${GREEN}[INFO]${NC}  $*"; }
+warn()  { echo -e "${YELLOW}[WARN]${NC}  $*" >&2; }
+error() { echo -e "${RED}[ERROR]${NC} $*" >&2; exit 1; }
+need_root() { [ "$(id -u)" -eq 0 ] || error "Please run as root: sudo $0 $*"; }
+
+# ---------------------------------------------------------------------------
+# Dependency check
+# ---------------------------------------------------------------------------
+check_deps() {
+    info "Checking dependencies..."
+    local missing=()
+
+    if ! command -v python3 &>/dev/null; then
+        missing+=("python3")
+    fi
+
+    if ! python3 -c "import pymysql" 2>/dev/null; then
+        missing+=("PyMySQL (pip3 install pymysql)")
+    fi
+    if ! python3 -c "import psycopg2" 2>/dev/null; then
+        missing+=("psycopg2-binary (pip3 install psycopg2-binary)")
+    fi
+    if ! 
python3 -c "import yaml" 2>/dev/null; then + missing+=("PyYAML (pip3 install pyyaml)") + fi + + if [ ${#missing[@]} -gt 0 ]; then + error "缺少依赖: ${missing[*]} \n请先安装: pip3 install PyMySQL psycopg2-binary PyYAML" + fi + + # 检查 MySQL / PostgreSQL 客户端(仅检查命令是否存在,不强制要求服务运行) + if command -v mysql &>/dev/null || command -v psql &>/dev/null; then + info "数据库客户端已找到" + else + warn "未找到 mysql/psql 客户端(程序使用 Python DB 驱动,不影响运行)" + fi + + info "依赖检查通过 ✓" +} + +# --------------------------------------------------------------------------- +# 安装 +# --------------------------------------------------------------------------- +do_install() { + need_root "$@" + check_deps + + info "开始安装 ${APP_NAME}..." + + # 1. 创建目录 + for d in "${INSTALL_DIR}" "${CONFIG_DIR}" "${LOG_DIR}" "${DATA_DIR}" "${CHECKPOINT_DIR}" "${RUN_DIR}"; do + mkdir -p "$d" + echo " 创建: $d" + done + + # 2. 复制文件 + cp "$(dirname "$0")/db_log_exporter.py" "${INSTALL_DIR}/" + cp "$(dirname "$0")/config.yaml" "${CONFIG_DIR}/config.yaml.example" + [ -f "$(dirname "$0")/requirements.txt" ] && \ + cp "$(dirname "$0")/requirements.txt" "${INSTALL_DIR}/" + + info "文件已复制到 ${INSTALL_DIR} 和 ${CONFIG_DIR}" + + # 3. 创建 config.yaml(如果不存在) + if [ ! -f "${CONFIG_DIR}/config.yaml" ]; then + if [ -f "${CONFIG_DIR}/config.yaml.example" ]; then + cp "${CONFIG_DIR}/config.yaml.example" "${CONFIG_DIR}/config.yaml" + warn "已创建默认配置: ${CONFIG_DIR}/config.yaml" + warn "请编辑配置文件,填入数据库连接信息后再启动服务!" + fi + fi + + # 4. 安装 systemd 服务 + local systemd_dir="/etc/systemd/system" + cp "$(dirname "$0")/${SYSTEMD_UNIT}" "${systemd_dir}/" + chmod 644 "${systemd_dir}/${SYSTEMD_UNIT}" + systemctl daemon-reload + + info "systemd 服务已安装 ✓" + info "" + info "=== 后续步骤 ===" + info "1. 编辑配置: nano ${CONFIG_DIR}/config.yaml" + info "2. 检查连接: python3 ${INSTALL_DIR}/${APP_NAME}.py -c ${CONFIG_DIR}/config.yaml --dry-run" + info "3. 启动服务: systemctl start ${APP_NAME}" + info "4. 开机自启: systemctl enable ${APP_NAME}" + info "5. 
查看日志: journalctl -u ${APP_NAME} -f" + info "" + info "安装完成 ✓" +} + +# --------------------------------------------------------------------------- +# 卸载 +# --------------------------------------------------------------------------- +do_uninstall() { + need_root "$@" + + info "停止并禁用服务..." + systemctl stop "${APP_NAME}" 2>/dev/null || true + systemctl disable "${APP_NAME}" 2>/dev/null || true + + info "删除 systemd unit..." + rm -f "/etc/systemd/system/${SYSTEMD_UNIT}" + systemctl daemon-reload + + info "删除安装目录(确认不再需要)..." + read -rp "是否删除 ${INSTALL_DIR} 和 ${CONFIG_DIR}?[y/N]: " confirm + if [[ "$confirm" =~ ^[Yy]$ ]]; then + rm -rf "${INSTALL_DIR}" "${CONFIG_DIR}" "${LOG_DIR}" "${DATA_DIR}" "${RUN_DIR}" + info "已删除所有文件 ✓" + else + warn "保留文件,手动清理: rm -rf ${INSTALL_DIR} ${CONFIG_DIR}" + fi +} + +# --------------------------------------------------------------------------- +# 状态 +# --------------------------------------------------------------------------- +do_status() { + if systemctl is-active --quiet "${APP_NAME}"; then + info "服务状态: 运行中 ✓" + systemctl status "${APP_NAME}" --no-pager + elif systemctl is-enabled --quiet "${APP_NAME}"; then + warn "服务状态: 未运行(已开机自启)" + else + warn "服务状态: 未安装或未启用" + fi + + if [ -d "${CHECKPOINT_DIR}" ]; then + echo "" + info "断点文件:" + ls -la "${CHECKPOINT_DIR}" 2>/dev/null | grep -v "^total" || echo " (空)" + fi + + if [ -d "${LOG_DIR}" ]; then + echo "" + info "日志文件:" + ls -lh "${LOG_DIR}" 2>/dev/null || echo " (空)" + fi +} + +# --------------------------------------------------------------------------- +# 主入口 +# --------------------------------------------------------------------------- +ACTION="${1:-install}" + +case "$ACTION" in + install) do_install "$@" ;; + uninstall) do_uninstall "$@" ;; + status) do_status "$@" ;; + *) error "未知操作: $ACTION(支持: install | uninstall | status)" ;; +esac