This file contains two email messages from me to the inn-workers list.
The first contains a patch relative to INN-1.5.1, and the second
contains a small fix relative to the first patch.

------------

Date: Wed, 29 Jan 1997 10:29:12 +0200 (GMT+0200)
From: Alan Barrett
To: inn-workers@vix.com
Subject: inn patch: reworked scheduling
Message-ID:

Here's a patch (relative to innd 1.5.1) that does several things,
described in more detail below. The main idea was to rework the
scheduling mechanisms to make innd more responsive to new connections,
even on a heavily loaded machine.

After applying the patch below, I can connect to the nntp port on a
server that previously took several minutes before printing its welcome
banner, and get the welcome banner in less than 5 seconds. (This is
despite the kernel often taking more than 1 second to create a new file
in certain large directories like junk and control.cancel, and despite
having several tens of incoming streaming connections.)

Being more responsive to new connections doesn't help the overall
article transfer rate when the disk is maxed out, but at least it stops
the administrator from getting annoyed with how long ctlinnd is taking,
and it stops news readers and news peers from timing out on the initial
connection.

--apb (Alan Barrett)

---------

* Added a way for each innd channel to have a "Worker" function that
  gets called from the main select loop (in innd/chan.c) to perform
  anything that can be done without needing more I/O. The idea is that
  channels that have a lot of work to do should instead do just a
  little work, set up a Worker function to do a little more work later,
  and then relinquish control to give other channels a chance. New
  functions and variables RRCHANadd(), RRCHANremove(), RRCHANmask and
  friends keep track of which channels are ready to run. The main
  select() loop in innd/chan.c calls a channel's Worker function when
  appropriate.
  Files: innd/innd.h innd/chan.c

* Reworked the way the main select loop in CHANreadloop() in innd/chan.c
  limits the amount of work it does per cycle. New variables
  max_work_per_cycle, max_work_this_cycle and work_this_cycle are used
  in an attempt to ensure that new connections on the control channel
  (used by ctlinnd) and the remconn channel (used by news readers and
  peer news servers making new connections to us) get serviced at least
  once every 5 seconds, regardless of how many other channels have work
  to do. Also, the code that used to be conditional on
  "#ifdef PRIORITISE_REMCONN" is now unconditional.
  Files: innd/chan.c

* Reworked the way an NNTP channel limits the amount of work it does per
  cycle of the main select loop. The old kludge in CHANreadtext() (in
  innd/chan.c) where it would never read more than BUFSIZ is gone,
  which should improve the performance when reading large articles.
  Instead, RCproc() (in innd/rc.c) keeps track of how much work it has
  done, and when it reaches a #defined limit, it relinquishes control to
  give another channel a chance. This uses the RRCHAN mechanism
  introduced above; actually, this was the whole reason why I
  implemented the RRCHAN mechanism. Some of this code is still protected
  by "#ifdef LIMIT_WORK_PER_CYCLE", but I suggest removing the #ifdef
  and #endif lines as soon as folk are comfortable with it.
  Files: innd/rc.c

* Now that there's an easy way for each channel to have a "Worker"
  function to do stuff independent of I/O, I took advantage of that to
  add a "ctlinnd renumber '*'" command, which works like
  "ctlinnd renumber ''" except that it does its work in the background.
  Should change news.daily to use it, but have not done that yet.
  Should have a way for users to use ctlinnd to find out whether a
  background renumber is in progress, but don't have that.
  Files: frontends/ctlinnd.c doc/ctlinnd.8 innd/icd.c innd/cc.c

* Previously, some places in innd/nc.c called NCwritetext and some
  places called NCwritereply.
  NCwritetext queued the output without attempting to write it
  immediately, so the write would happen no sooner than next time around
  the main select loop (which could be a long time on an overloaded
  machine). NCwritereply tried to write the output immediately, but
  queued it if the immediate write was not possible. I wanted output to
  be written immediately, and I found it confusing to have two functions
  with such subtle differences, so I changed all calls of NCwritetext to
  call NCwritereply and deleted the NCwritetext function. That
  necessitated having NCwritereply call NCwritedone if the write is
  successful (so that NCwritedone gets a chance to do things like close
  the channel after a "quit" command). That, in turn, necessitated
  moving all the assignments to cp->State, making them occur *before*
  the call to NCwritereply.
  Files: innd/nc.c

* Previously, every time NCwritetext was called, the output buffer would
  get zapped. This could sometimes result in lost data in streaming
  mode. Fixed by deleting the NCwritetext function completely, as part
  of the NCwritereply changes described above.
  Files: innd/nc.c

* Previously, every time RCHANadd() was called, the input buffer would
  get zapped. This could result in lost data in streaming mode. Fixed
  by making RCHANadd be more intelligent about when to zap the buffer,
  and by not calling RCHANremove so often. (Actually, I am not sure why
  we ever want RCHANadd to zap the buffer.)
  Files: innd/chan.c innd/nc.c

* nntpsend needs a way of passing the "-l" flag to innxmit. Instead of
  doing that, I changed the default value of logRejects in
  backends/innxmit.c. Other people will probably not want this.
  Files: backends/innxmit.c

* When you edit config.data and then run a make, it edits the files in
  the samples directory. That causes make in the site directory to
  complain about every file, saying "${SRC} has changed; please update
  ${DEST}", even though nothing significant had changed.
  I didn't like that, so I removed the samples/* files from
  config/files.list.
  Files: config/files.list

* config/files.list says "Do not run subst on its own manpage!". I
  inserted some "\&" sequences in the manpage to prevent literal text
  from being mistaken for subst prototypes. This does not affect the
  way the man page prints (unless you have a font that wants to do
  special kerning or ligatures for "()", "@<" or ">@", in which case
  that will no longer happen).
  Files: doc/subst.1

---------------

Index: backends/innxmit.c --- 151/inn-1.5.1/backends/innxmit.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/backends/innxmit.c Tue Jan 28 17:54:56 1997 @@ -76,7 +76,7 @@ static int DoCheck = TRUE; /* Should check before takethis? */ static char modestream[] = "mode stream"; static long retries = 0; -static int logRejects = TRUE ; /* syslog the 437 responses. */ +static int logRejects = FALSE ; /* syslog the 437 responses. */ Index: config/files.list --- 151/inn-1.5.1/config/files.list Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/config/files.list Tue Jan 28 17:55:57 1997 @@ -65,44 +65,49 @@ ../lib/Makefile ../makedirs.sh ../nnrpd/Makefile -../samples/actsync.cfg -;; ../samples/actsync.ign -../samples/checkgroups -../samples/default -../samples/docheckgroups -../samples/expirerm -../samples/ihave -../samples/inncheck -../samples/innstat -../samples/innwatch -../samples/innwatch.ctl -../samples/innshellvars -../samples/innshellvars.csh -../samples/innshellvars.pl -../samples/innshellvars.tcl -;; ../samples/makegroup -../samples/newgroup -../samples/news.daily -../samples/nntpsend -../samples/parsecontrol -../samples/pgpverify -../samples/rc.news -../samples/rmgroup -../samples/scanlogs -../samples/scanspool -../samples/send-ihave -../samples/send-nntp -../samples/send-uucp -../samples/sendbatch -../samples/sendme -../samples/sendsys -../samples/senduuname -../samples/startup.tcl -;; ../samples/filter.tcl -../samples/tally.control -../samples/tally.unwanted -../samples/version
-../samples/writelog +;; don't include any samples/* stuff here, +;; because if we do then changes to config.data +;; cause irrelevant changes to samples/*, which causes +;; "make" in the "site" directory to print irrelevant +;; complaints about "../site/filename has changed" +; ../samples/actsync.cfg +; ;; ../samples/actsync.ign +; ../samples/checkgroups +; ../samples/default +; ../samples/docheckgroups +; ../samples/expirerm +; ../samples/ihave +; ../samples/inncheck +; ../samples/innstat +; ../samples/innwatch +; ../samples/innwatch.ctl +; ../samples/innshellvars +; ../samples/innshellvars.csh +; ../samples/innshellvars.pl +; ../samples/innshellvars.tcl +; ;; ../samples/makegroup +; ../samples/newgroup +; ../samples/news.daily +; ../samples/nntpsend +; ../samples/parsecontrol +; ../samples/pgpverify +; ../samples/rc.news +; ../samples/rmgroup +; ../samples/scanlogs +; ../samples/scanspool +; ../samples/send-ihave +; ../samples/send-nntp +; ../samples/send-uucp +; ../samples/sendbatch +; ../samples/sendme +; ../samples/sendsys +; ../samples/senduuname +; ../samples/startup.tcl +; ;; ../samples/filter.tcl +; ../samples/tally.control +; ../samples/tally.unwanted +; ../samples/version +; ../samples/writelog ../site/Makefile ../syslog/syslog.c ../syslog/syslog.conf Index: doc/ctlinnd.8 --- 151/inn-1.5.1/doc/ctlinnd.8 Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/doc/ctlinnd.8 Tue Jan 28 17:26:20 1997 @@ -397,6 +397,19 @@ If .I group is an empty string then all newsgroups are scanned. +Scanning all the newsgroups can take a +long time if there are many newsgroups, and during this time +.I innd +will not service any other channels or respond to new connection +attempts. To avoid a long delay during which +.I innd +does no other useful work, +.I group +can be specified as an asterisk (``*'') instead of as an empty string; +in that case, all newsgroups are scanned, but +.I innd +will not scan more than one newsgroup before checking for other work +on other channels.
.TP .BI reserve " reason" The next ``pause'' or ``throttle'' command must use Index: doc/subst.1 --- 151/inn-1.5.1/doc/subst.1 Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/doc/subst.1 Fri Jan 24 18:49:50 1997 @@ -42,11 +42,11 @@ The prototype line should be delimited in such a way that it will be taken as a comment by whatever program processes the file later. The prototype line must contain a ``prototype'' of the target line bracketed -by `=()<' and `>()='; +by `=(\&)<' and `>(\&)='; everything else on the prototype line is ignored. .I Subst extracts the prototype, changes all instances of substitution names -bracketed by `@<' and `>@' to their values, +bracketed by `@\&<' and `>\&@' to their values, and then replaces the target line with the result. .SH OPTIONS .TP @@ -76,7 +76,7 @@ .RS .nf x = 2; -/* =()@ + @@;>()= */ +/* =(\&)\&@ + @\&\&@;>(\&)= */ y = 88 + 99; z = 5; .fi @@ -87,7 +87,7 @@ .RS .nf x = 2; -/* =()@ + @@;>()= */ +/* =(\&)\&@ + @\&\&@;>(\&)= */ y = 111 + 222; z = 5; .fi Index: frontends/ctlinnd.c --- 151/inn-1.5.1/frontends/ctlinnd.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/frontends/ctlinnd.c Tue Jan 28 15:50:47 1997 @@ -80,7 +80,7 @@ 1, SC_REJECT, TRUE }, { "reload", "what reason...\t\tRe-read config files*", 2, SC_RELOAD, TRUE }, - { "renumber", "group\t\tRenumber the active file*", + { "renumber", "group|''|'*'\t\tRenumber the active file*", 1, SC_RENUMBER, FALSE }, { "reserve", "reason...\t\tReserve the next pause or throttle", 1, SC_RESERVE, TRUE }, Index: innd/cc.c --- 151/inn-1.5.1/innd/cc.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/cc.c Mon Jan 27 11:32:18 1997 @@ -1307,6 +1307,22 @@ /* ** Renumber the active file. +** +** av[0] is "" to renumber all newsgroups in the foreground. +** av[0] is "*" to renumber all newsgroups in the background. +** av[0] is a newsgroup name to renumber just that one newsgroup. 
+** +** Renumbering all newsgroups in the foreground takes a long time, +** and innd will not do any other work until the renumber is finished. +** +** Renumbering all newsgroups in the background is achieved by renumbering +** one newsgroup at a time, and allowing other I/O channels to be +** serviced in between groups. +** +** XXX There should be a way for a user to learn whether a +** background renumber has finished, and to terminate a background +** renumber before it has finished. A few more ctlinnd commands... +** */ STATIC STRING CCrenumber(av) @@ -1322,12 +1338,24 @@ return "1 Must first reload newsfeeds"; p = av[0]; if (*p) { + if (*p == '*' && *(p+1) == '\0') { + /* renumber all groups in background */ + if (CHANready(CCchan)) + return "1 Sorry, control channel already has work in progress"; + else { + if (!ICDrenumber_begin(CCchan)) + return CANTRENUMBER; + return "0 Ok (work will continue in the background)"; + } + } + /* renumber a single group */ if ((ngp = NGfind(p)) == NULL) return CCnogroup; if (!NGrenumber(ngp)) return CANTRENUMBER; } - else if (!ICDrenumberactive()) + /* renumber all groups in foreground */ + if (!ICDrenumberactive()) return CANTRENUMBER; return NULL; } Index: innd/chan.c --- 151/inn-1.5.1/innd/chan.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/chan.c Tue Jan 28 18:17:50 1997 @@ -13,23 +13,27 @@ readloop */ #define COMP_THRESHOLD 10 -STATIC FDSET RCHANmask; -STATIC FDSET SCHANmask; -STATIC FDSET WCHANmask; +STATIC FDSET RCHANmask; /* Channels that want to read */ +STATIC FDSET SCHANmask; /* Channels that are sleeping */ +STATIC FDSET WCHANmask; /* Channels that want to write */ +STATIC FDSET RRCHANmask; /* Channels that are ready to run. + * This happens when a channel has more + * work to do, but voluntarily stops + * to give other channels a chance. + * It's normal for channels to be in read + * and/or write state concurrently with + * being in ready state. 
*/ STATIC int SCHANcount; +STATIC int RRCHANcount; STATIC int CHANlastfd; STATIC int CHANlastsleepfd; STATIC int CHANccfd; +STATIC int CHANrcfd; STATIC int CHANtablesize; STATIC CHANNEL *CHANtable; STATIC CHANNEL *CHANcc; -STATIC CHANNEL CHANnull = { CTfree, CSerror, -1 }; - -#define PRIORITISE_REMCONN -#ifdef PRIORITISE_REMCONN -STATIC int CHANrcfd; STATIC CHANNEL *CHANrc; -#endif /* PRIORITISE_REMCONN */ +STATIC CHANNEL CHANnull = { CTfree, CSerror, -1 }; /* @@ -90,6 +94,7 @@ register CHANNEL *cp; FD_ZERO(&RCHANmask); + FD_ZERO(&RRCHANmask); FD_ZERO(&SCHANmask); FD_ZERO(&WCHANmask); if (CHANtable) @@ -170,13 +175,11 @@ CHANcc = cp; CHANccfd = fd; } -#ifdef PRIORITISE_REMCONN /* Note remconn channel, for efficiency */ if (Type == CTremconn) { CHANrc = cp; CHANrcfd = fd; } -#endif /* PRIORITISE_REMCONN */ return cp; } @@ -208,6 +211,9 @@ if (FD_ISSET(cp->fd, &WCHANmask)) syslog(L_NOTICE, "%s trace writing %d %s", p, cp->Out.Left, MaxLength(cp->Out.Data, cp->Out.Data)); + if (FD_ISSET(cp->fd, &RRCHANmask)) + syslog(L_NOTICE, "%s trace ready to run 0x%x", + p, cp->Worker); } } @@ -237,9 +243,13 @@ WCHANremove(cp); RCHANremove(cp); SCHANremove(cp); - if (cp->Argument != NULL) + RRCHANremove(cp); + if (cp->WakerArgument != NULL) + /* Set to NULL below. */ + DISPOSE(cp->WakerArgument); + if (cp->WorkerArgument != NULL) /* Set to NULL below. */ - DISPOSE(cp->Argument); + DISPOSE(cp->WorkerArgument); if (cp->fd >= 0 && close(cp->fd) < 0) syslog(L_ERROR, "%s cant close %s %m", LogName, name); } @@ -248,7 +258,8 @@ cp->Type = CTfree; cp->State = CSerror; cp->fd = -1; - cp->Argument = NULL; + cp->WorkerArgument = NULL; + cp->WakerArgument = NULL; /* Free the buffers if they got big. 
*/ if (cp->In.Size > BIG_BUFFER) { @@ -377,12 +388,19 @@ RCHANadd(cp) register CHANNEL *cp; { - FD_SET(cp->fd, &RCHANmask); - if (cp->fd > CHANlastfd) - CHANlastfd = cp->fd; + if (!FD_ISSET(cp->fd, &RCHANmask)) { + FD_SET(cp->fd, &RCHANmask); + if (cp->fd > CHANlastfd) + CHANlastfd = cp->fd; - /* Start reading at the beginning of the buffer. */ - cp->In.Used = 0; + /* Start reading at the beginning of the buffer. + * + * Note: This zaps anything that was already in the input buffer. + * If that's not what you want, then don't call RCHANremove while + * the input buffer contains data that you don't want zapped. + */ + cp->In.Used = 0; + } } @@ -408,15 +426,15 @@ /* ** Put a channel to sleep, call a function when it wakes. -** Note that the Argument must be NULL or allocated memory! +** Note that the WakerArgument must be NULL or allocated memory! */ void -SCHANadd(cp, Waketime, Event, Waker, Argument) +SCHANadd(cp, Waketime, Event, Waker, WakerArgument) register CHANNEL *cp; time_t Waketime; POINTER Event; FUNCPTR Waker; - POINTER Argument; + POINTER WakerArgument; { if (!FD_ISSET(cp->fd, &SCHANmask)) { SCHANcount++; @@ -426,9 +444,9 @@ CHANlastsleepfd = cp->fd; cp->Waketime = Waketime; cp->Waker = Waker; - if (cp->Argument != Argument) { - DISPOSE(cp->Argument); - cp->Argument = Argument; + if (cp->WakerArgument != WakerArgument) { + DISPOSE(cp->WakerArgument); + cp->WakerArgument = WakerArgument; } cp->Event = Event; } @@ -534,6 +552,55 @@ WCHANset(cp, &bp->Data[bp->Used], bp->Left); } + +/* +** Mark a channel as ready to run. +** The Worker function will be called from time to time. +** +** Note that the WorkerArgument must be NULL or allocated memory! 
+*/ +void +RRCHANadd(cp, Worker, WorkerArgument) + register CHANNEL *cp; + FUNCPTR Worker; + POINTER WorkerArgument; +{ + if (!FD_ISSET(cp->fd, &RRCHANmask)) { + RRCHANcount++; + FD_SET(cp->fd, &RRCHANmask); + } + cp->Worker = Worker; + if (cp->WorkerArgument != WorkerArgument) { + DISPOSE(cp->WorkerArgument); + cp->WorkerArgument = WorkerArgument; + } +} + + +/* +** Remove a channel from the ready to run state. +*/ +void +RRCHANremove(cp) + register CHANNEL *cp; +{ + if (FD_ISSET(cp->fd, &RRCHANmask)) { + FD_CLR(cp->fd, &RRCHANmask); + RRCHANcount--; + } +} + +/* +** Is a channel on the ready list? +*/ +BOOL +CHANready(cp) + CHANNEL *cp; +{ + return FD_ISSET(cp->fd, &RRCHANmask); +} + + /* @@ -557,25 +624,9 @@ RENEW(bp->Data, char, bp->Size); } - /* Read in whatever is there, up to some reasonable limit. */ - /* - * XXX We really want to limit the amount of time it takes to - * process the incoming data for this channel. But there's - * no easy way of doing that, so we restrict the data size instead. - * If the data is part of a single large article, then reading - * and processing many kilobytes at a time costs very little. - * If the data is a long list of CHECK commands from a streaming - * feed, then every line of data will require a history lookup, and - * we probably don't want to do more than about 10 of those per - * channel on each cycle of the main select() loop (otherwise we - * might take too long before giving other channels a turn). 10 - * lines of CHECK commands suggests a limit of about 1 kilobyte of - * data, or less. BUFSIZ is often about 1 kilobyte, and is - * attractive for other reasons, so let's use that as our size limit. - */ + /* Read in whatever is there. */ bp->Left = bp->Size - bp->Used; - i = read(cp->fd, &bp->Data[bp->Used], - (bp->Left - 1 > BUFSIZ ? 
BUFSIZ : bp->Left - 1)); + i = read(cp->fd, &bp->Data[bp->Used], (bp->Left - 1)); if (i < 0) { #ifdef POLL_BUG /* return of -2 indicates EAGAIN, for SUNOS5.4 poll() bug workaround */ @@ -769,6 +820,9 @@ register int startpoint; register int count; register int lastfd; + register int work_this_cycle; + int max_work_per_cycle; + int max_work_this_cycle; int oerrno; register CHANNEL *cp; register BUFFER *bp; @@ -778,16 +832,47 @@ long silence; char *p; time_t LastUpdate; + time_t LastCycle; - LastUpdate = GetTimeInfo(&Now) < 0 ? 0 : Now.time; + /* + * If we do too much work on every cycle of the select() loop, + * then it can take a very long time for new connections + * to get their welcome banner, and for control commands + * to be acted upon. That would be bad. + * + * So we limit the number of times we are prepared to call out + * to any channel's Reader, Writer, Worker or Waker function + * on each cycle of the main loop. We adjust max_work_per_cycle + * dynamically, in an attempt to keep the time per cycle of + * the main loop down to less than 5 seconds. (That means that + * we try to check for new control commands or new incoming + * connection requests at least every 5 seconds. If we're very + * busy it can take much longer than a few seconds to respond to + * commands on already established connections, and there's not + * much we can do about that.) + * + * max_work_this_cycle is initialised from max_work_per_cycle, + * but may be reduced when there's activity on the control + * channel or other special channels. + */ + max_work_this_cycle = max_work_per_cycle = 30; + work_this_cycle = 0; + + LastCycle = LastUpdate = GetTimeInfo(&Now) < 0 ? 0 : Now.time; for ( ; ; ) { /* See if any processes died. */ PROCscan(); - /* Wait for data, note the time. */ + /* Wait for data, note the time. + * If there's other work that we could be doing without + * needing any I/O first, then poll instead of blocking.
+ */ MyRead = RCHANmask; MyWrite = WCHANmask; - MyTime = TimeOut; + if (RRCHANcount == 0) + MyTime = TimeOut; + else + MyTime.tv_sec = MyTime.tv_usec = 0; count = select(CHANlastfd + 1, &MyRead, &MyWrite, (FDSET *)NULL, &MyTime); if (GotTerminate) { @@ -816,39 +901,59 @@ LastUpdate = Now.time; } + /* Adjust max_work_per_cycle if necessary, + * and reset work_this_cycle and max_work_this_cycle. + */ + if (Now.time - LastCycle > 5) { + /* cycle was slower than 5 seconds. + * decrease max_work_per_cycle, but not to less than 5. + */ + if (max_work_per_cycle > 5) + --max_work_per_cycle; + } else if (Now.time - LastCycle < 2) { + /* cycle was quicker than 2 seconds. + * increase max_work_per_cycle if that's limiting us. + */ + if (work_this_cycle >= max_work_per_cycle) + ++max_work_per_cycle; + } + max_work_this_cycle = max_work_per_cycle; + work_this_cycle = 0; + LastCycle = Now.time; + if (count == 0) { /* No channels active, so flush and skip if nobody's - * sleeping. */ + * sleeping or ready to run. */ if (Mode == OMrunning) ICDwrite(); - if (SCHANcount == 0) + if (SCHANcount == 0 && RRCHANcount == 0) continue; } /* Try the control channel first. */ if (FD_ISSET(CHANccfd, &RCHANmask) && FD_ISSET(CHANccfd, &MyRead)) { count--; - if (count > 3) count = 3; /* might be more requests */ + work_this_cycle++; + max_work_this_cycle = 4; /* might be more requests */ (*CHANcc->Reader)(CHANcc); FD_CLR(CHANccfd, &MyRead); } -#ifdef PRIORITISE_REMCONN /* Try the remconn channel next. */ if (FD_ISSET(CHANrcfd, &RCHANmask) && FD_ISSET(CHANrcfd, &MyRead)) { count--; - if (count > 3) count = 3; /* might be more requests */ + work_this_cycle++; + max_work_this_cycle = 4; /* might be more requests */ (*CHANrc->Reader)(CHANrc); FD_CLR(CHANrcfd, &MyRead); } -#endif /* PRIORITISE_REMCONN */ - /* Loop through all active channels. Somebody could have closed + /* Loop through all active channels. 
Somebody could have * closed a channel so we double-check the global mask before * looking at what select returned. The code here is written so - * that a channel could be reading and writing and sleeping at the - * same time, even though that's not possible. (Just as well, - * since in SysVr4 the count would be wrong.) */ + * that a channel could be reading and writing and sleeping and + * ready to run at the same time, even though that's impossible. + * (Just as well, since in SysVr4 the count would be wrong.) */ lastfd = CHANlastfd; if (lastfd < CHANlastsleepfd) lastfd = CHANlastsleepfd; @@ -858,6 +963,28 @@ do { cp = &CHANtable[fd]; + /* Ready to run without I/O ? + * If so, then: + * - remove channel from the ready list; + * - call the Worker function (which may put it back onto + * the ready list if appropriate). + */ + if (FD_ISSET(fd, &RRCHANmask)) { + if (cp->Type == CTfree) { + syslog(L_ERROR, "%s %d free but was in RRMASK", + CHANname(cp), fd); + /* Don't call RRCHANremove since cp->fd will be -1. */ + FD_CLR(fd, &RRCHANmask); + (void)close(fd); + } + else { + work_this_cycle++; + cp->LastActive = Now.time; + RRCHANremove(cp); + (*cp->Worker)(cp); + } + } + /* Anything to read? */ if (FD_ISSET(fd, &RCHANmask) && FD_ISSET(fd, &MyRead)) { count--; @@ -869,6 +996,7 @@ (void)close(fd); } else { + work_this_cycle++; cp->LastActive = Now.time; (*cp->Reader)(cp); } @@ -890,6 +1018,7 @@ (void)close(fd); } else { + work_this_cycle++; bp = &cp->Out; if (bp->Left) { cp->LastActive = Now.time; @@ -948,6 +1077,7 @@ FD_CLR(fd, &SCHANmask); (void) close(fd); } else { + work_this_cycle++; cp->LastActive = Now.time; SCHANremove(cp); (*cp->Waker)(cp); @@ -981,8 +1111,10 @@ else fd++; - /* If there is nothing to do, break out. */ - if (count == 0 && SCHANcount == 0) + /* If there is nothing more to do, or if we have already done + * as much as we are allowed to do per cycle, break out. 
*/ + if (work_this_cycle >= max_work_per_cycle || + (count == 0 && SCHANcount == 0 && RRCHANcount == 0)) break; } while (fd != startpoint); Index: innd/icd.c --- 151/inn-1.5.1/innd/icd.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/icd.c Tue Jan 28 18:22:58 1997 @@ -14,6 +14,11 @@ typedef struct iovec IOVEC; +typedef struct { + int i; + NEWSGROUP *ngp; +} renumber_worker_arg; + STATIC char ICDactpath[] = _PATH_ACTIVE; STATIC char *ICDactpointer; @@ -133,6 +138,11 @@ /* ** Scan the active file, and renumber the min/max counts. +** +** This can take a long time, during which the news server +** will not respond to any other connections. See +** ICDrenumber_begin() and ICDrenumber_continue() for an +** alternative. */ BOOL ICDrenumberactive() @@ -146,6 +156,61 @@ if (i < 0) ICDwrite(); return TRUE; +} + +/* forward declaration */ +STATIC FUNCTYPE ICDrenumber_continue(); + +/* +** Scan the active file, and renumber the min/max counts in the background. +** +** ICDrenumber_begin sets things up, and ICDrenumber_continue +** renumbers one group at a time. +*/ +BOOL +ICDrenumber_begin(cp) + CHANNEL *cp; +{ + renumber_worker_arg *p; + + p = NEW(renumber_worker_arg, 1); + if (p == NULL) + return FALSE; + p->i = nGroups; + p->ngp = Groups; + RRCHANadd(cp, ICDrenumber_continue, (POINTER)p); + return TRUE; +} + +/* +** Renumber the next group, and reschedule ourself to get to +** more groups later, if necessary. 
+*/ +STATIC FUNCTYPE +ICDrenumber_continue(cp) + CHANNEL *cp; +{ + renumber_worker_arg *p; + + p = CAST(renumber_worker_arg *, cp->WorkerArgument); + if (p == NULL) { + /* XXX should syslog the error */ + RRCHANremove(cp); + return; + } + if (!NGrenumber(p->ngp)) { + /* XXX should syslog the error */ + RRCHANremove(cp); + return; + } + p->ngp++; + if (--p->i >= 0) + RRCHANadd(cp, ICDrenumber_continue, (POINTER)p); + else { + ICDwrite(); + RRCHANremove(cp); + } + return; } Index: innd/innd.h --- 151/inn-1.5.1/innd/innd.h Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/innd.h Tue Jan 28 17:35:30 1997 @@ -18,6 +18,8 @@ ** PS Process state ** RC Remote NNTP connection-receiving channel ** RCHAN A channel in "read" state +** RRCHAN A channel in "ready to run" state +** SCHAN A channel in "sleep" state ** SITE Something that gets told when we get an article ** WCHAN A channel in "write" state */ @@ -149,12 +151,19 @@ time_t Waketime; time_t Started; FUNCPTR Waker; - POINTER Argument; + FUNCPTR Worker; + POINTER WakerArgument; + POINTER WorkerArgument; POINTER Event; BUFFER In; BUFFER Out; BOOL Tracing; BUFFER Sendid; + /* the following should really be local variables in NCproc(), + * until such time as the BUFFER stuff gets thoroughly reworked + * to accommodate streaming better. I decided to leave these + * as part of CHANNEL for now, to make my patches smaller. 
+ * --apb 1997-01-25 */ int Lastch; int Rest; int SaveUsed; @@ -468,6 +477,7 @@ extern void BUFFset(); extern void BUFFswap(); +extern BOOL CHANready(); extern BOOL CHANsleeping(); extern CHANNEL *CHANcreate(); extern CHANNEL *CHANiter(); @@ -479,6 +489,9 @@ extern void CHANsetup(); extern void CHANtracing(); +extern void RRCHANadd(); +extern void RRCHANremove(); + extern void RCHANadd(); extern void RCHANremove(); @@ -509,6 +522,7 @@ extern BOOL ICDchangegroup(); extern void ICDclose(); extern BOOL ICDrenumberactive(); +extern BOOL ICDrenumber_begin(); extern BOOL ICDrmgroup(); extern void ICDsetup(); extern void ICDwrite(); Index: innd/nc.c --- 151/inn-1.5.1/innd/nc.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/nc.c Tue Jan 28 18:37:28 1997 @@ -11,6 +11,7 @@ #include "dbz.h" +#define LIMIT_WORK_PER_CYCLE 15 #define BAD_COMMAND_COUNT 10 #define WIP_CHECK (1 * 60) #define SAVE_AMT 10 @@ -126,25 +127,16 @@ /* ** Write an NNTP reply message. -*/ -STATIC void -NCwritetext(cp, text) - CHANNEL *cp; - char *text; -{ - RCHANremove(cp); - WCHANset(cp, text, (int)strlen(text)); - WCHANappend(cp, NCterm, STRLEN(NCterm)); - WCHANadd(cp); - if (Tracing || cp->Tracing) - syslog(L_TRACE, "%s > %s", CHANname(cp), text); -} - - -/* -** Write an NNTP reply message. -** Call only when we will stay in NCreader mode. -** Tries to do the actual write if it will not block. +** +** Tries to do the actual write immediately if it will not block +** and if there is not already other buffered output. Then, if the +** write is successful, calls NCwritedone (which does whatever is +** necessary to accommodate state changes). Else, NCwritedone will +** be called from the main select loop later. +** +** If the reply that we are writing now is associated with a +** state change, then cp->State must be set to its new value +** *before* NCwritereply is called. 
*/ STATIC void NCwritereply(cp, text) @@ -154,6 +146,11 @@ register BUFFER *bp; register int i; + /* XXX could do RCHANremove(cp) here, as the old NCwritetext() used to + * do, but that would be wrong if the channel is streaming (because it + * would zap the channel's input buffer). There's no harm in + * never calling RCHANremove here. */ + bp = &cp->Out; i = bp->Left; WCHANappend(cp, text, (int)strlen(text)); /* text in buffer */ @@ -164,11 +161,14 @@ syslog(L_TRACE, "%s NCwritereply %d=write(%d, \"%.15s\", %d)", CHANname(cp), i, cp->fd, &bp->Data[bp->Used], bp->Left); if (i > 0) bp->Used += i; - if (bp->Used == bp->Left) bp->Used = bp->Left = 0; + if (bp->Used == bp->Left) { + /* all the data was written */ + bp->Used = bp->Left = 0; + NCwritedone(cp); + } else i = 0; } else i = 0; if (i <= 0) { /* write failed, queue it for later */ - RCHANremove(cp); WCHANadd(cp); } if (Tracing || cp->Tracing) @@ -183,13 +183,19 @@ CHANNEL *cp; char *text; { - RCHANremove(cp); + cp->State = CSwritegoodbye; + RCHANremove(cp); /* we're not going to read anything more */ +#if 0 + /* XXX why would we want to zap whatever was already + * in the output buffer? */ WCHANset(cp, NNTP_GOODBYE, STRLEN(NNTP_GOODBYE)); +#else + WCHANappend(cp, NNTP_GOODBYE, STRLEN(NNTP_GOODBYE)); +#endif WCHANappend(cp, " ", 1); WCHANappend(cp, text, (int)strlen(text)); WCHANappend(cp, NCterm, STRLEN(NCterm)); WCHANadd(cp); - cp->State = CSwritegoodbye; } @@ -204,7 +210,7 @@ if (ARTidok(p)) return FALSE; - NCwritetext(cp, NNTP_HAVEIT_BADID); + NCwritereply(cp, NNTP_HAVEIT_BADID); syslog(L_NOTICE, "%s bad_messageid %s", CHANname(cp), MaxLength(p, p)); return TRUE; } @@ -255,8 +261,8 @@ cp->Received, cp->Refused, cp->Rejected); cp->Reported = 0; } - NCwritereply(cp, response); cp->State = CSgetcmd; + NCwritereply(cp, response); break; case OMthrottled: @@ -322,12 +328,12 @@ /* Get the article filenames, and the article header+body.
*/ if ((art = ARTreadarticle(HISfilesfor(p))) == NULL) { - NCwritetext(cp, NNTP_DONTHAVEIT); + NCwritereply(cp, NNTP_DONTHAVEIT); return; } /* Write it. */ - NCwritetext(cp, NNTP_ARTICLE_FOLLOWS); + NCwritereply(cp, NNTP_ARTICLE_FOLLOWS); for (p = art; ((q = strchr(p, '\n')) != NULL); p = q + 1) { if (*p == '.') WCHANappend(cp, ".", 1); @@ -360,12 +366,12 @@ /* Get the article filenames, and the header. */ if ((head = ARTreadheader(HISfilesfor(p))) == NULL) { - NCwritetext(cp, NNTP_DONTHAVEIT); + NCwritereply(cp, NNTP_DONTHAVEIT); return; } /* Write it. */ - NCwritetext(cp, NNTP_HEAD_FOLLOWS); + NCwritereply(cp, NNTP_HEAD_FOLLOWS); for (p = head; ((q = strchr(p, '\n')) != NULL); p = q + 1) { if (*p == '.') WCHANappend(cp, ".", 1); @@ -397,13 +403,13 @@ /* Get the article filenames; read the header (to make sure not * the article is still here). */ if (ARTreadheader(HISfilesfor(p)) == NULL) { - NCwritetext(cp, NNTP_DONTHAVEIT); + NCwritereply(cp, NNTP_DONTHAVEIT); return; } /* Write the message. */ (void)sprintf(buff, "%d 0 %s", NNTP_NOTHING_FOLLOWS_VAL, p); - NCwritetext(cp, buff); + NCwritereply(cp, buff); } @@ -430,7 +436,7 @@ /* Otherwise, make sure we're only getting "authinfo" commands. */ if (!caseEQn(p, AUTHINFO, STRLEN(AUTHINFO))) { - NCwritetext(cp, NNTP_AUTH_NEEDED); + NCwritereply(cp, NNTP_AUTH_NEEDED); return; } for (p += STRLEN(AUTHINFO); ISWHITE(*p); p++) @@ -439,13 +445,13 @@ /* Ignore "authinfo user" commands, since we only care about the * password. */ if (caseEQn(p, USER, STRLEN(USER))) { - NCwritetext(cp, NNTP_AUTH_NEXT); + NCwritereply(cp, NNTP_AUTH_NEXT); return; } /* Now make sure we're getting only "authinfo pass" commands. */ if (!caseEQn(p, PASS, STRLEN(PASS))) { - NCwritetext(cp, NNTP_AUTH_NEEDED); + NCwritereply(cp, NNTP_AUTH_NEEDED); return; } for (p += STRLEN(PASS); ISWHITE(*p); p++) @@ -453,12 +459,12 @@ /* Got the password -- is it okay? 
*/ if (!RCauthorized(cp, p)) { - NCwritetext(cp, NNTP_AUTH_BAD); cp->State = CSwritegoodbye; + NCwritereply(cp, NNTP_AUTH_BAD); } else { - NCwritetext(cp, NNTP_AUTH_OK); cp->State = CSgetcmd; + NCwritereply(cp, NNTP_AUTH_OK); } } @@ -509,7 +515,7 @@ static char LINE2[] = "\" at this machine."; register NCDISPATCH *dp; - NCwritetext(cp, NNTP_HELP_FOLLOWS); + NCwritereply(cp, NNTP_HELP_FOLLOWS); for (dp = NCcommands; dp < ENDOF(NCcommands); dp++) if (dp->Function != NC_unimp) { if (!StreamingOff || cp->Streaming || @@ -539,22 +545,22 @@ { WIP *who; - if (HIShavearticle(cp->Argument)) { + if (HIShavearticle(cp->WakerArgument)) { cp->Refused++; - NCwritetext(cp, NNTP_HAVEIT); - DISPOSE(cp->Argument); - cp->Argument = NULL; + NCwritereply(cp, NNTP_HAVEIT); + DISPOSE(cp->WakerArgument); + cp->WakerArgument = NULL; } - else if (NCinprogress(cp, cp->Argument, &who)) { + else if (NCinprogress(cp, cp->WakerArgument, &who)) { who->Wanted = TRUE; SCHANadd(cp, (time_t)(Now.time + WIP_CHECK / 2 + 1), (POINTER)who, - NCwaitfor, (POINTER)cp->Argument); + NCwaitfor, (POINTER)cp->WakerArgument); } else { - NCwritetext(cp, NNTP_SENDIT); cp->State = CSgetarticle; - DISPOSE(cp->Argument); - cp->Argument = NULL; + NCwritereply(cp, NNTP_SENDIT); + DISPOSE(cp->WakerArgument); + cp->WakerArgument = NULL; } } #endif /* !defined(NNTP_RESENDIT_LATER) */ @@ -572,7 +578,7 @@ WIP *who; if (AmSlave) { - NCwritetext(cp, NCbadcommand); + NCwritereply(cp, NCbadcommand); return; } @@ -584,11 +590,11 @@ if (HIShavearticle(p)) { cp->Refused++; - NCwritetext(cp, NNTP_HAVEIT); + NCwritereply(cp, NNTP_HAVEIT); } else if (NCinprogress(cp, p, &who)) { #if defined(NNTP_RESENDIT_LATER) - NCwritetext(cp, NNTP_RESENDIT_LATER); + NCwritereply(cp, NNTP_RESENDIT_LATER); #else /* Somebody else is sending it to us; wait until they're done. 
*/ who->Wanted = TRUE; @@ -599,8 +605,8 @@ #endif /* defined(NNTP_RESENDIT_LATER) */ } else { - NCwritetext(cp, NNTP_SENDIT); cp->State = CSgetarticle; + NCwritereply(cp, NNTP_SENDIT); } } @@ -616,7 +622,7 @@ WIP *wp; if (AmSlave) { - NCwritetext(cp, NCbadcommand); + NCwritereply(cp, NCbadcommand); return; } @@ -642,7 +648,7 @@ if (wp->Size <= 0) { syslog(L_NOTICE, "%s got bad xbatch size %ld", CHANname(cp), wp->Size); - NCwritetext(cp, NNTP_XBATCH_BADSIZE); + NCwritereply(cp, NNTP_XBATCH_BADSIZE); return; } @@ -663,8 +669,8 @@ cp->In.Data = NEW(char, cp->In.Size); } #endif - NCwritetext(cp, NNTP_CONT_XBATCH); cp->State = CSgetxbatch; + NCwritereply(cp, NNTP_CONT_XBATCH); } /* @@ -694,12 +700,12 @@ trash = NULL; } else { - NCwritetext(cp, NCbadcommand); + NCwritereply(cp, NCbadcommand); return; } /* Loop over all lines, sending the text and \r\n. */ - NCwritetext(cp, NNTP_LIST_FOLLOWS); + NCwritereply(cp, NNTP_LIST_FOLLOWS); for (; p < end && (q = strchr(p, '\n')) != NULL; p = q + 1) { WCHANappend(cp, p, q - p); WCHANappend(cp, NCterm, STRLEN(NCterm)); @@ -732,12 +738,12 @@ (!StreamingOff || (StreamingOff && cp->Streaming))) { char buff[16]; (void)sprintf(buff, "%d StreamOK.", NNTP_OK_STREAM_VAL); - NCwritetext(cp, buff); + NCwritereply(cp, buff); syslog(L_NOTICE, "%s NCmode \"mode stream\" received", CHANname(cp)); return; } else { - NCwritetext(cp, NCbadcommand); + NCwritereply(cp, NCbadcommand); return; } RChandoff(cp->fd, h); @@ -770,8 +776,8 @@ wp->MessageID = NULL; } #endif /* 0 */ - NCwritetext(cp, NNTP_GOODBYE_ACK); cp->State = CSwritegoodbye; + NCwritereply(cp, NNTP_GOODBYE_ACK); } @@ -793,7 +799,7 @@ return; if ((p = HISfilesfor(p)) == NULL) { - NCwritetext(cp, NNTP_DONTHAVEIT); + NCwritereply(cp, NNTP_DONTHAVEIT); return; } i = 3 + 1 + strlen(p); @@ -806,7 +812,7 @@ RENEW(Reply.Data, char, i + 1); } (void)sprintf(Reply.Data, "%d %s", NNTP_NOTHING_FOLLOWS_VAL, p); - NCwritetext(cp, Reply.Data); + NCwritereply(cp, Reply.Data); } @@ -822,7 +828,7 @@ 
register int i; if (!RCismaster(cp->Address)) { - NCwritetext(cp, NCbadcommand); + NCwritereply(cp, NCbadcommand); return; } @@ -839,8 +845,8 @@ bp->Used = bp->Left; /* Tell master to send it to us. */ - NCwritetext(cp, NNTP_SENDIT); cp->State = CSgetrep; + NCwritereply(cp, NNTP_SENDIT); } @@ -860,7 +866,7 @@ *p = '\0'; (void)sprintf(buff, "%d \"%s\" not implemented; try \"help\".", NNTP_BAD_COMMAND_VAL, MaxLength(cp->In.Data, cp->In.Data)); - NCwritetext(cp, buff); + NCwritereply(cp, buff); } @@ -896,6 +902,16 @@ /* ** Check whatever data is available on the channel. If we got the ** full amount (i.e., the command or the whole article) process it. +** +#ifdef LIMIT_WORK_PER_CYCLE +** If we're streaming, then there might be several commands and/or +** complete articles available. Processing the whole lot might take +** a long time, so we limit the amount of work that we do. When +** the threshold is reached, we set things up so that, next time the +** main select() loop gives this channel a chance, we will be able +** to carry on where we left off. +** +#endif LIMIT_WORK_PER_CYCLE */ STATIC FUNCTYPE NCproc(cp) @@ -909,6 +925,12 @@ char buff[SMBUF]; char *av[2]; int i; +#ifdef LIMIT_WORK_PER_CYCLE + /* work_this_cycle should arguably be part of the CHANNEL struct, + * so that when a channel has reading, writing and possibly other + * work to do, they all get added together. 
*/ + int work_this_cycle; +#endif if (Tracing || cp->Tracing) syslog(L_TRACE, "%s NCproc Used=%d", @@ -919,12 +941,25 @@ p = &bp->Data[bp->Used]; wp = &NCwip[cp->fd]; +#ifdef LIMIT_WORK_PER_CYCLE + work_this_cycle = 0; +#endif /* LIMIT_WORK_PER_CYCLE */ for ( ; ; ) { +#ifdef LIMIT_WORK_PER_CYCLE + if (work_this_cycle >= LIMIT_WORK_PER_CYCLE) { + /* this channel has already done enough work */ + RRCHANadd(cp, NCproc, (POINTER)NULL); + if (Tracing || cp->Tracing) + syslog(L_TRACE, "%s NCproc relinquishing Rest=%d Used=%d SaveUsed=%d", + CHANname(cp), cp->Rest, bp->Used, cp->SaveUsed); + break; + } +#endif /* LIMIT_WORK_PER_CYCLE */ cp->Rest = 0; cp->SaveUsed = bp->Used; if (Tracing || cp->Tracing) if (bp->Used > 15) - syslog(L_TRACE, "%s NCproc state=%d next \"%.15s\"", + syslog(L_TRACE, "%s NCproc before-switch state=%d next \"%.15s\"", CHANname(cp), cp->State, bp->Data); switch (cp->State) { default: @@ -936,8 +971,13 @@ case CSgetauth: /* Did we get the whole command, terminated with "\r\n"? 
*/ for (i = 0; (i < bp->Used) && (bp->Data[i] != '\n'); i++) ; - if (i < bp->Used) cp->Rest = bp->Used = ++i; + if (i < bp->Used) { /* yes, there's a complete command for us */ + cp->Rest = bp->Used = ++i; + } else { + if (Tracing || cp->Tracing) + syslog(L_TRACE, "%s NCproc partial-command Rest=%d Used=%d SaveUsed=%d", + CHANname(cp), cp->Rest, bp->Used, cp->SaveUsed); cp->Rest = 0; break; /* come back later for rest of line */ } @@ -948,12 +988,13 @@ syslog(L_NOTICE, "%s bad_command %s", CHANname(cp), MaxLength(bp->Data, bp->Data)); - NCwritetext(cp, NCbadcommand); if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) { cp->State = CSwritegoodbye; + NCwritereply(cp, NCbadcommand); cp->Rest = cp->SaveUsed; break; - } + } else + NCwritereply(cp, NCbadcommand); for (j = i + 1; j < cp->SaveUsed; j++) if (bp->Data[j] == '\n') { if (bp->Data[j - 1] == '\r') break; @@ -970,11 +1011,16 @@ if (Tracing || cp->Tracing) syslog(L_TRACE, "%s < %s", CHANname(cp), bp->Data); +#ifdef LIMIT_WORK_PER_CYCLE + /* blank lines are not "work", but commands are */ + work_this_cycle += 1; +#endif /* LIMIT_WORK_PER_CYCLE */ + /* We got something -- stop sleeping (in case we were). */ SCHANremove(cp); - if (cp->Argument != NULL) { - DISPOSE(cp->Argument); - cp->Argument = NULL; + if (cp->WakerArgument != NULL) { + DISPOSE(cp->WakerArgument); + cp->WakerArgument = NULL; } if (cp->State == CSgetauth) { @@ -998,11 +1044,12 @@ } if (dp == ENDOF(NCcommands)) { - NCwritetext(cp, NCbadcommand); if (++(cp->BadCommands) >= BAD_COMMAND_COUNT) { cp->State = CSwritegoodbye; + NCwritereply(cp, NCbadcommand); cp->Rest = cp->SaveUsed; - } + } else + NCwritereply(cp, NCbadcommand); for (i = 0; (p = NCquietlist[i]) != NULL; i++) if (caseEQ(p, bp->Data)) break; @@ -1014,6 +1061,9 @@ case CSgetarticle: case CSgetrep: +#ifdef LIMIT_WORK_PER_CYCLE + work_this_cycle += 1; +#endif /* LIMIT_WORK_PER_CYCLE */ /* Check for the null article. 
*/ if ((bp->Used >= 3) && (bp->Data[0] == '.') && (bp->Data[1] == '\r') && (bp->Data[2] == '\n')) { @@ -1022,6 +1072,7 @@ #if 0 syslog(L_NOTICE, "%s empty article", CHANname(cp)) ; #endif + cp->State = CSgetcmd; if (cp->Sendid.Size > 3) { /* We be streaming */ char buff[4]; (void)sprintf(buff, "%d", NNTP_ERR_FAILID_VAL); @@ -1030,8 +1081,7 @@ cp->Sendid.Data[2] = buff[2]; NCwritereply(cp, cp->Sendid.Data); } - else NCwritetext(cp, NNTP_REJECTIT_EMPTY); - cp->State = CSgetcmd; + else NCwritereply(cp, NNTP_REJECTIT_EMPTY); bp->Used = 0; /* Clear the work-in-progress entry. */ @@ -1075,12 +1125,15 @@ } /* Strip article terminator and post the article. */ +#ifdef LIMIT_WORK_PER_CYCLE + work_this_cycle += 10; +#endif /* LIMIT_WORK_PER_CYCLE */ p[-3] = '\0'; bp->Used -= 2; SCHANremove(cp); - if (cp->Argument != NULL) { - DISPOSE(cp->Argument); - cp->Argument = NULL; + if (cp->WakerArgument != NULL) { + DISPOSE(cp->WakerArgument); + cp->WakerArgument = NULL; } NCclean(bp); NCpostit(cp); @@ -1088,6 +1141,9 @@ break; case CSeatarticle: +#ifdef LIMIT_WORK_PER_CYCLE + work_this_cycle += 1; +#endif /* LIMIT_WORK_PER_CYCLE */ /* Eat the article and then complain that it was too large */ /* Reading an article; look for "\r\n.\r\n" terminator. */ if (cp->Lastch > 5) i = cp->Lastch; /* only look at new data */ @@ -1106,9 +1162,9 @@ if (i <= bp->Used) { /* did find terminator */ /* Reached the end of the article. */ SCHANremove(cp); - if (cp->Argument != NULL) { - DISPOSE(cp->Argument); - cp->Argument = NULL; + if (cp->WakerArgument != NULL) { + DISPOSE(cp->WakerArgument); + cp->WakerArgument = NULL; } p = wp->MessageID; i = wp->Size + bp->Used; @@ -1116,9 +1172,9 @@ CHANname(cp), p ? 
p : "(null)", i, LargestArticle); (void)sprintf(buff, "%d Article exceeds local limit of %ld bytes", NNTP_REJECTIT_VAL, LargestArticle); - if (cp->Sendid.Size) NCwritetext(cp, cp->Sendid.Data); - else NCwritetext(cp, buff); cp->State = CSgetcmd; + if (cp->Sendid.Size) NCwritereply(cp, cp->Sendid.Data); + else NCwritereply(cp, buff); cp->Rejected++; /* Write a local cancel entry so nobody else gives it to us. */ @@ -1159,7 +1215,11 @@ cp->Rest = 0; } break; + case CSgetxbatch: +#ifdef LIMIT_WORK_PER_CYCLE + work_this_cycle += 1; +#endif /* LIMIT_WORK_PER_CYCLE */ /* if the batch is complete, write it out into the in.coming * directory with an unique timestamp, and start rnews on it. */ @@ -1174,6 +1234,10 @@ cp->Rest = wp->Size; /* now do something with the batch */ +#ifdef LIMIT_WORK_PER_CYCLE + work_this_cycle += 10; +#endif /* LIMIT_WORK_PER_CYCLE */ + cp->State = CSgetcmd; { char buff[SMBUF], buff2[SMBUF]; int fd, oerrno, failed; @@ -1191,7 +1255,7 @@ CHANname(cp), buff); sprintf(buff, "%s cant create file: %s", NNTP_RESENDIT_XBATCHERR, strerror(oerrno)); - NCwritetext(cp, buff); + NCwritereply(cp, buff); } else { if (write(fd, cp->In.Data, wp->Size) != wp->Size) { oerrno = errno; @@ -1199,7 +1263,7 @@ CHANname(cp), buff); sprintf(buff, "%s cant write batch to file: %s", NNTP_RESENDIT_XBATCHERR, strerror(oerrno)); - NCwritetext(cp, buff); + NCwritereply(cp, buff); failed = 1; } } @@ -1209,7 +1273,7 @@ CHANname(cp), failed ? "" : buff); sprintf(buff, "%s error closing batch file: %s", NNTP_RESENDIT_XBATCHERR, strerror(oerrno)); - NCwritetext(cp, buff); + NCwritereply(cp, buff); failed = 1; } sprintf(buff2, "%s/%ld%d.x", _PATH_XBATCHES, now, cp->fd); @@ -1219,18 +1283,17 @@ CHANname(cp), failed ? 
"" : buff, buff2); sprintf(buff, "%s cant rename batch to %s: %s", NNTP_RESENDIT_XBATCHERR, buff2, strerror(oerrno)); - NCwritetext(cp, buff); + NCwritereply(cp, buff); failed = 1; } cp->Reported++; if (!failed) { - NCwritetext(cp, NNTP_OK_XBATCHED); + NCwritereply(cp, NNTP_OK_XBATCHED); cp->Received++; } else cp->Rejected++; } syslog(L_NOTICE, "%s accepted batch size %ld", CHANname(cp), wp->Size); - cp->State = CSgetcmd; /* Clear the work-in-progress entry. */ NCclearwip(wp); @@ -1251,7 +1314,7 @@ break; } if (Tracing || cp->Tracing) - syslog(L_TRACE, "%s NCproc Rest=%d Used=%d SaveUsed=%d", + syslog(L_TRACE, "%s NCproc after-switch Rest=%d Used=%d SaveUsed=%d", CHANname(cp), cp->Rest, bp->Used, cp->SaveUsed); if (cp->Rest > 0) { @@ -1287,6 +1350,18 @@ syslog(L_TRACE, "%s NCreader Used=%d", CHANname(cp), cp->In.Used); + /* If the channel is in a state where it has work to do + * without needing any input, and if the input buffer + * already contains a lot of data, then we refrain from + * reading more now. Otherwise, there would be a risk + * that the input buffer could grow without bound while the + * channel performs work that doesn't use up any input. + * + * XXX should this test be in the select loop in chan.c rather than here? + */ + if (CHANready(cp) && cp->In.Used > 4096) + return; + /* Read any data that's there; ignore errors (retry next time it's our * turn) and if we got nothing, then it's EOF so mark it closed. */ if ((i = CHANreadtext(cp)) <= 0) { @@ -1440,7 +1515,7 @@ } cp->BadReads = 0; cp->BadCommands = 0; - NCwritetext(cp, NCgreeting); + NCwritereply(cp, NCgreeting); return cp; } @@ -1463,7 +1538,7 @@ WIP *who; if (AmSlave) { - NCwritetext(cp, NCbadcommand); + NCwritereply(cp, NCbadcommand); return; } Index: innd/rc.c --- 151/inn-1.5.1/innd/rc.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/rc.c Tue Jan 28 13:29:11 1997 @@ -469,7 +469,9 @@ /* Was host specified as as dotted quad? 
*/ if ((rp->Address.s_addr = inet_addr(buff)) != (unsigned int) -1) { - syslog(LOG_NOTICE, "think it's a dotquad: %s",buff); +#if 0 /* Why would we want our logs to fill up with these messages? */ + syslog(LOG_NOTICE, "think it's a dotquad: %s",buff); +#endif rp->Name = COPY(buff); rp->Password = COPY(pass); rp->Patterns = (pats && *pats) ? CommaSplit(COPY(pats)) : NULL; Index: innd/site.c --- 151/inn-1.5.1/innd/site.c Tue Dec 17 16:40:40 1996 +++ 151+apb/inn-1.5.1/innd/site.c Tue Jan 28 17:39:09 1997 @@ -68,9 +68,10 @@ if (AmRoot) xchown(name); if (cp) { - if (cp->fd >= 0) - syslog(L_ERROR, "DEBUG ERROR SITEspool trashed:%d %s:%d", cp->fd, sp->Name, i); + if (cp->fd >= 0) + syslog(L_ERROR, "DEBUG ERROR SITEspool trashed:%d %s:%d", cp->fd, sp->Name, i); /* CPU-eating bug, killed by kre. */ + RRCHANremove(cp); WCHANremove(cp); RCHANremove(cp); SCHANremove(cp); @@ -524,10 +525,10 @@ SITE *sp; int *ip; - ip = CAST(int*, cp->Argument); + ip = CAST(int*, cp->WakerArgument); sp = &Sites[*ip]; - DISPOSE(cp->Argument); - cp->Argument = NULL; + DISPOSE(cp->WakerArgument); + cp->WakerArgument = NULL; if (sp->Channel != cp) { syslog(L_ERROR, "%s internal SITEspoolwake %s got %d, not %d", LogName, sp->Name, cp->fd, sp->Channel->fd);
(end)

Date: Wed, 29 Jan 1997 10:51:52 +0200 (GMT+0200)
From: Alan Barrett
To: inn-workers@vix.com
Subject: Re: inn patch: reworked scheduling
In-Reply-To:
Message-ID:

Don't use the patch from my previous message without also applying the following patch to innd/cc.c. Otherwise, every time you use {ctlinnd renumber just.one.group} it will renumber everything.

i should not make last-minute "trivial" changes without more testing
i should not make last-minute "trivial" changes without more testing
i should not make last-minute "trivial" changes without more testing
i should not make last-minute "trivial" changes without more testing

--apb (Alan Barrett)

Index: innd/cc.c
--- cc.c.broken Wed Jan 29 10:46:56 1997
+++ cc.c Wed Jan 29 10:41:07 1997
@@ -1353,11 +1353,13 @@
             return CCnogroup;
         if (!NGrenumber(ngp))
             return CANTRENUMBER;
+        return NULL;
+    } else {
+        /* renumber all groups in foreground */
+        if (!ICDrenumberactive())
+            return CANTRENUMBER;
+        return NULL;
     }
-    /* renumber all groups in foreground */
-    if (!ICDrenumberactive())
-        return CANTRENUMBER;
-    return NULL;
 }
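
For anyone wondering why the missing "return NULL" mattered, here is a minimal sketch of the control flow that the cc.c patch above corrects. It is not the real innd code. renumber_one() and renumber_all() are hypothetical stand-ins for NGrenumber() and ICDrenumberactive() that only count calls, the "cant renumber" string is a placeholder for whatever CANTRENUMBER expands to, and the test on the first character of the group name is an assumed way of telling "a group was named" from "renumber everything".

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for NGrenumber() and ICDrenumberactive():
 * they only count calls so that the control flow can be observed. */
static int one_group_calls;
static int all_groups_calls;

static int renumber_one(const char *group)
{
    (void)group;
    one_group_calls++;
    return 1;                   /* pretend it succeeded */
}

static int renumber_all(void)
{
    all_groups_calls++;
    return 1;                   /* pretend it succeeded */
}

/* Shape of cc.c.broken: nothing returns after the single-group
 * branch, so control falls through and renumbers everything too. */
static const char *renumber_broken(const char *group)
{
    if (group[0] != '\0') {     /* assumed test for "a group was named" */
        if (!renumber_one(group))
            return "cant renumber";     /* placeholder error text */
    }
    if (!renumber_all())
        return "cant renumber";
    return NULL;
}

/* Shape after the fix: each branch returns on its own. */
static const char *renumber_fixed(const char *group)
{
    if (group[0] != '\0') {
        if (!renumber_one(group))
            return "cant renumber";
        return NULL;
    } else {
        if (!renumber_all())
            return "cant renumber";
        return NULL;
    }
}

/* Runs both shapes on one named group and checks which helpers ran. */
void demo_renumber(void)
{
    one_group_calls = all_groups_calls = 0;
    assert(renumber_broken("just.one.group") == NULL);
    assert(one_group_calls == 1 && all_groups_calls == 1);  /* the bug */

    one_group_calls = all_groups_calls = 0;
    assert(renumber_fixed("just.one.group") == NULL);
    assert(one_group_calls == 1 && all_groups_calls == 0);  /* fixed */
}
```

Calling demo_renumber() from a small main() exercises both shapes. In the broken shape both helpers run for a single named group, which matches the symptom described in the message. In the fixed shape only the single-group helper runs.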