diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f1de1e7ce9..3c3fa38053 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -311,7 +311,7 @@ class PusherServer(HomeServer):
poke_pushers(result)
except:
logger.exception("Error replicating from %r", replication_url)
- sleep(30)
+ yield sleep(30)
def setup(config_options):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index aa81e1c5da..5c552ffb29 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -187,7 +187,10 @@ class SynchrotronPresence(object):
yield self._send_syncing_users_now()
def _end():
- if affect_presence:
+ # We check that the user_id is in user_to_num_current_syncs because
+ # user_to_num_current_syncs may have been cleared if we are
+ # shutting down.
+ if affect_presence and user_id in self.user_to_num_current_syncs:
self.user_to_num_current_syncs[user_id] -= 1
@contextlib.contextmanager
@@ -443,7 +446,7 @@ class SynchrotronServer(HomeServer):
notify(result)
except:
logger.exception("Error replicating from %r", replication_url)
- sleep(5)
+ yield sleep(5)
def build_presence_handler(self):
return SynchrotronPresence(self)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0e19f777b8..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -183,7 +183,7 @@ class PresenceHandler(object):
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
self.clock.call_later(
- 30 * 1000,
+ 30,
self.clock.looping_call,
self._handle_timeouts,
5000,
@@ -283,44 +283,48 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
+ logger.info("Handling presence timeouts")
now = self.clock.time_msec()
- with Measure(self.clock, "presence_handle_timeouts"):
- # Fetch the list of users that *may* have timed out. Things may have
- # changed since the timeout was set, so we won't necessarily have to
- # take any action.
- users_to_check = set(self.wheel_timer.fetch(now))
-
- # Check whether the lists of syncing processes from an external
- # process have expired.
- expired_process_ids = [
- process_id for process_id, last_update
- in self.external_process_last_update.items()
- if now - last_update > EXTERNAL_PROCESS_EXPIRY
- ]
- for process_id in expired_process_ids:
- users_to_check.update(
- self.external_process_to_current_syncs.pop(process_id, ())
- )
- self.external_process_last_update.pop(process_id)
+ try:
+ with Measure(self.clock, "presence_handle_timeouts"):
+ # Fetch the list of users that *may* have timed out. Things may have
+ # changed since the timeout was set, so we won't necessarily have to
+ # take any action.
+ users_to_check = set(self.wheel_timer.fetch(now))
+
+ # Check whether the lists of syncing processes from an external
+ # process have expired.
+ expired_process_ids = [
+ process_id for process_id, last_update
+ in self.external_process_last_updated_ms.items()
+ if now - last_update > EXTERNAL_PROCESS_EXPIRY
+ ]
+ for process_id in expired_process_ids:
+ users_to_check.update(
+ self.external_process_last_updated_ms.pop(process_id, ())
+ )
+ self.external_process_last_update.pop(process_id)
- states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
- for user_id in users_to_check
- ]
+ states = [
+ self.user_to_current_state.get(
+ user_id, UserPresenceState.default(user_id)
+ )
+ for user_id in users_to_check
+ ]
- timers_fired_counter.inc_by(len(states))
+ timers_fired_counter.inc_by(len(states))
- changes = handle_timeouts(
- states,
- is_mine_fn=self.is_mine_id,
- syncing_users=self.get_syncing_users(),
- now=now,
- )
+ changes = handle_timeouts(
+ states,
+ is_mine_fn=self.is_mine_id,
+ syncing_user_ids=self.get_currently_syncing_users(),
+ now=now,
+ )
- preserve_fn(self._update_states)(changes)
+ preserve_fn(self._update_states)(changes)
+ except:
+ logger.exception("Exception in _handle_timeouts loop")
@defer.inlineCallbacks
def bump_presence_active_time(self, user):
@@ -402,7 +406,8 @@ class PresenceHandler(object):
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
- syncing_user_ids.update(self.external_process_to_current_syncs.values())
+ for user_ids in self.external_process_to_current_syncs.values():
+ syncing_user_ids.update(user_ids)
return syncing_user_ids
@defer.inlineCallbacks
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index ffb7d4a25b..d1ee533fac 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -320,7 +320,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
event_ids = json.loads(entry["event_ids"])
- events = yield self.get_events(event_ids)
+ events = yield self._get_events(event_ids)
defer.returnValue(AppServiceTransaction(
service=service, id=entry["txn_id"], events=events
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5db24e86f9..6d978ffcd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
+import synapse
+import synapse.metrics
+
import logging
import math
@@ -35,6 +38,10 @@ import ujson as json
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
def encode_json(json_object):
if USE_FROZEN_DICTS:
# ujson doesn't like frozen_dicts
@@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore):
events_and_contexts=chunk,
backfilled=backfilled,
)
+ persist_event_counter.inc_by(len(chunk))
@defer.inlineCallbacks
@log_function
@@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore):
current_state=current_state,
backfilled=backfilled,
)
+ persist_event_counter.inc()
except _RollbackButIsFineException:
pass
@@ -635,6 +644,8 @@ class EventsStore(SQLBaseStore):
],
)
+ self._add_to_cache(txn, events_and_contexts)
+
if backfilled:
# Backfilled events come before the current state so we don't need
# to update the current state table
@@ -676,6 +687,45 @@ class EventsStore(SQLBaseStore):
return
+ def _add_to_cache(self, txn, events_and_contexts):
+ to_prefill = []
+
+ rows = []
+ N = 200
+ for i in range(0, len(events_and_contexts), N):
+ ev_map = {
+ e[0].event_id: e[0]
+ for e in events_and_contexts[i:i + N]
+ }
+ if not ev_map:
+ break
+
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE e.event_id IN (%s)"
+ ) % (",".join(["?"] * len(ev_map)),)
+
+ txn.execute(sql, ev_map.keys())
+ rows = self.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(
+ event=event,
+ redacted_event=None,
+ ))
+
+ def prefill():
+ for cache_entry in to_prefill:
+ self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+ txn.call_after(prefill)
+
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
|