11/// Parallel processing of graph of work items with dependencies
22module ParallelTypeCheckingTests.GraphProcessing
33
4+ open System.Collections .Concurrent
45open System.Collections .Generic
56open System.Threading
7+ open ParallelTypeCheckingTests.Parallel
68
79/// Used for processing
810type NodeInfo < 'Item > =
@@ -13,17 +15,32 @@ type NodeInfo<'Item> =
1315 Dependants: 'Item []
1416 }
1517
18+ // TODO Do not expose this type to other files
1619type Node < 'Item , 'Result > =
1720 {
1821 Info: NodeInfo < 'Item >
1922 mutable ProcessedDepsCount: int
2023 mutable Result: 'Result option
2124 }
2225
26+ /// Basic concurrent set implemented using ConcurrentDictionary
27+ type private ConcurrentSet < 'a >() =
28+ let dict = ConcurrentDictionary< 'a, unit>()
29+
30+ member this.Add ( item : 'a ): bool =
31+ dict.TryAdd( item, ())
32+
33+ /// <summary >
34+ /// A generic method to generate results for a graph of work items in parallel.
35+ /// Processes leaves first, and after each node has been processed, schedules any now unblocked dependants.
36+ /// Returns a list of results, per item.
37+ /// </summary >
38+ /// <param name =" graph " >Graph of work items</param >
39+ /// <param name =" doWork " >A function to generate results for a single item</param >
2340let processGraphSimple < 'Item , 'Result when 'Item : equality and 'Item : comparison >
2441 ( graph : Graph < 'Item >)
42+ // TODO Avoid exposing mutable nodes to the caller
2543 ( doWork : IReadOnlyDictionary < 'Item , Node < 'Item , 'Result >> -> Node < 'Item , 'Result > -> 'Result )
26- ( parallelism : int )
2744 : 'Result [] // Results in order defined in 'graph'
2845 =
2946 let transitiveDeps = graph |> Graph.transitiveOpt
@@ -57,43 +74,43 @@ let processGraphSimple<'Item, 'Result when 'Item: equality and 'Item: comparison
5774 graph.Keys
5875 |> Seq.map ( fun item -> item, makeNode item)
5976 |> readOnlyDict
60- let lookup item = nodes[ item]
61- let lookupMany items = items |> Array.map lookup
77+ let lookupMany items = items |> Array.map ( fun item -> nodes[ item])
6278 let leaves =
6379 nodes.Values
6480 |> Seq.filter ( fun n -> n.Info.Deps.Length = 0 )
6581 |> Seq.toArray
6682
6783 printfn $" Node count: {nodes.Count}"
84+ use cts = new CancellationTokenSource()
6885
69- let work
86+ let mutable processedCount = 0
87+ let waitHandle = new AutoResetEvent( false )
88+ let rec post node =
89+ Async.Start( async { work node}, cts.Token)
90+ and work
7091 ( node : Node < 'Item , 'Result >)
71- : Node < 'Item , 'Result >[] =
92+ : unit =
7293 let singleRes = doWork nodes node
7394 node.Result <- Some singleRes
74- // Need to double-check that only one dependency schedules this dependant
75- let unblocked =
95+ let unblockedDependants =
7696 node.Info.Dependants
7797 |> lookupMany
78- |> Array.filter ( fun x ->
79- let pdc =
80- // TODO Not ideal, better ways most likely exist
81- lock x ( fun () ->
82- x.ProcessedDepsCount <- x.ProcessedDepsCount + 1
83- x.ProcessedDepsCount)
84- pdc = x.Info.Deps.Length)
85- unblocked
86-
87- use cts = new CancellationTokenSource()
88-
89- Parallel.processInParallel
90- leaves
91- work
92- parallelism
93- ( fun processedCount -> processedCount = nodes.Count)
94- cts.Token
95- ( fun x -> x.Info.Item.ToString())
96-
98+ // For every dependant, increment its number of processed dependencies,
99+ // and filter ones which now have all dependencies processed.
100+ |> Array.filter ( fun dependant ->
101+ // This counter can be incremented by multiple workers on different threads.
102+ let pdc = Interlocked.Increment(& dependant.ProcessedDepsCount)
103+ // Note: We cannot read 'dependant.ProcessedDepsCount' again to avoid returning the same item multiple times.
104+ pdc = dependant.Info.Deps.Length)
105+ unblockedDependants |> Array.iter post
106+ if Interlocked.Increment(& processedCount) = nodes.Count then
107+ waitHandle.Set() |> ignore
108+
109+ leaves |> Array.iter post
110+ // TODO Handle async exceptions
111+ // q.Error += ...
112+ waitHandle.WaitOne() |> ignore
113+
97114 nodes.Values
98115 |> Seq.map ( fun node ->
99116 node.Result
0 commit comments