0%

入手了比亚乔 x7 ,主要通勤用,简单记录相关的事项,供有需要的读者参考。后续可能会随着使用不断更新。

需求

基本需求

  1. 通勤,摩托车路线,单程 30km,来回 60km。位于北京,京 B 不能进入四环(包括四环主路)。
  2. 周末买菜,去周边走走。远距离就考虑开车了。
  3. 家人建议买踏板。

车型定为踏板。另外通勤距离较长,不想经常加油,需要油箱大一点的,最终选择了比亚乔 x7。

通勤成本

比亚乔 x7,油箱 12.1L,城市通行平均油耗约 3.5L/100km,95 号汽油约 7.5 元/L,假设工作日平均每天 65km,节假日平均每天 50km,

  1. 成本:(65*20+50*8)/100*3.5*7.5=446.25 元
  2. 续航:12.1/3.5=345km,345-65*5=20km,基本足够工作日通勤。

驾照

增驾的时候可选 E(两轮)或 D(三轮),我选择的是 D,增驾完成后两轮和三轮都能骑,驾校学习+考试总共约 1900 元。

在北京学摩托相对严格些,科目一和四需要刷视频课时,然而一般就是点开放着,基本没有人会正儿八经看完;科目二和三练车也需要刷够课时,主要的时间就花在这个上面了。最终考试的时候,车上配有雷达和传感器,检测必要的身体动作和车的位置。这样学习和考核难度其实已经很低了,不过我还看到有的地方能一天拿本,实在是无力吐嘈。

参考日本的摩托驾照,我觉得国内 E 和 D 的划分明显是不合理的。就两轮摩托而言,骑行大排量和小踏板需要的技能是有不少区别的,刚拿到拿到 D 照的第三天就看到新手骑大排上高速 gg 的新闻。

装备

  1. 头盔
  2. 骑行服,手套,护具
    按季节不同,分不同款式,购买带护具的会方便些。
    • 春夏秋
      我目前的搭配是骑行服+护膝+手套。没有骑行裤,需要一个雨衣裤子来应对下雨天气。

    • 暂定为骑行服+骑行裤+手套。冬季雨雪天气道路结冰不骑车。
item detial product
头盔 需要考虑头围和头型 LS2 396(无法放入坐桶,考虑换 SHOEI GT AIR 2),摩雷士 R50S
骑行服 防水,带护具 NERVE 春夏款
骑行裤   -
手套 防水,带护具 NERVE 春夏款
护具 骑行服有护具,但是后座带人时需要 摩多狼护膝
雨衣   蓝狮雨衣

改装

改装主要是围绕安全性和功能性。下面列出了计划要做的改装,实际不一定都改,看实际使用情况。

査车主要针对的是*外观和噪音*,但是要注意,查得严的时候,*尾箱、保险杠、手机支架、小黄鸭装饰等都算作非法改装*。

item detial product
车锁 U 型锁,链锁,碟刹锁,品牌:abus,xena,top dog,kovix XENA XSU-170 U 型锁,kovix KVX 14mm 碟剎锁
车衣   淘宝车衣
运动相机 正好表弟有闲置的 Insta360 影石 ONE R 双镜头版
行车记录仪 喜郎 MX901,HFK-HM501,其实没有必要带屏幕 喜郎 MX901,外部走线,保险盒在坐桶下
补胎工具   -
轮胎 增强湿地抓地力,倍耐力天使,米其林 city -
轮胎打气工具   小米充气泵 1s
胎压检测仪 KEVTU T3,建议迷你传感器,重量对轮胎旋转有影响 -
尾箱 品牌:loboo,shad SHAD 48L(实际使用感觉太大,考虑换 39L)
尾箱内衬   淘宝尾箱内衬
尾箱架   普通尾箱架
尾箱支撑杆   -
坐垫套 内装一个垫子 -
坐桶内衬 行车记录仪放在坐桶,正好把多余的线放在内衬下面 淘宝坐桶内衬
坐桶隔热   -
坐桶橡胶垫螺丝垫片 内径 1cm,外径 2cm -
保险杠 倒了还得跑一趟车行修车 群尾保险杠
挡泥板延长   -
大防滑边撑   -
高风挡   -
合金折叠挂钩   -
合金牛角 原厂牛角太高,左侧牛角倒车变形 淘宝牛角
传动盖子 淘宝有塑料模仿镀铬的(可惜不是 Piaggio,是 Vespa) -
防盗机油尺   -
排气管防烫板   -
刷 ecu 调节风扇启动温度,从 5 格启动变为 4 格启动 -
散热开口   -
油箱盖透气阀 加油时,油箱压力大,喷气 淘宝透气阀,自己改装
单向阀   -
加强版水箱 点检可以免费更换 -
压力轴承   -
前避震 太软,换弹簧+避震油 -
防冻液 换沸点高的 -

车牌

京 A

目前只能买到,市场价大概 30w。

京 B

办理限制

  1. 不可上牌的户口或居住证:东城、西城、海淀、朝阳、丰台、石景山。
  2. 可上牌的户口或居住证:通州、昌平、房山、门头沟、大兴、顺义、怀柔、密云、延庆、平谷。

限行

2021 年 5 月 9 日起北京京 B 摩托车新规,

  1. 长安街及其延长线(五棵松桥至四惠桥)、广场东侧路、广场西侧路、人大会堂西路、府右街、南长街、北长街、南池子大街、北池子大街。
  2. 西四东大街、西安门大街(西四南大街至西什库大街)、西黄城根北街、西黄城根南街。
  3. 四环路(不含辅路)以内道路全天禁止京 B 号牌摩托车行驶。
  4. 门头沟,234 国道南辛房路口至王平路口段、南赵路、天门山路、瓜草地路、南辛房村中街、焦岭路、赵家台路、妙峰山路、南樱路、禅涧路、玫瑰园路、南东路、上苇甸路、果园环路、上果路、炭厂大街、上苇甸环路、大禅路、大琅路全天禁止摩托车通行。

违反摩托车禁限行规定,处 100 元罚款,记 3 分。

详细可参考限行政策

上牌流程

  1. 可交给车行代办。
  2. 自己上牌可以参考小白京B私户摩托车上牌历程北京摩托车京B牌照上牌流程-20210406

保险

  1. 交强险
  2. 三者
  3. 人身意外伤害
  4. 车损险:不确定是否需要,暂时没有买
  5. 盗抢险:不确定是否需要,暂时没有买

其他

  1. 凯励程:不确定是否需要,暂时没有买
  2. 摩托助手:救援

维护和保养

车行

哪购买就去哪做保养,不过也有一些口碑不错的车行(北京可以参考最差车行排行)。

  1. 老李车行
  2. 两轮族摩托车行

维护表

item desc
保养 磨合期:500km、1000km、2000km、3000km 换机油;后续每 2000km ~ 3000km 换机油、机滤、空滤
前 1000km 不超过 max speed 80%
汽油 95 号或以上
前轮胎压(带乘客时) 2 bar(2 bar)
后轮胎压(带乘客时) 2.2 bar(2.5 bar)
空气过滤器 肥皂水清洗,汽油+空气过滤海绵油浸泡后直接装回

img

img

img

img

总结

  1. 裸车 19980,预期全部办完(包括装备 25000)
  2. 实际花费高于预期,落地 23618,装备和改装全部加起来大概 7000
  3. 改装其实花了不少钱和精力,那直接买 3w 的踏板是否会避免这部分开销呢?我感觉明显不可能。
  4. 骑行感受和通勤体验以后更新

P.S.
和老爸交流了以下,原来他以前也骑摩托车,对机械的喜爱是有传承的啊。

Thread

Thread Model

  1. LinuxThreads
    • Default thread implementation since Linux kernel 2.0
    • Obsolete
  2. Next Generation POSIX Thread (NGPT)
    • IBM developed version of POSIX thread library
    • Abandoned
  3. Native POSIX Thread Library(NPTL)
    • Developed by RedHat
    • Better performance and scalability than LinuxThreads
    • Since Linux Kernel 2.6, glibc 2.3.5

Memory Layout

  • Heap:被所有线程共享。
  • Stack:每个线程都有自己独立的 stack。

Thread Stack

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
low  ┌──────────┐
│... │ ─┐ ─┐
├──────────┤ │ │
│list │ │ │
├──────────┤ ├─►pthread struct │
│tid │ │ │
├──────────┤ │ │
│... │ ─┘ │
├──────────┤ ├─►thread 1
│ │ │
│stack │ │
│ │ ─┘
├──────────┤
│ │ ───►stack guard(4k)
├──────────┤
│... │ ─┐
├──────────┤ │
│list │ │
├──────────┤ │
│tid │ │
├──────────┤ │
│... │ │
├──────────┤ ├─►thread 2
│ │ │
│stack │ │
│ │ ─┘
high └──────────┘

Allocate

  1. Implementation

    线程栈分配逻辑如下(基于 glibc 2.29.9000),

    1. __pthread_create_2_1 创建 thread 时,调用 ALLOCATE_STACK 宏创建 Thread stack。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      int
      __pthread_create_2_1 (pthread_t *newthread, const pthread_attr_t *attr,
      void *(*start_routine) (void *), void *arg)
      {
      // ...
      struct pthread *pd = NULL;
      int err = ALLOCATE_STACK (iattr, &pd);
      int retval = 0;
      // ...
      }
    2. ALLOCATE_STACK 宏调用 allcate_stack 来执行具体的创建逻辑。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      static int
      allocate_stack (const struct pthread_attr *attr, struct pthread **pdp,
      ALLOCATE_STACK_PARMS)
      {
      // ...
      // 从 cache 中查找空闲的栈内存
      pd = get_cached_stack (&size, &mem);
      // 没有空闲的则 mmap 创建
      if (pd == NULL)
      {
      // ...
      mem = __mmap (NULL, size, (guardsize == 0) ? prot : PROT_NONE,
      MAP_PRIVATE | MAP_ANONYMOUS | MAP_STACK, -1, 0);
      // ...

      /* Allocate the DTV for this thread. */
      _dl_allocate_tls (TLS_TPADJ (pd));
      // ...

      /* Prepare to modify global data. */
      lll_lock (stack_cache_lock, LLL_PRIVATE);
      /* And add to the list of stacks in use. */
      // 将 stack 内存挂到 stack_used list 中
      stack_list_add (&pd->list, &stack_used);
      lll_unlock (stack_cache_lock, LLL_PRIVATE);

      // ...
      }
      // ...
      // 更新全局栈内存指针
      *stack = pd->stackblock;
      *stacksize = stacktop - *stack;
      // ...
      }

Thread-Local Storage

Thread-local storage(TLS)提供了一种机制,使得每个 thread 都拥有一份变量的实例,对 TLS 中变量进行的修改仅对当前 thread 可见。

Common Implementation

  1. pthread:thread-specific data (TSD)
  2. ELF(Executable and Linkable Format) TLS
    • gcc: __thread
    • c11: _Thread_local
    • c++11: thread_local

Example Usages

  1. adext ThreadData 主要用于存储 iconf ctx,确保每个 thread 使用自己的 ctx 来 merge cgi 下发的实验配置。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    static __thread ThreadData* s_td = NULL;

    class ThreadData {
    public:
    ThreadData();
    ~ThreadData();

    int init();

    void reset();

    int set_request_info(int bucketid,bs::RankFrontendResp *admergeResult);

    ::iconf::Context* get_context() {
    return _ctx;
    }
    private:
    ::iconf::Context* _ctx;
    };
  2. jemalloc 使用 TLS 来保存每个线程特有的数据,避免锁竞争。

    1
    2
    3
    4
    5
    #define	malloc_tsd_data(a_attr, a_name, a_type, a_initializer)		\
    a_attr __thread a_type JEMALLOC_TLS_MODEL \
    a_name##tsd_tls = a_initializer; \
    a_attr pthread_key_t a_name##tsd_tsd; \
    a_attr bool a_name##tsd_booted = false;
  3. POSIX errno

    errno is thread-local; setting it in one thread does not affect its value in any other thread.

Implementation

pthread:thread-specific data (TSD)

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <cassert>
#include <iostream>
#include <pthread.h>

pthread_key_t tsd_key;

void free_memory(void *arg) {
int *p = reinterpret_cast<int *>(arg);
delete p;
}

void *run(void *arg) {
int *d = reinterpret_cast<int *>(arg);
void *p = pthread_getspecific(tsd_key);
assert(p == nullptr);
p = new int(*d);

int *pa = reinterpret_cast<int *>(p);
*pa += 5;

int err = pthread_setspecific(tsd_key, p);
assert(err == 0);

pthread_t tid = pthread_self();
p = pthread_getspecific(tsd_key);
int *a = reinterpret_cast<int *>(p);

std::cout << *a << " in thread " << tid << std::endl;

return nullptr;
}

int main(int argc, char *argv[]) {
int err = pthread_key_create(&tsd_key, free_memory);
assert(err == 0);

pthread_t tid[2];

int data[] = {10, 20};

pthread_create(&tid[0], nullptr, run, &data[0]);
pthread_create(&tid[1], nullptr, run, &data[1]);

pthread_join(tid[0], nullptr);
pthread_join(tid[1], nullptr);

err = pthread_key_delete(tsd_key);
assert(err == 0);
return 0;
}

// output
// 25 in thread 139672264689408
// 15 in thread 139672273082112

Memory Layout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 low ┌─────────────────┐
│header │ ───►tcbhead_t struct ─┐
├─────────────────┤ │
│list │ │ ─┐
├─────────────────┤ │ │
│tid │ │ │
├─────────────────┤ ├─►pthread struct │
│specific_1stblock│ │ │
├─────────────────┤ │ │
│specific │ │ │
├─────────────────┤ │ ├─►thread 1
│... │ ─┘ │
├─────────────────┤ │
│ │ │
│stack │ │
│ │ ─┘
├─────────────────┤
│ │ ───►stack guard(4k)
├─────────────────┤
│header │ ─┐
├─────────────────┤ │
│list │ │
├─────────────────┤ │
│tid │ │
├─────────────────┤ │
│... │ │
├─────────────────┤ ├─►thread 2
│ │ │
│stack │ │
│ │ ─┘
high └─────────────────┘

Implementation

  1. 全局分配唯一 key
  2. 各个 thread 存储自己的 key-value map

以下分析基于 glibc 2.29.9000。

  1. 如何访问 Thread Stack

    TSD 是存储在各个线程的内存中,要访问 TSD,就需要一种方法来访问线程栈。

    glibc 通过调用 clone 实现创建线程。如果提供了 CLONE_SETTLS ,那么会将 tls 存入 fs 寄存器。

    1
    2
    int clone(int (*fn)(void *), void *stack, int flags, void *arg, ...
    /* pid_t *parent_tid, void *tls, pid_t *child_tid */ );

    在调用 clone 时, tlsstruct pthread 的地址。从而可以通过 fs 寄存器来访问到线程栈。

    1
    2
    3
    4
    TLS_DEFINE_INIT_TP (tp, pd);
    if (__glibc_unlikely (ARCH_CLONE (&start_thread, STACK_VARIABLES_ARGS,
    clone_flags, pd, &pd->tid, tp, &pd->tid)
    == -1))

    glibc,以及各种 TLS 实现中对线程栈的访问都是利用 fs 实现的。

    fs 寄存器可以通过如下方式获得:

    1. arch_prctl

      1
      arch_prctl(ARCH_GET_FS, &pthread_addr);
    2. asm %fs

      1
      2
      3
      4
      5
      6
      7
      8
      # define THREAD_SELF \
      ({ struct pthread *__self; \
      asm ("mov %%fs:%c1,%0" : "=r" (__self) \
      : "i" (offsetof (struct pthread, header.self))); \
      __self;})

      struct pthread *self;
      self = THREAD_SELF;
  2. Create

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #define PTHREAD_KEYS_MAX        1024

    typedef unsigned int pthread_key_t;

    /* Thread-local data handling. */
    struct pthread_key_struct
    {
    /* Sequence numbers. Even numbers indicated vacant entries. Note
    that zero is even. We use uintptr_t to not require padding on
    32- and 64-bit machines. On 64-bit machines it helps to avoid
    wrapping, too. */
    uintptr_t seq;
    /* Destructor for the data. */
    void (*destr) (void *);
    };

    struct pthread_key_struct __pthread_keys[PTHREAD_KEYS_MAX]

    在主线程中调用 pthread_key_create 完成 key 的创建。glibc 定义了全局数组 __pthread_keys 来管理 key,每个进程最多创建 1024 个 key。

    数组中每个元素的:

    1. seq 用于判断 key 是否被使用,为奇数则被使用

    2. destr 用于存储释放资源的函数指针

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      #define KEY_UNUSED(p) (((p) & 1) == 0)

      int
      __pthread_key_create (pthread_key_t *key, void (*destr) (void *))
      {
      /* Find a slot in __pthread_keys which is unused. */
      for (size_t cnt = 0; cnt < PTHREAD_KEYS_MAX; ++cnt)
      {
      // ...
      /* Remember the destructor. */
      __pthread_keys[cnt].destr = destr;
      /* Return the key to the caller. */
      *key = cnt;
      /* The call succeeded. */
      return 0;
      }
      // ...
      }
  3. Set

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    #define PTHREAD_KEY_2NDLEVEL_SIZE       32

    // 32
    #define PTHREAD_KEY_1STLEVEL_SIZE \
    ((PTHREAD_KEYS_MAX + PTHREAD_KEY_2NDLEVEL_SIZE - 1) \
    / PTHREAD_KEY_2NDLEVEL_SIZE)

    struct pthread_key_data
    {
    /* Sequence number. We use uintptr_t to not require padding on
    32- and 64-bit machines. On 64-bit machines it helps to avoid
    wrapping, too. */
    uintptr_t seq;
    /* Data pointer. */
    void *data;
    } specific_1stblock[PTHREAD_KEY_2NDLEVEL_SIZE];
    /* Two-level array for the thread-specific data. */
    struct pthread_key_data *specific[PTHREAD_KEY_1STLEVEL_SIZE];

    各个线程将 key-value 存储到线程栈中,

    1. 如果 key < 32,到存储到 specific_1stblock

    2. 如果 32 <= key < 1024,存储到二维数组 specific

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      int
      __pthread_setspecific (pthread_key_t key, const void *value)
      {
      // ...
      struct pthread *self;
      self = THREAD_SELF;
      /* Special case access to the first 2nd-level block. This is the
      usual case. */
      if (__glibc_likely (key < PTHREAD_KEY_2NDLEVEL_SIZE))
      {
      // ...
      level2 = &self->specific_1stblock[key];
      // ...
      }
      else
      {
      // ...
      idx1st = key / PTHREAD_KEY_2NDLEVEL_SIZE;
      idx2nd = key % PTHREAD_KEY_2NDLEVEL_SIZE;
      // ...
      /* This is the second level array. Allocate it if necessary. */
      level2 = THREAD_GETMEM_NC (self, specific, idx1st);
      if (level2 == NULL)
      {
      // 分配 specific 二维数组
      // ...
      }
      /* Pointer to the right array element. */
      level2 = &level2[idx2nd];
      }
      /* Store the data and the sequence number so that we can recognize
      stale data. */
      level2->seq = seq;
      level2->data = (void *) value;
      return 0;
      }
  4. Get

    根据 key 的取值范围,决定访问 specific_1stblock ,还是 specific

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void *
    __pthread_getspecific (pthread_key_t key)
    {
    // ...
    if (__glibc_likely (key < PTHREAD_KEY_2NDLEVEL_SIZE))
    data = &THREAD_SELF->specific_1stblock[key];
    else
    {
    // ...
    unsigned int idx1st = key / PTHREAD_KEY_2NDLEVEL_SIZE;
    unsigned int idx2nd = key % PTHREAD_KEY_2NDLEVEL_SIZE;
    // ...
    data = &level2[idx2nd];
    }

    void *result = data->data;
    // ...
    return result;
    }
  5. Delete

    pthread_key_delete 会把 seq++,置为偶数,表示 key 为被使用,并不会真正的调用 destr 来进行资源的释放。

    当线程执行完毕时, __nptl_deallocate_tsd 调用每个 key 的 destr 执行清理。

ELF TLS

pthread TSD 全局分配 key,需要调用 glibc API 在运行时完成 TSD 的构造和使用,比较复杂,影响效率。因此就有了 ELF TLS。

ELF TLS 是编译器(gcc/clang)的扩展特性,其实现需要如下方面的支持:

  1. ELF 文件需要区分普通静态全局变量和 TLS 变量
  2. 程序启动时,动态链接器需要初始化 TLS 变量的段
  3. 线程运行时,需要为 TLS 分配专门的内存区域,以及动态计算 TLS 的地址

ELF TLS Section

普通 static 全局变量,根据是否初始化分别存储在 .data (已初始化)和 .bss (未初始化,初始化为 0),类似的 TLS 变量存储在 .tdata.tbss 段。

由于 TLS 是各个线程私有的数据,因此线程是不能直接访问 .tdata.tbss 段的。

运行时分配 TLS

  1. Memory Layout

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
     low ┌──────┬──────────┐
    │ │... │ ─┐ ─┐
    │ ├──────────┤ │ │
    │header│dtv │ │ │
    │ ├──────────┤ │ │
    │ │... │ │ │
    ├──────┴──────────┤ ├─►pthread struct
    list │ │ │
    ├─────────────────┤ │ │
    tid │ │ │
    ├─────────────────┤ │ ├─►thread 1
    │... │ │ │
    ├─────────────────┤ │ │
    │... │ ─┘ │
    ├─────────────────┤ │
    │ │ │
    stack │ │
    │ │ ─┘
    ├─────────────────┤
    │ │ ───►stack guard(4k)
    ├─────────────────┤
    header │ ─┐
    ├─────────────────┤ │
    list │ │
    ├─────────────────┤ │
    tid │ │
    ├─────────────────┤ │
    │... │ │
    ├─────────────────┤ ├─►thread 2
    │ │ │
    stack │ │
    │ │ ─┘
    high └─────────────────┘

    TLS 的内存布局用两种,对于 x86-64 是如下布局,

    img

    tp(thread pointer,也是前面提到的 fs 寄存器的值) 指向 TCB,也就是 pthread struct,TCB 包含了一个 dtv(dynamic thread vector)指针,指向线程的 dtv 数组。

    dtv 存储了 TLS block 地址,通过 module ID 和 TLS 变量的偏移量来实现 TLS 变量的寻址。对于 exec,module ID 是 1;动态库( dlopen )的 module ID 直到加载的时候才会分配。

    根据 TLS 变量在何处被定义,其存储分为,

    1. 来自 exec、以及 shared object: main 函数之前加载的 TLS 变量存储在 TCB 前面的 TLS block。dtv[1] 指向这个 TLS block。
    2. dlopen 加载的 shared object:TLS 变量在 TLS Blocks for Dynamically-Loaded Modules。
  2. Allocate

    程序启动时,动态链接器将 exec、shared object 映射到进程地址空间,并进行相关初始化,

    1. 可能会对这两个段进行重定位,并把重定位后的段保存为 TLS 初始化镜像,之后便不再改动
    2. 调用 _dl_determine_tlsoffset 获得每个 module 中 TLS 变量的 offset(在编译的时候决定),offset 表示变量在 TLS block 中的偏移量
    3. 调用 _dl_allocate_tls_storage 分配 TLS 和 dtv 的内存
    4. 调用 _dl_allocate_tls_init 完成 TLS 和 dtv 的初始化
  3. Implementation

    1. 前面提到线程启动时, ALLOCATE_STACK 宏调用 allcate_stack 来执行创建线程栈的逻辑,其中调用 _dl_allocate_tls 是进行 TLS 和 dtv 的分配以及初始化。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      static int
      allocate_stack (const struct pthread_attr *attr, struct pthread **pdp,
      ALLOCATE_STACK_PARMS)
      {
      // ...
      /* Allocate the DTV for this thread. */
      _dl_allocate_tls (TLS_TPADJ (pd));
      // ...
      }
    2. _dl_allocate_tls_init 将每个 module 的 TLS 初始化镜像复制到 TLS block。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      void *
      _dl_allocate_tls_init (void *result)
      {
      // ...
      while (1)
      {
      // ...
      dtv[map->l_tls_modid].pointer.val = TLS_DTV_UNALLOCATED;
      dtv[map->l_tls_modid].pointer.to_free = NULL;
      // ...
      dest = (char *) result - map->l_tls_offset;
      // ...

      /* Set up the DTV entry. The simplified __tls_get_addr that
      some platforms use in static programs requires it. */
      dtv[map->l_tls_modid].pointer.val = dest;
      /* Copy the initialization image and clear the BSS part. */
      memset (__mempcpy (dest, map->l_tls_initimage,
      map->l_tls_initimage_size), '\0',
      map->l_tls_blocksize - map->l_tls_initimage_size);
      }
      // ...
      }

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <stdio.h>
#include <pthread.h>
#include <asm/prctl.h>

static int __thread var_1 = 0;
static int __thread var_2 = 1;

void* func(void* arg) {
long pthread_addr = 0;
arch_prctl(ARCH_GET_FS, &pthread_addr);

var_1 = 1;
var_2 = 2;

printf(
"&pthread = %p, &var_1 = %p, off_var_1 = %d, &var_2 = %p, off_var_2 = "
"%d\n",
pthread_addr, &var_1, (long)&var_1 - pthread_addr, &var_2,
(long)&var_2 - pthread_addr);
return NULL;
}

int main() {
pthread_t tids[3];
int i = 0;

for (i = 0; i < 3; ++i) {
pthread_create(&tids[i], NULL, func, NULL);
}

for (i = 0; i < 3; ++i) {
pthread_join(tids[i], NULL);
}

getchar();
return 0;
}

// &pthread = 0x7ff58904d700, &var_1 = 0x7ff58904d6f8, off_var_1 = -8, &var_2 = 0x7ff58904d6fc, off_var_2 = -4
// &pthread = 0x7ff58804b700, &var_1 = 0x7ff58804b6f8, off_var_1 = -8, &var_2 = 0x7ff58804b6fc, off_var_2 = -4
// &pthread = 0x7ff58884c700, &var_1 = 0x7ff58884c6f8, off_var_1 = -8, &var_2 = 0x7ff58884c6fc, off_var_2 = -4

TLS Access Models

根据 TLS 变量定义的位置,以及访问的位置不同,有以下 4 中使用 TLS 变量的模型。具体采用哪种模式由编译器和链接器共同决定。

  1. General Dynamic

    最通用的访问模型,任何地方定义的 TLS 变量,在任何地方访问,都可以使用这个方式。其他模型都是加上各种限制条件以后的优化。

    可以用这个公式来表示 dtv[ti_module].pointer + ti_offset 。实际可以使用 __tls_get_addr 来获取 TLS 变量地址。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void *
    __tls_get_addr (GET_ADDR_ARGS)
    {
    dtv_t *dtv = THREAD_DTV ();

    if (__glibc_unlikely (dtv[0].counter != GL(dl_tls_generation)))
    return update_get_addr (GET_ADDR_PARAM);

    void *p = dtv[GET_ADDR_MODULE].pointer.val;

    if (__glibc_unlikely (p == TLS_DTV_UNALLOCATED))
    return tls_get_addr_tail (GET_ADDR_PARAM, dtv, NULL);

    return (char *) p + GET_ADDR_OFFSET;
    }
  2. Local Dynamic

    Local Dynamic 是对 General Dynamic 的优化。如果 TLS 变量在同一个模块被定义和使用,则可以使用这个模型。

    TLS 变量的 offset 在编译时可以确定,因此可以减少 __tls_get_addr 的调用。

  3. Initial Exec

    如果可以 TLS 变量在程序启动时就已分配好,则采用此模型。

  4. Local Exec

    对 Local Dynamic 的优化,如果 TLS 变量在 exec 中定义,在 exec 中访问,则使用这个模型。

    通过 fs 寄存器 + 偏移量访问。这种是最简单,也是最高效的方式,和访问局部变量没有区别。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
      var_1 = 1;
    401194: 64 c7 04 25 fc ff ff movl $0x1,%fs:0xfffffffffffffffc
    40119b: ff 01 00 00 00
    4011a0: 64 48 8b 0c 25 00 00 mov %fs:0x0,%rcx
    4011a7: 00 00
    4011a9: 48 8d 91 fc ff ff ff lea -0x4(%rcx),%rdx
    var_2 = 2;
    4011b0: 64 c7 04 25 f8 ff ff movl $0x2,%fs:0xfffffffffffffff8
    4011b7: ff 02 00 00 00
    4011bc: 48 8d 89 f8 ff ff ff lea -0x8(%rcx),%rcx

gcc: __thread

pass

c11: _Thread_local

同 gcc __thread

c++11: thread_local

相对于 __threadthread_local 可以在首次使用时自动初始化,线程结束时自动析构。

Summary

  1. pthread:thread-specific data (TSD)
    • 全局分配 key
    • 线程栈中的 pthread struct 存储自己线程的 key-value
    • 每个进程的 TSD 个数有限, PTHREAD_KEYS_MAX
    • 需要调用 glibc API 完成 TSD 的构造和使用,逻辑复杂
  2. ELF TLS
    • 效率比 TSD 高
    • 使用方便

References

Thread

  1. Understanding the Memory Layout of Linux Executables
  2. Understanding Linux /proc/id/maps
  3. Glibc 线程资源分配与释放–—线程栈
  4. 虚拟内存[02] Linux 中的各种栈:进程栈 线程栈 内核栈 中断栈
  5. Threads
  6. source code of userspace/glibc/nptl/

TLS

  1. gcc Thread-Local Storage
  2. ELF Handling For Thread-Local Storage
  3. A Deep dive into (implicit) Thread Local Storage
  4. All about thread-local storage
  5. Thread-local storage
  6. stackoverflow How does the gcc `_thread` work?
  7. 线程私有存储

起因

notion 是个方便高效的工具,用起来很顺手,但是最近频繁的 Notion Incident 邮件报警看得我着实心烦,很担心哪天要用的时候服务挂了干着急。所以决定寻找 notion 的替代。

img

目标

对我来说,一个高效的任务管理系统需要有一下功能:

  1. 任务管理:管理 deadline、与他人共同工作的项目。
  2. 材料记录和收集:记录想法、临时的一些思路、可以灵活的添加链接、图片等。
  3. 保存和分享:持久化保存数据、不限于工具限制,便于导出分享。
  4. 定期 review:达成一个长期的目标需要定期 review 短期的 task,保证方向合理和根据达成情况适当调整目标。
  5. 易用:学习和维护成本低,用工具管理 task,而不是被工具牵制。

工具选择

基于前面描述的功能点,很多工具就被否定了,

  1. 各种 todo 管理 app,以及附带番茄钟计时器的 app,例如:PomotodoWunderlist。 过于简单。
  2. 专门的任务管理 app,例如:omnifocus。 不便于添加各种资料。
  3. notion:不便于导出,服务频繁出问题。

最终我选择了 emacs 的 org mode。因为我有 emacs 的使用经历,上手 org-mode 较为容易;其次是会简单的写和改 elisp 代码,配置起来难度不大。

  1. 简单但是比 markdown 强大的多的纯文本记录格式。
  2. task 支持状态、标签和时间,能方便的移动 task,便于调整 task 和 review。
  3. agenda view 过滤和查看 task。
  4. 通过 pandoc 实现丰富的导出功能,就算哪天不用 org mode,也不影响查阅内容。
  5. 添加图片确实不方便,但通过 org-download 也基本能实现拖拽添加。
  6. 华丽呼哨的定制,感觉这个才是从 notion 换到 emacs 的动力

img

我的 org-mode workflow

task 状态

参考别人的用法后,规划了如下几个 task 状态,含义如下:

  • TODO:收集到 inbox 中,但是还没有决定是否要做
  • NEXT:review TODO 的 task 以后,规划要做
  • PROG:正在进行
  • DONE:完成
  • HOLD:由于某些原因 task 暂停了
  • CANCELLED:取消不做了

状态的变更规则如下:

  1. TODO -> NEXT -> PROG -> DONE
  2. PROG -> HOLD
  3. PROG -> CANCELLED

除了上述状态,如果有的 task 有开始或截止时间,那么还可以加上 SCHEDULED 或者 DEADLINE。

daily highlight

经过长久的实践,发现不管怎么规划 task,一旦事情堆的太多,看着整页的 NEXT 就会不想做。为此每天开始都先定一下当天必须要完成的事情,叫做 daily highlight,不限于工作的任务,休息日的时候也可以是看一本书之类的。daily highlight 不要过多,2~3 件即可。上班的时候集中精力做完 daily highlight 以后就可以愉快的摸鱼了

目前使用 org mode 的时间还不长,上述的 workflow 可能还会随着时间不断更新。

导入 notion 文件

从 notion 导出的格式为 markdown,可以使用 pandoc 转为 org 格式。

pandoc -f markdown -t org markdown_file.md | sed -E "/^[[:space:]]+:/ d"

References

  1. A Guide to My Organizational Workflow: How to Streamline Your Life
  2. An Org-Mode Workflow for Task Management
  3. Org-mode Workflow: A Preview

有段时间没有更新了。最近因为一些原因,换了工作。

之前的工作内容偏dmp,离线处理,业务相关的东西一直关注的比较少。现在转向线上广告引擎,与业务结合比较紧密,有点不习惯,需要改变一下思考的方式了。

quicklist是由ziplist构成的双向链表。ziplist需要可能对内存进行复制,在长度较长的时候,性能不佳。quicklist存储多个小ziplist,对除headtail外的节点还进行了压缩,保证了push和pop性能的同时,又减少了内存的占用。

结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;

typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes */
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

quicklistLZF:存储压缩后的ziplist

quicklist

  • fill:存放list-max-ziplist-size参数
    • fill取正值时,表示entry个数
    • 取负值时,表示大小
  • compress:存放list-compress-depth参数

push

push时,先检查本次push是否会超过限制大小,quicklist->fill,依次按照用户定义大小、默认个数(SIZE_SAFETY_LIMIT)、用户定义个数检查。

  • 如果没有超过,那么在head的ziplist头部添加entry。
  • 如果超过,那么创建新的ziplist,并在quicklist->head前插入新node。最后压缩原来的quicklist->head节点。

quicklistPushTail也是类似的,但是在ziplist的尾部添加entry,且插入后是压缩原来的quicklist->tail节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

ziplist

结构

ziplist是一个压缩的双向链表,由一个特殊编码的连续内存块构成。如果没有特殊指定,所有字段都是以小端序来存储的。

1
2
3
4
5
ziplist:
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>

entry:
<prevlen> <encoding> <entry-data>

ziplist:

  • uint32_t zlbytes:ziplist占据的内存大小,包括zlbytes字段。
  • uint32_t zltail:最后一个entry(非zlend)的offset,记录了尾节点距离起始地址的字节数,相当于尾指针的作用。
  • uint16_t zllen:entry的数目,如果元素数目大于2^16-2,那么值为2^16-1,只有通过遍历才能得知,有多少个元素。zlen不计入entry数目。
  • uint8_t zlend:特殊的entry,ziplist的结尾标记,值为0xFF

每个entry大小是不固定的:

  • prevlen:前一个entry的长度,用于从后往前遍历时计算prev entry的起始地址。长度小于254字节和大于等于254字节分别使用两种存储方式。

    value prevlen size
    < 254 0xbb 1字节
    >=254 0xFE xx xx xx xx 5字节

    当长度大于等于254时,0xFE代表prevlen值的类型,后面的4个字节才是长度。

    这样存储方式刚好可以与zlend0xFF区分开来,没有一个entry的开头会是0xFF

  • encoding:记录了entry-data数据类型和长度。

    前两个bit代表存储的是字节数组,还是整型。

    encoding size 值的类型
    00pppppp 1 byte 长度<=63的字节数组
    `01pppppp qqqqqqqq` 2 bytes(14 bits使用大端序存储)
    `10000000 qqqqqqqq rrrrrrrr
    11000000 1 byte int16_t
    11010000 1 byte int32_t
    11100000 1 byte int64_t
    11110000 1 byte 24 bit signed integer
    11111110 1 byte 8 bit signed integer
    1111xxxx 1 byte 没有content属性,4个bit存储[0, 12]的值
    11111111 1 byte zlend
  • entry-data:节点的值,字节数组或整形。

插入

ziplist使用了很多宏来实现,主要是进行类型转换和指针运算以便访问相关的字段。

插入分三个情况,
-w629

需要分别准备新entry的prevlenencoding,分配空间。

  1. prevlen
    先看插入位置是否在zlen之前,

    • 如果不在,p处的prevlen就是新entry的prevlen

    • 如果在,此时无法直接得知prevlen,需要看zlend前是否还有元素。若没有,则zipRawEntryLength(ptail)计算tail元素的长度。

      p不是zlend的时候,新entry的len会作为p处entry的prevlen,需要确保pprevlen空间足够。

      -w736

  2. encoding
    对于encoding,先会尝试编码为整型zipTryEncoding(s,slen,&value,&encoding)

    • 如果可以编码为整型,那么zipIntSize(encoding)得到entry-data的大小。
    • 如果无法合法的编码为整型,那么根据slen编码为字节数组zipStoreEntryEncoding(NULL,encoding,slen),entry-data的大小为slen
  3. 分配空间,
    计算出各字段所需的空间后,memmove()来完成空间的调整。要注意对源地址的处理,p-nextdiff

    • 如果nextdiff == 0,说明p处entry的prevlen,可以保存新entry的大小。

    • 如果nextdiff == 4,那么说明空间不够,此时p处entry的prevlen原大小为1 byte,从p往前数4 bytes,加起来的5 bytes作为新prevlen的存储,因此从p - 4处开始memmove()

      -w614

    • 如果nextdiff == -4,说明空间多出4 bytes,从p + 4的位置开始memmove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789; /* initialized to avoid warning. Using a value
that is easy to see if for some reason
we use it uninitialized. */
zlentry tail;

/* Find out prevlen for the entry that is inserted.
*
* entry开头的1 byte是不是0xFF
*
* 如果插入位置是在zlend处,需要判断zlend前是否有entry,
* 如果没有,那么新prevlen为0,
* 如果有,那么需要计算zl+zltail处entry的大小,将其设置为新entry的prevlen
*
*/
if (p[0] != ZIP_END) {
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
prevlen = zipRawEntryLength(ptail);
}
}

/* See if the entry can be encoded */
if (zipTryEncoding(s,slen,&value,&encoding)) {
/* 'encoding' is set to the appropriate integer encoding */
reqlen = zipIntSize(encoding);
} else {
/* 'encoding' is untouched, however zipStoreEntryEncoding will use the
* string length to figure out how to encode it. */
reqlen = slen;
}
/* We need space for both the length of the previous entry and
* the length of the payload. */
reqlen += zipStorePrevEntryLength(NULL,prevlen);
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

/* When the insert position is not equal to the tail, we need to
* make sure that the next entry can hold this entry's length in
* its prevlen field. */
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}

/* Store offset because a realloc may change the address of zl. */
offset = p-zl;
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;

/* Apply memory move when necessary and update tail offset. */
if (p[0] != ZIP_END) {
/* Subtract one because of the ZIP_END bytes */
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

/* Encode this entry's raw length in the next entry. */
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);

/* Update offset for tail */
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

/* When the tail contains more than one entry, we need to take
* "nextdiff" in account as well. Otherwise, a change in the
* size of prevlen doesn't have an effect on the *tail* offset. */
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
/* This element will be the new tail. */
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}

/* When nextdiff != 0, the raw length of the next entry has changed, so
* we need to cascade the update throughout the ziplist */
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}

/* Write the entry */
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}

连锁更新

插入或删除时,可能导致后面entry存储的prevlen发生变化。理论上,扩张和缩小都会有,但redis有意忽略了缩小的情况,避免连续的插入导致频繁的扩容和缩小。

1
2
3
4
5
6
7
8
9
10
if (next.prevrawlensize > rawlensize) {
/* This would result in shrinking, which we want to avoid.
* So, set "rawlen" in the available bytes. */
zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
} else {
zipStorePrevEntryLength(p+rawlen,rawlen);
}

/* Stop here, as the raw length of "next" has not changed. */
break;

结构

intset存储了有序的整数集合。

1
2
3
4
5
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;

encoding决定如何解析contents,取值为,

1
2
3
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

字节序

不同于其他结构,intset在存储的时候考虑了字节序的问题,redis会使用小端序来存储intset的所有字段。目的是intset能够兼容不同字节序的cpu。

sds也有多字节的字段,为什么sds不做这个转换?

查找

由于contents[]是有序的,因此直接使用二分查找。

在执行二分查找前,先进行了特殊情况的判断,避免进行多余的搜索,

  • 如果数组长度为0,则直接返回。
  • 如果value大于末尾元素,或小于首部元素,也直接返回,并set插入位置pos

二分查找中使用了mid = ((unsigned int)min + (unsigned int)max) >> 1来计算mid,会不会存在溢出的问题?毕竟intset的length的类型是uint32_t,理论可以保存$2^{32}$个元素。redis仅使用intset存储少量的元素,如果元素过多,会使用其他方式存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;

/* The value can never be found when the set is empty */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}

while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}

if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}

插入

插入分两种情况,

  • 可直接插入
  • 新元素的类型比集合存储的类型长,需要升级

直接插入

对于直接插入,先使用intsetSearch()找到待插入位置pos,然后移动pos后的所有元素。

1
2
3
4
5
6
7
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}

is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);

移动元素分两个步骤完成,

  • intsetResize()中调用zrealloc()先分配is->length+1的空间,这里可能会有一次内存拷贝。

  • 然后在intsetMoveTail()使用memmove()移动内存,这里一定会有一次内存拷贝。memmove()保证了在原位置和目标位置的情况下,能够安全的进行拷贝。

    计算源地址和目的地址时,需要先将contents转换为内容实际对应的类型,然后再做指针的计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static intset *intsetResize(intset *is, uint32_t len) {
uint32_t size = len*intrev32ifbe(is->encoding);
is = zrealloc(is,sizeof(intset)+size);
return is;
}

static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
void *src, *dst;
uint32_t bytes = intrev32ifbe(is->length)-from;
uint32_t encoding = intrev32ifbe(is->encoding);

if (encoding == INTSET_ENC_INT64) {
src = (int64_t*)is->contents+from;
dst = (int64_t*)is->contents+to;
bytes *= sizeof(int64_t);
} else if (encoding == INTSET_ENC_INT32) {
src = (int32_t*)is->contents+from;
dst = (int32_t*)is->contents+to;
bytes *= sizeof(int32_t);
} else {
src = (int16_t*)is->contents+from;
dst = (int16_t*)is->contents+to;
bytes *= sizeof(int16_t);
}
memmove(dst,src,bytes);
}

需要升级编码

对于插入类型大于集合现有类型的情况,那么待插入的元素一定是在数组头部(负数的时候)或者尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0;

/* First set new encoding and resize */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);

/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

/* Set the value at the beginning or the end. */
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

判断新的编码类型并分配内存以后,从后往前遍历,逐个升级编码。这里需要处理prepend和append的情况。区别就是对于prepend,遍历时,新的元素的位置是当前位置+1,_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Set the value at pos, using the configured encoding. */
static void _intsetSet(intset *is, int pos, int64_t value) {
uint32_t encoding = intrev32ifbe(is->encoding);

if (encoding == INTSET_ENC_INT64) {
((int64_t*)is->contents)[pos] = value;
memrev64ifbe(((int64_t*)is->contents)+pos);
} else if (encoding == INTSET_ENC_INT32) {
((int32_t*)is->contents)[pos] = value;
memrev32ifbe(((int32_t*)is->contents)+pos);
} else {
((int16_t*)is->contents)[pos] = value;
memrev16ifbe(((int16_t*)is->contents)+pos);
}
}

降级编码

除了遍历,无法确定是否存在超出范围的元素,intset不支持降级。

删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/* Delete integer from intset */
intset *intsetRemove(intset *is, int64_t value, int *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 0;

if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
uint32_t len = intrev32ifbe(is->length);

/* We know we can delete */
if (success) *success = 1;

/* Overwrite value with tail and update length */
if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
is = intsetResize(is,len-1);
is->length = intrev32ifbe(len-1);
}
return is;
}

skiplist

结构

skiplist提供了平均$O(log n)$的查找、插入和删除。zskiplistNode中使用柔性数组设计,每个节点存储了sds,level数组存储了各层指向其他节点的指针,最大层数为ZSKIPLIST_MAXLEVEL = 64

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned long span;
} level[];
} zskiplistNode;

typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;

插入

先找到待插入的位置,查找的时候从顶层开始,在当前层尽可能的往前移动,每层都满足[header, update[i]]:(< score) or (== score and < ele))。最终的插入位置在update[i]之后。

要提的一点是redis的skiplist还维护了,

  • span,记录了两个节点间的距离。

    • 寻找插入位置时,需要在每层都累加节点的span

    • 插入时,

      1
      2
      x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
      update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  • forward,记录了节点的prev指针。

    在插入时,如果新节点的random level > zsl->level,则需要将update中多出来的level设置为header。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

serverAssert(!isnan(score));
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

删除

和以插入相同的方式查找待删除节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}

删除时需要维护spanforward

  • 如果当前层update[i]->level[i].forward == x

    • span

      累加上x的,并减1。

    • forward

      在每层指向待删除节点的下一个节点,update[i]->level[i].forward = x->level[i].forward

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}

Reference

  1. Redis内部数据结构详解(6)——skiplist