
Conversation

@KKcorps (Contributor) commented Nov 27, 2022

This is being done to avoid ProvisionedThroughputExceeded exceptions from Kinesis, which enforces a limit of 5 GetRecords requests per second per shard.
More on this: https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-readprovisionedthroughputexceeded/

Documentation

You can use the following config to enforce a per-shard rate limit on Kinesis:

"streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "<your kinesis stream name>",
      ..
      "requests_per_second_limit" : "5"
    }
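
For context, a limit like this is typically enforced by sizing a token-bucket rate limiter from the config and acquiring a permit before every GetRecords call. The sketch below illustrates that idea with Guava's RateLimiter; the class, field, and method names are hypothetical and this is not the merged Pinot implementation.

import com.google.common.util.concurrent.RateLimiter;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

// Sketch only: names are hypothetical, not the actual Pinot classes.
public class RateLimitedShardReader {
  private final KinesisClient _kinesisClient;
  // Sized from "requests_per_second_limit"; one limiter per shard consumer.
  private final RateLimiter _getRecordsRateLimiter;

  public RateLimitedShardReader(KinesisClient kinesisClient, double requestsPerSecondLimit) {
    _kinesisClient = kinesisClient;
    _getRecordsRateLimiter = RateLimiter.create(requestsPerSecondLimit);
  }

  public GetRecordsResponse getRecords(String shardIterator, int maxRecords) {
    // Block until a permit is available so this shard never exceeds the configured RPS.
    _getRecordsRateLimiter.acquire();
    return _kinesisClient.getRecords(
        GetRecordsRequest.builder().shardIterator(shardIterator).limit(maxRecords).build());
  }
}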

@KKcorps added the bugfix label Nov 27, 2022
@Jackie-Jiang added the Configuration (config changes: addition/deletion/change in behavior) and release-notes (referenced by PRs that need attention when compiling the next release notes) labels Nov 27, 2022
@Jackie-Jiang (Contributor) left a comment

LGTM. Please add the new configuration to the PR description and the documentation.

private final int _numMaxRecordsToFetch;
private final ExecutorService _executorService;
private final ShardIteratorType _shardIteratorType;
private int _rpsLimit;
Contributor:

(minor) This can be final. Also, should we treat 0 as unlimited?

Contributor:

I don't think we should allow unlimited; it invites misconfiguration. Instead, we can just ignore an invalid value with a warning and reset it to the default.
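
For illustration, that fallback could look roughly like the sketch below; the default value, constant, and method names here are assumptions, not the merged code.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: the default value and names below are assumptions.
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisConfig.class);
private static final int DEFAULT_RPS_LIMIT = 5;

private static int resolveRpsLimit(String configuredValue) {
  int rpsLimit;
  try {
    rpsLimit = Integer.parseInt(configuredValue);
  } catch (NumberFormatException e) {
    rpsLimit = -1;
  }
  if (rpsLimit <= 0) {
    // Do not treat 0 (or an unparsable value) as unlimited: warn and fall back to the default.
    LOGGER.warn("Invalid requests_per_second_limit: {}, using default: {}", configuredValue, DEFAULT_RPS_LIMIT);
    return DEFAULT_RPS_LIMIT;
  }
  return rpsLimit;
}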

@navina (Contributor) left a comment

LGTM!


@codecov-commenter commented Nov 27, 2022

Codecov Report

Merging #9863 (ad9826d) into master (e307106) will increase coverage by 33.30%.
The diff coverage is 0.00%.

@@              Coverage Diff              @@
##             master    #9863       +/-   ##
=============================================
+ Coverage     28.06%   61.36%   +33.30%     
- Complexity       53     4827     +4774     
=============================================
  Files          1949     1966       +17     
  Lines        104632   105848     +1216     
  Branches      15847    16055      +208     
=============================================
+ Hits          29362    64953    +35591     
+ Misses        72395    36070    -36325     
- Partials       2875     4825     +1950     
Flag          Coverage Δ
integration1  25.10% <0.00%> (-0.11%) ⬇️
integration2  ?
unittests1    67.90% <ø> (?)

Flags with carried forward coverage won't be shown.

Impacted Files                                          Coverage Δ
...che/pinot/plugin/stream/kinesis/KinesisConfig.java   0.00% <0.00%> (ø)
...e/pinot/plugin/stream/kinesis/KinesisConsumer.java   0.00% <0.00%> (ø)
...pinot/core/data/manager/realtime/TimerService.java   0.00% <0.00%> (-100.00%) ⬇️
...t/core/plan/StreamingInstanceResponsePlanNode.java   0.00% <0.00%> (-100.00%) ⬇️
...ore/operator/streaming/StreamingResponseUtils.java   0.00% <0.00%> (-100.00%) ⬇️
...server/starter/helix/SegmentReloadStatusValue.java   0.00% <0.00%> (-100.00%) ⬇️
...ager/realtime/PeerSchemeSplitSegmentCommitter.java   0.00% <0.00%> (-100.00%) ⬇️
...urces/ServerReloadControllerJobStatusResponse.java   0.00% <0.00%> (-100.00%) ⬇️
...t/plugin/minion/tasks/purge/PurgeTaskExecutor.java   0.00% <0.00%> (-91.31%) ⬇️
...he/pinot/common/utils/grpc/GrpcRequestBuilder.java   0.00% <0.00%> (-90.91%) ⬇️
... and 1248 more


break;
}

if (requestSentTime == currentWindow && getRecordsResponse.records().isEmpty()) {
Contributor (Author):

@navina I think I should remove this isEmpty check from here. It doesn't cover the case where each .getRecords request returns only 1-2 records; those calls still count toward the quota, so we could still exceed the throughput limit.

Contributor:

ok yeah makes sense 👍
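
To make the point above concrete: the throttle has to be applied before every GetRecords call, not only when the previous response was empty, because a call that returns just one or two records still counts toward the per-shard quota. A rough sketch of such a fetch loop follows; the method and parameter names are hypothetical and this is not the merged Pinot code.

import java.util.ArrayList;
import java.util.List;

import com.google.common.util.concurrent.RateLimiter;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;

// Sketch only: names are hypothetical.
static List<Record> fetchWithThrottle(KinesisClient kinesisClient, RateLimiter getRecordsRateLimiter,
    String shardIterator, int numMaxRecordsToFetch, long deadlineMs) {
  List<Record> records = new ArrayList<>();
  while (shardIterator != null && System.currentTimeMillis() < deadlineMs
      && records.size() < numMaxRecordsToFetch) {
    // Throttle every iteration, not just after empty responses: each GetRecords call
    // counts toward the 5-requests-per-second shard quota regardless of how much it returns.
    getRecordsRateLimiter.acquire();
    GetRecordsResponse response = kinesisClient.getRecords(GetRecordsRequest.builder()
        .shardIterator(shardIterator)
        .limit(numMaxRecordsToFetch)
        .build());
    records.addAll(response.records());
    shardIterator = response.nextShardIterator();
  }
  return records;
}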

@navina (Contributor) left a comment

LGTM!

@KKcorps merged commit 76c6492 into apache:master on Dec 7, 2022