summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-18 17:46:04 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-18 17:46:04 +0100
commitef8415adc2bb14fce19bbe454357bef2b701eca2 (patch)
tree313bf881fe0583a5129ed0e472667e6be566b6d4 /synapse/replication
parentfix changelog (diff)
parentFix seven contrib files with Python syntax errors (#5446) (diff)
downloadsynapse-github/dbkr/3pid_verification_logging.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/3pid_verification_logging github/dbkr/3pid_verification_logging dbkr/3pid_verification_logging
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/_base.py10
-rw-r--r--synapse/replication/slave/storage/events.py14
-rw-r--r--synapse/replication/slave/storage/keys.py21
-rw-r--r--synapse/replication/tcp/streams/_base.py1
-rw-r--r--synapse/replication/tcp/streams/events.py11
5 files changed, 31 insertions, 26 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py

index e81456ab2b..0a432a16fa 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -17,11 +17,17 @@ import abc import logging import re +from six import raise_from from six.moves import urllib from twisted.internet import defer -from synapse.api.errors import CodeMessageException, HttpResponseException +from synapse.api.errors import ( + CodeMessageException, + HttpResponseException, + RequestSendFailed, + SynapseError, +) from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string @@ -175,6 +181,8 @@ class ReplicationEndpoint(object): # on the master process that we should send to the client. (And # importantly, not stack traces everywhere) raise e.to_synapse_error() + except RequestSendFailed as e: + raise_from(SynapseError(502, "Failed to talk to master"), e) defer.returnValue(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index b457c5563f..a3952506c1 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py
@@ -23,6 +23,7 @@ from synapse.replication.tcp.streams.events import ( from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore +from synapse.storage.relations import RelationsWorkerStore from synapse.storage.roommember import RoomMemberWorkerStore from synapse.storage.signatures import SignatureWorkerStore from synapse.storage.state import StateGroupWorkerStore @@ -52,6 +53,7 @@ class SlavedEventStore(EventFederationWorkerStore, EventsWorkerStore, SignatureWorkerStore, UserErasureWorkerStore, + RelationsWorkerStore, BaseSlavedStore): def __init__(self, db_conn, hs): @@ -89,7 +91,7 @@ class SlavedEventStore(EventFederationWorkerStore, for row in rows: self.invalidate_caches_for_event( -token, row.event_id, row.room_id, row.type, row.state_key, - row.redacts, + row.redacts, row.relates_to, backfilled=True, ) return super(SlavedEventStore, self).process_replication_rows( @@ -102,7 +104,7 @@ class SlavedEventStore(EventFederationWorkerStore, if row.type == EventsStreamEventRow.TypeId: self.invalidate_caches_for_event( token, data.event_id, data.room_id, data.type, data.state_key, - data.redacts, + data.redacts, data.relates_to, backfilled=False, ) elif row.type == EventsStreamCurrentStateRow.TypeId: @@ -114,7 +116,8 @@ class SlavedEventStore(EventFederationWorkerStore, raise Exception("Unknown events stream row type %s" % (row.type, )) def invalidate_caches_for_event(self, stream_ordering, event_id, room_id, - etype, state_key, redacts, backfilled): + etype, state_key, redacts, relates_to, + backfilled): self._invalidate_get_event_cache(event_id) self.get_latest_event_ids_in_room.invalidate((room_id,)) @@ -136,3 +139,8 @@ class SlavedEventStore(EventFederationWorkerStore, state_key, stream_ordering ) self.get_invited_rooms_for_user.invalidate((state_key,)) + + if relates_to: + self.get_relations_for_event.invalidate_many((relates_to,)) + self.get_aggregation_groups_for_event.invalidate_many((relates_to,)) + self.get_applicable_edit.invalidate((relates_to,)) diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index 8032f53fec..cc6f7f009f 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py
@@ -13,22 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage import DataStore -from synapse.storage.keys import KeyStore +from synapse.storage import KeyStore -from ._base import BaseSlavedStore, __func__ +# KeyStore isn't really safe to use from a worker, but for now we do so and hope that +# the races it creates aren't too bad. - -class SlavedKeyStore(BaseSlavedStore): - _get_server_verify_key = KeyStore.__dict__[ - "_get_server_verify_key" - ] - - get_server_verify_keys = __func__(DataStore.get_server_verify_keys) - store_server_verify_key = __func__(DataStore.store_server_verify_key) - - get_server_certificate = __func__(DataStore.get_server_certificate) - store_server_certificate = __func__(DataStore.store_server_certificate) - - get_server_keys_json = __func__(DataStore.get_server_keys_json) - store_server_keys_json = __func__(DataStore.store_server_keys_json) +SlavedKeyStore = KeyStore diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 8971a6a22e..b6ce7a7bee 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -32,6 +32,7 @@ BackfillStreamRow = namedtuple("BackfillStreamRow", ( "type", # str "state_key", # str, optional "redacts", # str, optional + "relates_to", # str, optional )) PresenceStreamRow = namedtuple("PresenceStreamRow", ( "user_id", # str diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index e0f6e29248..f1290d022a 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -80,11 +80,12 @@ class BaseEventsStreamRow(object): class EventsStreamEventRow(BaseEventsStreamRow): TypeId = "ev" - event_id = attr.ib() # str - room_id = attr.ib() # str - type = attr.ib() # str - state_key = attr.ib() # str, optional - redacts = attr.ib() # str, optional + event_id = attr.ib() # str + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str, optional + redacts = attr.ib() # str, optional + relates_to = attr.ib() # str, optional @attr.s(slots=True, frozen=True)