// scalastyle:off println
package [Link]
import [Link].{Seconds, StreamingContext}
import [Link]._
import [Link]._
import [Link]
/**
* Calculates popular hashtags (topics) over sliding 10- and 60-second windows from a Twitter
* stream. The stream is created from OAuth credentials, plus optional filter terms, supplied
* as command-line arguments.
*
* Run this on your local machine as:
*   TwitterPopularTags <consumer key> <consumer secret> <access token> <access token secret> [<filters>]
*/
object TwitterPopularTags {
/**
 * Entry point.
 *
 * Expects at least four command-line arguments, in this order:
 *   1. consumer key
 *   2. consumer secret
 *   3. access token
 *   4. access token secret
 * Any further arguments are passed to the stream as filters.
 *
 * It then prints the top hashtags for two windows, 60 seconds and 10 seconds long.
 *
 * NOTE(review): this copy of the file looks damaged.
 *   - Several identifiers and string literals have been replaced by the placeholder `[Link]`.
 *   - There are no closing braces for the usage-check `if`, for `main`, or for this object.
 * As shown, the file will not compile. Restore it from the upstream source before relying
 * on it — TODO confirm.
 */
def main(args: Array[String]) {
// Usage check: fewer than four arguments means at least one of the credentials listed in the
// usage message is missing.
// NOTE(review): the placeholders here are presumably `args.length`, `System.err.println` and
// `System.exit` — confirm.
// NOTE(review): this `if` has no closing brace. As written, everything below would sit inside
// the error branch, after the exit call.
if ([Link] < 4) {
[Link]("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
"<access token> <access token secret> [<filters>]")
[Link](1)
// NOTE(review): unidentified zero-argument call. Possibly the logging setup helper
// `setStreamingLogLevels()` that appears further down this chunk — confirm.
[Link]()
// The first four arguments are the OAuth credentials. Presumably `args.take(4)` — confirm.
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = [Link](4)
// Any remaining arguments are the stream filters.
// Presumably `args.takeRight(args.length - 4)` — confirm.
val filters = [Link]([Link] - 4)
// Set the system properties so that the Twitter4j library used by the twitter stream
// can use them to generate OAuth credentials.
// NOTE(review): all four keys read "[Link]". If these are `System.setProperty` calls, each
// call would overwrite the same key. The intended keys are presumably Twitter4j's
// `twitter4j.oauth.*` properties — confirm.
[Link]("[Link]", consumerKey)
[Link]("[Link]", consumerSecret)
[Link]("[Link]", accessToken)
[Link]("[Link]", accessTokenSecret)
// Streaming context with a 2-second batch interval.
val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Twitter input stream built from the context and the filters.
// Presumably `TwitterUtils.createStream`; the meaning of the `None` argument is not shown
// here — confirm.
val stream = [Link](ssc, None, filters)
// Split each status on single spaces and keep the tokens that start with '#'.
// The receiver of `(" ")` is presumably the status text's `split` — confirm.
val hashTags = [Link](status => [Link](" ").filter(_.startsWith("#")))
// Count each hashtag over a 60-second window.
// Then flip each pair to (count, topic) so that `sortByKey(false)` orders by count, highest first.
val topCounts60 = [Link]((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map{case (topic, count) => (count, topic)}
.transform(_.sortByKey(false))
// Same pipeline as above, over a 10-second window.
val topCounts10 = [Link]((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
.map{case (topic, count) => (count, topic)}
.transform(_.sortByKey(false))
// Print popular hashtags.
// For each windowed RDD, print up to 10 (count, tag) pairs and a "total".
// NOTE(review): the placeholders are presumably `rdd.take(10)` and `rdd.count()`. If so, the
// "total" is the number of distinct hashtags in the window, not the number of tweets — confirm.
[Link](rdd => {
val topList = [Link](10)
println("\nPopular topics in last 60 seconds (%s total):".format([Link]()))
[Link]{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
[Link](rdd => {
val topList = [Link](10)
println("\nPopular topics in last 10 seconds (%s total):".format([Link]()))
[Link]{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
// Presumably `ssc.start()` followed by `ssc.awaitTermination()` — confirm.
[Link]()
[Link]()
// NOTE(review): the closing braces for `main` and `object TwitterPopularTags` are missing here.
// scalastyle:on println
package [Link]
import [Link].log4j.{Level, Logger}
import [Link]
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = [Link]
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom [Link] to the classpath.")
[Link]([Link])