diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index ddc5c21e7d..833ff41377 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -105,7 +105,9 @@ class BaseHandler(object):
if not suppress_auth:
self.auth.check(event, auth_events=context.current_state)
- yield self.store.persist_event(event, context=context)
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
federation_handler = self.hs.get_handlers().federation_handler
@@ -142,7 +144,8 @@ class BaseHandler(object):
with PreserveLoggingContext():
# Don't block waiting on waking up all the listeners.
notify_d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
)
def log_failure(f):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 880cbd77e7..d35d9f603c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -160,7 +160,7 @@ class FederationHandler(BaseHandler):
)
try:
- yield self._handle_new_event(
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
event,
state=state,
@@ -203,7 +203,8 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
)
def log_failure(f):
@@ -563,7 +564,7 @@ class FederationHandler(BaseHandler):
if e.event_id in auth_ids
}
- yield self._handle_new_event(
+ _, event_stream_id, max_stream_id = yield self._handle_new_event(
origin,
new_event,
state=state,
@@ -573,7 +574,8 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- new_event, extra_users=[joinee]
+ new_event, event_stream_id, max_stream_id,
+ extra_users=[joinee]
)
def log_failure(f):
@@ -639,7 +641,9 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- context = yield self._handle_new_event(origin, event)
+ context, event_stream_id, max_stream_id = yield self._handle_new_event(
+ origin, event
+ )
logger.debug(
"on_send_join_request: After _handle_new_event: %s, sigs: %s",
@@ -655,7 +659,7 @@ class FederationHandler(BaseHandler):
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
+ event, event_stream_id, max_stream_id, extra_users=extra_users
)
def log_failure(f):
@@ -729,7 +733,7 @@ class FederationHandler(BaseHandler):
context = yield self.state_handler.compute_event_context(event)
- yield self.store.persist_event(
+ event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=False,
@@ -738,7 +742,8 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(event.state_key)
with PreserveLoggingContext():
d = self.notifier.on_new_room_event(
- event, extra_users=[target_user],
+ event, event_stream_id, max_stream_id,
+ extra_users=[target_user],
)
def log_failure(f):
@@ -916,7 +921,7 @@ class FederationHandler(BaseHandler):
)
raise
- yield self.store.persist_event(
+ event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
backfilled=backfilled,
@@ -924,7 +929,7 @@ class FederationHandler(BaseHandler):
current_state=current_state,
)
- defer.returnValue(context)
+ defer.returnValue((context, event_stream_id, max_stream_id))
@defer.inlineCallbacks
def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index a01020e202..40794187b1 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -344,6 +344,8 @@ class PresenceHandler(BaseHandler):
curr_users = yield rm_handler.get_room_members(room_id)
for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
+ statuscache = self._get_or_offline_usercache(local_user)
+ statuscache.update({}, serial=self._user_cachemap_latest_serial)
self.push_update_to_local_and_remote(
observed_user=local_user,
users_to_push=[user],
@@ -936,6 +938,8 @@ class PresenceHandler(BaseHandler):
"""
with PreserveLoggingContext():
self.notifier.on_new_user_event(
+ "presence_key",
+ self._user_cachemap_latest_serial,
users_to_push,
room_ids,
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 35a62fda47..bd8c603681 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -92,7 +92,7 @@ class SyncHandler(BaseHandler):
result = yield self.current_sync_for_user(sync_config, since_token)
defer.returnValue(result)
else:
- def current_sync_callback():
+ def current_sync_callback(before_token, after_token):
return self.current_sync_for_user(sync_config, since_token)
rm_handler = self.hs.get_handlers().room_member_handler
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 64fe51aa3e..a9895292c2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,9 @@ class TypingNotificationHandler(BaseHandler):
self._room_serials[room_id] = self._latest_room_serial
with PreserveLoggingContext():
- self.notifier.on_new_user_event(rooms=[room_id])
+ self.notifier.on_new_user_event(
+ "typing_key", self._latest_room_serial, rooms=[room_id]
+ )
class TypingNotificationEventSource(object):
|