summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
commit5097aee740b542407e5bb13d19a3e3e6c2227316 (patch)
tree09a03650256e09cd0b5df59dbf2d7bb2ba14df6c /synapse/handlers/sync.py
parentchangelog (diff)
parentImprove help and cmdline option names for --generate-config options (#5512) (diff)
downloadsynapse-5097aee740b542407e5bb13d19a3e3e6c2227316.tar.xz
Merge branch 'develop' into rav/cleanup_metrics
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py694
1 files changed, 351 insertions, 343 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 62fda0c664..c5188a1f8e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -64,20 +64,14 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 
 
-SyncConfig = collections.namedtuple("SyncConfig", [
-    "user",
-    "filter_collection",
-    "is_guest",
-    "request_key",
-    "device_id",
-])
-
-
-class TimelineBatch(collections.namedtuple("TimelineBatch", [
-    "prev_batch",
-    "events",
-    "limited",
-])):
+SyncConfig = collections.namedtuple(
+    "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"]
+)
+
+
+class TimelineBatch(
+    collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"])
+):
     __slots__ = []
 
     def __nonzero__(self):
@@ -85,18 +79,24 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
         to tell if room needs to be part of the sync result.
         """
         return bool(self.events)
+
     __bool__ = __nonzero__  # python3
 
 
-class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
-    "room_id",           # str
-    "timeline",          # TimelineBatch
-    "state",             # dict[(str, str), FrozenEvent]
-    "ephemeral",
-    "account_data",
-    "unread_notifications",
-    "summary",
-])):
+class JoinedSyncResult(
+    collections.namedtuple(
+        "JoinedSyncResult",
+        [
+            "room_id",  # str
+            "timeline",  # TimelineBatch
+            "state",  # dict[(str, str), FrozenEvent]
+            "ephemeral",
+            "account_data",
+            "unread_notifications",
+            "summary",
+        ],
+    )
+):
     __slots__ = []
 
     def __nonzero__(self):
@@ -111,77 +111,93 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
             # nb the notification count does not, er, count: if there's nothing
             # else in the result, we don't need to send it.
         )
+
     __bool__ = __nonzero__  # python3
 
 
-class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
-    "room_id",            # str
-    "timeline",           # TimelineBatch
-    "state",              # dict[(str, str), FrozenEvent]
-    "account_data",
-])):
+class ArchivedSyncResult(
+    collections.namedtuple(
+        "ArchivedSyncResult",
+        [
+            "room_id",  # str
+            "timeline",  # TimelineBatch
+            "state",  # dict[(str, str), FrozenEvent]
+            "account_data",
+        ],
+    )
+):
     __slots__ = []
 
     def __nonzero__(self):
         """Make the result appear empty if there are no updates. This is used
         to tell if room needs to be part of the sync result.
         """
-        return bool(
-            self.timeline
-            or self.state
-            or self.account_data
-        )
+        return bool(self.timeline or self.state or self.account_data)
+
     __bool__ = __nonzero__  # python3
 
 
-class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
-    "room_id",   # str
-    "invite",    # FrozenEvent: the invite event
-])):
+class InvitedSyncResult(
+    collections.namedtuple(
+        "InvitedSyncResult",
+        ["room_id", "invite"],  # str  # FrozenEvent: the invite event
+    )
+):
     __slots__ = []
 
     def __nonzero__(self):
         """Invited rooms should always be reported to the client"""
         return True
+
     __bool__ = __nonzero__  # python3
 
 
-class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
-    "join",
-    "invite",
-    "leave",
-])):
+class GroupsSyncResult(
+    collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"])
+):
     __slots__ = []
 
     def __nonzero__(self):
         return bool(self.join or self.invite or self.leave)
+
     __bool__ = __nonzero__  # python3
 
 
-class DeviceLists(collections.namedtuple("DeviceLists", [
-    "changed",   # list of user_ids whose devices may have changed
-    "left",      # list of user_ids whose devices we no longer track
-])):
+class DeviceLists(
+    collections.namedtuple(
+        "DeviceLists",
+        [
+            "changed",  # list of user_ids whose devices may have changed
+            "left",  # list of user_ids whose devices we no longer track
+        ],
+    )
+):
     __slots__ = []
 
     def __nonzero__(self):
         return bool(self.changed or self.left)
+
     __bool__ = __nonzero__  # python3
 
 
-class SyncResult(collections.namedtuple("SyncResult", [
-    "next_batch",  # Token for the next sync
-    "presence",  # List of presence events for the user.
-    "account_data",  # List of account_data events for the user.
-    "joined",  # JoinedSyncResult for each joined room.
-    "invited",  # InvitedSyncResult for each invited room.
-    "archived",  # ArchivedSyncResult for each archived room.
-    "to_device",  # List of direct messages for the device.
-    "device_lists",  # List of user_ids whose devices have changed
-    "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
-                                   # for this device
-    "groups",
-])):
+class SyncResult(
+    collections.namedtuple(
+        "SyncResult",
+        [
+            "next_batch",  # Token for the next sync
+            "presence",  # List of presence events for the user.
+            "account_data",  # List of account_data events for the user.
+            "joined",  # JoinedSyncResult for each joined room.
+            "invited",  # InvitedSyncResult for each invited room.
+            "archived",  # ArchivedSyncResult for each archived room.
+            "to_device",  # List of direct messages for the device.
+            "device_lists",  # List of user_ids whose devices have changed
+            "device_one_time_keys_count",  # Dict of algorithm to count for one time keys
+            # for this device
+            "groups",
+        ],
+    )
+):
     __slots__ = []
 
     def __nonzero__(self):
@@ -190,20 +206,20 @@ class SyncResult(collections.namedtuple("SyncResult", [
         events.
         """
         return bool(
-            self.presence or
-            self.joined or
-            self.invited or
-            self.archived or
-            self.account_data or
-            self.to_device or
-            self.device_lists or
-            self.groups
+            self.presence
+            or self.joined
+            or self.invited
+            or self.archived
+            or self.account_data
+            or self.to_device
+            or self.device_lists
+            or self.groups
         )
+
     __bool__ = __nonzero__  # python3
 
 
 class SyncHandler(object):
-
     def __init__(self, hs):
         self.hs_config = hs.config
         self.store = hs.get_datastore()
@@ -217,13 +233,16 @@ class SyncHandler(object):
 
         # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
         self.lazy_loaded_members_cache = ExpiringCache(
-            "lazy_loaded_members_cache", self.clock,
-            max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+            "lazy_loaded_members_cache",
+            self.clock,
+            max_len=0,
+            expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
         )
 
     @defer.inlineCallbacks
-    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
-                               full_state=False):
+    def wait_for_sync_for_user(
+        self, sync_config, since_token=None, timeout=0, full_state=False
+    ):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
@@ -239,13 +258,15 @@ class SyncHandler(object):
         res = yield self.response_cache.wrap(
             sync_config.request_key,
             self._wait_for_sync_for_user,
-            sync_config, since_token, timeout, full_state,
+            sync_config,
+            since_token,
+            timeout,
+            full_state,
         )
         defer.returnValue(res)
 
     @defer.inlineCallbacks
-    def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
-                                full_state):
+    def _wait_for_sync_for_user(self, sync_config, since_token, timeout, full_state):
         if since_token is None:
             sync_type = "initial_sync"
         elif full_state:
@@ -261,14 +282,17 @@ class SyncHandler(object):
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
             result = yield self.current_sync_for_user(
-                sync_config, since_token, full_state=full_state,
+                sync_config, since_token, full_state=full_state
             )
         else:
+
             def current_sync_callback(before_token, after_token):
                 return self.current_sync_for_user(sync_config, since_token)
 
             result = yield self.notifier.wait_for_events(
-                sync_config.user.to_string(), timeout, current_sync_callback,
+                sync_config.user.to_string(),
+                timeout,
+                current_sync_callback,
                 from_token=since_token,
             )
 
@@ -281,8 +305,7 @@ class SyncHandler(object):
 
         defer.returnValue(result)
 
-    def current_sync_for_user(self, sync_config, since_token=None,
-                              full_state=False):
+    def current_sync_for_user(self, sync_config, since_token=None, full_state=False):
         """Get the sync for client needed to match what the server has now.
         Returns:
             A Deferred SyncResult.
@@ -334,8 +357,7 @@ class SyncHandler(object):
                 # result returned by the event source is poor form (it might cache
                 # the object)
                 room_id = event["room_id"]
-                event_copy = {k: v for (k, v) in iteritems(event)
-                              if k != "room_id"}
+                event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
             receipt_key = since_token.receipt_key if since_token else "0"
@@ -353,22 +375,30 @@ class SyncHandler(object):
             for event in receipts:
                 room_id = event["room_id"]
                 # exclude room id, as above
-                event_copy = {k: v for (k, v) in iteritems(event)
-                              if k != "room_id"}
+                event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         defer.returnValue((now_token, ephemeral_by_room))
 
     @defer.inlineCallbacks
-    def _load_filtered_recents(self, room_id, sync_config, now_token,
-                               since_token=None, recents=None, newly_joined_room=False):
+    def _load_filtered_recents(
+        self,
+        room_id,
+        sync_config,
+        now_token,
+        since_token=None,
+        recents=None,
+        newly_joined_room=False,
+    ):
         """
         Returns:
             a Deferred TimelineBatch
         """
         with Measure(self.clock, "load_filtered_recents"):
             timeline_limit = sync_config.filter_collection.timeline_limit()
-            block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
+            block_all_timeline = (
+                sync_config.filter_collection.blocks_all_room_timeline()
+            )
 
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
@@ -396,11 +426,9 @@ class SyncHandler(object):
                 recents = []
 
             if not limited or block_all_timeline:
-                defer.returnValue(TimelineBatch(
-                    events=recents,
-                    prev_batch=now_token,
-                    limited=False
-                ))
+                defer.returnValue(
+                    TimelineBatch(events=recents, prev_batch=now_token, limited=False)
+                )
 
             filtering_factor = 2
             load_limit = max(timeline_limit * filtering_factor, 10)
@@ -427,9 +455,7 @@ class SyncHandler(object):
                     )
                 else:
                     events, end_key = yield self.store.get_recent_events_for_room(
-                        room_id,
-                        limit=load_limit + 1,
-                        end_token=end_key,
+                        room_id, limit=load_limit + 1, end_token=end_key
                     )
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
@@ -462,15 +488,15 @@ class SyncHandler(object):
                 recents = recents[-timeline_limit:]
                 room_key = recents[0].internal_metadata.before
 
-            prev_batch_token = now_token.copy_and_replace(
-                "room_key", room_key
-            )
+            prev_batch_token = now_token.copy_and_replace("room_key", room_key)
 
-        defer.returnValue(TimelineBatch(
-            events=recents,
-            prev_batch=prev_batch_token,
-            limited=limited or newly_joined_room
-        ))
+        defer.returnValue(
+            TimelineBatch(
+                events=recents,
+                prev_batch=prev_batch_token,
+                limited=limited or newly_joined_room,
+            )
+        )
 
     @defer.inlineCallbacks
     def get_state_after_event(self, event, state_filter=StateFilter.all()):
@@ -486,7 +512,7 @@ class SyncHandler(object):
             A Deferred map from ((type, state_key)->Event)
         """
         state_ids = yield self.store.get_state_ids_for_event(
-            event.event_id, state_filter=state_filter,
+            event.event_id, state_filter=state_filter
         )
         if event.is_state():
             state_ids = state_ids.copy()
@@ -511,13 +537,13 @@ class SyncHandler(object):
         # does not reliably give you the state at the given stream position.
         # (https://github.com/matrix-org/synapse/issues/3305)
         last_events, _ = yield self.store.get_recent_events_for_room(
-            room_id, end_token=stream_position.room_key, limit=1,
+            room_id, end_token=stream_position.room_key, limit=1
         )
 
         if last_events:
             last_event = last_events[-1]
             state = yield self.get_state_after_event(
-                last_event, state_filter=state_filter,
+                last_event, state_filter=state_filter
             )
 
         else:
@@ -549,7 +575,7 @@ class SyncHandler(object):
 
         # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
         last_events, _ = yield self.store.get_recent_event_ids_for_room(
-            room_id, end_token=now_token.room_key, limit=1,
+            room_id, end_token=now_token.room_key, limit=1
         )
 
         if not last_events:
@@ -559,28 +585,25 @@ class SyncHandler(object):
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
             last_event.event_id,
-            state_filter=StateFilter.from_types([
-                (EventTypes.Name, ''),
-                (EventTypes.CanonicalAlias, ''),
-            ]),
+            state_filter=StateFilter.from_types(
+                [(EventTypes.Name, ""), (EventTypes.CanonicalAlias, "")]
+            ),
         )
 
         # this is heavily cached, thus: fast.
         details = yield self.store.get_room_summary(room_id)
 
-        name_id = state_ids.get((EventTypes.Name, ''))
-        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+        name_id = state_ids.get((EventTypes.Name, ""))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
 
         summary = {}
         empty_ms = MemberSummary([], 0)
 
         # TODO: only send these when they change.
-        summary["m.joined_member_count"] = (
-            details.get(Membership.JOIN, empty_ms).count
-        )
-        summary["m.invited_member_count"] = (
-            details.get(Membership.INVITE, empty_ms).count
-        )
+        summary["m.joined_member_count"] = details.get(Membership.JOIN, empty_ms).count
+        summary["m.invited_member_count"] = details.get(
+            Membership.INVITE, empty_ms
+        ).count
 
         # if the room has a name or canonical_alias set, we can skip
         # calculating heroes. Empty strings are falsey, so we check
@@ -592,7 +615,7 @@ class SyncHandler(object):
 
         if canonical_alias_id:
             canonical_alias = yield self.store.get_event(
-                canonical_alias_id, allow_none=True,
+                canonical_alias_id, allow_none=True
             )
             if canonical_alias and canonical_alias.content.get("alias"):
                 defer.returnValue(summary)
@@ -600,26 +623,14 @@ class SyncHandler(object):
         me = sync_config.user.to_string()
 
         joined_user_ids = [
-            r[0]
-            for r in details.get(Membership.JOIN, empty_ms).members
-            if r[0] != me
+            r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me
         ]
         invited_user_ids = [
-            r[0]
-            for r in details.get(Membership.INVITE, empty_ms).members
-            if r[0] != me
+            r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me
         ]
-        gone_user_ids = (
-            [
-                r[0]
-                for r in details.get(Membership.LEAVE, empty_ms).members
-                if r[0] != me
-            ] + [
-                r[0]
-                for r in details.get(Membership.BAN, empty_ms).members
-                if r[0] != me
-            ]
-        )
+        gone_user_ids = [
+            r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
+        ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
 
         # FIXME: only build up a member_ids list for our heroes
         member_ids = {}
@@ -627,20 +638,18 @@ class SyncHandler(object):
             Membership.JOIN,
             Membership.INVITE,
             Membership.LEAVE,
-            Membership.BAN
+            Membership.BAN,
         ):
             for user_id, event_id in details.get(membership, empty_ms).members:
                 member_ids[user_id] = event_id
 
         # FIXME: order by stream ordering rather than as returned by SQL
-        if (joined_user_ids or invited_user_ids):
-            summary['m.heroes'] = sorted(
+        if joined_user_ids or invited_user_ids:
+            summary["m.heroes"] = sorted(
                 [user_id for user_id in (joined_user_ids + invited_user_ids)]
             )[0:5]
         else:
-            summary['m.heroes'] = sorted(
-                [user_id for user_id in gone_user_ids]
-            )[0:5]
+            summary["m.heroes"] = sorted([user_id for user_id in gone_user_ids])[0:5]
 
         if not sync_config.filter_collection.lazy_load_members():
             defer.returnValue(summary)
@@ -652,8 +661,7 @@ class SyncHandler(object):
         # track which members the client should already know about via LL:
         # Ones which are already in state...
         existing_members = set(
-            user_id for (typ, user_id) in state.keys()
-            if typ == EventTypes.Member
+            user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member
         )
 
         # ...or ones which are in the timeline...
@@ -664,10 +672,10 @@ class SyncHandler(object):
         # ...and then ensure any missing ones get included in state.
         missing_hero_event_ids = [
             member_ids[hero_id]
-            for hero_id in summary['m.heroes']
+            for hero_id in summary["m.heroes"]
             if (
-                cache.get(hero_id) != member_ids[hero_id] and
-                hero_id not in existing_members
+                cache.get(hero_id) != member_ids[hero_id]
+                and hero_id not in existing_members
             )
         ]
 
@@ -691,8 +699,9 @@ class SyncHandler(object):
         return cache
 
     @defer.inlineCallbacks
-    def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
-                            full_state):
+    def compute_state_delta(
+        self, room_id, batch, sync_config, since_token, now_token, full_state
+    ):
         """ Works out the difference in state between the start of the timeline
         and the previous sync.
 
@@ -745,23 +754,23 @@ class SyncHandler(object):
 
             timeline_state = {
                 (event.type, event.state_key): event.event_id
-                for event in batch.events if event.is_state()
+                for event in batch.events
+                if event.is_state()
             }
 
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id, state_filter=state_filter,
+                        batch.events[-1].event_id, state_filter=state_filter
                     )
 
                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id, state_filter=state_filter,
+                        batch.events[0].event_id, state_filter=state_filter
                     )
 
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token,
-                        state_filter=state_filter,
+                        room_id, stream_position=now_token, state_filter=state_filter
                     )
 
                     state_ids = current_state_ids
@@ -775,7 +784,7 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id, state_filter=state_filter,
+                    batch.events[0].event_id, state_filter=state_filter
                 )
 
                 # for now, we disable LL for gappy syncs - see
@@ -793,12 +802,11 @@ class SyncHandler(object):
                 state_filter = StateFilter.all()
 
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token,
-                    state_filter=state_filter,
+                    room_id, stream_position=since_token, state_filter=state_filter
                 )
 
                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id, state_filter=state_filter,
+                    batch.events[-1].event_id, state_filter=state_filter
                 )
 
                 state_ids = _calculate_state(
@@ -854,8 +862,7 @@ class SyncHandler(object):
 
                 # add any member IDs we are about to send into our LruCache
                 for t, event_id in itertools.chain(
-                    state_ids.items(),
-                    timeline_state.items(),
+                    state_ids.items(), timeline_state.items()
                 ):
                     if t[0] == EventTypes.Member:
                         cache.set(t[1], event_id)
@@ -864,10 +871,14 @@ class SyncHandler(object):
         if state_ids:
             state = yield self.store.get_events(list(state_ids.values()))
 
-        defer.returnValue({
-            (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(list(state.values()))
-        })
+        defer.returnValue(
+            {
+                (e.type, e.state_key): e
+                for e in sync_config.filter_collection.filter_room_state(
+                    list(state.values())
+                )
+            }
+        )
 
     @defer.inlineCallbacks
     def unread_notifs_for_room_id(self, room_id, sync_config):
@@ -875,7 +886,7 @@ class SyncHandler(object):
             last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
                 user_id=sync_config.user.to_string(),
                 room_id=room_id,
-                receipt_type="m.read"
+                receipt_type="m.read",
             )
 
             notifs = []
@@ -909,7 +920,9 @@ class SyncHandler(object):
 
         logger.info(
             "Calculating sync response for %r between %s and %s",
-            sync_config.user, since_token, now_token,
+            sync_config.user,
+            since_token,
+            now_token,
         )
 
         user_id = sync_config.user.to_string()
@@ -920,11 +933,12 @@ class SyncHandler(object):
             raise NotImplementedError()
         else:
             joined_room_ids = yield self.get_rooms_for_user_at(
-                user_id, now_token.room_stream_id,
+                user_id, now_token.room_stream_id
             )
 
         sync_result_builder = SyncResultBuilder(
-            sync_config, full_state,
+            sync_config,
+            full_state,
             since_token=since_token,
             now_token=now_token,
             joined_room_ids=joined_room_ids,
@@ -941,8 +955,7 @@ class SyncHandler(object):
         _, _, newly_left_rooms, newly_left_users = res
 
         block_all_presence_data = (
-            since_token is None and
-            sync_config.filter_collection.blocks_all_presence()
+            since_token is None and sync_config.filter_collection.blocks_all_presence()
         )
         if self.hs_config.use_presence and not block_all_presence_data:
             yield self._generate_sync_entry_for_presence(
@@ -973,22 +986,23 @@ class SyncHandler(object):
             room_id = joined_room.room_id
             if room_id in newly_joined_rooms:
                 issue4422_logger.debug(
-                    "Sync result for newly joined room %s: %r",
-                    room_id, joined_room,
+                    "Sync result for newly joined room %s: %r", room_id, joined_room
                 )
 
-        defer.returnValue(SyncResult(
-            presence=sync_result_builder.presence,
-            account_data=sync_result_builder.account_data,
-            joined=sync_result_builder.joined,
-            invited=sync_result_builder.invited,
-            archived=sync_result_builder.archived,
-            to_device=sync_result_builder.to_device,
-            device_lists=device_lists,
-            groups=sync_result_builder.groups,
-            device_one_time_keys_count=one_time_key_counts,
-            next_batch=sync_result_builder.now_token,
-        ))
+        defer.returnValue(
+            SyncResult(
+                presence=sync_result_builder.presence,
+                account_data=sync_result_builder.account_data,
+                joined=sync_result_builder.joined,
+                invited=sync_result_builder.invited,
+                archived=sync_result_builder.archived,
+                to_device=sync_result_builder.to_device,
+                device_lists=device_lists,
+                groups=sync_result_builder.groups,
+                device_one_time_keys_count=one_time_key_counts,
+                next_batch=sync_result_builder.now_token,
+            )
+        )
 
     @measure_func("_generate_sync_entry_for_groups")
     @defer.inlineCallbacks
@@ -999,11 +1013,11 @@ class SyncHandler(object):
 
         if since_token and since_token.groups_key:
             results = yield self.store.get_groups_changes_for_user(
-                user_id, since_token.groups_key, now_token.groups_key,
+                user_id, since_token.groups_key, now_token.groups_key
             )
         else:
             results = yield self.store.get_all_groups_for_user(
-                user_id, now_token.groups_key,
+                user_id, now_token.groups_key
             )
 
         invited = {}
@@ -1031,17 +1045,19 @@ class SyncHandler(object):
                     left[group_id] = content["content"]
 
         sync_result_builder.groups = GroupsSyncResult(
-            join=joined,
-            invite=invited,
-            leave=left,
+            join=joined, invite=invited, leave=left
         )
 
     @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
-    def _generate_sync_entry_for_device_list(self, sync_result_builder,
-                                             newly_joined_rooms,
-                                             newly_joined_or_invited_users,
-                                             newly_left_rooms, newly_left_users):
+    def _generate_sync_entry_for_device_list(
+        self,
+        sync_result_builder,
+        newly_joined_rooms,
+        newly_joined_or_invited_users,
+        newly_left_rooms,
+        newly_left_users,
+    ):
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
@@ -1065,24 +1081,20 @@ class SyncHandler(object):
             changed.update(newly_joined_or_invited_users)
 
             if not changed and not newly_left_users:
-                defer.returnValue(DeviceLists(
-                    changed=[],
-                    left=newly_left_users,
-                ))
+                defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
 
             users_who_share_room = yield self.store.get_users_who_share_room_with_user(
                 user_id
             )
 
-            defer.returnValue(DeviceLists(
-                changed=users_who_share_room & changed,
-                left=set(newly_left_users) - users_who_share_room,
-            ))
+            defer.returnValue(
+                DeviceLists(
+                    changed=users_who_share_room & changed,
+                    left=set(newly_left_users) - users_who_share_room,
+                )
+            )
         else:
-            defer.returnValue(DeviceLists(
-                changed=[],
-                left=[],
-            ))
+            defer.returnValue(DeviceLists(changed=[], left=[]))
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_to_device(self, sync_result_builder):
@@ -1109,8 +1121,9 @@ class SyncHandler(object):
             deleted = yield self.store.delete_messages_for_device(
                 user_id, device_id, since_stream_id
             )
-            logger.debug("Deleted %d to-device messages up to %d",
-                         deleted, since_stream_id)
+            logger.debug(
+                "Deleted %d to-device messages up to %d", deleted, since_stream_id
+            )
 
             messages, stream_id = yield self.store.get_new_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
@@ -1118,7 +1131,10 @@ class SyncHandler(object):
 
             logger.debug(
                 "Returning %d to-device messages between %d and %d (current token: %d)",
-                len(messages), since_stream_id, stream_id, now_token.to_device_key
+                len(messages),
+                since_stream_id,
+                stream_id,
+                now_token.to_device_key,
             )
             sync_result_builder.now_token = now_token.copy_and_replace(
                 "to_device_key", stream_id
@@ -1145,8 +1161,7 @@ class SyncHandler(object):
         if since_token and not sync_result_builder.full_state:
             account_data, account_data_by_room = (
                 yield self.store.get_updated_account_data_for_user(
-                    user_id,
-                    since_token.account_data_key,
+                    user_id, since_token.account_data_key
                 )
             )
 
@@ -1160,27 +1175,28 @@ class SyncHandler(object):
                 )
         else:
             account_data, account_data_by_room = (
-                yield self.store.get_account_data_for_user(
-                    sync_config.user.to_string()
-                )
+                yield self.store.get_account_data_for_user(sync_config.user.to_string())
             )
 
-            account_data['m.push_rules'] = yield self.push_rules_for_user(
+            account_data["m.push_rules"] = yield self.push_rules_for_user(
                 sync_config.user
             )
 
-        account_data_for_user = sync_config.filter_collection.filter_account_data([
-            {"type": account_data_type, "content": content}
-            for account_data_type, content in account_data.items()
-        ])
+        account_data_for_user = sync_config.filter_collection.filter_account_data(
+            [
+                {"type": account_data_type, "content": content}
+                for account_data_type, content in account_data.items()
+            ]
+        )
 
         sync_result_builder.account_data = account_data_for_user
 
         defer.returnValue(account_data_by_room)
 
     @defer.inlineCallbacks
-    def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
-                                          newly_joined_or_invited_users):
+    def _generate_sync_entry_for_presence(
+        self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users
+    ):
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -1223,17 +1239,13 @@ class SyncHandler(object):
         extra_users_ids.discard(user.to_string())
 
         if extra_users_ids:
-            states = yield self.presence_handler.get_states(
-                extra_users_ids,
-            )
+            states = yield self.presence_handler.get_states(extra_users_ids)
             presence.extend(states)
 
             # Deduplicate the presence entries so that there's at most one per user
             presence = list({p.user_id: p for p in presence}.values())
 
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
+        presence = sync_config.filter_collection.filter_presence(presence)
 
         sync_result_builder.presence = presence
 
@@ -1253,8 +1265,8 @@ class SyncHandler(object):
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
-            sync_result_builder.since_token is None and
-            sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
+            sync_result_builder.since_token is None
+            and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
         )
 
         if block_all_room_ephemeral:
@@ -1275,15 +1287,14 @@ class SyncHandler(object):
                 have_changed = yield self._have_rooms_changed(sync_result_builder)
                 if not have_changed:
                     tags_by_room = yield self.store.get_updated_tags(
-                        user_id,
-                        since_token.account_data_key,
+                        user_id, since_token.account_data_key
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
                         defer.returnValue(([], [], [], []))
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
-            "m.ignored_user_list", user_id=user_id,
+            "m.ignored_user_list", user_id=user_id
         )
 
         if ignored_account_data:
@@ -1296,7 +1307,7 @@ class SyncHandler(object):
             room_entries, invited, newly_joined_rooms, newly_left_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
-                user_id, since_token.account_data_key,
+                user_id, since_token.account_data_key
             )
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -1331,8 +1342,8 @@ class SyncHandler(object):
                 for event in it:
                     if event.type == EventTypes.Member:
                         if (
-                            event.membership == Membership.JOIN or
-                            event.membership == Membership.INVITE
+                            event.membership == Membership.JOIN
+                            or event.membership == Membership.INVITE
                         ):
                             newly_joined_or_invited_users.add(event.state_key)
                         else:
@@ -1343,12 +1354,14 @@ class SyncHandler(object):
 
         newly_left_users -= newly_joined_or_invited_users
 
-        defer.returnValue((
-            newly_joined_rooms,
-            newly_joined_or_invited_users,
-            newly_left_rooms,
-            newly_left_users,
-        ))
+        defer.returnValue(
+            (
+                newly_joined_rooms,
+                newly_joined_or_invited_users,
+                newly_left_rooms,
+                newly_left_users,
+            )
+        )
 
     @defer.inlineCallbacks
     def _have_rooms_changed(self, sync_result_builder):
@@ -1454,7 +1467,9 @@ class SyncHandler(object):
                         prev_membership = old_mem_ev.membership
                     issue4422_logger.debug(
                         "Previous membership for room %s with join: %s (event %s)",
-                        room_id, prev_membership, old_mem_ev_id,
+                        room_id,
+                        prev_membership,
+                        old_mem_ev_id,
                     )
 
                 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
@@ -1476,8 +1491,7 @@ class SyncHandler(object):
                     if not old_state_ids:
                         old_state_ids = yield self.get_state_at(room_id, since_token)
                         old_mem_ev_id = old_state_ids.get(
-                            (EventTypes.Member, user_id),
-                            None,
+                            (EventTypes.Member, user_id), None
                         )
                         old_mem_ev = None
                         if old_mem_ev_id:
@@ -1498,7 +1512,8 @@ class SyncHandler(object):
             # Always include leave/ban events. Just take the last one.
             # TODO: How do we handle ban -> leave in same batch?
             leave_events = [
-                e for e in non_joins
+                e
+                for e in non_joins
                 if e.membership in (Membership.LEAVE, Membership.BAN)
             ]
 
@@ -1526,15 +1541,17 @@ class SyncHandler(object):
                 else:
                     batch_events = None
 
-                room_entries.append(RoomSyncResultBuilder(
-                    room_id=room_id,
-                    rtype="archived",
-                    events=batch_events,
-                    newly_joined=room_id in newly_joined_rooms,
-                    full_state=False,
-                    since_token=since_token,
-                    upto_token=leave_token,
-                ))
+                room_entries.append(
+                    RoomSyncResultBuilder(
+                        room_id=room_id,
+                        rtype="archived",
+                        events=batch_events,
+                        newly_joined=room_id in newly_joined_rooms,
+                        full_state=False,
+                        since_token=since_token,
+                        upto_token=leave_token,
+                    )
+                )
 
         timeline_limit = sync_config.filter_collection.timeline_limit()
 
@@ -1581,7 +1598,8 @@ class SyncHandler(object):
                 # debugging for https://github.com/matrix-org/synapse/issues/4422
                 issue4422_logger.debug(
                     "RoomSyncResultBuilder events for newly joined room %s: %r",
-                    room_id, entry.events,
+                    room_id,
+                    entry.events,
                 )
             room_entries.append(entry)
 
@@ -1606,12 +1624,14 @@ class SyncHandler(object):
         sync_config = sync_result_builder.sync_config
 
         membership_list = (
-            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+            Membership.INVITE,
+            Membership.JOIN,
+            Membership.LEAVE,
+            Membership.BAN,
         )
 
         room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=user_id,
-            membership_list=membership_list
+            user_id=user_id, membership_list=membership_list
         )
 
         room_entries = []
@@ -1619,23 +1639,22 @@ class SyncHandler(object):
 
         for event in room_list:
             if event.membership == Membership.JOIN:
-                room_entries.append(RoomSyncResultBuilder(
-                    room_id=event.room_id,
-                    rtype="joined",
-                    events=None,
-                    newly_joined=False,
-                    full_state=True,
-                    since_token=since_token,
-                    upto_token=now_token,
-                ))
+                room_entries.append(
+                    RoomSyncResultBuilder(
+                        room_id=event.room_id,
+                        rtype="joined",
+                        events=None,
+                        newly_joined=False,
+                        full_state=True,
+                        since_token=since_token,
+                        upto_token=now_token,
+                    )
+                )
             elif event.membership == Membership.INVITE:
                 if event.sender in ignored_users:
                     continue
                 invite = yield self.store.get_event(event.event_id)
-                invited.append(InvitedSyncResult(
-                    room_id=event.room_id,
-                    invite=invite,
-                ))
+                invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite))
             elif event.membership in (Membership.LEAVE, Membership.BAN):
                 # Always send down rooms we were banned or kicked from.
                 if not sync_config.filter_collection.include_leave:
@@ -1646,22 +1665,31 @@ class SyncHandler(object):
                 leave_token = now_token.copy_and_replace(
                     "room_key", "s%d" % (event.stream_ordering,)
                 )
-                room_entries.append(RoomSyncResultBuilder(
-                    room_id=event.room_id,
-                    rtype="archived",
-                    events=None,
-                    newly_joined=False,
-                    full_state=True,
-                    since_token=since_token,
-                    upto_token=leave_token,
-                ))
+                room_entries.append(
+                    RoomSyncResultBuilder(
+                        room_id=event.room_id,
+                        rtype="archived",
+                        events=None,
+                        newly_joined=False,
+                        full_state=True,
+                        since_token=since_token,
+                        upto_token=leave_token,
+                    )
+                )
 
         defer.returnValue((room_entries, invited, []))
 
     @defer.inlineCallbacks
-    def _generate_room_entry(self, sync_result_builder, ignored_users,
-                             room_builder, ephemeral, tags, account_data,
-                             always_include=False):
+    def _generate_room_entry(
+        self,
+        sync_result_builder,
+        ignored_users,
+        room_builder,
+        ephemeral,
+        tags,
+        account_data,
+        always_include=False,
+    ):
         """Populates the `joined` and `archived` section of `sync_result_builder`
         based on the `room_builder`.
 
@@ -1678,9 +1706,7 @@ class SyncHandler(object):
         """
         newly_joined = room_builder.newly_joined
         full_state = (
-            room_builder.full_state
-            or newly_joined
-            or sync_result_builder.full_state
+            room_builder.full_state or newly_joined or sync_result_builder.full_state
         )
         events = room_builder.events
 
@@ -1697,7 +1723,8 @@ class SyncHandler(object):
         upto_token = room_builder.upto_token
 
         batch = yield self._load_filtered_recents(
-            room_id, sync_config,
+            room_id,
+            sync_config,
             now_token=upto_token,
             since_token=since_token,
             recents=events,
@@ -1708,7 +1735,8 @@ class SyncHandler(object):
             # debug for https://github.com/matrix-org/synapse/issues/4422
             issue4422_logger.debug(
                 "Timeline events after filtering in newly-joined room %s: %r",
-                room_id, batch,
+                room_id,
+                batch,
             )
 
         # When we join the room (or the client requests full_state), we should
@@ -1726,16 +1754,10 @@ class SyncHandler(object):
 
         account_data_events = []
         if tags is not None:
-            account_data_events.append({
-                "type": "m.tag",
-                "content": {"tags": tags},
-            })
+            account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
 
         for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
+            account_data_events.append({"type": account_data_type, "content": content})
 
         account_data_events = sync_config.filter_collection.filter_room_account_data(
             account_data_events
@@ -1743,16 +1765,13 @@ class SyncHandler(object):
 
         ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
 
-        if not (always_include
-                or batch
-                or account_data_events
-                or ephemeral
-                or full_state):
+        if not (
+            always_include or batch or account_data_events or ephemeral or full_state
+        ):
             return
 
         state = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, now_token,
-            full_state=full_state
+            room_id, batch, sync_config, since_token, now_token, full_state=full_state
         )
 
         summary = {}
@@ -1760,22 +1779,19 @@ class SyncHandler(object):
         # we include a summary in room responses when we're lazy loading
         # members (as the client otherwise doesn't have enough info to form
         # the name itself).
-        if (
-            sync_config.filter_collection.lazy_load_members() and
-            (
-                # we recalulate the summary:
-                #   if there are membership changes in the timeline, or
-                #   if membership has changed during a gappy sync, or
-                #   if this is an initial sync.
-                any(ev.type == EventTypes.Member for ev in batch.events) or
-                (
-                    # XXX: this may include false positives in the form of LL
-                    # members which have snuck into state
-                    batch.limited and
-                    any(t == EventTypes.Member for (t, k) in state)
-                ) or
-                since_token is None
+        if sync_config.filter_collection.lazy_load_members() and (
+            # we recalulate the summary:
+            #   if there are membership changes in the timeline, or
+            #   if membership has changed during a gappy sync, or
+            #   if this is an initial sync.
+            any(ev.type == EventTypes.Member for ev in batch.events)
+            or (
+                # XXX: this may include false positives in the form of LL
+                # members which have snuck into state
+                batch.limited
+                and any(t == EventTypes.Member for (t, k) in state)
             )
+            or since_token is None
         ):
             summary = yield self.compute_summary(
                 room_id, sync_config, batch, state, now_token
@@ -1794,9 +1810,7 @@ class SyncHandler(object):
             )
 
             if room_sync or always_include:
-                notifs = yield self.unread_notifs_for_room_id(
-                    room_id, sync_config
-                )
+                notifs = yield self.unread_notifs_for_room_id(room_id, sync_config)
 
                 if notifs is not None:
                     unread_notifications["notification_count"] = notifs["notify_count"]
@@ -1807,11 +1821,8 @@ class SyncHandler(object):
             if batch.limited and since_token:
                 user_id = sync_result_builder.sync_config.user.to_string()
                 logger.info(
-                    "Incremental gappy sync of %s for user %s with %d state events" % (
-                        room_id,
-                        user_id,
-                        len(state),
-                    )
+                    "Incremental gappy sync of %s for user %s with %d state events"
+                    % (room_id, user_id, len(state))
                 )
         elif room_builder.rtype == "archived":
             room_sync = ArchivedSyncResult(
@@ -1841,9 +1852,7 @@ class SyncHandler(object):
             Deferred[frozenset[str]]: Set of room_ids the user is in at given
             stream_ordering.
         """
-        joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(
-            user_id,
-        )
+        joined_rooms = yield self.store.get_rooms_for_user_with_stream_ordering(user_id)
 
         joined_room_ids = set()
 
@@ -1862,11 +1871,9 @@ class SyncHandler(object):
             logger.info("User joined room after current token: %s", room_id)
 
             extrems = yield self.store.get_forward_extremeties_for_room(
-                room_id, stream_ordering,
-            )
-            users_in_room = yield self.state.get_current_users_in_room(
-                room_id, extrems,
+                room_id, stream_ordering
             )
+            users_in_room = yield self.state.get_current_users_in_room(room_id, extrems)
             if user_id in users_in_room:
                 joined_room_ids.add(room_id)
 
@@ -1886,7 +1893,7 @@ def _action_has_highlight(actions):
 
 
 def _calculate_state(
-    timeline_contains, timeline_start, previous, current, lazy_load_members,
+    timeline_contains, timeline_start, previous, current, lazy_load_members
 ):
     """Works out what state to include in a sync response.
 
@@ -1930,15 +1937,12 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in iteritems(timeline_start)
-            if t[0] == EventTypes.Member
+            e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
         )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
 
-    return {
-        event_id_to_key[e]: e for e in state_ids
-    }
+    return {event_id_to_key[e]: e for e in state_ids}
 
 
 class SyncResultBuilder(object):
@@ -1961,8 +1965,10 @@ class SyncResultBuilder(object):
         groups (GroupsSyncResult|None)
         to_device (list)
     """
-    def __init__(self, sync_config, full_state, since_token, now_token,
-                 joined_room_ids):
+
+    def __init__(
+        self, sync_config, full_state, since_token, now_token, joined_room_ids
+    ):
         """
         Args:
             sync_config (SyncConfig)
@@ -1991,8 +1997,10 @@ class RoomSyncResultBuilder(object):
     """Stores information needed to create either a `JoinedSyncResult` or
     `ArchivedSyncResult`.
     """
-    def __init__(self, room_id, rtype, events, newly_joined, full_state,
-                 since_token, upto_token):
+
+    def __init__(
+        self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token
+    ):
         """
         Args:
             room_id(str)