diff --git a/synapse/types.py b/synapse/types.py
index bc73e3775d..6c338fe0b7 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -22,6 +22,7 @@ from typing import (
TYPE_CHECKING,
Any,
Dict,
+ Iterable,
Mapping,
MutableMapping,
Optional,
@@ -44,7 +45,7 @@ if TYPE_CHECKING:
if sys.version_info[:3] >= (3, 6, 0):
from typing import Collection
else:
- from typing import Container, Iterable, Sized
+ from typing import Container, Sized
T_co = TypeVar("T_co", covariant=True)
@@ -389,7 +390,7 @@ def map_username_to_mxid_localpart(username, case_sensitive=False):
return username.decode("ascii")
-@attr.s(frozen=True, slots=True)
+@attr.s(frozen=True, slots=True, cmp=False)
class RoomStreamToken:
"""Tokens are positions between events. The token "s1" comes after event 1.
@@ -411,6 +412,31 @@ class RoomStreamToken:
event it comes after. Historic tokens start with a "t" followed by the
"topological_ordering" id of the event it comes after, followed by "-",
followed by the "stream_ordering" id of the event it comes after.
+
+ There is also a third mode for live tokens, where the token starts with "m".
+ This is sometimes used when sharded event persisters are in use. In this case
+ the events stream is considered to be a set of streams (one for each writer)
+ and the token encodes the vector clock of positions of each writer in their
+ respective streams.
+
+ In this case the format of the token is an initial integer min position,
+ followed by a mapping of instance ID to position, where each ID and position
+ are joined by '.' and the pairs are separated by '~':
+
+ m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}. ...
+
+ The `min_pos` corresponds to the minimum position all writers have persisted
+ up to, so only writers that are ahead of that position need to be encoded in
+ the map. An example token is:
+
+ m56~2.58~3.59
+
+ This corresponds to a set of three (or more) writers, where instances 2 and
+ 3 (instance IDs, which can be looked up in the DB to fetch the more commonly
+ used instance names) are at positions 58 and 59 respectively, and all other
+ instances are at position 56.
+
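+ For illustration (assuming, purely hypothetically, that instance IDs 2 and
+ 3 resolve to the names "worker2" and "worker3" in the DB), parsing the token
+ above yields roughly `RoomStreamToken(topological=None, stream=56,
+ instance_map={"worker2": 58, "worker3": 59})`.
+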
+ Note: The `RoomStreamToken` cannot have both a topological part and an
+ instance map.
"""
topological = attr.ib(
@@ -419,6 +445,25 @@ class RoomStreamToken:
)
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
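+ # Map from writer instance name to its stream position; only writers that
+ # are ahead of the minimum `stream` position need an entry here (see the
+ # class docstring).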
+ instance_map = attr.ib(
+ type=Dict[str, int],
+ factory=dict,
+ validator=attr.validators.deep_mapping(
+ key_validator=attr.validators.instance_of(str),
+ value_validator=attr.validators.instance_of(int),
+ mapping_validator=attr.validators.instance_of(dict),
+ ),
+ )
+
+ def __attrs_post_init__(self):
+ """Validates that both `topological` and `instance_map` aren't set.
+ """
+
+ if self.instance_map and self.topological:
+ raise ValueError(
+ "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
+ )
+
@classmethod
async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken":
try:
@@ -427,6 +472,20 @@ class RoomStreamToken:
if string[0] == "t":
parts = string[1:].split("-", 1)
return cls(topological=int(parts[0]), stream=int(parts[1]))
+ if string[0] == "m":
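+ # A vector-clock token, e.g. "m56~2.58~3.59": the leading integer is
+ # the minimum position across all writers, and each "~"-separated
+ # "instance_id.pos" pair gives the position of a writer ahead of it.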
+ parts = string[1:].split("~")
+ stream = int(parts[0])
+
+ instance_map = {}
+ for part in parts[1:]:
+ key, value = part.split(".")
+ instance_id = int(key)
+ pos = int(value)
+
+ instance_name = await store.get_name_from_instance_id(instance_id)
+ instance_map[instance_name] = pos
+
+ return cls(topological=None, stream=stream, instance_map=instance_map,)
except Exception:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
@@ -450,14 +509,61 @@ class RoomStreamToken:
max_stream = max(self.stream, other.stream)
- return RoomStreamToken(None, max_stream)
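+ # For each instance named in either token, take the larger of its two
+ # positions, falling back to that token's min `stream` position when
+ # the instance isn't in its map.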
+ instance_map = {
+ instance: max(
+ self.instance_map.get(instance, self.stream),
+ other.instance_map.get(instance, other.stream),
+ )
+ for instance in set(self.instance_map).union(other.instance_map)
+ }
+
+ return RoomStreamToken(None, max_stream, instance_map)
+
+ def as_historical_tuple(self) -> Tuple[int, int]:
+ """Returns a tuple of `(topological, stream)` for historical tokens.
+
+ Raises if this is not a historical token (i.e. it doesn't have a topological part).
+ """
+ if self.topological is None:
+ raise Exception(
+ "Cannot call `RoomStreamToken.as_historical_tuple` on live token"
+ )
- def as_tuple(self) -> Tuple[Optional[int], int]:
return (self.topological, self.stream)
+ def get_stream_pos_for_instance(self, instance_name: str) -> int:
+ """Get the stream position that the given writer was at at this token.
+
+ This only makes sense for "live" tokens that may have a vector clock
+ component, and so asserts that this is a "live" token.
+ """
+ assert self.topological is None
+
+ # If we don't have an entry for the instance we can assume that it was
+ # at `self.stream`.
+ return self.instance_map.get(instance_name, self.stream)
+
+ def get_max_stream_pos(self) -> int:
+ """Get the maximum stream position referenced in this token.
+
+ The corresponding "min" position is, by definition, just `self.stream`.
+
+ This is used to handle tokens that have non-empty `instance_map`, and so
+ reference stream positions after the `self.stream` position.
+ """
+ return max(self.instance_map.values(), default=self.stream)
+
async def to_string(self, store: "DataStore") -> str:
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
+ elif self.instance_map:
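+ # Serialise the vector clock as "m{min_pos}~{id}.{pos}~...", mapping
+ # each instance name back to its integer ID via the DB.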
+ entries = []
+ for name, pos in self.instance_map.items():
+ instance_id = await store.get_id_for_instance(name)
+ entries.append("{}.{}".format(instance_id, pos))
+
+ encoded_map = "~".join(entries)
+ return "m{}~{}".format(self.stream, encoded_map)
else:
return "s%d" % (self.stream,)
@@ -549,7 +655,7 @@ class PersistedEventPosition:
stream = attr.ib(type=int)
def persisted_after(self, token: RoomStreamToken) -> bool:
- return token.stream < self.stream
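+ # Compare against the given writer's position in the token (rather than
+ # the overall minimum), so positions of writers that are ahead of the
+ # minimum are taken into account.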
+ return token.get_stream_pos_for_instance(self.instance_name) < self.stream
def to_room_stream_token(self) -> RoomStreamToken:
"""Converts the position to a room stream token such that events