From efe3c7977a3dc9c4308388f71e82b837df7d09b4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 14 May 2019 16:59:21 +0100
Subject: Add simple send_relation API and track in DB

---
 synapse/storage/events.py | 2 ++
 1 file changed, 2 insertions(+)

(limited to 'synapse/storage/events.py')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 7a7f841c6c..6802bf42ce 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1351,6 +1351,8 @@ class EventsStore(
         # Insert into the event_search table.
         self._store_guest_access_txn(txn, event)

+        self._handle_event_relations(txn, event)
+
         # Insert into the room_memberships table.
         self._store_room_members_txn(
             txn,
--
cgit 1.5.1


From b5c62c6b2643a138956af0b04521540c270a3e94 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 16 May 2019 10:18:53 +0100
Subject: Fix relations in worker mode

---
 synapse/replication/slave/storage/events.py | 13 ++++++++++---
 synapse/replication/tcp/streams/_base.py    |  1 +
 synapse/replication/tcp/streams/events.py   | 11 ++++++-----
 synapse/storage/events.py                   | 12 ++++++++----
 synapse/storage/relations.py                |  4 +++-
 5 files changed, 28 insertions(+), 13 deletions(-)

(limited to 'synapse/storage/events.py')

diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b457c5563f..797450bc66 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.replication.tcp.streams.events import (
 from synapse.storage.event_federation import EventFederationWorkerStore
 from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.relations import RelationsWorkerStore
 from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.storage.signatures import SignatureWorkerStore
 from synapse.storage.state import StateGroupWorkerStore
@@ -52,6 +53,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                        EventsWorkerStore,
                        SignatureWorkerStore,
                        UserErasureWorkerStore,
+                       RelationsWorkerStore,
                        BaseSlavedStore):

     def __init__(self, db_conn, hs):
@@ -89,7 +91,7 @@ class SlavedEventStore(EventFederationWorkerStore,
             for row in rows:
                 self.invalidate_caches_for_event(
                     -token, row.event_id, row.room_id, row.type, row.state_key,
-                    row.redacts,
+                    row.redacts, row.relates_to,
                     backfilled=True,
                 )
         return super(SlavedEventStore, self).process_replication_rows(
@@ -102,7 +104,7 @@ class SlavedEventStore(EventFederationWorkerStore,
         if row.type == EventsStreamEventRow.TypeId:
             self.invalidate_caches_for_event(
                 token, data.event_id, data.room_id, data.type, data.state_key,
-                data.redacts,
+                data.redacts, data.relates_to,
                 backfilled=False,
             )
         elif row.type == EventsStreamCurrentStateRow.TypeId:
@@ -114,7 +116,8 @@ class SlavedEventStore(EventFederationWorkerStore,
             raise Exception("Unknown events stream row type %s" % (row.type, ))

     def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
-                                    etype, state_key, redacts, backfilled):
+                                    etype, state_key, redacts, relates_to,
+                                    backfilled):
         self._invalidate_get_event_cache(event_id)

         self.get_latest_event_ids_in_room.invalidate((room_id,))
@@ -136,3 +139,7 @@ class SlavedEventStore(EventFederationWorkerStore,
                 state_key, stream_ordering
             )
             self.get_invited_rooms_for_user.invalidate((state_key,))
+
+        if relates_to:
+            self.get_relations_for_event.invalidate_many((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
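
The _handle_event_relations(txn, event) call added by the first patch, and the get_relations_for_event / get_aggregation_groups_for_event caches invalidated above, are both backed by the event_relations table that the queries further down join on via relates_to_id. The handler itself lives in synapse/storage/relations.py and is not shown in full in this log. The sketch below is only an illustration of roughly what such a handler does, assuming the relation is carried under m.relates_to in the event content (rel_type, event_id and an optional key, per the aggregations proposal MSC1849); the relation_type and aggregation_key column names and the use of _simple_insert_txn are assumptions for illustration, not taken from these patches.

# Illustrative sketch of a relations-store method; not the exact Synapse
# implementation.  Column names relation_type / aggregation_key and the
# m.relates_to parsing are assumptions made for this example.
def _handle_event_relations(self, txn, event):
    """Insert a row into event_relations for any m.relates_to data the
    event carries, and bust the same caches that the worker code above
    invalidates when the row arrives over replication."""
    relation = event.content.get("m.relates_to")
    if not isinstance(relation, dict):
        return

    rel_type = relation.get("rel_type")
    parent_id = relation.get("event_id")
    if rel_type is None or parent_id is None:
        return

    # Annotations (e.g. reactions) also carry a key that aggregation groups on.
    aggregation_key = relation.get("key")

    self._simple_insert_txn(
        txn,
        table="event_relations",
        values={
            "event_id": event.event_id,
            "relates_to_id": parent_id,
            "relation_type": rel_type,
            "aggregation_key": aggregation_key,
        },
    )

    # Mirror the cache invalidation done by invalidate_caches_for_event above.
    txn.call_after(self.get_relations_for_event.invalidate_many, (parent_id,))
    txn.call_after(
        self.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
    )

With that row written on the master, the relates_to_id column can be exposed on the replication streams below, so a worker knows which parent event's relation caches to drop.
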
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 8971a6a22e..b6ce7a7bee 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -32,6 +32,7 @@ BackfillStreamRow = namedtuple("BackfillStreamRow", (
     "type",  # str
     "state_key",  # str, optional
     "redacts",  # str, optional
+    "relates_to",  # str, optional
 ))
 PresenceStreamRow = namedtuple("PresenceStreamRow", (
     "user_id",  # str
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index e0f6e29248..f1290d022a 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -80,11 +80,12 @@ class BaseEventsStreamRow(object):
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"

-    event_id = attr.ib()  # str
-    room_id = attr.ib()  # str
-    type = attr.ib()  # str
-    state_key = attr.ib()  # str, optional
-    redacts = attr.ib()  # str, optional
+    event_id = attr.ib()    # str
+    room_id = attr.ib()     # str
+    type = attr.ib()        # str
+    state_key = attr.ib()   # str, optional
+    redacts = attr.ib()     # str, optional
+    relates_to = attr.ib()  # str, optional


 @attr.s(slots=True, frozen=True)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6802bf42ce..b025ebc926 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1657,10 +1657,11 @@ class EventsStore(
         def get_all_new_forward_event_rows(txn):
             sql = (
                 "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
@@ -1675,11 +1676,12 @@ class EventsStore(

             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
                 " ORDER BY event_stream_ordering DESC"
@@ -1700,10 +1702,11 @@ class EventsStore(
         def get_all_new_backfill_event_rows(txn):
             sql = (
                 "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > stream_ordering AND stream_ordering >= ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
@@ -1718,11 +1721,12 @@ class EventsStore(

             sql = (
                 "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts"
+                " state_key, redacts, relates_to_id"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
+                " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > event_stream_ordering"
                 " AND event_stream_ordering >= ?"
                 " ORDER BY event_stream_ordering DESC"
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 1fd3d4fafc..732418ec65 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -109,7 +109,7 @@ class AggregationPaginationToken(object):
         return "%d-%d" % (self.count, self.stream)


-class RelationsStore(SQLBaseStore):
+class RelationsWorkerStore(SQLBaseStore):
     @cached(tree=True)
     def get_relations_for_event(
         self,
@@ -318,6 +318,8 @@ class RelationsStore(SQLBaseStore):
             "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn
         )

+
+class RelationsStore(RelationsWorkerStore):
     def _handle_event_relations(self, txn, event):
         """Handles inserting relation data during peristence of events

--
cgit 1.5.1


From 935af0da380f39ba284b78054270331bdbad7712 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 20 May 2019 10:13:05 +0100
Subject: Correctly update aggregation counts after redaction

---
 synapse/storage/events.py                    |  3 +++
 synapse/storage/relations.py                 | 17 +++++++++++++
 tests/rest/client/v2_alpha/test_relations.py | 37 ++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+)

(limited to 'synapse/storage/events.py')

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b025ebc926..881d6d0126 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1325,6 +1325,9 @@ class EventsStore(
                 txn, event.room_id, event.redacts
             )

+            # Remove from relations table.
+            self._handle_redaction(txn, event.redacts)
+
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
         self._handle_mult_prev_events(
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 6e216066ab..63e6185ee3 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -415,3 +415,20 @@ class RelationsStore(RelationsWorkerStore):

         if rel_type == RelationTypes.REPLACES:
             txn.call_after(self.get_applicable_edit.invalidate, (parent_id,))
+
+    def _handle_redaction(self, txn, redacted_event_id):
+        """Handles receiving a redaction and checking whether we need to remove
+        any redacted relations from the database.
+
+        Args:
+            txn
+            redacted_event_id (str): The event that was redacted.
+        """
+
+        self._simple_delete_txn(
+            txn,
+            table="event_relations",
+            keyvalues={
+                "event_id": redacted_event_id,
+            }
+        )
diff --git a/tests/rest/client/v2_alpha/test_relations.py b/tests/rest/client/v2_alpha/test_relations.py
index cd965167f8..3737cdc396 100644
--- a/tests/rest/client/v2_alpha/test_relations.py
+++ b/tests/rest/client/v2_alpha/test_relations.py
@@ -320,6 +320,43 @@ class RelationsTestCase(unittest.HomeserverTestCase):
             },
         )

+    def test_aggregation_redactions(self):
+        """Test that annotations get correctly aggregated after a redactions.
+        """
+
+        channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "a")
+        self.assertEquals(200, channel.code, channel.json_body)
+        to_redact_event_id = channel.json_body["event_id"]
+
+        channel = self._send_relation(
+            RelationTypes.ANNOTATION, "m.reaction", "a", access_token=self.user2_token
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+
+        # Now lets redact the 'a' reaction
+        request, channel = self.make_request(
+            "POST",
+            "/_matrix/client/r0/rooms/%s/redact/%s" % (self.room, to_redact_event_id),
+            access_token=self.user_token,
+            content={},
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code, channel.json_body)
+
+        request, channel = self.make_request(
+            "GET",
+            "/_matrix/client/unstable/rooms/%s/aggregations/%s"
+            % (self.room, self.parent_id),
+            access_token=self.user_token,
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code, channel.json_body)
+
+        self.assertEquals(
+            channel.json_body,
+            {"chunk": [{"type": "m.reaction", "key": "a", "count": 1}]},
+        )
+
     def test_aggregation_must_be_annotation(self):
         """Test that aggregations must be annotations.
         """
--
cgit 1.5.1
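
The final assertion in test_aggregation_redactions works because _handle_redaction deletes the redacted reaction's row from event_relations, so that row simply drops out of whatever grouping query backs the /aggregations endpoint. As a rough illustration only: the real query is the one wrapped by _get_aggregation_groups_for_event_txn in the second patch, and the relation_type and aggregation_key column names as well as the "m.annotation" literal are assumptions rather than something shown in these patches. A query of that general shape might look like this:

# Illustrative sketch, not the actual Synapse query: count annotations
# grouped by (event type, key) for a single parent event.
def get_aggregation_groups_for_event_txn(txn, parent_id, limit=5):
    sql = (
        "SELECT type, aggregation_key, COUNT(*)"
        " FROM event_relations"
        " INNER JOIN events USING (event_id)"
        " WHERE relates_to_id = ? AND relation_type = ?"
        " GROUP BY type, aggregation_key"
        " ORDER BY COUNT(*) DESC"
        " LIMIT ?"
    )
    txn.execute(sql, (parent_id, "m.annotation", limit))

    # Each chunk matches the shape asserted in test_aggregation_redactions.
    return [
        {"type": event_type, "key": key, "count": count}
        for event_type, key, count in txn.fetchall()
    ]

Deleting one of the two 'a' reactions therefore takes its aggregated count from 2 to 1, which is exactly what the test asserts after the redaction.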