Skip to content

Feature: Firehose/Kinesis: Add resharding (split merge) kinesis event stream validated tests #10177

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
24c8925
feat: use cleanup fixture for cleaning up firehose stream
maxhoheiser Jan 12, 2024
f8b8438
feat: align firehose delivery stream creation fixture
maxhoheiser Jan 21, 2024
f75ae4b
feat: align kinesis create stream fixture
maxhoheiser Jan 21, 2024
5827f4a
feat: add test name for role and policy if not given
maxhoheiser Jan 21, 2024
c983f14
feat: add snapshot test for multiple firehose delivery stream consumers
maxhoheiser Jan 21, 2024
1e93ef9
feat: add snapshot
maxhoheiser Jan 21, 2024
beb1192
feat: mark as validated
maxhoheiser Jan 21, 2024
f3d1962
feat: make default message reusable
maxhoheiser Jan 21, 2024
cbb6007
fix: use correct log group name
maxhoheiser Jan 21, 2024
ef8e9ed
feat: wait for role propagation instead of retry this handles differe…
maxhoheiser Jan 25, 2024
1462ab2
feat: fix typo
maxhoheiser Jan 25, 2024
4b5b9e0
feat: align naming
maxhoheiser Jan 25, 2024
41655fd
feat: align naming
maxhoheiser Jan 29, 2024
7f2d90a
fix: align updated names with snapshots
maxhoheiser Jan 31, 2024
cbb11ff
feat: add unique dynamo db lease table for each version of kcl
maxhoheiser Jan 31, 2024
6ddf1b4
CFn: improve error message for invalid ref (#10149)
simonrw Jan 31, 2024
bf16771
enhance parity for DynamoDB to Kinesis stream integration (#10143)
whummer Jan 31, 2024
54a8e73
Fix scenario tests failing in non-default region (#9149)
viren-nadkarni Feb 1, 2024
9230626
fix Kinesis rejection of log level (#10158)
bentsku Feb 1, 2024
baf352c
cloudwatch: fix functionality of metrics for multi-accounts and regio…
sannya-singal Feb 2, 2024
17e9613
StepFunctions, SNS Optimised Integration: Automatic Stringification o…
MEPalma Feb 5, 2024
75b10a1
fix dynamodb router rule removal (#10172)
thrau Feb 5, 2024
ad8fdc2
Fix bootstrap tests failing in non-default region (#10152)
viren-nadkarni Feb 5, 2024
e231c6a
feat: add resharding split test
maxhoheiser Feb 5, 2024
b8689a7
feat: add snapshot test for shard merging kinesis stream
maxhoheiser Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions localstack/services/cloudformation/engine/template_deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from localstack.utils.strings import to_bytes, to_str
from localstack.utils.threads import start_worker_thread

from localstack.services.cloudformation.models import * # noqa: F401, isort:skip
from localstack.services.cloudformation.models import * # noqa: F401, F403, isort:skip
from localstack.utils.urls import localstack_host

ACTION_CREATE = "create"
Expand Down Expand Up @@ -169,7 +169,9 @@ def resolve_ref(
# resource
resource = resources.get(ref)
if not resource:
raise Exception("Should be detected earlier.")
raise Exception(
f"Resource target for `Ref {ref}` could not be found. Is there a resource with name {ref} in your stack?"
)

return resources[ref].get("PhysicalResourceId")

Expand Down Expand Up @@ -325,7 +327,10 @@ def _resolve_refs_recursively(
)
# TODO: we should check the deployment state and not try to GetAtt from a resource that is still IN_PROGRESS or hasn't started yet.
if resolved_getatt is None:
raise DependencyNotYetSatisfied(resource_ids=resource_logical_id, message="")
raise DependencyNotYetSatisfied(
resource_ids=resource_logical_id,
message=f"Could not resolve attribute '{attribute_name}' on resource '{resource_logical_id}'",
)
return resolved_getatt

if stripped_fn_lower == "join":
Expand Down Expand Up @@ -361,7 +366,7 @@ def _resolve_refs_recursively(
none_values = [v for v in join_values if v is None]
if none_values:
raise Exception(
"Cannot resolve CF fn::Join %s due to null values: %s" % (value, join_values)
f"Cannot resolve CF Fn::Join {value} due to null values: {join_values}"
)
return value[keys_list[0]][0].join([str(v) for v in join_values])

Expand All @@ -375,7 +380,7 @@ def _resolve_refs_recursively(
item_to_sub[1].update(attr_refs)

for key, val in item_to_sub[1].items():
val = resolve_refs_recursively(
resolved_val = resolve_refs_recursively(
account_id,
region_name,
stack_name,
Expand All @@ -385,11 +390,13 @@ def _resolve_refs_recursively(
parameters,
val,
)
if not isinstance(val, str):
if not isinstance(resolved_val, str):
# We don't have access to the resource that's a dependency in this case,
# so do the best we can with the resource ids
raise DependencyNotYetSatisfied(resource_ids=key, message="")
result = result.replace("${%s}" % key, val)
raise DependencyNotYetSatisfied(
resource_ids=key, message=f"Could not resolve {val} to terminal value type"
)
result = result.replace("${%s}" % key, resolved_val)

# resolve placeholders
result = resolve_placeholders_in_string(
Expand Down Expand Up @@ -1284,7 +1291,7 @@ def do_apply_changes_in_loop(self, changes, stack):
break
if not updated:
raise Exception(
"Resource deployment loop completed, pending resource changes: %s" % changes
f"Resource deployment loop completed, pending resource changes: {changes}"
)

# clean up references to deleted resources in stack
Expand Down
31 changes: 20 additions & 11 deletions localstack/services/dynamodb/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
ItemFinder,
ItemSet,
SchemaExtractor,
de_dynamize_record,
extract_table_name_from_partiql_update,
)
from localstack.services.dynamodbstreams import dynamodbstreams_api
Expand All @@ -136,7 +137,7 @@
from localstack.utils.common import short_uid, to_bytes
from localstack.utils.json import BytesEncoder, canonical_json
from localstack.utils.scheduler import Scheduler
from localstack.utils.strings import long_uid, to_str
from localstack.utils.strings import long_uid, md5, to_str
from localstack.utils.threads import FuncThread, start_thread, start_worker_thread

# set up logger
Expand Down Expand Up @@ -165,7 +166,7 @@
MANAGED_KMS_KEYS = {}


def dynamodb_table_exists(table_name, client=None):
def dynamodb_table_exists(table_name: str, client=None):
client = client or connect_to().dynamodb
paginator = client.get_paginator("list_tables")
pages = paginator.paginate(PaginationConfig={"PageSize": 100})
Expand Down Expand Up @@ -216,8 +217,12 @@ def forward_to_kinesis_stream(records):
record["tableName"] = table_name
record.pop("eventSourceARN", None)
record["dynamodb"].pop("StreamViewType", None)
record.pop("eventVersion", None)
record["recordFormat"] = "application/json"
record["userIdentity"] = None
hash_keys = list(filter(lambda key: key["KeyType"] == "HASH", table_def["KeySchema"]))
partition_key = hash_keys[0]["AttributeName"]
# TODO: reverse properly how AWS creates the partition key, it seems to be an MD5 hash
kinesis_partition_key = md5(f"{table_name}{hash_keys[0]['AttributeName']}")

stream_account_id = extract_account_id_from_arn(stream_arn)
stream_region_name = extract_region_from_arn(stream_arn)
Expand All @@ -229,7 +234,7 @@ def forward_to_kinesis_stream(records):
kinesis.put_record(
StreamARN=stream_arn,
Data=json.dumps(record, cls=BytesEncoder),
PartitionKey=partition_key,
PartitionKey=kinesis_partition_key,
)

@classmethod
Expand Down Expand Up @@ -472,8 +477,7 @@ def on_before_start(self):

def on_before_stop(self):
self._expired_items_worker.stop()
for rule in self._router_rules:
ROUTER.remove_rule(rule)
ROUTER.remove(self._router_rules)

def accept_state_visitor(self, visitor: StateVisitor):
visitor.visit(dynamodb_stores)
Expand Down Expand Up @@ -1143,6 +1147,7 @@ def transact_write_items(
event_sources_or_streams_enabled
or has_streams_enabled(context.account_id, context.region, record["eventSourceARN"])
)
# TODO: forward selectively only for the streams that are enabled
if event_sources_or_streams_enabled:
self.forward_stream_records(context.account_id, context.region, records)

Expand Down Expand Up @@ -1647,7 +1652,8 @@ def prepare_transact_write_item_records(
new_record["eventSourceARN"] = arns.dynamodb_table_arn(
table_name, account_id=account_id, region_name=region_name
)
new_record["dynamodb"]["SizeBytes"] = _get_size_bytes(put_request["Item"])
record_item = de_dynamize_record(put_request["Item"])
new_record["dynamodb"]["SizeBytes"] = _get_size_bytes(record_item)
records.append(new_record)
i += 1
update_request = request.get("Update")
Expand Down Expand Up @@ -1847,9 +1853,11 @@ def should_throttle(self, action):
# ---
# Misc. util functions
# ---
def _get_size_bytes(item) -> int:


def _get_size_bytes(item: dict) -> int:
try:
size_bytes = len(json.dumps(item))
size_bytes = len(json.dumps(item, separators=(",", ":")))
except TypeError:
size_bytes = len(str(item))
return size_bytes
Expand Down Expand Up @@ -1886,9 +1894,10 @@ def is_index_query_valid(account_id: str, region_name: str, query_data: dict) ->
return True


def has_streams_enabled(account_id: str, region_name: str, table_name: str):
if not table_name:
def has_streams_enabled(account_id: str, region_name: str, table_name_or_arn: str):
if not table_name_or_arn:
return
table_name = table_name_or_arn.split(":table/")[-1]
table_arn = arns.dynamodb_table_arn(table_name, account_id=account_id, region_name=region_name)

if dynamodbstreams_api.get_stream_for_table(account_id, region_name, table_arn):
Expand Down
99 changes: 14 additions & 85 deletions localstack/services/dynamodb/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
import re
from decimal import Decimal
from typing import Dict, List, Mapping, Optional
from typing import Dict, List, Optional

from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from cachetools import TTLCache
from moto.core.exceptions import JsonRESTError

Expand Down Expand Up @@ -159,14 +159,11 @@ def find_existing_item(
if "Item" not in existing_item:
if "message" in existing_item:
table_names = ddb_client.list_tables()["TableNames"]
msg = (
"Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s"
% (
table_names,
existing_item["message"],
)
LOG.warning(
"Unable to get item from DynamoDB (existing tables: %s ...truncated if >100 tables): %s",
table_names,
existing_item["message"],
)
LOG.warning(msg)
return
return existing_item.get("Item")

Expand Down Expand Up @@ -202,87 +199,19 @@ def extract_table_name_from_partiql_update(statement: str) -> Optional[str]:
return match and match.group(2)


def dynamize_value(value) -> dict:
    """Convert a Python value into its DynamoDB type-annotated representation.

    Take a scalar Python value, dict, or list and return a dict consisting of
    the DynamoDB type specification and the value to send to DynamoDB, e.g.
    ``"test"`` -> ``{"S": "test"}``. Delegates to boto3's ``TypeSerializer``,
    which handles nested M/L structures recursively.

    :param value: the plain Python value to serialize
    :return: the DynamoDB-typed representation of ``value``
    :raises TypeError: if the type of ``value`` is not supported
    """
    return TypeSerializer().serialize(value)


def de_dynamize_record(item: dict) -> dict:
    """Convert a DynamoDB-typed item into a plain Python dict.

    Return the given item in DynamoDB format parsed as a regular dict object,
    i.e., convert something like ``{'foo': {'S': 'test'}, 'bar': {'N': '123'}}``
    to ``{'foo': 'test', 'bar': Decimal('123')}``.
    Note: This is the reverse operation of ``dynamize_value(...)`` above.

    :param item: mapping of attribute name to DynamoDB type-annotated value
    :return: mapping of attribute name to plain Python value
    """
    deserializer = TypeDeserializer()
    return {k: deserializer.deserialize(v) for k, v in item.items()}
2 changes: 1 addition & 1 deletion localstack/services/firehose/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def _startup():
region_name=context.region,
listener_func=listener_function,
wait_until_started=True,
ddb_lease_table_suffix="-firehose",
ddb_lease_table_suffix=f"-firehose-{delivery_stream_name}",
)

self.kinesis_listeners[delivery_stream_arn] = process
Expand Down
10 changes: 8 additions & 2 deletions localstack/services/kinesis/kinesis_mock_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,16 @@ def _create_kinesis_mock_server(self, account_id: str) -> KinesisMockServer:
if config.KINESIS_MOCK_LOG_LEVEL:
log_level = config.KINESIS_MOCK_LOG_LEVEL.upper()
elif config.LS_LOG:
if config.LS_LOG == "warning":
ls_log_level = config.LS_LOG.upper()
if ls_log_level == "WARNING":
log_level = "WARN"
elif ls_log_level == "TRACE-INTERNAL":
log_level = "TRACE"
elif ls_log_level not in ("ERROR", "WARN", "INFO", "DEBUG", "TRACE"):
# to protect from cases where the log level will be rejected from kinesis-mock
log_level = "INFO"
else:
log_level = config.LS_LOG.upper()
log_level = ls_log_level
else:
log_level = "INFO"
latency = config.KINESIS_LATENCY + "ms"
Expand Down
7 changes: 5 additions & 2 deletions localstack/services/lambda_/invocation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,33 @@
LOG = logging.getLogger(__name__)


def record_cw_metric_invocation(function_name: str, account_id: str, region_name: str):
    """Publish the CloudWatch ``Invocations`` metric for a Lambda function.

    Best-effort: any failure while publishing is swallowed and logged at debug
    level so that metric emission never breaks the invocation path.

    :param function_name: name of the invoked Lambda function (used as dimension)
    :param account_id: AWS account ID the metric is published under
    :param region_name: AWS region the metric is published in
    """
    try:
        publish_lambda_metric(
            "Invocations",
            1,
            {"func_name": function_name},
            region_name=region_name,
            account_id=account_id,
        )
    except Exception as e:
        LOG.debug("Failed to send CloudWatch metric for Lambda invocation: %s", e)


def record_cw_metric_error(function_name: str, region_name: str):
def record_cw_metric_error(function_name: str, account_id: str, region_name: str):
try:
publish_lambda_metric(
"Invocations",
1,
{"func_name": function_name},
region_name=region_name,
account_id=account_id,
)
publish_lambda_metric(
"Errors",
1,
{"func_name": function_name},
account_id=account_id,
region_name=region_name,
)
except Exception as e:
Expand Down
2 changes: 2 additions & 0 deletions localstack/services/lambda_/invocation/version_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ def invoke(self, *, invocation: Invocation) -> InvocationResult:
start_thread(
lambda *args, **kwargs: record_cw_metric_error(
function_name=self.function.function_name,
account_id=self.function_version.id.account,
region_name=self.function_version.id.region,
),
name=f"record-cloudwatch-metric-error-{function_id.function_name}:{function_id.qualifier}",
Expand All @@ -237,6 +238,7 @@ def invoke(self, *, invocation: Invocation) -> InvocationResult:
start_thread(
lambda *args, **kwargs: record_cw_metric_invocation(
function_name=self.function.function_name,
account_id=self.function_version.id.account,
region_name=self.function_version.id.region,
),
name=f"record-cloudwatch-metric-{function_id.function_name}:{function_id.qualifier}",
Expand Down
Loading