了解 actor 如何提供新的应用程序代码建模方法
主要芯片厂商已经开始提供同时运行两个或更多个核的芯片(虽然不一定更快)在这种情况下并发性很快成为每个软件开发人员都关心的热门主题本文延续 Ted Neward 的另一篇文章 深入了解 Scala 并发性在本文中Ted Neward 通过研究 actor 深入讨论并发性这个热门主题actor 是通过传递消息相互协作的执行实体
关于本系列
Ted Neward 潜心研究 Scala 编程语言并带您跟他一起徜徉在这个新的 developerWorks 系列 中您将深入了解 Scala 并看到 Scala 的语言功能的实际效果在进行相关比较时Scala 代码和 Java; 代码将放在一起展示但(您将发现)Scala 中的许多内容与您在 Java 编程中发现的任何内容都没有直接关联而这正是 Scala 的魅力所在!毕竟如果 Java 代码可以做到的话又何必学习 Scala 呢?
在 前一篇文章 中我讨论了构建并发代码的重要性(无论是否是 Scala 代码)还讨论了在编写并发代码时开发人员面对的一些问题包括不要锁住太多东西不要锁住太少东西避免死锁避免生成太多线程等等
这些理论问题太沉闷了为了避免读者觉得失望我与您一起研究了 Scala 的一些并发构造首先是在 Scala 中直接使用 Java 语言的并发库的基本方法然后讨论 Scala API 中的 MailBox 类型尽管这两种方法都是可行的但是它们并不是 Scala 实现并发性的主要机制
真正提供并发性的是 Scala 的 actor
什么是 actor?
actor 实现在称为 actor 的执行实体之间使用消息传递进行协作(注意这里有意避免使用 进程线程 或 机器 等词汇)尽管它听起来与 RPC 机制有点儿相似但是它们是有区别的RPC 调用(比如 Java RMI 调用)会在调用者端阻塞直到服务器端完成处理并发送回某种响应(返回值或异常)而消息传递方法不会阻塞调用者因此可以巧妙地避免死锁
仅仅传递消息并不能避免错误的并发代码的所有问题另外这种方法还有助于使用 不共享任何东西 编程风格也就是说不同的 actor 并不访问共享的数据结构(这有助于促进封装 actor无论 actor 是 JVM 本地的还是位于其他地方) — 这样就完全不需要同步了毕竟如果不共享任何东西并发执行就不涉及任何需要同步的东西
这不算是对 actor 模型的正规描述而且毫无疑问具有更正规的计算机科学背景的人会找到各种更严谨的描述方法能够描述 actor 的所有细节但是对于本文来说这个描述已经够了在网上可以找到更详细更正规的描述还有一些学术文章详细讨论了 actor 背后的概念(请您自己决定是否要深入学习这些概念)现在我们来看看 Scala actors API
Scala actor
使用 actor 根本不困难只需使用 Actor 类的 actor 方法创建一个 actor见清单
清单 开拍!
import scalaactors_ Actor_
package comtednewardscalaexamplesscalaV
{
object Actor
{
def main(args : Array[String]) =
{
val badActor =
actor
{
receive
{
case msg => Systemoutprintln(msg)
}
}
badActor ! Do ya feel lucky punk?
}
}
}
这里同时做了两件事
首先我们从 Scala Actors 库的包中导入了这个库然后从库中直接导入了 Actor 类的成员第二步并不是完全必要的因为在后面的代码中可以使用 Actoractor 替代 actor但是这么做能够表明 actor 是语言的内置结构并(在一定程度上)提高代码的可读性
下一步是使用 actor 方法创建 actor 本身这个方法通过参数接收一个代码块在这里代码块执行一个简单的 receive(稍后讨论)结果是一个 actor它被存储在一个值引用中供以后使用
请记住除了消息之外actor 不使用其他通信方法使用 ! 的代码行实际上是一个向 badActor 发送消息的方法这可能不太直观Actor 内部还包含另一个 MailBox 元素(已讨论)! 方法接收传递过来的参数(在这里是一个字符串)把它发送给邮箱然后立即返回
消息交付给 actor 之后actor 通过调用它的 receive 方法来处理消息这个方法从邮箱中取出第一个可用的消息把它交付给一个模式匹配块注意因为这里没有指定模式匹配的类型所以任何消息都是匹配的而且消息被绑定到 msg 名称(为了打印它)
一定要注意一点对于可以发送的类型没有任何限制 — 不一定要像前面的示例那样发送字符串实际上基于 actor 的设计常常使用 Scala case 类携带实际消息本身这样就可以根据 case 类的参数/成员的类型提供隐式的 命令 或 动作或者向动作提供数据
例如假设希望 actor 用两个不同的动作来响应发送的消息新的实现可能与清单 相似
清单 嗨我是导演!
object Actor
{
case class Speak(line : String);
case class Gesture(bodyPart : String action : String);
case class NegotiateNewContract;
def main(args : Array[String]) =
{
val badActor =
actor
{
receive
{
case NegotiateNewContract =>
Systemoutprintln(I wont do it for less than $ million!)
case Speak(line) =>
Systemoutprintln(line)
case Gesture(bodyPart action) =>
Systemoutprintln(( + action + s + bodyPart + ))
case _ =>
Systemoutprintln(Huh? Ill be in my trailer)
}
}
badActor ! NegotiateNewContract
badActor ! Speak(Do ya feel lucky punk?)
badActor ! Gesture(face grimaces)
badActor ! Speak(Well do ya?)
}
}
到目前为止看起来似乎没问题但是在运行时只协商了新合同在此之后JVM 终止了初看上去似乎是生成的线程无法足够快地响应消息但是要记住在 actor 模型中并不处理线程只处理消息传递这里的问题其实非常简单一次接收使用一个消息所以无论队列中有多少个消息正在等待处理都无所谓因为只有一次接收所以只交付一个消息
纠正这个问题需要对代码做以下修改见清单
把 receive 块放在一个接近无限的循环中
创建一个新的 case 类来表示什么时候处理全部完成了
清单 现在我是一个更好的导演!
object Actor
{
case class Speak(line : String);
case class Gesture(bodyPart : String action : String);
case class NegotiateNewContract;
case class ThatsAWrap;
def main(args : Array[String]) =
{
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
Systemoutprintln(I wont do it for less than $ million!)
case Speak(line) =>
Systemoutprintln(line)
case Gesture(bodyPart action) =>
Systemoutprintln(( + action + s + bodyPart + ))
case ThatsAWrap =>
Systemoutprintln(Great cast party everybody! See ya!)
done = true
case _ =>
Systemoutprintln(Huh? Ill be in my trailer)
}
}
}
badActor ! NegotiateNewContract
badActor ! Speak(Do ya feel lucky punk?)
badActor ! Gesture(face grimaces)
badActor ! Speak(Well do ya?)
badActor ! ThatsAWrap
}
}
这下行了!使用 Scala actor 就这么容易
并发地执行动作
上面的代码没有反映出并发性 — 到目前为止给出的代码更像是另一种异步的方法调用形式您看不出区别(从技术上说在第二个示例中引入接近无限循环之前的代码中可以猜出有一定的并发性存在但这只是偶然的证据不是明确的证明)
为了证明在幕后确实有多个线程存在我们深入研究一下前一个示例
清单 我要拍特写了
object Actor
{
case class Speak(line : String);
case class Gesture(bodyPart : String action : String);
case class NegotiateNewContract;
case class ThatsAWrap;
def main(args : Array[String]) =
{
def ct =
Thread + ThreadcurrentThread()getName() + :
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
Systemoutprintln(ct + I wont do it for less than $ million!)
case Speak(line) =>
Systemoutprintln(ct + line)
case Gesture(bodyPart action) =>
Systemoutprintln(ct + ( + action + s + bodyPart + ))
case ThatsAWrap =>
Systemoutprintln(ct + Great cast party everybody! See ya!)
done = true
case _ =>
Systemoutprintln(ct + Huh? Ill be in my trailer)
}
}
}
Systemoutprintln(ct + Negotiating)
badActor ! NegotiateNewContract
Systemoutprintln(ct + Speaking)
badActor ! Speak(Do ya feel lucky punk?)
Systemoutprintln(ct + Gesturing)
badActor ! Gesture(face grimaces)
Systemoutprintln(ct + Speaking again)
badActor ! Speak(Well do ya?)
Systemoutprintln(ct + Wrapping up)
badActor ! ThatsAWrap
}
}
运行这个新示例就会非常明确地发现确实有两个不同的线程
main 线程(所有 Java 程序都以它开始)
Thread 线程它是 Scala Actors 库在幕后生成的
因此在启动第一个 actor 时本质上已经开始了多线程执行
但是习惯这种新的执行模型可能有点儿困难因为这是一种全新的并发性考虑方式例如请考虑 前一篇文章 中的 Producer/Consumer 模型那里有大量代码尤其是在 Drop 类中我们可以清楚地看到线程之间以及线程与保证所有东西同步的监视器之间有哪些交互活动为了便于参考我在这里给出前一篇文章中的 V 代码
清单 ProdConSamplev (Scala)
package comtednewardscalaexamplesscalaV
{
import concurrentMailBox
import concurrentops_
object ProdConSample
{
class Drop
{
private val m = new MailBox()
private case class Empty()
private case class Full(x : String)
m send Empty() // initialization
def put(msg : String) : Unit =
{
m receive
{
case Empty() =>
m send Full(msg)
}
}
def take() : String =
{
m receive
{
case Full(msg) =>
m send Empty(); msg
}
}
}
def main(args : Array[String]) : Unit =
{
// Create Drop
val drop = new Drop()
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
Mares eat oats
Does eat oats
Little lambs eat ivy
A kid will eat ivy too
);
importantInfoforeach((msg) => dropput(msg))
dropput(DONE)
}
// Spawn Consumer
spawn
{
var message = droptake()
while (message != DONE)
{
Systemoutformat(MESSAGE RECEIVED: %s%n message)
message = droptake()
}
}
}
}
}
尽管看到 Scala 如何简化这些代码很有意思但是它实际上与原来的 Java 版本没有概念性差异现在看看如果把 Producer/Consumer 示例的基于 actor 的版本缩减到最基本的形式它会是什么样子
清单 Take 开拍!生产!消费!
object ProdConSample
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
Systemoutprintln(Received message! > + msg)
done = (msg == DONE)
}
}
}
consumer ! Mares eat oats
consumer ! Does eat oats
consumer ! Little lambs eat ivy
consumer ! Kids eat ivy too
consumer ! DONE
}
}
第一个版本确实简短多了而且在某些情况下可能能够完成所需的所有工作但是如果运行这段代码并与以前的版本做比较就会发现一个重要的差异 — 基于 actor 的版本是一个多位置缓沖区而不是我们以前使用的单位置缓沖这看起来是一项改进而不是缺陷但是我们要通过对比确认这一点我们来创建 Drop 的基于 actor 的版本在这个版本中所有对 put() 的调用必须由对 take() 的调用进行平衡
幸运的是Scala Actors 库很容易模拟这种功能希望让 Producer 一直阻塞直到 Consumer 接收了消息实现的方法很简单让 Producer 一直阻塞直到它从 Consumer 收到已经接收消息的确认从某种意义上说这就是以前的基于监视器的代码所做的那个版本通过对锁对象使用监视器发送这种信号
在 Scala Actors 库中最容易的实现方法是使用 !? 方法而不是 ! 方法(这样就会一直阻塞到收到确认时)(在 Scala Actors 实现中每个 Java 线程都是一个 actor所以回复会发送到与 main 线程隐式关联的邮箱)这意味着 Consumer 需要发送某种确认这要使用隐式继承的 reply(它还继承 receive 方法)见清单
清单 Take 开拍!
object ProdConSample
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
Systemoutprintln(Received message! > + msg)
done = (msg == DONE)
reply(RECEIVED)
}
}
}
Systemoutprintln(Sending)
consumer !? Mares eat oats
Systemoutprintln(Sending)
consumer !? Does eat oats
Systemoutprintln(Sending)
consumer !? Little lambs eat ivy
Systemoutprintln(Sending)
consumer !? Kids eat ivy too
Systemoutprintln(Sending)
consumer !? DONE
}
}
如果喜欢使用 spawn 把 Producer 放在 main() 之外的另一个线程中(这非常接近最初的代码)那么代码可能像清单 这样
清单 Take 开拍!
object ProdConSampleUsingSpawn
{
import concurrentops_
def main(args : Array[String]) : Unit =
{
// Spawn Consumer
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
Systemoutprintln(MESSAGE RECEIVED: + msg)
done = (msg == DONE)
reply(RECEIVED)
}
}
}
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
Mares eat oats
Does eat oats
Little lambs eat ivy
A kid will eat ivy too
DONE
);
importantInfoforeach((msg) => consumer !? msg)
}
}
}
无论从哪个角度来看基于 actor 的版本都比原来的版本简单多了读者只要让 actor 和隐含的邮箱自己发挥作用即可
但是这并不简单actor 模型完全颠覆了考虑并发性和线程安全的整个过程在以前的模型中我们主要关注共享的数据结构(数据并发性)而现在主要关注操作数据的代码本身的结构(任务并发性)尽可能少共享数据请注意 Producer/Consumer 示例的不同版本的差异在以前的示例中并发功能是围绕 Drop 类(有界限的缓沖区)显式编写的在本文中的版本中Drop 甚至没有出现重点在于两个 actor(线程)以及它们之间的交互(通过不共享任何东西的消息)
当然仍然可以用 actor 构建以数据为中心的并发构造只是必须采用稍有差异的方式请考虑一个简单的 计数器 对象它使用 actor 消息传达 increment 和 get 操作见清单
清单 Take 计数!
object CountingSample
{
case class Incr
case class Value(sender : Actor)
case class Lock(sender : Actor)
case class UnLock(value : Int)
class Counter extends Actor
{
override def act(): Unit = loop()
def loop(value: int): Unit = {
receive {
case Incr() => loop(value + )
case Value(a) => a ! value; loop(value)
case Lock(a) => a ! value
receive { case UnLock(v) => loop(v) }
case _ => loop(value)
}
}
}
def main(args : Array[String]) : Unit =
{
val counter = new Counter
counterstart()
counter ! Incr()
counter ! Incr()
counter ! Incr()
counter ! Value(self)
receive { case cvalue => Consoleprintln(cvalue) }
counter ! Incr()
counter ! Incr()
counter ! Value(self)
receive { case cvalue => Consoleprintln(cvalue) }
}
}
为了进一步扩展 Producer/Consumer 示例清单 给出一个在内部使用 actor 的 Drop 版本(这样其他 Java 类就可以使用这个 Drop而不需要直接调用 actor 的方法)
清单 在内部使用 actor 的 Drop
object ActorDropSample
{
class Drop
{
private case class Put(x: String)
private case object Take
private case object Stop
private val buffer =
actor
{
var data =
loop
{
react
{
case Put(x) if data == =>
data = x; reply()
case Take if data != =>
val r = data; data = ; reply(r)
case Stop =>
reply(); exit(stopped)
}
}
}
def put(x: String) { buffer !? Put(x) }
def take() : String = (buffer !? Take)asInstanceOf[String]
def stop() { buffer !? Stop }
}
def main(args : Array[String]) : Unit =
{
import concurrentops_
// Create Drop
val drop = new Drop()
// Spawn Producer
spawn
{
val importantInfo : Array[String] = Array(
Mares eat oats
Does eat oats
Little lambs eat ivy
A kid will eat ivy too
);
importantInfoforeach((msg) => { dropput(msg) })
dropput(DONE)
}
// Spawn Consumer
spawn
{
var message = droptake()
while (message != DONE)
{
Systemoutformat(MESSAGE RECEIVED: %s%n message)
message = droptake()
}
dropstop()
}
}
}
可以看到这需要更多代码(和更多的线程因为每个 actor 都在一个线程池内部起作用)但是这个版本的 API 与以前的版本相同它把所有与并发性相关的代码都放在 Drop 内部这正是 Java 开发人员所期望的
actor 还有更多特性
在规模很大的系统中让每个 actor 都由一个 Java 线程支持是非常浪费资源的尤其是在 actor 的等待时间比处理时间长的情况下在这些情况下基于事件的 actor 可能更合适这种 actor 实际上放在一个闭包中闭包捕捉 actor 的其他动作也就是说现在并不通过线程状态和寄存器表示代码块(函数)当一个消息到达 actor 时(这时显然需要活动的线程)触发闭包闭包在它的活动期间借用一个活动的线程然后通过回调本身终止或进入 等待 状态这样就会释放线程(请参见 参考资料 中 Haller/Odersky 的文章)
在 Scala Actors 库中这要使用 react 方法而不是前面使用的 receive使用 react 的关键是在形式上 react 不能返回所以 react 中的实现必须重复调用包含 react 块的代码块简便方法是使用 loop 结构创建一个接近无限的循环这意味着 清单 中的 Drop 实现实际上只通过借用调用者的线程执行操作这会减少执行所有操作所需的线程数(在实践中我还没有见过在简单的示例中出现这种效果所以我想我们只能暂且相信 Scala 设计者的说法)
在某些情况下可能选择通过派生基本的 Actor 类(在这种情况下必须定义 act 方法否则类仍然是抽象的)创建一个新类它隐式地作为 actor 执行尽管这是可行的但是这种思想在 Scala 社区中不受欢迎在一般情况下我在这里描述的方法(使用 Actor 对象中的 actor 方法)是创建 actor 的首选方法
结束语
因为 actor 编程需要与 传统 对象编程不同的风格所以在使用 actor 时要记住几点
首先actor 的主要能力来源于消息传递风格而不采用阻塞调用风格这是它的主要特点(有意思的是也有使用消息传递作为核心机制的面向对象语言最知名的两个例子是 ObjectiveC 和 Smalltalk还有 ThoughtWorker 的 Ola Bini 新创建的 Ioke)如果创建直接或间接扩展 Actor 的类那么要确保对对象的所有调用都通过消息传递进行
第二因为可以在任何时候交付消息而且更重要的是在发送和接收之间可能有相当长的延迟所以一定要确保消息携带正确地处理它们所需的所有状态这种方式会
让代码更容易理解(因为消息携带处理所需的所有状态)
减少 actor 访问某些地方的共享状态的可能性从而减少发生死锁或其他并发性问题的机会
第三actor 应该不会阻塞您从前面的内容应该能够看出这一点从本质上说阻塞是导致死锁的原因代码可能产生的阻塞越少发生死锁的可能性就越低
很有意思的是如果您熟悉 Java Message Service (JMS) API就会发现我给出的这些建议在很大程度上也适用于 JMS — 毕竟actor 消息传递风格只是在实体之间传递消息JMS 消息传递也是在实体之间传递消息它们的差异在于JMS 消息往往比较大在层和进程级别上操作而 actor 消息往往比较小在对象和线程级别上操作如果您掌握了 JMSactor 也不难掌握
actor 并不是解决所有并发性问题的万灵药但是它们为应用程序或库代码的建模提供了一种新的方式所用的构造相当简单明了尽管它们的工作方式有时与您预期的不一样但是一些行为正是我们所熟悉的 — 毕竟我们在最初使用对象时也有点不习惯只要经过努力您也会掌握并喜欢上 actor