Mastodonを読む
はじめに †
home#indexの処理を見た際など、いくつかのコード中にストリーミングに関する処理がありました。今回はそれらについて見ていき、Mastodonでどのようにストリーミング処理が行われているのかを確認しましょう。
復習:app/controllers/home_controller.rb †
home#indexを見た際、ストリーミングに利用するURLが以下のように設定されていました。
1
2
3
4
5
6
7
| -
|
|
|
|
|
!
| def index
@body_classes = 'app-body'
@token = find_or_create_access_token.token
@web_settings = Web::Setting.find_by(user: current_user)&.data || {}
@admin = Account.find_local(Setting.site_contact_username)
@streaming_api_base_url = Rails.configuration.x.streaming_api_base_url
end
|
Rails.configuration.xってなんだろ。アプリ独自の設定書くときに使っていいのかな。
indexのレンダリングが行われ、クライアントにはinitial-stateとしてストリーミングAPIのURLが渡されることになります。なお、開発環境だと以下のようになります。
1
2
3
4
| {
"meta":{
"streaming_api_base_url":"http://localhost:4000",
"access_token":"8920c1d8279002fa0f6b74f96bf7c971f997318755a35f5a26b511229a2786b7",
|
ここで重要なのは、ストリーミングで使われるサーバがMastodon本体とは別、ということです。(Mastodon本体は開発環境の場合、http://localhost:3000になります)
クライアント側:app/assets/javascripts/components/containers/mastdon.jsx †
サーバが別ということが確認できましたが、まずはサーバの前にクライアントの確認をしましょう。クライアント処理はReactのMastodonクラスに書かれています。(以下はホームタイムラインですが、ローカルタイムライン、連合タイムラインも処理はほぼ同じです)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
| class Mastodon extends React.Component {
componentDidMount() {
const { locale } = this.props;
const streamingAPIBaseURL = store.getState().getIn(['meta', 'streaming_api_base_url']);
const accessToken = store.getState().getIn(['meta', 'access_token']);
this.subscription = createStream(streamingAPIBaseURL, accessToken, 'user', {
connected () {
store.dispatch(connectTimeline('home'));
},
disconnected () {
store.dispatch(disconnectTimeline('home'));
},
received (data) {
switch(data.event) {
case 'update':
store.dispatch(updateTimeline('home', JSON.parse(data.payload)));
break;
case 'delete':
store.dispatch(deleteFromTimelines(data.payload));
break;
case 'notification':
store.dispatch(updateNotifications(JSON.parse(data.payload), getMessagesForLocale(locale), locale));
break;
}
},
reconnected () {
store.dispatch(connectTimeline('home'));
store.dispatch(refreshTimeline('home'));
store.dispatch(refreshNotifications());
}
});
省略
}
|
createStreamで接続、引数で渡しているハンドラオブジェクト?っていうのかな?のreceivedでサーバからデータを受信したときにタイムラインの更新を行っている雰囲気です。
app/assets/javascripts/components/stream.jsx †
createStreamを確認しましょう。
1
2
3
4
5
6
7
8
9
10
| export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
const ws = new WebSocketClient(`${createWebSocketURL(streamingAPIBaseURL)}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`);
ws.onopen = connected;
ws.onmessage = e => received(JSON.parse(e.data));
ws.onclose = disconnected;
ws.onreconnect = reconnected;
return ws;
};
|
というわけで、「/api/v1/streaming/」というURLにアクセスが行われるようです。
サーバ側:streaming/index.js †
ここからサーバ側。「/api/v1/streaming/」で探してみると、streamingディレクトリにindex.jsがあるのに気づきます。ストリーミング処理はJavascript(Node)で書かれているようです。
クライアントからの接続受け付け †
最後の方から見ると以下の記述があります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| -
|
|
|
|
-
-
|
|
|
!
|
-
|
|
|
|
!
!
!
| wss.on('connection', ws => {
const location = url.parse(ws.upgradeReq.url, true)
const token = location.query.access_token
const req = { requestId: uuid.v4() }
accountFromToken(token, req, err => {
if (err) {
log.error(req.requestId, err)
ws.close()
return
}
switch(location.query.stream) {
case 'user':
streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws))
break;
省略
}
})
})
|
accountFromTokenは名前の通り、アクセストークンからアカウントのIDを取得しています。
streamFrom †
では。streamFromです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| -
|
|
-
|
|
-
|
|
|
|
|
!
|
-
|
-
|
-
|
!
!
|
|
|
!
| const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => {
log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
const listener = message => {
const { event, payload, queued_at } = JSON.parse(message)
const transmit = () => {
const now = new Date().getTime()
const delta = now - queued_at;
log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
output(event, payload)
}
if (needsFiltering && event === 'update') {
省略
} else {
transmit()
}
}
subscribe(id, listener)
attachCloseHandler(id, listener)
}
|
listenerを定義しています。listenerではmessageが送られてきたらそれをoutputオブジェクトに書き込むということをしています。これにより、接続先のクライアントに情報が送られるということになります。
subscribe †
というわけで今度はsubscribe。
1
2
3
4
5
| -
|
|
|
!
| const subscribe = (channel, callback) => {
log.silly(`Adding listener for ${channel}`)
subs[channel] = subs[channel] || []
subs[channel].push(callback)
}
|
よくあるイベントリスナー的なコードです。ではこのsubsをどこで使っているのかというと、すぐ上の部分、
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| -
|
|
|
!
-
|
|
|
|
-
|
!
|
|
!
| const redisClient = redis.createClient({
host: process.env.REDIS_HOST || '127.0.0.1',
port: process.env.REDIS_PORT || 6379,
password: process.env.REDIS_PASSWORD
})
const subs = {}
redisClient.on('pmessage', (_, channel, message) => {
const callbacks = subs[channel]
log.silly(`New message on channel ${channel}`)
if (!callbacks) {
return
}
callbacks.forEach(callback => callback(message))
})
redisClient.psubscribe('timeline:*')
|
Redisに接続し、Redisからメッセージが飛んで来たら対応するcallback(listener)が呼び出されるようになっています。つまり、Redisへの書き込みが行われることでストリーミングサーバが反応→クライアントにプッシュが行われるようです。なお、psubscribeはpattern subscribeの略のようです。
Redisへの書き込み:app/services/post_status_service.rb †
というわけで後はRedisに書き込んでいる場所を探せばいいことになります。実はすでに一回出てきていました。ここからRallsになります。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| -
-
|
|
-
|
|
|
|
|
|
|
|
!
|
|
|
|
|
|
|
|
!
| class PostStatusService < BaseService
def call(account, text, in_reply_to = nil, options = {})
media = validate_media!(options[:media_ids])
status = nil
ApplicationRecord.transaction do
status = account.statuses.create!(text: text,
thread: in_reply_to,
sensitive: options[:sensitive],
spoiler_text: options[:spoiler_text] || '',
visibility: options[:visibility],
language: detect_language_for(text, account),
application: options[:application])
attach_media(status, media)
end
process_mentions_service.call(status)
process_hashtags_service.call(status)
LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present?
DistributionWorker.perform_async(status.id)
Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
status
end
|
いろいろなWorkerが呼ばれていますが、DistributionWorkerを見てみる。
app/works/distribution_worker.rb
1
2
3
4
5
6
7
8
9
| -
|
|
-
|
|
|
!
!
| class DistributionWorker < ApplicationWorker
include Sidekiq::Worker
def perform(status_id)
FanOutOnWriteService.new.call(Status.find(status_id))
rescue ActiveRecord::RecordNotFound
info("Couldn't find the status")
end
end
|
さらにFanOutOnWriteService。
app/services/fan_out_on_write_service.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| -
-
|
-
|
|
|
|
-
|
|
|
!
|
|
|
|
|
|
|
|
|
!
|
-
|
|
!
| class FanOutOnWriteService < BaseService
def call(status)
raise Mastodon::RaceConditionError if status.visibility.nil?
deliver_to_self(status) if status.account.local?
if status.direct_visibility?
deliver_to_mentioned_followers(status)
else
deliver_to_followers(status)
end
return if status.account.silenced? || !status.public_visibility? || status.reblog?
render_anonymous_payload(status)
deliver_to_hashtags(status)
return if status.reply? && status.in_reply_to_account_id != status.account_id
deliver_to_public(status)
end
def deliver_to_self(status)
Rails.logger.debug "Delivering status #{status.id} to author"
FeedManager.instance.push(:home, status.account, status)
end
|
app/lib/feed_manager.rb †
FeedManagerのpushメソッド。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| -
|
|
-
-
!
|
|
|
|
|
!
|
|
!
| def push(timeline_type, account, status)
timeline_key = key(timeline_type, account.id)
if status.reblog?
rank = redis.zrevrank(timeline_key, status.reblog_of_id)
return if !rank.nil? && rank < 40
redis.zadd(timeline_key, status.id, status.reblog_of_id)
else
redis.zadd(timeline_key, status.id, status.id)
trim(timeline_type, account.id)
end
PushUpdateWorker.perform_async(account.id, status.id)
end
|
なかなかしぶとい(笑)。さらにPushUpdateWorkerです。
1
2
3
4
5
6
7
8
9
10
11
12
13
| -
|
|
-
|
|
|
|
|
|
|
!
!
| class PushUpdateWorker
include Sidekiq::Worker
def perform(account_id, status_id)
account = Account.find(account_id)
status = Status.find(status_id)
message = InlineRenderer.render(status, account, 'api/v1/statuses/show')
Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
rescue ActiveRecord::RecordNotFound
true
end
end
|
「timeline:アカウントID」への書き込みがありました。これで、Redisに書き込みが行われ、最終的にクライアントに届くということになります。フォロワーにトゥートが届く仕組みもほぼ同じです。リモートインスタンスにいるフォロワーの場合は仕組みがかなり複雑になっていますが、ひとつずつ確認していけば理解できると思います。
おわりに †
今回はストリーミングの処理を見てきました。ストリーミングは次の3つから構成されています。
- クライアント側の接続、サーバから送られてきたら表示処理
- サーバ側その1。クライアントへデータを送る部分(Node)
- サーバ側その2。送るデータの書き込み部分(Rails)
サーバ内での情報の受け渡しにはRedisが使われていました。
ここまでで、初期画面表示、トゥート作成、フォロー、リモートフォロー、ストリーミングと一通りの処理を見てきたことになります。実はそろそろ1.4が出そうですが、これにて「Mastodonを読む」完結としたいと思います。