serv00 windows本地+crontab保活

项目思路总领
通过验证项目网站的存活,判断是否需要进行ssh连接并执行启动程序。

工作流程
crontab进行基础保活。如果crontab失效,即脚本访问项目网站就会报错,这时会自动ssh连接来启动进程,同时还会写入crontab。

使用建议
如果有两台以上的,可以使用这位佬友的方法,进行多台设备相互保活。如果仅有一台,可以尝试使用此方式(仅建议tieba_087

telegram推送如下。(telegram 可以不填。)

补丁1:异步操作查询各大域名。同一账号仅ssh连接一次
补丁2:在执行ssh连接后,多次测试网站存活。防止启动失败的情况。
补丁3:增加了crontab写入保活。

如果你有更好的解决办法,欢迎。 :smiling_face_with_tear:

第一步:新建py脚本,复制粘贴以下代码

import paramiko
import datetime
import asyncio
import os
import requests
from aiohttp import ClientSession

# Telegram 推送所需的 token 和 chat ID
TELEGRAM_BOT_TOKEN = ''
TELEGRAM_CHAT_ID = ''

# 用户凭证和URL信息
user_credentials = [
    {'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22, 
     'services': [
            {
                'process': '',
                'command': '',
                'url': ''
            }
        ],
     'crontab_tasks': [
         ""
     ]
    },
    {'username': '', 'password': '', 'hostname': 's14.serv00.com', 'port': 22, 
     'services': [
            {
                'process': '',
                'command': '',
                'url': ''
            },
            {
                'process': '',
                'command': '',
                'url': ''
            }
        ],
     'crontab_tasks': [
         "",
         ""
     ]
    }
]

# 用于存储需要推送的消息
telegram_messages = []

log_file = 'restart_log.txt'

# 用于管理 SSH 连接的状态
class SSHManager:
    def __init__(self):
        self.ssh_clients = {}

    async def get_ssh_client(self, user_data, max_retries=3, timeout=10):
        if user_data['username'] not in self.ssh_clients:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            for attempt in range(max_retries):
                try:
                    await asyncio.wait_for(
                        asyncio.to_thread(
                            ssh.connect,
                            user_data['hostname'],
                            port=user_data['port'],
                            username=user_data['username'],
                            password=user_data['password'],
                            timeout=timeout
                        ),
                        timeout=timeout
                    )
                    self.ssh_clients[user_data['username']] = ssh
                    return ssh
                except asyncio.TimeoutError:
                    log_message = f"{datetime.datetime.now()} - {user_data['username']} - SSH connection timed out. Attempt {attempt + 1} of {max_retries}."
                    print(log_message)
                    if attempt == max_retries - 1:
                        telegram_messages.append(log_message)
                except Exception as e:
                    log_message = f"{datetime.datetime.now()} - {user_data['username']} - Unable to connect to SSH. Attempt {attempt + 1} of {max_retries}. Error: {str(e)}"
                    print(log_message)
                    if attempt == max_retries - 1:
                        telegram_messages.append(log_message)
                
                if attempt < max_retries - 1:
                    await asyncio.sleep(5)  # 在重试之前等待5秒
        
        return self.ssh_clients.get(user_data['username'])

    async def close_all(self):
        for username, ssh in self.ssh_clients.items():
            ssh.close()
            print(f"Closed SSH connection for {username}")
        self.ssh_clients.clear()

async def check_url_and_restart(user_data, ssh_manager):
    async with ClientSession() as session:
        try:
            async with session.get(user_data['url'], timeout=10) as response:
                if response.status in [200, 201, 202, 203, 204, 301, 302, 303, 307, 308, 401, 403]:
                    log_message = f"{datetime.datetime.now()} - {user_data['username']} - Website is accessible (Status: {response.status})."
                    print(log_message)
                    with open(log_file, 'a') as f:
                        f.write(log_message + '\n')
                    return False  # 网站可访问,通常不需要重启
                else:
                    log_message = f"{datetime.datetime.now()} - {user_data['username']} - Website is not accessible (Status: {response.status}). Attempting SSH connection."
                    print(log_message)
                    with open(log_file, 'a') as f:
                        f.write(log_message + '\n')
                    return await check_and_restart_ssh(user_data, ssh_manager)  # 进程已启动,返回True
        except Exception as e:
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Unable to connect to the website. Attempting SSH connection. Error: {str(e)}"
            print(log_message)
            with open(log_file, 'a') as f:
                f.write(log_message + '\n')
            return await check_and_restart_ssh(user_data, ssh_manager)

# 网站不可访问,发起ssh连接
async def check_and_restart_ssh(user_data, ssh_manager):
    ssh = await ssh_manager.get_ssh_client(user_data)
    if not ssh:
        return False
    try:
        check_command = f"pgrep -f '{user_data['process']}' | grep -v grep"
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, check_command)
        exit_code = await asyncio.to_thread(stdout.channel.recv_exit_status)

        if exit_code != 0:  # 进程未找到
            print(f"Process not found, restarting for {user_data['username']}...")
            stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, user_data['command'])
            error_output = await asyncio.to_thread(stderr.read)
            error_output = error_output.decode().strip()
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process '{user_data['process']}' not found. Restarted."
            if error_output:
                log_message += f"Error: {error_output}"
            telegram_messages.append(log_message)
            with open(log_file, 'a') as f:
                f.write(log_message + '\n')
            
            # 在这里添加 crontab 任务
            await add_crontab_tasks(ssh, user_data, log_file, telegram_messages)
            
            return True  # 是的,已经重新启动进程
        else:
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Process '{user_data['process']}' is running."
            print(f"Process already running for {user_data['username']}...")
            with open(log_file, 'a') as f:
                f.write(log_message + '\n')
            return True  # 进程已经在运行
    except Exception as e:
        log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error: {str(e)}"
        print(f"Error for {user_data['username']}: {str(e)}")
        telegram_messages.append(log_message)
        with open(log_file, 'a') as f:
            f.write(log_message + '\n')
        return False  # 出现错误

# 添加定时任务
async def add_crontab_tasks(ssh, user_data, log_file, telegram_messages):
    crontab_tasks = user_data.get('crontab_tasks', [])
    if not crontab_tasks:   
        return

    try:
        # 获取当前的 crontab
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
        existing_crontab = await asyncio.to_thread(stdout.read)
        existing_crontab = existing_crontab.decode().strip()

        # 判断现有的 crontab 是否为空
        if existing_crontab:
            # 如果不为空,先删除现有的 crontab
            await asyncio.to_thread(ssh.exec_command, "crontab -r")
            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Existing crontab deleted."
            print(log_message)
            with open(log_file, 'a') as f:
                f.write(log_message + '\n')

        # 添加新的任务
        new_crontab = "\n".join(crontab_tasks)

        # 写入新的 crontab
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
        stdin.write(new_crontab)
        stdin.close()

        log_message = f"{datetime.datetime.now()} - {user_data['username']} - Added crontab tasks."
        print(log_message)
        telegram_messages.append(log_message)
        with open(log_file, 'a') as f:
            f.write(log_message + '\n')

        # 验证 crontab 是否成功添加
        stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -l")
        current_crontab = await asyncio.to_thread(stdout.read)
        current_crontab = current_crontab.decode().strip()

        if not current_crontab:
            # 如果 crontab 仍然为空,再次尝试添加
            stdin, stdout, stderr = await asyncio.to_thread(ssh.exec_command, "crontab -")
            stdin.write(new_crontab)
            stdin.close()

            log_message = f"{datetime.datetime.now()} - {user_data['username']} - Retried adding crontab tasks."
            print(log_message)
            telegram_messages.append(log_message)
            with open(log_file, 'a') as f:
                f.write(log_message + '\n')

    except Exception as e:
        log_message = f"{datetime.datetime.now()} - {user_data['username']} - Error adding crontab tasks: {str(e)}"
        print(log_message)
        with open(log_file, 'a') as f:
            f.write(log_message + '\n')
        telegram_messages.append(log_message)



        

async def send_telegram_messages():
    if not telegram_messages:
        return

    formatted_message = "*🎯 serv00进程保活脚本运行报告*\n\n"
    formatted_message += "\n".join(telegram_messages)

    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    payload = {
        'chat_id': TELEGRAM_CHAT_ID,
        'text': formatted_message,
        'parse_mode': 'Markdown',
    }

    async with ClientSession() as session:
        try:
            async with session.post(url, json=payload) as response:
                if response.status != 200:
                    print(f"发送消息到Telegram失败: {await response.text()}")
                else:
                    print("消息发送成功")
        except Exception as e:
            print(f"发送消息到Telegram时出错: {e}")


async def main():
    ssh_manager = SSHManager()
    max_checks = 3  # 最大检查次数
    current_check = 0  # 当前检查次数
    while current_check < max_checks:
        tasks = []
        results = []
        for user_data in user_credentials:
            for service in user_data['services']:
                service_data = {**user_data, **service}
                tasks.append(check_url_and_restart(service_data, ssh_manager))
        
        results = await asyncio.gather(*tasks)
        
        if not any(results):
            # 如果没有需要重启的服务,提前退出循环
            break
        
        current_check += 1  # 增加检查计数
        
        # 如果还没有达到最大检查次数,则休眠5秒后再次检查
        if current_check < max_checks:
            await asyncio.sleep(5)
            print(f"——————————————————————\n第{current_check + 1}次域名连接检查")
    
    await ssh_manager.close_all()
    # 在所有任务完成后发送整合的消息
    await send_telegram_messages()

if __name__ == "__main__":
    asyncio.run(main())

第二步:打开powershell / cmd ,安装依赖。

pip install paramiko aiohttp requests

第三步:需要填入py脚本文件中的 # 用户凭证和URL信息部分
重点填入services部分: process 、command 、url 和 crontab_tasks
注意
1.services部分,如果你有多个进程,可以填入多个。没有就删除掉多的部分就可以。
2.process表示你需要在服务器启动的进程。command表示你通过什么命令去启动该进程。
3.url是你的项目域名。
4.crontab_tasks是你需要设置任务。

第四步:ssh连接serv00,获取process(以new api为例)

ps aux 

image
填入process——》‘process’: ‘new-api’ (包含一些关键词就可以)

第五步:获取command(项目运行执行路径)
如果你是newapi,你应该在部署的时候有start.sh或者restart.sh.
切换到放置了sh启动脚本的文件夹。当然如果其他项目,可以去找到对应的启动sh脚本的目录。
当然也可以是纯碎的命令

cd  ~/domains/你的项目域名/public_html/

输入pwd获取路径信息

pwd

得到如下:
/home/sliencem1/domains/你的项目域名/public_html
ok。
填入command——》
‘command’: '/home/sliencem1/domains/你的项目域名/public_html/restart.sh ’
或者
‘command’: '~/domains/你的项目域名/public_html/restart.sh ’

第六步:获取url和填入crontab_tasks。
url即为域名网站。
crontab_tasks可以自由发挥。这边给个案例。

@reboot ~/domains/域名网站/restart.sh >> ~/domains/xx-restart.log 2>&1

第七步:填入telegram的token和chatid(可选)

第八步:运行该脚本即可。
定时运行:可以选择一些定时任务的软件运行。或者敲一下循环定时相关的代码

32 个赞

感谢大佬教程

2 个赞

感谢分享,也可以使用ws连接,如果断开连接就连接ssh启动进程,理论可以达到快速恢复进程的效果

2 个赞

感谢佬的分享

这个是最方便的

1 个赞

网站是http,可以使用ws吗?抱歉我不是特别理解,或许有其他方式进行ws,但感谢你的回复。
tieba_087

需要在serv00创建一个ws连接的进程,有点麻烦,用佬的方法更方便

原来如此。那这确实是可行啊。

强啊佬,加个serv00的标签呗

1 个赞

加了 加了!!

总感觉,等服务挂了才去激活体验挺差的……
不过好像也没什么好办法

1 个赞

是的,感觉如果没有服务器一直去输出的话,会有些断档。现在增加了crontab 应该会好些了。就是不知道crontab是怎么个清除法,但反正先搞上。

已经使用了一个ws进程的连接方式。确实如果是多个服务器的话,相对来说会比较麻烦,但效果确实显著丫~。其他的方法也是不错的~

需要一台不断线的vps作为服务端,如果你有很多serv00账号可以搞自动化,所有操作都可以代码自动完成

确实 有道理的。老哥

感谢佬的分享

1 个赞

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。