Concurrent_setof?

Now that we have concurrent_maplist/2, concurrent_forall/2, and concurrent_and/2, does it make sense to have a concurrent_setof/3 (and concurrent_aggregate/3, etc.)? My experience with concurrent_maplist/2 is that its speed-up varies a lot: sometimes the speed-up matches the number of cores; sometimes I get a slowdown (which is what I would expect).

Related: a version of setof/3 that doesn’t use sort/2 but instead uses a hash (or assoc) to guarantee unique results.
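A rough sketch of that idea, assuming plain deduplication is all that is needed: distinct/2 from library(solution_sequences) filters out answers it has already seen, without sorting, so wrapping it in findall/3 yields unique results in first-seen order. The name unique_solutions/3 is made up for illustration. Unlike setof/3, this sketch does not handle free variables via ^, does not sort, and succeeds with [] when there are no solutions.

```prolog
:- use_module(library(solution_sequences)).   % distinct/2

:- meta_predicate unique_solutions(?, 0, -).

%!  unique_solutions(?Template, :Goal, -List) is det.
%   (hypothetical helper) List holds the distinct instances of
%   Template for which Goal succeeds, in order of first appearance.
unique_solutions(Template, Goal, List) :-
    findall(Template, distinct(Template, Goal), List).
```

For example, unique_solutions(X, member(X, [b,a,b,c,a]), L) keeps the first occurrence of each element and leaves the list unsorted.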

PS: It’d also be nice if concurrent_maplist/2 allowed specifying the number of threads, the way the other concurrent predicates do. Or I suppose I could use setup_call_cleanup/3 to wrap the call: save cpu_count, change its value, then restore it (does it make sense to add such a predicate to library(thread)?) … the use case is when I have a number of predicates I want to run in parallel with a wide variation in run times and it’s impossible to predict which ones will be long running, so it’s nice to get as many as possible into the queue.

PPS: In trying to understand the implementation of setof/3, I found that it uses $new_findall_bag, which doesn’t seem to be defined anywhere in the sources …

What would you gain? First we have the backtracking phase. I see no way to distribute enumerating all answers to a goal over multiple threads. Of course, if this goal does something that can be done concurrently, we are fine. The answers need to be copied away to survive backtracking. That is also hard to do concurrently, as communicating terms between threads already implies the same copying process.

Then we need to collect, sort and do stuff with the non-existential variables. None of these phases is easy to do concurrently. Not sorting may help a little, but as I recall, the ordered set is used to simplify the non-existential variable handling.

Possibly. Not sure about a sensible way to specify the number of CPUs. Proper use of the concurrency features is indeed hard. Possibly we should adopt a different model where we have a dynamic pool of threads ready to do arbitrary work and the concurrent_* predicates just give work to this pool? Then there could be a monitoring thread that watches overall performance and decides to grow or shrink the pool. Something like that is already done for the HTTP server. Ideally there should be some way to figure out whether the copying and queue overhead is fair compared to the compute cost. That is not easy to predict :frowning:

$ git grep -n \$new_findall_bag
pl-bag.c:145:PRED_IMPL("$new_findall_bag", 0, new_findall_bag, 0)
pl-bag.c:401:  PRED_DEF("$new_findall_bag",     0, new_findall_bag,     0)

(I just discovered the Engines section of the manual, which further answers my question …)

Re-reading the code, I see that both concurrent_forall/2 and concurrent_and/2 run the Generator sequentially (and so does engine_next/2) and the Test or Action can run in parallel with the Generator, so conceivably the gathering part of setof/3 could also run in parallel with the generator; but I’m guessing that the gathering part is quite fast?

If aggregation runs over an indexed argument, then I suppose the generator could be broken up into chunks, each running in parallel, and the results merged? But that sounds tricky, and it is not clear that it would often give a speed-up …
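To make the chunking idea concrete, here is a minimal sketch that assumes the caller can list the key values to split on up front. Each key gets its own findall/3, run through concurrent_maplist/3, and the per-key bags are appended and sorted at the end. The names chunked_set/5, chunk_solutions/5 and sample/2 are hypothetical, and the sketch ignores free-variable (^) handling entirely.

```prolog
:- use_module(library(thread)).   % concurrent_maplist/3
:- use_module(library(lists)).    % append/2

:- meta_predicate chunked_set(?, ?, 0, +, -).

%!  chunked_set(?Template, ?Key, :Goal, +Keys, -Set) is det.
%   (hypothetical) Run Goal once per value in Keys, with Key bound to
%   that value, collect Template per chunk, then merge and sort.
chunked_set(Template, Key, Goal, Keys, Set) :-
    concurrent_maplist(chunk_solutions(Template, Key, Goal), Keys, Bags),
    append(Bags, All),
    sort(All, Set).

% Work on a fresh copy so that chunks do not share bindings.
chunk_solutions(Template, Key, Goal, KeyValue, Bag) :-
    copy_term(t(Template, Key, Goal), t(T, KeyValue, G)),
    findall(T, G, Bag).

% Hypothetical sample data: sample(Key, Value).
sample(1, b).
sample(1, a).
sample(2, c).
sample(2, a).
```

With the sample facts, chunked_set(V, K, sample(K, V), [1,2], Set) collects the values per key and merges them into one ordered set. Whether this beats a plain setof/3 depends on how the per-chunk work compares to the cost of copying the bags between threads.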

BTW, this is my solution for controlling the number of threads used by concurrent_maplist/2:

%! concurrent_count(+Max, -Cores, :Goal) is nondet.
% A utility for controlling the number of concurrent threads.
% Max can be an expression allowed on the r.h.s. of is/2;
%   it can include the number of cores by using the Cores logical variable, e.g.:
%     concurrent_count(2 * Cores, Cores, my_goal(...)).
:- meta_predicate concurrent_count(+, ?, 0).
concurrent_count(Max, Cores, Goal) :-
    current_prolog_flag(cpu_count, Cores),
    MaxValue is Max,
    setup_call_cleanup(set_prolog_flag(cpu_count, MaxValue),
                       Goal,
                       set_prolog_flag(cpu_count, Cores)).
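An alternative sketch that leaves the global cpu_count flag alone, which may matter if other threads read the flag at the same time: build the list of goals explicitly and hand it to concurrent/3 from library(thread), which takes the number of worker threads as its first argument. The names maplist_threads/3 and make_call/3 are invented for this example; this is not how concurrent_maplist/2 is implemented.

```prolog
:- use_module(library(thread)).   % concurrent/3
:- use_module(library(apply)).    % maplist/3

:- meta_predicate maplist_threads(+, 1, ?).

%!  maplist_threads(+N, :Goal, ?List) is semidet.
%   (hypothetical) Like maplist/2, but runs call(Goal, Elem) for the
%   elements of List using N worker threads.
maplist_threads(N, Goal, List) :-
    maplist(make_call(Goal), List, Goals),
    concurrent(N, Goals, []).

% Wrap one list element into a callable goal.
make_call(Goal, Elem, call(Goal, Elem)).
```

For instance, maplist_threads(2, integer, [1,2,3]) succeeds, while maplist_threads(2, integer, [1,a,3]) fails because one of the goals fails.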

That is indeed possible. It makes little sense for setof/3 as the first thing we need to do is sort/2, so we must wait for all answers. Even if we relax the ordering, we need to split the result sets into bags with the same variable values for the free variables and thus still need to first collect all answers. I fear there is little to gain while the cost in terms of complexity is high.

Sounds pretty hairy :frowning: But yes, it might be possible to run generators concurrently for specific cases. Preserving full semantics under cuts, dynamic database changes, etc. will be really hard, I fear.

It is as good as it gets right now. I think it is worthwhile looking for improvements at some point. For one thing, what happens if you use this in a web server that may already run tasks concurrently? As you note, the optimal number of threads is really hard to predict. It is also hard to determine dynamically. Sometimes threads are waiting for I/O. Even then, adding more workers may make this better or may not help at all because they are all waiting for the same bottleneck. Sometimes they are waiting for each other’s results. Again, more is not always better. Sometimes they wait on synchronization and, while throwing more cores at it does make CPU usage go up, overall performance goes down because the cores are mostly executing spin locks, resolving CAS conflicts, etc. …

If the application produces something at a certain rate that you can monitor you could dynamically change the concurrency to optimize. If not, I have no clue :frowning:

You need some slicing parameter to run them in parallel. You can then use concurrent_and/2.
Just compose concurrent_and/2 and findall/3, bagof/3, setof/3, …:

?- findall(X, concurrent_and(slice(Y),generate(Y,X)), L).
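A self-contained version of that composition, as a sketch only: slice/1 and generate/2 below are placeholder definitions invented to make the query runnable, and parallel_set/1 is a made-up wrapper. Since concurrent_and/2 may deliver answers out of order, the collected list is sorted afterwards to get a stable result.

```prolog
:- use_module(library(thread)).   % concurrent_and/2

% Hypothetical slicing parameter: four independent slices.
slice(Y) :-
    between(1, 4, Y).

% Hypothetical generator: three answers per slice.
generate(Y, X) :-
    between(1, 3, I),
    X is Y * I.

% Collect the answers from all slices and turn them into an ordered set.
parallel_set(Set) :-
    findall(X, concurrent_and(slice(Y), generate(Y, X)), Xs),
    sort(Xs, Set).
```

Here slice/1 runs sequentially in its own thread while the generate/2 calls for different Y values can run concurrently; the final sort/2 also removes the duplicates that different slices produce.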

I suspect findall/3 internally uses ‘$add_findall_bag’, which is not thread-safe, but concurrent_and/2 already serializes the Xs through its output message queue.

You could combine ‘$add_findall_bag’ and the output message queue into a thread-safe ‘$add_findall_bag’. That is, the child threads would not write into the output message queue but would call ‘$add_findall_bag’ directly.

You can think about the pros and cons. You need a new thread-safe ‘$add_findall_bag’; the old ‘$add_findall_bag’ probably doesn’t have enough synchronization to allow multiple writers. Also, ‘$add_findall_bag’ uses current_bag, so the child threads would need to use something else.

Maybe your message queues have a ‘$collect_findall_bag’, so that a tweaked concurrent_and/2 would do the job? The child threads would write into the output message queue as before, but this output message queue is unbounded and has no consumer like in the normal concurrent_and/2.

Instead, the result is materialized once the modified concurrent_and/2 has finished. Probably only a marginal speed gain? Implement it, decide on test cases, and measure it.