Mastodonを読む

はじめに

home#indexの処理を見た際など、いくつかのコード中にストリーミングに関する処理がありました。今回はそれらについて見ていき、Mastodonでどのようにストリーミング処理が行われているのかを確認しましょう。

復習:app/controllers/home_controller.rb

home#indexを見た際、ストリーミングに利用するURLが以下のように設定されていました。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
-
|
|
|
|
|
!
  def index
    @body_classes           = 'app-body'
    @token                  = find_or_create_access_token.token
    @web_settings           = Web::Setting.find_by(user: current_user)&.data || {}
    @admin                  = Account.find_local(Setting.site_contact_username)
    @streaming_api_base_url = Rails.configuration.x.streaming_api_base_url
  end

Rails.configuration.xってなんだろ。アプリ独自の設定書くときに使っていいのかな。

indexのレンダリングが行われ、クライアントにはinitial-stateとしてストリーミングAPIのURLが渡されることになります。なお、開発環境だと以下のようになります。

  1
  2
  3
  4
{
  "meta":{
    "streaming_api_base_url":"http://localhost:4000",
    "access_token":"8920c1d8279002fa0f6b74f96bf7c971f997318755a35f5a26b511229a2786b7",

ここで重要なのは、ストリーミングで使われるサーバがMastodon本体とは別、ということです。(Mastodon本体は開発環境の場合、http://localhost:3000になります)

クライアント側:app/assets/javascripts/components/containers/mastdon.jsx

サーバが別ということが確認できましたが、まずはサーバの前にクライアントの確認をしましょう。クライアント処理はReactのMastodonクラスに書かれています。(以下はホームタイムラインですが、ローカルタイムライン、連合タイムラインも処理はほぼ同じです)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
class Mastodon extends React.Component {
 
  componentDidMount() {
    const { locale }  = this.props;
    const streamingAPIBaseURL = store.getState().getIn(['meta', 'streaming_api_base_url']);
    const accessToken = store.getState().getIn(['meta', 'access_token']);
 
    this.subscription = createStream(streamingAPIBaseURL, accessToken, 'user', {
 
      connected () {
        store.dispatch(connectTimeline('home'));
      },
 
      disconnected () {
        store.dispatch(disconnectTimeline('home'));
      },
 
      received (data) {
        switch(data.event) {
        case 'update':
          store.dispatch(updateTimeline('home', JSON.parse(data.payload)));
          break;
        case 'delete':
          store.dispatch(deleteFromTimelines(data.payload));
          break;
        case 'notification':
          store.dispatch(updateNotifications(JSON.parse(data.payload), getMessagesForLocale(locale), locale));
          break;
        }
      },
 
      reconnected () {
        store.dispatch(connectTimeline('home'));
        store.dispatch(refreshTimeline('home'));
        store.dispatch(refreshNotifications());
      }
 
    });
 
    省略
  }

createStreamで接続、引数で渡しているハンドラオブジェクト?っていうのかな?のreceivedでサーバからデータを受信したときにタイムラインの更新を行っている雰囲気です。

app/assets/javascripts/components/stream.jsx

createStreamを確認しましょう。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
  const ws = new WebSocketClient(`${createWebSocketURL(streamingAPIBaseURL)}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`);
 
  ws.onopen      = connected;
  ws.onmessage   = e => received(JSON.parse(e.data));
  ws.onclose     = disconnected;
  ws.onreconnect = reconnected;
 
  return ws;
};

というわけで、「/api/v1/streaming/」というURLにアクセスが行われるようです。

サーバ側:streaming/index.js

ここからサーバ側。「/api/v1/streaming/」で探してみると、streamingディレクトリにindex.jsがあるのに気づきます。ストリーミング処理はJavascript(Node)で書かれているようです。

クライアントからの接続受け付け

最後の方から見ると以下の記述があります。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
-
|
|
|
|
-
-
|
|
|
!
|
-
|
|
|
|
!
!
!
  wss.on('connection', ws => {
    const location = url.parse(ws.upgradeReq.url, true)
    const token    = location.query.access_token
    const req      = { requestId: uuid.v4() }
 
    accountFromToken(token, req, err => {
      if (err) {
        log.error(req.requestId, err)
        ws.close()
        return
      }
 
      switch(location.query.stream) {
      case 'user':
        streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws))
        break;
      省略
      }
    })
  })

accountFromTokenは名前の通り、アクセストークンからアカウントのIDを取得しています。

streamFrom

では。streamFromです。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
-
|
|
-
|
|
-
|
|
|
|
|
!
|
-
|
-
|
-
|
!
!
|
|
|
!
  const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => {
    log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
 
    const listener = message => {
      const { event, payload, queued_at } = JSON.parse(message)
 
      const transmit = () => {
        const now   = new Date().getTime()
        const delta = now - queued_at;
 
        log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
        output(event, payload)
      }
 
      // Only messages that may require filtering are statuses, since notifications
      // are already personalized and deletes do not matter
      if (needsFiltering && event === 'update') {
        省略
      } else {
        transmit()
      }
    }
 
    subscribe(id, listener)
    attachCloseHandler(id, listener)
  }

listenerを定義しています。listenerではmessageが送られてきたらそれをoutputオブジェクトに書き込むということをしています。これにより、接続先のクライアントに情報が送られるということになります。

subscribe

というわけで今度はsubscribe。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
-
|
|
|
!
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`)
    subs[channel] = subs[channel] || []
    subs[channel].push(callback)
  }

よくあるイベントリスナー的なコードです。ではこのsubsをどこで使っているのかというと、すぐ上の部分、

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
-
|
|
|
!
 
 
 
-
|
|
|
|
-
|
!
|
|
!
 
 
  const redisClient = redis.createClient({
    host:     process.env.REDIS_HOST     || '127.0.0.1',
    port:     process.env.REDIS_PORT     || 6379,
    password: process.env.REDIS_PASSWORD
  })
 
  const subs = {}
 
  redisClient.on('pmessage', (_, channel, message) => {
    const callbacks = subs[channel]
 
    log.silly(`New message on channel ${channel}`)
 
    if (!callbacks) {
      return
    }
 
    callbacks.forEach(callback => callback(message))
  })
 
  redisClient.psubscribe('timeline:*')

Redisに接続し、Redisからメッセージが飛んで来たら対応するcallback(listener)が呼び出されるようになっています。つまり、Redisへの書き込みが行われることでストリーミングサーバが反応→クライアントにプッシュが行われるようです。なお、psubscribeはpattern subscribeの略のようです。

Redisへの書き込み:app/services/post_status_service.rb

というわけで後はRedisに書き込んでいる場所を探せばいいことになります。実はすでに一回出てきていました。ここからRallsになります。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
-
-
|
|
-
|
|
|
|
|
|
|
|
!
|
|
|
|
|
|
|
|
!
class PostStatusService < BaseService
  def call(account, text, in_reply_to = nil, options = {})
    media  = validate_media!(options[:media_ids])
    status = nil
    ApplicationRecord.transaction do
      status = account.statuses.create!(text: text,
                                        thread: in_reply_to,
                                        sensitive: options[:sensitive],
                                        spoiler_text: options[:spoiler_text] || '',
                                        visibility: options[:visibility],
                                        language: detect_language_for(text, account),
                                        application: options[:application])
      attach_media(status, media)
    end
    process_mentions_service.call(status)
    process_hashtags_service.call(status)
 
    LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present?
    DistributionWorker.perform_async(status.id)
    Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
 
    status
  end

いろいろなWorkerが呼ばれていますが、DistributionWorkerを見てみる。

app/works/distribution_worker.rb

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
-
|
|
-
|
|
|
!
!
class DistributionWorker < ApplicationWorker
  include Sidekiq::Worker
 
  def perform(status_id)
    FanOutOnWriteService.new.call(Status.find(status_id))
  rescue ActiveRecord::RecordNotFound
    info("Couldn't find the status")
  end
end

さらにFanOutOnWriteService。

app/services/fan_out_on_write_service.rb

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
-
-
|
-
|
|
|
|
-
|
|
|
!
|
|
|
|
|
|
|
|
|
!
|
-
|
|
!
class FanOutOnWriteService < BaseService
  # Push a status into home and mentions feeds
  # @param [Status] status
  def call(status)
    raise Mastodon::RaceConditionError if status.visibility.nil?
 
    deliver_to_self(status) if status.account.local?
 
    if status.direct_visibility?
      deliver_to_mentioned_followers(status)
    else
      deliver_to_followers(status)
    end
 
    return if status.account.silenced? || !status.public_visibility? || status.reblog?
 
    render_anonymous_payload(status)
    deliver_to_hashtags(status)
 
    return if status.reply? && status.in_reply_to_account_id != status.account_id
 
    deliver_to_public(status)
  end
 
  def deliver_to_self(status)
    Rails.logger.debug "Delivering status #{status.id} to author"
    FeedManager.instance.push(:home, status.account, status)
  end

app/lib/feed_manager.rb

FeedManagerのpushメソッド。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
-
|
|
-
-
!
|
|
|
|
|
!
|
|
!
  def push(timeline_type, account, status)
    timeline_key = key(timeline_type, account.id)
 
    if status.reblog?
      # If the original status is within 40 statuses from top, do not re-insert it into the feed
      rank = redis.zrevrank(timeline_key, status.reblog_of_id)
      return if !rank.nil? && rank < 40
      redis.zadd(timeline_key, status.id, status.reblog_of_id)
    else
      redis.zadd(timeline_key, status.id, status.id)
      trim(timeline_type, account.id)
    end
 
    PushUpdateWorker.perform_async(account.id, status.id)
  end

なかなかしぶとい(笑)。さらにPushUpdateWorkerです。

Everything is expanded.Everything is shortened.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
-
|
|
-
|
|
|
|
|
|
|
!
!
class PushUpdateWorker
  include Sidekiq::Worker
 
  def perform(account_id, status_id)
    account = Account.find(account_id)
    status  = Status.find(status_id)
    message = InlineRenderer.render(status, account, 'api/v1/statuses/show')
 
    Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
  rescue ActiveRecord::RecordNotFound
    true
  end
end

「timeline:アカウントID」への書き込みがありました。これで、Redisに書き込みが行われ、最終的にクライアントに届くということになります。フォロワーにトゥートが届く仕組みもほぼ同じです。リモートインスタンスにいるフォロワーの場合は仕組みがかなり複雑になっていますが、ひとつずつ確認していけば理解できると思います。

おわりに

今回はストリーミングの処理を見てきました。ストリーミングは次の3つから構成されています。

  • クライアント側の接続、サーバから送られてきたら表示処理
  • サーバ側その1。クライアントへデータを送る部分(Node)
  • サーバ側その2。送るデータの書き込み部分(Rails)

サーバ内での情報の受け渡しにはRedisが使われていました。

ここまでで、初期画面表示、トゥート作成、フォロー、リモートフォロー、ストリーミングと一通りの処理を見てきたことになります。実はそろそろ1.4が出そうですが、これにて「Mastodonを読む」完結としたいと思います。


トップ   編集 凍結 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2017-05-24 (水) 20:40:34 (2755d)