Improve sal-akka-raft serialization protocol
Description
Activity
Robert Varga January 7, 2025 at 9:03 PM
Receivers of new serialization protocol should be using
Robert Varga December 31, 2023 at 11:44 AM
Alright, after spending a couple of days in the area, this is a tad more complicated.
SegmentedJournal's API captures the RAFT journal, i.e. Journal&lt;E&gt; would really like to be Journal&lt;ReplicatedLogEntry&gt;, which is quite a different layer from the one where sal-akka-segmented-journal operates.
https://git.opendaylight.org/gerrit/c/controller/+/109487 shows a prototype outline of what we would like to see w.r.t. serialization API: each entry ends up writing multiple fragments and on read it is composed of a number of read-only ByteBuffers (backed by the mapped file).
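The read-side composition mentioned above can be sketched as follows. This is a hypothetical illustration, not the prototype's actual code; FragmentInputStream is an invented name. It presents a sequence of read-only ByteBuffer fragments as a single InputStream, which can then be wrapped in a DataInputStream to get a DataInput view of the whole entry.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: present several read-only fragments as one stream of
// bytes, so a DataInput wrapper can deserialize across fragment boundaries.
final class FragmentInputStream extends InputStream {
    private final Iterator<ByteBuffer> fragments;
    private ByteBuffer current;

    FragmentInputStream(final List<ByteBuffer> buffers) {
        fragments = buffers.iterator();
        current = fragments.hasNext() ? fragments.next() : ByteBuffer.allocate(0);
    }

    @Override
    public int read() {
        // Skip over exhausted fragments until we find remaining bytes
        while (!current.hasRemaining()) {
            if (!fragments.hasNext()) {
                return -1;
            }
            current = fragments.next();
        }
        return current.get() & 0xFF;
    }
}
```

A caller would wrap this as `new DataInputStream(new FragmentInputStream(buffers))` and read multi-byte primitives transparently, even when they straddle two fragments.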
The end play is , which ends up integrating persistence into ReplicatedLog implementation, so that the implementation deals with snapshots, trimming, persistence etc. etc.
Digging into this deeper, we have two technologies available off the shelf:
1. Aeron Archive, which uses multiple segments and allows querying via replay. The problem is that this functionality assumes remote access, so there is no way to take a reference to the off-heap DirectBuffer for efficient fragment reassembly (i.e. for messages >16MiB it is recommended to perform application-level reassembly). The upside is that Aeron has log replication freely available.
2. Chronicle Queue, which uses multiple segments and heavily assumes local control, and offers a very neat API to stored bytes (https://www.javadoc.io/static/net.openhft/chronicle-bytes/2.25ea3/net/openhft/chronicle/bytes/BytesMarshallable.html) with the possibility of retaining references (via the BytesStore and ReferenceCounted interfaces). The downside is that log replication is a commercial feature.
The BytesMarshallable interface certainly looks like something we'd like as the base model of how a payload gets serialized/deserialized.
The second part of this is the fact that Payload users seem to want the associated Identifier – all relevant implementations have some sort of identifier and those which do not can fake it. Lifecycle is quite murky, but it would seem advantageous to move the identifier into an envelope, so it is available without deserializing the entire payload (as it is at the head of it!).
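The envelope idea above can be sketched like this. PayloadEnvelope, encode() and peekIdentifier() are illustrative names, not actual controller APIs; the point is simply that writing the identifier at the head of the entry lets a reader recover it without deserializing the payload body.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical envelope layout: identifier first, then the opaque payload
// body, so the identifier is readable without touching the body at all.
final class PayloadEnvelope {
    static byte[] encode(final String identifier, final byte[] body) {
        final var bytes = new ByteArrayOutputStream();
        try (var out = new DataOutputStream(bytes)) {
            out.writeUTF(identifier);   // identifier at the head of the entry
            out.writeInt(body.length);  // followed by the payload body
            out.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String peekIdentifier(final byte[] entry) {
        // Reads only the head; the payload body is never deserialized.
        try (var in = new DataInputStream(new ByteArrayInputStream(entry))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```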
That brings us to the Serdes API. As the mentioned patch shows, we want to side-step the need for writeObject() in sal-akka-segmented-journal by having each Payload type:
have a symbolic name (plain ASCII, i.e. bytes 0-127, so the encoding is single-byte)
be dynamically bound to marshallers, so that each symbolic name has an assigned int32 (or smaller) unique to the set of known marshallers
Based on this API, segmented-journal should be able to side-step writeObject() and thus eliminate the worst Java serialization overhead.
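The dynamic binding described above can be sketched as follows; MarshallerRegistry and its methods are hypothetical names, not actual controller APIs. Each plain-ASCII symbolic name gets a small id assigned at registration time, and entries on disk can then carry the compact id instead of a Java-serialized class name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of dynamic marshaller binding: each payload type has a
// plain-ASCII symbolic name, and registration assigns it a unique small id,
// unique within the set of known marshallers.
final class MarshallerRegistry {
    private final Map<String, Integer> idsByName = new HashMap<>();
    private final List<String> namesById = new ArrayList<>();

    int register(final String symbolicName) {
        // Enforce the single-byte encoding constraint on names
        if (!symbolicName.chars().allMatch(ch -> ch < 128)) {
            throw new IllegalArgumentException("Name must be plain ASCII: " + symbolicName);
        }
        return idsByName.computeIfAbsent(symbolicName, name -> {
            namesById.add(name);
            return namesById.size() - 1;
        });
    }

    String nameOf(final int id) {
        return namesById.get(id);
    }
}
```

Registering the same name twice yields the same id, so the binding is stable for the lifetime of the registry.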
In parallel to this, we need to fix the entry UUID duplications, which looks like something that is internal to the journal plugin and does not need any other help.
Robert Varga April 19, 2023 at 9:28 PM (edited)
Right, and that's all centralized in one place and does not bring sal-akka-segmented-journal into the picture at all.
So let's start at the bottom and design how will things get persisted in sal-akka-segmented-journal and atomix-storage, what interface will be available for sal-akka-raft to plug into. That includes how things will hit storage, but also how things will get restored – potentially in a different version.
The goal is not to rely on Serializable anywhere in the stack.
Samuel Schneider April 19, 2023 at 1:46 PM
Robert Varga March 2, 2023 at 9:55 PM (edited)
The end-to-end serialization protocol needs to achieve something that Aeron has integrated: a single logical message (say, CommitTransactionPayload) is not written as a single byte[] prepared beforehand, but rather is scattered into multiple fragments which have some (possibly small) maximum size – in our world each such fragment would be a single SegmentedJournal entry. These then get presented to upper layers on reception, i.e. we are looking at doing reassembly. In concrete terms this would mean that a ReplicatedLog entry is backed by multiple SegmentedJournal entries.
For our implementation purposes, I think we should be okay not exposing fragmentation explicitly, but hiding it behind a DataOutput – which on writeInt() does the right thing behind the scenes. On reception, the callback would be a DataInput, which actually goes to multiple SegmentedJournal spans (suitably abstracted as ByteBuffer, or similar) and reads across them as data is consumed.
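The write side of that idea can be sketched as follows; FragmentingOutputStream is an invented name for illustration. It scatters written bytes into fixed-size fragments, and wrapping it in a DataOutputStream gives callers the plain DataOutput view described above, with writeInt() and friends fragmenting transparently.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an OutputStream that transparently scatters written
// bytes into fragments of a fixed maximum size. Callers never see fragment
// boundaries; they just write through a DataOutputStream wrapper.
final class FragmentingOutputStream extends OutputStream {
    private final List<byte[]> fragments = new ArrayList<>();
    private final int maxFragmentSize;
    private ByteBuffer current;

    FragmentingOutputStream(final int maxFragmentSize) {
        this.maxFragmentSize = maxFragmentSize;
        current = ByteBuffer.allocate(maxFragmentSize);
    }

    @Override
    public void write(final int b) {
        if (!current.hasRemaining()) {
            flushFragment();
        }
        current.put((byte) b);
    }

    private void flushFragment() {
        final var copy = new byte[current.position()];
        current.flip();
        current.get(copy);
        fragments.add(copy);
        current = ByteBuffer.allocate(maxFragmentSize);
    }

    // Finalize the last (possibly partial) fragment and return all of them
    List<byte[]> finish() {
        if (current.position() > 0) {
            flushFragment();
        }
        return fragments;
    }
}
```

In the journal, each returned fragment would become a single SegmentedJournal entry; `new DataOutputStream(new FragmentingOutputStream(size))` is the DataOutput handed to the payload.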
I am not sure this will cut it, though: the read side may be fine being synchronous; the write side, though, can hit storage allocation (i.e. when SegmentedJournal creates the next file), and at least in CSIT that operation can take almost a minute – during that time we need to be able to handle RAFT messages so as not to lose leadership (for example).
The cutover to Akka Artery and the tell-based protocol (which puts more strain on persistence) surfaces problems with serialization of our messages, like (estimates) and (raw volume).
Given that Akka has deprecated Java serialization and given the issues mentioned above, it would be prudent to implement a better serialization protocol around Payloads, both to remove the dependency on Java serialization and to improve efficiency.
This mostly revolves around org.opendaylight.controller.cluster.raft.messages.Payload extensibility – we really want to attach a PayloadRegistry which will handle dispatch to serialization based on a single-byte type and base it on DataOutput/WritableObject interfaces. This will allow us to reduce the message overheads (which are significant).
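To make the overhead claim concrete, the following sketch compares Java serialization of a tiny payload with a single-byte-type framing. OverheadDemo and TinyPayload are invented names for illustration; the point is that even a trivial Serializable object carries a stream header, class descriptor and field metadata, while a one-byte type tag plus the raw body does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical overhead comparison: Java serialization of even a tiny
// Serializable record emits stream header, class descriptor and field
// metadata; a single type byte plus the raw body carries none of that.
final class OverheadDemo {
    record TinyPayload(long value) implements Serializable {}

    static int javaSerializedSize(final Serializable obj) {
        final var bytes = new ByteArrayOutputStream();
        try (var out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    static int taggedSize(final byte[] body) {
        return 1 + body.length;  // one type byte plus the raw body
    }
}
```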
We should also deal with sal-akka-raft message serialization based on similar interface and tie it in with sal-akka-segmented-journal, so that it can operate more efficiently than through Serializable.