Qlib学习笔记5-DataLayer: Data API之Filter(过滤器)

Qlib学习笔记5-DataLayer: Data API之Filter(过滤器)

前言

本文大部分按照官方文档的行文逻辑, 但会进行补充或删减 ,只讲笔者认为较为重要的部分,其他部分可以参考官方文档: 官方文档Data Layer部分

本章将讲解 DataLayer:Data API 部分,官方文档中的介绍分为如下几个部分:

DataLayer:Data API

其中 Feature Filter 部分是我们需要注意的的。

Feature 可以理解为 特征矩阵 ,比如每支标的的OHLCV。对于 Feature 部分的讲解,在 Qlib学习笔记3-快速开始(上) 中已经进行示范,故不再进行赘述。

Filter 起到的作用是 数据筛选 ,比起 D.features()中的instruments 参数, Filter 的功能有过之而无不及,是一个强大的数据筛选类。在 qlib.data.filter 中为我们提供了 BaseDFilter NameDFilter SeriesDFilter ExpressionDFilter 四个 Filter 类,而他们继承关系如下:

Filters之间的继承关系

下面我们将一一介绍这几个 Filter 的作用与不同之处。

filter.py 源码传送门: 传送门

1. BaseDFilter

BaseDFilter 是接下来我们要讲的三个 Filter 类的父类,其源码如下:

class BaseDFilter(abc.ABC):
    """Dynamic Instruments Filter Abstract class
    Users can override this class to construct their own filter
    Override __init__ to input filter regulations
    Override filter_main to use the regulations to filter instruments
    def __init__(self):
    @staticmethod
    def from_config(config):
        """Construct an instance from config dict.
        Parameters
        ----------
        config : dict
            dict of config parameters.
        raise NotImplementedError("Subclass of BaseDFilter must reimplement `from_config` method")
    @abstractmethod
    def to_config(self):
        """Construct an instance from config dict.
        Returns
        ----------
            return the dict of config parameters.
        raise NotImplementedError("Subclass of BaseDFilter must reimplement `to_config` method")

BaseDFilter 为一个抽象类,使用者如果想要自定义一个 Filter ,需要继承该类并重载里面的方法。注释中提到两个点:

  • Override __init__ to input filter regulations:自定义 Filter 时,需要重载 __init__() 方法,在该方法中输入我们的 regulations 参数,包括但不限于:正则表达式( NameDFilter )、 Qlib expression ExpressionDFilter )。 通俗解释为我们在这个 __init__() 方法中需要初始化我们的筛选(过滤)规则。
  • Override filter_main to use the regulations to filter instruments:重载 filter_main 方法使用 __init__() 中输入的 regulation 参数来 filter 我们的 instruments 通俗解释为我们在这个 filter_main() 函数中完成筛选(过滤)操作。

可以看到, BaseDFilter 类继承了 abc 模块中的 ABC 类,且有一个抽象方法 to_config() 和一个静态方法 from_config() 。这使得我们无法直接对 BaseDFilter 进行实例化, 想要使用该方法,就必须要继承该类并实现 to_config() 如果我们使用如下代码实例化,则会产生如下报错:

bdf = BaseDFilter()
# output:TypeError: Can't instantiate abstract class BaseDFilter with abstract methods to_config

那么 from_config() 函数和 to_config() 两个函数到底是做什么的呢?在注释中告诉我们: to_config() 函数会返回一个 dict 类型的 config ,为该 Filter 的具体参数; from_config() 函数需要传入一个 dict 类型的 config ,会返回一个根据给的 config 而生成的当前类实例。在之后的 NameDFilter 中会举例说明。

2. SeriesDFilter

2.1 __init__()

SeriesDFilter 继承至 BaseDFilter ,同样的, SeriesDFilter 也不可直接进行如下初始化,如果直接初始化,会产生如下报错:

sdf = SeriesDFilter()
# output:TypeError: Can't instantiate abstract class SeriesDFilter with abstract methods _getFilterSeries, to_config

那么既然如果不能直接实例化,那 __init__() 是如何初始化的呢?答案在 SeriesDFilter 的子类中进行初始化,如: NameDFilter 中的 __init__() 函数中:

def __init__(self, name_rule_re, fstart_time=None, fend_time=None):
    """Init function for name filter class
    params:
    ------
    name_rule_re: str
    regular expression for the name rule.
    super(NameDFilter, self).__init__(fstart_time, fend_time)
    self.name_rule_re = name_rule_re

其中 super(NameDFilter, self).__init__(fstart_time, fend_time) 这句便是对 SeriesDFilter 的初始化。 SeriesDFilter.__init__() 的源码如下:

def __init__(self, fstart_time=None, fend_time=None, keep=False):
    super(SeriesDFilter, self).__init__()
    self.filter_start_time = pd.Timestamp(fstart_time) if fstart_time else None
    self.filter_end_time = pd.Timestamp(fend_time) if fend_time else None
    self.keep = keep

注释我就不放了,其中 fstart_time 参数为 filter 开始对 instruments 使用 regulation 进行过滤的开始时间; fend_time 同理; keep 表示是否保留那些在 fstart_time-fend_time 之间没有 features (没有数据)的 instruments

2.2 _getFilterSeries()

细心的同学可能会发现,直接实例化 SeriesDFilter 和上边实例化 BaseDFilter 时有所不同,这里的报错出现了另一个抽象方法 _getFilterSeries() ,我们点进源码查看,发现该方法的源码如下:

@abstractmethod
def _getFilterSeries(self, instruments, fstart, fend):
    """Get filter series based on the rules assigned during the initialization and the input time range.
        Parameters
        ----------
        instruments : dict
            the dict of instruments to be filtered.
        fstart : pd.Timestamp
            start time of filter.
        fend : pd.Timestamp
            end time of filter.
        .. note:: fstart/fend indicates the intersection of instruments start/end time and filter start/end time.
        Returns
        ----------
        pd.Dataframe
            a series of {pd.Timestamp => bool}.
    raise NotImplementedError("Subclass of SeriesDFilter must reimplement `getFilterSeries` method")

这是一个相当重要的方法,也是我们在进行继承时 主要需要重写的地方 。区别于 filter_main() 函数, 我们希望将真正核心的筛选(过滤)步骤,或者说最能体现这个类的特点的步骤放在这个函数中 ,举个例子: NameDFilter 以正则表达式筛选 instruments ,而在他的 _getFilterSeries() 函数源码如下:

def _getFilterSeries(self, instruments, fstart, fend):
    all_filter_series = {}
    filter_calendar = Cal.calendar(start_time=fstart, end_time=fend, freq=self.filter_freq)
    for inst, timestamp in instruments.items():
        if re.match(self.name_rule_re, inst):
            _filter_series = pd.Series({timestamp: True for timestamp in filter_calendar})
    else:
        _filter_series = pd.Series({timestamp: False for timestamp in filter_calendar})
    all_filter_series[inst] = _filter_series
    return all_filter_series

不需要逐行看清,只需看到函数中for-loop的 re.match() 就知道, NameDFilter 就是在这个函数中实现正则匹配的。

2.3 其他工具函数

由于篇幅问题,下面只列举出他们的函数头和返回的值:

  • _getTimeBound(self, instruments) :return the lower time bound and upper time bound of all the instruments。
  • _toSeries(self, time_range, target_timestamp) :return the series of bool value for an instrument。
  • _filterSeries(self, timestamp_series, filter_series) :return the series of bool value indicating whether the date satisfies the filter condition and exists in target timestamp.
  • _toTimestamp(self, timestamp_series) :return the list of tuple (timestamp, timestamp)

2.4 __call__() & filter_main()

当我们实例化 SeriesDFilter 及其子类时,如果要调用实例时,就会调用 __call__() 方法,其源码如下:

def __call__(self, instruments, start_time=None, end_time=None, freq="day"):
    """Call this filter to get filtered instruments list"""
    self.filter_freq = freq
    return self.filter_main(instruments, start_time, end_time)

这个函数的原理很简单,就是调用 filter_main() 函数对 insturments 进行筛选(过滤)操作。

由于篇幅问题,下方只放出 filter_main() 的函数头:

def filter_main(self, instruments, start_time=None, end_time=None):
    """Implement this method to filter the instruments.
        Parameters
        ----------
        instruments: dict
            input instruments to be filtered.
        start_time: str
            start of the time range.
        end_time: str
            end of the time range.
        Returns
        ----------
            filtered instruments, same structure as input instruments.
        """

这个函数做的事情就是调用我们之前提到的各种工具函数,最后返回根据我们 regulation 过滤(筛选)过的 instruments 。其 实这也算是 SeriesDFilter 的核心,在所有继承 SeriesDFilter 的子类中其实都省去了很多对于时间的操作,因为这些对于时间范围的操作大多都在 filter_main() 中实现了,使用者在自定义 filter 时,只需要考虑 __init__() _getFilterSeries() 函数的重写即可。

至此我们已经将 filter.py 脚本中的 BaseDFilter SeriesDFilter 讲完,这两个类实际上是整个脚本文件中的大头,而继承了 NameDFilter 类的 NameDFilter ExpressionDFilter 反而相当简单。

3. NameDFilter

NameDFilter 继承自 SeriesDFilter 类, 可以使用正则表达式来对 instruments 的名字进行匹配,也即找出名字符合某种模式的 instruments

3.1 __init__() & _getFilterSeries()

除去继承但未重载(或复写)的函数, NameDFilter 中一共只有四个函数,且都重载(或复写)于他们的父类,分别是: __init__() _getFilterSeries() from_config() to_config() 。其中 __init__() 函数中多了一个参数 name_rule_re

def __init__(self, name_rule_re, fstart_time=None, fend_time=None):
        """Init function for name filter class
        params:
        ------
        name_rule_re: str
            regular expression for the name rule.
        super(NameDFilter, self).__init__(fstart_time, fend_time)
        self.name_rule_re = name_rule_re

name_rule_re 即用于匹配 instrument 名字的正则表达式,举个例子,我想找到中证300中名字中带 SH 的股票,即在上交所上市的股票,使用如下代码:

# 找到代码中以'SH'开头的instruments
sh_ndf = NameDFilter('SH')
# 使用filter_pipe参数输入sh_ndf
csi300_sz_ins = D.instruments(market='csi300', filter_pipe=[sz_ndf])
# 使用list_instruments查看csi300_sh_ins中包含的所有个体
sh_dict = D.list_instruments(csi300_sh_ins).keys()

输出:

SH开头的股票都被筛选出来

当然你可以使用更多更复杂的正则表达式来匹配 instruments 的名字,由于篇幅问题( 我自己不太懂正则 ),故在此不再放出更多例子。

对于 _getFilterSeries() 函数,其源码如下:

def _getFilterSeries(self, instruments, fstart, fend):
    all_filter_series = {}
    filter_calendar = Cal.calendar(start_time=fstart, end_time=fend, freq=self.filter_freq)
    # 遍历所有instruments,一个个进行匹配
    for inst, timestamp in instruments.items():
        if re.match(self.name_rule_re, inst):
            _filter_series = pd.Series({timestamp: True for timestamp in filter_calendar})
        else:
            _filter_series = pd.Series({timestamp: False for timestamp in filter_calendar})
        all_filter_series[inst] = _filter_series
    return all_filter_series

其核心内容即使用正则表达式对 instruments 的名字进行匹配。之后返回一个 boolean Series 给到 filter_main() 函数用于筛选,相当于返回一个 mask

3.2 from_config() & to_config()

对于 from_config() to_config() 函数,他们的代码和干的事情其实都很简单,源码如下:

@staticmethod
def from_config(config):
    return NameDFilter(
        name_rule_re=config["name_rule_re"],
        fstart_time=config["filter_start_time"],
        fend_time=config["filter_end_time"],
def to_config(self):
    return {
        "filter_type": "NameDFilter",
        "name_rule_re": self.name_rule_re,
        "filter_start_time": str(self.filter_start_time) if self.filter_start_time else self.filter_start_time,
        "filter_end_time": str(self.filter_end_time) if self.filter_end_time else self.filter_end_time,
    }

敲个代码试试看他们到底返回什么:

sh_ndf_cfg = sh_ndf.to_config()
print(sh_ndf_cfg)
print(NameDFilter.from_config(sh_ndf_cfg).to_config())

输出如下:

也就是说 to_config() 会返回当前实例的 config ,而 from_config() 作为一个 静态方法 ,可以直接从类中调用,输入 config 返回一个实例化对象。

4. ExpressionDFilter

4.1 与NameDFilter的区别

ExpressionDFilter 同样继承于 SeriesDFilter ,但与 NameDFilter instruments 层面的筛选不同的是, Expression 是对 features 的筛选。

4.2 __init__() & _getFilterSeries()

ExpressionDFilter __init__() NameDFilter 的差不多,只不过他的 regulation 变成了 rule_expression ,这个参数要求我们传入 Qlib 特有的 experssion 表达式:

def __init__(self, rule_expression, fstart_time=None, fend_time=None, keep=False):
    super(ExpressionDFilter, self).__init__(fstart_time, fend_time, keep=keep)
    self.rule_expression = rule_expression

ExpressionDFilter _getFilterSeries() 十分有趣,他使用了表达式机制来对我们传入的 rule_expression 进行识别,之后返回一个mask到 filter_main() 函数中:

def _getFilterSeries(self, instruments, fstart, fend):
    # do not use dataset cache
    try:
        _features = DatasetD.dataset(
            instruments,
            [self.rule_expression],
            fstart,
            fend,
            freq=self.filter_freq,
            disk_cache=0,
    except TypeError:
    # use LocalDatasetProvider
        _features = DatasetD.dataset(instruments, [self.rule_expression], fstart, fend, freq=self.filter_freq)
    rule_expression_field_name = list(_features.keys())[0]
    all_filter_series = _features[rule_expression_field_name]
    return all_filter_series

值得注意的是,在这个函数中使用了 DatasetD.dataset ,我们的 rule_expression 参数被当作 field 输入,之后从 dataset 中再取出这一列。这其实是 Datalayer:Dataset 中的内容, Qlib 中的 Dataset 自带一种表达式机制,我们可以将一个表达式当作 field 参数输入,举个例子:

如果$close<5则为1

由于 ExpressionDFilter from_config() to_config() 函数其实作用差不太多,故在此不再赘述。

5. 自定义Filter

如果我们想要自定义一个 Filter ,建议直接继承 SeriesDFilter ,然后按照以下模式进行自定义即可:

class MyDFilter(SeriesDFilter):
    # regulation为约束参数,即用于_getFilterDSeries中用于筛选的“依据”
    def __init__(self, regulation, fstart_time=None, fend_time=None, keep=False):
        super(MyDFilter, self).__init__(fstart_time, fend_time, keep=keep)
        self.regulation = regulation
    # filter逻辑
    def _getFilterSeries(self, instruments, fstart, fend):
        ##############################
        # 重写区域
        ##############################
    @staticmethod
    def from_config(config):
        return MyDFilter(
            regulation=config["regulation"],
            fstart_time=config["filter_start_time"],
            fend_time=config["filter_end_time"],
            keep=config["keep"],
    def to_config(self):
        return {