diff options
author | Erik Johnston <erik@matrix.org> | 2018-02-15 13:03:05 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-02-15 13:03:05 +0000 |
commit | d66afef01e826864d16de5e9499d69ddd1306c6c (patch) | |
tree | 77e48d2733851fd494af09a560447602b00f79cd | |
parent | Avoid doing presence updates on replication reconnect (diff) | |
parent | Merge pull request #2872 from matrix-org/erikj/event_worker_dont_log (diff) | |
download | synapse-d66afef01e826864d16de5e9499d69ddd1306c6c.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
-rw-r--r-- | synapse/replication/http/send_event.py | 11 | ||||
-rw-r--r-- | synapse/storage/state.py | 82 |
2 files changed, 51 insertions, 42 deletions
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index ff9b9d2f10..7b21a2213c 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -15,6 +15,7 @@ from twisted.internet import defer +from synapse.api.errors import SynapseError, MatrixCodeMessageException from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -27,6 +28,7 @@ import re logger = logging.getLogger(__name__) +@defer.inlineCallbacks def send_event_to_master(client, host, port, requester, event, context): """Send event to be handled on the master @@ -48,7 +50,14 @@ def send_event_to_master(client, host, port, requester, event, context): "requester": requester.serialize(), } - return client.post_json_get_json(uri, payload) + try: + result = yield client.post_json_get_json(uri, payload) + except MatrixCodeMessageException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise SynapseError(e.code, e.msg, e.errcode) + defer.returnValue(result) class ReplicationSendEventRestServlet(RestServlet): diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 91a66e07c9..e1845b07ad 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -654,6 +654,47 @@ class StateGroupWorkerStore(SQLBaseStore): return self.runInteraction("store_state_group", _store_state_group_txn) + def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. + """ + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): """ Keeps track of the state at a given event. @@ -728,47 +769,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): (event_id,), state_group_id ) - def _count_state_group_hops_txn(self, txn, state_group): - """Given a state group, count how many hops there are in the tree. - - This is used to ensure the delta chains don't get too long. - """ - if isinstance(self.database_engine, PostgresEngine): - sql = (""" - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT count(*) FROM state; - """) - - txn.execute(sql, (state_group,)) - row = txn.fetchone() - if row and row[0]: - return row[0] - else: - return 0 - else: - # We don't use WITH RECURSIVE on sqlite3 as there are distributions - # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) - next_group = state_group - count = 0 - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - count += 1 - - return count - @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): """This background update will slowly deduplicate state by reencoding |