//进入终端开启hbase之前需要开启hadoop cd /usr/local/hadoop ./sbin/start-all.sh jps//查看是否成功,成功会 cd /usr/local/hbase ./bin/start-hbase.sh //启动Hbase ./bin/hbase shell //启动hbase shell
在Hbase数据库中,不需要创建数据库,只要直接创建表就可以
//查看已经有的表 hbase> list
- 如果里面已经有一个名称为student的表,请使用如下命令删除(如果不存在student表,就不用执行下面的删除命令了)
hbase> disable 'student' hbase> drop 'student'
- 我们可以在hbase shell中使用下面命令创建:
hbase> create 'student','info' //可通过describe命令查看“student”表的基本信息: hbase> describe 'student' //首先录入student表的第一个学生记录 hbase> put 'student','1','info:name','Xueqian' hbase> put 'student','1','info:gender','F' hbase> put 'student','1','info:age','23' //然后录入student表的第二个学生记录 hbase> put 'student','2','info:name','Weiliang' hbase> put 'student','2','info:gender','M' hbase> put 'student','2','info:age','24' //如果每次只查看一行,就用下面命令 hbase> get 'student','1' //如果每次查看全部数据,就用下面命令 hbase> scan 'student'配置Spark:
另外打开一个终端
cd /usr/local/spark/jars mkdir hbase cd hbase cp /usr/local/hbase/lib/hbase*.jar ./ cp /usr/local/hbase/lib/guava-12.0.1.jar ./ cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./ cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
编写程序读取Hbase数据:把Hbase的lib目录下的一些jar文件拷贝到Spark中,这些都是编程时需要引入的jar包,只有这样,后面编译和运行过程才不会出错。
如果要让Spark读取Hbase,就需要使用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中
cd /usr/local/spark/mycode mkdir hbase cd hbase mkdir -p src/main/scala cd src/main/scala gedit SH.scala
然后,在SH.scala文件中输入以下代码:
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SH { def main(args: Array[String]) { val conf = HbaseConfiguration.create() val sc = new SparkContext(new SparkConf()) //设置查询的表名 conf.set(TableInputFormat.INPUT_TABLE, "student") val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) val count = stuRDD.count() println("Students RDD Count:" + count) stuRDD.cache() //遍历输出 stuRDD.foreach({ case (_,result) => val key = Bytes.toString(result.getRow) val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes)) val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes)) val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes)) println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age) }) } }
然后就可以用sbt打包编译。不过,在编译之前,需要新建一个simple.sbt文件,在simple.sbt配置文件中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、hbase-server的版本号
cd /usr/local/hbase //电脑的hbase安装目录 cd lib ls
ls命令会把“/usr/local/hbase/lib”目录下的所有jar文件全部列出来
有了这些版本号信息,我们就可以新建一个simple.sbt文件:
cd /usr/local/spark/mycode/hbase gedit simple.sbt
//版本号要输对 name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.5" libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.5" libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.5"
下面就可以运行sbt打包命令
cd /usr/local/spark/mycode/hbase //一定把这个设置为当前目录 /usr/local/sbt/sbt package
执行后得到如下结果
Students RDD Count:2 Row key:1 Name:Xueqian Gender:F Age:23 Row key:2 Name:Weiliang Gender:M Age:24编写程序向Hbase写入数据:
下面编写程序向Hbase中写入两行数据
cd /usr/local/spark/mycode/hbase gedit src/main/scala/SW.scala
在SW.scala文件中输入下面代码
import org.apache.hadoop.hbase.HbaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.spark._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes object SW { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("SparkWriteHbase").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "student" sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename) val job = new Job(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //构建两行记录 val rdd = indataRDD.map(_.split(',')).map{arr=>{ val put = new Put(Bytes.toBytes(arr(0))) //行健的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) //info:name列的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2))) //info:gender列的值 put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt)) //info:age列的值 (new ImmutableBytesWritable, put) }} rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) } }
保存并退出gedit编辑器,然后,使用sbt打包编译,命令如下:
cd /usr/local/spark/mycode/hbase /usr/local/sbt/sbt package
最后,通过 spark-submit 运行程序。我们就可以将生成的 jar 包通过 spark-submit 提交到 Spark 中运行了,命令如下:
/usr/local/spark/bin/spark-submit --driver-class-path /usr/local/spark/jars/hbase/*:/usr/local/hbase/conf --class "SW" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
执行后,我们可以切换到刚才的Hbase终端窗口,在Hbase shell中输入如下命令查看结果:
hbase> scan 'student'
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)