java

位置:IT落伍者 >> java >> 浏览文章

使用Java多线程实现任务分发


发布日期:2021年02月22日
 
使用Java多线程实现任务分发

多线程下载由来已久如 FlashGetNetAnts 等工具它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围)首先能读取出请求内容 (即欲下载的文件) 的大小划分出若干区块把区块分段分发给每个线程去下载线程从本段起始处下载数据及至段尾多个线程下载的内容最终会写入到同一个文件中

只研究有用的工作中的需求要把多个任务分派给Java的多个线程去执行这其中就会有一个任务列表指派到线程的策略思考已知 一个待执行的任务列表 指定要启动的线程数问题是每个线程实际要执行哪些任务

使用Java多线程实现这种任务分发的策略是任务列表连续按线程数分段先保证每线程平均能分配到的任务数余下的任务从前至后依次附加到线程中——只是数量上实际每个线程执行的任务都还是连续的如果出现那种僧多(线程) 粥(任务) 少的情况实际启动的线程数就等于任务数一挑一这里只实现了每个线程各扫自家门前雪动作快的完成后眼见别的线程再累都是爱莫能助

实现及演示代码如下由三个类实现写在了一个 Java 文件中TaskDistributor 为任务分发器Task 为待执行的任务WorkThread 为自定的工作线程代码中运用了命令模式如若能配以监听器用上观察者模式来控制 UI 显示就更绝妙不过了就能实现像下载中的区块着色跳跃的动感了在此定义下一步的着眼点了

代码中有较为详细的注释看这些注释和执行结果就很容易理解的main() 是测试方法

package mon;

import javautilArrayList;

import javautilList;

/**

* 指派任务列表给线程的分发器

* @author Unmi

* QQ: Email:

* MSN:  

*/

public class TaskDistributor {

/**

* 测试方法

* @param args

*/

public static void main(String[] args) {

//初始化要执行的任务列表

List taskList = new ArrayList();

for (int i = ; i < ; i++) {

taskListadd(new Task(i));

}

//设定要启动的工作线程数为

int threadCount = ;

List[] taskListPerThread = distributeTasks(taskList threadCount);

Systemoutprintln(实际要启动的工作线程数+taskListPerThreadlength);

for (int i = ; i < taskListPerThreadlength; i++) {

Thread workThread = new WorkThread(taskListPerThread[i]i);

workThreadstart();

}

}

/**

* 把 List 中的任务分配给每个线程先平均分配剩于的依次附加给前面的线程

* 返回的数组有多少个元素 (List) 就表明将启动多少个工作线程

* @param taskList 待分派的任务列表

* @param threadCount 线程数

* @return 列表的数组每个元素中存有该线程要执行的任务列表

*/

public static List[] distributeTasks(List taskList int threadCount) {

// 每个线程至少要执行的任务数假如不为零则表示每个线程都会分配到任务

int minTaskCount = taskListsize() / threadCount;

// 平均分配后还剩下的任务数不为零则还有任务依个附加到前面的线程中

int remainTaskCount = taskListsize() % threadCount;

// 实际要启动的线程数如果工作线程比任务还多

// 自然只需要启动与任务相同个数的工作线程一对一的执行

// 毕竟不打算实现了线程池所以用不着预先初始化好休眠的线程

int actualThreadCount = minTaskCount > ? threadCount : remainTaskCount;

// 要启动的线程数组以及每个线程要执行的任务列表

List[] taskListPerThread = new List[actualThreadCount];

int taskIndex = ;

//平均分配后多余任务每附加给一个线程后的剩余数重新声明与 remainTaskCount

//相同的变量不然会在执行中改变 remainTaskCount 原有值产生麻烦

int remainIndces = remainTaskCount;

for (int i = ; i < taskListPerThreadlength; i++) {

taskListPerThread[i] = new ArrayList();

// 如果大于零线程要分配到基本的任务

if (minTaskCount > ) {

for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {

taskListPerThread[i]add(taskListget(j));

}

taskIndex += minTaskCount;

}

// 假如还有剩下的则补一个到这个线程中

if (remainIndces > ) {

taskListPerThread[i]add(taskListget(taskIndex++));

remainIndces;

}

}

// 打印任务的分配情况

for (int i = ; i < taskListPerThreadlength; i++) {

Systemoutprintln(线程 + i + 的任务数 +

taskListPerThread[i]size() + 区间[

+ taskListPerThread[i]get()getTaskId() +

+ taskListPerThread[i]get(taskListPerThread[i]size() )getTaskId() + ]);

}

return taskListPerThread;

}

}

/**

* 要执行的任务可在执行时改变它的某个状态或调用它的某个操作

* 例如任务有三个状态就绪运行完成默认为就绪态

* 要进一步完善可为 Task 加上状态变迁的监听器因之决定UI的显示

*/

class Task {

public static final int READY = ;

public static final int RUNNING = ;

public static final int FINISHED = ;

private int status;

//声明一个任务的自有业务含义的变量用于标识任务

private int taskId;

//任务的初始化方法

public Task(int taskId){

thisstatus = READY;

thistaskId = taskId;

}

/**

* 执行任务

*/

public void execute() {

// 设置状态为运行中

setStatus(TaskRUNNING);

Systemoutprintln(当前线程 ID 是 + ThreadcurrentThread()getName()

+ | 任务 ID 是+thistaskId);

// 附加一个延时

try {

Threadsleep();

} catch (InterruptedException e) {

eprintStackTrace();

}

// 执行完成改状态为完成

setStatus(FINISHED);

}

public void setStatus(int status) {

thisstatus = status;

}

public int getTaskId() {

return taskId;

}

}

/**

* 自定义的工作线程持有分派给它执行的任务列表

*/

class WorkThread extends Thread {

//本线程待执行的任务列表你也可以指为任务索引的起始值

private List taskList = null;

private int threadId;

/**

* 构造工作线程为其指派任务列表及命名线程 ID

* @param taskList 欲执行的任务列表

* @param threadId 线程 ID

*/

public WorkThread(List taskListint threadId) {

thistaskList = taskList;

thisthreadId = threadId;

}

/**

* 执行被指派的所有任务

*/

public void run() {

for (Task task : taskList) {

taskexecute();

}

}

}

执行结果如下注意观察每个Java多线程分配到的任务数量及区间直到所有的线程完成了所分配到的任务后程序结束

线程 的任务数 区间[]

线程 的任务数 区间[]

线程 的任务数 区间[]

线程 的任务数 区间[]

线程 的任务数 区间[]

实际要启动的工作线程数

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

当前线程 ID 是Thread | 任务 ID 是

上面坦白来只算是基本功夫贴出来还真见笑了还有更为复杂的功能

像Java多线程的下载工具的确更充分利用了网络资源而且像 FlashGetNetAnts 都实现了假如某个线程下载完了欲先所分配段的内容之后会帮其他线程下载未完成数据直到任务完成或某一下载线程的未完成段区间已经很小了用不着别人来帮忙时这就涉及到任务的进一步分配再如以上两个工具都能动态增加减小或中止线程越说越复杂了它们原本比这复杂多了这些实现可能定义各种队列来实现如未完成任务队列下载中任务队列和已完成队列等

               

上一篇:Java加密保护

下一篇:Java深入分析之:使用Factory Method模式