|
6 | 6 | # modify it under the terms of the GNU Lesser General Public License |
7 | 7 | # as published by the Free Software Foundation, either version 3 |
8 | 8 | # of the License, or (at your option) any later version. |
9 | | -# |
10 | | -# This library is distributed in the hope that it will be useful, |
11 | | -# but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
13 | | -# Lesser General Public License for more details. |
14 | | -# |
15 | | -# You should have received a copy of the GNU Lesser General Public |
16 | | -# License along with this library. If not, see <https://www.gnu.org/licenses/>. |
17 | 9 |
|
18 | 10 | import os |
19 | 11 | import unittest |
| 12 | +from abc import abstractmethod |
20 | 13 | from subprocess import TimeoutExpired |
21 | 14 | from time import perf_counter |
22 | 15 | from typing import Any |
23 | 16 |
|
24 | 17 | from fluster.decoder import Decoder |
25 | 18 | from fluster.test_vector import TestVector, TestVectorResult |
26 | | -from fluster.utils import normalize_path |
| 19 | +from fluster.utils import compare_byte_wise_files, normalize_path |
27 | 20 |
|
28 | 21 |
|
29 | 22 | class Test(unittest.TestCase): |
@@ -51,57 +44,160 @@ def __init__( |
51 | 44 | self.timeout = timeout |
52 | 45 | self.keep_files = keep_files |
53 | 46 | self.verbose = verbose |
54 | | - setattr(self, test_vector.name, self._test) |
| 47 | + self._keep_files_during_test = False |
| 48 | + self.test_vector_result = self.test_suite.test_vectors[self.test_vector.name] |
| 49 | + |
| 50 | + # Set up the test method |
| 51 | + setattr(self, test_vector.name, self._test_wrapper) |
55 | 52 | super().__init__(test_vector.name) |
56 | 53 |
|
| 54 | + # Initialize file paths |
| 55 | + self._initialize_file_paths() |
| 56 | + |
| 57 | + def _initialize_file_paths(self) -> None: |
| 58 | + """Initialize input and output file paths.""" |
| 59 | + self.output_filepath = normalize_path(os.path.join(self.output_dir, self.test_vector.name + ".out")) |
| 60 | + |
| 61 | + input_dir = os.path.join(self.resources_dir, self.test_suite.name) |
| 62 | + |
| 63 | + if not self.test_suite.is_single_archive: |
| 64 | + input_dir = os.path.join(input_dir, self.test_vector.name) |
| 65 | + |
| 66 | + self.input_filepath = normalize_path(os.path.join(input_dir, self.test_vector.input_file)) |
| 67 | + |
| 68 | + def _execute_decode(self) -> str: |
| 69 | + """Execute the decoder and return the result.""" |
| 70 | + keep_files_for_decode = self._keep_files_during_test or self.keep_files |
| 71 | + |
| 72 | + return self.decoder.decode( |
| 73 | + self.input_filepath, |
| 74 | + self.output_filepath, |
| 75 | + self.test_vector.output_format, |
| 76 | + self.timeout, |
| 77 | + self.verbose, |
| 78 | + keep_files_for_decode, |
| 79 | + ) |
| 80 | + |
| 81 | + def _cleanup_if_needed(self) -> None: |
| 82 | + """Clean up output files if keep_files is False.""" |
| 83 | + if not self.keep_files and os.path.exists(self.output_filepath): |
| 84 | + os.remove(self.output_filepath) |
| 85 | + |
| 86 | + def _test_wrapper(self) -> None: |
| 87 | + try: |
| 88 | + self._test() |
| 89 | + finally: |
| 90 | + self._cleanup_if_needed() |
| 91 | + |
57 | 92 | def _test(self) -> None: |
| 93 | + """Execute the test and process results.""" |
58 | 94 | if self.skip: |
59 | | - self.test_suite.test_vectors[self.test_vector.name].test_result = TestVectorResult.NOT_RUN |
| 95 | + self.test_vector_result.test_result = TestVectorResult.NOT_RUN |
60 | 96 | return |
61 | 97 |
|
62 | | - output_filepath = os.path.join(self.output_dir, self.test_vector.name + ".out") |
63 | | - |
64 | | - input_filepath = os.path.join( |
65 | | - self.resources_dir, |
66 | | - self.test_suite.name, |
67 | | - (self.test_vector.name if not self.test_suite.is_single_archive else ""), |
68 | | - self.test_vector.input_file, |
69 | | - ) |
70 | | - output_filepath = normalize_path(output_filepath) |
71 | | - input_filepath = normalize_path(input_filepath) |
| 98 | + start = perf_counter() |
72 | 99 |
|
73 | 100 | try: |
74 | | - start = perf_counter() |
75 | | - result = self.decoder.decode( |
76 | | - input_filepath, |
77 | | - output_filepath, |
78 | | - self.test_vector.output_format, |
79 | | - self.timeout, |
80 | | - self.verbose, |
81 | | - self.keep_files, |
82 | | - ) |
83 | | - self.test_suite.test_vectors[self.test_vector.name].test_time = perf_counter() - start |
| 101 | + result = self._execute_decode() |
| 102 | + self.test_vector_result.test_time = perf_counter() - start |
84 | 103 | except TimeoutExpired: |
85 | | - self.test_suite.test_vectors[self.test_vector.name].test_result = TestVectorResult.TIMEOUT |
86 | | - self.test_suite.test_vectors[self.test_vector.name].test_time = perf_counter() - start |
| 104 | + self.test_vector_result.test_result = TestVectorResult.TIMEOUT |
| 105 | + self.test_vector_result.test_time = perf_counter() - start |
87 | 106 | raise |
88 | 107 | except Exception: |
89 | | - self.test_suite.test_vectors[self.test_vector.name].test_result = TestVectorResult.ERROR |
90 | | - self.test_suite.test_vectors[self.test_vector.name].test_time = perf_counter() - start |
| 108 | + self.test_vector_result.test_result = TestVectorResult.ERROR |
| 109 | + self.test_vector_result.test_time = perf_counter() - start |
91 | 110 | raise |
92 | 111 |
|
93 | | - if not self.keep_files and os.path.exists(output_filepath) and os.path.isfile(output_filepath): |
94 | | - os.remove(output_filepath) |
95 | | - |
96 | | - if not self.reference: |
97 | | - self.test_suite.test_vectors[self.test_vector.name].test_result = TestVectorResult.FAIL |
98 | | - if self.test_vector.result.lower() == result.lower(): |
99 | | - self.test_suite.test_vectors[self.test_vector.name].test_result = TestVectorResult.SUCCESS |
100 | | - self.assertEqual( |
101 | | - self.test_vector.result.lower(), |
102 | | - result.lower(), |
103 | | - self.test_vector.name, |
104 | | - ) |
| 112 | + if self.reference: |
| 113 | + self.test_vector_result.test_result = TestVectorResult.REFERENCE |
| 114 | + self.test_vector_result.result = result |
105 | 115 | else: |
106 | | - self.test_suite.test_vectors[self.test_vector.name].test_result = TestVectorResult.REFERENCE |
107 | | - self.test_suite.test_vectors[self.test_vector.name].result = result |
| 116 | + try: |
| 117 | + self.compare_result(result) |
| 118 | + self.test_vector_result.test_result = TestVectorResult.SUCCESS |
| 119 | + except Exception: |
| 120 | + self.test_vector_result.test_result = TestVectorResult.FAIL |
| 121 | + raise |
| 122 | + |
| 123 | + @abstractmethod |
| 124 | + def compare_result(self, result: str) -> None: |
| 125 | + """Compare the test result with the expected value. |
| 126 | +
|
| 127 | + Args: |
| 128 | + result: The result string from the decoder |
| 129 | + """ |
| 130 | + |
| 131 | + |
| 132 | +class MD5ComparisonTest(Test): |
| 133 | + """Test class for MD5 comparison""" |
| 134 | + |
| 135 | + def compare_result(self, result: str) -> None: |
| 136 | + """Compare MD5 hash results.""" |
| 137 | + expected = self.test_vector.result.lower() |
| 138 | + actual = result.lower() |
| 139 | + |
| 140 | + self.assertEqual(expected, actual, self.test_vector.name) |
| 141 | + |
| 142 | + |
| 143 | +class PixelComparisonTest(Test): |
| 144 | + """Test class for pixel comparison""" |
| 145 | + |
| 146 | + def __init__( |
| 147 | + self, |
| 148 | + decoder: Decoder, |
| 149 | + test_suite: Any, # can't use TestSuite type because of circular dependency |
| 150 | + test_vector: TestVector, |
| 151 | + skip: bool, |
| 152 | + output_dir: str, |
| 153 | + reference: bool, |
| 154 | + timeout: int, |
| 155 | + keep_files: bool, |
| 156 | + verbose: bool, |
| 157 | + reference_decoder: Decoder, |
| 158 | + ): |
| 159 | + super().__init__( |
| 160 | + decoder, |
| 161 | + test_suite, |
| 162 | + test_vector, |
| 163 | + skip, |
| 164 | + output_dir, |
| 165 | + reference, |
| 166 | + timeout, |
| 167 | + keep_files, |
| 168 | + verbose, |
| 169 | + ) |
| 170 | + self._keep_files_during_test = True |
| 171 | + self.reference_decoder = reference_decoder |
| 172 | + self.reference_filepath = normalize_path(os.path.join(self.output_dir, self.test_vector.name + "_ref.yuv")) |
| 173 | + |
| 174 | + def _decode_reference(self) -> str: |
| 175 | + """Decode the reference file.""" |
| 176 | + keep_files_for_decode = self._keep_files_during_test or self.keep_files |
| 177 | + |
| 178 | + return self.reference_decoder.decode( |
| 179 | + self.input_filepath, |
| 180 | + self.reference_filepath, |
| 181 | + self.test_vector.output_format, |
| 182 | + self.timeout, |
| 183 | + self.verbose, |
| 184 | + keep_files_for_decode, |
| 185 | + ) |
| 186 | + |
| 187 | + def _cleanup_if_needed(self) -> None: |
| 188 | + super()._cleanup_if_needed() |
| 189 | + if not self.keep_files and os.path.exists(self.reference_filepath): |
| 190 | + os.remove(self.reference_filepath) |
| 191 | + |
| 192 | + def compare_result(self, result: str) -> None: |
| 193 | + """Compare decoded output with reference decoder output pixel-wise.""" |
| 194 | + reference_result = self._decode_reference() |
| 195 | + |
| 196 | + if not os.path.exists(self.reference_filepath) and os.path.exists(reference_result): |
| 197 | + self.reference_filepath = reference_result |
| 198 | + |
| 199 | + comparison_result = compare_byte_wise_files( |
| 200 | + self.reference_filepath, self.output_filepath, keep_files=self.keep_files |
| 201 | + ) |
| 202 | + |
| 203 | + self.assertEqual(0, comparison_result, self.test_vector.name) |
0 commit comments