Worker.py
from __future__ import print_function
import os
import time
import logging
import traceback
import multiprocessing
from queue import Empty
from logging import FileHandler
from http.client import HTTPException
from logging.handlers import TimedRotatingFileHandler
from urllib.parse import urlencode  # this module already requires Python 3 (see the queue/http.client imports above)
from RESTInteractions import CRABRest
from TaskWorker.DataObjects.Result import Result
from ServerUtilities import truncateError, executeCommand, FEEDBACKMAIL
from TaskWorker.WorkerExceptions import WorkerHandlerException, TapeDatasetException,\
ChildUnexpectedExitException, ChildTimeoutException, SubmissionRefusedException
from TaskWorker.ChildWorker import startChildWorker
## Create configuration globals to avoid passing them around at every request,
## and tell pylint to bear with this :-)
# pylint: disable=W0604, W0601
global WORKER_CONFIG
def addTaskLogHandler(logger, username, taskname, logsDir):
    # set up a handler that saves the task log to logsDir/tasks/<username>/<taskname>.log
formatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s:%(message)s")
taskdirname = logsDir+"/tasks/%s/" % username
try:
os.mkdir(taskdirname)
except OSError as ose:
        if ose.errno != 17:  # 17 = EEXIST: ignore "directory already exists", but log a traceback for any other error
            logger.exception("Cannot set task handler logfile for task %s. Ignoring and continuing normally.", taskname)
taskhandler = FileHandler(taskdirname + taskname + '.log')
taskhandler.setFormatter(formatter)
taskhandler.setLevel(logging.DEBUG)
logger.addHandler(taskhandler)
return taskhandler
def failTask(taskName, crabserver, msg, log, failstatus='FAILED'):
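    """Upload the failure message msg for taskName to the REST and set the task status to failstatus."""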
try:
log.info("Uploading failure message to the REST:\n%s", msg)
truncMsg = truncateError(msg)
configreq = {'workflow': taskName,
'status': failstatus,
'subresource': 'failure',
                     # limit the message to 1000 characters; that's the limit in the REST
                     'failure': truncMsg}
        crabserver.post(api='workflowdb', data=urlencode(configreq))
        log.info("Failure message successfully uploaded to the REST")
    except HTTPException as hte:
        log.warning("Cannot upload failure message to the REST for task %s. HTTP exception headers follow:", taskName)
log.error(hte.headers)
except Exception as exc: #pylint: disable=broad-except
log.warning("Cannot upload failure message to the REST for workflow %s.\nReason: %s", taskName, exc)
log.exception('Traceback follows:')
def removeTaskLogHandler(logger, taskhandler):
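    """Flush, close and detach the per-task log handler created by addTaskLogHandler."""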
taskhandler.flush()
taskhandler.close()
logger.removeHandler(taskhandler)
def processWorkerLoop(inputs, results, resthost, dbInstance, procnum, logger, logsDir):
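    """Main loop of a slave process: take work from the inputs queue, execute it, and put a Result in the results queue."""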
procName = "Process-%s" % procnum
while True:
try:
            ## Get (and remove) an item from the input queue. If the queue is empty, wait
            ## until an item is available. Item content is:
            ##   workid : an integer assigned by the queue module
            ##   work : a function handle for the needed action, e.g. handleNewTask
            ##   task : a task dictionary
            ##   failstatus : the status to assign to the task if the work fails (e.g. 'SUBMITFAILED')
workid, work, task, failstatus, inputargs = inputs.get()
if work == 'STOP':
break
taskhandler = addTaskLogHandler(logger, task['tm_username'], task['tm_taskname'], logsDir)
except (EOFError, IOError):
crashMessage = "Hit EOF/IO in getting new work\n"
crashMessage += "Assuming this is a graceful break attempt.\n"
logger.error(crashMessage)
break
outputs = None
t0 = time.time()
        # the log entry below is used for log parsing; changing it may require updating the logstash configuration
logger.debug("%s: Starting %s on %s", procName, str(work), task['tm_taskname'])
try:
msg = None
if hasattr(WORKER_CONFIG, 'FeatureFlags') and \
hasattr(WORKER_CONFIG.FeatureFlags, 'childWorker') and \
WORKER_CONFIG.FeatureFlags.childWorker:
logger.debug(f'Run {work.__name__} in childWorker.')
args = (resthost, dbInstance, WORKER_CONFIG, task, procnum, inputargs)
workOutput = startChildWorker(WORKER_CONFIG, work, args, logger)
else:
workOutput = work(resthost, dbInstance, WORKER_CONFIG, task, procnum, inputargs)
            # note that the output of the "work" is assigned to a variable only to allow
            # inspection for debugging purposes; we do not want to put it in the
            # multiprocessing.Queue since, in case it contains non-picklable objects (e.g. htcondor.Submit),
            # the put fails silently, see https://github.com/python/cpython/issues/84376
            # for the purpose of the Worker processing we do not need to pass around all that
            # info anyhow, so we only put the task dictionary, which
            # a) is safe and b) can be used for debugging inside Worker or MasterWorker
            # reference: https://github.com/dmwm/CRABServer/pull/9026
outputs = Result(task=task, result="OK")
except TapeDatasetException as tde:
outputs = Result(task=task, err=str(tde))
except SubmissionRefusedException as sre:
outputs = Result(task=task, err=str(sre))
except WorkerHandlerException as we:
outputs = Result(task=task, err=str(we))
msg = str(we)
except (ChildUnexpectedExitException, ChildTimeoutException) as e:
# custom message
outputs = Result(task=task, err=str(e))
msg = f"Server-side failed with an error: {str(e)}"
msg += "\n This could be a temporary glitch. Please try again later."
msg += f"\n If the error persists, please send an e-mail to {FEEDBACKMAIL}."
except Exception as exc: #pylint: disable=broad-except
outputs = Result(task=task, err=str(exc))
msg = "%s: I just had a failure for %s" % (procName, str(exc))
msg += "\n\tworkid=" + str(workid)
msg += "\n\ttask=" + str(task['tm_taskname'])
msg += "\n" + str(traceback.format_exc())
finally:
if msg:
crabserver = CRABRest(resthost, WORKER_CONFIG.TaskWorker.cmscert, WORKER_CONFIG.TaskWorker.cmskey,
retry=20, logger=logger, userAgent='CRABTaskWorker')
crabserver.setDbInstance(dbInstance)
failTask(task['tm_taskname'], crabserver, msg, logger, failstatus)
t1 = time.time()
workType = task.get('tm_task_command', 'RECURRING')
        # the log entry below is used for log parsing; changing it may require updating the logstash configuration
logger.debug("%s: %s work on %s completed in %d seconds: %s", procName, workType, task['tm_taskname'], t1-t0, outputs)
try:
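            # 'ps u' prints RSS in KiB in column 6; awk sums it over the matched processes and divides by 1024 to get MiB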
out, _, _ = executeCommand("ps u -p %s | awk '{sum=sum+$6}; END {print sum/1024}'" % os.getpid())
msg = "RSS after finishing %s: %s MB" % (task['tm_taskname'], out.strip())
logger.debug(msg)
except Exception: # pylint: disable=broad-except
logger.exception("Problem getting worker RSS:")
removeTaskLogHandler(logger, taskhandler)
logger.debug("About to put out message in results queue for workid %s", workid)
logger.debug(" out.result: %s ", outputs.result)
logger.debug(" out.task: %s ", outputs.task)
try:
results.put({
'workid': workid,
'out' : outputs
})
except Exception as ex: # pylint: disable=broad-except
logger.error(f"Unable to push worker result to shared queue:\n{ex}")
logger.debug("Done")
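# A minimal, commented-out sketch of the silent-put failure described in the
# comment inside processWorkerLoop above (illustrative only; a lambda stands in
# for any non-picklable object such as htcondor.Submit):
#
#   import multiprocessing
#   q = multiprocessing.Queue()
#   q.put(lambda: None)   # put() returns at once: pickling happens later, in the feeder thread
#   q.get(timeout=5)      # raises queue.Empty, the item never made it across
#                         # (see https://github.com/python/cpython/issues/84376)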
def processWorker(inputs, results, resthost, dbInstance, logsDir, procnum):
"""Wait for an reference to appear in the input queue, call the referenced object
and write the output in the output queue.
:arg Queue inputs: the queue where the inputs are shared by the master
:arg Queue results: the queue where this method writes the output
:return: default returning zero, but not really needed."""
logger = setProcessLogger(str(procnum), logsDir)
logger.info("Process %s is starting. PID %s", procnum, os.getpid())
try:
processWorkerLoop(inputs, results, resthost, dbInstance, procnum, logger, logsDir)
except: #pylint: disable=bare-except
        # if anything happens, put the log inside the process logfiles instead of nohup.log
logger.exception("Unexpected error in process worker!")
logger.debug("Slave %s exiting.", procnum)
return 0
def setProcessLogger(name, logsDir):
""" Set the logger for a single process. The file used for it is logsiDir/processes/proc.name.txt and it
can be retrieved with logging.getLogger(name) in other parts of the code
"""
logger = logging.getLogger(name)
handler = TimedRotatingFileHandler(logsDir+'/processes/proc.c3id_%s.pid_%s.txt' % (name, os.getpid()), 'midnight', backupCount=30)
formatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s:%(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
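# Example (sketch): once configured here, the same logger can be retrieved
# anywhere else via its name, e.g. for slave number 1:
#   log = logging.getLogger('1')
#   log.info('this line ends up in logsDir/processes/proc.c3id_1.pid_<pid>.txt')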
class Worker(object):
"""Worker class providing all the functionalities to manage all the slaves
and distribute the work"""
def __init__(self, config, resthost, dbInstance):
"""Initializer
:arg WMCore.Configuration config: input TaskWorker configuration
        :arg str resthost: the hostname where the REST interface is running
:arg str dbInstance: the DB instance to use"""
self.logger = logging.getLogger("master")
global WORKER_CONFIG
WORKER_CONFIG = config
#global WORKER_DBCONFIG
self.pool = []
self.nworkers = WORKER_CONFIG.TaskWorker.nslaves if getattr(WORKER_CONFIG.TaskWorker, 'nslaves', None) is not None else multiprocessing.cpu_count()
        ## limit the size of the queue to at most twice the number of workers
self.leninqueue = self.nworkers*2
self.inputs = multiprocessing.Queue(self.leninqueue)
self.results = multiprocessing.Queue()
self.working = {}
self.resthost = resthost
self.dbInstance = dbInstance
def begin(self):
"""Starting up all the slaves"""
if len(self.pool) == 0:
# Starting things up
for x in range(1, self.nworkers + 1):
self.logger.debug("Starting process %i", x)
                p = multiprocessing.Process(target=processWorker, args=(self.inputs, self.results, self.resthost, self.dbInstance, WORKER_CONFIG.TaskWorker.logsDir, x))
p.start()
self.pool.append(p)
self.logger.info("Started %d slaves", len(self.pool))
def end(self):
"""Stopping all the slaves"""
self.logger.debug("Ready to close all %i started processes ", len(self.pool))
for p in self.pool:
try:
## Put len(self.pool) messages in the subprocesses queue.
## Each subprocess will work on one stop message and exit
self.logger.debug("Putting stop message in the queue for %s ", str(p))
self.inputs.put(('-1', 'STOP', 'control', 'STOPFAILED', []))
except Exception as ex: #pylint: disable=broad-except
msg = "Hit some exception in deletion\n"
msg += str(ex)
self.logger.error(msg)
self.logger.info('Slaves stop messages sent. Waiting for subprocesses.')
for p in self.pool:
try:
p.join()
except Exception as ex: #pylint: disable=broad-except
msg = "Hit some exception in join\n"
msg += str(ex)
self.logger.error(msg)
self.logger.info('Subprocesses ended!')
self.pool = []
return
def injectWorks(self, items):
"""Takes care of iterating on the input works to do and
injecting them into the queue shared with the slaves
:arg list of tuple items: list of tuple, where each element
contains the type of work to be
done, the task object and the args."""
self.logger.debug("Ready to inject %d items", len(items))
        workid = max(self.working.keys()) + 1 if self.working else 0
for work in items:
worktype, task, failstatus, arguments = work
self.inputs.put((workid, worktype, task, failstatus, arguments))
self.working[workid] = {'workflow': task['tm_taskname'], 'injected': time.time()}
self.logger.info('Injecting work %d: %s', workid, task['tm_taskname'])
workid += 1
self.logger.debug("Injection completed.")
def checkFinished(self):
"""Verifies if there are any finished jobs in the output queue
:return Result: the output of the work completed."""
if len(self.working.keys()) == 0:
return []
allout = []
self.logger.info("%d work on going, checking if some has finished", len(self.working.keys()))
for _ in range(len(self.working.keys())):
out = None
try:
out = self.results.get_nowait()
except Empty:
pass
if out is not None:
# getting output from queue returns the workid in the queue and
# the Result object from the action handler
workid = out['workid']
result = out['out']
if result:
taskname = result.task['tm_taskname']
self.logger.debug('Completed work %d on %s', workid, taskname)
else:
# recurring actions do not return a Result object
self.logger.debug('Completed work %d %s', workid, str(out))
if isinstance(out['out'], list):
allout.extend(out['out'])
else:
allout.append(out['out'])
del self.working[out['workid']]
return allout
def freeSlaves(self):
"""Count how many unemployed slaves are there
:return int: number of free slaves."""
if self.queuedTasks() >= len(self.pool):
return 0
return len(self.pool) - self.queuedTasks()
def queuedTasks(self):
"""Count how many busy slaves are out there
:return int: number of working slaves."""
return len(self.working)
def listQueuedTasks(self):
""" list tasks being worked on """
        tasks = [v['workflow'] for v in self.working.values()]
return tasks
def listWorks(self):
""" list what's being worked on"""
return self.working
def queueableTasks(self):
"""Depending on the queue size limit
return the number of free slots in
the working queue.
:return int: number of acquirable tasks."""
if self.queuedTasks() >= self.leninqueue:
return 0
return self.leninqueue - self.queuedTasks()
def pendingTasks(self):
"""Return the number of tasks pending
to be processed and already in the
queue.
:return int: number of tasks waiting
in the queue."""
if self.queuedTasks() <= len(self.pool):
return 0
return self.queuedTasks() - len(self.pool)
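# A minimal, commented-out usage sketch of the Worker lifecycle (illustrative
# only: the configuration attributes are the ones read in this file, but the
# values, the REST endpoint and the 'handleNewTask' handler are assumptions,
# not a verbatim reproduction of how MasterWorker drives this class):
#
#   from WMCore.Configuration import Configuration
#   config = Configuration()
#   config.section_('TaskWorker')
#   config.TaskWorker.nslaves = 2
#   config.TaskWorker.logsDir = '/data/srv/TaskManager/logs'
#   config.TaskWorker.cmscert = '/path/to/cert.pem'
#   config.TaskWorker.cmskey = '/path/to/key.pem'
#
#   worker = Worker(config, resthost='cmsweb.cern.ch', dbInstance='prod')
#   worker.begin()                                   # start the slave processes
#   worker.injectWorks([(handleNewTask, task, 'SUBMITFAILED', [])])
#   while worker.queuedTasks():                      # poll until all work is drained
#       for result in worker.checkFinished():
#           print(result)
#       time.sleep(10)
#   worker.end()                                     # send STOP messages and join the slaves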