当我们将一个文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素,也可以将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容。
读取文本文件
只需要使用文件路径作为参数调用SparkContext中的textFile()函数,就可以读取一个文本文件
读取一个文本文件
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
如果文件足够小,可以使用SparkContext,wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名
val input = sc.wholeTextFiles("file:///home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
JSON是一种使用较广的半结构化数据格式,读取JSON数据的最简单的方法可以在所有支持的编程语言中使用。然后使用JSON解释器来对RDD中的值进行映射操作,Scala中也可以使用一个自定义Hadoop格式来操作JSON数据。
在scala中读取JSON
import com.fasterxml.jackson.moudle.scala.DefaultScalaMoudle
import com.fasterxml.jackson.moudle.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMappr
import com.fasterxml.jackson.databind.DeserializationFeature
case class Person(name:String,lovesPandas:Boolean //必须是顶级类
//将其解析为特定的case class。使用flatMap,通过在遇到问题时返回空列表(None)
//来处理错误,而在没有问题时返回包含一个元素的列表(Some(_))
val result =input.flatMap(record =>{
Some(mapper.readValue(record,classOf[Person]))
}catch{
case e:Exception => None
在scala中保存为JSON
result.filter(p => p.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
逗号分隔值(CSV)与制表符分割值(TSV)
读取CSV
读取CSV/TSV数据和读取JSON数据相似,都需要先把文件当做普通文本文件来读取数据,再对数据进行处理。由于格式标准的缺失,同一个库的不同版本有时也会用不同的方式处理输入数据。
如果你的CSV的所有数据字段均没有包含换行符,你也可以使用textFile()读取并解析数据
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
如果在字段中嵌有换行符,就需要完整读入每个文件,然后解析。
case class Person(name: String,favoriteAnimal:String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{case (_,txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0),x(1)))
保存CSV
写出CSV/TSV数据很简单,可以通过重用输出编码器来加速
pandaLovers.map(persion => List(person.name,person.favoriteAnimal).toArray).mapPartitions{
people => val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
上述的例子中只能在我们知道所有要输出的字段时使用,然而如果一些字段名是在运行时由用户输入决定的,就要使用别的方法了,最简单的方法是遍历所有的数据,提取不同的键,然后分别输出。
SequenceFile
SequenceFile是由没有相对关系结构的键值对文件组成常用的Hadoop格式,SequenceFile文件有同步标记,Spark可以用它来定位到文件中的某个点,然后再与记录的边界对其。这可以让Spark使用多个节点高效的并行读取SequenceFile文件。
读取SequenceFile
Spark有专门用来读取SequenceFile的接口。SparkContext中,可以调用sequenceFile(path,keyClass,valueClass,minPartitions)
val data = sc.sequenceFile(infile,classOf[Text],classOf[IntWritable]).
map{case(x,y)=> (x.toString,y.get())}
保存SequenceFile
val data = sc.parallelize(List(("panda",3),("key",6),("Snail",2)))
data.saveAsSequenceFile(outputFile)
文件格式Spark对文件的读取和保存方式都很简单,会根据文件的扩展名选择对应的处理方式Spark支持的一些常见格式 格式名称 结构化 备注 文本文件 否 普通的文本文件,每行一条记录 JSON 半结构化 常见的基于文本的格式,半结构化,大多数库都要求每行一条记录 CSV 是 非常常见的基于文本的格式,通常在电子表格应用中使用 ...
可阅读可评论可分享可转载,希望向优秀的人学习
Scala 是一门多范式(multi-paradigm)的编程语言,设计初衷是要集成面向对象编程和函数式编程的各种特性。Scala 运行在Java虚拟机上,并兼容现有的Java程序。java、Scala都是基于JVM的编程语言(文件编译成class文件保存),类相互之间可以调用,Scala并可以调用现有的Java类库;Spark1.6中使用的是Sacla2.10。
面向过程:需要你自己去一步一步的执行
面向函数:也是需要自己去一步一步执行,只是执行的过程已经提前设定好了
今天在Windows上跑Scala词频统计的时候,发现RDD文件并不能直接保存到本地的txt文件中,如果使用saveAsTextFile方法的话,会将RDD数据以文件夹且数据不能直接使用
但是使用其他的方法又比较麻烦,对新手很不友好,这里教大家一个小方法
将RDD数据使用toArray()转为数组即可遍历写入本地文件了,如果有更好的方法,私信笔者,毕竟该方法Scala官方已经不推荐使用了。
代码如下:
package day01
import java.io.PrintWriter
import java
package study.spark.core.rdd.builder
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Memory_Par1 {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf =
文章目录读取数据源数据格式保存JSONcsvSequenceFile对象文件非文件系统数据源protocol buffer文件压缩文件系统Spark SQLApache Hive数据库
本地或分布式文件系统(NFS、HDFS等)
Spark中的结构化数据源
Cassandra、HBase、Elasticsearch、JDBC源
数据格式
文本文件、JSON、CSV、Sequenc...
object CSVSecLine {
def main(args: Array[String]): Unit = {
val fileNames: ListBuffer[String] = getFileName("H:\\csv")
//遍历文件夹中所有文件名,读取其第二行数据,写入新的文件
for (fileName <- fil...
val path2 ="E:\\data\\TEST1.txt"
val data = sc.textFile(path2)data.foreach(println)
直接将文件的绝对路径放在某个变量中,然后再使用textFile()读取里面的内容
但是一般开发都采用这种方法吧
二,相对路径
> [hadoop@mini1 ~]$ start-all.sh
3. 启动kafka(三台机器)
> [hadoop@mini1 kafka_2.12-0.11.0.2]$ bin/kafka-server-start.sh config/server.properties
4. 创建topic
> [hadoop@mini1 kafka_2.12-0.11.0.2]$ bin/kafka-topics.sh \
--create \
--zookeeper mini1:2181 \
--replication-factor 1 \
--partitions 1 \
--topic traffic
5. 启动consumer
> [hadoop@mini1 kafka_2.12-0.11.0.2]$ bin/kafka-console-consumer.sh \
--zookeeper mini1:2181 \
--topic traffic \
--from-beginning
6. 运行程序Producer
![produce](https://github.com/linwt/TrafficForecast-SparkMLlib/blob/master/picture/produce.png)
### 消费者模块
- 功能:消费kafka数据,并将处理后的数据存储到Redis中
- 操作步骤
1. 启动Redis
> 服务端:[hadoop@mini1 redis]# bin/redis-server ~/apps/redis/etc/redis.conf \
> 客户端:[hadoop@mini1 redis]# bin/redis-cli
2. 运行程序SparkConsumer
3. 查看Redis数据库
> 127.0.0.1:6379> select 1 \
> 127.0.0.1:6379[1]> keys * \
> 127.0.0.1:6379[1]> hgetall “20180823_0015”
![redis](https://github.com/linwt/TrafficForecast-SparkMLlib/blob/master/picture/redis.png)
### 数据建模模块
- 功能:读取Redis数据库数据,进行数据建模,并将模型保存到hdfs
- 操作步骤
1. 运行程序Train
2. web访问hdfs,查看保存结果
> mini1:50070
![labelPoint](https://github.com/linwt/TrafficForecast-SparkMLlib/blob/master/picture/labelPoint.png)
RSA实施
此代码实现了 RSA 加密算法,以提供用于学习目的的加密/解密机制。 该算法的描述可以在找到。
实现基本上有两个功能:加密和解密。 一旦我们读取了一个文件,我们就会得到一个字节数组。 然后我们对每个字节应用加密函数。 此加密数据使用名称格式 <output>.data 保存在同一目录下,其中 mmmm 是当前系统时间的最后四位数字。 此输出文件包含加密数据。
为了解密我们生成的文件,系统读取行并将解密函数应用于读取的每个值。 解密后,数据打印在界面上。
此实现的后备是加密文件格式。 我用空格分隔加密数据,以便知道如何正确解密它们。 此外,由于我对每个字节进行加密,因此如果文件太大,操作可能需要一些时间。
要运行该程序,您的机器上应该同时安装了 scala 和 sbt。 在命令行上,运行“sbt run”,项目依赖将被解析,项目将被启动。
spark_hbase
Spark有自己的示例,该示例将HBase和Spark集成到scala 和python转换器。
但是, HBaseResultToStringConverter中的python转换器HBaseResultToStringConverter 返回结果中第一列的值。 并且仅在返回org.apache.hadoop.hbase.client.Result并执行.count()调用时停止。
在这里,我们提供了Scala中的一个新示例,该示例涉及通过Spark将hbase中保存的数据传输到String ,以及python转换器的新示例。
scala 的示例将保存在hbase中的数据传输到RDD[String] ,该数据包含columnFamily,qualifier,timestamp,type,value 。
python的转换器示例将hbase中保存的数据传输到
Hadoop日志分析器
这个用 Scala、Spark 和 MLLIB 编写的应用程序的总体目标是根据日志数据预测应用程序故障。
我的解决方案由两个模块组成:解析器LogParser和日志分析器LogAnalysis 。 我解析了 5 种类型的日志,这些日志将解释用于机器学习部分的 6 个特征。
持续时间:我计算应用程序的开始时间和结束时间之间的差异。
分配的容器:我计算每个应用程序分配的容器数量。
Killed Container:我计算每个应用程序被杀死的容器数量。
成功容器:我计算成功退出的容器数量。 (这个似乎与前一个多余。所以我不使用它)。
内存占用(占 2 个功能):我计算了物理内存和虚拟内存在可用总内存中的比率。
日志分析器的工作方式如下:首先它读取每个日志行并解析它。 然后过滤掉未定义的日志行。 然后日志按应用程序 id 分组。
对于每个应用程序
SBT依赖项:
libraryDependencies + = " com.github.andyglow " %% " scala-jsonschema " % < version> // <-- required
libraryDependencies ++ = Seq (
" com.github.andyglow " %% " scala-jsonschema-core " % < version>, // <-- transitive
" com.github.andyglow " %% "
spark远程调试总结分享
scala实现读取数据然后提交spark案例
linux和windows环境:hadoop-2.7.5、spark2.1.2、jdk1.8、scala2.11、mongodb2.0.3 (linux和windows版本要保持一致如果不不一致,会报ClassNotFound等异常)
1.环境简述
这里我在开了三台虚拟机,hadoop01、02、03。
hadoop01为namenode,02和03为datanode。
hadoop01为master和worker。02和03为wor
键值对RDD数据分区器
Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数
(1)只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
(2)每个RDD的分区ID范围:0~nu...
文章目录通用的加载和保存方式ParquetJSONCSV
通用的加载和保存方式
SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL 默认读取和保存的文件格式为 parquet
1)加载数据
spark.read.load 是加载数据的通用方法
scala> spark.read.
csv format jdbc json load option options orc parquet schema
spark = SparkSession.builder \
.appName("Spark Hive Example") \
.config("spark.sql.warehouse.dir", "/path/to/hive/warehouse") \
.enableHiveSupport() \
.getOrCreate()
其中,`appName`为应用程序名称,`config`为Hive的仓库目录,`enableHiveSupport`为启用Hive支持。
3. 读取Hive表数据
使用Spark连接Hive后,可以通过以下代码读取Hive表数据:
df = spark.sql("SELECT * FROM hive_table")
其中,`hive_table`为Hive中的表名。
4. 将数据保存到Hive中
使用Spark连接Hive后,可以通过以下代码将数据保存到Hive中:
df.write.mode("overwrite").saveAsTable("hive_table")
其中,`mode`为写入模式,`saveAsTable`为保存到Hive表中。
完整代码示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark Hive Example") \
.config("spark.sql.warehouse.dir", "/path/to/hive/warehouse") \
.enableHiveSupport() \
.getOrCreate()
df = spark.sql("SELECT * FROM hive_table")
df.write.mode("overwrite").saveAsTable("hive_table")
注意:在使用Spark连接Hive时,需要确保Spark和Hive的版本兼容。
### 回答2:
问题:如何使用Spark连接Hive并保存数据?
解决方法:要使用Spark连接Hive并保存数据,需要按照以下步骤进行操作:
1. 配置Spark环境:确保安装了Spark和Hive,并在Spark配置文件中指定Hive的配置信息。
2. 创建SparkSession:在Spark中,可以通过创建SparkSession与Hive进行交互。可以使用以下代码创建一个SparkSession对象:
```scala
val spark = SparkSession.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate()
3. 加载Hive表数据:可以使用SparkSession的read方法加载Hive表数据,并创建一个DataFrame对象,例如:
```scala
val data = spark.read.table("database_name.table_name")
4. 在DataFrame上进行转换和处理:可以对加载的数据进行各种转换和处理操作,例如添加新列、过滤数据等。
5. 保存数据到Hive表:可以使用DataFrame的write方法将数据保存到Hive表中,例如:
```scala
data.write.mode("overwrite").saveAsTable("database_name.table_name")
这将会将数据覆盖性地保存到指定的Hive表中。
以上就是使用Spark连接Hive并保存数据的基本步骤。通过配置环境、创建SparkSession对象、加载Hive表数据、进行数据转换和处理以及保存数据到Hive表,可以实现Spark与Hive的连接和数据操作。
### 回答3:
在使用Spark连接Hive并保存数据时,可能会遇到以下问题:
1. 如何在Spark中连接Hive?
2. 如何将Spark处理的数据保存到Hive表中?
解决方案:
1. 在Spark中连接Hive可以通过配置Hive元数据连接来实现。首先,确保在Spark的配置文件中,如spark-defaults.conf中,设置了Spark的master地址。然后,引入Hive的依赖,创建一个SparkSession对象,并设置其配置属性hive.metastore.uris为Hive的元数据存储地址。例如:
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Spark Connect Hive") \
.config("spark.master", "local") \
.config("spark.sql.warehouse.dir", "hdfs://<HDFS路径>") \
.config("hive.metastore.uris", "thrift://<Hive元数据存储地址>") \
.enableHiveSupport() \
.getOrCreate()
在这个示例中,我们使用`enableHiveSupport()`来启用Hive支持,并设置了Hive的元数据存储地址。
2. 将Spark处理的数据保存到Hive表中可以使用Spark的DataFrame API或SQL语句来实现。首先,通过Spark从各种数据源(如HDFS、关系型数据库等)读取数据,并转换为DataFrame。然后,使用DataFrame的`write.saveAsTable(<表名>)`方法将数据保存到Hive表中。例如:
```python
# 从HDFS读取数据并转换为DataFrame
df = spark.read.load("hdfs://<HDFS路径>")
# 将DataFrame保存到Hive表中
df.write.saveAsTable("<表名>")
通过上述代码,我们可以将DataFrame保存为Hive表。还可以根据需要使用其他选项,如`mode`来指定保存模式(例如追加、覆盖等),以及`partitionBy`来指定分区列。
通过设置Spark的配置属性,我们可以在Spark中连接Hive。然后,通过使用Spark的DataFrame API或SQL语句,我们可以将Spark处理的数据保存到Hive表中。