library(redis_streams): Capping the stream in consumer groups

I started a new topic to make it more organized.

As the code stands now (V8.3.9-18-geeae287), the streams used by the consumer-group code (library(redis_streams)) grow unbounded, and all the data remains even after the processes have terminated. This is also Redis's default behavior.

However, I think this is a little dangerous for the regular user. We have some options that I can think of:

  • Give the MAXLEN ~ option of the XADD command a large default value, to avoid a nightmare of users asking why the machine is running out of memory.

  • Maybe provide a callback option for when the stream grows close to a certain size, but this seems less desirable, since we would need to check the size of the stream regularly.

  • Provide an option to call XTRIM regularly at user specified time intervals, with a user-specified limit for the stream.
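As a rough sketch of the third option, the following Prolog runs XTRIM from a background thread at a fixed interval. It assumes library(redis) is available and a server alias such as default is configured; the predicate names xtrim_request/3, trim_loop/4 and start_stream_trimmer/4 are hypothetical and not part of library(redis_streams). The ~ requests approximate trimming, which lets Redis drop only whole macro nodes and is cheaper than an exact trim, at the cost of the stream sometimes being slightly longer than the limit.

```prolog
:- use_module(library(redis)).

% Hypothetical helper: build the request for approximate trimming.
% redis/3 sends this compound as: XTRIM Key MAXLEN ~ MaxLen
xtrim_request(Key, MaxLen, xtrim(Key, maxlen, '~', MaxLen)) :-
    must_be(positive_integer, MaxLen).

% Hypothetical helper: trim Key every Interval seconds, forever.
% Errors (e.g. a lost connection) are printed as warnings and the
% loop keeps going; the XTRIM reply (entries removed) is ignored.
trim_loop(Redis, Key, MaxLen, Interval) :-
    xtrim_request(Key, MaxLen, Request),
    repeat,
        sleep(Interval),
        catch(redis(Redis, Request, _Removed), Error,
              print_message(warning, Error)),
        fail.

% Hypothetical helper: run the trim loop in a detached thread.
start_stream_trimmer(Redis, Key, MaxLen, Interval) :-
    thread_create(trim_loop(Redis, Key, MaxLen, Interval), _,
                  [detached(true)]).
```

For example, start_stream_trimmer(default, candidates, 100000, 60) would trim the demo's candidates stream to roughly 100000 entries once a minute. Both numbers are arbitrary; a good limit is application dependent.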

We should probably also add a fairly prominent paragraph to the documentation explaining that the stream will grow unbounded unless MAXLEN is set to a suitable value or XTRIM is called. The user can also call DEL <streamname> to delete the entire stream when the process is finishing.
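A minimal sketch of the DEL approach, assuming library(redis) and a configured server alias: with_stream_cleanup/3 and delete_stream/2 are hypothetical helpers that run a goal and then delete the whole stream once the goal has finished, whether it succeeded, failed or raised an exception. Note that DEL removes the key entirely, so any consumer groups attached to the stream go with it.

```prolog
:- use_module(library(redis)).

% Hypothetical helper: run Goal, then delete the stream Key (DEL Key)
% once Goal has finished, also if it failed or raised an exception.
with_stream_cleanup(Redis, Key, Goal) :-
    setup_call_cleanup(true, Goal, delete_stream(Redis, Key)).

% Hypothetical helper: DEL the key; errors (e.g. no server reachable)
% are printed as warnings instead of being propagated.
delete_stream(Redis, Key) :-
    catch(redis(Redis, del(Key)), Error,
          print_message(warning, Error)).
```

For example, with_stream_cleanup(default, candidates, run_demo), where run_demo/0 is a placeholder for whatever code drives the producers and consumers.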

In any case, I think leaving the stream with unbounded growth by default is rather dangerous.

EDIT: To see the danger of this, run the demo and then terminate all processes. Then run XRANGE candidates - + in redis-cli and you will see that the messages are still there.

If you need any posts moved, just ask, as I have admin rights. I usually agree to what is requested.

Thanks EricGT! I am always grateful for the welcoming and polite environment that you foster in this forum. If I think of something to move, I’ll let you know.

Thanks. I wasn’t claiming all was done :slight_smile: Yes, limiting the memory usage so that it can run forever is something that must be done. I had hoped (as had others, judging from some SO topics) that ACK’ed messages would disappear, but as I understand it, ACKing is part of the consumer group and the stream itself is unaware of all this. Still, I would like to have something like “remove all ACK’ed messages”. No such thing exists :frowning: All we have is XTRIM and the MAXLEN option of XADD. Both limit the length, but it is rather hard to set a general limit on the length. It depends on the size of the messages, whether messages arrive more or less randomly or in bursts (which cause the stream to grow for a while), the available memory, how bad it is if some messages get lost, etc.
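For comparison with the XTRIM route, the MAXLEN option can be given directly on every XADD. This sketch builds the raw request for redis/3 rather than going through xadd/4 of library(redis_streams), whose option handling is not assumed here; capped_xadd_request/5 and capped_xadd/6 are hypothetical names, and only a single field/value pair is supported to keep it short.

```prolog
:- use_module(library(redis)).

% Hypothetical helper: build an XADD request that appends one
% Field/Value pair to Key and asks Redis to cap the stream at roughly
% MaxLen entries. redis/3 sends it as: XADD Key MAXLEN ~ MaxLen * Field Value
capped_xadd_request(Key, MaxLen, Field, Value,
                    xadd(Key, maxlen, '~', MaxLen, '*', Field, Value)) :-
    must_be(positive_integer, MaxLen).

% Hypothetical helper: send the request; Id is the reply, i.e. the
% entry id Redis generated because '*' was given as the id.
capped_xadd(Redis, Key, MaxLen, Field, Value, Id) :-
    capped_xadd_request(Key, MaxLen, Field, Value, Request),
    redis(Redis, Request, Id).
```

Trimming happens as part of the same command, so no separate timer is needed; the trade-off is that the limit is fixed at write time and still has to be guessed up front.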

Something similar holds for the timeout/block of XREADGROUP: clearly you need to enable it to deal with messages that were never ACK’ed because of a consumer failure, but a sensible default value is hard to give.

So, what to do? We already have xstream_set/2, which allows setting the maxlen for a stream, so that part is merely a matter of documentation. I thought that maybe we could use XPENDING, but we can’t: as long as no consumer has asked for anything, nothing is considered pending. That part now starts to make sense. Guess it’s time for some more Googling …

I think the following from the XDEL docs explains why redis behaves this way, and why XTRIM/MAXLEN ~ is better.

Redis streams are represented in a way that makes them memory efficient: a radix tree is used in order to index macro-nodes that pack linearly tens of stream entries. Normally what happens when you delete an entry from a stream is that the entry is not really evicted, it just gets marked as deleted.

Eventually if all the entries in a macro-node are marked as deleted, the whole node is destroyed and the memory reclaimed. This means that if you delete a large amount of entries from a stream, for instance more than 50% of the entries appended to the stream, the memory usage per entry may increment, since what happens is that the stream will start to be fragmented. However the stream performances will remain the same.

BTW, there is another option: we can set the EXPIRE time of the entire stream. I tested it, and it deletes the whole stream at expiration time if no XADDs have been made. As soon as an XADD is made, the expiration time is automatically set to -1, so the stream will not expire if an XADD was made after calling EXPIRE.

Interesting, but it doesn’t work: along with the stream, the associated group also vanishes:

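```prolog
:- use_module(library(redis)).
:- use_module(library(redis_streams)).

t :-
    % Create group grp (and stream s), reading only new entries.
    redis(default, xgroup(create, s, grp, $, mkstream)),
    redis(default, pexpire(s, 100)),        % expire s after 100 ms
    xadd(default, s, _, _{a:1}),
    redis(default, xreadgroup(group, grp, alice, streams, s, >), L1),
    writeln(L1),
    sleep(0.2),                             % s (and grp) expire here
    xadd(default, s, _, _{a:2}),            % recreates s, but not grp
    redis(default, xreadgroup(group, grp, alice, streams, s, >), L2),
    writeln(L2).
```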

Now we get an error on the second xreadgroup command :frowning:

Ahh…I didn’t test it with the group. I guess we are left with XTRIM and MAXLEN.