Help getting up and running with multithreading

Sorry in advance for the noobie questions but the documentation is a bit sparse on usage examples and this is my first ever attempt at multithreading a program. If you could humor me with this and maybe a couple follow up questions just to push me in the right direction that would be amazing.

For this program

data([[[a,10],[b,11],[c,12],[d,13],[e,14]],
      [[f,15],[g,16],[h,16],[i,17],[j,18]],
      [[k,19],[l,20],[m,21],[n,22],[o,23]],
      [[p,24],[q,25],[r,26],[s,27],[t,28]],
      [[u,29],[v,30],[w,31],[x,32],[y,33]]]).

task_([],[]) :- sleep(3).
task_([[_,Num]|Tuples],[Num|Nums]) :-
  task_(Tuples,Nums).

task([],[]).
task([Row|Rows],[Result|Results]) :-
  task_(Row,Result),
  task(Rows,Results).

task_init(Result) :-
  data(Data),
  task(Data,Result).

I get the expected time profile

?- time(task_init(Result)).
% 42 inferences, 0.000 CPU in 15.016 seconds (0% CPU, 217617 Lips)
Result = [[10, 11, 12, 13, 14], [15, 16, 16, 17, 18], [19, 20, 21, 22, 23], [24, 25, 26, 27, 28], [29, 30, 31, 32, 33]].

because 5 rows x 3 seconds of sleep each = 15 seconds.

How would I multithread this so that I have a thread running in parallel for each row (that is, for each call of task_/2 made from task/2) so we get about 3 seconds of total sleep? Correct? Because we should get five parallel threads, each sleeping for 3 seconds simultaneously? Is that possible?

I thought maybe

task([],[]).
task([Row|Rows],[Result|Results]) :-
  thread_create(task_(Row,Result),_),
  task(Rows,Results).

but then I get

?- time(task_init(Result)).
% 16 inferences, 0.000 CPU in 0.000 seconds (30% CPU, 124031 Lips)
Result = ['$VAR'('_'), '$VAR'('_'), '$VAR'('_'), '$VAR'('_'), '$VAR'('_')].

You can probably rewrite this code using maplist/3.

If it works using maplist/3, then simply change it to use concurrent_maplist/3.

In general, if you have code that looks like this:

p([], []).
p([X|Xs], [Y|Ys]) :-
    do_something(X, Y),
    p(Xs, Ys).

you can change it to:

p(L1, L2) :- maplist(do_something, L1, L2).

or

p(L1, L2) :- concurrent_maplist(do_something, L1, L2).

Please note the caveats with concurrent_maplist/3 (e.g., no backtracking in do_something/2, and the overhead might overwhelm the speed-up of concurrency).
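Applied to the program in the question, this could look roughly like the sketch below. It reuses task_/2 and the data layout from the original post (shortened here to three rows, with a 1-second sleep). How many rows actually run at the same time depends on the number of cores Prolog detects, so the total time is not guaranteed to drop to a single sleep.

```prolog
:- use_module(library(thread)).   % provides concurrent_maplist/3

% Shortened version of data/1 from the question.
data([[[a,10],[b,11],[c,12]],
      [[f,15],[g,16],[h,16]],
      [[k,19],[l,20],[m,21]]]).

% task_/2 as in the question: collect the numbers of one row,
% then simulate slow work once the row is exhausted.
task_([], []) :- sleep(1).
task_([[_,Num]|Tuples], [Num|Nums]) :-
    task_(Tuples, Nums).

% One call of task_/2 per row, spread over worker threads.
% The bindings made by each call are copied back into Result.
task_init(Result) :-
    data(Data),
    concurrent_maplist(task_, Data, Result).
```

Unlike the thread_create/2 attempt in the question, the results come back to the caller, because concurrent_maplist/3 copies the bindings from the worker threads.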

And if you do get to the point where you want to measure time, be specific about what clock you’re using - see statistics/2.

Choose from cputime, thread_cputime, real_time, etc. Maybe someone has a “best practice” for how to measure “time” for a multi-threaded program.
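Not a best practice, just one possible sketch: take the wall clock with get_time/1 and the process-wide CPU time with statistics/2 around the goal. The name timed/3 is made up for this example. As far as I know, the cputime key only covers the calling thread, which is why process_cputime is used here.

```prolog
% timed(:Goal, -Wall, -Cpu)   (hypothetical helper)
% Wall: elapsed wall-clock seconds while running Goal once.
% Cpu:  CPU seconds used by the whole process (all threads) meanwhile.
timed(Goal, Wall, Cpu) :-
    get_time(W0),
    statistics(process_cputime, C0),
    once(Goal),
    get_time(W1),
    statistics(process_cputime, C1),
    Wall is W1 - W0,
    Cpu is C1 - C0.
```

For a goal that mostly sleeps, such as the one in the question, Wall should be close to the sleep time while Cpu stays near zero.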

Perhaps slightly off topic but I’m trying to test an example from the coroutining guide

findall(Templ, Goal, List) :-
        setup_call_cleanup(
            engine_create(Templ, Goal, E),
            get_answers(E, List),
            engine_destroy(E)).

get_answers(E, [H|T]) :-
        engine_next(E, H), !,
        get_answers(E, T).
get_answers(_, []).

and getting No permission to modify static procedure findall/3.

I tried adding the :- redefine_system_predicate findall. directive at the top as recommended here but still doesn’t work and it gives me an additional syntax error.

How do I test this? I’m thinking coroutines may be what I actually need.

Define it with a different name. This just illustrates how one could implement findall/3 using engines.
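For instance, the guide's code can be loaded unchanged apart from the predicate name. The name engine_findall/3 is just a placeholder picked for this sketch.

```prolog
% Same definition as in the guide, renamed so it does not clash
% with the built-in findall/3. engine_findall/3 is a made-up name.
engine_findall(Templ, Goal, List) :-
    setup_call_cleanup(
        engine_create(Templ, Goal, E),
        get_answers(E, List),
        engine_destroy(E)).

% Pull answers from the engine until engine_next/2 fails.
get_answers(E, [H|T]) :-
    engine_next(E, H), !,
    get_answers(E, T).
get_answers(_, []).
```

A query such as engine_findall(X, member(X,[a,b,c]), L) should then behave like the built-in.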

And yes, you could use this to implement rate limiting over multiple threads by using an engine to decide when the next query may be launched. It is an elegant solution, but IMO a little more complicated than using the message queue approach.

Thanks @jan , is this what I would use for the message queue? Again, never done multithreading before in any lang so I’m just trying to figure things out.

I’ll take a whack at it but if it’s ok I’ll update here periodically with my progress for some input from the group?

Super appreciate everyone’s help.

Yes, that is what I mean. They are simple FIFO (First In First Out) queues of terms that may be accessed safely from multiple threads. Notably, a reading thread will block if there is no (matching) message in the queue and wake up when one arrives. Two (or more) threads can read from the queue. A term is guaranteed to be given to only one thread; the others remain waiting.

They can implement quite a few ways to have cooperating threads.
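A small sketch of the blocking and matching behaviour described above. The predicate name queue_demo/1 and the terms job/1, note/1 and got/1 are invented for the illustration.

```prolog
queue_demo(Got) :-
    message_queue_create(Jobs),
    message_queue_create(Replies),
    % The reader blocks in thread_get_message/2 until a term
    % unifying with job(_) is in the Jobs queue.
    thread_create(( thread_get_message(Jobs, job(X)),
                    thread_send_message(Replies, got(X))
                  ), Reader),
    thread_send_message(Jobs, note(ignored)),  % does not match job(_)
    thread_send_message(Jobs, job(42)),        % this one wakes the reader
    % The main thread blocks here in the same way, on the reply queue.
    thread_get_message(Replies, got(Got)),
    thread_join(Reader),
    message_queue_destroy(Jobs),
    message_queue_destroy(Replies).
```

The note(ignored) term simply stays in the queue, because the reader only asks for terms of the form job(_).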

That’s more than welcome, I follow this conversation closely.

So here’s a functional simulacrum of what I’m trying to do

(?- task_init(Result)).

which is process paginated api data like

Server = [[a,b,c],
	      [d,e,f],
	      [g,h,i]].

where each row is a page.

Basic algorithm is :- Call a page, process each element on the page, store in a list, call next page, repeat until final page is processed, and then return the complete output list.

Turning the page (task/4) takes time, processing each element (task_/5) takes time. After each api call I need to check a rate limiter because I’m only allowed to run X number of requests within Y time on a moving window, so if the limit is hit, I need to wait out the remainder of the current window, reset the count, move to the next window and continue.

Would be great if I could allocate a thread to each page so instead of

Thread0 -> Page0 = [a,b,c] —> [processed(a),processed(b),processed(c)|[Page1]]
		 , Page1 = [d,e,f] ,, [Page0|[processed(d),processed(e),processed(f)|[Page2]]]
		 , Page2 = [g,h,i] ,, [Page0|[Page2|[processed(g),processed(h),processed(i)|[]]]].

I did

Thread0 -> {Page0}[a,b,c].
Thread1 -> {Page1}[d,e,f].
Thread2 -> {Page2}[g,h,i].

such that all threads are synchronized where if one thread hits the rate limit then all threads pause.

Do I understand correctly the strategy to multithread this app is

  1. Create a task queue.
  2. Push each page call into the queue, like
task(Count,Time,0,Results).
task(Count,Time,1,Results).
task(Count,Time,2,Results).
  3. Set the data I’m tracking like Time and Count in flags rather than passing around so accessible across threads.
  4. Then spawn threads to access the queue, pick up the tasks, and run in parallel?

Would that look something like this?

Please note the modifications I made to task/4 and task_/5: Where before I was using a continuation from the base case of task_/5 to perform the pagination, now since I spawn a thread per page making “turning the page” unnecessary, there is no continuation and I simply return the processed list for that page instead.

Is that correct? Would that be the more advisable pattern for what I’m doing?

This code is incomplete and work in progress, but hopefully enough to show you my thought process.

Here’s what I’m doing to push the tasks into the queue

task_queue_(P,_) :- total(P),!.
task_queue_(Page,Queue) :-
  thread_send_message(Queue,task(Page,Results)),
  succ(Page,Page0),
  task_queue_(Page0,Queue).
task_queue(Queue) :-
  message_queue_create(Queue),
  task_queue_(0,Queue).

Is that correct? Now how do I go about consuming them? Another recursive loop with thread_get_message/2 followed by call/1/2 for each task in the queue?

Also:

  • Will the call to sleep/1 in my rate_limit/0 pause all threads? Or how do I do that?
  • Even if I can pause across threads in my ratelimiter, is there a risk of two threads making a call simultaneously where that second call may be the one that violates the rate limit? If so how would you prevent that? Or should that not be an issue?
  • Perhaps a question for later but after each thread returns the output list for their assigned page, how do I consolidate all into a single output list like I had in st.pl? Do I have them assert it into the db on exit like assert(Page,List). and pick it up from there?

I didn’t look at the code, just at the comments. Some things stand out

There is no point in dealing with a Results variable here. The thread that accepts this message gets a copy, so binding Results is a no-op. What you do instead is create a second queue for the results. Now the thread simply gets task(Page), solves this puzzle and sends task(Page, Results) to the other queue. Alternatively, and possibly easier in this case, simply assert task(Page, Result), so after all is done you have a dynamic predicate holding all results.

As to the rate limiting, my first choice would be to send the task-terms to the queue at the rate limit, to make it simple, using a fixed sleep/1 between adding terms.
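A minimal sketch of that idea, assuming each term stands for exactly one request. The name send_at_rate/3 and the Interval argument are made up here; for X requests per Y seconds one would pick an interval of roughly Y/X seconds.

```prolog
% send_at_rate(+Queue, +Terms, +Interval)   (hypothetical helper)
% Add Terms to Queue one by one, sleeping Interval seconds
% after each, so the senders' pace sets the request rate.
send_at_rate(_, [], _).
send_at_rate(Queue, [Term|Terms], Interval) :-
    thread_send_message(Queue, Term),
    sleep(Interval),
    send_at_rate(Queue, Terms, Interval).
```

Only the sending thread sleeps. Worker threads that read from the queue simply block in thread_get_message/2 until the next term shows up.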

sleep/1 only pauses the calling thread. There is, as in most thread libraries, no (reliable) way to pause other (or all) threads. You can often pause another thread using

  thread_signal(Target, sleep(Time))

But there is no guarantee when thread signals are processed. Notably blocking system calls as well as long running foreign code may delay handling for a long time.

Note that if you want to stop this process gracefully, you should send as many stop messages to the queue as you have threads listening to the queue. On receiving stop, the thread should simply end its loop with success. Now you can call thread_join/1 on each of the threads you created. After all join operations succeed, all work is done.

concurrent_maplist/3 provides well tested example code for all of this, including aborting the whole process if some error occurs.
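Put together, the pieces above (a task queue, a second queue for results, stop messages and thread_join/1) could be sketched as follows. This is a simplified illustration, not the library code: process_page/2 is a hypothetical stand-in for the real per-page work, run_pages/3 and the helper names are invented, and there is no error handling, so a worker that raises an exception will make thread_join/1 raise as well.

```prolog
% Hypothetical stand-in for the real per-page work.
process_page(Page, processed(Page)).

% Worker loop: take task(Page) terms until a stop term arrives.
worker(Tasks, Results) :-
    thread_get_message(Tasks, Msg),
    (   Msg == stop
    ->  true
    ;   Msg = task(Page),
        process_page(Page, Result),
        thread_send_message(Results, task(Page, Result)),
        worker(Tasks, Results)
    ).

start_worker(Tasks, Results, Id) :-
    thread_create(worker(Tasks, Results), Id).

% run_pages(+Pages, +NWorkers, -Pairs): Pairs is a sorted list
% of Page-Result, one per page.
run_pages(Pages, NWorkers, Pairs) :-
    message_queue_create(Tasks),
    message_queue_create(Results),
    length(Ids, NWorkers),
    maplist(start_worker(Tasks, Results), Ids),
    forall(member(P, Pages), thread_send_message(Tasks, task(P))),
    % One stop message per worker, queued behind all tasks.
    forall(between(1, NWorkers, _), thread_send_message(Tasks, stop)),
    maplist(thread_join, Ids),
    length(Pages, N),
    collect(N, Results, Unsorted),
    msort(Unsorted, Pairs),
    message_queue_destroy(Tasks),
    message_queue_destroy(Results).

% Read exactly N result terms from the result queue.
collect(0, _, []) :- !.
collect(N, Queue, [Page-Result|Rest]) :-
    thread_get_message(Queue, task(Page, Result)),
    N1 is N - 1,
    collect(N1, Queue, Rest).
```

For example, run_pages([0,1,2], 2, Pairs) starts two workers for three pages. Results arrive in whatever order the workers finish, which is why they are sorted by page at the end.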

Ok, that makes sense and about what I figured. Thanks for confirming.

Sorry, not quite sure I follow. The adding of the terms to the queue isn’t what I need to rate limit for, but the actual execution of those terms. So single threaded I’m doing

Thread0, do_something, check_rl, do_something, rl_reached, sleep...., do_something

and with multi thread it should be

Thread0, do_something, check_rl, do_something, rl_reached, sleep...., do_something
Thread1, do_something, check_rl, do_something, sleep...., do_something

where any thread hitting the rl should sleep all others. But if they’re running in parallel I don’t get how that’s done.

Can you clarify please?

If you send jobs to the queue that each involve exactly one request and you send them to the queue at the rate limit, you’ll never get above the limit. Remember that getting a message from a queue that is empty will block until there is a (matching) message.

Ahh ok I see, that helps.

Right, but each page has multiple calls that are made which I was willing to run in serial per thread: I was hoping I could only have the pages run in parallel like

T0 -> P0 -> process(a), process(b), process(c)
T1 -> P1 -> process(d), process(e), process(f)
T2 -> P2 -> process(g), process(h), process(i)

Then it sounds like I won’t be able to do that and instead I would need to flip it around and call the pages in serial but process each element on the page in parallel like this?

T0 -> P0 -> T1 -> process(a)
            T2 -> process(b)
            T3 -> process(c)
T0 -> P1 -> T1 -> process(d)
            T2 -> process(e)
            T3 -> process(f)
T0 -> P2 -> T1 -> process(g)
            T2 -> process(h)
            T3 -> process(i)

Alright will work on this more next weekend. This is helping, thank you!

Alright, the fruits of my labor. (?- main. → ?- listing(mydata/3).)

I think this works to rate limit as needed without even using a message queue.

I may have more questions later, and just for my own edification I would still like to understand how engines work, but I think I have enough to go on for now.

Thanks again, Jan!

Could use a push in the right direction.

I’ve finally started testing my app and so far have gotten

Warning: [Thread 300] Thread running

and

Warning: [Thread 850] Thread running

on two different runs. After tweaking a few things, the warning is followed by a huge block of angry red text, and then the run culminates in illegal hardware instruction swipl app.pl.

I know you may be limited without seeing what exactly the app is doing but any advice on what these warnings generally mean would be great.

Edit: Also one quick unrelated follow up:

If you have

goal1,
goal2

where goal1 calls other goals that spawn threads, I’m seeing goal2 running before goal1 completes. How do I ensure goal2 runs only after goal1 completes, like await in nodejs?

Is setup_call_cleanup/3 the only option for that or is there a better way?

A “grep” on the source tells me this message comes from detached threads that die on an error or failure. The message should contain more information that tells you the goal of the thread and the reason it died. Given the design discussed, I don’t see how you get to 850 threads though. It seems you are doing something very different.

Asynchronous functions are quite different from threads. To wait for completion there is thread_join/1,2. In the design discussed you only use this to wait for all work to complete at the end.

setup_call_cleanup/3 doesn’t wait for threads. It is intended for use within a single thread to create resources, use them and clean them up. The most typical example is a handle to an open stream.
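The stream case could look like the sketch below. The predicate name read_first_line/2 is made up for the example.

```prolog
:- use_module(library(readutil)).   % read_line_to_string/2

% Open File, read its first line as a string, and close the
% stream again even if reading fails or raises an exception.
read_first_line(File, Line) :-
    setup_call_cleanup(
        open(File, read, In),
        read_line_to_string(In, Line),
        close(In)).
```

Everything here happens in the calling thread; the cleanup goal runs when the call part is finished, not when some other thread is finished.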

So regarding thread_join/1/2 , if I have this for example

g2(T) :-
  atomic_concat("got: ",T,Log),
  writeln(Log),
  sleep(3),
  writeln("Done.").

g1(0).
g1(T) :-
  thread_create(g2(T),_), %g2(T),
  succ(T0,T),
  g1(T0).

main :-
  writeln("starting."),
  g1(5),
  writeln("ending.").

How do I ensure that “ending” only prints after g1 completes?

It of course does when running single threaded, like if you run g2(T) rather than thread_create(g2(T),_) in g1, but not when I create multiple threads for the goal so that I can wait out all my sleep(3)'s in g2 in parallel.

Output for the above is roughly

?- main.
starting.
got: 4
got: 5
ending.
got: 3
got: 1
got: 2
true Done.
Done.
Done.
Done.
Done.

Is the way you do it by modifying main to something like this?

main :-
  writeln("starting."),
  thread_create(g1(5),T_id),
  thread_join(T_id),
  writeln("ending.").

I did test this and it seems to work, but I wanted to double-check because on a whim I tested with larger numbers than 5, like thread_create(g1(20),T_id), and, although it takes a few runs, you occasionally get results like

?- 
|    main.
starting.
got: 19
got: 20
got: 18
got: 16
got: 12
got: 10
got: 15
got: 9
got: 7
got: 14
got: 17
got: 13
got: 8
got: 6
got: 3
got: 2
got: 11
got: 1
got: 5
ending.
got: 4
true.
...

…another few runs…

?- main.
starting.
got: 20
got: 19
got: 18
got: 17
got: 16
got: 14
got: 7
got: 2
got: 13
got: 8
got: 5
got: 10
got: 1
got: 12
got: 4
got: 15
got: 9
got: 6
ending.
got: 11
got: 3
true.

Is thread_join not guaranteed to wait or is this a bug? Or am I doing something wrong? What about mission critical situations where not waiting completely will result in errors? Is there an alternative I can use that would be more atomically guaranteed?

(even with 5 it happens occasionally.)

Seems to me that you should call thread_join/1 on every thread you started, of course after all of them have been created, but then I would use concurrent_forall/3 instead:

g1(N) :-
    concurrent_forall(between(1,N,T),g2(T),[threads(N)]).

Note the option threads(N), which overrides the default of using as many threads as there are hardware threads available.
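For the thread_create route, a rough sketch of joining every thread that was started could look like this. In the earlier attempt only the thread running g1 was joined, and that thread finishes as soon as it has created the others. Here g2/1 is simplified to use format/2 with a 1-second sleep, and start_g2/2 is a helper name invented for the example.

```prolog
g2(T) :-
    format("got: ~w~n", [T]),
    sleep(1),
    format("done: ~w~n", [T]).

% Start one thread for T and hand back its id.
start_g2(T, Id) :-
    thread_create(g2(T), Id).

main :-
    writeln("starting."),
    numlist(1, 5, Ts),
    maplist(start_g2, Ts, Ids),   % first start all threads
    maplist(thread_join, Ids),    % then wait for each of them
    writeln("ending.").
```

Because the ids are kept instead of being thrown away with _, main can wait for every g2 thread before printing the last line.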

Thanks. I’d still like to know how exactly to fix using thread_create but I think concurrent_forall/3 works. Didn’t know about that one.

What if g2 relies on g1 for some token?

g2(K,T) :-
  atomic_list_concat([
    "got: ",T,'\n',
    "with: ",K],Log),
  writeln(Log),
  sleep(3),
  writeln("Done.").

g1(0).
g1(T) :-
  nth1(T,[q,w,e,r,t],K),
  g2(K,T),
  succ(T0,T),
  g1(T0).

main :-
  writeln("starting."),
  g1(5),
  writeln("ending.").

What is the general strategy to refactor code like this? Is it best practice to ensure that all dependencies are called directly from g2?

For example in this case I guess I could do

token(T,K) :-
  nth1(T,[q,w,e,r,t],K).  

g2(T) :-
  token(T,K),
  atomic_list_concat([
    "got: ",T,'\n',
    "with: ",K],Log),
  writeln(Log),
  sleep(3),
  writeln("Done.").

main :-
  writeln("starting."),
  N = 5,
  concurrent_forall(between(1,N,T),g2(T),[threads(N)]),
  writeln("ending.").

Is that the way this is typically done?

Roughly, there are three ways to wait for some thread to have done something.

  1. Use thread_join/1 to wait for the thread that does the work to complete
  2. Have the thread that does the work send some message to a message queue and wait for this message to arrive (that seems most appropriate here).
  3. Use thread_wait/2 to wait for a condition on the (dynamic) database. thread_wait/2 takes a goal that must become true to cause the wait to end and a list of options that tell it when to reevaluate this goal.

(1) is comparable to what you would do when spawning processes. (2) is comparable to using pipes to connect the process input of one process to the output of another and (3) relates to POSIX condition variables.
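Option (2) applied to the g1/g2 example might be sketched as follows. The queue variable Done and the done/1 term are made up for the illustration, and g2 is simplified to use format/2 with a 1-second sleep. Note the assumption that every worker reaches its send; a worker that dies earlier would leave main waiting.

```prolog
% Each worker reports on the Done queue when it has finished.
g2(Done, T) :-
    format("got: ~w~n", [T]),
    sleep(1),
    thread_send_message(Done, done(T)).

main :-
    writeln("starting."),
    N = 5,
    message_queue_create(Done),
    forall(between(1, N, T),
           thread_create(g2(Done, T), _, [detached(true)])),
    % Block until N done(_) messages have arrived.
    forall(between(1, N, _),
           thread_get_message(Done, done(_))),
    message_queue_destroy(Done),
    writeln("ending.").
```

The threads are detached, so there is nothing to join; the main thread only counts completion messages.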
