@@ -77,8 +77,9 @@ object PrePostProcessWikipedia extends Logging {
7777 val xmlRDD = sc.newAPIHadoopFile(rawData, classOf [XmlInputFormat ], classOf [LongWritable ], classOf [Text ], hadoopconf)
7878 .map(t => t._2.toString)
7979
80- val wikiRDD = xmlRDD.map { raw => new WikiArticle (raw) }
81- .filter { art => art.relevant }.repartition(128 )
80+ val allArtsRDD = xmlRDD.map { raw => new WikiArticle (raw) }
81+
82+ val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128 )
8283 val vertices : RDD [(VertexId , String )] = wikiRDD.map { art => (art.vertexID, art.title) }
8384 val verticesToSave = vertices.map {v => v._1 + " \t " + v._2}
8485 verticesToSave.saveAsTextFile(vertPath)
@@ -101,8 +102,15 @@ object PrePostProcessWikipedia extends Logging {
101102 .map(t => t._2.toString)
102103 // xmlRDD.count
103104 logWarning(s " XML RDD counted. Found ${xmlRDD.count} raw articles. " )
104- val wikiRDD = xmlRDD.map { raw => new WikiArticle (raw) }
105- .filter { art => art.relevant }.repartition(128 )
105+
106+ val allArtsRDD = xmlRDD.map { raw => new WikiArticle (raw) }.cache
107+ val numRedirects = allArtsRDD.filter { art => art.redirect }.count
108+ val numStubs = allArtsRDD.filter { art => art.stub }.count
109+ val numDisambig = allArtsRDD.filter { art => art.disambig }.count
110+ val numTitleNotFound = allArtsRDD.filter { art => art.title == WikiArticle .notFoundString }.count
111+ logWarning(s " Filter results: \t Redirects: $numRedirects \t Stubs: $numStubs \t Disambiguations: $numDisambig \t Title not found: $numTitleNotFound" )
112+
113+ val wikiRDD = allArtsRDD.filter { art => art.relevant }.repartition(128 )
106114 logWarning(s " wikiRDD counted. Found ${wikiRDD.count} relevant articles. " )
107115 val vertices : RDD [(VertexId , String )] = wikiRDD.map { art => (art.vertexID, art.title) }
108116 val edges : RDD [Edge [Double ]] = wikiRDD.flatMap { art => art.edges }
@@ -126,6 +134,7 @@ object PrePostProcessWikipedia extends Logging {
126134 var currentGraph = g
127135 logWarning(" starting iterations" )
128136 for (i <- 0 to numRepetitions) {
137+ val startTime = System .currentTimeMillis
129138 logWarning(" starting pagerank" )
130139 val pr = PageRank .run(currentGraph, 20 )
131140 pr.vertices.count
@@ -167,6 +176,7 @@ object PrePostProcessWikipedia extends Logging {
167176
168177 // (((set, vtuple) => set.add(vtuple._2)), ((set1, set2) => set1 union set2)).size
169178 logWarning(s " Number of connected components for iteration $i: $numCCs" )
179+ logWarning(s " TIMEX iter $i ${(System .currentTimeMillis - startTime)/ 1000.0 }" )
170180 // TODO will this result in too much memory overhead???
171181 currentGraph = newGraph
172182 }
0 commit comments