Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Option to use Redis 7.4 hash field ttl for presence #403

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ vendor/
*.snap.inprogress
devdata/
*.orig
.luarc.json
27 changes: 16 additions & 11 deletions internal/redis_lua/broker_history_add_list.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@ if result_key_expire ~= '' then
local cached_result = redis.call("hmget", result_key, "e", "s")
local result_epoch, result_offset = cached_result[1], cached_result[2]
if result_epoch ~= false then
return {result_offset, result_epoch, "1"}
return { result_offset, result_epoch, "1" }
end
end

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local prev_message_payload = ""
Expand All @@ -42,15 +42,20 @@ redis.call("ltrim", list_key, 0, ltrim_right_bound)
redis.call("expire", list_key, list_ttl)

if channel ~= '' then
if use_delta == "1" then
payload = "__" .. "d1:" .. top_offset .. ":" .. current_epoch .. ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
end
redis.call(publish_command, channel, payload)
if use_delta == "1" then
payload = "__" ..
"d1:" ..
top_offset ..
":" ..
current_epoch ..
":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
end
redis.call(publish_command, channel, payload)
end

if result_key_expire ~= '' then
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
end

return {top_offset, current_epoch, "0"}
return { top_offset, current_epoch, "0" }
57 changes: 31 additions & 26 deletions internal/redis_lua/broker_history_add_stream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,61 @@ if result_key_expire ~= '' then
local cached_result = redis.call("hmget", result_key, "e", "s")
local result_epoch, result_offset = cached_result[1], cached_result[2]
if result_epoch ~= false then
return {result_offset, result_epoch, "1"}
return { result_offset, result_epoch, "1" }
end
end

local current_epoch = redis.call("hget", meta_key, "e")
if current_epoch == false then
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
redis.call("hset", meta_key, "e", current_epoch)
end

local top_offset = redis.call("hincrby", meta_key, "s", 1)

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local prev_message_payload = ""
if use_delta == "1" then
local prev_entries = redis.call("xrevrange", stream_key, "+", "-", "COUNT", 1)
if #prev_entries > 0 then
prev_message_payload = prev_entries[1][2]["d"]
local fields_and_values = prev_entries[1][2]
-- Loop through the fields and values to find the field "d"
for i = 1, #fields_and_values, 2 do
local field = fields_and_values[i]
local value = fields_and_values[i + 1]
if field == "d" then
prev_message_payload = value
break -- Stop the loop once we find the field "d"
local prev_entries = redis.call("xrevrange", stream_key, "+", "-", "COUNT", 1)
if #prev_entries > 0 then
prev_message_payload = prev_entries[1][2]["d"]
local fields_and_values = prev_entries[1][2]
-- Loop through the fields and values to find the field "d"
for i = 1, #fields_and_values, 2 do
local field = fields_and_values[i]
local value = fields_and_values[i + 1]
if field == "d" then
prev_message_payload = value
break -- Stop the loop once we find the field "d"
end
end
end
end
end

redis.call("xadd", stream_key, "MAXLEN", stream_size, top_offset, "d", message_payload)
redis.call("expire", stream_key, stream_ttl)

if channel ~= '' then
local payload
if use_delta == "1" then
payload = "__" .. "d1:" .. top_offset .. ":" .. current_epoch .. ":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
else
payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
end
redis.call(publish_command, channel, payload)
local payload
if use_delta == "1" then
payload = "__" ..
"d1:" ..
top_offset ..
":" ..
current_epoch ..
":" .. #prev_message_payload .. ":" .. prev_message_payload .. ":" .. #message_payload .. ":" .. message_payload
else
payload = "__" .. "p1:" .. top_offset .. ":" .. current_epoch .. "__" .. message_payload
end
redis.call(publish_command, channel, payload)
end

if result_key_expire ~= '' then
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
redis.call("hset", result_key, "e", current_epoch, "s", top_offset)
redis.call("expire", result_key, result_key_expire)
end

return {top_offset, current_epoch, "0"}
return { top_offset, current_epoch, "0" }
14 changes: 7 additions & 7 deletions internal/redis_lua/broker_history_list.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
end

if top_offset == false then
top_offset = 0
top_offset = 0
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil
if include_publications ~= "0" then
pubs = redis.call("lrange", list_key, 0, list_right_bound)
pubs = redis.call("lrange", list_key, 0, list_right_bound)
end

return {top_offset, current_epoch, pubs}
return { top_offset, current_epoch, pubs }
50 changes: 25 additions & 25 deletions internal/redis_lua/broker_history_stream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,43 @@ local stream_meta = redis.call("hmget", meta_key, "e", "s")
local current_epoch, top_offset = stream_meta[1], stream_meta[2]

if current_epoch == false then
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
current_epoch = new_epoch_if_empty
top_offset = 0
redis.call("hset", meta_key, "e", current_epoch)
end

if top_offset == false then
top_offset = 0
top_offset = 0
end

if meta_expire ~= '0' then
redis.call("expire", meta_key, meta_expire)
redis.call("expire", meta_key, meta_expire)
end

local pubs = nil

if include_publications ~= "0" then
if limit ~= "0" then
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit)
if limit ~= "0" then
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+", "COUNT", limit)
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit)
end
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-", "COUNT", limit)
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+")
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-")
end
end
else
if reverse == "0" then
pubs = redis.call("xrange", stream_key, since_offset, "+")
else
local get_offset = top_offset
if since_offset ~= "0" then
get_offset = since_offset
end
pubs = redis.call("xrevrange", stream_key, get_offset, "-")
end
end
end

return {top_offset, current_epoch, pubs}
return { top_offset, current_epoch, pubs }
2 changes: 1 addition & 1 deletion internal/redis_lua/broker_publish_idempotent.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if result_key_expire ~= '' then
local stream_meta = redis.call("hmget", result_key, "e", "s")
local result_epoch, result_offset = stream_meta[1], stream_meta[2]
if result_epoch ~= false then
return {result_offset, result_epoch}
return { result_offset, result_epoch }
end
end

Expand Down
19 changes: 14 additions & 5 deletions internal/redis_lua/presence_add.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,34 @@
-- ARGV[4] - info payload
-- ARGV[5] - user ID
-- ARGV[6] - enable user mapping "0" or "1"
-- ARGV[7] - use hash field TTL "0" or "1"

-- Check if client ID is new.
local isNewClient = false
if ARGV[6] ~= '0' then
isNewClient = redis.call("hexists", KEYS[2], ARGV[3]) == 0
isNewClient = redis.call("hexists", KEYS[2], ARGV[3]) == 0
end

-- Add per-client presence.
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
redis.call("hset", KEYS[2], ARGV[3], ARGV[4])
redis.call("expire", KEYS[1], ARGV[1])
redis.call("expire", KEYS[2], ARGV[1])
if ARGV[7] == '0' then
redis.call("zadd", KEYS[1], ARGV[2], ARGV[3])
redis.call("expire", KEYS[1], ARGV[1])
else
redis.call("hexpire", KEYS[2], ARGV[1], "FIELDS", "1", ARGV[3])
end

-- Add per-user information.
if ARGV[6] ~= '0' then
redis.call("zadd", KEYS[3], ARGV[2], ARGV[5])
redis.call("expire", KEYS[3], ARGV[1])
if isNewClient then
redis.call("hincrby", KEYS[4], ARGV[5], 1)
end
redis.call("expire", KEYS[4], ARGV[1])
if ARGV[7] == '0' then
redis.call("zadd", KEYS[3], ARGV[2], ARGV[5])
redis.call("expire", KEYS[3], ARGV[1])
else
redis.call("hexpire", KEYS[4], ARGV[1], "FIELDS", "1", ARGV[5])
end
end
15 changes: 9 additions & 6 deletions internal/redis_lua/presence_get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
-- KEYS[1] - presence set key
-- KEYS[2] - presence hash key
-- ARGV[1] - current timestamp in seconds
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
-- ARGV[2] - use hash field TTL "0" or "1"
if ARGV[2] == '0' then
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
end
end
return redis.call("hgetall", KEYS[2])
9 changes: 7 additions & 2 deletions internal/redis_lua/presence_rem.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
-- ARGV[1] - client ID
-- ARGV[2] - user ID
-- ARGV[3] - enable user mapping "0" or "1"
-- ARGV[4] - use hash field TTL "0" or "1"

local clientExists = false
if ARGV[3] ~= '0' then
Expand All @@ -14,14 +15,18 @@ if ARGV[3] ~= '0' then
end

redis.call("hdel", KEYS[2], ARGV[1])
redis.call("zrem", KEYS[1], ARGV[1])
if ARGV[4] == '0' then
redis.call("zrem", KEYS[1], ARGV[1])
end

if ARGV[3] ~= '0' and clientExists then
local connectionsCount = redis.call("hincrby", KEYS[4], ARGV[2], -1)
-- If the number of connections for this user is zero, remove the user
-- from the sorted set and clean hash.
if connectionsCount <= 0 then
redis.call("zrem", KEYS[3], ARGV[2])
if ARGV[4] == '0' then
redis.call("zrem", KEYS[3], ARGV[2])
end
redis.call("hdel", KEYS[4], ARGV[2])
end
end
31 changes: 18 additions & 13 deletions internal/redis_lua/presence_stats_get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@
-- KEYS[3] - per-user zset key
-- KEYS[4] - per-user hash key
-- ARGV[1] - current timestamp in seconds
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
-- ARGV[2] - use hash field TTL "0" or "1"
if ARGV[2] == '0' then
local expired = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
if #expired > 0 then
for num = 1, #expired do
redis.call("hdel", KEYS[2], expired[num])
end
redis.call("zremrangebyscore", KEYS[1], "0", ARGV[1])
end
end

local userExpired = redis.call("zrangebyscore", KEYS[3], "0", ARGV[1])
if #userExpired > 0 then
for num = 1, #userExpired do
redis.call("hdel", KEYS[4], userExpired[num])
end
redis.call("zremrangebyscore", KEYS[3], "0", ARGV[1])
if ARGV[2] == '0' then
local userExpired = redis.call("zrangebyscore", KEYS[3], "0", ARGV[1])
if #userExpired > 0 then
for num = 1, #userExpired do
redis.call("hdel", KEYS[4], userExpired[num])
end
redis.call("zremrangebyscore", KEYS[3], "0", ARGV[1])
end
end

local clientCount = redis.call("hlen", KEYS[2])
local userCount = redis.call("hlen", KEYS[4])

return {clientCount, userCount}
return { clientCount, userCount }
Loading
Loading