java

位置:IT落伍者 >> java >> 浏览文章

jscalazipKBHTTP

               

面向Java开发人员的Scala指南: 深入了解Scala并发性 了解 Scala 如何简化并发编


发布日期:2024年05月02日
 
面向Java开发人员的Scala指南: 深入了解Scala并发性 了解 Scala 如何简化并发编

了解 Scala 如何简化并发编程并绕过陷阱

对于许多(如果不是大多数)Java? 程序员来说Scala 的吸引力在于处理并发性以及编写线程安全的代码时非常轻松在本期文章中Ted Neward 将开始深入研究 Scala 语言及环境所提供的各种并发特性和库

Herb Sutter 在他的文章 The Free Lunch Is Over 中揭露了行业中最不可告人的一个小秘密他明确论证了处理器在速度上的发展已经走到了尽头并且将由全新的单芯片上的并行 内核(虚拟 CPU)所取代这一发现对编程社区造成了不小的沖击因为正确创建线程安全的代码在理论而非实践中始终会提高高性能开发人员的身价而让各公司难以聘用他们看上去仅有少数人充分理解了 Java 的线程模型并发 API 以及 同步 的含义以便能够编写同时提供安全性和吞吐量的代码 —— 并且大多数人已经明白了它的困难所在

据推测行业的其余部分将自力更生这显然不是一个理想的结局至少不是 IT 部门努力开发软件所应得的回报

与 Scala 在 NET 领域中的姐妹语言 F# 相似Scala 是针对 并发性问题 的解决方案之一在本期文章中我讨论了 Scala 的一些属性这些属性使它更加胜任于编写线程安全的代码比如默认不可修改的对象并讨论了一种返回对象副本而不是修改它们内容的首选设计方案Scala 对并发性的支持远比此深远现在我们有必要来了解一下 Scala 的各种库

关于本系列

关于本系列 Ted Neward 潜心研究 Scala 编程语言并带您跟他一起徜徉在这个新的 developerWorks 系列 中您将深入了解 Scala 并看到 Scala 的语言功能的实际效果在进行相关比较时Scala 代码和 Java 代码将放在一起展示但(您将发现)Scala 中的许多内容与您在 Java 编程中发现的任何内容都没有直接关联而这正是 Scala 的魅力所在!毕竟如果 Java 代码可以做到的话又何必学习 Scala 呢?

并发性基础

在深入研究 Scala 的并发性支持之前有必要确保您具备了对 Java 基本并发性模型的良好理解因为 Scala 的并发性支持从某种程度上说建立在 JVM 和支持库所提供的特性和功能的基础之上为此清单 中的代码包含了一个已知的 Producer/Consumer 并发性问题(详见 Sun Java Tutorial 的 Guarded Blocks 小节)注意Java Tutorial 版本并未在其解决方案中使用 ncurrent 类而是择优使用了 javalangObject 中的较旧的 wait()/notifyAll() 方法

清单 Producer/Consumer(Java 之前)

package comtednewardscalaexamplesnotj;

class Producer implements Runnable

{

private Drop drop;

private String importantInfo[] = {

Mares eat oats

Does eat oats

Little lambs eat ivy

A kid will eat ivy too

};

public Producer(Drop drop) { thisdrop = drop; }

public void run()

{

for (int i = ; i < importantInfolength; i++)

{

dropput(importantInfo[i]);

}

dropput(DONE);

}

}

class Consumer implements Runnable

{

private Drop drop;

public Consumer(Drop drop) { thisdrop = drop; }

public void run()

{

for (String message = droptake(); !messageequals(DONE);

message = droptake())

{

Systemoutformat(MESSAGE RECEIVED: %s%n message);

}

}

}

class Drop

{

//Message sent from producer to consumer

private String message;

//True if consumer should wait for producer to send message

//false if producer should wait for consumer to retrieve message

private boolean empty = true;

//Object to use to synchronize against so as to not leak the

//this monitor

private Object lock = new Object();

public String take()

{

synchronized(lock)

{

//Wait until message is available

while (empty)

{

try

{

lockwait();

}

catch (InterruptedException e) {}

}

//Toggle status

empty = true;

//Notify producer that status has changed

locknotifyAll();

return message;

}

}

public void put(String message)

{

synchronized(lock)

{

//Wait until message has been retrieved

while (!empty)

{

try

{

lockwait();

} catch (InterruptedException e) {}

}

//Toggle status

empty = false;

//Store message

ssage = message;

//Notify consumer that status has changed

locknotifyAll();

}

}

}

public class ProdConSample

{

public static void main(String[] args)

{

Drop drop = new Drop();

(new Thread(new Producer(drop)))start();

(new Thread(new Consumer(drop)))start();

}

}

注意 我在此处展示的代码对 Sun 教程解决方案做了少许修改它们提供的代码存在一个很小的设计缺陷(参见 Java 教程 缺陷

Java 教程 缺陷

好奇的读者可能会将此处的代码与 Java Tutorial 中的代码进行比较寻找它们之间有哪些不同他们会发现我并未 同步 puttake 方法而是使用了存储在 Drop 中的 lock 对象其原因非常简单对象的监测程序永远都不会封装在类的内部因此 Java Tutorial 版本允许此代码打破此规则(显然很疯狂)

public class ProdConSample{  public static void main(String[] args)  {    Drop drop = new Drop();    (new Thread(new Producer(drop)))start();    (new Thread(new Consumer(drop)))start();synchronized(drop){  Threadsleep( * * * * ); // sleep for years?!?}  }}

通过使用私有对象作为锁定所依托的监测程序此代码将不会有任何效果从本质上说现在已经封装了线程安全的实现然后它才能依赖客户机的优势正常运行

Producer/Consumer 问题的核心非常容易理解一个(或多个)生产者实体希望将数据提供给一个(或多个)使用者实体供它们使用和操作(在本例中它包括将数据打印到控制台)Producer 和 Consumer 类是相应直观的 Runnable实现类Producer 从数组中获取 String并通过 put 将它们放置到 Consumer 的缓沖区中并根据需要执行 take

问题的难点在于如果 Producer 运行过快则数据在覆盖时可能会丢失如果 Consumer 运行过快则当 Consumer 读取相同的数据两次时数据可能会得到重复处理缓沖区(在 Java Tutorial 代码中称作 Drop)将确保不会出现这两种情况数据破坏的可能性就更不用提了(在 String 引用的例子中很困难但仍然值得注意)因为数据会由 put 放入缓沖区并由 take 取出

关于此主题的全面讨论请阅读 Brian Goetz 的 Java Concurrency in Practice 或 Doug Lea 的 Concurrent Programming in Java(参见 参考资料)但是在应用 Scala 之前有必要快速了解一下此代码的运行原理

当 Java 编译器看到 synchronized 关键字时它会在同步块的位置生成一个 try/finally 块其顶部包括一个 monitorenter 操作码并且 finally 块中包括一个 monitorexit 操作码以确保监控程序(Java 的原子性基础)已经发布而与代码退出的方式无关因此Drop 中的 put 代码将被重写如清单 所示

清单 编译器失效后的 Dropput

// This is pseudocode

public void put(String message)

{

try

{

monitorenter(lock)

//Wait until message has been retrieved

while (!empty)

{

try

{

lockwait();

} catch (InterruptedException e) {}

}

//Toggle status

empty = false;

//Store message

ssage = message;

//Notify consumer that status has changed

locknotifyAll();

}

finally

{

monitorexit(lock)

}

}

wait() 方法将通知当前线程进入非活动状态并等待另一个线对该对象调用 notifyAll()然后通知的线程必须在能够继续执行的时候尝试再次获取监控程序从本质上说wait() 和 notify()/notifyAll() 允许一种简单的信令机制它允许 Drop 在 Producer 和 Consumer 线程之间进行协调每个 put 都有相应的 take

本文的 代码下载 部分使用 Java 并发性增强(Lock 和 Condition 接口以及 ReentrantLock 锁定实现)提供 清单 的基于超时的版本但基本代码模式仍然相同这就是问题所在编写清单 这样的代码的开发人员需要过度专注于线程和锁定的细节以及低级实现代码以便让它们能够正确运行此外开发人员需要对每一行代码刨根知底以确定是否需要保护它们因为过度同步与过少同步同样有害

现在我们来看到 Scala 替代方案

良好的 Scala 并发性 (v

开始应用 Scala 并发性的一种方法是将 Java 代码直接转换为 Scala以便利用 Scala 的语法优势来简化代码(至少能简化一点)

清单 ProdConSample (Scala)

object ProdConSample

{

class Producer(drop : Drop)

extends Runnable

{

val importantInfo : Array[String] = Array(

Mares eat oats

Does eat oats

Little lambs eat ivy

A kid will eat ivy too

);

override def run() : Unit =

{

importantInfoforeach((msg) => dropput(msg))

dropput(DONE)

}

}

class Consumer(drop : Drop)

extends Runnable

{

override def run() : Unit =

{

var message = droptake()

while (message != DONE)

{

Systemoutformat(MESSAGE RECEIVED: %s%n message)

message = droptake()

}

}

}

class Drop

{

var message : String =

var empty : Boolean = true

var lock : AnyRef = new Object()

def put(x: String) : Unit =

locksynchronized

{

// Wait until message has been retrieved

await (empty == true)

// Toggle status

empty = false

// Store message

message = x

// Notify consumer that status has changed

locknotifyAll()

}

def take() : String =

locksynchronized

{

// Wait until message is available

await (empty == false)

// Toggle status

empty=true

// Notify producer that staus has changed

locknotifyAll()

// Return the message

message

}

private def await(cond: => Boolean) =

while (!cond) { lockwait() }

}

def main(args : Array[String]) : Unit =

{

// Create Drop

val drop = new Drop();

// Spawn Producer

new Thread(new Producer(drop))start();

// Spawn Consumer

new Thread(new Consumer(drop))start();

}

}

Producer 和 Consumer 类几乎与它们的 Java 同类相同再一次扩展(实现)了 Runnable 接口并覆盖了 run() 方法并且 — 对于 Producer 的情况 — 分别使用了内置迭代方法来遍历 importantInfo 数组的内容(实际上为了让它更像 ScalaimportantInfo 可能应该是一个 List 而不是 Array但在第一次尝试时我希望尽可能保证它们与原始 Java 代码一致

Drop 类同样类似于它的 Java 版本但 Scala 中有一些例外synchronized 并不是关键字它是针对 AnyRef 类定义的一个方法即 Scala 所有引用类型的根这意味着要同步某个特定的对象您只需要对该对象调用同步方法在本例中对 Drop 上的 lock 字段中所保存的对象调用同步方法

注意我们在 await() 方法定义的 Drop 类中还利用了一种 Scala 机制cond 参数是等待计算的代码块而不是在传递给该方法之前进行计算在 Scala 中这被称作 callbyname此处它是一种实用的方法可以捕获需要在 Java 版本中表示两次的条件等待逻辑(分别用于 put 和 take)

最后在 main() 中创建 Drop 实例实例化两个线程使用 start() 启动它们然后在 main() 的结束部分退出相信 JVM 会在 main() 结束之前启动这两个线程(在生产代码中可能无法保证这种情况但对于这样的简单的例子% 没有问题

但是已经说过仍然存在相同的基本问题程序员仍然需要过分担心两个线程之间的通信和协调问题虽然一些 Scala 机制可以简化语法但这目前为止并没有相当大的吸引力

Scala 并发性 v

Scala Library Reference 中有一个有趣的包ncurrency这个包包含许多不同的并发性结构包括我们即将利用的 MailBox 类

顾名思义MailBox 从本质上说就是 Drop用于在检测之前保存数据块的单槽缓沖区但是MailBox 最大的优势在于它将发送和接收数据的细节完全封装到模式匹配和 case 类中这使它比简单的 Drop(或 Drop 的多槽数据保存类 ncurrentBoundedBuffer)更加灵活

清单 ProdConSample v (Scala)

package comtednewardscalaexamplesscalaV

{

import concurrent{MailBox ops}

object ProdConSample

{

class Producer(drop : Drop)

extends Runnable

{

val importantInfo : Array[String] = Array(

Mares eat oats

Does eat oats

Little lambs eat ivy

A kid will eat ivy too

);

override def run() : Unit =

{

importantInfoforeach((msg) => dropput(msg))

dropput(DONE)

}

}

class Consumer(drop : Drop)

extends Runnable

{

override def run() : Unit =

{

var message = droptake()

while (message != DONE)

{

Systemoutformat(MESSAGE RECEIVED: %s%n message)

message = droptake()

}

}

}

class Drop

{

private val m = new MailBox()

private case class Empty()

private case class Full(x : String)

m send Empty() // initialization

def put(msg : String) : Unit =

{

m receive

{

case Empty() =>

m send Full(msg)

}

}

def take() : String =

{

m receive

{

case Full(msg) =>

m send Empty(); msg

}

}

}

def main(args : Array[String]) : Unit =

{

// Create Drop

val drop = new Drop()

// Spawn Producer

new Thread(new Producer(drop))start();

// Spawn Consumer

new Thread(new Consumer(drop))start();

}

}

}

此处v 和 v 之间的惟一区别在于 Drop 的实现它现在利用 MailBox 类处理传入以及从 Drop 中删除的消息的阻塞和信号事务(我们可以重写 Producer 和 Consumer让它们直接使用 MailBox但考虑到简单性我们假定希望保持所有示例中的 Drop API 相一致)使用 MailBox 与使用典型的 BoundedBuffer(Drop)稍有不同因此我们来仔细看看其代码

MailBox 有两个基本操作send 和 receivereceiveWithin 方法仅仅是基于超时的 receiveMailBox 接收任何类型的消息send() 方法将消息放置到邮箱中并立即通知任何关心该类型消息的等待接收者并将它附加到一个消息链表中以便稍后检索receive() 方法将阻塞直到接收到对于功能块合适的消息

因此在这种情况下我们将创建两个 case 类一个不包含任何内容(Empty)这表示 MailBox 为空另一个包含消息数据(Full

put 方法由于它会将数据放置在 Drop 中对 MailBox 调用 receive() 以查找 Empty 实例因此会阻塞直到发送 Empty此时它发送一个 Full 实例给包含新数据的 MailBox

take 方法由于它会从 Drop 中删除数据对 MailBox 调用 receive() 以查找 Full 实例提取消息(再次得益于模式匹配从 case 类内部提取值并将它们绑到本地变量的能力)并发送一个 Empty 实例给 MailBox不需要明确的锁定并且不需要考虑监控程序

Scala 并发性 v

事实上我们可以显着缩短代码只要 Producer 和 Consumer 不需要功能全面的类(此处便是如此) — 两者从本质上说都是 Runnablerun() 方法的瘦包装器Scala 可以使用 ncurrentops 对象的 spawn 方法来实现如清单 所示

清单 ProdConSample v (Scala)

package comtednewardscalaexamplesscalaV

{

import concurrentMailBox

import concurrentops_

object ProdConSample

{

class Drop

{

private val m = new MailBox()

private case class Empty()

private case class Full(x : String)

m send Empty() // initialization

def put(msg : String) : Unit =

{

m receive

{

case Empty() =>

m send Full(msg)

}

}

def take() : String =

{

m receive

{

case Full(msg) =>

m send Empty(); msg

}

}

}

def main(args : Array[String]) : Unit =

{

// Create Drop

val drop = new Drop()

// Spawn Producer

spawn

{

val importantInfo : Array[String] = Array(

Mares eat oats

Does eat oats

Little lambs eat ivy

A kid will eat ivy too

);

importantInfoforeach((msg) => dropput(msg))

dropput(DONE)

}

// Spawn Consumer

spawn

{

var message = droptake()

while (message != DONE)

{

Systemoutformat(MESSAGE RECEIVED: %s%n message)

message = droptake()

}

}

}

}

}

spawn 方法(通过包块顶部的 ops 对象导入)接收一个代码块(另一个 byname 参数示例)并将它包装在匿名构造的线程对象的 run() 方法内部事实上并不难理解 spawn 的定义在 ops 类的内部是什么样的

清单 ncurrentopsspawn()

def spawn(p: => Unit) = {

val t = new Thread() { override def run() = p }

tstart()

}

……这再一次强调了 byname 参数的强大之处

opsspawn 方法的一个缺点在于它是在 年 Java concurrency 类还不可用的时候编写的特别是ncurrentExecutor 及其同类的作用是让开发人员更加轻松地生成线程而不需要实际处理直接创建线程对象的细节幸运的是在您自己的自定义库中重新创建 spawn 的定义是相当简单的这需要利用 Executor(或 ExecutorService 或 ScheduledExecutorService)来执行线程的实际启动任务

事实上Scala 的并发性支持超越了 MailBox 和 ops 类Scala 还支持一个类似的 Actors 概念它使用了与 MailBox 所采用的方法相类似的消息传递方法但应用更加全面并且灵活性也更好但是这部分内容将在下期讨论

结束语

Scala 为并发性提供了两种级别的支持这与其他与 Java 相关的主题极为类似

首先对底层库的完全访问(比如说 ncurrent)以及对 传统 Java 并发性语义的支持(比如说监控程序和 wait()/notifyAll())

其次这些基本机制上面有一个抽象层详见本文所讨论的 MailBox 类以及将在本系列下一篇文章中讨论的 Actors 库

两个例子中的目标是相同的让开发人员能够更加轻松地专注于问题的实质而不用考虑并发编程的低级细节(显然第二种方法更好地实现了这一目标至少对于没有过多考虑低级细节的人来说是这样的

但是当前 Scala 库的一个明显的缺陷就是缺乏 Java 支持ncurrentops 类应该具有 spawn 这样的利用新的 Executor 接口的方法它还应该支持利用新的 Lock 接口的各种版本的 synchronized幸运的是这些都是可以在 Scala 生命周期中实现的库增强而不会破坏已有代码它们甚至可以由 Scala 开发人员自己完成而不需要等待 Scala 的核心开发团队提供给他们(只需要花费少量时间)

描述名字大小下载方法本文的示例 Scala 代码

上一篇:面向Java开发人员的Scala指南: 深入了解Scala并发性 了解 actor 如何提供新的应

下一篇:Java与C++