Thunder & Candidate Pipeline 仕様書
X (Twitter) アルゴリズムにおける Thunder(インネットワーク投稿ストア)と Candidate Pipeline Framework の詳細仕様です。
1. Thunder(インネットワーク投稿ストア)
1.1 概要と目的
Thunder は、ユーザーがフォローしているアカウント(インネットワーク)からの投稿をリアルタイムで保持・提供するための高性能インメモリストアです。
高速アクセス
フォロー中ユーザーの最新投稿への高速アクセスを実現
インネットワーク候補提供
ホームタイムラインにおけるインネットワーク候補の即座提供
リアルタイム処理
Kafka からのリアルタイムイベント処理
投稿タイプ別管理
オリジナル、リプライ/リツイート、動画の効率的な管理
アーキテクチャ上の位置づけ
ThunderSource として統合され、 フォロー中ユーザーからの投稿候補を提供する主要なデータソースの一つです。1.2 Kafka からのリアルタイムインジェスト
Thunder は Kafka からツイートイベントを消費し、リアルタイムでインメモリストアを更新します。
サポートされるイベントタイプ:
TweetCreateEvent: 新規投稿の作成TweetDeleteEvent: 投稿の削除QuotedTweetDeleteEvent: 引用ツイートの削除
// パーティション分散によるスレッド処理
let num_partitions = args.tweet_events_num_partitions;
let kafka_num_threads = args.kafka_num_threads;
let partitions_per_thread = num_partitions.div_ceil(kafka_num_threads);各スレッドは割り当てられたパーティションのサブセットを処理し、以下の機能を持ちます:
- バッチ処理(設定可能なバッチサイズ)
- パーティションラグ監視
- エラー時の自動リトライ
// バッチが一定サイズに達したら処理を実行
if message_buffer.len() >= batch_size {
let messages = std::mem::take(&mut message_buffer);
process_message_batch(messages, batch_num, producer_clone, post_retention_sec).await?;
consumer.write().await.commit_offsets()?;
}// Kafka catchup 完了を待機
for _ in 0..args.kafka_num_threads {
rx.recv().await;
}
post_store.finalize_init().await?;1.3 ユーザー別投稿ストア構造
PostStore は DashMap を使用した並行安全なインメモリデータ構造です。
pub struct PostStore {
/// 投稿ID から完全な投稿データへのマップ
posts: Arc<DashMap<i64, LightPost>>,
/// ユーザーID からオリジナル投稿参照リストへのマップ
original_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
/// ユーザーID からリプライ/リツイート参照リストへのマップ
secondary_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
/// ユーザーID から動画投稿参照リストへのマップ
video_posts_by_user: Arc<DashMap<i64, VecDeque<TinyPost>>>,
/// 削除された投稿のトラッキング
deleted_posts: Arc<DashMap<i64, bool>>,
/// 保持期間(秒)
retention_seconds: u64,
/// リクエストタイムアウト
request_timeout: Duration,
}TinyPost(軽量参照)
ユーザータイムラインには最小限の参照情報のみを保持
pub struct TinyPost {
pub post_id: i64,
pub created_at: i64,
}LightPost(完全な投稿データ)
投稿の詳細情報を保持(protobuf で定義):
- - post_id, author_id, created_at
- - in_reply_to_post_id/user_id
- - is_retweet, is_reply
- - source_post_id/user_id
- - has_video, conversation_id
1.4 投稿タイプ(オリジナル、リプライ/リツイート、動画)
Thunder は投稿を3つのカテゴリに分類して管理します:
1. オリジナル投稿 (original_posts_by_user)
リプライでもリツイートでもない投稿
判定: !post.is_reply && !post.is_retweet2. セカンダリ投稿 (secondary_posts_by_user)
リプライまたはリツイート。追加のフィルタリングロジック適用(フォロー中ユーザーへのリプライを優先)
3. 動画投稿 (video_posts_by_user)
動画コンテンツを含む投稿。最小動画長の条件あり、リツイートの元投稿の動画も考慮、リプライは除外
// 投稿挿入時の分類
if is_original {
original_posts_by_user.entry(author_id).or_default().push_back(tiny_post);
} else {
secondary_posts_by_user.entry(author_id).or_default().push_back(tiny_post);
}
// 動画投稿の判定
let mut video_eligible = post.has_video;
if !video_eligible && post.is_retweet {
if let Some(source_post) = self.posts.get(&source_post_id) {
video_eligible = !source_post.is_reply && source_post.has_video;
}
}
if post.is_reply {
video_eligible = false;
}| 制限項目 | 説明 |
|---|---|
| MAX_ORIGINAL_POSTS_PER_AUTHOR | オリジナル投稿の最大数 |
| MAX_REPLY_POSTS_PER_AUTHOR | リプライ/リツイートの最大数 |
| MAX_VIDEO_POSTS_PER_AUTHOR | 動画投稿の最大数 |
| MAX_TINY_POSTS_PER_USER_SCAN | スキャン時の最大参照数 |
1.5 保持期間と自動トリム
Thunder は設定可能な保持期間に基づいて古い投稿を自動的に削除します。
// デフォルト: 2日間
impl Default for PostStore {
fn default() -> Self {
Self::new(2 * 24 * 60 * 60, 0) // 2日 = 172,800秒
}
}// バックグラウンドで定期的に古い投稿を削除
pub fn start_auto_trim(self: Arc<Self>, interval_minutes: u64) {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60));
loop {
interval.tick().await;
let trimmed = self.trim_old_posts().await;
if trimmed > 0 {
info!("Auto-trim: removed {} old posts", trimmed);
}
}
});
}トリム処理の詳細:
- - 各ユーザーのタイムラインを走査
- - 保持期間を超えた投稿を VecDeque の先頭から削除
- - 空になったユーザーエントリを削除
- - メモリ効率化のためキャパシティを縮小
1.6 gRPC サービス(InNetworkPostsService)
Thunder は gRPC を通じて投稿データを提供します。
#[tonic::async_trait]
impl InNetworkPostsService for ThunderServiceImpl {
async fn get_in_network_posts(
&self,
request: Request<GetInNetworkPostsRequest>,
) -> Result<Response<GetInNetworkPostsResponse>, Status>
}| パラメータ | 説明 |
|---|---|
| user_id | リクエストユーザーID |
| following_user_ids | フォロー中ユーザーIDリスト |
| exclude_tweet_ids | 除外する投稿IDリスト |
| max_results | 返却する最大投稿数 |
| is_video_request | 動画投稿のみを要求するフラグ |
| debug | デバッグモードフラグ |
pub fn server(self) -> InNetworkPostsServiceServer<Self> {
InNetworkPostsServiceServer::new(self)
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
.send_compressed(tonic::codec::CompressionEncoding::Zstd)
}統計分析とメトリクス:
- - 最新投稿からの経過時間(freshness)
- - 投稿の時間範囲
- - リプライ比率
- - ユニーク投稿者数
- - 投稿者あたりの投稿数
1.7 負荷制御(セマフォ、同時接続制限)
Thunder は過負荷保護のためのメカニズムを実装しています。
pub struct ThunderServiceImpl {
post_store: Arc<PostStore>,
strato_client: Arc<StratoClient>,
/// 同時リクエスト数を制限するセマフォ
request_semaphore: Arc<Semaphore>,
}// ノンブロッキングでセマフォ取得を試行
let _permit = match self.request_semaphore.try_acquire() {
Ok(permit) => {
IN_FLIGHT_REQUESTS.inc();
permit
}
Err(_) => {
REJECTED_REQUESTS.inc();
return Err(Status::resource_exhausted(
"Server at capacity, please retry",
));
}
};入力リスト制限
- - フォロー中ユーザーリストの制限
- - 除外リストの制限
- - MAX_INPUT_LIST_SIZE で上限設定
メトリクス監視
- - IN_FLIGHT_REQUESTS: 処理中
- - REJECTED_REQUESTS: 拒否数
- - POST_STORE_REQUESTS: 総数
- - POST_STORE_REQUEST_TIMEOUTS: タイムアウト
2. Candidate Pipeline Framework
2.1 概要と設計思想
Candidate Pipeline Framework は、推薦候補の取得から最終選択までの処理を構造化されたパイプラインとして実装するためのフレームワークです。
モジュラー構成
各処理ステージが独立したコンポーネント
並列実行
可能な限り処理を並列化して性能向上
エラー耐性
個別コンポーネントの失敗がパイプライン全体を停止させない
拡張性
トレイトベースの設計で新しいコンポーネントを容易に追加可能
Query → QueryHydrator → Sources → Hydrators → Filters
→ Scorers → Selector → PostSelectionHydrators
→ PostSelectionFilters → SideEffects → Result
2.2 トレイト定義
1Source(候補取得)
外部システムから候補を取得するコンポーネント。全ての有効な Source が並列実行され、エラー発生時はログを記録し継続します。
#[async_trait]
pub trait Source<Q, C>: Any + Send + Sync
where
Q: Clone + Send + Sync + 'static,
C: Clone + Send + Sync + 'static,
{
/// このソースを有効にするか判定
fn enable(&self, _query: &Q) -> bool { true }
/// 候補を取得
async fn get_candidates(&self, query: &Q) -> Result<Vec<C>, String>;
/// ログ/メトリクス用の名前
fn name(&self) -> &'static str;
}2Hydrator(データ補完)
候補に追加データを付与するコンポーネント。全ての有効な Hydrator が並列実行されます。
#[async_trait]
pub trait Hydrator<Q, C>: Any + Send + Sync
{
fn enable(&self, _query: &Q) -> bool { true }
/// 候補にデータを補完
/// 重要: 返却ベクターは入力と同じ順序・同じ候補数を維持すること
async fn hydrate(&self, query: &Q, candidates: &[C]) -> Result<Vec<C>, String>;
/// 単一候補にハイドレート結果を適用
fn update(&self, candidate: &mut C, hydrated: C);
fn name(&self) -> &'static str;
}3Filter(フィルタリング)
候補を条件に基づいて保持/除外に分類するコンポーネント。Filter は順次実行されます。
pub struct FilterResult<C> {
pub kept: Vec<C>, // 保持された候補
pub removed: Vec<C>, // 除外された候補
}
#[async_trait]
pub trait Filter<Q, C>: Any + Send + Sync
{
fn enable(&self, _query: &Q) -> bool { true }
/// 候補をフィルタリング
async fn filter(&self, query: &Q, candidates: Vec<C>) -> Result<FilterResult<C>, String>;
fn name(&self) -> &'static str;
}4Scorer(スコアリング)
候補にスコアを付与するコンポーネント。Scorer は順次実行されます。
#[async_trait]
pub trait Scorer<Q, C>: Send + Sync
{
fn enable(&self, _query: &Q) -> bool { true }
/// 候補にスコアを付与
async fn score(&self, query: &Q, candidates: &[C]) -> Result<Vec<C>, String>;
fn update(&self, candidate: &mut C, scored: C);
fn name(&self) -> &'static str;
}5Selector(選択)
最終的な候補選択を行うコンポーネント。スコアに基づいてソートし上位N件を選択。
pub trait Selector<Q, C>: Send + Sync
{
fn select(&self, _query: &Q, candidates: Vec<C>) -> Vec<C> {
let mut sorted = self.sort(candidates);
if let Some(limit) = self.size() {
sorted.truncate(limit);
}
sorted
}
fn enable(&self, _query: &Q) -> bool { true }
fn score(&self, candidate: &C) -> f64;
fn sort(&self, candidates: Vec<C>) -> Vec<C>;
fn size(&self) -> Option<usize> { None }
fn name(&self) -> &'static str;
}6SideEffect(副作用処理)
パイプライン結果に影響を与えない非同期処理。別タスクで並列実行され、結果の返却をブロックしません。
#[async_trait]
pub trait SideEffect<Q, C>: Send + Sync
{
fn enable(&self, _query: Arc<Q>) -> bool { true }
/// 副作用を実行
async fn run(&self, input: Arc<SideEffectInput<Q, C>>) -> Result<(), String>;
fn name(&self) -> &'static str;
}2.3 並列実行とエラーハンドリング
並列実行パターン
- - QueryHydrator: 全て並列実行
- - Source: 全て並列実行
- - Hydrator: 全て並列実行
- - Filter: 順次実行
- - Scorer: 順次実行
- - SideEffect: 別タスクで並列
エラーハンドリング戦略
- - 継続戦略: エラー時も他は続行
- - ログ記録: request_id付きで記録
- - フォールバック: 元リストを保持
- - 長さ検証: 不一致時はスキップ
async fn fetch_candidates(&self, query: &Q) -> Vec<C> {
let sources: Vec<_> = self.sources()
.iter()
.filter(|s| s.enable(query))
.collect();
// 全ソースを並列実行
let source_futures = sources.iter().map(|s| s.get_candidates(query));
let results = join_all(source_futures).await;
// 結果を収集
let mut collected = Vec::new();
for (source, result) in sources.iter().zip(results) {
match result {
Ok(mut candidates) => collected.append(&mut candidates),
Err(err) => error!("Source {} failed: {}", source.name(), err),
}
}
collected
}// エラー時のフォールバック例
match filter.filter(query, candidates).await {
Ok(result) => {
candidates = result.kept;
all_removed.extend(result.removed);
}
Err(err) => {
error!("Filter {} failed: {}", filter.name(), err);
candidates = backup; // 元のリストを復元
}
}2.4 パイプライン構成のカスタマイズ
CandidatePipeline トレイトを実装することで、カスタムパイプラインを構築できます。
#[async_trait]
pub trait CandidatePipeline<Q, C>: Send + Sync
{
fn query_hydrators(&self) -> &[Box<dyn QueryHydrator<Q>>];
fn sources(&self) -> &[Box<dyn Source<Q, C>>];
fn hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
fn filters(&self) -> &[Box<dyn Filter<Q, C>>];
fn scorers(&self) -> &[Box<dyn Scorer<Q, C>>];
fn selector(&self) -> &dyn Selector<Q, C>;
fn post_selection_hydrators(&self) -> &[Box<dyn Hydrator<Q, C>>];
fn post_selection_filters(&self) -> &[Box<dyn Filter<Q, C>>];
fn side_effects(&self) -> Arc<Vec<Box<dyn SideEffect<Q, C>>>>;
fn result_size(&self) -> usize;
}async fn execute(&self, query: Q) -> PipelineResult<Q, C> {
// 1. クエリ補完
let hydrated_query = self.hydrate_query(query).await;
// 2. 候補取得
let candidates = self.fetch_candidates(&hydrated_query).await;
// 3. データ補完
let hydrated_candidates = self.hydrate(&hydrated_query, candidates).await;
// 4. フィルタリング
let (kept_candidates, filtered_candidates) =
self.filter(&hydrated_query, hydrated_candidates).await;
// 5. スコアリング
let scored_candidates = self.score(&hydrated_query, kept_candidates).await;
// 6. 選択
let selected_candidates = self.select(&hydrated_query, scored_candidates);
// 7. 選択後処理
let post_hydrated = self.hydrate_post_selection(&hydrated_query, selected_candidates).await;
let (final_candidates, _) = self.filter_post_selection(&hydrated_query, post_hydrated).await;
// 8. 結果サイズ制限
final_candidates.truncate(self.result_size());
// 9. 副作用実行(非同期、結果待機なし)
self.run_side_effects(input);
PipelineResult { ... }
}pub struct PipelineResult<Q, C> {
pub retrieved_candidates: Vec<C>, // 取得された全候補
pub filtered_candidates: Vec<C>, // 除外された候補
pub selected_candidates: Vec<C>, // 最終選択された候補
pub query: Arc<Q>, // 補完後のクエリ
}3. Thunder と Candidate Pipeline の統合
Thunder は Candidate Pipeline の Source として統合されます。
ThunderSource の役割
- 1Thunder gRPC クライアントを通じてインネットワーク投稿を取得
- 2取得した投稿を PostCandidate に変換
- 3Home Mixer の候補プールに追加
Sources(並列実行)
PhoenixSource
OON候補
ThunderSource
インネットワーク候補
パーソナライズされたホームタイムライン