Skip to content

Commit

Permalink
more tests for GetObjectAttributes
Browse files Browse the repository at this point in the history
* multipart upload without checksums
* multipart upload with a single part
* pagination of multipart parts
* non-multipart upload with/without checksum
* versioned object, current and non-current
* sse-c encrypted object

Signed-off-by: Casey Bodley <[email protected]>
  • Loading branch information
cbodley committed Oct 18, 2024
1 parent b2aff9a commit 511894a
Showing 1 changed file with 261 additions and 2 deletions.
263 changes: 261 additions & 2 deletions s3tests_boto3/functional/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13738,9 +13738,16 @@ def test_post_object_upload_checksum():
r = requests.post(url, files=payload, verify=get_config_ssl_verify())
assert r.status_code == 400

def check_parts_count(parts, expected):
# AWS docs disagree on the name of this element
if 'TotalPartsCount' in parts:
assert parts['TotalPartsCount'] == expected
else:
assert parts['PartsCount'] == expected

@pytest.mark.checksum
@pytest.mark.fails_on_dbstore
def test_get_object_attributes():
def test_get_multipart_checksum_object_attributes():
bucket_name = get_new_bucket()
client = get_client()

Expand Down Expand Up @@ -13773,7 +13780,7 @@ def test_get_object_attributes():
nparts = len(parts)
assert response['ObjectSize'] == objlen
assert response['Checksum']['ChecksumSHA256'] == upload_checksum
assert response['ObjectParts']['TotalPartsCount'] == nparts
check_parts_count(response['ObjectParts'], nparts)

# check the parts
partno = 1
Expand All @@ -13785,3 +13792,255 @@ def test_get_object_attributes():
assert obj_part['Size'] == objlen - ((nparts-1) * (5 * 1024 * 1024))
assert obj_part['ChecksumSHA256'] == checksums[partno - 1]
partno += 1

@pytest.mark.fails_on_dbstore
def test_get_multipart_object_attributes():
bucket_name = get_new_bucket()
client = get_client()

key = "multipart"
part_size = 5*1024*1024
objlen = 30*1024*1024

(upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
etag = response['ETag'].strip('"')
assert len(etag)

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key, \
ObjectAttributes=request_attributes)

# check overall object
nparts = len(parts)
assert response['ObjectSize'] == objlen
check_parts_count(response['ObjectParts'], len(parts))
assert response['ObjectParts']['IsTruncated'] == False
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'

# check the parts
partno = 1
for obj_part in response['ObjectParts']['Parts']:
assert obj_part['PartNumber'] == partno
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part
partno += 1

@pytest.mark.fails_on_dbstore
def test_get_paginated_multipart_object_attributes():
bucket_name = get_new_bucket()
client = get_client()

key = "multipart"
part_size = 5*1024*1024
objlen = 30*1024*1024

(upload_id, data, parts) = _multipart_upload(bucket_name, key, objlen, part_size)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
etag = response['ETag'].strip('"')
assert len(etag)

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes,
MaxParts=1, PartNumberMarker=3)

# check overall object
assert response['ObjectSize'] == objlen
check_parts_count(response['ObjectParts'], len(parts))
assert response['ObjectParts']['MaxParts'] == 1
assert response['ObjectParts']['PartNumberMarker'] == 3
assert response['ObjectParts']['IsTruncated'] == True
assert response['ObjectParts']['NextPartNumberMarker'] == 4
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'

# check the part
assert len(response['ObjectParts']['Parts']) == 1
obj_part = response['ObjectParts']['Parts'][0]
assert obj_part['PartNumber'] == 4
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes,
MaxParts=10, PartNumberMarker=4)

# check overall object
assert response['ObjectSize'] == objlen
check_parts_count(response['ObjectParts'], len(parts))
assert response['ObjectParts']['MaxParts'] == 10
assert response['ObjectParts']['IsTruncated'] == False
assert response['ObjectParts']['PartNumberMarker'] == 4
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'

# check the parts
assert len(response['ObjectParts']['Parts']) == 2
partno = 5
for obj_part in response['ObjectParts']['Parts']:
assert obj_part['PartNumber'] == partno
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part
partno += 1

@pytest.mark.fails_on_dbstore
def test_get_single_multipart_object_attributes():
bucket_name = get_new_bucket()
client = get_client()

key = "multipart"
part_size = 5*1024*1024
part_sizes = [part_size] # just one part
part_count = len(part_sizes)
total_size = sum(part_sizes)

(upload_id, data, parts) = _multipart_upload(bucket_name, key, total_size, part_size)
response = client.complete_multipart_upload(Bucket=bucket_name, Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': parts})
etag = response['ETag'].strip('"')
assert len(etag)

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)

assert response['ObjectSize'] == total_size
check_parts_count(response['ObjectParts'], 1)
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'

assert len(response['ObjectParts']['Parts']) == 1
obj_part = response['ObjectParts']['Parts'][0]
assert obj_part['PartNumber'] == 1
assert obj_part['Size'] == part_size
assert 'ChecksumSHA256' not in obj_part

def test_get_checksum_object_attributes():
bucket_name = get_new_bucket()
client = get_client()

key = "myobj"
size = 1024
body = FakeWriteFile(size, 'A')
sha256sum = 'arcu6553sHVAiX4MjW0j7I7vD4w6R+Gz9Ok0Q9lTa+0='
response = client.put_object(Bucket=bucket_name, Key=key, Body=body, ChecksumAlgorithm='SHA256', ChecksumSHA256=sha256sum)
assert sha256sum == response['ChecksumSHA256']
etag = response['ETag'].strip('"')
assert len(etag)

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)

assert response['ObjectSize'] == size
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert response['Checksum']['ChecksumSHA256'] == sha256sum
assert 'ObjectParts' not in response

def test_get_versioned_object_attributes():
bucket_name = get_new_bucket()
check_configure_versioning_retry(bucket_name, "Enabled", "Enabled")
client = get_client()
key = "obj"
objlen = 3

response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
etag = response['ETag'].strip('"')
assert len(etag)
version = response['VersionId']
assert len(version)

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)

assert 'DeleteMarker' not in response
assert response['VersionId'] == version

assert response['ObjectSize'] == 3
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response

# write a new current version
client.put_object(Bucket=bucket_name, Key=key, Body='foo')

# ask for the original version again
request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key, VersionId=version,
ObjectAttributes=request_attributes)

assert 'DeleteMarker' not in response
assert response['VersionId'] == version

assert response['ObjectSize'] == 3
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response

@pytest.mark.encryption
def test_get_sse_c_encrypted_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = 'obj'
objlen = 1000
data = 'A'*objlen
sse_args = {
'SSECustomerAlgorithm': 'AES256',
'SSECustomerKey': 'pO3upElrwuEXSoFwCfnZPdSsmt/xWeFa0N9KgDijwVs=',
'SSECustomerKeyMD5': 'DWygnHRtgiJ77HCm+1rvHw=='
}
attrs = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']

response = client.put_object(Bucket=bucket_name, Key=key, Body=data, **sse_args)
etag = response['ETag'].strip('"')
assert len(etag)

# GetObjectAttributes fails without sse-c headers
e = assert_raises(ClientError, client.get_object_attributes,
Bucket=bucket_name, Key=key, ObjectAttributes=attrs)
status, error_code = _get_status_and_error_code(e.response)
assert status == 400

# and succeeds sse-c headers
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=attrs, **sse_args)

assert 'DeleteMarker' not in response
assert 'VersionId' not in response

assert response['ObjectSize'] == objlen
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response

def test_get_object_attributes():
bucket_name = get_new_bucket()
client = get_client()
key = "obj"
objlen = 3

response = client.put_object(Bucket=bucket_name, Key=key, Body='foo')
etag = response['ETag'].strip('"')
assert len(etag)

request_attributes = ['ETag', 'Checksum', 'ObjectParts', 'StorageClass', 'ObjectSize']
response = client.get_object_attributes(Bucket=bucket_name, Key=key,
ObjectAttributes=request_attributes)

assert 'DeleteMarker' not in response
assert 'VersionId' not in response

assert response['ObjectSize'] == 3
assert response['ETag'] == etag
assert response['StorageClass'] == 'STANDARD'
assert 'ObjectParts' not in response

0 comments on commit 511894a

Please sign in to comment.