java

位置:IT落伍者 >> java >> 浏览文章

用Java多线程实现无阻塞读取远程文件


发布日期:2018年01月14日
 
用Java多线程实现无阻塞读取远程文件

我是不怎么赞同使用Java多线程下载的加之有的链接下载速度本身就比较快所以在下载速度足够的情况下就让下载线程退出直到只剩下一个下载线程当然多线程中令人头痛的死锁问题HttpURLConnection的超时阻塞问题都会使代码看起来异常复杂

简要介绍一下使用Java多线程实现无阻塞读取远程文件的方法将缓沖区buf[]分为每块K下载线程负责向缓沖区写数据每次写一块读线程(BuffRandAcceURL类)每次读小于K的任意字节同步描述写/写互斥等待空闲块写/写并发填写buf[]读/写并发使用buf[]

经过我很长一段时间使用我认为比较满意地实现了我的目标同其它MP播放器对比我的这种方法能够比较流畅稳定地下载并播放我把实现多线程下载缓沖的方法写出来不足之处恳请批评指正

HttpReader类功能HTTP协议从指定URL读取数据

/** *//**

* author by  

*/

package instream;

import javaioIOException;

import javaioInputStream;

import HttpURLConnection;

import URL;

public final class HttpReader {

public static final int MAX_RETRY = ;

private static long content_length;

private URL url;

private HttpURLConnection httpConnection;

private InputStream in_stream;

private long cur_pos; //用于决定seek方法中是否执行文件定位

private int connect_timeout;

private int read_timeout;

public HttpReader(URL u) {

this(u );

}

public HttpReader(URL u int connect_timeout int read_timeout) {

nnect_timeout = connect_timeout;

thisread_timeout = read_timeout;

url = u;

if (content_length == ) {

int retry = ;

while (retry < HttpReaderMAX_RETRY)

try {

thisseek();

content_length = ();

break;

} catch (Exception e) {

retry++;

}

}

}

public static long getContentLength() {

return content_length;

}

public int read(byte[] b int off int len) throws IOException {

int r = in_streamread(b off len);

cur_pos += r;

return r;

}

public int getData(byte[] b int off int len) throws IOException {

int r rema = len;

while (rema > ) {

if ((r = in_streamread(b off rema)) == ) {

return ;

}

rema = r;

off += r;

cur_pos += r;

}

return len;

}

public void close() {

if (httpConnection != null) {

();

httpConnection = null;

}

if (in_stream != null) {

try {

in_streamclose();

} catch (IOException e) {}

in_stream = null;

}

url = null;

}

/**//*

* 抛出异常通知再试

* 响应码可能是由某种暂时的原因引起的例如同一IP频繁的连接请求可能遭服务器拒绝

*/

public void seek(long start_pos) throws IOException {

if (start_pos == cur_pos && in_stream != null)

return;

if (httpConnection != null) {

();

httpConnection = null;

}

if (in_stream != null) {

in_streamclose();

in_stream = null;

}

httpConnection = (HttpURLConnection) urlopenConnection();

(connect_timeout);

(read_timeout);

String sProperty = bytes= + start_pos + ;

(Range sProperty);

//(Connection KeepAlive);

int responseCode = ();

if (responseCode < || responseCode >= ) {

try {

Threadsleep();

} catch (InterruptedException e) {

eprintStackTrace();

}

throw new IOException(HTTP responseCode=+responseCode);

}

in_stream = ();

cur_pos = start_pos;

}

}

IWriterCallBack接口功能实现读/写通信

package instream;

public interface IWriterCallBack {

public boolean tryWriting(Writer w) throws InterruptedException;

public void updateBuffer(int i int len);

public void updateWriterCount();

public void terminateWriters();

}

Writer类下载线程负责向buf[]写数据

/** *//**

*  

*/

package instream;

import javaioIOException;

import URL;

public final class Writer implements Runnable {

private static boolean isalive = true;

private byte[] buf;

private IWriterCallBack icb;

protected int index; //buf[]内索引号

protected long start_pos; //index对应的文件位置(相对于文件首的偏移量)

protected int await_count; //用于判断:下载速度足够就退出一个线程

private HttpReader hr;

public Writer(IWriterCallBack call_back URL u byte[] b int i) {

hr = new HttpReader(u);

if(HttpReadergetContentLength() == ) //实例化HttpReader对象都不成功

return;

icb = call_back;

buf = b;

Thread t = new Thread(thisdt_+i);

tsetPriority(ThreadNORM_PRIORITY + );

tstart();

}

public void run() {

int write_bytes= write_pos= rema = retry = ;

boolean cont = true;

while (cont) {

try {

// 等待空闲块

if(retry == ) {

if (icbtryWriting(this) == false)

break;

write_bytes = ;

rema = BuffRandAcceURLUNIT_LENGTH;

write_pos = index << BuffRandAcceURLUNIT_LENGTH_BITS;

}

// 定位

hrseek(start_pos);

// 下载一块

int w;

while (rema > && isalive) {

w = (rema < ) ? rema : ; //每次读几K合适?

if ((w = hrread(buf write_pos w)) == ) {

cont = false;

break;

}

rema = w;

write_pos += w;

start_pos += w;

write_bytes += w;

}

//通知线程

retry = ;

icbupdateBuffer(index write_bytes);

} catch (InterruptedException e) {

isalive = false;

icbterminateWriters();

break;

} catch (IOException e) {

if(++retry == HttpReaderMAX_RETRY) {

isalive = false;

icbterminateWriters();

break;

}

}

}

icbupdateWriterCount();

try {

hrclose();

} catch (Exception e) {}

hr = null;

buf = null;

icb = null;

}

}

IRandomAccess接口

无阻塞读取远程文件中需要随机读取文件接口BuffRandAcceURL类和BuffRandAcceFile类实现接口方法BuffRandAcceFile类实现读取本地磁盘文件这儿就不给出其源码了

package instream;

public interface IRandomAccess {

public int read() throws Exception;

public int read(byte b[]) throws Exception;

public int read(byte b[] int off int len) throws Exception;

public int dump(int src_off byte b[] int dst_off int len) throws Exception;

public void seek(long pos) throws Exception;

public long length();

public long getFilePointer();

public void close();

}

BuffRandAcceURL类功能创建下载线程read方法从buf[]读数据

关键是如何简单有效防止死锁?以下只是我的一次尝试请指正

/** *//**

* ;

*/

package instream;

import URL;

import URLDecoder;

import decodeHeader;

import tagMPTag;

import tagTagThread;

public final class BuffRandAcceURL implements IRandomAccess IWriterCallBack {

public static final int UNIT_LENGTH_BITS = ; //K

public static final int UNIT_LENGTH = << UNIT_LENGTH_BITS;

public static final int BUF_LENGTH = UNIT_LENGTH << ; //

public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;

public static final int BUF_LENGTH_MASK = (BUF_LENGTH );

private static final int MAX_WRITER = ;

private static long file_pointer;

private static int read_pos;

private static int fill_bytes;

private static byte[] buf; //同时也作读写同步锁:bufwait()/bufnotify()

private static int[] buf_bytes;

private static int buf_index;

private static int alloc_pos;

private static URL url = null;

private static boolean isalive = true;

private static int writer_count;

private static int await_count;

private long file_length;

private long frame_bytes;

public BuffRandAcceURL(String sURL) throws Exception {

this(sURLMAX_WRITER);

}

public BuffRandAcceURL(String sURL int download_threads) throws Exception {

buf = new byte[BUF_LENGTH];

buf_bytes = new int[UNIT_COUNT];

url = new URL(sURL);

//创建线程以异步方式解析ID

new TagThread(url);

//打印当前文件名

try {

String s = URLDecoderdecode(sURL GBK);

Systemoutprintln(start>> + ssubstring(slastIndexOf(/) + ));

s = null;

} catch (Exception e) {

Systemoutprintln(start>> + sURL);

}

//创建线程

for(int i = ; i < download_threads; i++)

new Writer(this url buf i+);

frame_bytes = file_length = HttpReadergetContentLength();

if(file_length == ) {

HeaderstrLastErr = 连接URL出错重试 + HttpReaderMAX_RETRY + 次后放弃;

throw new Exception(retry + HttpReaderMAX_RETRY);

}

writer_count = download_threads;

//缓沖

try_cache();

//跳过ID v

MPTag mPTag = new MPTag();

int v_size = mPTagcheckIDV(buf);

if (v_size > ) {

frame_bytes = v_size;

//seek(v_size):

fill_bytes = v_size;

file_pointer = v_size;

read_pos = v_size;

read_pos &= BUF_LENGTH_MASK;

int units = v_size >> UNIT_LENGTH_BITS;

for(int i = ; i < units; i++) {

buf_bytes[i] = ;

thisnotifyWriter();

}

buf_bytes[units] = v_size;

thisnotifyWriter();

}

mPTag = null;

}

private void try_cache() throws InterruptedException {

int cache_size = BUF_LENGTH;

if(cache_size > (int)file_length alloc_pos)

cache_size = (int)file_length alloc_pos;

cache_size = UNIT_LENGTH;

//等待填写当前正在读的那一块缓沖区

/**//*if(fill_bytes >= cache_size && writer_count > ) {

synchronized (buf) {

bufwait();

}

return;

}*/

//等待填满缓沖区

while (fill_bytes < cache_size) {

if (writer_count == || isalive == false)

return;

if(BUF_LENGTH > (int)file_length alloc_pos)

cache_size = (int)file_length alloc_pos UNIT_LENGTH;

Systemoutprintf(\r[缓沖%$f%%] (float)fill_bytes / cache_size * );

synchronized (buf) {

bufwait();

}

}

Systemoutprintf(\r);

}

private int try_reading(int i int len) throws Exception {

int n = (i == UNIT_COUNT ) ? : (i + );

int r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);

while (r < len) {

if (writer_count == || isalive == false)

return r;

try_cache();

r = (buf_bytes[i] == ) ? : (buf_bytes[i] + buf_bytes[n]);

}

return len;

}

/**//*

* 各个线程互斥等待空闲块

*/

public synchronized boolean tryWriting(Writer w) throws InterruptedException {

await_count++;

while (buf_bytes[buf_index] != && isalive) {

thiswait();

}

//下载速度足够就结束一个线程

if(writer_count > && wawait_count >= await_count &&

wawait_count >= writer_count)

return false;

if(alloc_pos >= file_length)

return false;

wawait_count = await_count;

await_count;

wstart_pos = alloc_pos;

windex = buf_index;

alloc_pos += UNIT_LENGTH;

buf_index = (buf_index == UNIT_COUNT ) ? : buf_index + ;

return isalive;

}

public void updateBuffer(int i int len) {

synchronized (buf) {

buf_bytes[i] = len;

fill_bytes += len;

bufnotify();

}

}

public void updateWriterCount() {

synchronized (buf) {

writer_count;

bufnotify();

}

}

public synchronized void notifyWriter() {

thisnotifyAll();

}

public void terminateWriters() {

synchronized (buf) {

if (isalive) {

isalive = false;

HeaderstrLastErr = 读取文件超时重试 + HttpReaderMAX_RETRY

+ 次后放弃请您稍后再试;

}

bufnotify();

}

notifyWriter();

}

public int read() throws Exception {

int iret = ;

int i = read_pos >> UNIT_LENGTH_BITS;

// 等待字节可读

while (buf_bytes[i] < ) {

try_cache();

if (writer_count == )

return ;

}

if(isalive == false)

return ;

// 读取

iret = buf[read_pos] & xff;

fill_bytes;

file_pointer++;

read_pos++;

read_pos &= BUF_LENGTH_MASK;

if (buf_bytes[i] == )

notifyWriter(); // 通知

return iret;

}

public int read(byte b[]) throws Exception {

return read(b blength);

}

public int read(byte[] b int off int len) throws Exception {

if(len > UNIT_LENGTH)

len = UNIT_LENGTH;

int i = read_pos >> UNIT_LENGTH_BITS;

// 等待有足够内容可读

if(try_reading(i len) < len || isalive == false)

return ;

// 读取

int tail_len = BUF_LENGTH read_pos; // write_pos != BUF_LENGTH

if (tail_len < len) {

Systemarraycopy(buf read_pos b off tail_len);

Systemarraycopy(buf b off + tail_len len tail_len);

} else

Systemarraycopy(buf read_pos b off len);

fill_bytes = len;

file_pointer += len;

read_pos += len;

read_pos &= BUF_LENGTH_MASK;

buf_bytes[i] = len;

if (buf_bytes[i] < ) {

int ni = read_pos >> UNIT_LENGTH_BITS;

buf_bytes[ni] += buf_bytes[i];

buf_bytes[i] = ;

notifyWriter();

} else if (buf_bytes[i] == )

notifyWriter();

return len;

}

/**//*

* 从src_off位置复制不移动文件指针

*/

public int dump(int src_off byte b[] int dst_off int len) throws Exception {

int rpos = read_pos + src_off;

if(try_reading(rpos >> UNIT_LENGTH_BITS len) < len || isalive == false)

return ;

int tail_len = BUF_LENGTH rpos;

if (tail_len < len) {

Systemarraycopy(buf rpos b dst_off tail_len);

Systemarraycopy(buf b dst_off + tail_len len tail_len);

} else

Systemarraycopy(buf rpos b dst_off len);

// 不发信号

return len;

}

public long length() {

return file_length;

}

public long getFilePointer() {

return file_pointer;

}

public void close() {

//

}

//

public void seek(long pos) throws Exception {

//

}

}

               

上一篇:Java多线程编程

下一篇:实战体会Java多线程编程精要之基础