Some basic Messaging patterns, howto in Web-Prolog?

Is there some repository that shows how to implement these basic patterns in Web-Prolog by just using the Erlang idiom, and not more?

  1. Competing Consumers:

[diagram: python-two]

  2. Publish/Subscribe:

[diagram: python-three]

  3. Routing:

[diagram: python-four]

  4. Wire Tap (combinable with 1, 2 and 3 for debugging):

[diagram: WireTap]


No, there isn’t. The best (and most important) examples of what can be built on top of spawn/2-3, !/2, receive/1-2 etc. are pengines, and rpc/2-3 on top of pengines.

Contributions are of course welcome! :slight_smile:

For 1) we could make the consumers more active, in that they announce each time they are ready to consume. So the picture would be the same, but with the arrows from the consumers inverted. In the implementation it is then essential that receive can defer a guarded message:


/* competing consumers pipe */
pipe(Queue) :-
receive 
    get(Pid) when \+ queue_empty(Queue) ->  /* from consumer C1,..,Cn */ 
          queue_get(Queue, Item, Queue2),
          Pid ! reply(Item), 
          pipe(Queue2);
    put(Item) ->  /* from producer P */
          queue_put(Queue, Item, Queue2),
          pipe(Queue2).

So in addition to the participating actors P and C1,…,Cn, we have one more actor for the pipe. The code doesn’t show queue details. Instead, the queue is an argument of the actor’s perpetual loop; it could also be some data structure with side effects.

You could rpc-ify it by this stub:

    get(Pipe, Item) :-
          self(Pid),
          Pipe ! get(Pid),
          receive
              reply(Item) -> true. 
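
For readers without an ACTOR node at hand, the pipe can be approximated with plain SWI-Prolog threads. This is only a sketch under assumptions: it uses SWI-Prolog's thread mailboxes instead of Web Prolog's receive, and the names pipe_loop/1, pipe_get/2 and demo_pipe/1 are made up for illustration. The guard is emulated by choosing which pattern to wait for: thread_get_message/1 leaves non-matching messages in the mailbox, so a get/1 that arrives while the list is empty is deferred rather than lost.

```prolog
:- use_module(library(lists)).          % append/3

% pipe_loop(+Items): the queue is a plain list carried in the loop argument.
pipe_loop([]) :-
    % Guard emulation: with nothing queued we only ask for put/1, so a
    % get/1 that is already in the mailbox stays there until an item exists.
    thread_get_message(put(Item)),
    pipe_loop([Item]).
pipe_loop([Item|Items]) :-
    thread_get_message(Msg),            % oldest message of any shape
    (   Msg = get(Pid)                  % from a consumer
    ->  thread_send_message(Pid, reply(Item)),
        pipe_loop(Items)
    ;   Msg = put(New)                  % from the producer
    ->  append([Item|Items], [New], Items1),
        pipe_loop(Items1)
    ;   pipe_loop([Item|Items])         % anything else is dropped
    ).

% Synchronous stub, mirroring get/2 above.
pipe_get(Pipe, Item) :-
    thread_self(Me),
    thread_send_message(Pipe, get(Me)),
    thread_get_message(reply(Item)).

% Hypothetical demo: the calling thread plays producer and consumer.
demo_pipe([A, B]) :-
    thread_create(catch(pipe_loop([]), pipe_stopped, true), Pipe),
    thread_self(Me),
    thread_send_message(Pipe, get(Me)), % request arrives before any item
    thread_send_message(Pipe, put(a)),
    thread_send_message(Pipe, put(b)),
    thread_get_message(reply(A)),       % answer to the early request
    pipe_get(Pipe, B),
    thread_signal(Pipe, throw(pipe_stopped)),   % shut the loop down
    thread_join(Pipe, _).
```

Calling demo_pipe(Items) shows that the early request is answered with the first item once it has been put, which is the deferral behaviour the guarded receive is meant to provide.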

Can all this be demonstrated on xxx.prolog.computer?

Yes, although I’m not going to spend time actually programming this myself (I have other aspects I want to concentrate on), I think that something like that might work.

If your queues are lists, for example, you should be able to call

?- spawn(pipe([]), Pid).
Pid = ....

and then start talking to your concurrent queue using either asynchronous communication primitives such as !/2 and receive/2, or a synchronous, higher-level solution such as your get/2 – which is defined in terms of !/2 and receive/2. Apart from no sign of respect for concrete syntax, you seem to get it. :slight_smile:

No, you need a node that offers the ACTOR profile for that. If you haven’t already, you would need to download and install the PoC at https://github.com/Web-Prolog/swi-web-prolog and run the code there.

It’s a nice example, so I would perhaps like to add it to the many other examples in the tutorial that comes with the PoC, if that’s alright with you. If you don’t implement it, I might, but only later.

Sure, my guess is that many examples in that book can fairly easily be translated into Web Prolog. After all, Akka is inspired by Erlang. (Jonas Bonér, who created Akka, worked for Ericsson too.)

It’s even easier to translate programs written in Erlang, though, or programs written in Elixir (since Elixir is basically Erlang with a different syntax).

Erlang has something called OTP (stands for Open Telecom Platform), which contains many interesting libraries for making writing fault-tolerant concurrent and distributed applications easier. Some of them might be possible to port to Web Prolog. For others, it’s not clear how useful they will be in a web programming context.

It’s more promising, I think, to first work in the direction of pengines and RPC. I’ve shown it before, but I think a program such as this says a lot about what can be done because receive/1-2 is semi-deterministic:

search(Query, Pid, Options) :-
    self(Self),
    spawn(query(Query, Pid, Self), Pid, [
        monitor(true),
        src_predicates([query/3])
        | Options
    ]).
    
query(Query, Self, Parent) :-
    call_cleanup(Query, Det=true),
    (   var(Det)
    ->  Parent ! success(Self, Query, true),
        receive({
            next -> fail;
            stop -> Parent ! stopped(Self)
        })
    ;   Parent ! success(Self, Query, false)
    ).

Again, we don’t have to build such things ourselves, since we have pengines.

First, let me point out that Erlang’s receive primitive is often referred to as a selective receive. (Googling for “selective receive” Erlang finds almost 2000 hits.)

I see at least three issues with your proposal:

  1. What about guards? Where do they come in? If you’re serious here, you owe me an implementation of important/1 using your message_select/2, i.e. what we discussed in this topic.

  2. And in what way is your proposal any better than using thread_get_message/1, like so?:

    query(Query, Self, Parent) :-
        call_cleanup(Query, Det=true),
        (   var(Det)
        ->  Parent ! success(Self, Query, true),
            thread_get_message(Message),
            (Message==next -> fail;
             Message==stop -> Parent ! stopped(Self))
        ;   Parent ! success(Self, Query, false)
        ).
  3. And what happens if someone sends a message foo to the process, does it crash? Or is the message deferred? Using thread_get_message/1 as above would not defer foo, but since the whole if-then fails, it would have the same effect as sending next. (There are ways around that but you have to change the form of your messages.) What about your solution?

Since you plugged out my receive from the example and plugged in your own construct, I got the impression you were, but never mind.

Ok, and if you also specify how it works with respect to the deferring of messages, you’d have something that might be equivalent to Erlang’s receive, right? But then why not go with something that basically is Erlang’s receive, except that it also has a couple of extra properties such as more expressive guards and a semi-deterministic behaviour?

A predicate is said to be semi-deterministic if it either fails, or succeeds exactly once. That’s true of read/1 for example:

?- read(X).
|: a.
X = a.

?- read(a).
|: b.
false.

Now, calling receive/1-2 either fails, or succeeds exactly once, so it is semi-deterministic. You can of course choose to implement something like this using receive/1:

message_select([Alt1, Alt2], I) :-
    receive({
        Alt1 -> I = 1;
        Alt2 -> I = 2
    }).

and then call it like so:

    ...
    message_select([next, stop], I),
    (I==1 -> fail;
     I==2 -> Parent ! stopped(Self)),
    ...

But why would you? Just to be able to say that your receive construct is deterministic?

And again, what if foo comes along? The receive in the definition of message_select/2 will defer it, but what should be the value of I then, and what happens in the if-then construct?

It seems to me that the if-then construct must somehow be moved into message_select/2, and yeah, wait for it … that’s what Erlang’s and Web Prolog’s receive does!

But, with the if-then construct integrated in that way, what should happen if the body of one of the receive clauses fails? We have to account for this possibility. Of course, you could imagine throwing an error at that point, but I think it’s much more useful if the whole call to receive/1-2 is allowed to fail. That’s the only way it can fail, and that’s why it must be deemed semi-deterministic.

Good move! :slight_smile:

Sure, here’s an actor implementing one of the patterns you asked for - a simple publish-subscribe service:

pubsub_service(Subscribers0) :-
    receive({
        publish(Message) ->
            forall(member(Pid, Subscribers0), Pid ! msg(Message)),
            pubsub_service(Subscribers0);
        subscribe(Pid) ->
            pubsub_service([Pid|Subscribers0]);
        unsubscribe(Pid) ->
            (   select(Pid, Subscribers0, Subscribers)
            ->  pubsub_service(Subscribers)
            ;   pubsub_service(Subscribers0)
            )
    }).

If you are the owner of a node, you can spawn this actor, register it under a mnemonic name, and make it available on the Web:

:- spawn(pubsub_service([]), Pid),
   register(pubsub_service, Pid).

Here’s how to subscribe to the service, and invoke a repeat-fail loop waiting for messages to arrive from it:

?- self(Self),
   pubsub_service ! subscribe(Self),
   repeat,
   io:write("Waiting for a message ..."),
   receive({
       msg(Message) ->
           io:format("Received: ~p", [Message]),
           fail
   }).

Here’s how to publish a message:

?- pubsub_service ! publish(hello).

And no, it won’t work against an ISOBASE node - ACTOR capabilities are required.

This example is from the Web Prolog tutorial. If you haven’t already, you would need to download and install the PoC available at https://github.com/Web-Prolog/swi-web-prolog and run the code there.
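
For trying the idea without the PoC, here is a rough equivalent using plain SWI-Prolog threads. It is a sketch under assumptions, not the tutorial's code: pubsub_loop/1 and demo_pubsub/1 are hypothetical names, the added stop message exists only so the demo can shut down cleanly, and because thread_get_message/1 takes whatever message comes first, unknown messages are dropped here instead of being deferred as receive would do.

```prolog
:- use_module(library(lists)).          % member/2, selectchk/3

% pubsub_loop(+Subscribers): Subscribers is a list of thread ids.
pubsub_loop(Subs) :-
    thread_get_message(Msg),
    (   Msg = publish(Message)
    ->  forall(member(Pid, Subs),
               thread_send_message(Pid, msg(Message))),
        pubsub_loop(Subs)
    ;   Msg = subscribe(Pid)
    ->  pubsub_loop([Pid|Subs])
    ;   Msg = unsubscribe(Pid)
    ->  (   selectchk(Pid, Subs, Subs1)
        ->  pubsub_loop(Subs1)
        ;   pubsub_loop(Subs)
        )
    ;   Msg == stop                     % demo-only shutdown message
    ->  true
    ;   pubsub_loop(Subs)               % unknown message: dropped
    ).

% Hypothetical demo: subscribe, receive one message, unsubscribe.
demo_pubsub(Got) :-
    thread_create(pubsub_loop([]), Service),
    thread_self(Me),
    thread_send_message(Service, subscribe(Me)),
    thread_send_message(Service, publish(hello)),
    thread_get_message(msg(Got)),
    thread_send_message(Service, unsubscribe(Me)),
    thread_send_message(Service, publish(ignored)),  % nobody is listening
    thread_send_message(Service, stop),
    thread_join(Service, _).
```

Unlike the Web Prolog version, this only works between threads of one process; crossing node borders is exactly what the ACTOR profile adds.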

Your pub/sub is not in a separate file that can be consulted; it’s embedded inside HTML. But, as I wrote, the idea would be to have a repository with modules that can be used via use_module/1.

You see that there is no referential reuse of the messaging pattern in your code, since the code is copy-pasted and duplicated into your chat example and into your resident file. It takes a little more work to make a reusable messaging pattern repository (usable via use_module/1) than to simply have some messaging patterns here and there by accident.

Or is use_module/1 not part of ISOBASE? How would you wire tap your pub sub? An example product that is reusable is RabbitMQ, which was prototyped in Erlang. You can still find and download the RabbitMQ Erlang source code, it is a little large though. Of course an open source Web Prolog repository with client and server libraries would then show how it is done in Web Prolog.

Edit 20.11.2019:
I don’t know yet exactly how messaging patterns are reused. I have to check what the various books do exactly, and how they instantiate a messaging pattern. In your example you use spawn/2, because it’s only one actor realizing the core object of the messaging pattern. But how to wire tap the thing?
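
One possible reading of a wire tap, sketched with plain SWI-Prolog threads: an extra actor sits in front of the real destination, forwards every message unchanged, and sends a copy to a monitor. The names wire_tap/2, demo_tap/2, tapped/1 and stop_tap are assumptions made up for this illustration and are not part of Web Prolog or of any messaging library.

```prolog
% wire_tap(+Target, +Monitor): forward each message to Target and
% send a wrapped copy to Monitor, until stop_tap is received.
wire_tap(Target, Monitor) :-
    thread_get_message(Msg),
    (   Msg == stop_tap
    ->  true
    ;   thread_send_message(Monitor, tapped(Msg)),
        thread_send_message(Target, Msg),
        wire_tap(Target, Monitor)
    ).

% Hypothetical demo: the calling thread is both target and monitor.
demo_tap(Copy, Original) :-
    thread_self(Me),
    thread_create(wire_tap(Me, Me), Tap),
    thread_send_message(Tap, msg(hello)),
    thread_get_message(tapped(Copy)),   % the monitor's copy
    thread_get_message(msg(Original)),  % the forwarded original
    thread_send_message(Tap, stop_tap),
    thread_join(Tap, _).
```

To tap an existing service in this style, clients would be given the tap's id instead of the service's id, so the service code itself stays untouched.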

Since an ISOBASE node is designed for stateless querying only, it’s very unlikely that it will offer use_module/1. An ISOTOPE node might offer it (or something comparable), and the same goes for an ACTOR node.

What do you mean by “wire tap”? The clients talk to the pub-sub server over websockets, if that’s what you mean. The same way as a shell written in JavaScript talks to a pengine, and the same way as a pengine talks to another pengine running on a remote node. That’s why it needs an ACTOR node.

You shouldn’t compare a simple demo in a tutorial with something like that.

I now also have actors running in Jekejeke Prolog, currently based on UDP/IP so that process borders can be crossed. In my prototype of a broker for Web Prolog the PIDs are clear text, so I don’t really need a register/2. Time-out works fine; here is an example with a time-out:

ha(_) :-
   receive([stop], 1000, _), !.
ha(N) :-
   write(N), write(': Ha.'), nl,
   tschi(N).

tschi(_) :-
   receive([stop], 1000, _), !.
tschi(N) :-
   write(N), write(': Tschi'), nl,
   ha(N).

Here is an example interaction on localhost:3011 with such an actor class:

Jekejeke Prolog 4, Runtime Library 1.4.2 (November 12, 2019)

?- spawn(localhost, 3010, ha(1), Pid1).
Pid1 = pid(localhost, 3010, 'Thread-7')

?- spawn(localhost, 3010, ha(2), Pid2).
Pid2 = pid(localhost, 3010, 'Thread-8')

?- send(pid(localhost, 3010, 'Thread-7'), stop).
Yes

Credits go to Torbjörn Lager in as far as I used the mashup from the SWI-Prolog discussion list about implementing receive/[2,3]. There I threw in the use of clause references, which can be seen in the proof of concept now. But I didn’t need to add much to Jekejeke Prolog. The message queues already had a pipe_poll/3 with a time-out parameter, so this could be used for the receive/3 variant.

Source code of the broker:
https://gist.github.com/jburse/253739554305af625e746366cff9b66b#file-broker-p

The actor’s receive is a little primitive, since it always consumes the matching message queue entry. So if, for example, you want a Future.isDone() call, you probably need to add a peek to Erlang, which returns a copy of the message queue entry but leaves the message queue intact.

Otherwise this has to be solved as follows, by pushing the future back onto the queue. Also, the isDone() call has to be mediated by the actor that owns the message queue. So the Java example does not really translate to a direct call of isDone() as below, and would be more complicated in practice:

isDone(Future) :-
    receive 
       Future  -> self(Pid), Pid ! Future;
       after 0 -> fail.
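
As a point of comparison, SWI-Prolog's thread mailboxes already offer such a peek: thread_peek_message/1 succeeds if a matching message is waiting in the calling thread's own queue, without removing it. The sketch below assumes a future is represented by the message it will eventually deliver; is_done/1 and the result/2 message shape are hypothetical.

```prolog
% is_done(?Future): true if a message unifying with Future is already
% waiting in the calling thread's mailbox. The message is not consumed.
is_done(Future) :-
    thread_peek_message(Future).

% Hypothetical demo: check, deliver, check again, then consume.
demo_future(Before, After, Value) :-
    (   is_done(result(job1, _)) -> Before = done ; Before = pending ),
    thread_self(Me),
    thread_send_message(Me, result(job1, 42)),
    (   is_done(result(job1, _)) -> After = done ; After = pending ),
    thread_get_message(result(job1, Value)).   % now actually take it
```

As in the text above, the check only works from the thread that owns the mailbox; another thread would have to ask the owner or peek at an explicitly shared queue.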

Great! Hope you can build something that can run the prio queue example, since this seems to be a nice acid test for what we may want to require of a construct like this.

Works so far, but it needs the new version of the broker, and I also needed to push a fix for my message queues. There was a glitch when time-out=0, but this should be fixed now.

The example is prioque.p:
https://gist.github.com/jburse/253739554305af625e746366cff9b66b#file-prioque-p

The new broker is broker2.p; you can compare it with broker.p to see how I added guards:
https://gist.github.com/jburse/253739554305af625e746366cff9b66b#file-broker2-p

Since I needed to push a fix to my message queue library, you can only try it after release 1.4.2 of Jekejeke Prolog has been published. The fix is not even on GitHub yet.

The same broker (actually version 3) can also do mobile devices:


https://gist.github.com/jburse/d184be158356b56e2c17a59babe45636#gistcomment-3091651

Woa!

But the adoption of spawn from Erlang/Elixir, which returns a Pid, should be scrutinized. It could be more efficient to simply have a run that doesn’t return a Pid. The “go” statement in the Go programming language follows a similar idea. spawn could then be bootstrapped.

Here is how spawn/4 can be bootstrapped provided we have only run/3:

% spawn(+Atom, +Integer, +Goal, -Pid)
spawn(H, P, G, A) :-
   self(C),
   run(H, P, (self(B), send(C,started(B)), G)),
   receive(started(A), _).

So spawn/4 is disposable. I retested the Ha Tschi example and it runs fine. Will update the new code on gist.
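
The same bootstrapping can be tried locally with SWI-Prolog threads. In this sketch run/1 is a hypothetical fire-and-forget primitive built on thread_create/3 with the detached(true) option, and spawn_via_run/2 (also a made-up name) layers the Pid handshake on top of it, following the structure of spawn/4 above but without host and port.

```prolog
% run(+Goal): start Goal in a new thread and forget about it.
run(Goal) :-
    thread_create(Goal, _, [detached(true)]).

% spawn_via_run(+Goal, -Pid): the child reports its own id to the
% parent before it starts working on Goal.
spawn_via_run(Goal, Pid) :-
    thread_self(Parent),
    run(( thread_self(Me),
          thread_send_message(Parent, started(Me)),
          call(Goal)
        )),
    thread_get_message(started(Pid)).
```

As in the original, two concurrent spawns from the same parent could pick up each other's started/1 message; tagging the handshake with a unique reference would be one way around that.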

Looks good! Now if you would use the Erlang-ish syntax too, you would have the beginning of an implementation of Web Prolog. :slight_smile:

Then it wouldn’t be Web Prolog. Doesn’t seem wise to miss the chance, in my opinion, to have a syntax and semantics which is close to Erlang – something that an Erlang programmer would understand right away. I’m not familiar with Go, but I doubt it has a syntax derived from Prolog.

Yes, that looks like something I might do in my implementation too.

You mean the other way around – how pengine_spawn/1-2 is defined in terms of spawn/2-3?

In Web Prolog, pengine_spawn/1-2 is defined in terms of spawn/2-3. A pengine is seen as a special case of an actor – an actor that comes with a built-in protocol.

Have a look at this:

simple_pengine(Query, Pid, Options) :-
    self(Self),
    spawn(query(Query, Pid, Self), Pid, [
        monitor(true),
        src_predicates([query/3])
        | Options
    ]).
    
query(Query, Self, Parent) :-
    call_cleanup(Query, Det=true),
    (   var(Det)
    ->  Parent ! success(Self, Query, true),
        receive({
            next -> fail;
            stop -> Parent ! stopped(Self)
        })
    ;   Parent ! success(Self, Query, false)
    ).

Of course, a “real” pengine is more complicated.

Your onions can be read as a bootstrapping hierarchy. But what would be more interesting is how you plug different transports into the lowest-level primitives. I think Erlang proposes writing different servers without assuming rpc, which in my world go by the name message brokers. The rest then follows.

It’s strange that there is no primitive that allows bootstrapping spawn/2-3 in Web Prolog à la SWI-Prolog. Also, rpc/2 can be bootstrapped, but you make it a primitive.

For example, Erlang traditionally had rpc/2 bootstrapped. The book is also full of servers, which in my world go by the name message brokers, but I have not yet worked through it to see whether it’s usable. I had the 1st edition of the Erlang book; this is the 2nd edition, which seems to be a total rewrite:

https://gangrel.files.wordpress.com/2015/08/programming-erlang-2nd-edition.pdf

The book is full of rpc bootstrapping.