Redis数据结构-整数集合

"Redis源码学习"

Posted by Simon on July 18, 2020

“Better code, better life. ”

Redis数据结构–整数集合

虽然内部数据结构非常强大, 但是创建一系列完整的数据结构本身也是一件相当耗费内存的工作, 当一个对象包含的元素数量并不多, 或者元素本身的体积并不大时, 使用代价高昂的内部数据结构并不是最好的办法。

为了解决这一问题, Redis 在条件允许的情况下, 会使用内存映射数据结构来代替内部数据结构。

内存映射数据结构是一系列经过特殊编码的字节序列, 创建它们所消耗的内存通常比作用类似的内部数据结构要少得多, 如果使用得当, 内存映射数据结构可以为用户节省大量的内存。

不过, 因为内存映射数据结构的编码和操作方式要比内部数据结构要复杂得多, 所以内存映射数据结构所占用的 CPU 时间会比作用类似的内部数据结构要多。

整数集合是Redis内存映射数据结构的一种,它用于有序、无重复地保存多个整数值, 根据元素的值, 自动选择该用什么长度的整数类型来保存元素。

举个例子, 如果在一个 intset 里面, 最长的元素可以用 int16_t 类型来保存, 那么这个 intset 的所有元素都以 int16_t 类型来保存。

另一方面, 如果有一个新元素要加入到这个intset , 并且这个元素不能用 int16_t 类型来保存 —— 比如说, 新元素的长度为 int32_t , 那么这个 intset 就会自动进行“升级”: 先将集合中现有的所有元素从 int16_t 类型转换为 int32_t 类型, 接着再将新元素加入到集合中。

根据需要, intset 可以自动从 int16_t 升级到 int32_tint64_t , 或者从 int32_t 升级到 int64_t

源码实现

整数集合的源码定义在src/intset.h中,其结构如下:

typedef struct intset {
    uint32_t encoding;
    uint32_t length;
    int8_t contents[];
} intset;

encoding 的值可以是以下三个常量之一(定义位于 intset.c ):

#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

contents 数组是实际保存元素的地方,数组中的元素有以下两个特性:

  • 元素不重复;
  • 元素在数组中由小到大排列;

contents 数组的 int8_t 类型声明比较容易让人误解,实际上, intset 并不使用 int8_t 类型来保存任何元素,结构中的这个类型声明只是作为一个占位符使用:在对 contents 中的元素进行读取或者写入时,程序并不是直接使用 contents 来对元素进行索引,而是根据 encoding 的值,对 contents 进行类型转换和指针运算,计算出元素在内存中的正确位置。在添加新元素,进行内存分配时,分配的空间也是由 encoding 的值决定。

下表列出了处理 intset 的一些主要操作,以及这些操作的算法复杂度:

操作 函数 复杂度
创建 intsetNew θ(1)
删除
添加新元素(不升级) intsetAdd O(N)
添加新元素(升级) intsetUpgradeAndAdd O(N)
按索引获取元素 _intsetGet θ(1)
按索引设置元素 _intsetSet θ(1)
查找元素,返回索引 intsetSearch O(lgN)
删除元素 intsetRemove O(N)

创建

/* Create an empty intset. */
intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

/* Toggle the 32 bit unsigned integer pointed by *p from little endian to
 * big endian */
void memrev32(void *p) {
    unsigned char *x = p, t;

    t = x[0];
    x[0] = x[3];
    x[3] = t;
    t = x[1];
    x[1] = x[2];
    x[2] = t;
}

创建函数先申请了一个intset大小的地址,默认的encoding为INTSET_ENC_INT16intrev32ifbe方法是redis定义的用来处理大小端字节序的。

添加元素

static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}


/* Search for the position of "value". Return 1 when the value was found and
 * sets "pos" to the position of the value within the intset. Return 0 when
 * the value is not present in the intset and sets "pos" to the position
 * where "value" can be inserted. */
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */
        if (value > _intsetGet(is,max)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }

    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}


/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }

    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}


添加新元素到 intset 的工作由 intset.c/intsetAdd 函数完成,需要处理以下三种情况:

  1. 元素已存在于集合,不做动作;
  2. 元素不存在于集合,并且添加新元素并不需要升级;
  3. 元素不存在于集合,但是要在升级之后,才能添加新元素;

并且, intsetAdd 需要维持 intset->contents 的以下性质:

  1. 确保数组中没有重复元素;
  2. 确保数组中的元素按由小到大排序;

由于intset是有序的,所以对于在instset查找某个元素,intsetSearch先用比较当前intset中最大最小的元素,然后再用二分查找。

新元素的位置确认后,要先将该位置之后的元素往后移一位,intsetMoveTail方法底层是memmove

升级


/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);
    int prepend = value < 0 ? 1 : 0;

    /* First set new encoding and resize */
    is->encoding = intrev32ifbe(newenc);
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    /* Upgrade back-to-front so we don't overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset. */
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    /* Set the value at the beginning or the end. */
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

关于升级操作,有两点需要提醒一下:

  • 第一,从较短整数到较长整数的转换,并不会更改元素里面的值。

    在 C 语言中,从长度较短的带符号整数到长度较长的带符号整数之间的转换(比如从 int16_t 转换为 int32_t )总是可行的(不会溢出)、无损的。

    另一方面,从较长整数到较短整数之间的转换,可能是有损的(比如从 int32_t 转换为 int16_t )。

    因为 intset 只进行从较短整数到较长整数的转换(也即是,只“升级”,不“降级”),因此,“升级”操作并不会修改元素原有的值。

  • 第二,集合编码元素的方式,由元素中长度最大的那个值来决定。

    就像前面演示的例子一样, 当要将一个 int32_t 编码的新元素添加到集合时, 集合原有的所有 int16_t 编码的元素, 都必须转换为 int32_t

    尽管这个集合真正需要用 int32_t 长度来保存的元素只有一个, 但整个集合的所有元素都必须转换为这种类型。

在进行升级的过程中,需要对数组内的元素进行“类型转换”和“移动”操作。

其中, 移动不仅出现在升级(intsetUpgradeAndAdd)操作中, 还出现其他对 contents 数组内容进行增删的操作上, 比如 intsetAddintsetRemove , 因为这种移动操作需要处理 intset 中的所有元素, 所以这些函数的复杂度都不低于 O(N) 。

其他操作

读取

有两种方式读取 intset 的元素,一种是 _intsetGet ,另一种是 intsetSearch

  • _intsetGet 接受一个给定的索引 pos ,并根据 intset->encoding 的值进行指针运算,计算出给定索引在 intset->contents 数组上的值。
  • intsetSearch 则使用二分查找算法,判断一个给定元素在 contents 数组上的索引。
写入

除了前面介绍过的 intsetAddintsetUpgradeAndAdd 之外, _intsetSet 也对集合进行写入操作: 它接受一个索引 pos ,以及一个 new_value ,将 contents 数组 pos 位置的值设为 new_value

删除

删除单个元素的工作由 intsetRemove 操作, 它先调用 intsetSearch 找到需要被删除的元素在 contents 数组中的索引, 然后使用内存移位操作,将目标元素从内存中抹去, 最后,通过内存重分配,对 contents 数组的长度进行调整。

总结

  • Intset 用于有序、无重复地保存多个整数值,会根据元素的值,自动选择该用什么长度的整数类型来保存元素。
  • 当一个位长度更长的整数值添加到 intset 时,需要对 intset 进行升级,新 intset 中每个元素的位长度,会等于新添加值的位长度,但原有元素的值不变。
  • 升级会引起整个 intset 进行内存重分配,并移动集合中的所有元素,这个操作的复杂度为 O(N) 。
  • Intset 只支持升级,不支持降级。
  • Intset 是有序的,程序使用二分查找算法来实现查找操作,复杂度为 O(lgN)。