Hi, this is a follow-up to my previous post about this. Now that I’m a little more practiced in multithreading, I’d like to reset a bit on this question, if I may.
Again, what I’m trying to accomplish:
- Paginate through API calls.
- Abide by a rate limit.
- Execute page calls:
  - in order
  - asynchronously, up to the rate limit.
On the previous post I was given advice that ultimately helped me accomplish my goal (thank you!). The advice was: if you have units of work `[1, 2, 3, 4, 5, 6]`, then "bucketize" the work per the rate limit — so with a limit of 3 you get `[[1, 2, 3], [4, 5, 6]]` — and then iterate through the buckets, executing each one concurrently and sleeping between buckets. I used `concurrent/3` for this and it worked like a charm.
The only reason this worked, though, is that the API I’m calling happens to provide a `totalPages` property, which, very roughly, enabled me to do something like:
```prolog
get_things([]).
get_things([Bucket|Buckets]) :-
    concurrent(3, Bucket, []),
    sleep(Rate_limit_sleep),    % Rate_limit_sleep is bound elsewhere in the real code
    get_things(Buckets).

get_things_wrapper(Things) :-
    api_call(get_things, Response),
    Total_pages = Response.totalPages,
    bucketize_pages(Total_pages, Page_buckets),
    get_things(Page_buckets).
```
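(In case it matters, `bucketize_pages/2` can be something like the following sketch — `get_page/1` here is a hypothetical stand-in for the real page-fetching goal, and the bucket size is hard-coded to 3:)

```prolog
% Split pages 1..Total_pages into buckets of (at most) 3 page-fetching
% goals each; get_page/1 is a hypothetical stand-in for the real call.
bucketize_pages(Total_pages, Page_buckets) :-
    numlist(1, Total_pages, Pages),
    findall(get_page(Page), member(Page, Pages), Goals),
    chunk(3, Goals, Page_buckets).

% chunk(+N, +List, -Chunks): split List into sublists of length at most N.
chunk(_, [], []) :- !.
chunk(N, List, [Chunk|Chunks]) :-
    length(Chunk, N),
    append(Chunk, Rest, List),
    !,
    chunk(N, Rest, Chunks).
chunk(_, List, [List]).
```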
What I’m wondering, though, is: what if I were not given the total pages beforehand? What if, instead of `totalPages`, the response only provided a boolean `is_last` property that tells you when you’ve paginated to the last page? In that case you can’t bucketize as above, because you don’t know how many buckets to create; you would have to process each page as you reach it to check for that property.
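(For contrast, the purely sequential version of `is_last` pagination is straightforward. A sketch, assuming a hypothetical `api_call_page/2` that returns a dict with an `is_last` field:)

```prolog
% Fetch pages one at a time until the response says is_last == true.
% api_call_page/2 is a hypothetical stand-in for the real API call.
get_all_pages(Pages) :-
    get_pages_from(1, Pages).

get_pages_from(Page, [Response|Responses]) :-
    api_call_page(Page, Response),
    (   get_dict(is_last, Response, true)
    ->  Responses = []
    ;   succ(Page, Next),
        get_pages_from(Next, Responses)
    ).
```

The problem is that this is fully synchronous, which is exactly what I’m trying to avoid.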
So I’m trying to get this minimal example to work:
```prolog
rate_limit(Count, New_count) :-
    succ(Count, Count_),
    once((  Count_ = 3,
            writeln("wait"),
            sleep(3),
            New_count = 0
        ;   New_count = Count_
        )).

work(Work) :-
    % Network_latency = 2,
    % sleep(Network_latency),
    writeln(Work).

do_work(_, []) :- !.
do_work(C, [Work|Works]) :-
    thread_create(work(Work), _),
    rate_limit(C, C0),
    do_work(C0, Works).

main :-
    numlist(1, 12, Work),
    do_work(0, Work).
```
(In this example we happen to know there are 12 units of work, but again, assume you don’t know that in advance.)
Ignoring the “network latency” for now, what I would naively expect from running `main` here is `1, 2, 3, wait (3s), 4, 5, 6, wait (3s)`, and so on.
Instead, I’m getting a variety of wonky output like `wait, 1, 2, 3, 4, 5, 6, wait, 7, wait, 8, 9, 10, 11, wait, 12` or `1, 2, wait, 3, 4, wait, 5, 6, wait, 7, 9, 8, 10, 11, wait, 12`.
From my surface-level understanding, I believe this is because we are offloading the threads to the OS, but we’re cycling through `do_work/2` so fast that the OS isn’t able to schedule them before we hit the rate limit.
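(One variation I can imagine — sketch only, not something I’ve settled on — is to have `do_work` keep the ids from `thread_create/2` and `thread_join/1` the whole batch before sleeping. That at least forces each batch to drain before the wait, though I don’t think it fixes ordering *within* a batch, since the threads still race to `writeln/1`:)

```prolog
% Variant of do_work/2 that joins each batch of threads before sleeping.
% Output within a batch can still interleave out of order.
do_work_joining(_, Pending, []) :-
    !,
    maplist(thread_join, Pending).      % wait for the last partial batch
do_work_joining(Limit, Pending, Works) :-
    length(Pending, Limit),             % batch is full:
    !,
    maplist(thread_join, Pending),      % let it drain first
    writeln("wait"),
    sleep(3),
    do_work_joining(Limit, [], Works).
do_work_joining(Limit, Pending, [Work|Works]) :-
    thread_create(work(Work), Id),
    do_work_joining(Limit, [Id|Pending], Works).
```

(Called as `do_work_joining(3, [], Work)` from `main`.)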
Now uncomment the two latency lines in `work/1` and run it. From my testing, this now executes concurrently — i.e. I only had to wait about 2 seconds for all three, not 3 units of work × 2 seconds each = 6 seconds of synchronous waiting — but the output is out of order.
Any guidance here is greatly appreciated.
In short, what I’m trying to do is get my output to appear exactly like, and be as performant as, this:
```prolog
rate_limit(Count, New_count) :-
    succ(Count, Count_),
    once((  Count_ = 10,
            writeln("wait"),
            sleep(3),
            New_count = 0
        ;   New_count = Count_
        )).

work(Work) :-
    % Network_latency = 2,
    % sleep(Network_latency),
    writeln(Work).

do_work(_, []) :- !.
do_work(C, [Work|Works]) :-
    work(Work),
    rate_limit(C, C0),
    do_work(C0, Works).

main :-
    numlist(1, 100, Work),
    do_work(0, Work).
```
except that, when network latency is uncommented, I don’t want to wait 2 seconds × 10 units of work = 20 seconds between rate limits. I want the calls to execute concurrently so I only wait about 2 seconds total per batch.