如果要处理多线程读写文件造成的数据不一致的问题,第一个想到的就是加锁。在
java.concurrent.locks
中
ReadWriteLock
分别定义了乐观锁读锁和悲观锁写锁,将以上的情况都考虑到了,可以很好地处理多线程读写同一个文件的情况。但是既然加锁,必然会导致多线程在读写文件时效率较低,在不同情形下似乎有更好的解决方案:
通过
ReadWriteLock
为文件读写过程加锁,防止数据与预想不一致,同时降低了多线程处理的效率
如果多线程频繁写入少量数据,可创建一个类缓存需要写入的数据,并且按时批量写入数据,减少频繁操作文件及加锁操作带来的问题。
可通过RandomAccessFile规划好不同位置,多线程同时操作不同位置的写入
有关RandomAccessFile,之前在
JAVA篇:Java IO (三)访问文件--转换流和文件流
对RandomAccessFile进行了简单的了解。
1 实践一 多线程写文件RandomAccessFile不加锁
RandomAccessFile可选模式是
r
,
rw
,
rws
,或者
rwd
,写入时不会清空原数据,会在指定位置覆盖原本内容写入新内容。
清空RandomAccessFile打开的文件的方法为
rw.setLength(0)
为了防止多线程间写入内容互相覆盖需要规划好写入的位置,插入的话会更加麻烦
规划好写入的位置有两种,a)如果每行写入的字数相同很简单可以计算得到足够的位置 b)预留足够的位置
预留足够的位置的结果其实会有些瑕疵,结果如下所示:
public
void
run(){
try
{
RandomAccessFile rw
=
new
RandomAccessFile(
this
.fileName,"rw"
);
rw.seek(pos);
rw.writeBytes(text);
rw.close();
}
catch
(FileNotFoundException e) {
e.printStackTrace();
}
catch
(IOException e) {
e.printStackTrace();
public
void
test(){
ExecutorService pool
= Executors.newFixedThreadPool(10
);
int
linwsize = 20;
//
必须预留足够的一行的空间,否则会导致覆盖,但是预留空间过大,也会出现空的字符
int
lines = 50;
//
需要写入100行
String fileName = "a.csv"
;
RandomAccessFile rw
=
null
;
try
{
rw
=
new
RandomAccessFile(fileName,"rw"
);
rw.setLength(
0);
//
清空文件
/*
写入标题栏
*/
rw.writeBytes(
"index,text\n"
);
rw.close();
}
catch
(FileNotFoundException e) {
e.printStackTrace();
}
catch
(IOException e) {
e.printStackTrace();
for
(
int
i=1;i<=lines;i++
){
pool.execute(
new
RWrite(i*linwsize,i+", text"+i+"\n"
,fileName));
pool.shutdown();
2 实践二 将写入文件交给一个线程进行
类似于生产者消费者模式,只是只有一个消费者。
本来只想简单地写一下的,但是因为在某处将0写成了9调试了许久。
/* 将写入工作交给一个线程 */
/* 加工数据的Task。生产者 */
class DataTask implements Runnable{
BlockingQueue<String> data_put;
private int start;
private int end;
public DataTask(BlockingQueue<String> data_put,int start,int end){
this.data_put = data_put;
this.start = start;
this.end = end;
@Override
public void run(){
/*System.out.println(String.format("%s-%s:%d-%d开始",
System.currentTimeMillis(),
Thread.currentThread().getName(),
this.start,this.end));*/
for(int i=start;i<=end;i++){
try {
/* 加工数据需要时间,随机 */
Thread.sleep(10+new Random().nextInt(20));
String s = String.format("%s-%s:%d-%d[%d]\n",
System.currentTimeMillis(),
Thread.currentThread().getName(),
this.start,this.end,i);
data_put.put(i+"\n");
} catch (InterruptedException e) {
e.printStackTrace();
return;
System.out.println(String.format("%s-%s:%d-%d结束",
System.currentTimeMillis(),
Thread.currentThread().getName(),
this.start,this.end));
/* 缓存提交的数据,并写入 */
class WriteTask implements Runnable{
private BlockingQueue<String> data_in = new ArrayBlockingQueue<>(10);
private byte[] buffer = new byte[1024];
private int th = (int)(1024*0.8);
int length=0;
private String fileName;
public WriteTask(String fileName){
this.fileName = fileName;
try {
/* 清空要写入数据的文件 */
FileOutputStream fileOutputStream = new FileOutputStream(fileName);
fileOutputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
public BlockingQueue<String> get_Queue(){
return data_in;
private void write(){
if(length==0) return;
try {
//System.out.println(length);
//System.out.println(new String(buffer));
System.out.println("开始写入……");
FileOutputStream fileOutputStream = new FileOutputStream(fileName,true);
fileOutputStream.write(buffer,0,length);
fileOutputStream.close();
System.out.println(length+"写入完成。");
length = 0;
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
private void close(){
//System.out.println(new String(buffer));
this.write();
// System.out.println(length);
//data_in = null;
@Override
public void run(){
while (true){
try {
byte[] tmp= data_in.take().getBytes();
System.arraycopy(tmp,0,buffer,length,tmp.length);
length = length+tmp.length;
if(length>=th){
this.write();
} catch (InterruptedException e) {
//e.printStackTrace();
break;
public void test3(){
ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
String fileName = "b.csv";
WriteTask writeTask = new WriteTask(fileName);
pool.execute(writeTask);
int num = 20;
int writenum = 100;
for(int i=0;i<num;i++){
//System.out.println(i*writenum+"---"+((i+1)*writenum-1));
pool.execute(new DataTask(writeTask.get_Queue(),i*writenum,((i+1)*writenum-1)));
pool.shutdown();
while (true){
try {
pool.awaitTermination(500,TimeUnit.MILLISECONDS);
if(pool.getActiveCount()==1){
writeTask.close();
Thread.sleep(10);
pool.shutdownNow();
if(pool.getActiveCount()==0){
break;
} catch (InterruptedException e) {
e.printStackTrace();
break;
3 实践三 多线程复制文件RandomAccessFile不加锁
使用相同的偏移量,进行读写,也能防止多线程写入时发生冲突。
复制文件包含读写操作,好处是不必自己规划偏移量。
/* 多线程复制文件 */
/* 多线程写入文件指定位置 */
class RCopy implements Runnable{
public int pos;
public int len;
public String readFile;
public String writeFlie;
public RCopy(String readFile,String writeFlie,int pos,int len){
this.pos = pos;
this.len = len;
this.readFile = readFile;
this.writeFlie = writeFlie;
@Override
public void run(){
byte[] bytes = new byte[len];
try {
RandomAccessFile rr = new RandomAccessFile(this.readFile,"r");
RandomAccessFile rw = new RandomAccessFile(this.writeFlie,"rw");
rr.seek(pos);
rw.seek(pos);
rr.read(bytes);
rw.write(bytes);
rr.close();
rw.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
public void test2(){
ExecutorService pool = Executors.newFixedThreadPool(10);
String readFile = "b.csv";
String writeFlie = "a.csv";
long totalLen = 0;
int len = 1024; /* 每个task写入的大小 */
try {
//读取需要复制文件的大小
RandomAccessFile file = new RandomAccessFile(readFile,"r");
totalLen = file.length();
System.out.println("length:"+totalLen);
//清空需要写入的文件
file = new RandomAccessFile(writeFlie,"rw");
file.setLength(0);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
int tasknum = 11;
for(int i=0;i<totalLen;i = i+len){
int alen = len;
if(i+len>totalLen) alen = (int)totalLen-i;
//System.out.println(i+":"+alen);
pool.execute(new RCopy(readFile,writeFlie,i,alen));
pool.shutdown();
RandomAccessFile实现断点续传
RxRetrofit - 终极封装 - 深入浅出 & 断点续传
利用RandomAccessFile向文件指定位置插入内容
使用多线程以及RandomAccessfile来实现多线程复制文件