Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions qa/suites/rgw/verify/tasks/versioning.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
tasks:
- workunit:
clients:
client.0:
- rgw/run-versioning.sh
57 changes: 57 additions & 0 deletions qa/workunits/rgw/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/usr/bin/env python3

import errno
import subprocess
import logging as log
import boto3
import botocore.exceptions

log.basicConfig(format = '%(message)s', level=log.DEBUG)
log.getLogger('botocore').setLevel(log.CRITICAL)
log.getLogger('boto3').setLevel(log.CRITICAL)
log.getLogger('urllib3').setLevel(log.CRITICAL)

def exec_cmd(cmd, wait = True, **kwargs):
check_retcode = kwargs.pop('check_retcode', True)
kwargs['shell'] = True
kwargs['stdout'] = subprocess.PIPE
proc = subprocess.Popen(cmd, **kwargs)
log.info(proc.args)
if wait:
out, _ = proc.communicate()
if check_retcode:
assert(proc.returncode == 0)
return out
return (out, proc.returncode)
return ''

def create_user(uid, display_name, access_key, secret_key):
_, ret = exec_cmd(f'radosgw-admin user create --uid {uid} --display-name "{display_name}" --access-key {access_key} --secret {secret_key}', check_retcode=False)
assert(ret == 0 or errno.EEXIST)

def boto_connect(access_key, secret_key, config=None):
def try_connect(portnum, ssl, proto):
endpoint = proto + '://localhost:' + portnum
conn = boto3.resource('s3',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
use_ssl=ssl,
endpoint_url=endpoint,
verify=False,
config=config,
)
try:
list(conn.buckets.limit(1)) # just verify we can list buckets
except botocore.exceptions.ConnectionError as e:
print(e)
raise
print('connected to', endpoint)
return conn
try:
return try_connect('80', False, 'http')
except botocore.exceptions.ConnectionError:
try: # retry on non-privileged http port
return try_connect('8000', False, 'http')
except botocore.exceptions.ConnectionError:
# retry with ssl
return try_connect('443', True, 'https')
19 changes: 19 additions & 0 deletions qa/workunits/rgw/run-versioning.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -ex

# assume working ceph environment (radosgw-admin in path) and rgw on localhost:80
# localhost::443 for ssl

mydir=`dirname $0`

python3 -m venv $mydir
source $mydir/bin/activate
pip install pip --upgrade
pip install boto3

## run test
$mydir/bin/python3 $mydir/test_rgw_versioning.py

deactivate
echo OK.

70 changes: 17 additions & 53 deletions qa/workunits/rgw/test_rgw_reshard.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
#!/usr/bin/python3

import errno
import logging as log
import time
import subprocess
import logging as log
import json
import boto3
import botocore.exceptions
import os
from common import exec_cmd, boto_connect, create_user

"""
Rgw manual and dynamic resharding testing against a running instance
Expand All @@ -19,11 +17,6 @@
#
#

log.basicConfig(format = '%(message)s', level=log.DEBUG)
log.getLogger('botocore').setLevel(log.CRITICAL)
log.getLogger('boto3').setLevel(log.CRITICAL)
log.getLogger('urllib3').setLevel(log.CRITICAL)

""" Constants """
USER = 'tester'
DISPLAY_NAME = 'Testing'
Expand All @@ -33,18 +26,6 @@
VER_BUCKET_NAME = 'myver'
INDEX_POOL = 'default.rgw.buckets.index'

def exec_cmd(cmd, **kwargs):
check_retcode = kwargs.pop('check_retcode', True)
kwargs['shell'] = True
kwargs['stdout'] = subprocess.PIPE
proc = subprocess.Popen(cmd, **kwargs)
log.info(proc.args)
out, _ = proc.communicate()
if check_retcode:
assert(proc.returncode == 0)
return out
return (out, proc.returncode)

class BucketStats:
def __init__(self, bucket_name, bucket_id, num_objs=0, size_kb=0, num_shards=0):
self.bucket_name = bucket_name
Expand Down Expand Up @@ -163,41 +144,14 @@ def main():
"""
execute manual and dynamic resharding commands
"""
# create user
_, ret = exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY), check_retcode=False)
assert(ret == 0 or errno.EEXIST)

def boto_connect(portnum, ssl, proto):
endpoint = proto + '://localhost:' + portnum
conn = boto3.resource('s3',
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
use_ssl=ssl,
endpoint_url=endpoint,
verify=False,
config=None,
)
try:
list(conn.buckets.limit(1)) # just verify we can list buckets
except botocore.exceptions.ConnectionError as e:
print(e)
raise
print('connected to', endpoint)
return conn

try:
connection = boto_connect('80', False, 'http')
except botocore.exceptions.ConnectionError:
try: # retry on non-privileged http port
connection = boto_connect('8000', False, 'http')
except botocore.exceptions.ConnectionError:
# retry with ssl
connection = boto_connect('443', True, 'https')
create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)

connection = boto_connect(ACCESS_KEY, SECRET_KEY)

# create a bucket
bucket = connection.create_bucket(Bucket=BUCKET_NAME)
ver_bucket = connection.create_bucket(Bucket=VER_BUCKET_NAME)
connection.BucketVersioning('ver_bucket')
connection.BucketVersioning(VER_BUCKET_NAME).enable()

bucket_acl = connection.BucketAcl(BUCKET_NAME).load()
ver_bucket_acl = connection.BucketAcl(VER_BUCKET_NAME).load()
Expand Down Expand Up @@ -313,13 +267,23 @@ def boto_connect(portnum, ssl, proto):
json_op = json.loads(cmd.decode('utf-8', 'ignore')) # ignore utf-8 can't decode 0x80
assert len(json_op) == 0

# TESTCASE 'check that PUT succeeds during reshard'
log.debug(' test: PUT succeeds during reshard')
num_shards = get_bucket_stats(VER_BUCKET_NAME).num_shards
exec_cmd('''radosgw-admin --inject-delay-at=do_reshard --inject-delay-ms=5000 \
bucket reshard --bucket {} --num-shards {}'''
.format(VER_BUCKET_NAME, num_shards + 1), wait = False)
time.sleep(1)
ver_bucket.put_object(Key='put_during_reshard', Body=b"some_data")
log.debug('put object successful')

# Clean up
log.debug("Deleting bucket {}".format(BUCKET_NAME))
bucket.objects.all().delete()
bucket.delete()
log.debug("Deleting bucket {}".format(VER_BUCKET_NAME))
ver_bucket.object_versions.all().delete()
ver_bucket.delete()


main()
log.info("Completed resharding tests")
110 changes: 110 additions & 0 deletions qa/workunits/rgw/test_rgw_versioning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#!/usr/bin/env python3

import logging as log
import json
import uuid
import botocore
import time
from common import exec_cmd, create_user, boto_connect
from botocore.config import Config

"""
Tests behavior of bucket versioning.
"""
# The test cases in this file have been annotated for inventory.
# To extract the inventory (in csv format) use the command:
#
# grep '^ *# TESTCASE' | sed 's/^ *# TESTCASE //'
#
#

""" Constants """
USER = 'versioning-tester'
DISPLAY_NAME = 'Versioning Testing'
ACCESS_KEY = 'LTA662PVVDTDWX6M2AB0'
SECRET_KEY = 'pvtchqajgzqx5581t6qbddbkj0bgf3a69qdkjcea'
BUCKET_NAME = 'versioning-bucket'
DATA_POOL = 'default.rgw.buckets.data'

def main():
"""
execute versioning tests
"""
create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)

connection = boto_connect(ACCESS_KEY, SECRET_KEY, Config(retries = {
'total_max_attempts': 1,
}))

# pre-test cleanup
try:
bucket = connection.Bucket(BUCKET_NAME)
bucket.objects.all().delete()
bucket.object_versions.all().delete()
bucket.delete()
except botocore.exceptions.ClientError as e:
if not e.response['Error']['Code'] == 'NoSuchBucket':
raise

bucket = connection.create_bucket(Bucket=BUCKET_NAME)
connection.BucketVersioning(BUCKET_NAME).enable()

# reproducer for bug from https://tracker.ceph.com/issues/59663
# TESTCASE 'verify that index entries and OLH objects are cleaned up after redundant deletes'
log.debug('TEST: verify that index entries and OLH objects are cleaned up after redundant deletes\n')
key = str(uuid.uuid4())
resp = bucket.Object(key).delete()
assert 'DeleteMarker' in resp, 'DeleteMarker key not present in response'
assert resp['DeleteMarker'], 'DeleteMarker value not True in response'
assert 'VersionId' in resp, 'VersionId key not present in response'
version_id = resp['VersionId']
bucket.Object(key).delete()
connection.ObjectVersion(bucket.name, key, version_id).delete()
# bucket index should now be empty
out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME}')
json_out = json.loads(out.replace(b'\x80', b'0x80'))
assert len(json_out) == 0, 'bucket index was not empty after all objects were deleted'

(_out, ret) = exec_cmd(f'rados -p {DATA_POOL} ls | grep {key}', check_retcode=False)
assert ret != 0, 'olh object was not cleaned up'

# TESTCASE 'verify that index entries and OLH objects are cleaned up after index linking error'
log.debug('TEST: verify that index entries and OLH objects are cleaned up after index linking error\n')
key = str(uuid.uuid4())
try:
exec_cmd('ceph config set client rgw_debug_inject_set_olh_err 2')
time.sleep(1)
bucket.Object(key).delete()
finally:
exec_cmd('ceph config rm client rgw_debug_inject_set_olh_err')
out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME}')
json_out = json.loads(out.replace(b'\x80', b'0x80'))
assert len(json_out) == 0, 'bucket index was not empty after op failed'
(_out, ret) = exec_cmd(f'rados -p {DATA_POOL} ls | grep {key}', check_retcode=False)
assert ret != 0, 'olh object was not cleaned up'

# TESTCASE 'verify that original null object version is intact after failed olh upgrade'
log.debug('TEST: verify that original null object version is intact after failed olh upgrade\n')
connection.BucketVersioning(BUCKET_NAME).suspend()
key = str(uuid.uuid4())
put_resp = bucket.put_object(Key=key, Body=b"data")
connection.BucketVersioning(BUCKET_NAME).enable()
try:
exec_cmd('ceph config set client rgw_debug_inject_set_olh_err 2')
time.sleep(1)
# expected to fail due to the above error injection
bucket.put_object(Key=key, Body=b"new data")
except Exception as e:
log.debug(e)
finally:
exec_cmd('ceph config rm client rgw_debug_inject_set_olh_err')
get_resp = bucket.Object(key).get()
assert put_resp.e_tag == get_resp['ETag'], 'get did not return null version with correct etag'

# Clean up
log.debug("Deleting bucket {}".format(BUCKET_NAME))
bucket.object_versions.all().delete()
bucket.delete()

main()
log.info("Completed bucket versioning tests")
33 changes: 30 additions & 3 deletions src/common/fault_injector.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#pragma once

#include <thread>
#include <type_traits>
#include <boost/type_traits/has_equal_to.hpp>
#include <boost/type_traits/has_left_shift.hpp>
#include <variant>
#include "include/ceph_assert.h"
#include "common/ceph_time.h"
#include "common/dout.h"

/// @file
Expand All @@ -34,13 +36,20 @@ struct InjectError {
const DoutPrefixProvider* dpp = nullptr;
};

/// Injects a delay before returning success.
struct InjectDelay {
/// duration of the delay
ceph::timespan duration;
/// an optional log channel to print a message
const DoutPrefixProvider* dpp = nullptr;
};

/** @class FaultInjector
* @brief Used to instrument a code path with deterministic fault injection
* by making one or more calls to check().
*
* A default-constructed FaultInjector contains no failure. It can also be
* constructed with a failure of type InjectAbort or InjectError, along with
* a location to inject that failure.
* constructed with a failure type and a location to inject that failure.
*
* The contained failure can be overwritten with a call to inject() or clear().
* This is not thread-safe with respect to other member functions on the same
Expand All @@ -67,6 +76,10 @@ class FaultInjector {
constexpr FaultInjector(Key location, InjectError e)
: location(std::move(location)), failure(e) {}

/// Construct with an injected delay at the given location.
constexpr FaultInjector(Key location, InjectDelay d)
: location(std::move(location)), failure(d) {}

/// Inject an assertion failure at the given location.
void inject(Key location, InjectAbort a) {
this->location = std::move(location);
Expand All @@ -79,6 +92,12 @@ class FaultInjector {
this->failure = e;
}

/// Injecte a delay at the given location.
void inject(Key location, InjectDelay d) {
this->location = std::move(location);
this->failure = d;
}

/// Clear any injected failure.
void clear() {
this->failure = Empty{};
Expand Down Expand Up @@ -110,6 +129,14 @@ class FaultInjector {
}
return 0;
}
int operator()(const InjectDelay& e) const {
if (check_location == this_location) {
ldpp_dout(e.dpp, -1) << "Injecting delay=" << e.duration
<< " at location=" << this_location << dendl;
std::this_thread::sleep_for(e.duration);
}
return 0;
}
};
return std::visit(visitor{location, this->location}, failure);
}
Expand All @@ -131,5 +158,5 @@ class FaultInjector {

using Empty = std::monostate; // empty state for std::variant

std::variant<Empty, InjectAbort, InjectError> failure;
std::variant<Empty, InjectAbort, InjectError, InjectDelay> failure;
};
Loading