Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rgw/driver/daos/rgw_sal_daos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1694,7 +1694,7 @@ int DaosMultipartUpload::complete(
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
rgw::sal::Attrs attrs = target_obj->get_attrs();
rgw::sal::Attrs& attrs = target_obj->get_attrs();

do {
ldpp_dout(dpp, 20) << "DaosMultipartUpload::complete(): list_parts()"
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/driver/motr/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2683,7 +2683,7 @@ int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
rgw::sal::Attrs attrs = target_obj->get_attrs();
rgw::sal::Attrs& attrs = target_obj->get_attrs();

do {
ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/driver/posix/rgw_sal_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2612,7 +2612,7 @@ int POSIXMultipartUpload::complete(const DoutPrefixProvider *dpp,
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
rgw::sal::Attrs attrs = target_obj->get_attrs();
rgw::sal::Attrs& attrs = target_obj->get_attrs();

do {
ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y);
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/driver/rados/rgw_sal_rados.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2422,7 +2422,7 @@ int RadosMultipartUpload::complete(const DoutPrefixProvider *dpp,
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
rgw::sal::Attrs attrs = target_obj->get_attrs();
rgw::sal::Attrs& attrs = target_obj->get_attrs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cbodley can you please see if this change has other implication?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's hard to tell. are we mutating the attrs? if not these should be const-refs instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we are mutating the attrs at line 2544:

  attrs[RGW_ATTR_ETAG] = etag_bl;

  if (compressed) {
    // write compression attribute to full object
    bufferlist tmp;
    encode(cs_info, tmp);
    attrs[RGW_ATTR_COMPRESSION] = tmp;
  }


do {
ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated, y);
Expand Down
63 changes: 42 additions & 21 deletions src/rgw/rgw_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6591,9 +6591,6 @@ void RGWCompleteMultipart::execute(optional_yield y)
RGWMultiCompleteUpload *parts;
RGWMultiXMLParser parser;
std::unique_ptr<rgw::sal::MultipartUpload> upload;
off_t ofs = 0;
std::unique_ptr<rgw::sal::Object> meta_obj;
std::unique_ptr<rgw::sal::Object> target_obj;
uint64_t olh_epoch = 0;

op_ret = get_params(y);
Expand Down Expand Up @@ -6682,8 +6679,8 @@ void RGWCompleteMultipart::execute(optional_yield y)


// make reservation for notification if needed
std::unique_ptr<rgw::sal::Notification> res
= driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y, &s->object->get_name());
res = driver->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, y,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems wrong for the Notification to point at the meta_obj rather than the target_obj. that's probably why you were seeing lifetime issues around it?

&s->object->get_name());
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
Expand All @@ -6706,21 +6703,10 @@ void RGWCompleteMultipart::execute(optional_yield y)
return;
}

// remove the upload meta object ; the meta object is not versioned
// when the bucket is, as that would add an unneeded delete marker
int r = meta_obj->delete_object(this, y, true /* prevent versioning */);
Comment on lines -6709 to -6711
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before this moved to RGWCompleteMultipart::complete(), it was only being called on success. but complete() is called on errors too, so the change caused a regression in https://tracker.ceph.com/issues/65746

if (r >= 0) {
/* serializer's exclusive lock is released */
serializer->clear_locked();
} else {
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << dendl;
}

// send request to notification manager
int ret = res->publish_commit(this, ofs, upload->get_mtime(), etag, target_obj->get_instance());
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
upload_time = upload->get_mtime();
int r = serializer->unlock();
if (r < 0) {
Comment on lines +6707 to +6708
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this unlock() call wasn't here before, and i'm not sure why it was added. in the successful path where we call meta_obj->delete_object(), we shouldn't do a separate unlock() because delete_object() deletes the lock implicitly (that's why it uses serializer->clear_locked())

ldpp_dout(this, 0) << "WARNING: failed to unlock " << *serializer.get() << dendl;
}
} // RGWCompleteMultipart::execute

Expand Down Expand Up @@ -6773,7 +6759,42 @@ void RGWCompleteMultipart::complete()
}
}

etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
if (op_ret >= 0 && target_obj.get() != nullptr) {
s->object->set_attrs(target_obj->get_attrs());
etag = s->object->get_attrs()[RGW_ATTR_ETAG].to_str();
// send request to notification manager
if (res.get() != nullptr) {
int ret = res->publish_commit(this, ofs, upload_time, etag, target_obj->get_instance());
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
// too late to rollback operation, hence op_ret is not set here
}
} else {
ldpp_dout(this, 1) << "ERROR: reservation is null" << dendl;
}
} else {
ldpp_dout(this, 1) << "ERROR: either op_ret is negative (execute failed) or target_obj is null, op_ret: "
<< op_ret << dendl;
}

// remove the upload meta object ; the meta object is not versioned
// when the bucket is, as that would add an unneeded delete marker
// moved to complete to prevent segmentation fault in publish commit
if (meta_obj.get() != nullptr) {
int ret = meta_obj->delete_object(this, null_yield, true /* prevent versioning */);
if (ret >= 0) {
/* serializer's exclusive lock is released */
serializer->clear_locked();
} else {
ldpp_dout(this, 0) << "WARNING: failed to remove object " << meta_obj << ", ret: " << ret << dendl;
}
} else {
ldpp_dout(this, 0) << "WARNING: meta_obj is null" << dendl;
}

res.reset();
meta_obj.reset();
target_obj.reset();

send_response();
}
Expand Down
5 changes: 5 additions & 0 deletions src/rgw/rgw_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,11 @@ class RGWCompleteMultipart : public RGWOp {
bufferlist data;
std::unique_ptr<rgw::sal::MPSerializer> serializer;
jspan_ptr multipart_trace;
ceph::real_time upload_time;
std::unique_ptr<rgw::sal::Object> target_obj;
std::unique_ptr<rgw::sal::Notification> res;
std::unique_ptr<rgw::sal::Object> meta_obj;
off_t ofs = 0;

public:
RGWCompleteMultipart() {}
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_sal_dbstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ namespace rgw::sal {
int marker = 0;
uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
auto etags_iter = part_etags.begin();
rgw::sal::Attrs attrs = target_obj->get_attrs();
rgw::sal::Attrs& attrs = target_obj->get_attrs();

ofs = 0;
accounted_size = 0;
Expand Down
64 changes: 64 additions & 0 deletions src/test/rgw/bucket_notification/test_bn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2397,6 +2397,70 @@ def test_http_post_object_upload():
conn1.delete_bucket(Bucket=bucket_name)


@attr('mpu_test')
def test_ps_s3_multipart_on_master_http():
""" test http multipart object upload on master"""
conn = connection()
zonegroup = 'default'

# create random port for the http server
host = get_ip()
port = random.randint(10000, 20000)
# start an http server in a separate thread
http_server = StreamingHTTPServer(host, port, num_workers=10)

# create bucket
bucket_name = gen_bucket_name()
bucket = conn.create_bucket(bucket_name)
topic_name = bucket_name + TOPIC_SUFFIX

# create s3 topic
endpoint_address = 'http://'+host+':'+str(port)
endpoint_args = 'push-endpoint='+endpoint_address
opaque_data = 'http://1.2.3.4:8888'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
topic_arn = topic_conf.set_config()
# create s3 notification
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
'Events': []
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
assert_equal(status/100, 2)

# create objects in the bucket
client_threads = []
content = str(os.urandom(20*1024*1024))
key = bucket.new_key('obj')
thr = threading.Thread(target = set_contents_from_string, args=(key, content,))
thr.start()
client_threads.append(thr)
[thr.join() for thr in client_threads]

print('wait for 5sec for the messages...')
time.sleep(5)

# check http receiver
keys = list(bucket.list())
print('total number of objects: ' + str(len(keys)))
events = http_server.get_and_reset_events()
for event in events:
assert_equal(event['Records'][0]['opaqueData'], opaque_data)
assert_true(event['Records'][0]['s3']['object']['eTag'] != '')

# cleanup
for key in keys:
key.delete()
[thr.join() for thr in client_threads]
topic_conf.del_config()
s3_notification_conf.del_config(notification=notification_name)
# delete the bucket
conn.delete_bucket(bucket_name)
http_server.close()


@attr('amqp_test')
def test_ps_s3_multipart_on_master():
""" test multipart object upload on master"""
Expand Down