Note: Spark 2.0 and later no longer ship the jar that converts HBase data into a format Python can read, so we need to download it separately.

Download the jar package spark-example-1.6.0.jar and place it into the following folder: /usr/local/spark-2.3.0/jars/hbase

Then open spark-env.sh with vim and configure it so that Spark knows where to find the HBase-related jar files, by adding the following line at the top of the file:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/usr/local/hbase/bin/hbase classpath):/usr/local/spark/jars/hbase/*
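As a quick sanity check before starting pyspark, the converter and HBase jars should be visible in the directory referenced by the classpath above (the path is the one assumed in this guide; adjust if your layout differs), otherwise the read will fail with a ClassNotFoundException. A minimal check from Python:

```
import glob

# The HBase jars and the examples jar containing the python converters
# should be listed here; an empty list means the classpath step is incomplete.
print(glob.glob("/usr/local/spark/jars/hbase/*.jar"))
```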

We can now test whether the server environment has been deployed correctly:

Run the pyspark command to enter the Spark shell, then enter the following code:

```
# In the pyspark shell, SparkSession and sc are already available.
host = 'xxx.xxx.x.1'
table = 'demotable'
spark_host = "spark://xxx.xxx.x.1:7077"
spark = SparkSession.builder.master(spark_host).appName("ps").getOrCreate()

# HBase connection parameters: ZooKeeper quorum and the table to scan.
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

# Read the HBase table as an RDD of (row key, value) string pairs.
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=conf)

count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
```

Check whether any errors are reported; if everything is set up correctly, the data in the demotable table should be printed.
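Depending on the converter version, each value produced by HBaseResultToStringConverter may be a newline-separated list of JSON strings, one per cell (this is how the official hbase_inputformat.py example treats it). A minimal sketch for splitting each row into per-cell dictionaries, assuming the hbase_rdd from the snippet above:

```
import json

# Each value may hold several cells joined by "\n"; parse each cell's JSON string.
cells_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads)
for row_key, cell in cells_rdd.collect():
    # cell is expected to be a dict with fields such as row, columnFamily,
    # qualifier and value (field names depend on the converter version).
    print(row_key, cell)
```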

2. Windows 10 local environment setup

Edit the Windows hosts file (C:\Windows\System32\drivers\etc\hosts) and append the following entries at the end of the file. The IP addresses and hostnames must match the server cluster configuration:

```
XXX.XXX.x.4 hadoop4
XXX.XXX.x.3 hadoop3
XXX.XXX.x.1 hadoop1
XXX.XXX.x.2 hadoop2
```
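A quick way to confirm the entries take effect is to resolve the hostnames from Python (hostnames taken from the list above):

```
import socket

# Each hostname should resolve to the IP configured in the hosts file.
for name in ["hadoop1", "hadoop2", "hadoop3", "hadoop4"]:
    print(name, socket.gethostbyname(name))
```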

2.1 Download the Spark distribution package from the official site to your local machine:

https://archive.apache.org/dist/spark/spark-2.3.0/
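After extracting the archive, the local Python interpreter needs to find the pyspark package that ships with it. One common approach is to set SPARK_HOME and add Spark's Python libraries to sys.path before importing pyspark; the extraction path below is an assumption, adjust it to wherever the archive was unpacked:

```
import os
import sys
import glob

# Assumed extraction location of the Spark 2.3.0 distribution -- adjust as needed.
os.environ["SPARK_HOME"] = r"D:\spark-2.3.0-bin-hadoop2.7"
spark_python = os.path.join(os.environ["SPARK_HOME"], "python")
# py4j ships inside SPARK_HOME/python/lib as a versioned zip.
py4j_zip = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))[0]
sys.path[:0] = [spark_python, py4j_zip]

from pyspark.sql import SparkSession  # should now import without error
```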

3. PyCharm project setup

Create a new project, add a Python file, and use the following code to read an HBase table such as 'H_99SleepData' (the example below uses 'demotable'); the table's data should be printed:

```
from pyspark.sql import SparkSession

spark_local = "local[*]"
host = 'XXX.XXX.X.X'
# table name
table = 'demotable'

# Create the Spark session in local mode.
spark = SparkSession.builder.master(spark_local).appName("test").getOrCreate()

hbaseconf = {"hbase.zookeeper.quorum": host,
             "hbase.mapreduce.inputtable": table,
             # optional: restrict the scan to a row-key range
             # "hbase.mapreduce.scan.row.start": row,
             # "hbase.mapreduce.scan.row.stop": row1
             }
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

# Read the table as an RDD of (row key, value) string pairs.
hbase_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=hbaseconf)

count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
    print(k, v)
```

Run the code and check the output; the rows of the table should be printed.


4. Writing data to HBase

```
# Write to HBase. The table studentSparkdemo with column family info must already exist.
# Each record is "rowkey,column family,column name,value"; after the map below it becomes
# (row key, [row key, column family, column name, value]).
rawData = ['5,info,name,Xinqing', '5,info,gender,女']

keyConv_w = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv_w = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

conf = {"hbase.zookeeper.quorum": "XXX.XXX.X.X",
        "hbase.mapred.outputtable": "studentSparkdemo",
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

spark.sparkContext.parallelize(rawData) \
    .map(lambda x: (x[0], x.split(','))) \
    .saveAsNewAPIHadoopDataset(conf=conf,
                               keyConverter=keyConv_w,
                               valueConverter=valueConv_w)
```
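To confirm the write succeeded, the same read pattern from section 3 can be reused against studentSparkdemo. A sketch, reusing the spark session created earlier:

```
# Read back the rows that were just written, using the read converters from section 3.
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
read_conf = {"hbase.zookeeper.quorum": "XXX.XXX.X.X",
             "hbase.mapreduce.inputtable": "studentSparkdemo"}
check_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=read_conf)
for k, v in check_rdd.collect():
    print(k, v)
```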

5. Problems encountered:

5.1 winutils

Add the winutils path to the classpath:
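A minimal sketch of the usual winutils setup, assuming winutils.exe has been placed under C:\hadoop\bin (the path is an assumption; adjust it to the actual location): set HADOOP_HOME and put its bin directory on PATH before creating the SparkSession.

```
import os

# Assumed location of winutils.exe -- adjust to where it was actually placed.
os.environ.setdefault("HADOOP_HOME", r"C:\hadoop")
# Make %HADOOP_HOME%\bin visible so Spark can find winutils.exe.
os.environ["PATH"] = os.path.join(os.environ["HADOOP_HOME"], "bin") + os.pathsep + os.environ["PATH"]
```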