Library(redis): stream data

There should probably be a way, using the Reply as Value mechanism, to convert the stream data to a more usable type:


1 ?- use_module(library(redis)).
true.

2 ?- redis(default,xrange(mys,-,+),R).  % with some mock data in the stream
R = [["1603327576388-0", ["type", "backup", "cmd", "dobak", "outcome", "sucess"]], ["1603327589307-0", ["type", "backup", "cmd", "dobak", "outcome"|...]], ["1603327611351-0", ["type", "backup", "cmd", "dorestore"|...]], ["1603327651797-0", ["type", "backup", "cmd"|...]], ["1603327669719-0", ["type", "backup"|...]], ["1603327683570-0", ["type"|...]], ["1603395488304-0", [...|...]]].

Also there is a problem with error handling (probably data left in the buffers due to the exception?):

3 ?- redis(default,xrange(mys,-,+),R as dict(auto)).
ERROR: Domain error: `redis_map_length' expected, found `7'
ERROR: In:
ERROR:   [19] throw(error(domain_error(redis_map_length,7),_88492))
ERROR:   [16] redis:redis_write_msg(<stream>(0x56491604ab00,0x56491604ac00),...),redis:redis_read_stream(default,<stream>(0x56491604ab00,0x56491604ac00),...) at /home/u/tmp/swipl-devel/build.release/home/boot/init.pl:368
ERROR:   [15] <meta call>
ERROR:   [14] with_mutex(default,(redis_write_msg(<stream>(0x56491604ab00,0x56491604ac00),...),redis_read_stream(default,<stream>(0x56491604ab00,0x56491604ac00),...))) <foreign>
ERROR:   [12] catch(redis:redis2(default,...,...),error(_88666,_88668),redis:true) at /home/u/tmp/swipl-devel/build.release/home/boot/init.pl:528
ERROR:   [11] redis:redis1(default,xrange(mys,-,+),_88720 as dict(auto)) at /home/u/tmp/swipl-devel/build.release/home/library/redis.pl:517
ERROR:   [10] redis:redis(default,xrange(mys,-,+),_88772 as dict(auto)) at /home/u/tmp/swipl-devel/build.release/home/library/redis.pl:505
ERROR:    [9] toplevel_call(user:user: ...) at /home/u/tmp/swipl-devel/build.release/home/boot/toplevel.pl:1113
ERROR: 
ERROR: Note: some frames are missing due to last-call optimization.
ERROR: Re-run your program in debug mode (:- debug.) to get more detail.

4 ?- redis(default,xrange(mys,-,+),R).
R = ["1603327576388-0", ["type", "backup", "cmd", "dobak", "outcome", "sucess"]].

5 ?- redis(default,xrange(mys,-,+),R).
R = ["1603327589307-0", ["type", "backup", "cmd", "dobak", "outcome", "error"]].

5 ?- redis(default,xrange(mys,-,+),R).
R = ["1603327611351-0", ["type", "backup", "cmd", "dorestore", "outcome", "partial success"]].

Notice how, after the exception, the replies are partial leftovers. Also, strangely enough, the top-level prompt number did not get updated for the last call.

(SIDE NOTE: for some reason the post coloring is off and the commands I typed in the top-level after the exception are colored red.)

Yeah. The error handling is a problem. I’m not yet sure how to deal with this. I see roughly three options:

  • As with socket errors, drop the connection on an error. It will reconnect in a fresh state. Fairly simple to do, but it is a bit ugly to disconnect for this reason.
  • Build some re-sync. I guess the best way to do so is to run echo(Magic) and eat all data until you see Magic. In practice that should work pretty much ok, I guess. If Magic is some crypto-quality random number it should be hard to misuse. Also feels a bit ugly though …
  • Remember what should be pending and read all that. That feels the cleanest. Considering the complexity it might in practice be the worst approach though :frowning:

No clue why the toplevel seems to get upset. One problem at a time …

Would it be helpful if I described the mechanism used by Google’s Bigtable? It’s a query-reply API rather than a stream API, though, so it might not be relevant. (And I’m not familiar with what redis uses under the covers to detect that a connection has gone away … IIRC, standard TCP/IP can take up to 2 hours to time out under some circumstances.)

1 Like

It would be great to know.

Since this is done within an exception handler we can afford a little bit of time: how about reading until the socket buffer is empty (discarding the data), and then trying to read again (perhaps after a few milliseconds)? If there is no data, we are clean.
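
A rough sketch of what I mean, just to make it concrete (illustrative only, not library(redis) code, and it assumes we can get at the connection’s input stream):

drain_until_quiet(In) :-
    (   wait_for_input([In], Ready, 0.005),   % wait a few milliseconds
        Ready == [In]
    ->  fill_buffer(In),                      % pull the available bytes into the buffer
        read_pending_codes(In, _Discard, []), % ... and throw them away
        drain_until_quiet(In)
    ;   true                                  % nothing more arrived: we are clean
    ).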

There is another option, which I think might be better:

  • Read all the data for the reply, following the RESP3 or RESP2 spec, and do the conversion of types (Reply as Type) only after all the data has been read.

Timing is dangerous stuff in concurrent programming. Under heavy load, unreliable network connections, etc., things can easily take a little longer.

That is indeed a safe route. The price is that we need to buffer the entire message, I guess as linked C structs. That means quite a bit of code, memory allocation and deallocation, and doubling the memory usage :frowning: For now I’ve implemented resync/1 inside library(redis) (sketched roughly below), which

  • Computes a big random number
  • Sends this to the connection as ECHO Number
  • Scans the reply for $<len>\r\n<number>\r\n

That may look like a bit of a weird solution. It is fairly simple and looks safe enough to me though.
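
For illustration, a stripped-down version of the idea (not the actual resync/1; this sketch reads the reply line-wise rather than scanning for the full $<len>\r\n<number>\r\n sequence, and the predicate names are made up):

:- use_module(library(readutil)).
:- use_module(library(lists)).

%!  resync_sketch(+In, +Out) is semidet.
%
%   Send ECHO with a large random magic number and discard input until
%   the reply carrying that number comes back.
resync_sketch(In, Out) :-
    Magic is random(1<<128),              % big random number
    format(Out, "ECHO ~d\r\n", [Magic]),  % assumes the inline command form suffices
    flush_output(Out),
    number_codes(Magic, MagicCodes),
    skip_to_line(In, MagicCodes).

%   Discard lines until one equals the magic digits.
skip_to_line(In, MagicCodes) :-
    read_line_to_codes(In, Line0),
    Line0 \== end_of_file,
    (   append(Line, [0'\r], Line0)       % drop a trailing CR if present
    ->  true
    ;   Line = Line0
    ),
    (   Line == MagicCodes
    ->  true
    ;   skip_to_line(In, MagicCodes)
    ).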

Available on GIT.

Almost missed that. Difficult. Some of the Redis replies are complicated nested structures. Here we have an array of messages, where every message is an array of length 2 holding the message id and an array of alternating field/value pairs. Even when using RESP3, the latter seems to be an array rather than a map. We’d need something like

redis(default, xrange(s,-,+), Reply as list(pairs(string,dict(auto)))).

To get ["Id"-_{key:value, …}, …]
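
In the meantime, something close to that shape can be built on top of the current flat reply with a small utility (a hypothetical helper, not part of library(redis)):

:- use_module(library(redis)).
:- use_module(library(apply)).

%!  xrange_dicts(+Server, +Key, -Pairs) is det.
%
%   Fetch a stream with XRANGE and convert each entry into an Id-Dict
%   pair, i.e. roughly what list(pairs(string,dict(auto))) would give.
xrange_dicts(Server, Key, Pairs) :-
    redis(Server, xrange(Key, -, +), Reply),
    maplist(entry_pair, Reply, Pairs).

entry_pair([Id, Fields], Id-Dict) :-
    fields_pairs(Fields, KVs),
    dict_create(Dict, _, KVs).

%   Turn an alternating [Field, Value, ...] list into Key-Value pairs;
%   dict keys must be atoms, so the field-name strings are converted.
fields_pairs([], []).
fields_pairs([F,V|T], [K-V|KVs]) :-
    atom_string(K, F),
    fields_pairs(T, KVs).

With the mock data above, xrange_dicts(default, mys, Pairs) should give something like ["1603327576388-0"-_{cmd:"dobak", outcome:"sucess", type:"backup"}, …].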

And the alternative is to have more high-level predicates to deal with streams, as already provided by library(redis_streams).

Which is better?

I think the type specification approach is better, because it is more generic, and we’ll be able to use it for user-defined types also. It is also semantically consistent (e.g. primitive types are treated with the same syntax as nested structures). An additional advantage is that it will allow using future Redis data types without having to change library(redis). All in all I think this is a great option, much better than the alternative.

EDIT: we could provide the atom streamdata as an alias for list(pairs(string,dict(auto))).

Ahh… I didn’t think about this, quite true.

Seems to me the most sensible solution, given the potential memory usage.

I tend to agree. The … as Type notation works quite well while trying to use all this to make SWISH run on Redis. Arbitrary nesting of the type specification requires some redesign. It is quite exciting to see how much needs to be done to realize a truly natural binding to Redis.

This isn’t the end. I understand some more logic needs to be implemented in a client to talk to a Redis cluster. As I understand it, the client needs to interrogate the cluster to establish how keys are distributed over the masters, split requests into read-only requests (which can be sent to the closest instance, either a master or a replica) and write requests, which must identify the key, compute the responsible master and send the request to that master.
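
For reference, the key→master step boils down to the Redis Cluster hash-slot computation: HASH_SLOT = CRC16(key) mod 16384, where CRC16 is the CCITT/XMODEM variant (polynomial 0x1021, initial value 0). A small sketch, ignoring the {…} hash-tag convention:

:- use_module(library(apply)).

%!  key_slot(+Key, -Slot) is det.
key_slot(Key, Slot) :-
    string_codes(Key, Codes),
    foldl(crc16_byte, Codes, 0, CRC),
    Slot is CRC mod 16384.

crc16_byte(Byte, CRC0, CRC) :-
    CRC1 is (CRC0 xor (Byte << 8)) /\ 0xFFFF,
    crc16_bits(8, CRC1, CRC).

crc16_bits(0, CRC, CRC) :- !.
crc16_bits(N, CRC0, CRC) :-
    (   CRC0 /\ 0x8000 =\= 0
    ->  CRC1 is ((CRC0 << 1) xor 0x1021) /\ 0xFFFF
    ;   CRC1 is (CRC0 << 1) /\ 0xFFFF
    ),
    N1 is N - 1,
    crc16_bits(N1, CRC1, CRC).

A cluster-aware client would keep a slot→node map (obtained with CLUSTER SLOTS) and route each write to the master that owns key_slot(Key), refreshing the map on MOVED redirections.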

Yes, I find it quite an important and exciting addition to SWI-Prolog.

Right; there also was an effort (for Redis 6) to produce a proxy that would do this, but it is now only alpha.

INTERESTING: the most important discovery that I have made in learning redis with you is a new architecture for distributed systems using streams.

The idea is that the “source of truth” is an immutable event stream (which can include many streams that flow together into larger streams, etc.).

The key is that the stream(s) contain replayable commands/data which allow any connected system to reconstruct the state at any point in time (separating action from perception). The only way to make a change is to emit an event into the stream. This allows completely new (and maybe even totally different) clients to join the system much more easily, and it also allows flexible growth of the data model. This solves many problems with synchronization and makes for a very flexible and consistent system; see this talk for more info on how this is used in banking systems, automotive and other critical systems.
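
To make this concrete with library(redis): emitting an event is just an XADD, and rebuilding state is a replay of the stream. A rough sketch (the helper names are made up; it assumes '*' passes through the generic command encoding as the auto-generated id, just like - and + do for xrange above):

:- use_module(library(redis)).
:- use_module(library(lists)).

%!  emit_event(+Stream, +Pairs) is det.
%
%   Append an event, given as Key-Value pairs, to a Redis stream.
emit_event(Stream, Pairs) :-
    pairs_fields(Pairs, Fields),
    Cmd =.. [xadd, Stream, '*' | Fields],   % XADD <stream> * field value ... ('*' assumed to pass through as-is)
    redis(default, Cmd, _Id).

pairs_fields([], []).
pairs_fields([K-V|T], [K,V|Rest]) :-
    pairs_fields(T, Rest).

%!  replay(+Stream, :OnEvent) is det.
%
%   Rebuild state by replaying the whole stream from the beginning,
%   calling OnEvent(Id, Fields) for every event in order.
replay(Stream, OnEvent) :-
    redis(default, xrange(Stream, -, +), Entries),
    forall(member([Id, Fields], Entries),
           call(OnEvent, Id, Fields)).

Anything that wants to change the system only ever calls something like emit_event/2; anything that needs the current state replays (or tails) the relevant streams.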

EDIT: I think all this can be applied to SWISH in a very nice way. The events in streams would be something like:

  • I have this prolog program → goes into the compilation consumer stream
  • I have the compilation results from this program (errors, or pointers to other streams that can receive prolog queries) → goes into the program compiled stream
  • I have this query → goes into the I want to run a query for this program stream
  • I have this result to a query → goes into the “query results for program A” stream

Some streams can go out of existence as they stop being useful.

All these would go into a group of connected streams and then the clients (prolog servers or javascript clients) would connect to the streams and emit new events or receive query results, etc.

Something I have learned, particular to how redis streams are designed, is that we should pair consumers with streams in such a way that a consumer can handle any subsequence of messages in its corresponding stream without filtering the items. These smaller streams, each corresponding to one type of consumer, can be combined or filtered into larger streams for other types of consumers. Quite an interesting and exciting architecture.

Yes. I’m now storing the dynamic data of SWISH in Redis, which should allow multiple SWISH instances to connect to the same Redis instance. If you put a load balancer in front of this that implements sticky sessions this should create a cluster. While I think this is a good starting point, I started to realize we can probably use Redis streams to implement brokering Pengines at a lower level. We’ll see …

3 Likes

Any chance that this might involve a more thorough redesign of library(pengines) – perhaps something along the lines of the ideas behind Web Prolog? If what you write above means that a new library(pengines), which implements the brokering of pengines at a lower level, can no longer remain compatible with the old one (although I’m not sure this is what it implies), it seems to me that this would be a good point to reconsider the design more broadly.

FYI for others:

Pengines - Getting started
library(pengines): Pengines: Web Logic Programming Made Easy

1 Like

I don’t know. One thing is that there are (I think) quite a few projects that use one of the Pengines client implementations in various languages. That is a strong motivation to keep the HTTP API compatible. Of course, this can be layered over something different.

It could be interesting to use redis streams to create and communicate with a Pengine. That would allow having a pool of Prolog processes, possibly running on multiple hosts, that host Pengines. That provides horizontal scalability.

I have the impression that Pengines, as they now work in the context of SWISH, have a different aim than Web Prolog. We’ll see. One step at a time :slight_smile:

1 Like

That’s not the way I see it. I’m 100% sure the pengines (as they appear in Web Prolog) can do exactly the same job as Pengines do in SWISH. Well, I suppose the aim of Web Prolog is broader and more general than Pengines, but why hold that against it?

As I show in my Intro to Web Prolog for Erlangers paper, you can, in addition to serving SWISH, do a lot more in Web Prolog compared to what can be done with library(pengines), using primitives that really work and are easy to understand, as they have been developed by the people behind well-established languages such as Erlang and Elixir.

BTW, I found, only yesterday, a submission to Quora by Ulf Wiger, a well-known Erlang expert, who seems to like the ideas behind Web Prolog. Here are his thoughts.

And, finally, quoting Richard O’Keefe, who wrote, after having read my paper:

I should tell you that I am liking what I see very much and would enjoy using this.

I bet you and probably others are missing Richard’s input to the Prolog community. Who knows, perhaps Web Prolog, or hopefully a much improved version of it, can be used to lure him back?

Yeah, I know I’m grasping at straws here… :slight_smile:

(Disclaimer: I know very little about redis, so I have no idea how relevant this is)
(Disclaimer: My knowledge of Bigtable is ~10 years old, but I don’t think anything significant has changed)
(Disclaimer: Spanner was experimental when I was at Google, and I don’t know much about it; however, I think it basically follows the Bigtable model, but expands it to work better across geographically dispersed data centers)

The core papers are:

  • Bigtable
  • Spanner
  • Borg
  • Chubby lock server

Bigtable partitions the table data into thousands or millions of “tablets”; each tablet is handled by at most one “tablet server”. The data itself is handled by the distributed file system (originally “GFS”; now “Colossus”), which has its own set of servers, redundancy, and Reed-Solomon checksums. (The details of how the key-values are stored, the journaling, and Colossus aren’t relevant to this discussion.)

There is also an underlying system for managing jobs called Borg, which handles distributed jobs with multiple servers and automatically restarts servers when they fail. Thus, Bigtable doesn’t need to do anything to keep its various servers up and running; but it does need to handle some of the aspects of a server stopping and a replacement server starting. The basic model is of multiple servers, all running the same executable, and each with its own unique identifier. Communication is done via RPC, and all servers are treated as if they’re remote, even when running on the same machine (typically, each server runs in a chroot jail). When a server is restarted, it can be on the same or a different machine.

There are 3 components to Bigtable: the master, the tablet servers, and the clients. The master is actually multiple servers that elect a single master using Paxos. The master also contains various pieces of “root” data, and the lock files that tablet servers use to ensure that each tablet is handled by only one tablet server. (The lock files and their content use Chubby.)

Each tablet server handles a few thousand tablets. It accepts commands from the master to handle new tablets; there is also an internal API that allows tablet splitting and migration of tablets to other servers. Roughly speaking, a split is done by picking a split point, releasing the tablet, and asking the master to create two new tablets - in this way, processing load automatically rebalances across multiple tablet servers.

When a client accesses a table row, it looks up the key in a list of key-range→tablet-server mappings. If there’s no entry, the client asks the master which server to use. The client then communicates directly with the tablet server. If there’s a failure, the client assumes that the server has died and asks the master for the new server.

From a management point of view, an easy way of fixing a stuck tablet server is to kill it – Borg will start a new one (possibly on a different machine) and everything will continue as before (the newly started tablet server will ask the master about which key ranges it should handle; depending on how fast the tablet server restarts, the ranges might have already been migrated to other tablet servers). There are also situations where communication between the master and the tablet server fails – the master can kill the tablet server and the same recovery process will occur; similarly, if a client loses contact with a tablet server, it can ask the master for a replacement. (There are some tricky corner cases when the master thinks that the tablet server is OK but the client can’t communicate with it.)

Note that a tablet server doesn’t have any local data – everything is stored in the distributed file system “Colossus”. This greatly simplifies the recovery process. (I think that it’s now the case that there are literally no local disks on Google cloud servers - all data are stored on the equivalent of Network Attached Storage (NAS), which is made possible by super high-speed local networks that can have similar throughput to locally attached disk.) The distributed file system and the Chubby lock server are used to ensure that there are no “zombie” tablet servers that duplicate live tablet servers.

If a tablet server gets an error, the standard behavior is to write a log message and die. Borg will then start a new tablet server (possibly on another machine) and an automated process will analyze the log messages. If a tablet server gets overloaded, its keyrange can be changed – the client will get an error for keys outside the range and will get a new server’s address from the master.

When there’s a software upgrade to the tablet servers, it’s done through Borg by making the new executable available and then killing the tablet servers over a period of time – the normal handling of a dead tablet server automatically starts a new one, but with the upgraded software. Because all communication is done via gRPC, the upgrade is very smooth. (I’ve had experience with more conventional systems that use .so files; for those, an upgrade tends to be a painful process.)

The communication channels use a protocol on top of TCP and UDP (this also transparently handles encryption – by default every connection, even on the same machine, is encrypted). There are both request-response and streaming variants of RPC; both use protobufs – details are in the various gRPC documents. Standard TCP is not good for quickly detecting end-point failures, which is why additional keep-alives are added (I forget the details; roughly speaking there are UDP keep-alives in addition to TCP, plus some trickery for handling situations where there’s asymmetric routing or where routing varies according to a combination of IP/port/protocol).

In summary: clients talk directly to servers and only talk to the master when they need to know where a tablet server is. All data are stored in the distributed file system or in special (distributed) lock files. The standard method of handling errors is to kill a tablet server. If a server gets overloaded, typically its keyrange is changed and a new server is started. Lock files are used to ensure no zombie or duplicate servers.

2 Likes

Do you have an update on this? I am getting ready to write some redis stream code and this will surely help!! Thanks!

I’m a bit trapped in tabling- and transaction-related stuff. I’ll surely get back to this. For now I’m afraid you either have to do it yourself or abstract the places where you need this with some utility predicate. That should typically do the job, as such complex return values are related to only a couple of Redis commands.

1 Like