Reading from a socket, creep

Hi,

I am converting my RBaseXClient to a SWI-Prolog version (thanks to Jan, I have already figured out how to use dicts as the main data structure in place of R's object-oriented classes).
I use this code to open a socket connection to a BaseX server (mostly copied from the internet):

create_Socket(Host, Port, Username, Password, StreamPair) :-
    atomic_list_concat([Host, Port], ':', HP_A),
    term_to_atom(HP, HP_A),
    setup_call_catcher_cleanup(tcp_socket(Socket),
                               tcp_connect(Socket, HP),
                               exception(_),
                               tcp_close_socket(Socket)),
    setup_call_cleanup(tcp_open_socket(Socket, In, Out),
                     chat_to_server(In, Out),
                     close_connection(In, Out)).

In the example this code is intended to be used for reading from the socket:

chat_to_server(In, Out) :-
    read(Term),
    (   Term == end_of_file
    ->  true
    ;   format(Out, '~q .~n', [Term]),
        flush_output(Out),
        read(In, Reply),
        write(Reply),
        chat_to_server(In, Out)
    ).

In the debugger I can see that connection and streams are created.

According to the client/server protocol, the first read operation after creating the socket should result in something like "BaseX:1369578179679". This realm and timestamp are used, together with the username and password, for authentication. Once authenticated, the socket can be used for all operations.
Executing read(Term) in chat_to_server however results in a loop and a creep... message in the debugger.

From R I know that the socketconnection can be used in nonblocking mode.

I also tried using this predicate for reading but that didn't make any difference:

readBin(StreamPair, Reply) :-
    stream_pair(StreamPair, In, _),
    read(In, Reply).

How can I read from a socket?

Thanks,
Ben

What is this atomic_list_concat/3 plus term_to_atom/2 conversion for? If Host has valid Prolog atom syntax, the result is the same as Host:Port, no? Otherwise it raises a syntax error exception.

Using read/2 :slight_smile: But that only reads Prolog terms, i.e., something that ends with "." followed by white space. What you probably want is to read a line. You can do that using e.g. read_line_to_string/2. Even more low-level stuff is get_code/2, get_byte/2, read_string/3, etc. In any case, you can read from a stream pair. It will automatically use the input half of the pair.
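As a minimal sketch of the difference: read/2 waits for a complete Prolog term, while read_line_to_string/2 returns as soon as a newline arrives. The helper name first_line/2 is hypothetical, and an in-memory string stream stands in for the socket here; on a real connection the stream pair could be passed in the same position.

```prolog
:- use_module(library(readutil)).

% first_line(+Text, -Line): hypothetical helper.
% Opens Text as an input stream (a stand-in for the socket's input
% stream) and reads everything up to the first newline as a string.
first_line(Text, Line) :-
    setup_call_cleanup(
        open_string(Text, In),
        read_line_to_string(In, Line),
        close(In)).
```

For example, ?- first_line("BaseX:1369578179679\nmore", Line). gives Line = "BaseX:1369578179679". Note that this only helps when the peer actually terminates its messages with a newline.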

Before you write another R interface, be aware there are already 4 of them:

  • Two by @nicos, one talking to R using stdio and one embedding R using the C interface.
  • One by @mgondan1 also using the C interface, but primarily aiming at using Prolog from R
  • One by me based on the Rserve protocol. This is used by SWISH.

So far all of them have a different focus. It is clear that R ↔ Prolog is popular :slight_smile:


And there's also a stub in the MQI docs

https://www.swi-prolog.org/pldoc/man?section=mqi-overview

As the docs say, "Note that this is not an example of how to use the MQI from R, it just shows the first code a developer would write as they begin to build a nice library to connect R to Prolog using the MQI."

:blush:

My baseXClient("localhost", 1984, "admin", "admin", Session) predicate calls the predicate create_socket as above. Without the transformation tcp_connect returns the exception.
I also tried baseXClient("localhost:1984", "admin", "admin", Session) but that also returned an error.

No, I don't want to read a line, I want to read codes from the stream until the buffer is empty. I will see whether get_byte gives better results.
I didn't know that choosing the input half of the pair is done automatically. It often takes a lot of time to find this sort of information :wink:

@Jan,
Your remarks about low-level stuff put me on a search trail. https://www.swi-prolog.org/pldoc/doc_for?object=read_pending_codes/3 looks interesting, it resembles stuff I know from R.

I know (from working on RBaseX) that the buffer contains a byte-array and I also know how that byte-array has to be used.

I am not working on yet another R interface; I am working on a SWI-Prolog client for BaseX. BaseX is primarily an XML database, but it can also store HTML, JSON, CSV, other formats, and binary data. It is also an XQuery 3.1 processor with full support for the W3C Update and Full-Text extensions.

Not really. When doing networking you need something in the data that tells you when to stop reading, as the data may arrive in quite arbitrary batches. You can read as much data as is available using read_pending_codes/3, which you may have to combine with fill_buffer/1 and wait_for_input/3. I doubt that is a good idea.

I guess something tells you where the byte array ends? A delimiter? A known length, something at the start that indicates its length?


After changing "localhost" into 'localhost', tcp_connect worked fine. (This wasn't the first time I made this error and it probably won't be the last time either :smirk:)

Once a socket has been created, this socket is used for streams of raw data (both input and output). You have to process the streams until no more bytes are available.
I have checked the Java sources for the server. The socket is created as "new InetSocketAddress(host, port)". After a BufferInput is created, 'get' is used to read from the stream. I didn't find any checks on the length of the buffer.
I also checked C and C++ examples of client-code and none of them checked the length.

In R I use this code for reading:

readBin_ <- function(conn) {
  total_read <- rd <- as.raw(c())
  while(!done(rd, length(total_read))) {
    socketSelect(list(conn))
    rd <- readBin(conn, "raw", 1024)
    total_read <- c(total_read,rd)
    }
  return(total_read)
}
done <- function(rd, total_length) {
  finish <- TRUE
  if (total_length == 0) {
    finish <- FALSE
  } else {
    i <- length(rd)
    if (i ==1024) finish <- FALSE
  }
  return(finish)
}

readBin is trying to read a block of 1024 bytes and repeats until a block is partially empty. As you can see there are no checks on the total length.

The first read takes place while authenticating. And this is the byte-array (hex-values) that is returned:
42 61 73 65 58 3A 31 33 36 39 35 37 38 31 37 39 36 37 39 00.
Since this is a long sequence of hex-codes, I wonder if I should try to use pure_input.pl and phrase_from_stream for reading? And if so, what are the predicates used for writing large chunks of raw data to a stream?

Ben

Very weird. OK, it is waiting for a reply, so you will indeed never read too much. If the initial message is a multiple of 1024 bytes long you'll deadlock though. AFAIK the network layers may also split the packet, resulting in a partial read that is not a multiple of 1024 bytes long. Possibly ending in 00 is not an accident?

Probably not :slight_smile:

?- atom_codes(A, [0x42,0x61,0x73,0x65,0x58,0x3A,0x31,0x33,0x36,0x39,0x35,0x37,0x38,0x31,0x37,0x39,0x36,0x37,0x39]).
A = 'BaseX:1369578179679'.

So, the thing looks like a 0-terminated string indicating the software and some integer that acts as authentication. Just use get_byte/2 until you get a 0-byte :slight_smile:
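A minimal sketch of that suggestion, assuming In is a binary input stream (or a stream pair) and that the string is plain ASCII. The predicate names read_c_string/2 and read_c_bytes/2 are made up for illustration; they are not part of any library.

```prolog
% read_c_string(+In, -Atom): hypothetical helper.
% Reads bytes up to (and consuming) the first 0-byte and returns them
% as an atom. Bytes are used directly as character codes, which is only
% right for ASCII; UTF-8 data would need decoding first.
read_c_string(In, Atom) :-
    read_c_bytes(In, Bytes),
    atom_codes(Atom, Bytes).

read_c_bytes(In, Bytes) :-
    get_byte(In, Byte),
    (   Byte =:= 0
    ->  Bytes = []              % terminator reached
    ;   Byte =:= -1
    ->  Bytes = []              % end of stream before a terminator: stop anyway
    ;   Bytes = [Byte|Rest],
        read_c_bytes(In, Rest)
    ).
```

Because the loop stops at the terminator, it never asks for more data than the server has promised to send, so it cannot block on a message whose length happens to match the block size.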

Of course, given that it is a short string and cannot be joined to anything coming after it, reading until there is no more is fairly safe. Still bad style though.

A buffer size of 1024 is just an arbitrary block size. With small block sizes the done() function is called too often, which reduces performance. AFAIK choosing a large block size means that even for small server responses, R first allocates a complete block in memory. 1024 seemed a good value to me.

All the results from the server come as 0-terminated strings or raw byte arrays. And the response from the server always terminates with a 00-byte (meaning success) or a 01-byte (meaning error). But since in some cases 00-bytes are inserted deliberately in the stream (amongst others they are used to separate the query results), getting a 00-byte does not mean that there are no more bytes pending in the buffer.
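To make that layout concrete, here is a sketch that takes a complete response as a list of bytes and splits it. It rests on assumptions not confirmed in this thread: that every field is 0-terminated, that exactly one status byte follows the last field, and that no byte escaping is involved. split_response/3, status/2 and zero_terminated/2 are hypothetical names.

```prolog
:- use_module(library(lists)).

% split_response(+Bytes, -Fields, -Status): hypothetical helper.
% Bytes is the whole response; the final byte is taken as the status
% (0 = ok, 1 = error) and everything before it as 0-terminated fields.
split_response(Bytes, Fields, Status) :-
    once(append(Body, [StatusByte], Bytes)),
    status(StatusByte, Status),
    zero_terminated(Body, Fields).

status(0, ok).
status(1, error).

% zero_terminated(+Bytes, -Fields): cut Bytes at each 0-byte.
zero_terminated([], []).
zero_terminated([B|Bs], [Field|Fields]) :-
    once(append(FieldBytes, [0|Rest], [B|Bs])),   % up to the first 0-byte
    atom_codes(Field, FieldBytes),
    zero_terminated(Rest, Fields).
```

For example, ?- split_response([0'a,0'b,0,0'c,0,0], Fields, Status). gives Fields = [ab, c] and Status = ok. This only works once the whole response is in memory, so it does not by itself answer the question of when to stop reading.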

As long as there is at least 1 byte in the buffer, readBin_ does not enter a deadlock. (But I will give my R-code a good look! Thanks for the hint).

There were several in the R forums who shared your thoughts on bad style coding. However, after just accepting that this is the format that is used, it turned out to be quite easy to convert the server response to the desired output.
What applies to the output of the server also applies to the input. The input is also presented to the server as 1 large array of raw code but at least in R creating this array is very simple.

I hope you now understand why I'm looking for methods to perform low-level read and write operations on the socket.

Just the condition seems wrong. I understand arbitrary arrays are sent and thus it is possible for these arrays to be a multiple of 1024 long. Now your loop is going to try and read the next block which will never come. And, if we are talking arbitrary (and long) arrays, network layers will chop it into pieces and some delay in the network will give you short blocks while there is still data in the pipeline. If you only use the localhost loopback network you might not suffer too much from this (depending on load and OS).

Anyway, in theory you can get there using the following steps:

  • Set the stream buffer size to 1024 (using set_stream/2).
  • Make sure the stream is binary (sockets are by default binary).
  • Loop using fill_buffer/1 and read_pending_codes/3 until the next block you get is not 1024 long.
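The steps above can be sketched roughly as follows. This is an untested-on-sockets illustration, assuming In is a binary input stream; read_blocks/2 is a hypothetical name. It inherits the weakness discussed in this thread: if the message is an exact multiple of 1024 bytes, the next fill_buffer/1 call waits for data that never comes.

```prolog
:- use_module(library(lists)).

% read_blocks(+In, -Bytes): hypothetical helper.
% Reads blocks of at most 1024 bytes until a block comes back shorter
% than 1024, then returns all bytes read so far.
read_blocks(In, Bytes) :-
    set_stream(In, buffer_size(1024)),
    read_blocks_(In, Bytes).

read_blocks_(In, Bytes) :-
    fill_buffer(In),                      % blocks until some input arrives
    read_pending_codes(In, Block, []),    % take what is in the buffer
    length(Block, Len),
    (   Len =:= 1024
    ->  read_blocks_(In, Rest),           % full block: assume more follows
        append(Block, Rest, Bytes)
    ;   Bytes = Block                     % short block: assume we are done
    ).
```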

This is not pretty and also relatively slow. I'd fix the protocol. A good policy is to first send the length of the message as HTTP does or, if you do not know, use something along the lines of HTTP chunked encoding that chops a message into pieces each of which is preceded by a length (and the last has some indication this is the end). Most likely R has support for websockets (it supports nearly everything :slight_smile: ), which is a great way to send arbitrary messages over a stream. If you decide to stick with what you have, my advice would be to add a small C or C++ helper to read the message and probably also do some initial conversion such that you get a Prolog-friendly message. Prolog is not good with bytes :frowning:
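To illustrate the length-first policy, here is a sketch of a made-up framing scheme: a 4-byte big-endian length followed by the payload. This is not the BaseX protocol, and write_frame/2, read_frame/2 and read_n_bytes/3 are hypothetical names. Both streams are assumed to be binary.

```prolog
:- use_module(library(apply)).

% write_frame(+Out, +Bytes): send a 4-byte big-endian length, then Bytes.
write_frame(Out, Bytes) :-
    length(Bytes, Len),
    B3 is (Len >> 24) /\ 0xff,
    B2 is (Len >> 16) /\ 0xff,
    B1 is (Len >> 8) /\ 0xff,
    B0 is Len /\ 0xff,
    maplist(put_byte(Out), [B3, B2, B1, B0|Bytes]),
    flush_output(Out).

% read_frame(+In, -Bytes): read the length header, then exactly that
% many payload bytes. No end-of-stream check is done in this sketch.
read_frame(In, Bytes) :-
    read_n_bytes(4, In, [B3, B2, B1, B0]),
    Len is (B3 << 24) \/ (B2 << 16) \/ (B1 << 8) \/ B0,
    read_n_bytes(Len, In, Bytes).

% read_n_bytes(+N, +In, -Bytes): read exactly N bytes from In.
read_n_bytes(N, In, Bytes) :-
    length(Bytes, N),
    maplist(get_byte(In), Bytes).
```

With a header like this the reader always knows how many bytes to wait for, so zero bytes inside the payload and payloads that are a multiple of the block size are no longer special cases.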

Fixing the protocol will not be an option. The installed base of BaseX is too great and they will not change the protocol just because I run into problems :smirk: :wink:.

Some time ago I started working on generic C++ code that would handle all interactions between the BaseX server and a client. I stopped this subproject because after a lot of searching I finally found the cause for the lower performance of my RBaseX client. The need to write the generic C++ code disappeared.
While working on RBaseX, I got a good idea of the structure of the data that is transported and of the basic transformations. The associated code is already neatly isolated.
I will take another look at my C++ code and see if I can integrate it into my prolog project as well.


That may change if you can demonstrate that the protocol is broken. From what you describe I think that is an easy job :slight_smile: