Is Erlang's `receive` more powerful/expressive than `thread_get_message`?

It’s very quiet here today, so let me give you a couple of problems to chew on.

As seen earlier here, by means of the thread_* predicates it’s often possible and indeed fairly easy to translate an Erlang program into a functionally equivalent Prolog program. But is it always possible/easy?

Here’s another Erlang example, this time an implementation of a kind of priority queue from a very nice and free introduction to Erlang:

important() ->
    receive
        {Priority, Message} when Priority > 10 ->
            [Message | important()]
    after 0 ->
        normal()
    end.
 
normal() ->
    receive
        {_, Message} ->
            [Message | normal()]
    after 0 ->
        []
    end.

The idea is that calling important/0 builds a list of all messages in the mailbox, with those whose priority is above 10 coming first.

Here’s how it can be programmed in Web Prolog:

important(Messages) :-
    receive({
        Priority-Message when Priority > 10 ->
            Messages = [Message|MoreMessages],
            important(MoreMessages)
    },[
        timeout(0),
        on_timeout(normal(Messages))
    ]).

normal(Messages) :-
    receive({
        _-Message ->
            Messages = [Message|MoreMessages],
            normal(MoreMessages)
    },[
        timeout(0),
        on_timeout(Messages = [])
    ]).

and here’s how it runs:

?- self(S), S ! 15-high, S ! 7-low, S ! 1-low, S ! 17-high.
S = 45783241@'http://localhost:3060'.
 
?- important(Messages).
Messages = [high,high,low,low].

Problem 1

How would you program this example using suitable thread_* predicates?

Here’s how you should be able to test your solution:

?- thread_self(S), 
   thread_send_message(S, 15-high), 
   thread_send_message(S, 7-low), 
   thread_send_message(S, 1-low), 
   thread_send_message(S, 17-high). 
S = main.
   
?- important(Messages).
Messages = [high,high,low,low].

Problem 2

Informed by your solution to Problem 1, how would you implement the receive/1-2 predicate so that it can handle this example (and other examples involving receive/1-2)?

Problem 3

Does this example show that Erlang’s receive is more expressive/powerful than the thread_get_message/1-3 predicate that SWI-Prolog (and other Prolog systems) offer?

My own conclusions so far

I have failed to find a neat solution to Problem 1. I hope that someone will be able to show me how to do it!

I have a couple of solutions to Problem 2, but they’re more hacky than I’d like.

My answer to the question in Problem 3 is yes. I think so. Can you prove me wrong?

If the question boils down to “Can I get the pairs with Priority > 10 from a message queue?”, the answer is no. You can, however, get messages from a queue out of order based on unification. If constraint handling were properly supported in thread_get_message/3, the following would work to get the important messages (it is then easy to get the others):

important(Messages) :-
    freeze(P, P>10),
    get_messages(P-_, Messages).

get_message(M) :-
    thread_self(S),
    thread_get_message(S,M,[timeout(0)]).

get_messages(M, [H|T]) :-
    copy_term(M, H),
    get_message(H),
    !,
    get_messages(M, T).
get_messages(_, []).

Would this classify as elegant? I might have a look at making this work, as it might be easy and there are surely use cases for this.
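To make the point about unification concrete, here is a minimal sketch that works with thread_get_message/3 as it stands. It assumes SWI-Prolog and takes the queue as an argument so that it can be tried on a fresh queue; drain_pairs/2, is_important/1 and important_via_drain/2 are names made up for this illustration. Instead of selecting on the guard, it removes every Priority-Message pair without blocking and splits the list afterwards. That makes it a workaround and not a selective receive: a message that arrives while it runs may end up in a different position than Erlang would give it.

```prolog
:- use_module(library(apply)).   % partition/4
:- use_module(library(lists)).   % append/3
:- use_module(library(pairs)).   % pairs_values/2

% drain_pairs(+Queue, -Pairs): remove every Priority-Message pair that is
% currently in Queue, in arrival order, without blocking. Terms that are
% not pairs do not unify with P-M and are left in the queue.
drain_pairs(Queue, [P-M|Pairs]) :-
    thread_get_message(Queue, P-M, [timeout(0)]),
    !,
    drain_pairs(Queue, Pairs).
drain_pairs(_, []).

% The guard is applied after the fact, not while scanning the queue.
is_important(Priority-_) :-
    Priority > 10.

% important_via_drain(+Queue, -Messages): important messages first,
% each group in arrival order.
important_via_drain(Queue, Messages) :-
    drain_pairs(Queue, Pairs),
    partition(is_important, Pairs, High, Low),
    append(High, Low, Ordered),
    pairs_values(Ordered, Messages).

% Out-of-order retrieval by unification alone:
% ?- message_queue_create(Q),
%    forall(member(M, [15-high, 7-low, 1-low, 17-high]),
%           thread_send_message(Q, M)),
%    thread_get_message(Q, 1-X, [timeout(0)]).
% X = low.
```

Passing the result of thread_self/1 as the queue gives the behaviour asked for in Problem 1, under the caveat above.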

Speaking as a complete Erlang novice and an “intermediate” SWI Prologer, I’ve been mulling the pros and cons.

I’ve gradually been trimming the examples I gave in “Erlang "ping pong" concurrent programming example translated into SWI Prolog”, as I realised I didn’t need thread_self(-Pid) and thread_get_message(+Pid, ?Term): both can be combined into thread_get_message(?Term) when I’m reading messages from the current thread’s own queue.

I like Erlang’s

receive
   pattern1 ->
       actions1;
   pattern2 ->
       actions2;
   ....
   patternN ->
       actionsN
end.

In Prolog, I can sometimes achieve the above with something like:

message_handler(pattern1, Result1) :-
    action1(pattern1, Result1), !.
message_handler(pattern2, Result2) :-
    action2(pattern2, Result2), !.
...
message_handler(patternN, ResultN) :-
    actionN(patternN, ResultN), !.
message_handler(_, false).
 
listener :-
    thread_get_message(Pattern),
    message_handler(Pattern, Result),
    (   Result
    ->  listener
    ;   true    % send finish or whatever to end
    ).

But I often find myself writing ugly

 (  case == pattern1 
-> action1
;    ( case == pattern2
    -> action2
     ; ...
     )
)

So some tips on how to do case statements nicely in Prolog would be welcome.
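Two common idioms may help here. The sketch below is only an illustration: handle/2, classify/2 and the message shapes are made up for the example. The first idiom puts one case in each clause head and lets clause selection do the dispatch. The second relies on the fact that `;` associates to the right, so an if-then-else chain can be written flat, without the nested parentheses.

```prolog
% handle(+Message, -Reply): one clause per case. The cut commits to the
% case as soon as the head has matched; the reply is bound after the cut.
handle(ping(From), Reply) :- !, Reply = pong(From).
handle(add(X, Y), Reply)  :- !, Z is X + Y, Reply = sum(Z).
handle(stop, Reply)       :- !, Reply = stopped.
handle(Other, unknown(Other)).   % catch-all, reached only if no case matched

% The same kind of choice as a single flat if-then-else chain.
classify(Case, Action) :-
    (   Case == pattern1 -> Action = action1
    ;   Case == pattern2 -> Action = action2
    ;   Action = default
    ).
```

For example, `handle(add(1, 2), R)` gives `R = sum(3)`, and `classify(pattern2, A)` gives `A = action2`.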

I may be biased, but I think the Erlang-ish solution is more elegant. If you could provide primitives in SWI-Prolog that allow us to easily implement receive/1-2, then at least I would be happy. :slight_smile:

Here’s my implementation from years ago, which seems to work, but which is a bit hacky and probably slower than it needs to be:

:- op(1000, xfy, when).

:- thread_local deferred/1.

put_deferred(Deferred) :-
    retractall(deferred(_)),
    assertz(deferred(Deferred)).

get_deferred(Deferred) :-
    deferred(Deferred).
    
add_to_deferred(Message) :-
    deferred(Deferred),
    append(Deferred, [Message], NewDeferred),
    put_deferred(NewDeferred).

:- put_deferred([]).


receive(Clauses) :-
    receive(Clauses, []).
    
receive(Clauses, Options) :-
    thread_self(Mailbox),
    get_deferred(Deferred),
    (   select(Message, Deferred, DeferredRest),
        select_clause_body(Clauses, Message, Body)
    ->  put_deferred(DeferredRest),
        call(Body)
    ;   receive(Mailbox, Clauses, Options)
    ).

receive(Mailbox, Clauses, Options) :-    
    (   thread_get_message(Mailbox, Message, Options)
    ->  (   select_clause_body(Clauses, Message, Body)
        ->  call(Body)
        ;   add_to_deferred(Message),
            receive(Mailbox, Clauses, Options)
        )
    ;   option(on_timeout(Body), Options, true),
        call(Body)
    ).

select_clause_body({PatternGuard -> Body}, Message, Body) :-
    var(PatternGuard), !,
    PatternGuard = Message.
select_clause_body({PatternGuard -> Body}, Message, Body) :-
    check(PatternGuard, Message).
select_clause_body({PatternGuard -> Body ; _}, Message, Body) :-
    check(PatternGuard, Message), !.
select_clause_body({_ ; Rules}, Message, Body) :-
    select_clause_body({Rules}, Message, Body).

check(Pattern when Guard, Message) :- !,
    Message = Pattern,
    once(Guard).
check(Pattern, Pattern).



So I suppose you would agree that having receive/1-2 in SWI-Prolog would be useful as it implements the same semantics (and, actually, it’s somewhat more powerful in Web Prolog than in Erlang since it’s semi-deterministic and allows us to use any goal in a when guard)?

Thanks, I have to go now, but will check it out tonight and tomorrow.

A reservation I have is that receive strikes me as a derivative of Erlang’s case statement, which doesn’t easily lend itself to Prolog syntax:

case get_message(Pattern) of
   pattern1 ->
       actions1;
   pattern2 ->
       actions2;
   ....
   patternN ->
       actionsN
end.

As I mentioned above, I often struggle to find a neat way of achieving the very common need to pick “cases” in Prolog, but I don’t see elegant ways of switching things to the “C-way” in Prolog without mucking up its beautifully minimalist syntax.

Here follows a Logtalk implementation using threaded engines (which compile to the thread predicates you mentioned). It can be run using either SWI-Prolog or ECLiPSe.

:- object(pq).

	:- threaded.

	% initialize a perpetual threaded engine holding a priority queue
	:- initialization((
		maxheap::new(PQ),
		threaded_engine_create(none, loop(PQ), pq)
	)).

	% send a message to the priority queue
	:- public(send/1).
	send(M) :-
		threaded_engine_post(pq, M).

	% retrieve a list of messages ordered top priority first
	:- public(important/1).
	important(Ms) :-
		threaded_engine_post(pq, important),
		threaded_engine_next(pq, Ms).

	loop(PQ0) :-
		threaded_engine_fetch(T),
		(	T == important ->
			maxheap::as_list(PQ0, Ms),
			pairs::values(Ms, Vs),
			threaded_engine_yield(Vs),
			maxheap::new(PQ1)
		;	T = P-M,
			maxheap::insert(P, M, PQ0, PQ1)
		),
		loop(PQ1).

:- end_object.

Sample calls:

?- {heaps(loader), types(pairs), pq}.
...
true.

?- pq::(send(13-let),send(5-out),send(11-the),send(17-who),send(7-dogs)).
true.

?- pq::important(Ms).
Ms = [who, let, the, dogs, out].

?- pq::important(Ms).
Ms = [].

?- pq::(send(8-fun), send(11-have), send(3-'😛')).
true.

?- pq::important(Ms).
Ms = [have, fun, 😛].

P.S. Example now available at https://github.com/LogtalkDotOrg/logtalk3/tree/master/examples/engines/pmq

Nice to see someone else using Prolog with more than just the simple ASCII characters. :crazy_face:

There are several issues that are all connected. It isn’t really clear how the original problem relates to them:

  • Split a stream of terms
  • Deal with (or not) out-of-order handling of terms
  • Deal with concurrency/agents/threads/engines/nodes/… (whatever you want to call them)

(SWI-)Prolog has library(broadcast), which can do some of that, and then there are message queues, predicates (queue = clause list, as Jan B argues) or any arbitrary Prolog structure, possibly captured in an engine (the engine example in the docs explains this for library(heaps)).
I think CHR can also be used for this, possibly inside an engine.

It is not clear to me how Erlang’s receive relates to this and whether or not this is a point in the space that requires special attention.

Thanks, this was a good suggestion and as you can see in the updated implementation below, I now use it to represent and process the queue of deferred messages. That’s neater and probably also more efficient than my previous representation.

:- op(1000, xfy, when).

:- thread_local deferred/1.

receive(Clauses) :-
    receive(Clauses, []).

receive(Clauses, Options) :-
    thread_self(Mailbox),
    (   clause(deferred(Message), true, Ref),
        select_rclause_body(Clauses, Message, Body)
    ->  erase(Ref),
        call(Body)
    ;   receive(Mailbox, Clauses, Options)
    ).

receive(Mailbox, Clauses, Options) :-    
    (   thread_get_message(Mailbox, Message, Options)
    ->  (   select_rclause_body(Clauses, Message, Body)
        ->  call(Body)
        ;   assertz(deferred(Message)),
            receive(Mailbox, Clauses, Options)
        )
    ;   option(on_timeout(Body), Options, true),
        call(Body)
    ).

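select_rclause_body({PatternGuard -> Body}, Message, Body) :-
    var(PatternGuard), !,
    % An unbound pattern matches any message. Going through check/2 here
    % would bind it to (_ when _) and then call an unbound guard.
    PatternGuard = Message.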
select_rclause_body({PatternGuard -> Body}, Message, Body) :-
    check(PatternGuard, Message).
select_rclause_body({PatternGuard -> Body ; _}, Message, Body) :-
    check(PatternGuard, Message), !.
select_rclause_body({_ ; Rules}, Message, Body) :-
    select_rclause_body({Rules}, Message, Body).

check(Pattern when Guard, Message) :- !,
    Message = Pattern,
    once(Guard).
check(Pattern, Pattern).

Hmm, that looks very neat, but compared to the solution using receive/2, it does something slightly different. Your solution sorts the messages in the order of their priority. In contrast, given

important(Messages) :-
    receive({
        Priority-Message when Priority > 10 ->
            Messages = [Message|MoreMessages],
            important(MoreMessages)
    },[ timeout(0),
        on_timeout(normal(Messages))
    ]).

normal(Messages) :-
    receive({
        _-Message ->
            Messages = [Message|MoreMessages],
            normal(MoreMessages)
    },[ timeout(0),
        on_timeout(Messages = [])
    ]).

If I do

?- thread_self(S), S ! 13-let, S ! 5-out, S ! 11-the, S ! 17-who, S ! 7-dogs.

I get

?- important(Ms).
Ms = [let, the, who, out, dogs].

which is the correct result given the semantics of Erlang’s receive.

Understood, and I’m beginning to think that my current implementation is sufficient for my purposes, and that it is as elegant as it can get and still remain short and readable. I’m sure that it can be implemented more efficiently (perhaps by compiling the receive clauses) but that’s not important. It will work just fine as a reference implementation of receive/1-2, and that’s all I need at this stage.

Thanks everyone! Just to wrap things up, here’s the current documentation of receive/1-2:

and here’s the updated reference implementation, using the clause-based representation suggested by Jan Burse for the queue of deferred messages:

:- op(1000, xfy, when).

:- thread_local deferred/1.

receive(Clauses) :-
    receive(Clauses, []).

receive(Clauses, Options) :-
    thread_self(Mailbox),
    (   clause(deferred(Message), true, Ref),
        select_rclause_body(Clauses, Message, Body)
    ->  erase(Ref),
        call(Body)
    ;   receive(Mailbox, Clauses, Options)
    ).

receive(Mailbox, Clauses, Options) :-    
    (   thread_get_message(Mailbox, Message, Options)
    ->  (   select_rclause_body(Clauses, Message, Body)
        ->  call(Body)
        ;   assertz(deferred(Message)),
            receive(Mailbox, Clauses, Options)
        )
    ;   option(on_timeout(Body), Options, true),
        call(Body)
    ).

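select_rclause_body({PatternGuard -> Body}, Message, Body) :-
    var(PatternGuard), !,
    % An unbound pattern matches any message. Going through check/2 here
    % would bind it to (_ when _) and then call an unbound guard.
    PatternGuard = Message.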
select_rclause_body({PatternGuard -> Body}, Message, Body) :-
    check(PatternGuard, Message).
select_rclause_body({PatternGuard -> Body ; _}, Message, Body) :-
    check(PatternGuard, Message), !.
select_rclause_body({_ ; Rules}, Message, Body) :-
    select_rclause_body({Rules}, Message, Body).

check(Pattern when Guard, Message) :- !,
    Message = Pattern,
    once(Guard).
check(Pattern, Pattern).

Suggestions for other improvements are of course still welcome!

Easy to get exactly the results you want. For example:

:- object(split).

	:- threaded.

	% initialize a perpetual threaded engine holding the important and normal queues
	:- initialization(
		threaded_engine_create(none, loop(I-I, N-N), split)
	).

	% send a message to the queues
	:- public(send/1).
	send(Term) :-
		threaded_engine_post(split, Term).

	% retrieve a list of messages with top messages first
	:- public(important/1).
	important(Terms) :-
		threaded_engine_post(split, important),
		threaded_engine_next(split, Terms).

	loop(I0-IT0, N0-NT0) :-
		threaded_engine_fetch(Term),
		(	Term == important ->
			IT0 = N0, NT0 = [],
			threaded_engine_yield(I0),
			loop(I-I, N-N)
		;	Term = P-M,
			(	P > 10 ->
				IT0 = [M| IT1],
				loop(I0-IT1, N0-NT0)
			;	NT0 = [M| NT1],
				loop(I0-IT0, N0-NT1)
			)
		).

:- end_object.

And then you get exactly the behavior you’re looking for:

?- split::(send(13-let), send(5-out), send(11-the), send(17-who), send(7-dogs)).
true.

?- split::important(List).
List = [let, the, who, out, dogs].

?- split::important(List).
List = [].

?- split::(send(8-fun), send(11-have), send(3-'😛')).
true.

?- split::important(List).
List = [have, fun, 😛].

I never doubted that! Looks cool! :+1:

Well, then I guess it doesn’t work in Erlang either, so we’re fine.

For a reference implementation which (hopefully!) would work when ported to the other (two or three?) Prolog systems which implement the Threads draft, I think that’s sufficient. And receive/1-2 is a primitive built-in predicate in Web Prolog, and has no dependency on clause references or a particular way to represent message queues.

It’s true that Erlang restricts what is allowed in guards to a particular subset, but I propose that we should be more generous and allow any Prolog goal to appear there. Here’s what I’ve written about this elsewhere:

Erlang enforces purity and efficiency by only allowing a restricted set of primitives in guards, and completely disallows calling user-defined functions. For the use cases the people behind Erlang had, it probably made sense to impose such restrictions. In Web Prolog, although only the first solution will be searched for, any query may be used as a guard and values of variables bound by it are available in the body. This makes the receive construct more powerful than in Erlang, but it also means that the programmer is made responsible for keeping guards as simple and efficient as possible and to avoid side effects. In our opinion, enabling the programmer to condition the matching of a receive clause on the content of the whole Prolog database makes it worth it.

I edited my answer to use difference lists. As Jan also noticed, the point is that you can use coroutining engines or threaded engines to hold any arbitrary structure.

This strikes me as a “libraries as languages” effort which I’m prejudiced against from my experiences in some online data science courses in Python where Matlab is brought in with a library called Numpy and R with a library called Pandas.

Both were “adapted for radio by knocking a nail into a piece of wood” (to use a Monty Python joke) in that if you don’t already know R, Pandas is confusingly different from “normal” Python. The same for Matlab and Numpy.

These efforts to bring language A into language B via a library invariably force you to look at the original language’s documentation rather than the translation’s to figure out how to use them.

Once you’ve been forced to learn language A anyway, writing it in language B via a library tends to be a bit pointless. I’ve found writing R programs in a strange Python dialect via Pandas just results in much slower and less memory efficient code.

Rather than making SWI Prolog concurrency look like Erlang, I’d rather see some tutorials on how to use what is already available (which is overwhelming).

For instance, for the above problem, I’d guess that using thread_peek_message(?Term) to decide whether the head of the queue is important or normal may be a solution. But with no illustrative examples, I’d have to figure out from scratch whether that works, or whether a message that arrived after the peek might be popped instead.
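A small experiment may settle part of that. The sketch below assumes SWI-Prolog and uses a fresh queue so that nothing else interferes; peek_demo/2 and peek_empty_fails/0 are made-up names. As far as I can tell from the documentation, thread_peek_message/2 does not look only at the head of the queue: it scans for the first term that unifies, leaves that term in the queue, and fails at once when nothing matches.

```prolog
% peek_demo(-Peeked, -Size): peek at a queue that holds two messages.
peek_demo(Peeked, Size) :-
    message_queue_create(Q),
    thread_send_message(Q, 7-low),
    thread_send_message(Q, 15-high),
    % Succeeds although 15-high is not at the head of the queue.
    thread_peek_message(Q, 15-Peeked),
    % Nothing was removed, so both messages are still there.
    message_queue_property(Q, size(Size)),
    message_queue_destroy(Q).

% On an empty queue the peek fails immediately instead of waiting.
peek_empty_fails :-
    message_queue_create(Q),
    \+ thread_peek_message(Q, _),
    message_queue_destroy(Q).
```

Here `peek_demo(P, N)` gives `P = high, N = 2`. Peeking and getting remain two separate steps, so to remove exactly the term that was peeked it seems safest to pass that term as the pattern to thread_get_message/2,3 instead of an unbound variable.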

Here is a stab I’ve made at it.

:- dynamic msg/1.

important(PID, High, HighOut) :-
    thread_get_message(PID, _-Message),
    append(High, [Message], HighOut).

normal(PID, Low, LowOut) :-
    thread_get_message(PID, _-Message),
    append(Low, [Message], LowOut).

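priority(High, Low) :-
    thread_self(PID),
    % Note: thread_peek_message/2 never blocks. If this thread runs before
    % the first message has been sent, the peek fails and the thread exits.
    thread_peek_message(PID, Msg),
    (    Priority-_ = Msg
    ->   (    Priority > 10
         ->   important(PID, High, HighOut),
              priority(HighOut, Low)
         ;    normal(PID, Low, LowOut),
              priority(High, LowOut)
         )
    ;    append(High, Low, ResultList),
         assertz(msg(ResultList))
    ).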

priority(Messages) :-
    retractall(msg(_)),
    thread_create(priority([], []), PID),
    thread_send_message(PID, 15-high),
    thread_send_message(PID, 7-low),
    thread_send_message(PID, 1-low), 
    thread_send_message(PID, 17-high),
    thread_send_message(PID, finish),
    thread_join(PID),
    msg(Messages).

I initially thought simply using priority(High, Low, ResultList) would return the desired ResultList, but have since realised that the :Goal of thread_create(:Goal, -Id) does not appear to be able to return values.

Writing the result of the thread to a dynamic predicate in the clause store doesn’t seem elegant to me, but I’m not sure what alternatives are available.

Once I understand the timeout option I hope I’ll be able to do away with thread_send_message(PID, finish).
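One alternative to the dynamic predicate is to have the worker send its result back as a message to the thread that created it. The following is a minimal sketch under that assumption, not a polished solution: collect/3, priority_demo/1 and the result/1 wrapper are names invented for the example. The worker uses the blocking thread_get_message/1, so it waits for each message instead of peeking.

```prolog
:- use_module(library(lists)).   % append/3, member/2

% collect(+High, +Low, +Parent): accumulate messages until something that
% is not a Priority-Message pair arrives, then send the result to Parent.
collect(High, Low, Parent) :-
    thread_get_message(Msg),            % blocks until a message arrives
    (   Msg = Priority-Message
    ->  (   Priority > 10
        ->  append(High, [Message], High1),
            collect(High1, Low, Parent)
        ;   append(Low, [Message], Low1),
            collect(High, Low1, Parent)
        )
    ;   append(High, Low, Result),
        thread_send_message(Parent, result(Result))
    ).

priority_demo(Messages) :-
    thread_self(Me),
    thread_create(collect([], [], Me), Worker),
    forall(member(M, [15-high, 7-low, 1-low, 17-high, finish]),
           thread_send_message(Worker, M)),
    thread_join(Worker, _Status),
    % The worker's answer is waiting in this thread's own queue.
    thread_get_message(result(Messages)).
```

With this, `priority_demo(Ms)` gives `Ms = [high, high, low, low]` and no clause store is involved. The finish message is still needed as a terminator here.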