Skip to content

LDM Migration #12852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
PipeTargetParameters,
)
from localstack.services.lambda_ import hooks as lambda_hooks
from localstack.services.lambda_ import ldm
from localstack.services.lambda_.event_source_mapping.esm_event_processor import (
EsmEventProcessor,
)
Expand All @@ -39,10 +40,6 @@
from localstack.services.lambda_.event_source_mapping.senders.lambda_sender import LambdaSender
from localstack.utils.aws.arns import parse_arn
from localstack.utils.aws.client_types import ServicePrincipal
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
is_lambda_debug_mode,
)


class PollerHolder:
Expand All @@ -65,8 +62,8 @@ def get_esm_worker(self) -> EsmWorker:
# Sender (always Lambda)
function_arn = self.esm_config["FunctionArn"]

if is_lambda_debug_mode():
timeout_seconds = DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
if ldm.IS_LDM_ENABLED:
timeout_seconds = ldm.DEFAULT_LDM_TIMEOUT_SECONDS
else:
# 900s is the maximum amount of time a Lambda can run for.
lambda_max_timeout_seconds = 900
Expand Down
7 changes: 6 additions & 1 deletion localstack-core/localstack/services/lambda_/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@

from localstack.runtime.hooks import hook_spec

HOOKS_LAMBDA_CREATE_FUNCTION_VERSION = "localstack.hooks.lambda_create_function_version"
HOOKS_LAMBDA_DELETE_FUNCTION_VERSION = "localstack.hooks.lambda_delete_function_version"
HOOKS_LAMBDA_START_DOCKER_EXECUTOR = "localstack.hooks.lambda_start_docker_executor"
HOOKS_LAMBDA_PREPARE_DOCKER_EXECUTOR = "localstack.hooks.lambda_prepare_docker_executors"
HOOKS_LAMBDA_INJECT_LAYER_FETCHER = "localstack.hooks.lambda_inject_layer_fetcher"
HOOKS_LAMBDA_INJECT_LDM_PROVISIONER = "localstack.hooks.lambda_inject_ldm_provisioner"
HOOKS_LAMBDA_PREBUILD_ENVIRONMENT_IMAGE = "localstack.hooks.lambda_prebuild_environment_image"
HOOKS_LAMBDA_CREATE_EVENT_SOURCE_POLLER = "localstack.hooks.lambda_create_event_source_poller"
HOOKS_LAMBDA_SET_EVENT_SOURCE_CONFIG_DEFAULTS = (
"localstack.hooks.lambda_set_event_source_config_defaults"
)


create_function_version = hook_spec(HOOKS_LAMBDA_CREATE_FUNCTION_VERSION)
delete_function_version = hook_spec(HOOKS_LAMBDA_DELETE_FUNCTION_VERSION)
start_docker_executor = hook_spec(HOOKS_LAMBDA_START_DOCKER_EXECUTOR)
prepare_docker_executor = hook_spec(HOOKS_LAMBDA_PREPARE_DOCKER_EXECUTOR)
inject_layer_fetcher = hook_spec(HOOKS_LAMBDA_INJECT_LAYER_FETCHER)
inject_ldm_provisioner = hook_spec(HOOKS_LAMBDA_INJECT_LDM_PROVISIONER)
prebuild_environment_image = hook_spec(HOOKS_LAMBDA_PREBUILD_ENVIRONMENT_IMAGE)
create_event_source_poller = hook_spec(HOOKS_LAMBDA_CREATE_EVENT_SOURCE_POLLER)
set_event_source_config_defaults = hook_spec(HOOKS_LAMBDA_SET_EVENT_SOURCE_CONFIG_DEFAULTS)
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@
InitializationType,
OtherServiceEndpoint,
)
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
is_lambda_debug_enabled_for,
is_lambda_debug_timeout_enabled_for,
)

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -79,10 +75,7 @@ def get_environment(

try:
yield execution_environment
if is_lambda_debug_timeout_enabled_for(lambda_arn=function_version.qualified_arn):
self.stop_environment(execution_environment)
else:
execution_environment.release()
execution_environment.release()
except InvalidStatusException as invalid_e:
LOG.error("InvalidStatusException: %s", invalid_e)
except Exception as e:
Expand Down Expand Up @@ -143,21 +136,6 @@ def scale_provisioned_concurrency(
function_version: FunctionVersion,
target_provisioned_environments: int,
) -> list[Future[None]]:
# Enforce a single environment per lambda version if this is a target
# of an active Lambda Debug Mode.
qualified_lambda_version_arn = function_version.qualified_arn
if (
is_lambda_debug_enabled_for(qualified_lambda_version_arn)
and target_provisioned_environments > 0
):
LOG.warning(
"Environments for '%s' enforced to '1' by Lambda Debug Mode, "
"configurations will continue to report the set value '%s'",
qualified_lambda_version_arn,
target_provisioned_environments,
)
target_provisioned_environments = 1

current_provisioned_environments = [
e
for e in self.environments[version_manager_id].values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
InitializationType,
)
from localstack.services.lambda_.invocation.models import lambda_stores
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
is_lambda_debug_enabled_for,
)

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -130,24 +127,6 @@ def get_invocation_lease(
unqualified_function_arn = function_version.id.unqualified_arn()
qualified_arn = function_version.id.qualified_arn()

# Enforce one lease per ARN if the global flag is set
if is_lambda_debug_enabled_for(qualified_arn):
with provisioned_tracker.lock, on_demand_tracker.lock:
on_demand_executions: int = on_demand_tracker.concurrent_executions[
unqualified_function_arn
]
provisioned_executions = provisioned_tracker.concurrent_executions[qualified_arn]
if on_demand_executions or provisioned_executions:
LOG.warning(
"Concurrent lambda invocations disabled for '%s' by Lambda Debug Mode",
qualified_arn,
)
raise TooManyRequestsException(
"Rate Exceeded.",
Reason="SingleLeaseEnforcement",
Type="User",
)

lease_type = None
# HACK: skip reserved and provisioned concurrency if function not available (e.g., in Lambda@Edge)
if function is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
)
from localstack.utils.docker_utils import DOCKER_CLIENT as CONTAINER_CLIENT
from localstack.utils.files import chmod_r, rm_rf
from localstack.utils.lambda_debug_mode.lambda_debug_mode import lambda_debug_port_for
from localstack.utils.net import get_free_tcp_port
from localstack.utils.strings import short_uid, truncate

Expand Down Expand Up @@ -321,9 +320,6 @@ def start(self, env_vars: dict[str, str]) -> None:
platform=docker_platform(self.function_version.config.architectures[0]),
additional_flags=config.LAMBDA_DOCKER_FLAGS,
)
debug_port = lambda_debug_port_for(self.function_version.qualified_arn)
if debug_port is not None:
container_config.ports.add(debug_port, debug_port)

if self.function_version.config.package_type == PackageType.Zip:
if self.function_version.config.code.is_hot_reloading():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
RuntimeExecutor,
get_runtime_executor,
)
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
is_lambda_debug_timeout_enabled_for,
)
from localstack.utils.strings import to_str
from localstack.utils.xray.trace_header import TraceHeader

Expand Down Expand Up @@ -139,7 +135,7 @@ def get_environment_variables(self) -> Dict[str, str]:
# AWS_LAMBDA_DOTNET_PREJIT
"TZ": ":UTC",
# 2) Public AWS RIE interface: https://github.com/aws/aws-lambda-runtime-interface-emulator
"AWS_LAMBDA_FUNCTION_TIMEOUT": self._get_execution_timeout_seconds(),
"AWS_LAMBDA_FUNCTION_TIMEOUT": self.function_version.config.timeout,
# 3) Public LocalStack endpoint
"LOCALSTACK_HOSTNAME": self.runtime_executor.get_endpoint_from_executor(),
"EDGE_PORT": str(config.GATEWAY_LISTEN[0].port),
Expand Down Expand Up @@ -388,18 +384,5 @@ def get_credentials(self) -> Credentials:
DurationSeconds=43200,
)["Credentials"]

def _get_execution_timeout_seconds(self) -> int:
# Returns the timeout value in seconds to be enforced during the execution of the
# lambda function. This is the configured value or the DEBUG MODE default if this
# is enabled.
if is_lambda_debug_timeout_enabled_for(self.function_version.qualified_arn):
return DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
return self.function_version.config.timeout

def _get_startup_timeout_seconds(self) -> int:
# Returns the timeout value in seconds to be enforced during lambda container startups.
# This is the value defined through LAMBDA_RUNTIME_ENVIRONMENT_TIMEOUT or the LAMBDA
# DEBUG MODE default if this is enabled.
if is_lambda_debug_timeout_enabled_for(self.function_version.qualified_arn):
return DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
return STARTUP_TIMEOUT_SEC
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@

from localstack.http import Response, route
from localstack.services.edge import ROUTER
from localstack.services.lambda_ import ldm
from localstack.services.lambda_.invocation.lambda_models import InvocationResult
from localstack.utils.backoff import ExponentialBackoff
from localstack.utils.lambda_debug_mode.lambda_debug_mode import (
DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS,
is_lambda_debug_mode,
)
from localstack.utils.objects import singleton_factory
from localstack.utils.strings import to_str

Expand Down Expand Up @@ -209,9 +206,9 @@ def invoke(self, payload: Dict[str, str]) -> InvocationResult:
# Note that if timeouts are enforced for the lambda function invoked at this endpoint
# (this is needs to be configured in the Lambda Debug Mode Config file), the lambda
# function will continue to enforce the expected timeouts.
if is_lambda_debug_mode():
if ldm.IS_LDM_ENABLED:
# The value is set to a default high value to ensure eventual termination.
timeout_seconds = DEFAULT_LAMBDA_DEBUG_MODE_TIMEOUT_SECONDS
timeout_seconds = ldm.DEFAULT_LDM_TIMEOUT_SECONDS
else:
# Do not wait longer for an invoke than the maximum lambda timeout plus a buffer
lambda_max_timeout_seconds = 900
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class Invocation:
# = invocation_id
request_id: str
trace_context: dict
user_agent: Optional[str] = None


InitializationType = Literal["on-demand", "provisioned-concurrency"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
)
from localstack.aws.connect import connect_to
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.lambda_ import hooks as lambda_hooks
from localstack.services.lambda_.analytics import (
FunctionOperation,
FunctionStatus,
Expand Down Expand Up @@ -130,6 +131,7 @@ def stop_version(self, qualified_arn: str) -> None:
if not version_manager:
raise ValueError(f"Unable to find version manager for {qualified_arn}")
self.task_executor.submit(version_manager.stop)
lambda_hooks.delete_function_version.run(qualified_arn)

def get_lambda_version_manager(self, function_arn: str) -> LambdaVersionManager:
"""
Expand Down Expand Up @@ -185,6 +187,7 @@ def create_function_version(self, function_version: FunctionVersion) -> Future[N
assignment_service=self.assignment_service,
)
self.lambda_starting_versions[qualified_arn] = version_manager
lambda_hooks.create_function_version.run(function_version.qualified_arn)
return self.task_executor.submit(self._start_lambda_version, version_manager)

def publish_version(self, function_version: FunctionVersion):
Expand Down Expand Up @@ -230,6 +233,7 @@ def invoke(
request_id: str,
payload: bytes | None,
trace_context: dict | None = None,
user_agent: Optional[str] = None,
) -> InvocationResult | None:
"""
Invokes a specific version of a lambda
Expand Down Expand Up @@ -352,6 +356,7 @@ def invoke(
invoke_time=datetime.now(),
request_id=request_id,
trace_context=trace_context,
user_agent=user_agent,
)
)

Expand All @@ -364,6 +369,7 @@ def invoke(
invoke_time=datetime.now(),
request_id=request_id,
trace_context=trace_context,
user_agent=user_agent,
)
)
status = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
State,
StateReasonCode,
)
from localstack.services.lambda_ import hooks as lambda_hooks
from localstack.services.lambda_.invocation.assignment import AssignmentService
from localstack.services.lambda_.invocation.counting_service import CountingService
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment
Expand All @@ -29,6 +30,7 @@
record_cw_metric_invocation,
)
from localstack.services.lambda_.invocation.runtime_executor import get_runtime_executor
from localstack.services.lambda_.ldm import LDMProvisioner
from localstack.utils.strings import long_uid, truncate
from localstack.utils.threads import FuncThread, start_thread

Expand All @@ -52,6 +54,8 @@ class LambdaVersionManager:
counting_service: CountingService
assignment_service: AssignmentService

ldm_provisioner: LDMProvisioner | None

def __init__(
self,
function_arn: str,
Expand Down Expand Up @@ -79,6 +83,9 @@ def __init__(
# https://aws.amazon.com/blogs/compute/coming-soon-expansion-of-aws-lambda-states-to-all-functions/
self.state = VersionState(state=State.Pending)

self.ldm_provisioner = None
lambda_hooks.inject_ldm_provisioner.run(self)

def start(self) -> VersionState:
try:
self.log_handler.start_subscriber()
Expand Down Expand Up @@ -191,6 +198,30 @@ def invoke(self, *, invocation: Invocation) -> InvocationResult:
LOG.warning(message)
raise ServiceException(message)

if self.ldm_provisioner and (
ldm_execution_environment := self.ldm_provisioner.get_execution_environment(
qualified_lambda_arn=self.function_version.qualified_arn,
user_agent=invocation.user_agent,
)
):
try:
invocation_result = ldm_execution_environment.invoke(invocation)
invocation_result.executed_version = self.function_version.id.qualifier
self.store_logs(
invocation_result=invocation_result, execution_env=ldm_execution_environment
)
except StatusErrorException as e:
invocation_result = InvocationResult(
request_id="",
payload=e.payload,
is_error=True,
logs="",
executed_version=self.function_version.id.qualifier,
)
finally:
ldm_execution_environment.release()
return invocation_result

with self.counting_service.get_invocation_lease(
self.function, self.function_version
) as provisioning_type:
Expand Down
15 changes: 15 additions & 0 deletions localstack-core/localstack/services/lambda_/ldm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import abc
from typing import Optional

from localstack.aws.api.lambda_ import Arn
from localstack.services.lambda_.invocation.execution_environment import ExecutionEnvironment

DEFAULT_LDM_TIMEOUT_SECONDS: int = 3_600
IS_LDM_ENABLED: bool = False


class LDMProvisioner(abc.ABC):
@abc.abstractmethod
def get_execution_environment(
self, qualified_lambda_arn: Arn, user_agent: Optional[str]
) -> Optional[ExecutionEnvironment]: ...
Loading
Loading