storage/remote: remote write 2.0 content negotiation#13498
storage/remote: remote write 2.0 content negotiation#13498alexgreenbank merged 11 commits intoremote-write-2.0from
Conversation
|
Moved lots of code around to keep boundaries of store client (in Started to document the differences between the 1.x responses and the 2.x responses here: https://docs.google.com/document/d/1t6BjIM3ha9V_ZqeR01y2s6ebMTCW1hrJtvt7E0GeyTw/edit - this needs more work to ensure all edge cases are covered. |
|
(Will fix merge conflicts manually as some rework is required...) |
Signed-off-by: Alex Greenbank <[email protected]>
35ac64f to
04929dc
Compare
|
Force pushed a single clean commit of my changes that had proper signoff. |
Signed-off-by: Alex Greenbank <[email protected]>
Signed-off-by: Alex Greenbank <[email protected]>
Signed-off-by: Alex Greenbank <[email protected]>
|
OK, still have the linting errors to fix but that I'll work on that separately. It shouldn't stop the initial reviews and subsequent comments! |
Signed-off-by: Alex Greenbank <[email protected]>
Signed-off-by: Alex Greenbank <[email protected]>
storage/remote/client.go
Outdated
| // TODO(alexg) Do we want to include the first line of the message? | ||
| return ErrStatusBadRequest | ||
| case 406: | ||
| // Return an unrecoverable error to indicate the 406 |
storage/remote/queue_manager.go
Outdated
| // Resend logic on 406 | ||
| // ErrStatusNotAcceptable is a new error defined in client.go | ||
|
|
||
| // Work out what version to send based on the last header seen and the QM's rwFormat setting | ||
| // TODO(alexg) - see comments below about retry/renegotiate design | ||
| for attemptNos := 1; attemptNos <= 3; attemptNos++ { | ||
| lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() | ||
| compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) | ||
| sendErr := attemptBatchSend(batch, rwFormat, compression, false) | ||
| if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { | ||
| // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying | ||
| break | ||
| } | ||
| // If we get either of the two errors (406, 400) we loop and re-negotiate |
There was a problem hiding this comment.
I'm not sure we should have a separate retry logic for this. The flip/flopping of the accepted protocol versions should be treated the same as any other recoverable error IMO. Especially since we intend to include the "accepted" protocol/encodings in the 406 response.
406 is recoverable so we should continue to retry as usual, avoiding dropping data as much as we can.
There was a problem hiding this comment.
IMO we should just do the song and dance to switch to the client a different protocol verison/compression encoding right away if we get a 406, and return the recoverable error and allow the retry to happen as usual, OR if we notice we can't switch to something the receiver likes, we should return a non-recoverable error.
There was a problem hiding this comment.
I think I agree with @cstyan here. And also two thoughts:
- IMO what you described in the Rollout/rollback strategy section is good enough for this. I understand your point about being more robust in front of buggy servers, but there's a ton of bugs a server could have and I don't think this one is a particularly important to receive special handling.
- Even if there are buggy servers, we expect this would only happen during rollouts, under some circumstances, and with enough bad luck. The flip/flopping should stop happening once the rollout finishes
There was a problem hiding this comment.
And following this same line: negotiateRWProto could return an error if parties could not agree on a version, and treat that as unrecoverable.
There was a problem hiding this comment.
Yeah, there's two kinds of "recoverable":
- A 5xx response is considered "recoverable" in the current Prometheus codebase sense as the retry function will resend the same payload after a short delay, no concept of any repackaging/re-encoding
- A 406 response (which we deem "recoverable" in the spec) may require creating a new payload (e.g. v1 instead of v2 or vice versa, and/or a different encoding) which is beyond what the existing
attemptStorefunction does.
This is why I described the 406 as "non-recoverable" as it wasn't "recoverable" in the same way as a 5xx error.
The main problems I found with trying to make use of the same logic as the attemptSend() method are:
- it would require passing in a whole load of parameters in order to have the information there to rebuild the payload
- the two payload creation functions take slightly different sets of parameters
so I opted to keep it higher in the stack.
I'll go back and take a look at this bearing in mind the comments above and see what could be done.
storage/remote/queue_manager.go
Outdated
| var compressed []byte | ||
|
|
||
| switch compression { | ||
| case "snappy": | ||
| compressed = snappy.Encode(*buf, data) | ||
| if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) { | ||
| // grow the buffer for the next time | ||
| *buf = make([]byte, n) | ||
| } | ||
| default: | ||
| return nil, highest, lowest, fmt.Errorf("Unknown compression scheme [%s]", compression) |
There was a problem hiding this comment.
we could refactor this out into a function now
storage/remote/client.go
Outdated
| // Attempt a HEAD request against a remote write endpoint to see what it supports. | ||
| func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { |
There was a problem hiding this comment.
we should use a more descriptive name here based on the function actually doing the HEAD request, and maybe we only need the error return value?
There was a problem hiding this comment.
How about
updateProtoVersionsprobeServerForVersionsdetermineServerVersions
?
Also, would it make sense to make it private?
There was a problem hiding this comment.
I shied away from sticking HEAD in the name as I didn't want to include implementation details. Will make it private, pick a better name and remove the return values as nothing actually uses them.
There was a problem hiding this comment.
OK, renamed as probeRemoteVersions() and only returning the error string.
npazosmendez
left a comment
There was a problem hiding this comment.
looking nice, left a few comments
| remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ | ||
| Namespace: "prometheus", | ||
| Subsystem: "api", | ||
| Name: "remote_write_20_head_requests", |
There was a problem hiding this comment.
nit: we should end this in _total
| Name: "remote_write_20_head_requests", | |
| Name: "remote_write_20_head_requests_total", |
There was a problem hiding this comment.
Hm, we could also drop _20 and use label once the version distinction will be needed.
storage/remote/client.go
Outdated
| // Attempt a HEAD request against a remote write endpoint to see what it supports. | ||
| func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { |
There was a problem hiding this comment.
How about
updateProtoVersionsprobeServerForVersionsdetermineServerVersions
?
Also, would it make sense to make it private?
storage/remote/queue_manager.go
Outdated
| // Resend logic on 406 | ||
| // ErrStatusNotAcceptable is a new error defined in client.go | ||
|
|
||
| // Work out what version to send based on the last header seen and the QM's rwFormat setting | ||
| // TODO(alexg) - see comments below about retry/renegotiate design | ||
| for attemptNos := 1; attemptNos <= 3; attemptNos++ { | ||
| lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() | ||
| compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) | ||
| sendErr := attemptBatchSend(batch, rwFormat, compression, false) | ||
| if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { | ||
| // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying | ||
| break | ||
| } | ||
| // If we get either of the two errors (406, 400) we loop and re-negotiate |
There was a problem hiding this comment.
I think I agree with @cstyan here. And also two thoughts:
- IMO what you described in the Rollout/rollback strategy section is good enough for this. I understand your point about being more robust in front of buggy servers, but there's a ton of bugs a server could have and I don't think this one is a particularly important to receive special handling.
- Even if there are buggy servers, we expect this would only happen during rollouts, under some circumstances, and with enough bad luck. The flip/flopping should stop happening once the rollout finishes
storage/remote/client.go
Outdated
| // Attempt a HEAD request against a remote write endpoint to see what it supports. | ||
| func (c *Client) GetProtoVersions(ctx context.Context) (string, error) { | ||
| // If we are in Version1 mode then don't even bother | ||
| if c.rwFormat == Version1 { |
There was a problem hiding this comment.
Is the Client's field c.rwFormat used anywhere apart from here? I find it confusing that the configured version is stored inside the Client (and used only here) and also in the queue manager (used it for content negotiation).
If c.rwFormat is only used here, I would remove the field and let this probe happen regardless of the configured version.
There was a problem hiding this comment.
Agreed, that would make it simpler. The whole point of a lot of my refactoring was to remove the need for the client have its own rwFormat attribute!
There was a problem hiding this comment.
I've removed rwFormat from the client.
storage/remote/queue_manager.go
Outdated
| // Resend logic on 406 | ||
| // ErrStatusNotAcceptable is a new error defined in client.go | ||
|
|
||
| // Work out what version to send based on the last header seen and the QM's rwFormat setting | ||
| // TODO(alexg) - see comments below about retry/renegotiate design | ||
| for attemptNos := 1; attemptNos <= 3; attemptNos++ { | ||
| lastHeaderSeen := s.qm.storeClient.GetLastRWHeader() | ||
| compression, rwFormat := negotiateRWProto(s.qm.rwFormat, lastHeaderSeen) | ||
| sendErr := attemptBatchSend(batch, rwFormat, compression, false) | ||
| if sendErr == nil || !(errors.Is(sendErr, ErrStatusNotAcceptable) || errors.Is(sendErr, ErrStatusBadRequest)) { | ||
| // It wasn't an error, or wasn't a 406 or 400 so we can just stop trying | ||
| break | ||
| } | ||
| // If we get either of the two errors (406, 400) we loop and re-negotiate |
There was a problem hiding this comment.
And following this same line: negotiateRWProto could return an error if parties could not agree on a version, and treat that as unrecoverable.
bwplotka
left a comment
There was a problem hiding this comment.
Nice, looks generally great from first review iteration - some style nits and suggestions
|
|
||
| var ErrStatusBadRequest = errors.New("HTTP StatusBadRequest") // 400 | ||
|
|
||
| var ErrStatusNotAcceptable = errors.New("HTTP StatusNotAcceptable") // 406 |
There was a problem hiding this comment.
It would be epic to add quick comment on both that explains semantics of those?
storage/remote/client.go
Outdated
|
|
||
| httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) | ||
| if err != nil { | ||
| // We don't attempt a retry here |
There was a problem hiding this comment.
What does this mean? That we retry somewhere else? We never retry? Let's be specific (or drop the comment, it might not help us). Let's drop it I would say (:
There was a problem hiding this comment.
Right now we're actually not doing anything with the return values from this function. We should IMO. If the user has configured their prometheus to use rw2.0 but the reciever doesn't respond correctly we should likely fail fast and have prometheus exit, or at least include a rw config option that allows them to choose to fallback to 1.0.
storage/remote/client.go
Outdated
| return "", err | ||
| } | ||
|
|
||
| // See if we got a header anyway |
There was a problem hiding this comment.
I don't want to be picky, but all those comments would be better if they have a trailing dot 🙈
| // See if we got a header anyway | |
| // See if we got a header anyway. |
There was a problem hiding this comment.
I think linting should actually fail here 🤔
There was a problem hiding this comment.
All comments were updated to fit existing style.
| case 400: | ||
| // Return an unrecoverable error to indicate the 400 | ||
| // This then gets passed up the chain so we can react to it properly | ||
| // TODO(alexg) Do we want to include the first line of the message? |
There was a problem hiding this comment.
This is where we could employ more structured response with what series exactly failed.
| remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ | ||
| Namespace: "prometheus", | ||
| Subsystem: "api", | ||
| Name: "remote_write_20_head_requests", |
There was a problem hiding this comment.
Hm, we could also drop _20 and use label once the version distinction will be needed.
| h := &writeHeadHandler{ | ||
| logger: logger, | ||
| rwFormat: rwFormat, | ||
| remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ |
There was a problem hiding this comment.
| remoteWrite20HeadRequests: prometheus.NewCounter(prometheus.CounterOpts{ | |
| remoteWrite20HeadRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ |
then you don't need must register below.
There was a problem hiding this comment.
lets avoid using promauto unless we have a reason to use it over passing a registery
There was a problem hiding this comment.
There is nothing wrong with using promauto and the default registry. In fact, we should be using it by default.
Where is this "avoid promauto" recommendation coming from?
storage/remote/write_handler.go
Outdated
| } | ||
|
|
||
| func (h *writeHeadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
| // Send a response to the HEAD request based on the format supported |
There was a problem hiding this comment.
Should this be comment for ServeHTTP instead (on top not below)
Signed-off-by: Alex Greenbank <[email protected]>
Co-authored-by: Bartlomiej Plotka <[email protected]> Signed-off-by: Alex Greenbank <[email protected]>
Signed-off-by: Alex Greenbank <[email protected]>
Signed-off-by: Alex Greenbank <[email protected]>
Signed-off-by: Alex Greenbank <[email protected]>
|
Gah. Keyboard focus on just the wrong button when hitting return... |
Overview:
This PR adds Content Negotiation to the Remote Write 2.0 protocol. This allows a sending Prometheus server to use the newer version of the protocol if the receiving server supports it. It also implements the handling in the receiving server side. It builds on top of the WAL/metadata watcher work.
Spec: Draft Spec Link
Summary of changes:
queue_manager.WriteClienthas been changed to allow the client side (which communicates with the receiving server) to know which format the data it is being passed is being sent.snappy, etc)400or406errors so that the sending side can handle these and choose to re-encode a payload as a different version or encoding.X-Prometheus-Remote-Write-Versionheader is now exposedWork remaining:
git commit -sproperly - rebase branch to satisfy DCOqueue_manager.go(see existing TODOs there)snappycompression insendMetadataWithBackoff()can be made cleaner