import threading
from motey.communication.api_routes.service import Service as ServiceEndpoint
from motey.models.image_state import ImageState
from motey.models.service_state import ServiceState
from motey.utils.network_utils import get_own_ip
[docs]class InterNodeOrchestrator(object):
"""
This class orchestrates services.
It will start and stop virtual instances of images defined in the service.
It also can communicate with other nodes to start instances there if the requirements does not fit with the
possibilities of the current node.
"""
def __init__(self, logger, valmanager, service_repository, capability_repository, node_repository,
communication_manager):
"""
Constructor of the class.
:param logger: DI injected
:type logger: motey.utils.logger.Logger
:param valmanager: DI injected
:type valmanager: motey.val.valmanager.VALManager
:param service_repository: DI injected
:type service_repository: motey.repositories.service_repository.ServiceRepository
:param capability_repository: DI injected
:type capability_repository: motey.capabilityengine.capabiltiy_engine.CapabilityEngine
:param node_repository: DI injected
:type node_repository: motey.repositories.node_repository.NodeRepository
:param communication_manager: DI injected
:type communication_manager: motey.communication.communication_manager.CommunicationManager
"""
self.logger = logger
self.valmanager = valmanager
self.service_repository = service_repository
self.capability_repository = capability_repository
self.node_repository = node_repository
self.communication_manager = communication_manager
self.yaml_post_stream = ServiceEndpoint.yaml_post_stream.subscribe(self.instantiate_service)
self.yaml_delete_stream = ServiceEndpoint.yaml_delete_stream.subscribe(self.terminate_service)
[docs] def instantiate_service(self, service):
"""
Instantiate a service.
:param service: the service to be used.
:type service: motey.models.service.Service
"""
def __inner_instantiate(inner_service):
"""
Inner function which is used to run in a thread to instantiate a service.
:param inner_service: the service to be used.
:type inner_service: motey.models.service.Service
"""
inner_service.state = ServiceState.INSTANTIATING
self.service_repository.add(dict(inner_service))
for image in inner_service.images:
if not image.capabilities:
# no capabilities, deploy locally
image.node = get_own_ip()
continue
for capability in image.capabilities:
if not self.capability_repository.has(capability=capability):
# if a single capability is not satisfied, search for external node
node = self.find_node(image)
if node:
image.node = node['ip']
# found a node which handle the container - we can break the loop
break
else:
# does not found any node - error
inner_service.state = ServiceState.ERROR
break
else:
# never broke - all capabilities are succeeded locally
image.node = get_own_ip()
if inner_service.state == ServiceState.ERROR:
self.service_repository.update(dict(inner_service))
break
else:
# never broke - no errors occurred - deploy
self.service_repository.update(dict(inner_service))
self.deploy_service(service=inner_service)
worker_thread = threading.Thread(target=__inner_instantiate, args=(service,))
worker_thread.daemon = True
worker_thread.start()
[docs] def deploy_service(self, service):
"""
Deploy all images of a service to the related nodes.
:param service: the service which should be deployed
:type service: motey.models.service.Service
"""
for image in service.images:
image.id = self.communication_manager.deploy_image(image)
# TODO: store new image id
self.service_repository.update(dict(service))
[docs] def get_service_status(self, service):
"""
Returns the service status.
:param service: the service which should be used
:type service: motey.models.service.Service
:return: the status of the service
"""
image_status_list = []
for image in service.images:
image_status = self.communication_manager.request_image_status(image)
image_status_list.append(image_status)
if ImageState.ERROR in image_status_list:
self.terminate_service(service=service)
service.state = ServiceState.ERROR
elif ImageState.TERMINATED in image_status_list:
self.terminate_service(service=service)
service.state = ServiceState.TERMINATED
elif ImageState.STOPPING in image_status_list:
self.terminate_service(service=service)
service.state = ServiceState.STOPPING
elif ImageState.INSTANTIATING in image_status_list:
service.state = ServiceState.INSTANTIATING
elif ImageState.INITIAL in image_status_list:
service.state = ServiceState.INITIAL
elif len(image_status_list) > 0 and image_status_list[1:] == image_status_list[:-1] and \
image_status_list[0] == ImageState.RUNNING:
service.state = ServiceState.RUNNING
else:
service.state = ServiceState.ERROR
self.service_repository.update(dict(service))
return service.state
[docs] def compare_capabilities(self, needed_capabilities_list, node_capabilities_dict):
"""
Compares two dicts with capabilities.
:param needed_capabilities_list: the capabilities to compare with
:type needed_capabilities_list: list
:param node_capabilities_dict: the capabilties to check
:type node_capabilities_dict: list
:return: True if all capabilities are fulfilled, otherwiese False
"""
for capability in needed_capabilities_list:
for node_capability in node_capabilities_dict:
if node_capability['capability'] == capability:
# found them
break
else:
# never broke - capability not found - break outer loop and try next node
return False
return True
[docs] def find_node(self, image):
"""
Try to find a node in the cluster which can be used to deploy the given image.
:param image: the image to be used
:type image: motey.models.image.Image
:return: the IP of the node to be used or None if it does not found a node which fulfill all capabilities
"""
for node in self.node_repository.all():
capabilities = self.communication_manager.request_capabilities(node['ip'])
if self.compare_capabilities(needed_capabilities_list=image.capabilities, node_capabilities_dict=capabilities):
return node
return None
[docs] def terminate_service(self, service):
"""
Terminates a service.
:param service: the service to be used.
:type service: motey.models.service.Service
"""
def __inner_terminate(inner_service):
"""
Inner function which is used to run in a thread to terminates a service.
:param inner_service: the service to be used.
:type inner_service: motey.models.service.Service
"""
if self.service_repository.has(service_id=inner_service.id):
inner_service.state = ServiceState.STOPPING
self.service_repository.update(dict(inner_service))
for image in inner_service.images:
self.communication_manager.terminate_image(image)
else:
self.logger.error(
'Service `%s` with the id `%s` is not available' % (inner_service.service_name, inner_service.id))
worker_thread = threading.Thread(target=__inner_terminate, args=(service,))
worker_thread.daemon = True
worker_thread.start()