diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 623a73f67a..6a0592f41d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,10 +16,12 @@
import synapse
+from synapse.api.constants import EventTypes
from synapse.config._base import ConfigError
from synapse.config.database import DatabaseConfig
from synapse.config.logger import LoggingConfig
from synapse.config.appservice import AppServiceConfig
+from synapse.events import FrozenEvent
from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@@ -49,6 +51,7 @@ from daemonize import Daemonize
import sys
import logging
import contextlib
+import ujson as json
logger = logging.getLogger("synapse.app.synchrotron")
@@ -226,10 +229,27 @@ class SynchrotronServer(HomeServer):
store = self.get_datastore()
replication_url = self.config.replication_url
clock = self.get_clock()
+ notifier = self.get_notifier()
def expire_broken_caches():
store.who_forgot_in_room.invalidate_all()
+ def notify(result):
+ stream = result.get("events")
+ if stream:
+ max_position = stream["position"]
+ for row in stream["rows"]:
+ position = row[0]
+ internal = json.loads(row[1])
+ event_json = json.loads(row[2])
+ event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ notifier.on_new_room_event(
+ event, position, max_position, extra_users
+ )
+
next_expire_broken_caches_ms = 0
while True:
try:
@@ -244,6 +264,7 @@ class SynchrotronServer(HomeServer):
now_ms + store.BROKEN_CACHE_EXPIRY_MS
)
yield store.process_replication(result)
+ notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
sleep(5)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d768e2aabe..cbc1ae4190 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -123,6 +123,7 @@ class SlavedEventStore(BaseSlavedStore):
get_room_events_stream_for_rooms = (
DataStore.get_room_events_stream_for_rooms.__func__
)
+ get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
_set_before_and_after = staticmethod(DataStore._set_before_and_after)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 56a0dd80f3..e0d7098692 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -153,6 +153,7 @@ class SQLBaseStore(object):
def __init__(self, hs):
self.hs = hs
self._db_pool = hs.get_db_pool()
+ self._clock = hs.get_clock()
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
|