dsync - a distributed locking and syncing package for Go.

NOTE: This project has moved to https://github.com/minio/minio/tree/master/pkg/dsync
dsync is a package for doing distributed locks over a network of n nodes. It is designed with simplicity in mind and hence offers limited scalability (n <= 32). Each node is connected to all other nodes, and lock requests from any node are broadcast to all connected nodes. A node succeeds in getting the lock if n/2 + 1 nodes (whether or not including itself) respond positively. Once acquired, the lock can be held for as long as the client desires, and it must be released afterwards; the release is broadcast to all nodes, after which the lock becomes available again.
This package was developed for the distributed server version of Minio Object Storage. For this we needed a distributed locking mechanism for up to 32 servers that each would be running minio server
. The locking mechanism itself should be a reader/writer mutual exclusion lock meaning that it can be held by a single writer or an arbitrary number of readers.
For minio the distributed version is started as follows (for a 6-server system):

$ minio server http://server1/disk http://server2/disk http://server3/disk http://server4/disk http://server5/disk http://server6/disk

(note that the same command should be run on servers server1 through server6)
dsync is resilient: if one or more nodes go down, the other nodes are not affected and can continue to acquire locks (provided not more than n/2 - 1 nodes are down). The distributed mutex is a drop-in replacement for sync.RWMutex and supports the sync.Locker interface.

The tables below show detailed performance numbers.
This table shows test performance on the same (EC2) instance type but with a varying number of nodes:
| EC2 Instance Type | Nodes | Locks/server/sec | Total Locks/sec | CPU Usage |
|---|---|---|---|---|
| c3.2xlarge | 4 | (min=3110, max=3376) | 12972 | 25% |
| c3.2xlarge | 8 | (min=1884, max=2096) | 15920 | 25% |
| c3.2xlarge | 12 | (min=1239, max=1558) | 16782 | 25% |
| c3.2xlarge | 16 | (min=996, max=1391) | 19096 | 25% |
The min and max locks/server/sec gradually decline, but due to the larger number of nodes the overall total number of locks rises steadily (at the same CPU usage level).
This table shows test performance for a fixed number of 8 nodes on different EC2 instance types:
| EC2 Instance Type | Nodes | Locks/server/sec | Total Locks/sec | CPU Usage |
|---|---|---|---|---|
| c3.large (2 vCPU) | 8 | (min=823, max=896) | 6876 | 75% |
| c3.2xlarge (8 vCPU) | 8 | (min=1884, max=2096) | 15920 | 25% |
| c3.8xlarge (32 vCPU) | 8 | (min=2601, max=2898) | 21996 | 10% |
With the rise in the number of cores the CPU load decreases and overall performance increases.
Stress test on a c3.8xlarge (32 vCPU) instance type:
| EC2 Instance Type | Nodes | Locks/server/sec | Total Locks/sec | CPU Usage |
|---|---|---|---|---|
| c3.8xlarge | 8 | (min=2601, max=2898) | 21996 | 10% |
| c3.8xlarge | 8 | (min=4756, max=5227) | 39932 | 20% |
| c3.8xlarge | 8 | (min=7979, max=8517) | 65984 | 40% |
| c3.8xlarge | 8 | (min=9267, max=9469) | 74944 | 50% |
The system can be pushed to 75K locks/sec at 50% CPU load.
NOTE: Previously dsync was initialized with dsync.Init([]NetLocker, nodeIndex). This has been changed to dsync.New([]NetLocker, nodeIndex), which returns a *Dsync object that must be passed to every instance of NewDRWMutex("test", *Dsync).
Here is a simple example showing how to protect a single resource (drop-in replacement for sync.Mutex):
import (
    "context"
    "log"
    "time"

    "github.com/minio/dsync/v3"
)
func lockSameResource() {
// Create distributed mutex to protect resource 'test'
// (ds is assumed to be a *dsync.Dsync obtained from dsync.New)
dm := dsync.NewDRWMutex(context.Background(), "test", ds)
dm.Lock("lock-1", "example.go:505:lockSameResource()")
log.Println("first lock granted")
// Release 1st lock after 5 seconds
go func() {
time.Sleep(5 * time.Second)
log.Println("first lock unlocked")
dm.Unlock()
}()
// Try to acquire lock again, will block until initial lock is released
log.Println("about to lock same resource again...")
dm.Lock("lock-1", "example.go:515:lockSameResource()")
log.Println("second lock granted")
time.Sleep(2 * time.Second)
dm.Unlock()
}
which gives the following output:
2016/09/02 14:50:00 first lock granted
2016/09/02 14:50:00 about to lock same resource again...
2016/09/02 14:50:05 first lock unlocked
2016/09/02 14:50:05 second lock granted
DRWMutex also supports multiple simultaneous read locks, as shown below (analogous to sync.RWMutex):
func twoReadLocksAndSingleWriteLock() {
drwm := dsync.NewDRWMutex(context.Background(), "resource", ds)
drwm.RLock("RLock-1", "example.go:416:twoReadLocksAndSingleWriteLock()")
log.Println("1st read lock acquired, waiting...")
drwm.RLock("RLock-2", "example.go:420:twoReadLocksAndSingleWriteLock()")
log.Println("2nd read lock acquired, waiting...")
go func() {
time.Sleep(1 * time.Second)
drwm.RUnlock()
log.Println("1st read lock released, waiting...")
}()
go func() {
time.Sleep(2 * time.Second)
drwm.RUnlock()
log.Println("2nd read lock released, waiting...")
}()
log.Println("Trying to acquire write lock, waiting...")
drwm.Lock("Lock-1", "example.go:445:twoReadLocksAndSingleWriteLock()")
log.Println("Write lock acquired, waiting...")
time.Sleep(3 * time.Second)
drwm.Unlock()
}
which gives the following output:
2016/09/02 15:05:20 1st read lock acquired, waiting...
2016/09/02 15:05:20 2nd read lock acquired, waiting...
2016/09/02 15:05:20 Trying to acquire write lock, waiting...
2016/09/02 15:05:22 1st read lock released, waiting...
2016/09/02 15:05:24 2nd read lock released, waiting...
2016/09/02 15:05:24 Write lock acquired, waiting...
The basic steps in the lock process are as follows:

- broadcast a lock request message to all n nodes
- collect the responses within a certain time-out window
- if a quorum of nodes (minimally n/2 + 1) responded positively, then grant the lock
- otherwise release any underlying locks that were obtained and retry after a (semi-)randomized delay

The unlock process is really simple:

- broadcast an unlock message to all nodes that granted the lock, after which the lock becomes available again
A 'stale' lock is a lock that is left behind at a node when the client that originally acquired the lock either crashed before releasing it or became disconnected from the network.
Too many stale locks can prevent a new lock on a resource from being acquired, that is, if the sum of the stale locks and the number of down nodes is greater than n/2 - 1. In dsync a recovery mechanism is implemented to remove stale locks (see here for the details).
Known deficiencies can be divided into two categories, namely a) more than one write lock being granted and b) a lock not becoming available anymore.

So far we have identified one case during which a) can happen (example for an 8 node system):

- a write lock is acquired with a quorum of 5 out of 8 nodes (the 3 other nodes are down or did not grant the lock)
- 2 of the 5 granting nodes crash and are restarted, losing their lock state
- a second lock on the same resource can now reach a quorum of 5 via the 3 nodes that did not grant the first lock plus the 2 restarted nodes

Now we have two concurrent locks on the same resource name, which violates the core requirement. Note that if just a single server out of 4 or 5 crashes, we are still fine because the second lock cannot acquire a quorum.
This table summarizes the conditions for different configurations during which this can happen:
| Nodes | Down nodes | Crashed nodes | Down + crashed |
|---|---|---|---|
| 4 | 1 | 2 | 3 |
| 8 | 3 | 2 | 5 |
| 12 | 5 | 2 | 7 |
| 16 | 7 | 2 | 9 |
(for more info see testMultipleServersOverQuorumDownDuringLockKnownError in chaos.go)

A lock not becoming available anymore would be due to too many stale locks and/or too many servers being down (in total more than n/2 - 1). The following table shows the maximum tolerable number of stale locks plus down nodes for different node sizes:
| Nodes | Max tolerable (stale + down) |
|---|---|
| 4 | 1 |
| 8 | 3 |
| 12 | 5 |
| 16 | 7 |
If you see any other shortcomings, we would be interested in hearing about them.
If two nodes compete for the same lock at (roughly) the same time, it can happen that each obtains n/2 locks and there is no majority winner. Both will fail back to their clients and will retry later after a semi-randomized delay.

On the server side just the following logic needs to be added (barring some extra error checking):
import (
    "fmt"
    "sync"
)

const WriteLock = -1

// LockArgs carries the lock request arguments (simplified to the field used here)
type LockArgs struct {
    Name string
}

type lockServer struct {
    mutex   sync.Mutex
    lockMap map[string]int64 // Map of locks, with negative value indicating (exclusive) write lock
    // and positive values indicating number of read locks
}
func (l *lockServer) Lock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
if _, *reply = l.lockMap[args.Name]; !*reply {
l.lockMap[args.Name] = WriteLock // No locks held on the given name, so claim write lock
}
*reply = !*reply // Negate *reply to return true when lock is granted or false otherwise
return nil
}
func (l *lockServer) Unlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
return fmt.Errorf("Unlock attempted on an unlocked entity: %s", args.Name)
}
if *reply = locksHeld == WriteLock; !*reply { // Unless it is a write lock
return fmt.Errorf("Unlock attempted on a read locked entity: %s (%d read locks active)", args.Name, locksHeld)
}
delete(l.lockMap, args.Name) // Remove the write lock
return nil
}
If you also want RLock()/RUnlock() functionality, then add this as well:
const ReadLock = 1
func (l *lockServer) RLock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Name]; !*reply {
l.lockMap[args.Name] = ReadLock // No locks held on the given name, so claim (first) read lock
*reply = true
} else {
if *reply = locksHeld != WriteLock; *reply { // Unless there is a write lock
l.lockMap[args.Name] = locksHeld + ReadLock // Grant another read lock
}
}
return nil
}
func (l *lockServer) RUnlock(args *LockArgs, reply *bool) error {
l.mutex.Lock()
defer l.mutex.Unlock()
var locksHeld int64
if locksHeld, *reply = l.lockMap[args.Name]; !*reply { // No lock is held on the given name
return fmt.Errorf("RUnlock attempted on an unlocked entity: %s", args.Name)
}
if *reply = locksHeld != WriteLock; !*reply { // A write-lock is held, cannot release a read lock
return fmt.Errorf("RUnlock attempted on a write locked entity: %s", args.Name)
}
if locksHeld > ReadLock {
l.lockMap[args.Name] = locksHeld - ReadLock // Remove one of the read locks held
} else {
delete(l.lockMap, args.Name) // Remove the (last) read lock
}
return nil
}
See dsync-server_test.go for a full implementation.
The full test code (including benchmarks) from sync/rwmutex_test.go is used for testing purposes.
It is possible to trade some level of robustness against overall performance by not contacting every node for each Lock()/Unlock() cycle. In the normal case (example for n = 16 nodes) a total of 32 RPC messages is sent and the lock is granted if at least a quorum of n/2 + 1 nodes responds positively. When all nodes are functioning normally this means n = 16 positive responses, which is n/2 - 1 = 7 responses over the (minimum) quorum of n/2 + 1 = 9. So you could say that this is some overkill: even if 6 nodes are down you still have an extra node over the quorum.

For this case it is possible to reduce the number of nodes to be contacted, for example to 12. Instead of 32 RPC messages, 24 messages are now sent, which is 25% less. As performance mostly depends on the number of RPC messages sent, the total locks/second handled by all nodes would increase by about 33% (at the same CPU load).
You do however want to make sure that you have some sort of 'random' selection of which 12 out of the 16 nodes will participate in every lock. See here for some sample code that could help with this.
Building on the previous example and depending on how resilient you want to be for outages of nodes, you can also go the other way, namely to increase the total number of nodes while keeping the number of nodes contacted per lock the same.
For instance you could imagine a system of 64 nodes where only a quorum majority of 17 would be needed out of 28 contacted nodes. Again this requires some sort of pseudo-random 'deterministic' selection of 28 nodes out of the total of 64 servers (same example as above).
We are well aware that there are more sophisticated systems such as zookeeper, raft, etc. However we found that for our limited use case they added too much complexity. So if dsync does not meet your requirements, then you are probably better off using one of those systems.
Other links that you may find interesting:

- net/rpc vs grpc: we did an analysis of the performance of net/rpc vs grpc (see here), and we'll stick with net/rpc for now.
Released under the Apache License v2.0. You can find the complete text in the file LICENSE.
Contributions are welcome, please send PRs for any enhancements.