summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/e2e_keys.py52
-rw-r--r--synapse/handlers/e2e_room_keys.py28
-rw-r--r--synapse/handlers/pagination.py17
3 files changed, 94 insertions, 3 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d278..056fb97acb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @trace
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @trace
         @defer.inlineCallbacks
         def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
+    @trace
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
+        set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                log_kv(
+                    {
+                        "message": "Requested a local key for a user which"
+                        " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @trace
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
+        @trace
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
             ),
         )
 
+        log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
     @defer.inlineCallbacks
+    @tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
                 user_id,
                 time_now,
             )
+            log_kv(
+                {
+                    "message": "Updating device_keys for user.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             # TODO: Sign the JSON with the server key
             changed = yield self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
             if changed:
                 # Only notify about device updates *if* the keys actually changed
                 yield self.device_handler.notify_device_update(user_id, [device_id])
-
+        else:
+            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
+            log_kv(
+                {
+                    "message": "Updating one_time_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            log_kv(
+                {"message": "Did not update one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
     @defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
                     (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
                 )
 
+        log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
         yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..a9d80f708c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.opentracing import log_kv, trace
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @trace
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
+            log_kv(results)
             return results
 
+    @trace
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
             session_id(str): the session whose room_key we're setting
             room_key(dict): the room_key being set
         """
-
+        log_kv(
+            {
+                "message": "Trying to upload room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "user_id": user_id,
+            }
+        )
         # get the room_key for this particular row
         current_room_key = None
         try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
             )
         except StoreError as e:
             if e.code == 404:
-                pass
+                log_kv(
+                    {
+                        "message": "Room key not found.",
+                        "room_id": room_id,
+                        "user_id": user_id,
+                    }
+                )
             else:
                 raise
 
         if self._should_replace_room_key(current_room_key, room_key):
+            log_kv({"message": "Replacing room key."})
             yield self.store.set_e2e_room_key(
                 user_id, version, room_id, session_id, room_key
             )
+        else:
+            log_kv({"message": "Not replacing room_key."})
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
+    @trace
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
+    @trace
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
+    @trace
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d83aab3f74..5744f4579d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -70,6 +70,7 @@ class PaginationHandler(object):
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
+        self._server_name = hs.hostname
 
         self.pagination_lock = ReadWriteLock()
         self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
         """
         return self._purges_by_id.get(purge_id)
 
+    async def purge_room(self, room_id):
+        """Purge the given room from the database"""
+        with (await self.pagination_lock.write(room_id)):
+            # check we know about the room
+            await self.store.get_room_version(room_id)
+
+            # first check that we have no users in this room
+            joined = await defer.maybeDeferred(
+                self.store.is_host_joined, room_id, self._server_name
+            )
+
+            if joined:
+                raise SynapseError(400, "Users are still joined to this room")
+
+            await self.store.purge_room(room_id)
+
     @defer.inlineCallbacks
     def get_messages(
         self,