Send some ephemeral events to appservices (#8437)
Optionally sends typing, presence, and read receipt information to appservices.
1 files changed, 34 insertions, 1 deletions
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 7225923757..c242c409cf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -13,9 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List, Tuple
+from synapse.appservice import ApplicationService
from synapse.handlers._base import BaseHandler
-from synapse.types import ReadReceipt, get_domain_from_id
+from synapse.types import JsonDict, ReadReceipt, get_domain_from_id
from synapse.util.async_helpers import maybe_awaitable
logger = logging.getLogger(__name__)
@@ -140,5 +142,36 @@ class ReceiptEventSource:
return (events, to_key)
+ async def get_new_events_as(
+ self, from_key: int, service: ApplicationService
+ ) -> Tuple[List[JsonDict], int]:
+ """Returns a set of new receipt events that an appservice
+ may be interested in.
+
+ Args:
+ from_key: the stream position at which events should be fetched from
+ service: The appservice which may be interested
+ """
+ from_key = int(from_key)
+ to_key = self.get_current_key()
+
+ if from_key == to_key:
+ return [], to_key
+
+ # We first need to fetch all new receipts
+ rooms_to_events = await self.store.get_linearized_receipts_for_all_rooms(
+ from_key=from_key, to_key=to_key
+ )
+
+ # Then filter down to rooms that the AS can read
+ events = []
+ for room_id, event in rooms_to_events.items():
+ if not await service.matches_user_in_member_list(room_id, self.store):
+ continue
+
+ events.append(event)
+
+ return (events, to_key)
+
def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()
|