1+ module FSharp.Compiler.Service.Tests.GraphProcessing
2+
3+ open System
4+ open System.Collections .Concurrent
5+ open System.Collections .Generic
6+ open System.Threading
7+
8+ /// Used for processing
9+ type NodeInfo < 'Item > =
10+ {
11+ Item : 'Item
12+ Deps : 'Item []
13+ TransitiveDeps : 'Item []
14+ Dependants : 'Item []
15+ ProcessedDepsCount : int
16+ }
17+ type Node < 'Item , 'State , 'Result > =
18+ {
19+ Info : NodeInfo < 'Item >
20+ Result : ( 'State * 'Result ) option
21+ }
22+
23+ // TODO Do we need to suppress some error logging if we
24+ // TODO apply the same partial results multiple times?
25+ // TODO Maybe we can enable logging only for the final fold
26+ /// <summary >
27+ /// Combine results of dependencies needed to type-check a 'higher' node in the graph
28+ /// </summary >
29+ /// <param name =" deps " >Direct dependencies of a node</param >
30+ /// <param name =" transitiveDeps " >Transitive dependencies of a node</param >
31+ /// <param name =" folder " >A way to fold a single result into existing state</param >
32+ let combineResults
33+ ( deps : Node < 'Item , 'State , 'Result >[])
34+ ( transitiveDeps : Node < 'Item , 'State , 'Result >[])
35+ ( folder : 'State -> 'Result -> 'State )
36+ : 'State
37+ =
38+ let biggestDep =
39+ let sizeMetric node =
40+ // Could also use eg. total file size/AST size
41+ node.Info.TransitiveDeps.Length
42+ deps
43+ |> Array.maxBy sizeMetric
44+ let orFail value =
45+ value
46+ |> Option.defaultWith ( fun () -> failwith " Unexpected lack of result" )
47+ let firstState =
48+ biggestDep.Result
49+ |> orFail
50+ |> fst
51+
52+ // TODO Potential perf optimisation: Keep transDeps in a HashSet from the start,
53+ // avoiding reconstructing the HashSet here
54+
55+ // Add single-file results of remaining transitive deps one-by-one using folder
56+ // Note: Good to preserve order here so that folding happens in file order
57+ let included = HashSet( biggestDep.Info.TransitiveDeps)
58+ let resultsToAdd =
59+ transitiveDeps
60+ |> Array.filter ( fun dep -> included.Contains dep.Info.Item = false )
61+ |> Array.map ( fun dep ->
62+ dep.Result
63+ |> orFail
64+ |> snd
65+ )
66+ let state = Array.fold folder firstState resultsToAdd
67+ state
68+
69+
70+ // TODO Test this version
71+ /// Untested version that uses MailboxProcessor.
72+ /// See http://www.fssnip.net/nX/title/Limit-degree-of-parallelism-using-an-agent for implementation
73+ let processInParallelUsingMailbox
74+ ( firstItems : 'Item [])
75+ ( work : 'Item -> Async < 'Item []>)
76+ ( parallelism : int )
77+ ( notify : int -> unit )
78+ ( ct : CancellationToken )
79+ : unit
80+ =
81+ let processedCountLock = Object()
82+ let mutable processedCount = 0
83+ let agent = Parallel.threadingLimitAgent 10 ct
84+ let rec processItem item =
85+ async {
86+ let! toSchedule = work item
87+ let pc = lock processedCountLock ( fun () -> processedCount <- processedCount + 1 ; processedCount)
88+ notify pc
89+ toSchedule |> Array.iter ( fun x -> agent.Post( Parallel.Start( processItem x)))
90+ }
91+ firstItems |> Array.iter ( fun x -> agent.Post( Parallel.Start( processItem x)))
92+ ()
93+
94+ // TODO Could replace with MailboxProcessor+Tasks/Asyncs instead of BlockingCollection + Threads
95+ // See http://www.fssnip.net/nX/title/Limit-degree-of-parallelism-using-an-agent
96+ let processInParallel
97+ ( firstItems : 'Item [])
98+ ( work : 'Item -> 'Item [])
99+ ( parallelism : int )
100+ ( stop : int -> bool )
101+ ( ct : CancellationToken )
102+ : unit
103+ =
104+ let bc = new BlockingCollection< 'Item>()
105+ firstItems |> Array.iter bc.Add
106+ let processedCountLock = Object()
107+ let mutable processedCount = 0
108+ let processItem item =
109+ let toSchedule = work item
110+ let processedCount = lock processedCountLock ( fun () -> processedCount <- processedCount + 1 ; processedCount)
111+ toSchedule |> Array.iter bc.Add
112+ processedCount
113+
114+ // TODO Could avoid workers with some semaphores
115+ let workerWork () : unit =
116+ for node in bc.GetConsumingEnumerable( ct) do
117+ if not ct.IsCancellationRequested then // improve
118+ let processedCount = processItem node
119+ if stop processedCount then
120+ bc.CompleteAdding()
121+
122+ Array.Parallel.map workerWork |> ignore // use cancellation
123+ ()
124+
125+ let processGraph
126+ ( graph : FileGraph )
127+ ( doWork : 'Item -> 'State -> 'Result * 'State )
128+ ( folder : 'State -> 'Result -> 'State )
129+ ( parallelism : int )
130+ : 'State
131+ =
132+ let transitiveDeps = graph |> calcTransitiveGraph
133+ let dependants = graph |> reverseGraph
134+ let nodes = graph.Keys |> Seq.map ...
135+ let leaves = nodes |> Seq.filter ...
136+ let work
137+ ( node : Node < 'Item , 'State , 'Result >)
138+ : Node < 'Item , 'State , 'Result >[]
139+ =
140+ let inputState = combineResults node.Deps node.TransitiveDeps folder
141+ let res = doWork node.Info.Item
142+ node.Result <- res
143+ let unblocked =
144+ node.Info.Dependants
145+ |> Array.filter ( fun x ->
146+ let pdc =
147+ lock x ( fun () ->
148+ x.Info.ProcessedDepsCount++
149+ x.Info.PrcessedDepsCount
150+ )
151+ pdc = node.Info.Deps.Length
152+ )
153+ |> Array.map ( fun x -> nodes[ x])
154+ unblocked
155+
156+ processInParallel
157+ leaves
158+ work
159+ parallelism
160+ ( fun processedCount -> processedCount = nodes.Length)
161+
162+ let state = combineResults nodes nodes addCheckResultsToTcState
163+ state
0 commit comments