Linuxdo新话题推送到tg?不!推送飞书!!更新Python版本!

A.前情提要

平时一直在用tg看linuxdo的新帖推送,但是使用tg有时候会有一捏捏不便。
因为我平时就在使用飞书(它可以在任意多终端登录)
So,那就来一个“将Linuxdo最新的话题,推送至飞书群组bot!

5s盾后,失效的cf-Workers版本

B.实现逻辑

检测以及推送新话题大致逻辑:

C.实现过程

请翻看本帖历史记录~

D.实现代码

  • worker.js如下:
const RSS_URL = 'https://linux.do/latest.rss';
const FEISHU_WEBHOOK_URL = 'https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx';//飞书群机器人Webhook 地址
const COOKIE = '_ga=xxxx; _t=xxxx; _forum_session=xxxx; _ga_xxx=xxxx';//你的linuxdo_cookie~

export default {
  async scheduled(event, env, ctx) {
    try {
      const db = env.linuxdo_whfs;
      const rssResponse = await handleRequest(new Request(RSS_URL));
      const rssText = await rssResponse.text();
      const items = parseRSS(rssText);
      for (let item of items) {
        const alreadySent = await checkIfSent(db, item.Link.url);
        if (!alreadySent) {
          await postToFeishu(item);
          await recordSentUrl(db, item.Link.url);
        }
      }
    } catch (error) {
      console.error('Error in handleScheduledEvent:', error);
    }
  },
};

function parseRSS(rssText) {
  const itemRegex = /<item>(.*?)<\/item>/gs;
  let match, posts = [];
  while ((match = itemRegex.exec(rssText)) && posts.length < 30) {
    const itemContent = match[1];
    const title = itemContent.match(/<title>(.*?)<\/title>/s)[1];
    const creator = itemContent.match(/<dc:creator><!\[CDATA\[(.*?)\]\]><\/dc:creator>/s)[1];
    const link = itemContent.match(/<link>(.*?)<\/link>/s)[1];
    const description = itemContent.match(/<description><!\[CDATA\[(.*?)\]\]><\/description>/s)[1]
      .replace(/ {2,}/g, ' ')
      .replace(/<p><small>.*?">阅读完整话题<\/a><\/p>/s, '')
      .replace(/<[^>]*>/g, '')
      .replace(/^\s*[\r\n]/gm, '')
      .split('\n')
      .slice(0, 10)
      .join('\n');
    const category = itemContent.match(/<category>(.*?)<\/category>/s)[1];
    const pubDate = itemContent.match(/<pubDate>(.*?)<\/pubDate>/s)[1];

    posts.push({
      Title: title,
      Author: creator,
      Description: description,
      Category: category,
      Time: new Date(pubDate).toUTCString(),
      Link: { pc_url: "", android_url: "", ios_url: "", url: link }
    });
  }
  return posts;
}

//其中的template是我做的飞书卡片模板,如果不喜欢可以自己制作一个~替换掉template_id和version即可~
async function postToFeishu(post) {
  await fetch(FEISHU_WEBHOOK_URL, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      msg_type: "interactive",
      card: { type: "template", data: { template_id: "AAq3OTs48BODR", template_version_name: "1.0.2", template_variable: post } }
    })
  });
}

async function handleRequest(request) {
  const modifiedRequest = new Request(RSS_URL, {
    headers: { ...request.headers, 'Cookie': COOKIE },
    method: request.method,
    body: request.body,
    redirect: 'follow'
  });
  const response = await fetch(modifiedRequest);
  return new Response(response.body, { ...response, headers: { ...response.headers, 'Access-Control-Allow-Origin': '*' } });
}

async function checkIfSent(db, url) {
  const query = 'SELECT 1 FROM topics WHERE topic = ?';
  const result = await db.prepare(query).bind(url).first();
  return result ? true : false;
}

async function recordSentUrl(db, url) {
  const query = 'INSERT INTO topics (topic) VALUES (?)';
  await db.prepare(query).bind(url).run();
}

export { handleRequest };


E.配置Worker

  • 创建D1数据库
    Workers & Pages → D1 → Create database → Dashboard
    数据库名称: linuxdo_whfs(随意即可)

    Create table
    创建表:
    表名称: topics
    列名称: topic
    类型 --: text

  • 绑定D1数据库
    Workers & Pages → 你的worker → Settings → Variables → D1 Database Bindings

  • 设置定时触发
    Workers & Pages → 你的worker → Settings → Triggers → Cron Triggers

2024年5月25日 更新Python版本

DDOS事件开盾后,历史版本无法使用(LINUX DO - 最新话题 403了)
现在,你需要在你的服务器上执行它了!
我只写了主干部分
具体如何在服务器上跑?
安装python3.x
安装依赖 requestsfeedparserbeautifulsoup4,缺啥就装啥
然后你可以写一个定时任务,5分钟一次或10分钟一次等等~

import requests
import feedparser
import time
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import sqlite3
import logging

# 设置日志
logging.basicConfig(filename='error.log', level=logging.ERROR,
                    format='%(asctime)s:%(levelname)s:%(message)s')

# 飞书Webhook URL
FEISHU_WEBHOOK_URL = 'https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxx'

def fetch_rss_feed(url, retries=2, timeout=10):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.198 Safari/537.36'
    }
    attempt = 0

    
    while attempt <= retries:
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            if response.status_code == 200:
                return response.content
            else:
                raise Exception(f"获取RSS源失败: {response.status_code}")
        except requests.exceptions.Timeout:
            print(f"请求超时,正在重试... ({attempt+1}/{retries+1})")
            attempt += 1
            if attempt > retries:
                raise Exception("达到最大重试次数,获取RSS源失败。")
            time.sleep(2)

def post_to_feishu(post):
    headers = {'Content-Type': 'application/json'}
    data = {
        "msg_type": "interactive",
        "card": {
            "type": "template",
            "data": {
                "template_id": "AAq3OTs48BODR",
                "template_version_name": "1.0.2",
                "template_variable": post
            }
        }
    }
    response = requests.post(FEISHU_WEBHOOK_URL, headers=headers, json=data)
    if response.status_code == 200:
        json_response = response.json()
        if json_response.get('code') == 0:
            print('Successfully posted to Feishu.')
        else:
            logging.error(f'发送到飞书失败。错误码: {json_response.get("code")}, 错误信息: {json_response.get("msg")}')
    else:
        logging.error(f'HTTP请求失败。状态码: {response.status_code}, 响应: {response.text}')

def init_db():
    conn = sqlite3.connect('rss_posts.db')
    c = conn.cursor()
    c.execute('CREATE TABLE IF NOT EXISTS posts (link TEXT PRIMARY KEY)')
    conn.commit()
    return conn

def main():
    conn = init_db()
    url = 'https://linux.do/latest.rss'
    try:
        rss_content = fetch_rss_feed(url)
        feed = feedparser.parse(rss_content)
        if feed.bozo:
            print("解析Feed出错")
        else:
            for entry in feed.entries:
                if not conn.execute("SELECT 1 FROM posts WHERE link = ?", (entry.link,)).fetchone():
                    soup = BeautifulSoup(entry.summary, "html.parser")
                    for tag in soup.find_all(["small", "a"]):
                        tag.decompose()
                    text = soup.get_text()
                    text = "\n".join(text.splitlines()[:6])
                    time_utc8 = datetime.strptime(entry.published, "%a, %d %b %Y %H:%M:%S %z") + timedelta(hours=8)
                    time_str = time_utc8.strftime("%Y-%m-%d %H:%M:%S")
                    post_data = {
                        "Title": entry.title,
                        "Author": entry.get('author', 'N/A'),
                        "Description": text,
                        "Category": entry.get('category', 'N/A'),
                        "Time": time_str,
                        "Link": {"pc_url": "", "android_url": "", "ios_url": "", "url": entry.link}
                    }
                    post_to_feishu(post_data)
                    conn.execute("INSERT INTO posts (link) VALUES (?)", (entry.link,))
                    conn.commit()
                    print(f"已发送到飞书: {post_data}")
                else:
                    print("该RSS帖子已发送过,跳过。")
    except Exception as e:
        logging.error(e)
    finally:
        conn.close()

if __name__ == "__main__":
    main()

如果没错的话,现在你可以开始享用了~

13 个赞

加油~

3 个赞

加油~~

3 个赞

哇~加油

1 个赞

tg的在哪里使用捏

1 个赞

好强

1 个赞

我加了这两个

1 个赞

谢谢

1 个赞

我是微信,每小时收一次,把这一小时内的新帖汇总成一条消息

3 个赞

我是rss收录的所有主题,就算被删除的也能自动收录进来 :crazy_face:
手机就下个RSS阅读器就好了 :crazy_face:

1 个赞

都好会玩啊 :saluting_face:

1 个赞

出个教程吧哥

1 个赞

哇哈哈哈哈,我所有设备都登陆着飞书,所以想一个应用,多类消息直接全包

1 个赞

出个教程吧

终于有时间实现完成啦~~~~

完美,推送一致~

好诶!(不过还是TG常用)

哇哈哈哈哈哈,因为我是用飞书办公的缘故 :rofl:办公摸鱼两不误

O-O这样呀

:racehorse:一下