java

位置:IT落伍者 >> java >> 浏览文章

用java多线程断点续传实践


发布日期:2022年04月04日
 
用java多线程断点续传实践

/**

* authorannegu

* date

*/

annegu做了一个简单的Http多线程的下载程序来讨论一下多线程并发下载以及断点续传的问题

这个程序的功能就是可以分多个线程从目标地址上下载数据每个线程负责下载一部分并可以支持断点续传和超时重连

下载的方法是download()它接收两个参数分别是要下载的页面的url和编码方式在这个负责下载的方法中主要分了三个步骤第一步是用来设置断点续传时候的一些信息的第二步就是主要的分多线程来下载了最后是数据的合并

多线程下载

/**  

*/

public String download(String urlStr String charset) {

thischarset = charset;

long contentLength = ;

CountDownLatch latch = new CountDownLatch(threadNum);

long[] startPos = new long[threadNum];

long endPos = ;

try {

// 从url中获得下载的文件格式与名字

thisfileName = urlStrsubstring(urlStrlastIndexOf(/) + );

thisurl = new URL(urlStr);

URLConnection con = urlopenConnection();

setHeader(con);

// 得到content的长度

contentLength = congetContentLength();

// 把context分为threadNum段的话每段的长度

thisthreadLength = contentLength / threadNum;

// 第一步分析已下载的临时文件设置断点如果是新的下载任务则建立目标文件在第点中说明

startPos = setThreadBreakpoint(fileDir fileName contentLength startPos);

//第二步分多个线程下载文件

ExecutorService exec = ExecutorsnewCachedThreadPool();

for (int i = ; i < threadNum; i++) {

// 创建子线程来负责下载数据每段数据的起始位置为(threadLength * i + 已下载长度)

startPos[i] += threadLength * i;

/**//*设置子线程的终止位置非最后一个线程即为(threadLength * (i + ) )

最后一个线程的终止位置即为下载内容的长度*/

if (i == threadNum ) {

endPos = contentLength;

} else {

endPos = threadLength * (i + ) ;

}

// 开启子线程并执行

ChildThread thread = new ChildThread(this latch i startPos[i] endPos);

childThreads[i] = thread;

execexecute(thread);

}

try {

// 等待CountdownLatch信号为表示所有子线程都结束

latchawait();

execshutdown();

// 第三步把分段下载下来的临时文件中的内容写入目标文件中在第点中说明

tempFileToTargetFile(childThreads);

} catch (InterruptedException e) {

eprintStackTrace();

}

}

首先来看最主要的步骤多线程下载

首先从url中提取目标文件的名称并在对应的目录创建文件然后取得要下载的文件大小根据分成的下载线程数量平均分配每个线程需要下载的数据量就是threadLength然后就可以分多个线程来进行下载任务了

在这个例子中并没有直接显示的创建Thread对象而是用Executor来管理Thread对象并且用CachedThreadPool来创建的线程池当然也可以用FixedThreadPoolCachedThreadPool在程序执行的过程中会创建与所需数量相同的线程当程序回收旧线程的时候就停止创建新线程FixedThreadPool可以预先新建参数给定个数的线程这样就不用在创建任务的时候再来创建线程了可以直接从线程池中取出已准备好的线程下载线程的数量是通过一个全局变量threadNum来控制的默认为

好了个子线程已经通过Executor来创建了下面它们就会各自为政互不干涉的执行了线程有两种实现方式实现Runnable接口继承Thread类

ChildThread就是子线程它作为DownloadTask的内部类继承了Thread它的构造方法需要个参数依次是一个对DownloadTask的引用一个CountDownLatchid(标识线程的id号)startPosition(下载内容的开始位置)endPosition(下载内容的结束位置)

这个CountDownLatch是做什么用的呢?

现在我们整理一下思路要实现分多个线程来下载数据的话我们肯定还要把这多个线程下载下来的数据进行合主线程必须等待所有的子线程都执行结束之后才能把所有子线程的下载数据按照各自的id顺序进行合并CountDownLatch就是来做这个工作的

CountDownLatch用来同步主线程强制主线程等待所有的子线程执行的下载操作完成在主线程中CountDownLatch对象被设置了一个初始计数器就是子线程的个数代码①处在新建了个子线程并开始执行之后主线程用CountDownLatch的await()方法来阻塞主线程直到这个计数器的值到达才会进行下面的操作代码②处

对每个子线程来说在执行完下载指定区间与长度的数据之后必须通过调用CountDownLatch的countDown()方法来把这个计数器减

在全面开启下载任务之后主线程就开始阻塞等待子线程执行完毕所以下面我们来看一下具体的下载线程ChildThread

/**

*author by  

*/

public class ChildThread extends Thread {

public static final int STATUS_HASNOT_FINISHED = ;

public static final int STATUS_HAS_FINISHED = ;

public static final int STATUS_HTTPSTATUS_ERROR = ;

private DownloadTask task;

private int id;

private long startPosition;

private long endPosition;

private final CountDownLatch latch;

private File tempFile = null;

//线程状态码

private int status = ChildThreadSTATUS_HASNOT_FINISHED;

public ChildThread(DownloadTask task CountDownLatch latch int id long startPos long endPos) {

super();

thistask = task;

thisid = id;

thisstartPosition = startPos;

thisendPosition = endPos;

thislatch = latch;

try {

tempFile = new File(thistaskfileDir + thistaskfileName + _ + id);

if(!tempFileexists()){

tempFilecreateNewFile();

}

} catch (IOException e) {

eprintStackTrace();

}

}

public void run() {

Systemoutprintln(Thread + id + run );

HttpURLConnection con = null;

InputStream inputStream = null;

BufferedOutputStream outputStream = null;

int count = ;

long threadDownloadLength = endPosition startPosition;

try {

outputStream = new BufferedOutputStream(new FileOutputStream(tempFilegetPath() true));

} catch (FileNotFoundException e) {

eprintStackTrace();

}

③ for(;;){

④ startPosition += count;

try {

//打开URLConnection

con = (HttpURLConnection) taskurlopenConnection();

setHeader(con);

consetAllowUserInteraction(true);

//设置连接超时时间为ms

⑤ consetConnectTimeout();

//设置读取数据超时时间为ms

consetReadTimeout();

if(startPosition < endPosition){

//设置下载数据的起止区间

consetRequestProperty(Range bytes= + startPosition +

+ endPosition);

Systemoutprintln(Thread + id + startPosition is + startPosition);

Systemoutprintln(Thread + id + endPosition is + endPosition);

//判断http status是否为HTTP/ Partial Content或者 OK

//如果不是以上两种状态把status改为STATUS_HTTPSTATUS_ERROR

⑥ if (congetResponseCode() != HttpURLConnectionHTTP_OK

&& congetResponseCode() != HttpURLConnectionHTTP_PARTIAL) {

Systemoutprintln(Thread + id + : code =

+ congetResponseCode() + status =

+ congetResponseMessage());

status = ChildThreadSTATUS_HTTPSTATUS_ERROR;

thistaskstatusError = true;

outputStreamclose();

condisconnect();

Systemoutprintln(Thread + id + finished);

untDown();

break;

}

inputStream = congetInputStream();

int len = ;

byte[] b = new byte[];

while ((len = inputStreamread(b)) != ) {

outputStreamwrite(b len);

count += len;

//每读满个byte往磁盘上flush一下

if(count % == ){

⑦ outputStreamflush();

}

}

Systemoutprintln(count is + count);

if(count >= threadDownloadLength){

hasFinished = true;

}

⑧ outputStreamflush();

outputStreamclose();

inputStreamclose();

condisconnect();

}

Systemoutprintln(Thread + id + finished);

untDown();

break;

} catch (IOException e) {

try {

⑨ outputStreamflush();

⑩ TimeUnitSECONDSsleep(getSleepSeconds());

} catch (InterruptedException e) {

eprintStackTrace();

} catch (IOException e) {

eprintStackTrace();

}

continue;

}

}

}

}

在ChildThread的构造方法中除了设置一些从主线程中带来的id 起始位置之外就是新建了一个临时文件用来存放当前线程的下载数据临时文件的命名规则是这样的下载的目标文件名+_+线程编号

现在让我们来看看从网络中读数据是怎么读的我们通过URLConnection来获得一个http的连接有些网站为了安全起见会对请求的http连接进行过滤因此为了伪装这个http的连接请求我们给httpHeader穿一件伪装服下面的setHeader方法展示了一些非常常用的典型的httpHeader的伪装方法比较重要的有UerAgent模拟从Ubuntu的firefox浏览器发出的请求Referer模拟浏览器请求的前一个触发页面例如从skycn站点来下载软件的话Referer设置成skycn的首页域名就可以了Range就是这个连接获取的流文件的起始区间

private void setHeader(URLConnection con) {

consetRequestProperty(UserAgent Mozilla/ (X; U; Linux i; enUS; rv:) Gecko/ Ubuntu/ (hardy) Firefox/);

consetRequestProperty(AcceptLanguage enusen;q=zhcn;q=);

consetRequestProperty(AcceptEncoding aa);

consetRequestProperty(AcceptCharset ISOutf;q=*;q=);

consetRequestProperty(KeepAlive );

consetRequestProperty(Connection keepalive);

consetRequestProperty(IfModifiedSince Fri Jan :: GMT);

consetRequestProperty(IfNoneMatch \ddfd\);

consetRequestProperty(CacheControl maxage=);

consetRequestProperty(Referer );

}

另外为了避免线程因为网络原因而阻塞设置了ConnectTimeout和ReadTimeout代码⑤⑥处setConnectTimeout设置的连接的超时时间而setReadTimeout设置的是读取数据的超时时间发生超时的话就会抛出socketTimeout异常两个方法的参数都是超时的毫秒数

这里对超时的发生采用的是等候一段时间重新连接的方法整个获取网络连接并读取下载数据的过程都包含在一个循环之中(代码③处)如果发生了连接或者读取数据的超时在抛出的异常里面就会sleep一定的时间(代码⑩处)然后continue再次尝试获取连接并读取数据这个时间可以通过setSleepSeconds()方法来设置我们在迅雷等下载工具的使用中经常可以看到状态栏会输出类似连接超时等待*秒后重试的话这个就是通过ConnectTimeoutReadTimeout来实现的

连接建立好之后我们要检查一下返回响应的状态码常见的Http Response Code有以下几种

a) OK 一切正常对GET和POST请求的应答文档跟在后面

b) Partial Content 客户发送了一个带有Range头的GET请求服务器完成

c) Not Found 无法找到指定位置的资源这也是一个常用的应答

d) Request URI Too Long URI太长

e) Requested Range Not Satisfiable 服务器不能满足客户在请求中指定的Range头

f) Internal Server Error 服务器遇到了意料不到的情况不能完成客户的请求

g) Service Unavailable 服务器由于维护或者负载过重未能应答例如Servlet可能在数据库连接池已满的情况下返回

在这些状态里面只有才是我们需要的正确的状态所以在代码⑥处进行了状态码的判断如果返回不符合要求的状态码则结束线程返回主线程并提示报错

假设一切正常下面我们就要考虑从网络中读数据了正如我之前在分析mysql的数据库驱动中看的一样网络中发送数据都是以数据包的形式来发送的也就是说不管是客户端向服务器发出的请求数据还是从服务器返回给客户端的响应数据都会被拆分成若干个小型数据包在网络中传递等数据包到达了目的地网络接口会依据数据包的编号来组装它们成为完整的比特数据因此我们可以想到在这里也是一样的我们用inputStream的read方法来通过网卡从网络中读取数据并不一定一次就能把所有的数据包都读完所以我们要不断的循环来从inputStream中读取数据Read方法有一个int型的返回值表示每次从inputStream中读取的字节数如果把这个inputStream中的数据读完了那么就返回 Read方法最多可以有三个参数byte b[]是读取数据之后存放的目标数组off标识了目标数组中存储的开始位置len是想要读取的数据长度这个长度必定不能大于b[]的长度

public synchronized int read(byte b[] int off int len)

我们的目标是要把目标地址的内容下载下来现在分了个线程来分段下载那么这些分段下载的数据保存在哪里呢?如果把它们都保存在内存中是非常糟糕的做法如果文件相当之大例如是一个视频的话难道把这么大的数据都放在内存中吗这样的话万一连接中断那前面下载的东西就都没有了?我们当然要想办法及时的把下载的数据刷到磁盘上保存下来当用bt下载视频的时候通常都会有个临时文件当视频完全下载结束之后这个临时文件就会被删除那么下次继续下载的时候就会接着上次下载的点继续下载所以我们的outputStream就是往这个临时文件来输出了

OutputStream的write方法和上面InputStream的read方法有类似的参数byte b[]是输出数据的来源off标识了开始位置len是数据长度

public synchronized void write(byte b[] int off int len) throws IOException在往临时文件的outputStream中写数据的时候我会加上一个计数器每满个比特就往文件中flush一下(代码⑦处)

对于输出流的flush有些要注意的地方在程序中有三个地方调用了outputStreamflush()第一个是在循环的读取网络数据并往outputStream中写入的时候每满个byte就flush一下(代码⑦处)第二个是循环之后(代码⑧处)这时候正常的读取写入操作已经完成但是outputStream中还有没有刷入磁盘的数据所以要flush一下才能关闭连接第三个就是在异常中的flush(代码⑨处)因为如果发生了连接超时或者读取数据超时的话就会直接跑到catch的exception中去这个时候outputStream中的数据如果不flush的话重新连接的时候这部分数据就会丢失了另外当抛出异常重新连接的时候下载的起始位置也要重新设置(代码④处)count就是用来标识已经下载的字节数的把count+startPosition就是新一次连接需要的下载起始位置了

现在每个分段的下载线程都顺利结束了也都创建了相应的临时文件接下来在主线程中会对临时文件进行合并并写入目标文件最后删除临时文件这部分很简单就是一个对所有下载线程进行遍历的过程这里outputStream也有两次flush与上面类似不再赘述

/**author by   */

private void tempFileToTargetFile(ChildThread[] childThreads) {

try {

BufferedOutputStream outputStream = new BufferedOutputStream(

new FileOutputStream(fileDir + fileName));

// 遍历所有子线程创建的临时文件按顺序把下载内容写入目标文件中

for (int i = ; i < threadNum; i++) {

if (statusError) {

for (int k = ; k < threadNum; k++) {

if (childThreads[k]tempFilelength() == )

childThreads[k]tempFiledelete();

}

Systemoutprintln(本次下载任务不成功请重新设置线程数);

break;

}

BufferedInputStream inputStream = new BufferedInputStream(

new FileInputStream(childThreads[i]tempFile));

Systemoutprintln(Now is file + childThreads[i]id);

int len = ;

int count = ;

byte[] b = new byte[];

while ((len = inputStreamread(b)) != ) {

count += len;

outputStreamwrite(b len);

if ((count % ) == ) {

outputStreamflush();

}

// b = new byte[];

}

inputStreamclose();

// 删除临时文件

if (childThreads[i]status == ChildThreadSTATUS_HAS_FINISHED) {

childThreads[i]tempFiledelete();

}

}

outputStreamflush();

outputStreamclose();

} catch (FileNotFoundException e) {

eprintStackTrace();

} catch (IOException e) {

eprintStackTrace();

}

}

最后说说断点续传前面为了实现断点续传在每个下载线程中都创建了一个临时文件现在我们就要利用这个临时文件来设置断点的位置由于临时文件的命名方式都是固定的所以我们就专门找对应下载的目标文件的临时文件临时文件中已经下载的字节数就是我们需要的断点位置startPos是一个数组存放了每个线程的已下载的字节数

//第一步分析已下载的临时文件设置断点如果是新的下载任务则建立目标文件

private long[] setThreadBreakpoint(String fileDir String fileName

long contentLength long[] startPos) {

File file = new File(fileDir + fileName);

long localFileSize = filelength();

if (fileexists()) {

Systemoutprintln(file + fileName + has exists!);

// 下载的目标文件已存在判断目标文件是否完整

if (localFileSize < contentLength) {

Systemoutprintln(Now download continue );

// 遍历目标文件的所有临时文件设置断点的位置即每个临时文件的长度

File tempFileDir = new File(fileDir);

File[] files = tempFileDirlistFiles();

for (int k = ; k < fileslength; k++) {

String tempFileName = files[k]getName();

// 临时文件的命名方式为目标文件名+_+编号

if (tempFileName != null && files[k]length() >

&& tempFileNamestartsWith(fileName + _)) {

int fileLongNum = IntegerparseInt(tempFileName

substring(tempFileNamelastIndexOf(_) +

tempFileNamelastIndexOf(_) + ));

// 为每个线程设置已下载的位置

startPos[fileLongNum] = files[k]length();

}

}

}

} else {

// 如果下载的目标文件不存在则创建新文件

try {

filecreateNewFile();

} catch (IOException e) {

eprintStackTrace();

}

}

return startPos;

}

测试

public class DownloadStartup {

private static final String encoding = utf;

public static void main(String[] args) {

DownloadTask downloadManager = new DownloadTask();

String urlStr = toolszip;

downloadManagersetSleepSeconds();

downloadManagerdownload(urlStr encoding);

}

}

               

上一篇:Java Thread类如何在子类中实现run

下一篇:Java核心 Java中多态的实现机制