Commit 344e812: Updates to fix adding worker node
johnsonw committed Jun 4, 2020
1 parent f10ba6b commit 344e812
Showing 7 changed files with 62 additions and 43 deletions.
4 changes: 4 additions & 0 deletions chroma_core/models/client_mount.py
@@ -42,6 +42,10 @@ def get_deps(self, state=None):
            state = self.state

        deps = []
+
+        if self.host.immutable_state:
+            return DependAll(deps)
+
        if state == "mounted":
            # Depend on this mount's host having LNet up. If LNet is stopped
            # on the host, this filesystem will be unmounted first.
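
For orientation, here is a minimal sketch of the dependency primitives this hunk composes. DependOn and DependAll below are stand-ins with assumed shapes, not the real chroma_core classes; the point is that the new early return makes a client mount on an immutable (monitor-only) host declare no dependencies at all, so the manager never tries to drive LNet on it.

    # Hedged stand-ins for chroma_core's dependency primitives (assumed
    # shapes, not the real implementations).
    class DependOn:
        def __init__(self, stateful_object, preferred_state,
                     acceptable_states=None, unacceptable_states=None):
            self.stateful_object = stateful_object
            self.preferred_state = preferred_state
            self.acceptable_states = acceptable_states or [preferred_state]
            self.unacceptable_states = unacceptable_states or []

    class DependAll:
        """Satisfied only when every contained dependency is satisfied."""
        def __init__(self, deps):
            self.deps = deps

    def get_deps(mount, state=None):
        state = state or mount.state
        deps = []
        if mount.host.immutable_state:
            # Monitor-only host: declare nothing, so no job is ever
            # scheduled to change LNet state on its behalf.
            return DependAll(deps)
        if state == "mounted":
            # A mounted client depends on its host having LNet up.
            deps.append(DependOn(mount.host.lnet_configuration, "lnet_up"))
        return DependAll(deps)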
49 changes: 31 additions & 18 deletions chroma_core/models/host.py
@@ -797,7 +797,7 @@ class BaseSetupHostJob(NullStateChangeJob):
    class Meta:
        abstract = True

-    def _common_deps(self, lnet_state_required, lnet_acceptable_states, lnet_unacceptable_states):
+    def _common_deps(self):
        # It really does not feel right that this is in here, but it does sort of work. These are the things
        # it is dependent on so create them. Also I can't work out with today's state machine anywhere else to
        # put them that works.
@@ -826,23 +826,13 @@ def _common_deps(self, lnet_state_required, lnet_acceptable_states, lnet_unacceptable_states):

        deps = []

-        if self.target_object.lnet_configuration:
-            deps.append(
-                DependOn(
-                    self.target_object.lnet_configuration,
-                    lnet_state_required,
-                    lnet_acceptable_states,
-                    lnet_unacceptable_states,
-                )
-            )
-
        if self.target_object.pacemaker_configuration:
            deps.append(DependOn(self.target_object.pacemaker_configuration, "started"))

        if self.target_object.ntp_configuration:
            deps.append(DependOn(self.target_object.ntp_configuration, "configured"))

-        return DependAll(deps)
+        return deps


class InitialiseBlockDeviceDriversStep(Step):
@@ -871,7 +861,17 @@ def description(self):
        return help_text["setup_managed_host_on"] % self.target_object

    def get_deps(self):
-        return self._common_deps("lnet_up", None, None)
+        deps = self._common_deps()
+
+        if self.target_object.lnet_configuration:
+            deps.append(
+                DependOn(
+                    self.target_object.lnet_configuration,
+                    "lnet_up"
+                )
+            )
+
+        return DependAll(deps)

    def get_steps(self):
        return [(InitialiseBlockDeviceDriversStep, {"host": self.target_object})]
@@ -891,9 +891,9 @@ class Meta:
        ordering = ["id"]

    def get_deps(self):
-        # Moving out of unconfigured into lnet_unloaded will mean that lnet will start monitoring and responding to
-        # the state. Once we start monitoring any state other than unconfigured is acceptable.
-        return self._common_deps("lnet_unloaded", None, ["unconfigured"])
+        deps = self._common_deps()
+
+        return DependAll(deps)

    def description(self):
        return help_text["setup_monitored_host_on"] % self.target_object
@@ -913,14 +913,24 @@ class Meta:
        ordering = ["id"]

    def get_deps(self):
-        return self._common_deps("lnet_up", None, None)
+        deps = self._common_deps()
+
+        if self.target_object.lnet_configuration and not self.target_object.immutable_state:
+            deps.append(
+                DependOn(
+                    self.target_object.lnet_configuration,
+                    "lnet_up"
+                )
+            )
+
+        return DependAll(deps)

    def description(self):
        return help_text["setup_worker_host_on"] % self.target_object

    @classmethod
    def can_run(cls, host):
-        return host.is_managed and host.is_worker and (host.state != "unconfigured")
+        return host.is_worker and (host.state != "unconfigured")


class DetectTargetsStep(Step):
@@ -1174,6 +1184,9 @@ def description(self):
    def get_deps(self):
        deps = []

+        if self.host.immutable_state:
+            return DependAll(deps)
+
        if self.host.lnet_configuration:
            deps.append(DependOn(self.host.lnet_configuration, "unconfigured"))

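
Taken together, these hunks invert who declares the LNet dependency: _common_deps() now returns a plain list of the unconditional deps (pacemaker, NTP), and each setup job appends its own LNet requirement, if any, before wrapping the list. A hedged sketch of the resulting pattern, reusing the DependOn/DependAll stand-ins from the earlier sketch; the subclass name is assumed from the "setup_worker_host_on" help-text key:

    # Condensed sketch of the refactored pattern (assumed class names).
    class BaseSetupHostJob:
        def _common_deps(self):
            deps = []
            if self.target_object.pacemaker_configuration:
                deps.append(DependOn(self.target_object.pacemaker_configuration, "started"))
            if self.target_object.ntp_configuration:
                deps.append(DependOn(self.target_object.ntp_configuration, "configured"))
            return deps  # a plain list, so subclasses can extend it

    class SetupWorkerHostJob(BaseSetupHostJob):
        def get_deps(self):
            deps = self._common_deps()
            # Workers require LNet up only when the host is fully managed;
            # immutable (monitor-only) hosts skip the requirement, which
            # appears to be what un-blocks adding a worker node.
            if self.target_object.lnet_configuration and not self.target_object.immutable_state:
                deps.append(DependOn(self.target_object.lnet_configuration, "lnet_up"))
            return DependAll(deps)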
1 change: 0 additions & 1 deletion chroma_core/models/jobs.py
@@ -428,7 +428,6 @@ def all_deps(self, dep_cache):
                DependAll(dependent_deps),
                dep_cache.get(self),
                dep_cache.get(stateful_object, new_state),
-                DependOn(stateful_object, self.old_state),
            )
        else:
            return dep_cache.get(self)
14 changes: 10 additions & 4 deletions chroma_core/services/job_scheduler/job_scheduler.py
@@ -1106,22 +1106,28 @@ def key(td):
        # if we have an entry with 'root'=true then move it to the front of the list before returning the result
        return sorted(sorted_list, key=lambda entry: entry.get("root", False), reverse=True)

-    def create_client_mount(self, host_id, filesystem_name, mountpoint):
+    def create_client_mount(self, host_id, filesystem_id, mountpoint, existing):
        # RPC-callable
        host = ObjectCache.get_one(ManagedHost, lambda mh: mh.id == host_id)
-        mount = self._create_client_mount(host, filesystem_name, mountpoint)
+        filesystem = ObjectCache.get_one(ManagedFilesystem, lambda mf: mf.id == filesystem_id)
+
+        mount = self._create_client_mount(host, filesystem, mountpoint, existing)

        self.progress.advance()
        return mount.id

-    def _create_client_mount(self, host, filesystem_name, mountpoint):
+    def _create_client_mount(self, host, filesystem, mountpoint, existing=False):
        # Used for intra-JobScheduler calls
        log.debug("Creating client mount for %s as %s:%s" % (filesystem_name, host, mountpoint))

        with self._lock:
            from django.db import transaction

            with transaction.atomic():
-                mount, created = LustreClientMount.objects.get_or_create(host=host, filesystem=filesystem_name)
+                mount, created = LustreClientMount.objects.get_or_create(host=host, filesystem=filesystem)
+                if existing:
+                    mount.state = "mounted"
+
                mount.mountpoint = mountpoint
                mount.save()
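
The new existing flag rides on Django's get_or_create, which returns an (object, created) pair and inserts a row only when no match exists. A hedged sketch of the behaviour, with the logic lifted out into a hypothetical record_client_mount helper; model fields are taken from the diff:

    # Hedged sketch; record_client_mount is a hypothetical wrapper, not a
    # function in this repo.
    def record_client_mount(host, filesystem, mountpoint, existing=False):
        # get_or_create fetches the (host, filesystem) row if it exists,
        # otherwise inserts it; `created` reports which happened.
        mount, created = LustreClientMount.objects.get_or_create(
            host=host, filesystem=filesystem
        )
        if existing:
            # The mount was discovered already active on the agent, so
            # record it as mounted rather than walking the normal
            # unmounted -> mounted transitions.
            mount.state = "mounted"
        mount.mountpoint = mountpoint
        mount.save()
        return mount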
4 changes: 2 additions & 2 deletions chroma_core/services/job_scheduler/job_scheduler_client.py
@@ -242,10 +242,10 @@ def create_targets(cls, targets_data):
        return (list(ManagedTarget.objects.filter(id__in=target_ids)), Command.objects.get(pk=command_id))

    @classmethod
-    def create_client_mount(cls, host, filesystem_name, mountpoint):
+    def create_client_mount(cls, host, filesystem, mountpoint, existing):
        from chroma_core.models import LustreClientMount

-        client_mount_id = JobSchedulerRpc().create_client_mount(host.id, filesystem_name, mountpoint)
+        client_mount_id = JobSchedulerRpc().create_client_mount(host.id, filesystem.id, mountpoint, existing)
        return LustreClientMount.objects.get(id=client_mount_id)

    @classmethod
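
The call convention stays id-based across the RPC boundary: the caller hands over model objects, the client forwards host.id and filesystem.id, and the scheduler rehydrates both from its ObjectCache. A hedged usage sketch; the filesystem name and mountpoint are hypothetical:

    from chroma_core.models import ManagedFilesystem

    # Hypothetical call site.
    filesystem = ManagedFilesystem.objects.get(name="testfs")  # hypothetical name
    mount = JobSchedulerClient.create_client_mount(
        host, filesystem, "/mnt/testfs", False  # existing=False: not yet mounted
    )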
3 changes: 2 additions & 1 deletion chroma_core/services/lustre_audit/update_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ def update_client_mounts(self):
log.info("updated mount %s on %s -> active" % (actual_mount["mountpoint"], self.host))
except IndexError:
log.info("creating new mount %s on %s" % (actual_mount["mountpoint"], self.host))
JobSchedulerClient.create_client_mount(self.host, fsname, actual_mount["mountpoint"])
filesystem = ManagedFilesystem.objects.get(name=fsname)
JobSchedulerClient.create_client_mount(self.host, filesystem, actual_mount["mountpoint"], True)

def update_target_mounts(self):
# If mounts is None then nothing changed since the last update and so we can just return.
Expand Down
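
The audit path now resolves the ManagedFilesystem row from the fsname the agent reported before making the RPC, passing existing=True so the record starts out mounted. Note that .get() raises if the name is unknown; a hedged sketch of a defensive variant, with a hypothetical helper not present in the commit:

    # Hedged sketch, not in the commit: create_mount_for_report is a
    # hypothetical helper around the audit-path lookup.
    def create_mount_for_report(host, fsname, mountpoint):
        try:
            filesystem = ManagedFilesystem.objects.get(name=fsname)
        except ManagedFilesystem.DoesNotExist:
            # Unknown fsname: skip rather than crash the audit pass.
            log.warning("ignoring client mount for unknown filesystem %s" % fsname)
        else:
            JobSchedulerClient.create_client_mount(host, filesystem, mountpoint, True)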
30 changes: 13 additions & 17 deletions chroma_core/services/plugin_runner/resource_manager.py
@@ -901,23 +901,19 @@ def _persist_nid_updates(self, scannable_id, changed_resource_id, changed_attrs):
        for lnet_state_resource in node_resources[LNETModules]:
            lnet_state = lnet_state_resource.to_resource()

-            # Really this code should be more tightly tied to the lnet_configuration classes, but in a one step
-            # at a time approach. Until lnet is !unconfigured we should not be updating it's state.
-            # Double if because the first if should be removed really, in some more perfect future.
-            if host.lnet_configuration.state != "unconfigured":
-                if lnet_state.host_id == host.id:
-                    lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id)
-
-                    # Truthfully this should use the notify which I've added as a comment to show the correct way. The problem is that
-                    # during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way the is probably
-                    # as planned but prevents it being set back. What we really need is to somehow get a single command that goes
-                    # to a state and then to another state. post_dependencies almost. At present I can't see how to do this so I am leaving
-                    # this code as is.
-                    lnet_configuration.set_state(lnet_state.state)
-                    lnet_configuration.save()
-                    # JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state})
-
-                    log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration)
+            if lnet_state.host_id == host.id:
+                lnet_configuration = LNetConfiguration.objects.get(host=lnet_state.host_id)
+
+                # Truthfully this should use the notify which I've added as a comment to show the correct way. The problem is that
+                # during the ConfigureLNetJob the state is changed to unloaded and this masks the notify in some way the is probably
+                # as planned but prevents it being set back. What we really need is to somehow get a single command that goes
+                # to a state and then to another state. post_dependencies almost. At present I can't see how to do this so I am leaving
+                # this code as is.
+                lnet_configuration.set_state(lnet_state.state)
+                lnet_configuration.save()
+                # JobSchedulerClient.notify(lnet_configuration, now(), {'state': lnet_state.state})
+
+                log.debug("_persist_nid_updates lnet_configuration %s" % lnet_configuration)

            # Only get the lnet_configuration if we actually have a LNetInterface (nid) to add.
            if len(node_resources[LNETInterface]) > 0:
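The substantive change here is the removed guard, not the re-indent: NID updates now sync LNetConfiguration for any matching host, including one still in the "unconfigured" state, such as a worker node that has just been added. A hedged before/after sketch; sync_lnet is a hypothetical stand-in for the set_state()/save() pair above:

    # Hedged sketch of the control-flow change (hypothetical function names).
    def persist_before(host, lnet_state, sync_lnet):
        # Old behaviour: skipped entirely while LNet was "unconfigured",
        # so a freshly added worker host never had its state recorded.
        if host.lnet_configuration.state != "unconfigured":
            if lnet_state.host_id == host.id:
                sync_lnet(lnet_state)

    def persist_after(host, lnet_state, sync_lnet):
        # New behaviour: any matching host is synced as soon as the plugin
        # reports it, including hosts still in "unconfigured".
        if lnet_state.host_id == host.id:
            sync_lnet(lnet_state)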
