`
liudeh_009
  • 浏览: 239750 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Jetty基于NIO的方式处理请求

阅读更多

       Jetty基于NIO的方式处理请求的类是SelectChannelConnector,该类同样继承AbstractLifeCycle类,SelectChannelConnector初始化的时候会调用AbstractLifeCycle类的start()方法,如下:

       

 public final void start() throws Exception
    {
        synchronized (_lock)
        {
            try
            {
                if (_state == STARTED || _state == STARTING)
                    return;
                setStarting();
                doStart();
                Log.debug("started {}",this);
                setStarted();
            }
            catch (Exception e)
            {
                setFailed(e);
                throw e;
            }
            catch (Error e)
            {
                setFailed(e);
                throw e;
            }
        }
    }

   doStart()方法在SelectChannelConnector类中.如下:

  

   protected void doStart() throws Exception
    {
        _manager.setSelectSets(getAcceptors());//设置接收请求的线程个数,默认1个
        _manager.setMaxIdleTime(getMaxIdleTime());
        _manager.setLowResourcesConnections(getLowResourcesConnections());
        _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
        _manager.start();//初始化Selector
         open();//初始化ServerSocketChannel
        _manager.register(_acceptChannel);
        super.doStart();
    }

   _manager类名为SelectorManager,open()方法如下:

  

    public void open() throws IOException
    {
        synchronized(this)
        {
            if (_acceptChannel == null)
            {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                // Set to non blocking mode
                _acceptChannel.configureBlocking(false);
                
            }
        }
    }

     super.doStart()方法如下:

   

 protected void doStart() throws Exception
    {
        if (_server==null)
            throw new IllegalStateException("No server");
        
        // open listener port
        open();//再一次调用open()方法,确保ServerSocketChannel启动,调用两次就能确保启动?
        super.doStart();
        
        if (_threadPool==null)
            _threadPool=_server.getThreadPool();
        if (_threadPool!=_server.getThreadPool() && (_threadPool instanceof LifeCycle))
            ((LifeCycle)_threadPool).start();
        
        // Start selector thread
        synchronized(this)
        {
            _acceptorThread=new Thread[getAcceptors()];

            for (int i=0;i<_acceptorThread.length;i++)
            {
                if (!_threadPool.dispatch(new Acceptor(i)))//启动接受请求的线程
                {
                    Log.warn("insufficient maxThreads configured for {}",this);
                    break;
                }
            }
        }
        
        Log.info("Started {}",this);
    }

     Acceptor线程的run()方法如下:

  

 public void run()
        {   
            Thread current = Thread.currentThread();
            String name;
            synchronized(AbstractConnector.this)//设置当前线程的名字,是不是太复杂点
            {
                if (_acceptorThread==null)
                    return;
                
                _acceptorThread[_acceptor]=current;
                name =_acceptorThread[_acceptor].getName();
                current.setName(name+" - Acceptor"+_acceptor+" "+AbstractConnector.this);
            }
            int old_priority=current.getPriority();
            
            try
            {
                current.setPriority(old_priority-_acceptorPriorityOffset);
                while (isRunning() && getConnection()!=null)//connector初始化并且ServerSocketChannel存在
                {
                    try
                    {
                        accept(_acceptor);//处理收到的请求 
                    }
                    catch(EofException e)
                    {
                        Log.ignore(e);
                    }
                    catch(IOException e)
                    {
                        Log.ignore(e);
                    }
                    catch(ThreadDeath e)
                    {
                        throw e;
                    }
                    catch(Throwable e)
                    {
                        Log.warn(e);
                    }
                }
            }
            finally
            {   
                current.setPriority(old_priority);
                current.setName(name);
                
                synchronized(AbstractConnector.this)
                {
                    if (_acceptorThread!=null)
                        _acceptorThread[_acceptor]=null;
                }
            }
        }

       accept(_acceptor)最终会调用SelectorManager.SelectSet.doSelect()方法,该方法比较复杂,简单来说就是每接受一个请求就注册到Selector上,并且用SelectChannelEndPoint类(本身也是一个线程)处理请求,SelectChannelEndPoint类的run()方法如下:

    

   public void run()
    {
        try
        {
            _connection.handle();
        }
        catch (ClosedChannelException e)
        {
            Log.ignore(e);
        }
        catch (EofException e)
        {
            Log.debug("EOF", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }
        catch (HttpException e)
        {
            Log.debug("BAD", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }
        catch (Throwable e)
        {
            Log.warn("handle failed", e);
            try{close();}
            catch(IOException e2){Log.ignore(e2);}
        }
        finally
        {
            undispatch();
        }
    }

      _connection类的类名为HttpConnection,HttpConnection的handle()方法如下:

 

      

   public void handle() throws IOException
    {
        // Loop while more in buffer
        boolean more_in_buffer = true; // assume true until proven otherwise
        int no_progress = 0;

        while (more_in_buffer)
        {
            try
            {
                synchronized (this)
                {
                    if (_handling)
                        throw new IllegalStateException(); // TODO delete this
                                                           // check
                    _handling = true;
                }

                setCurrentConnection(this);
                long io = 0;

                Continuation continuation = _request.getContinuation();//得到RetryContinuation
                if (continuation != null && continuation.isPending())
                {
                    Log.debug("resume continuation {}",continuation);
                    if (_request.getMethod() == null)
                        throw new IllegalStateException();
                    handleRequest();//处理http请求,执行filter,servlet等
                }
                else//解析http请求
                {
                    // If we are not ended then parse available
                    if (!_parser.isComplete())
                        io = _parser.parseAvailable();

                    // Do we have more generating to do?
                    // Loop here because some writes may take multiple steps and
                    // we need to flush them all before potentially blocking in
                    // the
                    // next loop.
                    while (_generator.isCommitted() && !_generator.isComplete())
                    {
                        long written = _generator.flush();
                        io += written;
                        if (written <= 0)
                            break;
                        if (_endp.isBufferingOutput())
                            _endp.flush();
                    }

                    // Flush buffers
                    if (_endp.isBufferingOutput())
                    {
                        _endp.flush();
                        if (!_endp.isBufferingOutput())
                            no_progress = 0;
                    }

                    if (io > 0)
                        no_progress = 0;
                    else if (no_progress++ >= 2)
                        return;
                }
            }
            catch (HttpException e)
            {
                if (Log.isDebugEnabled())
                {
                    Log.debug("uri=" + _uri);
                    Log.debug("fields=" + _requestFields);
                    Log.debug(e);
                }
                _generator.sendError(e.getStatus(),e.getReason(),null,true);

                _parser.reset(true);
                _endp.close();
                throw e;
            }
            finally
            {
                setCurrentConnection(null);

                more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();

                synchronized (this)
                {
                    _handling = false;

                    if (_destroy)
                    {
                        destroy();
                        return;
                    }
                }

                if (_parser.isComplete() && _generator.isComplete() && !_endp.isBufferingOutput())
                {
                    if (!_generator.isPersistent())
                    {
                        _parser.reset(true);
                        more_in_buffer = false;
                    }

                    if (more_in_buffer)
                    {
                        reset(false);
                        more_in_buffer = _parser.isMoreInBuffer() || _endp.isBufferingInput();
                    }
                    else
                        reset(true);

                    no_progress = 0;
                }

                Continuation continuation = _request.getContinuation();
                if (continuation != null && continuation.isPending())
                {
                    break;
                }else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof SelectChannelEndPoint) // TODO
                        ((SelectChannelEndPoint)_endp).setWritable(false);
            }
        }
    }
1
1
分享到:
评论

相关推荐

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式

    从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式.doc

    jetty相关的全部jar包

    jetty-security-9.4.8.v20171121.jar,jetty-io-9.4.8.v20171121.jar,jetty-continuation-9.4.8.v20171121.jar,jetty-client-9.4.8.v20171121.jar,jetty-jmx-9.4.8.v20171121.jar,jetty-plus-9.4.8.v20171121....

    Jetty中文手册

    处理JVM NIO Bug Rewrite模块 Inversion of Control and Dependency Injection Frameworks Jetty XML IOC 如何使用Spring来配置Jetty 如何使用XBean来配置Jetty 客户端 Asynchronous HTTP Client教程 日志 / 监控 ...

    使用jetty内嵌方式发布jsf的demo

    本demo是使用jetty内嵌的方式发布的jsf的demo 整个demo是工程整个打包,包含所有的jar包,包括使用的jetty9.1.1、JSF所使用的包,以及jsp发布所需。 各位初学这方面东西的朋友,拿到后,可以直接在eclipse中运行

    eclipse jetty插件run-jetty-run-1.3.3

    eclipse jetty插件,从...下载run-jetty-run.zip文件,解压后再编写个links文件丢到eclipse的dropins目录下即可,省去了使用eclipse update方式安装的麻烦。 link文件样例如: path=d:\\eclipse_plugins\\run-jetty-run

    Jetty9 配置使用HTTPS证书

    Jetty9 配置使用HTTPS证书,访问你的服务器更安全,更好的配置方法。

    Jetty cometd(Continuation)学习笔记

    Jetty 7是Jetty奔向Eclipse后发布的第一个版本,本次的Jetty 7 RC2带给了我们一个十分诱人的新特性-支持跨域名Ajax请求。众所周知因为安全的原因,多数浏览器都限制了Ajax跨域请求和javascript加载的时候只能是与...

    基于websocket和jetty8的聊天室demo

    基于jetty8的websocket提炼的能跑的demo

    jetty 8及依赖包

    jetty8以及依赖包,学习的好代码,包括NIO和servlet的实现等

    jetty-5.1.12

    Jetty 是一个开源的servlet容器,它为基于Java的web内容,例如JSP和servlet提供运行环境。Jetty是使用Java语言编写的,它的API以一组JAR包的形式发布。开发人员可以将Jetty容器实例化成一个对象,可以迅速为一些独立...

    Jetty配置支持https

    Jetty配置支持HTTPS以及受信网站证书生成方式

    jetty嵌入Web编程多种实现方式案例

    jetty嵌入Web编程多种实现方式案例

    Jetty多版本软件包

    Jetty软件包内容: jetty-distribution-9.4.51.v20230217.tar.gz jetty-distribution-9.4.51.v20230217.zip jetty-home-10.0.15.tar.gz jetty-home-10.0.15.zip jetty-home-11.0.15.tar.gz jetty-home-11.0.15.zip ...

    jetty6 指南书

    jetty是什么 jetty配置 jetty使用 jetty嵌入 jetty启动 jetty部署 jetty教程 jetty嵌入式 jetty

    jetty-6.1.9 jspweb 服务器

    Jetty 是一个开源的servlet容器,它为基于Java的web内容,例如JSP和servlet提供运行环境。Jetty是使用Java语言编写的,它的API以一组JAR包的形式发布。开发人员可以将Jetty容器实例化成一个对象,可以迅速为一些独立...

    实战 Jetty--让你快速速学会jetty

    2. 利用 Continuation 机制来处理大量的用户请求以及时间比较长的连接。 另外 Jetty 设计了非常良好的接口,因此在 Jetty 的某种实现无法满足用户的需要时,用户可以非常方便地对 Jetty 的某些实现进行修改,使得 ...

    Jetty插件_eclipse

    Eclipse Jetty插件, links方式安装 。Eclipse Jetty插件, links方式安装 。

    jetty-distribution-9.4.47.v20220610

    jetty9.4.47用于部署或替换jar升级jetty解决安全漏洞 下载镜像地址https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-distribution/9.4.47.v20220610/

    jetty 9.2.24

    Jetty是一个纯粹的基于Java的网页服务器和Java Servlet容器。 尽管网页服务器通常用来为人们呈现文档,但是Jetty通常在较大的软件框架中用于计算机与计算机之间的通信。 Jetty作为Eclipse基金会的一部分,是一个自由...

    PDF的JETTY文档

    .jetty

Global site tag (gtag.js) - Google Analytics