mirror of
https://github.com/Coldin04/vaultwarden_backup.git
synced 2026-02-17 16:13:32 +00:00
154 lines
4.7 KiB
Python
154 lines
4.7 KiB
Python
import os
|
|
import tarfile
|
|
import boto3
|
|
import datetime
|
|
import hashlib
|
|
import shutil
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
from subprocess import run
|
|
|
|
# Load settings from the env file (point this at '.env' instead if that is
# where the configuration lives).
load_dotenv('./backup.env')

# R2 credentials, account and target bucket.
access_key = os.environ.get('R2_ACCESS_KEY_ID')
secret_key = os.environ.get('R2_SECRET_ACCESS_KEY')
account_id = os.environ.get('R2_ACCOUNT_ID')
bucket = os.environ.get('R2_BUCKET_NAME')
region = os.environ.get('R2_REGION', 'auto')

# Local paths used while building the backup, plus the encryption passphrase.
source_dir = os.environ.get('BACKUP_SOURCE_DIR')
tar_path = os.environ.get('BACKUP_TEMP_FILE')
enc_path = os.environ.get('BACKUP_ENCRYPTED_FILE')
enc_password = os.environ.get('ENCRYPT_PASSWORD')

# Retention: how many uploaded backups to keep, and the object-key prefix
# shared by every backup object.
slot_count = int(os.environ.get('SLOT_COUNT', 3))
backup_prefix = os.environ.get('BACKUP_PREFIX', 'back-vault-s')
|
|
|
|
# Build the R2 client: boto3's S3 client pointed at the account-specific
# R2 endpoint.
session = boto3.session.Session()
client = session.client(
    's3',
    region_name=region,
    endpoint_url='https://{}.r2.cloudflarestorage.com'.format(account_id),
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)
|
|
|
|
# 发送 Telegram 消息
|
|
def send_telegram_message(text):
    """Send ``text`` to the configured Telegram chat on a best-effort basis.

    The bot token and chat id are read from the ``TELEGRAM_BOT_TOKEN`` and
    ``TELEGRAM_CHAT_ID`` environment variables. If either is missing, a
    notice is printed and nothing is sent.

    This function never raises: transport errors and non-success HTTP
    replies are both reported on stdout only, so a notification problem
    cannot mask the error being reported.

    Args:
        text: Message body to deliver.
    """
    bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
    chat_id = os.getenv('TELEGRAM_CHAT_ID')
    if not bot_token or not chat_id:
        print('[!] 未配置 Telegram 推送,无法发送通知')
        return
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    payload = {"chat_id": chat_id, "text": text}
    try:
        response = requests.post(url, data=payload, timeout=10)
        # A rejected request (bad token, unknown chat id, ...) still returns
        # normally from requests.post; surface it instead of assuming the
        # message was delivered.
        if not response.ok:
            print(
                f'[!] Telegram 推送失败: HTTP {response.status_code} '
                f'{response.text}'
            )
    except Exception as e:
        print(f'[!] Telegram 推送失败: {e}')
|
|
|
|
"""
|
|
def tar_backup():
|
|
print('[*] 打包数据...')
|
|
with tarfile.open(tar_path, "w:gz") as tar:
|
|
tar.add(source_dir, arcname=os.path.basename(source_dir))
|
|
"""
|
|
def sqlite_backup():
    """Snapshot the SQLite database with the ``sqlite3`` CLI ``.backup`` command.

    The snapshot is written next to the live database as
    ``db-backup.sqlite3`` inside ``source_dir``. When the live database file
    does not exist, a notice is printed and no snapshot is created.

    Returns:
        The snapshot path, whether or not the snapshot was actually created.

    Raises:
        subprocess.CalledProcessError: If the ``sqlite3`` command exits with
            a non-zero status.
    """
    print('[*] 使用 sqlite3 .backup 命令备份数据库...')
    live_db = os.path.join(source_dir, 'db.sqlite3')
    snapshot = os.path.join(source_dir, 'db-backup.sqlite3')

    # Nothing to snapshot: report it and hand back the would-be path.
    if not os.path.exists(live_db):
        print('[!] 未找到数据库文件,跳过数据库备份')
        return snapshot

    run(["sqlite3", live_db, f".backup '{snapshot}'"], check=True)
    return snapshot
|
|
|
|
def tar_backup():
    """Pack the essential data files into a gzip tarball at ``tar_path``.

    Archived entries (each only if present in ``source_dir``):
      * ``db.sqlite3``   - the database
      * ``config.json``  - configuration file
      * ``rsa_key.pem``  - private key
      * ``attachments``  - attachments directory

    The database entry is taken from the snapshot produced by
    ``sqlite_backup()`` rather than from the live file, so the archive holds
    the consistent copy and not a file that may be mid-write. It is stored
    under the name ``db.sqlite3``. The temporary snapshot is removed
    afterwards, even if building the archive fails.

    Raises:
        subprocess.CalledProcessError: Propagated from ``sqlite_backup()``.
        OSError / tarfile.TarError: If the archive cannot be written.
    """
    print('[*] 只备份必要数据...')
    backup_db_file = sqlite_backup()
    try:
        with tarfile.open(tar_path, "w:gz") as tar:
            # Database: prefer the snapshot; fall back to the live file only
            # if the snapshot is unexpectedly missing.
            db_file = os.path.join(source_dir, 'db.sqlite3')
            if os.path.exists(db_file):
                if os.path.exists(backup_db_file):
                    db_source = backup_db_file
                else:
                    db_source = db_file
                tar.add(db_source, arcname='db.sqlite3')

            # Config file, private key and attachments directory.
            for name in ('config.json', 'rsa_key.pem', 'attachments'):
                path = os.path.join(source_dir, name)
                if os.path.exists(path):
                    tar.add(path, arcname=name)
    finally:
        # Never leave the snapshot behind in the data directory.
        if os.path.exists(backup_db_file):
            os.remove(backup_db_file)
|
|
|
|
def encrypt_backup():
    """Encrypt the tarball at ``tar_path`` into ``enc_path`` with ``openssl enc``.

    Uses AES-256-CBC with a salt and PBKDF2 key derivation, as before. The
    passphrase is handed to openssl through an environment variable of the
    child process (``-pass env:...``) instead of ``-k <password>`` on the
    command line, where it would be visible in the process list. The same
    passphrase is used, so archives are decrypted exactly as before.

    Raises:
        ValueError: If ``ENCRYPT_PASSWORD`` is missing or empty.
        subprocess.CalledProcessError: If openssl exits with a non-zero status.
    """
    print('[*] 加密备份...')
    if not enc_password:
        # Fail with a clear message instead of an opaque TypeError from
        # subprocess (or an archive protected by an empty passphrase).
        raise ValueError('未设置 ENCRYPT_PASSWORD,无法加密备份')

    pass_var = 'VAULT_BACKUP_PASSPHRASE'
    cmd = [
        "openssl", "enc", "-aes-256-cbc", "-salt", "-pbkdf2",
        "-in", tar_path,
        "-out", enc_path,
        "-pass", f"env:{pass_var}",
    ]
    # Only the child process sees the extra variable; our own environment
    # is left untouched.
    run(cmd, check=True, env={**os.environ, pass_var: enc_password})
|
|
|
|
def upload_backup():
    """Upload the encrypted archive at ``enc_path`` to the bucket.

    The object key is ``<backup_prefix><UTC timestamp>.tar.gz.enc`` with the
    timestamp formatted as ``YYYYMMDD-HHMMSS``, so keys sort chronologically.

    Returns:
        The object key the archive was stored under.
    """
    # Aware UTC "now"; datetime.utcnow() is deprecated. The formatted
    # string is identical.
    now = datetime.datetime.now(datetime.timezone.utc).strftime('%Y%m%d-%H%M%S')
    object_key = f"{backup_prefix}{now}.tar.gz.enc"
    print(f'[*] 上传备份 {object_key} ...')
    with open(enc_path, 'rb') as f:
        client.upload_fileobj(f, bucket, object_key)
    return object_key
|
|
|
|
def rotate_backups():
    """Delete the oldest backups so that at most ``slot_count`` remain.

    All objects whose key starts with ``backup_prefix`` are listed and sorted
    by key. Keys embed a ``YYYYMMDD-HHMMSS`` timestamp, so lexical order is
    chronological order. Everything except the newest ``slot_count`` keys is
    deleted.

    The listing is paginated: one ``list_objects_v2`` call returns a limited
    number of keys, so a single call could miss objects in a large bucket.
    """
    print('[*] 检查并轮换旧备份...')
    paginator = client.get_paginator('list_objects_v2')
    backups = sorted(
        obj['Key']
        for page in paginator.paginate(Bucket=bucket, Prefix=backup_prefix)
        # Pages without matching objects carry no 'Contents' entry.
        for obj in page.get('Contents', [])
    )
    if not backups:
        # Message corrected: this branch means nothing matched the prefix.
        print('[+] 无需轮换,未找到任何备份')
        return

    if len(backups) <= slot_count:
        print(f'[+] 当前备份数 {len(backups)} 未超过 {slot_count},无需删除')
        return

    # Oldest first: drop everything before the newest `slot_count` entries.
    for key in backups[:len(backups) - slot_count]:
        print(f'[-] 删除过旧备份 {key}')
        client.delete_object(Bucket=bucket, Key=key)
|
|
|
|
def cleanup():
    """Remove the temporary tarball and its encrypted copy if they exist."""
    print('[*] 清理临时文件')
    for leftover in (tar_path, enc_path):
        if not os.path.exists(leftover):
            continue
        os.remove(leftover)
|
|
|
|
def main():
    """Run the full backup pipeline: archive, encrypt, upload, rotate, clean up.

    Temporary files are removed whether or not the pipeline succeeds, so a
    failed run does not leave the unencrypted tarball on disk. Any failure
    is printed and pushed to Telegram.

    Returns:
        0 when every step succeeded, 1 otherwise.
    """
    error = None
    try:
        tar_backup()
        encrypt_backup()
        upload_backup()
        rotate_backups()
    except Exception as e:
        error = e
    finally:
        # Runs on success, on failure and on interruption. A cleanup error
        # is reported only if nothing failed earlier, so the root cause is
        # not masked.
        try:
            cleanup()
        except Exception as e:
            if error is None:
                error = e

    if error is None:
        print('[✓] 所有备份流程完成')
        return 0

    err_msg = f'[!] 备份流程出错: {error}'
    print(err_msg)
    send_telegram_message(err_msg)
    return 1


if __name__ == "__main__":
    # Propagate failure through the exit status so cron/systemd can see it.
    raise SystemExit(main())
|