summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-24 13:24:17 +0100
committerGitHub <noreply@github.com>2020-09-24 13:24:17 +0100
commitac11fcbbb8ccfeb4c72b5aae9faef28469109277 (patch)
treee35d6b0e50558eb0d83389bb40f8179c4c254aab /synapse/replication/tcp
parentMark the shadow_banned column as boolean in synapse_port_db. (#8386) (diff)
downloadsynapse-ac11fcbbb8ccfeb4c72b5aae9faef28469109277.tar.xz
Add EventStreamPosition type (#8388)
The idea is to remove some of the places we pass around `int`, where it can represent one of two things:

1. the position of an event in the stream; or
2. a token that partitions the stream, used as part of the stream tokens.

The valid operations are then:

1. did a position happen before or after a token;
2. get all events that happened before or after a token; and
3. get all events between two tokens.

(Note that we don't want to allow other operations as we want to change the tokens to be vector clocks rather than simple ints)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py12
1 files changed, 9 insertions, 3 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e82b9e386f..55af3d41ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
     EventsStreamRow,
 )
-from synapse.types import UserID
+from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.metrics import Measure
 
@@ -151,8 +151,14 @@ class ReplicationDataHandler:
                 extra_users = ()  # type: Tuple[UserID, ...]
                 if event.type == EventTypes.Member:
                     extra_users = (UserID.from_string(event.state_key),)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(event, token, max_token, extra_users)
+
+                max_token = RoomStreamToken(
+                    None, self.store.get_room_max_stream_ordering()
+                )
+                event_pos = PersistedEventPosition(instance_name, token)
+                self.notifier.on_new_room_event(
+                    event, event_pos, max_token, extra_users
+                )
 
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is