From 47cd8139ed41d3f63dcd7ca04547afbe51f6122b Mon Sep 17 00:00:00 2001
From: DMRobertson
Date: Mon, 7 Aug 2023 17:37:08 +0000
Subject: deploy: 9d3713d6d512d30a42456c9af25a3ab1a8865406
---
 .../development/synapse_architecture/streams.html | 316 +++++++++++++++++++++
 1 file changed, 316 insertions(+)
 create mode 100644 develop/development/synapse_architecture/streams.html

diff --git a/develop/development/synapse_architecture/streams.html b/develop/development/synapse_architecture/streams.html
new file mode 100644
index 0000000000..a76d3e8020
--- /dev/null
+++ b/develop/development/synapse_architecture/streams.html
@@ -0,0 +1,316 @@
Streams
Synapse has a concept of "streams", which are roughly described in id_generators.py. +Generally speaking, streams are a series of notifications that something in Synapse's database has changed that the application might need to respond to. +For example:

- The events stream reports new events (PDUs) that Synapse creates, or that Synapse accepts from another homeserver.
- The account data stream reports changes to users' account data.
- The to-device stream reports when a device has a new to-device message.

See synapse.replication.tcp.streams for the full list of streams.


It is very helpful to understand the streams mechanism when working on any part of Synapse that needs to respond to changes—especially if those changes are made by different workers. +To that end, let's describe streams formally, paraphrasing from the docstring of AbstractStreamIdGenerator.


Definition


A stream is an append-only log T1, T2, ..., Tn, ... of facts[^1] which grows over time.
Only "writers" can add facts to a stream, and there may be multiple writers.


Each fact has an ID, called its "stream ID". +Readers should only process facts in ascending stream ID order.


Roughly speaking, each stream is backed by a database table.
It should have a stream_id (or similar) bigint column holding stream IDs, plus additional columns as necessary to describe the fact.
Typically, a fact is expressed with a single row in its backing table.[^2]
Within a stream, no two facts may have the same stream_id.
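As a concrete illustration of such a backing table, here is a minimal sketch using an in-memory SQLite database. The table and column names are hypothetical, not Synapse's actual schema:

```python
import sqlite3

# Hypothetical backing table for a stream; Synapse's real schema differs.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE example_stream (
        stream_id INTEGER NOT NULL UNIQUE,  -- no two facts share a stream ID
        fact_data TEXT NOT NULL             -- extra columns describe the fact
    )
    """
)

# Each fact is (typically) a single row, keyed by its stream ID.
conn.execute("INSERT INTO example_stream VALUES (1, 'first fact')")
conn.execute("INSERT INTO example_stream VALUES (2, 'second fact')")

# Readers should process facts in ascending stream ID order.
rows = conn.execute(
    "SELECT stream_id, fact_data FROM example_stream ORDER BY stream_id"
).fetchall()
print(rows)  # [(1, 'first fact'), (2, 'second fact')]
```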


Aside. Some additional notes on streams' backing tables.

1. Rich would like to ditch the backing tables.
2. The backing tables may have other uses.
   For example, the events table backs the events stream, and is read when processing new events.
   But old rows are read from the table all the time, whenever Synapse needs to look up facts about an event.
3. Rich suspects that sometimes a stream is backed by multiple tables, so the stream proper is the union of those tables.

Stream writers can "reserve" a stream ID, and then later mark it as having been completed.
Stream writers need to track the completion of each stream fact.
In the happy case, completion means a fact has been written to the stream table.
But unhappy cases (e.g. transaction rollback due to an error) also count as completion.
Once completed, the rows written with that stream ID are fixed, and no new rows will be inserted with that ID.


Current stream ID


For any given stream reader (including writers themselves), we may define a per-writer current stream ID:


The current stream ID for a writer W is the largest stream ID such that +all transactions added by W with equal or smaller ID have completed.


Similarly, there is a "linear" notion of current stream ID:


The "linear" current stream ID is the largest stream ID such that +all facts (added by any writer) with equal or smaller ID have completed.


Because different stream readers A and B learn about new facts at different times, A and B may disagree about current stream IDs. +Put differently: we should think of stream readers as being independent of each other, proceeding through a stream of facts at different rates.


NB. For both senses of "current", if a writer opens a transaction that never completes, the current stream ID will never advance beyond that writer's last written stream ID.


For single-writer streams, the per-writer current ID and the linear current ID are the same. +Both senses of current ID are monotonic, but they may "skip" or jump over IDs because facts complete out of order.


Example. +Consider a single-writer stream which is initially at ID 1.

| Action     | Current stream ID | Notes                                           |
|------------|-------------------|-------------------------------------------------|
|            | 1                 |                                                 |
| Reserve 2  | 1                 |                                                 |
| Reserve 3  | 1                 |                                                 |
| Complete 3 | 1                 | current ID unchanged, waiting for 2 to complete |
| Complete 2 | 3                 | current ID jumps from 1 -> 3                    |
| Reserve 4  | 3                 |                                                 |
| Reserve 5  | 3                 |                                                 |
| Reserve 6  | 3                 |                                                 |
| Complete 5 | 3                 |                                                 |
| Complete 4 | 5                 | current ID jumps 3 -> 5, even though 6 is pending |
| Complete 6 | 6                 |                                                 |
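The reserve/complete behaviour in this example can be sketched with a toy single-writer tracker. This is illustrative only; Synapse's real logic lives in id_generators.py and is considerably more involved:

```python
class SingleWriterStream:
    """Toy model of reserve/complete and the current stream ID."""

    def __init__(self, initial_id: int):
        self.current = initial_id       # largest ID with all <= IDs completed
        self.next_id = initial_id + 1   # stand-in for a Postgres sequence
        self.pending: set[int] = set()  # reserved but not yet completed

    def reserve(self) -> int:
        stream_id = self.next_id
        self.next_id += 1
        self.pending.add(stream_id)
        return stream_id

    def complete(self, stream_id: int) -> None:
        self.pending.discard(stream_id)
        # Advance past any contiguous run of completed IDs.
        while self.current + 1 < self.next_id and self.current + 1 not in self.pending:
            self.current += 1

stream = SingleWriterStream(initial_id=1)
stream.reserve()            # reserves 2
stream.reserve()            # reserves 3
stream.complete(3)
assert stream.current == 1  # unchanged: still waiting for 2 to complete
stream.complete(2)
assert stream.current == 3  # jumps from 1 -> 3
```

Replaying the rest of the table (reserve 4, 5, 6; complete 5, 4, 6) advances `current` to 3, then 5, then 6, matching the rows above.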

Multi-writer streams


There are two ways to view a multi-writer stream.

1. Treat it as a collection of distinct single-writer streams, one for each writer.
2. Treat it as a single stream.

The single stream (option 2) is conceptually simpler, and easier to represent (a single stream ID).
However, it requires each reader to know about the entire set of writers, to ensure that readers don't erroneously advance their current stream position too early and miss a fact from an unknown writer.
In contrast, multiple parallel streams (option 1) are more complex, requiring more state to represent (a map from writer to stream ID).
The payoff for doing so is that readers can "peek" ahead to facts that completed on one writer no matter the state of the others, reducing latency.
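A rough sketch of the two views, assuming (as a simplification) that a reader tracks one position per writer. Taking the minimum over writers for the linear view is a simplification of the real multi-writer bookkeeping; the writer names are hypothetical:

```python
# Toy reader-side state for a multi-writer stream (illustrative names).
positions = {"writer_a": 10, "writer_b": 7}  # per-writer current stream IDs

# Option 1: per-writer view. We may "peek" at writer_a's facts up to 10,
# regardless of how far behind writer_b is.
peek_a = positions["writer_a"]

# Option 2: single linear stream. We can only safely advance to the
# smallest per-writer position; otherwise we might miss a fact from a
# slower (or unknown) writer.
linear_current = min(positions.values())
print(peek_a, linear_current)  # 10 7
```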


Note that a multi-writer stream can be viewed in both ways. +For example, the events stream is treated as multiple single-writer streams (option 1) by the sync handler, so that events are sent to clients as soon as possible. +But the background process that works through events treats them as a single linear stream.


Another useful example is the cache invalidation stream.
The facts this stream holds are instructions to "you should now invalidate these cache entries".
We only ever treat this as multiple single-writer streams, as there is no important ordering between cache invalidations.
(Invalidations are self-contained facts; and the invalidations commute/are idempotent.)


Writing to streams


Writers need to track:

- their current position (i.e. their own per-writer stream ID); and
- their facts currently awaiting completion.

At startup,

- the current position of that writer can be found by querying the database (which suggests that facts need to be written to the database atomically, in a transaction); and
- there are no facts awaiting completion.

To reserve a stream ID, call nextval on the appropriate Postgres sequence.


To write a fact to the stream: insert the appropriate rows to the appropriate backing table.


To complete a fact, first remove it from your map of facts currently awaiting completion.
Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
Upon doing so it should emit an RDATA message[^3], once for every fact between the old and the new stream ID.
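That completion step can be sketched as a small helper. The function is hypothetical (real writers do this inside the ID generators and replication code), and `reserved_up_to` stands in for the highest ID handed out by the sequence:

```python
def complete_fact(
    stream_id: int, pending: set[int], current: int, reserved_up_to: int
) -> tuple[int, list[int]]:
    """Complete a fact; return the new current position and the stream IDs
    for which an RDATA message should now be emitted. Illustrative only."""
    pending.discard(stream_id)
    new_current = current
    # Advance while the next ID is no longer awaiting completion.
    while new_current < reserved_up_to and new_current + 1 not in pending:
        new_current += 1
    # One RDATA per fact between the old and the new current position.
    rdata_to_emit = list(range(current + 1, new_current + 1))
    return new_current, rdata_to_emit

pending = {2, 3}
current, rdata = complete_fact(3, pending, current=1, reserved_up_to=3)
assert (current, rdata) == (1, [])      # 2 still pending: nothing to emit yet
current, rdata = complete_fact(2, pending, current, reserved_up_to=3)
assert (current, rdata) == (3, [2, 3])  # emit RDATA for both 2 and 3
```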


Subscribing to streams


Readers need to track the current position of every writer.


At startup, they can find this by contacting each writer with a REPLICATE message, +requesting that all writers reply describing their current position in their streams. +Writers reply with a POSITION message.


To learn about new facts, readers should listen for RDATA messages and process them to respond to the new fact. +The RDATA itself is not a self-contained representation of the fact; +readers will have to query the stream tables for the full details. +Readers must also advance their record of the writer's current position for that stream.
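Reader-side bookkeeping can be sketched like this. The message handlers are simplified stand-ins for the real replication protocol handling, and `process_fact_from_database` is a hypothetical helper:

```python
# Simplified reader state: last-known position per (stream, writer).
positions: dict[tuple[str, str], int] = {}

def process_fact_from_database(stream: str, token: int) -> None:
    """Hypothetical: query the stream's backing table for the fact's full
    details and react to them. (RDATA is not self-contained.)"""

def on_position(stream: str, writer: str, token: int) -> None:
    # POSITION: a writer's reply to REPLICATE, stating its current position.
    positions[(stream, writer)] = token

def on_rdata(stream: str, writer: str, token: int) -> None:
    # RDATA: a new fact. Look up the details, then advance our record of
    # the writer's current position for that stream.
    process_fact_from_database(stream, token)
    positions[(stream, writer)] = token

on_position("events", "worker1", 10)
on_rdata("events", "worker1", 11)
print(positions)  # {('events', 'worker1'): 11}
```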


Summary


In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.

[^1]: We use the word fact here for two reasons. Firstly, the word "event" is already heavily overloaded (PDUs, EDUs, account data, ...) and we don't need to make that worse. Secondly, "fact" emphasises that the things we append to a stream cannot change after the fact.

[^2]: A fact might be expressed with 0 rows, e.g. if we opened a transaction to persist an event, but failed and rolled the transaction back before marking the fact as completed. In principle a fact might be expressed with 2 or more rows; if so, each of those rows should share the fact's stream ID.

[^3]: This communication used to happen directly with the writers over TCP; nowadays it's done via Redis's Pub/Sub.
