#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
db_log_exporter -- database log export daemon.

Periodically pulls log rows from MySQL / PostgreSQL tables and appends them to
plain-text files in a syslog-like format. Each configured source runs in its
own thread and keeps its own checkpoint file (the last exported ID), so an
export resumes where it stopped after a restart.

Delivery is at-least-once: rows are written first and the checkpoint is saved
afterwards, so a crash between the two steps can repeat the last batch.

Output format (RFC 5424 variant, one record per line):

    <PRI>1 TIMESTAMP HOSTNAME APP_NAME[PID]: [key=value] ... MESSAGE

Author: QClaw
Dependencies: PyMySQL / psycopg2-binary / PyYAML
"""

import argparse
import fcntl
import fnmatch
import json
import logging
import os
import re
import signal
import socket
import struct
import sys
import threading
import time
from collections import defaultdict
from datetime import date, datetime, timezone
from decimal import Decimal
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple

# Third-party packages are imported lazily-tolerant: a missing package is
# reported with an actionable message at the point where it is first needed.
try:
    import yaml
except ImportError:
    yaml = None

try:
    import pymysql
except ImportError:
    pymysql = None

try:
    import psycopg2
except ImportError:
    psycopg2 = None


# ==============================================================================
# Global configuration & logging
# ==============================================================================

APP_NAME = "db_log_exporter"
LOGGER: Optional[logging.Logger] = None

# Mapping from log-level names to the syslog priority tag written on each line.
SYSLOG_PRIORITY = {
    "DEBUG": "<7>",
    "TRACE": "<7>",
    "INFO": "<6>",
    "NOTICE": "<5>",
    "WARN": "<4>",
    "WARNING": "<4>",
    "ERROR": "<3>",
    "ERR": "<3>",
    "CRITICAL": "<2>",
    "CRIT": "<2>",
    "ALERT": "<1>",
    "EMERG": "<0>",
    "FATAL": "<2>",
}

# Default logical-field -> database-column mapping (used when the
# configuration does not override it).
DEFAULT_COLUMN_MAP = {
    "id": "id",
    "timestamp": "created_at",
    "level": "level",
    "logger": "logger",
    "message": "message",
    "host": "host",
    "source": "source",
    "trace_id": "trace_id",
    "span_id": "span_id",
    "extra": "extra",
    # operation audit fields
    "user": "user",
    "action": "action",
    "target": "target",
    "result": "result",
}

DEFAULT_INTERVAL = 30    # default polling interval (seconds)
DEFAULT_BATCH = 1000     # default maximum rows fetched per poll
DEFAULT_HOSTNAME = socket.gethostname()

# Logical fields that are always selected without an alias.
_CORE_FIELDS = ("id", "timestamp", "level", "message")

# Database types accepted in the configuration.
_SUPPORTED_DB_TYPES = ("mysql", "postgresql", "postgres")

# Identifiers matching this pattern can be double-quoted in PostgreSQL without
# changing which column they refer to (unquoted names fold to lower case).
_PG_SIMPLE_IDENT = re.compile(r"^[a-z_][a-z0-9_]*$")

# Sentinel distinguishing "key absent" from "value is None".
_MISSING = object()


# ==============================================================================
# Logging helpers
# ==============================================================================

def setup_logging(level: str = "INFO", log_file: Optional[str] = None) -> logging.Logger:
    """Configure this program's own logger and publish it as ``LOGGER``.

    Args:
        level: Name of a ``logging`` level; unknown names fall back to INFO.
        log_file: Optional path of an additional UTF-8 log file.

    Returns:
        The configured logger (stdout handler plus optional file handler).
    """
    global LOGGER
    logger = logging.getLogger(APP_NAME)

    numeric_level = getattr(logging, str(level).upper(), logging.INFO)
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO
    logger.setLevel(numeric_level)

    # Drop handlers from a previous call so records are not emitted twice.
    for old_handler in list(logger.handlers):
        logger.removeHandler(old_handler)
        old_handler.close()

    fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"

    # Console
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(logging.Formatter(fmt, datefmt))
    logger.addHandler(ch)

    # File (optional)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setFormatter(logging.Formatter(fmt, datefmt))
        logger.addHandler(fh)

    LOGGER = logger
    return logger


# ==============================================================================
# Utility functions
# ==============================================================================

def _single_line(text: Any) -> str:
    """Return *text* as a string with line breaks replaced by a literal ``\\n``."""
    return (
        str(text)
        .replace("\r\n", "\\n")
        .replace("\n", "\\n")
        .replace("\r", "\\n")
    )


def _text(value: Any) -> str:
    """Return ``""`` for None, otherwise ``str(value)`` (keeps 0 / False)."""
    return "" if value is None else str(value)


def _row_value(row: Dict[str, Any], column: Optional[str], default: Any = None) -> Any:
    """Look up *column* in a result row.

    Tries the exact key first, then a case-insensitive match on the column
    name with any ``table.`` qualifier removed, because drivers report the
    bare (and, for PostgreSQL, case-folded) column name.
    """
    if not column:
        return default
    if column in row:
        return row[column]
    lowered = column.lower()
    wanted = {lowered, lowered.rsplit(".", 1)[-1]}
    for key, value in row.items():
        if isinstance(key, str) and key.lower() in wanted:
            return value
    return default


def _ensure_aware(dt: datetime) -> datetime:
    """Attach the local time zone to a naive datetime; leave aware ones as-is."""
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        return dt
    try:
        # For a naive value astimezone() assumes system local time.
        return dt.astimezone()
    except (OverflowError, OSError, ValueError):
        # Out-of-range dates cannot be localized; keep the naive value.
        return dt


def get_syslog_priority(level_str: str) -> str:
    """Map a log-level name to its syslog priority tag.

    None, empty and unknown levels map to ``"<6>"`` (INFO).
    """
    normalized = _text(level_str).strip().upper()
    return SYSLOG_PRIORITY.get(normalized, "<6>")


def format_syslog_line(
    timestamp: str,
    hostname: str,
    app_name: str,
    pid: str,
    level: str,
    message: str,
    trace_id: str = "",
    span_id: str = "",
    extra: str = "",
    user: str = "",
    action: str = "",
    target: str = "",
    result: str = "",
) -> str:
    """Format one record as an RFC 5424 variant syslog line.

    Layout::

        <PRI>1 TIMESTAMP HOSTNAME APP_NAME[PID]: [key=value] ... MESSAGE

    Example::

        <6>1 2026-01-01T12:00:00.123456+08:00 myhost myapp[12345]: [user=alice] [action=delete] message text

    Empty optional fields are omitted. Line breaks inside the message or a
    field value are written as a literal ``\\n`` so that every record
    occupies exactly one line of the output file.
    """
    pri = get_syslog_priority(level)
    clean_msg = _single_line(_text(message).strip())
    tagged = [
        ("trace", trace_id),
        ("span", span_id),
        ("extra", extra),
        ("user", user),
        ("action", action),
        ("target", target),
        ("result", result),
    ]
    parts = [f"[{key}={_single_line(value)}]" for key, value in tagged if value]
    structured = " ".join(parts)
    if structured:
        structured += " "
    return f"{pri}1 {timestamp} {hostname} {app_name}[{pid}]: {structured}{clean_msg}"


def acquire_lock_file(lock_path: str) -> int:
    """Take an exclusive, non-blocking ``flock`` on *lock_path*.

    Returns:
        The open file descriptor; the lock is held until it is closed.

    Raises:
        RuntimeError: Another process already holds the lock.
        OSError: The file could not be opened or locked for another reason.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError as exc:
        os.close(fd)
        raise RuntimeError(
            f"另一个 {APP_NAME} 实例已在运行(锁文件:{lock_path}),请先停止。"
        ) from exc
    except OSError:
        # Do not leak the descriptor on unexpected locking failures.
        os.close(fd)
        raise
    return fd


def load_checkpoint(checkpoint_dir: str, source_name: str) -> Optional[Any]:
    """Load the saved ``last_id`` for *source_name*.

    Returns:
        The stored value, or None when the checkpoint file does not exist,
        cannot be read, or does not contain a JSON object (the export then
        starts from the beginning).
    """
    path = Path(checkpoint_dir) / f"{source_name}.json"
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return None
    except (ValueError, OSError) as e:
        # ValueError covers JSONDecodeError and UnicodeDecodeError.
        if LOGGER:
            LOGGER.warning("断点文件损坏 [%s],将从头拉取: %s", path, e)
        return None
    if not isinstance(data, dict):
        if LOGGER:
            LOGGER.warning("断点文件损坏 [%s],将从头拉取: %s", path, "not a JSON object")
        return None
    return data.get("last_id")


def _checkpoint_default(value: Any) -> Any:
    """JSON fallback encoder for ID types that database drivers may return."""
    if isinstance(value, datetime):
        # The space-separated form is accepted as a literal by both databases.
        return value.isoformat(sep=" ")
    if isinstance(value, date):
        return value.isoformat()
    if isinstance(value, Decimal):
        if value.is_finite() and value == value.to_integral_value():
            return int(value)
        return str(value)
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def save_checkpoint(checkpoint_dir: str, source_name: str, last_id: Any) -> None:
    """Persist *last_id* for *source_name*.

    The data is written to a temporary file, flushed to disk and then moved
    over the real checkpoint with ``os.replace`` so readers never observe a
    partially written file.
    """
    directory = Path(checkpoint_dir)
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{source_name}.json"
    tmp = path.with_name(path.name + ".tmp")
    payload = {"last_id": last_id, "saved_at": datetime.now().isoformat()}
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(payload, f, default=_checkpoint_default)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)  # atomic replacement


def load_config(config_path: str) -> Dict[str, Any]:
    """Load the YAML configuration file.

    Returns:
        The parsed mapping; an empty file yields ``{}``.

    Raises:
        RuntimeError: PyYAML is not installed.
        FileNotFoundError: *config_path* does not exist.
        yaml.YAMLError: The file is not valid YAML.
        ValueError: The top-level YAML value is not a mapping.
    """
    if yaml is None:
        raise RuntimeError("PyYAML 未安装,请运行: pip install pyyaml")
    with open(config_path, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if data is None:
        return {}
    if not isinstance(data, dict):
        raise ValueError("配置文件顶层必须是映射(key: value)")
    return data


def merge_column_map(config_map: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Return ``DEFAULT_COLUMN_MAP`` overlaid with the configured overrides."""
    result = dict(DEFAULT_COLUMN_MAP)
    if config_map:
        result.update(config_map)
    return result


# ==============================================================================
# Database access
# ==============================================================================

class DatabaseError(Exception):
    """Raised for database driver / query failures."""


def _identity(name: str) -> str:
    """Return *name* unchanged (no identifier quoting)."""
    return name


def _quote_pg_identifier(name: str) -> str:
    """Double-quote simple lower-case identifiers for PostgreSQL.

    Unquoted, a column called ``user`` is parsed as the SQL keyword and
    evaluates to the session user instead of the column. Quoting a simple
    lower-case name refers to the same column as the unquoted form, so it is
    safe. Anything else (mixed case, qualified names, expressions) is left
    untouched.
    """
    if _PG_SIMPLE_IDENT.match(name):
        return f'"{name}"'
    return name


def _build_select_query(
    cfg: Dict[str, Any],
    col_map: Dict[str, str],
    last_id: Optional[Any],
    batch: int,
    quote: Callable[[str], str] = _identity,
) -> Tuple[str, Tuple]:
    """Build the incremental SELECT shared by both database back ends.

    Core columns (id, timestamp, level, message) are selected under their
    database names; every other mapped column is selected as
    ``db_col AS alias``. ``cfg["filter"]``, when present, is inserted verbatim
    as an extra WHERE condition.
    """
    table = cfg.get("table")
    if not table:
        raise DatabaseError("未配置数据表名(table)")

    id_col = col_map["id"]

    select_parts: List[str] = []
    seen = set()  # result keys already produced, to avoid duplicates
    for field in _CORE_FIELDS:
        column = col_map[field]
        if column and column not in seen:
            seen.add(column)
            select_parts.append(quote(column))
    for alias, column in col_map.items():
        if alias in _CORE_FIELDS or not column or alias in seen:
            continue
        seen.add(alias)
        select_parts.append(f"{quote(column)} AS {quote(alias)}")
    select_clause = ", ".join(select_parts) if select_parts else "*"

    conditions: List[str] = []
    params: List[Any] = []
    row_filter = cfg.get("filter")
    if row_filter:
        # NOTE: the filter is raw SQL from the configuration file; a literal
        # percent sign in it must be written as %% because the statement is
        # executed with driver-side parameter substitution.
        conditions.append(f"({row_filter})")
    if last_id is not None:
        conditions.append(f"{quote(id_col)} > %s")
        params.append(last_id)
    where_clause = f"WHERE {' AND '.join(conditions)} " if conditions else ""

    sql = (
        f"SELECT {select_clause} FROM {table} "
        f"{where_clause}"
        f"ORDER BY {quote(id_col)} ASC LIMIT %s"
    )
    params.append(batch)
    return sql, tuple(params)


def build_mysql_query(cfg: Dict[str, Any], col_map: Dict[str, str],
                      last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
    """Build the MySQL query and its parameter tuple.

    Args:
        cfg: Must contain ``table``; may contain ``filter`` (WHERE condition
            without the ``WHERE`` keyword).
        col_map: Logical-field to database-column mapping.
        last_id: Only rows with an ID greater than this are selected; None
            selects from the beginning.
        batch: Maximum number of rows.

    Raises:
        DatabaseError: ``cfg`` has no table name.
    """
    return _build_select_query(cfg, col_map, last_id, batch)


def build_pg_query(cfg: Dict[str, Any], col_map: Dict[str, str],
                   last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
    """Build the PostgreSQL query and its parameter tuple.

    Same statement shape and ``%s`` parameter style as
    :func:`build_mysql_query`, with simple lower-case identifiers
    double-quoted so reserved words such as ``user`` refer to the column.
    """
    return _build_select_query(cfg, col_map, last_id, batch, quote=_quote_pg_identifier)


def _next_last_id(rows: List[Dict], id_col: str, last_id: Optional[Any]) -> Optional[Any]:
    """Return the ID of the last fetched row, or *last_id* when *rows* is empty."""
    if not rows:
        return last_id
    value = _row_value(rows[-1], id_col, _MISSING)
    if value is _MISSING:
        raise DatabaseError(f"查询结果中缺少 ID 列: {id_col}")
    return value


def get_rows_mysql(cfg: Dict[str, Any], col_map: Dict[str, str],
                   last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
    """Fetch the next batch of log rows from MySQL.

    Returns:
        ``(rows, new_last_id)`` where rows are dicts keyed by column name.

    Raises:
        DatabaseError: PyMySQL is missing, or connecting / querying failed.
    """
    if pymysql is None:
        raise DatabaseError("PyMySQL 未安装,请运行: pip install pymysql")

    conn_cfg = {
        "host": cfg.get("host", "localhost"),
        "port": int(cfg.get("port", 3306)),
        "user": cfg["user"],
        "password": cfg["password"],
        "charset": cfg.get("charset", "utf8mb4"),
        "cursorclass": pymysql.cursors.DictCursor,
    }
    # Either key names the schema; without one the table must be qualified.
    database = cfg.get("database") or cfg.get("database_name")
    if database:
        conn_cfg["database"] = database

    sql, params = build_mysql_query(cfg, col_map, last_id, batch)

    try:
        conn = pymysql.connect(**conn_cfg)
    except pymysql.MySQLError as exc:
        raise DatabaseError(str(exc)) from exc
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            rows = list(cur.fetchall())
    except pymysql.MySQLError as exc:
        raise DatabaseError(str(exc)) from exc
    finally:
        conn.close()

    return rows, _next_last_id(rows, col_map["id"], last_id)


def get_rows_pg(cfg: Dict[str, Any], col_map: Dict[str, str],
                last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
    """Fetch the next batch of log rows from PostgreSQL.

    Returns:
        ``(rows, new_last_id)`` where rows are dicts keyed by column name.

    Raises:
        DatabaseError: psycopg2 is missing, or connecting / querying failed.
    """
    if psycopg2 is None:
        raise DatabaseError("psycopg2 未安装,请运行: pip install psycopg2-binary")

    # Keyword arguments avoid hand-built DSN strings, which break on
    # passwords containing quotes or spaces.
    conn_kwargs: Dict[str, Any] = {}
    for key in ("host", "port", "user", "password"):
        val = cfg.get(key)
        if val is not None:
            conn_kwargs[key] = val
    dbname = cfg.get("dbname") or cfg.get("database")
    if dbname:
        conn_kwargs["dbname"] = dbname

    sql, params = build_pg_query(cfg, col_map, last_id, batch)

    try:
        conn = psycopg2.connect(**conn_kwargs)
    except psycopg2.Error as exc:
        raise DatabaseError(str(exc)) from exc
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            fetched = cur.fetchall()
            # psycopg2 returns plain tuples; map them to column names.
            col_names = [desc[0] for desc in cur.description]
        rows = [dict(zip(col_names, r)) for r in fetched]
    except psycopg2.Error as exc:
        raise DatabaseError(str(exc)) from exc
    finally:
        conn.close()

    return rows, _next_last_id(rows, col_map["id"], last_id)


# ==============================================================================
# Single log source
# ==============================================================================

class LogSource:
    """One database table exported to one log file.

    A source can be polled continuously on a background thread
    (:meth:`start` / :meth:`stop`) or once, synchronously (:meth:`run_once`).
    """

    def __init__(
        self,
        name: str,
        db_type: str,
        db_config: Dict[str, Any],
        table_config: Dict[str, Any],
        output_dir: str,
        checkpoint_dir: str,
        interval: int,
        batch_size: int,
        hostname: str,
    ):
        self.name = name
        self.db_type = db_type.lower()
        self.db_config = db_config
        self.table_config = table_config
        self.output_dir = Path(output_dir)
        self.checkpoint_dir = Path(checkpoint_dir)
        self.interval = interval
        self.batch_size = batch_size
        self.hostname = hostname

        self.app_name = table_config.get("app_name", name)
        self.pid_str = str(os.getpid())

        self.col_map = merge_column_map(table_config.get("columns"))
        # WHERE condition from the configuration (without the WHERE keyword)
        self.filter_query = table_config.get("filter", "")
        self.log_file_pattern = table_config.get("log_file", f"{name}.log")

        self._last_id: Optional[Any] = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        self._stop_evt = threading.Event()

    # ------------------------------------------------------------------
    # Public control
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Start the polling thread; does nothing if already running."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._stop_evt.clear()
            self._thread = threading.Thread(
                target=self._run_loop,
                daemon=True,
                name=f"logsrc-{self.name}",
            )
            self._thread.start()

    def stop(self, timeout: float = 10.0) -> None:
        """Ask the polling thread to stop and wait up to *timeout* seconds."""
        with self._lock:
            if not self._running:
                return
            self._running = False
            self._stop_evt.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def is_alive(self) -> bool:
        """Return True while the source has been started and not stopped."""
        with self._lock:
            return self._running

    def load_state(self) -> None:
        """Load the saved checkpoint into memory."""
        self._last_id = load_checkpoint(str(self.checkpoint_dir), self.name)

    def run_once(self) -> int:
        """Load the checkpoint, perform a single poll and return the row count."""
        self.load_state()
        return self._fetch_and_write()

    # ------------------------------------------------------------------
    # Internal loop
    # ------------------------------------------------------------------

    def _run_loop(self) -> None:
        """Thread body: poll until the stop event is set, retrying on errors."""
        logger = LOGGER or logging.getLogger(APP_NAME)

        # Resume from the persisted checkpoint.
        self.load_state()

        while not self._stop_evt.is_set():
            try:
                fetched = self._fetch_and_write()
                if fetched == 0:
                    # Nothing new: wait a full interval.
                    self._stop_evt.wait(self.interval)
                else:
                    # More rows may be pending: poll again soon, but not
                    # in a tight loop.
                    self._stop_evt.wait(min(self.interval, 2.0))
            except DatabaseError as e:
                logger.error("[%s] 数据库错误(%s),%ds 后重试: %s",
                             self.name, self.db_type, self.interval, e)
                self._stop_evt.wait(self.interval)
            except Exception as e:
                logger.exception("[%s] 未知异常,%ds 后重试: %s",
                                 self.name, self.interval, e)
                self._stop_evt.wait(self.interval)

        logger.debug("[%s] 线程已停止", self.name)

    # ------------------------------------------------------------------
    # Core logic
    # ------------------------------------------------------------------

    def _fetch_and_write(self) -> int:
        """Fetch one batch, append it to the log file and save the checkpoint.

        Returns:
            Number of rows written in this round.
        """
        rows, new_last = self._fetch()
        if not rows:
            return 0
        self._write_rows(rows)
        # The checkpoint is advanced only after the rows are on disk.
        self._last_id = new_last
        save_checkpoint(str(self.checkpoint_dir), self.name, self._last_id)
        return len(rows)

    def _query_config(self) -> Dict[str, Any]:
        """Return the connection settings plus this source's table and filter."""
        cfg = dict(self.db_config)
        table = self.table_config.get("table")
        if table:
            cfg["table"] = table
        if self.filter_query:
            cfg["filter"] = self.filter_query
        return cfg

    def _fetch(self) -> Tuple[List[Dict], Optional[int]]:
        """Dispatch to the fetch function of the configured database type."""
        if self.db_type == "mysql":
            fetcher = get_rows_mysql
        elif self.db_type in ("postgresql", "postgres"):
            fetcher = get_rows_pg
        else:
            raise ValueError(f"不支持的数据库类型: {self.db_type}(仅支持 mysql/postgresql)")
        return fetcher(self._query_config(), self.col_map, self._last_id, self.batch_size)

    def _write_rows(self, rows: List[Dict]) -> None:
        """Append *rows* to the output file, one syslog line per row."""
        out_path = self.output_dir / self.log_file_pattern
        # log_file may contain sub-directories, so create the file's parent.
        out_path.parent.mkdir(parents=True, exist_ok=True)

        lines = [self._row_to_syslog(row) + "\n" for row in rows]
        with open(out_path, "a", encoding="utf-8") as f:
            f.writelines(lines)

        logger = LOGGER
        if logger:
            logger.debug("[%s] 写入 %d 条日志 → %s", self.name, len(lines), out_path)

    def _extra_field(self, row: Dict[str, Any], alias: str) -> str:
        """Return the text of an optional field.

        Optional columns are selected ``AS alias``, so the alias is tried
        first; the database column name is the fallback.
        """
        value = _row_value(row, alias, _MISSING)
        if value is _MISSING:
            value = _row_value(row, self.col_map.get(alias))
        return _text(value)

    def _row_to_syslog(self, row: Dict[str, Any]) -> str:
        """Convert one database row to a syslog-format line."""
        level = _row_value(row, self.col_map["level"])
        if level is None:
            level = "INFO"

        return format_syslog_line(
            timestamp=self._format_timestamp(_row_value(row, self.col_map["timestamp"])),
            hostname=self.hostname,
            app_name=self.app_name,
            pid=self.pid_str,
            level=level,
            message=_text(_row_value(row, self.col_map["message"])),
            trace_id=self._extra_field(row, "trace_id"),
            span_id=self._extra_field(row, "span_id"),
            extra=self._extra_field(row, "extra"),
            user=self._extra_field(row, "user"),
            action=self._extra_field(row, "action"),
            target=self._extra_field(row, "target"),
            result=self._extra_field(row, "result"),
        )

    def _format_timestamp(self, val: Any) -> str:
        """Convert a timestamp value to an ISO 8601 string with UTC offset.

        Supported inputs: datetime, date, str, unix timestamp (int / float)
        and None (current UTC time). Naive values are assumed to be in the
        system's local time zone. A string that cannot be parsed is returned
        unchanged.
        """
        if val is None:
            return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

        if isinstance(val, datetime):
            return _ensure_aware(val).isoformat()

        if isinstance(val, date):
            return _ensure_aware(datetime(val.year, val.month, val.day)).isoformat()

        if isinstance(val, (int, float)):
            try:
                return datetime.fromtimestamp(val, tz=timezone.utc).astimezone().isoformat()
            except (OverflowError, OSError, ValueError):
                return str(val)

        s = str(val).strip()

        # fromisoformat() accepts "T" or space separators and numeric
        # offsets; a trailing "Z" is only understood from Python 3.11 on.
        iso_candidate = s[:-1] + "+00:00" if s.endswith(("Z", "z")) else s
        try:
            return _ensure_aware(datetime.fromisoformat(iso_candidate)).isoformat()
        except ValueError:
            pass

        for fmt in (
            "%Y-%m-%d %H:%M:%S.%f",
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%dT%H:%M:%S.%f",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%S.%f%z",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%d %H:%M:%S.%f%z",
            "%Y-%m-%d %H:%M:%S%z",
        ):
            try:
                return _ensure_aware(datetime.strptime(s, fmt)).isoformat()
            except ValueError:
                continue

        return s  # not parseable: return the original text


# ==============================================================================
# Main program
# ==============================================================================

def build_sources_from_config(config: Dict[str, Any]) -> List[LogSource]:
    """Create a :class:`LogSource` for every entry under ``sources``.

    Raises:
        ValueError: A source lacks ``name`` or ``table``, reuses a name,
            references an undefined database, or the database type is not
            supported.
    """
    global_cfg = config.get("global") or {}
    db_configs = config.get("databases") or {}
    sources_cfg = config.get("sources") or []

    output_dir = global_cfg.get("output_dir", "/var/log/db_exporter")
    checkpoint_dir = global_cfg.get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")
    hostname = global_cfg.get("hostname", DEFAULT_HOSTNAME)
    default_interval = global_cfg.get("interval", DEFAULT_INTERVAL)
    default_batch = global_cfg.get("batch_size", DEFAULT_BATCH)

    sources: List[LogSource] = []
    used_names = set()
    for index, src_cfg in enumerate(sources_cfg):
        if not isinstance(src_cfg, dict) or not src_cfg.get("name"):
            raise ValueError(f"第 {index + 1} 个数据源缺少 name 配置")
        name = str(src_cfg["name"])
        # Names identify checkpoint files, so they must be unique.
        if name in used_names:
            raise ValueError(f"数据源名称重复: {name}")
        used_names.add(name)

        if not src_cfg.get("table"):
            raise ValueError(f"数据源 [{name}] 缺少 table 配置")

        db_ref = src_cfg.get("database")
        if db_ref not in db_configs:
            raise ValueError(f"数据源 [{name}] 引用的数据库 '{db_ref}' 未在 databases 中定义")

        db_cfg = db_configs[db_ref]
        db_type = str(db_cfg.get("type", "mysql")).lower()
        if db_type not in _SUPPORTED_DB_TYPES:
            raise ValueError(f"不支持的数据库类型: {db_type}(仅支持 mysql/postgresql)")

        sources.append(LogSource(
            name=name,
            db_type=db_type,
            db_config=db_cfg,
            table_config=src_cfg,
            output_dir=output_dir,
            checkpoint_dir=checkpoint_dir,
            interval=src_cfg.get("interval", default_interval),
            batch_size=src_cfg.get("batch_size", default_batch),
            hostname=hostname,
        ))

    return sources


def create_parser() -> argparse.ArgumentParser:
    """Build the command-line argument parser."""
    parser = argparse.ArgumentParser(
        description="数据库日志导出守护进程 — 从 MySQL / PostgreSQL 定时拉取日志,写入标准 syslog 格式文件",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  %(prog)s -c /etc/db_log_exporter/config.yaml
  %(prog)s -c config.yaml --once       # 运行一次(不守护)并退出
  %(prog)s -c config.yaml --dry-run    # 仅连接测试,不写入文件
  %(prog)s -c config.yaml --log-level DEBUG
        """,
    )
    parser.add_argument("-c", "--config", required=True,
                        help="YAML 配置文件路径")
    parser.add_argument("--once", action="store_true",
                        help="仅执行一次轮询后退出(不守护)")
    parser.add_argument("--dry-run", action="store_true",
                        help="仅测试数据库连接,不写入日志文件")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        help="本程序的日志级别(默认 INFO)")
    parser.add_argument("--log-file",
                        help="本程序的日志输出文件(默认仅输出到 stdout)")
    return parser


def _usable_directory(path: Path, logger: logging.Logger) -> Path:
    """Create *path*; fall back to a same-named directory under the CWD.

    The fallback is used only when creating *path* is not permitted.
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
        return path
    except PermissionError:
        logger.warning("无权限创建目录 %s,将使用当前目录下的子目录", path)
        fallback = Path.cwd() / path.name
        fallback.mkdir(parents=True, exist_ok=True)
        return fallback


def main() -> int:
    """Program entry point; returns the process exit code."""
    args = create_parser().parse_args()

    # Configure logging
    setup_logging(level=args.log_level, log_file=args.log_file)
    logger = LOGGER or logging.getLogger(APP_NAME)

    # Load configuration. When PyYAML is missing there is no YAMLError class
    # to catch; an empty tuple makes that except clause match nothing.
    yaml_error = yaml.YAMLError if yaml is not None else ()
    try:
        config = load_config(args.config)
    except FileNotFoundError:
        print(f"[ERROR] 配置文件不存在: {args.config}", file=sys.stderr)
        return 1
    except yaml_error as e:
        print(f"[ERROR] YAML 解析失败: {e}", file=sys.stderr)
        return 1
    except (RuntimeError, ValueError) as e:
        print(f"[ERROR] {e}", file=sys.stderr)
        return 1

    # Build sources
    try:
        sources = build_sources_from_config(config)
    except Exception as e:
        logger.error("配置解析失败: %s", e)
        return 1

    if not sources:
        logger.warning("配置中没有发现任何数据源,请检查配置文件。")
        return 0

    # Make sure the output / checkpoint directories exist, redirecting the
    # sources to a local fallback when the configured path is not permitted.
    resolved: Dict[Path, Path] = {}
    try:
        for src in sources:
            for attr in ("output_dir", "checkpoint_dir"):
                wanted = getattr(src, attr)
                if wanted not in resolved:
                    resolved[wanted] = _usable_directory(wanted, logger)
                setattr(src, attr, resolved[wanted])
    except OSError as e:
        logger.error("无法创建目录: %s", e)
        return 1

    # Dry-run: only test the connections
    if args.dry_run:
        logger.info("=== Dry-run 模式:仅测试数据库连接 ===")
        ok = True
        for src in sources:
            try:
                # Use the saved checkpoint so the count reflects new rows.
                src.load_state()
                rows, _ = src._fetch()
                logger.info("[%s] ✅ 连接成功,当前有新日志 %d 条(未写入)", src.name, len(rows))
            except Exception as e:
                logger.error("[%s] ❌ 连接失败: %s", src.name, e)
                ok = False
        return 0 if ok else 1

    # Acquire the lock file (prevents a second instance)
    lock_file = f"/var/run/{APP_NAME}/{APP_NAME}.lock"
    lock_fd = None
    try:
        Path(lock_file).parent.mkdir(parents=True, exist_ok=True)
        lock_fd = acquire_lock_file(lock_file)
    except PermissionError:
        logger.warning("无权限创建锁文件 %s,跳过锁定检测(可能重复启动)", lock_file)
    except RuntimeError as e:
        logger.error("%s", e)
        return 1

    exit_code = 0
    try:
        if args.once:
            # --once: exactly one synchronous poll per source.
            for src in sources:
                try:
                    written = src.run_once()
                    logger.info("[%s] 写入 %d 条日志", src.name, written)
                except Exception as e:
                    logger.error("[%s] 单次轮询失败: %s", src.name, e)
                    exit_code = 1
            logger.info("=== 单次轮询完成,退出 ===")
        else:
            # The handler only records the signal; logging and joining
            # threads happen in the main flow below.
            stop_requested = threading.Event()
            received_signals: List[int] = []

            def signal_handler(signum, frame):
                received_signals.append(signum)
                stop_requested.set()

            signal.signal(signal.SIGTERM, signal_handler)
            signal.signal(signal.SIGINT, signal_handler)

            # Start all sources
            for src in sources:
                src.start()
                logger.info("已启动数据源: %s (db=%s, table=%s, interval=%ds)",
                            src.name, src.db_type, src.table_config["table"], src.interval)

            logger.info("=== %s 已启动,PID=%d,共 %d 个数据源 ===",
                        APP_NAME, os.getpid(), len(sources))

            # Keep the main thread alive until a signal arrives.
            while not stop_requested.wait(timeout=1.0):
                pass

            for signum in received_signals[:1]:
                logger.info("收到信号 %s,准备优雅退出...", signal.Signals(signum).name)
            for src in sources:
                src.stop(timeout=5.0)
            logger.info("所有数据源已停止,退出。")
    finally:
        if lock_fd is not None:
            os.close(lock_fd)

    return exit_code


if __name__ == "__main__":
    sys.exit(main())