11from __future__ import annotations
22
3+ import sys
34from typing import Any
45from typing import Literal
56from typing import TypeVar
67
8+ if sys .version_info >= (3 , 11 ): # pragma: >=3.11 cover
9+ from typing import Self
10+ else : # pragma: <3.11 cover
11+ from typing_extensions import Self
12+
713from proxystore .proxy import extract
814from proxystore .proxy import Proxy
915from proxystore .store import get_store
1016from proxystore .store import Store
1117from proxystore .store .config import ConnectorConfig
18+ from proxystore .store .utils import resolve_async
1219from pydantic import Field
20+ from pydantic import model_validator
1321
1422from taps .plugins import register
1523from taps .transformer ._protocol import TransformerConfig
@@ -29,17 +37,34 @@ class ProxyTransformerConfig(TransformerConfig):
2937 description = 'Connector configuration.' ,
3038 )
3139 cache_size : int = Field (16 , description = 'cache size' )
40+ async_resolve : bool = Field (
41+ False ,
42+ description = (
43+ 'Asynchronously resolve proxies. Not compatible with '
44+ 'extract_target=True.'
45+ ),
46+ )
3247 extract_target : bool = Field (
3348 False ,
3449 description = (
35- 'Extract the target from the proxy when resolving the identifier.'
50+ 'Extract the target from the proxy when resolving the identifier. '
51+ 'Not compatible with async_resolve=True.'
3652 ),
3753 )
3854 populate_target : bool = Field (
3955 True ,
4056 description = 'Populate target objects of newly created proxies.' ,
4157 )
4258
59+ @model_validator (mode = 'after' )
60+ def _validate_mutex_options (self ) -> Self :
61+ if self .async_resolve and self .extract_target :
62+ raise ValueError (
63+ 'Options async_resolve and extract_target cannot be '
64+ 'enabled at the same time.' ,
65+ )
66+ return self
67+
4368 def get_transformer (self ) -> ProxyTransformer :
4469 """Create a transformer from the configuration."""
4570 connector = self .connector .get_connector ()
@@ -51,6 +76,7 @@ def get_transformer(self) -> ProxyTransformer:
5176 populate_target = self .populate_target ,
5277 register = True ,
5378 ),
79+ async_resolve = self .async_resolve ,
5480 extract_target = self .extract_target ,
5581 )
5682
@@ -62,29 +88,43 @@ class ProxyTransformer:
6288
6389 Args:
6490 store: Store instance to use for proxying objects.
91+ async_resolve: Begin asynchronously resolving proxies when the
92+ transformer resolves a proxy (which is otherwise a no-op unless
93+ `extract_target=True`). Not compatible with `extract_target=True`.
6594 extract_target: When `True`, resolving an identifier (i.e., a proxy)
6695 will return the target object. Otherwise, the proxy is returned
67- since a proxy can act as the target object.
96+ since a proxy can act as the target object. Not compatible
97+ with `async_resolve=True`.
6898 """
6999
70100 def __init__ (
71101 self ,
72102 store : Store [Any ],
73103 * ,
104+ async_resolve : bool = False ,
74105 extract_target : bool = False ,
75106 ) -> None :
107+ if async_resolve and extract_target :
108+ raise ValueError (
109+ 'Options async_resolve and extract_target cannot be '
110+ 'enabled at the same time.' ,
111+ )
112+
76113 self .store = store
114+ self .async_resolve = async_resolve
77115 self .extract_target = extract_target
78116
79117 def __repr__ (self ) -> str :
80118 ctype = type (self ).__name__
81119 store = f'store={ self .store } '
120+ async_ = f'async_resolve={ self .async_resolve } '
82121 extract = f'extract_target={ self .extract_target } '
83- return f'{ ctype } ({ store } , { extract } )'
122+ return f'{ ctype } ({ store } , { async_ } , { extract } )'
84123
85124 def __getstate__ (self ) -> dict [str , Any ]:
86125 return {
87126 'config' : self .store .config (),
127+ 'async_resolve' : self .async_resolve ,
88128 'extract_target' : self .extract_target ,
89129 }
90130
@@ -94,6 +134,7 @@ def __setstate__(self, state: dict[str, Any]) -> None:
94134 self .store = store
95135 else :
96136 self .store = Store .from_config (state ['config' ])
137+ self .async_resolve = state ['async_resolve' ]
97138 self .extract_target = state ['extract_target' ]
98139
99140 def close (self ) -> None :
@@ -125,4 +166,8 @@ def resolve(self, identifier: Proxy[T]) -> T | Proxy[T]:
125166 The resolved object or a proxy of the resolved object depending \
126167 on the setting of `extract_target`.
127168 """
128- return extract (identifier ) if self .extract_target else identifier
169+ if self .extract_target :
170+ return extract (identifier )
171+ if self .async_resolve :
172+ resolve_async (identifier )
173+ return identifier
0 commit comments