1. BlockIo state transitions
RecordFile is one of the central classes. Its key fields are:
final TransactionManager txnMgr;
private final LinkedList free = new LinkedList();
private final HashMap inUse = new HashMap();
private final HashMap dirty = new HashMap();
private final HashMap inTxn = new HashMap();
private RandomAccessFile file;
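free is a LinkedList used as a FIFO queue. The other collections track the state of each BlockIo object. Once a BlockIo has been taken out of free it is in the inUse state, which is why RecordFile.get() ends by putting the BlockIo into the inUse map: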
BlockIo node;
......
inUse.put(key, node);
node.setClean(); // note: node is now marked dirty=false, i.e. BlockIo carries its own dirty flag too
return node;
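A BlockIo in inUse becomes dirty once it is modified. Because RecordFile cannot tell whether the caller changed an object it handed out from inUse, the caller has to report this by calling release():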
/**
* Releases a block.
*
* @param blockid The record number to release.
* @param isDirty If true, the block was modified since the get().
*/
void release(long blockid, boolean isDirty)
throws IOException {
BlockIo node = (BlockIo) inUse.get(new Long(blockid));
if (node == null)
throw new IOException("bad blockid " + blockid + " on release");
if (!node.isDirty() && isDirty)
node.setDirty();
release(node);
}
/**
* Releases a block.
*
* @param block The block to release.
*/
void release(BlockIo block) {
Long key = new Long(block.getBlockId());
inUse.remove(key);
if (block.isDirty()) {
// System.out.println( "Dirty: " + key + block );
dirty.put(key, block);
} else {
if (!transactionsDisabled && block.isInTransaction()) {
inTxn.put(key, block);
} else {
free.add(block);
}
}
}
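release() takes an isDirty argument: if the caller modified the BlockIo it obtained, it must pass isDirty = true, otherwise the change will not be saved. With isDirty = true the BlockIo ends up in the dirty map.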
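After release(), the BlockIo is first removed from the inUse map and then goes to one of three places: if it was modified, into the dirty map; if it was not modified but transactions are enabled and the block is still part of a transaction (isInTransaction()), into the inTxn map; otherwise back onto the free list.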
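The routing in release() can be sketched with plain collections. This is a minimal model, not JDBM code: SketchBlock, ReleaseSketch and checkOut() are hypothetical names, a positive txnCount stands in for isInTransaction(), and an unknown block id raises IllegalStateException instead of the original IOException.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical stand-in for BlockIo: only the flags release() inspects.
class SketchBlock {
    final long id;
    boolean dirty;
    int txnCount; // > 0 plays the role of isInTransaction()

    SketchBlock(long id) { this.id = id; }

    boolean isInTransaction() { return txnCount > 0; }
}

// Hypothetical cache that mirrors the decision order of RecordFile.release().
class ReleaseSketch {
    final LinkedList<SketchBlock> free = new LinkedList<>();
    final Map<Long, SketchBlock> inUse = new HashMap<>();
    final Map<Long, SketchBlock> dirty = new HashMap<>();
    final Map<Long, SketchBlock> inTxn = new HashMap<>();
    final boolean transactionsDisabled;

    ReleaseSketch(boolean transactionsDisabled) {
        this.transactionsDisabled = transactionsDisabled;
    }

    // Simplified get(): the block is handed out in the inUse state and marked clean.
    SketchBlock checkOut(long id) {
        SketchBlock b = new SketchBlock(id);
        b.dirty = false;
        inUse.put(id, b);
        return b;
    }

    // Same order as release(long, boolean) followed by release(BlockIo).
    void release(long id, boolean isDirty) {
        SketchBlock b = inUse.remove(id);
        if (b == null) throw new IllegalStateException("bad blockid " + id + " on release");
        if (isDirty) b.dirty = true;
        if (b.dirty) {
            dirty.put(id, b);              // modified: wait for commit()
        } else if (!transactionsDisabled && b.isInTransaction()) {
            inTxn.put(id, b);              // unmodified but still owned by a transaction
        } else {
            free.add(b);                   // appended to the tail of the FIFO
        }
    }
}
```

Forgetting to pass isDirty = true sends a modified block down the else branch, which is exactly how a change gets lost.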
Once the modifications are done, the RecordFile can be closed:
RecordFile file = new RecordFile( testFileName );
byte[] data = file.get( 0 ).getData();
data[ 14 ] = (byte) 'b';
file.release( 0, true );
file.close();
close() does the following:
void close() throws IOException {
if (!dirty.isEmpty()) {
commit();
}
txnMgr.shutdown();
......
file.close(); // the underlying RandomAccessFile is closed here
file = null;
}
So the main job of close() is to commit the pending changes. The core of commit() is this loop over the dirty map:
for (Iterator i = dirty.values().iterator(); i.hasNext(); ) {
BlockIo node = (BlockIo) i.next();
i.remove();
// System.out.println("node " + node + " map size now " + dirty.size());
if (transactionsDisabled) {
long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
file.write(node.getData());
node.setClean();
free.add(node);
}
else {
txnMgr.add(node);
inTxn.put(new Long(node.getBlockId()), node);
}
}
If transactions are disabled, each node is written straight to the .db file at its block offset:
long offset = node.getBlockId() * BLOCK_SIZE;
file.seek(offset);
file.write(node.getData());
node.setClean();
free.add(node);
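After the last line, node is back in the free state. With transactions enabled, commit() instead hands the node to txnMgr and parks it in inTxn; judging from synchronizeBlocks() in section 3, it only returns to free after the transaction manager has written it to the .db file and called releaseFromTransaction(). The overall state cycle is therefore:
free -> inUse -> dirty -> inTxn -> free
With transactions disabled the inTxn step is skipped: free -> inUse -> dirty -> free.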
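The cycle can be written down as a transition table. This is a simplified sketch under stated assumptions: BlockState and BlockLifecycle are hypothetical names, and it deliberately omits get() pulling a block that is still in dirty or inTxn straight back into use.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// The four places a BlockIo can live in RecordFile: free list, inUse, dirty, inTxn.
enum BlockState { FREE, IN_USE, DIRTY, IN_TXN }

// Hypothetical transition table summarising the cycle described above.
final class BlockLifecycle {
    private BlockLifecycle() {}

    static Map<BlockState, Set<BlockState>> transitions(boolean transactionsEnabled) {
        Map<BlockState, Set<BlockState>> t = new EnumMap<>(BlockState.class);
        // get(): a free block is recycled and handed out
        t.put(BlockState.FREE, EnumSet.of(BlockState.IN_USE));
        // release(): modified -> dirty; unmodified -> inTxn if still in a transaction, else free
        t.put(BlockState.IN_USE, transactionsEnabled
                ? EnumSet.of(BlockState.DIRTY, BlockState.IN_TXN, BlockState.FREE)
                : EnumSet.of(BlockState.DIRTY, BlockState.FREE));
        // commit(): handed to txnMgr (-> inTxn) or written straight to .db (-> free)
        t.put(BlockState.DIRTY, transactionsEnabled
                ? EnumSet.of(BlockState.IN_TXN)
                : EnumSet.of(BlockState.FREE));
        // releaseFromTransaction() once the block has reached the .db file
        t.put(BlockState.IN_TXN, EnumSet.of(BlockState.FREE));
        return t;
    }

    // True if every consecutive pair in 'path' is an allowed transition.
    static boolean isValidPath(List<BlockState> path, boolean transactionsEnabled) {
        Map<BlockState, Set<BlockState>> t = transitions(transactionsEnabled);
        for (int i = 0; i + 1 < path.size(); i++) {
            if (!t.get(path.get(i)).contains(path.get(i + 1))) return false;
        }
        return true;
    }
}
```

With this table, free -> inUse -> dirty -> inTxn -> free is only valid when transactions are enabled, and free -> inUse -> dirty -> free only when they are disabled.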
2. Obtaining a BlockIo
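A RecordFile works with two files: the .db data file and the .lg log file. The cached BlockIo object is the smallest unit of I/O. In get(), if the requested blockid is in none of inTxn, dirty and free, a new BlockIo is obtained through getNewNode(blockid). Whether blockid lies inside the file is decided from offset = blockid * BLOCK_SIZE: if the .db file is non-empty and offset does not exceed its length, the data block for blockid is read from the .db file; otherwise the block is filled with a copy of cleanData ("A block of clean data").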
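The offset arithmetic shared by get() and the non-transactional branch of commit() can be sketched against a RandomAccessFile. Assumptions: BlockFileSketch, writeBlock() and readBlock() are hypothetical helpers, BLOCK_SIZE = 16 is purely illustrative, the zero-filled buffer stands in for cleanData (assumed to be all zeros), and the short-read padding is this sketch's own choice rather than a copy of JDBM's read() helper.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical helpers mirroring the offset logic in RecordFile.get() and commit().
final class BlockFileSketch {
    static final int BLOCK_SIZE = 16; // illustrative only, not JDBM's real block size

    private BlockFileSketch() {}

    // Write one block at offset blockId * BLOCK_SIZE (the transactionsDisabled path).
    static void writeBlock(RandomAccessFile file, long blockId, byte[] data) throws IOException {
        if (data.length != BLOCK_SIZE) throw new IllegalArgumentException("data must be one block");
        file.seek(blockId * BLOCK_SIZE);
        file.write(data);
    }

    // Read one block. Blocks past the end of the file come back as all-zero "clean" data,
    // and a short read at the end of the file is zero-padded.
    static byte[] readBlock(RandomAccessFile file, long blockId) throws IOException {
        byte[] buf = new byte[BLOCK_SIZE]; // starts zero-filled, standing in for cleanData
        long offset = blockId * BLOCK_SIZE;
        if (file.length() > 0 && offset <= file.length()) {
            file.seek(offset);
            int pos = 0;
            while (pos < BLOCK_SIZE) {
                int n = file.read(buf, pos, BLOCK_SIZE - pos);
                if (n < 0) break; // EOF: leave the remaining bytes as zeros
                pos += n;
            }
        }
        return buf;
    }
}
```

Note that offset == file length is accepted by the same check as in get(); the read then hits EOF immediately and the block stays clean.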
// get a new node and read it from the file
node = getNewNode(blockid);
long offset = blockid * BLOCK_SIZE;
if (file.length() > 0 && offset <= file.length()) {
read(file, offset, node.getData(), BLOCK_SIZE);
} else {
System.arraycopy(cleanData, 0, node.getData(), 0, BLOCK_SIZE);
}
inUse.put(key, node);
node.setClean();
return node;
getNewNode() looks like this:
private BlockIo getNewNode(long blockid)
throws IOException
{
BlockIo retval = null;
if (!free.isEmpty()) {
retval = (BlockIo) free.removeFirst();
}
if (retval == null)
retval = new BlockIo(0, new byte[BLOCK_SIZE]);
retval.setBlockId(blockid);
retval.setView(null);
return retval;
}
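getNewNode() does not always allocate a new BlockIo. It first takes an unused BlockIo from the head of the free list and reuses it, and only allocates a new one when free is empty. Since release() and commit() append blocks to the tail of free, the reused block is the one that was released longest ago, which amounts to a Least Recently Used policy. BlockIo also implements custom Externalizable serialization: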
// implement externalizable interface
public void readExternal(ObjectInput in)
throws IOException, ClassNotFoundException {
blockId = in.readLong();
int length = in.readInt();
data = new byte[length];
in.readFully(data);
}
// implement externalizable interface
public void writeExternal(ObjectOutput out) throws IOException {
out.writeLong(blockId);
out.writeInt(data.length);
out.write(data);
}
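To see what writeExternal()/readExternal() produce, here is a round trip through an in-memory stream. ExternalBlock is a hypothetical stand-in that uses the same field layout as BlockIo (blockId, then data length, then the raw bytes); note that Externalizable requires a public no-arg constructor, which the runtime calls before readExternal().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

// Hypothetical stand-in that serializes itself the same way BlockIo does.
class ExternalBlock implements Externalizable {
    long blockId;
    byte[] data;

    public ExternalBlock() {} // required by Externalizable for deserialization

    ExternalBlock(long blockId, byte[] data) {
        this.blockId = blockId;
        this.data = data;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeLong(blockId);
        out.writeInt(data.length);
        out.write(data);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        blockId = in.readLong();
        int length = in.readInt();
        data = new byte[length];
        in.readFully(data);
    }

    // Serialize and deserialize through an in-memory buffer.
    static ExternalBlock roundTrip(ExternalBlock b) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(b);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ExternalBlock) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Writing the length explicitly lets readExternal() size the array before readFully(), so blocks of any size survive the round trip.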
3. Transaction handling in TransactionManager
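Serialization is only used for transactions: when transactions are disabled, RecordFile writes blocks directly to the .db file and nothing is serialized.
RecordFile drives the transaction lifecycle as follows: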
final TransactionManager txnMgr;
......
txnMgr = new TransactionManager(this);
......
if (!transactionsDisabled) {
txnMgr.start();
}
......
if (!transactionsDisabled)
txnMgr.add(node);
......
if (!transactionsDisabled) {
txnMgr.commit();
}
......
txnMgr.shutdown();
That is one complete transaction cycle. The steps are examined in more detail below.
new TransactionManager(this) takes the owning RecordFile as its constructor argument:
TransactionManager(RecordFile owner) throws IOException {
this.owner = owner;
recover();
open();
}
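TransactionManager keeps a reference to its RecordFile and then calls recover() and open(). recover() works on the log file: if it contains transactions that were logged but not yet applied, it replays them by writing the logged blocks to the .db file, calls sync() on the RecordFile, and finally deletes the log file.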
private void recover() throws IOException {
String logName = makeLogName();
File logFile = new File(logName);
if (!logFile.exists())
return;
if (logFile.length() == 0) {
logFile.delete();
return;
}
FileInputStream fis = new FileInputStream(logFile);
ObjectInputStream ois = new ObjectInputStream(fis);
try {
if (ois.readShort() != Magic.LOGFILE_HEADER)
throw new Error("Bad magic on log file");
} catch (IOException e) {
// corrupted/empty logfile
logFile.delete();
return;
}
while (true) {
ArrayList blocks = null;
try {
blocks = (ArrayList) ois.readObject();
} catch (ClassNotFoundException e) {
throw new Error("Unexcepted exception: " + e);
} catch (IOException e) {
// corrupted logfile, ignore rest of transactions
break;
}
synchronizeBlocks(blocks.iterator(), false);
// ObjectInputStream must match exactly each
// ObjectOutputStream created during writes
try {
ois = new ObjectInputStream(fis);
} catch (IOException e) {
// corrupted logfile, ignore rest of transactions
break;
}
}
owner.sync();
logFile.delete();
}
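The log layout that recover() expects can be reproduced in memory: a short header, then one ObjectOutputStream segment per committed transaction, each holding an ArrayList of blocks. In this sketch LogReplaySketch, Entry, writeLog() and replay() are hypothetical, the header value is illustrative rather than JDBM's Magic.LOGFILE_HEADER, and a Map stands in for the .db file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical log format modelled on TransactionManager's open()/commit()/recover().
final class LogReplaySketch {
    static final short LOGFILE_HEADER = 0x1234; // illustrative value

    // One logged block: id plus a snapshot of its bytes.
    static final class Entry implements Serializable {
        private static final long serialVersionUID = 1L;
        final long blockId;
        final byte[] data;
        Entry(long blockId, byte[] data) { this.blockId = blockId; this.data = data; }
    }

    private LogReplaySketch() {}

    // Header + transactions, written like open() and commit(): the first transaction shares the
    // header's stream, and after each commit a brand-new ObjectOutputStream is opened.
    static byte[] writeLog(List<ArrayList<Entry>> txns) throws IOException {
        ByteArrayOutputStream fos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(fos);
        oos.writeShort(LOGFILE_HEADER);
        oos.flush();
        for (ArrayList<Entry> txn : txns) {
            oos.writeObject(txn);
            oos.flush();
            oos = new ObjectOutputStream(fos);
        }
        oos.flush();
        return fos.toByteArray();
    }

    // Replay every complete transaction into 'db' (blockId -> bytes), like recover().
    // Returns the number of transactions applied; a damaged or missing tail is ignored.
    static int replay(byte[] log, Map<Long, byte[]> db) throws IOException {
        InputStream fis = new ByteArrayInputStream(log);
        ObjectInputStream ois = new ObjectInputStream(fis);
        if (ois.readShort() != LOGFILE_HEADER) throw new IOException("Bad magic on log file");
        int applied = 0;
        while (true) {
            ArrayList<?> blocks;
            try {
                blocks = (ArrayList<?>) ois.readObject();
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            } catch (IOException e) {
                break; // end of log or corrupted tail: ignore the rest
            }
            for (Object o : blocks) {
                Entry entry = (Entry) o;
                db.put(entry.blockId, entry.data); // later transactions overwrite earlier ones
            }
            applied++;
            try {
                ois = new ObjectInputStream(fis); // one reader per writer segment
            } catch (IOException e) {
                break;
            }
        }
        return applied;
    }
}
```

Because each segment carries its own stream header, the reader must open a new ObjectInputStream per transaction, which is what the "must match exactly" comment in recover() refers to.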
open() is much simpler. It creates a fresh log file, writes the LOGFILE_HEADER magic number and resets curTxn to -1:
/** Opens the log file */
private void open() throws IOException {
fos = new FileOutputStream(makeLogName());
oos = new ObjectOutputStream(fos);
oos.writeShort(Magic.LOGFILE_HEADER);
oos.flush();
curTxn = -1;
}
The next step is txnMgr.start():
void start() throws IOException {
curTxn++;
if (curTxn == _maxTxns) {
synchronizeLogFromMemory();
curTxn = 0;
}
txns[curTxn] = new ArrayList();
}
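start() advances curTxn, the index of the current transaction slot, by 1. When curTxn reaches the configured maximum _maxTxns, every transaction still held in memory is flushed to the .db file by synchronizeLogFromMemory() and slot 0 is reused.
The flush looks like this: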
/** Synchs in-core transactions to data file and opens a fresh log */
private void synchronizeLogFromMemory() throws IOException {
close();
TreeSet blockList = new TreeSet( new BlockIoComparator() );
int numBlocks = 0;
int writtenBlocks = 0;
for (int i = 0; i < _maxTxns; i++) {
if (txns[i] == null)
continue;
// Add each block to the blockList, replacing the old copy of this
// block if necessary, thus avoiding writing the same block twice
for (Iterator k = txns[i].iterator(); k.hasNext(); ) {
BlockIo block = (BlockIo)k.next();
if ( blockList.contains( block ) ) {
block.decrementTransactionCount();
}
else {
writtenBlocks++;
boolean result = blockList.add( block );
}
numBlocks++;
}
txns[i] = null;
}
// Write the blocks from the blockList to disk
synchronizeBlocks(blockList.iterator(), true);
owner.sync();
open();
}
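The interplay of start(), add() and the flush (including the deduplication by block id and the transaction counts) can be modelled in a few lines. TxnRingSketch, Block, written and released are hypothetical names; writing a block is reduced to recording its id, and "released" stands in for releaseFromTransaction().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of TransactionManager's in-memory transaction slots.
final class TxnRingSketch {
    static final class Block {
        final long id;
        int txnCount; // number of pending transaction references
        Block(long id) { this.id = id; }
    }

    private final int maxTxns;
    private final List<List<Block>> txns;
    private int curTxn = -1;
    final List<Long> written = new ArrayList<>();   // ids "written to .db", in id order
    final List<Block> released = new ArrayList<>(); // blocks no longer in any transaction

    TxnRingSketch(int maxTxns) {
        this.maxTxns = maxTxns;
        this.txns = new ArrayList<>(Collections.nCopies(maxTxns, (List<Block>) null));
    }

    void start() {
        curTxn++;
        if (curTxn == maxTxns) { // all slots used: flush everything, then reuse slot 0
            synchronizeLogFromMemory();
            curTxn = 0;
        }
        txns.set(curTxn, new ArrayList<>());
    }

    void add(Block b) {
        b.txnCount++;
        txns.get(curTxn).add(b);
    }

    void synchronizeLogFromMemory() {
        // One entry per block id, so each block is written only once.
        TreeSet<Block> blockList = new TreeSet<>(Comparator.comparingLong((Block b) -> b.id));
        for (int i = 0; i < maxTxns; i++) {
            List<Block> txn = txns.get(i);
            if (txn == null) continue;
            for (Block b : txn) {
                if (blockList.contains(b)) {
                    b.txnCount--; // duplicate reference: drop it immediately
                } else {
                    blockList.add(b);
                }
            }
            txns.set(i, null);
        }
        // Equivalent of synchronizeBlocks(blockList.iterator(), true).
        for (Block b : blockList) {
            written.add(b.id);
            b.txnCount--;
            if (b.txnCount == 0) released.add(b);
        }
    }

    int currentSlot() { return curTxn; }
}
```

With maxTxns = 2, the third start() triggers the flush: a block touched by both transactions is written once, and its count drops to zero only after the write.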
The heart of the flush is synchronizeBlocks():
private void synchronizeBlocks(Iterator blockIterator, boolean fromCore)
throws IOException
{
// write block vector elements to the data file.
while ( blockIterator.hasNext() ) {
BlockIo cur = (BlockIo)blockIterator.next();
owner.synch(cur);
if (fromCore) {
cur.decrementTransactionCount();
if (!cur.isInTransaction()) {
owner.releaseFromTransaction(cur, true);
}
}
}
}
Next comes txnMgr.add(node):
/**
* Indicates the block is part of the transaction.
*/
void add(BlockIo block) throws IOException {
block.incrementTransactionCount();
txns[curTxn].add(block);
}
This is simple: it increments the block's transaction count and appends the BlockIo to the ArrayList of the current transaction. Then comes commit():
/**
* Commits the transaction to the log file.
*/
void commit() throws IOException {
oos.writeObject(txns[curTxn]);
sync();
// set clean flag to indicate blocks have been written to log
setClean(txns[curTxn]);
// open a new ObjectOutputStream in order to store
// newer states of BlockIo
oos = new ObjectOutputStream(fos);
}
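The comment about opening a new ObjectOutputStream "to store newer states of BlockIo" reflects standard Java serialization behaviour: a stream writes an object it has already written as a back-reference, so its new field values never reach the log. StreamResetSketch and Box are hypothetical names used only for this demonstration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo of why commit() opens a fresh ObjectOutputStream.
final class StreamResetSketch {
    static final class Box implements Serializable {
        private static final long serialVersionUID = 1L;
        int value;
        Box(int value) { this.value = value; }
    }

    private StreamResetSketch() {}

    // Writes the same Box twice, changing it in between, then reads both copies back.
    // freshStreamPerWrite = false reuses one stream; true mimics commit()'s new stream.
    static int[] writeTwiceAndRead(boolean freshStreamPerWrite) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            Box box = new Box(1);
            ObjectOutputStream oos = new ObjectOutputStream(bytes);
            oos.writeObject(box);
            oos.flush();
            box.value = 2; // the block changes after the first commit
            if (freshStreamPerWrite) oos = new ObjectOutputStream(bytes);
            oos.writeObject(box); // on a reused stream this is only a back-reference
            oos.flush();

            InputStream in = new ByteArrayInputStream(bytes.toByteArray());
            ObjectInputStream ois = new ObjectInputStream(in);
            Box first = (Box) ois.readObject();
            if (freshStreamPerWrite) ois = new ObjectInputStream(in); // one reader per writer
            Box second = (Box) ois.readObject();
            return new int[] {first.value, second.value};
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Reusing one stream reads back {1, 1}; a fresh stream per write reads back {1, 2}. ObjectOutputStream.reset() is the standard-library alternative for discarding back-references on a single stream, but JDBM's recover() is written for the one-stream-per-transaction layout.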
The final step is txnMgr.shutdown():
/**
* Shutdowns the transaction manager. Resynchronizes outstanding
* logs.
*/
void shutdown() throws IOException {
synchronizeLogFromMemory();
close();
}