Skip to content

Commit

Permalink
redis presence code/layout improvements (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Oct 9, 2023
1 parent 2c52320 commit 6036b76
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 55 deletions.
11 changes: 11 additions & 0 deletions internal/redis_lua/presence_add.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Add/update presence information.
-- KEYS[1] - presence set key
-- KEYS[2] - presence hash key
-- ARGV[1] - key expire seconds
-- ARGV[2] - expire at for set member
-- ARGV[3] - client ID
-- ARGV[4] - info payload
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
redis.call("hset", KEYS[2], ARGV[3], ARGV[4])
redis.call("expire", KEYS[1], ARGV[1])
redis.call("expire", KEYS[2], ARGV[1])
12 changes: 12 additions & 0 deletions internal/redis_lua/presence_get.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Get presence information.
-- KEYS[1] - presence set key
-- KEYS[2] - presence hash key
-- ARGV[1] - current timestamp in seconds
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
end
return redis.call("hgetall", KEYS[2])
6 changes: 6 additions & 0 deletions internal/redis_lua/presence_rem.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Remove client presence.
-- KEYS[1] - presence set key
-- KEYS[2] - presence hash key
-- ARGV[1] - client ID
redis.call("hdel", KEYS[2], ARGV[1])
redis.call("zrem", KEYS[1], ARGV[1])
113 changes: 58 additions & 55 deletions presence_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"time"

_ "embed"

"github.com/centrifugal/centrifuge/internal/convert"

"github.com/centrifugal/protocol"
Expand All @@ -18,9 +20,9 @@ var _ PresenceManager = (*RedisPresenceManager)(nil)
// RedisPresenceManager keeps presence in Redis thus allows scaling nodes.
type RedisPresenceManager struct {
node *Node
sharding bool
config RedisPresenceManagerConfig
shards []*RedisShard
sharding bool
addPresenceScript *rueidis.Lua
remPresenceScript *rueidis.Lua
presenceScript *rueidis.Lua
Expand All @@ -43,44 +45,15 @@ type RedisPresenceManagerConfig struct {
Shards []*RedisShard
}

const (
// Add/update client presence information.
// KEYS[1] - presence set key
// KEYS[2] - presence hash key
// ARGV[1] - key expire seconds
// ARGV[2] - expire at for set member
// ARGV[3] - client ID
// ARGV[4] - info payload
addPresenceSource = `
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
redis.call("hset", KEYS[2], ARGV[3], ARGV[4])
redis.call("expire", KEYS[1], ARGV[1])
redis.call("expire", KEYS[2], ARGV[1])
`

// Remove client presence.
// KEYS[1] - presence set key
// KEYS[2] - presence hash key
// ARGV[1] - client ID
remPresenceSource = `
redis.call("hdel", KEYS[2], ARGV[1])
redis.call("zrem", KEYS[1], ARGV[1])
`

// Get presence information.
// KEYS[1] - presence set key
// KEYS[2] - presence hash key
// ARGV[1] - current timestamp in seconds
presenceSource = `
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
end
return redis.call("hgetall", KEYS[2])
`
var (
//go:embed internal/redis_lua/presence_add.lua
addPresenceScriptSource string

//go:embed internal/redis_lua/presence_rem.lua
remPresenceScriptSource string

//go:embed internal/redis_lua/presence_get.lua
presenceScriptSource string
)

// NewRedisPresenceManager creates new RedisPresenceManager.
Expand All @@ -102,15 +75,15 @@ func NewRedisPresenceManager(n *Node, config RedisPresenceManagerConfig) (*Redis
}

m := &RedisPresenceManager{
node: n,
shards: config.Shards,
config: config,
sharding: len(config.Shards) > 1,
addPresenceScript: rueidis.NewLuaScript(addPresenceSource),
remPresenceScript: rueidis.NewLuaScript(remPresenceSource),
presenceScript: rueidis.NewLuaScript(presenceSource),
}
node: n,
shards: config.Shards,
config: config,
sharding: len(config.Shards) > 1,

addPresenceScript: rueidis.NewLuaScript(addPresenceScriptSource),
remPresenceScript: rueidis.NewLuaScript(remPresenceScriptSource),
presenceScript: rueidis.NewLuaScript(presenceScriptSource),
}
return m, nil
}

Expand All @@ -130,16 +103,27 @@ func (m *RedisPresenceManager) AddPresence(ch string, uid string, info *ClientIn
return m.addPresence(m.getShard(ch), ch, uid, info)
}

func (m *RedisPresenceManager) addPresence(s *RedisShard, ch string, uid string, info *ClientInfo) error {
func (m *RedisPresenceManager) addPresenceScriptKeysArgs(s *RedisShard, ch string, uid string, info *ClientInfo) ([]string, []string, error) {
expire := int(m.config.PresenceTTL.Seconds())
infoBytes, err := infoToProto(info).MarshalVT()
if err != nil {
return err
return nil, nil, err
}
expireAt := time.Now().Unix() + int64(expire)
hashKey := m.presenceHashKey(s, ch)
setKey := m.presenceSetKey(s, ch)
resp := m.addPresenceScript.Exec(context.Background(), s.client, []string{string(setKey), string(hashKey)}, []string{strconv.Itoa(expire), strconv.FormatInt(expireAt, 10), uid, convert.BytesToString(infoBytes)})

keys := []string{string(setKey), string(hashKey)}
args := []string{strconv.Itoa(expire), strconv.FormatInt(expireAt, 10), uid, convert.BytesToString(infoBytes)}
return keys, args, nil
}

func (m *RedisPresenceManager) addPresence(s *RedisShard, ch string, uid string, info *ClientInfo) error {
keys, args, err := m.addPresenceScriptKeysArgs(s, ch, uid, info)
if err != nil {
return err
}
resp := m.addPresenceScript.Exec(context.Background(), s.client, keys, args)
if rueidis.IsRedisNil(resp.Error()) {
return nil
}
Expand All @@ -151,10 +135,20 @@ func (m *RedisPresenceManager) RemovePresence(ch string, uid string) error {
return m.removePresence(m.getShard(ch), ch, uid)
}

func (m *RedisPresenceManager) removePresence(s *RedisShard, ch string, uid string) error {
func (m *RedisPresenceManager) removePresenceScriptKeysArgs(s *RedisShard, ch string, uid string) ([]string, []string, error) {
hashKey := m.presenceHashKey(s, ch)
setKey := m.presenceSetKey(s, ch)
resp := m.remPresenceScript.Exec(context.Background(), s.client, []string{string(setKey), string(hashKey)}, []string{uid})
keys := []string{string(setKey), string(hashKey)}
args := []string{uid}
return keys, args, nil
}

func (m *RedisPresenceManager) removePresence(s *RedisShard, ch string, uid string) error {
keys, args, err := m.removePresenceScriptKeysArgs(s, ch, uid)
if err != nil {
return err
}
resp := m.remPresenceScript.Exec(context.Background(), s.client, keys, args)
if rueidis.IsRedisNil(resp.Error()) {
return nil
}
Expand All @@ -166,12 +160,21 @@ func (m *RedisPresenceManager) Presence(ch string) (map[string]*ClientInfo, erro
return m.presence(m.getShard(ch), ch)
}

// Presence - see PresenceManager interface description.
func (m *RedisPresenceManager) presence(s *RedisShard, ch string) (map[string]*ClientInfo, error) {
func (m *RedisPresenceManager) presenceScriptKeysArgs(s *RedisShard, ch string) ([]string, []string, error) {
hashKey := m.presenceHashKey(s, ch)
setKey := m.presenceSetKey(s, ch)
now := int(time.Now().Unix())
resp, err := m.presenceScript.Exec(context.Background(), s.client, []string{string(setKey), string(hashKey)}, []string{strconv.Itoa(now)}).ToArray()
keys := []string{string(setKey), string(hashKey)}
args := []string{strconv.Itoa(now)}
return keys, args, nil
}

func (m *RedisPresenceManager) presence(s *RedisShard, ch string) (map[string]*ClientInfo, error) {
keys, args, err := m.presenceScriptKeysArgs(s, ch)
if err != nil {
return nil, err
}
resp, err := m.presenceScript.Exec(context.Background(), s.client, keys, args).ToArray()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 6036b76

Please sign in to comment.