diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6209858bbb..e49c8203ef 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -828,14 +828,17 @@ class PresenceHandler(object):
# joins.
continue
- event = yield self.store.get_event(event_id)
- if event.content.get("membership") != Membership.JOIN:
+ event = yield self.store.get_event(event_id, allow_none=True)
+ if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue
if prev_event_id:
- prev_event = yield self.store.get_event(prev_event_id)
- if prev_event.content.get("membership") == Membership.JOIN:
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+ if (
+ prev_event
+ and prev_event.content.get("membership") == Membership.JOIN
+ ):
# Ignore changes to join events.
continue
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 0e92b405ba..7ad16c8566 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -115,6 +115,7 @@ class StatsHandler(StateDeltasHandler):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
+ stream_pos = delta["stream_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
@@ -136,10 +137,15 @@ class StatsHandler(StateDeltasHandler):
event_content = {}
if event_id is not None:
- event_content = (yield self.store.get_event(event_id)).content or {}
+ event = yield self.store.get_event(event_id, allow_none=True)
+ if event:
+ event_content = event.content or {}
+
+ # We use stream_pos here rather than fetch by event_id as event_id
+ # may be None
+ now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
# quantise time to the nearest bucket
- now = yield self.store.get_received_ts(event_id)
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
if typ == EventTypes.Member:
@@ -149,9 +155,11 @@ class StatsHandler(StateDeltasHandler):
# compare them.
prev_event_content = {}
if prev_event_id is not None:
- prev_event_content = (
- yield self.store.get_event(prev_event_id)
- ).content
+ prev_event = yield self.store.get_event(
+ prev_event_id, allow_none=True,
+ )
+ if prev_event:
+ prev_event_content = prev_event.content
membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 1782428048..cc7df5cf14 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -78,6 +78,43 @@ class EventsWorkerStore(SQLBaseStore):
desc="get_received_ts",
)
+ def get_received_ts_by_stream_pos(self, stream_ordering):
+ """Given a stream ordering get an approximate timestamp of when it
+ happened.
+
+ This is done by simply taking the received ts of the first event that
+ has a stream ordering greater than or equal to the given stream pos.
+ If none exists returns the current time, on the assumption that it must
+ have happened recently.
+
+ Args:
+ stream_ordering (int)
+
+ Returns:
+ Deferred[int]
+ """
+
+ def _get_approximate_received_ts_txn(txn):
+ sql = """
+ SELECT received_ts FROM events
+ WHERE stream_ordering >= ?
+ LIMIT 1
+ """
+
+ txn.execute(sql, (stream_ordering,))
+ row = txn.fetchone()
+ if row and row[0]:
+ ts = row[0]
+ else:
+ ts = self.clock.time_msec()
+
+ return ts
+
+ return self.runInteraction(
+ "get_approximate_received_ts",
+ _get_approximate_received_ts_txn,
+ )
+
@defer.inlineCallbacks
def get_event(
self,
|