Is there a way to clear a queue short of calling thread_get_message until all the terms are consumed, or destroying and recreating it? Something like retractall/1, but for message queues? How would I do that?
Even more pressing: if you're consuming terms from a queue with thread_get_message, what is the signal (or base case, if consuming recursively) that the queue is now empty, i.e. that you've reached the last term?
I thought that maybe if the queue was empty and you tried to pull a message it would fail, but it looks like if you have this
No. In all the years I never found the need; this may indicate you are trying something dubious. Well, if you really want, you can use the size property (message_queue_property/2) and get messages until it is zero. Queues are primarily there for concurrency though, so the result of asking for the property may already be out of date at the moment the predicate completes. The properties are there notably for debugging purposes.
That too is not what they are meant for. You can use thread_get_message/3 with a timeout of zero. Again, it is rare that you should need that.
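For completeness, a minimal sketch of a non-blocking drain built on that: thread_get_message/3 with `timeout(0)` fails immediately when the queue is empty, so the empty queue is the base case of the recursion (the predicate name `clear_queue/1` is my own, not from the post):

```prolog
% Drain a message queue without blocking: thread_get_message/3 with
% timeout(0) fails immediately if no message is available, which
% terminates the recursion.
clear_queue(Queue) :-
    (   thread_get_message(Queue, _Term, [timeout(0)])
    ->  clear_queue(Queue)
    ;   true
    ).
```

Usage: `message_queue_create(Q), thread_send_message(Q, a), clear_queue(Q)` leaves `Q` with `size(0)`. As Jan notes, with other threads still sending, the queue may be non-empty again immediately afterwards.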
Queues are best compared with pipes that connect processes, but holding terms rather than bytes. They have a few additional properties, such as allowing multiple consumers (readers). If you want to distribute some work using a queue, the way to go is:
Create the queue
Create N “workers” that read from this queue.
Send terms that can trigger a unit of work to the queue.
Now, to terminate this gracefully, you send N “end” messages and you make each worker a loop that
gets the next message (unit of work)
if end, stop (succeed)
else, do the unit of work and start the next iteration.
After sending all units of work and the N end messages, the main thread can call thread_join/1 on all thread identifiers to wait for their termination. We end up with something like this:
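A sketch of that pattern, reconstructed from the steps above (the `work(_)`/`end` message shapes and the `do_work/1` stand-in are my own placeholders):

```prolog
% Distribute Jobs over N worker threads via a message queue.
run_jobs(Jobs, N) :-
    message_queue_create(Queue),
    length(Ids, N),
    maplist(create_worker(Queue), Ids),
    % Send the units of work ...
    forall(member(Job, Jobs),
           thread_send_message(Queue, work(Job))),
    % ... then one "end" message per worker.
    forall(between(1, N, _),
           thread_send_message(Queue, end)),
    % Wait for all workers to terminate.
    maplist(thread_join, Ids),
    message_queue_destroy(Queue).

create_worker(Queue, Id) :-
    thread_create(worker(Queue), Id, []).

worker(Queue) :-
    thread_get_message(Queue, Msg),
    (   Msg == end
    ->  true                      % end: stop (succeed)
    ;   Msg = work(Job),
        do_work(Job),             % do the unit of work ...
        worker(Queue)             % ... and start the next iteration
    ).

do_work(Job) :-                   % placeholder unit of work
    format("did ~w~n", [Job]).
```

Workers that have nothing to do simply block in thread_get_message/2 until the next message arrives.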
Hm ok thanks @jan … honestly I’m a little surprised.
Create N “workers” that read from this queue.
What if I don’t know what N is at write time?
Can I give you a little background? Hopefully you can recommend something?
I had created a post on the Prolog subreddit, but my account was suspended for no apparent reason and the post was removed. Anyway: I'm writing a client that will be making API requests.
I have multiple keys — say a, b, and c — which I would like to rotate through at a particular interval to make those requests, so that I’m doing
api call with key(a)
api call with key(a)
rate limit hit!
api call with key(b)
api call with key(b)
rate limit hit!
api call with key(c)
api call with key(c)
rate limit hit!
api call with key(a)
api call with key(a)
...
Now that's fine for the small demo version of the client, but since I'm multithreading I was thinking I should use a queue instead of the global database. What I was hoping to achieve, if you check what I'm doing in assert_keys.pl, is:
Push the keys into a queue.
Pop key(a) and make requests until the rate limit (i.e. the interval) is reached.
Push that key into a placeholder queue.
Pop key(b) and repeat.
When all keys are consumed, the rotation back to key(a) is done by resetting all keys from the placeholder back into the initial key queue and clearing the placeholder as prep for the next round.
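The two-queue rotation described above could be sketched like this. Everything here is hypothetical: the `key/1` terms, the `active_keys`/`used_keys` aliases, and the predicate names are my assumptions, not taken from assert_keys.pl:

```prolog
% Set up the rotation: all keys start in the active queue.
init_keys(Keys) :-
    message_queue_create(_, [alias(active_keys)]),
    message_queue_create(_, [alias(used_keys)]),
    forall(member(K, Keys),
           thread_send_message(active_keys, key(K))).

% Pop the next key. When the active queue is empty, move every
% used key back and start the next round.
next_key(Key) :-
    (   thread_get_message(active_keys, key(Key), [timeout(0)])
    ->  true
    ;   refill_keys,
        thread_get_message(active_keys, key(Key), [timeout(0)])
    ).

refill_keys :-
    (   thread_get_message(used_keys, Msg, [timeout(0)])
    ->  thread_send_message(active_keys, Msg),
        refill_keys
    ;   true
    ).

% Call when the rate limit for Key is hit.
retire_key(Key) :-
    thread_send_message(used_keys, key(Key)).
```

Note that the empty-check plus refill in next_key/1 is not atomic: if several threads call it concurrently, two of them could race on the refill, so a multithreaded version would want to wrap it in with_mutex/2.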
This is exactly what I'm doing in assert_keys.pl, except with the global database instead of a queue. However, with multithreading I want the popping to occur atomically, so I don't have multiple threads popping multiple keys when they should all be using the same one.
Now, currently my code creates those keys on the server with a set value: I have a num_keys(N) fact and create that many keys, so in its current iteration I do happen to know how many keys I'm working with, and I suppose I could use that number to cut off my queue reads. But what if, in the future, the keys are pushed into the queue dynamically and I don't know how many there are ahead of time?
Does that mean I need to implement a whole separate counting system just to keep track of that?
Depends on the concurrency you want. If you have CPU bound tasks you typically pick the number of cores. Web requests are (network) I/O bound, so you use enough to reach at least the rate limit. Using too many doesn’t matter much.
Otherwise, I keep telling you to limit the rate at which you put the tasks into the queue. That is done by a single task, so that is where it is easy. As putting something into the queue is fast, it is good enough to simply call sleep/1 between adding the messages.
If you put all messages into the queue and then do the rate limiting in the concurrent tasks, it gets relatively complicated. Why do that when there is such a simple solution? By limiting the speed at which you insert messages into the task queue, the workers will simply suspend on the queue if they go too fast.
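The producer side of that is a few lines. A minimal sketch, assuming a `work(_)` message shape and a fixed delay between messages (both my assumptions):

```prolog
% Rate-limit at the producer: sleep between sends, so workers
% blocked on thread_get_message/2 can never exceed the rate.
send_jobs(Queue, Jobs, DelaySeconds) :-
    forall(member(Job, Jobs),
           (   thread_send_message(Queue, work(Job)),
               sleep(DelaySeconds)
           )).
```

With, say, `send_jobs(Q, Jobs, 0.5)` the queue receives at most two tasks per second, regardless of how many workers are reading from it.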