import os import tarfile import boto3 import datetime import hashlib import shutil import requests from dotenv import load_dotenv from subprocess import run # 加载配置 load_dotenv('./backup.env') # 或 '.env',看你放哪 # 环境变量 access_key = os.getenv('R2_ACCESS_KEY_ID') secret_key = os.getenv('R2_SECRET_ACCESS_KEY') account_id = os.getenv('R2_ACCOUNT_ID') bucket = os.getenv('R2_BUCKET_NAME') region = os.getenv('R2_REGION', 'auto') source_dir = os.getenv('BACKUP_SOURCE_DIR') tar_path = os.getenv('BACKUP_TEMP_FILE') enc_path = os.getenv('BACKUP_ENCRYPTED_FILE') enc_password = os.getenv('ENCRYPT_PASSWORD') slot_count = int(os.getenv('SLOT_COUNT', 3)) backup_prefix = os.getenv('BACKUP_PREFIX', 'back-vault-s') # 创建 R2 客户端 session = boto3.session.Session() client = session.client( service_name='s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, endpoint_url=f'https://{account_id}.r2.cloudflarestorage.com', region_name=region, ) # 发送 Telegram 消息 def send_telegram_message(text): bot_token = os.getenv('TELEGRAM_BOT_TOKEN') chat_id = os.getenv('TELEGRAM_CHAT_ID') if not bot_token or not chat_id: print('[!] 未配置 Telegram 推送,无法发送通知') return url = f"https://api.telegram.org/bot{bot_token}/sendMessage" payload = {"chat_id": chat_id, "text": text} try: requests.post(url, data=payload, timeout=10) except Exception as e: print(f'[!] Telegram 推送失败: {e}') """ def tar_backup(): print('[*] 打包数据...') with tarfile.open(tar_path, "w:gz") as tar: tar.add(source_dir, arcname=os.path.basename(source_dir)) """ def sqlite_backup(): print('[*] 使用 sqlite3 .backup 命令备份数据库...') db_file = os.path.join(source_dir, 'db.sqlite3') backup_db_file = os.path.join(source_dir, 'db-backup.sqlite3') if os.path.exists(db_file): cmd = [ "sqlite3", db_file, f".backup '{backup_db_file}'" ] run(cmd, check=True) else: print('[!] 未找到数据库文件,跳过数据库备份') return backup_db_file def tar_backup(): print('[*] 只备份必要数据...') backup_db_file = sqlite_backup() with tarfile.open(tar_path, "w:gz") as tar: # 主数据库 db_file = os.path.join(source_dir, 'db.sqlite3') if os.path.exists(db_file): tar.add(db_file, arcname='db.sqlite3') # 配置文件 config_file = os.path.join(source_dir, 'config.json') if os.path.exists(config_file): tar.add(config_file, arcname='config.json') # 私钥 rsa_file = os.path.join(source_dir, 'rsa_key.pem') if os.path.exists(rsa_file): tar.add(rsa_file, arcname='rsa_key.pem') # 附件目录(如有需要) attachments_dir = os.path.join(source_dir, 'attachments') if os.path.exists(attachments_dir): tar.add(attachments_dir, arcname='attachments') if os.path.exists(backup_db_file): os.remove(backup_db_file) def encrypt_backup(): print('[*] 加密备份...') cmd = [ "openssl", "enc", "-aes-256-cbc", "-salt", "-pbkdf2", "-in", tar_path, "-out", enc_path, "-k", enc_password ] run(cmd, check=True) def upload_backup(): now = datetime.datetime.utcnow().strftime('%Y%m%d-%H%M%S') object_key = f"{backup_prefix}{now}.tar.gz.enc" print(f'[*] 上传备份 {object_key} ...') with open(enc_path, 'rb') as f: client.upload_fileobj(f, bucket, object_key) return object_key def rotate_backups(): print('[*] 检查并轮换旧备份...') res = client.list_objects_v2(Bucket=bucket, Prefix=backup_prefix) if 'Contents' not in res: print('[+] 无需轮换,当前仅有一个备份') return backups = sorted( [obj['Key'] for obj in res['Contents']], key=lambda k: k ) if len(backups) <= slot_count: print(f'[+] 当前备份数 {len(backups)} 未超过 {slot_count},无需删除') return to_delete = backups[0:len(backups)-slot_count] for key in to_delete: print(f'[-] 删除过旧备份 {key}') client.delete_object(Bucket=bucket, Key=key) def cleanup(): print('[*] 清理临时文件') for f in [tar_path, enc_path]: if os.path.exists(f): os.remove(f) def main(): try: tar_backup() encrypt_backup() upload_backup() rotate_backups() cleanup() print('[✓] 所有备份流程完成') except Exception as e: err_msg = f'[!] 备份流程出错: {e}' print(err_msg) send_telegram_message(err_msg) if __name__ == "__main__": main()