Skip to content

Integrate nanopb to encode and decode metrics#4388

Merged
Anchit Jain (anchitj) merged 18 commits intodev_kip_714from
dev_kip_714_serialisation
Aug 21, 2023
Merged

Integrate nanopb to encode and decode metrics#4388
Anchit Jain (anchitj) merged 18 commits intodev_kip_714from
dev_kip_714_serialisation

Conversation

@anchitj
Copy link
Copy Markdown
Contributor

@anchitj Anchit Jain (anchitj) commented Aug 8, 2023

Changes done:

  1. Added nanopb files in src/nanopb from https://github.com/nanopb/nanopb
  2. Generated protobuf stubs using protoc compiler in nanopb and opentelemetry:1.0.0 protos, and added them in src/opentelemetry. One of the file metrics.proto needed an option present in metrics.options to create the callback needed for oneof types in .proto file which isn't generated by default.
  3. Wrote encode related methods in rdkafka_telemetry_encode.c. Sample payload for a sum type metric will be like
metrics_data {
  resource_metrics {
    resource {
      attributes {
        key: "service.name"
        value {
          string_value: "example-service"
        }
      }
    }
    scope_metrics {
      scope {
        name: "example-scope"
        version: "1.0.0"
      }
      metrics {
        name: "requests_total"
        description: "Total number of requests"
        unit: "1"
        data {
          sum {
            data_points {
              attributes {
                key: "endpoint"
                value {
                  string_value: "/home"
                }
              }
              time_unix_nano: 1609462800123456789 
              value {
                as_int: 150 
              }
            }
            aggregation_temporality: AGGREGATION_TEMPORALITY_CUMULATIVE
          }
        }
      }
    }
  }
}
  1. Wrote decode related methods in rdkafka_telemetry_decode.c. This is needed for testing, though not needed for functionality.
  2. Only one metric is being encode right now by default.

TODO:

Add prefix to both nanopb and opentelemetry structs to not cause conflicts with existing applications dependent on them.

@cla-assistant
Copy link
Copy Markdown

cla-assistant Bot commented Aug 8, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Copy Markdown
Contributor

@milindl Milind L (milindl) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments, but will discuss this more with anchit

Comment thread src/rdkafka_mock_handlers.c Outdated
Comment thread src/rdkafka_telemetry.c Outdated
Comment thread src/rdkafka_telemetry_encode.c Outdated
Comment thread src/rdkafka_telemetry_encode.c Outdated
Comment thread src/Makefile
Comment thread src/rdkafka_telemetry_decode.c Outdated
@anchitj Anchit Jain (anchitj) force-pushed the dev_kip_714_serialisation branch 2 times, most recently from c021826 to f288c9b Compare August 8, 2023 10:24
Comment thread src/opentelemetry/metrics.options Outdated
@@ -0,0 +1 @@
opentelemetry.proto.metrics.v1.Metric submsg_callback:true; No newline at end of file
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment here

Copy link
Copy Markdown
Contributor

@milindl Milind L (milindl) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, but left some comments.
Comments contain mostly minor stuff, but a few larger changes.

Comment thread src/rdkafka_broker.h Outdated

struct {
/* TODO: fill this out. */
rd_atomic32_t connects; /**< Connection attempts,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not urgent but for later, if this is only ever calculated in the main thread, we can make it int32_t (for the historic rkc_c) to prevent any overhead of using atomic values.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, update to int32_t.

Comment thread src/rdkafka_request.c
sizeof(rd_bool_t) + strlen(compression_type) +
metrics_size;
rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_PushTelemetry, 1, len);
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_PushTelemetry,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not this line exactly, but above, in rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_PushTelemetry, 0, 1, NULL);, we should use rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_PushTelemetry, 0, 0, NULL); (set max ver = 0, not 1).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Comment thread src/rdkafka_request.c Outdated
rd_kafka_buf_write_kbytes(rkbuf, metric_bytes);
rd_free(metric_bytes);

/* Let the handler perform retries so that it can pick
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment isn't relevant, should be changed, though I believe NO_RETRIES is legitimate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the comment.

Comment thread src/rdkafka_telemetry.c Outdated
void *metrics_payload = NULL;
size_t metrics_payload_size;
void *metrics_payload = encode_metrics(rk, &metrics_payload_size);
char *compression_type = "gzip";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should ideally use rd_kafka_compression_t for PushTelemetry request. We can defer this change to later, when we actually implement compression.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I've added a TODO for this.

Comment thread src/rdkafka_telemetry.c Outdated
// TODO: Update dummy values
void *metrics_payload = NULL;
size_t metrics_payload_size;
void *metrics_payload = encode_metrics(rk, &metrics_payload_size);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be const void*

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

Comment thread src/rdkafka_telemetry_encode.c Outdated
#include "nanopb/pb_decode.h"
#include "opentelemetry/metrics.pb.h"

rd_bool_t
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All methods which aren't used by other files, mark them as static, here and elsewhere
That way they won't interfere with functions of the same name elsewhere (encode_string might be a fairly common name for example)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, missed doing this. Made all the file level functions static.

Comment thread src/rdkafka_telemetry_encode.c Outdated
void *const *arg) {
opentelemetry_proto_metrics_v1_NumberDataPoint *data_point =
(opentelemetry_proto_metrics_v1_NumberDataPoint *)*arg;
if (!pb_encode_tag_for_field(stream, field)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here and elsewhere, convention is to avoid using braces for single line if/for/whiles

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to this convention everywhere.

Comment thread src/rdkafka_telemetry_encode.c Outdated
stream, opentelemetry_proto_common_v1_KeyValue_fields, key_value);
}

static const char *rd_kafka_type2str(rd_kafka_type_t type) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

best to move this to rdkafka_int.h and rdkafka_int.c and mark as non-static (I see it's used in rdkafka.c also)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to rdkafka_int.h

Comment thread src/rdkafka_telemetry_encode.c Outdated
}

// TODO: Update to handle multiple data points.
rd_bool_t encode_number_data_point(pb_ostream_t *stream,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry about my earlier comment about making them rd_bool_t. I didn't realize they were being used as callbacks for something that expects function ptrs with return type bool(_Bool). It's best to match the callback type in that case. Otherwise the compiler will complain (warning)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to bool.

Comment thread src/rdkafka_telemetry_encode.c Outdated
*metric_description =
"The total number of connections established.",
*metric_unit = "1",
*metric_type_str = rd_kafka_type2str(rk->rk_type), *metric_name;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for all these things, the type should be const char* and not char*, except for metric_name which should be declared separately.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated.

@anchitj Anchit Jain (anchitj) merged commit 530eb0b into dev_kip_714 Aug 21, 2023
@anchitj Anchit Jain (anchitj) deleted the dev_kip_714_serialisation branch August 21, 2023 07:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants