Mal wieder ein Perl-Problem... ^^
Der Server-Code oben ist mehr oder weniger sinnfreies Rumgebastel, da ich mich intensiver mit Sockets, Tk (Client) und Threads auseinander setzen moechte. Die Performance von dem Teil ist sicher net die beste und das Design wahrscheinlich auch net, aber das soll uns hier 'mal nicht stoeren.
Der Gedanke dahinter:
Master
- Oeffnen des Sockets,
- Starten des "Dispatchers"
- Warten auf Clienten und Starten der Reader & Writer
Reader
- Client-Socket auf Input ueberwachen, Input von Socket lesen und in Warteschlage packen
Writer
- Client-Output-Warteschlange ueberwachen und output in Socket schreiben
Dispatcher
- "Tote"/inaktive/ungenutzte Threads und deren Warteschlangen loeschen
- Eingaben in Warteschlange verarbeiten
- Ausgaben in Warteschlange packen
Soweit so gut... Eigentlich funktioniert das Teil auch, bis auf 2 Dinge:
[1] - wird der Befehl "list" ausgefuehrt bekomme ich folgende Ausgaben:
Server:
Client:
8127480 und 8127504 sind die internen Adressen der "shared" variablen. Innerhalb des Threads hat erstere aber einen falschen Wert !?
Im "Master" hat sie direkt nach dem Start des Threads den Wert "127.0.0.1". Im "Dispatcher" spaeter aber "1234469938.798932". Wie kann das sein?
Hat Time::HiRes da seine Finger im Spiel? Immerhin ist das ja ein Timestamp der da drin steht...
[2] - schalte ich den Cleanup wieder ein (NO_CLEANUP => 0) erhalte ich:
Der Thread schmiert also wegen
ab. Aber warum? Hab ich einen Denkfehler im Cleanup?
Und warum loescht er 2 mal die Output-Warteschlange von Client 0? Die Duerfte nach dem 1. Loeschen agrnichtmehr existieren, also duerfte er sie auch netmehr Loeschen wollen...
Zum Testen einfach den Server starten (ggf. mit -v) und via telnet verbinden.
MfG - Keks
Code:
Linux keksinat0r.is-a-geek.org 2.6.26-1-amd64 #1 SMP Sat Jan 10 17:57:00 UTC 2009 x86_64 GNU/Linux
Code:
Summary of my perl5 (revision 5 version 10 subversion 0) configuration:
Platform:
osname=linux, osvers=2.6.26-1-vserver-amd64, archname=x86_64-linux-gnu-thread-multi
uname='linux excelsior 2.6.26-1-vserver-amd64 #1 smp sat nov 8 20:24:14 utc 2008 x86_64 gnulinux '
config_args='-Dusethreads -Duselargefiles -Dccflags=-DDEBIAN -Dcccdlflags=-fPIC -Darchname=x86_64-linux-gnu -Dprefix=/usr -Dprivlib=/usr/share/perl/5.10 -Darchlib=/usr/lib/perl/5.10 -Dvendorprefix=/usr -Dvendorlib=/usr/share/perl5 -Dvendorarch=/usr/lib/perl5 -Dsiteprefix=/usr/local -Dsitelib=/usr/local/share/perl/5.10.0 -Dsitearch=/usr/local/lib/perl/5.10.0 -Dman1dir=/usr/share/man/man1 -Dman3dir=/usr/share/man/man3 -Dsiteman1dir=/usr/local/man/man1 -Dsiteman3dir=/usr/local/man/man3 -Dman1ext=1 -Dman3ext=3perl -Dpager=/usr/bin/sensible-pager -Uafs -Ud_csh -Ud_ualarm -Uusesfio -Uusenm -DDEBUGGING=-g -Doptimize=-O2 -Duseshrplib -Dlibperl=libperl.so.5.10.0 -Dd_dosuid -des'
hint=recommended, useposix=true, d_sigaction=define
useithreads=define, usemultiplicity=define
useperlio=define, d_sfio=undef, uselargefiles=define, usesocks=undef
use64bitint=define, use64bitall=define, uselongdouble=undef
usemymalloc=n, bincompat5005=undef
Compiler:
cc='cc', ccflags ='-D_REENTRANT -D_GNU_SOURCE -DDEBIAN -fno-strict-aliasing -pipe -I/usr/local/include -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64',
optimize='-O2 -g',
cppflags='-D_REENTRANT -D_GNU_SOURCE -DDEBIAN -fno-strict-aliasing -pipe -I/usr/local/include'
ccversion='', gccversion='4.3.2', gccosandvers=''
intsize=4, longsize=8, ptrsize=8, doublesize=8, byteorder=12345678
d_longlong=define, longlongsize=8, d_longdbl=define, longdblsize=16
ivtype='long', ivsize=8, nvtype='double', nvsize=8, Off_t='off_t', lseeksize=8
alignbytes=8, prototype=define
Linker and Libraries:
ld='cc', ldflags =' -L/usr/local/lib'
libpth=/usr/local/lib /lib /usr/lib /lib64 /usr/lib64
libs=-lgdbm -lgdbm_compat -ldb -ldl -lm -lpthread -lc -lcrypt
perllibs=-ldl -lm -lpthread -lc -lcrypt
libc=/lib/libc-2.7.so, so=so, useshrplib=true, libperl=libperl.so.5.10.0
gnulibc_version='2.7'
Dynamic Linking:
dlsrc=dl_dlopen.xs, dlext=so, d_dlsymun=undef, ccdlflags='-Wl,-E'
cccdlflags='-fPIC', lddlflags='-shared -O2 -g -L/usr/local/lib'
Characteristics of this binary (from libperl):
Compile-time options: MULTIPLICITY PERL_DONT_CREATE_GVSV
PERL_IMPLICIT_CONTEXT PERL_MALLOC_WRAP USE_64_BIT_ALL
USE_64_BIT_INT USE_ITHREADS USE_LARGE_FILES
USE_PERLIO USE_REENTRANT_API
Built under linux
Compiled at Jan 1 2009 15:42:01
@INC:
/etc/perl
/usr/local/lib/perl/5.10.0
/usr/local/share/perl/5.10.0
/usr/lib/perl5
/usr/share/perl5
/usr/lib/perl/5.10
/usr/share/perl/5.10
/usr/local/lib/site_perl
.
Code:
#!/usr/bin/env perl
package main;
use threads;
use threads::shared;
use constant NO_CLEANUP => 1;
$| = 1;
our @thr :shared = ();
our @thr_in :shared = ();
our @thr_out :shared = ();
$SIG{CHLD} = sub{ };
$SIG{TERM} = sub{ exit 1 };
$SIG{KILL} = sub{ exit 2 };
my $v = $ARGV[0] eq '-v' ? 1 : 0;
sub v (;@) {
if( $v ) {
local $_ = join '', @_, "\n";
$_ =~ s/\n{2,}/\n/g;
print STDERR $_;
}
};
#############################################################################
### Master ####################################################################
package Master;
use strict;
use warnings FATAL => 'all';
use threads;
use threads::shared;
use IO::Socket::INET;
use Time::HiRes qw/ usleep gettimeofday /;
my $cid = 0;
my $msock = new IO::Socket::INET(
Listen => SOMAXCONN,
Type => SOCK_STREAM,
Proto => 'tcp',
LocalPort => 3333,
LocalAddr => 'localhost',
ReuseAddr => 1,
) or die "Master socket error: $!\n";
::v ' * server listening on ', $msock->sockhost, ':', $msock->sockport;
threads->new(\&Dispatcher::run);
::v ' * dispatcher started';
::v ' * waiting for incomming connections...';
while( my $csock = $msock -> accept ) {
::v ' * client connected: ', $cid, ' [', $csock->peerhost, ':', $csock->sockport, ']';
$csock->autoflush(1);
{
lock(@::thr);
lock(@::thr_in);
lock(@::thr_out);
my @thr; share(@thr);
my @thr_in; share(@thr_in);
my @thr_out; share(@thr_out);
$::thr[$cid] = \@thr;
$::thr_in[$cid] = \@thr_in;
$::thr_out[$cid] = \@thr_out;
$::thr[$cid][0] = threads->new(\&Reader::run, $cid, $csock)->tid;
$::thr[$cid][1] = threads->new(\&Writer::run, $cid, $csock)->tid;
$::thr[$cid][2] = join '.', gettimeofday;
$::thr[$cid][3] = $csock->peerhost;
$::thr[$cid][4] = $csock->peerport;
### XXX DEBUG:
::v " <<< ", is_shared($::thr[$cid][3]), " = ", $::thr[$cid][3], "\n";
::v " <<< ", is_shared($::thr[$cid][4]), " = ", $::thr[$cid][4], "\n";
::v ' * thread ', $::thr[$cid][0], ' and ', $::thr[$cid][1], ' started at ', $::thr[$cid][2];
};
$cid++;
usleep(250);
}
#############################################################################
### Reader ####################################################################
package Reader;
use strict;
use warnings FATAL => 'all';
use threads;
use threads::shared;
use IO::Socket::INET;
use Time::HiRes qw/ usleep gettimeofday /;
sub run {
my($cid, $csock) = (shift, shift);
local $SIG{TERM} = sub{ eval{close $csock}; threads -> exit(0) };
local $SIG{KILL} = sub{ eval{close $csock}; threads -> exit(2) };
local $_;
while( defined($_ = <$csock>) ) {
{
lock(@{$::thr[$cid]});
lock(@{$::thr_in[$cid]});
$::thr[$cid][3] = join '.', gettimeofday;
push @{$::thr_in[$cid]}, $_;
}
usleep(250);
}
}
#############################################################################
### Writer ####################################################################
package Writer;
use strict;
use warnings FATAL => 'all';
use threads;
use threads::shared;
use IO::Socket::INET;
use Time::HiRes qw/ usleep gettimeofday /;
sub run {
my($cid, $csock) = (shift, shift);
local $SIG{TERM} = sub{ eval{close $csock}; threads -> exit(0) };
local $SIG{KILL} = sub{ eval{close $csock}; threads -> exit(2) };
local $_;
while( 1 ) {
if( $#{$::thr_out[$cid]} != -1 ) {
{
lock(@{$::thr[$cid]});
lock(@{$::thr_out[$cid]});
$::thr[$cid][3] = join '.', gettimeofday;
$_ = shift @{$::thr_out[$cid]};
}
if( defined $_ ) {
$_ =~ s/\r\n/\n/;
$_ .= "\n" unless $_ =~ /\n$/;
print $csock $_
}
}
usleep(250);
}
}
#############################################################################
### Dispatcher ################################################################
package Dispatcher;
use strict;
use warnings FATAL => 'all';
use threads;
use threads::shared;
use IO::Socket::INET;
use Time::HiRes qw/ usleep gettimeofday /;
sub run {
local $SIG{TERM} = sub{ threads -> exit(0) };
local $SIG{KILL} = sub{ threads -> exit(2) };
local $_;
while( 1 ) {
lock(@::thr);
lock(@::thr_in);
lock(@::thr_out);
unless( ::NO_CLEANUP ) {
for my $cid ( 0 .. $#::thr ) {
if( exists $::thr[$cid] ) {
### remove unused
unless( defined $::thr[$cid] && ref $::thr[$cid] eq 'ARRAY' ) {
delete $::thr[$cid];
::v " * removed unused: $cid\n";
}
else {
### remove crippled
unless( exists $::thr[$cid][0] && defined $::thr[$cid][0] && $::thr[$cid][0] =~ /^\d+$/
&& exists $::thr[$cid][1] && defined $::thr[$cid][1] && $::thr[$cid][1] =~ /^\d+$/
&& exists $::thr[$cid][2] && defined $::thr[$cid][2] && $::thr[$cid][2] =~ /^\d+\.\d+$/ )
{
if( exists $::thr[$cid][0] && defined $::thr[$cid][0] && $::thr[$cid][0] =~ /^\d+$/) {
$_ = threads->object($::thr[$cid][0]);
if( defined $_ ) {
$_ -> kill('KILL');
$_ -> detach;
}
}
if( exists $::thr[$cid][1] && defined $::thr[$cid][1] && $::thr[$cid][1] =~ /^\d+$/) {
$_ = threads->object($::thr[$cid][1]);
if( defined $_ ) {
$_ -> kill('KILL');
$_ -> detach;
}
}
delete $::thr[$cid];
::v " * removed crippled: $cid\n";
}
else {
my $r = threads->object($::thr[$cid][0]);
my $w = threads->object($::thr[$cid][1]);
### remove crashed
unless( defined $r && $r -> is_running && defined $w && $w -> is_running ) {
if( defined $r && $r -> is_running ) {
$r -> kill('KILL');
$r -> detach;
}
if( defined $w && $w -> is_running ) {
$w -> kill('KILL');
$w -> detach;
}
delete $::thr[$cid];
::v " * removed crashed: $cid\n";
}
else {
### remove timed out
unless( tv_interval($::thr[$cid][2]) < 500 ) {
$r -> kill('KILL');
$r -> detach;
$w -> kill('KILL');
$w -> detach;
delete $::thr[$cid];
::v " * removed timed out: $cid\n";
}
}
}
}
}
}
for( 0 .. $#::thr_in ) {
if( exists $::thr_in[$_] && !exists $::thr[$_]) {
delete $::thr_in[$_];
::v " * removed in-queue: $_\n";
}
}
for( 0 .. $#::thr_out ) {
if( exists $::thr_out[$_] && !exists $::thr[$_]) {
delete $::thr_out[$_];
::v " * removed out-queue: $_\n";
}
}
}
usleep(250);
### dispatch commands
for my $cid ( 0 .. $#::thr_in ) {
if( exists $::thr_in[$cid] ) {
if( defined($_ = shift @{$::thr_in[$cid]}) ) {
$_ =~ s/[\r\n]$//g;
if( defined $_ && $_ ne '' ) {
if( /^echo (.+)$/ ) {
push @{$::thr_out[$cid]}, $1
} elsif( /^time( hires)?$/ ) {
push @{$::thr_out[$cid]}, $1 ? join('.', gettimeofday) : time
} elsif( /^date( hires)?$/ ) {
push @{$::thr_out[$cid]}, scalar localtime( $1 ? scalar(gettimeofday) : time );
} elsif( /^broadcast(?: (.+))?$/ ) {
for( 0 .. $#::thr_out ) {
if( exists $::thr_out[$_] ) {
push @{$::thr_out[$_]}, ("($cid) : " . (defined $1 ? $1 : '/hello there!/'));
}
}
} elsif( /^whoami$/ ) {
push @{$::thr_out[$cid]}, "ID: $cid";
} elsif( /^list$/ ) {
lock(@::thr);
for my $id ( 0 .. $#::thr ) {
lock(@{$::thr[$id]});
if( exists $::thr[$id] ) {
push @{$::thr_out[$cid]}, (" " . $id . " - " . $::thr[$id][3] . ":" . $::thr[$id][4] . " (" . $::thr[$id][2] . ")");
### XXX DEBUG:
push @{$::thr_out[$cid]}, (" -> " . is_shared($::thr[$id][3]) . ", " . is_shared($::thr[$id][4]));
::v " " . $id . " - " . $::thr[$id][3] . ":" . $::thr[$id][4] . " (" . $::thr[$id][2] . ")\n";
::v " -> " . is_shared($::thr[$id][3]) . ", " . is_shared($::thr[$id][4]) . "\n";
}
}
} elsif( /^whisper (\d+) (.+)$/ ) {
if( exists $::thr_out[$1] ) {
push @{$::thr_out[$1]}, ("[$cid] : " . $2);
} else {
push @{$::thr_out[$cid]}, 'unknown CID'
}
} else {
push @{$::thr_out[$cid]}, 'unknown command';
}
}
}
}
}
usleep(250);
}
}
__END__
Der Gedanke dahinter:
Master
- Oeffnen des Sockets,
- Starten des "Dispatchers"
- Warten auf Clienten und Starten der Reader & Writer
Reader
- Client-Socket auf Input ueberwachen, Input von Socket lesen und in Warteschlage packen
Writer
- Client-Output-Warteschlange ueberwachen und output in Socket schreiben
Dispatcher
- "Tote"/inaktive/ungenutzte Threads und deren Warteschlangen loeschen
- Eingaben in Warteschlange verarbeiten
- Ausgaben in Warteschlange packen
Soweit so gut... Eigentlich funktioniert das Teil auch, bis auf 2 Dinge:
[1] - wird der Befehl "list" ausgefuehrt bekomme ich folgende Ausgaben:
Server:
Code:
* server listening on 127.0.0.1:3333
* dispatcher started
* waiting for incomming connections...
* client connected: 0 [127.0.0.1:3333]
<<< 8127480 = 127.0.0.1
<<< 8127504 = 53515
* thread 2 and 3 started at 1234469936.33166
Client:
Code:
0 - 1234469938.798932:53515 (1234469936.33166)
-> 8127480, 8127504
Im "Master" hat sie direkt nach dem Start des Threads den Wert "127.0.0.1". Im "Dispatcher" spaeter aber "1234469938.798932". Wie kann das sein?
Hat Time::HiRes da seine Finger im Spiel? Immerhin ist das ja ein Timestamp der da drin steht...
[2] - schalte ich den Cleanup wieder ein (NO_CLEANUP => 0) erhalte ich:
Code:
* server listening on 127.0.0.1:3333
* dispatcher started
* waiting for incomming connections...
* client connected: 0 [127.0.0.1:3333]
<<< 9704440 = 127.0.0.1
<<< 9704464 = 60887
* thread 2 and 3 started at 1234470268.20149
* removed crashed: 0
* removed in-queue: 0
* removed out-queue: 0
Thread 3 terminated abnormally: Invalid value for shared scalar at ./chat.pl line 158.
* removed out-queue: 0
Code:
if( $#{$::thr_out[$cid]} != -1 )
Und warum loescht er 2 mal die Output-Warteschlange von Client 0? Die Duerfte nach dem 1. Loeschen agrnichtmehr existieren, also duerfte er sie auch netmehr Loeschen wollen...
Zum Testen einfach den Server starten (ggf. mit -v) und via telnet verbinden.
MfG - Keks
