forked from ourresearch/jump-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinstitution.py
146 lines (121 loc) · 5.46 KB
/
institution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import datetime
from cached_property import cached_property
import shortuuid
from sqlalchemy.orm import relationship
from collections import OrderedDict
from app import db
from app import get_db_cursor
from grid_id import GridId
from ror_id import RorId
from permission import UserInstitutionPermission
from user import User
class Institution(db.Model):
    """SQLAlchemy model mapped to the ``jump_institution`` table.

    Besides the mapped columns, the class exposes helpers that collect the
    institution's user permissions, its non-deleted packages, and its APC
    authorship rows, plus ``to_dict`` for building the serialized view.

    NOTE(review): ``self.packages`` is used below but is not declared in this
    class; presumably it is a relationship/backref set up by the package
    model -- confirm.
    """

    __tablename__ = 'jump_institution'

    id = db.Column(db.Text, primary_key=True)
    display_name = db.Column(db.Text)
    created = db.Column(db.DateTime)
    is_consortium = db.Column(db.Boolean)
    consortium_id = db.Column(db.Text)
    old_username = db.Column(db.Text)
    is_demo_institution = db.Column(db.Boolean)

    grid_ids = relationship(GridId, lazy='subquery')
    ror_ids = relationship(RorId, lazy='subquery')

    @staticmethod
    def _filter_by_consortium(permission_dicts, is_consortium):
        """Keep only the dicts whose ``"is_consortium"`` value equals *is_consortium*."""
        return [d for d in permission_dicts if d["is_consortium"] == is_consortium]

    def user_permissions(self, is_consortium=None):
        """Return one permission dict per user linked to this institution.

        :param is_consortium: when not ``None``, only dicts whose
            ``"is_consortium"`` entry equals this value are returned.
        :return: list of the ``to_dict_permissions()[self.id]`` entry of every
            user that has a ``UserInstitutionPermission`` row for this
            institution.
        """
        user_ids = db.session.query(UserInstitutionPermission.user_id).filter(
            UserInstitutionPermission.institution_id == self.id).distinct()
        users = User.query.filter(User.id.in_(user_ids)).all()

        # An earlier version skipped users whose email starts with "team+";
        # that filter is currently disabled, so every user is included.
        permission_dicts = [u.to_dict_permissions()[self.id] for u in users]

        if is_consortium is not None:
            permission_dicts = self._filter_by_consortium(permission_dicts, is_consortium)
        return permission_dicts

    @cached_property
    def feedback_sets(self):
        """Non-deleted packages flagged as feedback packages, in ``packages_sorted`` order."""
        return [my_package for my_package in self.packages_sorted if my_package.is_feedback_package]

    @cached_property
    def packages_sorted(self):
        """Non-deleted packages ordered by consortium ownership, then by name.

        Two stable sorts are applied, least significant key first, so the
        final order is ``is_owned_by_consortium`` with ``package_name`` as the
        tie-breaker.
        """
        response = [my_package for my_package in self.packages if not my_package.is_deleted]
        response.sort(key=lambda k: k.package_name)  # minor key
        response.sort(key=lambda k: k.is_owned_by_consortium)  # main key goes last
        # Publisher is no longer a required field, so it is not used as a sort key.
        return response

    @cached_property
    def is_consortium_member(self):
        """True when at least one package (deleted or not) is owned by a consortium."""
        return any(my_package.is_owned_by_consortium for my_package in self.packages)

    @cached_property
    def is_jisc(self):
        """True when the institution id is the JISC id, a listed special id, or
        contains ``"jisc"`` / ``"institution-testing"``."""
        # Imported here, as in the original code, rather than at module level.
        from app import JISC_INSTITUTION_ID

        exact_ids = (
            JISC_INSTITUTION_ID,
            "institution-Tfi2z4svqqkU",  # n8_institution_id
            "institution-WzH2RdcHUPoR",  # test_institution_id
        )
        if self.id in exact_ids:
            return True
        # "jisc" marks JISC member institutions; "institution-testing" marks
        # our own testing institutions.
        return "jisc" in self.id or "institution-testing" in self.id

    def to_dict(self):
        """Return an ``OrderedDict`` describing the institution.

        The user permissions are fetched once and filtered in memory for the
        ``"institutions"`` and ``"consortia"`` entries, instead of running the
        same database queries three times.  As a consequence those two lists
        contain the same dict objects as ``"user_permissions"``.
        """
        all_permissions = self.user_permissions()
        return OrderedDict([
            ("id", self.id),
            ("grid_ids", [g.grid_id for g in self.grid_ids]),
            ("ror_ids", [r.ror_id for r in self.ror_ids]),
            ("name", self.display_name),
            ("is_demo", self.is_demo_institution),
            ("is_consortium", self.is_consortium),
            ("is_consortium_member", self.is_consortium_member),
            ("user_permissions", all_permissions),
            ("institutions", self._filter_by_consortium(all_permissions, False)),
            ("consortia", self._filter_by_consortium(all_permissions, True)),
            ("publishers", [p.to_dict_minimal() for p in self.packages_sorted]),
            ("consortial_proposal_sets", [p.to_dict_minimal_feedback_set() for p in self.feedback_sets]),
            ("is_jisc", self.is_jisc)
        ])

    def _fetch_apc_rows(self, qry):
        """Run *qry* with ``self.id`` as its single parameter and return all rows."""
        with get_db_cursor(use_realdictcursor=True) as cursor:
            # Debug output: the query with its parameter bound, printed to stdout.
            print(cursor.mogrify(qry, (self.id,)))
            cursor.execute(qry, (self.id,))
            rows = cursor.fetchall()
        return rows

    @cached_property
    def apc_data_from_db(self):
        """Rows of ``jump_apc_institutional_authorships`` relevant to this institution.

        For a consortium the rows of the institutions behind its member
        packages are returned; otherwise only the rows whose
        ``institution_id`` is this institution's id.
        """
        if self.is_consortium:
            qry = """
                select * from jump_apc_institutional_authorships
                where institution_id in (
                    select ji.id from jump_consortium_members jcm
                    join jump_account_package jp on jcm.member_package_id=jp.package_id
                    join jump_institution ji on jp.institution_id=ji.id
                    where jp.package_id in (
                        select distinct(member_package_id) from jump_consortium_members
                        where consortium_package_id in (
                            select DISTINCT(package_id) from jump_account_package
                            where institution_id = %s
                        )
                    )
                )
            """
        else:
            qry = "select * from jump_apc_institutional_authorships where institution_id = %s"
        return self._fetch_apc_rows(qry)

    def to_dict_apc(self):
        """Return the (cached) APC authorship rows; see ``apc_data_from_db``."""
        return self.apc_data_from_db

    def __init__(self, **kwargs):
        """Create an institution with a generated id and creation timestamp.

        The defaults are assigned before ``kwargs`` are applied, so a caller
        may override ``id`` or ``created`` by passing them explicitly.
        """
        self.id = 'institution-{}'.format(shortuuid.uuid()[0:12])
        # NOTE(review): an ISO-8601 string is stored in a DateTime column;
        # presumably the database driver coerces it -- confirm.
        self.created = datetime.datetime.utcnow().isoformat()
        super(Institution, self).__init__(**kwargs)

    def __repr__(self):
        return "<{} ({}) {}>".format(self.__class__.__name__, self.id, self.display_name)