Skip to content

Commit 4d76eb4

Browse files
committed
2 parents 4c5730f + ddc700e commit 4d76eb4

File tree

5 files changed

+196
-16
lines changed

5 files changed

+196
-16
lines changed

.github/workflows/release.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,17 @@ jobs:
260260
console.log(releaseNote)
261261
core.setOutput('version', runnerVersion);
262262
core.setOutput('note', releaseNote);
263+
264+
- name: Validate Packages HASH
265+
working-directory: _package
266+
run: |
267+
ls -l
268+
echo "${{needs.build.outputs.win-x64-sha}} actions-runner-win-x64-${{ steps.releaseNote.outputs.version }}.zip" | shasum -a 256 -c
269+
echo "${{needs.build.outputs.osx-x64-sha}} actions-runner-osx-x64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
270+
echo "${{needs.build.outputs.linux-x64-sha}} actions-runner-linux-x64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
271+
echo "${{needs.build.outputs.linux-arm-sha}} actions-runner-linux-arm-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
272+
echo "${{needs.build.outputs.linux-arm64-sha}} actions-runner-linux-arm64-${{ steps.releaseNote.outputs.version }}.tar.gz" | shasum -a 256 -c
273+
263274
# Create GitHub release
264275
- uses: actions/create-release@master
265276
id: createRelease

src/Runner.Common/JobServerQueue.cs

Lines changed: 128 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
using System.Collections.Generic;
44
using System.IO;
55
using System.Linq;
6+
using System.Net.WebSockets;
7+
using System.Text;
68
using System.Threading;
79
using System.Threading.Tasks;
810
using GitHub.DistributedTask.WebApi;
911
using GitHub.Runner.Sdk;
12+
using GitHub.Services.Common;
13+
using Newtonsoft.Json;
1014
using Pipelines = GitHub.DistributedTask.Pipelines;
1115

1216
namespace GitHub.Runner.Common
@@ -30,6 +34,11 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
3034
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
3135
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
3236
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
37+
private static readonly TimeSpan _minDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(1);
38+
private static readonly TimeSpan _maxDelayForWebsocketReconnect = TimeSpan.FromMilliseconds(500);
39+
40+
private static readonly int _minWebsocketFailurePercentageAllowed = 50;
41+
private static readonly int _minWebsocketBatchedLinesCountToConsider = 5;
3342

3443
// Job message information
3544
private Guid _scopeIdentifier;
@@ -58,6 +67,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
5867
private Task _fileUploadDequeueTask;
5968
private Task _timelineUpdateDequeueTask;
6069

70+
private Task _websocketConnectTask = null;
71+
6172
// common
6273
private IJobServer _jobServer;
6374
private Task[] _allDequeueTasks;
@@ -71,14 +82,21 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
7182

7283
// Web console dequeue will start with process queue every 250ms for the first 60*4 times (~60 seconds).
7384
// Then the dequeue will happen every 500ms.
74-
// In this way, customer still can get instance live console output on job start,
85+
// In this way, customer still can get instance live console output on job start,
7586
// at the same time we can cut the load to server after the build run for more than 60s
7687
private int _webConsoleLineAggressiveDequeueCount = 0;
7788
private const int _webConsoleLineAggressiveDequeueLimit = 4 * 60;
7889
private const int _webConsoleLineQueueSizeLimit = 1024;
7990
private bool _webConsoleLineAggressiveDequeue = true;
8091
private bool _firstConsoleOutputs = true;
8192

93+
private int totalBatchedLinesAttemptedByWebsocket = 0;
94+
private int failedAttemptsToPostBatchedLinesByWebsocket = 0;
95+
96+
private ClientWebSocket _websocketClient = null;
97+
98+
private ServiceEndpoint _serviceEndPoint;
99+
82100
public override void Initialize(IHostContext hostContext)
83101
{
84102
base.Initialize(hostContext);
@@ -89,6 +107,10 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
89107
{
90108
Trace.Entering();
91109

110+
this._serviceEndPoint = jobRequest.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
111+
112+
InitializeWebsocket();
113+
92114
if (_queueInProcess)
93115
{
94116
Trace.Info("No-opt, all queue process tasks are running.");
@@ -156,6 +178,9 @@ public async Task ShutdownAsync()
156178
await ProcessTimelinesUpdateQueueAsync(runOnce: true);
157179
Trace.Info("Timeline update queue drained.");
158180

181+
Trace.Info($"Disposing websocket client ...");
182+
this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Shutdown", CancellationToken.None);
183+
159184
Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
160185
}
161186

@@ -292,14 +317,69 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
292317
{
293318
try
294319
{
295-
// we will not requeue failed batch, since the web console lines are time sensitive.
296-
if (batch[0].LineNumber.HasValue)
320+
if (this._websocketConnectTask != null)
297321
{
298-
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
322+
// lazily await here, we are already in the background task here
323+
await this._websocketConnectTask;
299324
}
300-
else
325+
326+
var pushedLinesViaWebsocket = false;
327+
if (this._websocketClient != null)
328+
{
329+
var linesWrapper = batch[0].LineNumber.HasValue? new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value):
330+
new TimelineRecordFeedLinesWrapper(stepRecordId, batch.Select(logLine => logLine.Line).ToList());
331+
var jsonData = StringUtil.ConvertToJson(linesWrapper);
332+
try
333+
{
334+
totalBatchedLinesAttemptedByWebsocket++;
335+
var jsonDataBytes = Encoding.UTF8.GetBytes(jsonData);
336+
// break the message into chunks of 1024 bytes
337+
for (var i = 0; i < jsonDataBytes.Length; i += 1 * 1024)
338+
{
339+
var lastChunk = i + (1 * 1024) >= jsonDataBytes.Length;
340+
var chunk = new ArraySegment<byte>(jsonDataBytes, i, Math.Min(1 * 1024, jsonDataBytes.Length - i));
341+
await this._websocketClient.SendAsync(chunk, WebSocketMessageType.Text, endOfMessage:lastChunk, CancellationToken.None);
342+
}
343+
344+
pushedLinesViaWebsocket = true;
345+
}
346+
catch (Exception ex)
347+
{
348+
Trace.Info($"Caught exception during append web console line to websocket, let's fallback to sending via non-websocket call (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}, websocket state: {this._websocketClient?.State}).");
349+
Trace.Error(ex);
350+
failedAttemptsToPostBatchedLinesByWebsocket++;
351+
if (totalBatchedLinesAttemptedByWebsocket > _minWebsocketBatchedLinesCountToConsider)
352+
{
353+
// let's consider failure percentage
354+
if (failedAttemptsToPostBatchedLinesByWebsocket * 100 / totalBatchedLinesAttemptedByWebsocket > _minWebsocketFailurePercentageAllowed)
355+
{
356+
Trace.Info($"Exhausted websocket allowed retries, we will not attempt websocket connection for this job to post lines again.");
357+
this._websocketClient?.CloseOutputAsync(WebSocketCloseStatus.InternalServerError, "Shutdown due to failures", CancellationToken.None);
358+
this._websocketClient = null;
359+
}
360+
}
361+
362+
if (this._websocketClient != null)
363+
{
364+
var delay = BackoffTimerHelper.GetRandomBackoff(_minDelayForWebsocketReconnect, _maxDelayForWebsocketReconnect);
365+
Trace.Info($"Websocket is not open, let's attempt to connect back again with random backoff {delay} ms (total calls: {totalBatchedLinesAttemptedByWebsocket}, failed calls: {failedAttemptsToPostBatchedLinesByWebsocket}).");
366+
InitializeWebsocket(delay);
367+
}
368+
}
369+
}
370+
371+
// if we can't push via websocket, let's fallback to posting via REST API
372+
if (!pushedLinesViaWebsocket)
301373
{
302-
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
374+
// we will not requeue failed batch, since the web console lines are time sensitive.
375+
if (batch[0].LineNumber.HasValue)
376+
{
377+
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), batch[0].LineNumber.Value, default(CancellationToken));
378+
}
379+
else
380+
{
381+
await _jobServer.AppendTimelineRecordFeedAsync(_scopeIdentifier, _hubName, _planId, _jobTimelineId, _jobTimelineRecordId, stepRecordId, batch.Select(logLine => logLine.Line).ToList(), default(CancellationToken));
382+
}
303383
}
304384

305385
if (_firstConsoleOutputs)
@@ -391,6 +471,46 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
391471
}
392472
}
393473

474+
private void InitializeWebsocket(TimeSpan? delay = null)
475+
{
476+
if (_serviceEndPoint.Authorization != null &&
477+
_serviceEndPoint.Authorization.Parameters.TryGetValue(EndpointAuthorizationParameters.AccessToken, out var accessToken) &&
478+
!string.IsNullOrEmpty(accessToken))
479+
{
480+
if (_serviceEndPoint.Data.TryGetValue("FeedStreamUrl", out var feedStreamUrl) && !string.IsNullOrEmpty(feedStreamUrl))
481+
{
482+
// let's ensure we use the right scheme
483+
feedStreamUrl = feedStreamUrl.Replace("https://", "wss://").Replace("http://", "ws://");
484+
Trace.Info($"Creating websocket client ..." + feedStreamUrl);
485+
this._websocketClient = new ClientWebSocket();
486+
this._websocketClient.Options.SetRequestHeader("Authorization", $"Bearer {accessToken}");
487+
this._websocketConnectTask = Task.Run(async () =>
488+
{
489+
try
490+
{
491+
Trace.Info($"Attempting to start websocket client with delay {delay}.");
492+
await Task.Delay(delay ?? TimeSpan.Zero);
493+
await this._websocketClient.ConnectAsync(new Uri(feedStreamUrl), default(CancellationToken));
494+
Trace.Info($"Successfully started websocket client.");
495+
}
496+
catch(Exception ex)
497+
{
498+
Trace.Info("Exception caught during websocket client connect, fallback of HTTP would be used now instead of websocket.");
499+
Trace.Error(ex);
500+
}
501+
});
502+
}
503+
else
504+
{
505+
Trace.Info($"No FeedStreamUrl found, so we will use Rest API calls for sending feed data");
506+
}
507+
}
508+
else
509+
{
510+
Trace.Info($"No access token from the service endpoint");
511+
}
512+
}
513+
394514
private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
395515
{
396516
while (!_jobCompletionSource.Task.IsCompleted || runOnce)
@@ -489,8 +609,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
489609

490610
if (runOnce)
491611
{
492-
// continue process timeline records update,
493-
// we might have more records need update,
612+
// continue process timeline records update,
613+
// we might have more records need update,
494614
// since we just create a new sub-timeline
495615
if (pendingSubtimelineUpdate)
496616
{

src/Runner.Worker/ExecutionContext.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -550,10 +550,15 @@ public void AddIssue(Issue issue, string logMessage = null)
550550
issue.Message = issue.Message[.._maxIssueMessageLength];
551551
}
552552

553+
// Tracking the line number (logFileLineNumber) and step number (stepNumber) for each issue that gets created
554+
// Actions UI from the run summary page use both values to easily link to an exact locations in logs where annotations originate from
555+
if (_record.Order != null)
556+
{
557+
issue.Data["stepNumber"] = _record.Order.ToString();
558+
}
559+
553560
if (issue.Type == IssueType.Error)
554561
{
555-
// tracking line number for each issue in log file
556-
// log UI use this to navigate from issue to log
557562
if (!string.IsNullOrEmpty(logMessage))
558563
{
559564
long logLineNumber = Write(WellKnownTags.Error, logMessage);
@@ -569,8 +574,6 @@ public void AddIssue(Issue issue, string logMessage = null)
569574
}
570575
else if (issue.Type == IssueType.Warning)
571576
{
572-
// tracking line number for each issue in log file
573-
// log UI use this to navigate from issue to log
574577
if (!string.IsNullOrEmpty(logMessage))
575578
{
576579
long logLineNumber = Write(WellKnownTags.Warning, logMessage);
@@ -586,9 +589,6 @@ public void AddIssue(Issue issue, string logMessage = null)
586589
}
587590
else if (issue.Type == IssueType.Notice)
588591
{
589-
590-
// tracking line number for each issue in log file
591-
// log UI use this to navigate from issue to log
592592
if (!string.IsNullOrEmpty(logMessage))
593593
{
594594
long logLineNumber = Write(WellKnownTags.Notice, logMessage);

src/Runner.Worker/JobExtension.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ public void FinalizeJob(IExecutionContext jobContext, Pipelines.AgentJobRequestM
406406
// create a new timeline record node for 'Finalize job'
407407
IExecutionContext context = jobContext.CreateChild(Guid.NewGuid(), "Complete job", $"{nameof(JobExtension)}_Final", null, null, ActionRunStage.Post);
408408
context.StepTelemetry.Type = "runner";
409-
context.StepTelemetry.Action = "complete_joh";
409+
context.StepTelemetry.Action = "complete_job";
410410
using (var register = jobContext.CancellationToken.Register(() => { context.CancelToken(); }))
411411
{
412412
try

0 commit comments

Comments
 (0)