项目思路总领:
通过验证项目网站的存活,判断是否需要进行ssh连接并执行启动程序。
工作流程:
crontab进行基础保活。如果crontab失效,即脚本访问项目网站就会报错,这时会自动ssh连接来启动进程,同时还会写入crontab。
使用建议:
如果有两台以上的,可以使用这位佬友的方法,进行多台设备相互保活。如果仅有一台,可以尝试使用此方式(仅建议
)
telegram推送如下。(telegram 可以不填。)
补丁1:异步操作查询各大域名。同一账号仅ssh连接一次
补丁2:在执行ssh连接后,多次测试网站存活。防止启动失败的情况。
补丁3:增加了crontab写入保活。
如果你有更好的解决办法,欢迎。 ![]()
第一步:新建py脚本,复制粘贴以下代码
import paramiko
import datetime
import asyncio
import os
import requests
from aiohttp import ClientSession
# Telegram 推送所需的 token 和 chat ID
TELEGRAM_BOT_TOKEN = ''
TELEGRAM_CHAT_ID = ''
# 用户凭证和URL信息
user_credentials = [
{'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22,
'services': [
{
'process': '',
'command': '',
'url': ''
}
],
'crontab_tasks': [
""
]
},
{'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22,
'services': [
{
'process': '',
'command': '',
'url': ''
},
{
'process': '',
'command': '',
'url': ''
}
],
'crontab_tasks': [
"",
""
]
}
]
# 用于存储需要推送的消息
telegram_messages = []
log_file = 'restart_log.txt'
# 用于管理 SSH 连接的状态
class SSHManager:
def __init__(self):
self.ssh_clients = {}
async def get_ssh_client(self, user_data, max_retries=3, timeout=10):
if user_data['username'] not in self.ssh_clients:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
for attempt in range(max_retries):
try:
await asyncio.wait_for(
asyncio.to_thread(
ssh.connect,
user_data['hostname'],
port=user_data['port'],
username=user_data['username'],
password=user_data['password'],
timeout=timeout
),
timeout=timeout
)
self.ssh_clients[user_data['username']] = ssh
return ssh
except asyncio.TimeoutError:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - SSH connection timed out. Attempt {attempt + 1} of {max_retries}."
print(log_message)
if attempt == max_retries - 1:
telegram_messages.append(log_message)
except Exception as e:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Unable to connect to SSH. Attempt {attempt + 1} of {max_retries}. Error: {str(e)}"
print(log_message)
if attempt == max_retries - 1:
telegram_messages.append(log_message)
if attempt < max_retries - 1:
await asyncio.sleep(5) # 在重试之前等待5秒
return self.ssh_clients.get(user_data['username'])
async def close_all(self):
for username, ssh in self.ssh_clients.items():
ssh.close()
print(f"Closed SSH connection for {username}")
self.ssh_clients.clear()
async def check_url_and_restart(user_data, ssh_manager):
async with ClientSession() as session:
try:
async with session.get(user_data['url'], timeout=10) as response:
if response.status in [200, 201, 202, 203, 204, 301, 302, 303, 307, 308, 401, 403]:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Website is accessible (Status: {response.status})."
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
return False # 网站可访问,通常不需要重启
else:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Website is not accessible (Status: {response.status}). Attempting SSH connection."
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
return await check_and_restart_ssh(user_data, ssh_manager) # 进程已启动,返回True
except Exception as e:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Unable to connect to the website. Attempting SSH connection. Error: {str(e)}"
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
return await check_and_restart_ssh(user_data, ssh_manager)
# 网站不可访问,发起ssh连接
async def check_and_restart_ssh(user_data, ssh_manager):
ssh = await ssh_manager.get_ssh_client(user_data)
if not ssh:
return False
try:
check_command = f"pgrep -f '{user_data['process']}' | grep -v grep"
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, check_command)
exit_code = await asyncio.to_thread(stdout.channel.recv_exit_status)
if exit_code != 0: # 进程未找到
print(f"Process not found, restarting for {user_data['username']}...")
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, user_data['command'])
error_output = await asyncio.to_thread(stderr.read)
error_output = error_output.decode().strip()
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process '{user_data['process']}' not found. Restarted."
if error_output:
log_message += f"Error: {error_output}"
telegram_messages.append(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
# 在这里添加 crontab 任务
await add_crontab_tasks(ssh, user_data, log_file, telegram_messages)
return True # 是的,已经重新启动进程
else:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process '{user_data['process']}' is running."
print(f"Process already running for {user_data['username']}...")
with open(log_file, 'a') as f:
f.write(log_message + '\n')
return True # 进程已经在运行
except Exception as e:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error: {str(e)}"
print(f"Error for {user_data['username']}: {str(e)}")
telegram_messages.append(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
return False # 出现错误
# 添加定时任务
async def add_crontab_tasks(ssh, user_data, log_file, telegram_messages):
crontab_tasks = user_data.get('crontab_tasks', [])
if not crontab_tasks:
return
try:
# 获取当前的 crontab
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
existing_crontab = await asyncio.to_thread(stdout.read)
existing_crontab = existing_crontab.decode().strip()
# 判断现有的 crontab 是否为空
if existing_crontab:
# 如果不为空,先删除现有的 crontab
await asyncio.to_thread(ssh.exec_command, "crontab -r")
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Existing crontab deleted."
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
# 添加新的任务
new_crontab = "\n".join(crontab_tasks)
# 写入新的 crontab
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
stdin.write(new_crontab)
stdin.close()
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Added crontab tasks."
print(log_message)
telegram_messages.append(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
# 验证 crontab 是否成功添加
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
current_crontab = await asyncio.to_thread(stdout.read)
current_crontab = current_crontab.decode().strip()
if not current_crontab:
# 如果 crontab 仍然为空,再次尝试添加
stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
stdin.write(new_crontab)
stdin.close()
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Retried adding crontab tasks."
print(log_message)
telegram_messages.append(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
except Exception as e:
log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error adding crontab tasks: {str(e)}"
print(log_message)
with open(log_file, 'a') as f:
f.write(log_message + '\n')
telegram_messages.append(log_message)
async def send_telegram_messages():
if not telegram_messages:
return
formatted_message = "*🎯 serv00进程保活脚本运行报告*\n\n"
formatted_message += "\n".join(telegram_messages)
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
'chat_id': TELEGRAM_CHAT_ID,
'text': formatted_message,
'parse_mode': 'Markdown',
}
async with ClientSession() as session:
try:
async with session.post(url, json=payload) as response:
if response.status != 200:
print(f"发送消息到Telegram失败: {await response.text()}")
else:
print("消息发送成功")
except Exception as e:
print(f"发送消息到Telegram时出错: {e}")
async def main():
ssh_manager = SSHManager()
max_checks = 3 # 最大检查次数
current_check = 0 # 当前检查次数
while current_check < max_checks:
tasks = []
results = []
for user_data in user_credentials:
for service in user_data['services']:
service_data = {**user_data, **service}
tasks.append(check_url_and_restart(service_data, ssh_manager))
results = await asyncio.gather(*tasks)
if not any(results):
# 如果没有需要重启的服务,提前退出循环
break
current_check += 1 # 增加检查计数
# 如果还没有达到最大检查次数,则休眠5秒后再次检查
if current_check < max_checks:
await asyncio.sleep(5)
print(f"——————————————————————\n第{current_check + 1}次域名连接检查")
await ssh_manager.close_all()
# 在所有任务完成后发送整合的消息
await send_telegram_messages()
if __name__ == "__main__":
asyncio.run(main())
第二步:打开powershell / cmd ,安装依赖。
pip install paramiko aiohttp requests
第三步:需要填入py脚本文件中的 # 用户凭证和URL信息部分
重点填入services部分: process 、command 、url 和 crontab_tasks
注意:
1.services部分,如果你有多个进程,可以填入多个。没有就删除掉多的部分就可以。
2.process表示你需要在服务器启动的进程。command表示你通过什么命令去启动该进程。
3.url是你的项目域名。
4.crontab_tasks是你需要设置任务。
第四步:ssh连接serv00,获取process(以new api为例)
ps aux
![]()
填入process——》‘process’: ‘new-api’ (包含一些关键词就可以)
第五步:获取command(项目运行执行路径)
如果你是newapi,你应该在部署的时候有start.sh或者restart.sh.
切换到放置了sh启动脚本的文件夹。当然如果其他项目,可以去找到对应的启动sh脚本的目录。
当然也可以是纯碎的命令
cd ~/domains/你的项目域名/public_html/
输入pwd获取路径信息
pwd
得到如下:
/home/sliencem1/domains/你的项目域名/public_html
ok。
填入command——》
‘command’: '/home/sliencem1/domains/你的项目域名/public_html/restart.sh ’
或者
‘command’: '~/domains/你的项目域名/public_html/restart.sh ’
第六步:获取url和填入crontab_tasks。
url即为域名网站。
crontab_tasks可以自由发挥。这边给个案例。
@reboot ~/domains/域名网站/restart.sh >> ~/domains/xx-restart.log 2>&1
第七步:填入telegram的token和chatid(可选)
第八步:运行该脚本即可。
定时运行:可以选择一些定时任务的软件运行。或者敲一下循环定时相关的代码
