diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b793fc4df7..8343b5839d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -86,14 +86,14 @@ class MessageHandler(BaseHandler):
# map from purge id to PurgeStatus
self._purges_by_id = {}
- def start_purge_history(self, room_id, topological_ordering,
+ def start_purge_history(self, room_id, token,
delete_local_events=False):
"""Start off a history purge on a room.
Args:
room_id (str): The room to purge from
- topological_ordering (int): minimum topo ordering to preserve
+ token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
@@ -115,19 +115,19 @@ class MessageHandler(BaseHandler):
self._purges_by_id[purge_id] = PurgeStatus()
run_in_background(
self._purge_history,
- purge_id, room_id, topological_ordering, delete_local_events,
+ purge_id, room_id, token, delete_local_events,
)
return purge_id
@defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, topological_ordering,
+ def _purge_history(self, purge_id, room_id, token,
delete_local_events):
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
- topological_ordering (int): minimum topo ordering to preserve
+ token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
@@ -138,7 +138,7 @@ class MessageHandler(BaseHandler):
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(
- room_id, topological_ordering, delete_local_events,
+ room_id, token, delete_local_events,
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
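
For reference, the token that now flows through these methods is a
serialised RoomStreamToken rather than a bare integer depth. A minimal
sketch of what such a token carries (the token string below is a
made-up example):

    from synapse.types import RoomStreamToken

    # topological tokens serialise as "t<topological>-<stream>"
    token = RoomStreamToken.parse("t123-456")
    assert token.topological == 123
    assert token.stream == 456

Carrying both orderings means the purge can be anchored to an exact
point in the room, rather than to a depth value that several events
may share.
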
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 585f3e4da2..91218e40e6 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -464,61 +464,6 @@ class PresenceHandler(object):
return syncing_user_ids
@defer.inlineCallbacks
- def update_external_syncs(self, process_id, syncing_user_ids):
- """Update the syncing users for an external process
-
- Args:
- process_id(str): An identifier for the process the users are
- syncing against. This allows synapse to process updates
- as user start and stop syncing against a given process.
- syncing_user_ids(set(str)): The set of user_ids that are
- currently syncing on that server.
- """
-
- # Grab the previous list of user_ids that were syncing on that process
- prev_syncing_user_ids = (
- self.external_process_to_current_syncs.get(process_id, set())
- )
- # Grab the current presence state for both the users that are syncing
- # now and the users that were syncing before this update.
- prev_states = yield self.current_state_for_users(
- syncing_user_ids | prev_syncing_user_ids
- )
- updates = []
- time_now_ms = self.clock.time_msec()
-
- # For each new user that is syncing check if we need to mark them as
- # being online.
- for new_user_id in syncing_user_ids - prev_syncing_user_ids:
- prev_state = prev_states[new_user_id]
- if prev_state.state == PresenceState.OFFLINE:
- updates.append(prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=time_now_ms,
- last_user_sync_ts=time_now_ms,
- ))
- else:
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- ))
-
- # For each user that is still syncing or stopped syncing update the
- # last sync time so that we will correctly apply the grace period when
- # they stop syncing.
- for old_user_id in prev_syncing_user_ids:
- prev_state = prev_states[old_user_id]
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- ))
-
- yield self._update_states(updates)
-
- # Update the last updated time for the process. We expire the entries
- # if we don't receive an update in the given timeframe.
- self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
- self.external_process_to_current_syncs[process_id] = syncing_user_ids
-
- @defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
"""Update the syncing users for an external process as a delta.
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 646a95c9fb..6835a7bba2 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -151,10 +151,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")
- depth = event.depth
+ token = yield self.store.get_topological_token_for_event(event_id)
+
logger.info(
- "[purge] purging up to depth %i (event_id %s)",
- depth, event_id,
+ "[purge] purging up to token %s (event_id %s)",
+ token, event_id,
)
elif 'purge_up_to_ts' in body:
ts = body['purge_up_to_ts']
@@ -174,7 +175,10 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
)
)
if room_event_after_stream_ordering:
- (_, depth, _) = room_event_after_stream_ordering
+                (_, _, event_id) = room_event_after_stream_ordering
+                token = yield self.store.get_topological_token_for_event(
+                    event_id,
+                )
else:
logger.warn(
"[purge] purging events not possible: No event found "
@@ -187,9 +190,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.NOT_FOUND,
)
logger.info(
- "[purge] purging up to depth %i (received_ts %i => "
+        "[purge] purging up to token %s (received_ts %i => "
"stream_ordering %i)",
- depth, ts, stream_ordering,
+ token, ts, stream_ordering,
)
else:
raise SynapseError(
@@ -199,7 +202,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
)
purge_id = yield self.handlers.message_handler.start_purge_history(
- room_id, depth,
+ room_id, token,
delete_local_events=delete_local_events,
)
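
End to end, the servlet is driven the same way as before; only the
value handed to the storage layer changes. A hypothetical request
using the purge_up_to_ts form (host, access token and timestamp are
made up, and the URL prefix may differ between client API versions):

    import requests

    resp = requests.post(
        "https://synapse.example.com/_matrix/client/r0/admin"
        "/purge_history/!room:example.com",
        params={"access_token": "ADMIN_TOKEN"},  # made-up admin token
        json={"purge_up_to_ts": 1514764800000},  # ms timestamp: 2018-01-01 UTC
    )
    print(resp.json())  # expected to contain the purge_id
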
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 05cde96afc..5ebef98c4f 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,7 +33,7 @@ from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id
+from synapse.types import get_domain_from_id, RoomStreamToken
import synapse.metrics
# these are only included to make the type annotations work
@@ -1803,15 +1803,14 @@ class EventsStore(EventsWorkerStore):
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
def purge_history(
- self, room_id, topological_ordering, delete_local_events,
+ self, room_id, token, delete_local_events,
):
"""Deletes room history before a certain point
Args:
room_id (str):
- topological_ordering (int):
- minimum topo ordering to preserve
+ token (str): A topological token to delete events before
delete_local_events (bool):
if True, we will delete local events as well as remote ones
@@ -1821,13 +1820,15 @@ class EventsStore(EventsWorkerStore):
return self.runInteraction(
"purge_history",
- self._purge_history_txn, room_id, topological_ordering,
+ self._purge_history_txn, room_id, token,
delete_local_events,
)
def _purge_history_txn(
- self, txn, room_id, topological_ordering, delete_local_events,
+ self, txn, room_id, token_str, delete_local_events,
):
+ token = RoomStreamToken.parse(token_str)
+
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@@ -1872,6 +1873,13 @@ class EventsStore(EventsWorkerStore):
" ON events_to_purge(should_delete)",
)
+ # We do joins against events_to_purge for e.g. calculating state
+    # groups to purge, etc., so let's make an index.
+ txn.execute(
+ "CREATE INDEX events_to_purge_id"
+ " ON events_to_purge(event_id)",
+ )
+
    # First ensure that we're not about to delete all the forward extremities
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@@ -1884,7 +1892,7 @@ class EventsStore(EventsWorkerStore):
rows = txn.fetchall()
    max_depth = max(row[1] for row in rows)
- if max_depth <= topological_ordering:
+ if max_depth <= token.topological:
            # We need to ensure we don't delete all the events from the database,
            # otherwise we wouldn't be able to send any events (due to not
            # having any backwards extremities)
@@ -1900,7 +1908,7 @@ class EventsStore(EventsWorkerStore):
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
- should_delete_params += (room_id, topological_ordering)
+ should_delete_params += (room_id, token.topological)
txn.execute(
"INSERT INTO events_to_purge"
@@ -1923,13 +1931,13 @@ class EventsStore(EventsWorkerStore):
logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
- # all events that point to events that are to be purged
+ # events to be purged that are pointed to by events we're not going to
+ # purge.
txn.execute(
"SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
- " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
- " WHERE e2.topological_ordering >= ?",
- (topological_ordering, )
+ " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+ " WHERE ep2.event_id IS NULL",
)
new_backwards_extrems = txn.fetchall()
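
The LEFT JOIN ... IS NULL form is a standard anti-join: it keeps
exactly those purged events that some surviving event points at. A
self-contained sketch of the same pattern on a toy schema (the table
contents are made up):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE events_to_purge (event_id TEXT);
        CREATE TABLE event_edges (event_id TEXT, prev_event_id TEXT);
        INSERT INTO events_to_purge VALUES ('$old1'), ('$old2');
        -- $old2 is pointed at by an event we are keeping, so it must
        -- become a backward extremity; $old1 is only pointed at by
        -- another purged event.
        INSERT INTO event_edges VALUES ('$kept', '$old2'), ('$old2', '$old1');
    """)
    rows = conn.execute("""
        SELECT DISTINCT e.event_id FROM events_to_purge AS e
        INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id
        LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id
        WHERE ep2.event_id IS NULL
    """).fetchall()
    print(rows)  # [('$old2',)]
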
@@ -1953,16 +1961,22 @@ class EventsStore(EventsWorkerStore):
# Get all state groups that are only referenced by events that are
# to be deleted.
- txn.execute(
- "SELECT state_group FROM event_to_state_groups"
- " INNER JOIN events USING (event_id)"
- " WHERE state_group IN ("
- " SELECT DISTINCT state_group FROM events_to_purge"
- " INNER JOIN event_to_state_groups USING (event_id)"
- " )"
- " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
- (topological_ordering, )
- )
+ # This works by first getting state groups that we may want to delete,
+ # joining against event_to_state_groups to get events that use that
+ # state group, then left joining against events_to_purge again. Any
+        # state group where the left join produces *no nulls* is referenced
+ # only by events that are going to be purged.
+ txn.execute("""
+ SELECT state_group FROM
+ (
+ SELECT DISTINCT state_group FROM events_to_purge
+ INNER JOIN event_to_state_groups USING (event_id)
+ ) AS sp
+ INNER JOIN event_to_state_groups USING (state_group)
+ LEFT JOIN events_to_purge AS ep USING (event_id)
+ GROUP BY state_group
+ HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+ """)
state_rows = txn.fetchall()
logger.info("[purge] found %i redundant state groups", len(state_rows))
@@ -2109,10 +2123,25 @@ class EventsStore(EventsWorkerStore):
#
# So, let's stick it at the end so that we don't block event
# persistence.
- logger.info("[purge] updating room_depth")
+ #
+ # We do this by calculating the minimum depth of the backwards
+ # extremities. However, the events in event_backward_extremities
+        # are ones we don't have yet, so we need to look at the events that
+        # point to them via the event_edges table.
+ txn.execute("""
+ SELECT COALESCE(MIN(depth), 0)
+ FROM event_backward_extremities AS eb
+ INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+ INNER JOIN events AS e ON e.event_id = eg.event_id
+ WHERE eb.room_id = ?
+ """, (room_id,))
+ min_depth, = txn.fetchone()
+
+ logger.info("[purge] updating room_depth to %d", min_depth)
+
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
- (topological_ordering, room_id,)
+ (min_depth, room_id,)
)
# finally, drop the temp table. this will commit the txn in sqlite,
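
Since backward extremities are events this server does not itself
have, their depth cannot be read directly; the query instead takes the
minimum depth over the known events that point at them, falling back
to 0 when there are none. A toy restatement of that calculation (the
graph is made up):

    # known events pointing at each missing backward extremity,
    # as (event_id, depth) pairs
    referrers = {
        "$missing_1": [("$e1", 10), ("$e2", 12)],
        "$missing_2": [("$e3", 7)],
    }

    depths = [d for edges in referrers.values() for _eid, d in edges]
    min_depth = min(depths) if depths else 0  # COALESCE(MIN(depth), 0)
    print(min_depth)  # 7
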
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 2f95e7e82a..709c69a926 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,18 +297,22 @@ class ReceiptsWorkerStore(SQLBaseStore):
if receipt_type != "m.read":
return
- # Returns an ObservableDeferred
+ # Returns either an ObservableDeferred or the raw result
res = self.get_users_with_read_receipts_in_room.cache.get(
room_id, None, update_metrics=False,
)
- if res:
- if isinstance(res, defer.Deferred) and res.called:
+ # first handle the Deferred case
+ if isinstance(res, defer.Deferred):
+ if res.called:
res = res.result
- if user_id in res:
- # We'd only be adding to the set, so no point invalidating if the
- # user is already there
- return
+ else:
+ res = None
+
+ if res and user_id in res:
+ # We'd only be adding to the set, so no point invalidating if the
+ # user is already there
+ return
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
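
The net effect of this change is a two-step decision: first normalise
whatever the cache returned (a possibly-unresolved Deferred, or a raw
value), then skip invalidation only when the user is already in the
resolved set. A condensed restatement of that logic (the cache lookup
itself is stubbed out here):

    from twisted.internet import defer

    def should_invalidate(cached, user_id):
        # cached may be a Deferred (possibly still pending) or a raw result
        if isinstance(cached, defer.Deferred):
            cached = cached.result if cached.called else None
        # adding a user who is already present cannot change the set,
        # so invalidating would only discard a perfectly good entry
        return not (cached and user_id in cached)
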