相关文章推荐
道上混的豌豆  ·  北京外国语大学 ...·  1 年前    · 
文质彬彬的大象  ·  javascript - CSS mask ...·  1 年前    · 
痴情的雪糕  ·  Android Kotlin 實作 Day ...·  1 年前    · 

Get Your Hands Dirty!

这是对于中国人民大学信息学院研究生课——PostgreSQL源码分析课程作业的回顾。
PostgreSQL的源码中定义了很多内建的函数(build-in function),这些函数是在数据库启动时就可以使用的,一般是一些比较重要和常用的函数。作为内核学习的第一步,我选择了自己实现一个内建的聚合函数,并重新编译运行数据库。
此文要综合用到前三篇博客的内容:

一、明确目标

我们的目标是增加一些聚合函数,功能为计算信息熵相关的值(很有DB for AI)的感觉了。
具体来讲,我们打算要实现以下四个函数:

1.1 entropy

D K L ( p q ) = i = 1 N P ( x i ) ( l o g p ( x i ) l o g q ( x i ) )

D J S ( p q ) = 2 1 D K L ( P M ) + 2 1 D K L ( M Q )

输入为int的聚合函数 输入为text的聚合函数
entropy(int, int) entropy(text, text)
cross_entropy(int, int) cross_entropy(text, text)
kl_divergence(int, int) kl_divergence(text, text)
js_divergence(int, int) js_divergence(text, text)

通过查阅文档,我们可以看到,一个聚合函数至少有两个函数来实现:

  1. 聚集转换函数(aggregate transition function):该函数接收聚合函数的输入参数以及初始,并且返回一个转换后的状态。
  2. 聚集最终函数(aggregate final function):该函数接受转换完成之后的状态,并且计算最终结果当作聚集函数的返回值。

此外,还可以添加聚集结合函数(aggregate combine function):这个函数接收两个状态,合并为一个状态,并且返回,主要是当并行计算时,可能多组同时计算得到多个中间状态,需要通过这样的函数去合并这些状态。

进行下面步骤之前,我们先用源码安装好PostgreSQL(教程见 这里 )。在学习如何新增函数的过程中,少不了对原有的函数的学习,这个过程中需要调试来辅助学习,因此还应该先学习这篇文章:【 如何调试PG 】。

二、新增一个.c文件

我们新增一个.c文件,具体加在 {PGPATH}/src/backend/utils/adt/entropy.c 这个位置。{PGPATH}是你的PostgreSQL源码位置。具体为什么是这里呢,因为我们看源码发现,这个adt文件夹下面,都是针对内建的数据类型使用的各种函数的实现,比如求和、求方差、求平均数、求斜率等函数都实现在这个文件夹下。这里的adt,其实就是我们数据结构中抽象数据类型(Abstract Data Type)的缩写。所以大胆猜想,文件应该加在这里。
里面具体的一些写法,如果看不懂没关系,很多是PG的宏定义,或者一些PG系统的特性(比如context的切换)。只需要打开几个其他文件,看懂算法,然后照抄即可,毕竟我们首要目标是实现我们的功能。

下面的代码看起来虽长,但结构比较清晰,自我感觉还是很好懂的:

2.1 定义及实现函数

实现了上面8个函数每一个的转换函数、组合函数、最终函数。实现的函数有:
主要的函数,这些函数都需要之后写入PostgreSQL的初始函数表:
1.1. Datum int_entropy_accum(PG_FUNCTION_ARGS)
1.2. Datum text_entropy_accum(PG_FUNCTION_ARGS)
1.3. Datum entropy_combine(PG_FUNCTION_ARGS)
1.4. Datum entropy_final(PG_FUNCTION_ARGS)
1.5. Datum cross_int_entropy_accum(PG_FUNCTION_ARGS)
1.6. Datum cross_text_entropy_accum(PG_FUNCTION_ARGS)
1.7. Datum cross_entropy_combine(PG_FUNCTION_ARGS)
1.8. Datum cross_entropy_final(PG_FUNCTION_ARGS)
1.9. Datum kld_final(PG_FUNCTION_ARGS)
1.10. Datum jsd_final(PG_FUNCTION_ARGS)

辅助的函数,这里的函数只在该模块内部调用:
1.11. int APHash(char *str)
1.12. static ItemPs *makeItemPsAggState(FunctionCallInfo fcinfo)
1.13. static TwoItemPs *makeTwoItemPsAggState(FunctionCallInfo fcinfo)
1.14. void freeItemPs(ItemPs *state)
1.15. void freeTwoItemPs(TwoItemPs *state)
1.16. int is_in(int value, int *arr, int length)
1.17. void add_value_to_state(ItemPs *state, int value, int count, bool N_add)

2.2 定义了两种结构体

用ItemPs和TwoItemPs,去保存聚合函数的中间状态。由于我们要计算熵,ItemPs状态需要保存要计算的元素总数N、保存各个元素的值category、各个元素的统计数目counts,num记录元素的种类数,也即category和counts保存值的个数,size记录总的开辟空间的大小。TwoItemPs由两个ItemPs构成。
具体定义如下:

typedef struct ItemPs
   int N;         // 元素数量统计
   int num;       // 已经有的种类个数
   int size;      // 总的长度
   int *counts;   // 各个元素的个数统计
   int *category; // 各个元素的大小
} ItemPs;
typedef struct TwoItemPs
   ItemPs *x;
   ItemPs *y;
} TwoItemPs;

2.3 内存管理

在写C代码中,要注意内存的开辟与释放。PostgreSQL有一套自己的内存上下文管理系统,所以要用PostgreSQL提供的palloc、pfree、repalloc等函数,以及分配内存时要注意使用MemoryContextSwitchTo函数,此处不详细说明,用的时候只需要照抄别的函数即可。

2.4 对输入返回的说明

输入的PG_FUNCTION_ARGS其实是一个宏定义,就是FunctionCallInfo fcinfo,保存了调用这个函数的函数的信息以及输入的参数列表。
返回值中的Datum其实就是一个long long int,用来存储一个地址,代表一个指针,定义及相关函数操作有一个专门的模块datum.c,也在adt文件夹下面。

/*-------------------------------------------------------------------------
 - entropy.c
 - a module contain some functions defined by Qinliang Xue
 - TODO: 
 - - mse function
 - - Wasserstein distance
 *-------------------------------------------------------------------------
#include "postgres.h"
#include <ctype.h>
#include <float.h>
#include <limits.h>
#include <math.h>
#include <string.h>
#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "common/int.h"
#include "funcapi.h"
#include "lib/hyperloglog.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "nodes/supportnodes.h"
#include "utils/array.h"
#include "utils/arrayaccess.h"
#include "utils/datum.h"
#include "utils/float.h"
#include "utils/guc.h"
#include "utils/int8.h"
#include "utils/numeric.h"
#include "utils/sortsupport.h"
#include "utils/builtins.h"
#define INIT_NUM 10
typedef struct ItemPs
    int N;         // 元素数量统计
    int num;       // 已经有的种类个数
    int size;      // 总的长度
    int *counts;   // 各个元素的个数统计
    int *category; // 各个元素的大小
} ItemPs;
typedef struct TwoItemPs
    ItemPs *x;
    ItemPs *y;
} TwoItemPs;
/* APHash - convert a string to a int
 * used in text entropy functions
int APHash(char *str)
    int hash = 0;
    int i;
    for (i = 0; *str; i++)
        if ((i & 1) == 0)
            hash ^= ((hash << 7) ^ (*str++) ^ (hash >> 3));
            hash ^= (~((hash << 11) ^ (*str++) ^ (hash >> 5)));
    return (hash & 0x7FFFFFFF);
/* makeItemPsAggState
 * Init an ItemPs state and return it,
 * the input parameter fcinfo contains the called function's information
static ItemPs *
makeItemPsAggState(FunctionCallInfo fcinfo)
    ItemPs *state;
    MemoryContext aggcontext;
    MemoryContext oldcontext;
    if (!AggCheckCallContext(fcinfo, &aggcontext))
        /* cannot be called directly because of internal-type argument */
        elog(ERROR, "aggregate function entropy called in non-aggregate context");
	 * Create state in aggregate context.  It'll stay there across subsequent
	 * calls.
    oldcontext = MemoryContextSwitchTo(aggcontext);
    state = (ItemPs *)palloc(sizeof(ItemPs));
    state->category = (int *)palloc(sizeof(int) * INIT_NUM);
    state->counts = (int *)palloc(sizeof(int) * INIT_NUM);
    state->size = INIT_NUM;
    state->num = 0;
    state->N = 0;
    MemoryContextSwitchTo(oldcontext);
    return state;
/* makeTowItemPsAggState
 * Init a TwoItemPs state and return it,
 * the input parameter fcinfo contains the called function's information
static TwoItemPs *
makeTwoItemPsAggState(FunctionCallInfo fcinfo)
    TwoItemPs *state;
    MemoryContext aggcontext;
    MemoryContext oldcontext;
    if (!AggCheckCallContext(fcinfo, &aggcontext))
        /* cannot be called directly because of internal-type argument */
        elog(ERROR, "aggregate function entropy called in non-aggregate context");
	 * Create state in aggregate context.  It'll stay there across subsequent
	 * calls.
    oldcontext = MemoryContextSwitchTo(aggcontext);
    state = (TwoItemPs *)palloc(sizeof(TwoItemPs));
    state->x = makeItemPsAggState(fcinfo);
    state->y = makeItemPsAggState(fcinfo);
    MemoryContextSwitchTo(oldcontext);
    return state;
void freeItemPs(ItemPs *state)
    pfree(state->category);
    pfree(state->counts);
    pfree(state);
void freeTwoItemPs(TwoItemPs *state)
    freeItemPs(state->x);
    freeItemPs(state->y);
    pfree(state);
 * is_in - check a item in an array
 * the simplest way to find in order
int is_in(int value, int *arr, int length)
    for (int i = 0; i < length; i++)
        if (arr[i] == value)
            return i;
    return -1;
void add_value_to_state(ItemPs *state, int value, int count, bool N_add)
    int index = is_in(value, state->category, state->num);
    if (index >= 0)
        state->counts[index] += count;
        if (state->num == state->size)
            state->size += INIT_NUM;
            state->category = (int *)repalloc(state->category, sizeof(int) * INIT_NUM);
            state->counts = (int *)repalloc(state->counts, sizeof(int) * INIT_NUM);
        state->category[state->num] = value;
        state->counts[state->num++] = count;
    if (N_add)
        state->N += count;
 * int_entropy_accum - the aggregate transition function of entropy(int, int)
 * input params: PG_FUNCTION_ARGS is a marco definition of FunctionCallInfo fcinfo, containing the actural args
 * we can get args using PG_GETARG_{type}(args_order), for example, PG_GETARG_DATUM(1) means get the second datum args.
 * return: Datum is a long long int, this is a address of pointer
Datum
    int_entropy_accum(PG_FUNCTION_ARGS)
    ItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (ItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
        state = makeItemPsAggState(fcinfo);
    int value = PG_GETARG_DATUM(1);
    add_value_to_state(state, value, 1, true); // first add, so count is 1
    PG_RETURN_POINTER(state);
Datum
    text_entropy_accum(PG_FUNCTION_ARGS)
    ItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (ItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
        state = makeItemPsAggState(fcinfo);
    int value = APHash(text_to_cstring(PG_GETARG_TEXT_PP(1))); // text need hash func to int
    add_value_to_state(state, value, 1, true);                 // first add, so count is 1
    PG_RETURN_POINTER(state);
Datum
    entropy_combine(PG_FUNCTION_ARGS)
    ItemPs *state1;
    ItemPs *state2;
    state1 = (ItemPs *)PG_GETARG_POINTER(0);
    state2 = (ItemPs *)PG_GETARG_POINTER(1);
    int i;
    for (i = 0; i < state2->num; i++)
        add_value_to_state(state1, state2->category[i], state2->counts[i], true);
    PG_RETURN_POINTER(state1);
Datum
    entropy_final(PG_FUNCTION_ARGS)
    ItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (ItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    int cate_num = state->num;
    int *counts = state->counts;
    int *category = state->category;
    int N = state->N;
    int j;
    for (j = 0; j < cate_num; j++)
        float8 p = (float8)(1.0 * counts[j] / N);
        entro += -p * log(p);
    freeItemPs(state);
    PG_RETURN_FLOAT8(entro);
Datum
    cross_int_entropy_accum(PG_FUNCTION_ARGS)
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
        state = makeTwoItemPsAggState(fcinfo);
    int value1 = PG_GETARG_DATUM(1);
    int value2 = PG_GETARG_DATUM(2);
    int index1 = is_in(value1, state->x->category, state->x->num);
    int index12 = is_in(value1, state->y->category, state->y->num);
    int index2 = is_in(value2, state->y->category, state->y->num);
    int index21 = is_in(value2, state->x->category, state->x->num);
    add_value_to_state(state->x, value1, 1, true);
    add_value_to_state(state->y, value1, 0, false);
    add_value_to_state(state->y, value2, 1, true);
    add_value_to_state(state->x, value2, 0, false);
    PG_RETURN_POINTER(state);
Datum
    cross_text_entropy_accum(PG_FUNCTION_ARGS)
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    if (state == NULL)
        state = makeTwoItemPsAggState(fcinfo);
    int value1 = APHash(text_to_cstring(PG_GETARG_TEXT_PP(1)));
    int value2 = APHash(text_to_cstring(PG_GETARG_TEXT_PP(2)));
    int index1 = is_in(value1, state->x->category, state->x->num);
    int index12 = is_in(value1, state->y->category, state->y->num);
    int index2 = is_in(value2, state->y->category, state->y->num);
    int index21 = is_in(value2, state->x->category, state->x->num);
    add_value_to_state(state->x, value1, 1, true);
    add_value_to_state(state->y, value1, 0, false);
    add_value_to_state(state->y, value2, 1, true);
    add_value_to_state(state->x, value2, 0, false);
    PG_RETURN_POINTER(state);
Datum
    cross_entropy_combine(PG_FUNCTION_ARGS)
    TwoItemPs *state1;
    TwoItemPs *state2;
    state1 = (TwoItemPs *)PG_GETARG_POINTER(0);
    state2 = (TwoItemPs *)PG_GETARG_POINTER(1);
    int i;
    for (i = 0; i < state2->x->num; i++)
        add_value_to_state(state1->x, state2->x->category[i], state2->x->counts[i], true);
        add_value_to_state(state1->y, state2->x->category[i], 0, false);
        add_value_to_state(state1->y, state2->y->category[i], state2->y->counts[i], true);
        add_value_to_state(state1->x, state2->y->category[i], 0, false);
    PG_RETURN_POINTER(state1);
Datum
    cross_entropy_final(PG_FUNCTION_ARGS)
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    if (state->x->num != state->y->num)
        PG_RETURN_NULL();
    if (state->x->N != state->y->N)
        PG_RETURN_NULL();
    int cate_num = state->x->num;
    int i;
    for (i = 0; i < cate_num; i++)
        if (state->x->category[i] != state->y->category[i])
            PG_RETURN_NULL();
    int *counts1 = state->x->counts;
    int *counts2 = state->y->counts;
    int N = state->x->N;
    int j;
    for (j = 0; j < cate_num; j++)
        float8 p = (float8)(1.0 * counts1[j] / N);
        float8 q = (float8)(1.0 * counts2[j] / N);
        entro += -p * log(q);
    freeTwoItemPs(state);
    PG_RETURN_FLOAT8(entro);
Datum
    kld_final(PG_FUNCTION_ARGS)
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    if (state->x->num != state->y->num)
        PG_RETURN_NULL();
    if (state->x->N != state->y->N)
        PG_RETURN_NULL();
    int cate_num = state->x->num;
    int i;
    for (i = 0; i < cate_num; i++)
        if (state->x->category[i] != state->y->category[i])
            PG_RETURN_NULL();
    int *counts1 = state->x->counts;
    int *counts2 = state->y->counts;
    int N = state->x->N;
    int j;
    for (j = 0; j < cate_num; j++)
        float8 p = (float8)(1.0 * counts1[j] / N);
        float8 q = (float8)(1.0 * counts2[j] / N);
        entro += p * (log(p) - log(q));
    freeTwoItemPs(state);
    PG_RETURN_FLOAT8(entro);
Datum
    jsd_final(PG_FUNCTION_ARGS)
    TwoItemPs *state;
    state = PG_ARGISNULL(0) ? NULL : (TwoItemPs *)PG_GETARG_POINTER(0);
    float8 entro = 0;
    if (state->x->num != state->y->num)
        PG_RETURN_NULL();
    if (state->x->N != state->y->N)
        PG_RETURN_NULL();
    int cate_num = state->x->num;
    int i;
    for (i = 0; i < cate_num; i++)
        if (state->x->category[i] != state->y->category[i])
            PG_RETURN_NULL();
    int *counts1 = state->x->counts;
    int *counts2 = state->y->counts;
    int N = state->x->N;
    int j;
    for (j = 0; j < cate_num; j++)
        float8 p = (float8)(1.0 * counts1[j] / N);
        float8 q = (float8)(1.0 * counts2[j] / N);
        float8 m = (p + q) / 2;
        float8 e1 = p * (log(p) - log(m));
        float8 e2 = q * (log(q) - log(m));
        entro += ((e1 + e2) / 2);
    freeTwoItemPs(state);
    PG_RETURN_FLOAT8(entro);
 

这个文件位置是{PGPATH}/src/include/utils/builtins.h为什么builtin.h文件要修改呢?这里其实偷了个懒,按理说我们应该新增一个.h文件,但由于实现的函数很少,就直接在builtins.h中添加了,可以达到同样的效果。
随便找个地方,加入以下几行,也就是我们上面实现的几个要用到的函数。这里是实现函数的声明。

/* entropy.c */
extern Datum int_entropy_accum(PG_FUNCTION_ARGS);
extern Datum text_entropy_accum(PG_FUNCTION_ARGS);
extern Datum entropy_combine(PG_FUNCTION_ARGS);
extern Datum entropy_final(PG_FUNCTION_ARGS);
extern Datum cross_int_entropy_accum(PG_FUNCTION_ARGS);
extern Datum cross_text_entropy_accum(PG_FUNCTION_ARGS);
extern Datum cross_entropy_combine(PG_FUNCTION_ARGS);
extern Datum cross_entropy_final(PG_FUNCTION_ARGS);
extern Datum kld_final(PG_FUNCTION_ARGS);
extern Datum jsd_final(PG_FUNCTION_ARGS);

四、修改部分配置文件

这里修改配置文件之后,数据库在编译时会自动更新bki文件(bki即为backend interface),在初始化的时候根据bki脚本来初始化数据库。要修改的文件有:

  1. {PGPATH}/src/include/catalog/pg_proc.adt

修改pg_proc.adt是因为pg_proc表存储关于函数(或过程)的信息,这是一个系统表,在数据库初始化的时候会建立,只有把一个函数定义写入这个表,数据库才会在启动的时候加入该函数。

具体修改如下:pg_proc.adt其实是一个大型的list,内部包含了很多dict,每个dict对应一个函数的实现,在列表末尾加入以下内容(其实和顺序无关):

# entropy
{ oid => '5106', descr => 'aggregate transition function',
  proname => 'int_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal int8', prosrc => 'int_entropy_accum' },
{ oid => '5107', descr => 'aggregate transition function',
  proname => 'text_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal text', prosrc => 'text_entropy_accum' },
{ oid => '5108', descr => 'aggregate combine function',
  proname => 'entropy_combine', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal internal', prosrc => 'entropy_combine' },
{ oid => '5109', descr => 'aggregate final function',
  proname => 'entropy_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'entropy_final' },
{ oid => '5110',descr => 'calculate int entropy',
  proname => 'entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5111',descr => 'calculate text entropy',
  proname => 'entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text',
  prosrc => 'aggregate_dummy' },
# cross_entropy
{ oid => '5112', descr => 'aggregate transition function',
  proname => 'cross_int_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal int8 int8', prosrc => 'cross_int_entropy_accum' },
{ oid => '5113', descr => 'aggregate transition function',
  proname => 'cross_text_entropy_accum', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal text text', prosrc => 'cross_text_entropy_accum' },
{ oid => '5114', descr => 'aggregate combine function',
  proname => 'cross_entropy_combine', proisstrict => 'f', prorettype => 'internal',
  proargtypes => 'internal internal', prosrc => 'cross_entropy_combine' },
{ oid => '5115', descr => 'aggregate final function',
  proname => 'cross_entropy_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'cross_entropy_final' },
{ oid => '5116', descr => 'calculate cross int entropy',
  proname => 'cross_entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8 int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5117', descr => 'calculate cross text entropy',
  proname => 'cross_entropy', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text text',
  prosrc => 'aggregate_dummy' },
{ oid => '5118', descr => 'aggregate final function',
  proname => 'kld_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'kld_final' },
{ oid => '5119', descr => 'calculate kl divergence',
  proname => 'kld', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8 int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5120', descr => 'calculate kl divergence',
  proname => 'kld', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text text',
  prosrc => 'aggregate_dummy' },
{ oid => '5121', descr => 'aggregate final function',
  proname => 'jsd_final', prorettype => 'float8',
  proargtypes => 'internal', prosrc => 'jsd_final' },
{ oid => '5122', descr => 'calculate js divergence',
  proname => 'jsd', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'int8 int8',
  prosrc => 'aggregate_dummy' },
{ oid => '5123', descr => 'calculate js divergence',
  proname => 'jsd', prokind => 'a', proisstrict => 'f',
  prorettype => 'float8', proargtypes => 'text text',
  prosrc => 'aggregate_dummy' }

这里特别说明一下,internal代表所有非标准的自定义的类型,比如我们自己定义的struct或者class,oid是PostgreSQL数据库中的一个唯一标识码,只需要保证没有重复使用即可。要了解上面每一项的含义,可以看【文档】中对pg_proc的描述。

  1. {PGPATH}/src/include/catalog/pg_aggregate.adt

修改pg_aggregate.adt的原因同上,这个表存储了聚合函数,如果只是普通函数不用修改这里,但聚合函数需要修改。

结构和上面的pg_proc.adt很像,具体含义参见【文档】中对pg_aggregate的描述。我们在列表末尾添加如下内容:

# entropy
{ aggfnoid => 'entropy(int8)', aggtransfn => 'int_entropy_accum', 
aggfinalfn => 'entropy_final', aggtranstype => 'internal', aggcombinefn => 'entropy_combine',
aggmtransfn => 'int_entropy_accum', aggmfinalfn => 'entropy_final', aggmtranstype => 'internal'},
{ aggfnoid => 'entropy(text)', aggtransfn => 'text_entropy_accum', 
aggfinalfn => 'entropy_final', aggtranstype => 'internal', aggcombinefn => 'entropy_combine',
aggmtransfn => 'text_entropy_accum', aggmfinalfn => 'entropy_final', aggmtranstype => 'internal'},
# cross_entropy
{ aggfnoid => 'cross_entropy(int8,int8)', aggtransfn => 'cross_int_entropy_accum', 
aggfinalfn => 'cross_entropy_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_int_entropy_accum', aggmfinalfn => 'cross_entropy_final', aggmtranstype => 'internal'},
{ aggfnoid => 'cross_entropy(text,text)', aggtransfn => 'cross_text_entropy_accum', 
aggfinalfn => 'cross_entropy_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_text_entropy_accum', aggmfinalfn => 'cross_entropy_final', aggmtranstype => 'internal'},
# kld
{ aggfnoid => 'kld(int8,int8)', aggtransfn => 'cross_int_entropy_accum', 
aggfinalfn => 'kld_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_int_entropy_accum', aggmfinalfn => 'kld_final', aggmtranstype => 'internal'},
{ aggfnoid => 'kld(text,text)', aggtransfn => 'cross_text_entropy_accum', 
aggfinalfn => 'kld_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_text_entropy_accum', aggmfinalfn => 'kld_final', aggmtranstype => 'internal'},
{ aggfnoid => 'jsd(int8,int8)', aggtransfn => 'cross_int_entropy_accum', 
aggfinalfn => 'jsd_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_int_entropy_accum', aggmfinalfn => 'jsd_final', aggmtranstype => 'internal'},
{ aggfnoid => 'jsd(text,text)', aggtransfn => 'cross_text_entropy_accum', 
aggfinalfn => 'jsd_final', aggtranstype => 'internal', aggcombinefn => 'cross_entropy_combine',
aggmtransfn => 'cross_text_entropy_accum', aggmfinalfn => 'jsd_final', aggmtranstype => 'internal'}
  1. {PGPATH}/src/backend/utils/adt/Makefile

Makefile是定义一个工程如何编译链接的文件。修改这个文件是因为要把新增的一个.c文件编译出来的.o文件加入到链接的列表之中,不然我们的修改不会生效。PostgreSQL的设计艺术之一就是,各种模块很多且之间互相具有比较高的独立性,我们如果修改了一个文件,可以单独重新编译.o文件,只用重新链接就可以生成数据库,而无需编译其他文件。

这个文件改动很少,只需要在OBJS的列表中加入entropy.o即可。建议按字母序添加到相应的位置,这样和PG原有风格保持一致。

  1. {PGPATH}/src/tools/pgindent/typedefs.list

这个文件定义了所有内建的数据类型的名称,我们需要把自己定义的几个类型加进去,这个目录下有pgindent等脚本会自动执行把这些类型名称写入内建类型。

这个文件改动同样很少,只需要写入两行,分别是我们自己新定义的两个结构体。建议按字母序添加到相应的位置,这样和PG原有风格保持一致。

ItemPs
TwoItemPs

五、重新编译运行

简单起见,可以直接全部重新编译一次,执行默认的编译命令即可,速度也是很快的。
过程是和第一次安装时候一样(见这篇
博客),首先通过visual studio的交叉编译工具,进入{PGPATH}/src/tools/msvc下。
执行下面命令,E:\pgdb是我指定的安装目录。

build DEBUG
vcregress check
install E:\pgdb

然后切换到安装目录的bin文件夹下,启动数据库:

initdb -D ../data
pg_ctl start -l logfile -D ../data
psql -d postgres

随便建立几个表:

CREATE TABLE public.u1 (
    n1 integer,
    n2 integer,
    n3 character varying(10),
    n4 character varying(20)
CREATE TABLE public.u2 (
    n1 integer,
    n2 integer,
CREATE TABLE public.u3 (
    n1 integer,
    n2 integer,
    n3 character varying(10),

随便插入一些数据:

insert into u1 values (1,2,'a','b'),(0,2,'s','b'),
	(2,1,'s','b'),(0,1,'b','s'),(2,0,'b','a');
insert into u3 values (0,1,'a'),(0,1,'a'),(1,1,'a'),
(1,0,'a'),(0,0,'a'),(0,1,'a'),(0,1,'a'),(0,1,'a'),
(0,1,'a'),(0,1,'b'),(0,0,'b'),(1,0,'b'),(0,0,'a'),
(0,0,'b'),(0,1,'a'),(0,1,'a'),(0,1,'a'),(0,1,'a'),
(0,1,'b');

试一下我们添加的几个函数是否成功:

select entropy(n1) from u3;
select n3, cross_entropy(n1, n2) from u3 group by n3;
select n3, kld(n1, n2), kld(n2, n1) from u3 group by n3;
select n3, jsd(n1, n2), jsd(n2, n1) from u3 group by n3;

在这里插入图片描述
如果看到这一步,就算是我们最终大功告成,可以开心地随意添加你想添加的函数进入PostgreSQL的内核中了。

PostgreSQL的源码中定义了很多内建的函数(build-in function),这些函数是在数据库启动时就可以使用的,一般是一些比较重要和常用的函数。作为内核学习的第一步,我选择了自己实现一个内建的聚合函数,并重新编译运行数据库。我们的目标是增加一些聚合函数,功能为计算信息熵相关的值(很有DB for AI)的感觉了。
pg中创建函数时可以指定权限检测,有两个可选参数:SECURITY INVOKER和SECURITY DEFINER。 SECURITY INVOKER:表示要用调用该函数的用户的特权来执行它。这是默认值。 SECURITY DEFINER:指定要用拥有该函数的用户的特权来执行该函数。 比如我们使用普通用户创建了一个调用者权限的函数,那么当超级用户调用这个函数时,就会以超级用户的权限来执行,这就会导致很大的安全隐患。 我们现在数据库中有两个用户:超级用户postgres和普通用户bill。 接着我们用普通用户创建一个表,并在这个普通用户的表上创建触发器。当超级用户操作这个表,并触发了触
平常工作中,有时需要远端连接 PostgreSQL 数据库做些维护,例如远端备份等;如果备份脚本写在远端机器,备份的时候会弹出密码输入提示,那么脚本就不能后台执行,这里总结了几种不弹出密码输入提示的方法。 --测试环境 目标库IP:  192.168.1.25/5432 ; 数据库: Mydb ;用户名:postgres 客户端IP:  192.168.1.26 --在 192.168
FunctionCallInfoData是实际传给C Language function函数的数据,它默认是隐藏的,实际上每一个C Language function函数默认都有一个FunctionCallInfoData.它的定义如下:typedef struct FunctionCallInfoData FmgrInfo *flinfo; /*函数描述指针*/
PostgreSQL 10 - 自定义聚合函数 比如,想解决一个很简单的问题:比如客户打车要付钱,假如3公里内13元,以后每公里需要付2.30元。那一共付多少钱呢? 当然,不用自定义聚合也能解决。不过,看看我怎么做。首先,增加测试数据: postgres=# CREATE TABLE t_taxi (c_id int, km numeric); CREATE TABLE postgres=# IN...
2020年5月2日新增: 前天,某位算法大佬提醒我,信息熵代表的是信息的混乱程度,这成了我继续将信息熵运用在经济活动解释当中的一个理解bug,直到昨天我带着老婆去了一趟合肥。老婆是一个好奇宝宝的好处就是,总是会问为什么,????。 还是那句话,一个好问题,远比好答案更有价值。 我想到,其实最基础的信息熵就可以很好的解释很多经济行为当中描述稀缺性和差异性以及规模之间的价值和风险。 关于最近某p2p公司的失...

三、修改builtins.h文件

这四个函数的参数可以允许多种类型,目前我们设计允许int8(即一字节大小的整型数字)、text(PostgreSQL中的特殊类型,用来存储字符串),text底层实现就是一个包含长度信息和一个柔性数组的结构体(柔性数组的讲解看 这里 )。这一来,一共要实现8个聚合函数,分别为:

1.4 js_divergence