2020-11-2
MIT concurrency notes
Lecture 20: Blockstack
-
Paper:
BlockStack (2017);
Lecture
- Using blockchain as the basis for a new paradigm of internet services
-
In the current web, applications control all user data instead of users
controlling their own data
-
A decentralized architecture would have apps run on users' computers. The app
would somehow figure out how to get the data (potentially from some sort of
cloud storage). Permissions/privacy are hard
-
Some data is not owned by a user and some data should not be visible to
all users. Makes application programming difficult. (ex. Ebay with
hidden bidding. Reddit with vote counts)
-
Blockstack
- Naming (human readable): name → data location, name → public key
-
Public Key Infrastructure (Human readable, globally unique,
decentralized allocation)
-
Naming things via the bitcoin blockchain on a first-come, first-served basis
(means that the names have no real meaning behind them)
-
Atlas system: converts hashed info in the bitcoin blockchain to a data
location, i.e. the Gaia storage servers
- Every name registration must burn money to execute it
-
Limitations
-
Forced to use a system like bitcoin to generate a serialized log;
alternatives such as Certificate Transparency have issues with preventing
multiple registrations of the same name
-
Blockstack is not very convenient for programmers. Hard to program an app
when there is no app-side server, only data; permissions are hard to do
-
For users, no major apparent advantage. Users have to trust storage servers
anyway
Lecture 19: Peer-to-peer: Bitcoin
-
Paper:
Bitcoin (2008);
Lecture
- Peer to peer system to establish trust by making a chain of hashes
-
Peer to peer systems
-
Simplest solution is to have a single trusted entity. But not great
because no one entity is trusted by everyone globally
-
Alternatively, use votes by peers. Has issues where someone can make
many fake peers and vote for themselves
-
Bitcoin
-
Prove work by finding a nonce such that the block's hash has a required
number of leading zeros (see the sketch at the end of this lecture's notes).
People who do this are called miners and they are rewarded with bitcoin
-
If most of the cpu power is controlled by non-malicious actors, the
non-malicious group should generally create the next block first
-
Handles forks (two successors to a single block) when they arise by
having all miners switch to the longer fork as soon as they learn of it
-
It's possible in short forks for payments to be overwritten. The
solution is to wait for many blocks to appear after the transaction you
are interested in
-
The mining difficulty is adjusted deterministically based on the timestamps
in the blockchain. Forces agreement because everyone sees the same chain
-
Limitations:
-
Blocks are limited in size, which highly limits the transactions per
second
-
The interblock interval is hard to make shorter: making it significantly
shorter makes it harder for miners to learn of newly mined blocks from their
peers, so they would waste much of their compute time. Miners don't want to
waste compute time
- Compute time is costly
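A minimal sketch of the proof-of-work loop described above, in Go: it searches for a nonce so that the hash of (previous hash, nonce) has a required number of leading zero bits. This is only an illustration of the idea; real Bitcoin hashes a full block header with double SHA-256 and encodes difficulty differently.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math/bits"
)

// leadingZeroBits counts how many leading zero bits a hash has.
func leadingZeroBits(h [32]byte) int {
	n := 0
	for _, b := range h {
		if b == 0 {
			n += 8
			continue
		}
		n += bits.LeadingZeros8(b)
		break
	}
	return n
}

// mine searches for a nonce such that sha256(prevHash || nonce)
// has at least `difficulty` leading zero bits.
func mine(prevHash []byte, difficulty int) (uint64, [32]byte) {
	for nonce := uint64(0); ; nonce++ {
		buf := make([]byte, len(prevHash)+8)
		copy(buf, prevHash)
		binary.BigEndian.PutUint64(buf[len(prevHash):], nonce)
		h := sha256.Sum256(buf)
		if leadingZeroBits(h) >= difficulty {
			return nonce, h
		}
	}
}

func main() {
	prev := sha256.Sum256([]byte("genesis"))
	nonce, h := mine(prev[:], 16) // 16 leading zero bits: toy difficulty
	fmt.Printf("nonce=%d hash=%x\n", nonce, h)
}
```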
Lecture 18: Fork Consistency, Certificate Transparency
-
Paper:
Certificate Transparency;
Lecture
-
System to trust an append only log that is potentially run by a bad agent
-
The original idea about SSL/TLS was to have certificate authorities be able
to check that everyone who claims to own a domain/DNS actually does.
However, now there are too many websites and there are too many certificate
authorities
-
As a result, sometimes the certificate authorities make a mistake or go
rogue and issue bad certs that allow for man in the middle attacks
- Very difficult to detect what certs are good and what are fake
-
Certificate Transparency
-
Allow auditing of validity of certificates by revealing all information
publicly
-
Certificate authorities make certs for servers and also send a copy to
the certificate transparency log server
-
Client browsers validate that certificates are in the certificate
transparency log servers
-
No browser will use certs not in the log (could still be bad certs
issued by certificate authorities)
-
Each website builds a "monitor" service to check for any rogue
certificates
-
Append only (to prevent deletion by rogue users), no forks (no
equivocation, i.e. don't show two different logs to two different users),
and must remain auditable even when the log server is untrusted
-
Merkle tree
-
Certificates (the leaves) are hashed pairwise up a tree structure. Tree
heads are signed so that log servers cannot disavow them later
-
Browsers on getting a cert ask log servers for a proof of inclusion
- Asks for the signed tree head
-
Then asks for the position of the given certificate + hashes of the
sibling nodes up to the root. Computes the hashes locally and compares
against the signed tree head (see the sketch at the end of this lecture's
notes). Computationally infeasible to produce a fake hash
- Only has log(n) sibling hashes. Makes this fast
-
Once forked, must maintain the fork indefinitely. Can be defeated by
gossip between various clients
Figure out if one log is a prefix of another
-
Given two signed tree heads, ask the log server for a proof that one is a
prefix of the other. The log server sends siblings of the nodes up to the more
recent tree head and then the client can compute the hashes locally to
compare
-
Browser challenges log servers with previous signed tree heads
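A sketch of the inclusion-proof check described above, assuming hypothetical inputs: the browser holds the leaf's hash, the log(n) sibling hashes along the path to the root, and the root hash from the signed tree head. Real Certificate Transparency (RFC 6962) additionally prefixes leaf and node hashes with distinct bytes; that detail is omitted here.

```go
package ctsketch

import (
	"bytes"
	"crypto/sha256"
)

// VerifyInclusion recomputes the root hash from a leaf hash plus the sibling
// hashes along the path to the root, and compares it against the root in the
// signed tree head. siblingIsLeft[i] records whether the i-th sibling sits to
// the left of the running hash.
func VerifyInclusion(leafHash []byte, siblings [][]byte, siblingIsLeft []bool, signedRoot []byte) bool {
	h := leafHash
	for i, sib := range siblings {
		var pair []byte
		if siblingIsLeft[i] {
			pair = append(append(pair, sib...), h...)
		} else {
			pair = append(append(pair, h...), sib...)
		}
		sum := sha256.Sum256(pair)
		h = sum[:]
	}
	// Forging a different leaf that hashes to the same signed root is
	// computationally infeasible.
	return bytes.Equal(h, signedRoot)
}
```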
Lecture 17: Causal Consistency, COPS
-
Paper:
COPS (2011);
Lecture
-
Scaling a storage system while providing a consistency model based on
causal ordering of events
-
Goal: Local reads + local writes
-
Easiest way is to use
eventual consistency (See Dynamo and Cassandra)
- Reads occur locally
-
Writes occur locally, but each writer has a write queue of items
that they send to replicated servers
-
Other readers on other servers are not guaranteed to see your write.
They will eventually, but the order might be different
-
Difficult to use as an app programmer as related updates might
arrive out of order. Ex. Upload Photo → Add to list. The add to list
might occur before the photo upload replication happens and thus leave
a dangling reference
-
Eventual Consistency has issues with reconciling simultaneous writes to
the same key
-
Wall clock to break ties is naive solution (same time ties can be
broken with data center id for example), but there can be drift,
which results in many writes being rejected incorrectly
-
Lamport (logical) clocks (see the sketch at the end of this lecture's notes):
every server keeps a value Tmax, which is the highest timestamp it
has seen anywhere else
-
When a server assigns a timestamp, it takes max(Tmax + 1, real
time)
- Last writer wins policy is easy
-
Struggles with atomic increments (transactions/mini-transactions)
-
Eventual Consistency + Barrier: Implements
sync
operation, which forces a wait on a key until version is at least a
given value
-
Use a single log server at local datacenter: Orders all local operations
and makes sure they arrive at other servers in the right order.
Unfortunately, this becomes a potential bottleneck
-
COPS + Causal Consistency
-
Client Context
-
Keeps track of operations a client does on the client side to
understand ordering dependencies. The client keeps track of version
numbers obtained by various operations
-
Ex (v indicate version number returned, → indicates dependency
tree): Get X returns v2 → Get Y returns v4 → Put Z with value which
depends on {x: v2, y: v4} returns v3
-
Local server gets client operations and forwards data + dependencies to
replicas. Replicas must then wait until the dependencies are applied
before being able to apply the write. Replicas query other servers in
their datacenter to figure out the version numbers of the dependencies
and whether they are high enough to apply the operation
-
Any put allows for subsequent operations to use only the put as their
context instead of all preceding operations. This is correct because
the put implicitly has all its preceding operations as a dependency
- Performance not notable, 50k ops/second
-
Active area of research (OCCULT for example improves on COPS by not
using explicit dependencies and using "causal" timestamps, which is
based on real time, that indicate how far advanced a single shard is. If
timestamps returned from the server on a read > local time, need to
retry or query master)
-
COPS with Get Transactions
-
Response to Get Transaction gives the version (same as before) as well
as the dependencies of each value
-
For each item returned, check its dependencies and make sure that the
item returned from the Get Transaction has at least the version in the
dependency
-
If it doesn't, send a second round of gets. This doesn't introduce
additional waiting for dependencies as the initial Get Transaction
having a certain value implies that all its dependencies are already
updated to the required version. Two rounds of gets to make sure all
dependencies are the correct version
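A small sketch of the Lamport-clock rule mentioned above (timestamp = max(Tmax + 1, real time)); the type and method names are made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// LamportClock tracks Tmax, the highest timestamp this server has seen.
type LamportClock struct {
	tmax int64
}

// Observe is called with any timestamp seen in an incoming message.
func (c *LamportClock) Observe(t int64) {
	if t > c.tmax {
		c.tmax = t
	}
}

// Assign hands out a timestamp for a new write: max(Tmax + 1, real time).
// Ties between servers could be broken by appending a server id.
func (c *LamportClock) Assign() int64 {
	t := c.tmax + 1
	if now := time.Now().UnixNano(); now > t {
		t = now
	}
	c.tmax = t
	return t
}

func main() {
	var c LamportClock
	c.Observe(42)
	fmt.Println(c.Assign()) // > 42 and >= the local wall clock
}
```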
Lecture 16: Cache Consistency: Memcached at Facebook
-
Paper:
Memcached at Facebook (2013); Lecture
- Handling issues while scaling memcached
-
Standard read:
-
Read: try get from memcached → if it returns null → fetch from DB → store the
returned value in memcached (see the look-aside sketch at the end of this
lecture's notes)
-
Write: send new value to DB → delete from memcached (invalidate scheme)
-
Could alternatively, send updated data to memcached on write instead of
deleting from memcached (harder to get right)
-
Because all writes go through the primary region, a write stemming from
the secondary region might not get propagated to the secondary region's
database before the cache is invalidated. Solved by marking the key as
"remote", which tells future clients to look at the primary region when
failing to get a hit in memcached
-
Performance
-
Partition
- Keep one copy of each key means it's RAM efficient
- Not good for hot, very frequently accessed keys
- Clients talk to every partition
-
Replication
- Good when handling very hot keys
- Clients talk to one server only
- Handle a lot less total data
-
Starting new memcached cluster is slow
-
On a miss in a "cold" cluster, the client tries local memcached first, but
then tries a different, warm cluster. If it finds the value there, it adds it
to the cold cluster; otherwise it queries the database
-
Prevent inconsistency with database updates by adding a non-zero
"hold" time for operations shortly after a delete
-
Prevent thundering herd after cache invalidation by using a "lease".
- First client to get a miss will get a lease # from memcached
-
Other front ends getting a miss will be told to wait because
memcached has a lease #. The lease number will eventually time out,
making it possible to make progress eventually
-
Makes it so that only a single client actually queries the database
-
Lease mechanism also used to prevent out of order writes after a
delete (cache invalidation). Cache invalidation invalidates any
outstanding leases so writer cannot write with stale data
-
Fault Tolerance with Gutter servers
- Idle unless main memcached server is dead
-
If a client cannot contact a memcached server, it contacts a gutter
server instead
-
Key doesn't exist in gutter server means client reads from DB and then
installs in gutter servers
-
Keys in gutter are not explicitly deleted because that would require sending
an excessive number of delete requests (gutter can store any key and might not
be partitioned the same way). Instead entries expire quickly
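A toy look-aside cache sketch of the read/write paths above, with in-memory stand-ins for memcached and the database; the leases, "remote" markers, and gutter behavior are all omitted and the example is single-threaded.

```go
package main

import "fmt"

// In-memory stand-ins for memcached and the database (single-threaded toy).
type cache struct{ m map[string]string }
type database struct{ m map[string]string }

var mc = &cache{m: map[string]string{}}
var db = &database{m: map[string]string{"user:1": "alice"}}

// read: try the cache; on a miss, fetch from the DB and install the value
// in the cache so later readers hit.
func read(key string) string {
	if v, ok := mc.m[key]; ok {
		return v
	}
	v := db.m[key]
	mc.m[key] = v
	return v
}

// write: update the DB first, then invalidate (delete) the cached key
// rather than updating it in place.
func write(key, value string) {
	db.m[key] = value
	delete(mc.m, key)
}

func main() {
	fmt.Println(read("user:1")) // miss → DB → installed in cache
	write("user:1", "bob")      // DB updated, cache entry invalidated
	fmt.Println(read("user:1")) // miss again → reads the new value
}
```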
Lecture 15: Big Data: Spark
-
Paper:
Spark (2012); Lecture
-
Generalizes two stages of MapReduce into multi-step dataflow graphs. Not
great at streaming data
-
Build lazy lineage graph instead of actually computing. Lazy computing
-
Only executed when running an action (count, collect, reduce,
lookup, save)
-
Something like a distinct requires a distributed sort or hashing to send
records for the same key to the right workers
- Persist/cache → stores data in memory so don't have to recompute
-
Data is directly piped in memory to next step, unlike in MapReduce which
must create intermediate files. More efficient
-
For Narrow dependencies (only requires local data), no network
interaction at all
-
Last narrow stage will put output to disk and then for the wide stage
(dependent on multiple previous stages), it reads the buckets of the
file. Similar to MapReduce's reduce step
-
By generating lineage graph can optimize wide operations by avoiding
unnecessary operations
-
Fault Tolerance
-
Fault tolerance by just rerunning the lineage graph as operations are
immutable
-
Works well for narrow dependencies, but wide dependencies potentially require
data from all other workers; if that data is not saved to disk, all those
computations might need to be rerun
- Use snapshotting to avoid having to rerun all computations
-
Very large clusters so need to have a reasonable failure recovery system
Lecture 14: Optimistic Concurrency Control
-
Paper:
FARM (2015); Lecture
-
Use remote direct memory access + non volatile memory (battery powered) to
provide serializability (ordered transactions)
-
Research prototype where the app uses special network interface cards (NICs)
to read from memory of app on a different computer directly. Only viable in
local networks at the moment, but crazy good performance (100x faster than
spanner). Uses two phase commits
-
Optimistic Concurrency Control: first read the value directly from the app's
memory on a different computer, then validate during the transaction
after acquiring locks. Pessimistic concurrency control blocks on acquiring
locks, while optimistic aborts when it finds that the value it read is no
longer valid (see the sketch at the end of this lecture's notes).
- Direct Read → Lock → Validate → Commit
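A toy sketch of the read → lock → validate → commit flow, assuming a single local map with per-object version numbers; one mutex stands in for FaRM's per-object locks, RDMA reads, and the distributed validation phase.

```go
package main

import (
	"fmt"
	"sync"
)

type versioned struct {
	value   string
	version int
}

// store is a toy OCC key/value store.
type store struct {
	mu    sync.Mutex
	items map[string]versioned
}

// Read returns the current value and records the version seen in the
// transaction's read set (FaRM would do this with a one-sided RDMA read).
func (s *store) Read(key string, readSet map[string]int) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	it := s.items[key]
	readSet[key] = it.version
	return it.value
}

// Commit: lock -> validate read versions -> install buffered writes.
func (s *store) Commit(readSet map[string]int, writes map[string]string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k, v := range readSet {
		if s.items[k].version != v {
			return false // object changed since we read it: abort
		}
	}
	for k, v := range writes {
		s.items[k] = versioned{value: v, version: s.items[k].version + 1}
	}
	return true
}

func main() {
	s := &store{items: map[string]versioned{"x": {"1", 0}}}
	readSet, writes := map[string]int{}, map[string]string{}
	old := s.Read("x", readSet)
	writes["x"] = old + "1"                // buffer the write locally
	fmt.Println(s.Commit(readSet, writes)) // true unless x changed meanwhile
}
```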
Lecture 13: Spanner
-
Paper:
Spanner (2012); Lecture
-
Use Paxos to give the transaction coordinator in two-phase commit
fallbacks so it does not completely block on coordinator
failure (availability). This allows for external
consistency/serializability. High demand by programmers for transactions as
they are easier to reason about
-
Use versioning of objects (snapshot isolation) + a time API to allow many read
only transactions to complete by reading from the local replica without
locking. Paxos sends operations in timestamp order. Must wait until it sees a
Paxos operation with a timestamp after the timestamp of the transaction (see
start rule)
-
Time API has uncertainty.
-
Start Rule: Transaction timestamp = time.now.latest.
- Read only: start time
- Read write: commit time
-
Commit Wait: wait to commit a read-write transaction until transaction
timestamp < time.now.earliest (see the sketch at the end of this lecture's
notes)
Used commercially by Google and also in CockroachDB
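A sketch of the start rule and commit wait, assuming a hypothetical TrueTime-like API that returns an [earliest, latest] interval with a fixed uncertainty bound.

```go
package main

import (
	"fmt"
	"time"
)

// Interval is a stand-in for Spanner's TrueTime interval: the real time is
// guaranteed to lie in [Earliest, Latest].
type Interval struct {
	Earliest, Latest time.Time
}

// now fakes a TrueTime API with a fixed ±epsilon uncertainty.
func now(epsilon time.Duration) Interval {
	t := time.Now()
	return Interval{Earliest: t.Add(-epsilon), Latest: t.Add(epsilon)}
}

func main() {
	eps := 5 * time.Millisecond

	// Start rule: a read-write transaction's commit timestamp is now().Latest.
	ts := now(eps).Latest

	// Commit wait: don't report the commit until ts < now().Earliest, so any
	// transaction that starts afterwards is guaranteed a later timestamp.
	for !ts.Before(now(eps).Earliest) {
		time.Sleep(time.Millisecond)
	}
	fmt.Println("committed at", ts)
}
```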
Lecture 12: Distributed Transactions
-
Paper:
Read 6.033 Chapter 9, just 9.1.5, 9.1.6, 9.5.2, 9.5.3, 9.6.3 ; Lecture
-
Serializable
-
There exists some serial order (one at a time) of execution of the
transactions that yields the same result as the actual execution
-
Given two transactions T1, T2. The only possible serial orders are: T1,
T2 or T2, T1. A serializable system must have the same result as running
either T1 then T2 or T2 then T1.
-
Distributed Transactions with ACID:
-
concurrency control (to provide isolation/serializability)
-
Pessimistic: Acquire lock before using data. Wait if lock held by
something else (faster when frequent conflicts)
-
Optimistic: Don't acquire locks and just read/write to temporary
area. At the end check if another system is using the data at the
same time. If it was being used, abort. Faster when conflicts are
rare
- atomic commit (to provide atomicity despite failure)
-
Two phase locking (Pessimistic)
- Acquire lock before using record
-
Hold lock until completely done/abort (needed as opposed to release
immediately after last usage of a record because an abort after that
could result in the need to rollback the entire transaction)
-
Easy to get deadlocks (the DB needs to be smart enough to detect them and abort
transactions)
-
Two phase commits (used in many databases that are sharded + have
multi-record transactions):
-
Phase One: Coordinator tells all participants the operations they need
to perform. Participants prepare locks/rollback resources as needed.
After receiving the responses and when about ready to commit, the coordinator
asks all participants if they can commit. If a participant says yes, it must
remain in a state indefinitely where it can either do the operation
or roll back (see the coordinator sketch at the end of this lecture's notes).
-
Phase Two:
-
Coordinator receives yes from all participants → Sends message to
all participants to commit repeatedly until gets acknowledgement
from all participants.
-
Coordinator receives no from any participants → sends abort
repeatedly to all participants to rollback until gets
acknowledgement from all participants.
-
Has issues when the coordinator fails, as participants cannot release their
locks without a coordinator command to either proceed or abort. Generally
requires manual intervention
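A sketch of the coordinator's side of two-phase commit, with a hypothetical Participant interface; a real system would also log the decision durably before phase two and keep retrying the RPCs across crashes.

```go
package twopc

// Participant is a hypothetical interface for a shard taking part in a
// distributed transaction.
type Participant interface {
	Prepare(txID int) bool // "can you commit?" — a yes is a durable promise
	Commit(txID int) bool  // returns true once the commit is acknowledged
	Abort(txID int) bool   // returns true once the abort is acknowledged
}

// RunTwoPhaseCommit sketches the coordinator's two phases.
func RunTwoPhaseCommit(txID int, parts []Participant) bool {
	// Phase one: ask every participant whether it can commit.
	allYes := true
	for _, p := range parts {
		if !p.Prepare(txID) {
			allYes = false
		}
	}
	// Phase two: commit if everyone said yes, otherwise abort; repeat each
	// message until the participant acknowledges it.
	for _, p := range parts {
		if allYes {
			for !p.Commit(txID) {
			}
		} else {
			for !p.Abort(txID) {
			}
		}
	}
	return allYes
}
```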
Lecture 11: Cache Consistency: Frangipani
-
Paper:
Frangipani; Lecture
-
Networked file system with locking to guarantee cache coherence/atomicity
using Petal (block storage service)
-
Cache Coherence Rules
-
No workstation can hold cached data without holding a lock, but workstations
don't give up the lock after they are done using the data, to optimize for
the typical case where one creates a file and immediately uses it
- Acquire lock by requesting it from the lock server → read data from Petal
-
When planning to release lock → first write cached data to Petal
-
Data is typically in RAM, but anything in cache is written to file every
~30 seconds
-
Lock server keeps track of locks for specific files and the workstation
that owns that lock
-
Example:
-
Workstation 1: Request lock to lock server → receive Grant lock from
lock server → uses files but continues holding lock even when not
using it directly
- Workstation 2: Sends a Request lock to lock server
-
Lock Server: Checks table → lock owned by Workstation 1 → sends
Revoke to workstation 1
-
Workstation 1: Writes any cached data to write ahead log and then
Petal → Release lock message to Lock Server
-
Lock Server: receives Release lock from Workstation 1 → sends Grant
lock to Workstation 2, and Workstation 2 can use the file (see the
lock-server sketch at the end of this lecture's notes)
-
Crash Recovery
-
A workstation must write to its own write-ahead log in Petal (which allows
other workstations to access it in case of crashes) before beginning
any write that it does. Log entries carry a version number equal to the
version before the write plus 1
-
On crash, other computer looks at log and finds the last entry via the
monotonically increasing log sequence number. Each entry has description
of file operations
-
Replays any complete log entries (no partial writes) into Petal and
then tells Petal to release the locks
-
Ignores log entries that have version numbers ≤ version number in
Petal (only replays newer entries)
-
Recovery reads without acquiring locks even though other systems may
have the locks. (Required because if power outage knowledge about
locks is completely lost and recovery software must continue). This
is okay because there are only two possibilities:
-
workstation had lock when crashed → nothing else could have
written to it as lock was not acquired by any other system so we
are safe. version number < version in log
-
workstation gave up lock when crashed → must have already
written to Petal so version number will be ≥ version in log and
recovery does nothing
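A toy sketch of the lock-server table and revoke flow from the example above. The revoke callback stands in for the real protocol in which the owning workstation writes its dirty cached data to Petal before releasing the lock; everything here is synchronous and simplified.

```go
package main

import "fmt"

// lockServer keeps one owner per lock and revokes the lock from the current
// owner when someone else asks for it.
type lockServer struct {
	owner  map[string]string     // lock name -> workstation holding it
	revoke func(ws, lock string) // hypothetical callback: write back cache, then release
}

// Request grants the lock if it is free; otherwise it first revokes it from
// the current owner (who flushes its cache to Petal), then re-grants it.
func (s *lockServer) Request(ws, lock string) {
	if cur, held := s.owner[lock]; held && cur != ws {
		s.revoke(cur, lock)
		delete(s.owner, lock)
	}
	s.owner[lock] = ws
}

func main() {
	s := &lockServer{
		owner:  map[string]string{},
		revoke: func(ws, lock string) { fmt.Println("revoke", lock, "from", ws) },
	}
	s.Request("WS1", "file:42") // granted immediately
	s.Request("WS2", "file:42") // WS1 gets a revoke, then WS2 gets the lock
}
```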
Lecture 10: Cloud Replicated DB, Aurora
-
Paper:
Aurora (2017); Lecture
-
Cloud based database on AWS with quorum based reads/writes across multiple
availability zones
-
Quorums
-
Consistency is achieved when Vr + Vw > V, where Vr is the
number of votes needed for a read request, Vw is the number of
votes needed for a write request, and V is the number of copies
-
Aurora has a goal of having read availability when one availability zone
+ 1 server is down. They achieve this by having 6 copies across 3
availability zones. Then a single zone + 1 server can be down and a read
quorum of 3 can be achieved. Write quorum must then be 4
-
Only stores "redo" logs as opposed to pages. Materializes pages when needed.
The log is the database
-
Writes through a single database, many read only databases
-
Read only databases can directly interact with storage servers to
grab data. Also cache data, but lag to some extent
-
mini transactions/VDL numbers etc are used to ensure consistency for
the read only databases
-
Aurora typically does not use quorum reads as it keeps track of how far
in the log each storage server has reached and sends read requests to a
storage server whose log is up to date enough. Quorum reads used in
recovery
-
Quorum reads are used in recovery to find the first missing log
entry amongst the quorumed servers and discards all entries
following that
-
Only log entries of uncommitted entries are discarded due to the
quorum write property
-
Redo log:
- Old value + new value
-
Commit record (existence of this indicates if the transaction has
completed or not)
-
Network is the main bottleneck, dealt with via customized storage servers
Lecture 9: More Replication, CRAQ
-
Paper:
CRAQ (2009);
Lecture (majority
of the lecture is about Zookeeper, CRAQ part starts at ~57:20)
-
Improves on Chain Replication with higher read throughput, and gives
applications the option of weaker consistency guarantees on non-committed
operations
-
Chain Replication
-
Linearly ordered sequence of servers where all writes are processed by
head node and all reads are processed by tail node. Has strong
consistency
-
Head node sends data to next node, which then sends data to the node
after it. Repeat until reaches tail node
-
Tail node sends an acknowledgement that it has committed, which then
propagates via the chain back to the head. The head can then respond to
the client that the write operation has completed
-
Chain Replication with Apportioned Queries (CRAQ)
-
Each node stores multiple versions of an object
- monotonically increasing version number + clean/dirty flag
-
When a node receives an update from a node higher up in the chain (or
from client), appends that update to the object with an increased
version number
-
Not last node → mark as dirty and propagate to the next node down the
chain
-
Last node → mark as clean and sends acknowledgement up the chain
-
node receives an acknowledgement message for an object → mark object as
clean and can delete non-latest version of object
-
node receives a read request for an object
-
Latest version is clean (i.e. there is only one version of the object) →
responds with the object. The tail cannot have committed a newer version
that this node has not seen
-
Latest version is dirty → sends a message to the tail to get the tail's
last committed version number for the object and responds with that version
of the object, which must exist (see the read-handling sketch at the end of
this lecture's notes)
-
It is possible the tail commits a new version between when it responds to
the node and when the node responds to the client. But this is still
strongly consistent because consistency is ordered relative to when the tail
responded
-
Can use as eventual consistency system by having nodes respond to read
requests by serving the latest version available. A request to a
different node may result in an older version of the object
-
Vs Raft/Paxos
-
Faster than a system like Raft/Paxos as reads/writes are divided amongst
multiple servers, but handles lagging servers poorly
-
Failure recovery is similar to Chain Replication in that every node
knows its predecessor and successor and can generally replace it with no
extra work (nodes internal in the chain failing may require resends)
-
Not resistant to partition failure. General way to deal with this is a
configuration manager such as Zookeeper (or similar Raft/Paxos
replicated system) to keep track of which servers are alive and the
current setup of the chain
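A toy sketch of CRAQ's read handling at a non-tail node, under the assumptions above: each key keeps a list of versions plus a dirty flag, and tailCommitted stands in for asking the tail which version it has committed.

```go
package main

import "fmt"

type version struct {
	num   int
	value string
}

// node is a toy CRAQ replica.
type node struct {
	versions      map[string][]version // oldest .. newest
	dirty         map[string]bool      // is the newest version dirty?
	tailCommitted func(key string) int // hypothetical: ask the tail for its committed version
}

// Read: serve the newest version if it is clean; otherwise ask the tail
// which version is committed and serve that one.
func (n *node) Read(key string) string {
	vs := n.versions[key]
	if !n.dirty[key] {
		return vs[len(vs)-1].value
	}
	committed := n.tailCommitted(key)
	for _, v := range vs {
		if v.num == committed {
			return v.value // the committed version must still be stored here
		}
	}
	return "" // unreachable if the chain protocol is followed
}

func main() {
	n := &node{
		versions:      map[string][]version{"x": {{1, "a"}, {2, "b"}}},
		dirty:         map[string]bool{"x": true},
		tailCommitted: func(string) int { return 1 },
	}
	fmt.Println(n.Read("x")) // "a": version 2 is still dirty
}
```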
Lecture 8: Zookeeper
-
Paper:
ZooKeeper (2010);
Lecture Part1
Lecture Part2
(until ~57:20)
-
Stand alone service used for coordination of other services. Uses ZAB, which
is similar to Raft/Paxos
-
Linearizability
-
A history is linearizable if there exists a total order of operations
that matches real time and in which reads see preceding writes in that order
- A cycle in the ordering constraints → not linearizable
- Zookeeper does not use the standard definition of linearizability
-
Zookeeper allows reads to return stale data (unless using sync)
-
Zookeeper Guarantees
- Writes are linearizable
-
For any given client, Zookeeper executes operations in a client
specified order (FIFO)
-
Writes: Client gives an ordering of the writes for the leader to
execute
-
Reads: A read has a given effective location in the log. Any future
read must be at least as up to date as that previous read. This
guarantee holds even when switching to a different replica
-
When replica responds to a client read request, replica attaches
zxID of the previous entry to the response. Client remembers the
most recent zxID it sees. Client sends that id to future
requests
Also applies to a read that follows a write: clients read their own writes
-
sync + FIFO client order allows one to see the most recent data
if needed
-
Strategies
-
Ready file (acts like a lock): before modifying configuration, delete
"ready" file. Then make changes and at the end recreate "ready" file.
This allows other systems to know that the configuration is in a good
state
-
When trying to read configurations, check for the "ready" file's existence (to
know the configuration is good) with a watch to be notified of any
change (and thus know the configuration is stale). Watches are not carried
over across replica failure, so they need to be re-registered when a replica
crashes (the client is notified of this)
-
Most operations have version number so zookeeper only does an operation
when the current version number = version sent via request
-
while true: getData(id) → if setData(id, new_value, version + 1): break
(see the retry-loop sketch at the end of this lecture's notes)
-
Thundering herd
-
Because one typically uses version numbers to prevent running with stale
data, it is common to implement this with a retry loop. Many clients trying at
the same time will result in a thundering herd
-
General solution: Randomized Exponentially increasing delays between
retries
-
Ephemeral nodes: try to create the ephemeral node, but if that fails, check if
it exists with a watch to know when the lock is released
-
Sequential files: check if the previously numbered file in the sequence
exists. Only notified when the previous client's lock is released, so no
need to retry except when you know it will work for sure
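A sketch of the getData/setData retry loop above with randomized, exponentially growing backoff. zkClient is a hypothetical interface for illustration, not the real ZooKeeper client API.

```go
package zksketch

import (
	"errors"
	"math/rand"
	"time"
)

var errBadVersion = errors.New("version mismatch")

// zkClient is a hypothetical stand-in for ZooKeeper's getData/setData:
// SetData only succeeds if the caller passes the version it read.
type zkClient interface {
	GetData(path string) (data []byte, version int, err error)
	SetData(path string, data []byte, version int) error // errBadVersion on conflict
}

// updateWithRetry applies `update` to the data stored at path, retrying with
// randomized exponential backoff so many clients don't stampede at once.
func updateWithRetry(zk zkClient, path string, update func([]byte) []byte) error {
	backoff := 10 * time.Millisecond
	for {
		data, ver, err := zk.GetData(path)
		if err != nil {
			return err
		}
		err = zk.SetData(path, update(data), ver)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errBadVersion) {
			return err
		}
		// Lost the race: sleep a randomized, growing amount and retry.
		time.Sleep(time.Duration(rand.Int63n(int64(backoff))))
		if backoff < time.Second {
			backoff *= 2
		}
	}
}
```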
Lecture 6-7: Fault Tolerance: Raft
-
Paper:
Raft (extended) (2014); Lecture Part1
Lecture Part2
- Replicated State machine
-
Must avoid Split Brain against network partition
- Non-faulty networks: Communication failure → server down
- Human Intervention
-
Majority Voting (sometimes called quorum systems): Odd number of servers
- 2F+1 servers → F failures and can still continue
At most one partition can have a majority of servers
-
Any two majorities must overlap in at least one server (just count
it! Pigeonhole principle)
- After being elected, majority includes leader itself
-
Some similar systems
- Paxos: Considered very challenging to understand
Viewstamped Replication
-
Log: Leader gives an order for client operations so all clients see the same
order
- Persisted log allows for a server to recover after crashing
-
Logs tails may diverge (i.e. leader crashes before sending them out to
all the servers)
-
Leaderless systems exist, but require more messages (multiple rounds of
voting) and are harder to reason about
- Each term has at most one leader
-
If server has not heard from current leader after election timeout has run
out, it will start an election
-
Increments term → votes for itself → generates a new election timeout
interval randomly → sends out RequestVote RPCs to all other servers
-
If it receives votes from a majority of servers (including itself) →
become leader
-
Each server votes at most once per term (stored on disk to avoid
forgetting)
-
If AppendEntries RPC received from current leader (higher/equal term to
itself), convert to Follower
-
If election timeout elapses → start new election
-
Happens when the vote is split and not decisive, or the network is
partitioned
-
Raft reduces the odds of split votes by
randomizing election timeouts so all servers do not try to
start an election at the same exact time. This makes it so that one
server will likely be able to send out a full round of vote requests
before another server attempts to do the same. Other servers will
all vote for the first server
-
Average time between failures for a single server ≫ election timeout ≫
heartbeat interval
-
Candidate after winning election sends out heartbeat to all other servers.
Implicitly indicates they are the leader because there is at most one leader
for a single term.
-
Heartbeats suppress new candidates from popping up because they reset the
election timeout of followers/candidates who receive the messages
-
Who can be leader (how will servers decide whether to vote a certain other
server as a leader or not)?
- Cannot merely use longest log (in Raft)
- Must have term >= server's term
-
Must have log at least as up to date as the server's own log
- Compare last log entry
- Higher term log entry is more up to date
- If same term, larger log entry index is more up to date
-
Log Matching Property
-
Log entries in two different logs with the same index and same term are
the same log entry
-
If a log entry is matching all preceding entries are also matching
-
By default, the leader finds the last common log entry by decrementing
nextIndex for that follower by one and sending that index + term
to the follower. The follower then checks to see if it has a log entry
there with the same index + term (which then must be the same entry by the Log
Matching Property)
- This can be slow when a follower gets very far behind
-
Alternate scheme for higher performance when very far behind: the
rejection from the follower includes (see the sketch at the end of this
lecture's notes):
- XTerm: term in the conflicting entry (if any)
- XIndex: index of first entry with that term (if any)
- XLen: log length
-
Case 1 (leader doesn't have XTerm): nextIndex = XIndex (can
reset to the last common term)
-
Case 2 (leader has XTerm): nextIndex = leader's last entry for
XTerm (i.e. when a follower has more log entries in a certain
term than a leader, those entries should be deleted and thus go
back to leader's last entry for XTerm)
-
Case 3 (follower's log is too short): nextIndex = XLen (try to
match at last entry of follower, and if needed fall back to case 1/2)
-
Could also in theory use something like binary search, which might
be faster
-
Persistence: relatively costly, so only persist the minimal set of things
-
currentTerm: Required to prevent two leaders from existing in the same
term (due to forgetting about a previous term)
-
votedFor: Required to prevent voting for multiple candidates in a single
term
-
log: Required for the information to reconstruct the application state
-
Other things such as lastApplied do not need to be persisted as they can
be reconstructed from the server's own log + responses to AppendEntries.
The leader will re-execute entries after a restart. The application might need
logic to ignore duplicates
-
Log Compaction/Snapshots
-
Application is asked to save its state as of a particular log entry as
state is generally smaller than log of changes
- Raft can then discard all earlier entries
-
If earlier entries are needed for AppendEntries/restoring another
server's state, use InstallSnapshot RPC
- To be added: Membership changes
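A sketch of the leader's nextIndex calculation for the fast-backup scheme above; lastIndexForTerm is a hypothetical helper that scans the leader's own log.

```go
package raftsketch

// Rejection carries the extra fields a follower returns when it rejects an
// AppendEntries RPC, as described in the notes (XTerm/XIndex/XLen).
type Rejection struct {
	XTerm  int // term of the conflicting entry, or -1 if the follower's log is too short
	XIndex int // index of the follower's first entry with term XTerm
	XLen   int // length of the follower's log
}

// nextIndexAfterRejection implements the three cases from the notes.
// lastIndexForTerm returns the leader's last log index with the given term,
// or -1 if the leader has no entry with that term.
func nextIndexAfterRejection(r Rejection, lastIndexForTerm func(term int) int) int {
	if r.XTerm == -1 {
		// Case 3: follower's log is too short; back up to its end.
		return r.XLen
	}
	if idx := lastIndexForTerm(r.XTerm); idx != -1 {
		// Case 2: leader has XTerm; back up to the leader's last entry for it.
		return idx
	}
	// Case 1: leader doesn't have XTerm at all; skip the whole term.
	return r.XIndex
}
```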
Lecture 5: Go, Threads, and Raft
Add -race to enable Go's race detector
-
Closures for goroutines: need to pass changing loop variables in explicitly
as arguments, e.g. go func(i int) { log.Print(i) }(i) (see the example at the
end of this lecture's notes)
-
SIGQUIT
-
the default SIGQUIT handler for Go prints stack traces for all
goroutines (and then exits)
- Ctrl+\ will send SIGQUIT
kill -QUIT <pid> sends SIGQUIT to the process from another terminal
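A runnable version of the closure point above. In Go versions before 1.22 the loop variable was shared by all iterations, so capturing it directly in a goroutine could print unexpected values; passing it as an argument gives each goroutine its own copy.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		// Passing i as an argument copies it for this goroutine. Capturing
		// the loop variable directly (go func() { fmt.Println(i) }()) could,
		// before Go 1.22, print whatever value i had when the goroutine ran.
		go func(i int) {
			defer wg.Done()
			fmt.Println(i)
		}(i)
	}
	wg.Wait()
}
```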
Lecture 4: Primary-Backup Replication
-
Paper:
Fault-Tolerant Virtual Machines (2010);
Lecture
-
Use deterministic replay of state changes to a single backup to allow for
Fault tolerant VMs
-
Main requirement: if the backup VM ever takes over after a failure of the
primary, the backup VM will continue executing in a way that is entirely
consistent with all outputs that the primary VM has sent to the external
world.
-
Achieved by delaying output from the primary until the backup has received
the information needed to replay up to that output operation and has
acknowledged all log entries up to that point. The primary can continue
executing, but cannot send output. The backup eventually replays all
operations (but discards the output)
-
Non-deterministic actions are done on primary and the result is sent to
the backup, which then uses the result
-
Could consider a chain replication scheme instead of delaying output
from the primary
-
Use heartbeats to determine if other server is alive
-
Must be careful, as this can result in split brain when the backup thinks
it's the primary and the primary thinks the backup is
dead
-
Split brain is avoided by executing an atomic test-and-set operation (returns
true to only the first instance to try it) on shared storage. Operation
succeeds → can go live. Fails → must abort. Storage acts as a tiebreaker
- Duplicate output can happen. TCP etc should be able to handle it
-
Alternatively, send over the entire state instead of state changes
-
This paper is only single core and has issues with how to switch to
multicore
-
Sending over entire state would make the problem much easier to handle
Lecture 3: GFS
-
Paper:
GFS (2003);
Lecture
-
Distributed file system built from many commodity servers to primarily
handle sequential writes to large files
-
Classic Issue: Performance → sharding over many machines → get many faults →
need fault tolerance via replication → have potential for inconsistency →
better consistency → low performance
-
GFS
-
Many chunk servers that store actual data. One master keeps track of all
metadata in two tables. Stored in both memory + disk.
- File names mapped to array of chunk handles (non-volatile)
-
Chunk handles mapped to list of chunk servers (volatile, can ask all
chunk servers what handles they have), version number
(non-volatile), primary chunk server (volatile, just wait for the lease to
expire), primary lease expiration time (volatile)
-
Operations
-
Read:
- Client sends filename + offset in request to master
-
Client gets list of chunk servers from master (cached by client)
-
Client talks to one of the chunk servers (tries to pick the closest) and
gets the data back (see the lookup sketch at the end of this lecture's notes)
Write:
-
If no primary
-
the master finds chunk servers with the most up-to-date chunk (version must
match or be larger than the version number stored by the master) and does not
respond until it can figure this out.
-
Picks a primary from the list of servers that have the most up to
date chunk.
-
Sends information to chunk servers of who is in charge + lease
including new version number. When master writes version number to
disk is unclear
-
If master cannot communicate with primary, it can wait lease
duration and then designate a new primary, avoiding split brain
-
Client sends data to all replicas (maybe just closest and then chain
replication)
-
Once all replicas have data, client tells primary to write the data
-
Primary picks an offset, writes the data, and tells the replicas the
offset and to write the data
-
Replicas respond to the primary; if the primary gets "yes" from all replicas,
it responds to the client that the operation has succeeded
-
If replicas don't all respond "yes", primary responds no. Client
should retry sequence from the start.
-
Note that a single entry can appear multiple times or not at all due
to retries and potentially different orders depending on the replica
it talks to. Client must handle this
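A toy sketch of the read-path lookup above, with made-up master tables; the 64 MB chunk size comes from the GFS paper, not from these notes.

```go
package main

import "fmt"

const chunkSize = 64 << 20 // GFS used 64 MB chunks

// master holds toy versions of the two in-memory tables described above.
type master struct {
	chunks  map[string][]string // filename -> chunk handles
	servers map[string][]string // chunk handle -> chunk servers holding it
}

// lookup is the client's first step for a read: turn (file, offset) into a
// chunk handle plus the servers that store it. Clients cache this answer.
func (m *master) lookup(file string, offset int64) (handle string, servers []string) {
	idx := offset / chunkSize
	handle = m.chunks[file][idx]
	return handle, m.servers[handle]
}

func main() {
	m := &master{
		chunks:  map[string][]string{"/logs/a": {"h0", "h1"}},
		servers: map[string][]string{"h1": {"cs3", "cs7", "cs9"}},
	}
	h, ss := m.lookup("/logs/a", 70<<20) // offset falls in the second chunk
	fmt.Println(h, ss)                   // client then reads from one of ss (e.g. the closest)
}
```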
Lecture 2: RPC and Threads
-
Threads: Share memory/cache. Can be very helpful, but must be careful about
mutations.
- Use locks to solve
-
Change the program to not share data (instead of incrementing a shared
variable, increment per-thread variables and sum them at the end; see the
example at the end of this lecture's notes)
-
Deadlocks: One thread holding a lock is waiting for another thread to
release a lock. Unfortunately, other thread waiting for first thread to
release the lock it is holding
- Channels: Pass messages to share data
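A small example of the two approaches above: a shared counter protected by a lock versus per-goroutine counters summed at the end (run with go run -race to check for races).

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const nThreads, nIncr = 4, 1000

	// Option 1: share one counter and protect it with a lock.
	var mu sync.Mutex
	shared := 0

	// Option 2: avoid sharing — each goroutine increments its own counter
	// and the totals are summed at the end.
	private := make([]int, nThreads)

	var wg sync.WaitGroup
	for t := 0; t < nThreads; t++ {
		wg.Add(1)
		go func(t int) {
			defer wg.Done()
			for i := 0; i < nIncr; i++ {
				mu.Lock()
				shared++
				mu.Unlock()
				private[t]++ // no lock needed: only this goroutine touches index t
			}
		}(t)
	}
	wg.Wait()

	sum := 0
	for _, c := range private {
		sum += c
	}
	fmt.Println(shared, sum) // both print 4000
}
```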
Lecture 1: Introduction
-
Paper:
MapReduce (2004);
Lecture
-
Programming model + framework for running large batch computations across
many commodity servers
-
Distributed Systems
-
Set of cooperating computers communicating over network to do certain
tasks
-
Built for parallelism, fault tolerance, physical separation, and for
security/isolation reasons
-
Challenges: concurrency, partial failures (instead of full failure or
full success), performance (hard to get linear scaling)
-
MapReduce
-
Goal for non-distributed system experts to be able to use distributed
system infra to run large jobs
-
Use "Map" to generate intermediate key/value pairs, and then "Reduce" to
generate a final set of values
Map: (key1, value1) → list of (key2, value2) (see the word-count sketch at the
end of this lecture's notes)
- Reduce: (key2, list value2) → (key2, value3)
-
All Maps can run in parallel and so can all Reduces. N workers will give
~Nx performance
-
Tasks are deterministic and repeatable. Many small tasks to avoid one
task dominating the time
- Limitations: No state or real-time streaming processing
-
Fault Tolerance
-
Single master. Only fault tolerance is snapshotting and restarting from
snapshot
-
Master pings workers periodically; if a worker does not respond in a
timely manner, the master marks the worker as failed and gives the tasks
the worker was doing to another worker. Resilient to worker failure
-
When map worker finishes job, sends master list of output files. Master
records filenames. Reduce worker's finished results are handled via an
atomic rename so multiple copies are not made
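A word-count sketch of the Map/Reduce shapes above, with a tiny sequential driver standing in for the distributed framework; the exact signatures here are illustrative, not the paper's or the lab's.

```go
package main

import (
	"fmt"
	"strings"
)

// KeyValue mirrors the (key2, value2) intermediate pairs in the notes.
type KeyValue struct{ Key, Value string }

// Map: (filename, contents) -> list of (word, "1") pairs.
func Map(filename, contents string) []KeyValue {
	var kvs []KeyValue
	for _, w := range strings.Fields(contents) {
		kvs = append(kvs, KeyValue{w, "1"})
	}
	return kvs
}

// Reduce: (word, list of "1"s) -> count for that word.
func Reduce(key string, values []string) string {
	return fmt.Sprint(len(values))
}

func main() {
	// A sequential driver; the real framework shards Maps across workers,
	// groups intermediate pairs by key, and runs Reduces in parallel.
	grouped := map[string][]string{}
	for _, kv := range Map("doc1", "the quick the") {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	for k, vs := range grouped {
		fmt.Println(k, Reduce(k, vs))
	}
}
```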
Any error corrections or comments can be made by sending me a pull request.