summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorHubert Chathi <hubert@uhoreg.ca>2019-10-30 10:17:10 -0400
committerHubert Chathi <hubert@uhoreg.ca>2019-10-30 10:17:10 -0400
commit7d7eac61be337b7a3ed7274a13e131f41d0966d9 (patch)
treee6f70c499902b00ed0450e24bb4132e3079698e6 /synapse/handlers
parentblack (diff)
parentMerge pull request #6240 from matrix-org/erikj/split_out_persistence_store (diff)
downloadsynapse-7d7eac61be337b7a3ed7274a13e131f41d0966d9.tar.xz
Merge branch 'develop' into cross-signing_federation
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py25
-rw-r--r--synapse/handlers/message.py3
-rw-r--r--synapse/handlers/read_marker.py13
-rw-r--r--synapse/handlers/receipts.py37
-rw-r--r--synapse/handlers/stats.py4
5 files changed, 43 insertions, 39 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4b4c6c15f9..08276fdebf 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -109,6 +109,7 @@ class FederationHandler(BaseHandler):
         self.hs = hs
 
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
         self.server_name = hs.hostname
@@ -1222,7 +1223,6 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred[FrozenEvent]
         """
-
         if get_domain_from_id(user_id) != origin:
             logger.info(
                 "Got /make_join request for user %r from different origin %s, ignoring",
@@ -1251,7 +1251,7 @@ class FederationHandler(BaseHandler):
                 builder=builder
             )
         except AuthError as e:
-            logger.warn("Failed to create join %r because %s", event, e)
+            logger.warn("Failed to create join to %s because %s", room_id, e)
             raise e
 
         event_allowed = yield self.third_party_event_rules.check_event_allowed(
@@ -1280,11 +1280,20 @@ class FederationHandler(BaseHandler):
         event = pdu
 
         logger.debug(
-            "on_send_join_request: Got event: %s, signatures: %s",
+            "on_send_join_request from %s: Got event: %s, signatures: %s",
+            origin,
             event.event_id,
             event.signatures,
         )
 
+        if get_domain_from_id(event.sender) != origin:
+            logger.info(
+                "Got /send_join request for user %r from different origin %s",
+                event.sender,
+                origin,
+            )
+            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
         event.internal_metadata.outlier = False
         # Send this event on behalf of the origin server.
         #
@@ -1503,6 +1512,14 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
+        if get_domain_from_id(event.sender) != origin:
+            logger.info(
+                "Got /send_leave request for user %r from different origin %s",
+                event.sender,
+                origin,
+            )
+            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
         event.internal_metadata.outlier = False
 
         context = yield self._handle_new_event(origin, event)
@@ -2648,7 +2665,7 @@ class FederationHandler(BaseHandler):
                 backfilled=backfilled,
             )
         else:
-            max_stream_id = yield self.store.persist_events(
+            max_stream_id = yield self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
             )
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0f8cce8ffe..7908a2d52c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -234,6 +234,7 @@ class EventCreationHandler(object):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
@@ -868,7 +869,7 @@ class EventCreationHandler(object):
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+        event_stream_id, max_stream_id = yield self.storage.persistence.persist_event(
             event, context=context
         )
 
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 3e4d8c93a4..e3b528d271 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
-
 from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
@@ -32,8 +30,7 @@ class ReadMarkerHandler(BaseHandler):
         self.read_marker_linearizer = Linearizer(name="read_marker")
         self.notifier = hs.get_notifier()
 
-    @defer.inlineCallbacks
-    def received_client_read_marker(self, room_id, user_id, event_id):
+    async def received_client_read_marker(self, room_id, user_id, event_id):
         """Updates the read marker for a given user in a given room if the event ID given
         is ahead in the stream relative to the current read marker.
 
@@ -41,8 +38,8 @@ class ReadMarkerHandler(BaseHandler):
         the read marker has changed.
         """
 
-        with (yield self.read_marker_linearizer.queue((room_id, user_id))):
-            existing_read_marker = yield self.store.get_account_data_for_room_and_type(
+        with await self.read_marker_linearizer.queue((room_id, user_id)):
+            existing_read_marker = await self.store.get_account_data_for_room_and_type(
                 user_id, room_id, "m.fully_read"
             )
 
@@ -50,13 +47,13 @@ class ReadMarkerHandler(BaseHandler):
 
             if existing_read_marker:
                 # Only update if the new marker is ahead in the stream
-                should_update = yield self.store.is_event_after(
+                should_update = await self.store.is_event_after(
                     event_id, existing_read_marker["event_id"]
                 )
 
             if should_update:
                 content = {"event_id": event_id}
-                max_id = yield self.store.add_account_data_to_room(
+                max_id = await self.store.add_account_data_to_room(
                     user_id, room_id, "m.fully_read", content
                 )
                 self.notifier.on_new_event("account_data_key", max_id, users=[user_id])
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 6854c751a6..9283c039e3 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 
 from synapse.handlers._base import BaseHandler
 from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.util.async_helpers import maybe_awaitable
 
 logger = logging.getLogger(__name__)
 
@@ -36,8 +37,7 @@ class ReceiptsHandler(BaseHandler):
         self.clock = self.hs.get_clock()
         self.state = hs.get_state_handler()
 
-    @defer.inlineCallbacks
-    def _received_remote_receipt(self, origin, content):
+    async def _received_remote_receipt(self, origin, content):
         """Called when we receive an EDU of type m.receipt from a remote HS.
         """
         receipts = []
@@ -62,17 +62,16 @@ class ReceiptsHandler(BaseHandler):
                         )
                     )
 
-        yield self._handle_new_receipts(receipts)
+        await self._handle_new_receipts(receipts)
 
-    @defer.inlineCallbacks
-    def _handle_new_receipts(self, receipts):
+    async def _handle_new_receipts(self, receipts):
         """Takes a list of receipts, stores them and informs the notifier.
         """
         min_batch_id = None
         max_batch_id = None
 
         for receipt in receipts:
-            res = yield self.store.insert_receipt(
+            res = await self.store.insert_receipt(
                 receipt.room_id,
                 receipt.receipt_type,
                 receipt.user_id,
@@ -99,14 +98,15 @@ class ReceiptsHandler(BaseHandler):
 
         self.notifier.on_new_event("receipt_key", max_batch_id, rooms=affected_room_ids)
         # Note that the min here shouldn't be relied upon to be accurate.
-        yield self.hs.get_pusherpool().on_new_receipts(
-            min_batch_id, max_batch_id, affected_room_ids
+        await maybe_awaitable(
+            self.hs.get_pusherpool().on_new_receipts(
+                min_batch_id, max_batch_id, affected_room_ids
+            )
         )
 
         return True
 
-    @defer.inlineCallbacks
-    def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
+    async def received_client_receipt(self, room_id, receipt_type, user_id, event_id):
         """Called when a client tells us a local user has read up to the given
         event_id in the room.
         """
@@ -118,24 +118,11 @@ class ReceiptsHandler(BaseHandler):
             data={"ts": int(self.clock.time_msec())},
         )
 
-        is_new = yield self._handle_new_receipts([receipt])
+        is_new = await self._handle_new_receipts([receipt])
         if not is_new:
             return
 
-        yield self.federation.send_read_receipt(receipt)
-
-    @defer.inlineCallbacks
-    def get_receipts_for_room(self, room_id, to_key):
-        """Gets all receipts for a room, upto the given key.
-        """
-        result = yield self.store.get_linearized_receipts_for_room(
-            room_id, to_key=to_key
-        )
-
-        if not result:
-            return []
-
-        return result
+        await self.federation.send_read_receipt(receipt)
 
 
 class ReceiptEventSource(object):
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 466daf9202..26bc276692 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -45,6 +45,8 @@ class StatsHandler(StateDeltasHandler):
         self.is_mine_id = hs.is_mine_id
         self.stats_bucket_size = hs.config.stats_bucket_size
 
+        self.stats_enabled = hs.config.stats_enabled
+
         # The current position in the current_state_delta stream
         self.pos = None
 
@@ -61,7 +63,7 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled or self._is_processing:
+        if not self.stats_enabled or self._is_processing:
             return
 
         self._is_processing = True