package container import ( "encoding/json" "sync" ) var SHARD_COUNT = 32 type ConcurrentMap []*ConcurrentMapShared type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex } func NewConcurrentMap() ConcurrentMap { m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{items: make(map[string]interface{})} } return m } func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] } func (m ConcurrentMap) MSet(data map[string]interface{}) { for key, value := range data { shard := m.GetShard(key) shard.Lock() shard.items[key] = value shard.Unlock() } } func (m ConcurrentMap) Set(key string, value interface{}) { // Get map shard. shard := m.GetShard(key) shard.Lock() shard.items[key] = value shard.Unlock() } func (m ConcurrentMap) Upsert(key string, value interface{}, cb UpsertCb) (res interface{}) { shard := m.GetShard(key) shard.Lock() v, ok := shard.items[key] res = cb(ok, v, value) shard.items[key] = res shard.Unlock() return res } func (m ConcurrentMap) SetIfAbsent(key string, value interface{}) bool { // Get map shard. shard := m.GetShard(key) shard.Lock() _, ok := shard.items[key] if !ok { shard.items[key] = value } shard.Unlock() return !ok } func (m ConcurrentMap) Get(key string) (interface{}, bool) { shard := m.GetShard(key) shard.RLock() val, ok := shard.items[key] shard.RUnlock() return val, ok } func (m ConcurrentMap) Count() int { count := 0 for i := 0; i < SHARD_COUNT; i++ { shard := m[i] shard.RLock() count += len(shard.items) shard.RUnlock() } return count } func (m ConcurrentMap) Has(key string) bool { // Get shard shard := m.GetShard(key) shard.RLock() // See if element is within shard. _, ok := shard.items[key] shard.RUnlock() return ok } func (m ConcurrentMap) Remove(key string) { // Try to get shard. shard := m.GetShard(key) shard.Lock() delete(shard.items, key) shard.Unlock() } func (m ConcurrentMap) RemoveCb(key string, cb RemoveCb) bool { // Try to get shard. shard := m.GetShard(key) shard.Lock() v, ok := shard.items[key] remove := cb(key, v, ok) if remove && ok { delete(shard.items, key) } shard.Unlock() return remove } func (m ConcurrentMap) Pop(key string) (v interface{}, exists bool) { // Try to get shard. shard := m.GetShard(key) shard.Lock() v, exists = shard.items[key] delete(shard.items, key) shard.Unlock() return v, exists } func (m ConcurrentMap) IsEmpty() bool { return m.Count() == 0 } func (m ConcurrentMap) Iter() <-chan Tuple { chans := snapshot(m) ch := make(chan Tuple) go fanIn(chans, ch) return ch } func (m ConcurrentMap) IterBuffered() <-chan Tuple { chans := snapshot(m) total := 0 for _, c := range chans { total += cap(c) } ch := make(chan Tuple, total) go fanIn(chans, ch) return ch } func (m ConcurrentMap) Items() map[string]interface{} { tmp := make(map[string]interface{}) // Insert items to temporary map. for item := range m.IterBuffered() { tmp[item.Key] = item.Val } return tmp } func (m ConcurrentMap) IterCb(fn IterCb) { for idx := range m { shard := (m)[idx] shard.RLock() for key, value := range shard.items { fn(key, value) } shard.RUnlock() } } func (m ConcurrentMap) Keys() []string { count := m.Count() ch := make(chan string, count) go func() { // Foreach shard. wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) for _, shard := range m { go func(shard *ConcurrentMapShared) { // Foreach key, value pair. shard.RLock() for key := range shard.items { ch <- key } shard.RUnlock() wg.Done() }(shard) } wg.Wait() close(ch) }() // Generate keys keys := make([]string, 0, count) for k := range ch { keys = append(keys, k) } return keys } func (m ConcurrentMap) MarshalJSON() ([]byte, error) { // Create a temporary map, which will hold all item spread across shards. tmp := make(map[string]interface{}) // Insert items to temporary map. for item := range m.IterBuffered() { tmp[item.Key] = item.Val } return json.Marshal(tmp) } type UpsertCb func(exist bool, valueInMap interface{}, newValue interface{}) interface{} type RemoveCb func(key string, v interface{}, exists bool) bool type Tuple struct { Key string Val interface{} } func snapshot(m ConcurrentMap) (chans []chan Tuple) { chans = make([]chan Tuple, SHARD_COUNT) wg := sync.WaitGroup{} wg.Add(SHARD_COUNT) // Foreach shard. for index, shard := range m { go func(index int, shard *ConcurrentMapShared) { // Foreach key, value pair. shard.RLock() chans[index] = make(chan Tuple, len(shard.items)) wg.Done() for key, val := range shard.items { chans[index] <- Tuple{key, val} } shard.RUnlock() close(chans[index]) }(index, shard) } wg.Wait() return chans } type IterCb func(key string, v interface{}) func fanIn(chans []chan Tuple, out chan Tuple) { wg := sync.WaitGroup{} wg.Add(len(chans)) for _, ch := range chans { go func(ch chan Tuple) { for t := range ch { out <- t } wg.Done() }(ch) } wg.Wait() close(out) } func fnv32(key string) uint32 { hash := uint32(2166136261) const prime32 = uint32(16777619) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash }