Files
DBLog/db_log_exporter.py
QClaw Bot ebc1a2a87c fix: 替换所有中文括号为英文括号
feat: 新增操作审计字段 user/action/target/result 到 syslog 输出
docs: 更新 README 输出格式和配置示例说明
2026-05-13 13:33:45 +08:00

723 lines
25 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
db_log_exporter — 数据库日志导出守护进程
从 MySQL / PostgreSQL 数据库表中定时提取日志,写入标准 syslog 格式文本文件。
支持每个数据源独立跟踪偏移量(断点续传),线程安全,无重复日志。
标准输出格式(RFC 5424 变体,每行一条)
<Jan 1 12:00:00> <hostname> <app_name>[<pid>]: <priority><version> <timestamp_iso> <hostname> <app_name> <pid> <msg_id> <structured_data> <message>
作者QClaw
依赖PyMySQL / psycopg2-binary / PyYAML
"""
import argparse
import fcntl
import fnmatch
import json
import logging
import os
import re
import signal
import socket
import struct
import sys
import threading
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
try:
import pymysql
except ImportError:
pymysql = None
try:
import psycopg2
except ImportError:
psycopg2 = None
# ==============================================================================
# Global configuration & logging
# ==============================================================================
APP_NAME = "db_log_exporter"
LOGGER: Optional[logging.Logger] = None
# Map log-level names (upper-cased) to syslog <pri> tags used in the output line.
SYSLOG_PRIORITY = {
    "DEBUG": "<7>",
    "TRACE": "<7>",
    "INFO": "<6>",
    "NOTICE": "<5>",
    "WARN": "<4>",
    "WARNING": "<4>",
    "ERROR": "<3>",
    "ERR": "<3>",
    "CRITICAL": "<2>",
    "CRIT": "<2>",
    "ALERT": "<1>",
    "EMERG": "<0>",
    "FATAL": "<2>",
}
# Default logical-field -> db-column mapping, used when the source config
# does not provide its own "columns" mapping.
DEFAULT_COLUMN_MAP = {
    "id": "id",
    "timestamp": "created_at",
    "level": "level",
    "logger": "logger",
    "message": "message",
    "host": "host",
    "source": "source",
    "trace_id": "trace_id",
    "span_id": "span_id",
    "extra": "extra",
    # operation audit fields
    "user": "user",
    "action": "action",
    "target": "target",
    "result": "result",
}
DEFAULT_INTERVAL = 30  # default polling interval (seconds)
DEFAULT_BATCH = 1000  # default max rows fetched per poll
DEFAULT_HOSTNAME = socket.gethostname()
# ==============================================================================
# 日志工具
# ==============================================================================
def setup_logging(level: str = "INFO", log_file: Optional[str] = None) -> logging.Logger:
    """Configure this program's own logging.

    Args:
        level: level name such as "INFO" or "DEBUG"; unknown names fall back to INFO.
        log_file: optional path of a UTF-8 file to additionally log into.

    Returns:
        The configured logger (also stored in the module-global LOGGER).
    """
    global LOGGER
    logger = logging.getLogger(APP_NAME)
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))
    fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"
    # logging.getLogger returns a shared instance, so calling this function
    # more than once used to stack duplicate handlers and repeat every log
    # line. Remove any handlers from a previous call first.
    for old in list(logger.handlers):
        logger.removeHandler(old)
        old.close()
    # Console output
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(logging.Formatter(fmt, datefmt))
    logger.addHandler(ch)
    # Optional file output
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setFormatter(logging.Formatter(fmt, datefmt))
        logger.addHandler(fh)
    LOGGER = logger
    return logger
# ==============================================================================
# 工具函数
# ==============================================================================
def get_syslog_priority(level_str: str) -> str:
    """Translate a log-level name into its syslog priority tag (default: INFO)."""
    key = level_str.strip().upper()
    try:
        return SYSLOG_PRIORITY[key]
    except KeyError:
        return "<6>"  # unknown levels are reported as INFO
def format_syslog_line(
    timestamp: str,
    hostname: str,
    app_name: str,
    pid: str,
    level: str,
    message: str,
    trace_id: str = "",
    span_id: str = "",
    extra: str = "",
    user: str = "",
    action: str = "",
    target: str = "",
    result: str = "",
) -> str:
    """Render one record as an RFC 5424-variant syslog line.

    Layout:
        <priority>version timestamp hostname appname pid msgid structured_data message

    Full example (one line):
        <6>1 2026-01-01T12:00:00.123456+08:00 myhost myapp[12345]: [user=张三] [action=删除订单] [target=订单系统] [result=成功] message text
    """
    pri = get_syslog_priority(level)
    body = message.strip()
    # Only non-empty fields are emitted, in this fixed order.
    candidates = (
        ("trace", trace_id),
        ("span", span_id),
        ("extra", extra),
        ("user", user),
        ("action", action),
        ("target", target),
        ("result", result),
    )
    sd_fields = []
    for key, val in candidates:
        if val:
            sd_fields.append(f"[{key}={val}]")
    header = f"{pri}1 {timestamp} {hostname} {app_name}[{pid}]: "
    if sd_fields:
        return header + " ".join(sd_fields) + " " + body
    return header + body
def acquire_lock_file(lock_path: str) -> int:
    """Take a non-blocking exclusive flock to prevent duplicate daemon starts.

    Returns the open file descriptor; the caller must close it on shutdown.
    Raises RuntimeError when another process already holds the lock.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return fd
    except BlockingIOError:
        os.close(fd)
        raise RuntimeError(f"另一个 {APP_NAME} 实例已在运行(锁文件:{lock_path}),请先停止。")
def load_checkpoint(checkpoint_dir: str, source_name: str) -> Optional[Any]:
    """Read the persisted offset (last_id) for *source_name*.

    Returns the stored ``last_id`` — any JSON-serializable value — or None
    when no checkpoint file exists yet (fetch from the beginning) or the
    file cannot be parsed.
    """
    ckpt = Path(checkpoint_dir) / f"{source_name}.json"
    try:
        payload = json.loads(ckpt.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # No checkpoint yet: start from scratch, silently.
        return None
    except (json.JSONDecodeError, IOError) as exc:
        if LOGGER:
            LOGGER.warning("断点文件损坏 [%s],将从头拉取: %s", ckpt, exc)
        return None
    return payload.get("last_id")
def save_checkpoint(checkpoint_dir: str, source_name: str, last_id: Any) -> None:
    """Atomically persist the offset for *source_name*.

    Writes a temp file first and then swaps it in, so a crash mid-write can
    never leave a truncated/half-written checkpoint behind.
    """
    Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)
    path = Path(checkpoint_dir) / f"{source_name}.json"
    tmp = path.with_suffix(".tmp")
    # default=str lets non-JSON offsets (e.g. a datetime last_id, which this
    # checkpoint explicitly supports) serialize as text instead of raising
    # TypeError.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"last_id": last_id, "saved_at": datetime.now().isoformat()}, f,
                  default=str)
    # Path.replace -> os.replace: atomic and overwrite-safe on every platform,
    # unlike Path.rename which raises on Windows when the target exists.
    tmp.replace(path)
def load_config(config_path: str) -> Dict[str, Any]:
    """Parse the YAML configuration file into a dict."""
    raw = Path(config_path).read_text(encoding="utf-8")
    return yaml.safe_load(raw)
def merge_column_map(config_map: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Overlay the user-configured column names on top of the defaults."""
    overrides = config_map or {}
    return {**DEFAULT_COLUMN_MAP, **overrides}
# ==============================================================================
# 数据库连接工厂
# ==============================================================================
class DatabaseError(Exception):
    """Raised for database connectivity, driver, or query failures."""
def build_mysql_query(cfg: Dict[str, Any], col_map: Dict[str, str],
                      last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
    """Build the MySQL SELECT statement and its parameter tuple.

    Args:
        cfg: source config; must contain the "table" key.
        col_map: logical-field -> db-column mapping (see DEFAULT_COLUMN_MAP).
        last_id: resume offset — rows with id <= last_id are skipped; None
            means fetch from the beginning.
        batch: LIMIT for this poll.

    Returns:
        (sql, params) ready for cursor.execute; %s placeholders work for both
        PyMySQL and psycopg2.
    """
    id_col = col_map["id"]
    ts_col = col_map["timestamp"]
    lvl_col = col_map["level"]
    msg_col = col_map["message"]
    # Aliased SELECT entries for every mapped optional column.
    extra_cols = []
    for alias, db_col in col_map.items():
        if alias not in ("id", "timestamp", "level", "message") and db_col:
            extra_cols.append(f"{db_col} AS {alias}")
    # De-duplicate by the underlying db column while KEEPING the four base
    # columns. (An earlier version pre-seeded `seen` with the base columns
    # and then filtered the full list, which silently dropped
    # id/timestamp/level/message from the SELECT clause.)
    seen: set = set()
    select_parts: List[str] = []
    for part in [id_col, ts_col, lvl_col, msg_col] + extra_cols:
        col = part.split(" AS ")[0].strip()
        if col not in seen:
            select_parts.append(part)
            seen.add(col)
    select_clause = ", ".join(select_parts)
    if last_id is not None:
        sql = (f"SELECT {select_clause} FROM {cfg['table']} "
               f"WHERE {id_col} > %s "
               f"ORDER BY {id_col} ASC LIMIT %s")
        params: Tuple = (last_id, batch)
    else:
        sql = (f"SELECT {select_clause} FROM {cfg['table']} "
               f"ORDER BY {id_col} ASC LIMIT %s")
        params = (batch,)
    return sql, params
def build_pg_query(cfg: Dict[str, Any], col_map: Dict[str, str],
                   last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
    """Build the PostgreSQL query.

    Delegates to the MySQL builder: both drivers use %s placeholders and the
    generated SQL is dialect-neutral.
    """
    return build_mysql_query(cfg, col_map, last_id, batch)
def get_rows_mysql(cfg: Dict[str, Any], col_map: Dict[str, str],
                   last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
    """Fetch up to *batch* new log rows from MySQL.

    Returns:
        (rows, new_last_id): rows are dicts keyed by column/alias;
        new_last_id is the id of the last fetched row, or the previous
        offset when nothing new arrived.

    Raises:
        DatabaseError: when the PyMySQL driver is not installed.
    """
    if pymysql is None:
        raise DatabaseError("PyMySQL 未安装,请运行: pip install pymysql")
    conn_cfg = {
        "host": cfg.get("host", "localhost"),
        "port": cfg.get("port", 3306),
        "user": cfg["user"],
        "password": cfg["password"],
        "charset": cfg.get("charset", "utf8mb4"),
        "cursorclass": pymysql.cursors.DictCursor,
    }
    # Prefer "database", falling back to the legacy "database_name" key.
    # (The previous version ran `USE cfg['database_name']` exactly when
    # "database" was absent — a guaranteed KeyError whenever the fallback key
    # was missing too — and used an injectable f-string USE statement.)
    db_name = cfg.get("database") or cfg.get("database_name")
    if db_name:
        conn_cfg["database"] = db_name
    conn = pymysql.connect(**conn_cfg)
    try:
        with conn.cursor() as cur:
            sql, params = build_mysql_query(cfg, col_map, last_id, batch)
            cur.execute(sql, params)
            rows = cur.fetchall()
    finally:
        conn.close()
    new_last = rows[-1][col_map["id"]] if rows else last_id
    return rows, new_last
def get_rows_pg(cfg: Dict[str, Any], col_map: Dict[str, str],
                last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
    """Fetch up to *batch* new log rows from PostgreSQL.

    Returns:
        (rows, new_last_id) — same contract as get_rows_mysql.

    Raises:
        DatabaseError: when the psycopg2 driver is not installed.
    """
    if psycopg2 is None:
        raise DatabaseError("psycopg2 未安装,请运行: pip install psycopg2-binary")
    # Pass connection settings as keyword arguments instead of hand-building a
    # libpq DSN string: the previous version quoted the password with double
    # quotes (not valid libpq quoting) and broke on passwords containing
    # spaces or quote characters. psycopg2 handles escaping itself.
    conn_kwargs = {key: cfg[key]
                   for key in ("host", "port", "dbname", "user", "password")
                   if cfg.get(key) is not None}
    conn = psycopg2.connect(**conn_kwargs)
    try:
        with conn.cursor() as cur:
            sql, params = build_pg_query(cfg, col_map, last_id, batch)
            cur.execute(sql, params)
            raw = cur.fetchall()
            # psycopg2 yields plain tuples; map them to dicts by column name so
            # downstream code can treat both drivers uniformly.
            col_names = [desc[0] for desc in cur.description]
            rows = [dict(zip(col_names, r)) for r in raw]
    finally:
        conn.close()
    new_last = rows[-1][col_map["id"]] if rows else last_id
    return rows, new_last
# ==============================================================================
# 单个日志源处理器
# ==============================================================================
class LogSource:
    """One database-backed log source.

    Owns a daemon polling thread that repeatedly pulls new rows from its
    table, converts them to syslog lines, appends them to the output file,
    and advances a per-source checkpoint so restarts never duplicate logs.
    """

    def __init__(
        self,
        name: str,
        db_type: str,
        db_config: Dict[str, Any],
        table_config: Dict[str, Any],
        output_dir: str,
        checkpoint_dir: str,
        interval: int,
        batch_size: int,
        hostname: str,
    ):
        self.name = name
        self.db_type = db_type.lower()
        self.db_config = db_config
        self.table_config = table_config
        self.output_dir = Path(output_dir)
        self.checkpoint_dir = Path(checkpoint_dir)
        self.interval = interval
        self.batch_size = batch_size
        self.hostname = hostname
        self.app_name = table_config.get("app_name", name)
        self.pid_str = str(os.getpid())
        self.col_map = merge_column_map(table_config.get("columns"))
        self.filter_query = table_config.get("filter", "")  # WHERE-clause body (without the WHERE keyword)
        self.log_file_pattern = table_config.get("log_file", f"{name}.log")
        self._last_id: Optional[Any] = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        self._stop_evt = threading.Event()

    # ------------------------------------------------------------------
    # Public control
    # ------------------------------------------------------------------
    def start(self) -> None:
        """Start the polling thread (idempotent)."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._stop_evt.clear()
            self._thread = threading.Thread(target=self._run_loop, daemon=True,
                                            name=f"logsrc-{self.name}")
            self._thread.start()

    def stop(self, timeout: float = 10.0) -> None:
        """Signal the thread to stop and wait up to *timeout* seconds for it."""
        with self._lock:
            if not self._running:
                return
            self._running = False
            self._stop_evt.set()
        # Join outside the lock so is_alive()/start() are not blocked meanwhile.
        if self._thread:
            self._thread.join(timeout=timeout)

    def is_alive(self) -> bool:
        """Whether the polling loop is logically running."""
        with self._lock:
            return self._running

    # ------------------------------------------------------------------
    # Internal loop
    # ------------------------------------------------------------------
    def _run_loop(self) -> None:
        """Poll until the stop event fires; every error path retries."""
        logger = LOGGER or logging.getLogger(APP_NAME)
        # Resume from the persisted checkpoint on startup, if any.
        self._last_id = load_checkpoint(str(self.checkpoint_dir), self.name)
        while not self._stop_evt.is_set():
            try:
                fetched = self._fetch_and_write()
                if fetched == 0:
                    # Nothing new: sleep until the next regular poll.
                    self._stop_evt.wait(self.interval)
                else:
                    # New rows arrived: short pause only, to drain backlogs fast.
                    self._stop_evt.wait(min(self.interval, 2.0))
            except DatabaseError as e:
                logger.error("[%s] 数据库错误(%s)%ds 后重试: %s", self.name, self.db_type, self.interval, e)
                self._stop_evt.wait(self.interval)
            except Exception as e:
                logger.exception("[%s] 未知异常,%ds 后重试: %s", self.name, self.interval, e)
                self._stop_evt.wait(self.interval)
        logger.debug("[%s] 线程已停止", self.name)

    # ------------------------------------------------------------------
    # Core logic
    # ------------------------------------------------------------------
    def _fetch_and_write(self) -> int:
        """Pull one batch, write it out, persist the new offset; return row count."""
        rows, new_last = self._fetch()
        if not rows:
            return 0
        self._write_rows(rows)
        self._last_id = new_last
        # Checkpoint only after a successful write: a crash re-reads, never skips.
        save_checkpoint(str(self.checkpoint_dir), self.name, self._last_id)
        return len(rows)

    def _fetch(self) -> Tuple[List[Dict], Optional[int]]:
        """Dispatch to the driver-specific fetch function."""
        if self.db_type == "mysql":
            return get_rows_mysql(self.db_config, self.col_map, self._last_id, self.batch_size)
        elif self.db_type in ("postgresql", "postgres"):
            return get_rows_pg(self.db_config, self.col_map, self._last_id, self.batch_size)
        else:
            raise ValueError(f"不支持的数据库类型: {self.db_type}(仅支持 mysql/postgresql)")

    def _write_rows(self, rows: List[Dict]) -> None:
        """Append the rows, one syslog line each, to this source's log file."""
        out_path = self.output_dir / self.log_file_pattern
        self.output_dir.mkdir(parents=True, exist_ok=True)
        lines = [self._row_to_syslog(row) for row in rows]
        with open(out_path, "a", encoding="utf-8", buffering=8192) as f:
            f.writelines(line + "\n" for line in lines)
        logger = LOGGER
        if logger:
            logger.debug("[%s] 写入 %d 条日志 → %s", self.name, len(lines), out_path)

    def _row_to_syslog(self, row: Dict[str, Any]) -> str:
        """Convert one database row dict into a syslog-formatted line."""
        def col(field: str, default: str = "") -> str:
            # Resolve a logical field via col_map; '' when unmapped, NULL, or
            # absent. str() also guards against non-string level values, which
            # previously crashed get_syslog_priority via .strip().
            db_col = self.col_map.get(field, "")
            if not db_col:
                return default
            return str(row.get(db_col) or default)

        timestamp = self._format_timestamp(row.get(self.col_map["timestamp"]))
        return format_syslog_line(
            timestamp=timestamp,
            hostname=self.hostname,
            app_name=self.app_name,
            pid=self.pid_str,
            level=col("level", "INFO"),
            message=col("message"),
            trace_id=col("trace_id"),
            span_id=col("span_id"),
            extra=col("extra"),
            user=col("user"),
            action=col("action"),
            target=col("target"),
            result=col("result"),
        )

    def _format_timestamp(self, val: Any) -> str:
        """Normalize any supported timestamp representation to ISO 8601.

        Accepts datetime, unix epoch int/float, ISO-ish strings, or None
        (-> current UTC time). Unparseable strings pass through unchanged.
        """
        if val is None:
            return datetime.utcnow().isoformat() + "Z"
        if isinstance(val, datetime):
            return val.isoformat()
        if isinstance(val, (int, float)):
            try:
                return datetime.fromtimestamp(val).isoformat()
            except (OverflowError, OSError, ValueError):
                return str(val)
        s = str(val).strip()
        # fromisoformat covers "YYYY-MM-DD[ T]HH:MM:SS[.ffffff][+TZ]" in one
        # shot. (The previous implementation truncated the string to the
        # *format* length — e.g. "2026-01-01 12:00:00" was cut to 18 chars —
        # so every strptime failed and strings were never normalized.)
        try:
            return datetime.fromisoformat(s).isoformat()
        except ValueError:
            pass
        # Explicit fallbacks for variants fromisoformat may reject.
        for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S",
                    "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S",
                    "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
            try:
                return datetime.strptime(s, fmt).isoformat()
            except ValueError:
                continue
        return s  # cannot parse: return the original string
# ==============================================================================
# 主程序
# ==============================================================================
def build_sources_from_config(config: Dict[str, Any]) -> List[LogSource]:
    """Instantiate one LogSource per entry under ``sources`` in the config.

    Raises ValueError when a source references an undefined database.
    """
    global_cfg = config.get("global", {})
    db_configs = config.get("databases", {})
    out_dir = global_cfg.get("output_dir", "/var/log/db_exporter")
    ckpt_dir = global_cfg.get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")
    host = global_cfg.get("hostname", DEFAULT_HOSTNAME)
    poll_default = global_cfg.get("interval", DEFAULT_INTERVAL)
    batch_default = global_cfg.get("batch_size", DEFAULT_BATCH)
    built: List[LogSource] = []
    for src_cfg in config.get("sources", []):
        db_ref = src_cfg.get("database")
        if db_ref not in db_configs:
            raise ValueError(f"数据源 [{src_cfg['name']}] 引用的数据库 '{db_ref}' 未在 databases 中定义")
        db_cfg = db_configs[db_ref]
        built.append(LogSource(
            name=src_cfg["name"],
            db_type=db_cfg.get("type", "mysql").lower(),
            db_config=db_cfg,
            table_config=src_cfg,
            output_dir=out_dir,
            checkpoint_dir=ckpt_dir,
            interval=src_cfg.get("interval", poll_default),
            batch_size=src_cfg.get("batch_size", batch_default),
            hostname=host,
        ))
    return built
def create_parser() -> argparse.ArgumentParser:
    """Construct the command-line argument parser for the exporter."""
    p = argparse.ArgumentParser(
        description="数据库日志导出守护进程 — 从 MySQL / PostgreSQL 定时拉取日志,写入标准 syslog 格式文件",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  %(prog)s -c /etc/db_log_exporter/config.yaml
  %(prog)s -c config.yaml --once        # 运行一次(不守护)并退出
  %(prog)s -c config.yaml --dry-run     # 仅连接测试,不写入文件
  %(prog)s -c config.yaml --log-level DEBUG
""",
    )
    p.add_argument("-c", "--config", required=True,
                   help="YAML 配置文件路径")
    p.add_argument("--once", action="store_true",
                   help="仅执行一次轮询后退出(不守护)")
    p.add_argument("--dry-run", action="store_true",
                   help="仅测试数据库连接,不写入日志文件")
    p.add_argument("--log-level", default="INFO",
                   choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                   help="本程序的日志级别(默认 INFO)")
    p.add_argument("--log-file",
                   help="本程序的日志输出文件(默认仅输出到 stdout)")
    return p
def main() -> int:
    """Program entry point.

    Parses CLI arguments, loads the YAML config, builds all log sources,
    then either tests connections (--dry-run), polls once (--once), or runs
    as a daemon until SIGTERM/SIGINT. Returns a process exit code (0 = ok).
    """
    parser = create_parser()
    args = parser.parse_args()
    # Configure this program's own logging first so later steps can log.
    setup_logging(level=args.log_level, log_file=args.log_file)
    logger = LOGGER
    # Load configuration; these errors go straight to stderr.
    try:
        config = load_config(args.config)
    except FileNotFoundError:
        print(f"[ERROR] 配置文件不存在: {args.config}", file=sys.stderr)
        return 1
    except yaml.YAMLError as e:
        print(f"[ERROR] YAML 解析失败: {e}", file=sys.stderr)
        return 1
    # Build one LogSource per configured source entry.
    try:
        sources = build_sources_from_config(config)
    except Exception as e:
        logger.error("配置解析失败: %s", e)
        return 1
    if not sources:
        logger.warning("配置中没有发现任何数据源,请检查配置文件。")
        return 0
    # Ensure required directories exist (succeeds when running as root);
    # permission failures are only warned about here.
    for d in [config.get("global", {}).get("output_dir", "/var/log/db_exporter"),
              config.get("global", {}).get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")]:
        try:
            Path(d).mkdir(parents=True, exist_ok=True)
        except PermissionError:
            logger.warning("无权限创建目录 %s,将使用当前目录下的子目录", d)
    # Dry-run: only test database connectivity, write nothing.
    if args.dry_run:
        logger.info("=== Dry-run 模式:仅测试数据库连接 ===")
        ok = True
        for src in sources:
            try:
                rows, _ = src._fetch()
                logger.info("[%s] ✅ 连接成功,当前有新日志 %d 条(未写入)", src.name, len(rows))
            except Exception as e:
                logger.error("[%s] ❌ 连接失败: %s", src.name, e)
                ok = False
        return 0 if ok else 1
    # Acquire the lock file to prevent a second daemon instance.
    lock_file = f"/var/run/{APP_NAME}/{APP_NAME}.lock"
    lock_fd = None
    try:
        Path(lock_file).parent.mkdir(parents=True, exist_ok=True)
        lock_fd = acquire_lock_file(lock_file)
    except PermissionError:
        logger.warning("无权限创建锁文件 %s,跳过锁定检测(可能重复启动)", lock_file)
    except RuntimeError as e:
        logger.error("%s", e)
        return 1
    # Register signal handlers for graceful shutdown.
    def signal_handler(signum, frame):
        # Stop every source thread, then exit the whole process.
        sig_name = signal.Signals(signum).name
        logger.info("收到信号 %s,准备优雅退出...", sig_name)
        for src in sources:
            src.stop(timeout=5.0)
        logger.info("所有数据源已停止,退出。")
        sys.exit(0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # Start every source's polling thread.
    for src in sources:
        src.start()
        logger.info("已启动数据源: %s (db=%s, table=%s, interval=%ds)",
                    src.name, src.db_type, src.table_config["table"], src.interval)
    logger.info("=== %s 已启动PID=%d,共 %d 个数据源 ===",
                APP_NAME, os.getpid(), len(sources))
    if args.once:
        # --once: wait for one polling round, then stop all sources and exit.
        time.sleep(max(s.interval for s in sources) + 2)
        for src in sources:
            src.stop(timeout=5.0)
        logger.info("=== 单次轮询完成,退出 ===")
    else:
        # Daemon mode: keep the main thread alive; the worker threads are
        # daemons and shutdown happens via the signal handler above.
        while True:
            time.sleep(3600)
    if lock_fd is not None:
        os.close(lock_fd)
    return 0
# Script entry point: exit with main()'s status code.
if __name__ == "__main__":
    sys.exit(main())