多线程写入同一文件

Yalong Yang

2013/01/06
大家都知道文件是互斥的资源,一个线程打开在用,另外一个线程就不能再打开了。
然而由于项目需求,需要做多线程下载,也就是需要多个线程下载数据往同一个文件写入数据。
一开始想找一些现成的解法,看到的解法大多数是:
每个线程写一个临时文件,当所有线程都现在完成后再将所有的临时文件合并。
这样的解法显然不给力,写很多临时文件就够不和谐了,如果是大文件最后合并还很耗费时间。
所以本人实现了个 多生产者,一个消费者 模式的文件写入方式。
生产者也就是下载的线程,消费者就是写入文件的线程。
其实也是挺简单的~~
首先选定 消费者 数据结构,显然用单口的线性数据结构比较合适,stack、queue都行~
然后就是通过,公开一个添加数据的接口 供 下载线程调用,
当有数据添加进来,写文件的线程就调用写数据的函数
其中需要两把锁进行互斥,
第一个是 对数据结构 保护的锁,由于多线程添加数据,并且写数据时需要取数据,所以这个锁是必须的;
第二个是 对文件写入的锁,由于文件是互斥的,所以一个时间只能有一个写文件的调用。
以上就可以实现多线程写同一文件了。
实现可参考
public class PartialFile
{
public PartialFile(byte[] data, int dataSize, long startPosition)
{
this.StartPosition = startPosition;
this.DataSize = dataSize;
this.Data = new byte[DataSize];
System.Buffer.BlockCopy(data, 0, this.Data, 0, DataSize);
}
public int DataSize
{
get;
set;
}
public byte[] Data
{
get;
set;
}
public long StartPosition
{
get;
set;
}
}
public class FileWriter
{
public event WriteSuccessHandler OnWriteSuccess;
public delegate void WriteSuccessHandler(FileWriter w, long startPosition, long endPosition);
public event WriteErrorHandler OnWriteError;
public delegate void WriteErrorHandler(FileWriter w);
private Stream writeStream
{
get;
set;
}
private StorageFile file
{
get;
set;
}
public FileWriter(StorageFile file)
{
this.file = file;
}
public async Task Initial()
{
writeStream = await file.OpenStreamForWriteAsync();
}
public void AddPartialFile(PartialFile partialFile)
{
//Debug.WriteLine("Download at {0} for {1} bytes", partialFile.StartPosition, partialFile.DataSize);
//new Task(() => onPartialFileAdded(partialFile)).Start();
onPartialFileAdded(partialFile);
//onPartialFileAdded(partialFile);
}
private readonly AsyncLock m_lock = new AsyncLock();
object writeLock = new object();
public void Close()
{
//using (var releaser = await m_lock.LockAsync())
lock(writeLock)
{
this.writeStream.Dispose();
canWrite = false;
}
}
private bool canWrite = true;
private void onPartialFileAdded(PartialFile partialFile)
{
bool shouldNotify = false;
//using (var releaser = await m_lock.LockAsync())
lock (writeLock)
{
if (canWrite == true)
{
shouldNotify = true;
try
{
writeStream.Position = partialFile.StartPosition;
writeStream.Write(partialFile.Data, 0, partialFile.DataSize);
Debug.WriteLine("Write data at {0} for {1} bytes, now in {2}",
partialFile.StartPosition, partialFile.DataSize, partialFile.StartPosition + partialFile.DataSize);
}
catch (Exception ex)
{
Debug.WriteLine(ex);
if (OnWriteError != null)
{
OnWriteError(this);
}
}
}
}
if (shouldNotify == true)
{
if (OnWriteSuccess != null)
{
OnWriteSuccess(this, partialFile.StartPosition, partialFile.StartPosition + partialFile.DataSize);
}
}
}
}