Note: Spark 2.0 and later no longer ships the jar that converts HBase data into a form Python can read, so it has to be downloaded separately.
Download the jar package
spark-example-1.6.0.jar
and place it in the following directory: /usr/local/spark-2.3.0/jars/hbase
Then open spark-env.sh with vim and tell Spark where to find the HBase-related jar files by adding the following line at the top of the file:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
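To confirm the jar actually ended up where Spark will look for it, a quick listing of the directory can help (the classpath entry above uses /usr/local/spark, which is assumed to point at the same installation as /usr/local/spark-2.3.0, e.g. via a symlink). A minimal check:
```
import glob
import os

# Directory from the step above; adjust if your Spark lives somewhere else
jar_dir = "/usr/local/spark-2.3.0/jars/hbase"
jars = glob.glob(os.path.join(jar_dir, "*.jar"))
print("found {0} jar(s) in {1}".format(len(jars), jar_dir))
for j in jars:
    print(" ", os.path.basename(j))
```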
You can now test whether the server environment is fully set up:
Run the pyspark command to enter the Spark shell, then enter the following code:
```
host = 'xxx.xxx.x.1'
table = 'demotable'
spark_host = "spark://xxx.xxx.x.1:7077"
# The pyspark shell already provides a SparkSession (spark) and SparkContext (sc);
# building one explicitly lets you point it at the cluster master
spark = SparkSession.builder.master(spark_host).appName("ps").getOrCreate()
# HBase connection settings: the ZooKeeper quorum and the table to scan
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
# Converters from the spark-examples jar that turn HBase key/value objects into strings
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
# Read the table as an RDD of (row key, row data) string pairs
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
```
Check whether any errors are reported; under normal circumstances the data in the demotable table should be printed.
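Each element of output is a (row key, value string) pair. Depending on which build of the converter jar is used, the value may be a single cell value or a newline-separated list of JSON documents, one per cell. Assuming the JSON-per-cell variant, a rough parsing sketch (the field names "qualifier" and "value" are assumptions about that JSON layout, so inspect one raw value first):
```
import json

# Assumes each value string holds one JSON document per cell, separated by '\n';
# skip this step if your converter returns plain cell values instead
for k, v in output:
    for cell in v.split('\n'):
        data = json.loads(cell)
        print(k, data.get("qualifier"), data.get("value"))
```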
2. Windows 10 local environment setup
Edit the Windows hosts file (C:\Windows\System32\drivers\etc\hosts) and append the following entries at the end of the file. The IP addresses and hostnames must match the server cluster configuration (a quick resolution check follows the list):
XXX.XXX.x.4 hadoop4
XXX.XXX.x.3 hadoop3
XXX.XXX.x.1 hadoop1
XXX.XXX.x.2 hadoop2
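A simple way to confirm the entries take effect is to resolve each hostname from Python; the names below are the ones added to the hosts file:
```
import socket

# Each cluster hostname should resolve to the IP configured in the hosts file
for name in ["hadoop1", "hadoop2", "hadoop3", "hadoop4"]:
    try:
        print(name, "->", socket.gethostbyname(name))
    except socket.gaierror:
        print(name, "-> not resolvable, check the hosts file")
```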
2.1 Download the Spark distribution package from the official website to the local machine
3. PyCharm project setup
Create a new project and a Python file, then use the following code to read the HBase table ('demotable' in this example); the data in the table should be printed:
from pyspark.sql import SparkSession
spark_local = "local[*]"
host = 'XXX.XXX.X.X'
#table name
table = 'demotable'
# create the Spark session in local mode
spark = SparkSession.builder.master(spark_local).appName("test").getOrCreate()
hbaseconf = {"hbase.zookeeper.quorum": host,
             "hbase.mapreduce.inputtable": table,
             # optional: limit the scan to a row-key range
             # "hbase.mapreduce.scan.row.start": row,
             # "hbase.mapreduce.scan.row.stop": row1
             }
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
# read the HBase table as an RDD of (row key, row data) string pairs
hbase_rdd = spark.sparkContext.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv, valueConverter=valueConv, conf=hbaseconf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
Run the code and check the output.
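The commented-out hbase.mapreduce.scan.row.start / hbase.mapreduce.scan.row.stop keys above can restrict the read to a row-key range instead of scanning the whole table. A minimal sketch, where the row keys 'row_0001' and 'row_0100' are placeholders for keys from your own table:
```
# Hypothetical row-key bounds; replace with real keys from your table
row_start = 'row_0001'
row_stop = 'row_0100'
hbaseconf_range = {"hbase.zookeeper.quorum": host,
                   "hbase.mapreduce.inputtable": table,
                   # scan only the rows in [row_start, row_stop)
                   "hbase.mapreduce.scan.row.start": row_start,
                   "hbase.mapreduce.scan.row.stop": row_stop}
ranged_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=hbaseconf_range)
print(ranged_rdd.count())
```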
4. Writing data to HBase
# Write data to HBase. The target table studentSparkdemo, with column family info, has already been created
rawData = ['5,info,name,Xinqing','5,info,gender,女']
# each element encodes ( row key , [ row key , column family , column name , value ] )
keyConv_w = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv_w = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": "XXX.XXX.X.X",
"hbase.mapred.outputtable": "studentSparkdemo",
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
# map each string to (row key, [row key, column family, column, value]);
# x[0] works here only because the row keys are single characters
spark.sparkContext.parallelize(rawData)\
    .map(lambda x: (x[0], x.split(',')))\
    .saveAsNewAPIHadoopDataset(
        conf=conf,
        keyConverter=keyConv_w,
        valueConverter=valueConv_w)
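When the rows are not already pre-formatted strings, source records can be flattened into the same 'rowkey,columnFamily,column,value' layout before writing. A sketch under that assumption; the students list and its fields are invented for illustration, and the row key is taken with split(',')[0] so it also works for multi-character keys (conf, keyConv_w and valueConv_w are the ones defined above):
```
# Hypothetical source records; each dict becomes several cells in the info column family
students = [
    {"id": "6", "name": "Lihua", "gender": "男"},
    {"id": "7", "name": "Wangfang", "gender": "女"},
]

def to_rows(rec):
    # one "rowkey,columnFamily,column,value" string per non-key field
    return ["{0},info,{1},{2}".format(rec["id"], col, rec[col])
            for col in ("name", "gender")]

rawData2 = [row for rec in students for row in to_rows(rec)]
spark.sparkContext.parallelize(rawData2)\
    .map(lambda x: (x.split(',')[0], x.split(',')))\
    .saveAsNewAPIHadoopDataset(conf=conf,
                               keyConverter=keyConv_w,
                               valueConverter=valueConv_w)
```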
5. Problems encountered:
5.1 winutils:
Running Spark locally on Windows requires winutils.exe from a Hadoop build; add its path to the classpath:
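One way to make winutils visible without changing system-wide settings is to set the relevant environment variables from the script itself, before the SparkSession is created. A sketch assuming winutils.exe was placed under C:\hadoop\bin (adjust the path to your installation):
```
import os

# Assumed location of winutils.exe; change this to wherever you placed it
os.environ["HADOOP_HOME"] = r"C:\hadoop"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["PATH"]

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
```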