-
Notifications
You must be signed in to change notification settings - Fork 129
Expand file tree
/
Copy pathprocess_media.py
More file actions
2440 lines (1963 loc) · 93.4 KB
/
process_media.py
File metadata and controls
2440 lines (1963 loc) · 93.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import uuid
from methods.regular.regular_api import *
try:
from methods.video.video_preprocess import Video_Preprocess
except Exception as exception:
print("Fail, Video_Preprocess: Could not import", exception)
import requests
import threading
import tempfile
import csv
import gc
import shutil
from urllib.parse import urlsplit
from random import randrange
from werkzeug.utils import secure_filename
from imageio import imwrite
from imageio import imread
from shared.image_tools import imresize
from shared.database.user import UserbaseProject
from shared.utils.memory_checks import check_and_wait_for_memory
from shared.database.input import Input
from shared.database.video.video import Video
from shared.database.image import Image
from shared.annotation import Annotation_Update
from shared.database.video.sequence import Sequence
from shared.utils.task import task_complete
from shared.data_tools_core import Data_tools
global Update_Input
from methods.input.input_update import Update_Input
from shared.database.model.model_run import ModelRun
from shared.database.audio.audio_file import AudioFile
from shared.database.model.model import Model
from shared.utils import job_dir_sync_utils
from shared.database.task.job.job import Job
from tenacity import retry, wait_random_exponential, stop_after_attempt, wait_fixed
from shared.database.text_file import TextFile
from shared.database.task.job.job_working_dir import JobWorkingDir
from shared.model.model_manager import ModelManager
import traceback
from shared.utils.source_control.file.file_transfer_core import perform_sync_events_after_file_transfer
from methods.sensor_fusion.sensor_fusion_file_processor import SensorFusionFileProcessor
from methods.geotiff.GeoTiffProcessor import GeoTiffProcessor
import numpy as np
from shared.regular.regular_log import log_has_error
import os
from shared.feature_flags.feature_checker import FeatureChecker
from shared.utils.singleton import Singleton
from methods.text_data.text_tokenizer import TextTokenizer
from shared.utils.instance.transform_instance_utils import rotate_instance_dict_90_degrees
from shared.ingest.allowed_ingest_extensions import images_allowed_file_names, \
sensor_fusion_allowed_extensions, \
geo_tiff_allowed_extensions, \
videos_allowed_file_names, \
text_allowed_file_names, \
audio_allowed_file_names, \
csv_allowed_file_names, \
existing_instances_allowed_file_names
from shared.ingest.prioritized_item import PrioritizedItem
data_tools = Data_tools().data_tools
STOP_PROCESSING_DATA = False
class Process_Media():
"""
Basic process of using this is to call init with session and:
* project or project_id
* input or input_id
Generally expect
* prefer object (ie project), the id option (ie project_id) is for threading
* Input class to already be "setup" with useful info before calling this
* That the file is "available" either in memory as a numpy array
or in a temporary directory. if the file is from a url
we attempt to download it first.
Expected call methods
* main_entry()
* process_one_image_file()
* process_url()
On recursion and threads
* we expect main_entry() can be called recursively, for example
from process_csv
* we can create instances of Process_Media to use with threads
or for recursive
EXAMPLE use:
process_media = Process_Media(
session = session,
member = None,
input_id = input_id)
process_media.main_entry()
(See upload.py)
main_entry is generally when the file type or method is unknown
for example, from the UI where the user can upload many types
the other methods are meant to be used in conjunction with other classes
ie process_one_image_file() for use with video
or process_url for use with prediction
As we progress from just "get the files in there"
to "do stuff with the files" and "pull from API / other sources" this
class can expand.
CAUTION
input_id is not guaranteed to be available
and if Input object is passed of some types (currently == frame),
then we do NOT commit it and do not expect
input.id to be available either.
"""
def __init__(self,
             session,
             member = None,
             project: Project = None,
             project_id: int = None,
             org = None,
             raw_file = None,
             input_id: int = None,
             input: Input = None,
             item = None
             ):
    """
    Bind a media processor to one upload/ingest unit of work.

    :param session: DB session used for all reads/writes in this run.
    :param member: acting member, may be None for system-driven ingest.
    :param project: Project object; preferred over project_id.
    :param project_id: fallback id for threaded contexts (see class docstring).
    :param org: organization, optional.
    :param raw_file: in-memory file payload, if already available.
    :param input_id: id of the Input row; used when the object is not passed.
    :param input: Input object. Must be an Input instance (checked below).
    :param item: PrioritizedItem carrying per-frame/video queue state;
        a default is built when None.
    """
    # Assign default item since we currently
    # define defaults for parameters below in the item class
    # input_id is not used from item at this stage
    # which is confusing but works for now I guess?
    # (but is required on PrioritizedItem construction.) sigh
    # This is more of a "hold over from switching / immediate mode")
    if item is None:
        item = PrioritizedItem(
            priority = 100,
            input_id = None)
    # Frame/video queue state is carried on the item, not on Input.
    self.video_id = item.video_id
    self.video_parent_file = item.video_parent_file
    self.file_is_numpy_array = item.file_is_numpy_array
    self.raw_numpy_image = item.raw_numpy_image
    self.frame_number = item.frame_number
    self.total_frames = item.total_frames
    self.num_frames_to_update = item.num_frames_to_update
    self.frame_completion_controller = item.frame_completion_controller
    self.raw_file = raw_file
    self.input = input
    self.input_id = input_id
    # file is accessible through self.input.file
    self.session = session
    self.sequence_map = {}
    self.member = member
    self.project = project  # This get set from self.input in main_entry()
    self.project_id = project_id
    self.org = org
    # It creates it's own log for each run...
    self.log = regular_log.default()
    # Question, if not running from http do we still follow same log pattern?
    # If so we should build log from Regular method
    # This also feels confused with the input.status and input.status text
    # TODO get limit from org/project/user etc.
    self.directory_file_count_limit = 90000
    # for directory_id see self.input.directory_id
    # Question, for video, do we want to record
    # A per frame input class???
    # That could be confusing.
    self.allow_csv = True
    if self.input and not isinstance(self.input, Input):
        raise Exception("input should be class Input object. Use input_id for ints")
@retry(wait = wait_random_exponential(multiplier = .5, max = 120),
       stop = stop_after_attempt(30))
def get_input_with_retry(self):
    """
    Fetch self.input by self.input_id with exponential-backoff retries.

    No-op when self.input is already set. On a missing row or DB error this
    calls attempt_reinsert_input() to flag the Input row, and the @retry
    decorator re-invokes this method (up to 30 attempts).
    """
    if self.input:
        return
    try:
        logger.info(f"Getting input {self.input_id}")
        self.input = Input.get_by_id(
            session = self.session,
            id = self.input_id)
        if self.input is None:
            # Descriptive message instead of a bare Exception so retry
            # logs show which input could not be found.
            raise Exception(f"Input {self.input_id} not found")
    except Exception as e:
        # logger.warn() is deprecated; warning() is the supported API.
        logger.warning(f"Unable to fetch input ID: {self.input_id}")
        self.attempt_reinsert_input()
@retry(wait = wait_random_exponential(multiplier = 2, max = 1024),
       stop = stop_after_attempt(50))
def attempt_reinsert_input(self):
    """
    Re-fetch the Input row and mark it for retry or permanent failure.

    Increments input.retry_count; below 3 retries the row is deferred for
    reprocessing, otherwise it is marked failed. Raises when the row is
    still unavailable so the @retry decorator tries again (up to 50 times).
    """
    logger.info("reinsert attempt")
    # Renamed from `input` to avoid shadowing the builtin.
    input_record = Input.get_by_id(
        session = self.session,
        id = self.input_id)
    if not input_record:
        raise Exception(f"Input {self.input_id} still unavailable")
    if not input_record.retry_count:
        input_record.retry_count = 1
    else:
        input_record.retry_count += 1
    if input_record.retry_count < 3:
        input_record.status = 'retry/failed_to_fetch_input'
        input_record.processing_deferred = True
        # logger.warn() is deprecated; also fixes "procesing" typo.
        logger.warning(f"Reset processing status on: {self.input_id}")
    else:
        input_record.status = 'failed'
        input_record.description = "failed_to_fetch Adjust Database configs database may be overloaded."
def main_entry(self):
    """
    Top-level entry point when the file type or mode is unknown.

    Fetches the Input, resolves project and directory, routes by
    input.mode ("flow", "copy_file", "update", "update_with_existing")
    and then by media type, finally committing and attaching to jobs.

    input.status_text has error message if applicable.
    Returns True on success, False on a handled failure, None when the
    input could not be fetched at all.
    """
    start_time = time.time()
    # Back-pressure: block until system memory is below the limit.
    check_and_wait_for_memory(memory_limit_float = 85.0)
    self.get_input_with_retry()
    if self.input is None:
        logger.error("input is None")
        return
    self.input.time_last_attempted = start_time
    if not self.input.update_log:
        self.input.update_log = regular_log.default()
    self.project = self.input.project
    """
    Context that for detached from session processing
    (ie media_type == 'frame'), if we try to access input.project.id
    that will do a refresh from the session which will fail
    because we are purposely not attaching input
    """
    self.project_id = self.input.project_id
    if self.project is None:
        self.project = Project.get_by_id(
            session = self.session,
            id = self.project_id)
    if self.input.allow_csv:
        self.allow_csv = self.input.allow_csv
    self.input.status = "processing"
    if self.input.mode == "flow":
        # Get directory from flow
        self.input.directory = self.input.action_flow.directory
    # Persist "processing" status before doing any heavy work.
    self.try_to_commit()
    if self.input.mode == "copy_file":
        self.__start_copy_file()
        # Important!
        # We are exiting main loop here
        return
    if self.input.mode == "update":
        self.__update_existing_file(file = self.input.file)
        # Important!
        # We are exiting main loop here
        if len(self.log["error"].keys()) >= 1:
            logger.error(f"Error updating instances: {str(self.log['error'])}")
            return
        return
    if self.input.mode == "update_with_existing":  # existing instances
        self.__update_existing_file(file = self.input.file,
                                    init_existing_instances = True)
        # Important!
        # We are exiting main loop here
        if len(self.log["error"].keys()) >= 1:
            logger.error(f"Error updating instances: {str(self.log['error'])}")
            return
        return
    if self.input.type not in ["from_url", "from_video_split"]:
        # For those types we populate the filename from URL later in process
        # `from_resumable` (eg from UI), we already have the name, so do check here
        # `from_url` we "build" name later, so if we check here original_filename is None.
        if self.__file_does_not_exist_in_target_directory() is False:
            return False
    if self.input.media_type != "frame":
        """
        Causing Instance <Project at 0x> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: http://sqlalche.me/e/bhk3)
        if we try to get it on a "Detached" session
        We don't appear to need the working dir for frame processing
        This block should probably be it's own function
        """
        # Why are we getting working_dir here?
        # Why not just use it from input.directory?
        self.working_dir = WorkingDir.get_with_fallback(
            session = self.session,
            project = self.project,
            directory_id = self.input.directory_id)
        # Careful to not leak directory information through this
        if self.working_dir is False or self.working_dir is None:
            self.input.status = "failed"
            self.input.status_text = "Invalid directory permission."
            self.input.invalid_directory_permission = True
            return False
        self.working_dir_id = self.working_dir.id  # cache for threading workaround
        if self.input.directory_id is None:
            self.input.directory_id = self.working_dir_id
    check_limits_result = self.check_limits()
    if check_limits_result is False:
        return False
    if self.input.type in ["from_resumable",
                           "from_url",
                           "from_video_split",
                           "from_sensor_fusion_json",
                           "from_geo_tiff",
                           "from_geo_tiff_json",
                           "ui_wizard"]:
        download_result = self.download_media()
        # careful we expect keys available here ie
        # log['error']['status_text']
        if len(self.log["error"].keys()) >= 1:
            logger.error(f"Error downloading media: {str(self.log['error'])}")
            self.input.update_log['error'] = self.log['error']
            return False
    if self.input.media_type == "video":
        """
        If video_was_split is True
        We return early becasue we don't want
        To actually process the "root" video file.
        beyond what the split into clips function does
        I'm a little wary of calling it "was_split"
        as that operation could fail... maybe another name...
        Else:
        We want to try processing the file by itself
        This whole chain still feels waaay to complicated...
        but we need an exit condition potentially here right...
        Still something that could be improved...
        """
        self.input.video_was_split = self.split_video_into_clips()
        if self.input.video_was_split is True:
            self.try_to_commit()
            return True
    ###
    ### Main
    self.check_free_tier_limits()
    if log_has_error(self.log):
        return False
    self.route_based_on_media_type()
    ###
    self.try_to_commit()
    if not self.input:
        return True
    if log_has_error(self.log):
        return False
    process_instance_result = self.process_existing_instance_list()
    self.may_attach_to_job()
    self.update_jobs_with_attached_dirs()
    end_time = time.time()
    self.log['info']['run_time'] = end_time - start_time
    return True
def __file_does_not_exist_in_target_directory(self):
    """
    Guard against ingesting the same filename twice into one directory.

    Context
        1) A user uploading same data twice.
        2) We assume this doesn't fire on "update" approaches (based on where it
        is in process_media execution)
    Overrides options
        1) Set input.allow_duplicates = True
        2) Remove the file from the directory and retry (can use literal retry process,
        or just a fresh input)
        3) Target a different dataset

    Returns True when processing may continue; False when a duplicate was
    found, in which case the input is marked failed as a side effect.
    """
    if self.input.allow_duplicates is True:
        return True
    duplicates = WorkingDirFileLink.file_list(
        session = self.session,
        working_dir_id = self.input.directory_id,
        original_filename = self.input.original_filename,
        original_filename_match_type = None
    )
    if not duplicates:
        return True
    first_match = duplicates[0]
    self.input.status = "failed"
    self.input.status_text = f"Filename {self.input.original_filename} Already Exists in Dir. Existing ID is:{str(first_match.id)}"
    self.input.update_log = {'error': {
        'existing_file_id': first_match.id}
    }
    return False
def __copy_video(self):
    """
    Copy a parent video file into the target directory, then optionally
    queue per-frame copies of instances/sequences/frames.

    When input.copy_instance_list is False only the parent file is copied
    and the input is declared successful immediately.
    """
    logger.debug(f"Copying Video {self.input.file_id}")
    # NOTE(review): 'orginal_directory_id' is misspelled — presumably it
    # matches the callee's parameter name; confirm before renaming.
    self.input.newly_copied_file = File.copy_file_from_existing(
        session = self.session,
        working_dir = self.input.directory,
        orginal_directory_id = self.input.source_directory_id,
        existing_file = self.input.file,
        copy_instance_list = False,
        log = self.input.update_log,
        add_link = True,
        remove_link = False,
        flush_session = True,
        defer_copy = False,
        batch_id = self.input.batch_id
    )
    if self.input.copy_instance_list is False:
        # For declaring success on the video file when no frames are available (i.e no instances)
        self.declare_success(input = self.input)
        return
    global New_video  # important: deferred import to avoid circular dependency
    from methods.video.video import New_video
    # COPY INSTANCES, Sequences, and Frames
    new_video = New_video(
        session = self.session,
        project = self.project,
        input = self.input
    )
    new_video.add_sequence_map_to_input(
        source_video_parent_file = self.input.file,
        destination_video_parent_file_id = self.input.newly_copied_file.id)
    # We commit session to ensure that when pushing the frames to queue below (that will be on multiple threads)
    # they have access to the Sequences ID's on the DB.
    self.try_to_commit()
    # Push all the frames to the Queue
    frames_list = new_video.push_frames_for_copy_to_queue(
        source_video_parent_file_id = self.input.file_id,
        destination_video_parent_file_id = self.input.newly_copied_file.id)
    if len(frames_list) == 0:
        # For declaring success on the video file when no frames are available (i.e no instances)
        self.declare_success(input = self.input)
def __copy_frame(self):
    """
    Copy a single frame file as part of a queued video copy.

    Marks the frame complete on the shared completion controller and, when
    this is the last frame, runs post-transfer sync events for the parent.
    Returns the newly created File.
    """
    file = File.get_by_id(self.session, self.input.file_id)
    # The frame input has copies of this so we don't have to get parent
    new_file = File.copy_file_from_existing(
        session = self.session,
        working_dir = None,  # avoid detached session
        working_dir_id = self.input.directory_id,
        existing_file = file,
        copy_instance_list = self.input.copy_instance_list,
        add_link = self.input.add_link,  # Not sure about making this dynamic
        remove_link = self.input.remove_link,
        sequence_map = self.input.sequence_map,
        previous_video_parent_id = self.input.parent_file_id,
        flush_session = True,
    )
    self.frame_completion_controller.mark_frame_complete(self.frame_number)
    # Update Percent of parent input
    self.video_status_updates()
    # Should this be part of "declare success?
    # Or use get_parent_with_retry()
    if self.frame_number == self.total_frames:
        # Perform sync operations
        parent_input = self.input.parent_input(self.session)
        perform_sync_events_after_file_transfer(
            session = self.session,
            source_directory = parent_input.source_directory,
            destination_directory = parent_input.directory,
            log = self.log,
            log_sync_events = True,
            transfer_action = 'copy',
            file = parent_input.file,
            member = self.member,
            new_file = parent_input.newly_copied_file,
            defer_sync = False,
            sync_event_manager = None,
        )
    return new_file
def __copy_file(self):
    """
    Copy a non-video, non-frame file (image, text, etc.) into the target
    directory, including children of a compound file, then declare success
    and fire post-transfer sync events.

    Returns the newly created File.
    """
    logger.debug(f"Copying file type: {self.input.media_type} {self.input.file_id}")
    self.input.newly_copied_file = File.copy_file_from_existing(
        session = self.session,
        working_dir = None,
        working_dir_id = self.input.directory_id,
        existing_file = self.input.file,
        copy_instance_list = self.input.copy_instance_list,
        add_link = self.input.add_link,
        remove_link = self.input.remove_link,
        sequence_map = None,
        previous_video_parent_id = None,
        flush_session = True,
    )
    if self.input.file.type == 'compound':
        # Compound files carry child files; copy each one under the new parent.
        child_files = self.input.file.get_child_files(session = self.session)
        for child in child_files:
            File.copy_file_from_existing(
                session = self.session,
                working_dir = None,
                working_dir_id = self.input.directory_id,
                existing_file = child,
                copy_instance_list = self.input.copy_instance_list,
                add_link = self.input.add_link,
                remove_link = self.input.remove_link,
                sequence_map = None,
                previous_video_parent_id = None,
                flush_session = True,
                new_parent_id = self.input.newly_copied_file.id
            )
    self.declare_success(self.input)
    # Perform sync operations
    source_dir = WorkingDir.get_by_id(self.session, self.input.source_directory_id)
    dest_dir = WorkingDir.get_by_id(self.session, self.input.directory_id)
    perform_sync_events_after_file_transfer(
        session = self.session,
        source_directory = None,  # We just provide destination directory to attach incoming dir to task.
        destination_directory = dest_dir,
        log = self.log,
        log_sync_events = True,
        transfer_action = 'copy',
        file = self.input.file,
        member = self.member,
        new_file = self.input.newly_copied_file,
        defer_sync = False,
        sync_event_manager = None,
    )
    return self.input.newly_copied_file
def __start_copy_file(self):
    """
    Dispatch a copy-mode input to the handler for its media type.

    Videos take the sequenced-copy path (parent file plus queued frame
    work); frames and everything else copy directly.
    """
    media_type = self.input.media_type
    if media_type == "video":
        logger.info(f"Starting Sequenced Copy from File {self.input.file.id}")
        self.__copy_video()
    elif media_type == 'frame':
        self.__copy_frame()
    else:
        self.__copy_file()
def __update_existing_file(self,
                           file,
                           init_existing_instances = False):
    """
    Apply an update-mode input (new/changed instances) to an existing file.

    :param file: the File being updated; its type overrides input.media_type.
    :param init_existing_instances: when True, treat the payload as
        pre-existing instances ("update_with_existing" mode).

    Videos delegate to __update_existing_video(); frames propagate errors
    to the parent video. Returns False when media_type cannot be resolved.
    """
    # Prep work
    if file:
        self.input.media_type = file.type
        # not sure about this, we make assumptions about file type downstream
    if self.input.media_type is None:
        # Even if the file will *eventually* be valid, if we don't know the media type
        # then we can't reasonably process the update.
        # https://github.com/diffgram/training_data/pull/269
        self.log['error']['media_type'] = "media_type undefined. This may be a timing issue. \
            Try including instances in single request, or waiting for file to finish processing before sending."
        return False
    try:
        self.populate_new_models_and_runs()
        # TODO what other input keys do we need to update (ie this assumes images etc)
        if file and self.input.media_type == "video":
            logger.debug("Parent Video File Update")
            self.__update_existing_video()  # Maybe should be a strategy operation
            return
        elif file and self.input.media_type == 'text':
            self.process_existing_instance_list(
                init_existing_instances = init_existing_instances)
        else:
            process_instance_result = self.process_existing_instance_list(
                init_existing_instances = init_existing_instances)
            logger.debug(("Image or Frame File Update"))
            if file and file.frame_number:
                logger.info(f"{file.frame_number}, {self.input.video_parent_length}")
            if process_instance_result is True and self.input.media_type == 'frame':
                self.__update_parent_video_at_last_frame()
        # TODO first video case (otherwise then goes in frame processing flow)
        if self.input.media_type in ["image", "text"] and self.input.status != "failed":
            self.declare_success(input = self.input)
    except Exception as e:
        logger.error(traceback.format_exc())
        self.input.status = 'failed'
        self.input.description = str(e)
        self.input.update_log = {'error': traceback.format_exc()}
        self.log['error']['update_instance'] = str(e)
        self.log['error']['traceback'] = traceback.format_exc()
        if self.input.media_type == 'frame':
            # Frame failures must surface on the parent video's input too.
            self.proprogate_frame_instance_update_errors_to_parent(self.log)
def __update_parent_video_at_last_frame(self):
    """
    When the current frame is the last one of an update, finalize the
    parent video: update sequences, toggle task flags, declare success.

    Relies on frames being queued in frame order; the sleep gives other
    workers a chance to finish their frames first (best-effort ordering,
    not a hard guarantee).
    """
    # Last frame
    # In the update context input.video_parent_length = self.highest_frame_encountered
    if self.input.frame_number == self.input.video_parent_length:
        # Assumes frames are put in priority queue in frame order.
        # See determine_unique_sequences_from_external() for how this is derived
        logger.info("Last Frame Update")
        time.sleep(4)  # in case this worker is ahead of another
        logger.info("Updating Sequences")
        self.update_sequences()
        # We assume if we get to this stage it was successful?
        parent_input = self.get_parent_input_with_retry()
        self.__toggle_flags_from_input(input = parent_input)
        self.declare_success(input = parent_input)
        self._add_input_to_session(parent_input)
def __toggle_flags_from_input(self, input: Input):
    """
    Apply any task-related action requested on the given input.

    Currently only 'complete_task' is handled: the attached task is
    completed with the input's file. No-op when there is no task or a
    different action.
    """
    if not input.task or input.task_action != 'complete_task':
        return
    result, new_file = task_complete.task_complete(
        session = self.session,
        task = input.task,
        new_file = input.file,
        project = self.project,
        member = self.member)
def __update_existing_video(self):
    """
    Apply an update-mode frame_packet_map to an existing video file.

    Fails fast when no frame_packet_map was provided. On any error during
    the packet-map update, the input is marked failed with the error (and
    full traceback) recorded. Success is intentionally NOT declared here —
    the update has only been started at this point.
    """
    global New_video  # important: deferred import to avoid circular dependency
    from methods.video.video import New_video
    if not self.input.frame_packet_map:
        self.input.update_log['error'][
            'frame_packet_map'] = 'Please provide a frame packet map. It cannot be empty.'
        self.input.status = 'failed'
        # Fixed stray trailing quote in the user-facing message.
        self.input.status_text = "Please provide a frame packet map. It cannot be empty."
        self._add_input_to_session(self.input)
        self.try_to_commit()
        return
    # TODO "new video" name makes less sense in new context
    new_video = New_video(
        session = self.session,
        project = self.project,
        input = self.input
    )
    try:
        new_video.update_from_frame_packet_map()
    except Exception as e:
        # Previously traceback.format_exc() was computed and discarded;
        # log the full traceback so the failure is diagnosable.
        logger.error(traceback.format_exc())
        self.input.update_log['error']['update_from_frame_packet_map'] = str(e)
        self.input.status = 'failed'
        self._add_input_to_session(self.input)
    if len(self.input.update_log["error"].keys()) >= 1:
        self.input = self.update_video_status_when_update_has_errors(input = self.input)  # 'parent' here not frame
        self._add_input_to_session(self.input)
    # We should not declare success here, it's only started processing
    # TODO review if another state to put here like is_processing
def update_video_status_when_update_has_errors(self, input):
    """
    Flag a video input as failed after its update log accumulated errors.

    Stamps update_log['last_updated'] so downstream change detection is
    always triggered, then returns the same input object.
    """
    timestamp = str(time.time())
    input.status = "failed"
    input.status_text = "See Update Log"
    # Always touch the log so a fresh write is guaranteed.
    input.update_log['last_updated'] = timestamp
    return input
def split_video_into_clips(self):
    """
    Split a long video into clips before normal processing.

    Returns True when the video was split (caller should stop processing
    the root file — the clip inputs take over); False when the video is
    too short to split and should be processed as-is.

    Video_Preprocess handles cleanup of the clips it creates; the temp-dir
    cleanup here is for the original full video file. We assume the
    check_ok_to_split() call has no side effects when it declines.
    """
    global Video_Preprocess  # important
    try:
        video_preprocess = Video_Preprocess(
            session = self.session,
            parent_input = self.input
        )
    except Exception as exception:
        # The module-level import may have failed at startup (see top of
        # file); retry the import here before constructing.
        from methods.video.video_preprocess import Video_Preprocess
        video_preprocess = Video_Preprocess(
            session = self.session,
            parent_input = self.input
        )
    is_ok_to_split = video_preprocess.check_ok_to_split()
    if is_ok_to_split is False:
        return False
    video_preprocess.split_video()
    self.clean_up_temp_dir_on_thread()
    return True
def create_task_on_job_sync_directories(self):
    """
    Create a task on this input's job from the input's file.

    Delegates to JobDirectorySyncManager, which owns the logic for
    attaching files from synced directories to the job as tasks.

    :return: result of create_task_from_file
    """
    sync_manager = job_dir_sync_utils.JobDirectorySyncManager(
        session = self.session,
        log = self.log,
        job = self.input.job
    )
    return sync_manager.create_task_from_file(self.input.file)
def update_jobs_with_attached_dirs(self):
    """
    Link the newly ingested file into every job that syncs with its
    directory, creating tasks and refreshing job statistics.

    Only runs for top-level media types (whitelist below) to avoid opening
    many connections during e.g. per-frame processing. For a child of a
    compound file, the parent file is linked instead.
    """
    # From the file directory, get all related jobs.
    # TODO confirm how this works for pre processing case
    # Whitelist for allow types here, otherwise it opens a ton of connections while say processing frames
    if self.input.media_type not in ['image', 'video', 'sensor_fusion', 'text', 'audio', 'geospatial']:
        return
    file = self.input.file
    if self.input.file.parent_id is not None:
        # Avoid adding child of a compound file
        file = File.get_by_id(session = self.session, file_id = self.input.file.parent_id)
    directory = self.input.directory
    if directory is None:
        directory = self.project.directory_default
        logger.info(f"[update_jobs_with_attached_dirs] Default Dir Used : {self.project.directory_default}")
    jobs = JobWorkingDir.list(
        session = self.session,
        sync_type = 'sync',
        class_to_return = Job,
        working_dir_id = directory.id
    )
    for job in jobs:
        job_sync_dir_manger = job_dir_sync_utils.JobDirectorySyncManager(
            session = self.session,
            job = job,
            log = self.log
        )
        job_sync_dir_manger.create_file_links_for_attached_dirs(
            sync_only = True,
            create_tasks = True,
            file_to_link = file,
            file_to_link_dataset = self.working_dir,
            related_input = self.input,
            member = self.member
        )
        job.update_file_count_statistic(session = self.session)
        # Refresh the task stat count to the latest value.
        # We want to do this because there may be cases where 2 frames updated the task count
        # concurrently and that may lead to a bad end result of the counter.
        # Commit any update job/task data.
        job.refresh_stat_count_tasks(self.session)
    self.try_to_commit()
def may_attach_to_job(self):
    """
    If this input targets a job, link the new file into the job's
    directory and create tasks when the job is still open.

    No-op when there is no file or no job_id on the input.
    """
    if not self.input or not self.input.file:
        return
    if not self.input.job_id:
        return
    # We could use Job_permissions.check_job_after_project_already_valid()
    # But I'm not sure if raising in a thread is a good idea.
    Job_permissions.check_job_after_project_already_valid(
        job = self.input.job,
        project = self.project)
    result, log = WorkingDirFileLink.file_link_update(
        session = self.session,
        add_or_remove = "add",
        incoming_directory = self.input.directory,
        directory = self.input.job.directory,  # difference is this is from job
        file_id = self.input.file.id,
        job = self.input.job
    )
    # If job is not completed we should be creating tasks for the new files attached.
    if self.input.job.status in ['active', 'in_review']:
        self.create_task_on_job_sync_directories()
def process_geo_tiff_file(self):
    """
    Run GeoTiff ingestion for this input.

    On success the input is declared successful; when the processor logs
    an error, or raises, the input is marked failed and the log is stored
    on it for inspection.
    """
    processor = GeoTiffProcessor(
        session = self.session,
        input = self.input,
        log = self.log
    )
    try:
        result, self.log = processor.process_geotiff_data()
        if not log_has_error(self.log):
            self.declare_success(self.input)
            return
        self.input.status = 'failed'
        logger.error(f"Geotiff file failed to process. Input {self.input.id}")
        logger.error(self.log)
        self.input.update_log = self.log
    except Exception as e:
        logger.error(f"Exception on process geo data: {traceback.format_exc()}")
        self.log['error']['geo_data'] = traceback.format_exc()
        self.input.status = 'failed'
        self.input.update_log = self.log
def process_sensor_fusion_json(self):
    """
    Run sensor-fusion JSON ingestion for this input.

    On success the input is declared successful; when the processor logs
    an error, or raises, the input is marked failed and the log is stored
    on it for inspection.
    """
    processor = SensorFusionFileProcessor(
        session = self.session,
        input = self.input,
        log = self.log
    )
    try:
        result, self.log = processor.process_sensor_fusion_file_contents()
        if not log_has_error(self.log):
            self.declare_success(self.input)
            return
        self.input.status = 'failed'
        logger.error(f"Sensor fussion file failed to process. Input {self.input.id}")
        logger.error(self.log)
        self.input.update_log = self.log
    except Exception as e:
        logger.error(f"Exception on process sensor fusion: {traceback.format_exc()}")
        self.log['error']['process_sensor_fusion_json'] = traceback.format_exc()
        self.input.status = 'failed'
        self.input.update_log = self.log
def check_free_tier_limits(self):
    """
    Enforce per-dataset file-count limits for free-tier plans.

    Looks up the plan limit for the input's media type; when the target
    directory already has that many files, marks the input failed and
    records the reason in self.log. Returns False on limit breach, None
    otherwise (including when no limit applies).
    """
    if self.input.media_type not in ['image', 'text', 'sensor_fusion', 'video']:
        return
    directory = self.input.directory
    user_id = None
    user = None
    if self.input.member_created:
        user = self.input.member_created.user
        if user:
            user_id = user.id
    feature_checker = FeatureChecker(
        session = self.session,
        user = user,
        project = self.input.project
    )
    if self.input.media_type == 'video':
        max_file_count = feature_checker.get_limit_from_plan('MAX_VIDEOS_PER_DATASET')
    elif self.input.media_type == 'image':
        # NOTE(review): this reuses the VIDEOS limit key for images —
        # looks like a copy-paste; confirm whether an image-specific
        # plan key exists before changing.
        max_file_count = feature_checker.get_limit_from_plan('MAX_VIDEOS_PER_DATASET')
    elif self.input.media_type == 'text':
        max_file_count = feature_checker.get_limit_from_plan('MAX_TEXT_FILES_PER_DATASET')
    elif self.input.media_type == 'sensor_fusion':
        max_file_count = feature_checker.get_limit_from_plan('MAX_SENSOR_FUSION_FILES_PER_DATASET')
    else:
        return
    # Small optimization, avoid querying DB if no check is required (ie Premium Plans)
    if max_file_count is None:
        return
    file_count_dir = WorkingDirFileLink.file_list(
        session = self.session,
        working_dir_id = directory.id,
        limit = None,
        counts_only = True
    )
    logger.info('Free tier check for user: {} DIR[{}] File count: {}'.format(user_id,
                                                                             directory.id,
                                                                             file_count_dir))
    if max_file_count is not None and max_file_count <= file_count_dir:
        message = 'Free Tier Limit Reached - Max Files Allowed: {}. But Directory with ID: {} has {}'.format(
            max_file_count,
            directory.id,
            file_count_dir)
        logger.error(message)
        self.log['error']['free_tier_limit'] = message
        self.log['info']['feature_checker'] = feature_checker.log
        self.input.status = 'failed'
        self.input.description = message
        self.input.update_log = self.log
        return False
def route_based_on_media_type(self):
"""
Route to function based on self.input.media_type
"""
strategy_operations = {
"image": self.process_one_image_file,
"text": self.process_one_text_file,
"audio": self.process_one_audio_file,
"frame": self.process_frame,
"sensor_fusion": self.process_sensor_fusion_json,
"geo_tiff": self.process_geo_tiff_file,
"video": self.process_video,
"csv": self.process_csv_file
}
operation = None