Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions localstack-core/localstack/services/lambda_/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from enum import StrEnum

from localstack.utils.analytics.metrics import Counter

NAMESPACE = "lambda"

hotreload_counter = Counter(namespace=NAMESPACE, name="hotreload", labels=["operation"])

function_counter = Counter(
namespace=NAMESPACE,
name="function",
labels=[
"operation",
"status",
"runtime",
"package_type",
# only for operation "invoke"
"invocation_type",
],
)


class FunctionOperation(StrEnum):
invoke = "invoke"
create = "create"


class FunctionStatus(StrEnum):
success = "success"
zero_reserved_concurrency_error = "zero_reserved_concurrency_error"
event_age_exceeded_error = "event_age_exceeded_error"
throttle_error = "throttle_error"
system_error = "system_error"
unhandled_state_error = "unhandled_state_error"
failed_state_error = "failed_state_error"
pending_state_error = "pending_state_error"
invalid_payload_error = "invalid_payload_error"
invocation_error = "invocation_error"


esm_counter = Counter(namespace=NAMESPACE, name="esm", labels=["source", "status"])


class EsmExecutionStatus(StrEnum):
success = "success"
partial_batch_failure_error = "partial_batch_failure_error"
target_invocation_error = "target_invocation_error"
unhandled_error = "unhandled_error"
source_poller_error = "source_poller_error"
# TODO: Add tracking for filter error. Options:
# a) raise filter exception and track it in the esm_worker
# b) somehow add tracking in the individual pollers
filter_error = "filter_error"
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid

from localstack.aws.api.pipes import LogLevel
from localstack.services.lambda_.analytics import EsmExecutionStatus, esm_counter
from localstack.services.lambda_.event_source_mapping.event_processor import (
BatchFailureError,
EventProcessor,
Expand All @@ -15,7 +16,6 @@
Sender,
SenderError,
)
from localstack.services.lambda_.usage import esm_error, esm_invocation

LOG = logging.getLogger(__name__)

Expand All @@ -37,7 +37,6 @@ def process_events_batch(self, input_events: list[dict] | dict) -> None:
else:
first_event = {}
event_source = first_event.get("eventSource")
esm_invocation.record(event_source)

execution_id = uuid.uuid4()
# Create a copy of the original input events
Expand All @@ -60,12 +59,16 @@ def process_events_batch(self, input_events: list[dict] | dict) -> None:
messageType="ExecutionSucceeded",
logLevel=LogLevel.INFO,
)
esm_counter.labels(source=event_source, status=EsmExecutionStatus.success).increment()
except PartialFailureSenderError as e:
self.logger.log(
messageType="ExecutionFailed",
logLevel=LogLevel.ERROR,
error=e.error,
)
esm_counter.labels(
source=event_source, status=EsmExecutionStatus.partial_batch_failure_error
).increment()
# TODO: check whether partial batch item failures is enabled by default or need to be explicitly enabled
# using --function-response-types "ReportBatchItemFailures"
# https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html
Expand All @@ -78,15 +81,20 @@ def process_events_batch(self, input_events: list[dict] | dict) -> None:
logLevel=LogLevel.ERROR,
error=e.error,
)
esm_counter.labels(
source=event_source, status=EsmExecutionStatus.target_invocation_error
).increment()
raise BatchFailureError(error=e.error) from e
except Exception as e:
esm_error.record(event_source)
LOG.error(
"Unhandled exception while processing Lambda event source mapping (ESM) events %s for ESM with execution id %s",
events,
execution_id,
exc_info=LOG.isEnabledFor(logging.DEBUG),
)
esm_counter.labels(
source=event_source, status=EsmExecutionStatus.unhandled_error
).increment()
raise e

def process_target_stage(self, events: list[dict]) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
LAMBDA_EVENT_SOURCE_MAPPING_MAX_BACKOFF_ON_ERROR_SEC,
LAMBDA_EVENT_SOURCE_MAPPING_POLL_INTERVAL_SEC,
)
from localstack.services.lambda_.analytics import EsmExecutionStatus, esm_counter
from localstack.services.lambda_.event_source_mapping.pollers.poller import (
EmptyPollResultsException,
Poller,
)
from localstack.services.lambda_.invocation.models import LambdaStore, lambda_stores
from localstack.services.lambda_.provider_utils import get_function_version_from_arn
from localstack.utils.aws.arns import parse_arn
from localstack.utils.backoff import ExponentialBackoff
from localstack.utils.threads import FuncThread

Expand Down Expand Up @@ -181,6 +183,10 @@ def poller_loop(self, *args, **kwargs):
e,
exc_info=LOG.isEnabledFor(logging.DEBUG),
)
event_source = parse_arn(self.esm_config.get("EventSourceArn")).get("service")
esm_counter.labels(
source=event_source, status=EsmExecutionStatus.source_poller_error
).increment()
# Wait some time between retries to avoid running into the problem right again
poll_interval_duration = error_boff.next_backoff()
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@
from botocore.config import Config

from localstack import config
from localstack.aws.api.lambda_ import TooManyRequestsException
from localstack.aws.api.lambda_ import InvocationType, TooManyRequestsException
from localstack.services.lambda_.analytics import (
FunctionOperation,
FunctionStatus,
function_counter,
)
from localstack.services.lambda_.invocation.internal_sqs_queue import get_fake_sqs_client
from localstack.services.lambda_.invocation.lambda_models import (
EventInvokeConfig,
Expand Down Expand Up @@ -194,18 +199,30 @@ def handle_message(self, message: dict) -> None:
failure_cause = None
qualifier = self.version_manager.function_version.id.qualifier
event_invoke_config = self.version_manager.function.event_invoke_configs.get(qualifier)
runtime = None
status = None
try:
sqs_invocation = SQSInvocation.decode(message["Body"])
invocation = sqs_invocation.invocation
try:
invocation_result = self.version_manager.invoke(invocation=invocation)
function_config = self.version_manager.function_version.config
function_counter.labels(
operation=FunctionOperation.invoke,
runtime=function_config.runtime or "n/a",
status=FunctionStatus.success,
invocation_type=InvocationType.Event,
package_type=function_config.package_type,
).increment()
except Exception as e:
# Reserved concurrency == 0
if self.version_manager.function.reserved_concurrent_executions == 0:
failure_cause = "ZeroReservedConcurrency"
status = FunctionStatus.zero_reserved_concurrency_error
# Maximum event age expired (lookahead for next retry)
elif not has_enough_time_for_retry(sqs_invocation, event_invoke_config):
failure_cause = "EventAgeExceeded"
status = FunctionStatus.event_age_exceeded_error
if failure_cause:
invocation_result = InvocationResult(
is_error=True, request_id=invocation.request_id, payload=None, logs=None
Expand All @@ -216,13 +233,22 @@ def handle_message(self, message: dict) -> None:
self.process_dead_letter_queue(sqs_invocation, invocation_result)
return
# 3) Otherwise, retry without increasing counter
self.process_throttles_and_system_errors(sqs_invocation, e)
status = self.process_throttles_and_system_errors(sqs_invocation, e)
return
finally:
sqs_client = get_sqs_client(self.version_manager.function_version)
sqs_client.delete_message(
QueueUrl=self.event_queue_url, ReceiptHandle=message["ReceiptHandle"]
)
# status MUST be set before returning
package_type = self.version_manager.function_version.config.package_type
function_counter.labels(
operation=FunctionOperation.invoke,
runtime=runtime or "n/a",
status=status,
invocation_type=InvocationType.Event,
package_type=package_type,
).increment()

# Good summary blogpost: https://haithai91.medium.com/aws-lambdas-retry-behaviors-edff90e1cf1b
# Asynchronous invocation handling: https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html
Expand Down Expand Up @@ -278,7 +304,9 @@ def handle_message(self, message: dict) -> None:
"Error handling lambda invoke %s", e, exc_info=LOG.isEnabledFor(logging.DEBUG)
)

def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, error: Exception):
def process_throttles_and_system_errors(
self, sqs_invocation: SQSInvocation, error: Exception
) -> str:
# If the function doesn't have enough concurrency available to process all events, additional
# requests are throttled. For throttling errors (429) and system errors (500-series), Lambda returns
# the event to the queue and attempts to run the function again for up to 6 hours. The retry interval
Expand All @@ -292,10 +320,12 @@ def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, err
# https://repost.aws/knowledge-center/lambda-troubleshoot-invoke-error-502-500
if isinstance(error, TooManyRequestsException): # Throttles 429
LOG.debug("Throttled lambda %s: %s", self.version_manager.function_arn, error)
status = FunctionStatus.throttle_error
else: # System errors 5xx
LOG.debug(
"Service exception in lambda %s: %s", self.version_manager.function_arn, error
)
status = FunctionStatus.system_error
maximum_exception_retry_delay_seconds = 5 * 60
delay_seconds = min(
2**sqs_invocation.exception_retries, maximum_exception_retry_delay_seconds
Expand All @@ -307,6 +337,7 @@ def process_throttles_and_system_errors(self, sqs_invocation: SQSInvocation, err
MessageBody=sqs_invocation.encode(),
DelaySeconds=delay_seconds,
)
return status

def process_success_destination(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
)
from localstack.aws.connect import connect_to
from localstack.constants import AWS_REGION_US_EAST_1
from localstack.services.lambda_ import usage
from localstack.services.lambda_.analytics import (
FunctionOperation,
FunctionStatus,
function_counter,
hotreload_counter,
)
from localstack.services.lambda_.api_utils import (
lambda_arn,
qualified_lambda_arn,
Expand Down Expand Up @@ -272,29 +277,42 @@ def invoke(

# Need the qualified arn to exactly get the target lambda
qualified_arn = qualified_lambda_arn(function_name, version_qualifier, account_id, region)
version = function.versions.get(version_qualifier)
runtime = version.config.runtime or "n/a"
package_type = version.config.package_type
try:
version_manager = self.get_lambda_version_manager(qualified_arn)
event_manager = self.get_lambda_event_manager(qualified_arn)
usage.runtime.record(version_manager.function_version.config.runtime)
except ValueError as e:
version = function.versions.get(version_qualifier)
state = version and version.config.state.state
# TODO: make such developer hints optional or remove after initial v2 transition period
if state == State.Failed:
status = FunctionStatus.failed_state_error
HINT_LOG.error(
f"Failed to create the runtime executor for the function {function_name}. "
"Please ensure that Docker is available in the LocalStack container by adding the volume mount "
'"/var/run/docker.sock:/var/run/docker.sock" to your LocalStack startup. '
"Check out https://docs.localstack.cloud/user-guide/aws/lambda/#docker-not-available"
)
elif state == State.Pending:
status = FunctionStatus.pending_state_error
HINT_LOG.warning(
"Lambda functions are created and updated asynchronously in the new lambda provider like in AWS. "
f"Before invoking {function_name}, please wait until the function transitioned from the state "
"Pending to Active using: "
f'"awslocal lambda wait function-active-v2 --function-name {function_name}" '
"Check out https://docs.localstack.cloud/user-guide/aws/lambda/#function-in-pending-state"
)
else:
status = FunctionStatus.unhandled_state_error
LOG.error("Unexpected state %s for Lambda function %s", state, function_name)
function_counter.labels(
operation=FunctionOperation.invoke,
runtime=runtime,
status=status,
invocation_type=invocation_type,
package_type=package_type,
).increment()
raise ResourceConflictException(
f"The operation cannot be performed at this time. The function is currently in the following state: {state}"
) from e
Expand All @@ -306,6 +324,13 @@ def invoke(
try:
to_str(payload)
except Exception as e:
function_counter.labels(
operation=FunctionOperation.invoke,
runtime=runtime,
status=FunctionStatus.invalid_payload_error,
invocation_type=invocation_type,
package_type=package_type,
).increment()
# MAYBE: improve parity of detailed exception message (quite cumbersome)
raise InvalidRequestContentException(
f"Could not parse request body into json: Could not parse payload into json: {e}",
Expand All @@ -331,7 +356,7 @@ def invoke(
)
)

return version_manager.invoke(
invocation_result = version_manager.invoke(
invocation=Invocation(
payload=payload,
invoked_arn=invoked_arn,
Expand All @@ -342,6 +367,19 @@ def invoke(
trace_context=trace_context,
)
)
status = (
FunctionStatus.invocation_error
if invocation_result.is_error
else FunctionStatus.success
)
function_counter.labels(
operation=FunctionOperation.invoke,
runtime=runtime,
status=status,
invocation_type=invocation_type,
package_type=package_type,
).increment()
return invocation_result

def update_version(self, new_version: FunctionVersion) -> Future[None]:
"""
Expand Down Expand Up @@ -601,7 +639,7 @@ def store_s3_bucket_archive(
:return: S3 Code object representing the archive stored in S3
"""
if archive_bucket == config.BUCKET_MARKER_LOCAL:
usage.hotreload.increment()
hotreload_counter.labels(operation="create").increment()
return create_hot_reloading_code(path=archive_key)
s3_client: "S3Client" = connect_to().s3
kwargs = {"VersionId": archive_version} if archive_version else {}
Expand Down
12 changes: 12 additions & 0 deletions localstack-core/localstack/services/lambda_/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@
from localstack.services.edge import ROUTER
from localstack.services.lambda_ import api_utils
from localstack.services.lambda_ import hooks as lambda_hooks
from localstack.services.lambda_.analytics import (
FunctionOperation,
FunctionStatus,
function_counter,
)
from localstack.services.lambda_.api_utils import (
ARCHITECTURES,
STATEMENT_ID_REGEX,
Expand Down Expand Up @@ -1037,6 +1042,13 @@ def create_function(
)
fn.versions["$LATEST"] = version
state.functions[function_name] = fn
function_counter.labels(
operation=FunctionOperation.create,
runtime=runtime or "n/a",
status=FunctionStatus.success,
invocation_type="n/a",
package_type=package_type,
)
self.lambda_service.create_function_version(version)

if tags := request.get("Tags"):
Expand Down
17 changes: 0 additions & 17 deletions localstack-core/localstack/services/lambda_/usage.py

This file was deleted.

Loading