STORE_MESSAGE_SCRIPT

Constant STORE_MESSAGE_SCRIPT 

Source
/// Redis Lua script that stores a message exactly once and records it in a
/// global stream.
///
/// KEYS:
/// 1. message key (written with `SET ... NX`)
/// 2. key holding the message-id -> stream-id mapping
/// 3. global stream key (target of `XADD`)
///
/// ARGV:
/// 1. message payload
/// 2. message id, stored in the stream entry under field `id`
/// 3. author, stored in the stream entry under field `author`
/// 4. hex-encoded expiration time, or empty string for none; decoded back to
///    raw bytes and stored under field `exp`
/// 5. TTL passed to `SET ... EX`, or empty string for none; only applied when
///    ARGV[4] is also non-empty
/// 6. onwards: tag key/value pairs appended to the stream entry; a trailing
///    key without a value is ignored
///
/// Returns one of:
/// * `{'STORED', stream_id}` - message was new and has been added to the stream
/// * `{'EXISTS', stream_id}` - message key already existed
/// * `{'ERROR', reason}`     - message key exists but the mapping is missing
///
/// A malformed ARGV[4] or a failing `XADD` is reported as a Redis error reply.
/// In both cases the script leaves no message key behind, so a later retry is
/// not answered with the "no stream ID mapping" error.
const STORE_MESSAGE_SCRIPT: &str = r#"
local message_key = KEYS[1]
local stream_id_key = KEYS[2]
local global_stream = KEYS[3]

local message_data = ARGV[1]
local message_id_bytes = ARGV[2]
local author_bytes = ARGV[3]
local expiration_time = ARGV[4] or '' -- hex-encoded; empty string if no expiration
local timeout = ARGV[5] or ''         -- empty string if no timeout

-- Validate and decode the expiration BEFORE any write. Redis does not roll
-- back writes when a script raises, so failing after the message key is
-- stored would leave it without a stream entry or a stream-id mapping.
local exp_bytes
if expiration_time ~= '' then
    if #expiration_time % 2 ~= 0 or not expiration_time:find('^%x+$') then
        return redis.error_reply('ERR invalid hex-encoded expiration time')
    end
    -- Parentheses keep only the decoded string and drop the gsub match count.
    exp_bytes = (expiration_time:gsub('%x%x', function(pair)
        return string.char(tonumber(pair, 16))
    end))
end

-- Try to store message with NX (only if not exists)
local set_result
if expiration_time ~= '' and timeout ~= '' then
    set_result = redis.call('SET', message_key, message_data, 'EX', timeout, 'NX')
else
    set_result = redis.call('SET', message_key, message_data, 'NX')
end

-- If message already exists, return existing stream ID
if not set_result then
    local existing_stream_id = redis.call('GET', stream_id_key)
    if existing_stream_id then
        return {'EXISTS', existing_stream_id}
    end
    return {'ERROR', 'Message exists but no stream ID mapping found'}
end

-- Message is new - build the stream entry
local xadd_args = {global_stream, '*', 'id', message_id_bytes, 'author', author_bytes}

if exp_bytes then
    xadd_args[#xadd_args + 1] = 'exp'
    xadd_args[#xadd_args + 1] = exp_bytes
end

-- Add tags from remaining ARGV (starting at index 6). The upper bound of
-- #ARGV - 1 skips a trailing tag key that has no value.
for i = 6, #ARGV - 1, 2 do
    xadd_args[#xadd_args + 1] = ARGV[i]     -- tag key
    xadd_args[#xadd_args + 1] = ARGV[i + 1] -- tag value
end

-- pcall so a failed XADD can be undone: drop the message key written above
-- and hand the original error back to the caller.
local stream_id = redis.pcall('XADD', unpack(xadd_args))
if type(stream_id) == 'table' and stream_id.err then
    redis.call('DEL', message_key)
    return stream_id
end

-- Store the mapping from message-id to stream-id
-- NOTE(review): the mapping is written without a TTL even when the message
-- key has one, so it outlives the message - confirm this is intentional.
redis.call('SET', stream_id_key, stream_id)

return {'STORED', stream_id}
"#;