Skip to content

Commit 08e88a8

Browse files
committed
Refactor read file function and remove gcsio dependency
1 parent c54eb2e commit 08e88a8

17 files changed

Lines changed: 41 additions & 56 deletions

sdks/python/apache_beam/examples/complete/autocomplete_it_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from apache_beam.examples.complete import autocomplete
2929
from apache_beam.testing.test_pipeline import TestPipeline
3030
from apache_beam.testing.test_utils import create_file
31-
from apache_beam.testing.test_utils import read_gcs_output_file
31+
from apache_beam.testing.test_utils import read_files_from_pattern
3232

3333

3434
def format_output_file(output_string):
@@ -85,7 +85,7 @@ def test_autocomplete_output_files_on_small_input(self):
8585
autocomplete.run(test_pipeline.get_full_options_as_args(**extra_opts))
8686

8787
# Load result file and compare.
88-
result = read_gcs_output_file(output).strip()
88+
result = read_files_from_pattern('%s*' % output).strip()
8989

9090
self.assertEqual(
9191
sorted(self.EXPECTED_PREFIXES), sorted(format_output_file(result)))

sdks/python/apache_beam/examples/complete/distribopt_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
from apache_beam.testing.test_pipeline import TestPipeline
3333
from apache_beam.testing.test_utils import create_file
34-
from apache_beam.testing.test_utils import read_gcs_output_file
34+
from apache_beam.testing.test_utils import read_files_from_pattern
3535

3636
FILE_CONTENTS = 'OP01,8,12,0,12\n' \
3737
'OP02,30,14,3,12\n' \
@@ -72,7 +72,7 @@ def test_basics(self):
7272
save_main_session=False)
7373

7474
# Load result file and compare.
75-
lines = read_gcs_output_file(output).splitlines()
75+
lines = read_files_from_pattern('%s*' % output).splitlines()
7676

7777
# Only 1 result
7878
self.assertEqual(len(lines), 1)

sdks/python/apache_beam/examples/complete/estimate_pi_it_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
from apache_beam.examples.complete import estimate_pi
2929
from apache_beam.testing.test_pipeline import TestPipeline
30-
from apache_beam.testing.test_utils import read_gcs_output_file
30+
from apache_beam.testing.test_utils import read_files_from_pattern
3131

3232

3333
class EstimatePiIT(unittest.TestCase):
@@ -41,7 +41,7 @@ def test_estimate_pi_output_file(self):
4141
extra_opts = {'output': output}
4242
estimate_pi.run(test_pipeline.get_full_options_as_args(**extra_opts))
4343
# Load result file and compare.
44-
result = read_gcs_output_file(output)
44+
result = read_files_from_pattern('%s*' % output)
4545
[_, _, estimated_pi] = json.loads(result.strip())
4646
# Note: Probabilistically speaking this test can fail with a probability
4747
# that is very small (VERY) given that we run at least 100 thousand

sdks/python/apache_beam/examples/complete/tfidf.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import apache_beam as beam
3131
from apache_beam.io import ReadFromText
3232
from apache_beam.io import WriteToText
33-
from apache_beam.io.gcp import gcsio
33+
from apache_beam.io.filesystems import FileSystems
3434
from apache_beam.options.pipeline_options import PipelineOptions
3535
from apache_beam.options.pipeline_options import SetupOptions
3636
from apache_beam.pvalue import AsSingleton
@@ -197,11 +197,12 @@ def run(argv=None, save_main_session=True):
197197
# workflow rely on global context (e.g., a module imported at module level).
198198
pipeline_options = PipelineOptions(pipeline_args)
199199
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
200-
gcs = gcsio.GcsIO()
201200
with beam.Pipeline(options=pipeline_options) as p:
202201

203202
# Read documents specified by the uris command line option.
204-
pcoll = read_documents(p, gcs.list_prefix(known_args.uris).keys())
203+
metadata_list = FileSystems.match([known_args.uris])[0].metadata_list
204+
uris = [metadata.path for metadata in metadata_list]
205+
pcoll = read_documents(p, uris)
205206
# Compute TF-IDF information for each word.
206207
output = pcoll | TfIdf()
207208
# Write the output using a "Write" transform that has side effects.

sdks/python/apache_beam/examples/complete/tfidf_it_test.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,7 @@
2929
from apache_beam.examples.complete import tfidf
3030
from apache_beam.testing.test_pipeline import TestPipeline
3131
from apache_beam.testing.test_utils import create_file
32-
from apache_beam.testing.test_utils import read_gcs_output_file
33-
34-
# Protect against environments where gcsio library is not available.
35-
try:
36-
from apache_beam.io.gcp import gcsio
37-
except ImportError:
38-
gcsio = None
32+
from apache_beam.testing.test_utils import read_files_from_pattern
3933

4034
EXPECTED_RESULTS = set([
4135
('ghi', '1.txt', 0.3662040962227032), ('abc', '1.txt', 0.0),
@@ -46,7 +40,6 @@
4640
EXPECTED_LINE_RE = r'\(u?\'([a-z]*)\', \(\'.*([0-9]\.txt)\', (.*)\)\)'
4741

4842

49-
@unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
5043
class TfIdfIT(unittest.TestCase):
5144
@pytest.mark.examples_postcommit
5245
def test_basics(self):
@@ -60,14 +53,14 @@ def test_basics(self):
6053
create_file('/'.join([input_folder, '3.txt']), 'abc')
6154
output = '/'.join([temp_location, str(uuid.uuid4()), 'result'])
6255

63-
extra_opts = {'uris': input_folder, 'output': output}
56+
extra_opts = {'uris': '%s/**' % input_folder, 'output': output}
6457
tfidf.run(
6558
test_pipeline.get_full_options_as_args(**extra_opts),
6659
save_main_session=False)
6760

6861
# Parse result file and compare.
6962
results = []
70-
lines = read_gcs_output_file(output).splitlines()
63+
lines = read_files_from_pattern('%s*' % output).splitlines()
7164
for line in lines:
7265
match = re.search(EXPECTED_LINE_RE, line)
7366
logging.info('Result line: %s', line)

sdks/python/apache_beam/examples/complete/tfidf_test.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,13 @@
2828
from apache_beam.testing.util import assert_that
2929
from apache_beam.testing.util import equal_to
3030

31-
# Protect against environments where gcsio library is not available.
32-
try:
33-
from apache_beam.io.gcp import gcsio
34-
except ImportError:
35-
gcsio = None
36-
3731
EXPECTED_RESULTS = set([
3832
('ghi', '1.txt', 0.3662040962227032), ('abc', '1.txt', 0.0),
3933
('abc', '3.txt', 0.0), ('abc', '2.txt', 0.0),
4034
('def', '1.txt', 0.13515503603605478), ('def', '2.txt', 0.2027325540540822)
4135
])
4236

4337

44-
@unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
4538
class TfIdfTest(unittest.TestCase):
4639
def test_tfidf_transform(self):
4740
with TestPipeline() as p:

sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_it_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from apache_beam.examples.complete import top_wikipedia_sessions
2929
from apache_beam.testing.test_pipeline import TestPipeline
3030
from apache_beam.testing.test_utils import create_file
31-
from apache_beam.testing.test_utils import read_gcs_output_file
31+
from apache_beam.testing.test_utils import read_files_from_pattern
3232

3333

3434
class ComputeTopSessionsIT(unittest.TestCase):
@@ -88,7 +88,7 @@ def test_top_wikipedia_sessions_output_files_on_small_input(self):
8888
test_pipeline.get_full_options_as_args(**extra_opts))
8989

9090
# Load result file and compare.
91-
result = read_gcs_output_file(output).strip().splitlines()
91+
result = read_files_from_pattern('%s*' % output).strip().splitlines()
9292

9393
self.assertEqual(self.EXPECTED, sorted(result, key=lambda x: x.split()[0]))
9494

sdks/python/apache_beam/examples/cookbook/coders_it_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
from apache_beam.examples.cookbook import coders
2929
from apache_beam.testing.test_pipeline import TestPipeline
3030
from apache_beam.testing.test_utils import create_file
31-
from apache_beam.testing.test_utils import read_gcs_output_file
31+
from apache_beam.testing.test_utils import read_files_from_pattern
3232

3333

3434
def format_result(result_string):
@@ -72,7 +72,7 @@ def test_coders_output_files_on_small_input(self):
7272
coders.run(test_pipeline.get_full_options_as_args(**extra_opts))
7373

7474
# Load result file and compare.
75-
result = read_gcs_output_file(output).strip()
75+
result = read_files_from_pattern('%s*' % output).strip()
7676

7777
self.assertEqual(
7878
sorted(self.EXPECTED_RESULT), sorted(format_result(result)))

sdks/python/apache_beam/examples/cookbook/custom_ptransform_it_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from apache_beam.examples.cookbook import custom_ptransform
2828
from apache_beam.testing.test_pipeline import TestPipeline
2929
from apache_beam.testing.test_utils import create_file
30-
from apache_beam.testing.test_utils import read_gcs_output_file
30+
from apache_beam.testing.test_utils import read_files_from_pattern
3131

3232

3333
def format_result(result_string):
@@ -61,7 +61,7 @@ def test_custom_ptransform_output_files_on_small_input(self):
6161
custom_ptransform.run(test_pipeline.get_full_options_as_args(**extra_opts))
6262

6363
# Load result file and compare.
64-
result = read_gcs_output_file(output).strip()
64+
result = read_files_from_pattern('%s*' % output).strip()
6565
self.assertEqual(result, self.EXPECTED_RESULT)
6666

6767

sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
from apache_beam.examples.cookbook import group_with_coder
2929
from apache_beam.testing.test_pipeline import TestPipeline
30-
from apache_beam.testing.test_utils import read_gcs_output_file
30+
from apache_beam.testing.test_utils import read_files_from_pattern
3131

3232
# Protect against environments where gcsio library is not available.
3333
try:
@@ -88,7 +88,7 @@ def test_basics_with_type_check(self):
8888
save_main_session=False)
8989
# Parse result file and compare.
9090
results = []
91-
lines = read_gcs_output_file(output).splitlines()
91+
lines = read_files_from_pattern('%s*' % output).splitlines()
9292
for line in lines:
9393
name, points = line.split(',')
9494
results.append((name, int(points)))

0 commit comments

Comments
 (0)