[2010 SIGMOD] Pregel: A System for Large-Scale Graph Processing

One-line Summary

Pregel is a computational model and message-passing abstraction that lets users express many graph algorithms with ease. Vertex programs run in a sequence of supersteps, and the programming model boils down to "thinking like a vertex".

Paper Structure Outline

  1. Introduction

  2. Model of Computation

  3. The C++ API

    1. Message Passing

    2. Combiners

    3. Aggregators

    4. Topology Mutations

    5. Input and Output

  4. Implementation

    1. Basic Architecture

    2. Fault Tolerance

    3. Worker Implementation

    4. Master Implementation

    5. Aggregators

  5. Applications

    1. PageRank

    2. Shortest Paths

    3. Bipartite Matching

    4. Semi-Clustering

  6. Experiments

  7. Related Work

  8. Conclusions and Future Work

Background & Motivation

Graphs are getting bigger (in terms of #vertices/#edges), which makes implementing large-scale graph processing algorithms challenging.

Pregel is a system for processing large-scale graphs in a distributed fashion. Its API allows for arbitrary graph algorithms to be expressed with ease.

Design

The Pregel Programming Model

Pregel computations consist of a sequence of iterations, called supersteps. During a superstep the framework invokes a user-defined function for each vertex, conceptually in parallel. The function specifies behavior at a single vertex V and a single superstep S. It can read messages sent to V in superstep S − 1, send messages to other vertices that will be received at superstep S + 1, and modify the state of V and its outgoing edges. Messages are typically sent along outgoing edges, but a message may be sent to any vertex whose identifier is known.

Algorithm termination is based on every vertex voting to halt. In superstep 0, every vertex is in the active state; all active vertices participate in the computation of any given superstep. A vertex deactivates itself by voting to halt. This means that the vertex has no further work to do unless triggered externally, and the Pregel framework will not execute that vertex in subsequent supersteps unless it receives a message. If reactivated by a message, a vertex must explicitly deactivate itself again. The algorithm as a whole terminates when all vertices are simultaneously inactive and there are no messages in transit.
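A minimal single-process Python sketch of the superstep loop, using the "propagate the largest value to every vertex" example. The function name `run_max_propagation` and the dict-based graph representation are assumptions made for illustration, not the Pregel C++ API, and a real deployment would run the per-vertex logic in parallel across workers.

```python
from collections import defaultdict


def run_max_propagation(values, out_edges):
    """Propagate the largest value to every vertex, Pregel-style.

    values:    dict vertex_id -> initial value
    out_edges: dict vertex_id -> list of target vertex ids
    Returns (final values, number of supersteps executed).
    """
    values = dict(values)
    active = set(values)            # superstep 0: every vertex is active
    inbox = defaultdict(list)       # messages readable in the current superstep
    superstep = 0

    while active or inbox:
        outbox = defaultdict(list)  # messages to be received in superstep S + 1
        # A halted vertex runs again only if a message arrived for it.
        for v in active | set(inbox):
            changed = False
            for msg in inbox.get(v, []):
                if msg > values[v]:
                    values[v] = msg
                    changed = True
            # Send in superstep 0 and whenever the local value has grown.
            if superstep == 0 or changed:
                for target in out_edges.get(v, []):
                    outbox[target].append(values[v])
        # In this sketch every vertex votes to halt at the end of each superstep.
        active = set()
        inbox = outbox
        superstep += 1

    return values, superstep


# Hypothetical 4-vertex directed ring: a -> b -> c -> d -> a
final_values, supersteps = run_max_propagation(
    {"a": 3, "b": 6, "c": 2, "d": 1},
    {"a": ["b"], "b": ["c"], "c": ["d"], "d": ["a"]},
)
```

The loop ends exactly when no vertex is active and no messages are in transit, which mirrors the termination condition described above.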

Combiners

To reduce messaging overhead, Pregel provides combiners: user-defined functions that coalesce multiple messages bound for the same vertex into one, reducing message traffic. Combiners should only be enabled for commutative and associative operations, because Pregel gives no guarantee about which messages are combined, the order of combining, or the groupings presented to the combiner.
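As a rough illustration, the sketch below folds messages for the same target with `min`, the kind of combiner a shortest-paths computation could use. The `deliver` helper and the message tuples are hypothetical; Pregel's actual combiner is a C++ class that the user subclasses.

```python
from collections import defaultdict


def deliver(messages, combiner=None):
    """Group (target, value) messages by target vertex.

    With a combiner, messages bound for the same target are folded into a
    single value before delivery, which is what reduces traffic.
    """
    inbox = defaultdict(list)
    for target, value in messages:
        if combiner is not None and inbox[target]:
            # Fold the new message into the one already queued.
            inbox[target] = [combiner(inbox[target][0], value)]
        else:
            inbox[target].append(value)
    return dict(inbox)


outgoing = [("v1", 7), ("v2", 4), ("v1", 3), ("v1", 9)]
plain = deliver(outgoing)                   # v1 receives three messages
combined = deliver(outgoing, combiner=min)  # v1 receives one message
```

Because `min` is commutative and associative, the combined result does not depend on the order in which messages happen to be folded.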

Aggregators

Aggregators are used for global information exchange:

Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1.
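A small sketch of that one-superstep delay, assuming a hypothetical `Aggregator` helper class (not the real Pregel API). It sums vertex out-degrees, which yields the total number of edges in the graph.

```python
from functools import reduce


class Aggregator:
    """Hypothetical helper: reduces values contributed during superstep S
    and exposes the result to vertices in superstep S + 1."""

    def __init__(self, op, initial):
        self.op = op
        self.initial = initial
        self._pending = []      # contributions from the current superstep
        self.visible = initial  # result computed from the previous superstep

    def contribute(self, value):
        self._pending.append(value)

    def end_superstep(self):
        # Called by the runtime at the barrier between supersteps.
        self.visible = reduce(self.op, self._pending, self.initial)
        self._pending = []


edge_count = Aggregator(op=lambda a, b: a + b, initial=0)
out_degree = {"a": 2, "b": 0, "c": 3}  # made-up graph

for vertex, degree in out_degree.items():  # superstep S
    edge_count.contribute(degree)
visible_during_s = edge_count.visible      # still the initial value
edge_count.end_superstep()                 # barrier
visible_during_s_plus_1 = edge_count.visible
```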

Topology Mutations

Some algorithms need to change the graph's topology during the computation. An example use case is clustering algorithms, in which each cluster might be replaced with a single vertex. Inside the Compute() function, a vertex can issue requests to add or remove vertices and edges.

Implementation

Pregel Architecture

A graph is divided into partitions, each containing a set of vertices and their outgoing edges. The default partitioning function is a hash function, while users can also use custom assignment functions to better exploit locality (e.g., colocating vertices representing pages of the same site).
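The two assignment strategies can be sketched as follows. The function names are hypothetical, and `zlib.crc32` is only a stand-in for whatever hash function the real system uses; it is chosen here because Python's built-in `hash` on strings is not stable across processes.

```python
import zlib


def hash_partition(vertex_id, num_partitions):
    """Default-style assignment: a stable hash of the ID, modulo N."""
    return zlib.crc32(str(vertex_id).encode("utf-8")) % num_partitions


def site_partition(url, num_partitions):
    """Custom assignment: hash only the host so pages of one site colocate."""
    host = url.split("/")[2] if "://" in url else url.split("/")[0]
    return hash_partition(host, num_partitions)


p1 = site_partition("http://example.com/index.html", 8)
p2 = site_partition("http://example.com/about.html", 8)
```

Since the owner of a vertex is a pure function of its ID, any worker can work out which partition a vertex belongs to without consulting the others.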

The execution stages are as follows:

FIXME: This is the part where I get a bit confused -- if the user input is loaded in step 3, then how come we can already determine the partitions in step 2?

  1. Many copies of the user program begin executing on a cluster of machines. One of the copies acts as the master to coordinate worker activities.

  2. The master determines the number of partitions and assigns partitions to machines. It is possible to have multiple partitions per worker for parallelism & load balancing. Each worker maintains the state of its partition, executes Compute(), and manages messages between workers.

  3. Each worker gets assigned a portion of the user's input by the master. If a worker loads a vertex that belongs to that worker's section of the graph, then "Aal Izz Well": the worker updates its local data structures right away. Otherwise, the worker enqueues a message to the remote peer that owns the vertex.

  4. The master instructs each worker to do a superstep. Compute() is called for each active vertex. Messages are sent asynchronously to overlap computation and communication. When a worker finishes, it reports to the master, telling it how many vertices will be active in the next superstep. This step iterates until all vertices are inactive.

  5. The computation halts and the master may instruct each worker to save its portion of the graph.

Fault Tolerance

At the beginning of a superstep, the master instructs the workers to save the partition states (vertex/edge values, incoming messages) to persistent storage. The checkpoint frequency is determined using a mean time to failure model. Failures are detected with a heartbeat mechanism: a worker that stops hearing from the master terminates itself, and the master marks a worker that stops responding as failed. After a failure, the master reassigns partitions to the currently available workers, each of which reloads its partition state from the most recent checkpoint.
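A toy, single-process sketch of checkpoint-and-rollback. The function `run_with_checkpoints`, the fixed checkpoint interval `every`, and the simulated failure are all assumptions for illustration; the real system derives the interval from a failure model and restores state from persistent storage.

```python
import copy


def run_with_checkpoints(state, step, num_supersteps, every, fail_at=None):
    """Run `step` repeatedly, checkpointing every `every` supersteps.

    state:   any copyable partition state
    step:    function (state, superstep) -> new state
    fail_at: superstep at which one simulated failure occurs (or None)
    Returns (final state, list of supersteps actually executed).
    """
    checkpoint = (0, copy.deepcopy(state))
    executed = []
    s = 0
    while s < num_supersteps:
        if s % every == 0:
            # Saved at the beginning of the superstep, before any compute.
            checkpoint = (s, copy.deepcopy(state))
        if s == fail_at:
            fail_at = None  # the simulated failure happens only once
            # Roll back to the last checkpoint and redo the lost supersteps.
            s, state = checkpoint[0], copy.deepcopy(checkpoint[1])
            continue
        state = step(state, s)
        executed.append(s)
        s += 1
    return state, executed


final_state, executed = run_with_checkpoints(
    state=0, step=lambda st, s: st + 1, num_supersteps=6, every=3, fail_at=5
)
```

The trade-off is visible in `executed`: a failure at superstep 5 forces supersteps 3 and 4 to be repeated, so less frequent checkpoints mean more recomputation after a failure.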

Example Workloads

PageRank

Shortest Paths

See more examples (bipartite matching, semi-clustering) in the paper :D

Evaluation

Links

  • Paper PDF
  • Reading notes by the morning paper

Figure captions:

  • A simple example where the largest value is propagated to every vertex.
  • The Pregel API
  • In this example, as the receiving vertex of a message only needs to know the shortest distance (the minimum of all distances sent by neighboring vertices), a combiner can be utilized.
  • SSSP: Single-source shortest paths. Scalability! Let's go!
  • In a realistic setting (log-normal random graphs), the performance also scales.