summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py49
1 files changed, 34 insertions, 15 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bfe2583002..9827c7eb8d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import itertools
 import logging
 from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
@@ -21,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tup
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.filtering import FilterCollection
 from synapse.events import EventBase
 from synapse.logging.context import current_context
@@ -32,6 +31,7 @@ from synapse.types import (
     Collection,
     JsonDict,
     MutableStateMap,
+    Requester,
     RoomStreamToken,
     StateMap,
     StreamToken,
@@ -87,7 +87,7 @@ class SyncConfig:
 class TimelineBatch:
     prev_batch = attr.ib(type=StreamToken)
     events = attr.ib(type=List[EventBase])
-    limited = attr.ib(bool)
+    limited = attr.ib(type=bool)
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -201,6 +201,8 @@ class SyncResult:
         device_lists: List of user_ids whose devices have changed
         device_one_time_keys_count: Dict of algorithm to count for one time keys
             for this device
+        device_unused_fallback_key_types: List of key types that have an unused fallback
+            key
         groups: Group updates, if any
     """
 
@@ -213,6 +215,7 @@ class SyncResult:
     to_device = attr.ib(type=List[JsonDict])
     device_lists = attr.ib(type=DeviceLists)
     device_one_time_keys_count = attr.ib(type=JsonDict)
+    device_unused_fallback_key_types = attr.ib(type=List[str])
     groups = attr.ib(type=Optional[GroupsSyncResult])
 
     def __bool__(self) -> bool:
@@ -240,7 +243,9 @@ class SyncHandler:
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs, "sync")
+        self.response_cache = ResponseCache(
+            hs, "sync"
+        )  # type: ResponseCache[Tuple[Any, ...]]
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
         self.storage = hs.get_storage()
@@ -256,6 +261,7 @@ class SyncHandler:
 
     async def wait_for_sync_for_user(
         self,
+        requester: Requester,
         sync_config: SyncConfig,
         since_token: Optional[StreamToken] = None,
         timeout: int = 0,
@@ -269,7 +275,7 @@ class SyncHandler:
         # not been exceeded (if not part of the group by this point, almost certain
         # auth_blocking will occur)
         user_id = sync_config.user.to_string()
-        await self.auth.check_auth_blocking(user_id)
+        await self.auth.check_auth_blocking(requester=requester)
 
         res = await self.response_cache.wrap(
             sync_config.request_key,
@@ -457,8 +463,13 @@ class SyncHandler:
                 recents = []
 
             if not limited or block_all_timeline:
+                prev_batch_token = now_token
+                if recents:
+                    room_key = recents[0].internal_metadata.before
+                    prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+
                 return TimelineBatch(
-                    events=recents, prev_batch=now_token, limited=False
+                    events=recents, prev_batch=prev_batch_token, limited=False
                 )
 
             filtering_factor = 2
@@ -745,7 +756,7 @@ class SyncHandler:
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
-        # updates even if they occured logically before the previous event.
+        # updates even if they occurred logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
@@ -1014,10 +1025,14 @@ class SyncHandler:
         logger.debug("Fetching OTK data")
         device_id = sync_config.device_id
         one_time_key_counts = {}  # type: JsonDict
+        unused_fallback_key_types = []  # type: List[str]
         if device_id:
             one_time_key_counts = await self.store.count_e2e_one_time_keys(
                 user_id, device_id
             )
+            unused_fallback_key_types = await self.store.get_e2e_unused_fallback_key_types(
+                user_id, device_id
+            )
 
         logger.debug("Fetching group data")
         await self._generate_sync_entry_for_groups(sync_result_builder)
@@ -1041,6 +1056,7 @@ class SyncHandler:
             device_lists=device_lists,
             groups=sync_result_builder.groups,
             device_one_time_keys_count=one_time_key_counts,
+            device_unused_fallback_key_types=unused_fallback_key_types,
             next_batch=sync_result_builder.now_token,
         )
 
@@ -1378,13 +1394,16 @@ class SyncHandler:
                         return set(), set(), set(), set()
 
         ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
-            "m.ignored_user_list", user_id=user_id
+            AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
         )
 
+        # If there is ignored users account data and it matches the proper type,
+        # then use it.
+        ignored_users = frozenset()  # type: FrozenSet[str]
         if ignored_account_data:
-            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
-        else:
-            ignored_users = frozenset()
+            ignored_users_data = ignored_account_data.get("ignored_users", {})
+            if isinstance(ignored_users_data, dict):
+                ignored_users = frozenset(ignored_users_data.keys())
 
         if since_token:
             room_changes = await self._get_rooms_changed(
@@ -1478,7 +1497,7 @@ class SyncHandler:
         return False
 
     async def _get_rooms_changed(
-        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
     ) -> _RoomChanges:
         """Gets the the changes that have happened since the last sync.
         """
@@ -1690,7 +1709,7 @@ class SyncHandler:
         return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
 
     async def _get_all_rooms(
-        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
     ) -> _RoomChanges:
         """Returns entries for all rooms for the user.
 
@@ -1764,7 +1783,7 @@ class SyncHandler:
     async def _generate_room_entry(
         self,
         sync_result_builder: "SyncResultBuilder",
-        ignored_users: Set[str],
+        ignored_users: FrozenSet[str],
         room_builder: "RoomSyncResultBuilder",
         ephemeral: List[JsonDict],
         tags: Optional[Dict[str, Dict[str, Any]]],
@@ -1865,7 +1884,7 @@ class SyncHandler:
         # members (as the client otherwise doesn't have enough info to form
         # the name itself).
         if sync_config.filter_collection.lazy_load_members() and (
-            # we recalulate the summary:
+            # we recalculate the summary:
             #   if there are membership changes in the timeline, or
             #   if membership has changed during a gappy sync, or
             #   if this is an initial sync.