A.前情提要
平时一直在用tg看linuxdo的新帖推送,但是使用tg有时候会有一捏捏不便。
因为我平时就在使用飞书(它可以在任意多终端登录)
So,那就来一个“将Linuxdo最新的话题,推送至飞书群组bot!”
5s盾开启后已失效的cf-Workers版本
B.实现逻辑
检测以及推送新话题大致逻辑:
- 使用大善人的Worker+D1实现,无服务器
- 依赖:获取 LINUX DO - 最新话题 ,帖子解析为json
- 解决订阅认证: 论坛rss订阅401的解决方案 - 软件分享 - LINUX DO
- 数据库:话题已发送记录表
- 飞书卡片:100次/分 5次/秒(够用)
- 通过数据库检查是否已发送,未发送过则触发发送
- 每分钟执行一次:获取、检查、推送
C.实现过程
请翻看本帖历史记录~
D.实现代码
- worker.js如下:
// RSS feed that is polled for the latest topics.
const RSS_URL = 'https://linux.do/latest.rss';
const FEISHU_WEBHOOK_URL = 'https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx';// Webhook address of the Feishu group bot
const COOKIE = '_ga=xxxx; _t=xxxx; _forum_session=xxxx; _ga_xxx=xxxx';// Your own linux.do cookie
export default {
  /**
   * Cron entry point: download the RSS feed, then push every topic that is
   * not yet recorded in the D1 database to Feishu and record its URL.
   *
   * Any error raised along the way is logged and ends the current run.
   *
   * @param {object} event - Scheduled event (not used here).
   * @param {object} env - Worker bindings; `env.linuxdo_whfs` is used as the D1 database.
   * @param {object} ctx - Execution context (not used here).
   * @returns {Promise<void>}
   */
  async scheduled(event, env, ctx) {
    try {
      const database = env.linuxdo_whfs;
      const feedResponse = await handleRequest(new Request(RSS_URL));
      const feedText = await feedResponse.text();
      const topics = parseRSS(feedText);
      for (const topic of topics) {
        // Skip topics whose URL is already stored.
        if (await checkIfSent(database, topic.Link.url)) {
          continue;
        }
        await postToFeishu(topic);
        await recordSentUrl(database, topic.Link.url);
      }
    } catch (error) {
      console.error('Error in handleScheduledEvent:', error);
    }
  },
};
/**
 * Extract up to 30 topics from raw RSS XML using regular expressions.
 *
 * A tag that is missing from an item yields an empty string instead of
 * throwing, so one malformed item no longer aborts the whole run. Items
 * without a <link> are skipped because the link is the de-duplication key.
 *
 * @param {string} rssText - Raw RSS document.
 * @returns {Array<{Title: string, Author: string, Description: string,
 *   Category: string, Time: string, Link: {pc_url: string, android_url: string,
 *   ios_url: string, url: string}}>} Topics in feed order.
 */
function parseRSS(rssText) {
  // Capture group 1 of `pattern` in `text`, or '' when the pattern does not match.
  const capture = (text, pattern) => {
    const found = text.match(pattern);
    return found ? found[1] : '';
  };

  const itemRegex = /<item>(.*?)<\/item>/gs;
  const posts = [];
  let match;
  while ((match = itemRegex.exec(rssText)) && posts.length < 30) {
    const itemContent = match[1];

    const link = capture(itemContent, /<link>(.*?)<\/link>/s);
    if (!link) {
      // Nothing to link to or de-duplicate on.
      continue;
    }

    const title = capture(itemContent, /<title>(.*?)<\/title>/s);
    const creator = capture(itemContent, /<dc:creator><!\[CDATA\[(.*?)\]\]><\/dc:creator>/s);
    const description = capture(itemContent, /<description><!\[CDATA\[(.*?)\]\]><\/description>/s)
      .replace(/ {2,}/g, ' ')                                // collapse runs of spaces
      .replace(/<p><small>.*?">阅读完整话题<\/a><\/p>/s, '')   // drop the "read full topic" footer
      .replace(/<[^>]*>/g, '')                               // strip remaining HTML tags
      .replace(/^\s*[\r\n]/gm, '')                           // remove blank lines
      .split('\n')
      .slice(0, 10)                                          // keep at most 10 lines
      .join('\n');
    const category = capture(itemContent, /<category>(.*?)<\/category>/s);
    const pubDate = capture(itemContent, /<pubDate>(.*?)<\/pubDate>/s);

    posts.push({
      Title: title,
      Author: creator,
      Description: description,
      Category: category,
      // Avoid rendering "Invalid Date" when the item carries no pubDate.
      Time: pubDate ? new Date(pubDate).toUTCString() : '',
      Link: { pc_url: "", android_url: "", ios_url: "", url: link }
    });
  }
  return posts;
}
// The template referenced below is a Feishu card template I made; if you do not
// like it, build your own and replace template_id and the version name.
/**
 * Send one topic to the Feishu group bot as a template card.
 *
 * Throws when Feishu does not accept the card, so the caller does not go on
 * to record a topic that was never delivered.
 *
 * @param {object} post - Topic object used as the card's `template_variable`.
 * @returns {Promise<void>}
 * @throws {Error} If the HTTP status is not 2xx, or the JSON body carries a
 *   non-zero `code`.
 */
async function postToFeishu(post) {
  const response = await fetch(FEISHU_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      msg_type: "interactive",
      card: { type: "template", data: { template_id: "AAq3OTs48BODR", template_version_name: "1.0.2", template_variable: post } }
    })
  });

  const bodyText = await response.text();
  if (!response.ok) {
    throw new Error(`Feishu webhook HTTP ${response.status}: ${bodyText}`);
  }

  // A body that is not JSON is not treated as a failure, to avoid re-sending
  // the same topic on every run; only an explicit non-zero `code` is.
  let result = null;
  try {
    result = JSON.parse(bodyText);
  } catch (parseError) {
    result = null;
  }
  if (result && typeof result.code === 'number' && result.code !== 0) {
    throw new Error(`Feishu rejected the card. code: ${result.code}, msg: ${result.msg}`);
  }
}
/**
 * Fetch the RSS feed with the forum cookie attached and return the upstream
 * response with a permissive CORS header added.
 *
 * The target is always RSS_URL; only the method, headers and body of
 * `request` are carried over.
 *
 * @param {Request} request - Incoming request.
 * @returns {Promise<Response>} Upstream response (status and headers kept)
 *   plus `Access-Control-Allow-Origin: *`.
 */
async function handleRequest(request) {
  // Headers is not a plain object: spreading it copies nothing, so the
  // incoming headers have to be cloned through the constructor.
  const requestHeaders = new Headers(request.headers);
  requestHeaders.set('Cookie', COOKIE);

  const modifiedRequest = new Request(RSS_URL, {
    headers: requestHeaders,
    method: request.method,
    body: request.body,
    redirect: 'follow'
  });
  const response = await fetch(modifiedRequest);

  // Passing the upstream response as init keeps its status, statusText and
  // headers; spreading it into an object literal would lose all three.
  const proxied = new Response(response.body, response);
  proxied.headers.set('Access-Control-Allow-Origin', '*');
  return proxied;
}
/**
 * Report whether `url` is already stored in the `topics` table.
 *
 * @param {object} db - Database binding exposing `prepare().bind().first()`.
 * @param {string} url - Topic URL to look up.
 * @returns {Promise<boolean>} true when a matching row exists.
 */
async function checkIfSent(db, url) {
  const row = await db
    .prepare('SELECT 1 FROM topics WHERE topic = ?')
    .bind(url)
    .first();
  return Boolean(row);
}
/**
 * Store `url` in the `topics` table so the topic is not pushed again.
 *
 * @param {object} db - Database binding exposing `prepare().bind().run()`.
 * @param {string} url - Topic URL to record.
 * @returns {Promise<void>}
 */
async function recordSentUrl(db, url) {
  const statement = db.prepare('INSERT INTO topics (topic) VALUES (?)').bind(url);
  await statement.run();
}
// Expose handleRequest as a named export in addition to the default handler object.
export { handleRequest };
E.配置Worker
- 创建D1数据库
Workers & Pages → D1 → Create database → Dashboard
数据库名称: linuxdo_whfs(随意即可)
Create table
创建表:
表名称: topics
列名称: topic
类型: text
- 绑定D1数据库
Workers & Pages → 你的worker → Settings → Variables → D1 Database Bindings
- 设置定时触发
Workers & Pages → 你的worker → Settings → Triggers → Cron Triggers
2024年5月25日 更新Python版本
DDOS事件开盾后,历史版本无法使用(LINUX DO - 最新话题 403了)
现在,你需要在你的服务器上执行它了!
我只写了主干部分
具体如何在服务器上跑?
安装python3.x
安装依赖 requests、feedparser、beautifulsoup4(pip install requests feedparser beautifulsoup4),缺啥就装啥
然后你可以写一个定时任务,5分钟一次或10分钟一次等等~
import requests
import feedparser
import time
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import sqlite3
import logging
# Logging setup: records at ERROR level and above are written to error.log.
logging.basicConfig(filename='error.log', level=logging.ERROR,
format='%(asctime)s:%(levelname)s:%(message)s')
# Feishu webhook URL (replace the placeholder suffix with your own hook)
FEISHU_WEBHOOK_URL = 'https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxx'
def fetch_rss_feed(url, retries=2, timeout=10):
    """Download the RSS document at *url* and return the raw response body.

    Only timed-out requests are retried; any other failure is raised at once.

    Args:
        url: Address of the feed.
        retries: Number of extra attempts made after a timed-out request.
        timeout: Per-request timeout in seconds, passed to ``requests.get``.

    Returns:
        The response body (``response.content``).

    Raises:
        ValueError: If *retries* is negative. Previously this silently
            returned ``None`` without making any request.
        Exception: If the server answers with a non-200 status, or every
            attempt timed out.
    """
    if retries < 0:
        raise ValueError(f"retries must be >= 0, got {retries}")

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.198 Safari/537.36'
    }
    total_attempts = retries + 1
    for attempt in range(1, total_attempts + 1):
        # Keep the try body to the one call that can time out.
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
        except requests.exceptions.Timeout as err:
            print(f"请求超时,正在重试... ({attempt}/{total_attempts})")
            if attempt == total_attempts:
                # Chain the cause so the log shows which timeout ended the run.
                raise Exception("达到最大重试次数,获取RSS源失败。") from err
            time.sleep(2)
            continue

        if response.status_code == 200:
            return response.content
        raise Exception(f"获取RSS源失败: {response.status_code}")
def post_to_feishu(post, timeout=10):
    """Send one topic to the Feishu group bot as a template card.

    A rejected or failed delivery raises instead of only being logged, so a
    caller that records the topic after this call does not mark an
    undelivered topic as sent.

    Args:
        post: Mapping used as the card's ``template_variable``.
        timeout: Seconds to wait for Feishu; without a timeout
            ``requests.post`` may block indefinitely.

    Raises:
        RuntimeError: If the HTTP status is not 200, the body is not JSON, or
            the JSON ``code`` field is not 0.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        "msg_type": "interactive",
        "card": {
            "type": "template",
            "data": {
                "template_id": "AAq3OTs48BODR",
                "template_version_name": "1.0.2",
                "template_variable": post
            }
        }
    }
    response = requests.post(FEISHU_WEBHOOK_URL, headers=headers, json=data, timeout=timeout)

    if response.status_code != 200:
        raise RuntimeError(f'HTTP请求失败。状态码: {response.status_code}, 响应: {response.text}')

    try:
        json_response = response.json()
    except ValueError as err:
        # A 200 answer whose body is not JSON cannot be confirmed as delivered.
        raise RuntimeError(f'HTTP请求失败。状态码: {response.status_code}, 响应: {response.text}') from err

    if json_response.get('code') != 0:
        raise RuntimeError(f'发送到飞书失败。错误码: {json_response.get("code")}, 错误信息: {json_response.get("msg")}')

    print('Successfully posted to Feishu.')
def init_db(db_path='rss_posts.db'):
    """Open the SQLite database and make sure the ``posts`` table exists.

    Args:
        db_path: Database to open. Defaults to ``rss_posts.db`` in the
            current working directory.

    Returns:
        An open ``sqlite3.Connection``; the caller is responsible for
        closing it.

    Raises:
        sqlite3.Error: If the table cannot be created. The connection is
            closed before the error propagates.
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute('CREATE TABLE IF NOT EXISTS posts (link TEXT PRIMARY KEY)')
        conn.commit()
    except sqlite3.Error:
        # Do not leak the handle when setup fails.
        conn.close()
        raise
    return conn
def _format_publish_time(published):
    """Render an RSS ``pubDate`` string as ``YYYY-MM-DD HH:MM:SS`` in UTC+8.

    Returns the input unchanged (or ``'N/A'`` when it is empty or missing)
    if it does not match the expected RFC 822 style format.
    """
    # Local import: the file header only imports datetime and timedelta.
    from datetime import timezone

    try:
        parsed = datetime.strptime(published, "%a, %d %b %Y %H:%M:%S %z")
    except (TypeError, ValueError):
        return published or 'N/A'
    # Convert the instant to UTC+8. Simply adding eight hours is only correct
    # when the feed's own offset happens to be +0000.
    utc8 = timezone(timedelta(hours=8))
    return parsed.astimezone(utc8).strftime("%Y-%m-%d %H:%M:%S")


def main():
    """Fetch the latest topics and push every not-yet-recorded one to Feishu.

    Each pushed topic's link is stored in the ``posts`` table so later runs
    skip it. Any exception ends the run and is logged with its traceback;
    the database connection is always closed.
    """
    conn = init_db()
    url = 'https://linux.do/latest.rss'
    try:
        rss_content = fetch_rss_feed(url)
        feed = feedparser.parse(rss_content)
        if feed.bozo:
            print("解析Feed出错")
            return

        for entry in feed.entries:
            link = entry.get('link')
            if not link:
                # The link is both the card target and the de-duplication key.
                continue
            if conn.execute("SELECT 1 FROM posts WHERE link = ?", (link,)).fetchone():
                print("该RSS帖子已发送过,跳过。")
                continue

            # Strip the <small>/<a> elements and keep the first six lines of text.
            soup = BeautifulSoup(entry.get('summary', ''), "html.parser")
            for tag in soup.find_all(["small", "a"]):
                tag.decompose()
            text = "\n".join(soup.get_text().splitlines()[:6])

            post_data = {
                "Title": entry.get('title', 'N/A'),
                "Author": entry.get('author', 'N/A'),
                "Description": text,
                "Category": entry.get('category', 'N/A'),
                "Time": _format_publish_time(entry.get('published')),
                "Link": {"pc_url": "", "android_url": "", "ios_url": "", "url": link}
            }
            post_to_feishu(post_data)
            conn.execute("INSERT INTO posts (link) VALUES (?)", (link,))
            conn.commit()
            print(f"已发送到飞书: {post_data}")
    except Exception as e:
        # Top-level boundary: keep the traceback in error.log.
        logging.exception(e)
    finally:
        conn.close()


if __name__ == "__main__":
    main()








