From 220a733d7379be88514f7681ec37388755d4e612 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Jun 2019 09:56:45 +0100 Subject: Fix handling of failures when calling /event_auth. When processing an incoming event over federation, we may try and resolve any unexpected differences in auth events. This is a non-essential process and so should not stop the processing of the event if it fails (e.g. due to the remote disappearing or not implementing the necessary endpoints). Fixes #3330 --- synapse/handlers/federation.py | 50 ++++++++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 12 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cf4fad7de0..fa735efedd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -35,6 +35,7 @@ from synapse.api.errors import ( CodeMessageException, FederationDeniedError, FederationError, + RequestSendFailed, StoreError, SynapseError, ) @@ -2027,9 +2028,15 @@ class FederationHandler(BaseHandler): """ room_version = yield self.store.get_room_version(event.room_id) - yield self._update_auth_events_and_context_for_auth( - origin, event, context, auth_events - ) + try: + yield self._update_auth_events_and_context_for_auth( + origin, event, context, auth_events + ) + except Exception: + # We don't really mind if the above fails, so lets not fail + # processing if it does. + logger.exception("Failed to call _update_auth_events_and_context_for_auth") + try: self.auth.check(room_version, event, auth_events=auth_events) except AuthError as e: @@ -2042,6 +2049,15 @@ class FederationHandler(BaseHandler): ): """Helper for do_auth. See there for docs. + Checks whether a given event has the expected auth events. If it + doesn't then we talk to the remote server to compare state to see if + we can come to a consensus (e.g. if one server missed some valid + state). + + This attempts to resovle any potential divergence of state between + servers, but is not essential and so failures should not block further + processing of the event. + Args: origin (str): event (synapse.events.EventBase): @@ -2088,9 +2104,14 @@ class FederationHandler(BaseHandler): missing_auth, ) try: - remote_auth_chain = yield self.federation_client.get_event_auth( - origin, event.room_id, event.event_id - ) + try: + remote_auth_chain = yield self.federation_client.get_event_auth( + origin, event.room_id, event.event_id + ) + except RequestSendFailed: + # The other side isn't around or doesn't implement the + # endpoint, so lets just bail out. + return seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] @@ -2236,12 +2257,17 @@ class FederationHandler(BaseHandler): try: # 2. Get remote difference. - result = yield self.federation_client.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) + try: + result = yield self.federation_client.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) + except RequestSendFailed: + # The other side isn't around or doesn't implement the + # endpoint, so lets just bail out. + return seen_remotes = yield self.store.have_seen_events( [e.event_id for e in result["auth_chain"]] -- cgit 1.5.1 From bc3d6b918b62c3dd6ce96eba638cf4601126e2f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Jun 2019 11:31:27 +0100 Subject: Add logging when request fails and clarify we ignore errors. --- synapse/handlers/federation.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fa735efedd..ac5ca79143 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2034,8 +2034,14 @@ class FederationHandler(BaseHandler): ) except Exception: # We don't really mind if the above fails, so lets not fail - # processing if it does. - logger.exception("Failed to call _update_auth_events_and_context_for_auth") + # processing if it does. However, it really shouldn't fail so + # let's still log as an exception since we'll still want to fix + # any bugs. + logger.exception( + "Failed to double check auth events for %s with remote. " + "Ignoring failure and continuing processing of event.", + event.event_id, + ) try: self.auth.check(room_version, event, auth_events=auth_events) @@ -2108,9 +2114,10 @@ class FederationHandler(BaseHandler): remote_auth_chain = yield self.federation_client.get_event_auth( origin, event.room_id, event.event_id ) - except RequestSendFailed: + except RequestSendFailed as e: # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. + logger.info("Failed to get event auth from remote: %s", e) return seen_remotes = yield self.store.have_seen_events( @@ -2264,9 +2271,10 @@ class FederationHandler(BaseHandler): event.event_id, local_auth_chain, ) - except RequestSendFailed: + except RequestSendFailed as e: # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. + logger.info("Failed to query auth from remote: %s", e) return seen_remotes = yield self.store.have_seen_events( -- cgit 1.5.1 From 75538813fcd0403ec8915484a813b99e6eb256c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Jun 2019 15:45:46 +0100 Subject: Fix background updates to handle redactions/rejections (#5352) * Fix background updates to handle redactions/rejections In background updates based on current state delta stream we need to handle that we may not have all the events (or at least that `get_events` may raise an exception). --- changelog.d/5352.bugfix | 1 + synapse/handlers/presence.py | 11 ++++--- synapse/handlers/stats.py | 18 ++++++++---- synapse/storage/events_worker.py | 37 ++++++++++++++++++++++++ tests/handlers/test_stats.py | 62 ++++++++++++++++++++++++++++++++++++++-- 5 files changed, 117 insertions(+), 12 deletions(-) create mode 100644 changelog.d/5352.bugfix (limited to 'synapse/handlers') diff --git a/changelog.d/5352.bugfix b/changelog.d/5352.bugfix new file mode 100644 index 0000000000..2ffefe5a68 --- /dev/null +++ b/changelog.d/5352.bugfix @@ -0,0 +1 @@ +Fix room stats and presence background updates to correctly handle missing events. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6209858bbb..e49c8203ef 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -828,14 +828,17 @@ class PresenceHandler(object): # joins. continue - event = yield self.store.get_event(event_id) - if event.content.get("membership") != Membership.JOIN: + event = yield self.store.get_event(event_id, allow_none=True) + if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id) - if prev_event.content.get("membership") == Membership.JOIN: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + if ( + prev_event + and prev_event.content.get("membership") == Membership.JOIN + ): # Ignore changes to join events. continue diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 0e92b405ba..7ad16c8566 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -115,6 +115,7 @@ class StatsHandler(StateDeltasHandler): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] + stream_pos = delta["stream_id"] logger.debug("Handling: %r %r, %s", typ, state_key, event_id) @@ -136,10 +137,15 @@ class StatsHandler(StateDeltasHandler): event_content = {} if event_id is not None: - event_content = (yield self.store.get_event(event_id)).content or {} + event = yield self.store.get_event(event_id, allow_none=True) + if event: + event_content = event.content or {} + + # We use stream_pos here rather than fetch by event_id as event_id + # may be None + now = yield self.store.get_received_ts_by_stream_pos(stream_pos) # quantise time to the nearest bucket - now = yield self.store.get_received_ts(event_id) now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size if typ == EventTypes.Member: @@ -149,9 +155,11 @@ class StatsHandler(StateDeltasHandler): # compare them. prev_event_content = {} if prev_event_id is not None: - prev_event_content = ( - yield self.store.get_event(prev_event_id) - ).content + prev_event = yield self.store.get_event( + prev_event_id, allow_none=True, + ) + if prev_event: + prev_event_content = prev_event.content membership = event_content.get("membership", Membership.LEAVE) prev_membership = prev_event_content.get("membership", Membership.LEAVE) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 1782428048..cc7df5cf14 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -78,6 +78,43 @@ class EventsWorkerStore(SQLBaseStore): desc="get_received_ts", ) + def get_received_ts_by_stream_pos(self, stream_ordering): + """Given a stream ordering get an approximate timestamp of when it + happened. + + This is done by simply taking the received ts of the first event that + has a stream ordering greater than or equal to the given stream pos. + If none exists returns the current time, on the assumption that it must + have happened recently. + + Args: + stream_ordering (int) + + Returns: + Deferred[int] + """ + + def _get_approximate_received_ts_txn(txn): + sql = """ + SELECT received_ts FROM events + WHERE stream_ordering >= ? + LIMIT 1 + """ + + txn.execute(sql, (stream_ordering,)) + row = txn.fetchone() + if row and row[0]: + ts = row[0] + else: + ts = self.clock.time_msec() + + return ts + + return self.runInteraction( + "get_approximate_received_ts", + _get_approximate_received_ts_txn, + ) + @defer.inlineCallbacks def get_event( self, diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index 249aba3d59..2710c991cf 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -204,7 +204,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): "a2": {"membership": "not a real thing"}, } - def get_event(event_id): + def get_event(event_id, allow_none=True): m = Mock() m.content = events[event_id] d = defer.Deferred() @@ -224,7 +224,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): "room_id": "room", "event_id": "a1", "prev_event_id": "a2", - "stream_id": "bleb", + "stream_id": 60, } ] @@ -241,7 +241,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): "room_id": "room", "event_id": "a2", "prev_event_id": "a1", - "stream_id": "bleb", + "stream_id": 100, } ] @@ -249,3 +249,59 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.assertEqual( f.value.args[0], "'not a real thing' is not a valid membership" ) + + def test_redacted_prev_event(self): + """ + If the prev_event does not exist, then it is assumed to be a LEAVE. + """ + u1 = self.register_user("u1", "pass") + u1_token = self.login("u1", "pass") + + room_1 = self.helper.create_room_as(u1, tok=u1_token) + + # Do the initial population of the user directory via the background update + self._add_background_updates() + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + events = { + "a1": None, + "a2": {"membership": Membership.JOIN}, + } + + def get_event(event_id, allow_none=True): + if events.get(event_id): + m = Mock() + m.content = events[event_id] + else: + m = None + d = defer.Deferred() + self.reactor.callLater(0.0, d.callback, m) + return d + + def get_received_ts(event_id): + return defer.succeed(1) + + self.store.get_received_ts = get_received_ts + self.store.get_event = get_event + + deltas = [ + { + "type": EventTypes.Member, + "state_key": "some_user:test", + "room_id": room_1, + "event_id": "a2", + "prev_event_id": "a1", + "stream_id": 100, + } + ] + + # Handle our fake deltas, which has a user going from LEAVE -> JOIN. + self.get_success(self.handler._handle_deltas(deltas)) + + # One delta, with two joined members -- the room creator, and our fake + # user. + r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) + self.assertEqual(len(r), 1) + self.assertEqual(r[0]["joined_members"], 2) -- cgit 1.5.1