summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2015-09-21 14:38:13 +0100
committerMark Haines <mjark@negativecurvature.net>2015-09-21 14:38:13 +0100
commitee2d722f0f4a216815b7bd83f41c4d59b306f45b (patch)
tree59f5083c2190159f6073dcc8729e916c0d1122dd /synapse
parentMerge pull request #267 from matrix-org/markjh/missing_requirements (diff)
parentClarify that room_initial_sync returns a python dict (diff)
downloadsynapse-ee2d722f0f4a216815b7bd83f41c4d59b306f45b.tar.xz
Merge pull request #276 from matrix-org/markjh/history_for_rooms_that_have_been_left
SPEC-216: Allow users to view the history of rooms that they have left.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py51
-rw-r--r--synapse/api/constants.py11
-rw-r--r--synapse/handlers/message.py206
-rw-r--r--synapse/handlers/room.py37
-rw-r--r--synapse/rest/client/v1/initial_sync.py2
-rw-r--r--synapse/rest/client/v1/room.py20
-rw-r--r--synapse/storage/roommember.py6
-rw-r--r--synapse/storage/stream.py42
8 files changed, 260 insertions, 115 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index df788230fa..847ff60671 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -119,6 +119,20 @@ class Auth(object):
 
     @defer.inlineCallbacks
     def check_joined_room(self, room_id, user_id, current_state=None):
+        """Check if the user is currently joined in the room
+        Args:
+            room_id(str): The room to check.
+            user_id(str): The user to check.
+            current_state(dict): Optional map of the current state of the room.
+                If provided then that map is used to check whether they are a
+                member of the room. Otherwise the current membership is
+                loaded from the database.
+        Raises:
+            AuthError if the user is not in the room.
+        Returns:
+            A deferred membership event for the user if the user is in
+            the room.
+        """
         if current_state:
             member = current_state.get(
                 (EventTypes.Member, user_id),
@@ -135,6 +149,43 @@ class Auth(object):
         defer.returnValue(member)
 
     @defer.inlineCallbacks
+    def check_user_was_in_room(self, room_id, user_id, current_state=None):
+        """Check if the user was in the room at some point.
+        Args:
+            room_id(str): The room to check.
+            user_id(str): The user to check.
+            current_state(dict): Optional map of the current state of the room.
+                If provided then that map is used to check whether they are a
+                member of the room. Otherwise the current membership is
+                loaded from the database.
+        Raises:
+            AuthError if the user was never in the room.
+        Returns:
+            A deferred membership event for the user if the user was in the
+            room. This will be the join event if they are currently joined to
+            the room. This will be the leave event if they have left the room.
+        """
+        if current_state:
+            member = current_state.get(
+                (EventTypes.Member, user_id),
+                None
+            )
+        else:
+            member = yield self.state.get_current_state(
+                room_id=room_id,
+                event_type=EventTypes.Member,
+                state_key=user_id
+            )
+        membership = member.membership if member else None
+
+        if membership not in (Membership.JOIN, Membership.LEAVE):
+            raise AuthError(403, "User %s not in room %s" % (
+                user_id, room_id
+            ))
+
+        defer.returnValue(member)
+
+    @defer.inlineCallbacks
     def check_host_in_room(self, room_id, host):
         curr_state = yield self.state.get_current_state(room_id)
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 1423986c1e..3385664394 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -27,16 +27,6 @@ class Membership(object):
     LIST = (INVITE, JOIN, KNOCK, LEAVE, BAN)
 
 
-class Feedback(object):
-
-    """Represents the types of feedback a user can send in response to a
-    message."""
-
-    DELIVERED = u"delivered"
-    READ = u"read"
-    LIST = (DELIVERED, READ)
-
-
 class PresenceState(object):
     """Represents the presence state of a user."""
     OFFLINE = u"offline"
@@ -73,7 +63,6 @@ class EventTypes(object):
     PowerLevels = "m.room.power_levels"
     Aliases = "m.room.aliases"
     Redaction = "m.room.redaction"
-    Feedback = "m.room.message.feedback"
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 23b779ad7c..bda8eb5f3f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,13 +16,13 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError, SynapseError
+from synapse.api.errors import SynapseError
 from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import PreserveLoggingContext
-from synapse.types import UserID, RoomStreamToken
+from synapse.types import UserID, RoomStreamToken, StreamToken
 
 from ._base import BaseHandler
 
@@ -71,7 +71,7 @@ class MessageHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_messages(self, user_id=None, room_id=None, pagin_config=None,
-                     feedback=False, as_client_event=True):
+                     as_client_event=True):
         """Get messages in a room.
 
         Args:
@@ -79,26 +79,52 @@ class MessageHandler(BaseHandler):
             room_id (str): The room they want messages from.
             pagin_config (synapse.api.streams.PaginationConfig): The pagination
             config rules to apply, if any.
-            feedback (bool): True to get compressed feedback with the messages
             as_client_event (bool): True to get events in client-server format.
         Returns:
             dict: Pagination API results
         """
-        yield self.auth.check_joined_room(room_id, user_id)
+        member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
 
         data_source = self.hs.get_event_sources().sources["room"]
 
-        if not pagin_config.from_token:
+        if pagin_config.from_token:
+            room_token = pagin_config.from_token.room_key
+        else:
             pagin_config.from_token = (
                 yield self.hs.get_event_sources().get_current_token(
                     direction='b'
                 )
             )
+            room_token = pagin_config.from_token.room_key
 
-        room_token = RoomStreamToken.parse(pagin_config.from_token.room_key)
+        room_token = RoomStreamToken.parse(room_token)
         if room_token.topological is None:
             raise SynapseError(400, "Invalid token")
 
+        pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+            "room_key", str(room_token)
+        )
+
+        source_config = pagin_config.get_source_config("room")
+
+        if member_event.membership == Membership.LEAVE:
+            # If they have left the room then clamp the token to be before
+            # they left the room
+            leave_token = yield self.store.get_topological_token_for_event(
+                member_event.event_id
+            )
+            leave_token = RoomStreamToken.parse(leave_token)
+            if leave_token.topological < room_token.topological:
+                source_config.from_key = str(leave_token)
+
+            if source_config.direction == "f":
+                if source_config.to_key is None:
+                    source_config.to_key = str(leave_token)
+                else:
+                    to_token = RoomStreamToken.parse(source_config.to_key)
+                    if leave_token.topological < to_token.topological:
+                        source_config.to_key = str(leave_token)
+
         yield self.hs.get_handlers().federation_handler.maybe_backfill(
             room_id, room_token.topological
         )
@@ -106,7 +132,7 @@ class MessageHandler(BaseHandler):
         user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(
-            user, pagin_config.get_source_config("room"), room_id
+            user, source_config, room_id
         )
 
         next_token = pagin_config.from_token.copy_and_replace(
@@ -255,29 +281,26 @@ class MessageHandler(BaseHandler):
         Raises:
             SynapseError if something went wrong.
         """
-        have_joined = yield self.auth.check_joined_room(room_id, user_id)
-        if not have_joined:
-            raise RoomError(403, "User not in room.")
+        member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
 
-        data = yield self.state_handler.get_current_state(
-            room_id, event_type, state_key
-        )
-        defer.returnValue(data)
-
-    @defer.inlineCallbacks
-    def get_feedback(self, event_id):
-        # yield self.auth.check_joined_room(room_id, user_id)
-
-        # Pull out the feedback from the db
-        fb = yield self.store.get_feedback(event_id)
+        if member_event.membership == Membership.JOIN:
+            data = yield self.state_handler.get_current_state(
+                room_id, event_type, state_key
+            )
+        elif member_event.membership == Membership.LEAVE:
+            key = (event_type, state_key)
+            room_state = yield self.store.get_state_for_events(
+                room_id, [member_event.event_id], [key]
+            )
+            data = room_state[member_event.event_id].get(key)
 
-        if fb:
-            defer.returnValue(fb)
-        defer.returnValue(None)
+        defer.returnValue(data)
 
     @defer.inlineCallbacks
     def get_state_events(self, user_id, room_id):
-        """Retrieve all state events for a given room.
+        """Retrieve all state events for a given room. If the user is
+        joined to the room then return the current state. If the user has
+        left the room return the state events from when they left.
 
         Args:
             user_id(str): The user requesting state events.
@@ -285,18 +308,23 @@ class MessageHandler(BaseHandler):
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         """
-        yield self.auth.check_joined_room(room_id, user_id)
+        member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+
+        if member_event.membership == Membership.JOIN:
+            room_state = yield self.state_handler.get_current_state(room_id)
+        elif member_event.membership == Membership.LEAVE:
+            room_state = yield self.store.get_state_for_events(
+                room_id, [member_event.event_id], None
+            )
+            room_state = room_state[member_event.event_id]
 
-        # TODO: This is duplicating logic from snapshot_all_rooms
-        current_state = yield self.state_handler.get_current_state(room_id)
         now = self.clock.time_msec()
         defer.returnValue(
-            [serialize_event(c, now) for c in current_state.values()]
+            [serialize_event(c, now) for c in room_state.values()]
         )
 
     @defer.inlineCallbacks
-    def snapshot_all_rooms(self, user_id=None, pagin_config=None,
-                           feedback=False, as_client_event=True):
+    def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True):
         """Retrieve a snapshot of all rooms the user is invited or has joined.
 
         This snapshot may include messages for all rooms where the user is
@@ -306,7 +334,6 @@ class MessageHandler(BaseHandler):
             user_id (str): The ID of the user making the request.
             pagin_config (synapse.api.streams.PaginationConfig): The pagination
             config used to determine how many messages *PER ROOM* to return.
-            feedback (bool): True to get feedback along with these messages.
             as_client_event (bool): True to get events in client-server format.
         Returns:
             A list of dicts with "room_id" and "membership" keys for all rooms
@@ -316,7 +343,9 @@ class MessageHandler(BaseHandler):
         """
         room_list = yield self.store.get_rooms_for_user_where_membership_is(
             user_id=user_id,
-            membership_list=[Membership.INVITE, Membership.JOIN]
+            membership_list=[
+                Membership.INVITE, Membership.JOIN, Membership.LEAVE
+            ]
         )
 
         user = UserID.from_string(user_id)
@@ -358,19 +387,32 @@ class MessageHandler(BaseHandler):
 
             rooms_ret.append(d)
 
-            if event.membership != Membership.JOIN:
+            if event.membership not in (Membership.JOIN, Membership.LEAVE):
                 return
+
             try:
+                if event.membership == Membership.JOIN:
+                    room_end_token = now_token.room_key
+                    deferred_room_state = self.state_handler.get_current_state(
+                        event.room_id
+                    )
+                elif event.membership == Membership.LEAVE:
+                    room_end_token = "s%d" % (event.stream_ordering,)
+                    deferred_room_state = self.store.get_state_for_events(
+                        event.room_id, [event.event_id], None
+                    )
+                    deferred_room_state.addCallback(
+                        lambda states: states[event.event_id]
+                    )
+
                 (messages, token), current_state = yield defer.gatherResults(
                     [
                         self.store.get_recent_events_for_room(
                             event.room_id,
                             limit=limit,
-                            end_token=now_token.room_key,
-                        ),
-                        self.state_handler.get_current_state(
-                            event.room_id
+                            end_token=room_end_token,
                         ),
+                        deferred_room_state,
                     ]
                 ).addErrback(unwrapFirstError)
 
@@ -417,15 +459,85 @@ class MessageHandler(BaseHandler):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def room_initial_sync(self, user_id, room_id, pagin_config=None,
-                          feedback=False):
-        current_state = yield self.state.get_current_state(
-            room_id=room_id,
+    def room_initial_sync(self, user_id, room_id, pagin_config=None):
+        """Capture the a snapshot of a room. If user is currently a member of
+        the room this will be what is currently in the room. If the user left
+        the room this will be what was in the room when they left.
+
+        Args:
+            user_id(str): The user to get a snapshot for.
+            room_id(str): The room to get a snapshot of.
+            pagin_config(synapse.streams.config.PaginationConfig):
+                The pagination config used to determine how many messages to
+                return.
+        Raises:
+            AuthError if the user wasn't in the room.
+        Returns:
+            A JSON serialisable dict with the snapshot of the room.
+        """
+
+        member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
+
+        if member_event.membership == Membership.JOIN:
+            result = yield self._room_initial_sync_joined(
+                user_id, room_id, pagin_config, member_event
+            )
+        elif member_event.membership == Membership.LEAVE:
+            result = yield self._room_initial_sync_parted(
+                user_id, room_id, pagin_config, member_event
+            )
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
+                                  member_event):
+        room_state = yield self.store.get_state_for_events(
+            member_event.room_id, [member_event.event_id], None
+        )
+
+        room_state = room_state[member_event.event_id]
+
+        limit = pagin_config.limit if pagin_config else None
+        if limit is None:
+            limit = 10
+
+        stream_token = yield self.store.get_stream_token_for_event(
+            member_event.event_id
+        )
+
+        messages, token = yield self.store.get_recent_events_for_room(
+            room_id,
+            limit=limit,
+            end_token=stream_token
+        )
+
+        messages = yield self._filter_events_for_client(
+            user_id, room_id, messages
         )
 
-        yield self.auth.check_joined_room(
-            room_id, user_id,
-            current_state=current_state
+        start_token = StreamToken(token[0], 0, 0, 0)
+        end_token = StreamToken(token[1], 0, 0, 0)
+
+        time_now = self.clock.time_msec()
+
+        defer.returnValue({
+            "membership": member_event.membership,
+            "room_id": room_id,
+            "messages": {
+                "chunk": [serialize_event(m, time_now) for m in messages],
+                "start": start_token.to_string(),
+                "end": end_token.to_string(),
+            },
+            "state": [serialize_event(s, time_now) for s in room_state.values()],
+            "presence": [],
+            "receipts": [],
+        })
+
+    @defer.inlineCallbacks
+    def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
+                                  member_event):
+        current_state = yield self.state.get_current_state(
+            room_id=room_id,
         )
 
         # TODO(paul): I wish I was called with user objects not user_id
@@ -439,8 +551,6 @@ class MessageHandler(BaseHandler):
             for x in current_state.values()
         ]
 
-        member_event = current_state.get((EventTypes.Member, user_id,))
-
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         limit = pagin_config.limit if pagin_config else None
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c5d1001b50..243623190f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,7 +25,6 @@ from synapse.api.constants import (
 from synapse.api.errors import StoreError, SynapseError
 from synapse.util import stringutils, unwrapFirstError
 from synapse.util.async import run_on_reactor
-from synapse.events.utils import serialize_event
 
 from collections import OrderedDict
 import logging
@@ -343,41 +342,6 @@ class RoomMemberHandler(BaseHandler):
                     remotedomains.add(member.domain)
 
     @defer.inlineCallbacks
-    def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None,
-                                             limit=0, start_tok=None,
-                                             end_tok=None):
-        """Retrieve a list of room members in the room.
-
-        Args:
-            room_id (str): The room to get the member list for.
-            user_id (str): The ID of the user making the request.
-            limit (int): The max number of members to return.
-            start_tok (str): Optional. The start token if known.
-            end_tok (str): Optional. The end token if known.
-        Returns:
-            dict: A Pagination streamable dict.
-        Raises:
-            SynapseError if something goes wrong.
-        """
-        yield self.auth.check_joined_room(room_id, user_id)
-
-        member_list = yield self.store.get_room_members(room_id=room_id)
-        time_now = self.clock.time_msec()
-        event_list = [
-            serialize_event(entry, time_now)
-            for entry in member_list
-        ]
-        chunk_data = {
-            "start": "START",  # FIXME (erikj): START is no longer valid
-            "end": "END",
-            "chunk": event_list
-        }
-        # TODO honor Pagination stream params
-        # TODO snapshot this list to return on subsequent requests when
-        # paginating
-        defer.returnValue(chunk_data)
-
-    @defer.inlineCallbacks
     def change_membership(self, event, context, do_auth=True):
         """ Change the membership status of a user in a room.
 
@@ -646,7 +610,6 @@ class RoomEventSource(object):
             to_key=config.to_key,
             direction=config.direction,
             limit=config.limit,
-            with_feedback=True
         )
 
         defer.returnValue((events, next_key))
diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py
index 4ea4da653c..bac68cc29f 100644
--- a/synapse/rest/client/v1/initial_sync.py
+++ b/synapse/rest/client/v1/initial_sync.py
@@ -26,14 +26,12 @@ class InitialSyncRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_GET(self, request):
         user, _ = yield self.auth.get_user_by_req(request)
-        with_feedback = "feedback" in request.args
         as_client_event = "raw" not in request.args
         pagination_config = PaginationConfig.from_request(request)
         handler = self.handlers.message_handler
         content = yield handler.snapshot_all_rooms(
             user_id=user.to_string(),
             pagin_config=pagination_config,
-            feedback=with_feedback,
             as_client_event=as_client_event
         )
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index c9c27dd5a0..23871f161e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -290,12 +290,18 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
     def on_GET(self, request, room_id):
         # TODO support Pagination stream API (limit/tokens)
         user, _ = yield self.auth.get_user_by_req(request)
-        handler = self.handlers.room_member_handler
-        members = yield handler.get_room_members_as_pagination_chunk(
+        handler = self.handlers.message_handler
+        events = yield handler.get_state_events(
             room_id=room_id,
-            user_id=user.to_string())
+            user_id=user.to_string(),
+        )
+
+        chunk = []
 
-        for event in members["chunk"]:
+        for event in events:
+            if event["type"] != EventTypes.Member:
+                continue
+            chunk.append(event)
             # FIXME: should probably be state_key here, not user_id
             target_user = UserID.from_string(event["user_id"])
             # Presence is an optional cache; don't fail if we can't fetch it
@@ -308,7 +314,9 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
             except:
                 pass
 
-        defer.returnValue((200, members))
+        defer.returnValue((200, {
+            "chunk": chunk
+        }))
 
 
 # TODO: Needs unit testing
@@ -321,14 +329,12 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
         pagination_config = PaginationConfig.from_request(
             request, default_limit=10,
         )
-        with_feedback = "feedback" in request.args
         as_client_event = "raw" not in request.args
         handler = self.handlers.message_handler
         msgs = yield handler.get_messages(
             room_id=room_id,
             user_id=user.to_string(),
             pagin_config=pagination_config,
-            feedback=with_feedback,
             as_client_event=as_client_event
         )
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8eee2dfbcc..cd9eefbd9f 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 
 RoomsForUser = namedtuple(
     "RoomsForUser",
-    ("room_id", "sender", "membership")
+    ("room_id", "sender", "membership", "event_id", "stream_ordering")
 )
 
 
@@ -141,9 +141,11 @@ class RoomMemberStore(SQLBaseStore):
         args.extend(membership_list)
 
         sql = (
-            "SELECT m.room_id, m.sender, m.membership"
+            "SELECT m.room_id, m.sender, m.membership, m.event_id, e.stream_ordering"
             " FROM room_memberships as m"
             " INNER JOIN current_state_events as c"
+            " ON e.event_id = c.event_id "
+            " INNER JOIN events as e "
             " ON m.event_id = c.event_id "
             " AND m.room_id = c.room_id "
             " AND m.user_id = c.state_key"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d7fe423f5a..3cab06fdef 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -159,9 +159,7 @@ class StreamStore(SQLBaseStore):
 
     @log_function
     def get_room_events_stream(self, user_id, from_key, to_key, room_id,
-                               limit=0, with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                               limit=0):
         current_room_membership_sql = (
             "SELECT m.room_id FROM room_memberships as m "
             " INNER JOIN current_state_events as c"
@@ -227,10 +225,7 @@ class StreamStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def paginate_room_events(self, room_id, from_key, to_key=None,
-                             direction='b', limit=-1,
-                             with_feedback=False):
-        # TODO (erikj): Handle compressed feedback
-
+                             direction='b', limit=-1):
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -302,7 +297,6 @@ class StreamStore(SQLBaseStore):
 
     @cachedInlineCallbacks(num_args=4)
     def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
-        # TODO (erikj): Handle compressed feedback
 
         end_token = RoomStreamToken.parse_stream_token(end_token)
 
@@ -379,6 +373,38 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
+    def get_stream_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "s%d" stream token.
+        """
+        return self._simple_select_one_onecol(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcol="stream_ordering",
+        ).addCallback(lambda row: "s%d" % (row,))
+
+    def get_topological_token_for_event(self, event_id):
+        """The stream token for an event
+        Args:
+            event_id(str): The id of the event to look up a stream token for.
+        Raises:
+            StoreError if the event wasn't in the database.
+        Returns:
+            A deferred "t%d-%d" topological token.
+        """
+        return self._simple_select_one(
+            table="events",
+            keyvalues={"event_id": event_id},
+            retcols=("stream_ordering", "topological_ordering"),
+        ).addCallback(lambda row: "t%d-%d" % (
+            row["topological_ordering"], row["stream_ordering"],)
+        )
+
     def _get_max_topological_txn(self, txn):
         txn.execute(
             "SELECT MAX(topological_ordering) FROM events"