spark访问hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark.rdd.NewHadoopRDD

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.count()

import scala.collection.JavaConverters._

hBaseRDD.map(tuple => tuple._2).map(result => result.getColumn("cf".getBytes(), "val".getBytes())).map(keyValues => {
( keyValues.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getRow,
  keyValues.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue
)
}).take(10)

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toChar).mkString
)
}).take(10)

conf.set(TableInputFormat.INPUT_TABLE, "test1")

//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toInt).mkString
)
}).take(10)

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  ByteBuffer.wrap(row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue).getLong
)
}).take(10)

//conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "lf")
conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")

//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => {
  ( result.getRow.map(_.toChar).mkString,
    ByteBuffer.wrap(result.value).getLong
  )
}).take(10)

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "test1")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

var rows = hBaseRDD.map(tuple => tuple._2).map(result => result.getRow.map(_.toChar).mkString)
rows.map(row => row.split("\\|")).map(r => if (r.length > 1) (r(0), r(1)) else (r(0), "") ).groupByKey.take(10)

本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1717761

时间: 2016-08-28

spark访问hbase的相关文章

开源大数据技术专场(上午):Spark、HBase、JStorm应用与实践

16日上午9点,2016云栖大会"开源大数据技术专场" (全天)在阿里云技术专家封神的主持下开启.通过封神了解到,在上午的专场中,阿里云高级技术专家无谓.阿里云技术专家封神.阿里巴巴中间件技术部高级技术专家天梧.阿里巴巴中间件技术部资深技术专家纪君祥将给大家带来Hadoop.Spark.HBase.JStorm Turbo等内容. 无谓:Hadoop过去现在未来,从阿里云梯到E-MapReduce 阿里云高级技术专家 无谓  从开辟大数据先河至现在,风雨十年,Hadoop已成为企业的通

如何利用mapreduce访问hbase数据

package com.mr.test; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; imp

Spark与HBase的整合

前言 之前因为仅仅是把HBase当成一个可横向扩展并且具有持久化能力的KV数据库,所以只用在了指标存储上,参看很早之前的一篇文章基于HBase做Storm 实时计算指标存储.这次将HBase用在了用户行为存储上,因为Rowkey的过滤功能也很不错,可以很方便的把按人或者内容的维度过滤出所有的行为.从某种意义上,HBase的是一个有且仅有一个多字段复合索引的存储引擎. 虽然我比较推崇实时计算,不过补数据或者计算历史数据啥的,批处理还是少不了的.对于历史数据的计算,其实我是有两个选择的,一个是基于H

spark 调用hbase出现Cannot create a record reader because of a previous error

问题描述 使用spark调用Hbase时出现Cannotcreatearecordreaderbecauseofapreviouserror异常:org.apache.spark.SparkException:Jobabortedduetostagefailure:Task0instage0.0failed4times,mostrecentfailure:Losttask0.3instage0.0(TID3,zdwlhadoop1):java.io.IOException:Cannotcreat

spark往hbase写数据

问题描述 valresult:org.apache.spark.rdd.RDD[(String,Int)]result.foreach(res=>{varput=newPut(java.util.UUID.randomUUID().toString.reverse.getBytes()).add("lv6".getBytes(),res._1.toString.getBytes(),res._2.toString.getBytes)table.put(put)})上面是程序,re

spark读取hbase空指针异常,跪求大神指导

问题描述 spark版本:1.2.1hbase版本:0.98importorg.apache.hadoop.hbase.HBaseConfigurationimportorg.apache.hadoop.hbase.mapreduce.TableInputFormatimportorg.apache.spark.SparkConfimportorg.apache.spark.SparkContextobjectHBaseTest{defmain(args:Array[String]){valsp

java访问Hbase

package com.db.hadoop.hbase01; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Exec

我为什么建议自建HBase集群应该迁移过来?

引言 最近云HBase商业化了,HBase在业界应用还是比较广泛.在云上环境下中,不少客户都自建了HBase集群,还有一部分用户是把HBase集群放在Hadoop离线集群内部.此文主要对比下云HBase数据库跟自建HBase的差异.另外,在成本上,云HBase数据库跟自建基本差不多,目前云HBase在推广打折阶段,比自建还便宜不少 自建HBase与ApsaraDB HBase对比 自建目前在云上,基本是基于ecs去自己构建,ApsaraDB HBase我们还是做了不少事情的: ApsaraDB

如何访问E-MapReduce中HBase集群

一.创建HBase集群 E-MapReduce在EMR-1.2.0版本开始支持HBase(1.1.1)了,创建集群时注意点如下: 1)选择付费类型 创建集群的基本信息页面可选择付费类型,包括包年包月和按量付费两种,一般HBase集群都是长期存在的,所以选择包年包月价格更实惠. 2)选择软件版本配置 产品版本选择EMR-1.2.0及以上版本,集群类型选择HBASE,目前EMR支持的HBase版本号为1.1.1. 3)集群网络配置 可以选择将HBase集群创建在经典网络环境或者专有网络环境(VPC)