1brc -- Billion row challenge

On my machine it completes 1B rows in about 25min, compared to mine which completes in about 44min. I think you’re totally right about effort vs reward for a C version. I wrote a Fortran version which runs in <6 seconds on 4 cores in 193 LoC in 1/10th of the time that it took me to write a Haskell version which runs in 7 minutes! “Horses for courses” as they say in the UK.

1 Like

Lots of interesting takeaways here, the main one being there’s a heavy price to be paid for converting text from a file into a list of characters so that you can use a dcg when you can use read_string to parse simple records straight from the text buffer (presumably).

I find this to be a pretty astonishing result. So that’s ~6 ns. per record for the whole job. For comparison using @jan’s numbers (84% of 9.6 sec for 10M records) for read_string (i.e., just the string level parse done in C?) is about 800 ns. per record; that’s over two orders of magnitude difference. I’m not sure even massive parallelism can explain that.

Well, not really if you have a look at the Fortran solution and compare it. Its fully array based and features:

  • Custom float parser.
  • Custom hash table implementation.
  • mmap.
  • OpenMP for parallel runs.

Further, the gfortran optimising compiler is amazing and generates architecture native code which is vectorised where possible. Finally, OpenMP is state of the art for efficient parallel processing, so adding cores reduces the time (on the same machine) almost in direct linear relationship since the problem is “embarrassingly parallel”.

Meanwhile, the best C version is 0.15s relying additionally on packing 256-bit SIMD registers for insanely fast calculation of critical elements.

The guys on the Haskell forum managed to pull together a version which runs at <4s on a 10 core Mac. Mine ran in 4.3s against the same data on the same hardware.

Other things to note: if you check the main 1BRC project page, all the best Java results are <2s.

I stand by my comment - still pretty astonishing to me. At 15 bytes per record, that’s about 2.5 Gbytes per second. I’m pretty sure I couldn’t read the data from permanent storage that quickly. And a solution that runs in 0.15 sec. well exceeds DD4 memory transfer rates, even if the 15GB of data is pre-loaded, if I understand the published numbers correctly. But, admittedly, there’s a lot I don’t understand.

At the very least, it’s hard (for me) to imagine that normal I/O via OS calls is included in the fastest measured times. What is being measured here?

The official competition starts with the data on a RAM disk, the official benchmarks are just for the Java version, the best being 1.53s. People have all sorts of setups for non official benchmarks. A guy on the Haskell 1brc thread clocked the C version referenced above at 0.7s on his hardware.

Even if starting from permanent storage, a basic PCIe SSD reads at 5-6G/s, whilst the fastest (Crucial T705) has a sequential read speed of 14.5G/s.

There is a little write up about the fastest C result here, but I’d encourage others to just give the problem a try. The juice has been worth the squeeze.

1 Like

I updated Jan example to do just that. It uses good old concurrent_maplist/4 to parallelise the work. Though merging dicts seemed too painful, so I converted them to compounds at reduce time.

process_file_by_partitions(File, Partitions) :-
    size_file(File,Size),
    Chunk is Size//Partitions,
    P is Partitions-1,
    numlist(0,P,Ps),
    convlist(mult(Chunk),Ps,LBs),
    length(Ds,Partitions),
    LBs = [_|T],
    append(T,[inf],UBs),
    concurrent_maplist(process_file(File),Ds,LBs,UBs),
    convlist(c,Ds,Ss),
    flatten(Ss,Fs),
    msort(Fs,S),
    reduce(S,Os),
    forall(member(city(City,Min,Max,Count,Sum),Os),
           (   Mean is Sum/Count,
               format('~w = ~1f, ~1f, ~1f~n', [City,Min,Max,Mean])
           )).

process_file(File, Dict, Start, Stop):-
    setup_call_cleanup(
        (open(File, read, In),seek(In,Start,bof,_),(Start>0->skip(In,'\n');true)),
        process_stream(In, #{}, Dict, Stop),
        close(In)).

process_stream(In, Data0, Data, Stop) :-
    byte_count(In,Pos),
    (Pos>Stop->Data=Data0
       ;
     read_string(In, ";", "", Sep, CityS),
     (   Sep == -1
     ->  Data = Data0
     ;   read_string(In, "\n", "", _, NumS),
         atom_string(City, CityS),
         number_string(Num, NumS),
         update(City, Num, Data0, Data1),
         process_stream(In, Data1, Data,Stop)
     )
    ).

update(City, Num, Data, Data) :-
    get_dict(City, Data, CData),
    !,
    CData = city(Min,Max,Count,Sum),
    (   Num < Min
    ->  nb_setarg(1, CData, Num)
    ;   true
    ),
    (   Num > Max
    ->  nb_setarg(2, CData, Num)
    ;   true
    ),
    NewCount is Count+1,
    nb_setarg(3, CData, NewCount),
    NewSum is Sum+Num,
    nb_setarg(4, CData, NewSum).
update(City, Num, Data0, Data) :-
    put_dict(City, Data0, city(Num,Num,1,Num), Data).

mult(A,B,C):-C is A*B.
c(I,O):-findall(city(Name,I1,I2,I3,I4),city(I1,I2,I3,I4)=I.Name,O).

reduce([],[]).
reduce([X],[X]):-!.
reduce([city(Name,I1,I2,I3,I4),city(Name,J1,J2,J3,J4)|Cs],Ks):-
   NewMin is min(I1,J1),
   NewMax is max(I2,J2),
   NewCnt is I3+J3,
   NewTot is I4+J4,
   reduce([city(Name,NewMin,NewMax,NewCnt,NewTot)|Cs],Ks),!.
reduce([H|T],S):-reduce(T,T2),S=[H|T2].

partitions is best set to your cpu_count. On my WSL instance 100 million rows completed in 180s with 1 partitions, and 30s with 16 partitions.

2 Likes

Its a good try but am I correct in saying that at least these things are an issue?

  • Partitions seem unaligned. I.e. If a partition cuts a row it will not adjust the adjacent partition to make sure that the row is included in just one partition.
  • The whole file must end up in memory simultaneously, so if you’ve not enough memory it won’t work. Solutions typically use mmap to avoid this.

No, because it will be picked up by the previous partition, since each thread only consume rows whose start position are in the partition range (note each thread still has access to the full file).

Maybe, what is in memory are the aggregates produced after streaming over a partition range, so at worse is Num-of-Partitions*Output Size. (every weather station in the output has an entry in each partition prior to reduction). I think the default buffering for streams is full-buffering, however not sure if streaming over a file loads the full file into memory.

Moreover,I was merely demonstrating a simple map-reduce implementation as outlined in the post above.

It would be interesting to see how fast Mercury is with this :grinning:

That is rather disappointing. As there is no serious interaction between the threads, the speedup should be pretty close to the number of cores as long as you do not need to rely on hyperthreading. With hyperthreading the speedup seems to depend a lot on the workload.
And of course, we may be faced with throttling and loss of the single CPU frequency boost.

Also, WSL file I/O is, AFAIK, a lot slower than Linux native file I/O, so possibly this is simply I/O bound?

I mean this situation:

Partition N


New York;23.4
Lon

Partition N+1

don;13.4
Paris;-23.2

There isn’t enough information in either partition to complete the entry for London. So, you need to cut the partitions ahead of time or have some kind of rules which backtrack beyond the boundaries of the partitions. On the sample given above 1 and 5 partitions give different results (I also note that both outputs are wrong).

> process_file_by_partitions("sample.txt",1).
Bridgetown = 26.9, 26.9, 0.0
Bulawayo = 8.9, 8.9, 0.1
Conakry = 31.2, 31.2, 0.0
Cracow = 12.6, 12.6, 0.1
Hamburg = 12.0, 12.0, 0.1
Istanbul = 6.2, 23.0, 0.1
Palembang = 38.8, 38.8, 0.0
Roseau = 34.4, 34.4, 0.0
St. John's = 15.2, 15.2, 0.1
> process_file_by_partitions("sample.txt",5).
Bridgetown = 26.9, 26.9, 0.0
Bulawayo = 8.9, 8.9, 0.1
Conakry = 31.2, 31.2, 0.0
Cracow = 12.6, 12.6, 0.1
Hamburg = 12.0, 12.0, 0.1
Istanbul = 6.2, 23.0, 0.1
Palembang = 38.8, 38.8, 0.0
Roseau = 34.4, 34.4, 0.0

This seems to be the case. It runs in constant memory.

Not disk bound :slight_smile: Note, your original single threaded code does 1B rows in 25min on my machine. Here is the multi threaded solution above for 1B rows vs my Fortran code as a high benchmark. Exactly the same file and hardware:

time swipl test.prolog > /dev/null

real    7m14.351s
user    55m35.318s
sys     0m10.728s

Fortran:

time ./1brc > /dev/null

real    0m5.712s
user    0m20.291s
sys     0m0.643s

This single threaded AWK solution runs in about 6m20s, which I think is a good benchmark for single threaded performance. My best single threaded Fortran code runs in about 32s.

There was an off by 1 error and I had mixed up the Sum and Count variables (I added the output code at the last minute, previously just looking at the data structures directly). Now corrected.

1 Like

I think I see. So, start positions demarcated by either start of file or new line. In the London case above, Partition 1 would extend to the end of the record because it includes the start of the row, whilst Partition 2, would skip to the first new line and begin from there instead: the two processes being complimentary.

This kind of problem is well-suited for “map-reduce”. SWI-Prolog can do the “map” part in parallel, but doesn’t (AFAIK) have support for multi-threading the “merge” or the “reduce”/“fold”/“aggregate”.

“Merge” can be partially multi-threaded, by taking the outputs of the “maps” as chunks and partially sorting them, then merging the chunks (also called “shuffling”); and the reduce/fold/aggregate can be multi-threaded by dispatching the same-key chunks to independent threads. Finally, these aggregated outputs are merged (with some additional opportunities for multi-threading).
So, it would be nice to have a multi-core version of msort/2.

The distribution of the data is important – Google’s MapReduce(*) has a lot of tuning parameters and on-the-fly analysis (plus quite a bit of stuff for dealing with shards that fail(**) or stall, plus opportunistic parallel processing using different parameters)

(*) I used Google’s MapReduce a lot about 15 years ago [some MR jobs took over a week to run over petabytes of data, with thousands of machines and 10s of thousands of cores] – it’s been replaced by “Flume”, but AFAIK this replacement is to allow a more flexible workflow than map-shard-shuffle-reduce and also more automation in tuning.
(**) Failure in the sense of a disk error, network failure, machine check, out-of-memory, etc.; not in the sense of logic programming failure.

1 Like

I notice you are using Mawk as a flavour of Awk.
Have you considered using alternative flavours?

For your information, here is some information on Gawk’s recent utilization of persistent-memory:

                                  run time      peak memory    intermediate
AWK script                       (sec)      footprint (K)    storage (K)
naive                               O(N)   242.132       2,081,360           n/a
rwarray build O(N)     250.288       2,846,868         156,832
rwarray query O(W)     11.653       2,081,444
freqtbl build O(N)       288.408       2,400,120          69,112
freqtbl query O(W)       11.624       2,336,616
pm-gawk build O(N)   251.946       2,079,520       2,076,608
pm-gawk query O(1)      0.026           3,252

Forgive me, but quickly scanning, is it that you are operating off an already compiled file from the cited dataset?

Is it formed from this?

Im looking forward to benchmarking whether pm-gawk is impactful.

The answer is probably get one of the “accepted” Java versions to work locally and use that to validate. Okay.

Ignore the rest


I know I am very late for the party, I have a small question. How do we compare the results of this challenge? I was looking at the data generating script in Python and it seems that it randomly picks up to 10K location names, and it randomly generates the temperature data. I didn’t see any seeding in the code. Maybe the Java version produces stable output?

So how do we know if the otherwise fast(er?) implementation is also correct? Do we compare the results between the multiple versions of the solution we have programmed?