Skip to content

Feature/EventBridge: Clearn up rule-processing logic #12302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 46 additions & 45 deletions localstack-core/localstack/services/events/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1837,8 +1837,20 @@ def _process_entry(
processed_entries.append({"EventId": event_formatted["id"]})

if configured_rules := list(event_bus.rules.values()):
rules_matched = []
for rule in configured_rules:
self._process_rules(rule, region, account_id, event_formatted)
if matches_event(rule.event_pattern, event_formatted):
self._process_rule(rule, region, account_id, event_formatted)
rules_matched.append(True)
if not any(rules_matched):
LOG.info(
json.dumps(
{
"InfoCode": "InternalInfoEvents at matches_rule",
"InfoMessage": f"No rules matched for formatted event: {event_formatted}",
}
)
)
else:
LOG.info(
json.dumps(
Expand All @@ -1853,60 +1865,49 @@ def _proxy_capture_input_event(self, event: FormattedEvent) -> None:
# only required for eventstudio to capture input event if no rule is configured
pass

def _process_rules(
def _process_rule(
self,
rule: Rule,
region: str,
account_id: str,
event_formatted: FormattedEvent,
) -> None:
) -> bool:
"""Process rules for an event. Note that we no longer handle entries here as AWS returns success regardless of target failures."""
event_pattern = rule.event_pattern

if matches_event(event_pattern, event_formatted):
if not rule.targets:
LOG.info(
json.dumps(
{
"InfoCode": "InternalInfoEvents at iterate over targets",
"InfoMessage": f"No target configured for matched rule: {rule}",
}
)
)
return

for target in rule.targets.values():
target_id = target["Id"]
if is_archive_arn(target["Arn"]):
self._put_to_archive(
region,
account_id,
archive_target_id=target_id,
event=event_formatted,
)
else:
target_unique_id = f"{rule.arn}-{target_id}"
target_sender = self._target_sender_store[target_unique_id]
try:
target_sender.process_event(event_formatted.copy())
rule_invocation.record(target_sender.service)
except Exception as error:
rule_error.record(target_sender.service)
# Log the error but don't modify the response
LOG.info(
json.dumps(
{
"ErrorCode": "TargetDeliveryFailure",
"ErrorMessage": f"Failed to deliver to target {target_id}: {str(error)}",
}
)
)
else:
if not rule.targets:
LOG.info(
json.dumps(
{
"InfoCode": "InternalInfoEvents at matches_rule",
"InfoMessage": f"No rules matched for formatted event: {event_formatted}",
"InfoCode": "InternalInfoEvents at iterate over targets",
"InfoMessage": f"No target configured for matched rule: {rule}",
}
)
)
return

for target in rule.targets.values():
target_id = target["Id"]
if is_archive_arn(target["Arn"]):
self._put_to_archive(
region,
account_id,
archive_target_id=target_id,
event=event_formatted,
)
else:
target_unique_id = f"{rule.arn}-{target_id}"
target_sender = self._target_sender_store[target_unique_id]
try:
target_sender.process_event(event_formatted.copy())
rule_invocation.record(target_sender.service)
except Exception as error:
rule_error.record(target_sender.service)
# Log the error but don't modify the response
LOG.info(
json.dumps(
{
"ErrorCode": "TargetDeliveryFailure",
"ErrorMessage": f"Failed to deliver to target {target_id}: {str(error)}",
}
)
)
Loading