11from __future__ import annotations
22
3+ import re
34from typing import Any , Final , Generic , Optional , TypeVar
45
56from localstack .services .cloudformation .engine .v2 .change_set_model import (
@@ -254,20 +255,20 @@ def _resolve_condition(self, logical_id: str) -> PreprocEntityDelta:
254255 return condition_delta
255256 raise RuntimeError (f"No condition '{ logical_id } ' was found." )
256257
257- def _resolve_pseudo_parameter (self , pseudo_parameter_name : str ) -> PreprocEntityDelta :
258+ def _resolve_pseudo_parameter (self , pseudo_parameter_name : str ) -> Any :
258259 match pseudo_parameter_name :
259260 case "AWS::Partition" :
260- after = get_partition (self ._change_set .region_name )
261+ return get_partition (self ._change_set .region_name )
261262 case "AWS::AccountId" :
262- after = self ._change_set .stack .account_id
263+ return self ._change_set .stack .account_id
263264 case "AWS::Region" :
264- after = self ._change_set .stack .region_name
265+ return self ._change_set .stack .region_name
265266 case "AWS::StackName" :
266- after = self ._change_set .stack .stack_name
267+ return self ._change_set .stack .stack_name
267268 case "AWS::StackId" :
268- after = self ._change_set .stack .stack_id
269+ return self ._change_set .stack .stack_id
269270 case "AWS::URLSuffix" :
270- after = _AWS_URL_SUFFIX
271+ return _AWS_URL_SUFFIX
271272 case "AWS::NoValue" :
272273 # TODO: add support for NoValue, None cannot be used to communicate a Null value in preproc classes.
273274 raise NotImplementedError ("The use of AWS:NoValue is currently unsupported" )
@@ -277,14 +278,14 @@ def _resolve_pseudo_parameter(self, pseudo_parameter_name: str) -> PreprocEntity
277278 )
278279 case _:
279280 raise RuntimeError (f"Unknown pseudo parameter value '{ pseudo_parameter_name } '" )
280- return PreprocEntityDelta (before = after , after = after )
281281
282282 def _resolve_reference (self , logical_id : str ) -> PreprocEntityDelta :
283283 if logical_id in _PSEUDO_PARAMETERS :
284- pseudo_parameter_delta = self ._resolve_pseudo_parameter (
284+ pseudo_parameter_value = self ._resolve_pseudo_parameter (
285285 pseudo_parameter_name = logical_id
286286 )
287- return pseudo_parameter_delta
287+ # Pseudo parameters are constants within the lifecycle of a template.
288+ return PreprocEntityDelta (before = pseudo_parameter_value , after = pseudo_parameter_value )
288289
289290 node_parameter = self ._get_node_parameter_if_exists (parameter_name = logical_id )
290291 if isinstance (node_parameter , NodeParameter ):
@@ -477,6 +478,73 @@ def visit_node_intrinsic_function_fn_not(
477478 # Implicit change type computation.
478479 return PreprocEntityDelta (before = before , after = after )
479480
481+ def visit_node_intrinsic_function_fn_sub (
482+ self , node_intrinsic_function : NodeIntrinsicFunction
483+ ) -> PreprocEntityDelta :
484+ arguments_delta = self .visit (node_intrinsic_function .arguments )
485+ arguments_before = arguments_delta .before
486+ arguments_after = arguments_delta .after
487+
488+ def _compute_sub (args : str | list [Any ], select_before : bool = False ) -> str :
489+ # TODO: add further schema validation.
490+ string_template : str
491+ sub_parameters : dict
492+ if isinstance (args , str ):
493+ string_template = args
494+ sub_parameters = dict ()
495+ elif (
496+ isinstance (args , list )
497+ and len (args ) == 2
498+ and isinstance (args [0 ], str )
499+ and isinstance (args [1 ], dict )
500+ ):
501+ string_template = args [0 ]
502+ sub_parameters = args [1 ]
503+ else :
504+ raise RuntimeError (
505+ "Invalid arguments shape for Fn::Sub, expected a String "
506+ f"or a Tuple of String and Map but got '{ args } '"
507+ )
508+ sub_string = string_template
509+ template_variable_names = re .findall ("\\ ${([^}]+)}" , string_template )
510+ for template_variable_name in template_variable_names :
511+ if template_variable_name in _PSEUDO_PARAMETERS :
512+ template_variable_value = self ._resolve_pseudo_parameter (
513+ pseudo_parameter_name = template_variable_name
514+ )
515+ elif template_variable_name in sub_parameters :
516+ template_variable_value = sub_parameters [template_variable_name ]
517+ else :
518+ try :
519+ reference_delta = self ._resolve_reference (logical_id = template_variable_name )
520+ template_variable_value = (
521+ reference_delta .before if select_before else reference_delta .after
522+ )
523+ except RuntimeError :
524+ raise RuntimeError (
525+ f"Undefined variable name in Fn::Sub string template '{ template_variable_name } '"
526+ )
527+ sub_string = sub_string .replace (
528+ f"${{{ template_variable_name } }}" , template_variable_value
529+ )
530+ return sub_string
531+
532+ before = None
533+ if (
534+ isinstance (arguments_before , str )
535+ or isinstance (arguments_before , list )
536+ and len (arguments_before ) == 2
537+ ):
538+ before = _compute_sub (args = arguments_before , select_before = True )
539+ after = None
540+ if (
541+ isinstance (arguments_after , str )
542+ or isinstance (arguments_after , list )
543+ and len (arguments_after ) == 2
544+ ):
545+ after = _compute_sub (args = arguments_after )
546+ return PreprocEntityDelta (before = before , after = after )
547+
480548 def visit_node_intrinsic_function_fn_join (
481549 self , node_intrinsic_function : NodeIntrinsicFunction
482550 ) -> PreprocEntityDelta :
0 commit comments