Building a port scanner

Looks like this is out.

From reading the documentation:

The maximum number of threads defined is the amount of cores available.

Since laptops now typically have about 8, 12 or 16 cores and my other example ran cleanly with 50 threads, concurrent_forall/3 will probably be much slower, but then again Jan W. is known to change the code based on forum posts. :grinning:


EDIT

There are two versions of concurrent_forall/3: one in the standard SWI-Prolog library and one in the package “xlibrary”.


The code seems to say something different: the number of threads defaults to the number of CPUs but can otherwise be anything. @jan?

    (   option(threads(Jobs), Options)
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs)
    ),
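A quick way to check that reading, as a minimal sketch: sleepy_batch/2 is a made-up helper, and sleep/1 stands in for a port that times out. If threads(N) were capped at the core count, 20 sleeping tasks with threads(20) would need several rounds on a machine with fewer cores; if the option is honoured, they should finish in roughly one 0.2 second round.

```prolog
:- use_module(library(thread)).

% sleepy_batch(+Threads, -Seconds): hypothetical helper that runs 20
% tasks, each of which just sleeps for 0.2 s (standing in for a port
% that times out), and reports the elapsed wall-clock time.
sleepy_batch(Threads, Seconds) :-
    get_time(T0),
    concurrent_forall(between(1, 20, _), sleep(0.2), [threads(Threads)]),
    get_time(T1),
    Seconds is T1 - T0.
```

Comparing ?- sleepy_batch(20, S). with ?- sleepy_batch(1, S). (roughly 4 seconds) should show the difference; exact timings depend on the machine.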


When I went looking for concurrent_forall/3, I first found it in the package “xlibrary”, but while searching for your example code I found the version Jan W. created. That explains the differences.

Now looking at Jan W.’s version. :wink:

While I read your replies in this post: for the current need of the port scanner there is no need to balance the threads. Each thread tries to open a port and returns one of only two results, open or closed. When the port is open the response is instantaneous and the thread returns to the pool; when the port is closed the result comes from a timeout, which takes ~5 seconds. As only 3 out of 65536 ports are open, most threads wait for the timeout and then return to the pool. If you start the program with 50 threads in the pool and 1000 ports, you will typically see the results dumped out in a batch of 50, then a pause, then another batch of 50, and so on, while the open ports are reported almost immediately. So there is nothing to balance, because the threads typically complete in sync.

I do agree that balancing is a desired trait for threading that should be considered when multithreading.

Some other tests that I have to write might benefit from balancing, and I will revisit your replies then. :grinning:

The next version makes use of message queues to pass the result of the port scan back to the main thread. This also adds debug messages from library(debug).

:- use_module(library(thread_pool)).
:- use_module(library(socket)).
:- use_module(library(debug)).

start_scan_pool_02(Number_of_threads,Options) :-
	thread_pool_create(scan_pool,Number_of_threads,Options).
stop_scan_pool_02 :-
	thread_pool_destroy(scan_pool).

scan_pool_02(IP_address,Low_port,High_port,Number_of_threads) :-
    message_queue_create(Result_queue),
    start_scan_pool_02(Number_of_threads,[]),
    time(
        (
            findall(Id,
                (   between(Low_port,High_port,Port),
                    thread_create_in_pool(scan_pool,worker(Result_queue,IP_address,Port),Id,[])
                ), Ids),
            join_all(Ids),
            gather_results(Result_queue,Port_results),
            format('~w~n',[Port_results])  % print_term/2 from library(pprint) prints large structures more readably
        )
    ),
    stop_scan_pool_02,
    message_queue_destroy(Result_queue).

% join_all/1 was not shown in this post; a minimal definition.
% thread_join/1 raises an error if a worker did not succeed.
join_all(Ids) :-
    maplist(thread_join, Ids).

worker(Result_queue,IP_address,Port) :-
    debug(threads, 'Worker: port ~w', [Port]),
    port_scan_02(IP_address,Port,Port_result),
    thread_send_message(Result_queue, result(Port,Port_result)).

port_scan_02(IP_address,Port,Result) :-
    catch(
        setup_call_cleanup(
            tcp_socket(Socket),
            (
                % Open stream socket based on TCP/IP which uses IP address and port number, i.e. INET socket
                tcp_connect(Socket, IP_address:Port),
                Result = open
            ),
            tcp_close_socket(Socket)
        ),
        error(_,_),
        Result = closed
    ).

% All workers have been joined, so every result is already in the queue;
% timeout(0) makes thread_get_message/3 fail as soon as the queue is empty.
gather_results(Result_queue,[result(Port,Port_result)|Port_results]) :-
    thread_get_message(Result_queue,result(Port,Port_result),[timeout(0)]),
    !,
    debug(threads, 'Result - Port: ~w - Result: ~w', [Port,Port_result]),
    gather_results(Result_queue,Port_results).
gather_results(_,[]).

Example run

?- debug(threads).
Warning: threads: no matching debug topic (yet)
true.

?- scan_pool_02('140.211.166.101',75,84,3).
% [Thread 4] Worker: port 75
% [Thread 5] Worker: port 76
% [Thread 6] Worker: port 77
% [Thread 7] Worker: port 78
% [Thread 8] Worker: port 79
% [Thread 9] Worker: port 80
% [Thread 10] Worker: port 81
% [Thread 11] Worker: port 82
% [Thread 12] Worker: port 83
% [Thread 13] Worker: port 84
% Result - Port: 77 - Result: closed
% Result - Port: 75 - Result: closed
% Result - Port: 76 - Result: closed
% Result - Port: 80 - Result: open
% Result - Port: 79 - Result: closed
% Result - Port: 78 - Result: closed
% Result - Port: 81 - Result: closed
% Result - Port: 82 - Result: closed
% Result - Port: 83 - Result: closed
% Result - Port: 84 - Result: closed
[result(77,closed),result(75,closed),result(76,closed),result(80,open),result(79,closed),result(78,closed),result(81,closed),result(82,closed),result(83,closed),result(84,closed)]
% 41,514 inferences, 0.016 CPU in 63.136 seconds (0% CPU, 2656896 Lips)
true.

@jan

For debug messages such as % [Thread 4] Worker: port 75, I was not expecting the thread numbers to all be different, as this uses a pool of threads. Is the code correctly reusing the threads from the pool, am I reading the debug messages incorrectly, or is it something else?

I don’t really like the name balance much, as its relation to concurrency feels a bit remote to me and you can balance practically anything, but I have little clue what a balanced conjunction is.

I do think the behavior is useful and intuitive. I assume this also puts the generator in a separate thread and gives the main thread the task of collecting results?

Although you can bootstrap concurrent_forall/2 on balance/1, it might not be ideal to do so. To me, balance seems considerably more complicated and slower because we must return and collect the results.

With multithreaded code I lean toward having the worker threads know next to nothing of the outside world. In my earlier versions the worker threads used format/2, which in my mind means the workers know about the display device. Passing the results back in a message queue removes that dependency. Reading your reply, it seems there is another view to be learned.

The final goal of this specific port scanner is to collect the results and persist them to a file using library(persistency). My plan was to collect the results from the message queue and persist them to a file in the main thread, but reading your reply it seems this should be done in the worker threads. Feedback welcome.

A thread_pool could be a misleading name. It is a wrapper around thread_create/3 that limits the number of threads you can create and makes subsequent calls to create a thread either fail with an error or wait until some thread in the pool has stopped. Thread IDs can only be reused after you join the ended thread.
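A minimal sketch of that point, with made-up names (pool_thread_ids/1, demo_pool): the pool allows 2 running threads, 4 short goals are started in it, and all of them are joined only at the end. Each thread_create_in_pool/4 call creates a new thread (waiting for a free slot when the pool is full), so 4 distinct thread handles come back, much like the new thread number per port in the debug output above.

```prolog
:- use_module(library(thread_pool)).

% pool_thread_ids(-Ids): hypothetical helper. The pool limits how many
% threads run at once, but every thread_create_in_pool/4 call creates a
% brand-new thread; since none is joined before the end, none of the
% handles can be reused.
pool_thread_ids(Ids) :-
    thread_pool_create(demo_pool, 2, []),
    findall(Id,
            ( between(1, 4, _),
              thread_create_in_pool(demo_pool, sleep(0.1), Id, [])
            ),
            Ids),
    maplist(thread_join, Ids),
    thread_pool_destroy(demo_pool).
```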

I’d use concurrent_forall/3 with the threads(Count) option and assert the port status in the database, to be collected later. Properly managing a set of threads and ensuring they are properly reclaimed is quite complex. For some time now SWI-Prolog has garbage-collected forgotten threads, but it can take a while before it realises they are forgotten.
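A minimal sketch of that suggestion, assuming IPv4 and plain tcp_connect/2 as in the code above; scan_to_db/4, check_port/2, port_status/2 and with_listener/2 are hypothetical names. Each worker records its own result with assertz/1, and the caller reads port_status/2 afterwards. with_listener/2 only exists so the sketch can be tried against a local port that is known to be open.

```prolog
:- use_module(library(thread)).
:- use_module(library(socket)).

% port_status/2 is a hypothetical result table filled by the workers.
:- dynamic port_status/2.

% scan_to_db(+Host, +Low, +High, +Threads): every worker asserts its own
% result (assertz/1 is atomic), and the caller reads port_status/2 once
% concurrent_forall/3 has returned.
scan_to_db(Host, Low, High, Threads) :-
    retractall(port_status(_, _)),
    concurrent_forall(between(Low, High, Port),
                      check_port(Host, Port),
                      [threads(Threads)]).

% check_port(+Host, +Port): open if a TCP connection succeeds, closed
% if tcp_connect/2 raises any error (refused, timeout, ...).
check_port(Host, Port) :-
    (   catch(setup_call_cleanup(tcp_socket(Socket),
                                 tcp_connect(Socket, Host:Port),
                                 tcp_close_socket(Socket)),
              error(_, _),
              fail)
    ->  Status = open
    ;   Status = closed
    ),
    assertz(port_status(Port, Status)).

% with_listener(-Port, :Goal): test helper that listens on a free local
% port (tcp_bind/2 picks one when Port is unbound) while Goal runs.
with_listener(Port, Goal) :-
    setup_call_cleanup(
        ( tcp_socket(Server),
          tcp_bind(Server, localhost:Port),
          tcp_listen(Server, 5) ),
        Goal,
        tcp_close_socket(Server)).
```

For example, ?- with_listener(P, (scan_to_db(localhost, P, P, 1), port_status(P, S))). should give S = open.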


The next version switches to using concurrent_forall/3. Since this predicate is so new (3 days old) and I run on Windows, I installed the Windows 64-bit version of the daily build.

SWI-Prolog (threaded, 64 bits, version 8.3.2-198-gd839164c7)

Note: This code also moves the debug/1 call from the command line into a Prolog directive, :- debug(concurrent), which can easily be commented out. If you peruse the SWI-Prolog source code on GitHub you will often find such lines commented out.

Also notice how much simpler the code becomes when using concurrent_forall/3.

The hardest part of writing this was understanding concurrent_forall(:Generate, :Test): how did Generate and Test align with my existing code? So instead of trying to understand the code from the top down, I looked at the critical predicate common to all of this, thread_create/2, which in concurrent_forall/2 appears in the line maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers), and just figured out which variables I could set and what they needed. The only one that can be set by calling concurrent_forall/3 is Test, which needs to be port_scan(IP_address,Port); after that, identifying what the rest of concurrent_forall needed was easy.

For this use of concurrent_forall, instead of concurrent_forall(:Generate, :Test) I think of it as concurrent_forall(:Generate unique thread values, :Call thread with unique values).
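A tiny sketch of that mapping, with a hypothetical all_small/1: between/3 is the Generate part, and each solution's binding of X is handed to a worker thread that runs the Test part. Like forall/2, the whole call fails if Test fails for any binding.

```prolog
:- use_module(library(thread)).

% all_small(+Max): hypothetical example. Generate is between(1, Max, X);
% each binding of X is passed to a worker thread that runs the Test
% X < 10. Like forall/2, the call fails if Test fails for any X.
all_small(Max) :-
    concurrent_forall(between(1, Max, X), X < 10, [threads(4)]).
```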

:- debug(concurrent).

concurrent_scan(IP_address,Low_port,High_port,Number_of_threads) :-
    concurrent_forall(
        between(Low_port,High_port,Port),
        port_scan(IP_address,Port),
        [threads(Number_of_threads)]
    ).

port_scan(IP_address,Port) :-
    catch(
        setup_call_cleanup(
            tcp_socket(Socket),
            (
                % Open stream socket based on TCP/IP which uses IP address and port number, i.e. INET socket
                tcp_connect(Socket, IP_address:Port),
                format('Port ~w: open~n', [Port])
            ),
            tcp_close_socket(Socket)
        ),
        error(_,_),
        true
    ).

Example run

?- concurrent_scan('140.211.166.101',75,84,3).
% [Thread 5] Running test user:port_scan('140.211.166.101',77)
% [Thread 3] Running test user:port_scan('140.211.166.101',76)
% [Thread 4] Running test user:port_scan('140.211.166.101',75)
% [Thread 5] Running test user:port_scan('140.211.166.101',78)
% [Thread 4] Running test user:port_scan('140.211.166.101',79)
% [Thread 3] Running test user:port_scan('140.211.166.101',80)
Port 80: open
% [Thread 3] Running test user:port_scan('140.211.166.101',81)
% [Thread 5] Running test user:port_scan('140.211.166.101',82)
% [Thread 4] Running test user:port_scan('140.211.166.101',83)
% [Thread 3] Running test user:port_scan('140.211.166.101',84)
true.

NB The threads are being reused this time.

A more comprehensive example. (debug/1 was commented out.)

?- time(concurrent_scan('140.211.166.101',1,65536,8192)).
Port 80: open
Port 22: open
Port 443: open
% 196,643 inferences, 0.500 CPU in 174.439 seconds (0% CPU, 393286 Lips)
true.

8192 threads checking 65536 ports in ~3 minutes.


The comment Jan W. made about

and that I asked about now makes more sense. If you read the code for concurrent_forall you will notice that passing messages back would add more complexity to something that is already very complex. So I take it to mean that if you want to use concurrent_forall, use it as designed, even if that breaks some rules of thumb such as having the threads know as little as possible about the outside world.

Wondering what it would take to write this in SWI-Prolog, I ended up with these two files.

test_balance.pl (1.1 KB) balance.pl (4.2 KB)

The implementation is rather tricky. I decided on an arity-2 version for now to make clear what the generator and the tester are. Not yet sure what to do with it; when it has matured and has a proper name, it could be added to library(thread), I guess.

The next version adds library(persistency).

:- use_module(library(persistency)).

:- working_directory(_,'C:\\Users\\Eric\\Documents\\Port Scans').

:- persistent
    port_scan_result(port:integer,result:atom).

:- initialization(db_attach('port_scan_result.journal', [])).

exists_port_scan_result(Request,Response) :-
    port_scan_result(Request,Response).

add_port_scan_result(Request,Response) :-
    with_mutex(port_scan_result_journal, assert_port_scan_result(Request,Response)).

concurrent_scan_02(IP_address,Low_port,High_port,Number_of_threads) :-
    concurrent_forall(
        between(Low_port,High_port,Port),
        port_scan_02(IP_address,Port),
        [threads(Number_of_threads)]
    ).

port_scan_02(IP_address,Port) :-
    catch(
        setup_call_cleanup(
            tcp_socket(Socket),
            (
                % Open stream socket based on TCP/IP which uses IP address and port number, i.e. INET socket
                tcp_connect(Socket, IP_address:Port),
                record_result(Port,open)
            ),
            tcp_close_socket(Socket)
        ),
        error(_,_),
        record_result(Port,closed)
    ).

% Add the fact only if it is not already in the (persistent) database.
record_result(Port,Result) :-
    (   exists_port_scan_result(Port,Result)
    ->  true
    ;   add_port_scan_result(Port,Result)
    ).

Example run.

NB halt. is needed so that the data is written to the file. Until halt all of the data resides as facts in the Prolog database.

?- concurrent_scan_02('140.211.166.101',75,84,3).
true.

?- halt.

File: port_scan_result.journal

created(1593621240.60769).
assert(port_scan_result(77,closed)).
assert(port_scan_result(76,closed)).
assert(port_scan_result(75,closed)).
assert(port_scan_result(80,open)).
assert(port_scan_result(78,closed)).
assert(port_scan_result(79,closed)).
assert(port_scan_result(81,closed)).
assert(port_scan_result(83,closed)).
assert(port_scan_result(82,closed)).
assert(port_scan_result(84,closed)).
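To inspect the journal without halting, here is a sketch assuming that db_sync(close) closes (and so flushes) the journal stream, as described in the library(persistency) documentation; journal_demo/1 is a made-up helper, and the journal goes to a temporary file from tmp_file/2 rather than the directory used above. The file should hold the same kind of terms as the journal shown above.

```prolog
:- use_module(library(persistency)).
:- use_module(library(readutil)).

% Same declaration as above, repeated so the sketch is self-contained.
:- persistent
    port_scan_result(port:integer, result:atom).

% journal_demo(-Terms): hypothetical helper. Attach the persistent
% predicate to a fresh temporary journal, record one result, close the
% journal stream with db_sync(close) and read the file back as terms.
journal_demo(Terms) :-
    tmp_file(port_scan, File),
    db_attach(File, []),
    assert_port_scan_result(80, open),
    db_sync(close),
    read_file_to_terms(File, Terms, []).
```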

A more comprehensive example.

?- time(concurrent_scan_02('140.211.166.101',1,65536,8192)).
% 196,642 inferences, 0.547 CPU in 175.149 seconds (0% CPU, 359574 Lips)
true.

8192 threads checking 65536 ports in ~3 minutes.

AFAIK with_mutex/2 is not needed in add_port_scan_result/2: there should only ever be one fact for each port, and once the facts have been generated the first time I don’t ever expect them to change. Also, only one thread adds the fact for a given port, and only once, so that thread does not compete with other threads and no lock is needed. Since this was created to test the open ports of a specific site, if something changes then something is very wrong somewhere.

Since there are so few examples of working library(persistency) code to be found, I am leaving with_mutex/2 in for the write, because people are likely to copy this and, if with_mutex/2 were missing, they would wonder why on rare days their code does not work.

EDIT

Starting with SWI-Prolog 8.3.3, library(persistency) (GitHub Commit) was made more thread-friendly by wrapping the four predicates with with_mutex/2.

So

add_port_scan_result(Request,Response) :-
    with_mutex(port_scan_result_journal, assert_port_scan_result(Request,Response)).

can be changed to

add_port_scan_result(Request,Response) :-
    assert_port_scan_result(Request,Response).

From the documentation:

This module requires the same thread-synchronization as the normal Prolog database. This implies that if each individual assert or retract takes the database from one consistent state to the next, no additional locking is required. If more than one elementary database operation is required to get from one consistent state to the next, both updating and querying the database must be locked using with_mutex/2.

The persistency library requires the same locking as normal query/assert/retract: as long as each individual assert/retract moves from one consistent state to the next, no locking is required. Only if moving from one consistent state to the next requires multiple assert/retract operations is locking required, for both the reader and the writer. Eventually, transactions will require multiple modifications to be wrapped in a transaction and queries to require nothing special.
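A minimal sketch of the case that does need a lock, using plain dynamic predicates since the documentation says the same rules apply; set_port_status/2, port_status_locked/2, port_status/2 and the mutex name port_status_lock are hypothetical. Replacing a fact takes two elementary operations, so writer and reader share one mutex. With library(persistency), the generated retractall_port_status/2 and assert_port_status/2 would take the place of retractall/1 and assertz/1.

```prolog
:- use_module(library(thread)).

% port_status/2 is a hypothetical table: one fact per port.
:- dynamic port_status/2.

% set_port_status(+Port, +Status): replacing a fact needs two elementary
% operations. Without the mutex, two threads could both retract and then
% both assert, leaving two facts for the same port.
set_port_status(Port, Status) :-
    with_mutex(port_status_lock,
               ( retractall(port_status(Port, _)),
                 assertz(port_status(Port, Status)) )).

% Readers take the same lock, so they never see the state between the
% retractall/1 and the assertz/1. Port must be bound.
port_status_locked(Port, Status) :-
    with_mutex(port_status_lock, port_status(Port, Status)).
```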


I don’t know if I am understanding that sentence correctly.

My take is that this will be like SQL databases, where one wraps a series of statements in a transaction and the updates take effect only once the transaction completes or is committed. But the word eventually means it does not exist at present. That does make sense.


format/3 is atomic, like all the core stream write predicates.

No, I mean the ordering might get corrupted, so that when you replay the journal from the file you don’t get what you had in memory beforehand. This results from the pre-emptive concurrency of threads: without locking, library(persistency) could commit the following error. Just look at db_assert/1 and db_retract/1:

  Thread 1:                 Thread 2:
  assert/1                    
                            retract/1
                            write_action/2
  write_action/2

So you did assert/1 then retract/1, but when reading the journal it will do retract/1 then assert/1. I guess you will not see this interleaving in the port scanner, since it only asserts; you need another example. I did an example, fill_unsafe, on comp.lang.prolog and on Twitter, which also illustrates an interleaving problem. It might take thousands of operations until the unlucky interleaving happens.
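A sketch of the fix in miniature, not the library(persistency) code itself: logged_assert/1 and logged_retract/1 are hypothetical helpers, and current output stands in for the journal stream. Holding one mutex across both the database update and the journal write means no other thread can slip its own pair in between, so the journal lines come out in the same order as the in-memory updates.

```prolog
:- dynamic seen/1.

% logged_assert(+Fact) / logged_retract(+Fact): hypothetical helpers.
% The mutex covers both the update and the journal line, so the two
% steps of different threads can never interleave.
logged_assert(Fact) :-
    with_mutex(journal_lock,
               ( assertz(Fact),
                 format('~q.~n', [assert(Fact)]) )).

logged_retract(Fact) :-
    with_mutex(journal_lock,
               ( retract(Fact),
                 format('~q.~n', [retract(Fact)]) )).
```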


Here are the results on the modified code. This is the latest version compiled with GCC 9 using PGO, running on Ubuntu 20.04, AMD 3950X, 16 cores, 32 threads. As you see, up to 16 threads the speedup is close to optimal (14 times). Hyperthreading apparently does very little in this case.
The code is attached.

18 ?- time(count(N)).
% 69,907,410 inferences, 3.583 CPU in 3.589 seconds (100% CPU, 19512169 Lips)
N = 78499.

19 ?- time(count2(N,1)).
% 4,104 inferences, 0.014 CPU in 3.536 seconds (0% CPU, 287142 Lips)
N = 78499.

20 ?- time(count2(N,2)).
% 4,132 inferences, 0.014 CPU in 1.786 seconds (1% CPU, 293717 Lips)
N = 78499.

21 ?- time(count2(N,4)).
% 4,202 inferences, 0.024 CPU in 0.899 seconds (3% CPU, 178360 Lips)
N = 78499.

22 ?- time(count2(N,8)).
% 4,392 inferences, 0.005 CPU in 0.468 seconds (1% CPU, 937750 Lips)
N = 78499.

23 ?- time(count2(N,16)).
% 4,950 inferences, 0.005 CPU in 0.250 seconds (2% CPU, 1055806 Lips)
N = 78499.

24 ?- time(count2(N,32)).
% 6,832 inferences, 0.007 CPU in 0.213 seconds (3% CPU, 1041262 Lips)
N = 78499.

25 ?- A is 3.536/0.250.
A = 14.144.

primes.pl (553 Bytes)
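primes.pl is not reproduced here, so the following is only a hypothetical sketch of how such a count can be split across threads, using concurrent_maplist/3 from library(thread) rather than whatever primes.pl actually does. count_primes/3, slice_count/4 and prime/1 are made-up names, and the trial division is deliberately naive so that each slice is pure CPU work.

```prolog
:- use_module(library(thread)).
:- use_module(library(lists)).
:- use_module(library(aggregate)).

% count_primes(+Max, +Slices, -Count): hypothetical helper that counts
% the primes in 1..Max by splitting the range into Slices contiguous
% slices. concurrent_maplist/3 runs the per-slice goals on up to
% cpu_count worker threads.
count_primes(Max, Slices, Count) :-
    Last is Slices - 1,
    numlist(0, Last, SliceIds),
    concurrent_maplist(slice_count(Max, Slices), SliceIds, Counts),
    sum_list(Counts, Count).

% Slice I covers I*Max//Slices+1 .. (I+1)*Max//Slices, so the slices
% are contiguous and together cover exactly 1..Max.
slice_count(Max, Slices, I, N) :-
    Lo is I * Max // Slices + 1,
    Hi is (I + 1) * Max // Slices,
    aggregate_all(count, (between(Lo, Hi, X), prime(X)), N).

% Naive trial division: no shared state between slices.
prime(2).
prime(N) :-
    N > 2,
    N mod 2 =\= 0,
    \+ has_odd_factor(N, 3).

has_odd_factor(N, F) :-
    F * F =< N,
    (   N mod F =:= 0
    ->  true
    ;   F2 is F + 2,
        has_odd_factor(N, F2)
    ).
```

Timing time(count_primes(1000000, Slices, N)) for different Slices values is one way to run this kind of scaling test; the figures will of course differ from the ones above.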


Right. Added locks to the basic update predicates. This still requires the user to add another layer of locks if multiple related changes are needed. Updated the docs to explain this more clearly.

Thanks for spotting.


Today Jan W. released SWI-Prolog 8.3.3, which includes a modified version of Jan B.’s balance/2,3, named concurrent_and/2,3.

So I gave it a try.

The first thing I learned the hard way is that you have to accept each answer, i.e. press the space bar after true is displayed.

concurrent_scan(IP_address,Low_port,High_port,Number_of_threads) :-
    concurrent_and(
        between(Low_port,High_port,Port),
        port_scan(IP_address,Port),
        [threads(Number_of_threads)]
    ).

port_scan(IP_address,Port) :-
    catch(
        setup_call_cleanup(
            tcp_socket(Socket),
            (
                % Open stream socket based on TCP/IP which uses IP address and port number, i.e. INET socket
                tcp_connect(Socket, IP_address:Port),
                format('Port ~w: open~n', [Port])
            ),
            tcp_close_socket(Socket)
        ),
        error(_,_),
        true
    ).

Example run

?- concurrent_scan('140.211.166.101',79,82,3).
Port 80: open
true ;
true ;
true ;
true ;
false.

Adding fail to the end of the query avoids having to press the space bar, but this is not the way concurrent_and/2 is designed to be used, for example:

?- concurrent_scan('140.211.166.101',79,82,3),fail.
Port 80: open
false.
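If the goal is to collect results rather than step through them, wrapping concurrent_and/3 in findall/3 (or forall/2 when only the side effects matter) consumes all answers without pressing the space bar and without the trailing fail. squares/2 is a made-up example; answers may arrive in completion order, hence the msort/2.

```prolog
:- use_module(library(thread)).

% squares(+Max, -Pairs): hypothetical example. concurrent_and/3 yields
% one answer per generator solution for which the test succeeds,
% possibly out of order; findall/3 collects all of them, and msort/2
% gives a deterministic order.
squares(Max, Pairs) :-
    findall(X-Y,
            concurrent_and(between(1, Max, X), Y is X * X, [threads(3)]),
            Unsorted),
    msort(Unsorted, Pairs).
```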

The test cases in test_balance.pl use setof/3. Incorporating setof/3 leads to:

concurrent_scan_02(IP_address,Low_port,High_port,Number_of_threads,Filtered_results) :-
    setof(
        Port-Result,
        concurrent_and(
            between(Low_port,High_port,Port),
            port_scan_02(IP_address,Port,Result),
            [threads(Number_of_threads)]
        ),
        Results
    ),
    include(filter,Results,Filtered_results).

port_scan_02(IP_address,Port,Result) :-
    catch(
        setup_call_cleanup(
            tcp_socket(Socket),
            (
                % Open stream socket based on TCP/IP which uses IP address and port number, i.e. INET socket
                tcp_connect(Socket, IP_address:Port),
                Result = open
            ),
            tcp_close_socket(Socket)
        ),
        error(_,_),
        Result = closed
    ).

filter(_Port-open).

Example run

?- concurrent_scan_02('140.211.166.101',79,82,3,Results).
Results = [80-open].

A more comprehensive example.

?- time(concurrent_scan_02('140.211.166.101',1,65536,8192,Results)).
% 135,209,226 inferences, 16.281 CPU in 187.520 seconds (9% CPU, 8304597 Lips)
Results = [22-open, 80-open, 443-open].

8192 threads checking 65536 ports in ~3 minutes.

Probably not. On the other hand, I don’t think I’d use a lock. Worst case, you have a couple of redundant entered/1 clauses. They should not affect the result, and it is pretty unlikely there will be so many that they have a significant impact on memory usage.