sparkstreaming测试之二使用网络数据源
测试思路:

创新互联致力于互联网品牌建设与网络营销,包括成都网站设计、网站建设、SEO优化、网络推广、整站优化营销策划推广、电子商务、移动互联网营销等。创新互联为不同类型的客户提供良好的互联网应用定制及解决方案,创新互联核心团队十年专注互联网开发,积累了丰富的网站经验,为广大企业客户提供一站式企业网站建设服务,在网站建设行业内树立了良好口碑。
首先,创建网络数据源数据发送器(程序一);
其次,创建spark接收数据程序(程序二);
接着,将程序一打包,放在服务器上执行。这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据;
最后,运行spark程序,这里每隔5秒处理一次数据。有两个参数:监听的端口号,每隔多少毫秒接收一次数据。
观察效果。
程序一:
sparkStreaming
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object SalaSimulation {
(length: ) = {
java.util.Random
rdm = Random
rdm.nextInt(length)
}
(args: Array[]){
(args.length != ){
System..println()
System.()
}
filename = args()
lines = Source.(filename).getLines.toList
filerow = lines.length
listener = ServerSocket(args().toInt)
(){
socket = listener.accept()
Thread(){
= {
(+socket.getInetAddress)
out = PrintWriter(socket.getOutputStream())
(){
Thread.(args().toLong)
content = lines((filerow))
(content)
out.write(content +)
out.flush()
}
socket.close()
}
}.start()
}
}
}程序二:
sparkStreaming
import org.apache.log4j.{LoggerLevel}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{SecondsStreamingContext}
import org.apache.spark.{SparkContextSparkConf}
import org.apache.spark.streaming.StreamingContext._
object NetworkWordCount {
def main(args: Array[]){
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
conf = SparkConf().setAppName().setMaster()
sc = SparkContext(conf)
ssc = StreamingContext(sc())
lines = ssc.socketTextStream(args()args().toIntStorageLevel.)
words = lines.flatMap(_.split())
wordCounts = words.map(x=>(x)).reduceByKey(_+_)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
网站名称:sparkstreaming测试之二使用网络数据源
分享网址:http://www.jxjierui.cn/article/gohgsi.html


咨询
建站咨询
