From b18805aa26c2c7182b965f1542b90abd5e34a5a1 Mon Sep 17 00:00:00 2001 From: Coldin04 Date: Wed, 13 Aug 2025 18:49:19 +0800 Subject: [PATCH] first code --- .gitignore | 33 +++++++++ README.md | 138 +++++++++++++++++++++++++++++++++++++ app.py | 89 ++++++++++++++++++++++++ handlers/base_handler.py | 15 ++++ handlers/crypto_handler.py | 50 ++++++++++++++ handlers/registry.py | 30 ++++++++ requirements.txt | 5 ++ services/base_service.py | 38 ++++++++++ services/crypto_service.py | 47 +++++++++++++ test.py | 127 ++++++++++++++++++++++++++++++++++ utils/rss_generator.py | 40 +++++++++++ 11 files changed, 612 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 app.py create mode 100644 handlers/base_handler.py create mode 100644 handlers/crypto_handler.py create mode 100644 handlers/registry.py create mode 100644 requirements.txt create mode 100644 services/base_service.py create mode 100644 services/crypto_service.py create mode 100644 test.py create mode 100644 utils/rss_generator.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..06b4bb4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +*.pyc + +# 虚拟环境 +venv/ +env/ +ENV/ +.venv/ +.ENV/ + +# 日志和临时文件 +*.log +*.tmp +*.swp + +# VSCode +.vscode/ + +# 系统文件 +.DS_Store +Thumbs.db + +# 配置和密钥(如有) +*.env +config.py + +# pip 缓存 +pip-wheel-metadata/ +*.egg-info/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..38d6f80 --- /dev/null +++ b/README.md @@ -0,0 +1,138 @@ +# RSS Plugin for Quote/0 + + + +## 项目简介 + +RSS Plugin for Quote/0 是一个基于 Python Flask 的 RSS API 服务,致力于为 [quote/0 墨水屏摆件](https://dot.mindreset.tech/) 拓展更多场景的数据接口。项目目前已支持加密货币行情(如 ETH-USDC),未来可扩展服务器监控、天气、资讯等多种 RSS 数据源,助力墨水屏设备实现更多创意玩法。 + +> 了解 quote/0 墨水屏摆件的更多玩法与社区创意,推荐阅读:[少数派专栏介绍](https://sspai.com/post/100304) + +--- + +## ⚠️ 免责声明 + +本项目部分或全部代码由 [GitHub Copilot](https://github.com/features/copilot) AI 辅助生成,仅供学习与交流使用。请勿直接用于生产环境或商业用途,使用前请充分审核和测试。 + +[![GitHub Copilot enabled](https://img.shields.io/badge/Copilot-Enabled-4B8BBE?logo=github&logoColor=white)](https://github.com/features/copilot) +[![Powered by AI](https://img.shields.io/badge/AI%20Generated-Yes-blueviolet)](https://github.com/features/copilot) + +--- + +## 主要功能 + +- **REST API 服务**:输出标准 RSS 格式数据,适配墨水屏等低功耗设备 +- **OKX 实时行情**:通过 OKX 官方 SDK 获取 ETH-USDC 最新价格 +- **30 秒缓存机制**:减少外部 API 请求,提升响应速度 +- **可扩展架构**:支持未来扩展更多数据源或监控功能 +- **健壮的错误处理与日志** + +--- + +## 技术栈 + +- Python 3.x +- Flask +- okx-sdk +- requests +- websockets +- XML 生成 + +--- + +## 目录结构 + +``` +quote-rss-show/ +├── app.py # 主 Flask 应用 +├── requirements.txt # 项目依赖 +├── config.py # 配置文件(建议用环境变量管理敏感信息) +├── services/ # 服务层 +│ ├── base_service.py +│ └── crypto_service.py +├── handlers/ # RSS 内容处理器 +│ ├── base_handler.py +│ ├── crypto_handler.py +│ └── registry.py +└── utils/ # 工具类 + └── rss_generator.py +``` + +--- + +## 快速开始 + +### 1. 克隆项目 + +```bash +git clone +cd quote-rss-show +``` + +### 2. 创建虚拟环境并安装依赖 + +```bash +python -m venv venv +# Windows +venv\Scripts\activate +# Linux/Mac +source venv/bin/activate + +pip install -r requirements.txt +``` + +### 3. 配置 OKX API 密钥 + +建议通过环境变量配置密钥,或在 `config.py` 中填写(不推荐将密钥提交到仓库)。 + +### 4. 启动服务(开发模式) + +```bash +python app.py +``` + +访问: +- [http://127.0.0.1:5000/](http://127.0.0.1:5000/) 查看 API 文档 +- [http://127.0.0.1:5000/rss/crypto?pair=ETH-USDC](http://127.0.0.1:5000/rss/crypto?pair=ETH-USDC) 获取 ETH-USDC 价格 RSS + +--- + +## 生产部署建议 + +- 使用 Gunicorn 等 WSGI 服务部署(如:`gunicorn -w 4 -b 0.0.0.0:5000 app:app`) +- 推荐用 Supervisor 或 systemd 守护进程 +- 前置 Nginx 反向代理,支持 HTTPS +- 关闭 Flask debug,做好日志与监控 +- 仅开放必要端口,定期更新依赖 + +--- + +## API 端点 + +- `GET /rss/crypto?pair=ETH-USDC` + 返回 ETH-USDC 价格的 RSS XML + +- `GET /` + API 文档页面 + +- `GET /health` + 健康检查 + +--- + +## 相关链接 + +- [quote/0 墨水屏摆件官网](https://dot.mindreset.tech/) +- [少数派专栏介绍](https://sspai.com/post/100304) + +--- + +## LICENSE + +MIT + +--- + +## 致谢 + +- [OKX 官方 Python SDK](https://github.com/burakoner/okx-sdk) diff --git a/app.py b/app.py new file mode 100644 index 0000000..6b13db6 --- /dev/null +++ b/app.py @@ -0,0 +1,89 @@ +from flask import Flask, request, Response +import logging +from config import config +from handlers.registry import HandlerRegistry +from utils.rss_generator import RSSGenerator + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def create_app(config_name='default'): + app = Flask(__name__) + app.config.from_object(config[config_name]) + + # 初始化处理器注册中心 + handler_registry = HandlerRegistry() + + # 初始化RSS生成器 + rss_generator = RSSGenerator( + title=app.config['RSS_TITLE'], + description=app.config['RSS_DESCRIPTION'], + link=app.config['RSS_LINK'], + language=app.config['RSS_LANGUAGE'] + ) + + @app.route('/') + def index(): + """API文档页面""" + available_handlers = handler_registry.list_handlers() + docs = f""" +

Crypto RSS Service

+

可用的RSS订阅源:

+ +

使用说明:

+

将RSS链接粘贴到您的墨水屏RSS阅读器中即可。

+ """ + return docs + + @app.route('/rss/') + def rss_feed(handler_name): + """RSS订阅源端点""" + handler = handler_registry.get_handler(handler_name) + + if not handler: + return Response("Handler not found", status=404) + + try: + # 获取查询参数 + request_params = request.args.to_dict() + + # 让处理器处理请求 + items = handler.handle(**request_params) + + # 生成RSS XML + rss_xml = rss_generator.generate_rss(items) + + return Response(rss_xml, mimetype='application/rss+xml') + + except Exception as e: + logger.error(f"生成RSS时出错: {e}") + error_items = [{ + "title": "服务暂时不可用", + "description": f"生成RSS时出现错误: {str(e)}", + "link": app.config['RSS_LINK'] + }] + rss_xml = rss_generator.generate_rss(error_items) + return Response(rss_xml, mimetype='application/rss+xml', status=500) + + @app.route('/health') + def health_check(): + """健康检查端点""" + return {"status": "healthy", "available_handlers": handler_registry.list_handlers()} + + return app + +if __name__ == '__main__': + app = create_app('development') + app.run(host='0.0.0.0', port=5000, debug=True) \ No newline at end of file diff --git a/handlers/base_handler.py b/handlers/base_handler.py new file mode 100644 index 0000000..17a1966 --- /dev/null +++ b/handlers/base_handler.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, List + +class BaseHandler(ABC): + """基础处理器类""" + + @abstractmethod + def handle(self, **kwargs) -> List[Dict[str, Any]]: + """处理请求并返回RSS项目列表""" + pass + + @abstractmethod + def get_handler_name(self) -> str: + """返回处理器名称""" + pass \ No newline at end of file diff --git a/handlers/crypto_handler.py b/handlers/crypto_handler.py new file mode 100644 index 0000000..f2b3f0c --- /dev/null +++ b/handlers/crypto_handler.py @@ -0,0 +1,50 @@ +from typing import Dict, Any, List +from datetime import datetime +from .base_handler import BaseHandler +from services.crypto_service import CryptoService + +class CryptoHandler(BaseHandler): + """加密货币价格处理器""" + + def __init__(self, crypto_service: CryptoService): + self.crypto_service = crypto_service + + def handle(self, pair: str = "ETH-USDT", **kwargs) -> List[Dict[str, Any]]: + """处理加密货币价格请求""" + data = self.crypto_service.get_cached_data(f"{pair.lower()}_price", pair=pair) + + if not data: + return [{ + "title": f"{pair} 价格获取失败", + "description": "无法获取当前价格,请稍后重试", + "link": "https://www.okx.com", + "pub_date": datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800"), + "guid": f"{pair}-error-{int(datetime.now().timestamp())}" + }] + + # 格式化价格信息 + price = data['price'] + change_24h = data['change_24h'] + change_symbol = "📈" if change_24h >= 0 else "📉" + + title = f"{pair.replace('-', '/')} 当前价格: ${price:,.4f}" + + description = f""" + 🔸 当前价格: ${price:,.4f} + 🔸 24小时涨跌: {change_symbol} {change_24h:+.2f}% + 🔸 24小时最高: ${data['high_24h']:,.4f} + 🔸 24小时最低: ${data['low_24h']:,.4f} + 🔸 24小时成交量: {data['volume_24h']:,.2f} + 🔸 更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} + """.strip() + + return [{ + "title": title, + "description": description, + "link": f"https://www.okx.com/trade-spot/{pair.lower()}", + "pub_date": datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800"), + "guid": f"{pair}-{int(datetime.now().timestamp())}" + }] + + def get_handler_name(self) -> str: + return "crypto" \ No newline at end of file diff --git a/handlers/registry.py b/handlers/registry.py new file mode 100644 index 0000000..c8d5e60 --- /dev/null +++ b/handlers/registry.py @@ -0,0 +1,30 @@ +from typing import Dict,List +from .base_handler import BaseHandler +from .crypto_handler import CryptoHandler +from services.crypto_service import CryptoService + +class HandlerRegistry: + """处理器注册中心""" + + def __init__(self): + self.handlers: Dict[str, BaseHandler] = {} + self._register_default_handlers() + + def _register_default_handlers(self): + """注册默认处理器""" + # 注册加密货币处理器 + crypto_service = CryptoService() + crypto_handler = CryptoHandler(crypto_service) + self.register_handler(crypto_handler) + + def register_handler(self, handler: BaseHandler): + """注册处理器""" + self.handlers[handler.get_handler_name()] = handler + + def get_handler(self, handler_name: str) -> BaseHandler: + """获取处理器""" + return self.handlers.get(handler_name) + + def list_handlers(self) -> List[str]: + """列出所有可用的处理器""" + return list(self.handlers.keys()) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..db82630 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +Flask +okx-sdk +requests +websockets +Werkzeug==2.3.7 \ No newline at end of file diff --git a/services/base_service.py b/services/base_service.py new file mode 100644 index 0000000..f458716 --- /dev/null +++ b/services/base_service.py @@ -0,0 +1,38 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional +from datetime import datetime, timedelta + +class BaseService(ABC): + """基础服务类,为扩展提供统一接口""" + + def __init__(self): + self.cache = {} + self.cache_timeout = timedelta(seconds=30) + + @abstractmethod + def get_data(self, **kwargs) -> Optional[Dict[str, Any]]: + """获取数据的抽象方法""" + pass + + def get_cached_data(self, cache_key: str, **kwargs) -> Optional[Dict[str, Any]]: + """带缓存的数据获取""" + # 检查缓存 + if cache_key in self.cache: + cached_item = self.cache[cache_key] + if datetime.now() - cached_item['timestamp'] < self.cache_timeout: + return cached_item['data'] + + # 获取新数据 + data = self.get_data(**kwargs) + if data: + # 更新缓存 + self.cache[cache_key] = { + 'data': data, + 'timestamp': datetime.now() + } + + return data + + def clear_cache(self): + """清理缓存""" + self.cache.clear() \ No newline at end of file diff --git a/services/crypto_service.py b/services/crypto_service.py new file mode 100644 index 0000000..e85cb06 --- /dev/null +++ b/services/crypto_service.py @@ -0,0 +1,47 @@ +from typing import Dict, Any, Optional +import logging +from okx import OkxRestClient +from .base_service import BaseService + +logger = logging.getLogger(__name__) + +class CryptoService(BaseService): + """加密货币价格服务""" + + def __init__(self, api_key: str = "", secret_key: str = "", passphrase: str = ""): + super().__init__() + # 对于公共数据,不需要API密钥 + self.client = OkxRestClient(api_key, secret_key, passphrase) if api_key else OkxRestClient() + + def get_data(self, pair: str = "ETH-USDT") -> Optional[Dict[str, Any]]: + """获取指定交易对的价格数据""" + try: + # 使用正确的marketdata属性获取ticker数据 + response = self.client.marketdata.get_ticker(instId=pair) + + if response and response.get('code') == '0' and response.get('data'): + ticker_data = response['data'][0] + + # 手动计算24小时百分比变化 + last_price = float(ticker_data['last']) + open_price = float(ticker_data['open24h']) + pct_change = ((last_price - open_price) / open_price) * 100 + + # 格式化数据 + return { + 'pair': pair, + 'price': last_price, + 'change_24h': round(pct_change, 2), # 保留两位小数 + 'volume_24h': float(ticker_data.get('vol24h', 0)), + 'high_24h': float(ticker_data.get('high24h', 0)), + 'low_24h': float(ticker_data.get('low24h', 0)), + 'timestamp': ticker_data['ts'] + } + else: + logger.error(f"获取{pair}价格失败: {response}") + return None + + except Exception as e: + logger.error(f"获取加密货币价格时出错: {e}") + return None + diff --git a/test.py b/test.py new file mode 100644 index 0000000..33661d0 --- /dev/null +++ b/test.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +OKX marketdata属性测试脚本 +用于验证通过marketdata属性获取价格数据 +""" + +import sys +from pprint import pprint + +# 导入必要的库 +try: + from okx import OkxRestClient + print("成功导入 okx.OkxRestClient") +except ImportError: + print("错误: 无法导入OKX SDK, 请安装: pip install okx-sdk") + sys.exit(1) + +def test_marketdata(): + """测试通过marketdata属性获取价格""" + print("\n=== 测试 marketdata.get_ticker 方法 ===") + + # 初始化客户端 (不使用API密钥) + client = OkxRestClient() + + # 检查是否有marketdata属性 + if not hasattr(client, 'marketdata'): + print("❌ 错误: client对象没有marketdata属性") + print(f"可用属性: {', '.join([a for a in dir(client) if not a.startswith('_')])}") + return + + print("✅ 找到marketdata属性") + + # 检查marketdata对象是否有get_ticker方法 + marketdata = client.marketdata + if not hasattr(marketdata, 'get_ticker'): + print("❌ 错误: marketdata对象没有get_ticker方法") + print(f"可用方法: {', '.join([a for a in dir(marketdata) if not a.startswith('_')])}") + return + + print("✅ 找到get_ticker方法") + + # 测试获取ETH-USDT价格 + try: + pair = "ETH-USDT" + print(f"\n获取 {pair} 价格...") + response = client.marketdata.get_ticker(instId=pair) + + print(f"响应状态码: {response.get('code')}") + if response and response.get('code') == '0' and response.get('data'): + print("✅ 成功获取价格数据!") + + ticker_data = response['data'][0] + price = float(ticker_data['last']) + + print(f"\n{pair} 当前价格: {price} USDT") + print(f"24小时最高价: {ticker_data.get('high24h', 'N/A')}") + print(f"24小时最低价: {ticker_data.get('low24h', 'N/A')}") + print(f"24小时涨跌幅: {ticker_data.get('pctChange', 'N/A')}%") # 使用get方法避免KeyError + + print("\n完整响应数据:") + pprint(response) + else: + print("❌ 请求成功但返回错误:") + pprint(response) + except Exception as e: + print(f"❌ 请求过程中出错: {e}") + +def test_with_api_keys(): + """测试使用API密钥的情况""" + print("\n=== 测试使用API密钥 ===") + + # 填写您的API密钥信息 + api_key = input("请输入API密钥 (或直接回车跳过): ").strip() + if not api_key: + print("跳过API密钥测试") + return + + secret_key = input("请输入Secret Key: ").strip() + passphrase = input("请输入Passphrase: ").strip() + + # 初始化带API密钥的客户端 + client = OkxRestClient(api_key, secret_key, passphrase) + + # 测试获取ETH-USDT价格 + try: + pair = "ETH-USDT" + print(f"\n使用API密钥获取 {pair} 价格...") + response = client.marketdata.get_ticker(instId=pair) + + if response and response.get('code') == '0' and response.get('data'): + print("✅ 使用API密钥成功获取价格!") + price = float(response['data'][0]['last']) + print(f"{pair} 当前价格: {price} USDT") + else: + print("❌ 使用API密钥请求失败:") + pprint(response) + except Exception as e: + print(f"❌ 使用API密钥请求过程中出错: {e}") + +def main(): + """主函数""" + print("OKX Marketdata属性测试") + print("-" * 50) + + # 检查SDK版本 + try: + import pkg_resources + print(f"SDK版本: {pkg_resources.get_distribution('okx-sdk').version}") + except Exception as e: + print(f"无法获取SDK版本: {e}") + + # 测试不带API密钥 + test_marketdata() + + # 询问是否测试API密钥 + print("\n要测试使用API密钥吗?") + choice = input("输入 'y' 进行测试, 或任意键跳过: ") + + if choice.lower() == 'y': + test_with_api_keys() + + print("\n测试完成!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/utils/rss_generator.py b/utils/rss_generator.py new file mode 100644 index 0000000..bb76c52 --- /dev/null +++ b/utils/rss_generator.py @@ -0,0 +1,40 @@ +from datetime import datetime +from typing import Dict, Any, List +import xml.etree.ElementTree as ET + +class RSSGenerator: + """RSS XML生成器""" + + def __init__(self, title: str, description: str, link: str, language: str = "zh-cn"): + self.title = title + self.description = description + self.link = link + self.language = language + + def generate_rss(self, items: List[Dict[str, Any]]) -> str: + """生成RSS XML""" + # 创建根元素 + rss = ET.Element("rss", version="2.0") + channel = ET.SubElement(rss, "channel") + + # 频道信息 + ET.SubElement(channel, "title").text = self.title + ET.SubElement(channel, "description").text = self.description + ET.SubElement(channel, "link").text = self.link + ET.SubElement(channel, "language").text = self.language + ET.SubElement(channel, "lastBuildDate").text = datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800") + + # 添加文章项目 + for item_data in items: + item = ET.SubElement(channel, "item") + ET.SubElement(item, "title").text = item_data.get("title", "") + ET.SubElement(item, "description").text = item_data.get("description", "") + ET.SubElement(item, "link").text = item_data.get("link", self.link) + ET.SubElement(item, "pubDate").text = item_data.get("pub_date", datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0800")) + + if "guid" in item_data: + ET.SubElement(item, "guid").text = item_data["guid"] + + # 转换为字符串 + xml_str = ET.tostring(rss, encoding='unicode', method='xml') + return f'\n{xml_str}' \ No newline at end of file