diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 63d05f2531..5ad408f549 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -24,7 +24,6 @@ from .profile import ProfileHandler
from .directory import DirectoryHandler
from .admin import AdminHandler
from .identity import IdentityHandler
-from .receipts import ReceiptsHandler
from .search import SearchHandler
@@ -56,7 +55,6 @@ class Handlers(object):
self.profile_handler = ProfileHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
- self.receipts_handler = ReceiptsHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index c5368e5df2..f7fad15c62 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -34,9 +34,9 @@ class DeviceMessageHandler(object):
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
- self.federation.register_edu_handler(
+ hs.get_replication_layer().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2d801bad47..771ab3bc43 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -80,22 +80,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
- def handle_new_event(self, event, destinations):
- """ Takes in an event from the client to server side, that has already
- been authed and handled by the state module, and sends it to any
- remote home servers that may be interested.
-
- Args:
- event: The event to send
- destinations: A list of destinations to send it to
-
- Returns:
- Deferred: Resolved when it has successfully been queued for
- processing.
- """
-
- return self.replication_layer.send_pdu(event, destinations)
-
@log_function
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@@ -830,25 +814,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- new_pdu = event
-
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- )
-
- destinations.discard(origin)
-
- logger.debug(
- "on_send_join_request: Sending event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- self.replication_layer.send_pdu(new_pdu, destinations)
-
state_ids = context.prev_state_ids.values()
auth_chain = yield self.store.get_auth_chain(set(
[event.event_id] + state_ids
@@ -1055,24 +1020,6 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- new_pdu = event
-
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = set(
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- )
- destinations.discard(origin)
-
- logger.debug(
- "on_send_leave_request: Sending event: %s, signatures: %s",
- event.event_id,
- event.signatures,
- )
-
- self.replication_layer.send_pdu(new_pdu, destinations)
-
defer.returnValue(None)
@defer.inlineCallbacks
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fbfa5a0281..e0ade4c164 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -372,11 +372,12 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_receipts():
- receipts_handler = self.hs.get_handlers().receipts_handler
- receipts = yield receipts_handler.get_receipts_for_room(
+ receipts = yield self.store.get_linearized_receipts_for_room(
room_id,
- now_token.receipt_key
+ to_key=now_token.receipt_key,
)
+ if not receipts:
+ receipts = []
defer.returnValue(receipts)
presence, receipts, (messages, token) = yield defer.gatherResults(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81df45177a..fd09397226 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.push.action_generator import ActionGenerator
from synapse.types import (
- UserID, RoomAlias, RoomStreamToken, get_domain_from_id
+ UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock
from synapse.util.logcontext import preserve_fn
@@ -599,13 +599,6 @@ class MessageHandler(BaseHandler):
event_stream_id, max_stream_id
)
- users_in_room = yield self.store.get_joined_users_from_context(event, context)
-
- destinations = [
- get_domain_from_id(user_id) for user_id in users_in_room
- if not self.hs.is_mine_id(user_id)
- ]
-
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
@@ -618,7 +611,3 @@ class MessageHandler(BaseHandler):
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
-
- preserve_fn(federation_handler.handle_new_event)(
- event, destinations=destinations,
- )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index b047ae2250..1b89dc6274 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -91,28 +91,29 @@ class PresenceHandler(object):
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.federation = hs.get_replication_layer()
+ self.replication = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.federation.register_edu_handler(
+ self.replication.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index e536a909d0..916e80a48e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -33,8 +33,8 @@ class ReceiptsHandler(BaseHandler):
self.server_name = hs.config.server_name
self.store = hs.get_datastore()
self.hs = hs
- self.federation = hs.get_replication_layer()
- self.federation.register_edu_handler(
+ self.federation = hs.get_federation_sender()
+ hs.get_replication_layer().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27ee715ff0..0eea7f8f9c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -55,9 +55,9 @@ class TypingHandler(object):
self.clock = hs.get_clock()
self.wheel_timer = WheelTimer(bucket_size=5000)
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_sender()
- self.federation.register_edu_handler("m.typing", self._recv_edu)
+ hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room)
|