diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/presence.py | 11 | ||||
-rw-r--r-- | synapse/handlers/stats.py | 18 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 37 |
3 files changed, 57 insertions, 9 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6209858bbb..e49c8203ef 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -828,14 +828,17 @@ class PresenceHandler(object): # joins. continue - event = yield self.store.get_event(event_id) - if event.content.get("membership") != Membership.JOIN: + event = yield self.store.get_event(event_id, allow_none=True) + if not event or event.content.get("membership") != Membership.JOIN: # We only care about joins continue if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id) - if prev_event.content.get("membership") == Membership.JOIN: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + if ( + prev_event + and prev_event.content.get("membership") == Membership.JOIN + ): # Ignore changes to join events. continue diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 0e92b405ba..7ad16c8566 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -115,6 +115,7 @@ class StatsHandler(StateDeltasHandler): event_id = delta["event_id"] stream_id = delta["stream_id"] prev_event_id = delta["prev_event_id"] + stream_pos = delta["stream_id"] logger.debug("Handling: %r %r, %s", typ, state_key, event_id) @@ -136,10 +137,15 @@ class StatsHandler(StateDeltasHandler): event_content = {} if event_id is not None: - event_content = (yield self.store.get_event(event_id)).content or {} + event = yield self.store.get_event(event_id, allow_none=True) + if event: + event_content = event.content or {} + + # We use stream_pos here rather than fetch by event_id as event_id + # may be None + now = yield self.store.get_received_ts_by_stream_pos(stream_pos) # quantise time to the nearest bucket - now = yield self.store.get_received_ts(event_id) now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size if typ == EventTypes.Member: @@ -149,9 +155,11 @@ class StatsHandler(StateDeltasHandler): # compare them. prev_event_content = {} if prev_event_id is not None: - prev_event_content = ( - yield self.store.get_event(prev_event_id) - ).content + prev_event = yield self.store.get_event( + prev_event_id, allow_none=True, + ) + if prev_event: + prev_event_content = prev_event.content membership = event_content.get("membership", Membership.LEAVE) prev_membership = prev_event_content.get("membership", Membership.LEAVE) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 1782428048..cc7df5cf14 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -78,6 +78,43 @@ class EventsWorkerStore(SQLBaseStore): desc="get_received_ts", ) + def get_received_ts_by_stream_pos(self, stream_ordering): + """Given a stream ordering get an approximate timestamp of when it + happened. + + This is done by simply taking the received ts of the first event that + has a stream ordering greater than or equal to the given stream pos. + If none exists returns the current time, on the assumption that it must + have happened recently. + + Args: + stream_ordering (int) + + Returns: + Deferred[int] + """ + + def _get_approximate_received_ts_txn(txn): + sql = """ + SELECT received_ts FROM events + WHERE stream_ordering >= ? + LIMIT 1 + """ + + txn.execute(sql, (stream_ordering,)) + row = txn.fetchone() + if row and row[0]: + ts = row[0] + else: + ts = self.clock.time_msec() + + return ts + + return self.runInteraction( + "get_approximate_received_ts", + _get_approximate_received_ts_txn, + ) + @defer.inlineCallbacks def get_event( self, |