Combining ScalaLoci and Aggregate Programming

Strengths of both worlds

  • Aggregate Programming:

    • Robustness in the presence of node failures
    • Function composition
  • ScalaLoci:

    • Multi-tier Programming Model
    • Topology definition
    • Ease of deployment

Application scenarios

Middleware Architecture

Pure aggregate scenario

Middleware Architecture

Hybrid aggregate scenario

Middleware Architecture in detail

Runtime - Executor

  /** Execution loop placed on `AGNode` peers.
    *
    * As long as `multitier.running` holds, every iteration performs, in this order:
    *   1. `performRound` on a freshly built context (`buildContext()`);
    *   2. a remote call to `process` carrying `mid` and the round result;
    *   3. `actuation` with `mid`, the round result and `localSensors`;
    *   4. `update` with the round result.
    *
    * NOTE(review): `mid` is presumably this node's identifier and `update` presumably
    * stores the result for the next round — neither is defined in this snippet; confirm.
    */
  def main(): Unit on AGNode = {
    while (multitier.running) {
      // Compute this round's result from the current context.
      val result = performRound(buildContext())
      // Hand the result to the remote side before acting on it locally.
      remote.call(process(mid, result))
      actuation(mid, result, localSensors)
      update(result)
    }
  }

Middleware Configuration

Sensor

/** Sensor storage for aggregate nodes.
  *
  * Holds the set of [[Sensor]]s of an `AGNode` and offers lookup, update and
  * registration. Sensors are identified by their `name`: every mutating method keeps
  * at most one sensor per name in `localSensors`.
  */
@multitier trait AGNodeSensors { this: Architecture =>
  /** Sensors currently known to this node; starts empty. */
  var localSensors: Local[Set[Sensor]] on AGNode = Set.empty[Sensor]

  /** Returns the current value of the local sensor that has the same name as `sensor`.
    *
    * The lookup is by name, not by whole-sensor equality, so the caller does not need
    * to already know the stored value in order to read it.
    *
    * @param sensor sensor whose `name` selects the entry to read
    * @return the stored value of the matching sensor
    * @throws NoSuchElementException if no local sensor has that name
    */
  def getLocalSensorValue(sensor: Sensor): Local[Any] on AGNode =
    localSensors
      .find(_.name == sensor.name)
      .map(_.value)
      .getOrElse(throw new NoSuchElementException(s"${sensor.name} sensor not found"))

  /** Sets the value of the sensor called `sensorName`, registering it if missing.
    *
    * Any previous entry with that name is dropped first: adding to an immutable `Set`
    * never replaces an element, so a plain `+=` would either keep the stale reading or
    * leave two entries with the same name, depending on how `Sensor` defines equality.
    *
    * @param sensorName name of the sensor to create or overwrite
    * @param value      new reading to store
    */
  def setLocalSensorValue(sensorName: String, value: Any): Local[Unit] on AGNode =
    localSensors = localSensors.filterNot(_.name == sensorName) + Sensor(sensorName, value)

  /** Registers `sensor`, replacing any local sensor that already has the same name.
    *
    * @param sensor sensor to register
    */
  def addLocalSensor(sensor: Sensor): Local[Unit] on AGNode =
    localSensors = localSensors.filterNot(_.name == sensor.name) + sensor
}

Actuator

/** Actuation hook for aggregate nodes.
  *
  * Declares the abstract `actuation` method that a concrete system must implement.
  */
@multitier trait AGNodeActuator { this: Architecture =>
  /** Abstract actuation step, placed on `AGNode`.
    *
    * NOTE(review): presumably called once per round with the node's id, the round's
    * export and the node's local sensors — confirm against the executor.
    *
    * @param id      identifier passed by the caller
    * @param export  export value passed by the caller
    * @param sensors set of sensors passed by the caller
    */
  def actuation(id: ID, export: EXPORT, sensors: Set[Sensor]): Local[Unit] on AGNode
}

Topology Definition

First Approach

/** Client/server topology with four peer types: `Node`, `AGNode`, `Client` and `Server`. */
@multitier trait ClientServer extends Architecture {
  // Base peer type; declares no ties of its own.
  @peer type Node
  // A Node whose tie is bounded by Multiple[AGNode].
  @peer type AGNode <: Node { type Tie <: Multiple[AGNode] }

  // An AGNode tied to a single Server and to multiple other Clients.
  @peer type Client <: AGNode { type Tie <: Single[Server] with Multiple[Client] }
  // A Node (not an AGNode) tied to multiple Clients.
  @peer type Server <: Node { type Tie <: Multiple[Client] }
}
/** Concrete multitier system built on the `ClientServer` topology.
  *
  * Combines `ScafiLociExecutor` (constructed with a new `AverageTemperature` instance)
  * with `AGNodeSensors` and `MyActuator`, and overrides `main` with one body per peer type.
  */
@multitier object HybridSystem
    extends ScafiLociExecutor(new AverageTemperature())
    with AGNodeSensors
    with MyActuator
    with ClientServer {
  /** Entry point with a separate body for `Node`, `AGNode` and `Server`. */
  override def main(): Unit on Node = {
    on[Node] {
      // Plain nodes only run the inherited main.
      super.main()
    } and on[AGNode] {
      // Aggregate nodes print a greeting and then run the inherited main.
      println("Hello AGNode")
      super.main()
    } and on[Server] {
      // The server does not call super.main(); it prints a greeting and then
      // prints every value observed on `exports().asLocalFromAll`.
      // NOTE(review): presumably these are the exports of all connected clients — confirm.
      println("Hello Base Station")
      exports().asLocalFromAll observe { case export =>
        println(s"Export: $export ")
      }
    }
  }
}

Second Approach

/** Generic hybrid topology with three peer types: `Node`, `SimpleNode` and `AGNode`. */
@multitier trait GeneralArchClientServerHybrid extends Architecture {
  // Base peer type; declares no ties of its own.
  @peer type Node
  // A Node whose tie is bounded by Multiple[AGNode].
  @peer type SimpleNode <: Node { type Tie <: Multiple[AGNode] }
  // A Node tied to a single SimpleNode and to multiple other AGNodes.
  @peer type AGNode <: Node { type Tie <: Single[SimpleNode] with Multiple[AGNode] }
}
/** Specialises `GeneralArchClientServerHybrid` by adding the peer types `Client` and `Server`. */
@multitier trait MyClientServerHybrid extends GeneralArchClientServerHybrid {
  // Client is declared as a subtype of AGNode; no tie refinement is given here.
  @peer type Client <: AGNode
  // Server is declared as a subtype of SimpleNode; no tie refinement is given here.
  @peer type Server <: SimpleNode
}
/** Concrete multitier system built on the `MyClientServerHybrid` topology.
  *
  * Combines `ScafiLociExecutor` (constructed with a new `NeighboursId` instance)
  * with `AGNodeSensors` and `MyActuator`, and overrides `main` with one body per peer type.
  */
@multitier object ClientServerHybrid extends ScafiLociExecutor(new NeighboursId()) 
    with AGNodeSensors 
    with MyActuator
    with MyClientServerHybrid {
  /** Entry point with a separate body for `Node`, `Client` and `Server`. */
  override def main(): Unit on Node = {
    on[Node] {
      // Plain nodes only run the inherited main.
      super.main()
    } and on[Client] {
      // Clients print a message and then run the inherited main.
      println("AGNode Client")
      super.main()
    } and on[Server] {
      // The server only prints a message; it does not call super.main().
      println("SimpleNode Server")
    }
  }
}

Launching the application

/** Launcher for the base station.
  *
  * Starts a `HybridSystem.Server` instance that listens for `HybridSystem.Client`
  * connections over TCP on port 43052.
  */
object BaseStation extends App {
  multitier.start(
    new Instance[HybridSystem.Server](listen[HybridSystem.Client](TCP(43052)))
  )
}
/** Launcher for one aggregate node attached to the base station.
  *
  * Starts a `HybridSystem.Client` instance that connects to a `HybridSystem.Server`
  * at localhost:43052 and listens for other `HybridSystem.Client`s on TCP port 43053.
  * It then waits for the running instance and registers a "temperature" sensor on it.
  */
object AGNodeBS1 extends App {
  val ei = multitier.start(
    new Instance[HybridSystem.Client](
      connect[HybridSystem.Server](TCP("localhost", 43052))
        .and(listen[HybridSystem.Client](TCP(43053)))
    )
  )

  // Block, without a timeout, until the instance is available.
  val instance = Await.result(ei.instance, Duration.Inf)
  // Seed the node with an initial temperature sensor.
  instance.retrieve(HybridSystem.addLocalSensor(Sensor("temperature", 20.0)))
}