diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 3b0156f516..4f53a5f5dc 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import Optional
from six import iteritems, itervalues, string_types
@@ -22,9 +23,16 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
+from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ Membership,
+ RelationTypes,
+ UserTypes,
+)
from synapse.api.errors import (
AuthError,
Codes,
@@ -62,6 +70,17 @@ class MessageHandler(object):
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()
+ self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+ self._is_worker_app = bool(hs.config.worker_app)
+
+ # The scheduled call to self._expire_event. None if no call is currently
+ # scheduled.
+ self._scheduled_expiry = None # type: Optional[IDelayedCall]
+
+ if not hs.config.worker_app:
+ run_as_background_process(
+ "_schedule_next_expiry", self._schedule_next_expiry
+ )
@defer.inlineCallbacks
def get_room_data(
@@ -225,6 +244,100 @@ class MessageHandler(object):
for user_id, profile in iteritems(users_with_profile)
}
+ def maybe_schedule_expiry(self, event):
+ """Schedule the expiry of an event if there's not already one scheduled,
+ or if the one running is for an event that will expire after the provided
+ timestamp.
+
+ This function needs to invalidate the event cache, which is only possible on
+ the master process, and therefore needs to be run on there.
+
+ Args:
+ event (EventBase): The event to schedule the expiry of.
+ """
+ assert not self._is_worker_app
+
+ expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
+ if not isinstance(expiry_ts, int) or event.is_state():
+ return
+
+ # _schedule_expiry_for_event won't actually schedule anything if there's already
+ # a task scheduled for a timestamp that's sooner than the provided one.
+ self._schedule_expiry_for_event(event.event_id, expiry_ts)
+
+ @defer.inlineCallbacks
+ def _schedule_next_expiry(self):
+ """Retrieve the ID and the expiry timestamp of the next event to be expired,
+ and schedule an expiry task for it.
+
+ If there's no event left to expire, set _expiry_scheduled to None so that a
+ future call to save_expiry_ts can schedule a new expiry task.
+ """
+ # Try to get the expiry timestamp of the next event to expire.
+ res = yield self.store.get_next_event_to_expire()
+ if res:
+ event_id, expiry_ts = res
+ self._schedule_expiry_for_event(event_id, expiry_ts)
+
+ def _schedule_expiry_for_event(self, event_id, expiry_ts):
+ """Schedule an expiry task for the provided event if there's not already one
+ scheduled at a timestamp that's sooner than the provided one.
+
+ Args:
+ event_id (str): The ID of the event to expire.
+ expiry_ts (int): The timestamp at which to expire the event.
+ """
+ if self._scheduled_expiry:
+ # If the provided timestamp refers to a time before the scheduled time of the
+ # next expiry task, cancel that task and reschedule it for this timestamp.
+ next_scheduled_expiry_ts = self._scheduled_expiry.getTime() * 1000
+ if expiry_ts < next_scheduled_expiry_ts:
+ self._scheduled_expiry.cancel()
+ else:
+ return
+
+ # Figure out how many seconds we need to wait before expiring the event.
+ now_ms = self.clock.time_msec()
+ delay = (expiry_ts - now_ms) / 1000
+
+ # callLater doesn't support negative delays, so trim the delay to 0 if we're
+ # in that case.
+ if delay < 0:
+ delay = 0
+
+ logger.info("Scheduling expiry for event %s in %.3fs", event_id, delay)
+
+ self._scheduled_expiry = self.clock.call_later(
+ delay,
+ run_as_background_process,
+ "_expire_event",
+ self._expire_event,
+ event_id,
+ )
+
+ @defer.inlineCallbacks
+ def _expire_event(self, event_id):
+ """Retrieve and expire an event that needs to be expired from the database.
+
+ If the event doesn't exist in the database, log it and delete the expiry date
+ from the database (so that we don't try to expire it again).
+ """
+ assert self._ephemeral_events_enabled
+
+ self._scheduled_expiry = None
+
+ logger.info("Expiring event %s", event_id)
+
+ try:
+ # Expire the event if we know about it. This function also deletes the expiry
+ # date from the database in the same database transaction.
+ yield self.store.expire_event(event_id)
+ except Exception as e:
+ logger.error("Could not expire event %s: %r", event_id, e)
+
+ # Schedule the expiry of the next event to expire.
+ yield self._schedule_next_expiry()
+
# The duration (in ms) after which rooms should be removed
# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
@@ -295,6 +408,10 @@ class EventCreationHandler(object):
5 * 60 * 1000,
)
+ self._message_handler = hs.get_message_handler()
+
+ self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
+
@defer.inlineCallbacks
def create_event(
self,
@@ -877,6 +994,10 @@ class EventCreationHandler(object):
event, context=context
)
+ if self._ephemeral_events_enabled:
+ # If there's an expiry timestamp on the event, schedule its expiry.
+ self._message_handler.maybe_schedule_expiry(event)
+
yield self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id)
def _notify():
|