33using System . Collections . Generic ;
44using System . IO ;
55using System . Linq ;
6+ using System . Net . WebSockets ;
7+ using System . Text ;
68using System . Threading ;
79using System . Threading . Tasks ;
810using GitHub . DistributedTask . WebApi ;
911using GitHub . Runner . Sdk ;
12+ using GitHub . Services . Common ;
13+ using Newtonsoft . Json ;
1014using Pipelines = GitHub . DistributedTask . Pipelines ;
1115
1216namespace GitHub . Runner . Common
@@ -30,6 +34,11 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
3034 private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan . FromMilliseconds ( 500 ) ;
3135 private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan . FromMilliseconds ( 500 ) ;
3236 private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan . FromMilliseconds ( 1000 ) ;
37+ private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan . FromMilliseconds ( 1 ) ;
38+ private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan . FromMilliseconds ( 500 ) ;
39+
40+ private static readonly int _minWebsocketFailurePercentageAllowed = 50 ;
41+ private static readonly int _minWebsocketBatchedLinesCountToConsider = 5 ;
3342
3443 // Job message information
3544 private Guid _scopeIdentifier ;
@@ -58,6 +67,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
5867 private Task _fileUploadDequeueTask ;
5968 private Task _timelineUpdateDequeueTask ;
6069
70+ private Task _websocketConnectTask = null ;
71+
6172 // common
6273 private IJobServer _jobServer ;
6374 private Task [ ] _allDequeueTasks ;
@@ -71,14 +82,21 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
7182
7283 // Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds).
7384 // Then the dequeue will happen every 500ms.
74- // In this way, customer still can get instance live console output on job start,
85+ // In this way, customer still can get instance live console output on job start,
7586 // at the same time we can cut the load to server after the build run for more than 60s
7687 private int _webConsoleLineAggressiveDequeueCount = 0 ;
7788 private const int _webConsoleLineAggressiveDequeueLimit = 4 * 60 ;
7889 private const int _webConsoleLineQueueSizeLimit = 1024 ;
7990 private bool _webConsoleLineAggressiveDequeue = true ;
8091 private bool _firstConsoleOutputs = true ;
8192
93+ private int totalBatchedLinesAttemptedByWebsocket = 0 ;
94+ private int failedAttemptsToPostBatchedLinesByWebsocket = 0 ;
95+
96+ private ClientWebSocket _websocketClient = null ;
97+
98+ private ServiceEndpoint _serviceEndPoint ;
99+
82100 public override void Initialize ( IHostContext hostContext )
83101 {
84102 base . Initialize ( hostContext ) ;
@@ -89,6 +107,10 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
89107 {
90108 Trace . Entering ( ) ;
91109
110+ this . _serviceEndPoint = jobRequest . Resources . Endpoints . Single ( x => string . Equals ( x . Name , WellKnownServiceEndpointNames . SystemVssConnection , StringComparison . OrdinalIgnoreCase ) ) ;
111+
112+ InitializeWebsocket ( ) ;
113+
92114 if ( _queueInProcess )
93115 {
94116 Trace . Info ( "No-opt, all queue process tasks are running." ) ;
@@ -156,6 +178,9 @@ public async Task ShutdownAsync()
156178 await ProcessTimelinesUpdateQueueAsync ( runOnce : true ) ;
157179 Trace . Info ( "Timeline update queue drained." ) ;
158180
181+ Trace . Info ( $ "Disposing websocket client ...") ;
182+ this . _websocketClient ? . CloseOutputAsync ( WebSocketCloseStatus . NormalClosure , "Shutdown" , CancellationToken . None ) ;
183+
159184 Trace . Info ( "All queue process tasks have been stopped, and all queues are drained." ) ;
160185 }
161186
@@ -292,14 +317,69 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
292317 {
293318 try
294319 {
295- // we will not requeue failed batch, since the web console lines are time sensitive.
296- if ( batch [ 0 ] . LineNumber . HasValue )
320+ if ( this . _websocketConnectTask != null )
297321 {
298- await _jobServer . AppendTimelineRecordFeedAsync ( _scopeIdentifier , _hubName , _planId , _jobTimelineId , _jobTimelineRecordId , stepRecordId , batch . Select ( logLine => logLine . Line ) . ToList ( ) , batch [ 0 ] . LineNumber . Value , default ( CancellationToken ) ) ;
322+ // lazily await here, we are already in the background task here
323+ await this . _websocketConnectTask ;
299324 }
300- else
325+
326+ var pushedLinesViaWebsocket = false ;
327+ if ( this . _websocketClient != null )
328+ {
329+ var linesWrapper = batch [ 0 ] . LineNumber . HasValue ? new TimelineRecordFeedLinesWrapper ( stepRecordId , batch . Select ( logLine => logLine . Line ) . ToList ( ) , batch [ 0 ] . LineNumber . Value ) :
330+ new TimelineRecordFeedLinesWrapper ( stepRecordId , batch . Select ( logLine => logLine . Line ) . ToList ( ) ) ;
331+ var jsonData = StringUtil . ConvertToJson ( linesWrapper ) ;
332+ try
333+ {
334+ totalBatchedLinesAttemptedByWebsocket ++ ;
335+ var jsonDataBytes = Encoding . UTF8 . GetBytes ( jsonData ) ;
336+ // break the message into chunks of 1024 bytes
337+ for ( var i = 0 ; i < jsonDataBytes . Length ; i += 1 * 1024 )
338+ {
339+ var lastChunk = i + ( 1 * 1024 ) >= jsonDataBytes . Length ;
340+ var chunk = new ArraySegment < byte > ( jsonDataBytes , i , Math . Min ( 1 * 1024 , jsonDataBytes . Length - i ) ) ;
341+ await this . _websocketClient . SendAsync ( chunk , WebSocketMessageType . Text , endOfMessage : lastChunk , CancellationToken . None ) ;
342+ }
343+
344+ pushedLinesViaWebsocket = true ;
345+ }
346+ catch ( Exception ex )
347+ {
348+ Trace . Info ( $ "Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: { totalBatchedLinesAttemptedByWebsocket } , failed calls: { failedAttemptsToPostBatchedLinesByWebsocket } , websocket state: { this . _websocketClient ? . State } ).") ;
349+ Trace . Error ( ex ) ;
350+ failedAttemptsToPostBatchedLinesByWebsocket ++ ;
351+ if ( totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider )
352+ {
353+ // let's consider failure percentage
354+ if ( failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed )
355+ {
356+ Trace . Info ( $ "Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again.") ;
357+ this . _websocketClient ? . CloseOutputAsync ( WebSocketCloseStatus . InternalServerError , "Shutdown due to failures" , CancellationToken . None ) ;
358+ this . _websocketClient = null ;
359+ }
360+ }
361+
362+ if ( this . _websocketClient != null )
363+ {
364+ var delay = BackoffTimerHelper . GetRandomBackoff ( _minDelayForWebsocketReconnect , _maxDelayForWebsocketReconnect ) ;
365+ Trace . Info ( $ "Websocket is not open, let's attempt to connect back again with random backoff { delay } ms (total calls: { totalBatchedLinesAttemptedByWebsocket } , failed calls: { failedAttemptsToPostBatchedLinesByWebsocket } ).") ;
366+ InitializeWebsocket ( delay ) ;
367+ }
368+ }
369+ }
370+
371+ // if we can't push via websocket, let's fallback to posting via REST API
372+ if ( ! pushedLinesViaWebsocket )
301373 {
302- await _jobServer . AppendTimelineRecordFeedAsync ( _scopeIdentifier , _hubName , _planId , _jobTimelineId , _jobTimelineRecordId , stepRecordId , batch . Select ( logLine => logLine . Line ) . ToList ( ) , default ( CancellationToken ) ) ;
374+ // we will not requeue failed batch, since the web console lines are time sensitive.
375+ if ( batch [ 0 ] . LineNumber . HasValue )
376+ {
377+ await _jobServer . AppendTimelineRecordFeedAsync ( _scopeIdentifier , _hubName , _planId , _jobTimelineId , _jobTimelineRecordId , stepRecordId , batch . Select ( logLine => logLine . Line ) . ToList ( ) , batch [ 0 ] . LineNumber . Value , default ( CancellationToken ) ) ;
378+ }
379+ else
380+ {
381+ await _jobServer . AppendTimelineRecordFeedAsync ( _scopeIdentifier , _hubName , _planId , _jobTimelineId , _jobTimelineRecordId , stepRecordId , batch . Select ( logLine => logLine . Line ) . ToList ( ) , default ( CancellationToken ) ) ;
382+ }
303383 }
304384
305385 if ( _firstConsoleOutputs )
@@ -391,6 +471,46 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
391471 }
392472 }
393473
474+ private void InitializeWebsocket ( TimeSpan ? delay = null )
475+ {
476+ if ( _serviceEndPoint . Authorization != null &&
477+ _serviceEndPoint . Authorization . Parameters . TryGetValue ( EndpointAuthorizationParameters . AccessToken , out var accessToken ) &&
478+ ! string . IsNullOrEmpty ( accessToken ) )
479+ {
480+ if ( _serviceEndPoint . Data . TryGetValue ( "FeedStreamUrl" , out var feedStreamUrl ) && ! string . IsNullOrEmpty ( feedStreamUrl ) )
481+ {
482+ // let's ensure we use the right scheme
483+ feedStreamUrl = feedStreamUrl . Replace ( "https://" , "wss://" ) . Replace ( "http://" , "ws://" ) ;
484+ Trace . Info ( $ "Creating websocket client ..." + feedStreamUrl ) ;
485+ this . _websocketClient = new ClientWebSocket ( ) ;
486+ this . _websocketClient . Options . SetRequestHeader ( "Authorization" , $ "Bearer { accessToken } ") ;
487+ this . _websocketConnectTask = Task . Run ( async ( ) =>
488+ {
489+ try
490+ {
491+ Trace . Info ( $ "Attempting to start websocket client with delay { delay } .") ;
492+ await Task . Delay ( delay ?? TimeSpan . Zero ) ;
493+ await this . _websocketClient . ConnectAsync ( new Uri ( feedStreamUrl ) , default ( CancellationToken ) ) ;
494+ Trace . Info ( $ "Successfully started websocket client.") ;
495+ }
496+ catch ( Exception ex )
497+ {
498+ Trace . Info ( "Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket." ) ;
499+ Trace . Error ( ex ) ;
500+ }
501+ } ) ;
502+ }
503+ else
504+ {
505+ Trace . Info ( $ "No FeedStreamUrl found, so we will use Rest API calls for sending feed data") ;
506+ }
507+ }
508+ else
509+ {
510+ Trace . Info ( $ "No access token from the service endpoint") ;
511+ }
512+ }
513+
394514 private async Task ProcessTimelinesUpdateQueueAsync ( bool runOnce = false )
395515 {
396516 while ( ! _jobCompletionSource . Task . IsCompleted || runOnce )
@@ -489,8 +609,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
489609
490610 if ( runOnce )
491611 {
492- // continue process timeline records update,
493- // we might have more records need update,
612+ // continue process timeline records update,
613+ // we might have more records need update,
494614 // since we just create a new sub-timeline
495615 if ( pendingSubtimelineUpdate )
496616 {
0 commit comments