Is Erlang's `receive` more powerful/expressive than `thread_get_message`?

Yes, I know how painful that can be. However, please note that when I wrote “in SWI-Prolog”, that was a mistake – I meant “in Web Prolog”. Having said that, an implementation of Web Prolog in SWI-Prolog will likely be assembled from libraries, and there will likely be one library which implements receive/1-2 so that you can use it as you would use any other library. Of course, you don’t have to use it, you may prefer to stick with the thread_* predicates, and there could be lots of reasons for why that would be a better choice. Notably, you may want to allow processes to share the dynamic database so that one process can read or retract what another process has asserted. That’s what I’m doing in the implementation of the caching scheme that was discussed a while ago.

When programming in Web Prolog, which is a language rather than a library, you would not have that choice. The thread_* predicates are simply not available. If you need concurrency, you have to use spawn/2-3, !/2, receive/1-2, etc, and you’d also find that processes can no longer share data since Web Prolog, just like Erlang, adheres to the idea that processes should “communicate to share memory, rather than share memory to communicate”.

Another thing that Web Prolog gives you, and which is not provided by the thread_* predicates, is a straightforward way to do distributive programming in a fairly network-transparent manner. Network transparency with respect to where a process is running, locally or remotely, is compatible with message-passing concurrency, but not with shared-state concurrency. Adding distributed message-passing to concurrent Erlang did not add any new concepts, and no new concept are added when we add distributed message-passing to concurrent Web Prolog. Pass the node option to spawn/2-3 or pengine_spawn/1-2 and off you go, or use rpc/2-3 if synchronous communication and sequential processing is good enough for your purpose.

Sure, and you’re writing one, are you not? That’s a great initiative, so thank you for that!

1 Like

Here is my second version which uses separate threads for important and normal (which I assume is the idea), and doesn’t require any shared memory thanks to message queues.

important(ImportantQueue) :-
    repeat,
    thread_get_message(Message),
    (    Message \== finish
    ->   thread_send_message(ImportantQueue, Message),
         fail
    ;    thread_send_message(ImportantQueue, finish),
         !
    ).

normal(NormalQueue) :-
    repeat,
    thread_get_message(Message),
    (    Message \== finish
    ->   thread_send_message(NormalQueue, Message),
         fail
    ;    thread_send_message(NormalQueue, finish),
         !
    ).

dispatcher(ImportantThread, NormalThread, []) :-
    thread_send_message(ImportantThread, finish),
    thread_send_message(NormalThread, finish), !.

dispatcher(ImportantThread, NormalThread, [Priority-Message|InList]) :-
    (    Priority > 10
    ->   thread_send_message(ImportantThread, Message)
    ;    thread_send_message(NormalThread, Message)
    ),
    dispatcher(ImportantThread, NormalThread, InList).

important_listener(ImportantQueue, Acc, High) :-
    thread_get_message(ImportantQueue, Message),
    (    Message \== finish
    ->   append(Acc, [Message], Next),
         important_listener(ImportantQueue, Next, High)
    ;    Acc = High
    ).

normal_listener(NormalQueue, Acc, Low) :-
    thread_get_message(NormalQueue, Message),
    (    Message \== finish
    ->   append(Acc, [Message], Next),
         normal_listener(NormalQueue, Next, Low)
    ;    Acc = Low
    ).

priority(In, Out) :-
    setup_call_cleanup(( message_queue_create(ImportantQueue),
                         message_queue_create(NormalQueue),
                         thread_create(important(ImportantQueue), ImportantThread),
                         thread_create(normal(NormalQueue), NormalThread)
                       ),
                       ( dispatcher(ImportantThread, NormalThread, In),
                         important_listener(ImportantQueue, [], High),
                         normal_listener(NormalQueue, [], Low),
                         append(High, Low, Out)
                       ),
                       ( thread_join(ImportantThread),
                         thread_join(NormalThread),
                         message_queue_destroy(ImportantQueue),
                         message_queue_destroy(NormalQueue)
                       )).

The original tests result in:

?- priority([15-high, 7-low, 1-low, 17-high], L).
L = [high, high, low, low].

?- priority([13-let, 5-out, 11-the, 17-who, 7-dogs], L).
L = [let, the, who, out, dogs].

Here is a version using the timeout option instead of sending final messages. I had to make small delays in the output listeners to get this to work, so sending final messages seems more efficient.

important(ImportantQueue) :-
    thread_self(PID),
    repeat,
    (    thread_get_message(PID, Message, [timeout(0)])
    ->   thread_send_message(ImportantQueue, Message),
         fail
    ;    !
    ).

normal(NormalQueue) :-
    thread_self(PID),
    repeat,
    (    thread_get_message(PID, Message, [timeout(0)])
    ->   thread_send_message(NormalQueue, Message),
         fail
    ;    !
    ).

dispatcher(_ImportantThread, _NormalThread, []) :- !.

dispatcher(ImportantThread, NormalThread, [Priority-Message|InList]) :-
    (    Priority > 10
    ->   thread_send_message(ImportantThread, Message)
    ;    thread_send_message(NormalThread, Message)
    ),
    dispatcher(ImportantThread, NormalThread, InList).

important_listener(ImportantQueue, Acc, High) :-
    (    thread_get_message(ImportantQueue, Message, [timeout(0.1)])
    ->   append(Acc, [Message], Next),
         important_listener(ImportantQueue, Next, High)
    ;    copy_term(Acc, High)
    ).

normal_listener(NormalQueue, Acc, Low) :-
    (    thread_get_message(NormalQueue, Message, [timeout(0.1)])
    ->   append(Acc, [Message], Next),
         normal_listener(NormalQueue, Next, Low)
    ;    copy_term(Acc, Low)
    ).

priority(In, Out) :-
    setup_call_cleanup(( message_queue_create(ImportantQueue),
                         message_queue_create(NormalQueue),
                         thread_create(important(ImportantQueue), ImportantThread),
                         thread_create(normal(NormalQueue), NormalThread)
                       ),
                       ( dispatcher(ImportantThread, NormalThread, In),
                         important_listener(ImportantQueue, [], High),
                         normal_listener(NormalQueue, [], Low),
                         append(High, Low, Out)
                       ),
                       ( thread_join(ImportantThread),
                         thread_join(NormalThread),
                         message_queue_destroy(ImportantQueue),
                         message_queue_destroy(NormalQueue)
                       )).

I pushed a patch that does allow for this. This implies you can now get arbitrary messages from a queue out of order by registering a guard as freeze(X, myguard(X)), followed by thread_get_message(Q, X,[timeout(0)]).

3 Likes

Its a normal piece of Prolog semantics, so simply use your imagination. You can use a disjunction as a guard and set some variable to capture the associated action. One thing is not honored: being a destructive data structure, possible choice points of the constraint are pruned

I’m not able to make sense of what you write above, and I’m not even sure if you’re talking to me. :slight_smile:

Anyway, here’s a complete example:

:- op(800,  xfx,    !).
:- op(1000, xfy, when).


self(Self) :-
    thread_self(Self).



Pid ! Message :-
    send(Pid, Message).

send(Pid, Message) :-
    catch(thread_send_message(Pid, Message), _, true).



:- thread_local deferred/1.

receive(Clauses) :-
    receive(Clauses, []).

receive(Clauses, Options) :-
    thread_self(Mailbox),
    (   clause(deferred(Message), true, Ref),
        select_rclause_body(Clauses, Message, Body)
    ->  erase(Ref),
        call(Body)
    ;   receive(Mailbox, Clauses, Options)
    ).

receive(Mailbox, Clauses, Options) :-    
    (   thread_get_message(Mailbox, Message, Options)
    ->  (   select_rclause_body(Clauses, Message, Body)
        ->  call(Body)
        ;   assertz(deferred(Message)),
            receive(Mailbox, Clauses, Options)
        )
    ;   option(on_timeout(Body), Options, true),
        call(Body)
    ).

select_rclause_body({PatternGuard -> Body}, Message, Body) :-
    var(PatternGuard), !,
    PatternGuard = Message.
select_rclause_body({PatternGuard -> Body}, Message, Body) :-
    check(PatternGuard, Message).
select_rclause_body({PatternGuard -> Body ; _}, Message, Body) :-
    check(PatternGuard, Message), !.
select_rclause_body({_ ; Rules}, Message, Body) :-
    select_rclause_body({Rules}, Message, Body).

check(Pattern when Guard, Message) :- !,
    Message = Pattern,
    once(Guard).
check(Pattern, Pattern).



flush :-
    self(Mailbox),
    flush(Mailbox).

flush(Mailbox) :-
    receive({
        Message ->
            format("Shell got ~q~n", [Message]),
            flush(Mailbox)
    },[ timeout(0)
    ]).


important(Messages) :-
    receive({
        Priority-Message when Priority > 10 ->
            Messages = [Message|MoreMessages],
            important(MoreMessages)
    },[ timeout(0),
        on_timeout(normal(Messages))
    ]).

normal(Messages) :-
    receive({
        _-Message ->
            Messages = [Message|MoreMessages],
            normal(MoreMessages)
    },[ timeout(0),
        on_timeout(Messages=[])
    ]).
    

init1 :-
    self(S), S ! 15-high, S ! 7-low, S ! 1-low, S ! 17-high.

init2 :-
    self(S), S ! 15-high, S ! dummy(1), S ! 7-low, S ! 1-low, S ! 17-high, S ! dummy(2).

If you load this source in SWI-Prolog, you can do:

?- init1.
true.

?- important(Ms).
Ms = [high, high, low, low].

?- flush.
true.

?- init2.
true.

?- important(Ms).
Ms = [high, high, low, low].

?- flush.
Shell got dummy(1)
Shell got dummy(2)
true.

?-

Oh, that’s really strange, and a bit worrying… I’m not sure what could possibly cause that.

But you’re running SWI-Prolog under Windows, right? So maybe it’s a Windows thing?

Anyone else care to run it and report back?

Ah, ok. I guess that’s alright then…

Great! Does it work with when/2 also?

Of course. In most cases freeze/2 will do though as you can create an as specific as possible term. Often there will be only one variable in such term for which you can use freeze/2 to add a constraint. If you have more such variables when/2 may come handy.

1 Like

Why not? The only thing the patch does is extending the good old plain unification with running constraints triggered by the unification. It is really a minor patch.

With Web Prolog, I don’t. And Erlang doesn’t do that either. It might be difficult to implement when producer and consumer are running on different nodes – maybe that’s the reason?

No, it just rips them.