通过spark实现点击流日志分析案例

1. 访问的pv

package cn.itcast

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  object PV {
  def main(args: Array[String]): Unit = {
        //todo:创建sparkconf,设置appName
        //todo:setMaster("local[2]")在本地模拟spark运行 这里的数字表示 使用2个线程
        val sparkConf: SparkConf = new SparkConf().setAppName("PV").setMaster("local[2]")

        //todo:创建SparkContext
        val sc: SparkContext = new SparkContext(sparkConf)

        //todo:读取数据
        val file: RDD[String] = sc.textFile("d:\\data\\access.log")

        //todo:将一行数据作为输入,输出("pv",1)
        val pvAndOne: RDD[(String, Int)] = file.map(x=>("pv",1))

        //todo:聚合输出
         val totalPV: RDD[(String, Int)] = pvAndOne.reduceByKey(_+_)
         totalPV.foreach(println)

         sc.stop()
  }
}

 


2. 访问的uv
 

package cn.itcast

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  object UV {
  def main(args: Array[String]): Unit = {
    //todo:构建SparkConf和 SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("UV").setMaster("local[2]")

    val sc: SparkContext = new SparkContext(sparkConf)

    //todo:读取数据
    val file: RDD[String] = sc.textFile("d:\\data\\access.log")

    //todo:对每一行分隔,获取IP地址
    val ips: RDD[(String)] = file.map(_.split(" ")).map(x=>x(0))

    //todo:对ip地址进行去重,最后输出格式 ("UV",1)
    val uvAndOne: RDD[(String, Int)] = ips.distinct().map(x=>("UV",1))

    //todo:聚合输出
    val totalUV: RDD[(String, Int)] = uvAndOne.reduceByKey(_+_)
    totalUV.foreach(println)

    //todo:数据结果保存
    totalUV.saveAsTextFile("d:\\data\\out")

    sc.stop()
  }
}

 

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。


3. 访问的topN
 

package cn.itcast

  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  /**
  * 求访问的topN
  */
  object TopN {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("TopN").setMaster("local[2]")

    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //读取数据
    val file: RDD[String] = sc.textFile("d:\\data\\access.log")

    //将一行数据作为输入,输出(来源URL,1)
    val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).filter(_.length>10).map(x=>(x(10),1))

    //聚合 排序-->降序
    val result: RDD[(String, Int)] = refUrlAndOne.reduceByKey(_+_).sortBy(_._2,false)

    //通过take取topN,这里是取前5名
    val finalResult: Array[(String, Int)] = result.take(5)
    println(finalResult.toBuffer)

    sc.stop()
  }
}

 


通过Spark实现ip地址查询
 

1. 需求分析

       在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。

 SparkRDD编程实战 Cloud 第1张

       因此,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。

2. 技术调研

       因为我们的需求是完成一张报表信息,所以对程序的实时性没有要求,所以可以选择内存计算spark来实现上述功能。

3. 架构设计

搭建spark集群

4. 开发流程

4.1. 数据准备

4.2. ip日志信息

在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍

 SparkRDD编程实战 Cloud 第2张

4.3. 城市ip段信息

 SparkRDD编程实战 Cloud 第3张

5. 代码开发

5.1. 思路

1、  加载城市ip段信息,获取ip起始数字和结束数字,经度,维度

2、  加载日志数据,获取ip信息,然后转换为数字,和ip段比较

3、  比较的时候采用二分法查找,找到对应的经度和维度

4、  然后对经度和维度做单词计数

5.2. 代码

package cn.itcast.IPlocaltion

  import java.sql.{Connection, DriverManager, PreparedStatement}
  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.rdd.RDD
  import org.apache.spark.{SparkConf, SparkContext}

  /**
  * ip地址查询
  */
  object IPLocaltion_Test {
  def main(args: Array[String]): Unit = {
      //todo:创建sparkconf 设置参数
      val sparkConf: SparkConf = new SparkConf().setAppName("IPLocaltion_Test").setMaster("local[2]")

      //todo:创建SparkContext
      val sc = new SparkContext(sparkConf) 

      //todo:读取基站数据
      val data: RDD[String] = sc.textFile("d:\\data\\ip.txt")

      //todo:对基站数据进行切分 ,获取需要的字段 (ipStart,ipEnd,城市位置,经度,纬度)
      val jizhanRDD: RDD[(String, String, String, String, String)] = data.map(_.split("\\|")).map(

        x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8), x(13), x(14)))

      //todo:获取RDD的数据
      val jizhanData: Array[(String, String, String, String, String)] = jizhanRDD.collect()

      //todo:广播变量,一个只读的数据区,所有的task都能读到的地方
      val jizhanBroadcast: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanData) 

      //todo:读取目标数据
      val destData: RDD[String] = sc.textFile("d:\\data\\20090121000132.394251.http.format")

      //todo:获取数据中的ip地址字段
      val ipData: RDD[String] = destData.map(_.split("\\|")).map(x=>x(1))

     //todo:把IP地址转化为long类型,然后通过二分法去基站数据中查找,找到的维度做wordCount
     val result=ipData.mapPartitions(iter=>{

      //获取广播变量中的值
      val valueArr: Array[(String, String, String, String, String)] = jizhanBroadcast.value

      //todo:操作分区中的itertator
      iter.map(ip=>{
        //将ip转化为数字long
        val ipNum:Long=ipToLong(ip)

        //拿这个数字long去基站数据中通过二分法查找,返回ip在valueArr中的下标
        val index:Int=binarySearch(ipNum,valueArr)

        //根据下标获取对一个的经纬度
        val tuple = valueArr(index)

        //返回结果 ((经度,维度),1)
        ((tuple._4,tuple._5),1)
      })
    })

    //todo:分组聚合
    val resultFinal: RDD[((String, String), Int)] = result.reduceByKey(_+_)

    //todo:打印输出
    resultFinal.foreach(println)

    //todo:将结果保存到mysql表中
  resultFinal.map(x=>(x._1._1,x._1._2,x._2)).foreachPartition(data2Mysql)
sc.stop()
  }

  //todo:ip转为long类型
  def ipToLong(ip: String): Long = {
    //todo:切分ip地址。
    val ipArray: Array[String] = ip.split("\\.")
    var ipNum=0L

    for(i <- ipArray){
      ipNum=i.toLong | ipNum << 8L
    }
    ipNum
  }

  //todo:通过二分查找法,获取ip在广播变量中的下标
  def binarySearch(ipNum: Long, valueArr: Array[(String, String, String, String, String)]): Int ={
    //todo:口诀:上下循环寻上下,左移右移寻中间
    //开始下标
    var start=0

    //结束下标
    var end=valueArr.length-1

    while(start<=end){
      val middle=(start+end)/2

      if(ipNum>=valueArr(middle)._1.toLong && ipNum<=valueArr(middle)._2.toLong){
        return middle
      }
      if(ipNum > valueArr(middle)._2.toLong){
        start=middle
      }

      if(ipNum<valueArr(middle)._1.toLong){
        end=middle
      }
    }
    -1
  }

  //todo:数据保存到mysql表中
  def data2Mysql(iterator:Iterator[(String,String, Int)]):Unit = {
    //todo:创建数据库连接Connection
    var conn:Connection=null

    //todo:创建PreparedStatement对象
    var ps:PreparedStatement=null

    //todo:采用拼占位符问号的方式写sql语句。
    var sql="insert into iplocaltion(longitude,latitude,total_count) values(?,?,?)"

    //todo:获取数据连接    conn=DriverManager.getConnection("jdbc:mysql://itcast01:3306/spark","root","root123")

    //todo:  选中想被try/catch包围的语句 ctrl+alt+t 快捷键选中try/catch/finally
    try {
        iterator.foreach(line=> {
        //todo:预编译sql语句
        ps = conn.prepareStatement(sql)

        //todo:对占位符设置值,占位符顺序从1开始,第一个参数是占位符的位置,第二个参数是占位符的值。
        ps.setString(1, line._1)
        ps.setString(2, line._2)
        ps.setLong(3, line._3)

        //todo:执行
        ps.execute()
        })
      } catch {
        case e:Exception =>println(e)
      } finally {
        if(ps!=null){
          ps.close()
        }

        if (conn!=null){
          conn.close()
        }
      }
  }
}

 

 

 

 

 

 

 

 

 

 

 


 

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄