initial: db_log_exporter v1.0

This commit is contained in:
QClaw Bot
2026-05-12 19:02:46 +08:00
commit 7cd8f78404
6 changed files with 1364 additions and 0 deletions

README.md Normal file

@@ -0,0 +1,306 @@
# db_log_exporter
Periodically pulls logs from MySQL / PostgreSQL database tables and writes them to text files in standard **syslog format** (an RFC 5424 variant).
---
## Features
| Feature | Description |
|------|------|
| **Multiple databases** | MySQL and PostgreSQL run side by side without interfering |
| **Resumable** | Each source independently records the last fetched ID; restarts lose nothing and duplicate nothing |
| **Standard format** | Emits RFC 5424 syslog lines that rsyslog, Filebeat, Promtail, etc. can ingest directly |
| **Multiple sources** | One process manages several tables, each with its own database, polling interval, and output file |
| **Thread safe** | Each source runs in its own thread; sources fetch concurrently without affecting each other |
| **Flexible column mapping** | Differing database column names can be mapped in the config |
| **Dry run** | `--dry-run` tests the connections without writing files |
| **One-shot mode** | `--once` runs a single polling round and exits, suitable for cron |
| **systemd integration** | Ships a `.service` file with boot-time autostart |
---
## Output format
One log entry per line, formatted as follows (RFC 5424 variant):
```
<priority>1 2026-05-12T15:30:00.123456+08:00 hostname app_name[12345]: [trace=abc123] [span=def456] log message
```
| Field | Description |
|------|------|
| `<priority>` | syslog priority: `<6>`=INFO, `<4>`=WARN, `<3>`=ERROR, etc. |
| `version` | RFC 5424 version number, always `1` |
| `timestamp` | ISO 8601 time (with microseconds and time zone) |
| `hostname` | hostname from the config file (defaults to the system hostname) |
| `app_name[pid]` | configured app_name plus this process's PID |
| `[trace=...] [span=...]` | structured data (trace_id / span_id, etc.) |
| `message` | log body |
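Concretely, the mapping from a fetched row to an output line can be sketched as follows. This is a standalone illustration, not the exporter's actual API: `row_to_syslog`, the `row` keys, and the `LEVELS` table are hypothetical names.

```python
# Sketch: turn one database row into an RFC 5424-variant syslog line.
import os
import socket

LEVELS = {"DEBUG": 7, "INFO": 6, "WARN": 4, "WARNING": 4, "ERROR": 3}

def row_to_syslog(row, app_name="myapp", hostname=None):
    # map the row's level string to a syslog numeric priority (default INFO)
    pri = LEVELS.get(str(row.get("level", "INFO")).upper(), 6)
    hostname = hostname or socket.gethostname()
    # structured data: emit only the fields that are actually present
    sd = " ".join(f"[{k}={row[k]}]" for k in ("trace", "span") if row.get(k))
    head = f"<{pri}>1 {row['timestamp']} {hostname} {app_name}[{os.getpid()}]:"
    return f"{head} {sd} {row['message']}" if sd else f"{head} {row['message']}"

line = row_to_syslog({"timestamp": "2026-05-12T15:30:00+08:00",
                      "level": "error", "trace": "abc123",
                      "message": "disk full"})
```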
---
## Quick start
### 1. Install dependencies
```bash
# CentOS / RHEL / Fedora
sudo yum install -y python3 python3-pip
sudo pip3 install PyMySQL psycopg2-binary PyYAML
# Debian / Ubuntu
sudo apt-get install -y python3 python3-pip
sudo pip3 install PyMySQL psycopg2-binary PyYAML
```
### 2. Download the program
```bash
sudo mkdir -p /opt/db_log_exporter
sudo curl -L https://your-repo/db_log_exporter.py \
  -o /opt/db_log_exporter/db_log_exporter.py
sudo chmod +x /opt/db_log_exporter/db_log_exporter.py
```
### 3. Write the configuration
```bash
sudo mkdir -p /etc/db_log_exporter
sudo nano /etc/db_log_exporter/config.yaml
```
Use `config.yaml.example` (in the repository root) as a reference; the essentials:
```yaml
databases:
  mysql_prod:
    type: mysql
    host: 192.168.1.100
    port: 3306
    user: log_reader
    password: "your_password"
    database: app_logs
sources:
  - name: access_log
    database: mysql_prod
    table: access_log
    columns:
      id: id
      timestamp: created_at
      level: log_level
      message: message
```
### 4. Test the connections (dry run)
```bash
python3 /opt/db_log_exporter/db_log_exporter.py \
  -c /etc/db_log_exporter/config.yaml \
  --dry-run
```
### 5. Running it
**Option A: systemd daemon (recommended)**
```bash
# copy the service file
sudo cp db_log_exporter.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now db_log_exporter
sudo journalctl -u db_log_exporter -f   # follow the logs live
```
**Option B: cron job (one-shot mode)**
```bash
# crontab -e
* * * * * python3 /opt/db_log_exporter/db_log_exporter.py \
  -c /etc/db_log_exporter/config.yaml --once
```
**Option C: run directly in the foreground**
```bash
python3 /opt/db_log_exporter/db_log_exporter.py \
  -c /etc/db_log_exporter/config.yaml
```
---
## Configuration
See `config.yaml.example` for the full reference; the core fields:
```yaml
global:
  output_dir: /var/log/db_exporter   # log output directory
  checkpoint_dir: /var/lib/...       # checkpoint file directory
  hostname: myserver                 # syslog hostname
  interval: 30                       # default polling interval (seconds)
  batch_size: 1000                   # default max rows per fetch
databases:
  alias:
    type: mysql | postgresql
    host: ...
    port: ...
    user: ...
    password: ...
    database: ...                    # MySQL: database name
    dbname: ...                      # PostgreSQL: dbname
    charset: utf8mb4                 # MySQL only
sources:
  - name: source_name                # unique ID, used for the checkpoint file name
    database: alias defined above
    table: table_name
    log_file: output file name       # relative to output_dir
    app_name: syslog program name
    interval: 15                     # overrides the global interval
    batch_size: 500                  # overrides the global batch size
    columns:                         # column-name mapping
      id: id
      timestamp: created_at
      level: log_level
      message: message
      trace_id: trace_id             # optional
      span_id: span_id               # optional
```
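Checkpointing (`checkpoint_dir` above) boils down to persisting the last fetched ID per source. A minimal sketch of the write-temp-then-rename pattern the exporter uses; the file name and JSON keys mirror the source code, but treat the details as illustrative:

```python
# Sketch of atomic checkpoint persistence: write a temp file, then rename.
# rename() within one filesystem is atomic, so a crash never leaves a
# half-written checkpoint behind.
import json
import tempfile
from datetime import datetime
from pathlib import Path

def save_checkpoint(checkpoint_dir, source_name, last_id):
    d = Path(checkpoint_dir)
    d.mkdir(parents=True, exist_ok=True)
    tmp = d / f"{source_name}.tmp"
    tmp.write_text(json.dumps({"last_id": last_id,
                               "saved_at": datetime.now().isoformat()}))
    tmp.rename(d / f"{source_name}.json")  # atomic replace

def load_checkpoint(checkpoint_dir, source_name):
    path = Path(checkpoint_dir) / f"{source_name}.json"
    if not path.exists():
        return None  # no checkpoint yet: fetch from the beginning
    return json.loads(path.read_text()).get("last_id")

d = tempfile.mkdtemp()
save_checkpoint(d, "access_log", 12345)
```

Deleting a source's `.json` file (as the troubleshooting table suggests) simply resets it to fetch from the beginning.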
### Database user privileges (least privilege)
**MySQL:**
```sql
CREATE USER 'log_reader'@'%' IDENTIFIED BY 'password';
GRANT SELECT ON app_logs.access_log TO 'log_reader'@'%';
GRANT SELECT ON app_logs.error_log TO 'log_reader'@'%';
FLUSH PRIVILEGES;
```
**PostgreSQL:**
```sql
CREATE USER log_reader WITH PASSWORD 'password';
GRANT CONNECT ON DATABASE app_logs TO log_reader;
GRANT SELECT ON application_logs TO log_reader;
GRANT SELECT ON audit_log TO log_reader;
```
---
## Integrating with log collectors
### rsyslog (server-side ingestion)
```conf
# /etc/rsyslog.d/60-db-exporter.conf
module(load="imfile" PollingInterval="10")
input(type="imfile"
      File="/var/log/db_exporter/mysql_access.log"
      Tag="db:access:"
      Facility="local0")
```
### Filebeat (ship to Elasticsearch)
```yaml
# filebeat.yml
filebeat.inputs:
  - type: log
    paths:
      - /var/log/db_exporter/*.log
    fields:
      log_type: db_exporter
    fields_under_root: true
```
### Promtail (Loki ingestion)
```yaml
# promtail.yml
scrape_configs:
  - job_name: db_exporter
    static_configs:
      - targets: [localhost]
        labels:
          job: db_exporter
          __path__: /var/log/db_exporter/*.log
```
---
## Troubleshooting
| Problem | What to check |
|------|---------|
| Service fails to start | `journalctl -u db_log_exporter -e` for the error log |
| Connection refused | Confirm the database accepts connections from this IP; check firewalls / security groups |
| Permission denied | Confirm the run user can write to `output_dir` and `checkpoint_dir` |
| Duplicate logs | Delete the matching checkpoint file and restart; the exporter refetches from the beginning |
| Garbled non-ASCII text | Confirm the database charset is `utf8mb4` (MySQL) / `UTF8` (PostgreSQL) |
| Connection timeouts | Add `connect_timeout: 10` under `databases` |
---
## Directory layout
```
db_log_exporter/
├── db_log_exporter.py        # main program (Python daemon)
├── config.yaml.example       # sample configuration
├── requirements.txt          # Python dependencies
├── db_log_exporter.service   # systemd unit file
├── setup.sh                  # automated install script
└── README.md                 # this file
```
---
## Command-line arguments
```
-c, --config   path to the YAML config file (required)
--once         run a single polling round and exit (suitable for cron)
--dry-run      only test the database connections; write no files
--log-level    log level: DEBUG|INFO|WARNING|ERROR (default INFO)
--log-file     this program's own log file (default: stdout)
```
---
## Security recommendations
1. **Never store database passwords in the config in plain text**; use environment variables or a systemd environment file:
```bash
# /etc/db_log_exporter/env
DB_PASSWORD=your_password
```
Then reference it in `config.yaml` as `${DB_PASSWORD}`.
2. **Run the service as a least-privileged user**, never as root:
```bash
useradd -r -s /sbin/nologin db_exporter
chown -R db_exporter:db_exporter /var/log/db_exporter /var/lib/db_exporter
# then set User=db_exporter in the service file
```
3. **Rotate the log files regularly** so they cannot fill the disk:
```bash
# /etc/logrotate.d/db_log_exporter
/var/log/db_exporter/*.log {
    daily
    rotate 7
    compress
    delaycompress
    missingok
    notifempty
    create 0644 root root
    sharedscripts
    postrotate
        systemctl reload db_log_exporter > /dev/null 2>&1 || true
    endscript
}
```
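The `${DB_PASSWORD}` reference described in item 1 needs an expansion step when the config is loaded. The shipped `load_config()` is a plain `yaml.safe_load`, so treat this as a sketch of the pattern to add, not the exporter's current behavior:

```python
# Sketch: expand ${VAR} references in a loaded YAML config from os.environ.
# Unknown variables are left untouched so the error surfaces downstream.
import os
import re

_VAR = re.compile(r"\$\{(\w+)\}")

def expand_env(node):
    """Recursively replace ${NAME} in string values with os.environ[NAME]."""
    if isinstance(node, dict):
        return {k: expand_env(v) for k, v in node.items()}
    if isinstance(node, list):
        return [expand_env(v) for v in node]
    if isinstance(node, str):
        return _VAR.sub(lambda m: os.environ.get(m.group(1), m.group(0)), node)
    return node

os.environ["DB_PASSWORD"] = "s3cret"
cfg = expand_env({"databases": {"mysql_prod": {"password": "${DB_PASSWORD}"}}})
```

Calling `expand_env(yaml.safe_load(f))` inside `load_config` would be the natural hook.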

config.yaml Normal file

@@ -0,0 +1,137 @@
# ==============================================================================
# db_log_exporter — sample configuration file
# ==============================================================================
# Path: /etc/db_log_exporter/config.yaml
# ==============================================================================
# How to adapt it:
#   1. Define each database connection under the databases node
#   2. Define each log table to export under the sources node
#   3. Each source references a defined database via its database field
# ==============================================================================

# ---------------------------------------------------------------------------
# Global settings
# ---------------------------------------------------------------------------
global:
  # Log output directory (must be writable)
  output_dir: /var/log/db_exporter
  # Checkpoint directory (must be writable)
  # After each fetch the last record's ID is saved here, enabling resume
  checkpoint_dir: /var/lib/db_exporter/checkpoints
  # hostname used in the emitted syslog lines
  # leave empty to use the system hostname
  hostname: ""
  # Global default polling interval (seconds); each source may override it
  interval: 30
  # Global default max rows per fetch; each source may override it
  batch_size: 1000
# ---------------------------------------------------------------------------
# Database connection definitions
# ---------------------------------------------------------------------------
databases:
  # ---------- MySQL example ----------
  mysql_prod:
    type: mysql
    host: 192.168.1.100
    port: 3306
    user: log_reader
    password: "your_password_here"
    database: app_logs
    charset: utf8mb4
  # ---------- PostgreSQL example ----------
  pg_prod:
    type: postgresql
    host: 192.168.1.200
    port: 5432
    user: log_reader
    password: "your_password_here"
    dbname: app_logs
# ---------------------------------------------------------------------------
# Log source definitions (each source = one database table)
# ---------------------------------------------------------------------------
sources:
  # -------------------------------
  # MySQL log table — auto-increment ID primary key
  # -------------------------------
  - name: mysql_access_log
    # references a database defined above
    database: mysql_prod
    # table to query
    table: access_log
    # file name under output_dir (subdirectories allowed, e.g. "subdir/app.log")
    log_file: mysql_access.log
    # app_name for this source (appears in the syslog line as <app_name>[<pid>])
    app_name: access-log
    # polling interval (seconds) for this source, overriding the global value
    interval: 15
    # max rows per fetch, overriding the global value
    batch_size: 500
    # column-name mapping (use when your column names differ from the defaults)
    columns:
      id: id                  # primary key / auto-increment column (required; tracks the checkpoint)
      timestamp: created_at   # timestamp column (required)
      level: log_level        # log-level column (required; values like INFO/ERROR/WARN)
      message: msg            # log-body column (required)
      # optional extra fields, surfaced in the syslog structured data
      trace_id: trace_id
      span_id: span_id
      extra: extra_data
  # -------------------------------
  # MySQL log table — error log
  # -------------------------------
  - name: mysql_error_log
    database: mysql_prod
    table: error_log
    log_file: mysql_error.log
    app_name: error-log
    interval: 10
    columns:
      id: id
      timestamp: created_at
      level: level
      message: message
      trace_id: trace_id
  # -------------------------------
  # PostgreSQL log table — application log
  # -------------------------------
  - name: pg_app_log
    database: pg_prod
    table: application_logs
    log_file: pg_app.log
    app_name: pg-app
    interval: 30
    columns:
      id: log_id
      timestamp: logged_at
      level: severity
      message: content
      logger: component
      trace_id: trace_id
      extra: metadata
  # -------------------------------
  # PostgreSQL log table — audit log
  # -------------------------------
  - name: pg_audit_log
    database: pg_prod
    table: audit_log
    log_file: pg_audit.log
    app_name: pg-audit
    interval: 60
    columns:
      id: audit_id
      timestamp: happened_at
      level: event_type
      message: description

db_log_exporter.py Executable file

@@ -0,0 +1,702 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
db_log_exporter — 数据库日志导出守护进程
从 MySQL / PostgreSQL 数据库表中定时提取日志,写入标准 syslog 格式文本文件。
支持每个数据源独立跟踪偏移量(断点续传),线程安全,无重复日志。
标准输出格式RFC 5424 变体,每行一条):
<Jan 1 12:00:00> <hostname> <app_name>[<pid>]: <priority><version> <timestamp_iso> <hostname> <app_name> <pid> <msg_id> <structured_data> <message>
作者QClaw
依赖PyMySQL / psycopg2-binary / PyYAML
"""
import argparse
import fcntl
import json
import logging
import os
import signal
import socket
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import yaml

try:
    import pymysql
except ImportError:
    pymysql = None

try:
    import psycopg2
except ImportError:
    psycopg2 = None
# ==============================================================================
# Globals & constants
# ==============================================================================
APP_NAME = "db_log_exporter"
LOGGER: Optional[logging.Logger] = None

# syslog priority mapping
SYSLOG_PRIORITY = {
    "DEBUG": "<7>",
    "TRACE": "<7>",
    "INFO": "<6>",
    "NOTICE": "<5>",
    "WARN": "<4>",
    "WARNING": "<4>",
    "ERROR": "<3>",
    "ERR": "<3>",
    "CRITICAL": "<2>",
    "CRIT": "<2>",
    "FATAL": "<2>",
    "ALERT": "<1>",
    "EMERG": "<0>",
}

# default column mapping (used when the config does not specify one)
DEFAULT_COLUMN_MAP = {
    "id": "id",
    "timestamp": "created_at",
    "level": "level",
    "logger": "logger",
    "message": "message",
    "host": "host",
    "source": "source",
    "trace_id": "trace_id",
    "span_id": "span_id",
    "extra": "extra",
}

DEFAULT_INTERVAL = 30    # default polling interval (seconds)
DEFAULT_BATCH = 1000     # default max rows per fetch
DEFAULT_HOSTNAME = socket.gethostname()
# ==============================================================================
# Logging helpers
# ==============================================================================
def setup_logging(level: str = "INFO", log_file: Optional[str] = None) -> logging.Logger:
    """Configure this program's own log output."""
    global LOGGER
    logger = logging.getLogger(APP_NAME)
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))
    fmt = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
    datefmt = "%Y-%m-%d %H:%M:%S"
    # console
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(logging.Formatter(fmt, datefmt))
    logger.addHandler(ch)
    # file (optional)
    if log_file:
        fh = logging.FileHandler(log_file, encoding="utf-8")
        fh.setFormatter(logging.Formatter(fmt, datefmt))
        logger.addHandler(fh)
    LOGGER = logger
    return logger
# ==============================================================================
# Utility functions
# ==============================================================================
def get_syslog_priority(level_str: str) -> str:
    """Map a log-level string to a syslog priority tag."""
    normalized = level_str.strip().upper()
    return SYSLOG_PRIORITY.get(normalized, "<6>")  # default to INFO


def format_syslog_line(
    timestamp: str,
    hostname: str,
    app_name: str,
    pid: str,
    level: str,
    message: str,
    trace_id: str = "",
    span_id: str = "",
    extra: str = "",
) -> str:
    """
    Format one RFC 5424-variant syslog line:

        <priority>version timestamp hostname appname[pid]: structured_data message

    Full example:

        <6>1 2026-01-01T12:00:00.123456+08:00 myhost myapp[12345]: [trace=abc] message text
    """
    pri = get_syslog_priority(level)
    clean_msg = message.strip()
    parts = [f"[{k}={v}]" for k, v in [
        ("trace", trace_id),
        ("span", span_id),
        ("extra", extra),
    ] if v]
    structured = " ".join(parts)
    if structured:
        structured += " "
    return f"{pri}1 {timestamp} {hostname} {app_name}[{pid}]: {structured}{clean_msg}"
def acquire_lock_file(lock_path: str) -> int:
    """
    Take an exclusive flock to prevent duplicate instances.
    Returns the file descriptor; close it when done.
    """
    fd = os.open(lock_path, os.O_CREAT | os.O_RDWR, 0o644)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        os.close(fd)
        raise RuntimeError(f"Another {APP_NAME} instance is already running (lock file: {lock_path}); stop it first.")
    return fd


def load_checkpoint(checkpoint_dir: str, source_name: str) -> Optional[Any]:
    """
    Load the checkpoint file and return last_id (int, datetime, or any
    serializable value). Returns None if the file does not exist
    (fetch from the beginning).
    """
    path = Path(checkpoint_dir) / f"{source_name}.json"
    if not path.exists():
        return None
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data.get("last_id")
    except (json.JSONDecodeError, IOError) as e:
        if LOGGER:
            LOGGER.warning("Corrupt checkpoint file [%s]; fetching from the beginning: %s", path, e)
        return None


def save_checkpoint(checkpoint_dir: str, source_name: str, last_id: Any) -> None:
    """Persist the checkpoint."""
    Path(checkpoint_dir).mkdir(parents=True, exist_ok=True)
    path = Path(checkpoint_dir) / f"{source_name}.json"
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"last_id": last_id, "saved_at": datetime.now().isoformat()}, f)
    tmp.rename(path)  # atomic replace


def load_config(config_path: str) -> Dict[str, Any]:
    """Load the YAML config file."""
    with open(config_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)


def merge_column_map(config_map: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Merge the configured column map over the defaults."""
    result = dict(DEFAULT_COLUMN_MAP)
    if config_map:
        result.update(config_map)
    return result
# ==============================================================================
# Database access
# ==============================================================================
class DatabaseError(Exception):
    """Database operation failure."""
    pass


def build_mysql_query(cfg: Dict[str, Any], col_map: Dict[str, str],
                      last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
    """Build the MySQL query and its parameters."""
    id_col = col_map["id"]
    ts_col = col_map["timestamp"]
    lvl_col = col_map["level"]
    msg_col = col_map["message"]
    # build the SELECT list (extra columns are aliased to their logical names)
    extra_cols = []
    for alias, db_col in col_map.items():
        if alias not in ("id", "timestamp", "level", "message") and db_col:
            extra_cols.append(f"{db_col} AS {alias}")
    select_parts = [id_col, ts_col, lvl_col, msg_col] + extra_cols
    # de-duplicate so no database column is selected twice
    seen = set()
    deduped = []
    for p in select_parts:
        col = p.split(" AS ")[0].strip()
        if col not in seen:
            deduped.append(p)
            seen.add(col)
    select_clause = ", ".join(deduped) if deduped else "*"
    if last_id is not None:
        sql = (f"SELECT {select_clause} FROM {cfg['table']} "
               f"WHERE {id_col} > %s "
               f"ORDER BY {id_col} ASC LIMIT %s")
        params: Tuple = (last_id, batch)
    else:
        sql = (f"SELECT {select_clause} FROM {cfg['table']} "
               f"ORDER BY {id_col} ASC LIMIT %s")
        params = (batch,)
    return sql, params


def build_pg_query(cfg: Dict[str, Any], col_map: Dict[str, str],
                   last_id: Optional[int], batch: int) -> Tuple[str, Tuple]:
    """Build the PostgreSQL query (syntax and %s parameter style match MySQL)."""
    return build_mysql_query(cfg, col_map, last_id, batch)
def get_rows_mysql(cfg: Dict[str, Any], col_map: Dict[str, str],
                   last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
    """Read log rows from MySQL; returns (rows, new_last_id)."""
    if pymysql is None:
        raise DatabaseError("PyMySQL is not installed; run: pip install pymysql")
    conn_cfg = {
        "host": cfg.get("host", "localhost"),
        "port": cfg.get("port", 3306),
        "user": cfg["user"],
        "password": cfg["password"],
        "database": cfg.get("database", ""),
        "charset": cfg.get("charset", "utf8mb4"),
        "cursorclass": pymysql.cursors.DictCursor,
    }
    # connect without a database when none is configured; the table must then
    # be fully qualified in the config (e.g. "db.table")
    if not cfg.get("database"):
        conn_cfg.pop("database", None)
    conn = pymysql.connect(**conn_cfg)
    try:
        with conn.cursor() as cur:
            sql, params = build_mysql_query(cfg, col_map, last_id, batch)
            cur.execute(sql, params)
            rows = cur.fetchall()
    finally:
        conn.close()
    new_last = rows[-1][col_map["id"]] if rows else last_id
    return rows, new_last
def get_rows_pg(cfg: Dict[str, Any], col_map: Dict[str, str],
                last_id: Optional[int], batch: int) -> Tuple[List[Dict], Optional[int]]:
    """Read log rows from PostgreSQL; returns (rows, new_last_id)."""
    if psycopg2 is None:
        raise DatabaseError("psycopg2 is not installed; run: pip install psycopg2-binary")
    dsn_parts = []
    for key in ("host", "port", "dbname", "user", "password"):
        val = cfg.get(key)
        if val is not None:
            dsn_parts.append(f"{key}='{val}'")
    dsn = " ".join(dsn_parts)
    conn = psycopg2.connect(dsn)
    try:
        with conn.cursor() as cur:
            sql, params = build_pg_query(cfg, col_map, last_id, batch)
            cur.execute(sql, params)
            rows = cur.fetchall()
            # psycopg2 returns plain tuples; map column names manually
            col_names = [desc[0] for desc in cur.description]
            rows = [dict(zip(col_names, r)) for r in rows]
    finally:
        conn.close()
    new_last = rows[-1][col_map["id"]] if rows else last_id
    return rows, new_last
# ==============================================================================
# Per-source handler
# ==============================================================================
class LogSource:
    """A single database log source."""

    def __init__(
        self,
        name: str,
        db_type: str,
        db_config: Dict[str, Any],
        table_config: Dict[str, Any],
        output_dir: str,
        checkpoint_dir: str,
        interval: int,
        batch_size: int,
        hostname: str,
    ):
        self.name = name
        self.db_type = db_type.lower()
        self.db_config = db_config
        self.table_config = table_config
        self.output_dir = Path(output_dir)
        self.checkpoint_dir = Path(checkpoint_dir)
        self.interval = interval
        self.batch_size = batch_size
        self.hostname = hostname
        self.app_name = table_config.get("app_name", name)
        self.pid_str = str(os.getpid())
        self.col_map = merge_column_map(table_config.get("columns"))
        self.filter_query = table_config.get("filter", "")  # WHERE clause (without the WHERE keyword)
        self.log_file_pattern = table_config.get("log_file", f"{name}.log")
        self._last_id: Optional[Any] = None
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        self._stop_evt = threading.Event()

    # ------------------------------------------------------------------
    # Public controls
    # ------------------------------------------------------------------
    def start(self) -> None:
        with self._lock:
            if self._running:
                return
            self._running = True
        self._stop_evt.clear()
        self._thread = threading.Thread(target=self._run_loop, daemon=True, name=f"logsrc-{self.name}")
        self._thread.start()

    def stop(self, timeout: float = 10.0) -> None:
        with self._lock:
            if not self._running:
                return
            self._running = False
        self._stop_evt.set()
        if self._thread:
            self._thread.join(timeout=timeout)

    def is_alive(self) -> bool:
        with self._lock:
            return self._running
    # ------------------------------------------------------------------
    # Internal loop
    # ------------------------------------------------------------------
    def _run_loop(self) -> None:
        logger = LOGGER or logging.getLogger(APP_NAME)
        # load the checkpoint at startup
        self._last_id = load_checkpoint(str(self.checkpoint_dir), self.name)
        while not self._stop_evt.is_set():
            try:
                fetched = self._fetch_and_write()
                if fetched == 0:
                    # no new logs: sleep until the next polling round
                    self._stop_evt.wait(self.interval)
                else:
                    # new logs arrived: short sleep to drain the backlog without busy-looping
                    self._stop_evt.wait(min(self.interval, 2.0))
            except DatabaseError as e:
                logger.error("[%s] database error (%s), retrying in %ds: %s", self.name, self.db_type, self.interval, e)
                self._stop_evt.wait(self.interval)
            except Exception as e:
                logger.exception("[%s] unexpected error, retrying in %ds: %s", self.name, self.interval, e)
                self._stop_evt.wait(self.interval)
        logger.debug("[%s] thread stopped", self.name)

    # ------------------------------------------------------------------
    # Core logic
    # ------------------------------------------------------------------
    def _fetch_and_write(self) -> int:
        """Fetch and write logs; returns the number of rows written this round."""
        rows, new_last = self._fetch()
        if not rows:
            return 0
        self._write_rows(rows)
        self._last_id = new_last
        save_checkpoint(str(self.checkpoint_dir), self.name, self._last_id)
        return len(rows)
    def _fetch(self) -> Tuple[List[Dict], Optional[int]]:
        """Dispatch to the fetch function for this source's database type."""
        # the fetch functions expect the table name alongside the connection info
        cfg = dict(self.db_config)
        cfg["table"] = self.table_config["table"]
        if self.db_type == "mysql":
            return get_rows_mysql(cfg, self.col_map, self._last_id, self.batch_size)
        elif self.db_type in ("postgresql", "postgres"):
            return get_rows_pg(cfg, self.col_map, self._last_id, self.batch_size)
        else:
            raise ValueError(f"Unsupported database type: {self.db_type} (only mysql/postgresql)")

    def _write_rows(self, rows: List[Dict]) -> None:
        """Append row data to the log file."""
        out_path = self.output_dir / self.log_file_pattern
        out_path.parent.mkdir(parents=True, exist_ok=True)
        lines = [self._row_to_syslog(row) for row in rows]
        with open(out_path, "a", encoding="utf-8", buffering=8192) as f:
            for line in lines:
                f.write(line + "\n")
        if LOGGER:
            LOGGER.debug("[%s] wrote %d log lines → %s", self.name, len(lines), out_path)

    def _row_to_syslog(self, row: Dict[str, Any]) -> str:
        """Convert one database row into a syslog-formatted string."""
        ts_col = self.col_map["timestamp"]
        lvl_col = self.col_map["level"]
        msg_col = self.col_map["message"]
        ts_val = row.get(ts_col)
        lvl_val = row.get(lvl_col, "INFO")
        msg_val = str(row.get(msg_col) or "")
        # extra columns are SELECTed with "AS <alias>", so look them up by alias
        tid_val = str(row.get("trace_id") or "")
        sid_val = str(row.get("span_id") or "")
        ext_val = str(row.get("extra") or "")
        timestamp = self._format_timestamp(ts_val)
        return format_syslog_line(
            timestamp=timestamp,
            hostname=self.hostname,
            app_name=self.app_name,
            pid=self.pid_str,
            level=lvl_val,
            message=msg_val,
            trace_id=tid_val,
            span_id=sid_val,
            extra=ext_val,
        )
    def _format_timestamp(self, val: Any) -> str:
        """
        Normalize any timestamp into ISO 8601.
        Supports: datetime / string / unix timestamp (int/float) / None.
        """
        if val is None:
            return datetime.utcnow().isoformat() + "Z"
        if isinstance(val, datetime):
            return val.isoformat()
        if isinstance(val, (int, float)):
            try:
                return datetime.fromtimestamp(val).isoformat()
            except (OverflowError, OSError, ValueError):
                return str(val)
        # string: try the common formats
        s = str(val).strip()
        for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S",
                    "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S",
                    "%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
            try:
                return datetime.strptime(s, fmt).isoformat()
            except ValueError:
                continue
        return s  # unparseable: return the original string
# ==============================================================================
# Main program
# ==============================================================================
def build_sources_from_config(config: Dict[str, Any]) -> List[LogSource]:
    """Build all LogSource instances from the config dict."""
    global_cfg = config.get("global", {})
    db_configs = config.get("databases", {})
    sources_cfg = config.get("sources", [])
    output_dir = global_cfg.get("output_dir", "/var/log/db_exporter")
    checkpoint_dir = global_cfg.get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")
    # an empty hostname in the config means "use the system hostname"
    hostname = global_cfg.get("hostname") or DEFAULT_HOSTNAME
    default_interval = global_cfg.get("interval", DEFAULT_INTERVAL)
    default_batch = global_cfg.get("batch_size", DEFAULT_BATCH)
    sources: List[LogSource] = []
    for src_cfg in sources_cfg:
        db_ref = src_cfg.get("database")
        if db_ref not in db_configs:
            raise ValueError(f"Source [{src_cfg['name']}] references database '{db_ref}', which is not defined under databases")
        db_cfg = db_configs[db_ref]
        db_type = db_cfg.get("type", "mysql").lower()
        sources.append(LogSource(
            name=src_cfg["name"],
            db_type=db_type,
            db_config=db_cfg,
            table_config=src_cfg,
            output_dir=output_dir,
            checkpoint_dir=checkpoint_dir,
            interval=src_cfg.get("interval", default_interval),
            batch_size=src_cfg.get("batch_size", default_batch),
            hostname=hostname,
        ))
    return sources


def create_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Database log export daemon — periodically pulls logs from MySQL / PostgreSQL and writes standard syslog-format files",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s -c /etc/db_log_exporter/config.yaml
  %(prog)s -c config.yaml --once        # run once (no daemon) and exit
  %(prog)s -c config.yaml --dry-run     # connection test only; write nothing
  %(prog)s -c config.yaml --log-level DEBUG
""",
    )
    parser.add_argument("-c", "--config", required=True,
                        help="path to the YAML config file")
    parser.add_argument("--once", action="store_true",
                        help="run a single polling round and exit (no daemon)")
    parser.add_argument("--dry-run", action="store_true",
                        help="only test the database connections; write no log files")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        help="this program's log level (default INFO)")
    parser.add_argument("--log-file",
                        help="this program's log output file (default: stdout only)")
    return parser
def main() -> int:
    parser = create_parser()
    args = parser.parse_args()
    # configure logging
    logger = setup_logging(level=args.log_level, log_file=args.log_file)
    # load the config
    try:
        config = load_config(args.config)
    except FileNotFoundError:
        print(f"[ERROR] config file not found: {args.config}", file=sys.stderr)
        return 1
    except yaml.YAMLError as e:
        print(f"[ERROR] YAML parse failure: {e}", file=sys.stderr)
        return 1
    # build the sources
    try:
        sources = build_sources_from_config(config)
    except Exception as e:
        logger.error("Config parse failure: %s", e)
        return 1
    if not sources:
        logger.warning("No sources found in the config; please check the config file.")
        return 0
    # make sure the required directories exist (works when running with sufficient privileges)
    for d in [config.get("global", {}).get("output_dir", "/var/log/db_exporter"),
              config.get("global", {}).get("checkpoint_dir", "/var/lib/db_exporter/checkpoints")]:
        try:
            Path(d).mkdir(parents=True, exist_ok=True)
        except PermissionError:
            logger.warning("No permission to create directory %s; writes to it will fail", d)
    # dry run: connection test only
    if args.dry_run:
        logger.info("=== Dry-run mode: testing database connections only ===")
        ok = True
        for src in sources:
            try:
                rows, _ = src._fetch()
                logger.info("[%s] ✅ connected; %d new log rows pending (nothing written)", src.name, len(rows))
            except Exception as e:
                logger.error("[%s] ❌ connection failed: %s", src.name, e)
                ok = False
        return 0 if ok else 1
    # take the lock file (prevents duplicate instances)
    lock_file = f"/var/run/{APP_NAME}/{APP_NAME}.lock"
    lock_fd = None
    try:
        Path(lock_file).parent.mkdir(parents=True, exist_ok=True)
        lock_fd = acquire_lock_file(lock_file)
    except PermissionError:
        logger.warning("No permission to create lock file %s; skipping the duplicate-instance check", lock_file)
    except RuntimeError as e:
        logger.error("%s", e)
        return 1
    # register signal handlers
    def signal_handler(signum, frame):
        sig_name = signal.Signals(signum).name
        logger.info("Received signal %s; shutting down gracefully...", sig_name)
        for src in sources:
            src.stop(timeout=5.0)
        logger.info("All sources stopped; exiting.")
        sys.exit(0)

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # start all sources
    for src in sources:
        src.start()
        logger.info("Started source: %s (db=%s, table=%s, interval=%ds)",
                    src.name, src.db_type, src.table_config["table"], src.interval)
    logger.info("=== %s started (PID=%d, %d sources) ===",
                APP_NAME, os.getpid(), len(sources))
    if args.once:
        # --once: wait for one polling round, then exit
        time.sleep(max(s.interval for s in sources) + 2)
        for src in sources:
            src.stop(timeout=5.0)
        logger.info("=== Single polling round complete; exiting ===")
    else:
        # daemon main loop (keeps the main thread alive; signals trigger shutdown)
        while True:
            time.sleep(3600)
    if lock_fd is not None:
        os.close(lock_fd)
    return 0


if __name__ == "__main__":
    sys.exit(main())

db_log_exporter.service Normal file

@@ -0,0 +1,41 @@
[Unit]
Description=Database Log Exporter — pulls logs from MySQL/PostgreSQL into text files
Documentation=https://github.com/example/db_log_exporter
After=network.target mysql.service postgresql.service
Wants=mysql.service postgresql.service

[Service]
# Run as a dedicated user/group (recommended; create one first, e.g.:
#   useradd -r -s /sbin/nologin db_exporter)
User=root
Group=root
# program path
ExecStart=/usr/bin/python3 /opt/db_log_exporter/db_log_exporter.py \
    --config /etc/db_log_exporter/config.yaml \
    --log-file /var/log/db_exporter/exporter.log \
    --log-level INFO
# restart policy
Restart=on-failure
RestartSec=10
StartLimitBurst=5
# hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/log/db_exporter /var/lib/db_exporter
# writable lock-file directory under /run (the exporter keeps its lock in /var/run/db_log_exporter)
RuntimeDirectory=db_log_exporter
# environment (optional; keep secrets in a systemd credential or an environment file)
# EnvironmentFile=/etc/db_log_exporter/env
# log to the systemd journal
StandardOutput=journal
StandardError=journal
SyslogIdentifier=db_log_exporter
# graceful stop timeout
TimeoutStopSec=30

[Install]
WantedBy=multi-user.target

requirements.txt Normal file

@@ -0,0 +1,3 @@
PyMySQL>=1.1.0
psycopg2-binary>=2.9.0
PyYAML>=6.0

setup.sh Executable file

@@ -0,0 +1,175 @@
#!/usr/bin/env bash
# ==============================================================================
# db_log_exporter — install / deploy script
# ==============================================================================
# Usage:
#   sudo ./setup.sh [install | uninstall | status]
#
# Supports: CentOS 7/8, RHEL, Fedora, Debian, Ubuntu
# ==============================================================================
set -euo pipefail

APP_NAME="db_log_exporter"
INSTALL_DIR="/opt/${APP_NAME}"
CONFIG_DIR="/etc/${APP_NAME}"
SYSTEMD_UNIT="${APP_NAME}.service"
LOG_DIR="/var/log/${APP_NAME}"
DATA_DIR="/var/lib/${APP_NAME}"
RUN_DIR="/var/run/${APP_NAME}"
CHECKPOINT_DIR="${DATA_DIR}/checkpoints"

# colors
RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; NC='\033[0m'
info()  { echo -e "${GREEN}[INFO]${NC} $*"; }
warn()  { echo -e "${YELLOW}[WARN]${NC} $*" >&2; }
error() { echo -e "${RED}[ERROR]${NC} $*" >&2; exit 1; }
need_root() { [ "$(id -u)" -eq 0 ] || error "Run as root: sudo $0 $*"; }
# ---------------------------------------------------------------------------
# Dependency check
# ---------------------------------------------------------------------------
check_deps() {
    info "Checking dependencies..."
    local missing=()
    if ! command -v python3 &>/dev/null; then
        missing+=("python3")
    fi
    if ! python3 -c "import pymysql" 2>/dev/null; then
        missing+=("PyMySQL (pip3 install pymysql)")
    fi
    if ! python3 -c "import psycopg2" 2>/dev/null; then
        missing+=("psycopg2-binary (pip3 install psycopg2-binary)")
    fi
    if ! python3 -c "import yaml" 2>/dev/null; then
        missing+=("PyYAML (pip3 install pyyaml)")
    fi
    if [ ${#missing[@]} -gt 0 ]; then
        error "Missing dependencies: ${missing[*]}\nInstall them first: pip3 install PyMySQL psycopg2-binary PyYAML"
    fi
    # check for MySQL / PostgreSQL clients (presence only; the services need not run)
    if command -v mysql &>/dev/null || command -v psql &>/dev/null; then
        info "Database client found"
    else
        warn "mysql/psql client not found (the exporter uses Python DB drivers, so this does not block it)"
    fi
    info "Dependency check passed ✓"
}
# ---------------------------------------------------------------------------
# Install
# ---------------------------------------------------------------------------
do_install() {
    need_root "$@"
    check_deps
    info "Installing ${APP_NAME}..."
    # 1. create directories
    for d in "${INSTALL_DIR}" "${CONFIG_DIR}" "${LOG_DIR}" "${DATA_DIR}" "${CHECKPOINT_DIR}" "${RUN_DIR}"; do
        mkdir -p "$d"
        echo "  created: $d"
    done
    # 2. copy files
    cp "$(dirname "$0")/db_log_exporter.py" "${INSTALL_DIR}/"
    cp "$(dirname "$0")/config.yaml" "${CONFIG_DIR}/config.yaml.example"
    [ -f "$(dirname "$0")/requirements.txt" ] && \
        cp "$(dirname "$0")/requirements.txt" "${INSTALL_DIR}/"
    info "Files copied to ${INSTALL_DIR} and ${CONFIG_DIR}"
    # 3. create config.yaml (if it does not exist yet)
    if [ ! -f "${CONFIG_DIR}/config.yaml" ]; then
        if [ -f "${CONFIG_DIR}/config.yaml.example" ]; then
            cp "${CONFIG_DIR}/config.yaml.example" "${CONFIG_DIR}/config.yaml"
            warn "Created default config: ${CONFIG_DIR}/config.yaml"
            warn "Edit it and fill in the database connection details before starting the service!"
        fi
    fi
    # 4. install the systemd unit
    local systemd_dir="/etc/systemd/system"
    cp "$(dirname "$0")/${SYSTEMD_UNIT}" "${systemd_dir}/"
    chmod 644 "${systemd_dir}/${SYSTEMD_UNIT}"
    systemctl daemon-reload
    info "systemd service installed ✓"
    info ""
    info "=== Next steps ==="
    info "1. Edit the config:   nano ${CONFIG_DIR}/config.yaml"
    info "2. Test connections:  python3 ${INSTALL_DIR}/${APP_NAME}.py -c ${CONFIG_DIR}/config.yaml --dry-run"
    info "3. Start the service: systemctl start ${APP_NAME}"
    info "4. Enable at boot:    systemctl enable ${APP_NAME}"
    info "5. Follow the logs:   journalctl -u ${APP_NAME} -f"
    info ""
    info "Install complete ✓"
}
# ---------------------------------------------------------------------------
# Uninstall
# ---------------------------------------------------------------------------
do_uninstall() {
    need_root "$@"
    info "Stopping and disabling the service..."
    systemctl stop "${APP_NAME}" 2>/dev/null || true
    systemctl disable "${APP_NAME}" 2>/dev/null || true
    info "Removing the systemd unit..."
    rm -f "/etc/systemd/system/${SYSTEMD_UNIT}"
    systemctl daemon-reload
    info "Removing installed files (confirm first)..."
    read -rp "Delete ${INSTALL_DIR} and ${CONFIG_DIR}? [y/N]: " confirm
    if [[ "$confirm" =~ ^[Yy]$ ]]; then
        rm -rf "${INSTALL_DIR}" "${CONFIG_DIR}" "${LOG_DIR}" "${DATA_DIR}" "${RUN_DIR}"
        info "All files removed ✓"
    else
        warn "Files kept; clean up manually with: rm -rf ${INSTALL_DIR} ${CONFIG_DIR}"
    fi
}
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
do_status() {
    if systemctl is-active --quiet "${APP_NAME}"; then
        info "Service status: running ✓"
        systemctl status "${APP_NAME}" --no-pager
    elif systemctl is-enabled --quiet "${APP_NAME}"; then
        warn "Service status: not running (enabled at boot)"
    else
        warn "Service status: not installed or not enabled"
    fi
    if [ -d "${CHECKPOINT_DIR}" ]; then
        echo ""
        info "Checkpoint files:"
        ls -la "${CHECKPOINT_DIR}" 2>/dev/null | grep -v "^total" || echo "  (empty)"
    fi
    if [ -d "${LOG_DIR}" ]; then
        echo ""
        info "Log files:"
        ls -lh "${LOG_DIR}" 2>/dev/null || echo "  (empty)"
    fi
}
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
ACTION="${1:-install}"
case "$ACTION" in
    install)   do_install "$@" ;;
    uninstall) do_uninstall "$@" ;;
    status)    do_status "$@" ;;
    *)         error "Unknown action: $ACTION (supported: install | uninstall | status)" ;;
esac