
Support large number of metric data points by reducing memory allocation #5105

@asafm

Description

Context

Apache Pulsar is a distributed, horizontally scalable messaging platform. Think of Kafka + RabbitMQ combined into one scalable platform. Its primary nodes are called brokers. Pulsar clients use topics to produce and consume messages. Pulsar has a unique feature: it supports up to 1 million topics cluster-wide, and work is underway to support 1 million topics in a single broker. It also delivers very low latency - less than 5 ms per write/read.

The Pulsar broker today exposes topic-level, producer-level, and consumer-level metrics (a topic can have many connected consumers and producers). One of the first issues encountered, far in the past, was memory pressure: during metrics collection, many metric data point objects were allocated, so the CPU spent time running the garbage collector and latency spiked well beyond the promised 5 ms. This was solved with custom code that iterates over each topic and encodes metric values in Prometheus text format directly into an off-heap byte buffer, thus avoiding memory allocation. Mutable, resettable objects (thread-locals rather than an object pool) are used to move data between Pulsar's business-logic classes - think of them as a pool of reusable DTOs. This has worked well, and Pulsar sustains its promised 5 ms latency. The byte buffer is written to the output stream of the Prometheus REST endpoint's HTTP response.
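To make that concrete, the following is a minimal illustrative sketch (not Pulsar's actual classes; all names are made up) of encoding a counter in Prometheus text format directly into a reusable, thread-local off-heap buffer:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class DirectPrometheusWriter {

	// Reused across collections; no per-data-point objects are created.
	private static final ThreadLocal<ByteBuffer> BUFFER =
			ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(4 * 1024 * 1024));

	// Emits e.g.: pulsar_in_messages_total{topic="my-topic"} 42
	static void writeCounter(String metricName, String topic, long value) {
		ByteBuffer buf = BUFFER.get();
		putAscii(buf, metricName);
		putAscii(buf, "{topic=\"");
		putAscii(buf, topic);
		putAscii(buf, "\"} ");
		putAscii(buf, Long.toString(value)); // real code formats digits without allocating
		buf.put((byte) '\n');
	}

	private static void putAscii(ByteBuffer buf, String s) {
		buf.put(s.getBytes(StandardCharsets.US_ASCII));
	}
}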

Today, one of the significant caveats preventing users from exploiting that feature to its full potential is metrics volume. With as few as 100k topics per broker, each topic carrying 70 unique metrics, a broker emits 7 million metric data points. A broker hosting 1M topics emits 70M metric data points. This has several downsides for end users:

  • Cost - you need a very large TSDB, or you pay a lot to a TSDB vendor.
  • Query time suffers at this cardinality.
  • The HTTP response of the Prometheus endpoint reaches 300 MB to 2 GB.

One of the features I'm currently designing for Pulsar is the ability to configure aggregation and filtering in a Pulsar broker. Users can specify topic groups, which would typically number in the hundreds. Topic metrics are then emitted at the granularity of topic groups rather than topics, reducing cardinality to a normally usable level. Users can dynamically switch a specific group to topic granularity, enabling them to "debug" issues in that group. Filtering would allow them to get all 70 metrics for a given group while getting only the essential five metrics for all other groups.

Since Pulsar's metrics code today is composed of multiple custom metric stacks accumulated over ten years, it first requires consolidation onto a single metrics library. That opens the door to choosing a new metrics library for Apache Pulsar and ditching the custom code. After a lengthy evaluation of all existing Java metric libraries, I have concluded that OpenTelemetry is the fittest: a clear, understandable API; an elegant SDK specification leading to an elegant, composable design; a powerful feature set; a soon-to-be industry-wide standard; a very active community; and foundation governance.

The problem

When the SDK is used in an environment where many metric data points are created per metrics collection (many = 500k up to 70M), the number of objects allocated on the heap is very large - 1x to 10x the number of metric data points, meaning 1M to 700M objects in Pulsar's extreme case. The CPU then spends time on garbage collection instead of running Pulsar code, pushing latency far above 5 ms and making it unstable.

There are multiple sources of this memory allocation:

  • The MetricReader and MetricExporter interfaces were designed to receive the metrics collected from memory by the SDK as a list (collection); thus, each metric data point costs at least one allocated object (see the simplified sketch after this list).
  • The OTLP exporter serializes the list of data points to protobuf by allocating a marshaler object for each piece of data in each data point, producing 10-30 times as many objects as metric data points, all subsequently garbage collected.
  • The storage objects used by the instrument implementations allocate an object every time they aggregate the accumulated counters (i.e., they snapshot the current accumulation into a new object), doubling the number of objects to be garbage collected per collection.
  • OTel supports reading and exporting to multiple destinations, hence multiple readers. Each reader keeps its own storage per (instrument, Attributes) pair, so each additional reader doubles the memory required and the allocation rate, and thus the garbage collection.
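For reference, this is a simplified view of today's batch-oriented SPI (the actual interfaces live in io.opentelemetry.sdk.metrics.export; signatures abbreviated here):

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;

interface MetricExporter {
	// At least one immutable MetricData (plus nested point objects) per data point.
	CompletableResultCode export(Collection<MetricData> metrics);
}

interface MetricProducer {
	// Materializes the full batch in memory on every collection.
	Collection<MetricData> collectAllMetrics();
}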

The proposed solution

Batch to Streaming API

The proposed idea is to switch the metrics-collection methodology from batching - producing a list - to streaming, meaning iterating over the results using the visitor pattern. It is analogous to the two ways of parsing XML/JSON: DOM parsers vs. SAX parsers. Switching to a streaming API will start with the aggregator handle and storage classes, continue with MetricProducer and MetricReader, and end with MetricExporter. This allows us to minimize heap object allocation to a bare minimum during metrics collection by streaming the data directly to the socket used by the exporter, or to an off-heap byte array later written by the exporter to the socket.

The following is a pseudo-code sketch of the suggested change to the SDK. It uses the visitor design pattern coupled with reusable metric data point objects, referred to as Mutable* in the pseudo-code below.

MetricsVisitor {
	resourceStarted(Resource)
	instrumentationScopeStarted(InstrumentationScopeInfo)
	instrumentStarted(MutableInstrumentInfo)
	visitAttributes(Attributes, MutableMetricData)
	instrumentEnded()
	instrumentationScopeEnded()
	resourceEnded()
}


MetricsBatch {
	visit(MetricsVisitor)
}


Exporter
	// Existing method
	export(Collection<MetricData> metrics) {
		export(createMetricsBatch(metrics))
	}

	// new method
	export(MetricsBatch)
		metricsBatch.visit(new ExporterMetricsVisitor())

	ExporterMetricsVisitor {
		resourceStarted(Resource) {
			encodeResource(Resource)
		}
		...
	}

	// for backward compatibility
	createMetricsBatch(metrics) {
		return new MetricsBatch() {
			visit(metricsVisitor) {
				groupBy(metrics) --> Map<Resource, Map<InstrumentationScopeInfo, List<Instrument>>>
				for (resource)
					metricsVisitor.resourceStarted(resource)
					for (instrumentationScopeInfo)
						metricsVisitor.instrumentationScopeStarted(instrumentationScopeInfo)
						for (instrument)
							metricsVisitor.instrumentStarted(instrumentInfo)
							for (attributes)
								metricsVisitor.visitAttributes(attributes, metricData)
							metricsVisitor.instrumentEnded()
						metricsVisitor.instrumentationScopeEnded()
					metricsVisitor.resourceEnded()
			}
		}
	}


MetricProducer {
	// existing method, renamed slightly
	Collection<MetricData> collectAllMetricsAsCollection()

	// new method
	MetricsBatch collectAllMetrics()
}

SdkMetricProducer {
	collectAllMetrics() {
		return new MetricsBatch() {
			visit(metricsVisitor) {
				metricsVisitor.resourceStarted(resource)
				for (meter : meters)
					metricsVisitor.instrumentationScopeStarted(meter.instrumentationScopeInfo)
					for (instrument : instruments)
						metricsVisitor.instrumentStarted(mutableInstrumentInfo)
						for (attributes : attributesSet)
							metricsVisitor.visitAttributes(attributes, mutableMetricData)
						metricsVisitor.instrumentEnded()
					metricsVisitor.instrumentationScopeEnded()
				metricsVisitor.resourceEnded()
			}
		}
	}
}


PeriodicMetricReader {
	// once per interval, run:
	exporter.export(new MetricsBatch() {
		visit(metricsVisitor) {
			for (metricsProducer : metricsProducers)
				metricsProducer.collectAllMetrics() --> producerMetricsBatch
				producerMetricsBatch.visit(metricsVisitor)
		}
	})
}
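To illustrate the Mutable* idea, a reusable data point holder could look roughly like this (hypothetical class; field set abbreviated). The SDK resets and refills a single instance per data point, and the visitor must copy out whatever it needs rather than retain the holder:

import io.opentelemetry.api.common.Attributes;

final class MutableMetricData {
	Attributes attributes;
	long startEpochNanos;
	long epochNanos;
	long longValue;
	double doubleValue;

	// Called by the SDK before handing the holder to the visitor.
	MutableMetricData reset(Attributes attributes, long startEpochNanos,
			long epochNanos, long longValue) {
		this.attributes = attributes;
		this.startEpochNanos = startEpochNanos;
		this.epochNanos = epochNanos;
		this.longValue = longValue;
		return this;
	}
}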

  • The exporter can have an additional Mode getMode() method, which by default returns BATCH to stay backward compatible with exporters created outside this SDK. SDK exporter implementations will return STREAMING. This allows the metric reader orchestrating it all to choose which method to execute on the exporter (a minimal sketch follows this list).
  • MetricReader currently cannot be implemented outside the SDK; hence its implementation is free to change.
  • The MetricProducer interface is internal and hence susceptible to change. If needed, we can use the same getMode() principle to decide which method to call on the MetricProducer to collect the metrics.
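A minimal sketch of that compatibility switch (names are placeholders, not a final API):

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import java.util.Collection;

enum Mode { BATCH, STREAMING }

interface StreamingCapableExporter {
	// Existing batch entry point stays as-is.
	CompletableResultCode export(Collection<MetricData> metrics);

	// Exporters implemented outside the SDK keep working unchanged;
	// SDK exporters override this to return STREAMING.
	default Mode getMode() { return Mode.BATCH; }
}

// The reader then picks the path:
// if (exporter.getMode() == Mode.STREAMING) exporter.export(metricsBatch);
// else exporter.export(producer.collectAllMetricsAsCollection());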

If there is agreement, a more detailed design will be presented, perhaps with better ideas/naming to achieve the same goal.

Streaming Exporters

The OTLP exporter can be modified to encode the metric data sequentially, as the visitor assures us the methods will be called in the proper order (resource, instrumentation scope, instrument, attributes); thus, we can write directly to the output stream, removing the need to allocate marshaler objects per data point.
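A hedged sketch of what that could look like, using protobuf's reusable CodedOutputStream (field numbers are illustrative, and the length-delimited framing that real OTLP messages require is omitted - a streaming encoder would need to pre-compute or buffer sub-message sizes):

import com.google.protobuf.CodedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class StreamingOtlpVisitor {
	private final CodedOutputStream out;

	StreamingOtlpVisitor(OutputStream socketStream) {
		this.out = CodedOutputStream.newInstance(socketStream);
	}

	// Called once per data point with the reusable holder; nothing is allocated.
	void visitAttributes(MutableMetricData point) throws IOException {
		out.writeFixed64(2, point.startEpochNanos); // start_time_unix_nano
		out.writeFixed64(3, point.epochNanos);      // time_unix_nano
		out.writeDouble(4, point.doubleValue);      // as_double
	}

	void instrumentEnded() throws IOException {
		out.flush();
	}
}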

The same can be achieved even more easily with the Prometheus exporter, writing directly to the HTTP output stream.
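Again a rough sketch, assuming a visitor shaped like the pseudo-code above (type and label rendering elided):

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

final class PrometheusTextVisitor {
	private final BufferedWriter out;
	private String currentMetricName;

	PrometheusTextVisitor(OutputStream httpResponseBody) {
		this.out = new BufferedWriter(
				new OutputStreamWriter(httpResponseBody, StandardCharsets.UTF_8));
	}

	void instrumentStarted(String name) throws IOException {
		currentMetricName = name;
		out.write("# TYPE " + name + " counter\n"); // type handling elided
	}

	// One line per data point, straight to the HTTP response; no MetricData list.
	void visitAttributes(MutableMetricData point) throws IOException {
		out.write(currentMetricName);
		out.write(' ');
		out.write(Double.toString(point.doubleValue));
		out.write('\n');
	}
}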

Reusing storage for the same conditions

If an instrument is configured identically in two metric readers - the same aggregation function and the same temporality - a single storage instance can serve both. For example, if we use a Prometheus reader as-is and an OTLP exporter as-is, both with cumulative temporality, each instrument's storage can be created once and used by both readers.
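A sketch of the sharing mechanism (hypothetical names, not the SDK's internal classes) - the key must capture everything that affects the stored values:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

final class SharedStorageRegistry<S> {
	// Two readers resolving to an equal key see the same numbers anyway.
	record StorageKey(String instrumentName, String aggregation, String temporality) {}

	private final Map<StorageKey, S> storages = new ConcurrentHashMap<>();

	// Sharing one storage instance halves the memory and halves the
	// per-collection snapshot allocation.
	S storageFor(StorageKey key, Supplier<S> factory) {
		return storages.computeIfAbsent(key, k -> factory.get());
	}
}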

Using this in Apache Pulsar

Following this change, I can write an aggregation-and-filtering class decorating the SDK OTLP exporter, which performs the configured aggregation per topic group within each instrument. I can present that design if needed.
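As a taste of the idea, such a decorator could fold topic-level data points into per-group sums before forwarding them downstream (hypothetical sketch; the topic-to-group rule is a placeholder):

import io.opentelemetry.api.common.Attributes;
import java.util.HashMap;
import java.util.Map;
import java.util.function.ObjDoubleConsumer;

final class GroupAggregatingVisitor {
	// Reused across the data points of one instrument, then cleared.
	private final Map<String, Double> groupSums = new HashMap<>();

	// Sum each topic-level point into its group instead of forwarding it.
	void visitAttributes(String topic, double value) {
		groupSums.merge(groupOf(topic), value, Double::sum);
	}

	// At instrument end, emit one data point per group and reset for reuse.
	void instrumentEnded(ObjDoubleConsumer<Attributes> emit) {
		groupSums.forEach((group, sum) ->
				emit.accept(Attributes.builder().put("group", group).build(), sum));
		groupSums.clear();
	}

	private static String groupOf(String topic) {
		// Placeholder rule; the real mapping comes from user configuration.
		return topic.substring(0, Math.max(topic.lastIndexOf('/'), 0));
	}
}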

Benefits

  • The OTel Java SDK will reduce its resource usage (both memory and CPU) during metrics collection to a bare minimum.
  • OTel will be usable in large-scale deployments featuring 100k to 1M (and more) metric data points per process without affecting the process's resources (memory and CPU).
  • Memory-constrained devices will be able to use the OTel Java SDK without fear of spending their CPU on garbage collection.

Notes

If this idea is agreed upon, I can contribute a more detailed design and implement it.
