项目思路总领:
在serv00创建ws进程,客户端与服务器实现ws连接,这是一个双向的连接!
当服务器的ws进程被干掉了,我作为客户端就被会告知,这时客户端将主动去ssh连接,先检查监控process是否存在,不存在,则运行command和增加crontab。就这样~
预警:
过程示例是ws连接,而不是wss(类似https),所以我觉得设置token似乎也没用,就没设置,懂的大佬可以帮我解答下。拜谢~~
ws连接成功后,每隔50s会对服务器进行一次ping操作,进行ws保活,如果该时间会导致ws断开,请手动修改py代码中的时间。
telegram推送如下:
第一步:创建端口:随意创建就好
第二步:创建ws进程:ssh连接服务器和一些简单的代码
2.1 安装依赖库(位置随意,当前位置就ok)
npm install ws
2.2 初始化
mkdir websocket-server
cd websocket-server
npm init -y
2.3 服务器配置文件
vim server.js
输入下面的代码.
注:port是你前面注册的端口号。
const WebSocket = require('ws');
// 创建 WebSocket 服务器实例
const wss = new WebSocket.Server({ port: 8080 });
// 监听客户端连接事件
wss.on('connection', function connection(ws) {
console.log('客户已连接');
// 连接关闭事件
ws.on('close', function() {
console.log('客户已断开连接');
});
});
console.log('WebSocket服务器在 ws://localhost:8080 上运行');
2.4 启动ws服务器端
简单启动如下:
node server.js
![]()
通过 sh restart.sh启动
在server.js同级目录新建restart.sh,并复制粘贴下面代码(可选),
WORK_DIR="/home/用户名/websocket-server"
if ! pgrep -f "node server.js" > /dev/null
then
cd "$WORK_DIR"
nohup node server.js > /dev/null 2>&1 & echo "Restarted server.js at $(date)" >> "$WORK_DIR/restart_log.txt"
fi
|
|
|分割线
|
|
第三步:配置客户端:主要就是一个py脚本。
参数说明:
1.telegram选填。
2.url格式:你需要监听的域名
ws://你的项目域名:创建的端口号
3.process: 建议node server.js
ws连接断开,客户端连接ssh进行识别判断客户端进程是否存在。
process作为一个监控进程,也可能因为网络问题连接断开,重启断开(crontab可以启动),所以需要判断一下。
如图(填写node server.js即可)
![]()
4.command: 命令1;命令2;命令3
各种命令是你需要保活的进程启动命令。示例:启动ws服务器进程
~/websocket-server/restart.sh
5.crontab_tasks:
显然是在你某一次失活的时候,自动为你的服务器加上的crontab_tasks。
自由发挥:以websocket服务器进程示例
@reboot ~/websocket-server/restart.sh
- 创建一个py脚本,复制粘贴即可。修改一下# 用户凭证和URL信息的数据
在这之后,安装一下运行py脚本所需的依赖.打开powershell,输入下面指令
pip install paramiko aiofiles websockets requests
import paramiko
import datetime
import asyncio
import requests
import aiofiles
import websockets
from websockets.exceptions import ConnectionClosed
# Telegram 推送所需的 token 和 chat ID
TELEGRAM_BOT_TOKEN = ""
TELEGRAM_CHAT_ID = ""
# 用户凭证和URL信息
user_credentials = [
{'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22,
'process': '',
'command': '',
'url': '',
'crontab_tasks':[
"",
""
]
},
{'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22,
'process': '',
'command': '',
'url': '',
'crontab_tasks':[
"",
""
]
}
]
#文件路径
log_file = 'restart_log.txt'
# log -> open and write
async def log_to_file(file_path, message):
async with aiofiles.open(file_path, 'a') as f:
await f.write(message + '\n')
async def check_url_and_restart(user_data):
url = user_data['url']
reconnect_interval = 20
max_attempts = 3
attempt = 0
flag = False
while attempt < max_attempts:
attempt += 1
try:
async with websockets.connect(url, ping_interval=50, ping_timeout=10) as websocket:
log_message = f"{datetime.datetime.now()} - {user_data['url']} - WebSocket 连接成功."
print(log_message)
await log_to_file(log_file, log_message)
if flag: # 如果是因为ping pong超时,不进行重置attempt
flag = False
# 连接成功后重新设置尝试次数
attempt = 0
await websocket.wait_closed()
except Exception as e:
flag = True
log_message = f"{datetime.datetime.now()} - {user_data['url']} - 连接失败: {str(e)}"
print(log_message)
await log_to_file(log_file, log_message)
print(f"尝试重新连接... (尝试次数: {attempt}/{max_attempts})")
await asyncio.sleep(reconnect_interval) # 等待重连间隔
# 连接失败,执行重启进程
log_message = f"{datetime.datetime.now()} - {user_data['username']} - 网站WS连接失败. 重启进程."
print(log_message)
await log_to_file(log_file, log_message)
return await check_and_restart_ssh(user_data)
async def check_and_restart_ssh(user_data):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
print(f"Connecting to {user_data['hostname']} as {user_data['username']}...")
ssh.connect(user_data['hostname'], port=user_data['port'],
username=user_data['username'], password=user_data['password'])
# 检查进程是否正在运行
check_command = f"pgrep -f '{user_data['process']}' | grep -v grep"
stdin, stdout, stderr = ssh.exec_command(check_command)
exit_code = stdout.channel.recv_exit_status()
if exit_code != 0: # 进程未找到
print(f"Process not found, restarting for {user_data['username']}...")
# 执行重启命令
stdin, stdout, stderr = ssh.exec_command(user_data['command'])
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process not found. Restarted."
send_telegram_message(log_message)
await log_to_file(log_file, log_message)
await add_crontab_tasks(ssh, user_data)
ssh.close()
return True
else:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process already running.\n"
print(f"Process already running for {user_data['username']}...")
await log_to_file(log_file, log_message)
ssh.close()
return True
except Exception as e:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error: {str(e)}\n"
print(f"Error for {user_data['username']}: {str(e)}")
await log_to_file(log_file, log_message)
ssh.close()
return False
# 添加定时任务
async def add_crontab_tasks(ssh, user_data):
crontab_tasks = user_data.get('crontab_tasks', [])
if not crontab_tasks:
return
try:
# 获取当前的 crontab
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
existing_crontab = await asyncio.to_thread(stdout.read)
existing_crontab = existing_crontab.decode().strip()
# 判断现有的 crontab 是否为空
if existing_crontab:
# 如果不为空,先删除现有的 crontab
await asyncio.to_thread(ssh.exec_command, "crontab -r")
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Existing crontab deleted."
print(log_message)
await log_to_file(log_file, log_message)
# 添加新的任务
new_crontab = "\n".join(crontab_tasks)
# 写入新的 crontab
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
stdin.write(new_crontab)
stdin.close()
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Added crontab tasks."
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
# 验证 crontab 是否成功添加
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
current_crontab = await asyncio.to_thread(stdout.read)
current_crontab = current_crontab.decode().strip()
if not current_crontab:
# 如果 crontab 仍然为空,再次尝试添加
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
stdin.write(new_crontab)
stdin.close()
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Retried adding crontab tasks."
print(log_message)
await log_to_file(log_file, log_message)
send_telegram_message(log_message)
except Exception as e:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error adding crontab tasks: {str(e)}"
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
send_telegram_message(log_message)
def send_telegram_message(log_message):
# 使用 Markdown 格式
formatted_message = f"""
*🎯 serv00进程保活脚本运行报告*
📝 *任务报告*:
{log_message}
"""
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
'chat_id': TELEGRAM_CHAT_ID,
'text': formatted_message,
'parse_mode': 'Markdown', # 使用 Markdown 格式
}
headers = {
'Content-Type': 'application/json'
}
try:
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
print(f"发送消息到Telegram失败: {response.text}")
else:
print(f"消息发送成功: {log_message}")
except Exception as e:
print(f"发送消息到Telegram时出错: {e}")
async def main():
max_checks = 3 # 最大检查次数
async def user_task(user_data):
results = []
for _ in range(max_checks):
result = await check_url_and_restart(user_data)
results.append(result)
if not result:
break
return results
tasks = [user_task(user_data) for user_data in user_credentials]
all_results = await asyncio.gather(*tasks)
# 处理每个用户的结果
for user_data, user_results in zip(user_credentials, all_results):
print(f"Results for {user_data['username']}: {user_results}")
if __name__ == "__main__":
asyncio.run(main())
第四步:尝试运行客户端py脚本:
注意在做测试时,要将process的进程优先消灭掉,再执行。 Maybe
第五步:一个bat脚本:隐藏窗口启动+开机自启。
@echo off
start "" pythonw.exe "C:\yourpath\xxx.py"
exit
第六步:将你多个服务器端都开启ws进程:实现多服务器都实现一个py进行监控。

