/// Redis Lua script that stores a message at most once and records it in the
/// global stream.
///
/// KEYS:
/// 1. message key (receives the message payload via `SET ... NX`)
/// 2. stream-id key (receives the message-id -> stream-id mapping)
/// 3. global stream key (target of `XADD`)
///
/// ARGV:
/// 1. message payload
/// 2. message id bytes (stream field `id`)
/// 3. author bytes (stream field `author`)
/// 4. hex-encoded expiration time, or the empty string for none
///    (decoded to raw bytes and stored as stream field `exp`)
/// 5. value passed to `SET ... EX` (seconds), or the empty string for none;
///    it is only applied when argument 4 is also non-empty
/// 6. onwards: alternating tag field / tag value pairs appended to the entry
///
/// Replies:
/// * `{'STORED', stream_id}` when the message was new
/// * `{'EXISTS', stream_id}` when the message key already existed
/// * `{'ERROR', reason}` when the message key exists without a mapping
/// * a Redis error reply when the expiration is not valid hex (nothing is
///   written) or when `XADD` fails (the message key is removed again)
const STORE_MESSAGE_SCRIPT: &str = r#"
local message_key = KEYS[1]
local stream_id_key = KEYS[2]
local global_stream = KEYS[3]

local message_data = ARGV[1]
local message_id_bytes = ARGV[2]
local author_bytes = ARGV[3]
local expiration_time = ARGV[4] -- hex string; empty string if no expiration
local timeout = ARGV[5] -- EX seconds; empty string if no timeout

-- Validate and decode the expiration BEFORE any write. Redis does not undo a
-- script's earlier writes when the script raises, so failing after the SET
-- below would leave the message key behind with no stream entry or mapping.
local exp_bytes
if expiration_time ~= '' then
  -- Require whole bytes made of hex digits only; tonumber(s, 16) alone is too
  -- lenient (it would accept a trailing single nibble, for example).
  if #expiration_time % 2 ~= 0 or expiration_time:find('[^%x]') then
    return redis.error_reply('ERR invalid hex-encoded expiration time')
  end
  local decoded = {}
  for i = 1, #expiration_time, 2 do
    decoded[#decoded + 1] = string.char(tonumber(expiration_time:sub(i, i + 1), 16))
  end
  exp_bytes = table.concat(decoded)
end

-- Try to store the message with NX (only if it does not exist yet). The TTL
-- is applied only when both an expiration and a timeout were supplied.
local set_result
if expiration_time ~= '' and timeout ~= '' then
  set_result = redis.call('SET', message_key, message_data, 'EX', timeout, 'NX')
else
  set_result = redis.call('SET', message_key, message_data, 'NX')
end

-- Message already exists: report the stream ID recorded for it.
if not set_result then
  local existing_stream_id = redis.call('GET', stream_id_key)
  if existing_stream_id then
    return {'EXISTS', existing_stream_id}
  end
  return {'ERROR', 'Message exists but no stream ID mapping found'}
end

-- Message is new: build the stream entry.
local xadd_args = {global_stream, '*', 'id', message_id_bytes, 'author', author_bytes}
if exp_bytes then
  xadd_args[#xadd_args + 1] = 'exp'
  xadd_args[#xadd_args + 1] = exp_bytes
end

-- Tags are the remaining ARGV entries, as field/value pairs starting at
-- index 6. A trailing field without a value is ignored.
for i = 6, #ARGV, 2 do
  local tag_key, tag_value = ARGV[i], ARGV[i + 1]
  if tag_key and tag_value then
    xadd_args[#xadd_args + 1] = tag_key
    xadd_args[#xadd_args + 1] = tag_value
  end
end

-- pcall instead of call so a failed XADD can be compensated: remove the
-- message key written above, otherwise every retry would hit the
-- "exists but no mapping" branch.
local stream_id = redis.pcall('XADD', unpack(xadd_args))
if type(stream_id) == 'table' and stream_id.err then
  redis.call('DEL', message_key)
  return stream_id -- propagated to the client as an error reply
end

-- Store the mapping from message-id to stream-id.
-- NOTE(review): the mapping is written without a TTL even when the message
-- key has one, so it outlives the message -- confirm this is intended.
redis.call('SET', stream_id_key, stream_id)
return {'STORED', stream_id}
"#;