/* This is our hash table structure. Every dictionary has two of these, as we
 * implement incremental rehashing from the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;
typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;
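For orientation, here is a minimal Go sketch of the same two-table layout; the type and field names are hypothetical, not Redis's API:

// Hypothetical Go rendering of dict/dictht, for reference only.
type entry struct {
	key  string
	val  interface{}
	next *entry // collisions are resolved by chaining
}

type hashTable struct {
	table    []*entry
	size     uint64
	sizemask uint64 // always size-1, so hash & sizemask picks a bucket
	used     uint64
}

type dict struct {
	ht        [2]hashTable // ht[0] is live; ht[1] exists only while rehashing
	rehashidx int          // -1 when no rehash is in progress
}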
for (table = 0; table <= 1; table++) {
    idx = hash & d->ht[table].sizemask;
    /* Search if this slot does not already contain the given key */
    he = d->ht[table].table[idx];
    while (he) {
        if (key == he->key || dictCompareKeys(d, key, he->key)) {
            if (existing) *existing = he;
            return -1;
        }
        he = he->next;
    }
    if (!dictIsRehashing(d)) break;
}
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
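Restated as a hedged Go helper (the constant 5 is dict_force_resize_ratio in the Redis source; the function name is ours):

// shouldExpand mirrors _dictExpandIfNeeded's policy: grow when the table
// is empty, or at load factor >= 1 if resizing is allowed, or past the
// forced ratio even when resizing is discouraged (e.g. during BGSAVE).
func shouldExpand(used, size uint64, canResize bool) bool {
	if size == 0 {
		return true // empty table: expand to the initial size
	}
	return used >= size && (canResize || used/size > 5)
}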
/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}
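_dictNextPower simply rounds the requested size up to a power of two, starting from DICT_HT_INITIAL_SIZE (4 in Redis); a Go equivalent:

// nextPower returns the smallest power of two >= size, starting at 4.
// Power-of-two sizes keep sizemask = size-1 valid as a bit mask.
func nextPower(size uint64) uint64 {
	i := uint64(4) // DICT_HT_INITIAL_SIZE
	for i < size {
		i *= 2
	}
	return i
}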
    /* Note that rehashidx can't overflow as we are sure there are more
     * elements because ht[0].used != 0 */
    assert(d->ht[0].size > (unsigned long)d->rehashidx);
    while (d->ht[0].table[d->rehashidx] == NULL) {
        d->rehashidx++;
        if (--empty_visits == 0) return 1;
    }
    de = d->ht[0].table[d->rehashidx];
    /* Move all the keys in this bucket from the old to the new hash HT */
    while (de) {
        uint64_t h;

        nextde = de->next;
        /* Get the index in the new hash table */
        h = dictHashKey(d, de->key) & d->ht[1].sizemask;
        de->next = d->ht[1].table[h];
        d->ht[1].table[h] = de;
        d->ht[0].used--;
        d->ht[1].used++;
        de = nextde;
    }
    d->ht[0].table[d->rehashidx] = NULL;
    d->rehashidx++;
}

/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
    zfree(d->ht[0].table);
    d->ht[0] = d->ht[1];
    _dictReset(&d->ht[1]);
    d->rehashidx = -1;
    return 0;
}
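Condensed into Go, reusing the hypothetical types from the sketch above, with an FNV hash standing in for dictHashKey:

import "hash/fnv"

func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

// rehashStep relinks every entry in bucket rehashidx of ht[0] into ht[1],
// then advances the index; callers sprinkle these steps across operations.
func rehashStep(d *dict) {
	for d.ht[0].table[d.rehashidx] == nil {
		d.rehashidx++ // skip empty buckets (the real code bounds this walk)
	}
	e := d.ht[0].table[d.rehashidx]
	for e != nil {
		next := e.next
		h := hashKey(e.key) & d.ht[1].sizemask
		e.next = d.ht[1].table[h] // push onto the new bucket's chain
		d.ht[1].table[h] = e
		d.ht[0].used--
		d.ht[1].used++
		e = next
	}
	d.ht[0].table[d.rehashidx] = nil
	d.rehashidx++
}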
/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
}

/* Set unmasked bits so incrementing the reversed cursor
 * operates on the masked bits */
v |= ~m0;

/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);
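Those three lines are the heart of dictScan's reverse binary iteration: instead of adding 1 to the low bit, the carry is applied to the high bit of the masked index, which is what lets the cursor remain valid even if the table is resized between calls. A runnable Go sketch (scanStep is our name, not Redis's):

package main

import (
	"fmt"
	"math/bits"
)

// scanStep advances a dictScan-style cursor for a table with the given mask.
func scanStep(v, mask uint64) uint64 {
	v |= ^mask               // set the bits above the mask...
	v = bits.Reverse64(v)    // ...so that after reversing,
	v++                      // the increment carries from the high bit down
	return bits.Reverse64(v)
}

func main() {
	const mask = 7 // a table of size 8
	v := uint64(0)
	for {
		fmt.Print(v&mask, " ") // prints 0 4 2 6 1 5 3 7
		if v = scanStep(v, mask); v == 0 {
			break
		}
	}
}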
/* Make sure t0 is the smaller and t1 is the bigger table */
if (t0->size > t1->size) {
    t0 = &d->ht[1];
    t1 = &d->ht[0];
}

m0 = t0->sizemask;
m1 = t1->sizemask;

/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
    next = de->next;
    fn(privdata, de);
    de = next;
}

/* Iterate over indices in larger table that are the expansion
 * of the index pointed to by the cursor in the smaller table */
do {
    /* Emit entries at cursor */
    if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
    de = t1->table[v & m1];
    while (de) {
        next = de->next;
        fn(privdata, de);
        de = next;
    }

    /* Increment the reverse cursor not covered by the smaller mask.*/
    v |= ~m1;
    v = rev(v);
    v++;
    v = rev(v);

    /* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));
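A concrete example: with a small table of size 8 (m0 = 7) and a large one of size 16 (m1 = 15), cursor index 2 in the small table expands to buckets 2 and 10 in the large one, since both share the low three bits. The do/while visits exactly those expansion indices and stops once the bits in m0 ^ m1 (here, bit 3) wrap back to zero.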
func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}
After l.RLock() returns, goroutine 1 holds the read lock.

State: read lock held; readerCount = 1, readerWait = 0.

goroutine 2: l.Lock()
func (rw *RWMutex) Lock() {
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
}

func (rw *RWMutex) RLock() {
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
}
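A runnable sketch of the walkthrough's schedule: one reader in, a writer announcing itself, then a second reader arriving. The sleeps only force the interleaving; the point is the output order:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var l sync.RWMutex

	l.RLock() // goroutine 1: readerCount becomes 1

	go func() {
		l.Lock() // goroutine 2: flips readerCount negative, waits for reader 1
		fmt.Println("writer acquired")
		l.Unlock()
	}()
	time.Sleep(100 * time.Millisecond) // let the writer announce itself

	go func() {
		l.RLock() // goroutine 3: sees readerCount < 0, parks behind the writer
		fmt.Println("late reader acquired")
		l.RUnlock()
	}()
	time.Sleep(100 * time.Millisecond)

	l.RUnlock() // reader 1 leaves; the writer runs first, then the late reader
	time.Sleep(200 * time.Millisecond)
}

This is the writer-preference mechanism in action: a pending writer blocks newly arriving readers, so a steady stream of readers cannot starve writers.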
func process(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
	defer wg.Done()
	for {
		time.Sleep(time.Second * 1) // process
		// send process result back
		select {
		case <-ctx.Done():
			retCh <- 1
			retCh <- 2
			return
		default:
			retCh <- 1
			retCh <- 2
		}
	}
}
func loop(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
	defer wg.Done()
	wg.Add(1)
	go process(ctx, wg, retCh)
	for {
		select {
		case <-ctx.Done():
			return
		case ret := <-retCh:
			fmt.Println(ret)
		}
	}
}
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	retCh := make(chan int, 1)
	wg.Add(1)
	go loop(ctx, wg, retCh)
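With retCh buffered at 1, this program can leak: after cancel(), loop returns on ctx.Done() and stops receiving, while process's Done branch still tries to send twice; the second send blocks forever, so that goroutine never calls wg.Done(). One hedged fix, keeping the same signatures as above, is to guard every send with the context:

// process, rewritten so a cancelled context can never strand the goroutine
// on a full channel.
func process(ctx context.Context, wg *sync.WaitGroup, retCh chan int) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second): // process
		}
		for _, ret := range []int{1, 2} {
			select {
			case retCh <- ret:
			case <-ctx.Done():
				return // receiver is gone; drop the result rather than block
			}
		}
	}
}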
    # serialize the data before acquiring the lock
    obj = _ForkingPickler.dumps(obj)
    if wacquire is None:
        send_bytes(obj)
    else:
        wacquire()
        try:
            send_bytes(obj)
        finally:
            wrelease()
except IndexError:
    pass
except Exception as e:
    # ......
def get(self, block=True, timeout=None):
    if block and timeout is None:
        with self._rlock:
            res = self._recv_bytes()
        self._sem.release()
    else:
        if block:
            deadline = time.monotonic() + timeout
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                timeout = deadline - time.monotonic()
                if not self._poll(timeout):
                    # by now, timeout has already had the time spent waiting
                    # on _rlock.acquire subtracted
                    raise Empty
            elif not self._poll():
                raise Empty
            res = self._recv_bytes()
            self._sem.release()
        finally:
            self._rlock.release()
    # unserialize the data after having released the lock
    return _ForkingPickler.loads(res)
def _send_bytes(self, buf):
    n = len(buf)
    # For wire compatibility with 3.2 and lower
    header = struct.pack("!i", n)
    if n > 16384:
        # The payload is large so Nagle's algorithm won't be triggered
        # and we'd better avoid the cost of concatenation.
        self._send(header)
        self._send(buf)
    else:
        # Issue #20540: concatenate before sending, to avoid delays due
        # to Nagle's algorithm on a TCP socket.
        # Also note we want to avoid sending a 0-length buffer separately,
        # to avoid "broken pipe" errors if the other end closed the pipe.
        self._send(header + buf)
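The same framing idea, restated as a hedged Go sketch (sendBytes and the 16384 threshold mirror the Python above; the net.Conn parameter is our assumption):

import (
	"encoding/binary"
	"net"
)

// sendBytes writes a big-endian length header, concatenating header and
// small payloads into one write so Nagle's algorithm can't delay a tiny
// trailing segment.
func sendBytes(c net.Conn, buf []byte) error {
	header := make([]byte, 4)
	binary.BigEndian.PutUint32(header, uint32(len(buf)))
	if len(buf) > 16384 {
		// large payload: two writes produce full-sized segments anyway
		if _, err := c.Write(header); err != nil {
			return err
		}
		_, err := c.Write(buf)
		return err
	}
	_, err := c.Write(append(header, buf...))
	return err
}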
def task_done(self):
    with self._cond:
        if not self._unfinished_tasks.acquire(False):
            raise ValueError('task_done() called too many times')
        if self._unfinished_tasks._semlock._is_zero():
            self._cond.notify_all()

def join(self):
    with self._cond:
        if not self._unfinished_tasks._semlock._is_zero():
            self._cond.wait()
def _run_finalizers(minpriority=None):
    '''
    Run all finalizers whose exit priority is not None and at least minpriority

    Finalizers with highest priority are called first; finalizers with
    the same priority will be called in reverse order of creation.
    '''
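The ordering the docstring describes, restated as a small Go sketch (names are ours):

import "sort"

type finalizer struct {
	priority int
	seq      int // creation order
	fn       func()
}

// runFinalizers calls higher priorities first; ties go to the most
// recently created finalizer, matching the docstring above.
func runFinalizers(fs []finalizer, minpriority int) {
	sort.SliceStable(fs, func(i, j int) bool {
		if fs[i].priority != fs[j].priority {
			return fs[i].priority > fs[j].priority
		}
		return fs[i].seq > fs[j].seq // reverse order of creation
	})
	for _, f := range fs {
		if f.priority >= minpriority {
			f.fn()
		}
	}
}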
a. Consider 4 first: since \(C_{new}\) has appeared, \(C_{old,new}\) must already be committed, and that commit requires a majority from the old configuration as well as a majority from the new one; therefore a server in \(C_{old}\) that does not hold \(C_{old,new}\) cannot be elected leader.

b. \(C_{new}\) is only replicated after \(C_{old,new}\) has been committed, so if 4 could win an election, 1 would not exist in the first place.
The paper discusses several approaches to log compaction:

* Snapshotting memory-based state machines
* Snapshotting disk-based state machines
* Incremental cleaning approaches
* Leader-based approaches
These approaches share a few core ideas:

1. Each server compacts its log independently, rather than having the leader decide centrally.
2. Raft transfers responsibility for maintaining the prefix of the log to the state machine.
3. Once Raft discards a prefix of the log, the state machine takes on two new duties (see the sketch after this list):
   1. If a server restarts, the state machine must load the discarded entries before applying new ones.
   2. So that a server that has fallen far behind, or a brand-new server, can catch up, the state machine may need to produce an image of its state.
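A minimal Go sketch of what those two duties imply for a snapshot's metadata (hypothetical names, not any particular Raft library's API):

// Snapshot carries enough metadata to stand in for the discarded prefix.
type Snapshot struct {
	LastIncludedIndex uint64 // index of the last entry the snapshot covers
	LastIncludedTerm  uint64 // its term, kept for consistency checks
	State             []byte // serialized image of the state machine
}

// restore is duty 1: on restart, install the image before applying any
// entry after LastIncludedIndex. The same image is what gets shipped to
// a lagging or brand-new server (duty 2).
func restore(s *Snapshot, install func(state []byte), applyFrom func(index uint64)) {
	install(s.State)
	applyFrom(s.LastIncludedIndex + 1)
}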