Add Task.WhenEach to process tasks as they complete#100316
Add Task.WhenEach to process tasks as they complete#100316stephentoub merged 6 commits intodotnet:mainfrom
Conversation
|
Note regarding the |
src/libraries/System.Private.CoreLib/src/System/Threading/Tasks/Task.cs
Outdated
Show resolved
Hide resolved
src/libraries/System.Runtime/tests/System.Threading.Tasks.Tests/MethodCoverage.cs
Outdated
Show resolved
Hide resolved
src/libraries/System.Runtime/tests/System.Threading.Tasks.Tests/MethodCoverage.cs
Outdated
Show resolved
Hide resolved
src/libraries/System.Runtime/tests/System.Threading.Tasks.Tests/MethodCoverage.cs
Outdated
Show resolved
Hide resolved
Also clean up an extra level of indentation.
This comment was marked as outdated.
This comment was marked as outdated.
|
Will this be backported to .NET 8? |
No |
|
@stephentoub while (connectionRequests.Count > 0)
{
var cts = new CancellationTokenSource();
var timeout = Task.Delay(10000, cts.Token);
var result = await Task.WhenAny(Task.WhenAny(connectionRequests), timeout).ConfigureAwait(false);
if (result != timeout)
{
...
yield return obj;
}
else
{
//no new task completed in 10 seconds, lets end this
}
}I'm not seeing any way to add a timeout with the new method, that would result in much cleaner code, thanks. I had the code like this before because in older versions of .NET some ping requests would just hang around forever and never return in Android so I just added this 10 second timeout to handle that case. |
|
Maybe something like this: using var cts = new CancellationTokenSource(10_000);
await foreach (var item in Task.WhenEach(connectionRequests).WithCancellation(cts.Token))
{
ProcessItem(item.Result);
cts.TryReset(); // We got an item, so reset the token
} |
Is this necessary? e.g. Task<int>[] tasks = [
Task.Run(() => 1),
Task.Run(() => {Task.Delay(20_000).GetAwaiter().GetResult(); return 2; }),
Task.Run(() => 3),
Task.Run(() => {Task.Delay(20_000).GetAwaiter().GetResult(); return 4; }),
Task.Run(() => 5)
];
using var cts = new CancellationTokenSource(10_000);
try
{
await foreach (var item in Task.WhenEach(tasks).WithCancellation(cts.Token))
{
Console.WriteLine($"{item.Result} at {DateTime.UtcNow.ToString("o")}");
//cts.TryReset(); // We got an item, so reset the token
}
}
catch (TaskCanceledException ex)
{
Console.WriteLine($"Exception at {DateTime.UtcNow.ToString("o")} {ex}");
}
after uncommenting |
|
Try using var cts = new CancellationTokenSource(10_000);
await foreach (var item in Task.WhenEach(connectionRequests).WithCancellation(cts.Token))
{
// If TryReset fails, the timeout expired right after the last task completed.
if (!cts.TryReset()) cts.Token.ThrowIfCancellationRequested();
ProcessItem(item.Result);
cts.CancelAfter(10_000); // Start the timer again
} |
|
Ah yes you have to call CancelAfter again! (I should have tested 😄 ) |
|
didn't know about the "WithCancellation", this should work, thanks. |
|
@am11 The reset is required because the timeout for the next item, not the entire operation IIUC. |
|
Oh, i didn't know about it because it looks like it doesn't exist... I looked at the issue and it looks like it was implemented as "WaitAsync" in 2021. Where are you guys getting "WithCancellation" from? If it was named "WithCancellation" im sure I would have found it when I was looking for a way to cancel a task, I would have never looked for "WaitAsync" |
|
|
WaitAsync is different |
Ah I see now, was just looking at Task |
|
The only problem I see now is that this throws an exception instead of my previous code that did not when using a delay with "WhenAny", because of this I can no longer yield return inside of the "await foreach" block. Looks like its easier to just do it the original way for my use case. |
|
I see there is ConfigureAwaitOptions.SuppressThrowing now but it's not applicable to ConfigureAwait on IAsyncEnumerable, is there a technical reason why it cant be applied here? |
|
How would you use it? |
|
something like: await foreach (var item in Task.WhenEach(connectionRequests).WithCancellation(cts.Token).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing))
{
yield return device;
}not sure how that would be able to actually end the foreach loop like an exception is able to though. I just can't have an exception here because I'm not allowed to try/catch around a "yield return". On second thought, I think having the cancellation exception thrown up is actually better for my case since I can just handle it at the higher level and log that the scan was not complete there instead of here. |
Yeah, I'd be concerned about exposing/supporting |
* Add Task.WhenEach to process tasks as they complete * Address PR feedback * Fix some async void tests to be async Task across libs * Remove extra awaiter field from state machine Also clean up an extra level of indentation.
Fixes #61959