-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
KMS: Add list-key-rotations flag #12853
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,6 +60,7 @@ | |
GrantTokenType, | ||
ImportKeyMaterialResponse, | ||
ImportType, | ||
IncludeKeyMaterial, | ||
IncorrectKeyException, | ||
InvalidCiphertextException, | ||
InvalidGrantIdException, | ||
|
@@ -78,6 +79,8 @@ | |
ListGrantsResponse, | ||
ListKeyPoliciesRequest, | ||
ListKeyPoliciesResponse, | ||
ListKeyRotationsRequest, | ||
ListKeyRotationsResponse, | ||
ListKeysRequest, | ||
ListKeysResponse, | ||
ListResourceTagsRequest, | ||
|
@@ -1354,6 +1357,74 @@ def enable_key_rotation( | |
key.rotation_period_in_days = request.get("RotationPeriodInDays") | ||
key._update_key_rotation_date() | ||
|
||
@handler("ListKeyRotations", expand=False) | ||
def list_key_rotations( | ||
self, context: RequestContext, request: ListKeyRotationsRequest | ||
) -> ListKeyRotationsResponse: | ||
# https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html | ||
# "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException." | ||
# We do not model that here, though. | ||
# TODO: Add support for single-region EXTERNAL key origins | ||
key = self._get_kms_key( | ||
context.account_id, | ||
context.region, | ||
request.get("KeyId"), | ||
disabled_key_allowed=True, | ||
pending_deletion_key_allowed=True, | ||
) | ||
|
||
include_key_material = request.get("IncludeKeyMaterial") | ||
if include_key_material is not None: | ||
if include_key_material not in ["ALL_KEY_MATERIAL", "ROTATIONS_ONLY"]: | ||
raise ValidationException( | ||
f"1 validation error detected: Value '{include_key_material}' at 'includeKeyMaterial' " | ||
f"failed to satisfy constraint: Member must satisfy enum value set: [ALL_KEY_MATERIAL, ROTATIONS_ONLY]" | ||
) | ||
|
||
if key.metadata["KeySpec"] != KeySpec.SYMMETRIC_DEFAULT: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per AWS docs, for asymmetric keys when |
||
raise ValidationException( | ||
"Input parameter includeKeyMaterial can only be used with symmetric keys." | ||
) | ||
|
||
self._validate_key_supports_include_key_material(key) | ||
|
||
rotation_history = [] | ||
if key.supports_rotation() or include_key_material is not None: | ||
for rotation in key.key_rotations: | ||
rotation_entry = {"KeyId": rotation.key_id} | ||
|
||
if rotation.key_material_id is not None: | ||
rotation_entry["KeyMaterialId"] = rotation.key_material_id | ||
if rotation.key_material_state is not None: | ||
rotation_entry["KeyMaterialState"] = rotation.key_material_state | ||
if rotation.key_material_description is not None: | ||
rotation_entry["KeyMaterialDescription"] = rotation.key_material_description | ||
if rotation.rotation_date is not None: | ||
rotation_entry["RotationDate"] = rotation.rotation_date | ||
if rotation.rotation_type is not None: | ||
rotation_entry["RotationType"] = rotation.rotation_type | ||
if rotation.expiration_model is not None: | ||
rotation_entry["ExpirationModel"] = rotation.expiration_model | ||
if rotation.import_state is not None: | ||
rotation_entry["ImportState"] = rotation.import_state | ||
if rotation.valid_to is not None: | ||
rotation_entry["ValidTo"] = rotation.valid_to | ||
|
||
if include_key_material == IncludeKeyMaterial.ALL_KEY_MATERIAL: | ||
rotation_history.append(rotation_entry) | ||
else: # Default ROTATIONS_ONLY | ||
if rotation.rotation_type in ["AUTOMATIC", "ON_DEMAND"]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A KMS key on its creation doesn't have any rotation_type. |
||
rotation_history.append(rotation_entry) | ||
|
||
rotations_list = PaginatedList(rotation_history) | ||
page, next_token = rotations_list.get_page( | ||
lambda rotation: rotation.get("RotationDate"), | ||
next_token=request.get("Marker"), | ||
page_size=request.get("Limit", 100), | ||
) | ||
kwargs = {"NextMarker": next_token, "Truncated": True} if next_token else {} | ||
return ListKeyRotationsResponse(Rotations=page, **kwargs) | ||
|
||
@handler("ListKeyPolicies", expand=False) | ||
def list_key_policies( | ||
self, context: RequestContext, request: ListKeyPoliciesRequest | ||
|
@@ -1605,6 +1676,20 @@ def _validate_plaintext_key_type_based( | |
f"Algorithm {encryption_algorithm} and key spec {key.metadata['KeySpec']} cannot encrypt data larger than {max_size_bytes} bytes." | ||
) | ||
|
||
def _validate_key_supports_include_key_material(self, key: KmsKey): | ||
if key.metadata["Origin"] != OriginType.AWS_KMS: | ||
raise UnsupportedOperationException( | ||
f"{key.metadata['Arn']} origin is {key.metadata.get('Origin')} which is not valid for this operation." | ||
) | ||
if key.metadata["KeySpec"] != KeySpec.SYMMETRIC_DEFAULT: | ||
raise UnsupportedOperationException( | ||
f"The key spec {key.metadata['KeySpec']} is not valid for this operation." | ||
) | ||
if key.metadata["KeyUsage"] != KeyUsageType.ENCRYPT_DECRYPT: | ||
raise UnsupportedOperationException( | ||
f"The key usage {key.metadata.get('KeyUsage')} is not valid for this operation." | ||
) | ||
|
||
|
||
# --------------- | ||
# UTIL FUNCTIONS | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently relying on uuid, open to suggestions.