summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py8
-rw-r--r--synapse/handlers/message.py123
-rw-r--r--synapse/handlers/room.py50
3 files changed, 173 insertions, 8 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d3267734f7..d9d0cd9eef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -121,6 +121,7 @@ class FederationHandler(BaseHandler):
         self.pusher_pool = hs.get_pusherpool()
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self._message_handler = hs.get_message_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_simple_http_client()
@@ -141,6 +142,8 @@ class FederationHandler(BaseHandler):
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
+        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
+
     @defer.inlineCallbacks
     def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
         """ Process a PDU received via a federation /send/ transaction, or
@@ -2715,6 +2718,11 @@ class FederationHandler(BaseHandler):
                 event_and_contexts, backfilled=backfilled
             )
 
+            if self._ephemeral_messages_enabled:
+                for (event, context) in event_and_contexts:
+                    # If there's an expiry timestamp on the event, schedule its expiry.
+                    self._message_handler.maybe_schedule_expiry(event)
+
             if not backfilled:  # Never notify for backfilled events
                 for event, _ in event_and_contexts:
                     yield self._notify_persisted_event(event, max_stream_id)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3b0156f516..4f53a5f5dc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Optional
 
 from six import iteritems, itervalues, string_types
 
@@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
 from twisted.internet.defer import succeed
+from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
+from synapse.api.constants import (
+    EventContentFields,
+    EventTypes,
+    Membership,
+    RelationTypes,
+    UserTypes,
+)
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -62,6 +70,17 @@ class MessageHandler(object):
         self.storage = hs.get_storage()
         self.state_store = self.storage.state
         self._event_serializer = hs.get_event_client_serializer()
+        self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+        self._is_worker_app = bool(hs.config.worker_app)
+
+        # The scheduled call to self._expire_event. None if no call is currently
+        # scheduled.
+        self._scheduled_expiry = None  # type: Optional[IDelayedCall]
+
+        if not hs.config.worker_app:
+            run_as_background_process(
+                "_schedule_next_expiry", self._schedule_next_expiry
+            )
 
     @defer.inlineCallbacks
     def get_room_data(
@@ -225,6 +244,100 @@ class MessageHandler(object):
             for user_id, profile in iteritems(users_with_profile)
         }
 
+    def maybe_schedule_expiry(self, event):
+        """Schedule the expiry of an event if there's not already one scheduled,
+        or if the one running is for an event that will expire after the provided
+        timestamp.
+
+        This function needs to invalidate the event cache, which is only possible on
+        the master process, and therefore needs to be run on there.
+
+        Args:
+            event (EventBase): The event to schedule the expiry of.
+        """
+        assert not self._is_worker_app
+
+        expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+        if not isinstance(expiry_ts, int) or event.is_state():
+            return
+
+        # _schedule_expiry_for_event won't actually schedule anything if there's already
+        # a task scheduled for a timestamp that's sooner than the provided one.
+        self._schedule_expiry_for_event(event.event_id, expiry_ts)
+
+    @defer.inlineCallbacks
+    def _schedule_next_expiry(self):
+        """Retrieve the ID and the expiry timestamp of the next event to be expired,
+        and schedule an expiry task for it.
+
+        If there's no event left to expire, set _expiry_scheduled to None so that a
+        future call to save_expiry_ts can schedule a new expiry task.
+        """
+        # Try to get the expiry timestamp of the next event to expire.
+        res = yield self.store.get_next_event_to_expire()
+        if res:
+            event_id, expiry_ts = res
+            self._schedule_expiry_for_event(event_id, expiry_ts)
+
+    def _schedule_expiry_for_event(self, event_id, expiry_ts):
+        """Schedule an expiry task for the provided event if there's not already one
+        scheduled at a timestamp that's sooner than the provided one.
+
+        Args:
+            event_id (str): The ID of the event to expire.
+            expiry_ts (int): The timestamp at which to expire the event.
+        """
+        if self._scheduled_expiry:
+            # If the provided timestamp refers to a time before the scheduled time of the
+            # next expiry task, cancel that task and reschedule it for this timestamp.
+            next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
+            if expiry_ts < next_scheduled_expiry_ts:
+                self._scheduled_expiry.cancel()
+            else:
+                return
+
+        # Figure out how many seconds we need to wait before expiring the event.
+        now_ms = self.clock.time_msec()
+        delay = (expiry_ts - now_ms) / 1000
+
+        # callLater doesn't support negative delays, so trim the delay to 0 if we're
+        # in that case.
+        if delay < 0:
+            delay = 0
+
+        logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
+
+        self._scheduled_expiry = self.clock.call_later(
+            delay,
+            run_as_background_process,
+            "_expire_event",
+            self._expire_event,
+            event_id,
+        )
+
+    @defer.inlineCallbacks
+    def _expire_event(self, event_id):
+        """Retrieve and expire an event that needs to be expired from the database.
+
+        If the event doesn't exist in the database, log it and delete the expiry date
+        from the database (so that we don't try to expire it again).
+        """
+        assert self._ephemeral_events_enabled
+
+        self._scheduled_expiry = None
+
+        logger.info("Expiring event %s", event_id)
+
+        try:
+            # Expire the event if we know about it. This function also deletes the expiry
+            # date from the database in the same database transaction.
+            yield self.store.expire_event(event_id)
+        except Exception as e:
+            logger.error("Could not expire event %s: %r", event_id, e)
+
+        # Schedule the expiry of the next event to expire.
+        yield self._schedule_next_expiry()
+
 
 # The duration (in ms) after which rooms should be removed
 # `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
@@ -295,6 +408,10 @@ class EventCreationHandler(object):
                 5 * 60 * 1000,
             )
 
+        self._message_handler = hs.get_message_handler()
+
+        self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+
     @defer.inlineCallbacks
     def create_event(
         self,
@@ -877,6 +994,10 @@ class EventCreationHandler(object):
             event, context=context
         )
 
+        if self._ephemeral_events_enabled:
+            # If there's an expiry timestamp on the event, schedule its expiry.
+            self._message_handler.maybe_schedule_expiry(event)
+
         yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
 
         def _notify():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e92b2eafd5..22768e97ff 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -198,21 +199,21 @@ class RoomCreationHandler(BaseHandler):
         # finally, shut down the PLs in the old room, and update them in the new
         # room.
         yield self._update_upgraded_room_pls(
-            requester, old_room_id, new_room_id, old_room_state
+            requester, old_room_id, new_room_id, old_room_state,
         )
 
         return new_room_id
 
     @defer.inlineCallbacks
     def _update_upgraded_room_pls(
-        self, requester, old_room_id, new_room_id, old_room_state
+        self, requester, old_room_id, new_room_id, old_room_state,
     ):
         """Send updated power levels in both rooms after an upgrade
 
         Args:
             requester (synapse.types.Requester): the user requesting the upgrade
-            old_room_id (unicode): the id of the room to be replaced
-            new_room_id (unicode): the id of the replacement room
+            old_room_id (str): the id of the room to be replaced
+            new_room_id (str): the id of the replacement room
             old_room_state (dict[tuple[str, str], str]): the state map for the old room
 
         Returns:
@@ -298,7 +299,7 @@ class RoomCreationHandler(BaseHandler):
             tombstone_event_id (unicode|str): the ID of the tombstone event in the old
                 room.
         Returns:
-            Deferred[None]
+            Deferred
         """
         user_id = requester.user.to_string()
 
@@ -333,6 +334,7 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.Encryption, ""),
             (EventTypes.ServerACL, ""),
             (EventTypes.RelatedGroups, ""),
+            (EventTypes.PowerLevels, ""),
         )
 
         old_room_state_ids = yield self.store.get_filtered_current_state_ids(
@@ -346,6 +348,31 @@ class RoomCreationHandler(BaseHandler):
             if old_event:
                 initial_state[k] = old_event.content
 
+        # Resolve the minimum power level required to send any state event
+        # We will give the upgrading user this power level temporarily (if necessary) such that
+        # they are able to copy all of the state events over, then revert them back to their
+        # original power level afterwards in _update_upgraded_room_pls
+
+        # Copy over user power levels now as this will not be possible with >100PL users once
+        # the room has been created
+
+        power_levels = initial_state[(EventTypes.PowerLevels, "")]
+
+        # Calculate the minimum power level needed to clone the room
+        event_power_levels = power_levels.get("events", {})
+        state_default = power_levels.get("state_default", 0)
+        ban = power_levels.get("ban")
+        needed_power_level = max(state_default, ban, max(event_power_levels.values()))
+
+        # Raise the requester's power level in the new room if necessary
+        current_power_level = power_levels["users"][requester.user.to_string()]
+        if current_power_level < needed_power_level:
+            # Assign this power level to the requester
+            power_levels["users"][requester.user.to_string()] = needed_power_level
+
+        # Set the power levels to the modified state
+        initial_state[(EventTypes.PowerLevels, "")] = power_levels
+
         yield self._send_events_for_new_room(
             requester,
             new_room_id,
@@ -874,6 +901,10 @@ class RoomContextHandler(object):
             room_id, event_id, before_limit, after_limit, event_filter
         )
 
+        if event_filter:
+            results["events_before"] = event_filter.filter(results["events_before"])
+            results["events_after"] = event_filter.filter(results["events_after"])
+
         results["events_before"] = yield filter_evts(results["events_before"])
         results["events_after"] = yield filter_evts(results["events_after"])
         results["event"] = event
@@ -902,7 +933,12 @@ class RoomContextHandler(object):
         state = yield self.state_store.get_state_for_events(
             [last_event_id], state_filter=state_filter
         )
-        results["state"] = list(state[last_event_id].values())
+
+        state_events = list(state[last_event_id].values())
+        if event_filter:
+            state_events = event_filter.filter(state_events)
+
+        results["state"] = state_events
 
         # We use a dummy token here as we only care about the room portion of
         # the token, which we replace.