I updated Jan's example to do just that. It uses good old concurrent_maplist/4 to parallelise the work. Merging dicts seemed too painful, though, so I convert them to compounds at reduce time.
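For anyone who hasn't used it: concurrent_maplist/4 from library(thread) has the same calling convention as maplist/4, it just runs the goals over a pool of worker threads, so it drops in wherever maplist/4 would go. A tiny sketch with a made-up add/3, unrelated to the program below:

:- use_module(library(thread)).
add(X, Y, Z) :- Z is X+Y.

?- concurrent_maplist(add, [1,2,3], [10,20,30], Sums).
Sums = [11, 22, 33].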
% Split File into Partitions byte ranges, process them in parallel and
% merge the per-partition results before printing the per-city stats.
process_file_by_partitions(File, Partitions) :-
    size_file(File, Size),
    Chunk is Size//Partitions,
    P is Partitions-1,
    numlist(0, P, Ps),
    convlist(mult(Chunk), Ps, LBs),            % lower byte offset of each partition
    length(Ds, Partitions),
    LBs = [_|T],
    append(T, [inf], UBs),                     % upper offsets; last partition runs to EOF
    concurrent_maplist(process_file(File), Ds, LBs, UBs),
    convlist(c, Ds, Ss),                       % dicts -> lists of city/5 compounds
    flatten(Ss, Fs),
    msort(Fs, S),
    reduce(S, Os),
    forall(member(city(City,Min,Max,Count,Sum), Os),
           ( Mean is Sum/Count,
             format('~w = ~1f, ~1f, ~1f~n', [City,Min,Max,Mean])
           )).
% Process one partition: seek to Start, skip the partial line, then accumulate stats until Stop.
process_file(File, Dict, Start, Stop) :-
    setup_call_cleanup(
        ( open(File, read, In),
          seek(In, Start, bof, _),
          ( Start > 0 -> skip(In, 0'\n) ; true )
        ),
        process_stream(In, #{}, Dict, Stop),
        close(In)).
% Read City;Num lines from In and update the stats dict until the stream
% position passes Stop or end of file is reached.
process_stream(In, Data0, Data, Stop) :-
    byte_count(In, Pos),
    (   Pos > Stop
    ->  Data = Data0
    ;   read_string(In, ";", "", Sep, CityS),
        (   Sep == -1                          % end of file
        ->  Data = Data0
        ;   read_string(In, "\n", "", _, NumS),
            atom_string(City, CityS),
            number_string(Num, NumS),
            update(City, Num, Data0, Data1),
            process_stream(In, Data1, Data, Stop)
        )
    ).
% Update the running city(Min,Max,Count,Sum) term in place if the city is
% already in the dict, otherwise add a fresh entry.
update(City, Num, Data, Data) :-
    get_dict(City, Data, CData),
    !,
    CData = city(Min,Max,Count,Sum),
    (   Num < Min
    ->  nb_setarg(1, CData, Num)
    ;   true
    ),
    (   Num > Max
    ->  nb_setarg(2, CData, Num)
    ;   true
    ),
    NewCount is Count+1,
    nb_setarg(3, CData, NewCount),
    NewSum is Sum+Num,
    nb_setarg(4, CData, NewSum).
update(City, Num, Data0, Data) :-
    put_dict(City, Data0, city(Num,Num,1,Num), Data).
mult(A, B, C) :- C is A*B.
% Turn a stats dict into a list of city(Name,Min,Max,Count,Sum) compounds.
c(I, O) :- findall(city(Name,I1,I2,I3,I4), city(I1,I2,I3,I4) = I.Name, O).
% Merge adjacent entries for the same city in the sorted list.
reduce([], []).
reduce([X], [X]) :- !.
reduce([city(Name,I1,I2,I3,I4),city(Name,J1,J2,J3,J4)|Cs], Ks) :-
    NewMin is min(I1,J1),
    NewMax is max(I2,J2),
    NewCnt is I3+J3,
    NewTot is I4+J4,
    reduce([city(Name,NewMin,NewMax,NewCnt,NewTot)|Cs], Ks), !.
reduce([H|T], S) :- reduce(T, T2), S = [H|T2].
Partitions is best set to your cpu_count. On my WSL instance, 100 million rows completed in 180s with 1 partition and 30s with 16 partitions.
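So you would kick it off with something like this, where measurements.txt is just a stand-in for wherever your data lives:

?- current_prolog_flag(cpu_count, Cores),
   process_file_by_partitions('measurements.txt', Cores).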