Skip to content

Commit cf32f3d

Browse files
KMS HMAC support (localstack#8047)
1 parent 2e86662 commit cf32f3d

File tree

4 files changed

+685
-4
lines changed

4 files changed

+685
-4
lines changed

localstack/services/kms/models.py

Lines changed: 87 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import os
6+
import random
67
import re
78
import struct
89
import uuid
@@ -11,7 +12,7 @@
1112
from typing import Dict, List, Tuple
1213

1314
from cryptography.exceptions import InvalidSignature
14-
from cryptography.hazmat.primitives import hashes
15+
from cryptography.hazmat.primitives import hashes, hmac
1516
from cryptography.hazmat.primitives import serialization as crypto_serialization
1617
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa, utils
1718

@@ -24,8 +25,10 @@
2425
DisabledException,
2526
EncryptionContextType,
2627
KeyMetadata,
28+
KMSInvalidMacException,
2729
KMSInvalidSignatureException,
2830
KMSInvalidStateException,
31+
MacAlgorithmSpec,
2932
MessageType,
3033
NotFoundException,
3134
SigningAlgorithmSpec,
@@ -58,6 +61,13 @@
5861
"ECC_SECG_P256K1": ec.SECP256K1(),
5962
}
6063

64+
HMAC_RANGE_KEY_LENGTHS = {
65+
"HMAC_224": (28, 64),
66+
"HMAC_256": (32, 64),
67+
"HMAC_384": (48, 128),
68+
"HMAC_512": (64, 128),
69+
}
70+
6171

6272
class ValidationException(CommonServiceException):
6373
def __init__(self, message: str):
@@ -162,9 +172,19 @@ def __init__(self, key_spec: str):
162172
elif key_spec.startswith("ECC"):
163173
curve = ECC_CURVES.get(key_spec)
164174
self.key = ec.generate_private_key(curve)
175+
elif key_spec.startswith("HMAC"):
176+
if key_spec not in HMAC_RANGE_KEY_LENGTHS:
177+
raise ValidationException(
178+
f"1 validation error detected: Value '{key_spec}' at 'keySpec' "
179+
f"failed to satisfy constraint: Member must satisfy enum value set: "
180+
f"[RSA_2048, ECC_NIST_P384, ECC_NIST_P256, ECC_NIST_P521, HMAC_384, RSA_3072, "
181+
f"ECC_SECG_P256K1, RSA_4096, SYMMETRIC_DEFAULT, HMAC_256, HMAC_224, HMAC_512]"
182+
)
183+
minimum_length, maximum_length = HMAC_RANGE_KEY_LENGTHS.get(key_spec)
184+
self.key_material = os.urandom(random.randint(minimum_length, maximum_length))
185+
return
165186
else:
166-
# Currently we do not support HMAC keys - symmetric keys that are used for GenerateMac / VerifyMac.
167-
# We also do not support SM2 - asymmetric keys both suitable for ENCRYPT_DECRYPT and SIGN_VERIFY,
187+
# We do not support SM2 - asymmetric keys both suitable for ENCRYPT_DECRYPT and SIGN_VERIFY,
168188
# but only used in China AWS regions.
169189
raise UnsupportedOperationException(f"KeySpec {key_spec} is not supported")
170190

@@ -212,6 +232,20 @@ def __init__(
212232
def calculate_and_set_arn(self, account_id, region):
213233
self.metadata["Arn"] = kms_key_arn(self.metadata.get("KeyId"), account_id, region)
214234

235+
def generate_mac(self, msg: bytes, mac_algorithm: MacAlgorithmSpec) -> bytes:
236+
h = self._get_hmac_context(mac_algorithm)
237+
h.update(msg)
238+
return h.finalize()
239+
240+
def verify_mac(self, msg: bytes, mac: bytes, mac_algorithm: MacAlgorithmSpec) -> bool:
241+
h = self._get_hmac_context(mac_algorithm)
242+
h.update(msg)
243+
try:
244+
h.verify(mac)
245+
return True
246+
except InvalidSignature:
247+
raise KMSInvalidMacException()
248+
215249
# Encrypt is a method of KmsKey and not of KmsCryptoKey only because it requires KeyId, and KmsCryptoKeys do not
216250
# hold KeyIds. Maybe it would be possible to remodel this better.
217251
def encrypt(self, plaintext: bytes) -> bytes:
@@ -254,6 +288,23 @@ def verify(
254288
# AWS itself raises this exception without any additional message.
255289
raise KMSInvalidSignatureException()
256290

291+
def _get_hmac_context(self, mac_algorithm: MacAlgorithmSpec) -> hmac.HMAC:
292+
if mac_algorithm == "HMAC_SHA_224":
293+
h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA224())
294+
elif mac_algorithm == "HMAC_SHA_256":
295+
h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA256())
296+
elif mac_algorithm == "HMAC_SHA_384":
297+
h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA384())
298+
elif mac_algorithm == "HMAC_SHA_512":
299+
h = hmac.HMAC(self.crypto_key.key_material, hashes.SHA512())
300+
else:
301+
raise ValidationException(
302+
f"1 validation error detected: Value '{mac_algorithm}' at 'macAlgorithm' "
303+
f"failed to satisfy constraint: Member must satisfy enum value set: "
304+
f"[HMAC_SHA_384, HMAC_SHA_256, HMAC_SHA_224, HMAC_SHA_512]"
305+
)
306+
return h
307+
257308
def _construct_sign_verify_kwargs(
258309
self, signing_algorithm: SigningAlgorithmSpec, message_type: MessageType
259310
) -> Dict:
@@ -318,7 +369,6 @@ def _populate_metadata(
318369
# "DescribeKey does not return the following information: ... Tags on the KMS key."
319370

320371
self.metadata["Description"] = create_key_request.get("Description") or ""
321-
self.metadata["KeyUsage"] = create_key_request.get("KeyUsage") or "ENCRYPT_DECRYPT"
322372
self.metadata["MultiRegion"] = create_key_request.get("MultiRegion") or False
323373
self.metadata["Origin"] = create_key_request.get("Origin") or "AWS_KMS"
324374
# https://docs.aws.amazon.com/kms/latest/APIReference/API_CreateKey.html#KMS-CreateKey-request-CustomerMasterKeySpec
@@ -330,6 +380,9 @@ def _populate_metadata(
330380
or "SYMMETRIC_DEFAULT"
331381
)
332382
self.metadata["CustomerMasterKeySpec"] = self.metadata.get("KeySpec")
383+
self.metadata["KeyUsage"] = self._get_key_usage(
384+
create_key_request.get("KeyUsage"), self.metadata.get("KeySpec")
385+
)
333386

334387
# Metadata fields AWS introduces automatically
335388
self.metadata["AWSAccountId"] = account_id or get_aws_account_id()
@@ -353,6 +406,7 @@ def _populate_metadata(
353406
self._populate_signing_algorithms(
354407
self.metadata.get("KeyUsage"), self.metadata.get("KeySpec")
355408
)
409+
self._populate_mac_algorithms(self.metadata.get("KeyUsage"), self.metadata.get("KeySpec"))
356410

357411
def add_tags(self, tags: List) -> None:
358412
# Just in case we get None from somewhere.
@@ -430,6 +484,35 @@ def _populate_signing_algorithms(self, key_usage: str, key_spec: str) -> None:
430484
"RSASSA_PSS_SHA_512",
431485
]
432486

487+
def _populate_mac_algorithms(self, key_usage: str, key_spec: str) -> None:
488+
if key_usage != "GENERATE_VERIFY_MAC":
489+
return
490+
if key_spec == "HMAC_224":
491+
self.metadata["MacAlgorithms"] = ["HMAC_SHA_224"]
492+
elif key_spec == "HMAC_256":
493+
self.metadata["MacAlgorithms"] = ["HMAC_SHA_256"]
494+
elif key_spec == "HMAC_384":
495+
self.metadata["MacAlgorithms"] = ["HMAC_SHA_384"]
496+
elif key_spec == "HMAC_512":
497+
self.metadata["MacAlgorithms"] = ["HMAC_SHA_512"]
498+
499+
def _get_key_usage(self, request_key_usage: str, key_spec: str) -> str:
500+
if key_spec in HMAC_RANGE_KEY_LENGTHS:
501+
if request_key_usage is None:
502+
raise ValidationException(
503+
"You must specify a KeyUsage value for all KMS keys except for symmetric encryption keys."
504+
)
505+
elif request_key_usage != "GENERATE_VERIFY_MAC":
506+
raise ValidationException(
507+
f"1 validation error detected: Value '{request_key_usage}' at 'keyUsage' "
508+
f"failed to satisfy constraint: Member must satisfy enum value set: "
509+
f"[ENCRYPT_DECRYPT, SIGN_VERIFY, GENERATE_VERIFY_MAC]"
510+
)
511+
else:
512+
return "GENERATE_VERIFY_MAC"
513+
else:
514+
return request_key_usage or "ENCRYPT_DECRYPT"
515+
433516

434517
class KmsGrant:
435518
# AWS documentation doesn't seem to mention any metadata object for grants like it does mention KeyMetadata for

localstack/services/kms/provider.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
GenerateDataKeyResponse,
4040
GenerateDataKeyWithoutPlaintextRequest,
4141
GenerateDataKeyWithoutPlaintextResponse,
42+
GenerateMacRequest,
43+
GenerateMacResponse,
4244
GenerateRandomRequest,
4345
GenerateRandomResponse,
4446
GetKeyPolicyRequest,
@@ -68,6 +70,7 @@
6870
ListKeysResponse,
6971
ListResourceTagsRequest,
7072
ListResourceTagsResponse,
73+
MacAlgorithmSpec,
7174
MarkerType,
7275
NotFoundException,
7376
PlaintextType,
@@ -84,6 +87,8 @@
8487
UntagResourceRequest,
8588
UpdateAliasRequest,
8689
UpdateKeyDescriptionRequest,
90+
VerifyMacRequest,
91+
VerifyMacResponse,
8792
VerifyRequest,
8893
VerifyResponse,
8994
WrappingKeySpec,
@@ -533,6 +538,46 @@ def generate_data_key_without_plaintext(
533538
result.pop("Plaintext")
534539
return GenerateDataKeyWithoutPlaintextResponse(**result)
535540

541+
@handler("GenerateMac", expand=False)
542+
def generate_mac(
543+
self,
544+
context: RequestContext,
545+
request: GenerateMacRequest,
546+
) -> GenerateMacResponse:
547+
msg = request.get("Message")
548+
self._validate_mac_msg_length(msg)
549+
550+
key = self._get_key(context, request.get("KeyId"))
551+
self._validate_key_for_generate_verify_mac(key)
552+
553+
algorithm = request.get("MacAlgorithm")
554+
self._validate_mac_algorithm(key, algorithm)
555+
556+
mac = key.generate_mac(msg, algorithm)
557+
558+
return GenerateMacResponse(Mac=mac, MacAlgorithm=algorithm, KeyId=key.metadata.get("Arn"))
559+
560+
@handler("VerifyMac", expand=False)
561+
def verify_mac(
562+
self,
563+
context: RequestContext,
564+
request: VerifyMacRequest,
565+
) -> VerifyMacResponse:
566+
msg = request.get("Message")
567+
self._validate_mac_msg_length(msg)
568+
569+
key = self._get_key(context, request.get("KeyId"))
570+
self._validate_key_for_generate_verify_mac(key)
571+
572+
algorithm = request.get("MacAlgorithm")
573+
self._validate_mac_algorithm(key, algorithm)
574+
575+
mac_valid = key.verify_mac(msg, request.get("Mac"), algorithm)
576+
577+
return VerifyMacResponse(
578+
KeyId=key.metadata.get("Arn"), MacValid=mac_valid, MacAlgorithm=algorithm
579+
)
580+
536581
@handler("Sign", expand=False)
537582
def sign(self, context: RequestContext, request: SignRequest) -> SignResponse:
538583
key = self._get_key(context, request.get("KeyId"))
@@ -889,6 +934,34 @@ def _validate_key_for_sign_verify(self, key: KmsKey):
889934
"KeyUsage for signing / verification key should be SIGN_VERIFY"
890935
)
891936

937+
def _validate_key_for_generate_verify_mac(self, key: KmsKey):
938+
if key.metadata["KeyUsage"] != "GENERATE_VERIFY_MAC":
939+
raise InvalidKeyUsageException(
940+
"KeyUsage for generate / verify mac should be GENERATE_VERIFY_MAC"
941+
)
942+
943+
def _validate_mac_msg_length(self, msg: bytes):
944+
if len(msg) > 4096:
945+
raise ValidationException(
946+
"1 validation error detected: Value at 'message' failed to satisfy constraint: "
947+
"Member must have length less than or equal to 4096"
948+
)
949+
950+
def _validate_mac_algorithm(self, key: KmsKey, algorithm: str):
951+
if not hasattr(MacAlgorithmSpec, algorithm):
952+
raise ValidationException(
953+
f"1 validation error detected: Value '{algorithm}' at 'macAlgorithm' "
954+
f"failed to satisfy constraint: Member must satisfy enum value set: "
955+
f"[HMAC_SHA_384, HMAC_SHA_256, HMAC_SHA_224, HMAC_SHA_512]"
956+
)
957+
958+
key_spec = key.metadata["KeySpec"]
959+
if x := algorithm.split("_"):
960+
if len(x) == 3 and x[0] + "_" + x[2] != key_spec:
961+
raise InvalidKeyUsageException(
962+
f"Algorithm {algorithm} is incompatible with key spec {key_spec}."
963+
)
964+
892965
def _validate_grant_request(self, data: Dict, store: KmsStore):
893966
if "KeyId" not in data or "GranteePrincipal" not in data or "Operations" not in data:
894967
raise ValidationError("Grant ID, key ID and grantee principal must be specified")

0 commit comments

Comments
 (0)