Description
Publishing medium-sized (32 kiB) messages in a loop causes the process to quickly run out of memory. With a 2 GiB memory limit, the process runs out of memory after publishing about 20000 messages. I believe there are two issues:
- There is no limit to the number of queued bytes in the process.
- The client library keeps 3 copies of the messages and retains them until the batch "monitor" thread finishes.
As far as I can see, there is no good way to use the current client library to publish large batches of messages without HUGE memory consumption. I believe this can be fixed in two ways:
Reduce memory usage of the current library:
- Release messages when the batch is done: In thread.Batch._commit, set self._messages to None after it is published. This seems to reduce memory consumption, but I didn't test it very carefully. (See the sketch after this list.)
- Eliminate an extra copy: Change thread.Batch to build a PubsubRequest and pass that into the gapic.PublisherClient, rather than a list of messages that then needs to be copied. This removes one copy of the messages, which also seems to reduce memory consumption (and possibly makes things faster? Again, not tested carefully.)
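A minimal sketch of where the first fix would go, assuming a thread.Batch._commit of roughly this shape (the body is paraphrased, and _publish_accumulated_messages is a hypothetical stand-in for the real gapic publish call):

class Batch(object):
    # Paraphrased sketch of thread.Batch._commit; the real method also handles
    # locking, status transitions, and resolving the per-message futures.
    def _commit(self):
        # ... publish the accumulated messages via the gapic client ...
        response = self._publish_accumulated_messages()  # hypothetical stand-in
        # Proposed change: drop the reference to the raw messages as soon as
        # the RPC returns, so the batch (and its monitor thread) no longer
        # pins them in memory until the monitor finishes.
        self._messages = None
        # ... distribute response.message_ids to the pending futures ...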
Back Pressure
Change the publisher to keep a queue of batches and block once more than N are in flight at once.
I implemented a client that takes the blocking-queue approach. It can publish this workload without ever exceeding 300 MiB of memory. Only 3 in-flight batches were necessary for the process to be CPU bound when running in Google Cloud, so I don't think the queue even needs to be very large.
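Below is a minimal sketch of this back-pressure idea, not the exact client described above: the BoundedPublisher name and max_in_flight parameter are made up for illustration, and it bounds in-flight messages with a semaphore rather than keeping an explicit queue of batches.

import threading

from google.cloud import pubsub_v1


class BoundedPublisher(object):
    '''Blocks publish() while too many messages are still in flight.

    Hypothetical wrapper for illustration only; capping the number of
    outstanding messages caps the bytes the process can queue up.
    '''

    def __init__(self, client, max_in_flight=5000):
        self._client = client
        self._semaphore = threading.BoundedSemaphore(max_in_flight)

    def publish(self, topic_path, data):
        # Block the caller until an in-flight slot frees up.
        self._semaphore.acquire()
        future = self._client.publish(topic_path, data)
        # publish() returns a future; release the slot once Pub/Sub has
        # accepted (or rejected) the message.
        future.add_done_callback(lambda unused_future: self._semaphore.release())
        return future


# Usage in the repro program below:
# publish_client = BoundedPublisher(pubsub_v1.PublisherClient(credentials=credentials))
# publish_client.publish(topic_path, message)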
Environment details
OS: Linux, ContainerOS (GKE), Container is Debian9 (using distroless)
Python: 3.5.3
API: google-cloud-python 0.41.0
Steps to reproduce
- Package the following program in a Docker container.
- Run it in Kubernetes with a memory limit of 2 GiB.
- Watch it blow up.
Code example
#!/usr/bin/env python3
'''
Create the topic and a subscription:
gcloud pubsub topics create deleteme_publish_memleak_test
gcloud pubsub subscriptions create deleteme_memleak_subscription --topic=deleteme_publish_memleak_test
'''
from google.cloud import pubsub_v1
import argparse
import google.auth
import logging
import time


def main():
    parser = argparse.ArgumentParser(description='mass publish to pub/sub')
    parser.add_argument('--topic',
                        default='deleteme_publish_memleak_test',
                        help='topic to publish to')
    parser.add_argument('--messages',
                        type=int,
                        default=200000,
                        help='number of messages to publish')
    parser.add_argument('--bytes_per_message',
                        type=int,
                        default=32 * 1024,
                        help='bytes per message to publish')
    parser.add_argument('--log_every_n', type=int, default=2000, help='log every N messages')
    args = parser.parse_args()

    credentials, project_id = google.auth.default()
    publish_client = pubsub_v1.PublisherClient(credentials=credentials)
    topic_path = publish_client.topic_path(project_id, args.topic)

    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
    )
    logging.info('publishing %d messages to topic %s; %d bytes per message; %.1f total MiB',
                 args.messages, topic_path, args.bytes_per_message,
                 (args.messages * args.bytes_per_message) / 1024. / 1024.)

    start = time.time()
    for i in range(args.messages):
        message = (i % 256).to_bytes(1, byteorder='little') * args.bytes_per_message
        publish_client.publish(topic_path, message)
        if i % args.log_every_n == 0:
            logging.info('published %d messages', i)
    end = time.time()
    logging.info('done publishing in %f s', end - start)


if __name__ == '__main__':
    main()