Skip to content

Commit b788bfb

Browse files
committed
Added more debug logging (article filter counts and per-iteration timing).
1 parent 75b2da7 commit b788bfb

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

graphx/src/main/scala/org/apache/spark/graphx/PrePostProcessWiki.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ object PrePostProcessWikipedia extends Logging {
7777
val xmlRDD = sc.newAPIHadoopFile(rawData, classOf[XmlInputFormat], classOf[LongWritable], classOf[Text], hadoopconf)
7878
.map(t => t._2.toString)
7979

80-
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
81-
.filter { art => art.relevant }.repartition(128)
80+
val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }
81+
82+
val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
8283
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
8384
val verticesToSave = vertices.map {v => v._1 + "\t"+ v._2}
8485
verticesToSave.saveAsTextFile(vertPath)
@@ -101,8 +102,15 @@ object PrePostProcessWikipedia extends Logging {
101102
.map(t => t._2.toString)
102103
// xmlRDD.count
103104
logWarning(s"XML RDD counted. Found ${xmlRDD.count} raw articles.")
104-
val wikiRDD = xmlRDD.map { raw => new WikiArticle(raw) }
105-
.filter { art => art.relevant }.repartition(128)
105+
106+
val allArtsRDD = xmlRDD.map { raw => new WikiArticle(raw) }.cache
107+
val numRedirects = allArtsRDD.filter { art => art.redirect }.count
108+
val numStubs = allArtsRDD.filter { art => art.stub }.count
109+
val numDisambig = allArtsRDD.filter { art => art.disambig }.count
110+
val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle.notFoundString }.count
111+
logWarning(s"Filter results:\tRedirects: $numRedirects \tStubs: $numStubs \tDisambiguations: $numDisambig \t Title not found: $numTitleNotFound")
112+
113+
val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128)
106114
logWarning(s"wikiRDD counted. Found ${wikiRDD.count} relevant articles.")
107115
val vertices: RDD[(VertexId, String)] = wikiRDD.map { art => (art.vertexID, art.title) }
108116
val edges: RDD[Edge[Double]] = wikiRDD.flatMap { art => art.edges }
@@ -126,6 +134,7 @@ object PrePostProcessWikipedia extends Logging {
126134
var currentGraph = g
127135
logWarning("starting iterations")
128136
for (i <- 0 to numRepetitions) {
137+
val startTime = System.currentTimeMillis
129138
logWarning("starting pagerank")
130139
val pr = PageRank.run(currentGraph, 20)
131140
pr.vertices.count
@@ -167,6 +176,7 @@ object PrePostProcessWikipedia extends Logging {
167176

168177
//(((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
169178
logWarning(s"Number of connected components for iteration $i: $numCCs")
179+
logWarning(s"TIMEX iter $i ${(System.currentTimeMillis - startTime)/1000.0}")
170180
// TODO: check whether this results in too much memory overhead.
171181
currentGraph = newGraph
172182
}

graphx/src/main/scala/org/apache/spark/graphx/WikiArticle.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ class WikiArticle(wtext: String) extends Serializable {
2424
try {
2525
XML.loadString(tiXML).text
2626
} catch {
27-
case e => "NOTFOUND" // don't use null because we get null pointer exceptions
27+
case e => WikiArticle.notFoundString // don't use null because we get null pointer exceptions
2828
}
2929
}
30-
val relevant: Boolean = !(redirect || stub || disambig || title == "NOTFOUND" || title == null)
30+
val relevant: Boolean = !(redirect || stub || disambig || title == WikiArticle.notFoundString || title == null)
3131
val vertexID: VertexId = WikiArticle.titleHash(title)
3232
val edges: HashSet[Edge[Double]] = {
3333
val temp = neighbors.map { n => Edge(vertexID, n, 1.0) }
@@ -48,6 +48,8 @@ object WikiArticle {
4848
@transient val stubPattern = "\\-stub\\}\\}".r
4949
@transient val linkPattern = Pattern.compile("\\[\\[(.*?)\\]\\]", Pattern.MULTILINE)
5050

51+
val notFoundString = "NOTFOUND"
52+
5153
private def parseLinks(wt: String): Array[String] = {
5254
val linkBuilder = new mutable.ArrayBuffer[String]()
5355
val matcher: Matcher = linkPattern.matcher(wt)

0 commit comments

Comments
 (0)