
hubble: Support for FlowLog Aggregation - [CFP-37472]#42011

Merged
gandro merged 4 commits into cilium:main from mereta:cfp/aggregation on Nov 12, 2025

Conversation

@mereta
Contributor

@mereta mereta commented Oct 3, 2025

This PR implements CFP-37472

The goal is to allow users to aggregate flows on desired properties to reduce the overall volume of logs produced and, in turn, reduce storage costs.
This PR introduces flow aggregation capabilities to Hubble's dynamic and static exporters, enabling collection and export of aggregated flows.

The PR may seem large but I think keeping this together makes sense. It consists of the following parts:
API changes (5): 041e12d
Aggregation Logic (14): 025bfa2
Charts & Docs (7): e7b837a
Exporter Fix (rep): d393cec

Overview

Supports multiple dynamic flowlog configs.

  • pkg/hubble/exporter/aggregator.go - AggregationLogic: thread-safe aggregation with field-based grouping and direction-aware flow counting
  • pkg/hubble/exporter/config.go - Configuration parsing with a new 'Duration' wrapper type for YAML/JSON unmarshaling. This is needed so the dynamic exporter's hot-reload can parse durations in '10s'/'10m' formats.
  • pkg/hubble/parser/fieldaggregate/ - FieldAggregate is a wrapper that extends FieldMask functionality.
  • api/v1/aggregate/aggregate.proto - New aggregate message definitions for flow counts. This is only used when the aggregation feature is enabled and is logged as part of flow.Aggregate.
    • Adds aggregation fields directly to the Flow API.
message Aggregate {
  uint32 ingress_flow_count = 1;
  uint32 egress_flow_count = 2;
  uint32 unknown_direction_flow_count = 3;
}
  • pkg/hubble/exporter/context_cancellation_test.go - Tests graceful shutdown and goroutine management to ensure no goroutine leaks.

Config update

Two new fields under hubble.export.static and hubble.export.dynamic:

  • fieldAggregate: ["source.namespace", "destination.service_name"]
  • aggregationInterval: "1m"
    For the dynamic exporter, a change in either field triggers a hot reload.
    If aggregationInterval is set to 0s, aggregation is disabled.
    If both fieldAggregate and fieldMask are set, fieldMask is ignored. (More in the FieldMask & FieldAggregate analysis below.)

FieldMask & FieldAggregate analysis:

| Scenario | Design Choice | Data Integrity | Memory Usage | Performance | Correctness Issues |
| --- | --- | --- | --- | --- | --- |
| Current: FieldAggregate overrides FieldMask | When FieldAggregate is active, skip FieldMask entirely | High – aggregated flows show exactly the fields used for grouping | Optimal – single field copy operation | Best – no duplicate processing | No conflicts |
| Alternative 1: FieldMask after aggregation | Apply FieldMask to aggregated flows before export | BROKEN – fields used for the aggregation key may be masked out, making aggregation meaningless | Low memory but wrong data | Wasted CPU on aggregation of hidden fields | Aggregated counts become orphaned without their grouping context |
| Alternative 2: FieldMask before aggregation | Apply FieldMask to each flow, then aggregate on masked flows | RISKY – aggregation keys may collide if the mask removes distinguishing fields | Higher – dual field operations per flow | Worse – double field processing | Over-aggregation if the mask creates false key collisions |
| Alternative 3: Intersection validation | Validate FieldMask & FieldAggregate, apply mask after | Safe when validated | Higher – validation overhead + dual ops | Complex validation logic | Configuration complexity, user confusion |
| Alternative 4: Union approach | Mask to FieldMask ∪ FieldAggregate fields | Safe but potentially shows unwanted fields | Higher memory – more fields than needed | Extra field copying | User sees fields they tried to mask out |

FieldAggregate config:

fieldAggregate={source.namespace,source.pod_name,destination.namespace,verdict}
Aggregated Flow structure:

"flow":{
   "verdict":"FORWARDED",
   "source":{
      "namespace":"default",
      "pod_name":"kapinger-bad-666498f4bc-w262k"},
   "destination":{
      "namespace":"default"
   },
   "aggregate":{
      "ingress_flow_count":78,
      "egress_flow_count":345,
      "unknown_direction_flow_count":27
   }
}

ConfigMap: (FieldMask config is ignored)

  flowlogs.yaml: |
    flowLogs:
    - aggregationInterval: 30s
      fieldAggregate:
      - source.namespace
      - source.pod_name
      - destination.namespace
      - verdict
      fieldMask:
      - time
      - source.namespace
      - source.pod_name
      - destination.namespace
      - destination.pod_name
      - l4
      - IP
      - node_name
      - is_reply
      - verdict
      - drop_reason_desc
      filePath: /var/run/cilium/hubble/events-agg.log
      includeFilters:
      - source_pod:
        - default/
      - destination_pod:
        - default/
      name: system

Testing Steps:

Manual Verification:
BYO CNI Cilium Cluster

  • helm upgrade cilium cilium/cilium --version 1.19-dev
  • Build local images and update the cluster
  • Deploy a workload that produces a lot of pings to itself
  • Test aggregation via various combinations of the settings below.
   --set "hubble.export.dynamic.config.content[1].aggregationInterval=25s" \
   --set "hubble.export.dynamic.config.content[1].fieldMask={time,source.namespace,source.pod_name,destination.namespace,destination.pod_name,l4,IP,node_name,is_reply,verdict,drop_reason_desc}" \
   --set "hubble.export.dynamic.config.content[1].fieldAggregate={source.namespace,source.pod_name,destination.namespace}" \
   --set "hubble.export.dynamic.config.content[1].name=agg2" \
   --set "hubble.export.dynamic.config.content[1].filePath=/var/run/cilium/hubble/events-agg2.log" \
   --set "hubble.export.dynamic.config.content[1].includeFilters[0].source_pod[0]=default/" \
   --set "hubble.export.dynamic.config.content[1].includeFilters[1].destination_pod[0]=default/"
  • Test with multiple configs
  • Test aggregation with static exporter
  • Verification of cilium-flowlog-config ConfigMap
  • Verification of logs in multiple files via node-shell
    • /var/run/cilium/hubble
    • check that file names match CM
    • check for aggregation counts
    • check for config reloads
    • check setting to 0s disables aggregation
Support for Configurable Hubble FlowLog Aggregation (CFP-37472)

@maintainer-s-little-helper maintainer-s-little-helper bot added the dont-merge/needs-release-note-label The author needs to describe the release impact of these changes. label Oct 3, 2025
@github-actions github-actions bot added the kind/community-contribution This was a contribution made by a community member. label Oct 3, 2025
@mereta mereta force-pushed the cfp/aggregation branch 12 times, most recently from f08dee7 to fb52264 Compare October 6, 2025 13:45
@mereta mereta force-pushed the cfp/aggregation branch 2 times, most recently from 32ad9c0 to f0e4c8d Compare October 15, 2025 14:30
@mereta mereta force-pushed the cfp/aggregation branch 3 times, most recently from 9117c3e to 9ddb88d Compare October 24, 2025 09:51
@mereta mereta force-pushed the cfp/aggregation branch 2 times, most recently from 78a206e to 1a89a0a Compare October 28, 2025 17:18
@mereta mereta marked this pull request as ready for review October 28, 2025 17:21
@mereta mereta requested review from a team as code owners October 28, 2025 17:21
@mereta mereta requested review from glrf and youngnick October 28, 2025 17:21
@mereta
Contributor Author

mereta commented Nov 7, 2025

Overall this looks pretty good. I'll take @devodev because he's worked a lot on the cell architecture of the exporters and the way the contexts are being passed around I'm not sure if there's any better way or gotchas to be concerned with.

I'm also curious if there's potentially any way we could have a separate AggregateExporter which wraps the regular exporter, similar to how dynamic exporter wraps the regular exporter. I think having aggregation directly on the exporter is fine, but I am curious about if we could do it in a more "layered" approach instead. Was this considered, or do you think this would be possible?

I did not explore the option of implementing aggregation as a wrapper. Logically it seemed more like an extension of the exporter, so I took this approach. I had a quick look, and the aggregation logic is tightly coupled to the aggregator itself; I don't think it could work as a wrapper per se, given how differently the two function (stateful aggregation vs. stateless immediate export of data). Maybe later we could have different types of exporters with different processing patterns that use the current exporter logic as some 'base' exporter. I think for now this is optimal (I've had a few rounds of feedback and simplified a few parts already).

Contributor

@devodev devodev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I'm very satisfied with the latest changes and I deem this ready to merge. Thank you!

I still left some nit suggestions as I re-review, but feel free to ignore :)

@mereta mereta force-pushed the cfp/aggregation branch 2 times, most recently from 50243c4 to 19545dc Compare November 7, 2025 19:38
@mereta
Contributor Author

mereta commented Nov 7, 2025

Overall I'm very satisfied with the latest changes and I deem this ready to merge. Thank you!

I still left some nit suggestions as I re-review, but feel free to ignore :)

I applied the changes☺️ Thank you for all the feedback!

@mereta mereta requested review from chancez and gandro November 7, 2025 19:49
@mereta mereta force-pushed the cfp/aggregation branch 2 times, most recently from 85394de to d6b1e98 Compare November 10, 2025 09:48
Member

@gandro gandro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three more nits. But besides that looks good to me, unblocking. Thanks!

@mereta mereta force-pushed the cfp/aggregation branch 2 times, most recently from 17eaf9f to d393cec Compare November 10, 2025 11:23
Contributor

@chancez chancez left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good! Thanks a lot.

@tklauser tklauser removed request for a team and tklauser November 11, 2025 08:36
@gandro
Member

gandro commented Nov 11, 2025

/test

@gandro gandro enabled auto-merge November 11, 2025 13:44
@gandro
Member

gandro commented Nov 11, 2025

@mereta Unfortunately this PR no longer passes the Go lint on main and was therefore ejected from the merge queue. We merged the modernize linter a couple of hours ago.

Please rebase on main and fix the following linting issues:

  Error: pkg/hubble/exporter/aggregator_test.go:34:39: any: interface{} can be replaced by any (modernize)
  func (f *failingEncoder) Encode(event interface{}) error {
                                        ^
  Error: pkg/hubble/exporter/config.go:270:49: any: interface{} can be replaced by any (modernize)
  func (d *Duration) UnmarshalYAML(unmarshal func(interface{}) error) error {
                                                  ^
  Error: pkg/hubble/exporter/config.go:301:34: any: interface{} can be replaced by any (modernize)
  func (d Duration) MarshalYAML() (interface{}, error) {
                                   ^
  Error: pkg/hubble/exporter/exporter_test.go:523:21: stringsseq: Ranging over SplitSeq is more efficient (modernize)
  	for _, ln := range bytes.Split([]byte(output), []byte("\n")) {

@gandro
Member

gandro commented Nov 11, 2025

/test

@anubhabMajumdar
Contributor

/test

@anubhabMajumdar
Contributor

https://github.com/cilium/cilium/actions/runs/19273886114
This failure seems extremely unlikely to be related to this change.

@anubhabMajumdar
Contributor

/test


Labels

  • area/hubble: Impacts hubble server or relay
  • kind/community-contribution: This was a contribution made by a community member.
  • kind/enhancement: This would improve or streamline existing functionality.
  • release-note/minor: This PR changes functionality that users may find relevant to operating Cilium.

Projects

No open projects
Status: Released


6 participants