Mastodonを読む

はじめに

前回までで「/」にアクセスしたとき(ログイン済みとする)に初めに表示される画面の読解は終わりました。今回はトゥートをしたときに行われる処理について見ていきたいと思います。

クライアントサイドの処理

app/assets/javascripts/components/features/compose/index.jsx

PCの場合にしろ、スマホの場合にしろ、トゥートを行うためのコンポーネントはComposeクラスが使用されます。

compose.jpg

renderメソッドについては何度も見ているので省略します。

app/assets/javascripts/components/features/compose/containers/compose_form_container.jsx

コンポーネントとActionをつなぐ記述を確認します。ConposeFormContainerに書かれています。必要なところだけ抜き出すと、

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
import { connect } from 'react-redux';
import ComposeForm from '../components/compose_form';
import {
  changeCompose,
  submitCompose,
  clearComposeSuggestions,
  fetchComposeSuggestions,
  selectComposeSuggestion,
  changeComposeSpoilerText,
  insertEmojiCompose
} from '../../../actions/compose';
 
const mapDispatchToProps = (dispatch) => ({
 
  onChange (text) {
    dispatch(changeCompose(text));
  },
 
  onSubmit () {
    dispatch(submitCompose());
  },
 
  他のイベントハンドラ
 
});
 
export default connect(mapStateToProps, mapDispatchToProps)(ComposeForm);

ところで、jsxは全く知らないのですが、「export default」としておくとその右にあるものがConposeFormContainerとして使われるって認識でいいんですよね?

app/assets.javascripts/components/actions/compose.jsx

というわけで、submitCompose。ちょっと長めですがやってることはそこまで難しくありません。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
export function submitCompose() {
  return function (dispatch, getState) {
    const status = emojione.shortnameToUnicode(getState().getIn(['compose', 'text'], ''));
    if (!status || !status.length) {
      return;
    }
    dispatch(submitComposeRequest());
    api(getState).post('/api/v1/statuses', {
      status,
      in_reply_to_id: getState().getIn(['compose', 'in_reply_to'], null),
      media_ids: getState().getIn(['compose', 'media_attachments']).map(item => item.get('id')),
      sensitive: getState().getIn(['compose', 'sensitive']),
      spoiler_text: getState().getIn(['compose', 'spoiler_text'], ''),
      visibility: getState().getIn(['compose', 'privacy'])
    }, {
      headers: {
        'Idempotency-Key': getState().getIn(['compose', 'idempotencyKey'])
      }
    }).then(function (response) {
      dispatch(submitComposeSuccess({ ...response.data }));
 
      // To make the app more responsive, immediately get the status into the columns
      dispatch(updateTimeline('home', { ...response.data }));
 
      if (response.data.in_reply_to_id === null && response.data.visibility === 'public') {
        if (getState().getIn(['timelines', 'community', 'loaded'])) {
          dispatch(updateTimeline('community', { ...response.data }));
        }
 
        if (getState().getIn(['timelines', 'public', 'loaded'])) {
          dispatch(updateTimeline('public', { ...response.data }));
        }
      }
    }).catch(function (error) {
      dispatch(submitComposeFail(error));
    });
  };
};

「/api/v1/statuses」に入力をPOSTして、成功したら各タイムラインを更新です。更新ActionおよびReducerでの処理も淡々と処理が行われているだけなので省略します。

サーバサイドの処理

ここまででクライアントサイドでどういう処理が行われているか見てきたので、次はサーバサイドを見ていきます。

app/controllers/api/v1/statuses_controller.rb

「/api/v1/statuses」にアクセスされたときに呼び出されるのは、StatusesControllerのcreateメソッド、

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
-
|
|
|
|
|
|
|
|
|
|
|
!
  def create
    @status = PostStatusService.new.call(current_user.account,
                                         status_params[:status],
                                         status_params[:in_reply_to_id].blank? ? nil : Status.find(status_params[:in_reply_to_id]),
                                         media_ids: status_params[:media_ids],
                                         sensitive: status_params[:sensitive],
                                         spoiler_text: status_params[:spoiler_text],
                                         visibility: status_params[:visibility],
                                         application: doorkeeper_token.application,
                                         idempotency: request.headers['Idempotency-Key'])
 
    render :show
  end

Serviceって使ったことないけど昔からあるんですかね?ともかく、PostStatusServiceに制御が移ります。

app/services/post_status_service.rb

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
-
-
|
|
|
|
|
|
|
|
|
|
|
-
-
|
|
!
|
|
|
-
|
|
|
|
|
|
|
|
!
|
|
|
|
|
|
|
-
|
!
|
|
!
class PostStatusService < BaseService
  # Post a text status update, fetch and notify remote users mentioned
  # @param [Account] account Account from which to post
  # @param [String] text Message
  # @param [Status] in_reply_to Optional status to reply to
  # @param [Hash] options
  # @option [Boolean] :sensitive
  # @option [String] :visibility
  # @option [String] :spoiler_text
  # @option [Enumerable] :media_ids Optional array of media IDs to attach
  # @option [Doorkeeper::Application] :application
  # @option [String] :idempotency Optional idempotency key
  # @return [Status]
  def call(account, text, in_reply_to = nil, options = {})
    if options[:idempotency].present?
      existing_id = redis.get("idempotency:status:#{account.id}:#{options[:idempotency]}")
      return Status.find(existing_id) if existing_id
    end
 
    media  = validate_media!(options[:media_ids])
    status = nil
    ApplicationRecord.transaction do
      status = account.statuses.create!(text: text,
                                        thread: in_reply_to,
                                        sensitive: options[:sensitive],
                                        spoiler_text: options[:spoiler_text] || '',
                                        visibility: options[:visibility],
                                        language: detect_language_for(text, account),
                                        application: options[:application])
      attach_media(status, media)
    end
    process_mentions_service.call(status)
    process_hashtags_service.call(status)
 
    LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present?
    DistributionWorker.perform_async(status.id)
    Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
 
    if options[:idempotency].present?
      redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id)
    end
 
    status
  end

送信されたトゥート(Status)の保存自体は普通のRailsでのモデル新規作成です。違いがあるのはprocess_*_service.callと*Worker.perform_asyncです。

app/services/process_mentions_service.rb

まず、process_mentions_serviceです。その名の通り、メンション(リプ)先のアカウントに通知を行っています。ただし、Mastodonでは別インスタンスにいるユーザ(リモートユーザ)宛のメンションもできるのでその部分で少し処理分岐が発生しています。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
-
|
|
-
|
|
|
-
|
|
-
|
|
|
-
-
|
|
|
!
!
|
|
|
|
!
|
-
|
|
-
|
|
|
!
!
!
class ProcessMentionsService < BaseService
  include StreamEntryRenderer
 
  # Scan status for mentions and fetch remote mentioned users, create
  # local mention pointers, send Salmon notifications to mentioned
  # remote users
  # @param [Status] status
  def call(status)
    return unless status.local?
 
    status.text.scan(Account::MENTION_RE).each do |match|
      username, domain  = match.first.split('@')
      mentioned_account = Account.find_remote(username, domain)
 
      if mentioned_account.nil? && !domain.nil?
        begin
          mentioned_account = follow_remote_account_service.call(match.first.to_s)
        rescue Goldfinger::Error, HTTP::Error
          mentioned_account = nil
        end
      end
 
      next if mentioned_account.nil?
 
      mentioned_account.mentions.where(status: status).first_or_create(status: status)
    end
 
    status.mentions.includes(:account).each do |mention|
      mentioned_account = mention.account
 
      if mentioned_account.local?
        NotifyService.new.call(mentioned_account, mention)
      else
        NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
      end
    end
  end

リモートユーザの場合はリモートユーザがいるインスタンスにデータを投げて(Worker経由でService実行)、受けて処理する(Controller→Worker→Service)ということが行われています。全部追いかけていくと長くなるので省略します。

app/workers/distribution_worker.rb

次にいくつかWorkerを実行しています。このWorkerはSidekiqというgemを利用しており、処理はバックグラウンドで行われるようです。

DistributionWorkerでは投稿したトゥートをフォロワーなどに配信する処理を行っているようです。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
-
|
|
-
|
|
|
!
!
class DistributionWorker < ApplicationWorker
  include Sidekiq::Worker
 
  def perform(status_id)
    FanOutOnWriteService.new.call(Status.find(status_id))
  rescue ActiveRecord::RecordNotFound
    info("Couldn't find the status")
  end
end

app/services/fan_out_on_write_service.rb

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
-
-
|
-
|
|
|
|
-
|
|
|
!
|
|
|
|
|
|
|
|
|
!
class FanOutOnWriteService < BaseService
  # Push a status into home and mentions feeds
  # @param [Status] status
  def call(status)
    raise Mastodon::RaceConditionError if status.visibility.nil?
 
    deliver_to_self(status) if status.account.local?
 
    if status.direct_visibility?
      deliver_to_mentioned_followers(status)
    else
      deliver_to_followers(status)
    end
 
    return if status.account.silenced? || !status.public_visibility? || status.reblog?
 
    render_anonymous_payload(status)
    deliver_to_hashtags(status)
 
    return if status.reply? && status.in_reply_to_account_id != status.account_id
 
    deliver_to_public(status)
  end

app/workers/pubsubhubbub/distribution_worker.rb

最後にPubsubhubbubの方のDistributionWorkerです。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
-
|
|
|
|
-
|
|
|
|
|
|
|
|
-
|
|
!
|
|
!
!
class Pubsubhubbub::DistributionWorker
  include Sidekiq::Worker
 
  sidekiq_options queue: 'push'
 
  def perform(stream_entry_id)
    stream_entry = StreamEntry.find(stream_entry_id)
 
    return if stream_entry.status&.direct_visibility?
 
    account = stream_entry.account
    payload = AtomSerializer.render(AtomSerializer.new.feed(account, [stream_entry]))
    domains = account.followers_domains
 
    Subscription.where(account: account).active.select('id, callback_url').find_each do |subscription|
      next unless domains.include?(Addressable::URI.parse(subscription.callback_url).host)
      Pubsubhubbub::DeliveryWorker.perform_async(subscription.id, payload)
    end
  rescue ActiveRecord::RecordNotFound
    true
  end
end

これにより連合を組んでいるインスタンスにトゥートが送信される、と思っているのですが、accountってなんだろう。連合組んでるインスタンスにはトゥートがあったら全部飛んでくわけじゃない?もう少し確認が必要に思いますが今回は保留。

おわりに

今回は新規トゥート送信時のクライアントサイド、サーバサイドの処理を見てきました。クライアントサイドについては今まで見てきたのと同じようなReact, Reduxの処理でした。

一方、サーバサイドではServiceを使用し処理をモジュール化、また、Mastodonの特徴である別インスタンスへの通知を行うためにSidekiqを利用したWorkerが使用されていました。これについては雰囲気こんな感じという読み方しかしていないので誤読があるかもしれません。特に、別インスタンスに新規トゥートを送ってるところは思ってたのと違う(思ってたことが間違ってる可能性もある)ので連合の仕様をちゃんと確認する必要がありそうです。


添付ファイル: filecompose.jpg 688件 [詳細]

トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2017-05-06 (土) 22:40:40 (944d)