博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JAVA实现环形缓冲多线程读取远程文件
阅读量:6839 次
发布时间:2019-06-26

本文共 8571 字,大约阅读时间需要 28 分钟。

hot3.png

       如果用HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。多线程中的同步、HttpURLConnection的超时阻塞等因素都会使代码看起来异常复杂。

      问题模型:有一个数组buf[],N个“写”线程从文件中按顺序读入固定长度的数据并写入buf[];一个“读”线程按照文件内容的顺序从缓冲区读取,一次读任意长度。任何一个写线程发生错误或文件读完时,读线程和所有写线程都退出。

      针对上述问题,简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块32K;下载线程(写线程)负责向缓冲区写数据,每次写一块;读线程每次读取小于32K的任意长度数据。线程同步:各个写线程互斥等待空闲块,用信号量机制分配空闲块;各写线程并发填写buf[];读线程和各写线程并发使用buf[]。

      为突出主题略去了一些和本文无关的代码。

 

一、HttpReader类功能:通过HTTP协议从指定URL读取数据。

package instream;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

/**
 * Reads a remote resource over HTTP starting at an arbitrary byte offset.
 *
 * <p>Call {@link #seek(long)} to position the reader (this opens a connection with a
 * {@code Range} header), then {@link #read(byte[], int, int)} to pull bytes, and finally
 * {@link #close()} to release the connection. Instances keep mutable, unsynchronized state and
 * are meant to be used by one thread at a time.
 */
public final class HttpReader {
	/** Number of consecutive failures after which callers are expected to give up. */
	public static final int MAX_RETRY = 10;

	private final URL url;
	private final int connect_timeout;
	private final int read_timeout;
	private HttpURLConnection httpConnection;
	private InputStream in_stream;
	/** Offset of the next byte {@link #read} will deliver; lets {@link #seek} skip a reconnect. */
	private long cur_pos;

	/**
	 * Creates a reader with 5000 ms connect and read timeouts.
	 *
	 * @param u the resource to read
	 */
	public HttpReader(URL u) {
		this(u, 5000, 5000);
	}

	/**
	 * Creates a reader with explicit timeouts.
	 *
	 * @param u               the resource to read
	 * @param connect_timeout connect timeout in milliseconds
	 * @param read_timeout    read timeout in milliseconds
	 */
	public HttpReader(URL u, int connect_timeout, int read_timeout) {
		this.connect_timeout = connect_timeout;
		this.read_timeout = read_timeout;
		url = u;
	}

	/**
	 * Reads up to {@code len} bytes from the current position into {@code b}.
	 *
	 * @param b   destination array
	 * @param off start offset in {@code b}
	 * @param len maximum number of bytes to read
	 * @return the number of bytes read, or -1 at end of stream
	 * @throws IOException if no stream is open (no successful {@link #seek}) or the read fails
	 */
	public int read(byte[] b, int off, int len) throws IOException {
		if (in_stream == null)
			throw new IOException("seek() must succeed before read()");
		int r = in_stream.read(b, off, len);
		// Only advance on real data: adding the -1 EOF marker would move the position backwards.
		if (r > 0)
			cur_pos += r;
		return r;
	}

	/**
	 * Positions the reader at {@code start_pos}, reconnecting unless the open stream is
	 * already at that offset.
	 *
	 * <p>Failures are reported as exceptions so the caller can retry; for example a 503 may
	 * be transient (servers can reject frequent connection requests from the same IP).
	 *
	 * @param start_pos byte offset from the beginning of the resource
	 * @throws IOException if the connection fails, the status is not 2xx, or the server
	 *                     ignored the {@code Range} header for a non-zero offset
	 */
	public void seek(long start_pos) throws IOException {
		if (start_pos == cur_pos && in_stream != null)
			return;
		close();

		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		httpConnection = conn;
		conn.setConnectTimeout(connect_timeout);
		conn.setReadTimeout(read_timeout);
		conn.setRequestProperty("Range", "bytes=" + start_pos + "-");

		int responseCode = conn.getResponseCode();
		if (responseCode < 200 || responseCode >= 300) {
			close();
			try {
				// Short back-off before the caller retries.
				Thread.sleep(200);
			} catch (InterruptedException e) {
				// Keep the interrupt visible to the caller instead of swallowing it.
				Thread.currentThread().interrupt();
			}
			throw new IOException("HTTP responseCode="+responseCode);
		}
		if (start_pos > 0 && responseCode != HttpURLConnection.HTTP_PARTIAL) {
			// A non-206 answer means the body starts at offset 0, not at start_pos;
			// accepting it would silently deliver the wrong bytes.
			close();
			throw new IOException("HTTP responseCode=" + responseCode
					+ ": Range request not honored");
		}
		in_stream = conn.getInputStream();
		cur_pos = start_pos;
	}

	/**
	 * Disconnects and closes the current stream, if any. Safe to call repeatedly; a later
	 * {@link #seek(long)} opens a fresh connection.
	 *
	 * @throws IOException if closing the stream fails
	 */
	public void close() throws IOException {
		HttpURLConnection conn = httpConnection;
		InputStream in = in_stream;
		httpConnection = null;
		in_stream = null;
		if (conn != null)
			conn.disconnect();
		if (in != null)
			in.close();
	}
}

  

二、IWriterCallBack接口

package instream;

/**
 * Communication contract between the reading side and the download ("writer") threads,
 * used much like a C++ callback.
 *
 * <p>Example wiring:
 * <ul>
 *   <li>{@code BuffRandAcceURL} implements the methods of this interface, e.g.
 *       {@code tryWriting()};</li>
 *   <li>{@code BuffRandAcceURL} creates {@code new Writer(this, ...)}, handing itself to
 *       {@code Writer.icb};</li>
 *   <li>{@code Writer} then calls {@code icb.tryWriting()}.</li>
 * </ul>
 */
public interface IWriterCallBack {

	/**
	 * Waits for a free buffer unit and claims it for the calling writer.
	 *
	 * @return the file offset the writer should download from, or -1 when the writer
	 *         should stop
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	int tryWriting() throws InterruptedException;

	/**
	 * Reports that a buffer unit has been filled.
	 *
	 * @param i   index of the unit inside the buffer
	 * @param len number of bytes written into that unit
	 */
	void updateBuffer(int i, int len);

	/** Called by a writer that is about to exit. */
	void updateWriterCount();

	/**
	 * @return the current writer count
	 */
	int getWriterCount();

	/** Called by one writer to make all other writers stop. */
	void terminateWriters();
}

  

三、Writer类:下载线程,负责向buf[]写数据。

package instream;

import java.net.URL;

/**
 * Download thread. Repeatedly claims a free unit of the shared ring buffer through
 * {@link IWriterCallBack#tryWriting()}, downloads the matching slice of the remote file into
 * it, and reports the filled unit with {@link IWriterCallBack#updateBuffer(int, int)}.
 *
 * <p>The thread is created and started by the constructor. It ends when no more blocks are
 * handed out, when the end of the remote file is reached, when downloads are fast enough that
 * this thread is not needed, or after {@link HttpReader#MAX_RETRY} consecutive failures.
 */
public final class Writer implements Runnable {
	/**
	 * Cleared when any writer gives up so that the others leave their download loops too.
	 * Volatile because it is written by one thread and polled by the others.
	 * NOTE(review): being static, it is never reset for a later download in the same JVM.
	 */
	private static volatile boolean isalive = true;

	// Per-instance (not static): each writer keeps the buffer and callback it was given,
	// so constructing writers for another buffer cannot redirect running ones.
	private final byte[] buf;
	private final IWriterCallBack icb;
	private HttpReader hr;

	/**
	 * Creates the writer and starts its thread, named {@code "dt_" + i}, slightly above
	 * normal priority.
	 *
	 * @param cb callback used to claim blocks and report progress
	 * @param u  remote file to download
	 * @param b  shared ring buffer to fill
	 * @param i  ordinal used in the thread name
	 */
	public Writer(IWriterCallBack cb, URL u, byte[] b, int i) {
		hr = new HttpReader(u);
		icb = cb;
		buf = b;
		Thread t = new Thread(this,"dt_"+i);
		t.setPriority(Thread.NORM_PRIORITY + 1);
		t.start();
	}

	/** Download loop; see the class description for the exit conditions. */
	@Override
	public void run() {
		final int idxmask = BuffRandAcceURL.UNIT_COUNT - 1;
		int wbytes = 0, wpos = 0, rema = 0, retry = 0;
		boolean cont = true;
		boolean retired = false;	// true once this writer has already reported its exit
		int index = 0;				// index of the unit inside buf[]
		int startpos = 0;			// file offset matching the current write position
		long time0 = 0;
		while (cont) {
			try {
				// 1. Wait for a free unit (skipped while retrying the current one).
				if (retry == 0) {
					if ((startpos = icb.tryWriting()) == -1)
						break;
					index = (startpos >> BuffRandAcceURL.UNIT_LENGTH_BITS) & idxmask;
					wpos = startpos & BuffRandAcceURL.BUF_LENGTH_MASK;
					wbytes = 0;
					rema = BuffRandAcceURL.UNIT_LENGTH;
					time0 = System.currentTimeMillis();
				}

				// 2. Position the HTTP stream.
				hr.seek(startpos);

				// 3. Download one unit in chunks of at most 2048 bytes.
				while (rema > 0 && isalive) {
					int w = hr.read(buf, wpos, Math.min(rema, 2048));
					if (w == -1) {
						cont = false;	// end of the remote file
						break;
					}
					rema -= w;
					wpos += w;
					startpos += w;	// lets a retry resume where the failure happened
					wbytes += w;
				}
				long elapsed = System.currentTimeMillis() - time0;

				// 4. Tell the reader the unit is filled.
				retry = 0;
				icb.updateBuffer(index, wbytes);

				// 5. Downloads are fast enough: retire this thread, but never the last one.
				//    Check and decrement happen under one lock so two writers cannot both
				//    see "more than one left" and exit together.
				if (cont && elapsed < 500) {
					synchronized (Writer.class) {
						if (icb.getWriterCount() > 1) {
							icb.updateWriterCount();
							retired = true;
							cont = false;
						}
					}
				}
			} catch (InterruptedException e) {
				// Interrupted while waiting for a free unit: nothing was claimed, so stop
				// instead of "retrying" a block this thread does not own.
				Thread.currentThread().interrupt();
				break;
			} catch (Exception e) {
				if(++retry == HttpReader.MAX_RETRY) {
					isalive = false;
					icb.terminateWriters();
					break;
				}
			}
		}
		if (!retired) {
			synchronized (Writer.class) {
				icb.updateWriterCount();
			}
		}
		try {
			hr.close();
		} catch (Exception ignored) {
			// Best-effort cleanup: the thread is exiting and nothing can be done about it.
		}
		hr = null;
	}
}

 

四、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。关键是如何简单有效防止死锁?

package instream;

import java.net.URL;

/**
 * Ring-buffered reader for a remote file that is downloaded by several {@link Writer} threads.
 *
 * <p>The buffer is split into {@link #UNIT_COUNT} units of {@link #UNIT_LENGTH} bytes. Writers
 * claim free units in file order through {@link #tryWriting()} and fill them concurrently;
 * {@link #read(byte[], int, int)} consumes the data in file order and hands emptied units back.
 *
 * <p>Locking: {@code buf} is the monitor for {@code alloc_pos} and {@code free_unit};
 * {@code unit_bytes} is the monitor for {@code unit_bytes[]}, {@code fill_bytes} and
 * {@code writer_count}. The two monitors are never held at the same time.
 * {@code read} is intended for a single reader thread.
 */
public final class BuffRandAcceURL implements IWriterCallBack {
	public static final int UNIT_LENGTH_BITS = 15;
	public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS; // 2^15 = 32K per unit
	public static final int BUF_LENGTH = UNIT_LENGTH << 4;
	public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS; // 16 units
	public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);
	private static final int MAX_WRITER = 5;

	// All state below is per instance, so a second download cannot clobber a running one.
	private final byte[] buf;			// ring buffer; also the monitor writers wait on
	private final int[] unit_bytes;		// unread bytes per unit; also the monitor the reader waits on
	private final URL url;
	private long file_pointer;			// total number of bytes handed out by read()
	private int read_pos;				// read offset inside buf[]
	private int fill_bytes;				// bytes reported by writers and not yet read
	private int alloc_pos;				// file offset of the next unit to hand to a writer
	private volatile boolean isalive = true;
	private volatile int writer_count;
	// NOTE(review): file_length is never assigned in this listing (the article says unrelated
	// code was omitted). While it is 0, tryWriting() returns -1 at once. Presumably it is set
	// from the HTTP Content-Length -- confirm against the full source.
	private long file_length;
	private long frame_bytes;			// not used in this listing
	private int free_unit = UNIT_COUNT;	// "semaphore" counter of free units

	/**
	 * Opens {@code sURL} with the default number of download threads.
	 *
	 * @param sURL address of the remote file
	 * @throws Exception if the URL is malformed or the thread is interrupted while buffering
	 */
	public BuffRandAcceURL(String sURL) throws Exception {
		this(sURL,MAX_WRITER);
	}

	/**
	 * Opens {@code sURL}, starts the download threads 200 ms apart and blocks until the
	 * buffer is full or the writers have stopped.
	 *
	 * @param sURL             address of the remote file
	 * @param download_threads number of writer threads to start
	 * @throws Exception if the URL is malformed or the thread is interrupted while buffering
	 */
	public BuffRandAcceURL(String sURL, int download_threads) throws Exception {
		buf = new byte[BUF_LENGTH];
		unit_bytes = new int[UNIT_COUNT];
		url = new URL(sURL);

		// Start the writer threads. A negative request counts as zero so the reader
		// never waits for writers that were not started.
		writer_count = Math.max(0, download_threads);
		for (int i = 0; i < download_threads; i++) {
			new Writer(this, url, buf, i + 1);
			Thread.sleep(200);
		}

		try_cache();
	}

	/*
	 * Buffering: waits until the whole buffer (minus the consumed part of the current unit)
	 * is filled, or until no writer is left, or until the download was aborted.
	 */
	private void try_cache() throws InterruptedException {
		synchronized (unit_bytes) {
			int cache_size = BUF_LENGTH;
			int bi = unit_bytes[read_pos >> UNIT_LENGTH_BITS];
			if (bi != 0)
				cache_size -= UNIT_LENGTH - bi;
			// The condition is tested while holding the monitor, so a notify cannot slip in
			// between the test and the wait. The timeout is kept as a safety net.
			while (fill_bytes < cache_size && writer_count != 0 && isalive)
				unit_bytes.wait(200);
		}
	}

	/*
	 * Returns len when at least len bytes can be read starting in unit i; otherwise returns
	 * the (smaller) number of bytes that are available.
	 */
	private int try_reading(int i, int len) throws Exception {
		int r;
		boolean finished;
		// One snapshot under the lock: if the writers are seen as gone, every buffer update
		// they made is already visible, so a short count is final and no tail is dropped.
		synchronized (unit_bytes) {
			r = available(i);
			finished = (writer_count == 0 || isalive == false);
		}
		if (r >= len)
			return len;
		if (finished)
			return r;

		try_cache();

		// Re-check: buffering may have ended because the writers stopped, not because the
		// requested data arrived.
		synchronized (unit_bytes) {
			r = available(i);
		}
		return (r < len) ? r : len;
	}

	/* Bytes readable from unit i plus the unit after it. Caller holds the unit_bytes monitor. */
	private int available(int i) {
		int n = (i + 1) & (UNIT_COUNT - 1);
		return (unit_bytes[i] > 0) ? (unit_bytes[i] + unit_bytes[n]) : unit_bytes[i];
	}

	/*
	 * Writers wait here, mutually exclusive, for a free unit. Units are handed out in
	 * ascending file order. Returns the file offset to download from, or -1 to stop.
	 */
	@Override
	public int tryWriting() throws InterruptedException {
		synchronized (buf) {
			// No point waiting for a free unit once the whole file has been handed out.
			while (free_unit == 0 && isalive && alloc_pos < file_length)
				buf.wait();

			if(alloc_pos >= file_length || isalive == false)
				return -1;
			int ret = alloc_pos;
			alloc_pos += UNIT_LENGTH;
			free_unit--;
			return ret;
		}
	}

	/*
	 * Called by a writer after it filled unit i with len bytes; wakes the reader.
	 */
	@Override
	public void updateBuffer(int i, int len) {
		synchronized (unit_bytes) {
			unit_bytes[i] = len;
			fill_bytes += len;
			unit_bytes.notifyAll();
		}
	}

	/*
	 * Called by a writer that is about to exit.
	 */
	@Override
	public void updateWriterCount() {
		synchronized (unit_bytes) {
			writer_count--;
			unit_bytes.notifyAll();
		}
	}

	@Override
	public int getWriterCount() {
		return writer_count;
	}

	/*
	 * Wakes every writer waiting for a free unit.
	 */
	public void notifyWriter() {
		synchronized (buf) {
			buf.notifyAll();
		}
	}

	/* Returns one unit to the pool and wakes waiting writers, under the allocation lock. */
	private void releaseUnit() {
		synchronized (buf) {
			free_unit++;
			buf.notifyAll();
		}
	}

	/*
	 * Called by one writer to stop the others; isalive also ends the reader.
	 */
	@Override
	public void terminateWriters() {
		synchronized (unit_bytes) {
			if (isalive) {
				isalive = false;
				System.out.println("\n读取文件超时。重试 " + HttpReader.MAX_RETRY
						+ " 次后放弃,请您稍后再试。");
			}
			unit_bytes.notifyAll();
		}
		notifyWriter();
	}

	/**
	 * Copies exactly {@code len} bytes of the remote file, in file order, into {@code b}.
	 * Blocks until enough data is buffered. Requests are expected to be smaller than one
	 * unit (32K); a request that the current and next unit cannot satisfy yields -1.
	 *
	 * @param b   destination array
	 * @param off start offset in {@code b}
	 * @param len number of bytes to read
	 * @return {@code len}, or -1 when fewer than {@code len} bytes remain or the download
	 *         was aborted
	 * @throws Exception if the calling thread is interrupted while waiting
	 */
	public int read(byte[] b, int off, int len) throws Exception {
		int i = read_pos >> UNIT_LENGTH_BITS;

		// 1. Wait until enough data is available.
		if(try_reading(i, len) < len || isalive == false)
			return -1;
		if (len == 0)
			return 0;	// nothing consumed: must not count the current unit as freed

		// 2. Copy, wrapping around the end of the ring if necessary.
		int tail = BUF_LENGTH - read_pos;
		if (tail < len) {
			System.arraycopy(buf, read_pos, b, off, tail);
			System.arraycopy(buf, 0, b, off + tail, len - tail);
		} else
			System.arraycopy(buf, read_pos, b, off, len);

		file_pointer += len;
		read_pos += len;
		read_pos &= BUF_LENGTH_MASK;

		// 3. Book-keeping under the same lock the writers use, so no update is lost.
		boolean unit_freed = false;
		synchronized (unit_bytes) {
			fill_bytes -= len;
			unit_bytes[i] -= len;
			if (unit_bytes[i] < 0) {
				// The read spilled into the next unit: charge the overshoot to it.
				int ni = read_pos >> UNIT_LENGTH_BITS;
				unit_bytes[ni] += unit_bytes[i];
				unit_bytes[i] = 0;
				unit_freed = true;
			} else if (unit_bytes[i] == 0) {
				unit_freed = true;
			}
		}

		// 4. Hand the emptied unit back to the writers.
		if (unit_freed)
			releaseUnit();

		// A partially filled next unit means end of file; step 1 already covers a read
		// that drains two units at once.
		return len;
	}
}

 

本文是JAVA开源项目jmp123源代码的一部分,单独成文旨在与朋友们交流文中提出的问题模型的解决方法。

【jmp123下载地址】

转载于:https://my.oschina.net/darkness/blog/363516

你可能感兴趣的文章
shell 调试技术,伪信号打印程序出错位置
查看>>
(转)批处理
查看>>
struts标签绑定Map<String, List<Map<String, Object>>>
查看>>
navigator.userAgent.indexOf来判断浏览器类型
查看>>
【重磅消息】-支付宝小程序可以申请公测啦!!!
查看>>
vm用nat方式访问外网及访问宿主机
查看>>
opencv 配置
查看>>
python re group()
查看>>
git 版本控制使用总结
查看>>
mysql 表的多列交叉去重问题
查看>>
TransactionAnalyzer分析消息树序列图
查看>>
ROCKETMQ——NameServ源码分析
查看>>
Spring Boot 2 (一) System Requirements
查看>>
复合索引和单一索引
查看>>
十六进制转为字符串
查看>>
shell 操作 sftp例子
查看>>
Webpack打包生成相对路径的资源引用
查看>>
varnish 4.0 官方文档翻译2-安装varnish
查看>>
速度爆快:Mac OS X上用smbpasswd改Windows domain用户密码
查看>>
2.新浪微博Swift项目第二天
查看>>