Commit

stash
tmacro committed Nov 9, 2023
1 parent 9f36624 commit e42630e
Showing 3 changed files with 45 additions and 12 deletions.
8 changes: 8 additions & 0 deletions lib/Config.js
@@ -172,6 +172,14 @@ class Config {
+ 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics;
}

+        if (config.onlyCountLatestWhenObjectLocked !== undefined) {
+            assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
+                'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
+            this.onlyCountLatestWhenObjectLocked =
+                config.onlyCountLatestWhenObjectLocked;
+        }

return config;
}
}
11 changes: 10 additions & 1 deletion lib/UtapiReindex.js
@@ -61,6 +61,10 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}

+        if (config && config.onlyCountLatestWhenObjectLocked) {
+            this._onlyCountLatestWhenObjectLocked = true;
+        }

this._requestLogger = this._log.newRequestLogger();
}

@@ -97,13 +101,18 @@
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
}
+        if (this._onlyCountLatestWhenObjectLocked) {
+            flags.only_latest_when_locked = '';
+        }
/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
.forEach(flag => {
const name = `--${flag.replace(/_/g, '-')}`;
opts.push(name);
-                opts.push(flags[flag]);
+                if (flags[flag] !== '') {
+                    opts.push(flags[flag]);
+                }
});
return opts;
}
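
The empty string is the marker for a value-less flag: the loop above pushes only the option name, so the spawned reindex script can declare it as a store_true argument. A minimal sketch of that contract on the Python side, mirroring the flag defined in s3_bucketd.py below:

import argparse

# store_true options take no value on the command line, which is why the
# JS side must not push a value after the flag name
parser = argparse.ArgumentParser()
parser.add_argument("--only-latest-when-locked", action='store_true')

assert parser.parse_args(["--only-latest-when-locked"]).only_latest_when_locked is True
assert parser.parse_args([]).only_latest_when_locked is False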
38 changes: 27 additions & 11 deletions lib/reindex/s3_bucketd.py
@@ -16,7 +16,7 @@
import requests
from requests import ConnectionError, HTTPError, Timeout

-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG)
_log = logging.getLogger('utapi-reindex')

USERS_BUCKET = 'users..bucket'
@@ -34,6 +34,7 @@ def get_options():
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
return parser.parse_args()

def chunks(iterable, size):
@@ -49,7 +50,7 @@ def inner(*args, **kwargs):
return urllib.parse.quote(val.encode('utf-8'))
return inner

-Bucket = namedtuple('Bucket', ['userid', 'name'])
+Bucket = namedtuple('Bucket', ['userid', 'name', 'object_locked'], defaults=[False])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])

@@ -67,9 +68,10 @@ class BucketDClient:
__url_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}

-    def __init__(self, bucketd_addr=None, max_retries=2):
+    def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
+        self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()

def _do_req(self, url, check_500=True, **kwargs):
@@ -151,9 +153,18 @@ def get_next_marker(p):
buckets = []
for result in payload['Contents']:
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
-                bucket = Bucket(*match.groups())
+                bucket_md = json.loads(result['value'])
+                obj_lock_enabled = bucket_md.get('objectLockEnabled', False)
+                # obj_lock_config = bucket_md.get('objectLockConfiguration', {})
+                # obj_lock_mode = obj_lock_config.get('rule', {}).get('mode')
+                # if mode == 'GOVERNANCE' or mode == 'COMPLIANCE':
+                bucket = Bucket(*match.groups(), object_locked=obj_lock_enabled)
+                if obj_lock_enabled:
+                    bucket = bucket._replace(object_locked=True)
if name is None or bucket.name == name:
buckets.append(bucket)
+                print(result)

if buckets:
yield buckets
if name is not None:
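
For context, each entry in the users..bucket listing pairs a '<userid>..|..<bucket-name>' key with the bucket's metadata as JSON, which is what the regex and json.loads above unpack. A minimal sketch with a hypothetical entry:

import json
import re

result = {'key': 'abc123..|..my-bucket',
          'value': json.dumps({'objectLockEnabled': True})}

userid, name = re.match(r"(\w+)..\|..(\w+.*)", result['key']).groups()
obj_lock_enabled = json.loads(result['value']).get('objectLockEnabled', False)
# userid == 'abc123', name == 'my-bucket', obj_lock_enabled is True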
@@ -204,8 +215,8 @@ def _sum_objects(self, bucket, listing):
for status_code, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
if contents is None:
-                _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code))
-                raise InvalidListing(bucket)
+                _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
+                raise InvalidListing(bucket.name)
for obj in contents:
count += 1
if isinstance(obj['value'], dict):
@@ -218,6 +229,10 @@ def _sum_objects(self, bucket, listing):
size = data.get('content-length', 0)
total_size += size

+                if self._only_latest_when_locked and bucket.object_locked and '\x00' in obj['key']:
+                    _log.debug('Skipping versioned key: %s'%obj['key'])
+                    continue

# If versioned, subtract the size of the master to avoid double counting
if last_master is not None and obj['key'].startswith(last_master + '\x00'):
_log.debug('Detected versioned key: %s - subtracting master size: %i'% (
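
The skip works because of the metadata key encoding: the master copy of an object lives under the bare key, while each noncurrent version lives under '<key>\x00<version-id>', so testing for a null byte is enough to tell them apart. A tiny sketch with hypothetical keys:

# with --only-latest-when-locked, versioned entries are dropped and only
# the master (latest) copy is counted
keys = ['photo.jpg', 'photo.jpg\x00v1', 'photo.jpg\x00v2']
latest_only = [k for k in keys if '\x00' not in k]
assert latest_only == ['photo.jpg']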
@@ -248,15 +263,16 @@ def get_next_marker(p):
'gt': get_next_marker,
}

-        count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params))
+        count, total_size = self._sum_objects(bucket, self._list_bucket(bucket.name, **params))
return BucketContents(
bucket=bucket,
obj_count=count,
total_size=total_size
)

def count_mpu_parts(self, mpu):
-        _bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
+        shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
+        shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)

def get_prefix(p):
if p is None:
@@ -276,9 +292,9 @@ def get_next_marker(p):
'listingType': 'Delimiter',
}

-        count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params))
+        count, total_size = self._sum_objects(shadow_bucket, self._list_bucket(shadow_bucket_name, **params))
return BucketContents(
-            bucket=mpu.bucket._replace(name=_bucket),
+            bucket=shadow_bucket,
obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size
)
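
Passing a full Bucket namedtuple (via _replace) instead of a bare name is what lets _sum_objects log bucket.name on errors and read bucket.object_locked for the new filter; the shadow bucket simply inherits both fields from its parent. A sketch, with the prefix value assumed rather than taken from the diff:

MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'   # assumed value of the module constant

bucket = Bucket('abc123', 'my-bucket', object_locked=True)
shadow = bucket._replace(name=MPU_SHADOW_BUCKET_PREFIX + bucket.name)
# shadow keeps userid and object_locked; only the name changes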
@@ -361,7 +377,7 @@ def log_report(resource, name, obj_count, total_size):
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
-    bucket_client = BucketDClient(options.bucketd_addr, options.max_retries)
+    bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
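
Because the new option is store_true, it defaults to False and the lock-aware counting stays off unless explicitly requested, so existing invocations behave as before. A quick sketch (assuming get_options() has no required arguments):

import sys

sys.argv = ['s3_bucketd.py']                 # no --only-latest-when-locked
options = get_options()
assert options.only_latest_when_locked is False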
