summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/app/synchrotron.py21
-rw-r--r--synapse/replication/slave/storage/events.py1
-rw-r--r--synapse/storage/_base.py1
3 files changed, 23 insertions, 0 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 623a73f67a..6a0592f41d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,10 +16,12 @@
 
 import synapse
 
+from synapse.api.constants import EventTypes
 from synapse.config._base import ConfigError
 from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
 from synapse.config.appservice import AppServiceConfig
+from synapse.events import FrozenEvent
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@@ -49,6 +51,7 @@ from daemonize import Daemonize
 import sys
 import logging
 import contextlib
+import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -226,10 +229,27 @@ class SynchrotronServer(HomeServer):
         store = self.get_datastore()
         replication_url = self.config.replication_url
         clock = self.get_clock()
+        notifier = self.get_notifier()
 
         def expire_broken_caches():
             store.who_forgot_in_room.invalidate_all()
 
+        def notify(result):
+            stream = result.get("events")
+            if stream:
+                max_position = stream["position"]
+                for row in stream["rows"]:
+                    position = row[0]
+                    internal = json.loads(row[1])
+                    event_json = json.loads(row[2])
+                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    notifier.on_new_room_event(
+                        event, position, max_position, extra_users
+                    )
+
         next_expire_broken_caches_ms = 0
         while True:
             try:
@@ -244,6 +264,7 @@ class SynchrotronServer(HomeServer):
                         now_ms + store.BROKEN_CACHE_EXPIRY_MS
                     )
                 yield store.process_replication(result)
+                notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 sleep(5)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d768e2aabe..cbc1ae4190 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -123,6 +123,7 @@ class SlavedEventStore(BaseSlavedStore):
     get_room_events_stream_for_rooms = (
         DataStore.get_room_events_stream_for_rooms.__func__
     )
+    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
 
     _set_before_and_after = staticmethod(DataStore._set_before_and_after)
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 56a0dd80f3..e0d7098692 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -153,6 +153,7 @@ class SQLBaseStore(object):
     def __init__(self, hs):
         self.hs = hs
         self._db_pool = hs.get_db_pool()
+        self._clock = hs.get_clock()
 
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0