Skip to content

KMS: Add list-key-rotations flag #12853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions localstack-core/localstack/services/kms/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,20 @@ def key(self) -> RSAPrivateKey | EllipticCurvePrivateKey:
)


@dataclass
class KeyRotationEntry:
key_id: str
key_material: bytes
key_material_id: str
key_material_state: str
rotation_date: datetime.datetime | None = None
rotation_type: str | None = None
key_material_description: str | None = None
expiration_model: str | None = None
import_state: str | None = None
valid_to: str | None = None


class KmsKey:
metadata: KeyMetadata
crypto_key: KmsCryptoKey
Expand All @@ -285,7 +299,7 @@ class KmsKey:
is_key_rotation_enabled: bool
rotation_period_in_days: int
next_rotation_date: datetime.datetime
previous_keys = [str]
key_rotations: list[KeyRotationEntry]

def __init__(
self,
Expand All @@ -294,7 +308,7 @@ def __init__(
region: str = None,
):
create_key_request = create_key_request or CreateKeyRequest()
self.previous_keys = []
self.key_rotations = []

# Please keep in mind that tags of a key could be present in the request, they are not a part of metadata. At
# least in the sense of DescribeKey not returning them with the rest of the metadata. Instead, tags are more
Expand Down Expand Up @@ -323,6 +337,15 @@ def __init__(
self.rotation_period_in_days = 365
self.next_rotation_date = None

if self.supports_rotation():
initial_rotation = KeyRotationEntry(
key_id=self.metadata["Arn"],
key_material_state="CURRENT",
key_material_id=long_uid(), # FIXME: a more appropriate KMS output
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently relying on uuid, open to suggestions.

key_material=bytes(self.crypto_key.key_material),
)
self.key_rotations.append(initial_rotation)

def calculate_and_set_arn(self, account_id, region):
self.metadata["Arn"] = kms_key_arn(self.metadata.get("KeyId"), account_id, region)

Expand Down Expand Up @@ -357,7 +380,11 @@ def decrypt(
self, ciphertext: Ciphertext, encryption_context: EncryptionContextType = None
) -> bytes:
aad = _serialize_encryption_context(encryption_context=encryption_context)
keys_to_try = [self.crypto_key.key_material] + self.previous_keys
keys_to_try = [self.crypto_key.key_material]

for rotation in self.key_rotations:
if rotation.key_material and rotation.key_material not in keys_to_try:
keys_to_try.append(rotation.key_material)

for key in keys_to_try:
try:
Expand Down Expand Up @@ -738,13 +765,39 @@ def _get_key_usage(self, request_key_usage: str, key_spec: str) -> str:
return request_key_usage or "ENCRYPT_DECRYPT"

def rotate_key_on_demand(self):
if len(self.previous_keys) >= ON_DEMAND_ROTATION_LIMIT:
on_demand_count = sum(
1 for rotation in self.key_rotations if rotation.rotation_type == "ON_DEMAND"
)

if on_demand_count >= ON_DEMAND_ROTATION_LIMIT:
raise LimitExceededException(
f"The on-demand rotations limit has been reached for the given keyId. "
f"No more on-demand rotations can be performed for this key: {self.metadata['Arn']}"
)
self.previous_keys.append(self.crypto_key.key_material)
self.crypto_key = KmsCryptoKey(KeySpec.SYMMETRIC_DEFAULT)

for rotation in self.key_rotations:
rotation.key_material_state = "NON_CURRENT"

new_key_material = os.urandom(SYMMETRIC_DEFAULT_MATERIAL_LENGTH)

rotation_entry = KeyRotationEntry(
key_id=self.metadata["Arn"],
rotation_date=datetime.datetime.now(),
rotation_type="ON_DEMAND",
key_material_state="CURRENT",
key_material_id=long_uid(),
key_material=new_key_material,
)
self.key_rotations.append(rotation_entry)

self.crypto_key = KmsCryptoKey(KeySpec.SYMMETRIC_DEFAULT, new_key_material)

def supports_rotation(self) -> bool:
return (
self.metadata.get("KeySpec") == KeySpec.SYMMETRIC_DEFAULT
and self.metadata.get("Origin") == OriginType.AWS_KMS
and self.metadata.get("KeyUsage") == KeyUsageType.ENCRYPT_DECRYPT
)


class KmsGrant:
Expand Down
85 changes: 85 additions & 0 deletions localstack-core/localstack/services/kms/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
GrantTokenType,
ImportKeyMaterialResponse,
ImportType,
IncludeKeyMaterial,
IncorrectKeyException,
InvalidCiphertextException,
InvalidGrantIdException,
Expand All @@ -78,6 +79,8 @@
ListGrantsResponse,
ListKeyPoliciesRequest,
ListKeyPoliciesResponse,
ListKeyRotationsRequest,
ListKeyRotationsResponse,
ListKeysRequest,
ListKeysResponse,
ListResourceTagsRequest,
Expand Down Expand Up @@ -1354,6 +1357,74 @@ def enable_key_rotation(
key.rotation_period_in_days = request.get("RotationPeriodInDays")
key._update_key_rotation_date()

@handler("ListKeyRotations", expand=False)
def list_key_rotations(
self, context: RequestContext, request: ListKeyRotationsRequest
) -> ListKeyRotationsResponse:
# https://docs.aws.amazon.com/kms/latest/developerguide/key-state.html
# "If the KMS key has imported key material or is in a custom key store: UnsupportedOperationException."
# We do not model that here, though.
# TODO: Add support for single-region EXTERNAL key origins
key = self._get_kms_key(
context.account_id,
context.region,
request.get("KeyId"),
disabled_key_allowed=True,
pending_deletion_key_allowed=True,
)

include_key_material = request.get("IncludeKeyMaterial")
if include_key_material is not None:
if include_key_material not in ["ALL_KEY_MATERIAL", "ROTATIONS_ONLY"]:
raise ValidationException(
f"1 validation error detected: Value '{include_key_material}' at 'includeKeyMaterial' "
f"failed to satisfy constraint: Member must satisfy enum value set: [ALL_KEY_MATERIAL, ROTATIONS_ONLY]"
)

if key.metadata["KeySpec"] != KeySpec.SYMMETRIC_DEFAULT:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per AWS docs, for asymmetric keys when IncludeKeyMaterial is not provided we simply return the Rotations as empty [].
See here: https://docs.aws.amazon.com/kms/latest/developerguide/symm-asymm-compare.html#key-type-table

raise ValidationException(
"Input parameter includeKeyMaterial can only be used with symmetric keys."
)

self._validate_key_supports_include_key_material(key)

rotation_history = []
if key.supports_rotation() or include_key_material is not None:
for rotation in key.key_rotations:
rotation_entry = {"KeyId": rotation.key_id}

if rotation.key_material_id is not None:
rotation_entry["KeyMaterialId"] = rotation.key_material_id
if rotation.key_material_state is not None:
rotation_entry["KeyMaterialState"] = rotation.key_material_state
if rotation.key_material_description is not None:
rotation_entry["KeyMaterialDescription"] = rotation.key_material_description
if rotation.rotation_date is not None:
rotation_entry["RotationDate"] = rotation.rotation_date
if rotation.rotation_type is not None:
rotation_entry["RotationType"] = rotation.rotation_type
if rotation.expiration_model is not None:
rotation_entry["ExpirationModel"] = rotation.expiration_model
if rotation.import_state is not None:
rotation_entry["ImportState"] = rotation.import_state
if rotation.valid_to is not None:
rotation_entry["ValidTo"] = rotation.valid_to

if include_key_material == IncludeKeyMaterial.ALL_KEY_MATERIAL:
rotation_history.append(rotation_entry)
else: # Default ROTATIONS_ONLY
if rotation.rotation_type in ["AUTOMATIC", "ON_DEMAND"]:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A KMS key on its creation doesn't have any rotation_type.

rotation_history.append(rotation_entry)

rotations_list = PaginatedList(rotation_history)
page, next_token = rotations_list.get_page(
lambda rotation: rotation.get("RotationDate"),
next_token=request.get("Marker"),
page_size=request.get("Limit", 100),
)
kwargs = {"NextMarker": next_token, "Truncated": True} if next_token else {}
return ListKeyRotationsResponse(Rotations=page, **kwargs)

@handler("ListKeyPolicies", expand=False)
def list_key_policies(
self, context: RequestContext, request: ListKeyPoliciesRequest
Expand Down Expand Up @@ -1605,6 +1676,20 @@ def _validate_plaintext_key_type_based(
f"Algorithm {encryption_algorithm} and key spec {key.metadata['KeySpec']} cannot encrypt data larger than {max_size_bytes} bytes."
)

def _validate_key_supports_include_key_material(self, key: KmsKey):
if key.metadata["Origin"] != OriginType.AWS_KMS:
raise UnsupportedOperationException(
f"{key.metadata['Arn']} origin is {key.metadata.get('Origin')} which is not valid for this operation."
)
if key.metadata["KeySpec"] != KeySpec.SYMMETRIC_DEFAULT:
raise UnsupportedOperationException(
f"The key spec {key.metadata['KeySpec']} is not valid for this operation."
)
if key.metadata["KeyUsage"] != KeyUsageType.ENCRYPT_DECRYPT:
raise UnsupportedOperationException(
f"The key usage {key.metadata.get('KeyUsage')} is not valid for this operation."
)


# ---------------
# UTIL FUNCTIONS
Expand Down
153 changes: 153 additions & 0 deletions tests/aws/services/kms/test_kms.py
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,159 @@ def test_rotate_key_on_demand_raises_error_given_key_with_imported_key_material(
aws_client.kms.rotate_key_on_demand(KeyId=key_id)
snapshot.match("error-response", e.value.response)

@markers.aws.validated
def test_list_key_rotations_after_on_demand_rotation(
self, kms_create_key, aws_client, snapshot
):
key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="SYMMETRIC_DEFAULT")["KeyId"]

snapshot.add_transformer(
snapshot.transform.key_value("KeyMaterialId", reference_replacement=False)
)

response_initial_rotation = aws_client.kms.list_key_rotations(KeyId=key_id)
snapshot.match("initial-rotations", response_initial_rotation)

aws_client.kms.rotate_key_on_demand(KeyId=key_id)

def _rotation_completed():
status = aws_client.kms.get_key_rotation_status(KeyId=key_id)
return "OnDemandRotationStartDate" not in status

assert poll_condition(condition=_rotation_completed, timeout=10, interval=1)

response_after_on_demand_rotation = aws_client.kms.list_key_rotations(KeyId=key_id)
snapshot.match("after-on-demand-rotation", response_after_on_demand_rotation)

assert (
len(response_after_on_demand_rotation["Rotations"])
== len(response_initial_rotation["Rotations"]) + 1
)

@markers.aws.validated
def test_list_key_rotations_different_key_states(self, kms_create_key, aws_client, snapshot):
key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="SYMMETRIC_DEFAULT")["KeyId"]

snapshot.add_transformer(
snapshot.transform.key_value("KeyMaterialId", reference_replacement=False)
)

response_enabled_key = aws_client.kms.list_key_rotations(
KeyId=key_id, IncludeKeyMaterial="ALL_KEY_MATERIAL"
)
snapshot.match("enabled-key-rotations", response_enabled_key)

aws_client.kms.disable_key(KeyId=key_id)
response_disabled_key = aws_client.kms.list_key_rotations(
KeyId=key_id, IncludeKeyMaterial="ALL_KEY_MATERIAL"
)
snapshot.match("disabled-key-rotations", response_disabled_key)

aws_client.kms.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7)
response_pending_deletion_key = aws_client.kms.list_key_rotations(
KeyId=key_id, IncludeKeyMaterial="ALL_KEY_MATERIAL"
)
snapshot.match("pending-deletion-key-rotations", response_pending_deletion_key)

@markers.aws.validated
def test_list_key_rotations_content(self, kms_create_key, aws_client, snapshot):
key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="SYMMETRIC_DEFAULT")["KeyId"]

snapshot.add_transformer(
snapshot.transform.key_value("KeyMaterialId", reference_replacement=False)
)

for _ in range(3):
aws_client.kms.rotate_key_on_demand(KeyId=key_id)

def _rotation_completed():
response_status = aws_client.kms.get_key_rotation_status(KeyId=key_id)
return "OnDemandRotationStartDate" not in response_status

assert poll_condition(condition=_rotation_completed, timeout=10, interval=1)

response_after_rotations = aws_client.kms.list_key_rotations(
KeyId=key_id, IncludeKeyMaterial="ALL_KEY_MATERIAL"
)
snapshot.match("multiple-rotations", response_after_rotations)

rotations = response_after_rotations["Rotations"]
assert len(rotations) >= 4 # 1 initial + 3 on-demand

for rotation in rotations:
assert "KeyId" in rotation
assert (
"RotationDate" in rotation or rotation.get("RotationDate") is None
) # None in initial
assert (
"RotationType" in rotation or rotation.get("RotationType") is None
) # None in initial
assert "KeyMaterialId" in rotation
assert "KeyMaterialState" in rotation

@markers.aws.validated
@pytest.mark.parametrize(
"key_spec,key_usage,expected_error",
[
("RSA_2048", "SIGN_VERIFY", "ValidationException"),
("RSA_4096", "ENCRYPT_DECRYPT", "ValidationException"),
("ECC_NIST_P256", "SIGN_VERIFY", "ValidationException"),
("HMAC_256", "GENERATE_VERIFY_MAC", "ValidationException"),
],
)
def test_list_key_rotations_unsupported_key_types(
self, kms_create_key, aws_client, snapshot, key_spec, key_usage, expected_error
):
key_id = kms_create_key(KeyUsage=key_usage, KeySpec=key_spec)["KeyId"]

with pytest.raises(ClientError) as e:
aws_client.kms.list_key_rotations(KeyId=key_id, IncludeKeyMaterial="ALL_KEY_MATERIAL")

assert expected_error in str(e.value)
error_response = e.value.response
assert error_response["Error"]["Code"] == expected_error
snapshot.match(f"{key_spec.lower()}-{key_usage.lower()}-error", error_response)

@markers.aws.validated
def test_list_key_rotations_include_key_material(self, kms_create_key, aws_client, snapshot):
key_id = kms_create_key(KeyUsage="ENCRYPT_DECRYPT", KeySpec="SYMMETRIC_DEFAULT")["KeyId"]

snapshot.add_transformer(
snapshot.transform.key_value("KeyMaterialId", reference_replacement=False)
)

aws_client.kms.rotate_key_on_demand(KeyId=key_id)

def _rotation_completed():
response_status = aws_client.kms.get_key_rotation_status(KeyId=key_id)
return "OnDemandRotationStartDate" not in response_status

assert poll_condition(condition=_rotation_completed, timeout=10, interval=1)

response_default = aws_client.kms.list_key_rotations(KeyId=key_id)
snapshot.match("default-key-material", response_default)

response_all_material = aws_client.kms.list_key_rotations(
KeyId=key_id, IncludeKeyMaterial="ALL_KEY_MATERIAL"
)
snapshot.match("all-key-material", response_all_material)
assert len(response_all_material["Rotations"]) >= 2 # Initial + on-demand

response_rotations_only = aws_client.kms.list_key_rotations(
KeyId=key_id, IncludeKeyMaterial="ROTATIONS_ONLY"
)
snapshot.match("rotations-only", response_rotations_only)

assert len(response_rotations_only["Rotations"]) < len(response_all_material["Rotations"])
assert all(
r["RotationType"] in ["AUTOMATIC", "ON_DEMAND"]
for r in response_rotations_only["Rotations"]
)

with pytest.raises(ClientError) as e:
aws_client.kms.list_key_rotations(KeyId=key_id, IncludeKeyMaterial="INVALID_VALUE")
snapshot.match("invalid-key-material", e.value.response)

@markers.aws.validated
@pytest.mark.parametrize("rotation_period_in_days", [90, 180])
def test_key_enable_rotation_status(
Expand Down
Loading
Loading