diff options
author | Erik Johnston <erik@matrix.org> | 2020-01-22 16:53:28 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-01-22 16:53:28 +0000 |
commit | 57a60365da0c47f286ea4608d766abbca5762233 (patch) | |
tree | aaef0948f26f3352092b787d32e1dda0743d697e /synapse/replication/tcp/streams/events.py | |
parent | Pull out more info about room key requests (diff) | |
parent | Remove unnecessary abstractions in admin handler (#6751) (diff) | |
download | synapse-github/erikj/debug_direct_message_checks.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/debug_direct_message_checks github/erikj/debug_direct_message_checks erikj/debug_direct_message_checks
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r-- | synapse/replication/tcp/streams/events.py | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index d97669c886..b3afabb8cd 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,12 +13,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import heapq +from typing import Tuple, Type import attr -from twisted.internet import defer - from ._base import Stream @@ -63,7 +63,8 @@ class BaseEventsStreamRow(object): Specifies how to identify, serialize and deserialize the different types. """ - TypeId = None # Unique string that ids the type. Must be overriden in sub classes. + # Unique string that ids the type. Must be overriden in sub classes. + TypeId = None # type: str @classmethod def from_data(cls, data): @@ -99,9 +100,12 @@ class EventsStreamCurrentStateRow(BaseEventsStreamRow): event_id = attr.ib() # str, optional -TypeToRow = { - Row.TypeId: Row for Row in (EventsStreamEventRow, EventsStreamCurrentStateRow) -} +_EventRows = ( + EventsStreamEventRow, + EventsStreamCurrentStateRow, +) # type: Tuple[Type[BaseEventsStreamRow], ...] + +TypeToRow = {Row.TypeId: Row for Row in _EventRows} class EventsStream(Stream): @@ -112,20 +116,19 @@ class EventsStream(Stream): def __init__(self, hs): self._store = hs.get_datastore() - self.current_token = self._store.get_current_events_token + self.current_token = self._store.get_current_events_token # type: ignore super(EventsStream, self).__init__(hs) - @defer.inlineCallbacks - def update_function(self, from_token, current_token, limit=None): - event_rows = yield self._store.get_all_new_forward_event_rows( + async def update_function(self, from_token, current_token, limit=None): + event_rows = await self._store.get_all_new_forward_event_rows( from_token, current_token, limit ) event_updates = ( (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows ) - state_rows = yield self._store.get_all_updated_current_state_deltas( + state_rows = await self._store.get_all_updated_current_state_deltas( from_token, current_token, limit ) state_updates = ( |