#!/usr/bin/python3
# This is a long-term issue: maintaining ~3k lines of code in one file is hard.
# The code is hard to read: workflow, taskname, reqname all refer to the same thing...
# pylint: disable=too-many-lines, too-many-arguments
# pylint: disable=too-many-nested-blocks, too-many-branches, too-many-locals
# pylint: disable=too-many-statements, too-many-instance-attributes, too-many-public-methods
# pylint: disable=line-too-long
# this code intentionally uses some GLOBAL statements
# pylint: disable=global-statement
# too many open() statements here to worry about specifying an encoding
# pylint: disable=unspecified-encoding
# here we intentionally (!?) use snake-case instead of CamelCase
# pylint: disable=invalid-name
# this file is too long, too complex etc. etc. so no hope that we put
# a docstring on all functions
# pylint: disable=missing-function-docstring, missing-class-docstring
# and no hope that we cleanup things, until we rewrite it
# pylint: disable=W0511 # do not barf on "TODO" comments
# pylint: disable=consider-using-f-string
# pylint: disable=logging-fstring-interpolation
# yeah, we have a lot of try-except where we really want to "catch everything"
# pylint: disable=broad-except
# there are a few very intricate dictionaries where the current code works; better not modify it just to be pretty
# pylint: disable=consider-using-dict-items
"""
In the PostJob we read the FrameworkJobReport (FJR) to retrieve information
about the output files. The FJR contains information for output files produced
either via PoolOutputModule or TFileService, which respectively produce files
of type EDM and TFile. Below are examples of the output part of a FJR for each
of these two cases.
Example FJR['steps']['cmsRun']['output'] for an EDM file produced via
PoolOutputModule:
{
'temp_storage_site': 'T2_US_Nebraska',
'storage_site': 'T2_CH_CERN',
'pfn': 'dumper.root',
'checksums': {'adler32': '47d823c0', 'cksum': '640736586'},
'size': 3582626,
'local_stageout': True,
'direct_stageout': False,
'ouput_module_class': 'PoolOutputModule',
'runs': {'1': [666668, 666672, 666675, 666677, ...]},
'branch_hash': 'd41d8cd98f00b204e9800998ecf8427e',
'input': ['/store/mc/HC/GenericTTbar/GEN-SIM-RECO/CMSSW_5_3_1_START53_V5-v1/0011/404C27A2-22AE-E111-BC20-003048D373AE.root', ...],
'inputpfns': ['/cms/store/mc/HC/GenericTTbar/GEN-SIM-RECO/CMSSW_5_3_1_START53_V5-v1/0011/404C27A2-22AE-E111-BC20-003048D373AE.root', ...],
'lfn': '',
'pset_hash': 'eede9f2e261619d27929da51104cf9d7',
'catalog': '',
'module_label': 'o',
'guid': '48C5017F-A405-E411-9BE6-00A0D1E70940',
'events': 30650
}
Example FJR['steps']['cmsRun']['output'] for a TFile produced via TFileService:
{
'temp_storage_site': 'T2_US_Nebraska',
'storage_site': 'T2_CH_CERN',
'pfn': '/tmp/1882789.vmpsched/glide_Paza70/execute/dir_27876/histo.root',
'checksums': {'adler32': 'e8ed4a12', 'cksum': '2439186609'},
'size': 360,
'local_stageout': True,
'direct_stageout': False,
'fileName': '/tmp/1882789.vmpsched/glide_Paza70/execute/dir_27876/histo.root',
'Source': 'TFileService'
}
For other types of output files, cmscp.py adds the basic necessary info
about the file to the FJR. The information is:
{
'temp_storage_site': 'T2_US_Nebraska',
'storage_site': 'T2_CH_CERN',
'pfn': 'out.root',
'size': 45124,
'local_stageout': True,
'direct_stageout': False
}
The PostJob and cmscp are the only places in CRAB3 where we should use snake_case instead of camelCase.
"""
import os
import sys
import time
import json
import uuid
import pprint
import signal
import tarfile
import logging
import logging.handlers
import subprocess
import unittest
import datetime
import tempfile
import pickle
import traceback
import random
import shutil
import hashlib
from shutil import move
from http.client import HTTPException
import requests
from requests.auth import HTTPBasicAuth
import htcondor2 as htcondor
import classad2 as classad
from WMCore import Lexicon
from WMCore.DataStructs.LumiList import LumiList
from TaskWorker import __version__
from TaskWorker.Actions.RetryJob import RetryJob
from TaskWorker.Actions.RetryJob import JOB_RETURN_CODES
from ServerUtilities import TRANSFERDB_STATES, PUBLICATIONDB_STATES
from ServerUtilities import isFailurePermanent, mostCommon, encodeRequest, oracleOutputMapping
from ServerUtilities import getLock, getHashLfn
from RESTInteractions import CRABRest
ASO_JOB = None
G_JOB_REPORT_NAME = None
G_JOB_REPORT_NAME_NEW = None
G_ERROR_SUMMARY_FILE_NAME = "error_summary.json"
G_FJR_PARSE_RESULTS_FILE_NAME = "task_process/fjr_parse_results.txt"
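# placeholder output dataset name, used for files that are not going to be published (e.g. log files)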
G_FAKE_OUTDATASET = '/FakeDataset/fakefile-FakePublish-5b6a581e4ddd41b130711a045d5fecb9/USER'
def sighandler(signum, frame):  # pylint: disable=unused-argument
if ASO_JOB:
ASO_JOB.cancel()
signal.signal(signal.SIGHUP, sighandler)
signal.signal(signal.SIGINT, sighandler)
signal.signal(signal.SIGTERM, sighandler)
# ==============================================================================
class NotFound(Exception):
"""Not Found is raised only if there is no document found in RDBMS.
This makes PostJob to submit new transfer request to database."""
pass # pylint: disable=unnecessary-pass
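# DEFER_NUM counts how many times this PostJob execution has been deferred; it is
# initialized to -1 and updated at runtime, so that first_pj_execution() returns
# True only on the first pass (DEFER_NUM == 0).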
DEFER_NUM = -1
def first_pj_execution():
return DEFER_NUM == 0
# ==============================================================================
def compute_outputdataset_name(primaryDS=None, username=None, publish_name=None, pset_hash=None, module_label=None):
"""
a standalone function to compute the name of the DBS dataset where to publish data.
The name of the output dataset is a property of each output file, so it is not
unique inside the PostJob and needs to be computed "as needed" both in the ASO class
(to communicate it to Rucio) and in the PostJob class (to upload metadata), so
we compute it here in a uniform way from information available in both places.
see
https://twiki.cern.ch/twiki/bin/view/CMSPublic/Crab3DataHandling#Output_dataset_names_in_DBS
Convention for output dataset name:
/<primarydataset>/<username>-<tag>[-<module>]-<PSETHASH>/USER
Input args are all strings:
primaryDS: the primary dataset, usually the same as the input
username: could be a username or a group name
publish_name : value of the tm_publish_name field in TaskDB, derived from the configuration parameter Data.outputDatasetTag
module_label : if multiple EDM outputs are present, the module label of the PoolOutputModule,
otherwise missing
pset_hash : the 32-char PSet hash. This is not the output of edmConfigHash but taken from edmProvDump.
returns: the DBS dataset name as a string
"""
if pset_hash:
# replace placeholder (32*'0') with correct pset hash for this file
publish_name = "%s-%s" % (publish_name.rsplit('-', 1)[0], pset_hash)
# #hash = pset_hash if pset_hash else 32*'0'
if module_label:
# insert output module name before the pset hash
left, right = publish_name.rsplit('-', 1)
publish_name = "%s-%s-%s" % (left, module_label, right)
outdataset = '/' + primaryDS + '/' + username + '-' + publish_name + '/USER'
return outdataset
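# Illustrative example of the convention implemented above (all values are made up):
#   compute_outputdataset_name(primaryDS='GenericTTbar', username='jdoe',
#                              publish_name='MyTag-' + 32*'0',
#                              pset_hash='eede9f2e261619d27929da51104cf9d7',
#                              module_label='o')
#   returns '/GenericTTbar/jdoe-MyTag-o-eede9f2e261619d27929da51104cf9d7/USER'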
# ==============================================================================
def prepareErrorSummary(logger, fsummary, job_id, crab_retry):
"""Parse the job_fjr file corresponding to the current PostJob. If an error
message is found, it is inserted into the error_summary.json file
"""
# The job_id and crab_retry variables in PostJob are integers, while here we
# mostly use them as strings.
job_id = str(job_id)
crab_retry = str(crab_retry)
error_summary = []
error_summary_changed = False
fjr_file_name = "job_fjr." + job_id + "." + crab_retry + ".json"
with open(fjr_file_name) as frep:
try:
rep = None
exit_code = -1
rep = json.load(frep)
if not 'exitCode' in rep:
raise Exception("'exitCode' key not found in the report") # pylint: disable=broad-exception-raised
exit_code = rep['exitCode']
if not 'exitMsg' in rep:
raise Exception("'exitMsg' key not found in the report") # pylint: disable=broad-exception-raised
exit_msg = rep['exitMsg']
if not 'steps' in rep:
raise Exception("'steps' key not found in the report") # pylint: disable=broad-exception-raised
if not 'cmsRun' in rep['steps']:
raise Exception("'cmsRun' key not found in report['steps']") # pylint: disable=broad-exception-raised
if not 'errors' in rep['steps']['cmsRun']:
raise Exception("'errors' key not found in report['steps']['cmsRun']") # pylint: disable=broad-exception-raised
if rep['steps']['cmsRun']['errors']:
# If there are errors in the job report, they come from the job execution. This
# is the error we want to report to the user, so write it to the error summary.
if len(rep['steps']['cmsRun']['errors']) != 1:
#this should never happen because the report has just one step, but just in case print a message
logger.info("More than one error found in report['steps']['cmsRun']['errors']. Just considering the first one.")
msg = "Updating error summary for jobid %s retry %s with following information:" % (job_id, crab_retry)
msg += "\n'exit code' = %s" % (exit_code)
msg += "\n'exit message' = %s" % (exit_msg)
msg += "\n'error message' = %s" % (rep['steps']['cmsRun']['errors'][0])
logger.info(msg)
error_summary = [exit_code, exit_msg, rep['steps']['cmsRun']['errors'][0]]
error_summary_changed = True
else:
# If there are no errors in the job report, but there is an exit code and exit
# message from the job (not post-job), we want to report them to the user only
# in case we know this is the terminal exit code and exit message. And this is
# the case if the exit code is not 0. Even a post-job exit code != 0 can be
# added later to the job report, the job exit code takes precedence, so we can
# already write it to the error summary.
if exit_code != 0:
msg = "Updating error summary for jobid %s retry %s with following information:" % (job_id, crab_retry)
msg += "\n'exit code' = %s" % (exit_code)
msg += "\n'exit message' = %s" % (exit_msg)
logger.info(msg)
error_summary = [exit_code, exit_msg, {}]
error_summary_changed = True
else:
# In case the job exit code is 0, we still have to check if there is an exit
# code from post-job. If there is a post-job exit code != 0, write it to the
# error summary; otherwise write the exit code 0 and exit message from the job
# (the message should be "OK").
postjob_exit_code = rep.get('postjob', {}).get('exitCode', -1)
postjob_exit_msg = rep.get('postjob', {}).get('exitMsg', "No post-job error message available.")
if postjob_exit_code != 0:
# Use exit code 90000 as a general exit code for failures in the post-processing step.
# The 'crab status' error summary should not show this error code,
# but replace it with the generic message "failed in post-processing".
msg = "Updating error summary for jobid %s retry %s with following information:" % (job_id, crab_retry)
msg += "\n'exit code' = 90000 ('Post-processing failed')"
msg += "\n'exit message' = %s" % (postjob_exit_msg)
logger.info(msg)
error_summary = [90000, postjob_exit_msg, {}]
else:
msg = "Updating error summary for jobid %s retry %s with following information:" % (job_id, crab_retry)
msg += "\n'exit code' = %s" % (exit_code)
msg += "\n'exit message' = %s" % (exit_msg)
logger.info(msg)
error_summary = [exit_code, exit_msg, {}]
error_summary_changed = True
except Exception as ex:
logger.info(str(ex))
# Write to the error summary that the job report is not valid or has no error
# message
if not rep:
exit_msg = 'Invalid framework job report. The framework job report exists, but it cannot be loaded.'
else:
exit_msg = rep['exitMsg'] if 'exitMsg' in rep else 'The framework job report could be loaded, but no error message was found there.'
msg = "Updating error summary for jobid %s retry %s with following information:" % (job_id, crab_retry)
msg += "\n'exit code' = %s" % (exit_code)
msg += "\n'exit message' = %s" % (exit_msg)
logger.info(msg)
error_summary = [exit_code, exit_msg, {}]
error_summary_changed = True
# Write the fjr report summary of this postjob to a file which task_process reads incrementally
if error_summary_changed:
with getLock(G_FJR_PARSE_RESULTS_FILE_NAME):
with open(G_FJR_PARSE_RESULTS_FILE_NAME, "a+") as fjr_parse_results:
# make sure the "json file" is written as multiple lines
fjr_parse_results.write(json.dumps({job_id : {crab_retry : error_summary}}) + "\n")
# Read, update and re-write the error_summary.json file
try:
error_summary_old_content = {}
if os.stat(G_ERROR_SUMMARY_FILE_NAME).st_size != 0:
fsummary.seek(0)
error_summary_old_content = json.load(fsummary)
except (IOError, ValueError):
# There is nothing to do if the error_summary file doesn't exist or is invalid.
# Just recreate it.
logger.info("File %s is empty, wrong or does not exist. Will create a new file." % (G_ERROR_SUMMARY_FILE_NAME))
error_summary_new_content = error_summary_old_content
error_summary_new_content[job_id] = {crab_retry : error_summary}
# If we have updated the error summary, write it to the json file.
# Use a temporary file and rename to avoid concurrent writing of the file.
if error_summary_changed:
logger.debug("Writing error summary file")
tempFilename = (G_ERROR_SUMMARY_FILE_NAME + ".%s") % os.getpid()
with open(tempFilename, "w") as tempFile:
json.dump(error_summary_new_content, tempFile)
move(tempFilename, G_ERROR_SUMMARY_FILE_NAME)
logger.debug("Written error summary file")
def fixUpTempStorageSite(logger=None, siteName=None):
"""
overrides the name of the site used for local stageout at the WN in /store/temp/user
for cases where we need to direct Rucio to use a different site name for
LFN2PFN resolution, in order to work around the fact that Rucio does not
implement full CMS TFC functionalities
see https://github.com/dmwm/CRABServer/issues/6285
:param logger: a logger object where to log messages
:param siteName: the site name to override
:return: the new site name
"""
# pretend that CRAB output staged out at FNAL are at T3_US_FNALLPC
newName = siteName
if siteName == 'T1_US_FNAL_Disk':
newName = 'T3_US_FNALLPC'
if logger:
logger.warning('temp storage site changed from T1_US_FNAL_Disk to T3_US_FNALLPC ')
return newName
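# Example: fixUpTempStorageSite(siteName='T1_US_FNAL_Disk') returns 'T3_US_FNALLPC',
# while any other site name is returned unchanged.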
# ==============================================================================
class ASOServerJob():
"""
Class used to inject transfer requests to ASO database.
"""
def __init__(self, logger, aso_start_time, aso_start_timestamp, dest_site, source_dir,
dest_dir, source_sites, job_id, filenames, reqname, log_size,
log_needs_transfer, job_report_output, job_ad, crab_retry, retry_timeout,
job_failed, transfer_logs, transfer_outputs, rest_host, db_instance, pubname):
"""
ASOServerJob constructor.
"""
self.logger = logger
self.publishname = pubname
self.docs_in_transfer = None
self.crab_retry = crab_retry
self.retry_timeout = retry_timeout
self.job_id = job_id
self.dest_site = dest_site
self.source_dir = source_dir
self.dest_dir = dest_dir
self.job_failed = job_failed
self.source_sites = source_sites
self.filenames = filenames
self.reqname = reqname
self.job_report_output = job_report_output
self.log_size = log_size
self.log_needs_transfer = log_needs_transfer
self.transfer_logs = transfer_logs
self.transfer_outputs = transfer_outputs
self.job_ad = job_ad
self.failures = {}
self.aso_start_time = aso_start_time
self.aso_start_timestamp = aso_start_timestamp
proxy = os.environ.get('X509_USER_PROXY', None)
self.proxy = proxy
self.rest_host = rest_host
self.db_instance = db_instance
self.rest_url = rest_host + '/crabserver/' + db_instance + '/' # used in logging
self.found_doc_in_db = False
try:
self.crabserver = CRABRest(self.rest_host, proxy, proxy, retry=2, userAgent='CRABSchedd')
self.crabserver.setDbInstance(self.db_instance)
except Exception as ex:
msg = "Failed to connect to ASO database via CRABRest: %s" % (str(ex))
self.logger.exception(msg)
raise RuntimeError(msg) from ex
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def save_docs_in_transfer(self):
""" The function is used to save into a file the documents we are transfering so
we do not have to query the DB to get this list every time the postjob is restarted.
"""
try:
filename = 'transfer_info/docs_in_transfer.%s.%d.json' % (self.job_id, self.crab_retry)
with open(filename, 'w') as fd:
json.dump(self.docs_in_transfer, fd)
except:
#Only printing a generic message, the full stacktrace is printed in execute()
self.logger.error("Failed to save the docs in transfer. Aborting the postjob")
raise
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def load_docs_in_transfer(self):
""" Function that loads the object saved as a json by save_docs_in_transfer
"""
try:
filename = 'transfer_info/docs_in_transfer.%s.%d.json' % (self.job_id, self.crab_retry)
with open(filename) as fd:
self.docs_in_transfer = json.load(fd)
except Exception as ex: # pylint: disable=broad-except
#Only printing a generic message, the full stacktrace is printed in execute()
self.logger.error("Failed to load the docs in transfer. Aborting the postjob")
raise Exception from ex # pylint: disable=broad-exception-raised
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def check_transfers(self):
self.load_docs_in_transfer()
failed_killed_transfers = []
done_transfers = []
starttime = time.time()
if self.aso_start_timestamp:
starttime = self.aso_start_timestamp
if first_pj_execution():
self.logger.info("====== Starting to monitor ASO transfers.")
try:
with getLock('get_transfers_statuses'):
# Get the transfer status in all documents listed in self.docs_in_transfer.
transfers_statuses = self.get_transfers_statuses()
except TransferCacheLoadError as e:
self.logger.info("Error getting the status of the transfers. Deferring PJ. Got: %s" % e)
return 4
msg = "Got statuses: %s; %.1f hours since transfer submit."
msg = msg % (", ".join(transfers_statuses), (time.time()-starttime)/3600.0)
self.logger.info(msg)
all_transfers_finished = True
doc_ids = [doc_info['doc_id'] for doc_info in self.docs_in_transfer]
for transfer_status, doc_id in zip(transfers_statuses, doc_ids):
# States to wait on.
if transfer_status in ['new', 'acquired', 'retry', 'unknown', 'submitted']:
all_transfers_finished = False
continue
# Good states.
if transfer_status in ['done']:
if doc_id not in done_transfers:
done_transfers.append(doc_id)
continue
# Bad states.
if transfer_status in ['failed', 'killed', 'kill']:
if doc_id not in failed_killed_transfers:
failed_killed_transfers.append(doc_id)
msg = "Stageout job (internal ID %s) failed with status '%s'." % (doc_id, transfer_status)
doc = self.load_transfer_document(doc_id)
if doc and ('transfer_failure_reason' in doc) and doc['transfer_failure_reason']:
# reasons: The transfer failure reason(s).
# app: The application that gave the transfer failure reason(s).
# E.g. 'aso' or '' (meaning the postjob). When printing the
# transfer failure reasons (e.g. below), print also that the
# failures come from the given app (if app != '').
# severity: Either 'permanent' or 'recoverable'.
# It is set by PostJob in the perform_transfers() function,
# when is_failure_permanent() is called to determine if a
# failure is permanent or not.
reasons, app, severity = doc['transfer_failure_reason'], 'aso', None
if reasons:
# Do not add this if FTS jobID is not available
if doc['fts_id']:
reasons += " [...CUT...] Full log at https://fts3-cms.cern.ch:8449/fts3/ftsmon/#/job/%s" % doc['fts_id']
if app:
msg += "\n-----> %s log start -----" % str(app).upper()
msg += "\n%s" % reasons
if app:
msg += "\n<----- %s log finish ----" % str(app).upper()
self.logger.error(msg)
else:
reasons, app, severity = "Failure reason unavailable.", None, None
self.logger.error(msg)
self.logger.warning("WARNING: no failure reason available.")
self.failures[doc_id] = {'reasons': reasons, 'app': app, 'severity': severity}
else:
exmsg = "Got an unknown transfer status: %s" % (transfer_status)
raise RuntimeError(exmsg)
if all_transfers_finished:
msg = "All transfers finished."
self.logger.info(msg)
if failed_killed_transfers:
msg = "There were %d failed/killed transfers:" % (len(failed_killed_transfers))
msg += " %s" % (", ".join(failed_killed_transfers))
self.logger.info(msg)
self.logger.info("====== Finished to monitor ASO transfers.")
return 1
self.logger.info("====== Finished to monitor ASO transfers.")
return 0
# If there is a timeout for transfers to complete, check if it was exceeded
# and if so kill the ongoing transfers. # timeout = -1 means no timeout.
if self.retry_timeout != -1 and time.time() - starttime > self.retry_timeout:
msg = "Post-job reached its timeout of %d seconds waiting for ASO transfers to complete." % (self.retry_timeout)
msg += " Will cancel ongoing ASO transfers."
self.logger.warning(msg)
self.logger.info("====== Starting to cancel ongoing ASO transfers.")
docs_to_cancel = {}
reason = "Cancelled ASO transfer after timeout of %d seconds." % (self.retry_timeout)
for doc_info in self.docs_in_transfer:
doc_id = doc_info['doc_id']
if doc_id not in done_transfers + failed_killed_transfers:
docs_to_cancel.update({doc_id: reason})
cancelled, not_cancelled = self.cancel(docs_to_cancel, max_retries=2)
for doc_id in cancelled:
failed_killed_transfers.append(doc_id)
app, severity = None, None
self.failures[doc_id] = {'reasons': reason, 'app': app, 'severity': severity}
if not_cancelled:
msg = "Failed to cancel %d ASO transfers: %s" % (len(not_cancelled), ", ".join(not_cancelled))
self.logger.error(msg)
self.logger.info("====== Finished to cancel ongoing ASO transfers.")
self.logger.info("====== Finished to monitor ASO transfers.")
return 2
self.logger.info("====== Finished to cancel ongoing ASO transfers.")
self.logger.info("====== Finished to monitor ASO transfers.")
return 1
# defer the execution of the postjob
return 4
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def run(self):
"""
This is the main method in ASOServerJob. Should be called after initializing
an instance.
"""
with getLock('get_transfers_statuses'):
self.docs_in_transfer = self.inject_to_aso()
if self.docs_in_transfer is False:
exmsg = "Couldn't upload document to ASO database"
raise RuntimeError(exmsg)
if not self.docs_in_transfer:
self.logger.info("No files to transfer via ASO. Done!")
return 0
self.save_docs_in_transfer()
return 4
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def recordASOStartTime(self):
# Record the ASO transfer start time (timestamp and human-readable) in the job report.
job_report = {}
try:
with open(G_JOB_REPORT_NAME) as fd:
job_report = json.load(fd)
except (IOError, ValueError):
pass
job_report['aso_start_timestamp'] = int(time.time())
job_report['aso_start_time'] = str(datetime.datetime.now())
with open(G_JOB_REPORT_NAME, 'w') as fd:
json.dump(job_report, fd)
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def inject_to_aso(self):
"""
Inject documents to ASO database if not done by cmscp from worker node.
returns: docs_in_transfer, a list of documents injected to ASO, possibly an empty list []
if e.g. the file was transferred directly from the job to the destination.
It returns False in case of error. So the caller must be careful to consider
an empty list as success and only the False boolean as failure.
Clearly this could have been done better... e.g. raising an exception
"""
self.found_doc_in_db = False  # This is only for the Oracle implementation; we want to check before adding a new doc.
self.logger.info("====== Upload out file records to filetransfersdb table in CRAB DataBase.")
docs_in_transfer = []
output_files = []
now = str(datetime.datetime.now())
last_update = int(time.time())
file_output_type = None
if self.aso_start_timestamp is None or self.aso_start_time is None:
self.aso_start_timestamp = last_update
self.aso_start_time = now
msg = "Unable to determine ASO start time from job report."
msg += " Will use ASO start time = %s (%s)."
msg = msg % (self.aso_start_time, self.aso_start_timestamp)
self.logger.warning(msg)
role = str(self.job_ad['CRAB_UserRole'])
if str(self.job_ad['CRAB_UserRole']).lower() == 'undefined':
role = ''
group = str(self.job_ad['CRAB_UserGroup'])
if str(self.job_ad['CRAB_UserGroup']).lower() == 'undefined':
group = ''
with open('taskinformation.pkl', 'rb') as fd:
task = pickle.load(fd)
# Now we need to loop over all output files in this job, build the outputdataset
# name for each (if applicable) and fill the file_info dictionary for each.
# For reasons detailed below, need to repeat the loop 3 times
# Loop #1. Find information for all output files in this job
for output_module in self.job_report_output.values():
for output_file_info in output_module:
file_info = {}
file_info['pfn'] = str(output_file_info['pfn'])
file_info['checksums'] = output_file_info.get('checksums', {'cksum': '0', 'adler32': '0'})
file_info['outsize'] = output_file_info.get('size', 0)
file_info['direct_stageout'] = output_file_info.get('direct_stageout', False)
file_info['pset_hash'] = output_file_info.get('pset_hash', 32*'0')
file_info['module_label'] = output_file_info.get('module_label')
# assign filetype based on the classification made by CRAB Client
if file_info['pfn'] in task['tm_edm_outfiles']:
file_info['filetype'] = 'EDM'
elif file_info['pfn'] in task['tm_tfile_outfiles']:
file_info['filetype'] = 'TFILE'
elif file_info['pfn'] in task['tm_outfiles']:
file_info['filetype'] = 'FAKE'
else:
file_info['filetype'] = 'UNKNOWN'
output_files.append(file_info)
# Loop #2. Find out if there are multiple output modules, since the rule
# for constructing the outputdataset name is different in that case, see
# https://twiki.cern.ch/twiki/bin/view/CMSPublic/Crab3DataHandling#Output_dataset_names_in_DBS
edm_file_count = 0
for file_info in output_files:
if file_info['filetype'] == 'EDM':
edm_file_count += 1
multiple_edm = edm_file_count > 1
# Loop #3. compute the output dataset name for each file and add it to file_info
for file_info in output_files:
# use common function with PostJob to ensure uniform dataset name
if file_info['filetype'] in ['EDM', 'DQM'] and task['tm_publication'] == 'T':
# group name overrides username when present:
username = self.job_ad['CRAB_UserHN']
if self.dest_dir.startswith('/store/group/') and self.dest_dir.split('/')[3]:
username = self.dest_dir.split('/')[3]
module_label = file_info.get('module_label') if multiple_edm else None
outdataset = compute_outputdataset_name(primaryDS=self.job_ad['CRAB_PrimaryDataset'],
username=username,
publish_name=self.publishname,
pset_hash=file_info['pset_hash'],
module_label=module_label)
else:
outdataset = G_FAKE_OUTDATASET
file_info['outputdataset'] = outdataset
found_log = False
# need to process output files grouped by where they are
# files may be at different sites if local stageout failed for some
for source_site, filename in zip(self.source_sites, self.filenames):
# PREPARE TRANSFER INFO
# We assume that the first file in self.filenames is the logs archive.
if found_log:
if not self.transfer_outputs:
continue
source_lfn = os.path.join(self.source_dir, filename)
dest_lfn = os.path.join(self.dest_dir, filename)
file_type = 'output'
ifile = get_file_index(filename, output_files)
if ifile is None:
continue
size = output_files[ifile]['outsize']
checksums = output_files[ifile]['checksums']
# needs_transfer is False if and only if the file was staged out
# from the worker node directly to the permanent storage.
needs_transfer = not output_files[ifile]['direct_stageout']
file_output_type = output_files[ifile]['filetype']
outputdataset = output_files[ifile]['outputdataset']
else:
found_log = True
if not self.transfer_logs:
continue
source_lfn = os.path.join(self.source_dir, 'log', filename)
dest_lfn = os.path.join(self.dest_dir, 'log', filename)
file_type = 'log'
size = self.log_size
# a few fake values for log files for fields which will go in CRAB internal DB's
checksums = {'adler32': 'abc'} # this is not what FTS will use !
outputdataset = G_FAKE_OUTDATASET
# needs_transfer is False if and only if the file was staged out
# from the worker node directly to the permanent storage.
needs_transfer = self.log_needs_transfer
self.logger.info("Working on file %s" % (filename))
doc_id = getHashLfn(source_lfn)
doc_new_info = {'state': 'new',
'source': fixUpTempStorageSite(logger=self.logger, siteName=source_site),
'destination': self.dest_site,
'checksums': checksums,
'size': size,
'last_update': last_update,
'start_time': now,
'end_time': '',
'job_end_time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
'retry_count': [],
'failure_reason': [],
# The 'job_retry_count' is used by ASO when reporting to dashboard,
# so it is OK to set it equal to the crab (post-job) retry count.
'job_retry_count': self.crab_retry,
'outputdataset': outputdataset,
}
direct = False
if not needs_transfer:
msg = "File %s is marked as having been directly staged out"
msg += " from the worker node to the permanent storage."
msg = msg % (filename)
self.logger.info(msg)
doc_new_info['state'] = 'done'
doc_new_info['end_time'] = now
direct = True
# SET THE PUBLICATION FLAG
task_publish = int(self.job_ad['CRAB_Publish'])
publication_msg = None
if file_type == 'output':
publish = task_publish
if publish and self.job_failed:
publication_msg = "Disabling publication of output file %s,"
publication_msg += " because job is marked as failed."
publication_msg = publication_msg % (filename)
publish = 0
if publish and file_output_type != 'EDM':
publication_msg = "Disabling publication of output file %s,"
publication_msg += " because it is not of EDM type."
publication_msg = publication_msg % (filename)
publish = 0
else:
# This is the log file, so obviously publication should be turned off.
publish = 0
# FINALLY INJECT TO ASO (MODIFY EXISTING DOC OR CREATE NEW ONE)
# What ASO needs to do for this file (transfer and/or publication) is
# saved in this list, for the only purpose of printing a better message later.
aso_tasks = []
if needs_transfer:
aso_tasks.append("transfer")
if publish:
aso_tasks.append("publication")
delayed_publicationflag_update = False
if not (needs_transfer or publish or direct):
# This file doesn't need transfer nor publication, so we don't need to upload
# a document to ASO database.
if publication_msg:
self.logger.info(publication_msg)
msg = "File %s doesn't need transfer nor publication."
msg += " No need to inject a document to ASO."
msg = msg % (filename)
self.logger.info(msg)
else:
# If the file needs both transfer and publication we turn the publication flag off and
# we will update the database once the filemetadata are uploaded
# The other cases are:
# 1) Publication already off: we obviously do not need to do anything
# 2) Transfer not required (e.g.: direct stageout) but publication necessary:
# In this case we just upload the document now with publication requested
if needs_transfer and publish:
publish = 0
delayed_publicationflag_update = True
msg = "Temporarily disabling publication flag."
msg += "It will be updated once the transfer is done (and filemetadata uploaded)."
self.logger.info(msg)
# This file needs transfer and/or publication. If a document (for the current
# job retry) is not yet in ASO database, we need to do the upload.
needs_commit = True
try:
doc = self.getDocByID(doc_id)
# The document was already uploaded to ASO database. It could have been
# uploaded from the WN in the current job retry or in a previous job retry,
# or by the postjob in a previous job retry.
transfer_status = doc.get('state')
if not transfer_status:
# No 'state' key means the document comes from the RDBMS implementation, where the field is named 'transfer_state'.
transfer_status = doc.get('transfer_state')
if doc.get('start_time') == self.aso_start_time or \
doc.get('start_time') == self.aso_start_timestamp:
# The document was uploaded from the WN in the current job retry, so we don't
# upload a new document. (If the transfer is done or ongoing, then of course we
# don't want to re-inject the transfer request. OTOH, if the transfer has
# failed, we don't want the postjob to retry it; instead the postjob will exit
# and the whole job will be retried).
msg = "LFN %s (id %s) is already in ASO database"
msg += " (it was injected from the worker node in the current job retry)"
msg += " and file transfer status is '%s'."
msg = msg % (source_lfn, doc_id, transfer_status)
self.logger.info(msg)
needs_commit = False
else:
# The document was uploaded in a previous job retry. This means that in the
# current job retry the injection from the WN has failed or cmscp did a direct
# stageout. We upload a new stageout request, unless the transfer is still
# ongoing (which should actually not happen, unless the postjob for the
# previous job retry didn't run).
msg = "LFN %s (id %s) is already in ASO database (file transfer status is '%s'),"
msg += " but does not correspond to the current job retry."
msg = msg % (source_lfn, doc_id, transfer_status)
if transfer_status in ['acquired', 'new', 'retry']:
msg += "\nFile transfer status is %s, which is not terminal ('done', 'failed' or 'killed')." % transfer_status
msg += " Will not inject a new document for the current job retry."
self.logger.info(msg)
needs_commit = False
else:
msg += " Will inject a new %s request." % (' and '.join(aso_tasks))
self.logger.info(msg)
msg = "Previous document: %s" % (pprint.pformat(doc))
self.logger.debug(msg)
except NotFound:
# The document was not yet uploaded to ASO database (if this is the first job
# retry, then either the upload from the WN failed, or cmscp did a direct
# stageout and here we need to inject for publication only). In any case we
# have to inject a new document.
msg = "LFN %s (id %s) is not in ASO database."
msg += " Will inject a new %s request."
msg = msg % (source_lfn, doc_id, ' and '.join(aso_tasks))
self.logger.info(msg)
if publication_msg:
self.logger.info(publication_msg)
input_dataset = str(self.job_ad['DESIRED_CMSDataset'])
if str(self.job_ad['DESIRED_CMSDataset']).lower() == 'undefined':
input_dataset = ''
primary_dataset = str(self.job_ad['CRAB_PrimaryDataset'])
if input_dataset:
input_dataset_or_primary_dataset = input_dataset
elif primary_dataset:
input_dataset_or_primary_dataset = '/'+primary_dataset # Adding the '/' until we fix ASO
else:
input_dataset_or_primary_dataset = '/'+'NotDefined' # Adding the '/' until we fix ASO
doc = {'_id': doc_id,
'inputdataset': input_dataset_or_primary_dataset,
'lfn': source_lfn,
'source_lfn': source_lfn,
'destination_lfn': dest_lfn,
'checksums': checksums,
'user': str(self.job_ad['CRAB_UserHN']),
'group': group,
'role': role,
'dbs_url': str(self.job_ad['CRAB_DBSURL']),
'workflow': self.reqname,
'jobid': self.job_id,
'publication_state': 'not_published',
'publication_retry_count': [],
'type': file_type,
}
# TODO: We do the following, only because that's what ASO does when a file has
# been successfully transferred. But this modified LFN makes no sense when it
# starts with /store/temp/user/, because the modified LFN is then
# /store/user/<username>.<hash>/bla/blabla, i.e. it contains the <hash>, which
# is never part of a destination LFN, but only of temp source LFNs.
# Once ASO uses the source_lfn and the destination_lfn instead of only the lfn,
# this should not be needed anymore.
if not needs_transfer:
doc['lfn'] = source_lfn.replace('/store/temp', '/store', 1)
except Exception as ex:
msg = "Error loading document from ASO database: %s" % (str(ex))
try:
msg += "\n%s" % (traceback.format_exc())
except AttributeError:
msg += "\nTraceback unavailable."
self.logger.error(msg)
return False
# If after all we need to upload a new document to ASO database, let's do it.
if needs_commit:
doc.update(doc_new_info)
doc['publish'] = publish
msg = "ASO job description: %s" % (pprint.pformat(doc))
self.logger.info(msg)
commit_result_msg = self.updateOrInsertDoc(doc, needs_transfer)
if 'error' in commit_result_msg:
msg = "Error injecting document to ASO database:\n%s" % (commit_result_msg)
self.logger.info(msg)
return False
# Record all files for which we want the post-job to monitor their transfer.
if needs_transfer:
doc_info = {'doc_id' : doc_id,
'start_time' : doc.get('start_time'),
'delayed_publicationflag_update' : delayed_publicationflag_update
}
docs_in_transfer.append(doc_info)
# Make sure that the fjr has the record of the ASO start transfer time stamp
self.recordASOStartTime()
self.logger.info("====== Finished to upload output files info to database.")
return docs_in_transfer
# = = = = = ASOServerJob = = = = = = = = = = = = = = = = = = = = = = = = = = = =
def getDocByID(self, doc_id):
docInfo = self.crabserver.get(api='fileusertransfers', data=encodeRequest({'subresource': 'getById', "id": doc_id}))
if docInfo and len(docInfo[0]['result']) == 1:
# Means that we have already a document in database!
docInfo = oracleOutputMapping(docInfo)
# Just to be 100% sure not to break after the mapping has been applied
if not docInfo:
self.found_doc_in_db = False
raise NotFound('Document not found in database')
# transfer_state and publication_state are numbers in the database.
docInfo[0]['transfer_state'] = TRANSFERDB_STATES[docInfo[0]['transfer_state']]
docInfo[0]['publication_state'] = PUBLICATIONDB_STATES[docInfo[0]['publication_state']]
# Also copy the 'id' field into 'job_id'
docInfo[0]['job_id'] = docInfo[0]['id']
self.found_doc_in_db = True # This is needed for further if there is a need to update doc info in DB
return docInfo[0]
self.found_doc_in_db = False
raise NotFound('Document not found in database!')
def updateOrInsertDoc(self, doc, toTransfer):
""" need a docstring here """
returnMsg = {}
if not self.found_doc_in_db:
# This means that it was not found in the DB and we will have to insert a new doc
newDoc = {'id': doc['_id'],
'username': doc['user'],
'taskname': doc['workflow'],
'start_time': self.aso_start_timestamp,
'destination': doc['destination'],
'destination_lfn': doc['destination_lfn'],
'source': doc['source'],
'source_lfn': doc['source_lfn'],
'filesize': doc['size'],
'publish': doc['publish'],
'transfer_state': doc['state'].upper(),
'publication_state': 'NEW' if doc['publish'] else 'NOT_REQUIRED',
'job_id': doc['jobid'],
'job_retry_count': doc['job_retry_count'],
'type': doc['type'],
}
try:
self.crabserver.put(api='fileusertransfers', data=encodeRequest(newDoc))
except HTTPException as hte:
msg = "Error uploading document to database."
msg += " Transfer submission failed."
msg += "\n%s" % (str(hte.headers))
returnMsg['error'] = msg
updateDoc={'subresource':'updateTransfers', 'list_of_ids':[doc['_id']]}
updateDoc['list_of_transfer_state'] = [newDoc['transfer_state']]
# make sure that the asoworker field in transfersdb is always filled, since
# otherwise whichever Publisher process looks first for things to do, grabs them
# https://github.com/dmwm/CRABServer/blob/8012e1297759bab620d89c8cb253f1832b4eb466/src/python/Databases/FileTransfersDB/Oracle/FileTransfers/FileTransfers.py#L27-L33
# but since the PUT API ignores an asoworker argument when inserting
# https://github.com/dmwm/CRABServer/blob/43f6377447922d46353072e86d960e3c78967a17/src/python/CRABInterface/RESTFileUserTransfers.py#L122-L125
# we need to update the record after insertion with a POST, which requires a list of ids and states
# https://github.com/dmwm/CRABServer/blob/43f6377447922d46353072e86d960e3c78967a17/src/python/CRABInterface/RESTFileTransfers.py#L131-L133
updateDoc['asoworker'] = 'schedd'
try:
self.crabserver.post(api='filetransfers', data=encodeRequest(updateDoc))
except HTTPException as hte:
msg = "Error uploading document to database."
msg += " Transfer submission failed."
msg += "\n%s" % (str(hte.headers))
returnMsg['error'] = msg
else:
# This means it is in database and we need only update specific fields.
newDoc = {'id': doc['id'],
'username': doc['username'],
'taskname': doc['taskname'],
'start_time': self.aso_start_timestamp,
'source': doc['source'],
'source_lfn': doc['source_lfn'],
'filesize': doc['filesize'],
'transfer_state': doc.get('state', 'NEW').upper(),
'publish': doc['publish'],
'publication_state': 'NEW' if doc['publish'] else 'NOT_REQUIRED',
'job_id': doc['jobid'],
'job_retry_count': doc['job_retry_count'],
'transfer_retry_count': 0,
'subresource': 'updateDoc'}
try:
self.crabserver.post(api='fileusertransfers', data=encodeRequest(newDoc))
except HTTPException as hte:
msg = "Error updating document in database."
msg += " Transfer submission failed."
msg += "\n%s" % (str(hte.headers))
returnMsg['error'] = msg
# Previous post resets asoworker to NULL. This is not good, so we set it again
# using a different API to update the transfersDB record
updateDoc = {}
updateDoc['list_of_ids'] = [newDoc['id']]
updateDoc['list_of_transfer_state'] = [newDoc['transfer_state']]
updateDoc['subresource'] = 'updateTransfers'
updateDoc['asoworker'] = 'schedd'
try:
self.crabserver.post(api='filetransfers', data=encodeRequest(updateDoc))
except HTTPException as hte:
msg = "Error uploading document to database."
msg += " Transfer submission failed."
msg += "\n%s" % (str(hte.headers))
returnMsg['error'] = msg
if toTransfer:
if not 'publishname' in newDoc:
newDoc['publishname'] = self.publishname
if not 'checksums' in newDoc:
newDoc['checksums'] = doc['checksums']
if not 'destination_lfn' in newDoc:
newDoc['destination_lfn'] = doc['destination_lfn']
if not 'destination' in newDoc:
newDoc['destination'] = doc['destination']
if not 'outputdataset' in newDoc:
newDoc['outputdataset'] = doc['outputdataset']
# Rucio ASO requires the "type" field to handle the log transfers.
if not 'type' in newDoc:
newDoc['type'] = doc['type']
with open('task_process/transfers.txt', 'a+') as transfers_file:
transfer_dump = json.dumps(newDoc)
transfers_file.write(transfer_dump+"\n")
if not os.path.exists('task_process/RestInfoForFileTransfers.json'):
#if not os.path.exists('task_process/rest_filetransfers.txt'):
restInfo = {'host':self.rest_host,
'dbInstance': self.db_instance,