Skip to content

Bugfix/eventbridge/step functions target id input key #12099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions localstack-core/localstack/services/events/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,14 @@ def client(self):
return self._client

@abstractmethod
def send_event(self, event: FormattedEvent | TransformedEvent):
def send_event(
self, event: FormattedEvent | TransformedEvent, input_event: FormattedEvent | None = None
):
pass

def process_event(self, event: FormattedEvent):
"""Processes the event and send it to the target."""
input_event = event.copy()
if input_ := self.target.get("Input"):
event = json.loads(input_)
if isinstance(event, dict):
Expand All @@ -208,7 +211,7 @@ def process_event(self, event: FormattedEvent):
if input_transformer := self.target.get("InputTransformer"):
event = self.transform_event_with_target_input_transformer(input_transformer, event)
if event:
self.send_event(event)
self.send_event(event, input_event)
else:
LOG.info("No event to send to target %s", self.target.get("Id"))

Expand Down Expand Up @@ -541,14 +544,14 @@ def send_event(self, event):


class KinesisTargetSender(TargetSender):
def send_event(self, event):
def send_event(self, event, input_event):
partition_key_path = collections.get_safe(
self.target,
"$.KinesisParameters.PartitionKeyPath",
default_value="$.id",
)
stream_name = self.target["Arn"].split("/")[-1]
partition_key = collections.get_safe(event, partition_key_path, event["id"])
partition_key = collections.get_safe(input_event, partition_key_path, input_event["id"])
self.client.put_record(
StreamName=stream_name,
Data=to_bytes(to_json_str(event)),
Expand Down Expand Up @@ -626,10 +629,10 @@ def send_event(self, event):
class StatesTargetSender(TargetSender):
"""Step Functions Target Sender"""

def send_event(self, event):
def send_event(self, event, input_event):
self.service = "stepfunctions"
self.client.start_execution(
stateMachineArn=self.target["Arn"], name=event["id"], input=to_json_str(event)
stateMachineArn=self.target["Arn"], name=input_event["id"], input=to_json_str(event)
)

def _validate_input(self, target: Target):
Expand Down
120 changes: 120 additions & 0 deletions tests/aws/services/events/test_events_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,9 +815,12 @@ def test_put_events_with_target_firehose(


class TestEventsTargetKinesis:
@pytest.mark.parametrize("with_input_transformer", [True, False])
@pytest.mark.skipif(is_old_provider(), reason="not supported by the old provider")
@markers.aws.validated
def test_put_events_with_target_kinesis(
self,
with_input_transformer,
kinesis_create_stream,
wait_for_stream_ready,
create_iam_role_with_policy,
Expand Down Expand Up @@ -872,6 +875,11 @@ def test_put_events_with_target_kinesis(
EventPattern=json.dumps(TEST_EVENT_PATTERN),
)

input_transformer = {
"InputPathsMap": {"payload": "$.detail.payload"},
"InputTemplate": '{"payload": <payload>}',
}
kwargs = {"InputTransformer": input_transformer} if with_input_transformer else {}
target_id = f"target-{short_uid()}"
aws_client.events.put_targets(
Rule=rule_name,
Expand All @@ -882,6 +890,7 @@ def test_put_events_with_target_kinesis(
"Arn": stream_arn,
"RoleArn": event_bridge_bus_to_kinesis_role_arn,
"KinesisParameters": {"PartitionKeyPath": "$.detail-type"},
**kwargs,
}
],
)
Expand Down Expand Up @@ -1419,3 +1428,114 @@ def _assert_messages():
assert len(messages) > 0

retry(_assert_messages, **retry_config)

@markers.aws.validated
@pytest.mark.skipif(is_old_provider(), reason="not supported by the old provider")
def test_put_events_with_target_statefunction_machine_and_input_transformer(
self, infrastructure_setup, aws_client, snapshot
):
infra = infrastructure_setup(namespace="EventsTests")
stack_name = "stack-events-target-stepfunctions"
stack = cdk.Stack(infra.cdk_app, stack_name=stack_name)

bus_name = "MyEventBus"
bus = cdk.aws_events.EventBus(stack, "MyEventBus", event_bus_name=bus_name)

queue = cdk.aws_sqs.Queue(stack, "MyQueue", queue_name="MyQueue")

send_to_sqs_task = cdk.aws_stepfunctions_tasks.SqsSendMessage(
stack,
"SendToQueue",
queue=queue,
message_body=cdk.aws_stepfunctions.TaskInput.from_object(
{"message": cdk.aws_stepfunctions.JsonPath.entire_payload}
),
)

state_machine = cdk.aws_stepfunctions.StateMachine(
stack,
"MyStateMachine",
definition=send_to_sqs_task,
state_machine_name="MyStateMachine",
)

detail_type = "myDetailType"
rule = cdk.aws_events.Rule(
stack,
"MyRule",
event_bus=bus,
event_pattern=cdk.aws_events.EventPattern(detail_type=[detail_type]),
)

input_transformer_property = cdk.aws_events.CfnRule.InputTransformerProperty(
input_template="Message with key <detail-key>",
input_paths_map={"detail-key": "$.detail.Key1"},
)
rule.add_target(
cdk.aws_events_targets.SfnStateMachine(
state_machine,
input=cdk.aws_events.RuleTargetInput.from_object(input_transformer_property),
)
)

cdk.CfnOutput(stack, "MachineArn", value=state_machine.state_machine_arn)
cdk.CfnOutput(stack, "QueueUrl", value=queue.queue_url)

with infra.provisioner() as prov:
outputs = prov.get_stack_outputs(stack_name=stack_name)

entries = [
{
"Source": "com.sample.resource",
"DetailType": detail_type,
"Detail": json.dumps({"Key1": "Value"}),
"EventBusName": bus_name,
}
for i in range(5)
]
put_events = aws_client.events.put_events(Entries=entries)

state_machine_arn = outputs["MachineArn"]

def _assert_executions():
executions = (
aws_client.stepfunctions.get_paginator("list_executions")
.paginate(stateMachineArn=state_machine_arn)
.build_full_result()
)
assert len(executions["executions"]) > 0

matched_executions = [
e
for e in executions["executions"]
if e["name"].startswith(put_events["Entries"][0]["EventId"])
]
assert len(matched_executions) > 0

retry_config = {
"retries": (20 if is_aws_cloud() else 5),
"sleep": (2 if is_aws_cloud() else 1),
"sleep_before": (2 if is_aws_cloud() else 0),
}
retry(_assert_executions, **retry_config)

messages = []
queue_url = outputs["QueueUrl"]

def _get_messages():
queue_msgs = aws_client.sqs.receive_message(QueueUrl=queue_url)
for msg in queue_msgs.get("Messages", []):
messages.append(msg)

assert len(messages) > 0
return messages

messages = retry(_get_messages, **retry_config)

snapshot.add_transformers_list(
[
snapshot.transform.key_value("ReceiptHandle", reference_replacement=False),
snapshot.transform.key_value("MD5OfBody", reference_replacement=False),
]
)
snapshot.match("messages", messages)
60 changes: 60 additions & 0 deletions tests/aws/services/events/test_events_targets.snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -899,5 +899,65 @@
}
]
}
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetKinesis::test_put_events_with_target_kinesis[True]": {
"recorded-date": "03-01-2025, 15:02:29",
"recorded-content": {
"response": {
"payload": {
"acc_id": "0a787ecb-4015",
"sf_id": "baz"
}
}
}
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetKinesis::test_put_events_with_target_kinesis[False]": {
"recorded-date": "03-01-2025, 15:03:08",
"recorded-content": {
"response": {
"version": "0",
"id": "<uuid:1>",
"detail-type": "core.update-account-command",
"source": "core.update-account-command",
"account": "111111111111",
"time": "date",
"region": "<region>",
"resources": [],
"detail": {
"command": "update-account",
"payload": {
"acc_id": "0a787ecb-4015",
"sf_id": "baz"
}
}
}
}
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetStepFunctions::test_put_events_with_target_statefunction_machine_and_input_transformer": {
"recorded-date": "03-01-2025, 15:19:29",
"recorded-content": {
"messages": [
{
"MessageId": "<uuid:1>",
"ReceiptHandle": "receipt-handle",
"MD5OfBody": "m-d5-of-body",
"Body": {
"message": {
"version": "0",
"id": "<uuid:2>",
"detail-type": "myDetailType",
"source": "com.sample.resource",
"account": "111111111111",
"time": "date",
"region": "<region>",
"resources": [],
"detail": {
"Key1": "Value"
}
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
"tests/aws/services/events/test_events_targets.py::TestEventsTargetKinesis::test_put_events_with_target_kinesis": {
"last_validated_date": "2024-07-11T09:04:25+00:00"
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetKinesis::test_put_events_with_target_kinesis[False]": {
"last_validated_date": "2025-01-03T15:03:08+00:00"
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetKinesis::test_put_events_with_target_kinesis[True]": {
"last_validated_date": "2025-01-03T15:02:29+00:00"
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetLambda::test_put_events_with_target_lambda": {
"last_validated_date": "2024-07-11T09:04:48+00:00"
},
Expand Down Expand Up @@ -47,6 +53,9 @@
"tests/aws/services/events/test_events_targets.py::TestEventsTargetStepFunctions::test_put_events_with_target_statefunction_machine": {
"last_validated_date": "2024-08-29T18:06:56+00:00"
},
"tests/aws/services/events/test_events_targets.py::TestEventsTargetStepFunctions::test_put_events_with_target_statefunction_machine_and_input_transformer": {
"last_validated_date": "2025-01-03T15:19:29+00:00"
},
"tests/aws/services/events/test_events_targets.py::test_put_events_with_target_lambda_list_entries_partial_match": {
"last_validated_date": "2024-04-08T17:36:24+00:00"
},
Expand Down
Loading