Skip to content

Commit 66245c4

Browse files
authored
fix(parsers.avro): Add mutex to cache access (#15921)
1 parent 0febb7e commit 66245c4

File tree

1 file changed

+18
-1
lines changed

1 file changed

+18
-1
lines changed

plugins/parsers/avro/schema_registry.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"net/url"
1111
"os"
12+
"sync"
1213
"time"
1314

1415
"github.com/linkedin/goavro/v2"
@@ -25,6 +26,7 @@ type schemaRegistry struct {
2526
password string
2627
cache map[int]*schemaAndCodec
2728
client *http.Client
29+
mu sync.RWMutex
2830
}
2931

3032
const schemaByID = "%s/schemas/ids/%d"
@@ -73,10 +75,22 @@ func newSchemaRegistry(addr, caCertPath string) (*schemaRegistry, error) {
7375
return registry, nil
7476
}
7577

76-
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
78+
// Helper function to make managing lock easier
79+
func (sr *schemaRegistry) getSchemaAndCodecFromCache(id int) (*schemaAndCodec, error) {
80+
// Read-lock the cache map before access.
81+
sr.mu.RLock()
82+
defer sr.mu.RUnlock()
7783
if v, ok := sr.cache[id]; ok {
7884
return v, nil
7985
}
86+
return nil, fmt.Errorf("schema %d not in cache", id)
87+
}
88+
89+
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
90+
v, err := sr.getSchemaAndCodecFromCache(id)
91+
if err == nil {
92+
return v, nil
93+
}
8094

8195
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil)
8296
if err != nil {
@@ -112,6 +126,9 @@ func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
112126
return nil, err
113127
}
114128
retval := &schemaAndCodec{Schema: schemaValue, Codec: codec}
129+
// Lock the cache map before update.
130+
sr.mu.Lock()
131+
defer sr.mu.Unlock()
115132
sr.cache[id] = retval
116133
return retval, nil
117134
}

0 commit comments

Comments
 (0)