path: root/synapse/storage/events.py
author    Amber Brown <hawkowl@atleastfornow.net>  2018-11-19 12:54:29 -0600
committer Amber Brown <hawkowl@atleastfornow.net>  2018-11-19 12:54:29 -0600
commit    678ad155a2eecc344d93c6849f7bd887bcc42191 (patch)
tree      f591ac30afc0e589c483421cbf8061fb56e27504 /synapse/storage/events.py
parent    Merge branch 'release-v0.33.8' (diff)
parent    towncrier (diff)
download  synapse-678ad155a2eecc344d93c6849f7bd887bcc42191.tar.xz

Merge tag 'v0.33.9' (0.34.0rc2)
Features
--------

- Include flags to optionally add `m.login.terms` to the registration flow when consent tracking is enabled; see the example response after this list.
([\#4004](https://github.com/matrix-org/synapse/issues/4004), [\#4133](https://github.com/matrix-org/synapse/issues/4133),
[\#4142](https://github.com/matrix-org/synapse/issues/4142), [\#4184](https://github.com/matrix-org/synapse/issues/4184))
- Support for replacing rooms with new ones. ([\#4091](https://github.com/matrix-org/synapse/issues/4091), [\#4099](https://github.com/matrix-org/synapse/issues/4099),
[\#4100](https://github.com/matrix-org/synapse/issues/4100), [\#4101](https://github.com/matrix-org/synapse/issues/4101))
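
When these flags are enabled, `m.login.terms` appears as a stage in the User-Interactive Authentication flows offered to registering clients. A minimal sketch of such a response, following the Matrix `m.login.terms` specification; the stage order, session value, and policy details here are illustrative, not taken from this release:

```python
# Hypothetical 401 body from POST /_matrix/client/r0/register when the
# m.login.terms stage is advertised. All concrete values are made up.
ui_auth_response = {
    "session": "xxxxxx",  # opaque UIA session identifier
    "flows": [
        {"stages": ["m.login.terms", "m.login.dummy"]},
    ],
    "params": {
        "m.login.terms": {
            "policies": {
                "privacy_policy": {
                    "version": "1.0",
                    "en": {
                        "name": "Privacy Policy",
                        "url": "https://example.org/_matrix/consent",
                    },
                },
            },
        },
    },
}
```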

Bugfixes
--------

- Fix exceptions when using the email mailer on Python 3. ([\#4095](https://github.com/matrix-org/synapse/issues/4095))
- Fix e2e key backups when more than 9 backup versions exist (see the note after this list). ([\#4113](https://github.com/matrix-org/synapse/issues/4113))
- Searches that request profile info now no longer fail with a 500. ([\#4122](https://github.com/matrix-org/synapse/issues/4122))
- Fix the return code of empty key backups. ([\#4123](https://github.com/matrix-org/synapse/issues/4123))
- If the typing stream ID goes backwards (as on a worker when the master restarts), the worker's typing handler will no longer erroneously report rooms containing new
typing events. ([\#4127](https://github.com/matrix-org/synapse/issues/4127))
- Fix a table lock of `device_lists_remote_cache` which could freeze the application. ([\#4132](https://github.com/matrix-org/synapse/issues/4132))
- Fix an exception when using the state res v2 algorithm. ([\#4135](https://github.com/matrix-org/synapse/issues/4135))
- Generating the user consent URI no longer fails on Python 3. ([\#4140](https://github.com/matrix-org/synapse/issues/4140),
[\#4163](https://github.com/matrix-org/synapse/issues/4163))
- Loading URL previews from the DB cache on Postgres will no longer cause Unicode type errors when responding to the request, and URL previews will no longer fail if
the remote server returns a Content-Type header with the charset in quotes. ([\#4157](https://github.com/matrix-org/synapse/issues/4157))
- The hash_password script now works on Python 3. ([\#4161](https://github.com/matrix-org/synapse/issues/4161))
- Fix noop checks when updating device keys, reducing spurious device list update notifications. ([\#4164](https://github.com/matrix-org/synapse/issues/4164))
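
A note on the backup-version fix above: backup versions are small integers, and one plausible way a "more than 9 versions" bug arises is comparing them as strings, which sorts "10" before "9". A sketch of that failure mode, not the actual Synapse code:

```python
# Hypothetical illustration: lexicographic vs. numeric ordering of versions.
versions = ["1", "2", "9", "10"]

assert max(versions) == "9"            # string comparison: "10" < "9"
assert max(versions, key=int) == "10"  # numeric comparison finds the latest
```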

Deprecations and Removals
-------------------------

- The disused and un-specced identicon generator has been removed. ([\#4106](https://github.com/matrix-org/synapse/issues/4106))
- The obsolete and non-functional /pull federation endpoint has been removed. ([\#4118](https://github.com/matrix-org/synapse/issues/4118))
- The deprecated v1 key exchange endpoints have been removed. ([\#4119](https://github.com/matrix-org/synapse/issues/4119))
- Synapse will no longer fall back to the deprecated v1 key exchange method when fetching keys, and will now always use v2.
([\#4120](https://github.com/matrix-org/synapse/issues/4120))

Internal Changes
----------------

- Fix the build of the Docker image with docker-compose. ([\#3778](https://github.com/matrix-org/synapse/issues/3778))
- Delete unreferenced state groups during history purge. ([\#4006](https://github.com/matrix-org/synapse/issues/4006))
- The "Received rdata" log messages on workers are now logged at DEBUG, not INFO. ([\#4108](https://github.com/matrix-org/synapse/issues/4108))
- Reduce replication traffic for device lists. ([\#4109](https://github.com/matrix-org/synapse/issues/4109))
- Fix the `synapse_replication_tcp_protocol_*_commands` metric label to be the full command name, rather than just the first character.
([\#4110](https://github.com/matrix-org/synapse/issues/4110))
- Log some details about room creation. ([\#4121](https://github.com/matrix-org/synapse/issues/4121))
- Fix `tox` failure on old systems. ([\#4124](https://github.com/matrix-org/synapse/issues/4124))
- Add the STATE_V2_TEST room version. ([\#4128](https://github.com/matrix-org/synapse/issues/4128))
- Clean up event accesses and tests. ([\#4137](https://github.com/matrix-org/synapse/issues/4137))
- The default logging config will now set an explicit log file encoding of UTF-8. ([\#4138](https://github.com/matrix-org/synapse/issues/4138))
- Add helper functions for getting the prev and auth events of an event (see the sketch after this list). ([\#4139](https://github.com/matrix-org/synapse/issues/4139))
- Add some tests for the HTTP pusher. ([\#4149](https://github.com/matrix-org/synapse/issues/4149))
- Add `purge_history.sh` and `purge_remote_media.sh` scripts to contrib/. ([\#4155](https://github.com/matrix-org/synapse/issues/4155))
- HTTP tests have been refactored to contain less boilerplate. ([\#4156](https://github.com/matrix-org/synapse/issues/4156))
- Drop incoming events from federation for unknown rooms. ([\#4165](https://github.com/matrix-org/synapse/issues/4165))
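
The helper functions mentioned above show up throughout the diff below: call sites stop unpacking the raw `(event_id, hashes)` pairs in `event.prev_events` and go through accessors instead. A minimal sketch of that pattern; this `Event` class is a stand-in, not the real `EventBase`:

```python
class Event:
    """Sketch of the accessor pattern from #4139; the real class differs."""

    def __init__(self, prev_events, auth_events):
        # Internally, prev/auth events are stored as (event_id, hashes) pairs.
        self.prev_events = prev_events
        self.auth_events = auth_events

    def prev_event_ids(self):
        """Return just the event IDs of this event's prev_events."""
        return [event_id for event_id, _hashes in self.prev_events]

    def auth_event_ids(self):
        """Return just the event IDs of this event's auth_events."""
        return [event_id for event_id, _hashes in self.auth_events]


ev = Event(prev_events=[("$a:hs", {})], auth_events=[("$b:hs", {}), ("$c:hs", {})])
assert ev.prev_event_ids() == ["$a:hs"]
assert len(ev.auth_event_ids()) == 2
```
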
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--  synapse/storage/events.py  165
1 file changed, 114 insertions, 51 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8881b009df..2047110b1d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,6 +38,7 @@ from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import batch_iter
 from synapse.util.async_helpers import ObservableDeferred
@@ -205,7 +206,8 @@ def _retry_on_integrity_error(func):
 
 # inherits from EventFederationStore so that we can call _update_backward_extremities
 # and _handle_mult_prev_events (though arguably those could both be moved in here)
-class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
+class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
+                  BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
@@ -414,7 +416,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                             )
                             if len_1:
                                 all_single_prev_not_state = all(
-                                    len(event.prev_events) == 1
+                                    len(event.prev_event_ids()) == 1
                                     and not event.is_state()
                                     for event, ctx in ev_ctx_rm
                                 )
@@ -438,7 +440,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                                 # guess this by looking at the prev_events and checking
                                 # if they match the current forward extremities.
                                 for ev, _ in ev_ctx_rm:
-                                    prev_event_ids = set(e for e, _ in ev.prev_events)
+                                    prev_event_ids = set(ev.prev_event_ids())
                                     if latest_event_ids == prev_event_ids:
                                         state_delta_reuse_delta_counter.inc()
                                         break
@@ -549,7 +551,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         result.difference_update(
             e_id
             for event in new_events
-            for e_id, _ in event.prev_events
+            for e_id in event.prev_event_ids()
         )
 
         # Finally, remove any events which are prev_events of any existing events.
@@ -867,7 +869,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     "auth_id": auth_id,
                 }
                 for event, _ in events_and_contexts
-                for auth_id, _ in event.auth_events
+                for auth_id in event.auth_event_ids()
                 if event.is_state()
             ],
         )
@@ -2034,55 +2036,37 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
         logger.info("[purge] finding redundant state groups")
 
-        # Get all state groups that are only referenced by events that are
-        # to be deleted.
-        # This works by first getting state groups that we may want to delete,
-        # joining against event_to_state_groups to get events that use that
-        # state group, then left joining against events_to_purge again. Any
-        # state group where the left join produce *no nulls* are referenced
-        # only by events that are going to be purged.
+        # Get all state groups that are referenced by events that are to be
+        # deleted. We then go and check if they are referenced by other events
+        # or state groups, and if not we delete them.
         txn.execute("""
-            SELECT state_group FROM
-            (
-                SELECT DISTINCT state_group FROM events_to_purge
-                INNER JOIN event_to_state_groups USING (event_id)
-            ) AS sp
-            INNER JOIN event_to_state_groups USING (state_group)
-            LEFT JOIN events_to_purge AS ep USING (event_id)
-            GROUP BY state_group
-            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+            SELECT DISTINCT state_group FROM events_to_purge
+            INNER JOIN event_to_state_groups USING (event_id)
         """)
 
-        state_rows = txn.fetchall()
-        logger.info("[purge] found %i redundant state groups", len(state_rows))
-
-        # make a set of the redundant state groups, so that we can look them up
-        # efficiently
-        state_groups_to_delete = set([sg for sg, in state_rows])
-
-        # Now we get all the state groups that rely on these state groups
-        logger.info("[purge] finding state groups which depend on redundant"
-                    " state groups")
-        remaining_state_groups = []
-        for i in range(0, len(state_rows), 100):
-            chunk = [sg for sg, in state_rows[i:i + 100]]
-            # look for state groups whose prev_state_group is one we are about
-            # to delete
-            rows = self._simple_select_many_txn(
-                txn,
-                table="state_group_edges",
-                column="prev_state_group",
-                iterable=chunk,
-                retcols=["state_group"],
-                keyvalues={},
-            )
-            remaining_state_groups.extend(
-                row["state_group"] for row in rows
+        referenced_state_groups = set(sg for sg, in txn)
+        logger.info(
+            "[purge] found %i referenced state groups",
+            len(referenced_state_groups),
+        )
 
-                # exclude state groups we are about to delete: no point in
-                # updating them
-                if row["state_group"] not in state_groups_to_delete
+        logger.info("[purge] finding state groups that can be deleted")
+
+        state_groups_to_delete, remaining_state_groups = (
+            self._find_unreferenced_groups_during_purge(
+                txn, referenced_state_groups,
             )
+        )
+
+        logger.info(
+            "[purge] found %i state groups to delete",
+            len(state_groups_to_delete),
+        )
+
+        logger.info(
+            "[purge] de-delta-ing %i remaining state groups",
+            len(remaining_state_groups),
+        )
 
         # Now we turn the state groups that reference to-be-deleted state
         # groups to non delta versions.
@@ -2127,11 +2111,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         logger.info("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
-            state_rows
+            ((sg,) for sg in state_groups_to_delete),
         )
         txn.executemany(
             "DELETE FROM state_groups WHERE id = ?",
-            state_rows
+            ((sg,) for sg in state_groups_to_delete),
         )
 
         logger.info("[purge] removing events from event_to_state_groups")
@@ -2227,6 +2211,85 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
         logger.info("[purge] done")
 
+    def _find_unreferenced_groups_during_purge(self, txn, state_groups):
+        """Used when purging history to figure out which state groups can be
+        deleted and which need to be de-delta'ed (due to one of its prev groups
+        being scheduled for deletion).
+
+        Args:
+            txn
+            state_groups (set[int]): Set of state groups referenced by events
+                that are going to be deleted.
+
+        Returns:
+            tuple[set[int], set[int]]: The set of state groups that can be
+            deleted and the set of state groups that need to be de-delta'ed
+        """
+        # Graph of state group -> previous group
+        graph = {}
+
+        # Set of events that we have found to be referenced by events
+        referenced_groups = set()
+
+        # Set of state groups we've already seen
+        state_groups_seen = set(state_groups)
+
+        # Set of state groups to handle next.
+        next_to_search = set(state_groups)
+        while next_to_search:
+            # We bound size of groups we're looking up at once, to stop the
+            # SQL query getting too big
+            if len(next_to_search) < 100:
+                current_search = next_to_search
+                next_to_search = set()
+            else:
+                current_search = set(itertools.islice(next_to_search, 100))
+                next_to_search -= current_search
+
+            # Check if state groups are referenced
+            sql = """
+                SELECT DISTINCT state_group FROM event_to_state_groups
+                LEFT JOIN events_to_purge AS ep USING (event_id)
+                WHERE state_group IN (%s) AND ep.event_id IS NULL
+            """ % (",".join("?" for _ in current_search),)
+            txn.execute(sql, list(current_search))
+
+            referenced = set(sg for sg, in txn)
+            referenced_groups |= referenced
+
+            # We don't continue iterating up the state group graphs for state
+            # groups that are referenced.
+            current_search -= referenced
+
+            rows = self._simple_select_many_txn(
+                txn,
+                table="state_group_edges",
+                column="prev_state_group",
+                iterable=current_search,
+                keyvalues={},
+                retcols=("prev_state_group", "state_group",),
+            )
+
+            prevs = set(row["state_group"] for row in rows)
+            # We don't bother re-handling groups we've already seen
+            prevs -= state_groups_seen
+            next_to_search |= prevs
+            state_groups_seen |= prevs
+
+            for row in rows:
+                # Note: Each state group can have at most one prev group
+                graph[row["state_group"]] = row["prev_state_group"]
+
+        to_delete = state_groups_seen - referenced_groups
+
+        to_dedelta = set()
+        for sg in referenced_groups:
+            prev_sg = graph.get(sg)
+            if prev_sg and prev_sg in to_delete:
+                to_dedelta.add(sg)
+
+        return to_delete, to_dedelta
+
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream