Explain BlueSky's data model to me, what is a PDS and how does it work?
BlueSky's Data Model and PDS Architecture
BlueSky is built on the AT Protocol (Authenticated Transfer Protocol), which uses a fundamentally different approach than traditional social media platforms. Let me break down the key components:
Core Data Model
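Records in a Signed Repository: Every piece of content (posts, likes, follows, etc.) is stored as a record in the user's repository. The repository is organized as a Merkle Search Tree whose root is referenced by a commit signed with the account's signing key. This ensures data integrity and enables efficient synchronization.
Addressing: Each record has a unique identifier (AT-URI) made of the account's DID, the collection (the record type's NSID), and a record key: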
at://did:plc:user123/app.bsky.feed.post/record456
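The AT-URI above packs three parts into one string: the authority (a DID, or a handle), the collection NSID, and the record key. A minimal parsing sketch, assuming well-formed input. It skips the full AT-URI syntax rules (handle normalization, query and fragment parts), and `parseAtUri` is an illustrative name; the `@atproto/syntax` package ships a complete `AtUri` class for real use.

```typescript
// Minimal AT-URI parser (sketch): splits at://<authority>/<collection>/<rkey>.
// It does not validate DID, NSID, or record-key syntax.
interface AtUriParts {
  authority: string
  collection?: string
  rkey?: string
}

function parseAtUri(uri: string): AtUriParts {
  const prefix = 'at://'
  if (!uri.startsWith(prefix)) {
    throw new Error(`not an AT-URI: ${uri}`)
  }
  const [authority, collection, rkey] = uri.slice(prefix.length).split('/')
  if (!authority) {
    throw new Error(`missing authority: ${uri}`)
  }
  return { authority, collection, rkey }
}

const parts = parseAtUri('at://did:plc:user123/app.bsky.feed.post/record456')
console.log(parts.collection) // app.bsky.feed.post
```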
Personal Data Server (PDS)
A PDS is your personal data repository - think of it as "your server" in the network.
What it stores:
- All your posts, likes, follows, and other social data
- Your cryptographic identity and signing keys
- Your account metadata and preferences
How it works:
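- Data Ownership: You can self-host a PDS or use a provider's, and you can migrate between PDS providers while keeping your identity and data
- Synchronization: Relays subscribe to your PDS's event stream (com.atproto.sync.subscribeRepos), so new commits propagate to the rest of the network
- Identity: Accounts are identified by DIDs (Decentralized Identifiers), so your identity isn't tied to any single server; your handle is a domain name that resolves to your DID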
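Because identity lives in the DID rather than on a server, a client finds a user's current PDS by resolving the DID document and reading its `#atproto_pds` service entry (for `did:plc`, the document is served by `https://plc.directory/<did>`). A sketch of that lookup on an already-fetched document; the `DidDocument` type here is a trimmed, assumed shape containing only the fields used, and `getPdsEndpoint` is an illustrative name.

```typescript
// Trimmed shape of a DID document: only the fields this lookup reads.
interface DidService {
  id: string
  type: string
  serviceEndpoint: string
}
interface DidDocument {
  id: string
  service?: DidService[]
}

// Returns the PDS base URL declared in the DID document, or undefined.
// The service id may be relative ("#atproto_pds") or absolute ("<did>#atproto_pds").
function getPdsEndpoint(doc: DidDocument): string | undefined {
  const svc = doc.service?.find(
    (s) =>
      (s.id === '#atproto_pds' || s.id === `${doc.id}#atproto_pds`) &&
      s.type === 'AtprotoPersonalDataServer'
  )
  return svc?.serviceEndpoint
}

// Usage (the network fetch is left to the caller):
// const doc = await (await fetch(`https://plc.directory/${did}`)).json()
// const pds = getPdsEndpoint(doc)
```

After migration, the DID document points at the new PDS, which is what lets the same DID move between providers.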
The Bigger Picture
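[Your PDS] → [Relay] → [App View] ←→ [Client App]
- Relay: Aggregates the event streams of many PDSes into a single firehose for downstream services
- App View: Indexes that firehose and provides the social media experience (feeds, notifications, etc.)
- Client: The app you actually use (web, mobile, etc.); it writes records to your PDS and reads views from the App View, usually proxied through your PDS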
Key Benefits
- Portability: Switch providers without losing your data or social graph
- Algorithmic Choice: Different app views and custom feed generators can offer different algorithms
- Censorship Resistance: No single point of control
- Composability: Third-party developers can build on the open protocol
This architecture separates data storage, aggregation, and presentation - allowing for a more open and competitive social media ecosystem.
Building on AT Protocol: App Architecture
When building on ATProto, you have several architectural choices depending on your app's needs. Here's how to structure your application:
Core Components You'll Need
1. Lexicon Schema Definition
First, define your app's data types using ATProto's Lexicon format:
{
  "lexicon": 1,
  "id": "com.yourapp.post",
  "defs": {
    "main": {
      "type": "record",
      "key": "tid",
      "record": {
        "type": "object",
        "required": ["text", "createdAt"],
        "properties": {
          "text": {"type": "string"},
          "createdAt": {"type": "string", "format": "datetime"}
        }
      }
    }
  }
}
2. Choose Your Architecture Pattern
Option A: Lightweight Client (Recommended for most apps)
[Your App] ←→ [Existing PDS] ←→ [ATProto Relay] ←→ [Your App View]
Structure:
- Frontend: React/Vue/etc. client
- Backend: Simple app view server (Node.js/Python/Go)
- Data: Read from existing PDSes via ATProto APIs
Pros: Fastest to build, leverages existing infrastructure
Cons: Dependent on existing PDS providers
Option B: Full Stack with Custom PDS
[Your App] ←→ [Your PDS] ←→ [ATProto Relay] ←→ [Your App View]
Structure:
- Frontend: Your client app
- PDS Layer: Self-hosted PDS (typically Bluesky's open-source reference implementation rather than a from-scratch one)
- App View: Your algorithmic/feed layer
- Database: PostgreSQL/etc. for your app-specific data
Implementation Strategy
1. Start with the ATProto SDK
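import { AtpAgent } from '@atproto/api'
// AtpAgent is the current name; older examples use the deprecated BskyAgent alias
const agent = new AtpAgent({ service: 'https://bsky.social' })
// Use an app password here, not the account's main password
await agent.login({ identifier: 'alice.bsky.social', password: 'app-password' })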
2. Define Your Data Layer
// Create records in users' repositories (session.did is set after login)
await agent.com.atproto.repo.createRecord({
  repo: agent.session!.did,
  collection: 'com.yourapp.post',
  record: {
    $type: 'com.yourapp.post',
    text: 'Hello ATProto!',
    createdAt: new Date().toISOString()
  }
})
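When `createRecord` is called without an `rkey`, the PDS assigns a TID (timestamp identifier) as the record key, which is why record keys sort roughly by creation time. Per the AT Protocol spec, a TID is a 64-bit integer (top bit 0, then 53 bits of microseconds since the Unix epoch, then a 10-bit clock identifier) written as 13 characters of base32-sortable. A sketch of that encoding, useful for understanding keys or choosing your own; `makeTid` is an illustrative name, and picking the clock id at random per process is an assumption.

```typescript
// base32-sortable alphabet used by TIDs: digits 2-7, then a-z (ASCII order).
const S32_ALPHABET = '234567abcdefghijklmnopqrstuvwxyz'

// Encodes a TID from a microsecond timestamp and a 10-bit clock id.
function makeTid(micros: bigint, clockId: number): string {
  // top bit 0 | 53 bits of microseconds | 10 bits of clock id
  let value = ((micros & ((1n << 53n) - 1n)) << 10n) | BigInt(clockId & 0x3ff)
  let out = ''
  // 13 characters x 5 bits covers the 64-bit value
  for (let i = 0; i < 13; i++) {
    out = S32_ALPHABET[Number(value & 31n)] + out
    value >>= 5n
  }
  return out
}

// A TID for "now" (assumption: one random clock id per process).
const clockId = Math.floor(Math.random() * 1024)
const tid = makeTid(BigInt(Date.now()) * 1000n, clockId)
```

Because the alphabet is in ASCII order and the length is fixed, comparing two TIDs as strings gives the same order as comparing their timestamps.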
3. Build Your App View
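// Subscribe to the relay firehose for real-time updates.
// Frames are binary DAG-CBOR (with CAR-encoded blocks), not JSON, so use a
// decoder such as the one in @atproto/sync rather than JSON.parse
const subscription = new WebSocket('wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos')
subscription.onmessage = (event) => {
  // Decode the frame, then process new records and update your app's state
}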
Recommended Tech Stack
Backend
- Node.js with @atproto/api and @atproto/lexicon
- Database: PostgreSQL for relational data, Redis for caching
- Queue: Bull/Agenda for background processing
Frontend
- React/Next.js or Vue/Nuxt for web
- React Native or Flutter for mobile
- @atproto/api for protocol interactions (it also runs in the browser) and @atproto/oauth-client-browser for sign-in
Development Phases
Phase 1: Prototype
- Use existing PDS (bsky.social)
- Build basic read/write functionality
- Focus on your app's unique value proposition
Phase 2: Scale
- Consider running your own PDS
- Implement custom algorithms and feeds
- Add real-time features via firehose
Phase 3: Production
- Handle identity resolution (verifying that each handle and DID point at each other)
- Implement moderation tools
- Scale your infrastructure
Key Considerations
- Identity: Users bring their own DIDs, so design for portability
- Moderation: Build your own moderation layer for your app view
- Caching: ATProto data is eventually consistent, plan accordingly
- Rate Limits: Respect PDS and relay rate limits
The beauty of ATProto is that you can start simple and gradually take on more infrastructure as your app grows!
App View Functions in ATProto
Your App View sits between the raw ATProto data and your users, providing the "social media experience." Here are the key functions it should handle:
Core Data Processing
1. Record Ingestion & Filtering
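// Listen to the firehose and keep only the records you index.
// CommitEvent is a placeholder type: in the raw firehose each op carries
// { action, path, cid }, where path is "<collection>/<rkey>", and the record
// itself must be decoded from the commit's CAR `blocks` (libraries such as
// @atproto/sync handle this)
function processFirehoseEvent(event: CommitEvent) {
  for (const op of event.ops) {
    if (op.action === 'create' && op.path.startsWith('app.bsky.feed.post/')) {
      indexPost(op.record)
    }
  }
}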
2. Data Indexing & Storage
- Posts: Text content, media, replies, quotes
- Social Graph: Follows, blocks, mutes
- Interactions: Likes, reposts, replies
- User Profiles: Display names, avatars, bios
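A minimal in-memory sketch of the indexing step, assuming records arrive already decoded together with their AT-URIs (how they arrive depends on your ingestion path). Posts are keyed by AT-URI, and likes are counted per subject, since an `app.bsky.feed.like` record points at its target through `subject.uri`. `MemoryIndex` is an illustrative name; a production App View would use a database instead of Maps.

```typescript
// Decoded record shapes (only the fields used here).
interface PostRecord {
  text: string
  createdAt: string
}
interface LikeRecord {
  subject: { uri: string; cid: string }
  createdAt: string
}

class MemoryIndex {
  posts = new Map<string, PostRecord>()   // post AT-URI -> record
  likeSubjects = new Map<string, string>() // like AT-URI -> liked post URI
  likeCounts = new Map<string, number>()   // post AT-URI -> like count

  indexPost(uri: string, record: PostRecord) {
    this.posts.set(uri, record)
  }

  indexLike(likeUri: string, record: LikeRecord) {
    if (this.likeSubjects.has(likeUri)) return // ignore replayed events
    this.likeSubjects.set(likeUri, record.subject.uri)
    const n = this.likeCounts.get(record.subject.uri) ?? 0
    this.likeCounts.set(record.subject.uri, n + 1)
  }

  // A delete op only carries the like's own URI, so the subject must be
  // remembered at create time in order to decrement the right counter.
  deleteLike(likeUri: string) {
    const subject = this.likeSubjects.get(likeUri)
    if (subject === undefined) return
    this.likeSubjects.delete(likeUri)
    this.likeCounts.set(subject, (this.likeCounts.get(subject) ?? 1) - 1)
  }
}
```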
Feed Generation & Algorithms
3. Timeline Construction
async function buildTimeline(userDid: string, algorithm: string) {
  const follows = await getFollows(userDid)
  const posts = await getRecentPosts(follows)
  switch (algorithm) {
    case 'engagement':
      return posts.sort((a, b) => b.likeCount - a.likeCount)
    // Your custom algorithms here
    case 'chronological':
    default:
      // createdAt is an ISO 8601 string, so compare parsed timestamps
      return posts.sort((a, b) => Date.parse(b.createdAt) - Date.parse(a.createdAt))
  }
}
4. Content Discovery
- Trending topics: Analyze hashtags, mentions
- Recommended users: Suggest follows based on network
- Popular posts: Surface high-engagement content
User Experience Features
5. Search & Query
async function searchPosts(query: string, filters: SearchFilters) {
return await searchIndex.search({
query,
filters: {
language: filters.lang,
dateRange: filters.dateRange,
hasMedia: filters.hasMedia
}
})
}
6. Notifications
- Mentions: When users are tagged
- Interactions: Likes, reposts, replies to user's content
- Follows: New followers
- Custom events: App-specific notifications
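Much of this can be derived from the post record itself. A sketch, assuming the `app.bsky.feed.post` shape: a reply's `reply.parent.uri` names the parent post (its authority is the parent author's DID), and mentions appear as `app.bsky.richtext.facet#mention` features carrying the mentioned DID. The `Notification` type and `notificationsForPost` are illustrative, and the rule of one notification per recipient is a design choice, not protocol behavior.

```typescript
// Trimmed app.bsky.feed.post shape: only the fields this sketch reads.
interface StrongRef { uri: string; cid: string }
interface Facet {
  index: { byteStart: number; byteEnd: number }
  features: Array<{ $type: string; did?: string }>
}
interface FeedPost {
  text: string
  createdAt: string
  reply?: { root: StrongRef; parent: StrongRef }
  facets?: Facet[]
}

// Illustrative notification type for your own App View.
interface Notification {
  recipientDid: string
  reason: 'reply' | 'mention'
  postUri: string
}

function notificationsForPost(authorDid: string, postUri: string, post: FeedPost): Notification[] {
  const out: Notification[] = []
  // One notification per recipient, and never one for the author themself
  const seen = new Set<string>([authorDid])
  if (post.reply) {
    // at://<did>/<collection>/<rkey>: the parent author's DID is the authority
    const parentDid = post.reply.parent.uri.slice('at://'.length).split('/')[0]
    if (!seen.has(parentDid)) {
      seen.add(parentDid)
      out.push({ recipientDid: parentDid, reason: 'reply', postUri })
    }
  }
  for (const facet of post.facets ?? []) {
    for (const feature of facet.features) {
      if (feature.$type === 'app.bsky.richtext.facet#mention' && feature.did && !seen.has(feature.did)) {
        seen.add(feature.did)
        out.push({ recipientDid: feature.did, reason: 'mention', postUri })
      }
    }
  }
  return out
}
```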
7. Content Moderation
async function moderateContent(post: Post) {
const labels = await runModerationChecks(post)
return {
...post,
labels,
visibility: determineVisibility(labels),
warnings: generateContentWarnings(labels)
}
}
Performance & Scaling
8. Caching Strategy
import Redis from 'ioredis'
// Cache frequently accessed data
const feedCache = new Redis()
async function getCachedFeed(userId: string) {
  const cached = await feedCache.get(`feed:${userId}`)
  if (cached) return JSON.parse(cached)
  const feed = await buildTimeline(userId, 'chronological')
  // Cache for 300 seconds
  await feedCache.setex(`feed:${userId}`, 300, JSON.stringify(feed))
  return feed
}
9. Background Processing
- Engagement calculations: Update post metrics
- Graph analysis: Compute recommendations
- Content preprocessing: Extract metadata, generate thumbnails
API Endpoints
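10. XRPC API Design
// XRPC routes are HTTP GET (queries) and POST (procedures) under /xrpc/<NSID>.
// Serving app.bsky.* methods means re-implementing Bluesky's App View API; an
// app with its own lexicons would expose its own NSIDs (e.g. com.yourapp.*)
// Core feed endpoints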
app.get('/xrpc/app.bsky.feed.getTimeline', getTimeline)
app.get('/xrpc/app.bsky.feed.getAuthorFeed', getAuthorFeed)
app.get('/xrpc/app.bsky.feed.getPostThread', getPostThread)
// Search endpoints
app.get('/xrpc/app.bsky.feed.searchPosts', searchPosts)
// User endpoints
app.get('/xrpc/app.bsky.actor.getProfile', getProfile)
app.get('/xrpc/app.bsky.actor.searchActors', searchActors)
Advanced Features
11. Real-time Updates
// Socket.IO server for live updates
io.on('connection', (socket) => {
  socket.on('subscribe:timeline', (userId) => {
    socket.join(`timeline:${userId}`)
  })
})
// Broadcast new posts to the timelines of the author's followers
async function broadcastNewPost(post: Post) {
  const followers = await getFollowers(post.authorDid)
  followers.forEach(userId => {
    io.to(`timeline:${userId}`).emit('new_post', post)
  })
}
12. Analytics & Metrics
- User engagement: Time spent, interaction rates
- Content performance: Reach, engagement, virality
- System health: Response times, error rates
App View Architecture Pattern
class AppView {
constructor(
private firehose: FirehoseSubscription,
private database: Database,
private cache: CacheService,
private moderator: ModerationService
) {}
async start() {
// Subscribe to ATProto firehose
this.firehose.on('commit', this.handleCommit.bind(this))
// Start background workers
this.startFeedBuilder()
this.startNotificationProcessor()
}
private async handleCommit(commit: Commit) {
// Process incoming ATProto records
await this.indexRecords(commit.ops)
await this.updateAffectedFeeds(commit.ops)
await this.triggerNotifications(commit.ops)
}
}
The key is that your App View transforms raw ATProto data into the specific user experience your app provides - whether that's a Twitter-like feed, a specialized community, or something entirely new!
Does filtering the firehose imply that my app view has to have a constant web socket connection to the firehose? That seems like it could take up a lot of bandwidth.
Firehose Bandwidth & Alternative Approaches
You're absolutely right to be concerned about bandwidth! The firehose can be quite heavy: it carries every commit (record creates, updates, and deletes) from every repository the relay crawls across the ATProto network. Let me break down the options:
Firehose Reality Check
Bandwidth usage:
- Full firehose: on the order of megabytes per second (a rough figure that varies with network activity and grows as the network scales)
- Includes all posts, likes, follows, blocks, etc. from all users
- Most of this data is probably irrelevant to your specific app
Alternative Approaches
1. Selective Firehose Filtering
// Filter client-side: this cuts processing, not bandwidth, because every
// commit is still downloaded
const firehose = new WebSocket('wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos')
firehose.onmessage = (event) => {
  const commit = parseCommit(event.data) // hypothetical DAG-CBOR frame decoder
  // Only process records you care about; op.path is "<collection>/<rkey>"
  const relevantOps = commit.ops.filter(op => {
    const collection = op.path.split('/')[0]
    return collection === 'app.bsky.feed.post' ||
      collection === 'com.yourapp.customRecord'
  })
  if (relevantOps.length > 0) {
    processOperations(relevantOps)
  }
}
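Because the relay firehose delivers every commit regardless of what your handler keeps, the bandwidth-saving option is to filter before the data reaches you. Bluesky also runs Jetstream, a service that re-encodes the firehose as JSON and filters server-side via repeated `wantedCollections` (and `wantedDids`) query parameters. A sketch of building a Jetstream URL and extracting created records; the host is one of the public instances listed in the Jetstream README, the event type is trimmed to the fields used, and `jetstreamUrl`, `createdRecord`, and `handleRecord` are hypothetical names.

```typescript
// Trimmed Jetstream event: only the fields this sketch reads.
interface JetstreamEvent {
  did: string
  time_us: number
  kind: 'commit' | 'identity' | 'account'
  commit?: {
    rev: string
    operation: 'create' | 'update' | 'delete'
    collection: string
    rkey: string
    record?: Record<string, unknown>
  }
}

// Builds a subscription URL; Jetstream repeats a query param once per value.
function jetstreamUrl(
  collections: string[],
  dids: string[] = [],
  host = 'jetstream2.us-east.bsky.network'
): string {
  const params = new URLSearchParams()
  for (const c of collections) params.append('wantedCollections', c)
  for (const d of dids) params.append('wantedDids', d)
  return `wss://${host}/subscribe?${params.toString()}`
}

// Returns the created record and its AT-URI, or null for any other event.
function createdRecord(raw: string): { uri: string; record: Record<string, unknown> } | null {
  const evt = JSON.parse(raw) as JetstreamEvent
  if (evt.kind !== 'commit' || !evt.commit) return null
  if (evt.commit.operation !== 'create' || !evt.commit.record) return null
  const { collection, rkey } = evt.commit
  return { uri: `at://${evt.did}/${collection}/${rkey}`, record: evt.commit.record }
}

// Wiring (any runtime with a global WebSocket: browsers, Deno, Bun, Node 22+):
// const ws = new WebSocket(jetstreamUrl(['app.bsky.feed.post', 'com.yourapp.customRecord']))
// ws.onmessage = (e) => { const r = createdRecord(String(e.data)); if (r) handleRecord(r) }
```

The trade-off is trust and fidelity: Jetstream drops the signatures and Merkle proofs, so an app that needs to verify commits should consume the relay firehose directly.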
2. Polling Strategy (lower bandwidth when you only need a few repos)
// Poll for updates periodically instead of real-time
async function pollForUpdates() {
  // Query specific repositories you care about
  const repos = await getRelevantRepositories()
  for (const repo of repos) {
    // `since` takes a repo revision (rev), not a timestamp, so track it per repo
    const since = await getLastSyncRev(repo.did)
    // Returns a CAR file of the blocks changed since that rev, to be parsed
    const updates = await agent.com.atproto.sync.getRepo({
      did: repo.did,
      since: since
    })
    await processUpdates(updates)
  }
}
// Run every 30 seconds instead of real-time
setInterval(pollForUpdates, 30000)
3. Hybrid Approach
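class OptimizedIngestion {
  firehoseConnected = false
  pollInterval: ReturnType<typeof setInterval> | null = null

  async start() {
    // Try firehose first
    try {
      await this.connectToFirehose()
      this.firehoseConnected = true
    } catch (error) {
      // Fallback to polling
      console.log('Firehose failed, falling back to polling')
      this.startPolling()
    }
  }

  connectToFirehose(): Promise<void> {
    // new WebSocket() does not throw on network failure; errors arrive as
    // events, so wrap open/error in a promise that start() can await
    return new Promise((resolve, reject) => {
      const firehose = new WebSocket(
        'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
      )
      let opened = false
      firehose.onopen = () => {
        opened = true
        resolve()
      }
      firehose.onmessage = this.handleFirehoseMessage.bind(this)
      firehose.onerror = (err) => {
        this.firehoseConnected = false
        if (opened) this.startPolling() // dropped later: fall back here
        else reject(err) // never connected: start() falls back
      }
    })
  }
}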
4. User-Driven Data Loading
// Only fetch data when users actually request it
async function getTimelineOnDemand(userDid: string) {
  const follows = await getFollows(userDid)
  // Fetch recent posts from followed users. Each request should go to the PDS
  // hosting that repo (found via its DID document), and this is one request
  // per follow, so cap concurrency for users who follow many accounts
  const responses = await Promise.all(
    follows.map((followDid) =>
      agent.com.atproto.repo.listRecords({
        repo: followDid,
        collection: 'app.bsky.feed.post',
        limit: 50
      })
    )
  )
  return mergePosts(responses.map((res) => res.data.records))
}
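`Promise.all` over every follow fires all requests at once, which for an account following hundreds of users runs straight into rate limits. A small, dependency-free sketch of capping concurrency; `mapWithConcurrency` is an illustrative name, and packages such as p-limit do the same job.

```typescript
// Runs `fn` over `items` with at most `limit` calls in flight, preserving order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results = new Array<R>(items.length)
  let next = 0
  // Each worker repeatedly claims the next unprocessed index. JavaScript runs
  // this on one thread, so `next++` cannot be claimed twice.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++
      results[i] = await fn(items[i])
    }
  }
  const workerCount = Math.max(1, Math.min(limit, items.length))
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}

// In getTimelineOnDemand, for example:
// const responses = await mapWithConcurrency(follows, 10, (did) =>
//   agent.com.atproto.repo.listRecords({ repo: did, collection: 'app.bsky.feed.post', limit: 50 }))
```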
Bandwidth Optimization Strategies
5. Compressed Firehose
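// Accept-Encoding does not apply to WebSocket frames: compression is negotiated
// with the permessage-deflate extension when both ends support it, and the
// browser WebSocket constructor cannot set custom headers anyway. Jetstream
// (Bluesky's JSON re-encoding of the firehose) instead offers zstd-compressed
// frames via a query parameter; decoding them requires its published zstd dictionary.
const firehose = new WebSocket(
  'wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&compress=true'
)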
6. Regional/Clustered Approach
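// Connect to an instance close to your users. These hosts are the public
// Jetstream instances; selectOptimalRelay is a hypothetical helper (e.g.
// pick the lowest measured latency)
const relayEndpoints = [
  'wss://jetstream1.us-east.bsky.network',
  'wss://jetstream2.us-east.bsky.network',
  'wss://jetstream1.us-west.bsky.network',
  'wss://jetstream2.us-west.bsky.network'
]
const optimalRelay = await selectOptimalRelay(relayEndpoints)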
Recommended Architecture for Most Apps
class EfficientAppView {
  criticalDataViaFirehose = true // Posts, follows
  bulkDataViaPolling = true // Likes, profile updates

  async start() {
    // Real-time for critical updates. A raw relay connection cannot filter by
    // collection, so this assumes a filtered stream such as Jetstream's
    // wantedCollections parameter
    this.subscribeToFirehose([
      'app.bsky.feed.post',
      'app.bsky.graph.follow'
    ])
    // Batch processing for less critical data
    this.schedulePolling([
      'app.bsky.feed.like',
      'app.bsky.actor.profile'
    ], 60000) // Every minute
  }
}
Cost-Benefit Analysis
Full Firehose:
- ✅ Real-time updates
- ✅ Complete data coverage
- ❌ High bandwidth costs
- ❌ Processing overhead
Selective/Polling:
- ✅ Lower bandwidth
- ✅ Predictable costs
- ❌ Slight latency
- ❌ More complex logic
For most apps, a hybrid approach works best - use the firehose for critical real-time data and polling for everything else. This gives you the best of both worlds while keeping bandwidth manageable.
Hm. Where would I get the list of repositories to poll though? Doing a for loop over all my users can't be efficient.
Efficient Repository Discovery & Polling
You're absolutely right - polling every user's repository individually would be terrible for performance! Here are the practical approaches:
The Repository Discovery Problem
You need to know which repositories have new data without checking all of them. Here's how to solve it:
1. Use ATProto's Sync Endpoints (for periodic reconciliation)
// Page through every repo hosted on a PDS or relay and keep the ones whose
// current rev differs from the rev you stored at your last sync.
// This still scans all repos on the host, so run it occasionally, not per event
async function getChangedRepos(lastSeenRev: Map<string, string>, cursor?: string) {
  const response = await agent.com.atproto.sync.listRepos({
    limit: 1000,
    cursor: cursor
  })
  const changedRepos = response.data.repos.filter(repo =>
    repo.rev !== lastSeenRev.get(repo.did)
  )
  return {
    repos: changedRepos,
    cursor: response.data.cursor
  }
}
2. Subscribe to Commit Events (Without Full Firehose)
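// The relay firehose has no metadata-only mode, so to avoid downloading every
// full commit, use Jetstream (Bluesky's JSON re-encoding of the firehose),
// which filters server-side by collection or DID
const commitStream = new WebSocket(
  'wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=com.yourapp.customRecord'
)
commitStream.onmessage = async (event) => {
  const evt = JSON.parse(event.data)
  if (evt.kind !== 'commit') return
  // Just get the repo DID, revision, and timestamp (microseconds)
  const repoUpdate = {
    did: evt.did,
    rev: evt.commit.rev,
    time: evt.time_us
  }
  // Queue this repo for detailed polling later (Jetstream events also carry
  // the record itself, which may make the later fetch unnecessary)
  await queueRepoForUpdate(repoUpdate)
}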
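Active repos can commit many times per second, yet each burst only needs to trigger one fetch. A sketch of a coalescing queue that keeps a single pending entry per DID and remembers only the newest `rev`; `queueRepoForUpdate` above could be backed by something like this. The class and its methods are illustrative, and relying on string comparison assumes revs are TIDs, which sort lexicographically in time order.

```typescript
interface RepoUpdate {
  did: string
  rev: string
}

// Keeps at most one pending update per repo. Revs are TIDs, which sort
// lexicographically in time order, so a plain string comparison keeps the newest.
class CoalescingQueue {
  private pending = new Map<string, string>() // did -> newest rev seen

  add(update: RepoUpdate): void {
    const current = this.pending.get(update.did)
    if (current === undefined || update.rev > current) {
      this.pending.set(update.did, update.rev)
    }
  }

  // Removes and returns up to `max` repos, first-enqueued first
  // (re-setting an existing Map key keeps its original position).
  takeBatch(max: number): RepoUpdate[] {
    const batch: RepoUpdate[] = []
    for (const [did, rev] of this.pending) {
      if (batch.length >= max) break
      batch.push({ did, rev })
    }
    for (const u of batch) this.pending.delete(u.did)
    return batch
  }

  get size(): number {
    return this.pending.size
  }
}
```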
3. Smart Repository Tracking
class RepositoryTracker {
private activeRepos = new Set<string>()
private lastSeen = new Map<string, Date>()
async trackUserActivity(userDid: string) {
// Only track repos of users who are active in your app
this.activeRepos.add(userDid)
this.lastSeen.set(userDid, new Date())
}
async getReposToUpdate(): Promise<string[]> {
const now = new Date()
const cutoff = new Date(now.getTime() - 24 * 60 * 60 * 1000) // 24 hours
// Only poll repos that have been active recently
return Array.from(this.activeRepos).filter(did => {
const lastActivity = this.lastSeen.get(did)
return lastActivity && lastActivity > cutoff
})
}
isTracked(did: string): boolean {
return this.activeRepos.has(did)
}
}
4. Batch Repository Queries
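// Small helpers used below
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))
function chunkArray<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) chunks.push(items.slice(i, i + size))
  return chunks
}

async function batchUpdateRepositories(repoDids: string[]) {
  // Process in chunks to avoid overwhelming the network
  const chunks = chunkArray(repoDids, 50)
  for (const chunk of chunks) {
    await Promise.all(chunk.map(async (did) => {
      try {
        // getRepo with `since` returns a CAR diff of blocks changed after that rev
        const updates = await agent.com.atproto.sync.getRepo({
          did: did,
          since: await getLastSyncRev(did)
        })
        await processRepoUpdates(did, updates)
      } catch (error) {
        // Handle individual repo failures gracefully
        console.log(`Failed to update repo ${did}:`, error)
      }
    }))
    // Rate limiting between chunks
    await sleep(100)
  }
}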
Practical Implementation Strategy
Hybrid Approach with Smart Discovery
class EfficientDataIngestion {
constructor() {
this.repoTracker = new RepositoryTracker()
this.updateQueue = new Queue()
}
async start() {
// 1. Subscribe to the commit stream (a raw relay connection still downloads
// every commit; a server-side filtered stream such as Jetstream's wantedDids avoids that)
this.subscribeToCommitStream()
// 2. Process queued repo updates in batches
this.startBatchProcessor()
// 3. Periodic cleanup of inactive repos
this.startRepoCleanup()
}
private subscribeToCommitStream() {
const stream = new WebSocket(
'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'
)
stream.onmessage = (event) => {
const commit = parseCommit(event.data)
// Only queue repos we care about
if (this.repoTracker.isTracked(commit.repo)) {
this.updateQueue.add({
repo: commit.repo,
rev: commit.rev,
priority: this.calculatePriority(commit.repo)
})
}
}
}
private async startBatchProcessor() {
setInterval(async () => {
const batch = await this.updateQueue.getBatch(100)
if (batch.length > 0) {
await batchUpdateRepositories(batch.map(item => item.repo))
}
}, 5000) // Process every 5 seconds
}
}
5. User-Driven Repository Discovery
// Only track repos for users who actually use your app
class UserDrivenTracking {
async onUserLogin(userDid: string) {
// Track this user's repo
await this.repoTracker.trackUserActivity(userDid)
// Also track repos of users they follow (for timeline)
const follows = await getFollows(userDid)
for (const followDid of follows) {
await this.repoTracker.trackUserActivity(followDid)
}
}
async onUserViewsProfile(profileDid: string) {
// Opportunistically update this repo since user is interested
await this.updateQueue.add({
repo: profileDid,
priority: 'high'
})
}
}
Performance Optimizations
6. Repository Prioritization
function calculatePriority(
  repoDid: string,
  activeUsers: Set<string>,
  followedByUsers: Set<string>
): 'high' | 'medium' | 'low' {
  // High: Active users in your app
  if (activeUsers.has(repoDid)) return 'high'
  // Medium: Followed by your users
  if (followedByUsers.has(repoDid)) return 'medium'
  // Low: Discovered via network effects
  return 'low'
}
7. Incremental Sync
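async function incrementalRepoSync(repoDid: string) {
  const lastRev = await getLastSyncRev(repoDid)
  // Cheap check first: getLatestCommit returns only the current commit's cid and rev
  const { data: latest } = await agent.com.atproto.sync.getLatestCommit({ did: repoDid })
  if (latest.rev === lastRev) return null // Nothing new since last sync
  // Only get changes since last sync (a CAR file of the changed blocks)
  const { data: changes } = await agent.com.atproto.sync.getRepo({
    did: repoDid,
    since: lastRev
  })
  // Update our local state; the CAR bytes have no top-level `rev` field
  await updateLastSyncRev(repoDid, latest.rev)
  return changes
}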
The key insight: Don't poll all repos blindly. Instead, use the commit stream to discover which repos have updates, then batch-process only those repos. This reduces your polling from O(all users) to O(active users), which is much more manageable!
Makes sense. Can anyone publish data using a particular lexicon? e.g. If someone else has an app can I write my app so it's compatible with their lexicon and publish so that other users on their app can see it?
Lexicon Interoperability in ATProto
Yes, absolutely! This is one of the most powerful features of ATProto - lexicons are open standards that any app can implement. This enables true interoperability between different applications.
How Lexicon Sharing Works
1. Anyone Can Use Any Lexicon
// Your app can write records using BlueSky's lexicons
await agent.com.atproto.repo.createRecord({
repo: userDid,
collection: 'app.bsky.feed.post', // BlueSky's lexicon
record: {
text: 'Hello from my custom app!',
createdAt: new Date().toISOString()
}
})
// Or someone else's custom lexicon
await agent.com.atproto.repo.createRecord({
repo: userDid,
collection: 'com.otherapp.recipe', // Another app's lexicon
record: {
title: 'Chocolate Cake',
ingredients: ['flour', 'sugar', 'cocoa'],
instructions: 'Mix and bake...'
}
})
2. Cross-App Compatibility Example
// App A defines a "book review" lexicon
const bookReviewLexicon = {
"lexicon": 1,
"id": "com.bookapp.review",
"defs": {
"main": {
"type": "record",
"key": "tid",
"record": {
"type": "object",
"properties": {
"book": {"type": "string"},
"rating": {"type": "integer"},
"review": {"type": "string"}
}
}
}
}
}
// App B can read and write the same lexicon
class BookCompatibleApp {
async publishReview(book: string, rating: number, review: string) {
// Uses App A's lexicon
await this.agent.com.atproto.repo.createRecord({
repo: this.userDid,
collection: 'com.bookapp.review',
record: { book, rating, review }
})
}
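async showReviews() {
// Reads this user's reviews, whichever app wrote them. Reviews by other
// users live in their own repos and must be read from there (or indexed
// from the firehose)
const reviews = await this.agent.com.atproto.repo.listRecords({
repo: this.userDid,
collection: 'com.bookapp.review'
})
return reviews.data.records
}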
}
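The flip side of open lexicons is that any app can write any record into any collection, so a reader cannot trust a record's shape. A PDS may not know a third-party schema and can store such records unvalidated, so treat records from other apps as untrusted input. A sketch of a type guard for the `com.bookapp.review` shape above; `isBookReview` is an illustrative name, and the 1 to 5 rating range is an assumed app convention, since the example lexicon declares no bounds.

```typescript
interface BookReview {
  book: string
  rating: number
  review: string
}

// Type guard for records read from other users' repos. The 1-5 rating range
// is an assumed convention; the example lexicon itself declares no bounds.
function isBookReview(value: unknown): value is BookReview {
  if (typeof value !== 'object' || value === null) return false
  const { book, rating, review } = value as Record<string, unknown>
  return (
    typeof book === 'string' &&
    typeof review === 'string' &&
    typeof rating === 'number' &&
    Number.isInteger(rating) &&
    rating >= 1 &&
    rating <= 5
  )
}

// Usage: keep only well-formed reviews from a listRecords response.
// const valid = res.data.records.map((r) => r.value).filter(isBookReview)
```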
Real-World Interoperability Scenarios
3. Social Media Compatibility
// Your specialized app can still participate in the broader social graph
class NicheApp {
async publishPost(content: string) {
// Publish using standard BlueSky lexicon
await this.agent.com.atproto.repo.createRecord({
repo: this.userDid,
collection: 'app.bsky.feed.post',
record: {
text: content,
createdAt: new Date().toISOString()
}
})
// Also publish using your specialized lexicon
await this.agent.com.atproto.repo.createRecord({
repo: this.userDid,
collection: 'com.nicheapp.specialpost',
record: {
content: content,
category: 'technical',
tags: ['programming', 'atproto']
}
})
}
}
4. Building on Existing Ecosystems
// Photography app that's compatible with general social media
class PhotoApp {
async sharePhoto(imageBlob: Blob, caption: string) {
// Upload image first
const imageRef = await this.uploadImage(imageBlob)
// Create standard social media post
await this.agent.com.atproto.repo.createRecord({
repo: this.userDid,
collection: 'app.bsky.feed.post',
record: {
text: caption,
embed: {
$type: 'app.bsky.embed.images',
images: [{ image: imageRef, alt: caption }]
},
createdAt: new Date().toISOString()
}
})
// Also create detailed photo metadata
await this.agent.com.atproto.repo.createRecord({
repo: this.userDid,
collection: 'com.photoapp.photo',
record: {
image: imageRef,
caption,
camera: 'Canon EOS R5',
settings: { iso: 100, aperture: 'f/2.8' },
location: { lat: 40.7128, lng: -74.0060 }
}
})
}
}
Cross-App Discovery
5. Multi-Lexicon App Views
class UniversalFeedApp {
async buildUniversalFeed(userDid: string) {
const follows = await this.getFollows(userDid)
const allContent = []
// Collect content from multiple lexicons
for (const followDid of follows) {
// Standard posts
const posts = await this.getRecords(followDid, 'app.bsky.feed.post')
// Music posts
const musicPosts = await this.getRecords(followDid, 'com.musicapp.track')
// Recipe posts
const recipes = await this.getRecords(followDid, 'com.recipeapp.recipe')
allContent.push(...posts, ...musicPosts, ...recipes)
}
return allContent.sort((a, b) =>
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()
)
}
}
Best Practices for Lexicon Compatibility
6. Multi-Lexicon Publishing Strategy
class CompatibleApp {
async publishContent(content: any) {
const records = []
// Always publish to standard lexicons for broad compatibility
if (content.type === 'text') {
records.push({
collection: 'app.bsky.feed.post',
record: {
text: content.text,
createdAt: new Date().toISOString()
}
})
}
// Also publish to your specialized lexicon
records.push({
collection: 'com.yourapp.content',
record: {
...content,
appVersion: '1.0.0',
metadata: this.generateMetadata(content)
}
})
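// Create all records atomically in a single commit
await this.agent.com.atproto.repo.applyWrites({
repo: this.userDid,
writes: records.map(r => ({
$type: 'com.atproto.repo.applyWrites#create',
collection: r.collection,
value: r.record
}))
})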
}
}
7. Lexicon Discovery
// Discover which collections (record types) a user's repository contains
async function discoverUserCapabilities(userDid: string) {
  // describeRepo returns the NSID of every collection present in the repo;
  // listRecords cannot do this because it requires a specific collection
  const { data } = await agent.com.atproto.repo.describeRepo({ repo: userDid })
  return data.collections
}
The Power of Open Standards
This interoperability means:
- Users own their data across all apps
- Network effects benefit everyone
- Specialized apps can still participate in the broader ecosystem
- Innovation happens at the application layer, not the protocol layer
So yes, you can absolutely build an app that reads and writes to other apps' lexicons, creating a truly interoperable social ecosystem!