Custom kout/kneighbor, multi-node-shortest-path, jaccard-similar and template paths#1174
Custom kout/kneighbor, multi-node-shortest-path, jaccard-similar and template paths#1174
Conversation
b2cab74 to
9eb68d8
Compare
Codecov Report
@@ Coverage Diff @@
## master #1174 +/- ##
============================================
+ Coverage 64.98% 65.12% +0.14%
- Complexity 5652 5748 +96
============================================
Files 361 366 +5
Lines 29429 29875 +446
Branches 4158 4222 +64
============================================
+ Hits 19123 19456 +333
- Misses 8352 8443 +91
- Partials 1954 1976 +22
Continue to review full report at Codecov.
|
| import com.baidu.hugegraph.util.ExecutorUtil; | ||
| import com.baidu.hugegraph.util.Log; | ||
|
|
||
| public class Consumers<V> { |
There was a problem hiding this comment.
move Consumers to com/baidu/hugegraph/util
There was a problem hiding this comment.
…h and jaccard similar * fix OLTP algorithm not check if source/target vertex exist (#1156) * support customized kout and kneighbor * support multi node shortest path API * support template paths api * support jaccard similars find oltp api * use multi-thread to accelerate customized kout/kneighbor, multi-node-shortest-path, jaccard-similar and template-paths * big depth and both direction use multi threads, otherwise single thread * fix shortest path api NLP Change-Id: I7eb4ceaccdc4a6b2a4a7b944edc83dff64a98f5d
* template path use concurrent hashset to save paths in concurrent mode * support property filter for paths api Change-Id: I2e589f58cdfc48b4b8d16b7780b78cc17ab107d6
also paths and template paths use single thread Change-Id: I4375c455f377d9e4d43c0284ec30da1725fdf6bc
paths supports nearest args oltp multiple threads reuse add multiple thread depth config Change-Id: I8c2f921ffb56302fb8d300c09a5a13857dac187a
Change-Id: I6d972894f3eec339181e5626bf45d01f36dbd4c0
implements: #1173 Change-Id: I85aa1d4274554d65f85a0deb7ac596e65dbb503b
Change-Id: I73d877fbd2099d9e63ef29ee0a10df30354595f9
Change-Id: Ib6fa985ff48fab203611218cc955079ad8cc90c3
Change-Id: Ia6ff5efd7e2a195f85427992b7bff875d30dec48
Change-Id: Icbc65aa24e069b67bd5653b79473da48e2f973c6
Change-Id: I606772946a2a55f7472274242fb18f4ca91c189a
5635e57 to
25cb47a
Compare
|
|
||
| // Traversal vertices of previous level | ||
| RepeatEdgeStep finalCurrentStep = currentStep; | ||
| traverseIds(this.sources.keySet().iterator(), vid -> { |
Change-Id: If6f0c1370a2376ef176ce2db39e002ae11590e8d
| return new MultivaluedHashMap<>(); | ||
| } | ||
|
|
||
| protected static List<Id> joinPath(Node pre, Node back, boolean ring) { |
| new ConcurrentMultiValuedMap<>(); | ||
| private ConcurrentMultiValuedMap<Id, Node> targetsAll = | ||
| new ConcurrentMultiValuedMap<>(); | ||
| private void traverseOne(Id v, RepeatEdgeStep step, boolean forward) { |
| // Re-init sources | ||
| if (currentStep.remainTimes() > 0) { | ||
| this.sources = newVertices; | ||
| private void reInitCurrentIfNeeded(RepeatEdgeStep step, |
| private void reInitAllIfNeeded(boolean forward) { | ||
| if (forward) { | ||
| // Re-init source all if last forward finished one super step | ||
| // and not last super step |
There was a problem hiding this comment.
Does "not last super step" mean the last super step has not finished, or that the current step is not the last super step?
Also, use a /* ... */ block comment here.
| } | ||
| } else { | ||
| // Re-init target all if last forward finished one super step | ||
| // and not last super step |
| @Override | ||
| public void traverseOneLayer( | ||
| Map<Id, List<Node>> vertices, RepeatEdgeStep step, | ||
| BiConsumer<Id, RepeatEdgeStep> biConsumer) { |
|
|
||
| protected boolean lastStep() { | ||
| return this.stepCount == this.totalSteps - 1; | ||
| public RepeatEdgeStep step(boolean forward) { |
| this.sourceIndex = i; | ||
| break; | ||
| private void processOneForForward(Id source, Id target) { | ||
| for (Node n : this.sources.get(source)) { |
There was a problem hiding this comment.
Rename `source` to `sourceV` and rename `n` to `source`; do the same for `target`.
| if (this.reachLimit()) { | ||
| return; | ||
| private void processOneForBackward(Id source, Id target) { | ||
| for (Node n : this.targets.get(source)) { |
| if (forward) { | ||
| processOneForForward(source, target); | ||
| } else { | ||
| processOneForBackward(source, target); |
There was a problem hiding this comment.
Can we merge the two methods into one, and pass sourcesAll or targetsAll to that method?
| @SuppressWarnings("unused") | ||
| private void installLicense(HugeConfig config, String md5) { | ||
| LicenseVerifier.instance().install(config, this, md5); | ||
| // LicenseVerifier.instance().install(config, this, md5); |
| Iterator<Vertex> iterator, | ||
| boolean countOnly) { | ||
| List<Map<String, Object>> pathList; | ||
| pathList = new ArrayList<>(); |
| rangeInt(0, 65535), | ||
| 10 | ||
| ); | ||
|
|
| private Traverser concurrentTraverser(List<Id> sources, List<Id> targets, | ||
| EdgeStep step, boolean nearest, | ||
| long capacity, long limit) { | ||
| return new ConcurrentTraverser(sources, targets, step, capacity, limit); |
There was a problem hiding this comment.
also refactor this class, and can we share code with templatepath?
| } | ||
|
|
||
| public Set<Node> customizedKneighborSingle(Id source, EdgeStep step, | ||
| int maxDepth, long limit) { |
There was a problem hiding this comment.
refactor with one method and call newSet() of Single and Concurrent
| if (groupProperty == null) { | ||
| E.checkArgument(minGroups == 0, | ||
| "Can not set min group count when " + | ||
| "group property not set"); |
| if (groupProperty != null) { | ||
| if (groupProperty == null) { | ||
| E.checkArgument(minGroups == 0, | ||
| "Can not set min group count when " + |
| }); | ||
| } | ||
|
|
||
| protected Set<Node> adjacentVertices(Set<Node> vertices, EdgeStep step, |
There was a problem hiding this comment.
move Iterator adjacentVertices() to line 293
| return jaccardSimilarity(sourceNeighbors, targetNeighbors); | ||
| } | ||
|
|
||
| public double jaccardSimilarity(Set<Id> set1, Set<Id> set2) { |
There was a problem hiding this comment.
Prefer moving kout/jaccardSimilarity out of this class.
| return results; | ||
| } | ||
|
|
||
| public Map<Id, Double> jaccardSimilarsSingle(Id source, EdgeStep step, |
Change-Id: I5b7441f562106fa3810d6d844c897add3010ef30
| } else { | ||
| strategy = new SingleTraverseStrategy(this.graph()); | ||
| if (nearest) { | ||
| traverser = new SingleNearestTraverser(sourceList, targetList, |
There was a problem hiding this comment.
no nearest when Concurrent?
| if (--depth < 0 || traverser.reachLimit()) { | ||
| break; | ||
| Traverser traverser; | ||
|
|
| return newSet(true); | ||
| } | ||
|
|
||
| protected static <V> Set<V> newSet(boolean single) { |
There was a problem hiding this comment.
Prefer renaming the `single` parameter to `concurrent`.
|
|
||
| } | ||
|
|
||
| public abstract class PathTraverser { |
| this.beforeTraverse(true); | ||
|
|
||
| // Traversal vertices of previous level | ||
| traverseOneLayer(this.sources, currentStep, this::forward); |
|
|
||
| currentStep.swithDirection(); | ||
| // Traversal vertices of previous level | ||
| traverseOneLayer(this.targets, currentStep, this::backward); |
|
|
||
| private void processOne(Id source, Id target, boolean forward) { | ||
| if (forward) { | ||
| processOneForForward(source, target); |
| if (forward) { | ||
| processOneForForward(source, target); | ||
| } else { | ||
| processOneForBackward(source, target); |
| return this.paths.size(); | ||
| } | ||
|
|
||
| protected boolean finish() { |
| @Override | ||
| public Iterator<Edge> edgesOfVertex(Id source, EdgeStep edgeStep) { | ||
| return super.edgesOfVertex(source, edgeStep); | ||
| } |
| Map<Id, List<HugeTraverser.Node>> newVertices, | ||
| Map<Id, List<HugeTraverser.Node>> targets); | ||
|
|
||
| public Iterator<Edge> edgesOfVertex(Id source, EdgeStep edgeStep); |
Change-Id: I68226c55ca387ecfdd103e4af8b5e45faa9a934e
| implements AutoCloseable { | ||
|
|
||
| protected static ExecutorService executor; | ||
| private static final String EXECUTOR_NAME = "oltp"; |
There was a problem hiding this comment.
also rename TpTraverser to OltpTraverser
| public static TraverseStrategy create(boolean concurrent, HugeGraph graph) { | ||
| return concurrent ? new ConcurrentTraverseStrategy(graph) : | ||
| new SingleTraverseStrategy(graph); | ||
|
|
| int workers = this.config().get(CoreOptions.OLTP_CONCURRENT_THREADS); | ||
| if (workers > 0) { | ||
| executor = Consumers.newThreadPool(name, workers); | ||
| executor = Consumers.newThreadPool(EXECUTOR_NAME, workers); |
There was a problem hiding this comment.
Take a lock around the null check and the assignment.
| if (--depth < 0 || traverser.reachLimit()) { | ||
| break; | ||
| Traverser traverser; | ||
|
|
No description provided.