first code

This commit is contained in:
Coldin04 2025-08-13 18:49:19 +08:00
commit b18805aa26
11 changed files with 612 additions and 0 deletions

33
.gitignore vendored Normal file
View file

@ -0,0 +1,33 @@
# Python
__pycache__/
*.py[cod]
*.pyo
*.pyd
*.pyc
# 虚拟环境
venv/
env/
ENV/
.venv/
.ENV/
# 日志和临时文件
*.log
*.tmp
*.swp
# VSCode
.vscode/
# 系统文件
.DS_Store
Thumbs.db
# 配置和密钥(如有)
*.env
config.py
# pip 缓存
pip-wheel-metadata/
*.egg-info/

138
README.md Normal file
View file

@ -0,0 +1,138 @@
# RSS Plugin for Quote/0
## 项目简介
RSS Plugin for Quote/0 是一个基于 Python Flask 的 RSS API 服务,致力于为 [quote/0 墨水屏摆件](https://dot.mindreset.tech/) 拓展更多场景的数据接口。项目目前已支持加密货币行情(如 ETH-USDC未来可扩展服务器监控、天气、资讯等多种 RSS 数据源,助力墨水屏设备实现更多创意玩法。
> 了解 quote/0 墨水屏摆件的更多玩法与社区创意,推荐阅读:[少数派专栏介绍](https://sspai.com/post/100304)
---
## ⚠️ 免责声明
本项目部分或全部代码由 [GitHub Copilot](https://github.com/features/copilot) AI 辅助生成,仅供学习与交流使用。请勿直接用于生产环境或商业用途,使用前请充分审核和测试。
[![GitHub Copilot enabled](https://img.shields.io/badge/Copilot-Enabled-4B8BBE?logo=github&logoColor=white)](https://github.com/features/copilot)
[![Powered by AI](https://img.shields.io/badge/AI%20Generated-Yes-blueviolet)](https://github.com/features/copilot)
---
## 主要功能
- **REST API 服务**:输出标准 RSS 格式数据,适配墨水屏等低功耗设备
- **OKX 实时行情**:通过 OKX 官方 SDK 获取 ETH-USDC 最新价格
- **30 秒缓存机制**:减少外部 API 请求,提升响应速度
- **可扩展架构**:支持未来扩展更多数据源或监控功能
- **健壮的错误处理与日志**
---
## 技术栈
- Python 3.x
- Flask
- okx-sdk
- requests
- websockets
- XML 生成
---
## 目录结构
```
quote-rss-show/
├── app.py # 主 Flask 应用
├── requirements.txt # 项目依赖
├── config.py # 配置文件(建议用环境变量管理敏感信息)
├── services/ # 服务层
│ ├── base_service.py
│ └── crypto_service.py
├── handlers/ # RSS 内容处理器
│ ├── base_handler.py
│ ├── crypto_handler.py
│ └── registry.py
└── utils/ # 工具类
└── rss_generator.py
```
---
## 快速开始
### 1. 克隆项目
```bash
git clone <your-repo-url>
cd quote-rss-show
```
### 2. 创建虚拟环境并安装依赖
```bash
python -m venv venv
# Windows
venv\Scripts\activate
# Linux/Mac
source venv/bin/activate
pip install -r requirements.txt
```
### 3. 配置 OKX API 密钥
建议通过环境变量配置密钥,或在 `config.py` 中填写(不推荐将密钥提交到仓库)。
### 4. 启动服务(开发模式)
```bash
python app.py
```
访问:
- [http://127.0.0.1:5000/](http://127.0.0.1:5000/) 查看 API 文档
- [http://127.0.0.1:5000/rss/crypto?pair=ETH-USDC](http://127.0.0.1:5000/rss/crypto?pair=ETH-USDC) 获取 ETH-USDC 价格 RSS
---
## 生产部署建议
- 使用 Gunicorn 等 WSGI 服务部署(如:`gunicorn -w 4 -b 0.0.0.0:5000 app:app`
- 推荐用 Supervisor 或 systemd 守护进程
- 前置 Nginx 反向代理,支持 HTTPS
- 关闭 Flask debug做好日志与监控
- 仅开放必要端口,定期更新依赖
---
## API 端点
- `GET /rss/crypto?pair=ETH-USDC`
返回 ETH-USDC 价格的 RSS XML
- `GET /`
API 文档页面
- `GET /health`
健康检查
---
## 相关链接
- [quote/0 墨水屏摆件官网](https://dot.mindreset.tech/)
- [少数派专栏介绍](https://sspai.com/post/100304)
---
## LICENSE
MIT
---
## 致谢
- [OKX 官方 Python SDK](https://github.com/burakoner/okx-sdk)

89
app.py Normal file
View file

@ -0,0 +1,89 @@
from flask import Flask, request, Response
import logging
from config import config
from handlers.registry import HandlerRegistry
from utils.rss_generator import RSSGenerator
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_app(config_name='default'):
app = Flask(__name__)
app.config.from_object(config[config_name])
# 初始化处理器注册中心
handler_registry = HandlerRegistry()
# 初始化RSS生成器
rss_generator = RSSGenerator(
title=app.config['RSS_TITLE'],
description=app.config['RSS_DESCRIPTION'],
link=app.config['RSS_LINK'],
language=app.config['RSS_LANGUAGE']
)
@app.route('/')
def index():
"""API文档页面"""
available_handlers = handler_registry.list_handlers()
docs = f"""
<h1>Crypto RSS Service</h1>
<h2>可用的RSS订阅源</h2>
<ul>
"""
for handler_name in available_handlers:
if handler_name == 'crypto':
docs += f"""
<li><a href="/rss/{handler_name}?pair=ETH-USDT">/rss/{handler_name}?pair=ETH-USDT</a> - ETH价格</li>
<li><a href="/rss/{handler_name}?pair=BTC-USDT">/rss/{handler_name}?pair=BTC-USDT</a> - BTC价格</li>
"""
docs += """
</ul>
<h2>使用说明</h2>
<p>将RSS链接粘贴到您的墨水屏RSS阅读器中即可</p>
"""
return docs
@app.route('/rss/<handler_name>')
def rss_feed(handler_name):
"""RSS订阅源端点"""
handler = handler_registry.get_handler(handler_name)
if not handler:
return Response("Handler not found", status=404)
try:
# 获取查询参数
request_params = request.args.to_dict()
# 让处理器处理请求
items = handler.handle(**request_params)
# 生成RSS XML
rss_xml = rss_generator.generate_rss(items)
return Response(rss_xml, mimetype='application/rss+xml')
except Exception as e:
logger.error(f"生成RSS时出错: {e}")
error_items = [{
"title": "服务暂时不可用",
"description": f"生成RSS时出现错误: {str(e)}",
"link": app.config['RSS_LINK']
}]
rss_xml = rss_generator.generate_rss(error_items)
return Response(rss_xml, mimetype='application/rss+xml', status=500)
@app.route('/health')
def health_check():
"""健康检查端点"""
return {"status": "healthy", "available_handlers": handler_registry.list_handlers()}
return app
if __name__ == '__main__':
app = create_app('development')
app.run(host='0.0.0.0', port=5000, debug=True)

15
handlers/base_handler.py Normal file
View file

@ -0,0 +1,15 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, List
class BaseHandler(ABC):
"""基础处理器类"""
@abstractmethod
def handle(self, **kwargs) -> List[Dict[str, Any]]:
"""处理请求并返回RSS项目列表"""
pass
@abstractmethod
def get_handler_name(self) -> str:
"""返回处理器名称"""
pass

View file

@ -0,0 +1,50 @@
from typing import Dict, Any, List
from datetime import datetime
from .base_handler import BaseHandler
from services.crypto_service import CryptoService
class CryptoHandler(BaseHandler):
"""加密货币价格处理器"""
def __init__(self, crypto_service: CryptoService):
self.crypto_service = crypto_service
def handle(self, pair: str = "ETH-USDT", **kwargs) -> List[Dict[str, Any]]:
"""处理加密货币价格请求"""
data = self.crypto_service.get_cached_data(f"{pair.lower()}_price", pair=pair)
if not data:
return [{
"title": f"{pair} 价格获取失败",
"description": "无法获取当前价格,请稍后重试",
"link": "https://www.okx.com",
"pub_date": datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800"),
"guid": f"{pair}-error-{int(datetime.now().timestamp())}"
}]
# 格式化价格信息
price = data['price']
change_24h = data['change_24h']
change_symbol = "📈" if change_24h >= 0 else "📉"
title = f"{pair.replace('-', '/')} 当前价格: ${price:,.4f}"
description = f"""
🔸 当前价格: ${price:,.4f}
🔸 24小时涨跌: {change_symbol} {change_24h:+.2f}%
🔸 24小时最高: ${data['high_24h']:,.4f}
🔸 24小时最低: ${data['low_24h']:,.4f}
🔸 24小时成交量: {data['volume_24h']:,.2f}
🔸 更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
""".strip()
return [{
"title": title,
"description": description,
"link": f"https://www.okx.com/trade-spot/{pair.lower()}",
"pub_date": datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800"),
"guid": f"{pair}-{int(datetime.now().timestamp())}"
}]
def get_handler_name(self) -> str:
return "crypto"

30
handlers/registry.py Normal file
View file

@ -0,0 +1,30 @@
from typing import Dict,List
from .base_handler import BaseHandler
from .crypto_handler import CryptoHandler
from services.crypto_service import CryptoService
class HandlerRegistry:
"""处理器注册中心"""
def __init__(self):
self.handlers: Dict[str, BaseHandler] = {}
self._register_default_handlers()
def _register_default_handlers(self):
"""注册默认处理器"""
# 注册加密货币处理器
crypto_service = CryptoService()
crypto_handler = CryptoHandler(crypto_service)
self.register_handler(crypto_handler)
def register_handler(self, handler: BaseHandler):
"""注册处理器"""
self.handlers[handler.get_handler_name()] = handler
def get_handler(self, handler_name: str) -> BaseHandler:
"""获取处理器"""
return self.handlers.get(handler_name)
def list_handlers(self) -> List[str]:
"""列出所有可用的处理器"""
return list(self.handlers.keys())

5
requirements.txt Normal file
View file

@ -0,0 +1,5 @@
Flask
okx-sdk
requests
websockets
Werkzeug==2.3.7

38
services/base_service.py Normal file
View file

@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
class BaseService(ABC):
"""基础服务类,为扩展提供统一接口"""
def __init__(self):
self.cache = {}
self.cache_timeout = timedelta(seconds=30)
@abstractmethod
def get_data(self, **kwargs) -> Optional[Dict[str, Any]]:
"""获取数据的抽象方法"""
pass
def get_cached_data(self, cache_key: str, **kwargs) -> Optional[Dict[str, Any]]:
"""带缓存的数据获取"""
# 检查缓存
if cache_key in self.cache:
cached_item = self.cache[cache_key]
if datetime.now() - cached_item['timestamp'] < self.cache_timeout:
return cached_item['data']
# 获取新数据
data = self.get_data(**kwargs)
if data:
# 更新缓存
self.cache[cache_key] = {
'data': data,
'timestamp': datetime.now()
}
return data
def clear_cache(self):
"""清理缓存"""
self.cache.clear()

View file

@ -0,0 +1,47 @@
from typing import Dict, Any, Optional
import logging
from okx import OkxRestClient
from .base_service import BaseService
logger = logging.getLogger(__name__)
class CryptoService(BaseService):
"""加密货币价格服务"""
def __init__(self, api_key: str = "", secret_key: str = "", passphrase: str = ""):
super().__init__()
# 对于公共数据不需要API密钥
self.client = OkxRestClient(api_key, secret_key, passphrase) if api_key else OkxRestClient()
def get_data(self, pair: str = "ETH-USDT") -> Optional[Dict[str, Any]]:
"""获取指定交易对的价格数据"""
try:
# 使用正确的marketdata属性获取ticker数据
response = self.client.marketdata.get_ticker(instId=pair)
if response and response.get('code') == '0' and response.get('data'):
ticker_data = response['data'][0]
# 手动计算24小时百分比变化
last_price = float(ticker_data['last'])
open_price = float(ticker_data['open24h'])
pct_change = ((last_price - open_price) / open_price) * 100
# 格式化数据
return {
'pair': pair,
'price': last_price,
'change_24h': round(pct_change, 2), # 保留两位小数
'volume_24h': float(ticker_data.get('vol24h', 0)),
'high_24h': float(ticker_data.get('high24h', 0)),
'low_24h': float(ticker_data.get('low24h', 0)),
'timestamp': ticker_data['ts']
}
else:
logger.error(f"获取{pair}价格失败: {response}")
return None
except Exception as e:
logger.error(f"获取加密货币价格时出错: {e}")
return None

127
test.py Normal file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
OKX marketdata属性测试脚本
用于验证通过marketdata属性获取价格数据
"""
import sys
from pprint import pprint
# 导入必要的库
try:
from okx import OkxRestClient
print("成功导入 okx.OkxRestClient")
except ImportError:
print("错误: 无法导入OKX SDK, 请安装: pip install okx-sdk")
sys.exit(1)
def test_marketdata():
"""测试通过marketdata属性获取价格"""
print("\n=== 测试 marketdata.get_ticker 方法 ===")
# 初始化客户端 (不使用API密钥)
client = OkxRestClient()
# 检查是否有marketdata属性
if not hasattr(client, 'marketdata'):
print("❌ 错误: client对象没有marketdata属性")
print(f"可用属性: {', '.join([a for a in dir(client) if not a.startswith('_')])}")
return
print("✅ 找到marketdata属性")
# 检查marketdata对象是否有get_ticker方法
marketdata = client.marketdata
if not hasattr(marketdata, 'get_ticker'):
print("❌ 错误: marketdata对象没有get_ticker方法")
print(f"可用方法: {', '.join([a for a in dir(marketdata) if not a.startswith('_')])}")
return
print("✅ 找到get_ticker方法")
# 测试获取ETH-USDT价格
try:
pair = "ETH-USDT"
print(f"\n获取 {pair} 价格...")
response = client.marketdata.get_ticker(instId=pair)
print(f"响应状态码: {response.get('code')}")
if response and response.get('code') == '0' and response.get('data'):
print("✅ 成功获取价格数据!")
ticker_data = response['data'][0]
price = float(ticker_data['last'])
print(f"\n{pair} 当前价格: {price} USDT")
print(f"24小时最高价: {ticker_data.get('high24h', 'N/A')}")
print(f"24小时最低价: {ticker_data.get('low24h', 'N/A')}")
print(f"24小时涨跌幅: {ticker_data.get('pctChange', 'N/A')}%") # 使用get方法避免KeyError
print("\n完整响应数据:")
pprint(response)
else:
print("❌ 请求成功但返回错误:")
pprint(response)
except Exception as e:
print(f"❌ 请求过程中出错: {e}")
def test_with_api_keys():
"""测试使用API密钥的情况"""
print("\n=== 测试使用API密钥 ===")
# 填写您的API密钥信息
api_key = input("请输入API密钥 (或直接回车跳过): ").strip()
if not api_key:
print("跳过API密钥测试")
return
secret_key = input("请输入Secret Key: ").strip()
passphrase = input("请输入Passphrase: ").strip()
# 初始化带API密钥的客户端
client = OkxRestClient(api_key, secret_key, passphrase)
# 测试获取ETH-USDT价格
try:
pair = "ETH-USDT"
print(f"\n使用API密钥获取 {pair} 价格...")
response = client.marketdata.get_ticker(instId=pair)
if response and response.get('code') == '0' and response.get('data'):
print("✅ 使用API密钥成功获取价格!")
price = float(response['data'][0]['last'])
print(f"{pair} 当前价格: {price} USDT")
else:
print("❌ 使用API密钥请求失败:")
pprint(response)
except Exception as e:
print(f"❌ 使用API密钥请求过程中出错: {e}")
def main():
"""主函数"""
print("OKX Marketdata属性测试")
print("-" * 50)
# 检查SDK版本
try:
import pkg_resources
print(f"SDK版本: {pkg_resources.get_distribution('okx-sdk').version}")
except Exception as e:
print(f"无法获取SDK版本: {e}")
# 测试不带API密钥
test_marketdata()
# 询问是否测试API密钥
print("\n要测试使用API密钥吗?")
choice = input("输入 'y' 进行测试, 或任意键跳过: ")
if choice.lower() == 'y':
test_with_api_keys()
print("\n测试完成!")
if __name__ == "__main__":
main()

40
utils/rss_generator.py Normal file
View file

@ -0,0 +1,40 @@
from datetime import datetime
from typing import Dict, Any, List
import xml.etree.ElementTree as ET
class RSSGenerator:
"""RSS XML生成器"""
def __init__(self, title: str, description: str, link: str, language: str = "zh-cn"):
self.title = title
self.description = description
self.link = link
self.language = language
def generate_rss(self, items: List[Dict[str, Any]]) -> str:
"""生成RSS XML"""
# 创建根元素
rss = ET.Element("rss", version="2.0")
channel = ET.SubElement(rss, "channel")
# 频道信息
ET.SubElement(channel, "title").text = self.title
ET.SubElement(channel, "description").text = self.description
ET.SubElement(channel, "link").text = self.link
ET.SubElement(channel, "language").text = self.language
ET.SubElement(channel, "lastBuildDate").text = datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800")
# 添加文章项目
for item_data in items:
item = ET.SubElement(channel, "item")
ET.SubElement(item, "title").text = item_data.get("title", "")
ET.SubElement(item, "description").text = item_data.get("description", "")
ET.SubElement(item, "link").text = item_data.get("link", self.link)
ET.SubElement(item, "pubDate").text = item_data.get("pub_date", datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800"))
if "guid" in item_data:
ET.SubElement(item, "guid").text = item_data["guid"]
# 转换为字符串
xml_str = ET.tostring(rss, encoding='unicode', method='xml')
return f'<?xml version="1.0" encoding="UTF-8"?>\n{xml_str}'