Technical Specification

Thunder & Candidate Pipeline 仕様書

X (Twitter) アルゴリズムにおける Thunder(インネットワーク投稿ストア)と Candidate Pipeline Framework の詳細仕様です。

1. Thunder(インネットワーク投稿ストア)

1.1 概要と目的

Thunder は、ユーザーがフォローしているアカウント(インネットワーク)からの投稿をリアルタイムで保持・提供するための高性能インメモリストアです。

高速アクセス

フォロー中ユーザーの最新投稿への高速アクセスを実現

インネットワーク候補提供

ホームタイムラインにおけるインネットワーク候補の即座提供

リアルタイム処理

Kafka からのリアルタイムイベント処理

投稿タイプ別管理

オリジナル、リプライ/リツイート、動画の効率的な管理

アーキテクチャ上の位置づけ

Thunder は Home Mixer の候補パイプラインにおいて ThunderSource として統合され、 フォロー中ユーザーからの投稿候補を提供する主要なデータソースの一つです。

1.2 Kafka からのリアルタイムインジェスト

Thunder は Kafka からツイートイベントを消費し、リアルタイムでインメモリストアを更新します。

イベント処理フロー
Kafka (tweet_events)
Consumer
Deserializer
PostStore

サポートされるイベントタイプ:

  • TweetCreateEvent: 新規投稿の作成
  • TweetDeleteEvent: 投稿の削除
  • QuotedTweetDeleteEvent: 引用ツイートの削除
マルチスレッド処理rust
// パーティション分散によるスレッド処理
let num_partitions = args.tweet_events_num_partitions;
let kafka_num_threads = args.kafka_num_threads;
let partitions_per_thread = num_partitions.div_ceil(kafka_num_threads);

各スレッドは割り当てられたパーティションのサブセットを処理し、以下の機能を持ちます:

  • バッチ処理(設定可能なバッチサイズ)
  • パーティションラグ監視
  • エラー時の自動リトライ
メッセージバッチ処理rust
// バッチが一定サイズに達したら処理を実行
if message_buffer.len() >= batch_size {
    let messages = std::mem::take(&mut message_buffer);
    process_message_batch(messages, batch_num, producer_clone, post_retention_sec).await?;
    consumer.write().await.commit_offsets()?;
}
初期化フェーズrust
// Kafka catchup 完了を待機
for _ in 0..args.kafka_num_threads {
    rx.recv().await;
}
post_store.finalize_init().await?;

1.3 ユーザー別投稿ストア構造

PostStore は DashMap を使用した並行安全なインメモリデータ構造です。

PostStore データ構造rust
pub struct PostStore {
    /// 投稿ID から完全な投稿データへのマップ
    posts: Arc<DashMap<i64, LightPost>>,

    /// ユーザーID からオリジナル投稿参照リストへのマップ
    original_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,

    /// ユーザーID からリプライ/リツイート参照リストへのマップ
    secondary_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,

    /// ユーザーID から動画投稿参照リストへのマップ
    video_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,

    /// 削除された投稿のトラッキング
    deleted_posts: Arc<DashMap<i64, bool>>,

    /// 保持期間(秒)
    retention_seconds: u64,

    /// リクエストタイムアウト
    request_timeout: Duration,
}

TinyPost(軽量参照)

ユーザータイムラインには最小限の参照情報のみを保持

rust
pub struct TinyPost {
    pub post_id: i64,
    pub created_at: i64,
}

LightPost(完全な投稿データ)

投稿の詳細情報を保持(protobuf で定義):

  • - post_id, author_id, created_at
  • - in_reply_to_post_id/user_id
  • - is_retweet, is_reply
  • - source_post_id/user_id
  • - has_video, conversation_id

1.4 投稿タイプ(オリジナル、リプライ/リツイート、動画)

Thunder は投稿を3つのカテゴリに分類して管理します:

1. オリジナル投稿 (original_posts_by_user)

リプライでもリツイートでもない投稿

判定: !post.is_reply && !post.is_retweet

2. セカンダリ投稿 (secondary_posts_by_user)

リプライまたはリツイート。追加のフィルタリングロジック適用(フォロー中ユーザーへのリプライを優先)

3. 動画投稿 (video_posts_by_user)

動画コンテンツを含む投稿。最小動画長の条件あり、リツイートの元投稿の動画も考慮、リプライは除外

投稿分類ロジックrust
// 投稿挿入時の分類
if is_original {
    original_posts_by_user.entry(author_id).or_default().push_back(tiny_post);
} else {
    secondary_posts_by_user.entry(author_id).or_default().push_back(tiny_post);
}

// 動画投稿の判定
let mut video_eligible = post.has_video;
if !video_eligible && post.is_retweet {
    if let Some(source_post) = self.posts.get(&source_post_id) {
        video_eligible = !source_post.is_reply && source_post.has_video;
    }
}
if post.is_reply {
    video_eligible = false;
}
制限項目説明
MAX_ORIGINAL_POSTS_PER_AUTHORオリジナル投稿の最大数
MAX_REPLY_POSTS_PER_AUTHORリプライ/リツイートの最大数
MAX_VIDEO_POSTS_PER_AUTHOR動画投稿の最大数
MAX_TINY_POSTS_PER_USER_SCANスキャン時の最大参照数

1.5 保持期間と自動トリム

Thunder は設定可能な保持期間に基づいて古い投稿を自動的に削除します。

保持期間の設定rust
// デフォルト: 2日間
impl Default for PostStore {
    fn default() -> Self {
        Self::new(2 * 24 * 60 * 60, 0)  // 2日 = 172,800秒
    }
}
自動トリムタスクrust
// バックグラウンドで定期的に古い投稿を削除
pub fn start_auto_trim(self: Arc<Self>, interval_minutes: u64) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60));
        loop {
            interval.tick().await;
            let trimmed = self.trim_old_posts().await;
            if trimmed > 0 {
                info!("Auto-trim: removed {} old posts", trimmed);
            }
        }
    });
}

トリム処理の詳細:

  • - 各ユーザーのタイムラインを走査
  • - 保持期間を超えた投稿を VecDeque の先頭から削除
  • - 空になったユーザーエントリを削除
  • - メモリ効率化のためキャパシティを縮小

1.6 gRPC サービス(InNetworkPostsService)

Thunder は gRPC を通じて投稿データを提供します。

サービス定義rust
#[tonic::async_trait]
impl InNetworkPostsService for ThunderServiceImpl {
    async fn get_in_network_posts(
        &self,
        request: Request<GetInNetworkPostsRequest>,
    ) -> Result<Response<GetInNetworkPostsResponse>, Status>
}
パラメータ説明
user_idリクエストユーザーID
following_user_idsフォロー中ユーザーIDリスト
exclude_tweet_ids除外する投稿IDリスト
max_results返却する最大投稿数
is_video_request動画投稿のみを要求するフラグ
debugデバッグモードフラグ
圧縮対応rust
pub fn server(self) -> InNetworkPostsServiceServer<Self> {
    InNetworkPostsServiceServer::new(self)
        .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
        .send_compressed(tonic::codec::CompressionEncoding::Zstd)
}

統計分析とメトリクス:

  • - 最新投稿からの経過時間(freshness)
  • - 投稿の時間範囲
  • - リプライ比率
  • - ユニーク投稿者数
  • - 投稿者あたりの投稿数

1.7 負荷制御(セマフォ、同時接続制限)

Thunder は過負荷保護のためのメカニズムを実装しています。

同時リクエスト制限rust
pub struct ThunderServiceImpl {
    post_store: Arc<PostStore>,
    strato_client: Arc<StratoClient>,
    /// 同時リクエスト数を制限するセマフォ
    request_semaphore: Arc<Semaphore>,
}
リクエスト処理時の制御rust
// ノンブロッキングでセマフォ取得を試行
let _permit = match self.request_semaphore.try_acquire() {
    Ok(permit) => {
        IN_FLIGHT_REQUESTS.inc();
        permit
    }
    Err(_) => {
        REJECTED_REQUESTS.inc();
        return Err(Status::resource_exhausted(
            "Server at capacity, please retry",
        ));
    }
};

入力リスト制限

  • - フォロー中ユーザーリストの制限
  • - 除外リストの制限
  • - MAX_INPUT_LIST_SIZE で上限設定

メトリクス監視

  • - IN_FLIGHT_REQUESTS: 処理中
  • - REJECTED_REQUESTS: 拒否数
  • - POST_STORE_REQUESTS: 総数
  • - POST_STORE_REQUEST_TIMEOUTS: タイムアウト

2. Candidate Pipeline Framework

2.1 概要と設計思想

Candidate Pipeline Framework は、推薦候補の取得から最終選択までの処理を構造化されたパイプラインとして実装するためのフレームワークです。

モジュラー構成

各処理ステージが独立したコンポーネント

並列実行

可能な限り処理を並列化して性能向上

エラー耐性

個別コンポーネントの失敗がパイプライン全体を停止させない

拡張性

トレイトベースの設計で新しいコンポーネントを容易に追加可能

パイプラインステージ
1
QueryHydratorクエリの補完
2
Source候補取得
3
Hydratorデータ補完
4
Filterフィルタリング
5
Scorerスコアリング
6
Selector最終選択
7
PostSelectionHydrator選択後のデータ補完
8
PostSelectionFilter選択後のフィルタリング
パイプライン実行フロー

Query → QueryHydrator → Sources → Hydrators → Filters

→ Scorers → Selector → PostSelectionHydrators

→ PostSelectionFilters → SideEffects → Result

2.2 トレイト定義

1Source(候補取得)

外部システムから候補を取得するコンポーネント。全ての有効な Source が並列実行され、エラー発生時はログを記録し継続します。

Source トレイトrust
#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
    Q: Clone + Send + Sync + 'static,
    C: Clone + Send + Sync + 'static,
{
    /// このソースを有効にするか判定
    fn enable(&self, _query: &Q) -> bool { true }

    /// 候補を取得
    async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;

    /// ログ/メトリクス用の名前
    fn name(&self) -> &'static str;
}

2Hydrator(データ補完)

候補に追加データを付与するコンポーネント。全ての有効な Hydrator が並列実行されます。

Hydrator トレイトrust
#[async_trait]
pub trait Hydrator<Q, C>: Any + Send + Sync
{
    fn enable(&self, _query: &Q) -> bool { true }

    /// 候補にデータを補完
    /// 重要: 返却ベクターは入力と同じ順序・同じ候補数を維持すること
    async fn hydrate(&self, query: &Q, candidates: &[C]) -> Result<Vec<C>, String>;

    /// 単一候補にハイドレート結果を適用
    fn update(&self, candidate: &mut C, hydrated: C);

    fn name(&self) -> &'static str;
}

3Filter(フィルタリング)

候補を条件に基づいて保持/除外に分類するコンポーネント。Filter は順次実行されます。

Filter トレイトrust
pub struct FilterResult<C> {
    pub kept: Vec<C>,    // 保持された候補
    pub removed: Vec<C>, // 除外された候補
}

#[async_trait]
pub trait Filter<Q, C>: Any + Send + Sync
{
    fn enable(&self, _query: &Q) -> bool { true }

    /// 候補をフィルタリング
    async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, String>;

    fn name(&self) -> &'static str;
}

4Scorer(スコアリング)

候補にスコアを付与するコンポーネント。Scorer は順次実行されます。

Scorer トレイトrust
#[async_trait]
pub trait Scorer<Q, C>: Send + Sync
{
    fn enable(&self, _query: &Q) -> bool { true }

    /// 候補にスコアを付与
    async fn score(&self, query: &Q, candidates: &[C]) -> Result<Vec<C>, String>;

    fn update(&self, candidate: &mut C, scored: C);

    fn name(&self) -> &'static str;
}

5Selector(選択)

最終的な候補選択を行うコンポーネント。スコアに基づいてソートし上位N件を選択。

Selector トレイトrust
pub trait Selector<Q, C>: Send + Sync
{
    fn select(&self, _query: &Q, candidates: Vec<C>) -> Vec<C> {
        let mut sorted = self.sort(candidates);
        if let Some(limit) = self.size() {
            sorted.truncate(limit);
        }
        sorted
    }

    fn enable(&self, _query: &Q) -> bool { true }
    fn score(&self, candidate: &C) -> f64;
    fn sort(&self, candidates: Vec<C>) -> Vec<C>;
    fn size(&self) -> Option<usize> { None }
    fn name(&self) -> &'static str;
}

6SideEffect(副作用処理)

パイプライン結果に影響を与えない非同期処理。別タスクで並列実行され、結果の返却をブロックしません。

SideEffect トレイトrust
#[async_trait]
pub trait SideEffect<Q, C>: Send + Sync
{
    fn enable(&self, _query: Arc<Q>) -> bool { true }

    /// 副作用を実行
    async fn run(&self, input: Arc<SideEffectInput<Q, C>>) -> Result<(), String>;

    fn name(&self) -> &'static str;
}

2.3 並列実行とエラーハンドリング

並列実行パターン

  • - QueryHydrator: 全て並列実行
  • - Source: 全て並列実行
  • - Hydrator: 全て並列実行
  • - Filter: 順次実行
  • - Scorer: 順次実行
  • - SideEffect: 別タスクで並列

エラーハンドリング戦略

  • - 継続戦略: エラー時も他は続行
  • - ログ記録: request_id付きで記録
  • - フォールバック: 元リストを保持
  • - 長さ検証: 不一致時はスキップ
Source の並列実行rust
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
    let sources: Vec<_> = self.sources()
        .iter()
        .filter(|s| s.enable(query))
        .collect();

    // 全ソースを並列実行
    let source_futures = sources.iter().map(|s| s.get_candidates(query));
    let results = join_all(source_futures).await;

    // 結果を収集
    let mut collected = Vec::new();
    for (source, result) in sources.iter().zip(results) {
        match result {
            Ok(mut candidates) => collected.append(&mut candidates),
            Err(err) => error!("Source {} failed: {}", source.name(), err),
        }
    }
    collected
}
Filter のエラー時フォールバックrust
// エラー時のフォールバック例
match filter.filter(query, candidates).await {
    Ok(result) => {
        candidates = result.kept;
        all_removed.extend(result.removed);
    }
    Err(err) => {
        error!("Filter {} failed: {}", filter.name(), err);
        candidates = backup;  // 元のリストを復元
    }
}

2.4 パイプライン構成のカスタマイズ

CandidatePipeline トレイトを実装することで、カスタムパイプラインを構築できます。

CandidatePipeline トレイトrust
#[async_trait]
pub trait CandidatePipeline<Q, C>: Send + Sync
{
    fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
    fn sources(&self) -> &[Box<dyn Source<Q, C>>];
    fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
    fn selector(&self) -> &dyn Selector<Q, C>;
    fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
    fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
    fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;
    fn result_size(&self) -> usize;
}
パイプライン実行の流れrust
async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
    // 1. クエリ補完
    let hydrated_query = self.hydrate_query(query).await;

    // 2. 候補取得
    let candidates = self.fetch_candidates(&hydrated_query).await;

    // 3. データ補完
    let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;

    // 4. フィルタリング
    let (kept_candidates, filtered_candidates) =
        self.filter(&hydrated_query, hydrated_candidates).await;

    // 5. スコアリング
    let scored_candidates = self.score(&hydrated_query, kept_candidates).await;

    // 6. 選択
    let selected_candidates = self.select(&hydrated_query, scored_candidates);

    // 7. 選択後処理
    let post_hydrated = self.hydrate_post_selection(&hydrated_query, selected_candidates).await;
    let (final_candidates, _) = self.filter_post_selection(&hydrated_query, post_hydrated).await;

    // 8. 結果サイズ制限
    final_candidates.truncate(self.result_size());

    // 9. 副作用実行(非同期、結果待機なし)
    self.run_side_effects(input);

    PipelineResult { ... }
}
PipelineResultrust
pub struct PipelineResult<Q, C> {
    pub retrieved_candidates: Vec<C>,   // 取得された全候補
    pub filtered_candidates: Vec<C>,     // 除外された候補
    pub selected_candidates: Vec<C>,     // 最終選択された候補
    pub query: Arc<Q>,                   // 補完後のクエリ
}

3. Thunder と Candidate Pipeline の統合

Thunder は Candidate Pipeline の Source として統合されます。

ThunderSource の役割

  1. 1Thunder gRPC クライアントを通じてインネットワーク投稿を取得
  2. 2取得した投稿を PostCandidate に変換
  3. 3Home Mixer の候補プールに追加
統合フロー
User Request → ScoredPostsQuery
PhoenixCandidatePipeline.execute()

Sources(並列実行)

PhoenixSource

OON候補

ThunderSource

インネットワーク候補

[候補結合] → Hydrators → Filters → Scorers → Selector
Final Timeline

パーソナライズされたホームタイムライン

この統合により、フォロー中ユーザーからの最新投稿と、アルゴリズムで発見されたアウトオブネットワークコンテンツが適切にブレンドされ、パーソナライズされたホームタイムラインが生成されます。

関連ドキュメント

X レコメンドアルゴリズム - オープンソース ドキュメント