11#!/usr/bin/env python3
2+ import sys
23import asyncio
34import json
4- import sys
55from argparse import ArgumentParser
66import logging
77import sqlite3
88import string
99import random
1010import os
11+ import time
12+ import tempfile
1113
1214## Configuration
13- DATABASE_FILE = "/tmp/clustersubmit.sqlite3"
14- LOG_FILE = "/tmp/clustersubmit.log"
15+ DATABASE_FILE = os .environ .get ("DATABASE_FILE" , os .path .join (tempfile .gettempdir (), "clustersubmit.sqlite3" ))
16+ LOG_FILE = os .environ .get ("LOG_FILE" , os .path .join (tempfile .gettempdir (), "clustersubmit.log" ))
17+ TIMEOUT_SECONDS = float (os .environ .get ("TIMEOUT_SECONDS" , "300" ))
1518
1619logging .basicConfig (
1720 level = logging .DEBUG ,
@@ -41,10 +44,27 @@ args = parser.parse_args()
4144
4245## Initialization
4346with sqlite3 .connect (DATABASE_FILE ) as db :
44- db .execute ("CREATE TABLE IF NOT EXISTS jobs(jobid TEXT, jobscript TEXT, done INT, testtime)" )
45- db .execute ("CREATE TABLE IF NOT EXISTS edges(parent TEXT, child TEXT, testtime)" )
46- db .execute ("DELETE FROM jobs WHERE testtime <= datetime('now', '-1 hour')" )
47- db .execute ("DELETE FROM edges WHERE testtime <= datetime('now', '-1 hour')" )
47+ db .execute ("""
48+ CREATE TABLE IF NOT EXISTS jobs(
49+ jobid TEXT PRIMARY KEY,
50+ jobscript TEXT,
51+ done INT,
52+ created_at TEXT
53+ )
54+ """ )
55+
56+ db .execute ("""
57+ CREATE TABLE IF NOT EXISTS edges(
58+ parent TEXT,
59+ child TEXT,
60+ created_at TEXT,
61+ FOREIGN KEY(parent) REFERENCES jobs(jobid),
62+ FOREIGN KEY(child) REFERENCES jobs(jobid)
63+ );
64+ """ )
65+
66+ db .execute ("DELETE FROM jobs WHERE created_at <= datetime('now', '-1 hour')" )
67+ db .execute ("DELETE FROM edges WHERE created_at <= datetime('now', '-1 hour')" )
4868
4969
5070
@@ -57,69 +77,79 @@ async def execute_single_job(jobid, jobscript):
5777 """
5878 logging .info (f"Executing { jobid } " )
5979 proc = await asyncio .create_subprocess_shell (jobscript )
60- await proc .communicate ()
80+ stdout , stderr = await proc .communicate ()
81+ if proc .returncode != 0 :
82+ logging .critical (stdout )
83+ logging .critical (stderr )
84+ raise RuntimeError (f"Falied executing job { jobid } " )
85+
6186 async with db_lock :
6287 with sqlite3 .connect (DATABASE_FILE , timeout = 30 ) as db :
6388 db .execute ("PRAGMA journal_mode=WAL" )
6489 db .execute ("UPDATE jobs SET done = 1 WHERE jobid = ?" , (jobid ,))
6590 logging .info (f"Marked { jobid } as done" )
66- async def execute_jobs ():
91+
92+
93+ async def execute_jobs (timeout_seconds = TIMEOUT_SECONDS ):
6794 """
6895 Simplistic scheduler checking jobs that can run and executing them
6996 """
70- for _ in range (100 ):
97+ start = time .perf_counter ()
98+ while (time .perf_counter () - start ) < timeout_seconds :
7199 with sqlite3 .connect (DATABASE_FILE ) as db :
72100 result = db .execute (QUERY_RUNNABLE ).fetchall ()
73- if len (result ):
74- async with asyncio .TaskGroup () as tg :
75- for row in result :
76- tg .create_task (execute_single_job (* row ))
77- else :
101+ if not result :
78102 logging .info ("Could not find any other job to execute." )
79103 return
104+
105+ async with asyncio .TaskGroup () as tg :
106+ for row in result :
107+ tg .create_task (execute_single_job (* row ))
108+
109+ raise TimeoutError (f"Exceeded timeout of { timeout_seconds } seconds" )
80110
81111
82112if args .execute :
83113 asyncio .run (execute_jobs ())
84114 exit (0 )
85115elif args .jobscript is None :
116+ logging .critical (f"Failed to submit a job: missing jobscript. { sys .argv } " )
86117 raise ValueError ("Expected jobscript as the last positional argument" )
87118
88119
89120
90-
91121## Submission
92122with open (args .jobscript ) as input_file :
93- for line in input_file :
94- if line .startswith ("# properties = " ):
95- properties = json .loads (line [len ("# properties = " ):- 1 ])
96- break
123+ jobscript_content = input_file .read ()
124+
125+ for line in jobscript_content .splitlines ():
126+ if line .startswith ("# properties = " ):
127+ properties = json .loads (line [len ("# properties = " ):])
128+ break
97129
98130logging .info (f"---" )
99131logging .info (f"Processing rule: { properties ['rule' ]} " )
100132
101133job_name = '' .join (
102- [properties ['rule' ], '-' ] +
134+ [properties ['rule' ], '-' , str ( time . time ()), '-' ] +
103135 [random .choice (string .ascii_lowercase ) for _ in range (5 )]
104136)
105137
106- dependencies = [ s for s in args .dependencies .split (" " ) if s not in [ ' ' , '' , None ]]
138+ dependencies = args .dependencies .split ()
107139logging .info (f"Submitting `{ job_name } ` which depends on { dependencies } " )
108140
109141with sqlite3 .connect (DATABASE_FILE ) as db :
110142 db .execute (
111- "INSERT INTO jobs(jobid, jobscript, done, testtime ) VALUES (?, ?, 0, datetime('now'))" ,
112- (job_name , open ( args . jobscript ). read () )
143+ "INSERT INTO jobs(jobid, jobscript, done, created_at ) VALUES (?, ?, 0, datetime('now'))" ,
144+ (job_name , jobscript_content )
113145 )
114146
115147 for dependency in dependencies :
116148 db .execute (
117- "INSERT INTO edges(parent, child, testtime ) VALUES (?, ?, datetime('now'))" ,
149+ "INSERT INTO edges(parent, child, created_at ) VALUES (?, ?, datetime('now'))" ,
118150 (dependency , job_name )
119151 )
120152
121-
122-
123153print (job_name )
124154
125155
0 commit comments