Playing around with a local LLM and having issues handling chunked responses

I’ve been playing around with talking to a local LLM (in this case via Ollama), using the proof-of-concept code pasted further down, so I can have an assistant at the REPL.
FYI, I’m using the latest develop version, SWI-Prolog 9.3.24.

If you want to play/test, you will need to install Ollama and a suitable model, e.g.

brew install ollama
ollama serve&
ollama pull codellama:latest

In another terminal session, run swipl with the module loaded and ask the LLM questions, e.g.

?- ol_chat("Tell me about yourself.").
% _7036{messages:[_7022{content:Tell me about yourself.,role:user}],model:codellama,stream:true}
% > POST /api/chat HTTP/1.1
% > Host: localhost:11434
% > User-Agent: SWI-Prolog
% > Connection: close
% > Content-Type: application/json
Content-Length: 114


Warning: Ambiguous operation on stream pair <stream>(0x600002841400,0x600002841600)

Hello! I am LLaMA, an AI assistant developed by Meta AI that can understand and respond to human input in a conversational manner. I am trained on a massive dataset of text from the internet and can answer questions, provide information, and even generate creative content such as stories or dialogue. I can understand natural language and can communicate with you in a way that feels natural and similar to human conversation. I can be used to create chatbots, virtual assistants, and other applications where natural language input and output is desired.
% LLM duration: 26.903 seconds, Evals: 114, EPS=5.85768
true.

It all works fine except for the warning, which I cannot figure out how to remove. I believe it’s because the local LLM is sending chunked data and, due to issues with the chunked library, I’m having to use http_chunked_open/3 directly; perhaps at that moment the predicate doesn’t know whether the passed stream (from http_open/3) is a read or a write stream. Is there any way to get rid of the warning?
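An untested sketch of one way the ambiguity might be avoided: http_open/3 returns a stream pair, so extracting just the input side with stream_pair/3 before installing the filter should leave nothing ambiguous (this only targets the warning, not the chunked handling itself), e.g. in dump_stream/3:

dump_stream(Stream, MStream, CreatedAt):-
    stream_pair(Stream, In, _Out),           % take only the read side of the pair
    http_chunked_open(In, ChunkData, []),    % install the chunked filter on the input only
    dump_part(ChunkData, MStream, CreatedAt).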

Reading the docs, I shouldn’t need to use http_chunked_open/3, as the http_stream library should have transparently decoded the traffic so that the response stream from http_open/3 consists of JSON objects; however, it fails when reading the stream produced (see the section after the code).

Working code below:

:-module(ollama, [
             ol_chat/1,
             ol_chat/2,
             ol_clear/0,
             ol_history/0,
             ol_regenerate/0,
             ol_version/0,
             ol_version/1]).

:-use_module(library(macros)).
:-use_module(library(http/http_stream)).
:-use_module(library(http/http_json)).
:-use_module(library(http/http_open)).
:-use_module(library(http/http_client)).
:-use_module(library(http/json)).
:-use_module(library(memfile)).
:-use_module(library(predicate_options)).

:-debug(ollama).
:-debug(http(send_request)).

/*********** Default Model Settings **************/

%#define(llm_model, 'deepseek-coder-v2:latest').
#define(llm_model,'codellama').
#define(rag_model,'mxbai-embed-large:latest').
#define(ollama_host, 'localhost:11434').
#define(stream, true).

/*************************************************/
:-dynamic ol_history/3.


:-predicate_options(ol_version/1, 1, [host(text)]).
:-predicate_options(ol_chat/2, 2, [host(text),
                                   role(oneof([user, tool, assistant])),
                                   model(text),
                                   stream(boolean)]).

ol_clear :- retractall(ol_history(_, _, _)).

ol_regenerate :-
    ol_history(D, Question, _),
    \+ (ol_history(P, _, _), P@>D),
    once(retract(ol_history(D, _, _))),
    ol_chat(Question).

ol_version:-ol_version([]).
ol_version(Options) :-
    option(host(Host), Options, #ollama_host),
    atom_concat('http://', Host, Server),
    atom_concat(Server, "/api/version", URL),
    http_open(URL, Reply, [method(get), json_object(dict)]),
    json_read_dict(Reply, Data, []),
    close(Reply),
    print_message(informational, format('~w', [Data.version])).

ol_chat(Data) :- ol_chat(Data, []).
ol_chat(Data, Options) :-
    option(host(Host), Options, #ollama_host),
    option(role(Role), Options, 'user'),
    option(model(Model), Options, #llm_model),
    option(stream(Stream), Options, #stream),
    atom_concat('http://', Host, Server),
    atom_concat(Server, "/api/chat", URL),
    findall([_{role:user, content:Me}, _{role:assistant, content:You}], ol_history(_, Me, You), History),
    flatten(History, FHistory),
    append(FHistory, [_{role:Role, content:Data}], ChatHistory),
    Message = _{model:Model, messages:ChatHistory, stream:Stream},
    debug(ollama, '~w', Message),
    http_open(URL, Response, [headers(ResponseHeaders), post(json(Message)), raw_encoding(chunked)]),
    ( Stream == true ->
          setup_call_cleanup(
              (
                  new_memory_file(MFile),
                  open_memory_file(MFile, write, MStream)
              ),
              (
                  dump_stream(Response, MStream, CreatedAt),
                  flush_output(MStream),
                  close(MStream),
                  memory_file_to_string(MFile, Content),
                  assert(ol_history(CreatedAt, Data, Content))
              ),
              (
                  free_memory_file(MFile)
              )
          )
      ;
          % Was it chunked?
          (memberchk(transfer_encoding(chunked), ResponseHeaders) ->
               http_chunked_open(Response, ChunkData, []),
               json_read_dict(ChunkData, Reply, [])
           ;
               json_read_dict(Response, Reply, [])
          ),
          print_message(informational, format('LLM duration: ~3f seconds, Evals: ~I, EPS=~5f', [Reply.total_duration/10^9,Reply.eval_count,Reply.eval_count/Reply.eval_duration * 10^9])),
          write(Reply.message.content),
          assert(ol_history(Reply.created_at, Data, Reply.message.content))
    ).

ol_history :-
    ol_history(D, M, Y),
    \+ format('-------------------------------------------------------------------------------~w--~nMe : ~w~nYou:~w~n', [D, M, Y]).
ol_history :-
    format('-----------------------------------------End of Transcript -------------------------------------------------~n').

dump_stream(Stream, MStream, CreatedAt):-
    http_chunked_open(Stream, ChunkData, []),
    dump_part(ChunkData, MStream, CreatedAt).

dump_part(ChunkData, MStream, CreatedAt):-
    json_read_dict(ChunkData, Part),
    write(Part.message.content),
    flush_output,
    format(MStream, '~w', Part.message.content),
    (_{ done:true } :< Part ->
         print_message(informational, format('LLM duration: ~3f seconds, Evals: ~I, EPS=~5f', [Part.total_duration/10^9,Part.eval_count,Part.eval_count/Part.eval_duration * 10^9])),
         CreatedAt = Part.created_at
     ;
         dump_part(ChunkData, MStream, CreatedAt)
    ).
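For completeness, ol_chat/2 also accepts options, e.g. to pick another model and disable streaming (an illustrative query, not from the transcripts above):

?- ol_chat("Write a predicate that reverses a list.",
           [model('deepseek-coder-v2:latest'), stream(false)]).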

The version that lets the library handle the chunked transfer, which highlights the issue, produces the following chat:

?- ol_chat("Tell me about yourself.").
% _9496{messages:[_9240{content:Tell me about yourself.,role:user}],model:codellama,stream:true}
% > POST /api/chat HTTP/1.1
% > Host: localhost:11434
% > User-Agent: SWI-Prolog
% > Connection: close
% > Content-Type: application/json
Content-Length: 114



ERROR: key `message' does not exist in _59116{error:"error reading llm response: context canceled"}

If chunked transfer encoding is supposed to be handled transparently, I would expect the code below to work:

Failing code below:

:-module(ollama, [
             ol_chat/1,
             ol_chat/2,
             ol_clear/0,
             ol_history/0,
             ol_regenerate/0,
             ol_version/0,
             ol_version/1]).

:-use_module(library(macros)).
:-use_module(library(http/http_stream)).
:-use_module(library(http/http_json)).
:-use_module(library(http/http_open)).
:-use_module(library(http/http_client)).
:-use_module(library(http/json)).
:-use_module(library(memfile)).
:-use_module(library(predicate_options)).

:-debug(ollama).
:-debug(http(send_request)).

/*********** Default Model Settings **************/

%#define(llm_model, 'deepseek-coder-v2:latest').
#define(llm_model,'codellama').
#define(rag_model,'mxbai-embed-large:latest').
#define(ollama_host, 'localhost:11434').
#define(stream, true).

/*************************************************/
:-dynamic ol_history/3.


:-predicate_options(ol_version/1, 1, [host(text)]).
:-predicate_options(ol_chat/2, 2, [host(text),
                                   role(oneof([user, tool, assistant])),
                                   model(text),
                                   stream(boolean)]).

ol_clear :- retractall(ol_history(_, _, _)).

ol_regenerate :-
    ol_history(D, Question, _),
    \+ (ol_history(P, _, _), P@>D),
    once(retract(ol_history(D, _, _))),
    ol_chat(Question).

ol_version:-ol_version([]).
ol_version(Options) :-
    option(host(Host), Options, #ollama_host),
    atom_concat('http://', Host, Server),
    atom_concat(Server, "/api/version", URL),
    http_open(URL, Reply, [method(get), json_object(dict)]),
    json_read_dict(Reply, Data, []),
    close(Reply),
    print_message(informational, format('~w', [Data.version])).

ol_chat(Data) :- ol_chat(Data, []).
ol_chat(Data, Options) :-
    option(host(Host), Options, #ollama_host),
    option(role(Role), Options, 'user'),
    option(model(Model), Options, #llm_model),
    option(stream(Stream), Options, #stream),
    atom_concat('http://', Host, Server),
    atom_concat(Server, "/api/chat", URL),
    findall([_{role:user, content:Me}, _{role:assistant, content:You}], ol_history(_, Me, You), History),
    flatten(History, FHistory),
    append(FHistory, [_{role:Role, content:Data}], ChatHistory),
    Message = _{model:Model, messages:ChatHistory, stream:Stream},
    debug(ollama, '~w', Message),
    http_open(URL, Response, [post(json(Message))]),
    ( Stream == true ->
          setup_call_cleanup(
              (
                  new_memory_file(MFile),
                  open_memory_file(MFile, write, MStream)
              ),
              (
                  dump_stream(Response, MStream, CreatedAt),
                  flush_output(MStream),
                  close(MStream),
                  memory_file_to_string(MFile, Content),
                  assert(ol_history(CreatedAt, Data, Content))
              ),
              (
                  free_memory_file(MFile)
              )
          )
      ;
          json_read_dict(Response, Reply, []),
          print_message(informational, format('LLM duration: ~3f seconds, Evals: ~I, EPS=~5f', [Reply.total_duration/10^9,Reply.eval_count,Reply.eval_count/Reply.eval_duration * 10^9])),
          write(Reply.message.content),
          assert(ol_history(Reply.created_at, Data, Reply.message.content))
    ).

ol_history :-
    ol_history(D, M, Y),
    \+ format('-------------------------------------------------------------------------------~w--~nMe : ~w~nYou:~w~n', [D, M, Y]).
ol_history :-
    format('-----------------------------------------End of Transcript -------------------------------------------------~n').

dump_stream(Stream, MStream, CreatedAt):-
    dump_part(Stream, MStream, CreatedAt).

dump_part(Stream, MStream, CreatedAt):-
    json_read_dict(Stream, Part),
    write(Part.message.content),
    flush_output,
    format(MStream, '~w', Part.message.content),
    (_{ done:true } :< Part ->
         print_message(informational, format('LLM duration: ~3f seconds, Evals: ~I, EPS=~5f', [Part.total_duration/10^9,Part.eval_count,Part.eval_count/Part.eval_duration * 10^9])),
         CreatedAt = Part.created_at
     ;
         dump_part(Stream, MStream, CreatedAt)
    ).

The warning results from an I/O predicate that is used on a stream pair and that could apply to both the input and the output stream, but not meaningfully to both. If you step through the code with the tracer, you will probably see the culprit.

AFAIK, http_open/3 should transparently handle chunked encoding, at least after loading library(http/http_stream), as you can find at SWI-Prolog -- library(http/http_open): HTTP client library.

I’m curious how good such a solution will be. I tried playing with llama about half a year ago with little luck. But you surely have more experience, and things have evolved :slight_smile:

Yes, I’ve been looking at the library and cannot figure out the discrepancy… :frowning:

Not really, but some organisations are wary of sending data to cloud LLMs and prefer to use local LLMs. I was investigating how easy the Ollama API would be to implement, as I noticed from their README that there are wrappers for quite a few languages (even Haskell!).
It would also enable a quick/naive RAG (retrieval-augmented generation) implementation using something like Redis and the embeddings endpoint to index local documents…
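To sketch what I mean, something along these lines, reusing the #define macros from the module above (untested; the /api/embeddings request and response fields are my reading of the Ollama README, and the Redis indexing is omitted entirely):

ol_embed(Text, Embedding) :-
    ol_embed(Text, Embedding, []).
ol_embed(Text, Embedding, Options) :-
    option(host(Host), Options, #ollama_host),
    option(model(Model), Options, #rag_model),
    atom_concat('http://', Host, Server),
    atom_concat(Server, "/api/embeddings", URL),
    setup_call_cleanup(
        http_open(URL, In, [post(json(_{model:Model, prompt:Text}))]),
        json_read_dict(In, Reply, []),
        close(In)),                      % remember to close the stream pair
    Embedding = Reply.embedding.         % assumes a reply of the form _{embedding:[...], ...}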


Also works with dnf install ollama (Fedora 42) :slight_smile:

With a little debugging, it turns out to be your explicit call to http_chunked_open/3. This applies to both the input and the output stream and is (thus) ambiguous.

The problem is (was) that http_open/3 closes the output part of the stream pair when applying a transfer filter. Ollama interprets that as a request to terminate the request. It is quite nice that we can terminate the request :slight_smile: I pushed a fix that creates a new stream pair from the chunked filter stream and the original output stream. See FIXED: http_open/3: when applying a transfer filter, do not close the… · SWI-Prolog/packages-http@0b5ffd4 · GitHub. I’m not sure about the interaction with keep-alive connections. Ollama requests a close, so that does not apply here. The general case requires some testing. It will work anyway, as an I/O error when trying to revive a keep-alive connection always results in reconnecting.
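In a nutshell the idea is roughly this (a simplified sketch, not the actual library code; the predicate name is mine):

rechunk_pair(Pair0, Pair) :-
    stream_pair(Pair0, Read0, Write),      % split the original stream pair
    http_chunked_open(Read0, Read, []),    % apply the chunked filter to the input side only
    stream_pair(Pair, Read, Write).        % build a new pair with the still-open output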

Note that your code should close the stream pair returned by http_open/3.

Please keep us posted!
