diff options
author | Erik Johnston <erik@matrix.org> | 2019-06-05 15:45:46 +0100 |
---|---|---|
committer | Amber Brown <hawkowl@atleastfornow.net> | 2019-06-06 00:45:46 +1000 |
commit | 75538813fcd0403ec8915484a813b99e6eb256c6 (patch) | |
tree | 89735486b44fb3c5572f6bc38524dd231eed6233 | |
parent | Fix notes about well-known and acme (#5357) (diff) | |
download | synapse-75538813fcd0403ec8915484a813b99e6eb256c6.tar.xz |
Fix background updates to handle redactions/rejections (#5352)
* Fix background updates to handle redactions/rejections In background updates based on current state delta stream we need to handle that we may not have all the events (or at least that `get_events` may raise an exception).
-rw-r--r-- | changelog.d/5352.bugfix | 1 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 11 | ||||
-rw-r--r-- | synapse/handlers/stats.py | 18 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 37 | ||||
-rw-r--r-- | tests/handlers/test_stats.py | 62 |
5 files changed, 117 insertions, 12 deletions
diff --git a/changelog.d/5352.bugfix b/changelog.d/5352.bugfix new file mode 100644 index 0000000000..2ffefe5a68 --- /dev/null +++ b/changelog.d/5352.bugfix @@ -0,0 +1 @@ +Fix room stats and presence background updates to correctly handle missing events. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6209858bbb..e49c8203ef 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -828,14 +828,17 @@ class PresenceHandler(object): # joins. continue - event = yield self.store.get_event(event_id) - if event.content.get("membership") != Membership.JOIN: + event = yield self.store.get_event(event_id, allow_none=True) + if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id) - if prev_event.content.get("membership") == Membership.JOIN: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + if ( + prev_event + and prev_event.content.get("membership") == Membership.JOIN + ): # Ignore changes to join events. continue diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 0e92b405ba..7ad16c8566 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -115,6 +115,7 @@ class StatsHandler(StateDeltasHandler): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] + stream_pos = delta["stream_id"] logger.debug("Handling: %r %r, %s", typ, state_key, event_id) @@ -136,10 +137,15 @@ class StatsHandler(StateDeltasHandler): event_content = {} if event_id is not None: - event_content = (yield self.store.get_event(event_id)).content or {} + event = yield self.store.get_event(event_id, allow_none=True) + if event: + event_content = event.content or {} + + # We use stream_pos here rather than fetch by event_id as event_id + # may be None + now = yield self.store.get_received_ts_by_stream_pos(stream_pos) # quantise time to the nearest bucket - now = yield self.store.get_received_ts(event_id) now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size if typ == EventTypes.Member: @@ -149,9 +155,11 @@ class StatsHandler(StateDeltasHandler): # compare them. prev_event_content = {} if prev_event_id is not None: - prev_event_content = ( - yield self.store.get_event(prev_event_id) - ).content + prev_event = yield self.store.get_event( + prev_event_id, allow_none=True, + ) + if prev_event: + prev_event_content = prev_event.content membership = event_content.get("membership", Membership.LEAVE) prev_membership = prev_event_content.get("membership", Membership.LEAVE) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 1782428048..cc7df5cf14 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -78,6 +78,43 @@ class EventsWorkerStore(SQLBaseStore): desc="get_received_ts", ) + def get_received_ts_by_stream_pos(self, stream_ordering): + """Given a stream ordering get an approximate timestamp of when it + happened. + + This is done by simply taking the received ts of the first event that + has a stream ordering greater than or equal to the given stream pos. + If none exists returns the current time, on the assumption that it must + have happened recently. + + Args: + stream_ordering (int) + + Returns: + Deferred[int] + """ + + def _get_approximate_received_ts_txn(txn): + sql = """ + SELECT received_ts FROM events + WHERE stream_ordering >= ? + LIMIT 1 + """ + + txn.execute(sql, (stream_ordering,)) + row = txn.fetchone() + if row and row[0]: + ts = row[0] + else: + ts = self.clock.time_msec() + + return ts + + return self.runInteraction( + "get_approximate_received_ts", + _get_approximate_received_ts_txn, + ) + @defer.inlineCallbacks def get_event( self, diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py index 249aba3d59..2710c991cf 100644 --- a/tests/handlers/test_stats.py +++ b/tests/handlers/test_stats.py @@ -204,7 +204,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): "a2": {"membership": "not a real thing"}, } - def get_event(event_id): + def get_event(event_id, allow_none=True): m = Mock() m.content = events[event_id] d = defer.Deferred() @@ -224,7 +224,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): "room_id": "room", "event_id": "a1", "prev_event_id": "a2", - "stream_id": "bleb", + "stream_id": 60, } ] @@ -241,7 +241,7 @@ class StatsRoomTests(unittest.HomeserverTestCase): "room_id": "room", "event_id": "a2", "prev_event_id": "a1", - "stream_id": "bleb", + "stream_id": 100, } ] @@ -249,3 +249,59 @@ class StatsRoomTests(unittest.HomeserverTestCase): self.assertEqual( f.value.args[0], "'not a real thing' is not a valid membership" ) + + def test_redacted_prev_event(self): + """ + If the prev_event does not exist, then it is assumed to be a LEAVE. + """ + u1 = self.register_user("u1", "pass") + u1_token = self.login("u1", "pass") + + room_1 = self.helper.create_room_as(u1, tok=u1_token) + + # Do the initial population of the user directory via the background update + self._add_background_updates() + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + events = { + "a1": None, + "a2": {"membership": Membership.JOIN}, + } + + def get_event(event_id, allow_none=True): + if events.get(event_id): + m = Mock() + m.content = events[event_id] + else: + m = None + d = defer.Deferred() + self.reactor.callLater(0.0, d.callback, m) + return d + + def get_received_ts(event_id): + return defer.succeed(1) + + self.store.get_received_ts = get_received_ts + self.store.get_event = get_event + + deltas = [ + { + "type": EventTypes.Member, + "state_key": "some_user:test", + "room_id": room_1, + "event_id": "a2", + "prev_event_id": "a1", + "stream_id": 100, + } + ] + + # Handle our fake deltas, which has a user going from LEAVE -> JOIN. + self.get_success(self.handler._handle_deltas(deltas)) + + # One delta, with two joined members -- the room creator, and our fake + # user. + r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) + self.assertEqual(len(r), 1) + self.assertEqual(r[0]["joined_members"], 2) |