-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlock.py
47 lines (41 loc) · 1.42 KB
/
lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""
Example of using fcntl.flock for locking file. Some code inspired by filelock module.
"""
import os
import fcntl
import time
def acquire(lock_file):
open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
fd = os.open(lock_file, open_mode)
pid = os.getpid()
lock_file_fd = None
timeout = 5.0
start_time = current_time = time.time()
while current_time < start_time + timeout:
try:
# The LOCK_EX means that only one process can hold the lock
# The LOCK_NB means that the fcntl.flock() is not blocking
# and we are able to implement termination of while loop,
# when timeout is reached.
# More information here:
# https://docs.python.org/3/library/fcntl.html#fcntl.flock
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except (IOError, OSError):
pass
else:
lock_file_fd = fd
break
print(f' {pid} waiting for lock')
time.sleep(1.0)
current_time = time.time()
if lock_file_fd is None:
os.close(fd)
return lock_file_fd
def release(lock_file_fd):
# Do not remove the lockfile:
#
# https://github.com/benediktschmitt/py-filelock/issues/31
# https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
fcntl.flock(lock_file_fd, fcntl.LOCK_UN)
os.close(lock_file_fd)
return None