@@ -62,7 +62,7 @@ func loadAddress(path string) (string, error) {
6262 return string (data ), nil
6363}
6464
65- func loadShim (ctx context.Context , bundle * Bundle , events * exchange. Exchange , rt * runtime. TaskList , onClose func ()) (_ * shim , err error ) {
65+ func loadShim (ctx context.Context , bundle * Bundle , onClose func ()) (_ * shim , err error ) {
6666 address , err := loadAddress (filepath .Join (bundle .Path , "address" ))
6767 if err != nil {
6868 return nil , err
@@ -117,15 +117,15 @@ func loadShim(ctx context.Context, bundle *Bundle, events *exchange.Exchange, rt
117117 }
118118 }()
119119 s := & shim {
120- client : client ,
121- task : task .NewTaskClient (client ),
122- bundle : bundle ,
123- events : events ,
124- rtTasks : rt ,
120+ client : client ,
121+ task : task .NewTaskClient (client ),
122+ bundle : bundle ,
125123 }
126124 ctx , cancel := timeout .WithContext (ctx , loadTimeout )
127125 defer cancel ()
128- if err := s .Connect (ctx ); err != nil {
126+
127+ // Check connectivity
128+ if _ , err := s .PID (ctx ); err != nil {
129129 return nil , err
130130 }
131131 return s , nil
@@ -186,23 +186,9 @@ func cleanupAfterDeadShim(ctx context.Context, id, ns string, rt *runtime.TaskLi
186186var _ runtime.Task = & shim {}
187187
188188type shim struct {
189- bundle * Bundle
190- client * ttrpc.Client
191- task task.TaskService
192- taskPid int
193- events * exchange.Exchange
194- rtTasks * runtime.TaskList
195- }
196-
197- func (s * shim ) Connect (ctx context.Context ) error {
198- response , err := s .task .Connect (ctx , & task.ConnectRequest {
199- ID : s .ID (),
200- })
201- if err != nil {
202- return err
203- }
204- s .taskPid = int (response .TaskPid )
205- return nil
189+ bundle * Bundle
190+ client * ttrpc.Client
191+ task task.TaskService
206192}
207193
208194func (s * shim ) Shutdown (ctx context.Context ) error {
@@ -227,8 +213,15 @@ func (s *shim) ID() string {
227213}
228214
229215// PID of the task
230- func (s * shim ) PID () uint32 {
231- return uint32 (s .taskPid )
216+ func (s * shim ) PID (ctx context.Context ) (uint32 , error ) {
217+ response , err := s .task .Connect (ctx , & task.ConnectRequest {
218+ ID : s .ID (),
219+ })
220+ if err != nil {
221+ return 0 , errdefs .FromGRPC (err )
222+ }
223+
224+ return response .TaskPid , nil
232225}
233226
234227func (s * shim ) Namespace () string {
@@ -239,7 +232,7 @@ func (s *shim) Close() error {
239232 return s .client .Close ()
240233}
241234
242- func (s * shim ) Delete (ctx context.Context ) (* runtime.Exit , error ) {
235+ func (s * shim ) delete (ctx context.Context , removeTask func ( ctx context. Context , id string ) ) (* runtime.Exit , error ) {
243236 response , shimErr := s .task .Delete (ctx , & task.DeleteRequest {
244237 ID : s .ID (),
245238 })
@@ -264,7 +257,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
264257 // So we should remove the record and prevent duplicate events from
265258 // ttrpc-callback-on-close.
266259 if shimErr == nil {
267- s . rtTasks . Delete (ctx , s .ID ())
260+ removeTask (ctx , s .ID ())
268261 }
269262
270263 if err := s .waitShutdown (ctx ); err != nil {
@@ -275,7 +268,7 @@ func (s *shim) Delete(ctx context.Context) (*runtime.Exit, error) {
275268
276269 // remove self from the runtime task list
277270 // this seems dirty but it cleans up the API across runtimes, tasks, and the service
278- s . rtTasks . Delete (ctx , s .ID ())
271+ removeTask (ctx , s .ID ())
279272 if err := s .bundle .Delete (); err != nil {
280273 log .G (ctx ).WithField ("id" , s .ID ()).WithError (err ).Error ("failed to delete bundle" )
281274 }
@@ -311,11 +304,12 @@ func (s *shim) Create(ctx context.Context, opts runtime.CreateOpts) (runtime.Tas
311304 Options : m .Options ,
312305 })
313306 }
314- response , err := s .task .Create (ctx , request )
307+
308+ _ , err := s .task .Create (ctx , request )
315309 if err != nil {
316310 return nil , errdefs .FromGRPC (err )
317311 }
318- s . taskPid = int ( response . Pid )
312+
319313 return s , nil
320314}
321315
@@ -338,13 +332,12 @@ func (s *shim) Resume(ctx context.Context) error {
338332}
339333
340334func (s * shim ) Start (ctx context.Context ) error {
341- response , err := s .task .Start (ctx , & task.StartRequest {
335+ _ , err := s .task .Start (ctx , & task.StartRequest {
342336 ID : s .ID (),
343337 })
344338 if err != nil {
345339 return errdefs .FromGRPC (err )
346340 }
347- s .taskPid = int (response .Pid )
348341 return nil
349342}
350343
@@ -359,7 +352,7 @@ func (s *shim) Kill(ctx context.Context, signal uint32, all bool) error {
359352 return nil
360353}
361354
362- func (s * shim ) Exec (ctx context.Context , id string , opts runtime.ExecOpts ) (runtime.Process , error ) {
355+ func (s * shim ) Exec (ctx context.Context , id string , opts runtime.ExecOpts ) (runtime.ExecProcess , error ) {
363356 if err := identifiers .Validate (id ); err != nil {
364357 return nil , errors .Wrapf (err , "invalid exec id %s" , id )
365358 }
@@ -422,14 +415,18 @@ func (s *shim) CloseIO(ctx context.Context) error {
422415}
423416
424417func (s * shim ) Wait (ctx context.Context ) (* runtime.Exit , error ) {
418+ taskPid , err := s .PID (ctx )
419+ if err != nil {
420+ return nil , err
421+ }
425422 response , err := s .task .Wait (ctx , & task.WaitRequest {
426423 ID : s .ID (),
427424 })
428425 if err != nil {
429426 return nil , errdefs .FromGRPC (err )
430427 }
431428 return & runtime.Exit {
432- Pid : uint32 ( s . taskPid ) ,
429+ Pid : taskPid ,
433430 Timestamp : response .ExitedAt ,
434431 Status : response .ExitStatus ,
435432 }, nil
@@ -468,7 +465,7 @@ func (s *shim) Stats(ctx context.Context) (*ptypes.Any, error) {
468465 return response .Stats , nil
469466}
470467
471- func (s * shim ) Process (ctx context.Context , id string ) (runtime.Process , error ) {
468+ func (s * shim ) Process (ctx context.Context , id string ) (runtime.ExecProcess , error ) {
472469 p := & process {
473470 id : id ,
474471 shim : s ,
0 commit comments