The proper way to implement parallel streaming (as you call it) is to create a message queue, have one (or more) threads feed terms to process into it, and N worker threads read from it. A simple prototype is this:
```prolog
:- meta_predicate
    pstream(?, 0, ?, 0, +).

%!  pstream(?Var, :Generator, ?WVar, :WGoal, +NWorkers)
%
%   Run Generator, sending each binding of Var through a bounded
%   queue from which NWorkers threads pick up work and call WGoal
%   with WVar bound to the received term.
pstream(Var, Generator, WVar, WGoal, NWorkers) :-
    message_queue_create(Q, [max_size(1000)]),
    length(Workers, NWorkers),
    maplist(thread_create(work(Q, WVar, WGoal)), Workers),
    forall(Generator, thread_send_message(Q, work(Var))),
    forall(between(1, NWorkers, _), thread_send_message(Q, done)),
    maplist(thread_join, Workers).

work(Q, WVar, Goal) :-
    repeat,
    thread_get_message(Q, Msg),
    (   Msg = work(WVar)
    ->  call(Goal),
        fail                    % backtrack to repeat for next message
    ;   !                       % `done`: stop this worker
    ).
```
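As a quick sanity check (a hypothetical query, not from the measurements below), pstream/5 can be driven directly:

```prolog
% Print the squares of 1..10 using 4 workers.  The output order is
% nondeterministic because the workers run concurrently.
?- pstream(X, between(1, 10, X),
           Y, ( Sq is Y*Y, writeln(Sq) ),
           4).
```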
```prolog
primes(Updo, Primes, Threads) :-
    message_queue_create(Results),
    thread_create(( pstream(C, between(1, Updo, C),
                            WC, test_prime(WC, Results),
                            Threads),
                    thread_send_message(Results, end_of_file)
                  ),
                  _, [detached(true)]),
    message_queue_members(Results, Primes).

%!  message_queue_members(+Queue, -List)
%
%   Collect messages from Queue until end_of_file is received.
message_queue_members(Queue, List) :-
    thread_get_message(Queue, H),
    message_queue_members(H, Queue, List).

message_queue_members(end_of_file, _, []) :-
    !.
message_queue_members(H, Q, [H|T]) :-
    thread_get_message(Q, H2),
    message_queue_members(H2, Q, T).

%   Trial division up to sqrt(N); on success, report N as prime.
%   Note that this test also accepts 1, which is why the counts
%   below are one higher than pi(10^6) = 78,498.
test_prime(N, Results) :-
    \+ ( M is floor(sqrt(N)),
         between(2, M, K),
         N mod K =:= 0
       ),
    thread_send_message(Results, N).
```
This doesn’t scale very well either:
```prolog
?- time(primes(1 000 000, L, 1)), length(L, N).
% 157,006 inferences, 0.552 CPU in 10.394 seconds (5% CPU, 284480 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time(primes(1 000 000, L, 2)), length(L, N).
% 157,004 inferences, 0.489 CPU in 6.381 seconds (8% CPU, 321170 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 17, 19|...],
N = 78499.

?- time(primes(1 000 000, L, 4)), length(L, N).
% 157,004 inferences, 0.361 CPU in 4.875 seconds (7% CPU, 434847 Lips)
L = [1, 2, 3, 7, 5, 11, 13, 19, 17|...],
N = 78499.

?- time(primes(1 000 000, L, 8)), length(L, N).
% 157,004 inferences, 0.505 CPU in 10.893 seconds (5% CPU, 310887 Lips)
L = [1, 2, 3, 5, 7, 11, 13, 19, 17|...],
N = 78499.
```
That is to be expected: for the small candidates the message-exchange overhead probably dominates the (cheap) primality test. The somewhat surprising result is that if we change the code to find primes in a range, we get:
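The range variant primes/4 used below is not shown; a minimal sketch, assuming it only changes the generator of primes/3 to enumerate Low..High, would be:

```prolog
% Hypothetical range variant: identical to primes/3 except that the
% generator enumerates Low..High instead of 1..Updo.
primes(Low, High, Primes, Threads) :-
    message_queue_create(Results),
    thread_create(( pstream(C, between(Low, High, C),
                            WC, test_prime(WC, Results),
                            Threads),
                    thread_send_message(Results, end_of_file)
                  ),
                  _, [detached(true)]),
    message_queue_members(Results, Primes).
```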
```prolog
?- time(primes(900 000, 1 000 000, L, 1)), length(L, N).
% 14,456 inferences, 0.045 CPU in 1.211 seconds (4% CPU, 321732 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 2)), length(L, N).
% 14,454 inferences, 0.039 CPU in 0.671 seconds (6% CPU, 369823 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 4)), length(L, N).
% 14,454 inferences, 0.040 CPU in 0.379 seconds (10% CPU, 363849 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900091, 900121, 900089, 900103|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 8)), length(L, N).
% 14,454 inferences, 0.049 CPU in 1.121 seconds (4% CPU, 294992 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.

?- time(primes(900 000, 1 000 000, L, 16)), length(L, N).
% 14,454 inferences, 0.050 CPU in 1.516 seconds (3% CPU, 286556 Lips)
L = [900001, 900007, 900019, 900037, 900061, 900089, 900091, 900103, 900121|...],
N = 7224.
```
I didn't expect that. I guess it is related to contention on one of the queues.
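One way to probe that hypothesis (my speculation, not something from the measurements above) would be to send candidates in batches, so each queue operation amortises over many terms. A sketch of a hypothetical batched feeding step, with `batches/3` splitting the candidate list:

```prolog
% Hypothetical batched replacement for the feeding loop in pstream/5:
% send lists of BatchSize candidates instead of single terms.  The
% worker would then iterate over each batch with member/2.
feed_batched(Generator, Var, Q, BatchSize) :-
    findall(Var, Generator, All),
    batches(All, BatchSize, Batches),
    forall(member(B, Batches), thread_send_message(Q, work(B))).

batches([], _, []) :- !.
batches(List, N, [Batch|Rest]) :-
    length(Batch, N),
    append(Batch, Tail, List),
    !,
    batches(Tail, N, Rest).
batches(List, _, [List]).       % final, short batch
```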