[[Mastodonを読む]]
 
 #contents
 
 *はじめに [#ybf3408f]
 
 home#indexの処理を見た際など、いくつかのコード中にストリーミングに関する処理がありました。今回はそれらについて見ていき、Mastodonでどのようにストリーミング処理が行われているのかを確認しましょう。
 
 *復習:app/controllers/home_controller.rb [#p8a42340]
 
 home#indexを見た際、ストリーミングに利用するURLが以下のように設定されていました。
 
 #code(Ruby){{
   def index
     @body_classes           = 'app-body'
     @token                  = find_or_create_access_token.token
     @web_settings           = Web::Setting.find_by(user: current_user)&.data || {}
     @admin                  = Account.find_local(Setting.site_contact_username)
     @streaming_api_base_url = Rails.configuration.x.streaming_api_base_url
   end
 }}
 
 Rails.configuration.xってなんだろ。アプリ独自の設定書くときに使っていいのかな。
 
 indexのレンダリングが行われ、クライアントにはinitial-stateとしてストリーミングAPIのURLが渡されることになります。なお、開発環境だと以下のようになります。
 
 #code{{
 {
   "meta":{
     "streaming_api_base_url":"http://localhost:4000",
     "access_token":"8920c1d8279002fa0f6b74f96bf7c971f997318755a35f5a26b511229a2786b7",
 }}
 
 ここで重要なのは、ストリーミングで使われるサーバがMastodon本体とは別、ということです。(Mastodon本体は開発環境の場合、http://localhost:3000になります)
 
 *クライアント側:app/assets/javascripts/components/containers/mastdon.jsx [#e6c5a731]
 
 サーバが別ということが確認できましたが、まずはサーバの前にクライアントの確認をしましょう。クライアント処理はReactのMastodonクラスに書かれています。(以下はホームタイムラインですが、ローカルタイムライン、連合タイムラインも処理はほぼ同じです)
 
 #code{{
 class Mastodon extends React.Component {
 
   componentDidMount() {
     const { locale }  = this.props;
     const streamingAPIBaseURL = store.getState().getIn(['meta', 'streaming_api_base_url']);
     const accessToken = store.getState().getIn(['meta', 'access_token']);
 
     this.subscription = createStream(streamingAPIBaseURL, accessToken, 'user', {
 
       connected () {
         store.dispatch(connectTimeline('home'));
       },
 
       disconnected () {
         store.dispatch(disconnectTimeline('home'));
       },
 
       received (data) {
         switch(data.event) {
         case 'update':
           store.dispatch(updateTimeline('home', JSON.parse(data.payload)));
           break;
         case 'delete':
           store.dispatch(deleteFromTimelines(data.payload));
           break;
         case 'notification':
           store.dispatch(updateNotifications(JSON.parse(data.payload), getMessagesForLocale(locale), locale));
           break;
         }
       },
 
       reconnected () {
         store.dispatch(connectTimeline('home'));
         store.dispatch(refreshTimeline('home'));
         store.dispatch(refreshNotifications());
       }
 
     });
 
     省略
   }
 }}
 
 createStreamで接続、引数で渡しているハンドラオブジェクト?っていうのかな?のreceivedでサーバからデータを受信したときにタイムラインの更新を行っている雰囲気です。
 
 **app/assets/javascripts/components/stream.jsx [#p927c34c]
 
 createStreamを確認しましょう。
 
 #code{{
 export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
   const ws = new WebSocketClient(`${createWebSocketURL(streamingAPIBaseURL)}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`);
 
   ws.onopen      = connected;
   ws.onmessage   = e => received(JSON.parse(e.data));
   ws.onclose     = disconnected;
   ws.onreconnect = reconnected;
 
   return ws;
 };
 }}
 
 というわけで、「/api/v1/streaming/」というURLにアクセスが行われるようです。
 
 *サーバ側:streaming/index.js [#qe46045f]
 
 ここからサーバ側。「/api/v1/streaming/」で探してみると、streamingディレクトリにindex.jsがあるのに気づきます。ストリーミング処理はJavascript(Node)で書かれているようです。
 
 **クライアントからの接続受け付け [#i22b0c10]
 
 最後の方から見ると以下の記述があります。
 
 #code(javascript){{
   wss.on('connection', ws => {
     const location = url.parse(ws.upgradeReq.url, true)
     const token    = location.query.access_token
     const req      = { requestId: uuid.v4() }
 
     accountFromToken(token, req, err => {
       if (err) {
         log.error(req.requestId, err)
         ws.close()
         return
       }
 
       switch(location.query.stream) {
       case 'user':
         streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(ws))
         break;
       省略
       }
     })
   })
 }}
 
 accountFromTokenは名前の通り、アクセストークンからアカウントのIDを取得しています。
 
 **streamFrom [#e9f4ef39]
 
 では。streamFromです。
 
 #code(javascript){{
   const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => {
     log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`)
 
     const listener = message => {
       const { event, payload, queued_at } = JSON.parse(message)
 
       const transmit = () => {
         const now   = new Date().getTime()
         const delta = now - queued_at;
 
         log.silly(req.requestId, `Transmitting for ${req.accountId}: ${event} ${payload} Delay: ${delta}ms`)
         output(event, payload)
       }
 
       // Only messages that may require filtering are statuses, since notifications
       // are already personalized and deletes do not matter
       if (needsFiltering && event === 'update') {
         省略
       } else {
         transmit()
       }
     }
 
     subscribe(id, listener)
     attachCloseHandler(id, listener)
   }
 }}
 
 listenerを定義しています。listenerではmessageが送られてきたらそれをoutputオブジェクトに書き込むということをしています。これにより、接続先のクライアントに情報が送られるということになります。
 
 **subscribe [#c7aa549c]
 
 というわけで今度はsubscribe。
 
 #code(javascript){{
 
   const subscribe = (channel, callback) => {
     log.silly(`Adding listener for ${channel}`)
     subs[channel] = subs[channel] || []
     subs[channel].push(callback)
   }
 }}
 
 よくあるイベントリスナー的なコードです。ではこのsubsをどこで使っているのかというと、すぐ上の部分、
 
 #code(javascript){{
   const redisClient = redis.createClient({
     host:     process.env.REDIS_HOST     || '127.0.0.1',
     port:     process.env.REDIS_PORT     || 6379,
     password: process.env.REDIS_PASSWORD
   })
 
   const subs = {}
 
   redisClient.on('pmessage', (_, channel, message) => {
     const callbacks = subs[channel]
 
     log.silly(`New message on channel ${channel}`)
 
     if (!callbacks) {
       return
     }
 
     callbacks.forEach(callback => callback(message))
   })
 
   redisClient.psubscribe('timeline:*')
 }}
 
 Redisに接続し、Redisからメッセージが飛んで来たら対応するcallback(listener)が呼び出されるようになっています。つまり、Redisへの書き込みが行われることでストリーミングサーバが反応→クライアントにプッシュが行われるようです。なお、psubscribeはpattern subscribeの略のようです。
 
 *Redisへの書き込み:app/services/post_status_service.rb [#y51fdace]
 
 というわけで後はRedisに書き込んでいる場所を探せばいいことになります。実はすでに一回出てきていました。ここからRallsになります。
 
 #code(Ruby){{
 class PostStatusService < BaseService
   def call(account, text, in_reply_to = nil, options = {})
     media  = validate_media!(options[:media_ids])
     status = nil
     ApplicationRecord.transaction do
       status = account.statuses.create!(text: text,
                                         thread: in_reply_to,
                                         sensitive: options[:sensitive],
                                         spoiler_text: options[:spoiler_text] || '',
                                         visibility: options[:visibility],
                                         language: detect_language_for(text, account),
                                         application: options[:application])
       attach_media(status, media)
     end
     process_mentions_service.call(status)
     process_hashtags_service.call(status)
 
     LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text.present?
     DistributionWorker.perform_async(status.id)
     Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
 
     status
   end
 }}
 
 いろいろなWorkerが呼ばれていますが、DistributionWorkerを見てみる。
 
 app/works/distribution_worker.rb
 #code(Ruby){{
 class DistributionWorker < ApplicationWorker
   include Sidekiq::Worker
 
   def perform(status_id)
     FanOutOnWriteService.new.call(Status.find(status_id))
   rescue ActiveRecord::RecordNotFound
     info("Couldn't find the status")
   end
 end
 }}
 
 さらにFanOutOnWriteService。
 
 app/services/fan_out_on_write_service.rb
 #code(Ruby){{
 class FanOutOnWriteService < BaseService
   # Push a status into home and mentions feeds
   # @param [Status] status
   def call(status)
     raise Mastodon::RaceConditionError if status.visibility.nil?
 
     deliver_to_self(status) if status.account.local?
 
     if status.direct_visibility?
       deliver_to_mentioned_followers(status)
     else
       deliver_to_followers(status)
     end
 
     return if status.account.silenced? || !status.public_visibility? || status.reblog?
 
     render_anonymous_payload(status)
     deliver_to_hashtags(status)
 
     return if status.reply? && status.in_reply_to_account_id != status.account_id
 
     deliver_to_public(status)
   end
 
   def deliver_to_self(status)
     Rails.logger.debug "Delivering status #{status.id} to author"
     FeedManager.instance.push(:home, status.account, status)
   end
 }}
 
 **app/lib/feed_manager.rb [#u274292c]
 
 FeedManagerのpushメソッド。
 
 #code(Ruby){{
   def push(timeline_type, account, status)
     timeline_key = key(timeline_type, account.id)
 
     if status.reblog?
       # If the original status is within 40 statuses from top, do not re-insert it into the feed
       rank = redis.zrevrank(timeline_key, status.reblog_of_id)
       return if !rank.nil? && rank < 40
       redis.zadd(timeline_key, status.id, status.reblog_of_id)
     else
       redis.zadd(timeline_key, status.id, status.id)
       trim(timeline_type, account.id)
     end
 
     PushUpdateWorker.perform_async(account.id, status.id)
   end
 }}
 
 なかなかしぶとい(笑)。さらにPushUpdateWorkerです。
 
 #code(Ruby){{
 class PushUpdateWorker
   include Sidekiq::Worker
 
   def perform(account_id, status_id)
     account = Account.find(account_id)
     status  = Status.find(status_id)
     message = InlineRenderer.render(status, account, 'api/v1/statuses/show')
 
     Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
   rescue ActiveRecord::RecordNotFound
     true
   end
 end
 }}
 
 「timeline:アカウントID」への書き込みがありました。これで、Redisに書き込みが行われ、最終的にクライアントに届くということになります。フォロワーにトゥートが届く仕組みもほぼ同じです。リモートインスタンスにいるフォロワーの場合は仕組みがかなり複雑になっていますが、ひとつずつ確認していけば理解できると思います。
 
 *おわりに [#te19abd0]
 
 今回はストリーミングの処理を見てきました。ストリーミングは次の3つから構成されています。
 
 -クライアント側の接続、サーバから送られてきたら表示処理
 -サーバ側その1。クライアントへデータを送る部分(Node)
 -サーバ側その2。送るデータの書き込み部分(Rails)
 
 サーバ内での情報の受け渡しにはRedisが使われていました。
 
 ここまでで、初期画面表示、トゥート作成、フォロー、リモートフォロー、ストリーミングと一通りの処理を見てきたことになります。実はそろそろ1.4が出そうですが、これにて「Mastodonを読む」完結としたいと思います。
 

トップ   編集 差分 バックアップ 添付 複製 名前変更 リロード   新規 一覧 単語検索 最終更新   ヘルプ   最終更新のRSS