More thread wrangling

Hi, this is a follow up to my previous post about this. Now that I’m a little more practiced in multithreading, I would like to reset a bit on this question if I may.

Again, what I’m trying to accomplish:

  1. Paginate through API calls.
  2. Abide by a rate limit.
  3. Execute page calls:
    1. in order
    2. asynchronously, up until the rate limit.

On the previous post I was given advice which ultimately helped me accomplish my goal (thank you!). The advice was: If you have units of work [1, 2, 3, 4, 5, 6] then “bucketize“ the work per the rate limit, so if it’s 3 you should get [[1, 2, 3],[4, 5, 6]] , and then iterate through each bucket, executing each concurrently and sleeping between each bucket.

I used concurrent/3 for this and it worked like a charm.

The only reason this worked though is because the API I’m calling happens to provide a totalPages property so, very roughly, that enabled me to do something like

get_things([]).
get_things([Bucket|Buckets]) :-
  concurrent(3,Bucket,[]),
  sleep(Rate_limit_sleep),
  get_things(Buckets).

get_things_wrapper(Things) :-
  api_call(get_things,Response),
  Total_pages = Response.totalPages,
  bucketize_pages(Total_pages,Page_buckets),
  get_things(Page_buckets).

What I’m wondering though, is what if I was not provided the total pages beforehand?

What if instead of totalPages the response only provides a boolean is_last property which lets you know when you’ve paginated to the last page. In that case you can’t bucketize as above because you don’t know how many buckets to create and you would have to process each page as you get to it to check for that property.

So I’m trying to get this minimal example to work:

rate_limit(Count,New_count) :-
  succ(Count,Count_),
  once((
     Count_ = 3,
     writeln("wait"),
     sleep(3),
     New_count = 0
     ;
     New_count = Count_
   )).

work(Work) :-
  %Network_latency = 2,
  %sleep(Network_latency),
  writeln(Work).

do_work(_,[]) :- !.
do_work(C,[Work|Works]) :-
  thread_create(work(Work),_),
  rate_limit(C,C0),
  do_work(C0,Works).

main :-
  numlist(1,12,Work),
  do_work(0,Work).

(in this example we happen to know there are 12 units of work but again, assume you don’t know that in advance.)

Ignoring the “network latency” for now, what I would naively expect running main here is 1, 2, 3, wait(3s), 4, 5, 6, wait(3s)..., and so on.

Instead what I’m getting is a variety of wonky output like wait 1, 2, 3, 4, 5, 6, wait, 7, wait, 8, 9, 10, 11, wait, 12 or 1, 2, wait, 3, 4, wait, 5, 6, wait, 7, 9, 8, 10, 11, wait, 12.

From my surface level understanding I believe this because we are offloading the threads to the OS but we’re cycling through do_work/2 so fast that the OS isn’t able to schedule them before we hit the rate limit.

Now uncomment the two latency lines in work/1 and run it. From my testing, this now executes concurrently; ie. I only had to wait 2 seconds for all three, not 3 units_work * 2 seconds_each = 6 seconds synchronous wait, but out of order.

Any guidance here is greatly appreciated.

In short what I’m trying to do is get my output to appear exactly like, and be as performant as, this

rate_limit(Count,New_count) :-
  succ(Count,Count_),
  once((
     Count_ = 10,
     writeln("wait"),
     sleep(3),
     New_count = 0
     ;
     New_count = Count_
   )).

work(Work) :-
  %Network_latency = 2,
  %sleep(Network_latency),
  writeln(Work).

do_work(_,[]) :- !.
do_work(C,[Work|Works]) :-
  work(Work),
  rate_limit(C,C0),
  do_work(C0,Works).

main :-
  numlist(1,100,Work),
  do_work(0,Work).

except when you uncomment network latency, I don’t want to wait 2 seconds * 10 units of work = 20 seconds between rate limits. I want them to execute concurrently so I only wait 2 seconds total.

As another example, or even (the prototype of) a solution, you might be interested in the source code here:

Answer Sources in Prolog
https://github.com/jp-diegidio/Nan.System.Sources-Prolog

Look in particular at the creation/destruction code, as that’s where I allocate then reclaim resources: I use threads and message queues, plus few other things. (The error handling code in particular is quite horrible, and maybe not just that: anyway that’s another story.)

Several years have passed, but I should still be able to answer basic questions if you have any. Also, beware that it’s SWI-Prolog 7.3.25.

Thank you, sir. Checking it out.

I also had to slightly edit my requirements with the recent realization that I actually need the pages called asynchronously but in order, because if I have pages …4,5,6 within a rate limit cycle, and 4 is the last page, I don’t want to call 6 & 5 out of order just because the threads happen to be scheduled that way and waste api calls. I want 4 to be called, see it’s the last page and break.

With these answer sources, which also report determinism, you can implement any combinator with any synchronization strategy between the processes relatively easily.

Unfortunately, I didn’t write much documentation at the time, and the only non-trivial example is the parallel combinator you can find here: https://github.com/jp-diegidio/Nan.System.Sources-Prolog/blob/396e54dbb911f2f7a4e8a1324f84c6c05db67bd8/Code/sources.pl#L125

Some basic combinators could indeed be implemented in the library itself, and the whole thing be encapsulated a bit better, up to some “closed algebra of interactors”: but I didn’t have enough motivation at the time to keep working on it; anyway, it’s still an open project… – Also, I don’t know if meanwhile the project has been superseded by some other package or library.

That said, could you please clarify your requirement: What do you mean by rate limit exactly? Not more than some N threads running at any time? Also, you say asynchronously, which informally I read “in the background”, but what’s happening in the main thread: just orchestrate the tasks till full completion?

There’s a time, thread, and max requests rate limit, but for now I’m mainly focusing on max requests.

I think I’m coming to the realization that what I’m trying to do is impossible in parallel :upside_down_face:

I think best case scenario, if I’m going to base the logic on is_last, here’s what I would want:

So by way of example, there’s the Verse programming language. Verse supports several types of control flows for concurrency. One of them is called race. What race does is sets up some concurrent threads and after the first thread successfully completes, verse cancels the rest.

I’m wondering if I could do is something like that except instead of basing it on which thread simply completes first, rather it completes with a certain condition, and cancels concurrent threads with a certain condition as well.

Meaning:

Suppose I’m concurrently calling

api_call(get_thing,page(4),Response).
api_call(get_thing,page(5),Response).
api_call(get_thing,page(6),Response).

and suppose one of these is the last page but we don’t know which one it is.

If page(4) comes back with Response.is_last = true then we don’t need pages 5 and 6. If page(5) completes first and comes back with is_last then we still need page(4) but we can cancel page(6).

So is there a way that we can cancel threads based on the payload they are executing? ie. page(4) comes back with is_last , so all threads with payload api_call(get_thing,page(N),Response) where N > 4 get cancelled?


Of course, this leads to the possibility that unbeknownst to me page(4) is last but pages 5 and 6 get called first, so :upside_down_face: this may have been a big swing and a miss for me.

I dunno.. Is what I’m trying to do not possible?

Appreciate your input.

first_solution/3 does that.

I’ll call it good and just keep using totalPages and hope they don’t deprecate it.

Thanks all!

I dunno.. Is what I’m trying to do not possible?

The fact that you ask for an arbitrary number of “pages” since you don’t know in advance how many there are I do indeed find a dubious “pattern”. :slight_smile: Anyway, never mind too much the example:

To do e.g. cancellation of tasks, assuming the tasks are in fact cancellable (somehow / to some extent), I think you do need an async pattern, by which I mean e.g. something like my begin and end methods.

But I’d say at this point you should really clarify if you are after a concrete solution to a concrete problem / use case, but then you should say exactly which is that and let’s stick to it; or, you are investigating how to put together a general library whose interface would indeed look something like Verse (though, rather than reverse-engineering Verse, I’d still stick to some mathematics).

IOW, a generic discussion / brainstorming of course has its place, but eventually you’d have to narrow it down to one specific problem. [P.S. Never mind, meanwhile indeed you have.]

1 Like