Skip to content

Commit 2aa4da0

Browse files
authored
[BEAM-13857] Add K:V flags for expansion service jars and addresses to Go ITs. (#16908)
Adds functionality for running jars to the Go integration test framework, and uses this functionality to implement handling of K:V flags for providing expansion service jars and addresses to the test framework. This means that tests can simply get the address of an expansion service with the appropriate label, and this feature will handle running a jar if necessary, or just using the passed in endpoint otherwise.
1 parent 117123c commit 2aa4da0

9 files changed

Lines changed: 657 additions & 1 deletion

File tree

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package integration
17+
18+
import (
19+
"fmt"
20+
"strconv"
21+
"time"
22+
23+
"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
24+
"github.com/apache/beam/sdks/v2/go/test/integration/internal/ports"
25+
)
26+
27+
// ExpansionServices is a struct used for getting addresses and starting expansion services, based
28+
// on the --expansion_jar and --expansion_addr flags in this package. The main reason to use this
29+
// instead of accessing the flags directly is to let it handle jar startup and shutdown.
30+
//
31+
// Usage
32+
//
33+
// Create an ExpansionServices object in TestMain with NewExpansionServices. Then use GetAddr for
34+
// every expansion service needed for the test. Call Shutdown on it before finishing TestMain (or
35+
// simply defer a call to it).
36+
//
37+
// ExpansionServices is not concurrency safe, and so a single instance should not be used within
38+
// multiple individual tests, due to the possibility of those tests being run concurrently. It is
39+
// recommended to only use ExpansionServices in TestMain to avoid this.
40+
//
41+
// Example:
42+
// var retCode int
43+
// defer func() { os.Exit(retCode) }() // Defer os.Exit so it happens after other defers.
44+
// services := integration.NewExpansionServices()
45+
// defer func() { services.Shutdown() }()
46+
// addr, err := services.GetAddr("example")
47+
// if err != nil {
48+
// retCode = 1
49+
// panic(err)
50+
// }
51+
// expansionAddr = addr // Save address to a package-level variable used by tests.
52+
// retCode = ptest.MainRet(m)
53+
type ExpansionServices struct {
54+
addrs map[string]string
55+
jars map[string]string
56+
procs []jars.Process
57+
// Callback for running jars, stored this way for testing purposes.
58+
run func(time.Duration, string, ...string) (jars.Process, error)
59+
waitTime time.Duration // Time to sleep after running jar. Tests can adjust this.
60+
}
61+
62+
// NewExpansionServices creates and initializes an ExpansionServices instance.
63+
func NewExpansionServices() *ExpansionServices {
64+
return &ExpansionServices{
65+
addrs: GetExpansionAddrs(),
66+
jars: GetExpansionJars(),
67+
procs: make([]jars.Process, 0),
68+
run: jars.Run,
69+
waitTime: 3 * time.Second,
70+
}
71+
}
72+
73+
// GetAddr gets the address for the expansion service with the given label. The label corresponds to
74+
// the labels used in the --expansion_jar and --expansion_addr flags. If an expansion service is
75+
// provided as a jar, then that jar will be run to retrieve the address, and the jars are not
76+
// guaranteed to be shut down unless Shutdown is called.
77+
//
78+
// Note: If this function starts a jar, it waits a few seconds for it to initialize. Do not use
79+
// this function if the possibility of a few seconds of latency is not acceptable.
80+
func (es *ExpansionServices) GetAddr(label string) (string, error) {
81+
// Always default to existing address before running a jar.
82+
if addr, ok := es.addrs[label]; ok {
83+
return addr, nil
84+
}
85+
jar, ok := es.jars[label]
86+
if !ok {
87+
err := fmt.Errorf("no --expansion_jar or --expansion_addr flag provided with label \"%s\"", label)
88+
return "", fmt.Errorf("expansion service labeled \"%s\" not found: %w", label, err)
89+
}
90+
91+
// Start jar on open port.
92+
port, err := ports.GetOpenTCP()
93+
if err != nil {
94+
return "", fmt.Errorf("cannot get open port for expansion service labeled \"%s\": %w", label, err)
95+
}
96+
portStr := strconv.Itoa(port)
97+
98+
// Run jar and cache its info.
99+
proc, err := es.run(*ExpansionTimeout, jar, portStr)
100+
if err != nil {
101+
return "", fmt.Errorf("cannot run jar for expansion service labeled \"%s\": %w", label, err)
102+
}
103+
time.Sleep(es.waitTime) // Wait a bit for the jar to start.
104+
es.procs = append(es.procs, proc)
105+
addr := "localhost:" + portStr
106+
es.addrs[label] = addr
107+
return addr, nil
108+
}
109+
110+
// Shutdown shuts down any jars started by the ExpansionServices struct and should get called if it
111+
// was used at all.
112+
func (es *ExpansionServices) Shutdown() {
113+
for _, p := range es.procs {
114+
p.Kill()
115+
}
116+
es.jars = nil
117+
es.addrs = nil
118+
es.procs = nil
119+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package integration
17+
18+
import (
19+
"fmt"
20+
"testing"
21+
"time"
22+
23+
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
24+
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
25+
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
26+
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
27+
"github.com/apache/beam/sdks/v2/go/test/integration/internal/jars"
28+
"github.com/google/go-cmp/cmp"
29+
"github.com/google/go-cmp/cmp/cmpopts"
30+
)
31+
32+
type testProcess struct {
33+
killed bool
34+
jar string
35+
}
36+
37+
func (p *testProcess) Kill() error {
38+
p.killed = true
39+
return nil
40+
}
41+
42+
func failRun(_ time.Duration, _ string, _ ...string) (jars.Process, error) {
43+
return nil, fmt.Errorf("unexpectedly running a jar, failing")
44+
}
45+
46+
func succeedRun(_ time.Duration, jar string, _ ...string) (jars.Process, error) {
47+
return &testProcess{jar: jar}, nil
48+
}
49+
50+
// TestExpansionServices_GetAddr_Addresses tests calling GetAddr on provided addresses.
51+
func TestExpansionServices_GetAddr_Addresses(t *testing.T) {
52+
addrsMap := map[string]string{
53+
"label1": "testAddr1",
54+
"label2": "testAddr2",
55+
"label3": "testAddr3",
56+
}
57+
jarsMap := map[string]string{
58+
"label2": "jarFilepath2",
59+
}
60+
es := &ExpansionServices{
61+
addrs: addrsMap,
62+
jars: jarsMap,
63+
procs: make([]jars.Process, 0),
64+
run: failRun,
65+
waitTime: 0,
66+
}
67+
68+
// Ensure we get the same map we put in, and that addresses take priority over jars if
69+
// both are given for the same label.
70+
for label, wantAddr := range addrsMap {
71+
gotAddr, err := es.GetAddr(label)
72+
if err != nil {
73+
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
74+
continue
75+
}
76+
if gotAddr != wantAddr {
77+
t.Errorf("incorrect address for \"%v\", want %v, got %v", label, wantAddr, gotAddr)
78+
}
79+
}
80+
// Check that nonexistent labels fail.
81+
if _, err := es.GetAddr("nonexistent_label"); err == nil {
82+
t.Errorf("did not receive error when calling GetAddr with nonexistent label")
83+
}
84+
}
85+
86+
// TestExpansionServices_GetAddr_Jars tests calling GetAddr on provided jars.
87+
func TestExpansionServices_GetAddr_Jars(t *testing.T) {
88+
addrsMap := map[string]string{}
89+
jarsMap := map[string]string{
90+
"label1": "jarFilepath1",
91+
"label2": "jarFilepath2",
92+
"label3": "jarFilepath3",
93+
}
94+
es := &ExpansionServices{
95+
addrs: addrsMap,
96+
jars: jarsMap,
97+
procs: make([]jars.Process, 0),
98+
run: succeedRun,
99+
waitTime: 0,
100+
}
101+
102+
// Call GetAddr on each jar twice, checking that the addresses remain consistent.
103+
gotMap := make(map[string]string)
104+
for label := range jarsMap {
105+
gotAddr, err := es.GetAddr(label)
106+
if err != nil {
107+
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
108+
continue
109+
}
110+
gotMap[label] = gotAddr
111+
}
112+
for label, gotAddr := range gotMap {
113+
secondAddr, err := es.GetAddr(label)
114+
if err != nil {
115+
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
116+
continue
117+
}
118+
if secondAddr != gotAddr {
119+
t.Errorf("getAddr returned different address when called twice for \"%v\", "+
120+
"attempt 1: %v, attempt 2: %v", label, gotAddr, secondAddr)
121+
}
122+
}
123+
// Check that all jars were run.
124+
gotJars := make([]string, 0)
125+
for _, proc := range es.procs {
126+
testProc := proc.(*testProcess)
127+
gotJars = append(gotJars, testProc.jar)
128+
}
129+
wantJars := make([]string, 0)
130+
for _, jar := range jarsMap {
131+
wantJars = append(wantJars, jar)
132+
}
133+
lessFunc := func(a, b string) bool { return a < b }
134+
if diff := cmp.Diff(wantJars, gotJars, cmpopts.SortSlices(lessFunc)); diff != "" {
135+
t.Errorf("processes in ExpansionServices does not match jars that should be running: diff(-want,+got):\n%v", diff)
136+
}
137+
}
138+
139+
// TestExpansionServices_Shutdown tests that a shutdown correctly kills all jars started by an
140+
// ExpansionServices.
141+
func TestExpansionServices_Shutdown(t *testing.T) {
142+
addrsMap := map[string]string{}
143+
jarsMap := map[string]string{
144+
"label1": "jarFilepath1",
145+
"label2": "jarFilepath2",
146+
"label3": "jarFilepath3",
147+
}
148+
es := &ExpansionServices{
149+
addrs: addrsMap,
150+
jars: jarsMap,
151+
procs: make([]jars.Process, 0),
152+
run: succeedRun,
153+
waitTime: 0,
154+
}
155+
// Call getAddr on each label to run jars.
156+
for label := range addrsMap {
157+
_, err := es.GetAddr(label)
158+
if err != nil {
159+
t.Errorf("unexpected error when getting address for \"%v\": %v", label, err)
160+
continue
161+
}
162+
}
163+
164+
// Shutdown and confirm that jars are killed and addresses can no longer be retrieved.
165+
procs := es.procs
166+
es.Shutdown()
167+
for _, proc := range procs {
168+
testProc := proc.(*testProcess)
169+
if !testProc.killed {
170+
t.Errorf("process for jar %v was not killed on Shutdown()", testProc.jar)
171+
}
172+
}
173+
for label := range addrsMap {
174+
_, err := es.GetAddr(label)
175+
if err == nil {
176+
t.Errorf("calling GetAddr after Shutdown did not return an error for \"%v\"", label)
177+
}
178+
}
179+
}

sdks/go/test/integration/flags.go

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515

1616
package integration
1717

18-
import "flag"
18+
import (
19+
"flag"
20+
"fmt"
21+
"strings"
22+
)
1923

2024
// The following flags are flags used in one or more integration tests, and that
2125
// may be used by scripts that execute "go test ./sdks/go/test/integration/...".
@@ -53,4 +57,84 @@ var (
5357
KafkaJarTimeout = flag.String("kafka_jar_timeout", "10m",
5458
"Sets an auto-shutdown timeout to the Kafka cluster. "+
5559
"Requires the timeout command to be present in Path, unless the value is set to \"\".")
60+
61+
// ExpansionJars contains elements in the form "label:jar" describing jar
62+
// filepaths for expansion services to use in integration tests, and the
63+
// corresponding labels. Once provided through this flag, those jars can
64+
// be used in tests via the ExpansionServices struct.
65+
ExpansionJars stringSlice
66+
67+
// ExpansionAddrs contains elements in the form "label:address" describing
68+
// endpoints for expansion services to use in integration tests, and the
69+
// corresponding labels. Once provided through this flag, those addresses
70+
// can be used in tests via the ExpansionServices struct.
71+
ExpansionAddrs stringSlice
72+
73+
// ExpansionTimeout attempts to apply an auto-shutdown timeout to any
74+
// expansion services started by integration tests.
75+
ExpansionTimeout = flag.Duration("expansion_timeout", 0,
76+
"Sets an auto-shutdown timeout to any started expansion services. "+
77+
"Requires the timeout command to be present in Path, unless the value is set to 0.")
5678
)
79+
80+
func init() {
81+
flag.Var(&ExpansionJars, "expansion_jar",
82+
"Define jar locations for expansion services. Each entry consists of "+
83+
"two values, an arbitrary label and a jar filepath, separated by a "+
84+
"\":\", in the form \"label:jar\". Jars provided through this flag "+
85+
"can be started by tests.")
86+
flag.Var(&ExpansionAddrs, "expansion_addr",
87+
"Define addresses for expansion services. Each entry consists of "+
88+
"two values, an arbitrary label and an address, separated by a "+
89+
"\":\", in the form \"label:address\". Addresses provided through "+
90+
"this flag can be used as expansion addresses by tests.")
91+
}
92+
93+
// GetExpansionJars gets all the jars given to --expansion_jar as a map of label to jar location.
94+
func GetExpansionJars() map[string]string {
95+
ret := make(map[string]string)
96+
for _, jar := range ExpansionJars {
97+
splits := strings.SplitN(jar, ":", 2)
98+
ret[splits[0]] = splits[1]
99+
}
100+
return ret
101+
}
102+
103+
// GetExpansionAddrs gets all the addresses given to --expansion_addr as a map of label to address.
104+
func GetExpansionAddrs() map[string]string {
105+
ret := make(map[string]string)
106+
for _, addr := range ExpansionAddrs {
107+
splits := strings.SplitN(addr, ":", 2)
108+
ret[splits[0]] = splits[1]
109+
}
110+
return ret
111+
}
112+
113+
// stringSlice is a flag.Value implementation for string slices, that allows
114+
// multiple strings to be assigned to one flag by specifying multiple instances
115+
// of the flag.
116+
//
117+
// Example:
118+
// var myFlags stringSlice
119+
// flag.Var(&myFlags, "my_flag", "A list of flags")
120+
// With the example above, the slice can be set to contain ["foo", "bar"]:
121+
// cmd -my_flag foo -my_flag bar
122+
type stringSlice []string
123+
124+
// String implements the String method of flag.Value. This outputs the value
125+
// of the flag as a string.
126+
func (s *stringSlice) String() string {
127+
return fmt.Sprintf("%v", *s)
128+
}
129+
130+
// Set implements the Set method of flag.Value. This stores a string input to
131+
// the flag into a stringSlice representation.
132+
func (s *stringSlice) Set(value string) error {
133+
*s = append(*s, value)
134+
return nil
135+
}
136+
137+
// Get returns the instance itself.
138+
func (s stringSlice) Get() interface{} {
139+
return s
140+
}

0 commit comments

Comments
 (0)