summary refs log tree commit diff
path: root/synapse/rest/client/v2_alpha/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/rest/client/v2_alpha/sync.py')
-rw-r--r--synapse/rest/client/v2_alpha/sync.py130
1 files changed, 127 insertions, 3 deletions
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 1840eef775..efd8281558 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -20,6 +20,7 @@ from synapse.http.servlet import (
 )
 from synapse.handlers.sync import SyncConfig
 from synapse.types import StreamToken
+from synapse.events import FrozenEvent
 from synapse.events.utils import (
     serialize_event, format_event_for_client_v2_without_event_id,
 )
@@ -81,7 +82,7 @@ class SyncRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        user, token_id = yield self.auth.get_user_by_req(request)
+        user, token_id, _ = yield self.auth.get_user_by_req(request)
 
         timeout = parse_integer(request, "timeout", default=0)
         since = parse_string(request, "since")
@@ -165,6 +166,20 @@ class SyncRestServlet(RestServlet):
         return {"events": filter.filter_presence(formatted)}
 
     def encode_joined(self, rooms, filter, time_now, token_id):
+        """
+        Encode the joined rooms in a sync result
+
+        :param list[synapse.handlers.sync.JoinedSyncResult] rooms: list of sync
+            results for rooms this user is joined to
+        :param FilterCollection filter: filters to apply to the results
+        :param int time_now: current time - used as a baseline for age
+            calculations
+        :param int token_id: ID of the user's auth token - used for namespacing
+            of transaction IDs
+
+        :return: the joined rooms list, in our response format
+        :rtype: dict[str, dict[str, object]]
+        """
         joined = {}
         for room in rooms:
             joined[room.room_id] = self.encode_room(
@@ -174,6 +189,20 @@ class SyncRestServlet(RestServlet):
         return joined
 
     def encode_invited(self, rooms, filter, time_now, token_id):
+        """
+        Encode the invited rooms in a sync result
+
+        :param list[synapse.handlers.sync.InvitedSyncResult] rooms: list of
+             sync results for rooms this user is joined to
+        :param FilterCollection filter: filters to apply to the results
+        :param int time_now: current time - used as a baseline for age
+            calculations
+        :param int token_id: ID of the user's auth token - used for namespacing
+            of transaction IDs
+
+        :return: the invited rooms list, in our response format
+        :rtype: dict[str, dict[str, object]]
+        """
         invited = {}
         for room in rooms:
             invite = serialize_event(
@@ -189,6 +218,20 @@ class SyncRestServlet(RestServlet):
         return invited
 
     def encode_archived(self, rooms, filter, time_now, token_id):
+        """
+        Encode the archived rooms in a sync result
+
+        :param list[synapse.handlers.sync.ArchivedSyncResult] rooms: list of
+             sync results for rooms this user is joined to
+        :param FilterCollection filter: filters to apply to the results
+        :param int time_now: current time - used as a baseline for age
+            calculations
+        :param int token_id: ID of the user's auth token - used for namespacing
+            of transaction IDs
+
+        :return: the invited rooms list, in our response format
+        :rtype: dict[str, dict[str, object]]
+        """
         joined = {}
         for room in rooms:
             joined[room.room_id] = self.encode_room(
@@ -199,8 +242,28 @@ class SyncRestServlet(RestServlet):
 
     @staticmethod
     def encode_room(room, filter, time_now, token_id, joined=True):
+        """
+        :param JoinedSyncResult|ArchivedSyncResult room: sync result for a
+            single room
+        :param FilterCollection filter: filters to apply to the results
+        :param int time_now: current time - used as a baseline for age
+            calculations
+        :param int token_id: ID of the user's auth token - used for namespacing
+            of transaction IDs
+        :param joined: True if the user is joined to this room - will mean
+            we handle ephemeral events
+
+        :return: the room, encoded in our response format
+        :rtype: dict[str, object]
+        """
         event_map = {}
-        state_events = filter.filter_room_state(room.state)
+        state_dict = room.state
+        timeline_events = filter.filter_room_timeline(room.timeline.events)
+
+        state_dict = SyncRestServlet._rollback_state_for_timeline(
+            state_dict, timeline_events)
+
+        state_events = filter.filter_room_state(state_dict.values())
         state_event_ids = []
         for event in state_events:
             # TODO(mjark): Respect formatting requirements in the filter.
@@ -210,7 +273,6 @@ class SyncRestServlet(RestServlet):
             )
             state_event_ids.append(event.event_id)
 
-        timeline_events = filter.filter_room_timeline(room.timeline.events)
         timeline_event_ids = []
         for event in timeline_events:
             # TODO(mjark): Respect formatting requirements in the filter.
@@ -220,6 +282,10 @@ class SyncRestServlet(RestServlet):
             )
             timeline_event_ids.append(event.event_id)
 
+        private_user_data = filter.filter_room_private_user_data(
+            room.private_user_data
+        )
+
         result = {
             "event_map": event_map,
             "timeline": {
@@ -228,6 +294,7 @@ class SyncRestServlet(RestServlet):
                 "limited": room.timeline.limited,
             },
             "state": {"events": state_event_ids},
+            "private_user_data": {"events": private_user_data},
         }
 
         if joined:
@@ -236,6 +303,63 @@ class SyncRestServlet(RestServlet):
 
         return result
 
+    @staticmethod
+    def _rollback_state_for_timeline(state, timeline):
+        """
+        Wind the state dictionary backwards, so that it represents the
+        state at the start of the timeline, rather than at the end.
+
+        :param dict[(str, str), synapse.events.EventBase] state: the
+            state dictionary. Will be updated to the state before the timeline.
+        :param list[synapse.events.EventBase] timeline: the event timeline
+        :return: updated state dictionary
+        """
+        logger.debug("Processing state dict %r; timeline %r", state,
+                     [e.get_dict() for e in timeline])
+
+        result = state.copy()
+
+        for timeline_event in reversed(timeline):
+            if not timeline_event.is_state():
+                continue
+
+            event_key = (timeline_event.type, timeline_event.state_key)
+
+            logger.debug("Considering %s for removal", event_key)
+
+            state_event = result.get(event_key)
+            if (state_event is None or
+                    state_event.event_id != timeline_event.event_id):
+                # the event in the timeline isn't present in the state
+                # dictionary.
+                #
+                # the most likely cause for this is that there was a fork in
+                # the event graph, and the state is no longer valid. Really,
+                # the event shouldn't be in the timeline. We're going to ignore
+                # it for now, however.
+                logger.warn("Found state event %r in timeline which doesn't "
+                            "match state dictionary", timeline_event)
+                continue
+
+            prev_event_id = timeline_event.unsigned.get("replaces_state", None)
+            logger.debug("Replacing %s with %s in state dict",
+                         timeline_event.event_id, prev_event_id)
+
+            if prev_event_id is None:
+                del result[event_key]
+            else:
+                result[event_key] = FrozenEvent({
+                    "type": timeline_event.type,
+                    "state_key": timeline_event.state_key,
+                    "content": timeline_event.unsigned['prev_content'],
+                    "sender": timeline_event.unsigned['prev_sender'],
+                    "event_id": prev_event_id,
+                    "room_id": timeline_event.room_id,
+                })
+            logger.debug("New value: %r", result.get(event_key))
+
+        return result
+
 
 def register_servlets(hs, http_server):
     SyncRestServlet(hs).register(http_server)