summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py140
1 files changed, 101 insertions, 39 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 39d7724778..a7cd779b02 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import sys
 
-import six
 from six import iteritems, itervalues, string_types
 
 from canonicaljson import encode_canonical_json, json
@@ -25,17 +23,25 @@ from twisted.internet import defer
 from twisted.internet.defer import succeed
 
 from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ConsentNotGivenError,
+    NotFoundError,
+    SynapseError,
+)
 from synapse.api.urls import ConsentURIBuilder
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, UserID
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -75,35 +81,87 @@ class MessageHandler(object):
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
             room_state = yield self.store.get_state_for_events(
-                [membership_event_id], [key]
+                [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
 
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def get_state_events(self, user_id, room_id, is_guest=False):
+    def get_state_events(
+        self, user_id, room_id, state_filter=StateFilter.all(),
+        at_token=None, is_guest=False,
+    ):
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
-        left the room return the state events from when they left.
+        left the room return the state events from when they left. If an explicit
+        'at' parameter is passed, return the state events as of that event, if
+        visible.
 
         Args:
             user_id(str): The user requesting state events.
             room_id(str): The room ID to get all state events from.
+            state_filter (StateFilter): The state filter used to fetch state
+                from the database.
+            at_token(StreamToken|None): the stream token of the at which we are requesting
+                the stats. If the user is not allowed to view the state as of that
+                stream token, we raise a 403 SynapseError. If None, returns the current
+                state based on the current_state_events table.
+            is_guest(bool): whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
+        Raises:
+            NotFoundError (404) if the at token does not yield an event
+
+            AuthError (403) if the user doesn't have permission to view
+            members of this room.
         """
-        membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
-            room_id, user_id
-        )
+        if at_token:
+            # FIXME this claims to get the state at a stream position, but
+            # get_recent_events_for_room operates by topo ordering. This therefore
+            # does not reliably give you the state at the given stream position.
+            # (https://github.com/matrix-org/synapse/issues/3305)
+            last_events, _ = yield self.store.get_recent_events_for_room(
+                room_id, end_token=at_token.room_key, limit=1,
+            )
 
-        if membership == Membership.JOIN:
-            room_state = yield self.state.get_current_state(room_id)
-        elif membership == Membership.LEAVE:
-            room_state = yield self.store.get_state_for_events(
-                [membership_event_id], None
+            if not last_events:
+                raise NotFoundError("Can't find event for token %s" % (at_token, ))
+
+            visible_events = yield filter_events_for_client(
+                self.store, user_id, last_events,
+            )
+
+            event = last_events[0]
+            if visible_events:
+                room_state = yield self.store.get_state_for_events(
+                    [event.event_id], state_filter=state_filter,
+                )
+                room_state = room_state[event.event_id]
+            else:
+                raise AuthError(
+                    403,
+                    "User %s not allowed to view events in room %s at token %s" % (
+                        user_id, room_id, at_token,
+                    )
+                )
+        else:
+            membership, membership_event_id = (
+                yield self.auth.check_in_room_or_world_readable(
+                    room_id, user_id,
+                )
             )
-            room_state = room_state[membership_event_id]
+
+            if membership == Membership.JOIN:
+                state_ids = yield self.store.get_filtered_current_state_ids(
+                    room_id, state_filter=state_filter,
+                )
+                room_state = yield self.store.get_events(state_ids.values())
+            elif membership == Membership.LEAVE:
+                room_state = yield self.store.get_state_for_events(
+                    [membership_event_id], state_filter=state_filter,
+                )
+                room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
         defer.returnValue(
@@ -171,7 +229,7 @@ class EventCreationHandler(object):
         self.notifier = hs.get_notifier()
         self.config = hs.config
 
-        self.http_client = hs.get_simple_http_client()
+        self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -212,10 +270,14 @@ class EventCreationHandler(object):
                 where *hashes* is a map from algorithm to hash.
 
                 If None, they will be requested from the database.
-
+        Raises:
+            ResourceLimitError if server is blocked to some resource being
+            exceeded
         Returns:
             Tuple of created event (FrozenEvent), Context
         """
+        yield self.auth.check_auth_blocking(requester.user.to_string())
+
         builder = self.event_builder_factory.new(event_dict)
 
         self.validator.validate_new(builder)
@@ -365,6 +427,9 @@ class EventCreationHandler(object):
 
         if event.is_state():
             prev_state = yield self.deduplicate_state_event(event, context)
+            logger.info(
+                "Not bothering to persist duplicate state event %s", event.event_id,
+            )
             if prev_state is not None:
                 defer.returnValue(prev_state)
 
@@ -556,21 +621,22 @@ class EventCreationHandler(object):
             event, context
         )
 
+        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+        # hack around with a try/finally instead.
+        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                yield send_event_to_master(
-                    clock=self.hs.get_clock(),
+                yield self.send_event_to_master(
+                    event_id=event.event_id,
                     store=self.store,
-                    client=self.http_client,
-                    host=self.config.worker_replication_host,
-                    port=self.config.worker_replication_http_port,
                     requester=requester,
                     event=event,
                     context=context,
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                success = True
                 return
 
             yield self.persist_and_notify_client_event(
@@ -580,17 +646,16 @@ class EventCreationHandler(object):
                 ratelimit=ratelimit,
                 extra_users=extra_users,
             )
-        except:  # noqa: E722, as we reraise the exception this is fine.
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            tp, value, tb = sys.exc_info()
-
-            run_in_background(
-                self.store.remove_push_actions_from_staging,
-                event.event_id,
-            )
 
-            six.reraise(tp, value, tb)
+            success = True
+        finally:
+            if not success:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                run_in_background(
+                    self.store.remove_push_actions_from_staging,
+                    event.event_id,
+                )
 
     @defer.inlineCallbacks
     def persist_and_notify_client_event(
@@ -713,11 +778,8 @@ class EventCreationHandler(object):
             event, context=context
         )
 
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        run_in_background(
-            self.pusher_pool.on_new_notifications,
-            event_stream_id, max_stream_id
+        yield self.pusher_pool.on_new_notifications(
+            event_stream_id, max_stream_id,
         )
 
         def _notify():