@@ -18,9 +18,9 @@ let runCompiler () =
1818type GenericNode < 'State , 'SingleResult > =
1919 {
2020 Idx : FileIdx
21- Deps : FileIdx []
22- TransitiveDeps : FileIdx []
23- Dependants : FileIdx []
21+ mutable Deps : GenericNode < 'State , 'SingleResult > []
22+ mutable TransitiveDeps : GenericNode < 'State , 'SingleResult > []
23+ mutable Dependants : GenericNode < 'State , 'SingleResult > []
2424 mutable Result : ( 'SingleResult * 'State ) option
2525 mutable UnprocessedDepsCount : int
2626 _lock : Object
@@ -30,6 +30,7 @@ type GenericNode<'State, 'SingleResult> =
3030 | :? GenericNode< 'State, 'SingleResult> as other -> ( this.Idx = other.Idx)
3131 | _ -> false
3232 override this.GetHashCode () = this.Idx.Idx
33+ override this.ToString () = this.Idx.ToString()
3334
3435module Node =
3536 let idx ( node : GenericNode < _ , _ >) = node.Idx
@@ -44,17 +45,19 @@ type Node = GenericNode<State, SingleResult>
4445/// </summary >
4546/// <param name =" graph " ></param >
4647/// <param name =" deps " >Transitive deps</param >
47- let combineResults < 'State , 'SingleResult > ( graph : IDictionary < FileIdx , GenericNode < 'State , 'SingleResult >>) ( transitiveDeps : FileIdx []) ( folder : 'State -> 'SingleResult -> 'State ) : 'State =
48+ let combineResults < 'State , 'SingleResult >
49+ ( transitiveDeps : GenericNode < 'State , 'SingleResult >[])
50+ ( folder : 'State -> 'SingleResult -> 'State ) : 'State
51+ =
4852 // Find the child with most transitive deps
4953 let biggestChild =
5054 transitiveDeps
51- |> Array.map ( fun d -> graph[ d])
5255 |> Array.maxBy ( fun n -> n.TransitiveDeps.Length)
5356
5457 // Start with that child's state
5558 let state = biggestChild.Result |> Option.defaultWith ( fun () -> failwith " Unexpected lack of result" ) |> snd
5659
57- let alreadyIncluded = HashSet< FileIdx >( biggestChild.TransitiveDeps, HashIdentity.Structural)
60+ let alreadyIncluded = HashSet< GenericNode < 'State , 'SingleResult > >( biggestChild.TransitiveDeps, HashIdentity.Structural)
5861
5962 // Find individual results from all transitive deps that were not in biggestChild
6063 let toBeAdded =
@@ -64,7 +67,7 @@ let combineResults<'State, 'SingleResult> (graph : IDictionary<FileIdx, GenericN
6467 // Add those results to the initial one
6568 let state =
6669 toBeAdded
67- |> Array.map ( fun d -> graph [ d ] . Result |> Option.defaultWith ( fun () -> failwith " Unexpected lack of result" ) |> fst)
70+ |> Array.map ( fun d -> d .Result |> Option.defaultWith ( fun () -> failwith " Unexpected lack of result" ) |> fst)
6871 |> Array.fold folder state
6972
7073 state
@@ -76,56 +79,55 @@ let actualActualWork (idx : FileIdx) (state : State) : SingleResult =
7679 let thisResult = idx.Idx
7780 thisResult
7881
79- let processGraph ( graph : IDictionary < FileIdx , Node >) ( work : FileIdx -> SingleResult * State ) =
80-
82+ let processGraph < 'State , 'SingleResult >
83+ ( graph : GenericNode < 'State , 'SingleResult >[])
84+ ( work : GenericNode < 'State , 'SingleResult > -> 'State -> 'SingleResult )
85+ ( folder : 'State -> 'SingleResult -> 'State )
86+ =
8187 printfn " start"
82- use q = new BlockingCollection< FileIdx >()
88+ use q = new BlockingCollection< GenericNode < 'State , 'SingleResult > >()
8389
8490 // Add leaves to the queue
8591 let filesWithoutDeps =
8692 graph
87- |> Seq.filter ( fun x -> x.Value.UnprocessedDepsCount = 0 )
88- filesWithoutDeps
89- |> Seq.iter ( fun f -> q.Add( f.Key))
93+ |> Seq.filter ( fun x -> x.UnprocessedDepsCount = 0 )
94+ |> Seq.iter ( fun f -> q.Add( f))
9095
9196 // Keep track of the number of items to be processed
92- let l = Object()
93- let mutable unprocessedCount = graph.Count
97+ let _lock = Object()
98+ let mutable unprocessedCount = graph.Length
9499
95100 let decrementProcessedCount () =
96- lock l ( fun () ->
101+ lock _ lock ( fun () ->
97102 unprocessedCount <- unprocessedCount - 1
98103 printfn $" UnprocessedCount = {unprocessedCount}"
99104 )
100105
101106 // Processing of a single node/file
102- let go ( idx : FileIdx ) : unit =
103- let node = graph[ idx]
104- printfn $" Start {idx} -> %+A {node.Deps}"
107+ let go ( node : GenericNode < 'State , 'SingleResult >) : unit =
108+ printfn $" Start {node} -> %+A {node.Deps}"
105109 Thread.Sleep( 500 )
106- let node = graph[ idx]
107- let state = combineResults graph node.TransitiveDeps fold
108- let singleResult = actualActualWork idx state
110+ let state = combineResults node.TransitiveDeps folder
111+ let singleResult = work node state
109112 node.Result <- Some ( singleResult, state)
110- printfn $" Stop {idx } work - SingleResult={singleResult} State={state}"
113+ printfn $" Stop {node } work - SingleResult={singleResult} State={state}"
111114
112115 // Increment processed deps count for all dependants and schedule those who are now unblocked
113116 node.Dependants
114117 |> Array.iter ( fun dependant ->
115- let node = graph[ dependant]
116118 let unprocessedDepsCount =
117- lock node ._ lock ( fun () ->
118- node .UnprocessedDepsCount <- node .UnprocessedDepsCount - 1
119- node .UnprocessedDepsCount
119+ lock dependant ._ lock ( fun () ->
120+ dependant .UnprocessedDepsCount <- dependant .UnprocessedDepsCount - 1
121+ dependant .UnprocessedDepsCount
120122 )
121- printfn $" {idx }'s dependant {dependant} now has {unprocessedDepsCount} unprocessed deps left"
123+ printfn $" {node }'s dependant {dependant} now has {unprocessedDepsCount} unprocessed deps left"
122124 // Dependant is unblocked - schedule it
123125 if unprocessedDepsCount = 0 then
124126 printfn $" Scheduling {dependant}"
125127 q.Add( dependant)
126128 )
127129
128- printfn $" Quitting {idx }"
130+ printfn $" Quitting {node }"
129131 decrementProcessedCount ()
130132 ()
131133
@@ -135,7 +137,7 @@ let processGraph (graph : IDictionary<FileIdx, Node>) (work : FileIdx -> SingleR
135137 |> Seq.iter go
136138 printfn $" end worker {idx}"
137139
138- let maxParallel = 4
140+ let maxParallel = 4 // TODO Change - base on CPU count?
139141 printfn " workers"
140142 let workers =
141143 [| 1 .. maxParallel|]
@@ -149,11 +151,10 @@ let processGraph (graph : IDictionary<FileIdx, Node>) (work : FileIdx -> SingleR
149151 printfn " waitall"
150152 Task.WaitAll workers
151153
152- let fullResult = combineResults graph ( graph.Values |> Seq.map Node.idx |> Seq.toArray )
154+ let fullResult = combineResults graph
153155
154156 printfn $" End result: {fullResult}"
155157
156-
157158[<Test>]
158159let runGrapher () =
159160 // let args =
@@ -177,9 +178,21 @@ let runGrapher () =
177178 let transitiveDeps = deps |> FileGraph.calcTransitiveGraph
178179 let transitiveDependants = transitiveDeps |> FileGraph.reverse
179180
181+ let nodes =
182+ deps.Keys
183+ |> Seq.map ( fun idx -> idx, { Idx = idx; Deps = [||]; Dependants = [||]; TransitiveDeps = [||]; Result = None; UnprocessedDepsCount = 0 ; _ lock = Object()})
184+ |> readOnlyDict
185+
186+ let processs deps = deps |> Array.map ( fun d -> nodes[ d])
187+
180188 let graph =
181- transitiveDeps
182- |> Seq.map ( fun ( KeyValue ( idx , deps )) -> idx, { Idx = idx; Deps = deps; Dependants = dependants[ idx]; TransitiveDeps = transitiveDependants[ idx]; ThisResult = None; PartialResult = None; UnprocessedDepsCount = deps.Length; _ lock = Object()})
183- |> dict
189+ nodes
190+ |> Seq.iter ( fun ( KeyValue ( idx , node )) ->
191+ node.Deps <- processs deps[ idx]
192+ node.TransitiveDeps <- processs transitiveDeps[ idx]
193+ node.Dependants <- processs dependants[ idx]
194+ node.UnprocessedDepsCount <- node.Deps.Length
195+ )
196+ nodes.Values |> Seq.toArray
184197
185198 processGraph graph
0 commit comments