pandas提供了很多的方式去创建dataframe,例如接收字典,读取csv或excel格式文件。

在读取本地文件的时候,可以直接调用读取csv的方法,但是多数情况下,csv文件是作为一种模型的训练数据存储在hdfs上,pandas是没有办法直接读取hdfs上的数据的,需要通过hdfs客户端从hdfs上读取文件内容,再想办法转为pandas的dataframe。

  • 之前用的两种方式:
    1. 一种是读取hdfs的csv,然后写入本地文件,再利用pandas读取这个本地文件返回一个dataframe。
      import hdfs
      import pandas as pd
      hdfs_user = "root"
      hdfs_addr = "http://centos121:9870"
      cli = hdfs.InsecureClient(hdfs_addr, user=hdfs_user)
      hdfs_path = "/test/a.csv"
      with cli.read(hdfs_path, encoding='utf-8')as reader:
          res_list = list()
          for row in reader:
              res_list.append(row)
          with open("D:\\b.csv", 'w', encoding='utf-8')as f:
              f.writelines(res_list)
      df = pd.read_csv("D:\\b.csv")
      print(df)

    2. 第二种是读取hdfs的csv,然后存到一个列表中,然后逐行处理字符串,最终得到一个二维数组,然后将二维数组传给pandas得到一个dataframe。

      import hdfs
      import pandas as pd
      import re
      hdfs_user = "root"
      hdfs_addr = "http://centos121:9870"
      cli = hdfs.InsecureClient(hdfs_addr, user=hdfs_user)
      hdfs_path = "/test/a.csv"
      def split_by_dot_escape_quote(string):
          按逗号分隔字符串,若其中有引号,将引号内容视为整体
          # 匹配引号中的内容,非贪婪,采用正向肯定环视,
          # 当左引号(无论单双引)被匹配到,放入组quote,
          # 中间的内容任意,但是要用+?,非贪婪,且至少有一次匹配到字符,
          # 若*?,则匹配0次也可,并不会匹配任意字符(环视只匹配位置不匹配字符),
          # 由于在任意字符后面又限定了前面匹配到的quote,故只会匹配到",
          # +?则会限定前面必有字符被匹配,故"",或引号中任意值都可匹配到
          pattern = '(?=(?P<quote>[\'\"])).+?(?P=quote)'
          rs = re.finditer(pattern, string)
          for data in rs:
              # 匹配到的字符串
              old_str = data.group()
              # 将匹配到的字符串中的逗号替换为特定字符,
              # 以便还原到原字符串进行替换
              new_str = old_str.replace(',', '${dot}')
              # 由于匹配到的引号仅为字符串申明,并不具有实际意义,
              # 需要把匹配时遇到的引号都去掉,只替换掉当前匹配组的引号
              new_str = re.sub(data.group('quote'), '', new_str)
              string = string.replace(old_str, new_str)
          sps = string.split(',')
          return map(lambda x: x.replace('${dot}', ','), sps)
      def read_hdfs(hdfs_path, random_num_list=None):
          head_str = ''  # 表头
          res_list = []  # 表格数据
          with cli.read(hdfs_path, encoding='utf-8') as reader:
              index = 0
              for i, row_data in enumerate(reader):
                  if random_num_list is not None and len(random_num_list) <= 0:
                      break
                  if i == 0:  # 取第一行表头
                      head_str = re.sub('\r|\n|\t', '', row_data)
                  else:  # 去掉表头后的数据
                      index += 1  # 非空值数据当前行号
                      if random_num_list is not None and len(random_num_list) > 0 and index == random_num_list[0]:
                          res_list.append(re.sub('\r|\n|\t', '', row_data))
                          random_num_list.remove(random_num_list[0])
                      elif random_num_list is None:
                          res_list.append(re.sub('\r|\n|\t', '', row_data))
          src_list = list(map(split_by_dot_escape_quote, res_list))
          return pd.DataFrame(src_list, columns=head_str.split(','))
      print(read_hdfs(hdfs_path))
      

      第二种方式,大多数操作都是自己在处理每种情况,一是出错率较高,二是比较繁琐,由于代码水平有限,执行效率也不如pandas直接读取高。

  • 后来想到一种改良第一种方式的办法,不写入文件,只创建一个文件对象用来存储读到的内容,然后将这个文件对象传给pandas,事实证明这是可行的。因为本质上pandas读取csv得到的也是一个在内存中的文件对象,这样的话我就不需要再用正则去判断诸多可能导致创建df失败的情况了。

    from io import StringIO
    import hdfs
    import pandas as pd
    hdfs_user = "root"
    hdfs_addr = "http://centos121:9870"
    hdfs_path = "/test/a.csv"
    cli = hdfs.InsecureClient(hdfs_addr, user=hdfs_user)
    with cli.read(hdfs_path, encoding='utf-8')as reader:
        res_list = ''
        for row in reader:
            res_list += row
        pd.read_csv(StringIO(res_list))

    StringIO就是用来在内存中读写字符串,它会返回一个文件对象,正好可以传入pandas的read_csv,这样就实现了不需要和磁盘io的过程,减少了开销。

Python保险客户办理数据集 csv 将其中的json 字符串 转换为 dataframe 格式 化处理 case_id,event,timestamp,payload,aggregate_type 125044,claim,2020-09-06 00:01:00.277,"{'person': {'relation': 'self'}, 'incident': {'incident_scene': 'work', 'date_of_incident': '2018-02-05', 'time_of_incident': '10:30:00'}, 'occupation': {'occupation': 'employee'}}",case case_id,event,timestamp,payload,aggregate_type 125044,connected_to_customer,2021-02-18 19:51:09.219,{'customerId': 'ec40fad4-bc74-4774-8a2c-ba92c57191b4' numpy pandas 数据分析 数据挖掘 scala代码 import org.apache.spark.sql. DataFrame import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType} // 定义schema val schema:StructType = StructType( Array( StructField("name", St 情况一: 一个 csv 文件作为数据集,另外 一个 csv 文件作为查找条件,假设有 一个 csv 文件A,包含学生的姓名、id、年龄、性别等信息;另外 一个 csv 文件B,只包含一些需要查找的id号码。我们需要从A中查找这些id对应的学生姓名,并将结果写入 一个 新的 csv 文件。假设有两个 csv 文件A和B,其中A包含学生的姓名、id、年龄、性别等信息;但是这两个文件的条目数不同,我们需要根据id进行匹配,将两个文件中数据匹配成功的行合并在一起。情况二:两个 csv 文件的条目数不同,需要按照相同的字段进行匹配。 pandas 创建 DataFrame 的几种方式 如果你是 一个 pandas初学者,那么不知道你会不会像我一样。在学用列表或者数组 创建 DataFrame 时理不清怎样用数据生成以及想要形状的的 Dataframe ,那么,现在,你不用自己琢磨了,我这里给你整理了一下,现在我们就来看看这三种生成 Dataframe 的方式。 1.用传入列表或者数组 创建 DataFrame 采用列表 创建 DataFrame nums = [[i for i in range(3)] for _ in range(10)] Dataframe 是pandas中的一种数据结构,表示二维矩阵的数据表,区别于列表和字典这种一维的结构。二维具体表示为行和列,类似于sql中表的 格式 (或者简单理解为类似于excel中的一张表),每一列可储存不同的数据类型,比如字符型、数值型、布尔型或者object类型。...... 不多说,直接上代码from hdfs import Clientimport pandas as pdFILENAME = "/tmp/preprocess/part-00000" #hdfs文件路径COLUMNNAMES = [xx']def readHDFS():'''读取hdfs文件Returns:df: dataframe hdfs数据'''client = Client(HDFSHOST)... import io a = '1,2,3;4,5,6;7,8,9;' df = pd.read_ csv (io.StringIO(a), lineterminator=';', header=None) print(df) lineterminator:参数可以根据数据的结构进行更改,我的数据是用;隔开的,所以参数设置为; csv _file = &amp;quot;yourfilename. csv &amp;quot; csv _data = pd.read_ csv ( csv _file, low_memory = False)#防止弹出警告 csv _df = ... 一直以为pandas的输出 字符串 不可以带双引号,但有时候又需要带双引号的 字符串 ,怎么办呢?这个问题困扰我很久始终没解决,今天也是试着死马当活马医的态度试试看,无意间大喜过望。真是踏破铁鞋无觅处得来全不费工夫,...