9
9
"net/http"
10
10
"net/url"
11
11
"os"
12
+ "sync"
12
13
"time"
13
14
14
15
"github.com/linkedin/goavro/v2"
@@ -25,6 +26,7 @@ type schemaRegistry struct {
25
26
password string
26
27
cache map [int ]* schemaAndCodec
27
28
client * http.Client
29
+ mu sync.RWMutex
28
30
}
29
31
30
32
const schemaByID = "%s/schemas/ids/%d"
@@ -73,10 +75,22 @@ func newSchemaRegistry(addr, caCertPath string) (*schemaRegistry, error) {
73
75
return registry , nil
74
76
}
75
77
76
- func (sr * schemaRegistry ) getSchemaAndCodec (id int ) (* schemaAndCodec , error ) {
78
+ // Helper function to make managing lock easier
79
+ func (sr * schemaRegistry ) getSchemaAndCodecFromCache (id int ) (* schemaAndCodec , error ) {
80
+ // Read-lock the cache map before access.
81
+ sr .mu .RLock ()
82
+ defer sr .mu .RUnlock ()
77
83
if v , ok := sr .cache [id ]; ok {
78
84
return v , nil
79
85
}
86
+ return nil , fmt .Errorf ("schema %d not in cache" , id )
87
+ }
88
+
89
+ func (sr * schemaRegistry ) getSchemaAndCodec (id int ) (* schemaAndCodec , error ) {
90
+ v , err := sr .getSchemaAndCodecFromCache (id )
91
+ if err == nil {
92
+ return v , nil
93
+ }
80
94
81
95
req , err := http .NewRequest (http .MethodGet , fmt .Sprintf (schemaByID , sr .url , id ), nil )
82
96
if err != nil {
@@ -112,6 +126,9 @@ func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
112
126
return nil , err
113
127
}
114
128
retval := & schemaAndCodec {Schema : schemaValue , Codec : codec }
129
+ // Lock the cache map before update.
130
+ sr .mu .Lock ()
131
+ defer sr .mu .Unlock ()
115
132
sr .cache [id ] = retval
116
133
return retval , nil
117
134
}
0 commit comments