# 1.4 Key-Value Stores, Time, and Ordering

## Lesson 1: Key-Value Stores

### Why Key-Value / NoSQL?

* The Key-Value Abstraction
  * Twitter: Tweet ID -> Info about tweet
  * Amazon: Item ID -> Info about it
  * Chase: Account # -> Info about it
* Kind of like a distributed dictionary/DHT in P2P systems
* Kind of like a database
  * Why not use relational DBMS? Mismatch with today's workloads
    * Data: Large and unstructured
    * Lots of random reads and writes from lots of clients
    * Sometimes write-heavy, while RDBMS are often optimized for reads
    * Foreign keys rarely needed
    * Joins infrequent
* Demands of today's workloads
  * Speed
  * Avoid Single Point of Failure (SPoF)
  * Low Total Cost of Operation (TCO)
  * Fewer System Administrators
  * Incremental Scalability
  * Scale out, not up
    * Scale up: Grow the cluster capacity by replacing with more powerful machines
    * Scale out: Incrementally grow the cluster capacity by adding more COTS machines (Components Off The Shelf, sweet spot on the price curve)
      * This is cheaper, and we can phase in/out newer/older machines over a long duration
* NoSQL: Not Only SQL
  * Necessary API operations: get(key) and put(key, value)
  * There are tables as in RDBMSs, but they may be unstructured, may not have schemas, and don't always support joins or foreign keys; they can have index tables
  * Storage: column-oriented storage
    * RDBMS stores an entire row together (on disk or at a server)
    * NoSQL systems store a column (or groups of columns) together
      * Entries within a column are indexed and easy to locate given a key (and vice versa)
      * This makes ranged searches within a column faster (as we don't need to fetch the entire database)
        * E.g., get me all the blog\_ids from the blog table that were updated within the past month
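
A minimal sketch of the row-versus-column layout described above. The `blog` table, its columns, and the use of plain day numbers for `last_updated` are all made up for illustration; the point is only that a range query over one column touches that column's sorted entries rather than every full row.

```python
from bisect import bisect_left

# Hypothetical blog table, row-oriented: each row is stored together.
rows = [
    {"blog_id": 1, "title": "a", "last_updated": 10},
    {"blog_id": 2, "title": "b", "last_updated": 42},
    {"blog_id": 3, "title": "c", "last_updated": 37},
]

# Column-oriented: one column stored together, sorted by value,
# with each entry pointing back at its row key.
last_updated_column = sorted((r["last_updated"], r["blog_id"]) for r in rows)


def updated_since(column, day):
    """Return blog_ids whose last_updated value is >= day."""
    start = bisect_left(column, (day,))
    return [blog_id for _, blog_id in column[start:]]


recent = updated_since(last_updated_column, 30)
```

Here `recent` holds the ids of the two blogs updated on or after day 30, found with a binary search over a single column.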

![NoSQL structure](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MdyfgTkrmwlkZHQ7GO7%2F-Mdyli6aWKLE4pc7c0QJ%2FScreen%20Shot%202021-07-07%20at%2010.23.15%20AM.png?alt=media\&token=82b86399-3031-4545-aae1-9fe2ece4be71)

### Cassandra

* Data placement strategies
  * SimpleStrategy
    * RandomPartitioner: Chord-like hash partitioning
    * ByteOrderedPartitioner: Assigns ranges of keys to servers, easier for range queries
  * NetworkTopologyStrategy: For multi-DC deployments
    * Two/three replicas per DC
    * Per DC: The first replica is placed according to partitioner, then go clockwise until you hit a different rack
* Snitches: Maps IPs to racks and DCs
  * SimpleSnitch: Unaware of topology/rack
  * RackInferring: Infers network topology from the octets of the server's IP address
    * 101.102.103.104 = x.\<DC octet>.\<rack octet>.\<node octet>
  * PropertyFileSnitch: Uses a config file
  * EC2Snitch: EC2 region = DC, availability zone = rack
* Writes
  * Client sends write to one coordinator node in Cassandra cluster
  * Coordinator uses partitioner to send query to all replica nodes responsible for key
  * When X replicas respond, coordinator returns an acknowledgment to the client
    * X is specified by the client -- we'll come back to this later
  * Hinted Handoff mechanism: If any replica is down, the coordinator writes to all other replicas and keeps the write locally until the down replica comes back up, at which point it sends that replica a copy of the write. When all replicas are down, the coordinator buffers the write locally
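
The Chord-like placement used by the partitioner can be sketched roughly as follows. This is an illustration under stated assumptions, not Cassandra's code: the MD5-based `ring_position`, the 32-bit ring size, and the function names are hypothetical, and racks and DCs are ignored.

```python
import hashlib
from bisect import bisect_left

RING_BITS = 32  # assumed ring size of 2**32 positions, for illustration only


def ring_position(name: str) -> int:
    """Hash a server name or key onto the ring."""
    digest = hashlib.md5(name.encode()).digest()
    return int.from_bytes(digest[:4], "big") % (2 ** RING_BITS)


def replicas_for(key: str, servers: list[str], n_replicas: int) -> list[str]:
    """First replica is the key's clockwise successor; the rest follow clockwise."""
    ring = sorted((ring_position(s), s) for s in servers)
    positions = [pos for pos, _ in ring]
    # Wrap around to the start of the ring if the key hashes past the last server.
    start = bisect_left(positions, ring_position(key)) % len(ring)
    count = min(n_replicas, len(ring))
    return [ring[(start + i) % len(ring)][1] for i in range(count)]
```

A coordinator could call something like `replicas_for("tweet:42", servers, 3)` to decide where to forward a write. A rack-aware strategy would keep walking clockwise until it reaches a server on a different rack.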

![Cassandra ring structure](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeFirZ3GoLQlR2zDwOZ%2F-MeFjbRTIqQTX6G2l6wm%2FScreen%20Shot%202021-07-10%20at%2010.06.52%20PM.png?alt=media\&token=d2fd7018-c18a-4f14-8da4-fd82305c8b5e)

* When a replica node receives a write
  * Log it in disk commit log for failure recovery
  * Make changes to appropriate memtables, in-memory representations of multiple key-value pairs. Memtables are flushed to disk when they are full/old.
  * Data files: An SSTable (Sorted String Table), list of key-value pairs sorted by key
  * Index file: An SSTable of (key, position in data SSTable) pairs
  * Efficient search: Bloom filters!
* Bloom filters: Large bit maps
  * Checking for existence in set is cheap
  * Some probability of false positives (an item not in the set is reported as being in it -> incurs a slight overhead for going into the SSTable), but never false negatives
  * The bit map starts with all zeros. On insert, we use k hash functions to map a key to k indexes. Then, all hashed bits are set to 1 for those k indexes (if they hadn't been set already)
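
The insert and lookup rules for a Bloom filter can be sketched as below. The class and method names are hypothetical, and deriving the k hash functions by salting SHA-256 with the hash number is an illustrative choice, not a claim about how Cassandra hashes keys.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [0] * num_bits  # the bit map starts as all zeros

    def _indexes(self, key: str):
        # Derive k indexes by salting one hash with the hash number
        # (an assumption made for this sketch).
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for idx in self._indexes(key):
            self.bits[idx] = 1

    def might_contain(self, key: str) -> bool:
        # False means definitely absent; True may be a false positive.
        return all(self.bits[idx] for idx in self._indexes(key))
```

A lookup returns `False` as soon as any of the k bits is still zero, which is why a key that was inserted can never be reported as missing.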

![Bloom filters](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeFirZ3GoLQlR2zDwOZ%2F-MeFp383iLsXOBYCKyvi%2FScreen%20Shot%202021-07-10%20at%2010.31.02%20PM.png?alt=media\&token=79e35110-5882-4f8a-ad90-4133d3a2ba98)

* Compaction
  * Each server periodically merges SSTables by merging updates for a key
* Delete
  * Instead of deleting right away, add a tombstone to the log, and eventually, it will be deleted by the compaction process
* Reads
  * Coordinator contacts X replicas
  * When X replicas respond, the coordinator returns the latest-timestamped value from those X replicas
  * The coordinator also fetches values from other replicas
    * This checks consistency in the background, initiating a read repair if any two values are different
    * This mechanism seeks to eventually bring all replicas up-to-date
  * A row may span across multiple SSTables -> reads need to touch multiple SSTables -> reads are slower than writes
* Membership
  * Any server could be the coordinator -> every server needs to know about all the servers in the cluster, and the list of servers needs to be updated as servers join/leave/fail
  * Cassandra uses gossip-style membership
* Suspicion mechanism: Sets timeouts on a server-by-server basis
* Reads/writes are orders of magnitude faster than MySQL. But what did we lose?
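
Looking back at the compaction and delete steps above, a minimal sketch of merging SSTables might look like this. The `(key, timestamp, value)` tuple layout and the `TOMBSTONE` marker are assumptions for illustration. Dropping tombstones immediately during the merge is also a simplification.

```python
TOMBSTONE = object()  # hypothetical marker written in place of a deleted value


def compact(sstables):
    """Merge SSTables, keeping the newest entry per key and dropping tombstones.

    Each SSTable is assumed to be a list of (key, timestamp, value) tuples.
    """
    latest = {}
    for table in sstables:
        for key, ts, value in table:
            if key not in latest or ts > latest[key][0]:
                latest[key] = (ts, value)
    # Emit the merged table sorted by key, skipping keys whose newest entry
    # is a tombstone.
    return sorted(
        (key, ts, value)
        for key, (ts, value) in latest.items()
        if value is not TOMBSTONE
    )
```

For example, merging an older table containing keys `a` and `b` with a newer table that updates `a` and tombstones `b` leaves only the newer `a` entry plus any other live keys.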

### The Mystery of X: The CAP Theorem

* In a distributed system, at most two out of these three can be satisfied:
  * Consistency: All nodes see the same data at any time/reads return the latest value written by any client
    * Thousands of people booking the same flight
  * Availability: The system allows operations all the time & operations return quickly
    * Amazon: each extra ms of latency implies a $6M yearly loss
  * Partition-tolerance: The system continues to work in spite of network partitions (within/across DCs)

![CAP tradeoff](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJN9Wk0saXSgqzCl-H%2F-MeJP2LefcjHhjHe_oOz%2FScreen%20Shot%202021-07-11%20at%203.11.30%20PM.png?alt=media\&token=d4cacb17-61e9-4748-a544-c0e7a8d97993)

* RDBMS provides ACID: Atomicity, Consistency, Isolation, Durability
* KV stores provide BASE: Basically Available Soft-state Eventual Consistency (prefers availability over consistency)
* In Cassandra, for each operation, a client is allowed to choose a consistency level
  * ANY: Any server (may not be replica)
    * Fastest (coordinator caches write & replies quickly)
  * ALL: All replicas
    * Strong consistency but slow
  * ONE: At least one replica
    * Faster than ALL but no failure tolerance (if all replicas fail)
  * QUORUM: Quorum across all replicas in all DCs
    * Quorum = majority (>50%)
    * Any two quorums intersect
    * Faster than ALL while still guaranteeing strong consistency
  * More quorum-related
* Quorum (N = total number of replicas)
  * Read consistency level: R <= N, coordinator waits for R replicas to respond before sending result to client, while in background, coordinator checks for consistency of remaining (N-R) replicas
  * Write consistency level: W <= N. Two flavors: (1) Coordinator blocks until quorum is reached, (2) Async: just write and return
  * For strong consistency:
    * W + R > N (write & read quorums intersect in at least one server among replicas of a key)
    * W > N / 2 (two write quorums ..., which keeps the latest value of the write)
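
The two strong-consistency conditions are easy to check mechanically. The function name below is hypothetical; it only encodes the inequalities from the list above.

```python
def is_strongly_consistent(n: int, w: int, r: int) -> bool:
    """Check the two quorum conditions for N replicas of a key."""
    if not (1 <= w <= n and 1 <= r <= n):
        raise ValueError("need 1 <= W <= N and 1 <= R <= N")
    read_write_overlap = w + r > n   # every read quorum meets every write quorum
    write_write_overlap = 2 * w > n  # any two write quorums meet
    return read_write_overlap and write_write_overlap
```

With N = 3, the choice W = 2, R = 2 satisfies both conditions, while W = 1, R = 3 satisfies only the first.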

![Selecting values of W and R based on workloads](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJN9Wk0saXSgqzCl-H%2F-MeJTt4gz_AfLCjO_UkD%2FScreen%20Shot%202021-07-11%20at%203.32.39%20PM.png?alt=media\&token=836669be-1d2e-4dbf-b3fe-7af72b6f92a3)

### The Consistency Spectrum

* Cassandra offers eventual consistency: If writes to a key stop, all replicas of key will converge

![Left: Faster reads/writes. Right: More consistency.](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJN9Wk0saXSgqzCl-H%2F-MeJYMG20kA9_piwbxn8%2FScreen%20Shot%202021-07-11%20at%203.52.11%20PM.png?alt=media\&token=a2722515-c6a7-4e4b-9f10-44e310b105db)

* Per-key sequential: Per key, all operations have a global order
* CRDT: Commutative Replicated Data Types, commutated writes give same result
  * Servers do not need to worry about consistency/ordering
* Red-Blue: Rewrites client operations and splits them into red/blue ops
  * Red ops: Need to be executed in the same order at each DC
  * Blue ops: Can be commutated in any order across DCs
* Causal: Reads must respect partial order based on information flow
* Strong consistency models: Linearizability/Sequential consistency
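
To make the CRDT idea of "commutated writes give same result" concrete, here is a toy comparison. It is not a real CRDT implementation; the operations and helper name are invented for illustration.

```python
from itertools import permutations


def apply_all(ops, start=0):
    """Apply a sequence of single-argument operations to a starting value."""
    value = start
    for op in ops:
        value = op(value)
    return value


# Increments commute, so every delivery order yields the same final value.
increments = [lambda v: v + 1, lambda v: v + 5, lambda v: v + 10]
increment_results = {apply_all(order) for order in permutations(increments)}

# Overwrites do not commute: the final value depends on which write lands last.
overwrites = [lambda v: 1, lambda v: 5, lambda v: 10]
overwrite_results = {apply_all(order) for order in permutations(overwrites)}
```

The set of outcomes for the increments contains a single value, whereas the overwrites produce a different final value depending on delivery order. That difference is why servers applying only commutative operations do not need to agree on an order.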

### HBase

* API functions
  * Get/Put (row)
  * Scan (row range, filter) - range queries
  * MultiPut
* Prefers consistency over availability (unlike Cassandra)

![HBase architecture](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJN9Wk0saXSgqzCl-H%2F-MeJbGYUQHeCw2hRQ6cU%2FScreen%20Shot%202021-07-11%20at%204.09.15%20PM.png?alt=media\&token=b9a748a5-5211-4ffe-9020-cc29bb58da34)

![HFile structure](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJN9Wk0saXSgqzCl-H%2F-MeJc6xTtQtusO03TOgG%2FScreen%20Shot%202021-07-11%20at%204.12.57%20PM.png?alt=media\&token=d70520b8-a205-472c-8ef2-2b50399bc161)

* HBase uses a write-ahead log (written before the memstore) to ensure strong consistency
* Cross-datacenter replication: Single master + other slave clusters replicate the same tables

## Lesson 2: Time and Ordering

### Introduction and Basics

* Time synchronization is required for both correctness and fairness
* Challenges
  * End hosts in Internet-based systems like clouds have their own clocks
  * Processes in Internet-based systems follow an asynchronous system model (no bounds on message delays/processing delays)
* Clock skew/drift: Relative difference in clock values/frequencies of two processes
* MDR: Maximum Drift Rate of a clock. Between any pair of clocks, given a max acceptable skew M, need to synchronize every M / (2 \* MDR) time units
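
The synchronization interval above is simple arithmetic. The helper below is a hypothetical sketch, and the units are whatever the caller uses consistently.

```python
def sync_interval(max_skew: float, mdr: float) -> float:
    """Longest time between synchronizations that keeps skew within max_skew.

    Two clocks can drift apart at up to 2 * mdr, since each may drift at
    rate mdr in the opposite direction.
    """
    if mdr <= 0:
        raise ValueError("mdr must be positive")
    return max_skew / (2 * mdr)
```

For instance, with a maximum acceptable skew of 1 ms and an MDR of 10^-6 seconds per second, the clocks need to synchronize about every 500 seconds.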

![Terminologies](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJeJXwhsTQFizHtO4w%2F-MeJg2D04aVwVQrZp12N%2FScreen%20Shot%202021-07-11%20at%204.30.08%20PM.png?alt=media\&token=01c49b05-523b-41a3-ba7f-99867f49d434)

* Consider a group of processes:
  * External synchronization: Each process's clock is within a bound D of a well-known external clock (e.g., UTC)
  * Internal synchronization: Every pair of processes have clocks within bound D
  * External sync. within D implies internal sync. within 2D

### Cristian's Algorithm

* Process P synchronizes with a time server S
* Problem: Time response message is inaccurate, the inaccuracy being a function of message latencies (and since latencies are unbounded, the inaccuracy cannot be bounded)
* Cristian's Algorithm measures the RTT of message exchanges
* The actual time at P when it receives response is between `[t + min2, t + RTT - min1]`
  * min1 = minimum P -> S latency, min2 = minimum S -> P latency
* Cristian's Algorithm sets its time to `t + (RTT + min2 - min1) / 2` (halfway through this interval)
  * Error is now bounded, being at most `(RTT - min2 - min1) / 2` (half the width of the interval)
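
A small sketch of the computation P performs on receiving the reply. The function and parameter names are hypothetical. The estimate is the midpoint of the interval `[t + min2, t + RTT - min1]` and the error bound is half its width.

```python
def cristian_estimate(t: float, rtt: float, min1: float, min2: float):
    """Return (estimated time at P, error bound) on receiving the reply.

    t:    time value carried in the server's reply
    rtt:  round-trip time measured by P on its own clock
    min1: minimum P -> S latency
    min2: minimum S -> P latency
    """
    if rtt < min1 + min2:
        raise ValueError("RTT cannot be smaller than min1 + min2")
    low, high = t + min2, t + rtt - min1  # interval containing the true time
    estimate = (low + high) / 2           # equals t + (rtt + min2 - min1) / 2
    error_bound = (high - low) / 2        # half the interval width
    return estimate, error_bound
```

With `t = 100`, `rtt = 10`, `min1 = 2`, and `min2 = 4`, the interval is `[104, 108]`, so P sets its clock to 106 with an error of at most 2.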

### NTP

* NTP = Network Time Protocol
* Each client is a leaf of the tree, each node synchronizes with its parent

![](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJeJXwhsTQFizHtO4w%2F-MeJtCdhgZS-mcchVrZF%2FScreen%20Shot%202021-07-11%20at%205.27.40%20PM.png?alt=media\&token=3ba1b90b-b555-437c-9b7d-fbfc7ec8b70b)

* Suppose child is ahead of parent by oreal, and suppose one-way latency of message i is Li, and suppose offset `o = (tr1 - tr2 + ts2 - ts1) / 2`, then
  * `tr1 = ts1 + L1 + oreal`
  * `tr2 = ts2 + L2 - oreal`
  * Then, `oreal = o + (L2 - L1) / 2`, and then,
  * `|oreal - o| = |(L2 - L1) / 2| < |(L2 + L1) / 2|`, making the error bounded by RTT
* Can we avoid clock synchronization and still be able to order events?
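
Returning to the NTP offset formula above, the following sketch plugs in made-up numbers to check it. It assumes, as the two equations for `tr1` and `tr2` imply, that message 1 travels parent -> child and message 2 travels child -> parent. The function name and all the values are hypothetical.

```python
def ntp_offset(ts1: float, tr1: float, ts2: float, tr2: float) -> float:
    """Estimated offset o of the child relative to the parent.

    ts1 and tr2 are read on the parent's clock, tr1 and ts2 on the child's.
    """
    return (tr1 - tr2 + ts2 - ts1) / 2


# Made-up scenario: the child really is 5 units ahead, with L1 = 2 and L2 = 4.
o_real, l1, l2 = 5.0, 2.0, 4.0
ts1 = 100.0
tr1 = ts1 + l1 + o_real  # message 1: parent -> child
ts2 = tr1 + 1.0          # child replies one unit later (child's clock)
tr2 = ts2 + l2 - o_real  # message 2: child -> parent
o = ntp_offset(ts1, tr1, ts2, tr2)
```

In this scenario the estimate `o` differs from `o_real` by exactly `(L2 - L1) / 2`, which is smaller than `(L1 + L2) / 2`.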

### Lamport Timestamps

* As long as timestamps obey causality, we can assign to events timestamps that are not absolute time
* Happens-before is denoted as ->

![](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJurcc2yvbeG8sTJil%2F-MeKB7wHtaNYLC0kOBeC%2FScreen%20Shot%202021-07-11%20at%206.50.21%20PM.png?alt=media\&token=39c66fe6-f337-411b-9be6-21dd2d467322)

* Rules for assigning timestamps
  * Each process uses a local counter that is initialized as 0
  * A process increments its counter when a send/instruction happens
  * A send(message) event carries its timestamp
  * For a receive(message) event, the counter is set to max(local counter, message timestamp) + 1
    * To obey the causality order
* Lamport timestamps are not guaranteed to be ordered or unequal for concurrent events
  * `E1 -> E2` implies `timestamp(E1) < timestamp(E2)`
  * `timestamp(E1) < timestamp(E2)` implies `{E1 -> E2} OR {E1 and E2 are concurrent}`
* Can we tell if two events are concurrent or causally related? -> Vector clocks!
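
The timestamp rules above fit in a few lines. The class and method names are hypothetical, and message passing is simulated by handing the send timestamp directly to the receiver.

```python
class LamportClock:
    def __init__(self):
        self.counter = 0  # local counter starts at 0

    def tick(self) -> int:
        """Instruction or send event: increment and return the timestamp."""
        self.counter += 1
        return self.counter

    def receive(self, message_timestamp: int) -> int:
        """Receive event: move past both the local and the message timestamp."""
        self.counter = max(self.counter, message_timestamp) + 1
        return self.counter


p1, p2 = LamportClock(), LamportClock()
send_ts = p1.tick()            # P1 sends a message
p2.tick()                      # P2 executes three local instructions
p2.tick()
p2.tick()
recv_ts = p2.receive(send_ts)  # P2 receives P1's message
```

The receive timestamp ends up larger than the send timestamp, so the send -> receive causality is respected. P2's earlier local events have timestamps 1 to 3 even though they are concurrent with P1's send, which shows that comparing Lamport timestamps alone cannot distinguish concurrency from causality.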

![Lamport example 1](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJurcc2yvbeG8sTJil%2F-MeKR_6kAkunPLwb1koR%2F%E8%8D%89%E7%A8%BF%E6%9C%AC-249.jpg?alt=media\&token=7ceee5a3-d2a2-4805-8805-bd2a06cece68)

![Lamport example 2](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJurcc2yvbeG8sTJil%2F-MeKRXC_2b8nlt7CIf6A%2F%E8%8D%89%E7%A8%BF%E6%9C%AC-248.jpg?alt=media\&token=6f663690-a5d0-41cd-b4c5-585af8ca5684)

![Lamport example 3 (from the final exam)](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MgdrEvsSG7lOQda3bSA%2F-MgdwdUxBgJU_1zuq2v5%2Flamport3.jpeg?alt=media\&token=4fac53db-0b4b-4c03-902d-5e5bd90a045c)

### Vector Clocks

* N processes
* Each process uses a vector of integer clocks: Process i maintains `Vi[1, ..., N]`
* jth element of vector clock at process i, `Vi[j]`, is i's knowledge of latest events at process j
* Rules for assigning vector timestamps
  * On an instruction or send event at process i, it increments only the ith element of its vector clock
  * Each message carries the send event's vector timestamp
  * When process i receives a message
    * `Vi[i] += 1`
    * `Vi[j] = max(Vmessage[j], Vi[j]) for j != i`
* Causally-related:
  * `VT1 = VT2` iff `VT1[i] = VT2[i]` for all i = 1, ..., N
  * `VT1 <= VT2` iff `VT1[i] <= VT2[i]` for all i = 1, ..., N
  * Two events are causally related iff `VT1 < VT2`
    * i.e., iff `VT1 <= VT2` & there exists j such that `1 <= j <= N & VT1[j] < VT2[j]`
  * Two events are concurrent iff `NOT(VT1 <= VT2) AND NOT (VT2 <= VT1)`
    * Denote as `VT1 ||| VT2`
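
The update and comparison rules above can be sketched as follows. All names are hypothetical, and process indexes are 0-based here rather than 1-based as in the notes.

```python
class VectorClock:
    def __init__(self, index: int, n: int):
        self.index = index  # this process's position i (0-based in this sketch)
        self.v = [0] * n

    def tick(self) -> list:
        """Instruction or send event: increment only the own entry."""
        self.v[self.index] += 1
        return list(self.v)  # copy to attach to an outgoing message

    def receive(self, message_vt: list) -> list:
        """Receive event: bump own entry, take element-wise max elsewhere."""
        self.v[self.index] += 1
        for j, other in enumerate(message_vt):
            if j != self.index:
                self.v[j] = max(self.v[j], other)
        return list(self.v)


def leq(vt1, vt2):
    return all(a <= b for a, b in zip(vt1, vt2))


def happened_before(vt1, vt2):
    # VT1 < VT2: less-or-equal everywhere and strictly less somewhere.
    return leq(vt1, vt2) and vt1 != vt2


def concurrent(vt1, vt2):
    return not leq(vt1, vt2) and not leq(vt2, vt1)
```

For example, if process 0 sends a message with timestamp `[1, 0, 0]` and process 1 receives it as `[1, 1, 0]`, then `happened_before` holds between them, while an independent event `[0, 0, 1]` at process 2 is `concurrent` with both.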

![Vector clock example 1](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJurcc2yvbeG8sTJil%2F-MeKResdtGnk-7v7Mgya%2F%E8%8D%89%E7%A8%BF%E6%9C%AC-250.jpg?alt=media\&token=426fcd7b-175a-478a-8c71-1cd0637f703e)

![Vector clock example 2](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MeJurcc2yvbeG8sTJil%2F-MeKRhjDHUbPkgi2ZSny%2F%E8%8D%89%E7%A8%BF%E6%9C%AC-251.jpg?alt=media\&token=e1183231-4c11-4691-9eee-ab57b4378ffb)

![Vector clock example 3 (from the final exam)](https://1313833672-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-MMTslgmrrtRXvxD2lk9%2F-MgdrEvsSG7lOQda3bSA%2F-MgdwjedkuFCTY23iTAl%2Fvector3.jpeg?alt=media\&token=e16308b8-1fc4-453f-ba1f-b82a95332cfe)
