Hadoop中DataNode与NameNode之间的心跳机制

2019-03-28 13:07|来源: 网络

DataNode: 用于存储HDFS的数据,

public class DataNode extends Configured
    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {。。。}


1,实现了InterDatanodeProtocol和ClientDatanodeProtocol两个接口,用于与其他DataNode以及客户端通信,接收它们的请求。

2,实现Runnable 接口

 


首先来看一下run()方法:

  public void run() {
    LOG.info(dnRegistration + "In DataNode.run, data = " + data);


    // start dataXceiveServer
    dataXceiverServer.start();
       
    while (shouldRun) {
      try {
        startDistributedUpgradeIfNeeded();
        offerService();
      } catch (Exception ex) {
        LOG.error("Exception: " + StringUtils.stringifyException(ex));
        if (shouldRun) {
          try {
            Thread.sleep(5000);
          } catch (InterruptedException ie) {
          }
        }
      }
    }
       
    LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
    shutdown();
  }


①,dataXceiverServer.start();启动数据监听线程(DataXceiverServer):每当accept()到一个新的连接时,就为该连接开启一个DataXceiver线程(以Daemon方式启动)来处理。

DataXceiverServer.java


 public void run() {
    while (datanode.shouldRun) {
      try {
        Socket s = ss.accept();
        s.setTcpNoDelay(true);
        new Daemon(datanode.threadGroup,
            new DataXceiver(s, datanode, this)).start();
      } catch (SocketTimeoutException ignored) {
        // wake up to see if should continue to run
      } catch (IOException ie) {
        LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
                                + StringUtils.stringifyException(ie));
      } catch (Throwable te) {
        LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
                                + StringUtils.stringifyException(te));
        datanode.shouldRun = false;
      }
    }
    try {
      ss.close();
    } catch (IOException ie) {
      LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
                              + StringUtils.stringifyException(ie));
    }
  }


DataXceiver负责处理DataXceiverServer接受的连接并在该连接上读写数据:先读取协议版本号和操作码,再根据操作码执行读块、写块等相应操作。

-------DataXceiver.java----------

  /**
  * Read/write data from/to the DataXceiveServer.
  */
  public void run() {
    DataInputStream in=null;
    try {
      in = new DataInputStream(
          new BufferedInputStream(NetUtils.getInputStream(s),
                                  SMALL_BUFFER_SIZE));
      short version = in.readShort();
      if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
        throw new IOException( "Version Mismatch" );
      }
      boolean local = s.getInetAddress().equals(s.getLocalAddress());
      byte op = in.readByte();
      // Make sure the xciver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
        throw new IOException("xceiverCount " + curXceiverCount
                              + " exceeds the limit of concurrent xcievers "
                              + dataXceiverServer.maxXceiverCount);
      }
      long startTime = DataNode.now();
      switch ( op ) {
      case DataTransferProtocol.OP_READ_BLOCK:
        readBlock( in );
        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.readsFromLocalClient.inc();
        else
          datanode.myMetrics.readsFromRemoteClient.inc();
        break;
      case DataTransferProtocol.OP_WRITE_BLOCK:
        writeBlock( in );
        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
        if (local)
          datanode.myMetrics.writesFromLocalClient.inc();
        else
          datanode.myMetrics.writesFromRemoteClient.inc();
        break;
      case DataTransferProtocol.OP_READ_METADATA:
        readMetadata( in );
        datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
        replaceBlock(in);
        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_COPY_BLOCK:
            // for balancing purpose; send to a proxy source
        copyBlock(in);
        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
        break;
      case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
        getBlockChecksum(in);
        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
        break;
      default:
        throw new IOException("Unknown opcode " + op + " in data stream");
      }
    } catch (Throwable t) {
      LOG.error(datanode.dnRegistration + ":DataXceiver",t);
    } finally {
      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
                              + datanode.getXceiverCount());
      IOUtils.closeStream(in);
      IOUtils.closeSocket(s);
      dataXceiverServer.childSockets.remove(s);
    }
  }

相关问答

更多