Skip to content

Commit 74ab15c

Browse files
authored
[BEAM-13892] Improve coverage of avroio package (#16990)
1 parent e234543 commit 74ab15c

2 files changed

Lines changed: 133 additions & 0 deletions

File tree

sdks/go/data/tweet.avro

275 Bytes
Binary file not shown.
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
package avroio
17+
18+
import (
19+
"bytes"
20+
"encoding/json"
21+
"errors"
22+
"io/ioutil"
23+
"os"
24+
"reflect"
25+
"testing"
26+
27+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
28+
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
29+
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
30+
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
31+
32+
"github.com/linkedin/goavro"
33+
)
34+
35+
type Tweet struct {
36+
Stamp int64 `json:"timestamp"`
37+
Tweet string `json:"tweet"`
38+
User string `json:"username"`
39+
}
40+
41+
const schema = `{
42+
"type": "record",
43+
"name": "tweet",
44+
"namespace": "twitter",
45+
"fields": [
46+
{ "name": "timestamp", "type": "double" },
47+
{ "name": "tweet", "type": "string" },
48+
{ "name": "username", "type": "string" }
49+
]
50+
}`
51+
52+
func TestRead(t *testing.T) {
53+
avroFile := "../../../../data/tweet.avro"
54+
55+
p := beam.NewPipeline()
56+
s := p.Root()
57+
tweets := Read(s, avroFile, reflect.TypeOf(Tweet{}))
58+
passert.Count(s, tweets, "NumUsers", 1)
59+
passert.Equals(s, tweets, Tweet{
60+
Stamp: int64(20),
61+
Tweet: "Hello twitter",
62+
User: "user1",
63+
})
64+
65+
ptest.RunAndValidate(t, p)
66+
}
67+
68+
type TwitterUser struct {
69+
User string `json:"username"`
70+
Info string `json:"info"`
71+
}
72+
73+
const userSchema = `{
74+
"type": "record",
75+
"name": "user",
76+
"namespace": "twitter",
77+
"fields": [
78+
{ "name": "username", "type": "string" },
79+
{ "name": "info", "type": "string" }
80+
]
81+
}`
82+
83+
func TestWrite(t *testing.T) {
84+
avroFile := "./user.avro"
85+
testUsername := "user1"
86+
testInfo := "userInfo"
87+
p, s, sequence := ptest.CreateList([]string{testUsername})
88+
format := beam.ParDo(s, func(username string, emit func(string)) {
89+
newUser := TwitterUser{
90+
User: username,
91+
Info: testInfo,
92+
}
93+
94+
b, _ := json.Marshal(newUser)
95+
emit(string(b))
96+
}, sequence)
97+
Write(s, avroFile, userSchema, format)
98+
t.Cleanup(func() {
99+
os.Remove(avroFile)
100+
})
101+
102+
ptest.RunAndValidate(t, p)
103+
104+
if _, err := os.Stat(avroFile); errors.Is(err, os.ErrNotExist) {
105+
t.Fatalf("Failed to write file %v", avroFile)
106+
}
107+
108+
avroBytes, err := ioutil.ReadFile(avroFile)
109+
if err != nil {
110+
t.Fatalf("Failed to read avro file: %v", err)
111+
}
112+
ocf, err := goavro.NewOCFReader(bytes.NewReader(avroBytes))
113+
var nativeData []interface{}
114+
for ocf.Scan() {
115+
datum, err := ocf.Read()
116+
if err != nil {
117+
break // Read error sets OCFReader error
118+
}
119+
nativeData = append(nativeData, datum)
120+
}
121+
if err := ocf.Err(); err != nil {
122+
t.Fatalf("Error decoding avro data: %v", err)
123+
}
124+
if got, want := len(nativeData), 1; got != want {
125+
t.Fatalf("Avro data, got %v records, want %v", got, want)
126+
}
127+
if got, want := nativeData[0].(map[string]interface{})["username"], testUsername; got != want {
128+
t.Fatalf("User.User=%v, want %v", got, want)
129+
}
130+
if got, want := nativeData[0].(map[string]interface{})["info"], testInfo; got != want {
131+
t.Fatalf("User.User=%v, want %v", got, want)
132+
}
133+
}

0 commit comments

Comments
 (0)