serv00保活 ws连接+crontab 保活 本地/远端

项目思路总领
在serv00创建ws进程,客户端与服务器实现ws连接,这是一个双向的连接!
当服务器的ws进程被干掉了,我作为客户端就被会告知,这时客户端将主动去ssh连接,先检查监控process是否存在,不存在,则运行command和增加crontab。就这样~

预警:
过程示例是ws连接,而不是wss(类似https),所以我觉得设置token似乎也没用,就没设置,懂的大佬可以帮我解答下。拜谢~~
ws连接成功后,每隔50s会对服务器进行一次ping操作,进行ws保活,如果该时间会导致ws断开,请手动修改py代码中的时间。

telegram推送如下

第一步:创建端口:随意创建就好

第二步:创建ws进程:ssh连接服务器和一些简单的代码

2.1 安装依赖库(位置随意,当前位置就ok)

npm install ws

2.2 初始化

mkdir websocket-server
cd websocket-server
npm init -y

2.3 服务器配置文件

vim server.js

输入下面的代码.
注:port是你前面注册的端口号。

 const WebSocket = require('ws');
// 创建 WebSocket 服务器实例
 const wss = new WebSocket.Server({ port: 8080 });
 // 监听客户端连接事件
 wss.on('connection', function connection(ws) { 
        console.log('客户已连接');
     // 连接关闭事件
        ws.on('close', function() {
                console.log('客户已断开连接');
                     });
  });
  console.log('WebSocket服务器在 ws://localhost:8080 上运行');


2.4 启动ws服务器端
简单启动如下:

node server.js

image

通过 sh restart.sh启动
在server.js同级目录新建restart.sh,并复制粘贴下面代码(可选),

WORK_DIR="/home/用户名/websocket-server"

if ! pgrep -f "node server.js" > /dev/null
         then
         cd "$WORK_DIR"
         nohup node server.js > /dev/null 2>&1 & echo "Restarted server.js at $(date)" >> "$WORK_DIR/restart_log.txt"
         fi

|
|
|分割线
|
|

第三步:配置客户端:主要就是一个py脚本。
参数说明
1.telegram选填。

2.url格式:你需要监听的域名

ws://你的项目域名:创建的端口号

3.process: 建议node server.js
ws连接断开,客户端连接ssh进行识别判断客户端进程是否存在。
process作为一个监控进程,也可能因为网络问题连接断开,重启断开(crontab可以启动),所以需要判断一下。
如图(填写node server.js即可)
image

4.command: 命令1;命令2;命令3
各种命令是你需要保活的进程启动命令。示例:启动ws服务器进程

~/websocket-server/restart.sh

5.crontab_tasks:
显然是在你某一次失活的时候,自动为你的服务器加上的crontab_tasks。
自由发挥:以websocket服务器进程示例

@reboot ~/websocket-server/restart.sh
  1. 创建一个py脚本,复制粘贴即可。修改一下# 用户凭证和URL信息的数据
    在这之后,安装一下运行py脚本所需的依赖.打开powershell,输入下面指令
pip install paramiko aiofiles websockets requests

import paramiko
import datetime
import asyncio
import requests
import aiofiles
import websockets
from websockets.exceptions import ConnectionClosed

# Telegram 推送所需的 token 和 chat ID
TELEGRAM_BOT_TOKEN = ""
TELEGRAM_CHAT_ID = ""

# 用户凭证和URL信息
user_credentials = [
    {'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22, 
     'process': '', 
     'command': '',
     'url': '',
	 'crontab_tasks':[
         "",
         ""
     ]
	 },
    {'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22, 
     'process': '', 
     'command': '',
     'url': '',
	 'crontab_tasks':[
         "",
         ""
     ]
	 } 
]

#文件路径
log_file = 'restart_log.txt'

# log -> open and write
async def log_to_file(file_path, message):
    async with aiofiles.open(file_path, 'a') as f:
        await f.write(message + '\n')

async def check_url_and_restart(user_data):
    url = user_data['url']
    reconnect_interval = 20
    max_attempts = 3
    attempt = 0
    flag = False

    while attempt < max_attempts:
        attempt += 1
        try:
            async with websockets.connect(url, ping_interval=50, ping_timeout=10) as websocket:
                log_message = f"{datetime.datetime.now()} - {user_data['url']} - WebSocket 连接成功."
                print(log_message)
                await log_to_file(log_file, log_message)
                if flag: # 如果是因为ping pong超时,不进行重置attempt
                    flag = False        
                    # 连接成功后重新设置尝试次数
                    attempt = 0
                await websocket.wait_closed()
        except Exception as e:
            flag = True
            log_message = f"{datetime.datetime.now()} - {user_data['url']} - 连接失败: {str(e)}"
            print(log_message)
            await log_to_file(log_file, log_message)
            print(f"尝试重新连接... (尝试次数: {attempt}/{max_attempts})")
            await asyncio.sleep(reconnect_interval)  # 等待重连间隔
            
    # 连接失败,执行重启进程
    log_message = f"{datetime.datetime.now()} - {user_data['username']} - 网站WS连接失败. 重启进程."
    print(log_message)
    await log_to_file(log_file, log_message)
    return await check_and_restart_ssh(user_data)

async def check_and_restart_ssh(user_data):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    try:
        print(f"Connecting to {user_data['hostname']} as {user_data['username']}...")
        ssh.connect(user_data['hostname'], port=user_data['port'], 
                    username=user_data['username'], password=user_data['password'])
        
        # 检查进程是否正在运行
        check_command = f"pgrep -f '{user_data['process']}' | grep -v grep"
        stdin, stdout, stderr = ssh.exec_command(check_command)
        exit_code = stdout.channel.recv_exit_status()

        if exit_code != 0:  # 进程未找到
            print(f"Process not found, restarting for {user_data['username']}...")
            # 执行重启命令
            stdin, stdout, stderr = ssh.exec_command(user_data['command'])
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process not found. Restarted."
            send_telegram_message(log_message)
            await log_to_file(log_file, log_message)
            await add_crontab_tasks(ssh, user_data)
            ssh.close()
            return True
        else:
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process already running.\n"
            print(f"Process already running for {user_data['username']}...")
            await log_to_file(log_file, log_message)
            ssh.close()
            return True

    except Exception as e:
        log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error: {str(e)}\n"
        print(f"Error for {user_data['username']}: {str(e)}")
        await log_to_file(log_file, log_message)
        ssh.close()
        return False


# 添加定时任务
async def add_crontab_tasks(ssh, user_data):
    crontab_tasks = user_data.get('crontab_tasks', [])
    if not crontab_tasks:   
        return

    try:
        # 获取当前的 crontab
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
        existing_crontab = await asyncio.to_thread(stdout.read)
        existing_crontab = existing_crontab.decode().strip()

        # 判断现有的 crontab 是否为空
        if existing_crontab:
            # 如果不为空,先删除现有的 crontab
            await asyncio.to_thread(ssh.exec_command, "crontab -r")
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Existing crontab deleted."
            print(log_message)
            await log_to_file(log_file, log_message)

        # 添加新的任务
        new_crontab = "\n".join(crontab_tasks)

        # 写入新的 crontab
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
        stdin.write(new_crontab)
        stdin.close()

        log_message = f"{datetime.datetime.now()} - {user_data['username']} - Added crontab tasks."
        print(log_message)
        with open(log_file, 'a') as f:
            f.write(log_message + '\n')

        # 验证 crontab 是否成功添加
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
        current_crontab = await asyncio.to_thread(stdout.read)
        current_crontab = current_crontab.decode().strip()

        if not current_crontab:
            # 如果 crontab 仍然为空,再次尝试添加
            stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
            stdin.write(new_crontab)
            stdin.close()

            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Retried adding crontab tasks."
            print(log_message)
            await log_to_file(log_file, log_message)
            send_telegram_message(log_message)

    except Exception as e:
        log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error adding crontab tasks: {str(e)}"
        print(log_message)
        with open(log_file, 'a') as f:
            f.write(log_message + '\n')
        send_telegram_message(log_message)


def send_telegram_message(log_message):
    # 使用 Markdown 格式
    formatted_message = f"""
*🎯 serv00进程保活脚本运行报告*

📝 *任务报告*:

{log_message}

    """

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        'chat_id': TELEGRAM_CHAT_ID,
        'text': formatted_message,
        'parse_mode': 'Markdown',  # 使用 Markdown 格式
    }
    headers = {
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(url, json=payload, headers=headers)
        if response.status_code != 200:
            print(f"发送消息到Telegram失败: {response.text}")
        else:
            print(f"消息发送成功: {log_message}")
    except Exception as e:
        print(f"发送消息到Telegram时出错: {e}")

async def main():
    max_checks = 3  # 最大检查次数

    async def user_task(user_data):
        results = []
        for _ in range(max_checks):
            result = await check_url_and_restart(user_data)
            results.append(result)
            if not result:
                break
        return results

    tasks = [user_task(user_data) for user_data in user_credentials]
    all_results = await asyncio.gather(*tasks)
    
    # 处理每个用户的结果
    for user_data, user_results in zip(user_credentials, all_results):
        print(f"Results for {user_data['username']}: {user_results}")


if __name__ == "__main__":
    asyncio.run(main())

第四步:尝试运行客户端py脚本:
注意在做测试时,要将process的进程优先消灭掉,再执行。 Maybe

第五步:一个bat脚本:隐藏窗口启动+开机自启。

@echo off
start "" pythonw.exe "C:\yourpath\xxx.py"
exit

第六步:将你多个服务器端都开启ws进程:实现多服务器都实现一个py进行监控。

15 个赞

感谢大佬教程!

1 个赞

感谢佬友分享~~

感谢分享大佬厉害啊

感谢佬友分享~~

马克备用,谢谢

感谢大佬教程 期待那位博主,做一期视频,给小白喂饭

感谢佬友分享

修改了下内容,应该还是比较简单的。~~tieba_047

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。