-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfileutil.py
138 lines (122 loc) · 3.86 KB
/
fileutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import sys, os, errno, shutil
from async import async_call, coroutine
from dialogs import dialogs
from fileutil_common import *
def atomic_write_file(path, data, mode="wb"):
    """Write ``data`` to ``path`` by way of a temporary sibling file.

    The data is written to ``.saving.<basename>`` in the same directory as
    ``path``; that file is then renamed onto ``path`` with the module-level
    ``rename``.  If opening, writing or renaming raises an ``Exception``,
    removal of the temporary file is attempted and the exception is raised
    again.

    path -- destination file name.
    data -- object passed to the file's ``write`` method.
    mode -- mode string handed to ``open`` (default ``"wb"``).
    """
    folder, filename = os.path.dirname(path), os.path.basename(path)
    staging = os.path.join(folder, ".saving." + filename)
    try:
        with open(staging, mode) as stream:
            try:
                # Copy the stat information of the file being replaced onto
                # the staging file; an OSError here (for example because
                # ``path`` does not exist yet) is deliberately ignored.
                shutil.copystat(path, staging)
            except OSError:
                pass
            stream.write(data)
        # Only reached when the write (and the close) succeeded.
        rename(staging, path)
    except Exception as error:
        # Best-effort cleanup: a failure to delete the staging file must not
        # hide the error that brought us here.
        try:
            os.remove(staging)
        except Exception:
            pass
        raise error
def read_file(path, mode="rb"):
    """Return the entire contents of the file at ``path``.

    path -- name of the file to read.
    mode -- mode string handed to ``open`` (default ``"rb"``).
    """
    with open(path, mode) as stream:
        contents = stream.read()
    return contents
# Module-level alias: ``rename(src, dst)`` is os.rename unless rebound later.
# NOTE(review): the platform-specific star-imports at the end of this module
# may replace this name -- confirm against fileutil_win32 / fileutil_posix.
rename = os.rename
def remove(path):
    """Delete ``path`` from the file system.

    A real directory is removed recursively with ``shutil.rmtree``.
    Anything else -- including a symbolic link that points at a directory --
    is removed with ``os.remove``, so only the link itself goes away and the
    directory it points to is left untouched.

    Raises whatever ``shutil.rmtree`` / ``os.remove`` raise (``OSError``).
    """
    # os.path.isdir follows symbolic links, while shutil.rmtree refuses to
    # operate on one, so a link to a directory has to be unlinked instead.
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    else:
        os.remove(path)
def mkdir(path):
    """Create the single directory ``path``.

    Returns True when the directory was created and False when ``os.mkdir``
    reported that ``path`` already exists.  Any other ``OSError`` is
    propagated to the caller.
    """
    try:
        os.mkdir(path)
    except OSError as error:
        if error.errno != errno.EEXIST:
            raise
        return False
    return True
def mkpath(path):
    """Create directory ``path`` together with any missing parents.

    The function first tries to create ``path`` itself and, while that fails
    because a parent is missing, walks upwards remembering the components
    that still have to be created.  Once a directory could be created (or
    one that already exists is found) the remembered components are created
    from the top down.

    An already existing ``path`` is not an error.  ``OSError`` values other
    than "already exists" / "parent missing" are propagated, as is any error
    raised while creating the remembered components.
    """
    dirpath = path
    missing = []
    # Stop at the root: there dirname() returns its argument unchanged.
    while dirpath != os.path.dirname(dirpath):
        try:
            os.mkdir(dirpath)
            break
        except OSError as e:
            if e.errno == errno.EEXIST:
                break
            elif e.errno == errno.ENOENT or (sys.platform == "win32" and e.winerror == 3):
                # winerror 3 is the Windows "path not found" code.
                name = os.path.basename(dirpath)
                # A trailing separator yields an empty basename.  Recording
                # it would make the loop below call mkdir on a directory it
                # has just created and fail with EEXIST, so skip it.
                if name:
                    missing.append(name)
                dirpath = os.path.dirname(dirpath)
            else:
                raise
    # Components were collected deepest-first; create them top-down.
    for name in reversed(missing):
        dirpath = os.path.join(dirpath, name)
        os.mkdir(dirpath)
def is_hidden_file(path):
    """Return True when the last component of ``path`` begins with a dot."""
    filename = os.path.basename(path)
    return filename.startswith(".")
def is_mount_point(path):
    """Return True when ``path`` and its ``..`` entry look like a mount boundary.

    ``path`` and ``path/..`` are both stat'ed.  The result is True when they
    are on different devices, or when they have the same inode number (as
    happens when ``..`` resolves to the directory itself).
    """
    own = os.stat(path)
    above = os.stat(os.path.join(path, os.pardir))
    if own.st_dev != above.st_dev:
        return True
    return own.st_ino == above.st_ino
def mount_point(path):
    """Return the first ancestor of ``path`` for which is_mount_point is true.

    Starting with ``path`` itself, ``".."`` is appended repeatedly until
    ``is_mount_point`` accepts the candidate.  The returned value is that
    candidate exactly as built, so it may end in a chain of ``..`` parts.
    """
    candidate = path
    while True:
        if is_mount_point(candidate):
            return candidate
        candidate = os.path.join(candidate, "..")
def can_use_directory(path):
    """Return whether ``os.access`` grants read, write and execute on ``path``."""
    required = os.R_OK | os.W_OK | os.X_OK
    return os.access(path, required)
def can_use_file(path):
    """Return whether ``os.access`` grants both read and write on ``path``."""
    required = os.R_OK | os.W_OK
    return os.access(path, required)
@coroutine
def shell_remove(path, parent=None):
    """Delete ``path`` after asking the user for confirmation.

    Nothing happens unless ``ask_delete_file(parent, path)`` returns a true
    value.  The deletion is handed to ``async_call(remove, path)`` and the
    result of that call is yielded; an ``Exception`` raised at the yield is
    shown with ``dialogs.error`` instead of being propagated.

    path   -- file or directory to delete.
    parent -- passed through to the confirmation and error dialogs.
    """
    # NOTE(review): presumably ``coroutine`` resumes this generator once the
    # asynchronous call has finished -- confirm in the ``async`` module.
    if not ask_delete_file(parent, path):
        return
    try:
        yield async_call(remove, path)
    except Exception as error:
        dialogs.error(parent, "Error deleting file:\n\n%s" % error)
@coroutine
def shell_move(srcpath, dstpath, parent=None):
    """Move ``srcpath`` into ``dstpath`` after asking the user.

    Steps, in order:
    * return immediately when ``destination_is_same(srcpath, dstpath)``;
    * return unless ``ask_move_file`` confirms the move;
    * if ``dstpath/<basename of srcpath>`` already exists, ask through
      ``ask_overwrite_file`` and delete the existing file with ``os.remove``;
      a refusal or a failed deletion aborts the move;
    * yield ``async_call(shutil.move, srcpath, dstpath)``.

    Errors are reported with ``dialogs.error`` rather than propagated.

    srcpath -- path of the item to move.
    dstpath -- destination handed to ``shutil.move``; the overwrite check
               joins it with the source's basename.
    parent  -- passed through to every dialog.
    """
    if destination_is_same(srcpath, dstpath):
        return
    if not ask_move_file(parent, srcpath, dstpath):
        return
    dstfile = os.path.join(dstpath, os.path.basename(srcpath))
    if os.path.exists(dstfile):
        if not ask_overwrite_file(parent, dstfile):
            return
        try:
            os.remove(dstfile)
        except Exception as e:
            # Message previously read "Error overwriting file file:".
            dialogs.error(parent, "Error overwriting file:\n\n%s" % e)
            return
    try:
        yield async_call(shutil.move, srcpath, dstpath)
    except Exception as e:
        dialogs.error(parent, "Error moving file:\n\n%s" % e)
@coroutine
def shell_copy(srcpath, dstpath, parent=None):
    """Copy ``srcpath`` to ``dstpath`` after asking the user.

    Returns immediately when ``destination_is_same(srcpath, dstpath)`` or
    when ``ask_copy_file`` does not confirm.  Otherwise
    ``async_call(shutil.copy2, srcpath, dstpath)`` is yielded; an
    ``Exception`` raised at the yield is shown with ``dialogs.error``.

    srcpath -- path of the file to copy.
    dstpath -- destination handed to ``shutil.copy2``.
    parent  -- passed through to every dialog.
    """
    if destination_is_same(srcpath, dstpath):
        return
    if not ask_copy_file(parent, srcpath, dstpath):
        return
    try:
        yield async_call(shutil.copy2, srcpath, dstpath)
    except Exception as error:
        dialogs.error(parent, "Error copying file:\n\n%s" % error)
def shell_move_or_copy(srcpath, dstpath, parent=None):
    """Move within one device, copy across devices.

    Both paths are stat'ed; when their ``st_dev`` values are equal the call
    is forwarded to ``shell_move``, otherwise to ``shell_copy``.  The result
    of the chosen function is returned unchanged.
    """
    source_device = os.stat(srcpath).st_dev
    target_device = os.stat(dstpath).st_dev
    if source_device != target_device:
        return shell_copy(srcpath, dstpath, parent)
    return shell_move(srcpath, dstpath, parent)
if sys.platform == "win32":
from fileutil_win32 import *
else:
from fileutil_posix import *