summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-05-24 09:43:35 +0100
committerErik Johnston <erik@matrix.org>2016-05-24 09:50:55 +0100
commit137e6a45577d5850ef6936670791af12c2fe74d9 (patch)
tree6799bde9f289b48e1f154fe7511aead7853a93dd
parentAdd back concurrently_execute (diff)
downloadsynapse-137e6a45577d5850ef6936670791af12c2fe74d9.tar.xz
Shuffle things room
-rw-r--r--synapse/handlers/sync.py70
1 files changed, 33 insertions, 37 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bc6d6af133..6b7c6a436e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -499,6 +499,10 @@ class SyncHandler(object):
 
     @defer.inlineCallbacks
     def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+        # NB: The now_token gets changed by some of the generate_sync_* methods,
+        # this is due to some of the underlying streams not supporting the ability
+        # to query up to a given point.
+        # Always use the `now_token` in `SyncResultBuilder`
         now_token = yield self.event_sources.get_current_token()
 
         sync_result_builer = SyncResultBuilder(
@@ -511,9 +515,10 @@ class SyncHandler(object):
             sync_result_builer
         )
 
-        newly_joined_rooms, newly_joined_users = yield self.generate_sync_entry_for_rooms(
+        res = yield self.generate_sync_entry_for_rooms(
             sync_result_builer, account_data_by_room
         )
+        newly_joined_rooms, newly_joined_users = res
 
         yield self.generate_sync_entry_for_presence(
             sync_result_builer, newly_joined_rooms, newly_joined_users
@@ -631,7 +636,7 @@ class SyncHandler(object):
 
         if sync_result_builer.since_token:
             res = yield self._get_rooms_changed(sync_result_builer, ignored_users)
-            joined, invited, archived, newly_joined_rooms = res
+            room_entries, invited, newly_joined_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
                 user_id,
@@ -639,13 +644,12 @@ class SyncHandler(object):
             )
         else:
             res = yield self._get_all_rooms(sync_result_builer, ignored_users)
-            joined, invited, archived, newly_joined_rooms = res
+            room_entries, invited, newly_joined_rooms = res
 
             tags_by_room = yield self.store.get_tags_for_user(user_id)
 
-        def handle_joined(room_entry):
+        def handle_room_entries(room_entry):
             return self._generate_room_entry(
-                "joined",
                 sync_result_builer,
                 ignored_users,
                 room_entry,
@@ -655,21 +659,7 @@ class SyncHandler(object):
                 always_include=sync_result_builer.full_state,
             )
 
-        yield concurrently_execute(handle_joined, joined, 10)
-
-        def handle_archived(room_entry):
-            return self._generate_room_entry(
-                "archived",
-                sync_result_builer,
-                ignored_users,
-                room_entry,
-                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
-                tags=tags_by_room.get(room_entry.room_id),
-                account_data=account_data_by_room.get(room_entry.room_id, {}),
-                always_include=sync_result_builer.full_state,
-            )
-
-        yield concurrently_execute(handle_archived, archived, 10)
+        yield concurrently_execute(handle_room_entries, room_entries, 10)
 
         sync_result_builer.invited.extend(invited)
 
@@ -711,7 +701,7 @@ class SyncHandler(object):
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
         newly_joined_rooms = []
-        archived = []
+        room_entries = []
         invited = []
         for room_id, events in mem_change_events_by_room_id.items():
             non_joins = [e for e in events if e.membership != Membership.JOIN]
@@ -759,8 +749,9 @@ class SyncHandler(object):
                 if since_token and since_token.is_after(leave_token):
                     continue
 
-                archived.append(RoomSyncResultBuilder(
+                room_entries.append(RoomSyncResultBuilder(
                     room_id=room_id,
+                    rtype="archived",
                     events=None,
                     newly_joined=room_id in newly_joined_rooms,
                     full_state=False,
@@ -778,7 +769,6 @@ class SyncHandler(object):
             limit=timeline_limit + 1,
         )
 
-        joined = []
         # We loop through all room ids, even if there are no new events, in case
         # there are non room events taht we need to notify about.
         for room_id in joined_room_ids:
@@ -789,8 +779,9 @@ class SyncHandler(object):
 
                 prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-                joined.append(RoomSyncResultBuilder(
+                room_entries.append(RoomSyncResultBuilder(
                     room_id=room_id,
+                    rtype="joined",
                     events=events,
                     newly_joined=room_id in newly_joined_rooms,
                     full_state=False,
@@ -798,8 +789,9 @@ class SyncHandler(object):
                     upto_token=prev_batch_token,
                 ))
             else:
-                joined.append(RoomSyncResultBuilder(
+                room_entries.append(RoomSyncResultBuilder(
                     room_id=room_id,
+                    rtype="joined",
                     events=[],
                     newly_joined=room_id in newly_joined_rooms,
                     full_state=False,
@@ -807,7 +799,7 @@ class SyncHandler(object):
                     upto_token=since_token,
                 ))
 
-        defer.returnValue((joined, invited, archived, newly_joined_rooms))
+        defer.returnValue((room_entries, invited, newly_joined_rooms))
 
     @defer.inlineCallbacks
     def _get_all_rooms(self, sync_result_builer, ignored_users):
@@ -825,14 +817,14 @@ class SyncHandler(object):
             membership_list=membership_list
         )
 
-        joined = []
+        room_entries = []
         invited = []
-        archived = []
 
         for event in room_list:
             if event.membership == Membership.JOIN:
-                joined.append(RoomSyncResultBuilder(
+                room_entries.append(RoomSyncResultBuilder(
                     room_id=event.room_id,
+                    rtype="joined",
                     events=None,
                     newly_joined=False,
                     full_state=True,
@@ -857,8 +849,9 @@ class SyncHandler(object):
                 leave_token = now_token.copy_and_replace(
                     "room_key", "s%d" % (event.stream_ordering,)
                 )
-                archived.append(RoomSyncResultBuilder(
+                room_entries.append(RoomSyncResultBuilder(
                     room_id=event.room_id,
+                    rtype="archived",
                     events=None,
                     newly_joined=False,
                     full_state=True,
@@ -866,10 +859,10 @@ class SyncHandler(object):
                     upto_token=leave_token,
                 ))
 
-        defer.returnValue((joined, invited, archived, []))
+        defer.returnValue((room_entries, invited, []))
 
     @defer.inlineCallbacks
-    def _generate_room_entry(self, room_type, sync_result_builer, ignored_users,
+    def _generate_room_entry(self, sync_result_builer, ignored_users,
                              room_builder, ephemeral, tags, account_data,
                              always_include=False):
         since_token = sync_result_builer.since_token
@@ -892,7 +885,7 @@ class SyncHandler(object):
             now_token=upto_token,
             since_token=since_token,
             recents=events,
-            newly_joined_room=newly_joined,  # FIXME
+            newly_joined_room=newly_joined,
         )
 
         account_data_events = []
@@ -922,7 +915,7 @@ class SyncHandler(object):
             full_state=full_state
         )
 
-        if room_type == "joined":
+        if room_builder.rtype == "joined":
             unread_notifications = {}
             room_sync = JoinedSyncResult(
                 room_id=room_id,
@@ -943,7 +936,7 @@ class SyncHandler(object):
                     unread_notifications["highlight_count"] = notifs["highlight_count"]
 
                 sync_result_builer.joined.append(room_sync)
-        elif room_type == "archived":
+        elif room_builder.rtype == "archived":
             room_sync = ArchivedSyncResult(
                 room_id=room_id,
                 timeline=batch,
@@ -952,6 +945,8 @@ class SyncHandler(object):
             )
             if room_sync or always_include:
                 sync_result_builer.archived.append(room_sync)
+        else:
+            raise Exception("Unrecognized rtype: %r", room_builder.rtype)
 
 
 def _action_has_highlight(actions):
@@ -1017,9 +1012,10 @@ class SyncResultBuilder(object):
 
 
 class RoomSyncResultBuilder(object):
-    def __init__(self, room_id, events, newly_joined, full_state, since_token,
-                 upto_token):
+    def __init__(self, room_id, rtype, events, newly_joined, full_state,
+                 since_token, upto_token):
         self.room_id = room_id
+        self.rtype = rtype
         self.events = events
         self.newly_joined = newly_joined
         self.full_state = full_state