2020import os
2121import re
2222import subprocess
23- import unittest
2423from unittest import mock
2524from unittest .mock import MagicMock
2625
2726import pytest
28- from parameterized import parameterized
2927
3028from airflow .exceptions import AirflowException
3129from airflow .providers .apache .beam .hooks .beam import BeamCommandRunner , BeamHook , beam_options_to_args
5856"""
5957
6058
61- class TestBeamHook ( unittest . TestCase ) :
59+ class TestBeamHook :
6260 @mock .patch (BEAM_STRING .format ("BeamCommandRunner" ))
6361 @mock .patch ("airflow.providers.apache.beam.hooks.beam.subprocess.check_output" , return_value = b"2.39.0" )
6462 def test_start_python_pipeline (self , mock_check_output , mock_runner ):
@@ -106,18 +104,19 @@ def test_start_python_pipeline_unsupported_option(self, mock_check_output):
106104 process_line_callback = MagicMock (),
107105 )
108106
109- @parameterized .expand (
107+ @pytest .mark .parametrize (
108+ "py_interpreter" ,
110109 [
111- ( "default_to_python3 " , "python3 " ),
112- ( "major_version_2 " , "python2 " ),
113- ( "major_version_3 " , "python3 " ),
114- ( "minor_version " , "python3.6 " ),
115- ]
110+ pytest . param ( "python " , id = "default python " ),
111+ pytest . param ( "python2 " , id = "major python version 2.x " ),
112+ pytest . param ( "python3 " , id = "major python version 3.x " ),
113+ pytest . param ( "python3.6 " , id = "major.minor python version " ),
114+ ],
116115 )
117116 @mock .patch (BEAM_STRING .format ("BeamCommandRunner" ))
118117 @mock .patch ("airflow.providers.apache.beam.hooks.beam.subprocess.check_output" , return_value = b"2.39.0" )
119118 def test_start_python_pipeline_with_custom_interpreter (
120- self , _ , py_interpreter , mock_check_output , mock_runner
119+ self , mock_check_output , mock_runner , py_interpreter
121120 ):
122121 hook = BeamHook (runner = DEFAULT_RUNNER )
123122 wait_for_done = mock_runner .return_value .wait_for_done
@@ -144,23 +143,24 @@ def test_start_python_pipeline_with_custom_interpreter(
144143 )
145144 wait_for_done .assert_called_once_with ()
146145
147- @parameterized .expand (
146+ @pytest .mark .parametrize (
147+ "current_py_requirements, current_py_system_site_packages" ,
148148 [
149- ([ "foo-bar" ] , False ),
150- ([ "foo-bar" ] , True ),
151- ([], True ),
152- ]
149+ pytest . param ( "foo-bar" , False , id = "requirements without system site-packages" ),
150+ pytest . param ( "foo-bar" , True , id = "requirements with system site-packages" ),
151+ pytest . param ([], True , id = "only system site-packages" ),
152+ ],
153153 )
154154 @mock .patch (BEAM_STRING .format ("prepare_virtualenv" ))
155155 @mock .patch (BEAM_STRING .format ("BeamCommandRunner" ))
156156 @mock .patch ("airflow.providers.apache.beam.hooks.beam.subprocess.check_output" , return_value = b"2.39.0" )
157157 def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages (
158158 self ,
159- current_py_requirements ,
160- current_py_system_site_packages ,
161159 mock_check_output ,
162160 mock_runner ,
163161 mock_virtualenv ,
162+ current_py_requirements ,
163+ current_py_system_site_packages ,
164164 ):
165165 hook = BeamHook (runner = DEFAULT_RUNNER )
166166 wait_for_done = mock_runner .return_value .wait_for_done
@@ -204,7 +204,7 @@ def test_start_python_pipeline_with_empty_py_requirements_and_without_system_pac
204204 wait_for_done = mock_runner .return_value .wait_for_done
205205 process_line_callback = MagicMock ()
206206
207- with self . assertRaisesRegex (AirflowException , "Invalid method invocation." ):
207+ with pytest . raises (AirflowException , match = r "Invalid method invocation\ ." ):
208208 hook .start_python_pipeline (
209209 variables = copy .deepcopy (BEAM_VARIABLES_PY ),
210210 py_file = PY_FILE ,
@@ -302,20 +302,18 @@ def test_start_go_pipeline_without_go_installed_raises(self, mock_which):
302302 mock_which .return_value = None
303303 hook = BeamHook (runner = DEFAULT_RUNNER )
304304
305- with self .assertRaises (AirflowException ) as ex_ctx :
305+ error_message = (
306+ r"You need to have Go installed to run beam go pipeline\. See .* "
307+ "installation guide. If you are running airflow in Docker see more info at '.*'"
308+ )
309+ with pytest .raises (AirflowException , match = error_message ):
306310 hook .start_go_pipeline (
307311 go_file = GO_FILE ,
308312 variables = copy .deepcopy (BEAM_VARIABLES_GO ),
309313 )
310314
311- assert (
312- "You need to have Go installed to run beam go pipeline. See https://go.dev/doc/install "
313- "installation guide. If you are running airflow in Docker see more info at "
314- "'https://airflow.apache.org/docs/docker-stack/recipes.html'." == str (ex_ctx .exception )
315- )
316315
317-
318- class TestBeamRunner (unittest .TestCase ):
316+ class TestBeamRunner :
319317 @mock .patch ("airflow.providers.apache.beam.hooks.beam.BeamCommandRunner.log" )
320318 @mock .patch ("subprocess.Popen" )
321319 @mock .patch ("select.select" )
@@ -343,18 +341,20 @@ def poll_resp_error():
343341 mock_popen .assert_called_once_with (
344342 cmd , shell = False , stdout = subprocess .PIPE , stderr = subprocess .PIPE , close_fds = True , cwd = None
345343 )
346- self .assertRaises (Exception , beam .wait_for_done )
344+ with pytest .raises (Exception ):
345+ beam .wait_for_done ()
347346
348347
349- class TestBeamOptionsToArgs (unittest .TestCase ):
350- @parameterized .expand (
348+ class TestBeamOptionsToArgs :
349+ @pytest .mark .parametrize (
350+ "options, expected_args" ,
351351 [
352352 ({"key" : "val" }, ["--key=val" ]),
353353 ({"key" : None }, ["--key" ]),
354354 ({"key" : True }, ["--key" ]),
355355 ({"key" : False }, ["--key=False" ]),
356356 ({"key" : ["a" , "b" , "c" ]}, ["--key=a" , "--key=b" , "--key=c" ]),
357- ]
357+ ],
358358 )
359359 def test_beam_options_to_args (self , options , expected_args ):
360360 args = beam_options_to_args (options )
0 commit comments