summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py36
1 files changed, 25 insertions, 11 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8f362896a2..f445e2aa2a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional
+from typing import Optional, Tuple
 
 from six import iteritems, itervalues, string_types
 
@@ -42,6 +42,7 @@ from synapse.api.errors import (
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.api.urls import ConsentURIBuilder
+from synapse.events import EventBase
 from synapse.events.validator import EventValidator
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -630,7 +631,9 @@ class EventCreationHandler(object):
         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
-    async def send_nonmember_event(self, requester, event, context, ratelimit=True):
+    async def send_nonmember_event(
+        self, requester, event, context, ratelimit=True
+    ) -> int:
         """
         Persists and notifies local clients and federation of an event.
 
@@ -639,6 +642,9 @@ class EventCreationHandler(object):
             context (Context) the context of the event.
             ratelimit (bool): Whether to rate limit this send.
             is_guest (bool): Whether the sender is a guest.
+
+        Return:
+            The stream_id of the persisted event.
         """
         if event.type == EventTypes.Member:
             raise SynapseError(
@@ -659,7 +665,7 @@ class EventCreationHandler(object):
                 )
                 return prev_state
 
-        await self.handle_new_client_event(
+        return await self.handle_new_client_event(
             requester=requester, event=event, context=context, ratelimit=ratelimit
         )
 
@@ -688,7 +694,7 @@ class EventCreationHandler(object):
 
     async def create_and_send_nonmember_event(
         self, requester, event_dict, ratelimit=True, txn_id=None
-    ):
+    ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
 
@@ -711,10 +717,10 @@ class EventCreationHandler(object):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
-            await self.send_nonmember_event(
+            stream_id = await self.send_nonmember_event(
                 requester, event, context, ratelimit=ratelimit
             )
-        return event
+        return event, stream_id
 
     @measure_func("create_new_client_event")
     @defer.inlineCallbacks
@@ -774,7 +780,7 @@ class EventCreationHandler(object):
     @measure_func("handle_new_client_event")
     async def handle_new_client_event(
         self, requester, event, context, ratelimit=True, extra_users=[]
-    ):
+    ) -> int:
         """Processes a new event. This includes checking auth, persisting it,
         notifying users, sending to remote servers, etc.
 
@@ -787,6 +793,9 @@ class EventCreationHandler(object):
             context (EventContext)
             ratelimit (bool)
             extra_users (list(UserID)): Any extra users to notify about event
+
+        Return:
+            The stream_id of the persisted event.
         """
 
         if event.is_state() and (event.type, event.state_key) == (
@@ -827,7 +836,7 @@ class EventCreationHandler(object):
         try:
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
-                await self.send_event_to_master(
+                result = await self.send_event_to_master(
                     event_id=event.event_id,
                     store=self.store,
                     requester=requester,
@@ -836,14 +845,17 @@ class EventCreationHandler(object):
                     ratelimit=ratelimit,
                     extra_users=extra_users,
                 )
+                stream_id = result["stream_id"]
+                event.internal_metadata.stream_ordering = stream_id
                 success = True
-                return
+                return stream_id
 
-            await self.persist_and_notify_client_event(
+            stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
             success = True
+            return stream_id
         finally:
             if not success:
                 # Ensure that we actually remove the entries in the push actions
@@ -886,7 +898,7 @@ class EventCreationHandler(object):
 
     async def persist_and_notify_client_event(
         self, requester, event, context, ratelimit=True, extra_users=[]
-    ):
+    ) -> int:
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
 
@@ -1076,6 +1088,8 @@ class EventCreationHandler(object):
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
+        return event_stream_id
+
     async def _bump_active_time(self, user):
         try:
             presence = self.hs.get_presence_handler()