pyspark: sql.functions and UDFs

Outline

  • Selecting columns: select
  • Constant columns: lit
  • Conditional branching: when / otherwise
  • Window functions: row_number
  • User-defined functions: udf
  • split & explode
This article lists some of the SQL-like functions available in pyspark and shows how to define your own. First, create a DataFrame. Everything below is executed in the pyspark interactive shell, version 2.1.1.

    from pyspark.sql import Row
    from pyspark.sql import functions as sf
    rdd = sc.parallelize([
        Row(name='Alice', level='a', age=5, height=80),
        Row(name='Bob', level='a', age=5, height=80),
        Row(name='Cycy', level='b', age=10, height=80),
        Row(name='Didi', level='b', age=12, height=75),
        Row(name='EiEi', level='b', age=10, height=70)])
    df = rdd.toDF()
    df.show()
    +---+------+-----+-----+
    |age|height|level| name|
    +---+------+-----+-----+
    |  5|    80|    a|Alice|
    |  5|    80|    a|  Bob|
    | 10|    80|    b| Cycy|
    | 12|    75|    b| Didi|
    | 10|    70|    b| EiEi|
    +---+------+-----+-----+
    

    1. Selecting columns: select

    Besides selecting existing columns, select can add new columns and reorder the existing ones (a reordering sketch follows the example below).

    df1 = df.select("name", (df.age+1).alias("new_age"))
    df1.show()
    +-----+-------+
    | name|new_age|
    +-----+-------+
    |Alice|      6|
    |  Bob|      6|
    | Cycy|     11|
    | Didi|     13|
    | EiEi|     11|
    +-----+-------+
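
    The reordering mentioned above needs nothing special: listing the columns in the desired order inside select is enough. A minimal sketch:

    # Reorder the existing columns by listing them in the desired order
    df.select("name", "level", "age", "height").show()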
    

    2. Constant columns: lit

    df2 = df.select("name", (df.age+1).alias("new_age"), sf.lit(2))
    df2.show()
    +-----+-------+---+
    | name|new_age|  2|
    +-----+-------+---+
    |Alice|      6|  2|
    |  Bob|      6|  2|
    | Cycy|     11|  2|
    | Didi|     13|  2|
    | EiEi|     11|  2|
    +-----+-------+---+
    # The constant column can also be renamed
    df2 = df.select("name", (df.age+1).alias("new_age"), sf.lit(2).alias("constant"))
    df2.show()
    +-----+-------+--------+
    | name|new_age|constant|
    +-----+-------+--------+
    |Alice|      6|       2|
    |  Bob|      6|       2|
    | Cycy|     11|       2|
    | Didi|     13|       2|
    | EiEi|     11|       2|
    +-----+-------+--------+
    

    Of course, new columns can also be added with withColumn; it is not covered in depth here.
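
    A minimal sketch of the withColumn equivalent of the select above (the column name new_age is kept from that example):

    # withColumn adds (or replaces) a single column on the existing DataFrame
    df.withColumn("new_age", df.age + 1).show()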

    3. Conditional branching: when / otherwise

    When there are multiple conditions, keep chaining when calls until a final otherwise. Note that when a single predicate combines several comparisons, each comparison must be wrapped in its own parentheses before joining them with & or |, e.g. (df.age>=7)&(df.age<11); otherwise an error is raised.

    df3 = df.withColumn("when", sf.when(df.age<7, "kindergarten").when((df.age>=7)&(df.age<11), 'low_grade').otherwise("high_grade"))
    df3.show()
    +---+------+-----+-----+------------+
    |age|height|level| name|        when|
    +---+------+-----+-----+------------+
    |  5|    80|    a|Alice|kindergarten|
    |  5|    80|    a|  Bob|kindergarten|
    | 10|    80|    b| Cycy|   low_grade|
    | 12|    75|    b| Didi|  high_grade|
    | 10|    70|    b| EiEi|   low_grade|
    +---+------+-----+-----+------------+
    

    4. Math functions

    The math functions are not enumerated here. They include the simple +, -, *, and /, as well as log, pow, the trigonometric functions, round, floor, and more; see the official pyspark.sql.functions documentation for details.
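
    As a quick sketch on the DataFrame above (the aliases are illustrative):

    # A few math functions applied to existing columns
    df.select("name",
              sf.log(df.age).alias("log_age"),
              sf.pow(df.age, 2).alias("age_squared"),
              sf.round(df.height / df.age, 1).alias("height_per_age")).show()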

    5. Date and time functions

  • Getting the current time: current_date(), current_timestamp()
  • Format conversion: date_format(), year(), month(), etc.
  • Date arithmetic: date_add(), date_sub(), etc. (a sketch follows this list)
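
    A minimal sketch of a few of these, applied to the DataFrame above (the aliases are illustrative):

    # Current date, a formatted version of it, and simple date arithmetic
    df.select(sf.current_date().alias("today"),
              sf.date_format(sf.current_date(), "yyyy-MM").alias("month_str"),
              sf.date_add(sf.current_date(), 7).alias("next_week")).show()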

    6. Window functions: row_number

    from pyspark.sql.window import Window
    df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy("level").orderBy("age")).alias("rowNum"))
    # 其他写法
    df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level).orderBy(df.age)).alias("rowNum"))
    print df_r.show()
    +---+------+-----+-----+----------+
    |age|height|level| name|row_number|
    +---+------+-----+-----+----------+
    | 10|    80|    b| Cycy|         1|
    | 10|    70|    b| EiEi|         2|
    | 12|    75|    b| Didi|         3|
    |  5|    80|    a|  Bob|         1|
    |  5|    80|    a|Alice|         2|
    +---+------+-----+-----+----------+
    

    To sort in descending order, or to partition by multiple columns:

    df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level, df.age).orderBy(sf.desc("name"))))
    # Another way to write it
    df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy("level", "age").orderBy(sf.desc("name"))))
    df_r.show()
    +---+------+-----+-----+----------+
    |age|height|level| name|row_number|
    +---+------+-----+-----+----------+
    |  5|    80|    a|  Bob|         1|
    |  5|    80|    a|Alice|         2|
    | 10|    70|    b| EiEi|         1|
    | 10|    80|    b| Cycy|         2|
    | 12|    75|    b| Didi|         1|
    +---+------+-----+-----+----------+
    

    However, the following usage is wrong: in this version, sf.desc expects a column name string, not a Column object.

    df_r = df.withColumn('row_number', sf.row_number().over(Window.partitionBy(df.level, df.age).orderBy(sf.desc(df.name))).alias("rowNum"))
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/user/local/spark-2.1.1/python/pyspark/sql/functions.py", line 39, in _
        jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
      File "/home/user/local/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/user/local/spark-2.1.1/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/home/user/local/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
    py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.desc. Trace:
    py4j.Py4JException: Method desc([class org.apache.spark.sql.Column]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
    

    7. User-defined functions: udf

    A udf can only operate on each row individually; it cannot process data grouped by groupBy.

    from pyspark.sql import types as st
    def ratio(a, b):
        # Guard against missing values and division by zero
        if a is None or b is None or b == 0:
            r = -1.0
        else:
            r = 1.0 * a / b
        return r
    col_ratio = sf.udf(ratio, st.DoubleType())
    df_udf = df.withColumn("ratio", col_ratio(df.age, df.height))
    df_udf.show()
    +---+------+-----+-----+-------------------+
    |age|height|level| name|              ratio|
    +---+------+-----+-----+-------------------+
    |  5|    80|    a|Alice|             0.0625|
    |  5|    80|    a|  Bob|             0.0625|
    | 10|    80|    b| Cycy|              0.125|
    | 12|    75|    b| Didi|               0.16|
    | 10|    70|    b| EiEi|0.14285714285714285|
    +---+------+-----+-----+-------------------+
    

    Starting with version 2.3 there is pandas_udf, which is more versatile than udf and can, among other things, aggregate after a groupBy. Because of the pyspark version I am currently limited to, I cannot experiment with it here; an untested sketch follows.
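
    An untested sketch of the grouped usage, assuming Spark 2.3+; the schema string, the demean function, and the column names it produces are illustrative, based on the documented GROUPED_MAP API:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # GROUPED_MAP: each group arrives as a pandas DataFrame, and a pandas
    # DataFrame matching the declared schema is returned
    @pandas_udf("level string, age_demeaned double", PandasUDFType.GROUPED_MAP)
    def demean(pdf):
        out = pdf[["level"]].copy()
        # Subtract the group mean age within each level
        out["age_demeaned"] = pdf.age - pdf.age.mean()
        return out

    df.groupby("level").apply(demean).show()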

    8. split & explode

  • Official docs: pyspark.sql.functions
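
    A minimal sketch of the two together, reusing sc, Row, and sf from the setup above (the tags column and the sample rows are illustrative): split turns a delimited string into an array column, and explode expands that array into one row per element.

    # Build a small DataFrame with a comma-separated string column
    df_s = sc.parallelize([Row(name='Alice', tags='red,blue'),
                           Row(name='Bob', tags='green')]).toDF()
    # split produces an array<string>; explode yields one row per element
    df_s.select("name", sf.explode(sf.split(df_s.tags, ",")).alias("tag")).show()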