summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py402
1 files changed, 21 insertions, 381 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 178209a209..abfa8c65a4 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,19 +16,16 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.push.action_generator import ActionGenerator
-from synapse.streams.config import PaginationConfig
 from synapse.types import (
-    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
+    UserID, RoomAlias, RoomStreamToken, get_domain_from_id
 )
-from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
-from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.async import run_on_reactor, ReadWriteLock
+from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_events_for_client
 
@@ -49,7 +46,6 @@ class MessageHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
-        self.snapshot_cache = SnapshotCache()
 
         self.pagination_lock = ReadWriteLock()
 
@@ -86,8 +82,8 @@ class MessageHandler(BaseHandler):
             room_token = pagin_config.from_token.room_key
         else:
             pagin_config.from_token = (
-                yield self.hs.get_event_sources().get_current_token(
-                    direction='b'
+                yield self.hs.get_event_sources().get_current_token_for_room(
+                    room_id=room_id
                 )
             )
             room_token = pagin_config.from_token.room_key
@@ -243,6 +239,21 @@ class MessageHandler(BaseHandler):
                 "Tried to send member event through non-member codepath"
             )
 
+        # We check here if we are currently being rate limited, so that we
+        # don't do unnecessary work. We check again just before we actually
+        # send the event.
+        time_now = self.clock.time()
+        allowed, time_allowed = self.ratelimiter.send_message(
+            event.sender, time_now,
+            msg_rate_hz=self.hs.config.rc_messages_per_second,
+            burst_count=self.hs.config.rc_message_burst_count,
+            update=False,
+        )
+        if not allowed:
+            raise LimitExceededError(
+                retry_after_ms=int(1000 * (time_allowed - time_now)),
+            )
+
         user = UserID.from_string(event.sender)
 
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
@@ -392,377 +403,6 @@ class MessageHandler(BaseHandler):
             [serialize_event(c, now) for c in room_state.values()]
         )
 
-    def snapshot_all_rooms(self, user_id=None, pagin_config=None,
-                           as_client_event=True, include_archived=False):
-        """Retrieve a snapshot of all rooms the user is invited or has joined.
-
-        This snapshot may include messages for all rooms where the user is
-        joined, depending on the pagination config.
-
-        Args:
-            user_id (str): The ID of the user making the request.
-            pagin_config (synapse.api.streams.PaginationConfig): The pagination
-            config used to determine how many messages *PER ROOM* to return.
-            as_client_event (bool): True to get events in client-server format.
-            include_archived (bool): True to get rooms that the user has left
-        Returns:
-            A list of dicts with "room_id" and "membership" keys for all rooms
-            the user is currently invited or joined in on. Rooms where the user
-            is joined on, may return a "messages" key with messages, depending
-            on the specified PaginationConfig.
-        """
-        key = (
-            user_id,
-            pagin_config.from_token,
-            pagin_config.to_token,
-            pagin_config.direction,
-            pagin_config.limit,
-            as_client_event,
-            include_archived,
-        )
-        now_ms = self.clock.time_msec()
-        result = self.snapshot_cache.get(now_ms, key)
-        if result is not None:
-            return result
-
-        return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms(
-            user_id, pagin_config, as_client_event, include_archived
-        ))
-
-    @defer.inlineCallbacks
-    def _snapshot_all_rooms(self, user_id=None, pagin_config=None,
-                            as_client_event=True, include_archived=False):
-
-        memberships = [Membership.INVITE, Membership.JOIN]
-        if include_archived:
-            memberships.append(Membership.LEAVE)
-
-        room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=user_id, membership_list=memberships
-        )
-
-        user = UserID.from_string(user_id)
-
-        rooms_ret = []
-
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
-        presence_stream = self.hs.get_event_sources().sources["presence"]
-        pagination_config = PaginationConfig(from_token=now_token)
-        presence, _ = yield presence_stream.get_pagination_rows(
-            user, pagination_config.get_source_config("presence"), None
-        )
-
-        receipt_stream = self.hs.get_event_sources().sources["receipt"]
-        receipt, _ = yield receipt_stream.get_pagination_rows(
-            user, pagination_config.get_source_config("receipt"), None
-        )
-
-        tags_by_room = yield self.store.get_tags_for_user(user_id)
-
-        account_data, account_data_by_room = (
-            yield self.store.get_account_data_for_user(user_id)
-        )
-
-        public_room_ids = yield self.store.get_public_room_ids()
-
-        limit = pagin_config.limit
-        if limit is None:
-            limit = 10
-
-        @defer.inlineCallbacks
-        def handle_room(event):
-            d = {
-                "room_id": event.room_id,
-                "membership": event.membership,
-                "visibility": (
-                    "public" if event.room_id in public_room_ids
-                    else "private"
-                ),
-            }
-
-            if event.membership == Membership.INVITE:
-                time_now = self.clock.time_msec()
-                d["inviter"] = event.sender
-
-                invite_event = yield self.store.get_event(event.event_id)
-                d["invite"] = serialize_event(invite_event, time_now, as_client_event)
-
-            rooms_ret.append(d)
-
-            if event.membership not in (Membership.JOIN, Membership.LEAVE):
-                return
-
-            try:
-                if event.membership == Membership.JOIN:
-                    room_end_token = now_token.room_key
-                    deferred_room_state = self.state_handler.get_current_state(
-                        event.room_id
-                    )
-                elif event.membership == Membership.LEAVE:
-                    room_end_token = "s%d" % (event.stream_ordering,)
-                    deferred_room_state = self.store.get_state_for_events(
-                        [event.event_id], None
-                    )
-                    deferred_room_state.addCallback(
-                        lambda states: states[event.event_id]
-                    )
-
-                (messages, token), current_state = yield preserve_context_over_deferred(
-                    defer.gatherResults(
-                        [
-                            preserve_fn(self.store.get_recent_events_for_room)(
-                                event.room_id,
-                                limit=limit,
-                                end_token=room_end_token,
-                            ),
-                            deferred_room_state,
-                        ]
-                    )
-                ).addErrback(unwrapFirstError)
-
-                messages = yield filter_events_for_client(
-                    self.store, user_id, messages
-                )
-
-                start_token = now_token.copy_and_replace("room_key", token[0])
-                end_token = now_token.copy_and_replace("room_key", token[1])
-                time_now = self.clock.time_msec()
-
-                d["messages"] = {
-                    "chunk": [
-                        serialize_event(m, time_now, as_client_event)
-                        for m in messages
-                    ],
-                    "start": start_token.to_string(),
-                    "end": end_token.to_string(),
-                }
-
-                d["state"] = [
-                    serialize_event(c, time_now, as_client_event)
-                    for c in current_state.values()
-                ]
-
-                account_data_events = []
-                tags = tags_by_room.get(event.room_id)
-                if tags:
-                    account_data_events.append({
-                        "type": "m.tag",
-                        "content": {"tags": tags},
-                    })
-
-                account_data = account_data_by_room.get(event.room_id, {})
-                for account_data_type, content in account_data.items():
-                    account_data_events.append({
-                        "type": account_data_type,
-                        "content": content,
-                    })
-
-                d["account_data"] = account_data_events
-            except:
-                logger.exception("Failed to get snapshot")
-
-        yield concurrently_execute(handle_room, room_list, 10)
-
-        account_data_events = []
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        ret = {
-            "rooms": rooms_ret,
-            "presence": presence,
-            "account_data": account_data_events,
-            "receipts": receipt,
-            "end": now_token.to_string(),
-        }
-
-        defer.returnValue(ret)
-
-    @defer.inlineCallbacks
-    def room_initial_sync(self, requester, room_id, pagin_config=None):
-        """Capture the a snapshot of a room. If user is currently a member of
-        the room this will be what is currently in the room. If the user left
-        the room this will be what was in the room when they left.
-
-        Args:
-            requester(Requester): The user to get a snapshot for.
-            room_id(str): The room to get a snapshot of.
-            pagin_config(synapse.streams.config.PaginationConfig):
-                The pagination config used to determine how many messages to
-                return.
-        Raises:
-            AuthError if the user wasn't in the room.
-        Returns:
-            A JSON serialisable dict with the snapshot of the room.
-        """
-
-        user_id = requester.user.to_string()
-
-        membership, member_event_id = yield self._check_in_room_or_world_readable(
-            room_id, user_id,
-        )
-        is_peeking = member_event_id is None
-
-        if membership == Membership.JOIN:
-            result = yield self._room_initial_sync_joined(
-                user_id, room_id, pagin_config, membership, is_peeking
-            )
-        elif membership == Membership.LEAVE:
-            result = yield self._room_initial_sync_parted(
-                user_id, room_id, pagin_config, membership, member_event_id, is_peeking
-            )
-
-        account_data_events = []
-        tags = yield self.store.get_tags_for_room(user_id, room_id)
-        if tags:
-            account_data_events.append({
-                "type": "m.tag",
-                "content": {"tags": tags},
-            })
-
-        account_data = yield self.store.get_account_data_for_room(user_id, room_id)
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        result["account_data"] = account_data_events
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def _room_initial_sync_parted(self, user_id, room_id, pagin_config,
-                                  membership, member_event_id, is_peeking):
-        room_state = yield self.store.get_state_for_events(
-            [member_event_id], None
-        )
-
-        room_state = room_state[member_event_id]
-
-        limit = pagin_config.limit if pagin_config else None
-        if limit is None:
-            limit = 10
-
-        stream_token = yield self.store.get_stream_token_for_event(
-            member_event_id
-        )
-
-        messages, token = yield self.store.get_recent_events_for_room(
-            room_id,
-            limit=limit,
-            end_token=stream_token
-        )
-
-        messages = yield filter_events_for_client(
-            self.store, user_id, messages, is_peeking=is_peeking
-        )
-
-        start_token = StreamToken.START.copy_and_replace("room_key", token[0])
-        end_token = StreamToken.START.copy_and_replace("room_key", token[1])
-
-        time_now = self.clock.time_msec()
-
-        defer.returnValue({
-            "membership": membership,
-            "room_id": room_id,
-            "messages": {
-                "chunk": [serialize_event(m, time_now) for m in messages],
-                "start": start_token.to_string(),
-                "end": end_token.to_string(),
-            },
-            "state": [serialize_event(s, time_now) for s in room_state.values()],
-            "presence": [],
-            "receipts": [],
-        })
-
-    @defer.inlineCallbacks
-    def _room_initial_sync_joined(self, user_id, room_id, pagin_config,
-                                  membership, is_peeking):
-        current_state = yield self.state.get_current_state(
-            room_id=room_id,
-        )
-
-        # TODO: These concurrently
-        time_now = self.clock.time_msec()
-        state = [
-            serialize_event(x, time_now)
-            for x in current_state.values()
-        ]
-
-        now_token = yield self.hs.get_event_sources().get_current_token()
-
-        limit = pagin_config.limit if pagin_config else None
-        if limit is None:
-            limit = 10
-
-        room_members = [
-            m for m in current_state.values()
-            if m.type == EventTypes.Member
-            and m.content["membership"] == Membership.JOIN
-        ]
-
-        presence_handler = self.hs.get_presence_handler()
-
-        @defer.inlineCallbacks
-        def get_presence():
-            states = yield presence_handler.get_states(
-                [m.user_id for m in room_members],
-                as_event=True,
-            )
-
-            defer.returnValue(states)
-
-        @defer.inlineCallbacks
-        def get_receipts():
-            receipts_handler = self.hs.get_handlers().receipts_handler
-            receipts = yield receipts_handler.get_receipts_for_room(
-                room_id,
-                now_token.receipt_key
-            )
-            defer.returnValue(receipts)
-
-        presence, receipts, (messages, token) = yield defer.gatherResults(
-            [
-                preserve_fn(get_presence)(),
-                preserve_fn(get_receipts)(),
-                preserve_fn(self.store.get_recent_events_for_room)(
-                    room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
-                )
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
-
-        messages = yield filter_events_for_client(
-            self.store, user_id, messages, is_peeking=is_peeking,
-        )
-
-        start_token = now_token.copy_and_replace("room_key", token[0])
-        end_token = now_token.copy_and_replace("room_key", token[1])
-
-        time_now = self.clock.time_msec()
-
-        ret = {
-            "room_id": room_id,
-            "messages": {
-                "chunk": [serialize_event(m, time_now) for m in messages],
-                "start": start_token.to_string(),
-                "end": end_token.to_string(),
-            },
-            "state": state,
-            "presence": presence,
-            "receipts": receipts,
-        }
-        if not is_peeking:
-            ret["membership"] = membership
-
-        defer.returnValue(ret)
-
     @measure_func("_create_new_client_event")
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder, prev_event_ids=None):