data_path = "./test_file.json" # 本地 # data_path = "hdfs://..." df = spark.read.json(data_path) df = spark.read.option("multiLine", True).option("mode", "PERMISSIVE").json(data_path)
  • 读 parquet
  • data_path = "hdfs://..."  
    df = spark.read.parquet(data_path)
    

    三、基本操作

    2.1 建立SparkSession对象

    一切操作之前需要先建立一个SparkSession对象(运行Spark code的Entrance point,可以理解为交互部件): 详见: pyspark.sql module

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
    # spark = SparkSession.builder.appName('mu').master('local').getOrCreate()
    
  • 如果遇到如下报错
  • Traceback (most recent call last):
      File "/Users/my_name/caogao/code_test_1/code_test_pyspark.py", line 5, in <module>
        spark = SparkSession.builder.master("local").getOrCreate()
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/sql/session.py", line 186, in getOrCreate
        sc = SparkContext.getOrCreate(sparkConf)
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 376, in getOrCreate
        SparkContext(conf=conf or SparkConf())
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 136, in __init__
        conf, jsc, profiler_cls)
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 198, in _do_init
        self._jsc = jsc or self._initialize_context(self._conf._jconf)
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/pyspark/context.py", line 315, in _initialize_context
        return self._jvm.JavaSparkContext(jconf)
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/java_gateway.py", line 1569, in __call__
        answer, self._gateway_client, None, self._fqn)
      File "/Users/my_name/opt/anaconda3/envs/py3.7/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
    

    则在开头添加代码

    import pyspark
    conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1')
    sc = pyspark.SparkContext(master='local', appName='myAppName',conf=conf)
    

    参考:解决方案

    2.2 创建模拟数据表

    test = []
    test.append((1, 'age', '30', 50, 40))
    test.append((1, 'city', 'beijing', 50, 40))
    test.append((1, 'gender', 'fale', 50, 40))
    test.append((1, 'height', '172cm', 50, 40))
    test.append((1, 'weight', '70kg', 50, 40))
    test.append((2, 'age', '26', 100, 80))
    test.append((2, 'city', 'beijing', 100, 80))
    test.append((2, 'gender', 'fale', 100, 80))
    test.append((2, 'height', '170cm', 100, 80))
    test.append((2, 'weight', '65kg', 100, 80))
    test.append((3, 'age', '35', 99, 99))
    test.append((3, 'city', 'nanjing', 99, 99))
    test.append((3, 'gender', 'female', 99, 99))
    test.append((3, 'height', '161cm', 99, 99))
    test.append((3, 'weight', '50kg', 99, 99))
    df = spark.createDataFrame(test,
    						  ['user_id', 'attr_name','attr_value', 'income', 'expenses'])
    
    df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
                               ['Id', 'Name', 'Sallary', 'DepartmentId'])
    

    2.3 查

    2.3.1 行元素查询操作

    1. 打印数据

    df.show()默认打印前20条数据,当然可以指定具体打印多少条数据。

    如果有些属性值特别长,pyspark会截断数据导致打不全,这时候可以使用. df.show(truncate=False)

    >>> df.show()
    +-------+---------+----------+------+--------+
    |user_id|attr_name|attr_value|income|expenses|
    +-------+---------+----------+------+--------+
    |      1|      age|        30|    50|      40|
    |      1|     city|   beijing|    50|      40|
    |      1|   gender|      fale|    50|      40|
    |      1|   height|     172cm|    50|      40|
    |      1|   weight|      70kg|    50|      40|
    |      2|      age|        26|   100|      80|
    |      2|     city|   beijing|   100|      80|
    |      2|   gender|      fale|   100|      80|
    |      2|   height|     170cm|   100|      80|
    |      2|   weight|      65kg|   100|      80|
    |      3|      age|        35|    99|      99|
    |      3|     city|   nanjing|    99|      99|
    |      3|   gender|    female|    99|      99|
    |      3|   height|     161cm|    99|      99|
    |      3|   weight|      50kg|    99|      99|
    +-------+---------+----------+------+--------+
    >>> df.show(3)
    +-------+---------+----------+------+--------+
    |user_id|attr_name|attr_value|income|expenses|
    +-------+---------+----------+------+--------+
    |      1|      age|        30|    50|      40|
    |      1|     city|   beijing|    50|      40|
    |      1|   gender|      fale|    50|      40|
    +-------+---------+----------+------+--------+
    only showing top 3 rows
    

    2. 打印概要

    >>> df.printSchema()
     |-- user_id: long (nullable = true)
     |-- attr_name: string (nullable = true)
     |-- attr_value: string (nullable = true)
     |-- income: long (nullable = true)
     |-- expenses: long (nullable = true)
    

    3. 查询总行数

    >>> df.count()
    

    4. 获取头几行到本地

    >>> list = df.head(3) 
    >>> df.head(3)
    [Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40)]
    >>> df.take(5)
    [Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40)]
    

    5. 查询某列为null的行

    >>> from pyspark.sql.functions import isnull
    >>> df = df.filter(isnull("income"))
    >>> df.show()
    19/02/22 17:05:51 WARN DFSClient: Slow ReadProcessor read fields took 87487ms (threshold=30000ms); ack: seqno: 198 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 17565965 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[172.21.3.38:50010,DS-82aedc87-a850-40aa-9d04-dc62ab0988ef,DISK], DatanodeInfoWithStorage[172.21.80.165:50010,DS-305daec5-3c77-48cd-bee2-4f839aea8bb4,DISK], DatanodeInfoWithStorage[172.21.151.40:50010,DS-29ba84d5-ad7d-407f-9484-d85aa3f0a736,DISK]]
    +-------+---------+----------+------+--------+
    |user_id|attr_name|attr_value|income|expenses|
    +-------+---------+----------+------+--------+
    +-------+---------+----------+------+--------+
    

    6. 输出list类型,list中每个元素是Row类:

    >>> df.collect()
    [Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40), Row(user_id=1, attr_name=u'city', attr_value=u'beijing', income=50, expenses=40), Row(user_id=1, attr_name=u'gender', attr_value=u'fale', income=50, expenses=40), Row(user_id=1, attr_name=u'height', attr_value=u'172cm', income=50, expenses=40), Row(user_id=1, attr_name=u'weight', attr_value=u'70kg', income=50, expenses=40), Row(user_id=2, attr_name=u'age', attr_value=u'26', income=100, expenses=80), Row(user_id=2, attr_name=u'city', attr_value=u'beijing', income=100, expenses=80), Row(user_id=2, attr_name=u'gender', attr_value=u'fale', income=100, expenses=80), Row(user_id=2, attr_name=u'height', attr_value=u'170cm', income=100, expenses=80), Row(user_id=2, attr_name=u'weight', attr_value=u'65kg', income=100, expenses=80), Row(user_id=3, attr_name=u'age', attr_value=u'35', income=99, expenses=99), Row(user_id=3, attr_name=u'city', attr_value=u'nanjing', income=99, expenses=99), Row(user_id=3, attr_name=u'gender', attr_value=u'female', income=99, expenses=99), Row(user_id=3, attr_name=u'height', attr_value=u'161cm', income=99, expenses=99), Row(user_id=3, attr_name=u'weight', attr_value=u'50kg', income=99, expenses=99)]
    

    注:此方法将所有数据全部导入到本地,返回一个Array对象。当然,我们可以取出Array中的值,是一个Row,我们也可以取出Row中的值。

    >>> list = df.collect()
    >>> 19/02/22 16:54:04 WARN DFSClient: Slow ReadProcessor read fields took 43005ms (threshold=30000ms); ack: seqno: 179 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 18446744073455908425 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[172.21.3.38:50010,DS-82aedc87-a850-40aa-9d04-dc62ab0988ef,DISK], DatanodeInfoWithStorage[172.21.80.165:50010,DS-305daec5-3c77-48cd-bee2-4f839aea8bb4,DISK], DatanodeInfoWithStorage[172.21.151.40:50010,DS-29ba84d5-ad7d-407f-9484-d85aa3f0a736,DISK]]
    >>> list[0]
    Row(user_id=1, attr_name=u'age', attr_value=u'30', income=50, expenses=40)
    >>> list[0][1]
    u'age'
    

    7. 查询概况

    >>> df.describe().show()
    19/02/22 16:58:23 WARN DFSClient: Slow ReadProcessor read fields took 78649ms (threshold=30000ms); ack: seqno: 188 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 187817284 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[172.21.3.38:50010,DS-82aedc87-a850-40aa-9d04-dc62ab0988ef,DISK], DatanodeInfoWithStorage[172.21.80.165:50010,DS-305daec5-3c77-48cd-bee2-4f839aea8bb4,DISK], DatanodeInfoWithStorage[172.21.151.40:50010,DS-29ba84d5-ad7d-407f-9484-d85aa3f0a736,DISK]]
    +-------+------------------+---------+------------------+-----------------+------------------+
    |summary|           user_id|attr_name|        attr_value|           income|          expenses|
    +-------+------------------+---------+------------------+-----------------+------------------+
    |  count|                15|       15|                15|               15|                15|
    |   mean|               2.0|     null|30.333333333333332|             83.0|              73.0|
    | stddev|0.8451542547285166|     null| 4.509249752822894|24.15722311383137|25.453037988757707|
    |    min|                 1|      age|             161cm|               50|                40|
    |    max|                 3|   weight|           nanjing|              100|                99|
    +-------+------------------+---------+------------------+-----------------+------------------+
    

    8. 去重set操作

  • distinct() 无法传参
  • >>> df.distinct().show()
    +-------+                                                                       
    |user_id|
    +-------+
    |      1|
    |      3|
    |      2|
    +-------+
    
  • 去重并计数
  • df.groupBy("col1").agg(F.countDistinct("col2")).orderBy("col1", ascending=False).show()
    # 和下面分多次统计,效果相同
    df1 = req_df.filter("col1=1").select("col2").dropDuplicates(subset=["col2"])
    df1.count()
    dfn = req_df.filter("col1=n").select("col2").dropDuplicates(subset=["col2"])
    dfn.count()
    
  • 如果要传参,选择需要去重的列,采用dropDuplicates()
  • A = [("A", 1, "AAA", "AAAAA"), ("A", 2, "AAA", "AAAAA")]
    df = spark.createDataFrame(A,['name','id', "name1", "name2"])
    df.show()
    +----+---+-----+-----+
    |name| id|name1|name2|
    +----+---+-----+-----+
    |   A|  1|  AAA|AAAAA|
    |   A|  2|  AAA|AAAAA|
    +----+---+-----+-----+
    # 直接 df.dropDuplicates() 只有当整行相同时才能去重
    df.dropDuplicates().show()
    +----+---+-----+-----+
    |name| id|name1|name2|
    +----+---+-----+-----+
    |   A|  2|  AAA|AAAAA|
    |   A|  1|  AAA|AAAAA|
    +----+---+-----+-----+
    # 针对某些列去重1
    df.dropDuplicates(subset=["name", "name1", "name2"]).show()
    +----+---+-----+-----+
    |name| id|name1|name2|
    +----+---+-----+-----+
    |   A|  1|  AAA|AAAAA|
    +----+---+-----+-----+
    # 针对某些列去重2
    df.dropDuplicates(subset=[c for c in df.columns if c != "id"]).show()
    +----+---+-----+-----+
    |name| id|name1|name2|
    +----+---+-----+-----+
    |   A|  1|  AAA|AAAAA|
    +----+---+-----+-----+
    

    2.3.2 列元素操作

    1. 选择一列或多列:select 一般来说,selectselectExpr 是一样的,区别可以看 Spark---DataFrame学习(二)——select、selectExpr函数

    df.select("age").show()
    
    df["age"]
    df.age
    df.select(“name”)
    df.select(df[‘name’], df[‘age’]+1)
    df.select(df.a, df.b, df.c)    # 选择a、b、c三列
    df.select(df["a"], df["b"], df["c"])    # 选择a、b、c三列
    

    2. where按条件选择filter 和 where 是一样的) 语法:where(conditionExpr: String) 传入筛选条件表达式,可以用andor。得到DataFrame类型的返回结果 注意:字符串 b 需要加引号

    >>> df.where("id = 1 or c1 = 'b'" ).show()                     
    +-------+---------+----------+------+--------+
    | id    |attr_name|attr_value|income|   c1   |
    +-------+---------+----------+------+--------+
    |      1|      age|        30|    50|      c|
    |      2|     city|   beijing|    50|      b|
    |      2|   gender|      fale|    50|      b|
    |      3|   height|     172cm|    50|      b|
    |      4|   weight|      70kg|    50|      b|
    +-------+---------+----------+------+--------+
    

    3. filter 根据字段选择filter 和 where 是一样的

    注意:filter 有好几种用法,推荐第一种

    df.filter("id = 1 or c1 = 'b'" ).show()
    
    df.filter((df.id =="1") & (df.c1=="b"))
    df.filter((df.id =="1") | (df.c1=="b"))
    
    df.filter('id=="1"').filter('c1=="b"')
    
    df.filter("id == 1 or c1 == 'b'")
    
  • 对于 bool 型字段
  • A = [('Pirate',True),('Monkey',False), ('Ninja',True),('Dodo',False), ('Spa',False)]
    df = spark.createDataFrame(A,['name','is_boy'])
    df.show()
    +------+------+
    |  name|is_boy|
    +------+------+
    |Pirate|  true|
    |Monkey| false|
    | Ninja|  true|
    |  Dodo| false|
    |   Spa| false|
    +------+------+
    # 大写 True 可以
    df.filter("is_boy=True").show()
    +------+------+
    |  name|is_boy|
    +------+------+
    |Pirate|  true|
    | Ninja|  true|
    +------+------+
    # 小写 true 也可以
    df.filter("is_boy=true").show()
    +------+------+
    |  name|is_boy|
    +------+------+
    |Pirate|  true|
    | Ninja|  true|
    +------+------+
    # 下面这种写法也可以(默认=True)
    df.filter("is_boy").show()
    +------+------+
    |  name|is_boy|
    +------+------+
    |Pirate|  true|
    | Ninja|  true|
    +------+------+
    df.filter("is_boy=False").show()
    +------+------+
    |  name|is_boy|
    +------+------+
    |Monkey| false|
    |  Dodo| false|
    |   Spa| false|
    +------+------+
    
  • 对于 Null 类型
  • 可以有2种用法

    import pyspark.sql.functions as F
    df_.show()
    +----+-----+
    |name|value|
    +----+-----+
    |   a| null|
    |   b|    2|
    |   c| null|
    +----+-----+
    df_.filter("value is null").show()
    df_.filter(F.col("value").isNull()).show()
    +----+-----+
    |name|value|
    +----+-----+
    |   a| null|
    |   c| null|
    +----+-----+
    df_.filter("value is not null").show()
    df_.filter(F.col("value").isNotNull()).show()
    +----+-----+
    |name|value|
    +----+-----+
    |   b|    2|
    +----+-----+
    
  • 对于空字符串(非 null)
  • df_.show()
    +----+-----+
    |name|value|
    +----+-----+
    |   a|     |
    |   b|    2|
    |   c|     |
    +----+-----+
    df_.filter("value=''").show()  # 空字符串
    +----+-----+
    |name|value|
    +----+-----+
    |   a|     |
    |   c|     |
    +----+-----+
    

    2.3.3 排序

    1. orderBy:按指定字段排序,默认为升序

    >>> df.orderBy(df.income.desc()).show()         
    19/02/22 18:02:31 WARN DFSClient: Slow ReadProcessor read fields took 87360ms (threshold=30000ms); ack: seqno: 325 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 14139744 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[172.21.3.38:50010,DS-82aedc87-a850-40aa-9d04-dc62ab0988ef,DISK], DatanodeInfoWithStorage[172.21.80.165:50010,DS-305daec5-3c77-48cd-bee2-4f839aea8bb4,DISK], DatanodeInfoWithStorage[172.21.151.40:50010,DS-29ba84d5-ad7d-407f-9484-d85aa3f0a736,DISK]]
    +-------+---------+----------+------+--------+
    |user_id|attr_name|attr_value|income|expenses|
    +-------+---------+----------+------+--------+
    |      2|   gender|      fale|   100|      80|
    |      2|   weight|      65kg|   100|      80|
    |      2|   height|     170cm|   100|      80|
    |      2|      age|        26|   100|      80|
    |      2|     city|   beijing|   100|      80|
    |      3|   gender|    female|    99|      99|
    |      3|      age|        35|    99|      99|
    |      3|   height|     161cm|    99|      99|
    |      3|   weight|      50kg|    99|      99|
    |      3|     city|   nanjing|    99|      99|
    |      1|      age|        30|    50|      40|
    |      1|   height|     172cm|    50|      40|
    |      1|     city|   beijing|    50|      40|
    |      1|   weight|      70kg|    50|      40|
    |      1|   gender|      fale|    50|      40|
    +-------+---------+----------+------+--------+
    

    2.3.4 抽样

    sample是抽样函数,其中withReplacement = True or False代表是否有放回。42是seed。

    t1 = train.sample(False, 0.2, 42)
    

    2.4 增加、删除、修改列

  • 增加列用 withColumn 方法 增加一列value全为0的列
  • from pyspark.sql.functions import lit
    df.withColumn('newCol', lit(0)).show()
    ## 输出
    +---+-----+-------+------------+------+
    | Id| Name|Sallary|DepartmentId|newCol|
    +---+-----+-------+------------+------+
    |  1|  Joe|  70000|           1|     0|
    |  2|Henry|  80000|        null|     0|
    +---+-----+-------+------------+------+
    
  • 重命名列名 pyspark系列--dataframe基础
  • # spark-1
    # 在创建dataframe的时候重命名
    data = spark.createDataFrame(data=[("Alberto", 2), ("Dakota", 2)],
                                  schema=['name','length'])
    data.show()
    data.printSchema()
    # spark-2
    # 使用selectExpr方法
    # 原始column as 修改之后的column
    # cast 是修改整列的属性
    color_df2 = color_df.selectExpr('cast(color as long) as color2','length as length2')
    color_df2.show()
    # spark-3
    # withColumnRenamed方法
    color_df2 = color_df.withColumnRenamed('color','color2')\
                        .withColumnRenamed('length','length2')
    color_df2.show()
    # spark-4
    # alias 方法
    color_df.select(color_df.color.alias('color2')).show()
    

    2.5 groupBy 分组统计

    In [63]: df.groupby('Sallary').count().show()
    +-------------+-----+                                                           
    |app_category2|count|
    +-------------+-----+
    |         null|  231|
    |           77|  215|
    |           81|  378|
    |           84|   14|
    +-------------+-----+
    

    注意!正确参数是 ascending,如果误拼写成 ascending,不会报错,但是不能正确排序,要注意!!!

  • 从小到大排序:ascending=True
  • 从大到小排序:ascending=False
  • valuesA = [('Pirate','boy',1),('Monkey','girl',2),('Monkey','boy',3),('Ninja','girl',3),('Spa','boy',4), ('Spa','boy',5), ('Spa','girl',7)]
    df = spark.createDataFrame(valuesA,['name','sex','value'])
    In [8]: df.show()
    +------+----+-----+
    |  name| sex|value|
    +------+----+-----+
    |Pirate| boy|    1|
    |Monkey|girl|    2|
    |Monkey| boy|    3|
    | Ninja|girl|    3|
    |   Spa| boy|    4|
    |   Spa| boy|    5|
    |   Spa|girl|    7|
    +------+----+-----+
    # 从大到小排序
    df.groupBy("name", "sex").count().orderBy("count", ascending=False).show()
    +------+----+-----+
    |  name| sex|count|
    +------+----+-----+
    |   Spa| boy|    2|
    |Monkey| boy|    1|
    |   Spa|girl|    1|
    |Monkey|girl|    1|
    |Pirate| boy|    1|
    | Ninja|girl|    1|
    +------+----+-----+
    
  • collect_set 和 collect_list
  • from pyspark.sql import functions as F
    df.show()
    +---+-----+----+
    | id|value|name|
    +---+-----+----+
    |  a| null| Leo|
    |  a|   11|null|
    |  a|   11|Mike|
    |  a|   22| Leo|
    +---+-----+----+
    # collect_list 汇总到列表中;collect_set 汇总到列表中,再去重
    df.groupBy("id").agg(F.sum("value").alias("value_sum"), F.collect_set("value").alias("value_collect_set"), F.collect_list("name").alias("name_collect_list")).show()
    +---+---------+-----------------+-----------------+
    | id|value_sum|value_collect_set|name_collect_list|
    +---+---------+-----------------+-----------------+
    |  a|       44|         [22, 11]| [Leo, Mike, Leo]|
    +---+---------+-----------------+-----------------+
    
  • 关于 pk_key
  • 当pk_key为列表时,可以用星号 *pk_key 来取出pk_key的值(似乎不用星号也行,还没看到不用星号会失败的情况)

    详见下面关于 python 中单星号的用法(解压参数列表)

    df1.show()
    +----+-----+
    |name|value|
    +----+-----+
    |   a|    1|
    |   a|    2|
    |   a|    2|
    +----+-----+
    df1.groupBy(*pk).agg(F.sum("value")).show()
    +----+-----+----------+
    |name|value|sum(value)|
    +----+-----+----------+
    |   a|    1|         1|
    |   a|    2|         4|
    +----+-----+----------+
    # 在这里,pk 带不带星号,没有区别
    df1.groupBy(pk).agg(F.sum("value")).show()
    +----+-----+----------+
    |name|value|sum(value)|
    +----+-----+----------+
    |   a|    1|         1|
    |   a|    2|         4|
    +----+-----+----------+
    
  • 单个星号 * 参考 python 中单* 和双 ** 的用法
  • 单星号的其中一个用法是解压参数列表

    def func(a, b):
    	print a, b
    param = [1, 2]
    func(*param)
    

    2.6 join 操作

  • 如果想要保证左表不丢失数据,则需要用 left join,否则,用普通 join 即可
  • 在使用 left join 的时候,左表比右表大,join 不上的部分,会以 null 显示,需要手动把这些null替换为其他值,便于计算,比如替换为0

    # 这个参数使用的场合为:假如某个字段默认是null,你想其返回的不是null,而是比如0或其他值,可以使用这个函数 
    df = df.join(df1,'t_id','left').withColumn('is_name',F.coalesce('my_col',F.lit(0))).drop('my_col')
    # 其实就是把 my_col 列中为 NULL 的替换为 0
    
  • join 两个表,二者有一个共同列 “ad_id”
  • 想知道第一个表的 ad_id 对应的 ocpc_type,所以需要到表2去找
  • df = spark.createDataFrame([('1', 'Joe'), ('4', 'Henry'), ('1', 'Nan'), ('4', 'Hesssnry')], ['ad_id', 'Name'])
    df2 = spark.createDataFrame([('1', 'A'), ('4', 'B'), ('5', 'C')], ['ad_id', 'ocpc_type'])
    df3 = df2.join(df, on='ad_id', how='left')
    df3.show()
    +-----+---------+--------+
    |ad_id|ocpc_type|    Name|
    +-----+---------+--------+
    |    5|        C|    null|
    |    1|        A|     Joe|
    |    1|        A|     Nan|
    |    4|        B|   Henry|
    |    4|        B|Hesssnry|
    +-----+---------+--------+
    df3.filter('ocpc_type == "A"').show()
    +-----+---------+----+
    |ad_id|ocpc_type|Name|
    +-----+---------+----+
    |    1|        A| Joe|
    |    1|        A| Nan|
    +-----+---------+----+
    # 如果变换下join到顺序
    df3 = df.join(df2, on='ad_id', how='left')
    df3.show()
    +-----+--------+---------+
    |ad_id|    Name|ocpc_type|
    +-----+--------+---------+
    |    1|     Joe|        A|
    |    1|     Nan|        A|
    |    4|   Henry|        B|
    |    4|Hesssnry|        B|
    +-----+--------+---------+
    

    可以理解为,哪个表在join操作的前面,就以其为主,后面的为补充

    left_semi 取 df1 和 df2 相交的部分,df1的数据

    left_anti 取 df1 和 df2 相交的部分,df1的余下数据

    df1.show()
    +---+---+
    | id|num|
    +---+---+
    |  A|  1|
    |  B|  2|
    |  C|  3|
    +---+---+
    df2.show()
    +---+---+
    | id|num|
    +---+---+
    |  C| 33|
    |  D|  4|
    |  E|  5|
    +---+---+
    # 1. 普通的 left join,右表中 join 不上的会以 null 填充
    df1.join(df2, "id", "left").show()
    +---+---+----+
    | id|num| num|
    +---+---+----+
    |  B|  2|null|
    |  C|  3|  33|
    |  A|  1|null|
    +---+---+----+
    # 2. 取 df1 和 df2 相交的部分,df1的数据(注意到 num 的取值为3,而不是33)
    df1.join(df2, "id", "left_semi").show()
    +---+---+
    | id|num|
    +---+---+
    |  C|  3|
    +---+---+
    # 3. 取 df1 和 df2 相交的部分,df1的余下数据
    df1.join(df2, "id", "left_anti").show()
    +---+---+
    | id|num|
    +---+---+
    |  B|  2|
    |  A|  1|
    +---+---+
    

    当某个表 join 时,如果 join 的 pk_key 有重复的话,会出现组合爆炸的情况,需要保证 join 双方都没有重复的 pk_key

    valuesA = [('Pirate',1),('Monkey',2),('Monkey',3),('Ninja',3),('Spaghetti',4)]
    TableA = spark.createDataFrame(valuesA,['name','id'])
    valuesB = [('Rutabaga',11) ,('Monkey',22) ,('Monkey',222),('Ninja',33),('Darth Vader',44)]
    TableB = spark.createDataFrame(valuesB,['name','id2'])
    TableA.join(TableB,on='name').show(50,False)
    +------+---+---+
    |name  |id |id2|
    +------+---+---+
    |Ninja |3  |33 |
    |Monkey|2  |222|
    |Monkey|2  |22 |
    |Monkey|3  |222|
    |Monkey|3  |22 |
    +------+---+---+
    # left join 保证了左表的数据不丢失,join 不上的,右表会以 null 填充
    TableA.join(TableB,on='name',how='left').show(50,False)
    +---------+---+----+
    |name     |id |id2 |
    +---------+---+----+
    |Spaghetti|4  |null|
    |Ninja    |3  |33  |
    |Pirate   |1  |null|
    |Monkey   |2  |22  |
    |Monkey   |2  |222 |
    |Monkey   |3  |22  |
    |Monkey   |3  |222 |
    +---------+---+----+
    # 由于 tableA 和 tableB 中虽然有重复的 pk_key,但是值是不一样的,没法去重
    TableA.dropDuplicates().join(TableB.dropDuplicates(),on='name').show(50,False)
    +------+---+---+
    |name  |id |id2|
    +------+---+---+
    |Ninja |3  |33 |
    |Monkey|3  |22 |
    |Monkey|3  |222|
    |Monkey|2  |22 |
    |Monkey|2  |222|
    +------+---+---+
    
    In [25]: valuesC = [('Pirate',1),('Monkey',222),('Monkey',111),('Ninja',3),('Spaghetti',4)]
    In [26]: TableC = spark.createDataFrame(valuesC,['name','id'])
    In [27]: TableC.show()
    +---------+---+
    |     name| id|
    +---------+---+
    |   Pirate|  1|
    |   Monkey|222|
    |   Monkey|111|
    |    Ninja|  3|
    |Spaghetti|  4|
    +---------+---+
    In [28]: TableC.dropDuplicates().show()
    +---------+---+
    |     name| id|
    +---------+---+
    |   Pirate|  1|
    |    Ninja|  3|
    |   Monkey|111|
    |   Monkey|222|
    |Spaghetti|  4|
    +---------+---+
    ----------------------------------------------------------------------------------------------
    In [23]: valuesC = [('Pirate',1),('Monkey',222),('Monkey',222),('Ninja',3),('Spaghetti',4)]
    In [24]: TableC = spark.createDataFrame(valuesC,['name','id'])
    In [25]: TableC.show()
    +---------+---+
    |     name| id|
    +---------+---+
    |   Pirate|  1|
    |   Monkey|222|
    |   Monkey|222|
    |    Ninja|  3|
    |Spaghetti|  4|
    +---------+---+
    # 去重, join 之前必须保证 join 两者表中去重过
    In [26]: TableC.dropDuplicates().show()
    +---------+---+
    |     name| id|
    +---------+---+
    |   Pirate|  1|
    |    Ninja|  3|
    |   Monkey|222|
    |Spaghetti|  4|
    +---------+---+
    

    注意,如果列名重复,join 之后会出现重复列

    df1 = spark.createDataFrame([("A", 1), ("B", 2)], ["name", "num"])
    df1.show()
    +----+---+
    |name|num|
    +----+---+
    |   A|  1|
    |   B|  2|
    +----+---+
    df2 = spark.createDataFrame([("A", 1), ("B", 2), ("C", 3)], ["name", "num"])
    df2.show()
    +----+---+
    |name|num|
    +----+---+
    |   A|  1|
    |   B|  2|
    |   C|  3|
    +----+---+
    df3 = df1.join(df2, "name")
    df3.show()
    +----+---+---+
    |name|num|num|
    +----+---+---+
    |   B|  2|  2|
    |   A|  1|  1|
    +----+---+---+
    

    2.5 复杂用法实例

    from pyspark.sql import functions as F
    
  • coalesce(与 mysql 类似) 作用是将返回传入的参数中第一个非null的值,比如
  • mysql

    SELECT COALESCE(NULL, NULL, 1); 
    Return 1 
    # 如果传入的参数所有都是null,则返回null,比如 
    SELECT COALESCE(NULL, NULL, NULL, NULL); 
    Return NULL 
    # 参数说明:如果a==null,则选择b;如果b==null,则选择c;如果a!=null,则选择a;如果a b c 都为null ,则返回为null(没意义)
    select coalesce(a,b,c);
    

    Spark

    # 这个参数使用的场合为:假如某个字段默认是null,你想其返回的不是null,而是比如0或其他值,可以使用这个函数 df = df.join(df1,'t_id','left').withColumn('is_name',F.coalesce('my_col',F.lit(0))).drop('my_col') # 其实就是把 my_col 列中为 NULL 的替换为 0
  • 时间戳转时间 注意! 这里用到的是 spark SQL 的语法,而不是python的语法 ,参考 Spark SQL
  • valuesA = [('Pirate',1609785094),('Monkey',1609785094),('Monkey',1609785094),('Ninja',1609785094),('Spaghetti',0)]
    TableA = spark.createDataFrame(valuesA,['name','time'])
    new_time = F.expr("FROM_UNIXTIME(`time`, 'yyyy-MM-dd')")
    # print new_time 看看
    df2 = df.where(new_time == "2021-01-01")
    df2.show()
    

    如果是 python 的话,则用下面的语法

    #coding:UTF-8
    import time
    dt = "2016-05-05 20:28:54"
    #转换成时间数组
    timeArray = time.strptime(dt, "%Y-%m-%d %H:%M:%S")
    #转换成新的时间格式(20160505-20:28:54)
    dt_new = time.strftime("%Y%m%d-%H:%M:%S",timeArray)
    print dt_new
    

    2.7 判断两个 dataframe 是否相同

    参考:Spark sql实战--如何比较两个dataframe是否相等

    a = [('Pirate',1),('Monkey',2)]
    A = spark.createDataFrame(a,['name','id'])
    In [3]: A.show()
    +------+---+
    |  name| id|
    +------+---+
    |Pirate|  1|
    |Monkey|  2|
    +------+---+
    b = [('Monkey',2),('Pirate',1)]
    B = spark.createDataFrame(b,['name','id'])
    In [6]: B.show()
    +------+---+
    |  name| id|
    +------+---+
    |Monkey|  2|
    |Pirate|  1|
    +------+---+
    def match_df(df1, df2):
        count1 = len(df1.subtract(df2).take(1))
        count2 = len(df2.subtract(df1).take(1))
        return True if count1 == count2 and count1 == 0 else False
    print match_df(A, B)
    

    2.8 交集&并集&合集

    1. 交集&并集&合集

  • 差集 except
  • # df1不在df2中的部分,可以理解为 df1-(df1和df2的交集)
    df1.subtract(df2)
    In [31]: df1.show()
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    +-----+
    In [32]: df2.show()
    +-----+
    |value|
    +-----+
    |    2|
    |    3|
    |    4|
    +-----+
    In [33]: df1.subtract(df2).show()
    +-----+
    |value|
    +-----+
    |    1|
    +-----+
    
    df1.intersect(df2)
    
    df1.union(df2)
    # 并去重
    df1.union(df2).distinct()
    
  • 关于union的坑 union的2张表,必须保证字段完全相同,且字段的顺序完全相同!函数本身不会按照字段来union,只会机械得进行2表的拼接
  • df1 = spark.createDataFrame([("A", 1, 0), ("B", 1, 0)], ["id", "is_girl", "is_boy"])
    +---+-------+------+
    | id|is_girl|is_boy|
    +---+-------+------+
    |  A|      1|     0|
    |  B|      1|     0|
    +---+-------+------+
    df2 = spark.createDataFrame([("C", 1, 0), ("D", 1, 0)], ["id", "is_boy", "is_girl"])
    +---+------+-------+
    | id|is_boy|is_girl|
    +---+------+-------+
    |  C|     1|      0|
    |  D|     1|      0|
    +---+------+-------+
    # 直接union的话,由于字段顺序不同,只会机械得将2张表组合在一起,并不会自动调换字段的顺序
    # 这样拼接是错误的!!!
    df1.union(df2).show()
    +---+-------+------+
    | id|is_girl|is_boy|
    +---+-------+------+
    |  A|      1|     0|
    |  B|      1|     0|
    |  C|      1|     0|
    |  D|      1|     0|
    +---+-------+------+
    # 需要手动修改字段顺序,保证字段顺序一致
    df1.selectExpr("id", "is_girl", "is_boy").union(df2.selectExpr("id", "is_girl", "is_boy")).show()
    +---+-------+------+
    | id|is_girl|is_boy|
    +---+-------+------+
    |  A|      1|     0|
    |  B|      1|     0|
    |  C|      0|     1|
    |  D|      0|     1|
    +---+-------+------+
    

    2. join和交集的区别

    df1 = spark.createDataFrame([("A", 1), ("A", 11), ("B", 2), ("B", 3)], ["name", "num"]).select("name")
    df2 = spark.createDataFrame([("A", 1), ("B", 2), ("B", 3)], ["name", "num"]).select("name")
    df1.show()
    +----+
    |name|
    +----+
    |   A|
    |   A|
    |   B|
    |   B|
    +----+
    df2.show()
    +----+
    |name|
    +----+
    |   A|
    |   B|
    |   B|
    +----+
    # intersect 自带左右两端去重
    In [28]: df1.intersect(df2).show()
    +----+
    |name|
    +----+
    |   B|
    |   A|
    +----+
    # 如果有重复,join会导致重复更严重
    In [29]: df1.join(df2, "name").show()
    +----+
    |name|
    +----+
    |   B|
    |   B|
    |   B|
    |   B|
    |   A|
    |   A|
    +----+
    # 手动两端去重(和 intersect 效果一样了)
    In [30]: df1.dropDuplicates().join(df2.dropDuplicates(), "name").show()
    +----+
    |name|
    +----+
    |   B|
    |   A|
    +----+
    

    2.9 计算某列的均值 & 求和

    valuesA = [('Pirate',1),('Monkey',2),('Monkey',3),('Ninja',3),('Spaghetti',4)]
    A = spark.createDataFrame(valuesA,['name','id'])
    ########## 法一 ############
    A.agg({'id': 'avg'}).show()
    +-------+
    |avg(id)|
    +-------+
    |    2.6|
    +-------+
    A.agg({'id': 'sum'}).show()
    +-------+
    |sum(id)|
    +-------+
    |     13|
    +-------+
    ############ 法二 ############
    from pyspark.sql import functions as F
    A.agg(F.avg('id').alias('id_avg')).show()
    +------+
    |id_avg|
    +------+
    |   2.6|
    +------+
    A.agg(F.sum('id').alias('id_sum')).show()
    +------+
    |id_sum|
    +------+
    |    13|
    +------+
    
  • 例子:求列中各个元素的占比
  • collect 可以取出列中的元素值

    import pyspark.sql.functions as F
    A = [[1,'CAT1',10], [2, 'CAT2', 20], [3, 'CAT3', 70]]
    df = spark.createDataFrame(A, ['id', 'cate', 'value'])
    df.show()
    +---+----+-----+
    | id|cate|value|
    +---+----+-----+
    |  1|CAT1|   10|
    |  2|CAT2|   20|
    |  3|CAT3|   70|
    +---+----+-----+
    # 求列和 法一
    df.agg(F.sum("value")).show()
    +----------+
    |sum(value)|
    +----------+
    |       100|
    +----------+
    # 求列和 法二
    df.groupBy("cate").sum("value").show()
    +----+----------+
    |cate|sum(value)|
    +----+----------+
    |CAT2|        20|
    |CAT1|        10|
    |CAT3|        70|
    +----+----------+
    # 求列和 法三
    df.groupBy("value").sum().collect()
    Out[36]:
    [Row(value=10, sum(id)=1, sum(value)=10),
     Row(value=20, sum(id)=2, sum(value)=20),
     Row(value=70, sum(id)=3, sum(value)=70)]
     df.groupBy("value").sum().collect()[0][1]
     Out[37]: 1
     df.groupBy("value").sum().collect()[0][2]
    Out[38]: 10
    # 求列和 法四
    df.agg({"value":"sum"}).collect()
    Out[39]: Row(sum(value)=100)
    df.agg({"value":"sum"}).collect()[0][0]
    Out[41]: 100
    # 求列和 法五(推荐)
    df.agg(F.sum("value")).collect()[0][0]
    Out[47]: 100
    

    开始求占比

    # 获取列求和值
    value_sum = df.agg(F.sum("value")).collect()[0][0]
    # 新增一列
    df2 = df.withColumn("sum", F.lit(value_sum))
    df2.show()
    +---+----+-----+---+
    | id|cate|value|sum|
    +---+----+-----+---+
    |  1|CAT1|   10|100|
    |  2|CAT2|   20|100|
    |  3|CAT3|   70|100|
    +---+----+-----+---+
    df2 = df2.withColumn("ratio", F.round(F.col("value") / F.col("sum"), 3))
    df2.show()
    +---+----+-----+---+-----+
    | id|cate|value|sum|ratio|
    +---+----+-----+---+-----+
    |  1|CAT1|   10|100|  0.1|
    |  2|CAT2|   20|100|  0.2|
    |  3|CAT3|   70|100|  0.7|
    +---+----+-----+---+-----+
    

    四、复杂操作

    4.1 concat_ws 重组列

  • concat_ws
  • import pyspark.sql.functions as F
    df1.show()
    +----+-----+
    |name|value|
    +----+-----+
    |   a|    1|
    |   b|    2|
    +----+-----+
    # 将两列通过下划线 “_”,进行合并
    df1.select(F.concat_ws("_", F.col("name"), F.col("value").alias("name_value")), "name").show()
    +-----------------------------------------+----+
    |concat_ws(_, name, value AS `name_value`)|name|
    +-----------------------------------------+----+
    |                                      a_1|   a|
    |                                      b_2|   b|
    +-----------------------------------------+----+
    

    4.2 udf 复杂自定义函数

    参考:【Pyspark】UDF函数的使用、UDF传入多个参数、UDF传出多个参数、传入特殊数据类型

    from pyspark.sql.types import ArrayType, IntegerType
    from pyspark.sql import functions as F
    A = [("a", [1,2,3], [10, 20, 30]), ("b", [4, 5, 6], [100, 200, 300])]
    df1 = spark.createDataFrame(A, ["name", "value1", "value2"])
    df1.show()
    +----+---------+---------------+
    |name|   value1|         value2|
    +----+---------+---------------+
    |   a|[1, 2, 3]|   [10, 20, 30]|
    |   b|[4, 5, 6]|[100, 200, 300]|
    +----+---------+---------------+
    # 自定义函数
    def func(list1, list2):
    	list1 和 list2 分别是表的两个列名
    	list3 = []
    	for i, j in zip(list1, list2):
    		list3.append(i * j)
    	return list3
    # udf 需要指定函数的输出类型,这里是整数列表
    func_udf = F.udf(func, ArrayType(IntegerType()))
    df2 = df1.withColumn("new_col", func_udf("value1", "value2"))
    df2.show()
    +----+---------+---------------+-----------------+
    |name|   value1|         value2|          new_col|
    +----+---------+---------------+-----------------+
    |   a|[1, 2, 3]|   [10, 20, 30]|     [10, 40, 90]|
    |   b|[4, 5, 6]|[100, 200, 300]|[400, 1000, 1800]|
    +----+---------+---------------+-----------------+
    

    4.3 window 分组排序

  • Spark Window 入门介绍
  • Spark Window Functions-PySpark(窗口函数)
  • 需求是,先对表中数据分组,再在组内进行排序

    找出每个科目中,排名第一的学生
    from pyspark.sql import Window
    from pyspark.sql import functions as F
    df = spark.createDataFrame((
    ["A", 1, "Science", 20],
    ["B", 1, "Science", 80],
    ["C", 2, "Science", 90],
    ["D", 2, "Science", 40],
    ["E", 3, "Science", 60],
    ["F", 4, "Art", 60],
    ["G", 4, "Art", 50],
    ["H", 5, "Art", 90],
    ["I", 5, "Art", 100],
    ["J", 6, "Art", 20],
    ), ["name", "class", "subject", "score"])
    # 按照 subject 分组,而后按照 score 从大到小排序
    window = Window.partitionBy("subject").orderBy(F.desc("score"))
    df = df.withColumn("rank", F.row_number().over(window))
    +----+-----+-------+-----+----+
    |name|class|subject|score|rank|
    +----+-----+-------+-----+----+
    |   C|    2|Science|   90|   1|
    |   B|    1|Science|   80|   2|
    |   E|    3|Science|   60|   3|
    |   D|    2|Science|   40|   4|
    |   A|    1|Science|   20|   5|
    |   I|    5|    Art|  100|   1|
    |   H|    5|    Art|   90|   2|
    |   F|    4|    Art|   60|   3|
    |   G|    4|    Art|   50|   4|
    |   J|    6|    Art|   20|   5|
    +----+-----+-------+-----+----+
    # 过滤出每组的第一名
    df.filter("rank=1").show()
    +----+-----+-------+-----+----+
    |name|class|subject|score|rank|
    +----+-----+-------+-----+----+
    |   C|    2|Science|   90|   1|
    |   I|    5|    Art|  100|   1|
    +----+-----+-------+-----+----+
    

    4.4 中文乱码问题

    u"中文" 即可

    In [61]: A = [("a", "快手"), ("b", "抖音")]
    In [62]: df__ = spark.createDataFrame(A, ["id", "name"])
    In [63]: df__.show()
    +---+------+
    | id|  name|
    +---+------+
    |  a|快手|
    |  b|抖音|
    +---+------+
    In [58]: A = [("a", u"快手"), ("b", u"抖音")]
    In [59]: df__ = spark.createDataFrame(A, ["id", "name"])
    In [60]: df__.show()
    +---+----+
    | id|name|
    +---+----+
    |  a|  快手|
    |  b|  抖音|
    +---+----+
    

    4.5 split 拆分

    from pyspark.sql.functions import split
    from pyspark.sql import functions as F
    A = [("A", "20%"), ("B", "18%")]
    df_ = spark.createDataFrame(A, ["name", "ratio1"])
    df2_ = df_.withColumn('ratio1_new', split(F.col("ratio1"), "%").getItem(0) * F.lit(0.01))
    df2_.show()
    +----+------+----------+
    |name|ratio1|ratio1_new|
    +----+------+----------+
    |   A|   20%|       0.2|
    |   B|   18%|      0.18|
    +----+------+----------+
    

    五、值替换

    5.1 空值替换

    法一、使用 fillna 函数

    A = [("a", 1, None), ("b", None, 2), ("c", None, None)]
    df_ = spark.createDataFrame(A, ["name", "value1", "value2"])
    df_.show()
    +----+------+------+
    |name|value1|value2|
    +----+------+------+
    |   a|     1|  null|
    |   b|  null|     2|
    |   c|  null|  null|
    +----+------+------+
    df_.fillna({"value1": 0.0, "value2": 11.0}).show()
    +----+------+------+
    |name|value1|value2|
    +----+------+------+
    |   a|     1|    11|
    |   b|     0|     2|
    |   c|     0|    11|
    +----+------+------+
    

    法二、when...otherwise 替换

    这个方法可以进行复杂的值替换

    from pyspark.sql import functions as F
    A = [("a", 1, None), ("b", None, 2), ("c", None, None)]
    df_ = spark.createDataFrame(A, ["name", "value1", "value2"])
    df_.show()
    +----+------+------+
    |name|value1|value2|
    +----+------+------+
    |   a|     1|  null|
    |   b|  null|     2|
    |   c|  null|  null|
    +----+------+------+
    # 注意!这个方法如果没有显示指定的值会变成null,所以每一类情况都得考虑
    df_.withColumn("value3", F.when(F.col("value1")<10, F.lit(10)).otherwise(F.lit(-10))).show()
    +----+------+------+------+
    |name|value1|value2|value3|
    +----+------+------+------+
    |   a|     1|  null|    10|
    |   b|  null|     2|   -10|
    |   c|  null|  null|   -10|
    +----+------+------+------+
    # withColumn出来的新列如果和原先存在的列同名的话会自动覆盖
    df_.withColumn("value1",F.when(F.col("value1").isNull(),F.lit(0.0)).otherwise(F.lit(F.col("value1"))))\
    .withColumn("value2", F.when(F.col("value2").isNull(),F.lit(11)).otherwise(F.lit(F.col("value2")))).show()
    +----+------+------+
    |name|value1|value2|
    +----+------+------+
    |   a|   1.0|    11|
    |   b|   0.0|     2|
    |   c|   0.0|    11|
    +----+------+------+
    # 对现有列的值域进行复杂分类 (1)
    group1 = ["a"]
    group2 = ["b"]
    df = df.withColumn("group", F.when(F.col("name").isin(group1), F.lit("goup_1")).when(F.col("name").isin(group2), F.lit("goup_2")).otherwise(F.lit("group_other")))
    +----+------+------+-----------+
    |name|value1|value2|      group|
    +----+------+------+-----------+
    |   a|     1|  null|     goup_1|
    |   b|  null|     2|     goup_2|
    |   c|  null|  null|group_other|
    +----+------+------+-----------+
    # 对现有列的值域进行复杂分类 (2)
    # 注意等于号是双等于 “==”
    df = df.withColumn("value", F.when(F.col("value1")==1.0, F.lit("value_is_1")).otherwise(F.lit("value_is_other")))
    +----+------+------+--------------+
    |name|value1|value2|         value|
    +----+------+------+--------------+
    |   a|     1|  null|    value_is_1|
    |   b|  null|     2|value_is_other|
    |   c|  null|  null|value_is_other|
    +----+------+------+--------------+
    
  • bool 值的情况
  • 使用双等于号 "=="

    df.show()
    +----+-----+
    |name|value|
    +----+-----+
    |   a| true|
    |   b|false|
    |   c| true|
    +----+-----+
    df.withColumn("value_new", F.when(F.col("value")==True, F.lit(1)).otherwise(F.lit(0))).show()
    +----+-----+---------+
    |name|value|value_new|
    +----+-----+---------+
    |   a| true|        1|
    |   b|false|        0|
    |   c| true|        1|
    +----+-----+---------+
    
  • pyspark.sql api 文档
  • Spark-SQL之DataFrame操作大全
  • Spark 2.2.x 中文文档
  • Pyspark数据基础操作集合(DataFrame)
  • PySpark-DataFrame各种常用操作举例
  • (超详细)PySpark︱DataFrame操作指南:增/删/改/查/合并/统计与数据处理
  • pyspark.sql module
  • Spark---DataFrame学习(二)——select、selectExpr函数
  • Spark
    私信