Pyspark 数据类型及转换
-
Spark 数据类型
ByteType, 1-byte
ShortType, 2-byte
IntegerType, 4-byte
LongType, 8-byte
FloatType, 4-type
DoubleType, 8-byte
DecimalType, arbitrary sided decimal numbers
StringType
BinaryType
BooleanType
TimetampType
DateType. Year month, day
ArrayType, 数组,可以包含空值
MapType, (keyType, valueType, valueContainsNull)
StructType(fields) , StructFiled(name, dataType, nullable)
2.查看数据类型
df.dtypes
3.数据类型转换
from pyspark.sql.types import IntegerType
(1) df = df.withColumn(‘str_col_int’, df1[‘str_col’].cast(‘int’))
(2) df.select(df(“colA”).cast(“int”)) 等价于 df.select(df(“colA”).cast(IntegerType))
3.字符串转为array
to_array = udf(lambda x: [x], ArrayType(StringType()))
4.拆分数组
有个需求, 将 values列拆开,每个元素形成单独的一列
key | values
1 |[0.2,0.3,0.4]
2 |[0.2,0.3,0.2]
key | value1 | value2 | value3
1 | 0.2 |0.3 | 0.4
2 |0.2 |0.3 | 0.2
方案1
使用以下方式, 在
toArray().tolist()
这一步非常消耗内存,导致后续无法写表保存结果
def to_array(col):
def to_array_(v):
return v.toArray().tolist()
return udf(to_array_, ArrayType(DoubleType()))(col)
def get_feat(df):
df = df.withColumn('embed', to_array(col(‘values’)))\
.select(['col_a’,’values’,’label'] + [col('embed')[i] for i in range(embed_dim)])
for i in range(embed_dim):
df = df.withColumnRenamed('embed['+str(i)+']','embed_'+str(i))
df = df.drop('values')
return df
方案2
将value这一列转成 list形式,然后保存表
df = df.select(df("values").cast(ArrayType))
df.write.saveAsTable(‘db_name.table_name’)
读取的时候,用hive sql 来拆分这一列
fe = spark.sql('''select passenger_phone,
values[0] as f0,
values[1] as f1,
values[2] as f2,
values[3] as f3,
values[4] as f4,
values[5] as f5,
values[6] as f6,
values[7] as f7,
values[8] as f8,
values[9] as f9
from db_name.table_name''')
pyspark 读取csv 文件
df4 = spark.read.option("inferSchema",True) \
.option("delimiter",",") \
.option("header","True")\
.csv("src/main/resources/zipcodes.csv")
Pyspark 数据类型及转换Spark 数据类型ByteType, 1-byteShortType, 2-byteIntegerType, 4-byteLongType, 8-byteFloatType, 4-typeDoubleType, 8-byteDecimalType, arbitrary sided decimal numbersStringTypeBinaryTypeBooleanTypeTimetampTypeDateType. Year month, da
dataframe是pyspark中常见的数据类型,一般从load的sql中读取。有时候输入数据源并非sql,这时如何处理呢?
具体转化示例
list转化为dataframe
先将list转化为 dataframe
import pandas as pd
data_list = [['wer', 1], ['asd', 2]]
panda_df = pd.DataFrame(data_list, columns=['col_name1', 'col_name2'])
# 此处要注意panda和pand
needConversion()
此类型是否需要在Python对象和内部SQL对象之间进行转换。
这用于避免对ArrayType / MapType / StructType进行不必要的转换。
simpleString()
toInternal()
将Python对象转换成SQL
h . dig ( :widht ) #=> ditto
#=> nil
h . fetch ( :widht ) #=> Can detect typo, But too long and cannot know suggestion from did_you_mean gem
# KeyError: key not found: :widht
# Where is from `who` key? Is this ex
IX1083AFZZ是日本夏普公司生产的调频中放及鉴频集成电路,广泛应用于汽车音晌、家用音响及其他各种音响中。
IX1083AFZZ集成电路采用单列9脚封装方式,其弓脚功能及数据见下表所列。
表 IX1083AFZZ集成电路的引脚功能及数据
|-- Id: string (nullable = true)
|-- groupId: string (nullable = true)
|-- matchId: string (nullable = true)
|-- assists: string (nullable = true)
|-- boosts: stri...
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functio...
如果你想将 SQL Server 中的一行单列数据转换为一行多列数据,可以使用 `PIVOT` 操作符。 `PIVOT` 可以将单列的数据转换为多列的数据,同时对每个新列进行聚合操作,如 `SUM`、`AVG`、`MAX` 或 `MIN`。
以下是一个使用 `PIVOT` 的示例查询,假设你有一个名为 `MyTable` 的表,其中包含一个名为 `Value` 的列,你想要将其转换为一行多列:
SELECT [Column1], [Column2], [Column3]
FROM (
SELECT Value, 'Column' + CAST(ROW_NUMBER() OVER (ORDER BY Value) AS VARCHAR(10)) AS ColumnName
FROM MyTable
) AS SourceTable
PIVOT (
MAX(Value)
FOR ColumnName IN ([Column1], [Column2], [Column3])
) AS PivotTable;
这个查询中,我们首先使用 `ROW_NUMBER()` 函数为每个行分配一个唯一的序号,然后将这些序号与字符串 "Column" 连接起来以创建列名。然后我们使用 `PIVOT` 操作符将列名转换为列,使用 `MAX` 函数对每个新列进行聚合操作。
请注意,此示例假定有且只有三行数据需要转换,如果需要转换更多的行数据,则需要相应地增加列数。
另外,需要注意的是,使用 `PIVOT` 操作符需要在查询中使用聚合函数,如果不需要聚合操作,则可以使用 `UNPIVOT` 操作符将多列数据转换为单列数据。
Bruce-XIAO:
时空图卷积神经网络(STGCN):一个用于交通预测的神经网络框架
Hikari_Houmra:
java实现简易计算器
时空图卷积神经网络(STGCN):一个用于交通预测的神经网络框架
陈烛_江说:
java实现简易计算器
沐子辰830: