Mastodonを読む
はじめに †
前回までで「/」にアクセスしたとき(ログイン済みとする)に初めに表示される画面の読解は終わりました。今回はトゥートをしたときに行われる処理について見ていきたいと思います。
クライアントサイドの処理 †
app/assets/javascripts/components/features/compose/index.jsx †
PCの場合にしろ、スマホの場合にしろ、トゥートを行うためのコンポーネントはComposeクラスが使用されます。
renderメソッドについては何度も見ているので省略します。
app/assets/javascripts/components/features/compose/containers/compose_form_container.jsx †
コンポーネントとActionをつなぐ記述を確認します。ConposeFormContainerに書かれています。必要なところだけ抜き出すと、
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| import { connect } from 'react-redux';
import ComposeForm from '../components/compose_form';
import {
changeCompose,
submitCompose,
clearComposeSuggestions,
fetchComposeSuggestions,
selectComposeSuggestion,
changeComposeSpoilerText,
insertEmojiCompose
} from '../../../actions/compose';
const mapDispatchToProps = (dispatch) => ({
onChange (text) {
dispatch(changeCompose(text));
},
onSubmit () {
dispatch(submitCompose());
},
他のイベントハンドラ
});
export default connect(mapStateToProps, mapDispatchToProps)(ComposeForm);
|
ところで、jsxは全く知らないのですが、「export default」としておくとその右にあるものがConposeFormContainerとして使われるって認識でいいんですよね?
app/assets.javascripts/components/actions/compose.jsx †
というわけで、submitCompose。ちょっと長めですがやってることはそこまで難しくありません。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| export function submitCompose() {
return function (dispatch, getState) {
const status = emojione.shortnameToUnicode(getState().getIn(['compose', 'text'], ''));
if (!status || !status.length) {
return;
}
dispatch(submitComposeRequest());
api(getState).post('/api/v1/statuses', {
status,
in_reply_to_id: getState().getIn(['compose', 'in_reply_to'], null),
media_ids: getState().getIn(['compose', 'media_attachments']).map(item => item.get('id')),
sensitive: getState().getIn(['compose', 'sensitive']),
spoiler_text: getState().getIn(['compose', 'spoiler_text'], ''),
visibility: getState().getIn(['compose', 'privacy'])
}, {
headers: {
'Idempotency-Key': getState().getIn(['compose', 'idempotencyKey'])
}
}).then(function (response) {
dispatch(submitComposeSuccess({ ...response.data }));
// To make the app more responsive, immediately get the status into the columns
dispatch(updateTimeline('home', { ...response.data }));
if (response.data.in_reply_to_id === null && response.data.visibility === 'public') {
if (getState().getIn(['timelines', 'community', 'loaded'])) {
dispatch(updateTimeline('community', { ...response.data }));
}
if (getState().getIn(['timelines', 'public', 'loaded'])) {
dispatch(updateTimeline('public', { ...response.data }));
}
}
}).catch(function (error) {
dispatch(submitComposeFail(error));
});
};
};
|
「/api/v1/statuses」に入力をPOSTして、成功したら各タイムラインを更新です。更新ActionおよびReducerでの処理も淡々と処理が行われているだけなので省略します。
サーバサイドの処理 †
ここまででクライアントサイドでどういう処理が行われているか見てきたので、次はサーバサイドを見ていきます。
app/controllers/api/v1/statuses_controller.rb †
「/api/v1/statuses」にアクセスされたときに呼び出されるのは、StatusesControllerのcreateメソッド、
1
2
3
4
5
6
7
8
9
10
11
12
13
| -
|
|
|
|
|
|
|
|
|
|
|
!
| def create
@status = PostStatusService.new.call(current_user.account,
status_params[:status],
status_params[:in_reply_to_id].blank? ? nil : Status.find(status_params[:in_reply_to_id]),
media_ids: status_params[:media_ids],
sensitive: status_params[:sensitive],
spoiler_text: status_params[:spoiler_text],
visibility: status_params[:visibility],
application: doorkeeper_token.application,
idempotency: request.headers['Idempotency-Key'])
render :show
end
|
Serviceって使ったことないけど昔からあるんですかね?ともかく、PostStatusServiceに制御が移ります。
app/services/post_status_service.rb †
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| -
-
|
|
|
|
|
|
|
|
|
|
|
-
-
|
|
!
|
|
|
-
|
|
|
|
|
|
|
|
!
|
|
|
|
|
|
|
-
|
!
|
|
!
| class PostStatusService < BaseService
def call(account, text, in_reply_to = nil, options = {})
if options[:idempotency].present?
existing_id = redis.get("idempotency:status:#{account.id}:#{options[:idempotency]}")
return Status.find(existing_id) if existing_id
end
media = validate_media!(options[:media_ids])
status = nil
ApplicationRecord.transaction do
status = account.statuses.create!(text: text,
thread: in_reply_to,
sensitive: options[:sensitive],
spoiler_text: options[:spoiler_text] || '',
visibility: options[:visibility],
language: detect_language_for(text, account),
application: options[:application])
attach_media(status, media)
end
process_mentions_service.call(status)
process_hashtags_service.call(status)
LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present?
DistributionWorker.perform_async(status.id)
Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
if options[:idempotency].present?
redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id)
end
status
end
|
送信されたトゥート(Status)の保存自体は普通のRailsでのモデル新規作成です。違いがあるのはprocess_*_service.callと*Worker.perform_asyncです。
app/services/process_mentions_service.rb †
まず、process_mentions_serviceです。その名の通り、メンション(リプ)先のアカウントに通知を行っています。ただし、Mastodonでは別インスタンスにいるユーザ(リモートユーザ)宛のメンションもできるのでその部分で少し処理分岐が発生しています。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| -
|
|
-
|
|
|
-
|
|
-
|
|
|
-
-
|
|
|
!
!
|
|
|
|
!
|
-
|
|
-
|
|
|
!
!
!
| class ProcessMentionsService < BaseService
include StreamEntryRenderer
def call(status)
return unless status.local?
status.text.scan(Account::MENTION_RE).each do |match|
username, domain = match.first.split('@')
mentioned_account = Account.find_remote(username, domain)
if mentioned_account.nil? && !domain.nil?
begin
mentioned_account = follow_remote_account_service.call(match.first.to_s)
rescue Goldfinger::Error, HTTP::Error
mentioned_account = nil
end
end
next if mentioned_account.nil?
mentioned_account.mentions.where(status: status).first_or_create(status: status)
end
status.mentions.includes(:account).each do |mention|
mentioned_account = mention.account
if mentioned_account.local?
NotifyService.new.call(mentioned_account, mention)
else
NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
end
end
end
|
リモートユーザの場合はリモートユーザがいるインスタンスにデータを投げて(Worker経由でService実行)、受けて処理する(Controller→Worker→Service)ということが行われています。全部追いかけていくと長くなるので省略します。
app/workers/distribution_worker.rb †
次にいくつかWorkerを実行しています。このWorkerはSidekiqというgemを利用しており、処理はバックグラウンドで行われるようです。
DistributionWorkerでは投稿したトゥートをフォロワーなどに配信する処理を行っているようです。
1
2
3
4
5
6
7
8
9
| -
|
|
-
|
|
|
!
!
| class DistributionWorker < ApplicationWorker
include Sidekiq::Worker
def perform(status_id)
FanOutOnWriteService.new.call(Status.find(status_id))
rescue ActiveRecord::RecordNotFound
info("Couldn't find the status")
end
end
|
app/services/fan_out_on_write_service.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| -
-
|
-
|
|
|
|
-
|
|
|
!
|
|
|
|
|
|
|
|
|
!
| class FanOutOnWriteService < BaseService
def call(status)
raise Mastodon::RaceConditionError if status.visibility.nil?
deliver_to_self(status) if status.account.local?
if status.direct_visibility?
deliver_to_mentioned_followers(status)
else
deliver_to_followers(status)
end
return if status.account.silenced? || !status.public_visibility? || status.reblog?
render_anonymous_payload(status)
deliver_to_hashtags(status)
return if status.reply? && status.in_reply_to_account_id != status.account_id
deliver_to_public(status)
end
|
app/workers/pubsubhubbub/distribution_worker.rb †
最後にPubsubhubbubの方のDistributionWorkerです。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| -
|
|
|
|
-
|
|
|
|
|
|
|
|
-
|
|
!
|
|
!
!
| class Pubsubhubbub::DistributionWorker
include Sidekiq::Worker
sidekiq_options queue: 'push'
def perform(stream_entry_id)
stream_entry = StreamEntry.find(stream_entry_id)
return if stream_entry.status&.direct_visibility?
account = stream_entry.account
payload = AtomSerializer.render(AtomSerializer.new.feed(account, [stream_entry]))
domains = account.followers_domains
Subscription.where(account: account).active.select('id, callback_url').find_each do |subscription|
next unless domains.include?(Addressable::URI.parse(subscription.callback_url).host)
Pubsubhubbub::DeliveryWorker.perform_async(subscription.id, payload)
end
rescue ActiveRecord::RecordNotFound
true
end
end
|
これにより連合を組んでいるインスタンスにトゥートが送信される、と思っているのですが、accountってなんだろう。連合組んでるインスタンスにはトゥートがあったら全部飛んでくわけじゃない?もう少し確認が必要に思いますが今回は保留。
おわりに †
今回は新規トゥート送信時のクライアントサイド、サーバサイドの処理を見てきました。クライアントサイドについては今まで見てきたのと同じようなReact, Reduxの処理でした。
一方、サーバサイドではServiceを使用し処理をモジュール化、また、Mastodonの特徴である別インスタンスへの通知を行うためにSidekiqを利用したWorkerが使用されていました。これについては雰囲気こんな感じという読み方しかしていないので誤読があるかもしれません。特に、別インスタンスに新規トゥートを送ってるところは思ってたのと違う(思ってたことが間違ってる可能性もある)ので連合の仕様をちゃんと確認する必要がありそうです。