diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 5fd20285d2..24c4c62698 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -19,9 +19,12 @@ from synapse.api.errors import LimitExceededError, SynapseError, AuthError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias
+from synapse.push.action_generator import ActionGenerator
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.events.utils import serialize_event
+
import logging
@@ -264,6 +267,11 @@ class BaseHandler(object):
event, context=context
)
+ action_generator = ActionGenerator(self.hs, self.store)
+ yield action_generator.handle_event(serialize_event(
+ event, self.clock.time_msec()
+ ))
+
destinations = set(extra_destinations)
for k, s in context.current_state.items():
try:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 28f2ff68d6..0b1221deb5 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,10 +32,12 @@ from synapse.crypto.event_signing import (
)
from synapse.types import UserID
-from synapse.events.utils import prune_event
+from synapse.events.utils import prune_event, serialize_event
from synapse.util.retryutils import NotRetryingDestination
+from synapse.push.action_generator import ActionGenerator
+
from twisted.internet import defer
import itertools
@@ -242,6 +244,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
+ if not backfilled and not event.internal_metadata.is_outlier():
+ action_generator = ActionGenerator(self.hs, self.store)
+ yield action_generator.handle_event(serialize_event(
+ event, self.clock.time_msec())
+ )
+
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7088c20cb4..4cbb43a31b 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -53,6 +53,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
"account_data",
+ "unread_notification_count",
])):
__slots__ = []
@@ -65,6 +66,8 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
or self.state
or self.ephemeral
or self.account_data
+ # nb the notification count does not, er, count: if there's nothing
+ # else in the result, we don't need to send it.
)
@@ -162,6 +165,18 @@ class SyncHandler(BaseHandler):
else:
return self.incremental_sync_with_gap(sync_config, since_token)
+ def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
+ if room_id not in ephemeral_by_room:
+ return None
+ for e in ephemeral_by_room[room_id]:
+ if e['type'] != 'm.receipt':
+ continue
+ for receipt_event_id, val in e['content'].items():
+ if 'm.read' in val:
+ if user_id in val['m.read']:
+ return receipt_event_id
+ return None
+
@defer.inlineCallbacks
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
@@ -273,6 +288,13 @@ class SyncHandler(BaseHandler):
room_id, sync_config, now_token, since_token=timeline_since_token
)
+ notifs = yield self.unread_notifs_for_room_id(
+ room_id, sync_config, ephemeral_by_room
+ )
+ notif_count = None
+ if notifs is not None:
+ notif_count = len(notifs)
+
current_state = yield self.get_state_at(room_id, now_token)
defer.returnValue(JoinedSyncResult(
@@ -283,6 +305,7 @@ class SyncHandler(BaseHandler):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
+ unread_notification_count=notif_count
))
def account_data_for_user(self, account_data):
@@ -424,6 +447,10 @@ class SyncHandler(BaseHandler):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
+ _, all_ephemeral_by_room = yield self.ephemeral_by_room(
+ sync_config, now_token
+ )
+
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token
)
@@ -497,6 +524,13 @@ class SyncHandler(BaseHandler):
else:
prev_batch = now_token
+ notifs = yield self.unread_notifs_for_room_id(
+ room_id, sync_config, all_ephemeral_by_room
+ )
+ notif_count = None
+ if notifs is not None:
+ notif_count = len(notifs)
+
just_joined = yield self.check_joined_room(sync_config, state)
if just_joined:
logger.debug("User has just joined %s: needs full state",
@@ -517,6 +551,7 @@ class SyncHandler(BaseHandler):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
+ unread_notification_count=notif_count
)
logger.debug("Result for room %s: %r", room_id, room_sync)
@@ -787,3 +822,20 @@ class SyncHandler(BaseHandler):
if join_event.content["membership"] == Membership.JOIN:
return True
return False
+
+ @defer.inlineCallbacks
+ def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
+ last_unread_event_id = self.last_read_event_id_for_room_and_user(
+ room_id, sync_config.user.to_string(), ephemeral_by_room
+ )
+
+ notifs = []
+ if last_unread_event_id:
+ notifs = yield self.store.get_unread_event_actions_by_room_for_user(
+ room_id, sync_config.user.to_string(), last_unread_event_id
+ )
+ else:
+ # There is no new information in this period, so your notification
+ # count is whatever it was last time.
+ defer.returnValue(None)
+ defer.returnValue(notifs)
|