How would you parallelize between(1,20,N), prime(N)?

The problem is that some programmers might nevertheless prefer Python, Haskell, Elixir, etc… since these languages might provide parallel streaming (a form of OR-parallelism realizing parallel backtracking).

Even recent Java introduces spliterators etc… and can do some generate-test in parallel. Maybe Prolog is a little lacking here, although different Prolog systems offer some solutions, for example:

High level thread primitives
https://www.swi-prolog.org/pldoc/doc/SWI/library/thread.pl

Homework: How would you parallelize between(1,20,N), prime(N) ?

--------- part that was missing from the thread migration ------------

One important programming pattern in Prolog is generate and test. But you don’t use findall/3 for it. Here is an example, testing whether a number is a prime number, spot the generate and the test:

prime(N) :-
    M is floor(sqrt(N)),
    between(2,M,K),
    N mod K =:= 0, !, fail.
prime(_).

Here is an example run:

?- between(1,20,N), prime(N), write(N), nl, fail; true.
1
2
3
5
7
11
13
17
19
true.

findall/3 on the other hand is a primitive aggregation predicate. So we might have a new pattern, which is generate-test-aggregate, which data science might like. Time to write a book about it.
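
(Not in the original post, but for illustration: the generate-test-aggregate variant of the same query simply wraps the generate-test conjunction in findall/3.)

?- findall(N, (between(1,20,N), prime(N)), Primes).
Primes = [1, 2, 3, 5, 7, 11, 13, 17, 19].

The 1 shows up because prime/1 above accepts it, just as in the example run.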

BTW: There is no difference between a test and a filter, they are just Prolog goals. Don’t tell any Python, Haskell, Elixir, etc… programmer that Prolog is that easy.

It is not completely elegant, but IMO not too bad either :slight_smile: A concurrent_include/3 wouldn’t be that hard to write.

primes(N, L) :-
    numlist(1, N, Candidates),
    concurrent_maplist(is_prime, Candidates, Result),
    include(integer, Result, L).

is_prime(N, false) :-
    M is floor(sqrt(N)),
    between(2,M,K), N mod K =:= 0,
    !.
is_prime(N, N).

107 ?- time((primes(1 000 000, L), length(L, N))).
% 21,921,613 inferences, 5.542 CPU in 9.603 seconds (58% CPU, 3955186 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.
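
(Not from the thread: a minimal sketch of what such a concurrent_include/3 could look like, built on concurrent_maplist/3; concurrent_include/3 and mark/3 are hypothetical names, not library predicates.)

:- meta_predicate concurrent_include(1, +, -).
:- meta_predicate mark(1, +, -).

% Test every element in parallel, then keep the ones marked yes(X).
concurrent_include(Goal, List, Included) :-
    concurrent_maplist(mark(Goal), List, Marked),
    findall(X, member(yes(X), Marked), Included).

mark(Goal, X, yes(X)) :-
    call(Goal, X),
    !.
mark(_, _, no).

For example, concurrent_include([X]>>(0 =:= X mod 2), [1,2,3,4,5,6], L) should give L = [2,4,6].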

While I like the answer by Jan, this might help to improve upon it.

Parallel Prime Sieve: Finding Prime Numbers

N = 78499.

In checking, I found out that the solution included 1 (which is not prime) when it shouldn’t. I know it is a small change. :slightly_smiling_face:
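
(The small change is not spelled out in the post; one way to do it, assuming the intent is just to reject candidates below 2 in the is_prime/2 above, would be an extra guard clause.)

% Guard clause: numbers below 2 are not prime, so mark them false
% before running the divisor test.
is_prime(N, false) :-
    N < 2,
    !.
is_prime(N, false) :-
    M is floor(sqrt(N)),
    between(2, M, K),
    N mod K =:= 0,
    !.
is_prime(N, N).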

What do you get for performance, sequential versus parallel, for example with between(900000,1000000,X), prime(X), fail?

(Your code would probably need a little rewriting, since it doesn’t really qualify as a solution to the problem as intended: it materializes a list and doesn’t do parallel backtracking the way the parallel streaming in Python, Haskell, Elixir etc… is supposed to.)

Can you give a reference with more details on this?

I get the part about doing parts of the problem in parallel. I am also thinking that because the answer by Jan pre-generates the list with numlist(1, N, Candidates), that is not allowed/preferred. What I am not getting is how the other languages would use parallel streaming to solve this problem in a way that is different from the answer by Jan. Mostly I don’t understand how the backtracking in parallel backtracking works in those languages. :slightly_smiling_face:

concurrent_maplist/3 is ok, it works in parallel. But Jan W. has turned the problem from a generate-test problem into a generate-test-aggregate problem. The question asked is only between(1,20,N), prime(N), i.e. only generate-test, without the aggregate.

I just wanted to clarify what the scope of the question is, so that we can narrow down solutions. Parallel streaming does no aggregation automatically; you can use a sequence of maps, the last map might be a print, and you get the results in some order.

The print has a lock to avoid scrambling the text output. In my example I use the top level as the end of the parallel streaming pipe. With a fail at the end I don’t even want to see anything, and only measure the performance, sequential versus parallel.
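
(Not from the thread: in SWI-Prolog terms, such a locked print at the end of the pipe could look like the sketch below; print_result/1 is a hypothetical helper, and with_mutex/2 creates the named mutex on first use.)

% Hypothetical end-of-pipe step: print each solution under a mutex so
% output from concurrent workers is not interleaved within a line.
print_result(X) :-
    with_mutex(print_result, (write(X), nl)).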

I don’t know much about how concurrent_maplist/3 is implemented, but the comparison might be tricky: when you compare a sequential generate-test with a parallel generate-test-aggregate, the sequential version might still perform better.

Ok, I see I can do the sequential versus parallel test myself.

For the sequential I use this query:

?- time(aggregate_all(count, (between(1,1000000,X), prime(X)), N)).
% 138,716,312 inferences, 8.906 CPU in 8.904 seconds (100% CPU, 15575165 Lips)
N = 78499.

For the parallel I get on my machine:

?- time((primes(1000000,L), length(L,N))).
% 21,921,681 inferences, 3.813 CPU in 4.576 seconds (83% CPU, 5749949 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

Ok, nice, faster!

Thanks for fixing.

If you or anyone else ever needs a topic split into a new topic, don’t hesitate to ask the admins. Personally, unless I see a reason not to, I will split a thread when asked. Also, it is very easy to do: writing this reply took more time than splitting a topic does.

A parallel backtracking solution is slightly faster. Here is what I get on the same machine, with another Prolog system. First, the sequential performance:

?- time(aggregate_all(count, 
                     (between(1,1000000,X), prime(X)), N)).
% Up 6,737 ms, GC 33 ms, Thread Cpu 6,656 ms (Current 07/16/19 01:49:02)
N = 78499

Doing the same in parallel gives:

?- time(aggregate_all(count, 
                      balance((between(1,1000000,X), prime(X))), N)).
% Up 4,167 ms, GC 39 ms, Thread Cpu 219 ms (Current 07/16/19 01:49:50) 
N = 78499

Disclaimer: This is a preview; I am currently fine-tuning the underlying queues that make the predicate balance/1 work. I expect it to run a little faster still.

I am not yet using ArrayBlockingQueue, only some homegrown Queue, which doesn’t use different condition variables for non-empty and non-full.

Working on it…

The proper way to implement parallel streaming (as you call it) is to create a message queue, have one (or more) threads feeding in terms to process and N threads reading. A simple prototype is this:

:- meta_predicate
    pstream(?, 0, ?, 0, +).

pstream(Var, Generator, WVar, WGoal, NWorkers) :-
    message_queue_create(Q, [max_size(1000)]),
    length(Workers, NWorkers),
    maplist(thread_create(work(Q, WVar, WGoal)), Workers),
    forall(Generator, thread_send_message(Q, work(Var))),
    forall(between(1, NWorkers, _), thread_send_message(Q, done)),
    maplist(thread_join, Workers).

work(Q, WVar, Goal) :-
    repeat,
    thread_get_message(Q, Msg),
    (   Msg = work(WVar)
    ->  call(Goal),
        fail
    ;   !
    ).

primes(Updo, Primes, Threads) :-
    message_queue_create(Results),
    thread_create(( pstream(C, between(1, Updo, C),
                            WC, test_prime(WC, Results),
                            Threads),
                    thread_send_message(Results, end_of_file)
                  ),
                  _, [detached(true)]),
    message_queue_members(Results, Primes).

message_queue_members(Queue, List) :-
    thread_get_message(Queue, H),
    message_queue_members(H, Queue, List).

message_queue_members(end_of_file, _, []) :-
    !.
message_queue_members(H, Q, [H|T]) :-
    thread_get_message(Q, H2),
    message_queue_members(H2, Q, T).

test_prime(N, Results) :-
    \+ ( M is floor(sqrt(N)),
         between(2,M,K),
         N mod K =:= 0
       ),
    thread_send_message(Results, N).

This doesn’t scale very well either:

?- time(primes(1 000 000, L, 1)), length(L, N).
% 157,006 inferences, 0.552 CPU in 10.394 seconds (5% CPU, 284480 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time(primes(1 000 000, L, 2)), length(L, N).
% 157,004 inferences, 0.489 CPU in 6.381 seconds (8% CPU, 321170 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time(primes(1 000 000, L, 4)), length(L, N).
% 157,004 inferences, 0.361 CPU in 4.875 seconds (7% CPU, 434847 Lips)
L = [1, 2, 3, 7, 5, 11, 13, 19, 17|...],
N = 78499.

?- time(primes(1 000 000, L, 8)), length(L, N).
% 157,004 inferences, 0.505 CPU in 10.893 seconds (5% CPU, 310887 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 19, 17|...],
N = 78499.

That is to be expected, as for the lower numbers all the work of exchanging candidates is probably too much overhead. The somewhat surprising result is that if we change the code to find primes in a range (the modified primes/4 is sketched further below), we get:

?- time(primes(900 000, 1 000 000, L, 1)), length(L, N).
% 14,456 inferences, 0.045 CPU in 1.211 seconds (4% CPU, 321732 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 2)), length(L, N).
% 14,454 inferences, 0.039 CPU in 0.671 seconds (6% CPU, 369823 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 4)), length(L, N).
% 14,454 inferences, 0.040 CPU in 0.379 seconds (10% CPU, 363849 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900091, 900121, 900089, 900103|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 8)), length(L, N).
% 14,454 inferences, 0.049 CPU in 1.121 seconds (4% CPU, 294992 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 16)), length(L, N).
% 14,454 inferences, 0.050 CPU in 1.516 seconds (3% CPU, 286556 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

I didn’t expect that. I guess it is related to contention on one of the queues.
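
(The modified primes/4 used for the range queries above is not listed in the post; a minimal sketch, assuming it simply passes the lower bound through to between/3 and otherwise mirrors primes/3, could be:)

% Hypothetical range variant of primes/3: only candidates in [Low,High]
% are generated; the rest is identical to primes/3 above.
primes(Low, High, Primes, Threads) :-
    message_queue_create(Results),
    thread_create(( pstream(C, between(Low, High, C),
                            WC, test_prime(WC, Results),
                            Threads),
                    thread_send_message(Results, end_of_file)
                  ),
                  _, [detached(true)]),
    message_queue_members(Results, Primes).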

What is the surprise exactly? It looks like the same behaviour: varying the number of threads over N=1,2,4,8,16 gives the best result for N=4. How many logical CPUs does your machine have?

logical CPUs = number of cores x hyper threading factor

https://en.wikipedia.org/wiki/Hyper-threading

The results are quite good, much better performance than my balance/1, and also better performance than concurrent_maplist/3:

 concurrent_maplist/3: 4.576 seconds (see some previous post)
 pstream/5: 2.985 seconds (see below)

On my machine the logical number of CPUs is N=8 and not N=4.

?- time((primes(1 000 000, L, 8), length(L, N))).
% 157,006 inferences, 0.047 CPU in 2.959 seconds (2% CPU, 3349461 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time((primes(1 000 000, L, 8), length(L, N))).
% 157,006 inferences, 0.234 CPU in 2.997 seconds (8% CPU, 669892 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time((primes(1 000 000, L, 8), length(L, N))).
% 157,006 inferences, 0.094 CPU in 2.993 seconds (3% CPU, 1674731 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time((primes(1 000 000, L, 8), length(L, N))).
% 157,006 inferences, 0.109 CPU in 2.985 seconds (4% CPU, 1435483 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

I was doing multiple runs because I wanted to see whether the run time is erratic.

Somehow SWI-Prolog agrees that N=8 on my machine:

Welcome to SWI-Prolog (threaded, 64 bits, version 8.1.6)

?- current_prolog_flag(cpu_count, X).
X = 8.

I have a similar flag in my system:

?- current_prolog_flag(sys_cpu_count, X).
X = 8

But physically the machine (i7-6700HQ) is only a quad core with hyper-threading, so 8 = 4 x 2; that’s what happens.

P.S.: Maybe you get a bit more performance if you allow backtracking over the solutions and use aggregate_all/3 instead of materializing a list.

Not sure…
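
(Not from the thread: one way to avoid materializing the list with the pstream/5 prototype above would be to enumerate the results queue on backtracking and let aggregate_all/3 consume it directly; primes_count/3 and queue_member/2 are hypothetical names.)

% Count primes without building the list: aggregate_all/3 backtracks
% over queue_member/2 until the end_of_file marker arrives.
primes_count(Updo, Count, Threads) :-
    message_queue_create(Results),
    thread_create(( pstream(C, between(1, Updo, C),
                            WC, test_prime(WC, Results),
                            Threads),
                    thread_send_message(Results, end_of_file)
                  ),
                  _, [detached(true)]),
    aggregate_all(count, queue_member(Results, _), Count).

% Enumerate the messages of Queue on backtracking, stopping at
% end_of_file.
queue_member(Queue, Msg) :-
    repeat,
    thread_get_message(Queue, Msg0),
    (   Msg0 == end_of_file
    ->  !, fail
    ;   Msg = Msg0
    ).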

The test results are from a dual AMD EPYC 7401 24-Core Processor, which means 48 cores and 96 threads. Without queues involved this typically scales close to linearly up to 48 tasks. This is most likely a contention issue on one of the queues. In the first attempt there is a report on every number, but in my second there is only a report on primes. So, most likely the first queue, the one that distributes the work, is the problem. Instead of a single queue we could use a queue per thread and distribute the candidates over these threads. The advantage of a single queue is that you do not have to worry about the distribution of work, which is particularly important if the amount of work spent on a single task is hard to predict (as is the case for the prime example).

This probably requires a smarter queue implementation. Currently this is a linked list of terms protected by a mutex and condition variable (and an emulation thereof for Windows).
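
(Not from the thread, but to make the queue-per-worker idea concrete: a minimal sketch that reuses work/3 from the pstream/5 prototype above and deals the candidates out round-robin over per-worker queues. pstream_rr/5, create_queue/1, create_worker/4 and distribute/3 are hypothetical names, and, as noted, this gives up the automatic load balancing of the single queue.)

:- meta_predicate pstream_rr(?, 0, ?, 0, +).
:- meta_predicate create_worker(?, 0, +, -).
:- meta_predicate distribute(0, ?, +).

% One bounded queue per worker; candidates are dealt out round-robin,
% so the workers no longer contend on a single input queue.
pstream_rr(Var, Generator, WVar, WGoal, NWorkers) :-
    length(Queues, NWorkers),
    maplist(create_queue, Queues),
    maplist(create_worker(WVar, WGoal), Queues, Workers),
    distribute(Generator, Var, Queues),
    forall(member(Q, Queues), thread_send_message(Q, done)),
    maplist(thread_join, Workers),
    maplist(message_queue_destroy, Queues).

create_queue(Q) :-
    message_queue_create(Q, [max_size(1000)]).

create_worker(WVar, WGoal, Q, Id) :-
    thread_create(work(Q, WVar, WGoal), Id).

% Deal each solution of Generator to the next queue in turn, using a
% non-backtrackable counter so the rotation survives backtracking.
distribute(Generator, Var, Queues) :-
    length(Queues, N),
    QTerm =.. [qs|Queues],
    nb_setval(pstream_rr_next, 0),
    forall(Generator,
           (   nb_getval(pstream_rr_next, I0),
               I is I0 mod N + 1,
               arg(I, QTerm, Q),
               thread_send_message(Q, work(Var)),
               nb_setval(pstream_rr_next, I)
           )).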

So you were expecting that for N=8,16(,32,64) it would show even more speed-up? Do you own the machine, or do you only have a virtual machine account, also known as a cloud offering? It could be that you don’t get more CPUs although there are more.

A further problem could be memory bandwidth, which is also a limiting factor. For example, the code of pstream/5, test_prime/2, etc… is shared by all threads. Is this lock free? More importantly, is it read-only? Is it a contiguous chunk? Does it get into a cache close to the logical CPU?

Just some guesses at what could go wrong…

This is bare metal here at CWI, i.e., no VM, just plain Fedora Linux on the raw machine. Running normal Prolog predicates (even dynamic ones) is completely read-only and involves no locks. As said, this typically scales well. The suspect shared resource is the queue to which the candidates are sent and from which the workers read them. A worker will grab the queue mutex and check for available messages. If one is found (I guess likely in this case, as the writer seems much faster) it removes the message and unlocks the mutex. Otherwise it waits on the condition variable, which unlocks the mutex and re-locks it when it is signalled (so we get two lock/unlock pairs). And no, it slows down far too quickly to suspect memory bandwidth.

Try a larger queue size. It’s now max_size(1000). On the other hand, I was experimenting with N^2 as the queue size and started seeing good results. This would mean you need a larger queue size, for example:

  N=32      max_size(1024)
  N=64      max_size(4096)

Etc… The reasoning is probably that a mutex/condition pair that doesn’t get into a blocking state is cheaper than one that does.

Here is a test with N=8 for my predicate balance/1:

max_size(8):

?- time(aggregate_all(count, balance((between(1,1000000,X), prime(X))), N)).
% Up 5,936 ms, GC 40 ms, Thread Cpu 407 ms (Current 07/16/19 13:39:37) 
N = 78499

max_size(64):

?- time(aggregate_all(count, balance((between(1,1000000,X), prime(X))), N)).
% Up 4,072 ms, GC 38 ms, Thread Cpu 234 ms (Current 07/16/19 13:36:48)
N = 78499

I tried without a maximum size, which simply means the writer never blocks. It didn’t make a noticeable difference. The problem is quite likely the number of workers grabbing work from the same queue. It seems to be something worthwhile to look into. Anyone interested, please go ahead; I’m too deeply involved in tabling right now.

Yes, you are right, I guess. An unlimited queue will also block, since an unlimited queue can still be empty. It cannot be full, but it can be empty. It might block in message_queue_members/3 while retrieving the primes, and it might also block in work/3.

The probability that it blocks in work/3 is much higher the more workers you have. It is very likely that they will deplete what was produced by between/3 when there are more workers. So I guess the problem could be that the blocking is expensive.

Maybe I should introduce a counter somewhere, so that I can measure when the queue goes into a wait. This could shed more light on this mystery…
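
(Not from the thread, and not the counter itself: a low-tech first approximation in SWI-Prolog would be to sample the queue fill level from a separate thread with message_queue_property/2; monitor_queue/2 is a hypothetical helper.)

% Print the queue size every Interval seconds: a size stuck at 0
% suggests starved workers, a size stuck at max_size suggests a
% blocked producer.
monitor_queue(Q, Interval) :-
    repeat,
    sleep(Interval),
    message_queue_property(Q, size(S)),
    format(user_error, 'queue size: ~d~n', [S]),
    fail.

It could be started next to the workers with thread_create(monitor_queue(Q, 0.1), _, [detached(true)]).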

Here is a solution in the spirit of what parallel streams can do, for example through Java spliterators and terminal sinks. I don’t know yet how Prolog could automatically do the necessary transformations, but I applied two transformations manually to the problem:

  1. Lower Pressure on Input Queue:
    To lower this pressure I don’t enumerate between(1,1000000, _ ),
    but instead only between(1,1000, _ ) and then each job works
    on a slice of 1000 numbers.

  2. Lower Pressure on Output Queue:
    To lower this pressure I don’t return individual prime numbers,
    but instead return the already aggregated count of the 1000 numbers,
    and then do an aggregate sum.

The results are a little bit encouraging. I had to use JDK 13.0, which is worse sequentially but better in parallel; JDK 13.0 uses a different garbage collector than JDK 1.8.

Sequential:

?- time(count(N)).
% Up 8,655 ms, GC 39 ms, Thread Cpu 8,593 ms (Current 07/16/19 16:47:49)
N = 78499

Parallel:

?- time(count2(N)).
% Up 2,628 ms, GC 4 ms, Thread Cpu 0 ms (Current 07/16/19 16:48:16)
N = 78499

The source code is here:

prime(N) :-
    M is floor(sqrt(N)),
    between(2, M, K),
    N mod K =:= 0, !, fail.
prime(_).

/* sequential */
count(N) :-
   aggregate_all(sum(M), (between(1, 1000, Y), slice(Y, M)), N).

/* parallel */
count2(N) :-
   aggregate_all(sum(M), balance((between(1, 1000, Y), slice(Y, M))), N).

slice(Y,M) :-
   aggregate_all(count, (H is Y*1000, L is H-999, between(L, H, X), prime(X)), M).
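
(Not part of the post: for comparison, the same slicing transformation can be expressed in SWI-Prolog with concurrent_maplist/3 instead of balance/1; count3/1 is a hypothetical name.)

/* hypothetical SWI-Prolog variant of count2/1: run slice/2 over the
   1000 slices in parallel and sum the per-slice counts */
count3(N) :-
    numlist(1, 1000, Slices),
    concurrent_maplist(slice, Slices, Counts),
    sum_list(Counts, N).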