Help getting up and running with multithreading

Sorry in advance for the newbie questions, but the documentation is a bit sparse on usage examples and this is my first ever attempt at multithreading a program. If you could humor me with this and maybe a couple of follow-up questions, just to push me in the right direction, that would be amazing.

For this program

data([[[a,10],[b,11],[c,12],[d,13],[e,14]],
      [[f,15],[g,16],[h,16],[i,17],[j,18]],
      [[k,19],[l,20],[m,21],[n,22],[o,23]],
      [[p,24],[q,25],[r,26],[s,27],[t,28]],
      [[u,29],[v,30],[w,31],[x,32],[y,33]]]).

task_([],[]) :- sleep(3).
task_([[_,Num]|Tuples],[Num|Nums]) :-
  task_(Tuples,Nums).

task([],[]).
task([Row|Rows],[Result|Results]) :-
  task_(Row,Result),
  task(Rows,Results).

task_init(Result) :-
  data(Data),
  task(Data,Result).

I get the expected time profile

?- time(task_init(Result)).
% 42 inferences, 0.000 CPU in 15.016 seconds (0% CPU, 217617 Lips)
Result = [[10, 11, 12, 13, 14], [15, 16, 16, 17, 18], [19, 20, 21, 22, 23], [24, 25, 26, 27, 28], [29, 30, 31, 32, 33]].

because 5 rows x 3 seconds of sleep each = 15 seconds.

How would I multithread this so that a thread runs in parallel for each row (that is, for each recursive call of task_/2), giving about 3 seconds of total sleep? Is that right? We should get five parallel threads, each sleeping for 3 seconds simultaneously. Is that possible?

I thought maybe

task([],[]).
task([Row|Rows],[Result|Results]) :-
  thread_create(task_(Row,Result),_),
  task(Rows,Results).

but then I get

?- time(task_init(Result)).
% 16 inferences, 0.000 CPU in 0.000 seconds (30% CPU, 124031 Lips)
Result = ['$VAR'('_'), '$VAR'('_'), '$VAR'('_'), '$VAR'('_'), '$VAR'('_')].

You can probably rewrite this code using maplist/3.

If it works using maplist/3, then simply change it to use concurrent_maplist/3.

In general, if you have code that looks like this:

p([], []).
p([X|Xs], [Y|Ys]) :-
    do_something(X, Y),
    p(Xs, Ys).

you can change it to:

p(L1, L2) :- maplist(do_something, L1, L2).

or

p(L1, L2) :- concurrent_maplist(do_something, L1, L2).

Please note the caveats with concurrent_maplist/3 (e.g., no backtracking in do_something/2, and the overhead might overwhelm the speed-up of concurrency).
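Applied to the code in the question, a minimal sketch (second/2, row_nums/2 and task_concurrent/2 are invented names; the sleep(3) stands in for the per-row work):

```prolog
:- use_module(library(apply)).    % maplist/3
:-- use_module(library(thread)).  % concurrent_maplist/3

% Extract the second element of a [Key,Num] pair.
second([_,Num], Num).

% Process one row: collect the numbers, then do the (simulated) work.
row_nums(Row, Nums) :-
    maplist(second, Row, Nums),
    sleep(3).

% One worker per row, all rows processed concurrently.
task_concurrent(Data, Results) :-
    concurrent_maplist(row_nums, Data, Results).
```

With the data/1 term from the question, time/1 on task_concurrent/2 should report roughly 3 seconds of wall time rather than 15, since the five rows sleep concurrently.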


And if you do get to the point where you want to measure time, be specific about what clock you’re using - see statistics/2.

Choose from cputime, thread_cputime, real_time, etc. Maybe someone has a “best practice” for how to measure “time” for a multi-threaded program.

Perhaps slightly off topic but I’m trying to test an example from the coroutining guide

findall(Templ, Goal, List) :-
        setup_call_cleanup(
            engine_create(Templ, Goal, E),
            get_answers(E, List),
            engine_destroy(E)).

get_answers(E, [H|T]) :-
        engine_next(E, H), !,
        get_answers(E, T).
get_answers(_, []).

and getting No permission to modify static procedure findall/3.

I tried adding the :- redefine_system_predicate findall. directive at the top, as recommended here, but it still doesn’t work and gives me an additional syntax error.

How do I test this? I’m thinking coroutines may be what I actually need.

Define it with a different name. This just illustrates how one could implement findall/3 using engines.
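For example, under a different name (my_findall/3 is an arbitrary choice; engine_create/3, engine_next/2 and engine_destroy/1 are built into SWI-Prolog, so no directive is needed):

```prolog
% Same implementation as the guide, renamed to avoid the system findall/3.
my_findall(Templ, Goal, List) :-
    setup_call_cleanup(
        engine_create(Templ, Goal, E),
        get_answers(E, List),
        engine_destroy(E)).

% Pull answers from the engine until it is exhausted.
get_answers(E, [H|T]) :-
    engine_next(E, H), !,
    get_answers(E, T).
get_answers(_, []).
```

Then ?- my_findall(X, member(X, [a,b,c]), L). binds L = [a, b, c].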

And yes, you could use this to implement rate limiting over multiple threads by using an engine to decide when the next query may be launched. It is an elegant solution, but IMO a little more complicated than using the message queue approach.

Thanks @jan , is this what I would use for the message queue? Again, never done multithreading before in any lang so I’m just trying to figure things out.

I’ll take a whack at it but if it’s ok I’ll update here periodically with my progress for some input from the group?

Super appreciate everyone’s help.

Yes, that is what I mean. They are simple FIFO (First In, First Out) queues of terms that may be accessed safely from multiple threads. Notably, a reading thread blocks if there is no (matching) message in the queue and wakes up when one arrives. Two (or more) threads can read from the same queue; a term is guaranteed to be given to only one of them, while the others remain waiting.

They can be used to implement quite a few patterns for cooperating threads.
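A minimal sketch of that blocking handoff (queue_demo/1 is an invented name): the reader blocks in thread_get_message/2 until the worker’s message arrives.

```prolog
queue_demo(Msg) :-
    message_queue_create(Q),
    % Worker thread posts a single term to the queue.
    thread_create(thread_send_message(Q, hello), T, []),
    % This blocks until the worker's message is available.
    thread_get_message(Q, Msg),
    thread_join(T, _),
    message_queue_destroy(Q).
```

?- queue_demo(M). binds M = hello.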

That’s more than welcome, I follow this conversation closely.


So here’s a functional simulacrum of what I’m trying to do

(?- task_init(Result)).

which is process paginated api data like

Server = [[a,b,c],
	      [d,e,f],
	      [g,h,i]].

where each row is a page.

Basic algorithm is :- Call a page, process each element on the page, store in a list, call next page, repeat until final page is processed, and then return the complete output list.

Turning the page (task/4) takes time, and processing each element (task_/5) takes time. After each API call I need to check a rate limiter, because I’m only allowed to run X requests within Y time on a moving window; if the limit is hit, I need to wait out the remainder of the current window, reset the count, move to the next window and continue.

Would be great if I could allocate a thread to each page so instead of

Thread0 -> Page0 = [a,b,c] —> [processed(a),processed(b),processed(c)|[Page1]]
		 , Page1 = [d,e,f] ,, [Page0|[processed(d),processed(e),processed(f)|[Page2]]]
		 , Page2 = [g,h,i] ,, [Page0|[Page2|[processed(g),processed(h),processed(i)|[]]]].

I did

Thread0 -> {Page0}[a,b,c].
Thread1 -> {Page1}[d,e,f].
Thread2 -> {Page2}[g,h,i].

such that all threads are synchronized where if one thread hits the rate limit then all threads pause.

Do I understand correctly the strategy to multithread this app is

  1. Create a task queue.
  2. Push each page call into the queue, like
task(Count,Time,0,Results).
task(Count,Time,1,Results).
task(Count,Time,2,Results).
  3. Set the data I’m tracking, like Time and Count, in flags rather than passing it around, so it is accessible across threads.
  4. Then spawn threads to access the queue, pick up the tasks, and run them in parallel?

Would that look something like this?

Please note the modifications I made to task/4 and task_/5: before, I was using a continuation from the base case of task_/5 to perform the pagination; now, since I spawn a thread per page, “turning the page” is unnecessary, so there is no continuation and I simply return the processed list for that page instead.

Is that correct? Would that be the more advisable pattern for what I’m doing?

This code is incomplete and work in progress, but hopefully enough to show you my thought process.

Here’s what I’m doing to push the tasks into the queue

task_queue_(P,_) :- total(P),!.
task_queue_(Page,Queue) :-
  thread_send_message(Queue,task(Page,Results)),
  succ(Page,Page0),
  task_queue_(Page0,Queue).
task_queue(Queue) :-
  message_queue_create(Queue),
  task_queue_(0,Queue).

Is that correct? Now how do I go about consuming them? Another recursive loop with thread_get_message/2 followed by call/1/2 for each task in the queue?

Also:

  • Will the call to sleep/1 in my rate_limit/0 pause all threads? Or how do I do that?
  • Even if I can pause across threads in my ratelimiter, is there a risk of two threads making a call simultaneously where that second call may be the one that violates the rate limit? If so how would you prevent that? Or should that not be an issue?
  • Perhaps a question for later but after each thread returns the output list for their assigned page, how do I consolidate all into a single output list like I had in st.pl? Do I have them assert it into the db on exit like assert(Page,List). and pick it up from there?

I didn’t look at the code, just at the comments. Some things stand out

There is no point in having a Results variable here. The thread that accepts this message gets a copy, so binding Results is a no-op. Instead, create a second queue for the results. Now the worker thread simply gets task(Page), solves this puzzle and sends task(Page, Results) to the other queue. Alternatively, and possibly easier, simply assert task(Page, Result), so that after all is done you have a dynamic predicate holding all results.
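A sketch of that second-queue idea (task_parallel/2, worker/3 and collect/3 are invented names; task_/2 is the predicate from the first post, repeated here so the sketch is self-contained):

```prolog
:- use_module(library(pairs)).   % pairs_values/2

% Per-row work from the first post; sleep(3) simulates the API calls.
task_([], []) :- sleep(3).
task_([[_,Num]|Tuples], [Num|Nums]) :-
    task_(Tuples, Nums).

task_parallel(Rows, Results) :-
    message_queue_create(Done),          % second queue, results only
    forall(nth0(I, Rows, Row),
           thread_create(worker(I, Row, Done), _, [detached(true)])),
    length(Rows, N),
    collect(N, Done, Pairs),
    message_queue_destroy(Done),
    keysort(Pairs, Sorted),              % restore the original row order
    pairs_values(Sorted, Results).

% Each worker tags its answer with the row index and posts it.
worker(I, Row, Done) :-
    task_(Row, Nums),
    thread_send_message(Done, I-Nums).

% Block until all N results have arrived.
collect(0, _, []) :- !.
collect(N, Done, [Pair|Pairs]) :-
    thread_get_message(Done, Pair),
    N1 is N - 1,
    collect(N1, Done, Pairs).
```

Since the workers run concurrently, the total time is about one 3-second sleep rather than one per row.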

As to the rate limiting, my first choice would be to keep it simple: send the task terms to the queue at the rate limit, using a fixed sleep/1 between adding terms.
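A sketch of that feeder idea (feed/4 is an invented name): for a limit of X requests per Y seconds, send one task term every Y/X seconds, so consumers can never exceed the limit.

```prolog
% Release one task per Y/X seconds into Queue.
feed(Queue, Tasks, X, Y) :-
    Interval is Y / X,
    forall(member(Task, Tasks),
           (   thread_send_message(Queue, Task),
               sleep(Interval)
           )).
```

Workers reading the queue then block automatically between tasks, because an empty queue blocks the reader.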

sleep/1 only pauses the calling thread. There is, as in most thread libraries, no (reliable) way to pause other (or all) threads. You can often pause another thread using

  thread_signal(Target, sleep(Time))

But there is no guarantee when thread signals are processed. Notably, blocking system calls as well as long-running foreign code may delay handling for a long time.

Note that if you want to stop this process gracefully, you should send as many stop messages to the queue as there are threads listening to it. On receiving stop, a thread should simply succeed out of its loop. Now you can call thread_join/1 on each of the threads you created. After all join operations succeed, all work is done.
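A sketch of that shutdown pattern (run/2, worker/1 and handle/1 are invented names; handle/1 is a placeholder for the real work):

```prolog
run(Tasks, NWorkers) :-
    message_queue_create(Q),
    % Start the worker pool, collecting the thread ids.
    findall(Id,
            ( between(1, NWorkers, _),
              thread_create(worker(Q), Id, [])
            ),
            Ids),
    % Send the work, then one stop message per worker.
    forall(member(T, Tasks), thread_send_message(Q, task(T))),
    forall(member(_, Ids), thread_send_message(Q, stop)),
    % All joins succeeding means all work is done.
    maplist(thread_join, Ids),
    message_queue_destroy(Q).

% Loop until a stop message arrives, then succeed.
worker(Q) :-
    thread_get_message(Q, Msg),
    (   Msg == stop
    ->  true
    ;   Msg = task(T),
        handle(T),
        worker(Q)
    ).

% Dummy work for the sketch.
handle(T) :-
    format("handled ~w~n", [T]).
```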

concurrent_maplist/3 provides well tested example code for all of this, including aborting the whole process if some error occurs.

Ok, that makes sense and about what I figured. Thanks for confirming.

Sorry, not quite sure I follow. It isn’t the adding of terms to the queue that I need to rate-limit, but the actual execution of those terms. So single-threaded I’m doing

Thread0, do_something, check_rl, do_something, rl_reached, sleep...., do_something

and with multi thread it should be

Thread0, do_something, check_rl, do_something, rl_reached, sleep...., do_something
Thread1, do_something, check_rl, do_something, sleep...., do_something

where any thread hitting the rl should sleep all others. But if they’re running in parallel I don’t get how that’s done.

Can you clarify please?

If you send jobs to the queue that involve exactly one request each, and you send them to the queue at the rate limit, you’ll never get above the limit. Remember that getting a message from an empty queue blocks until there is a (matching) message.

Ahh ok I see, that helps.

Right, but each page involves multiple calls, which I was willing to run serially per thread. I was hoping to have only the pages run in parallel, like

T0 -> P0 -> process(a), process(b), process(c)
T1 -> P1 -> process(d), process(e), process(f)
T2 -> P2 -> process(g), process(h), process(i)

Then it sounds like I won’t be able to do that and instead I would need to flip it around and call the pages in serial but process each element on the page in parallel like this?

T0 -> P0 -> T1 -> process(a)
            T2 -> process(b)
            T3 -> process(c)
T0 -> P1 -> T1 -> process(d)
            T2 -> process(e)
            T3 -> process(f)
T0 -> P2 -> T1 -> process(g)
            T2 -> process(h)
            T3 -> process(i)

Alright will work on this more next weekend. This is helping, thank you!

Alright, the fruits of my labor. (?- main. → ?- listing(mydata/3).)

I think this works to rate limit as needed without even using a message queue.

I may have more questions later, and just for my own edification I would still like to understand how engines work, but I think I have enough to go on for now.

Thanks again, Jan!