Skip to content

Commit c9c4adc

Browse files
committed
Use simple async tasks rather than fixed number of worker tasks
1 parent 9deabfa commit c9c4adc

2 files changed

Lines changed: 44 additions & 28 deletions

File tree

tests/ParallelTypeCheckingTests/Code/GraphProcessing.fs

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
/// Parallel processing of graph of work items with dependencies
22
module ParallelTypeCheckingTests.GraphProcessing
33

4+
open System.Collections.Concurrent
45
open System.Collections.Generic
56
open System.Threading
7+
open ParallelTypeCheckingTests.Parallel
68

79
/// Used for processing
810
type NodeInfo<'Item> =
@@ -13,17 +15,32 @@ type NodeInfo<'Item> =
1315
Dependants: 'Item[]
1416
}
1517

18+
// TODO Do not expose this type to other files
1619
type Node<'Item, 'Result> =
1720
{
1821
Info: NodeInfo<'Item>
1922
mutable ProcessedDepsCount: int
2023
mutable Result: 'Result option
2124
}
2225

26+
/// Basic concurrent set implemented using ConcurrentDictionary
27+
type private ConcurrentSet<'a>() =
28+
let dict = ConcurrentDictionary<'a, unit>()
29+
30+
member this.Add(item: 'a): bool =
31+
dict.TryAdd(item, ())
32+
33+
/// <summary>
34+
/// A generic method to generate results for a graph of work items in parallel.
35+
/// Processes leaves first, and after each node has been processed, schedules any now unblocked dependants.
36+
/// Returns a list of results, per item.
37+
/// </summary>
38+
/// <param name="graph">Graph of work items</param>
39+
/// <param name="doWork">A function to generate results for a single item</param>
2340
let processGraphSimple<'Item, 'Result when 'Item: equality and 'Item: comparison>
2441
(graph: Graph<'Item>)
42+
// TODO Avoid exposing mutable nodes to the caller
2543
(doWork: IReadOnlyDictionary<'Item, Node<'Item, 'Result>> -> Node<'Item, 'Result> -> 'Result)
26-
(parallelism: int)
2744
: 'Result[] // Results in order defined in 'graph'
2845
=
2946
let transitiveDeps = graph |> Graph.transitiveOpt
@@ -57,43 +74,43 @@ let processGraphSimple<'Item, 'Result when 'Item: equality and 'Item: comparison
5774
graph.Keys
5875
|> Seq.map (fun item -> item, makeNode item)
5976
|> readOnlyDict
60-
let lookup item = nodes[item]
61-
let lookupMany items = items |> Array.map lookup
77+
let lookupMany items = items |> Array.map (fun item -> nodes[item])
6278
let leaves =
6379
nodes.Values
6480
|> Seq.filter (fun n -> n.Info.Deps.Length = 0)
6581
|> Seq.toArray
6682

6783
printfn $"Node count: {nodes.Count}"
84+
use cts = new CancellationTokenSource()
6885

69-
let work
86+
let mutable processedCount = 0
87+
let waitHandle = new AutoResetEvent(false)
88+
let rec post node =
89+
Async.Start(async {work node}, cts.Token)
90+
and work
7091
(node: Node<'Item, 'Result>)
71-
: Node<'Item, 'Result>[] =
92+
: unit =
7293
let singleRes = doWork nodes node
7394
node.Result <- Some singleRes
74-
// Need to double-check that only one dependency schedules this dependant
75-
let unblocked =
95+
let unblockedDependants =
7696
node.Info.Dependants
7797
|> lookupMany
78-
|> Array.filter (fun x ->
79-
let pdc =
80-
// TODO Not ideal, better ways most likely exist
81-
lock x (fun () ->
82-
x.ProcessedDepsCount <- x.ProcessedDepsCount + 1
83-
x.ProcessedDepsCount)
84-
pdc = x.Info.Deps.Length)
85-
unblocked
86-
87-
use cts = new CancellationTokenSource()
88-
89-
Parallel.processInParallel
90-
leaves
91-
work
92-
parallelism
93-
(fun processedCount -> processedCount = nodes.Count)
94-
cts.Token
95-
(fun x -> x.Info.Item.ToString())
96-
98+
// For every dependant, increment its number of processed dependencies,
99+
// and filter ones which now have all dependencies processed.
100+
|> Array.filter (fun dependant ->
101+
// This counter can be incremented by multiple workers on different threads.
102+
let pdc = Interlocked.Increment(&dependant.ProcessedDepsCount)
103+
// Note: We cannot read 'dependant.ProcessedDepsCount' again to avoid returning the same item multiple times.
104+
pdc = dependant.Info.Deps.Length)
105+
unblockedDependants |> Array.iter post
106+
if Interlocked.Increment(&processedCount) = nodes.Count then
107+
waitHandle.Set() |> ignore
108+
109+
leaves |> Array.iter post
110+
// TODO Handle async exceptions
111+
// q.Error += ...
112+
waitHandle.WaitOne() |> ignore
113+
97114
nodes.Values
98115
|> Seq.map (fun node ->
99116
node.Result

tests/ParallelTypeCheckingTests/Code/TypeCheckingGraphProcessing.fs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ let processGraph<'Item, 'State, 'Result, 'FinalFileResult when 'Item: equality a
5858
(doWork: 'Item -> 'State -> 'Result)
5959
(folder: bool -> 'State -> 'Result -> 'FinalFileResult * 'State)
6060
(emptyState: 'State)
61-
(parallelism: int)
61+
(_parallelism: int)
6262
: 'FinalFileResult[] * 'State =
6363

6464
let work
@@ -79,7 +79,6 @@ let processGraph<'Item, 'State, 'Result, 'FinalFileResult when 'Item: equality a
7979
processGraphSimple
8080
graph
8181
work
82-
parallelism
8382

8483
let finals, state: 'FinalFileResult[] * 'State =
8584
results

0 commit comments

Comments
 (0)