summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2016-05-31 13:23:58 +0100
committerMark Haines <mark.haines@matrix.org>2016-05-31 13:23:58 +0100
commit86a746982bbf85f56d33af84172035d13df92c62 (patch)
tree7495bba24e70b811fc2f0eb5efb28119d1b61a10 /synapse/app
parentMerge remote-tracking branch 'origin/develop' into markjh/synchrotron (diff)
downloadsynapse-86a746982bbf85f56d33af84172035d13df92c62.tar.xz
Poke the notifier to wake up /syncs
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/synchrotron.py21
1 files changed, 21 insertions, 0 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 623a73f67a..6a0592f41d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -16,10 +16,12 @@
 
 import synapse
 
+from synapse.api.constants import EventTypes
 from synapse.config._base import ConfigError
 from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
 from synapse.config.appservice import AppServiceConfig
+from synapse.events import FrozenEvent
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
@@ -49,6 +51,7 @@ from daemonize import Daemonize
 import sys
 import logging
 import contextlib
+import ujson as json
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -226,10 +229,27 @@ class SynchrotronServer(HomeServer):
         store = self.get_datastore()
         replication_url = self.config.replication_url
         clock = self.get_clock()
+        notifier = self.get_notifier()
 
         def expire_broken_caches():
             store.who_forgot_in_room.invalidate_all()
 
+        def notify(result):
+            stream = result.get("events")
+            if stream:
+                max_position = stream["position"]
+                for row in stream["rows"]:
+                    position = row[0]
+                    internal = json.loads(row[1])
+                    event_json = json.loads(row[2])
+                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    notifier.on_new_room_event(
+                        event, position, max_position, extra_users
+                    )
+
         next_expire_broken_caches_ms = 0
         while True:
             try:
@@ -244,6 +264,7 @@ class SynchrotronServer(HomeServer):
                         now_ms + store.BROKEN_CACHE_EXPIRY_MS
                     )
                 yield store.process_replication(result)
+                notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
                 sleep(5)