77import logging
88import threading
99from enum import Enum
10+ import typing
1011from typing import (
1112 Any ,
1213 Callable ,
1920 cast ,
2021 overload ,
2122)
23+ import weakref
2224
2325import typing_extensions
2426
3537 from typing_extensions import Literal
3638
3739
40+ # Import kernel_context for typing only
41+ if typing .TYPE_CHECKING :
42+ import solara .server .kernel_context
43+
44+
3845R = TypeVar ("R" )
3946T = TypeVar ("T" )
4047P = typing_extensions .ParamSpec ("P" )
@@ -201,6 +208,7 @@ class TaskAsyncio(Task[P, R]):
201208 current_future : Optional [asyncio .Future ] = None
202209 _cancel : Optional [Callable [[], None ]] = None
203210 _retry : Optional [Callable [[], None ]] = None
211+ _context : Optional ["weakref.ReferenceType[solara.server.kernel_context.VirtualKernelContext]" ] = None
204212
205213 def __init__ (self , run_in_thread : bool , function : Callable [P , Coroutine [Any , Any , R ]], key : str ):
206214 self .run_in_thread = run_in_thread
@@ -249,6 +257,7 @@ def cancel():
249257 import solara .server .kernel_context
250258
251259 context = solara .server .kernel_context .get_current_context ()
260+ self ._context = weakref .ref (context )
252261 call_event_loop = context .event_loop
253262 else :
254263 call_event_loop = _main_event_loop or asyncio .get_event_loop ()
@@ -263,10 +272,28 @@ def runs_in_thread():
263272 try :
264273 thread_event_loop .run_until_complete (current_task )
265274 except asyncio .CancelledError as e :
266- call_event_loop .call_soon_threadsafe (future .set_exception , e )
275+ try :
276+ call_event_loop .call_soon_threadsafe (future .set_exception , e )
277+ except Exception as e2 :
278+ if not self ._is_context_closed ():
279+ logger .exception (
280+ "error setting exception from for task %s. Original exception: %s\n Reason for failing to set exception: %s" ,
281+ self .function .__name__ ,
282+ e ,
283+ e2 ,
284+ )
267285 except Exception as e :
268286 logger .exception ("error running in thread" )
269- call_event_loop .call_soon_threadsafe (future .set_exception , e )
287+ try :
288+ call_event_loop .call_soon_threadsafe (future .set_exception , e )
289+ except Exception as e2 :
290+ if not self ._is_context_closed ():
291+ logger .exception (
292+ "error setting exception from for task %s. Original exception: %s\n Reason for failing to set exception: %s" ,
293+ self .function .__name__ ,
294+ e ,
295+ e2 ,
296+ )
270297 raise
271298
272299 self ._result .value = TaskResult [R ](latest = self ._last_value , _state = TaskState .STARTING )
@@ -281,6 +308,14 @@ def is_current(self):
281308 assert running_task is not None
282309 return (self .current_task == _get_current_task ()) and not running_task .cancelled ()
283310
311+ def _is_context_closed (self ):
312+ if self ._context is None :
313+ return False
314+ context = self ._context ()
315+ if context is None :
316+ return False
317+ return context .closed_event .is_set ()
318+
284319 async def _async_run (self , call_event_loop : asyncio .AbstractEventLoop , future : asyncio .Future , args , kwargs ) -> None :
285320 self ._start_event .wait ()
286321
@@ -298,12 +333,34 @@ async def runner():
298333 if self .is_current () and not task_for_this_call .cancelled (): # type: ignore
299334 self ._result .value = TaskResult [R ](value = value , latest = value , _state = TaskState .FINISHED , progress = self ._last_progress )
300335 logger .info ("setting result to %r" , value )
301- call_event_loop .call_soon_threadsafe (future .set_result , value )
336+ try :
337+ call_event_loop .call_soon_threadsafe (future .set_result , value )
338+ except Exception as e :
339+ if not self ._is_context_closed ():
340+ logger .exception (
341+ "error setting result from for task %s. Original exception: %s\n Reason for failing to set result: %s" , self .function .__name__ , e , e
342+ )
343+ else :
344+ logger .debug (
345+ "ignoring error setting result from for task %s. Original exception: %s\n Reason for failing to set result: %s" ,
346+ self .function .__name__ ,
347+ e ,
348+ e ,
349+ )
302350 except Exception as e :
303351 if self .is_current ():
304352 logger .exception (e )
305353 self ._result .value = TaskResult [R ](latest = self ._last_value , exception = e , _state = TaskState .ERROR )
306- call_event_loop .call_soon_threadsafe (future .set_exception , e )
354+ try :
355+ call_event_loop .call_soon_threadsafe (future .set_exception , e )
356+ except Exception as e2 :
357+ # we don't know if it is still useful to show this error, so we show it regardless if the context is closed or not
358+ logger .exception (
359+ "error setting exception from for task %s. Original exception: %s\n Reason for failing to set exception: %s" ,
360+ self .function .__name__ ,
361+ e ,
362+ e2 ,
363+ )
307364 # Although this seems like an easy way to handle cancellation, an early cancelled task will never execute
308365 # so this code will never execute, so we need to handle this in the cancel function in __call__
309366 # except asyncio.CancelledError as e:
0 commit comments