diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8f362896a2..f445e2aa2a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Optional
+from typing import Optional, Tuple
from six import iteritems, itervalues, string_types
@@ -42,6 +42,7 @@ from synapse.api.errors import (
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.api.urls import ConsentURIBuilder
+from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -630,7 +631,9 @@ class EventCreationHandler(object):
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
- async def send_nonmember_event(self, requester, event, context, ratelimit=True):
+ async def send_nonmember_event(
+ self, requester, event, context, ratelimit=True
+ ) -> int:
"""
Persists and notifies local clients and federation of an event.
@@ -639,6 +642,9 @@ class EventCreationHandler(object):
context (Context) the context of the event.
ratelimit (bool): Whether to rate limit this send.
is_guest (bool): Whether the sender is a guest.
+
+ Return:
+ The stream_id of the persisted event.
"""
if event.type == EventTypes.Member:
raise SynapseError(
@@ -659,7 +665,7 @@ class EventCreationHandler(object):
)
return prev_state
- await self.handle_new_client_event(
+ return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
)
@@ -688,7 +694,7 @@ class EventCreationHandler(object):
async def create_and_send_nonmember_event(
self, requester, event_dict, ratelimit=True, txn_id=None
- ):
+ ) -> Tuple[EventBase, int]:
"""
Creates an event, then sends it.
@@ -711,10 +717,10 @@ class EventCreationHandler(object):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
- await self.send_nonmember_event(
+ stream_id = await self.send_nonmember_event(
requester, event, context, ratelimit=ratelimit
)
- return event
+ return event, stream_id
@measure_func("create_new_client_event")
@defer.inlineCallbacks
@@ -774,7 +780,7 @@ class EventCreationHandler(object):
@measure_func("handle_new_client_event")
async def handle_new_client_event(
self, requester, event, context, ratelimit=True, extra_users=[]
- ):
+ ) -> int:
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
@@ -787,6 +793,9 @@ class EventCreationHandler(object):
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
+
+ Return:
+ The stream_id of the persisted event.
"""
if event.is_state() and (event.type, event.state_key) == (
@@ -827,7 +836,7 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
- await self.send_event_to_master(
+ result = await self.send_event_to_master(
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -836,14 +845,17 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
+ stream_id = result["stream_id"]
+ event.internal_metadata.stream_ordering = stream_id
success = True
- return
+ return stream_id
- await self.persist_and_notify_client_event(
+ stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
success = True
+ return stream_id
finally:
if not success:
# Ensure that we actually remove the entries in the push actions
@@ -886,7 +898,7 @@ class EventCreationHandler(object):
async def persist_and_notify_client_event(
self, requester, event, context, ratelimit=True, extra_users=[]
- ):
+ ) -> int:
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
@@ -1076,6 +1088,8 @@ class EventCreationHandler(object):
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
+ return event_stream_id
+
async def _bump_active_time(self, user):
try:
presence = self.hs.get_presence_handler()
|