[ros-diffs] [mjmartin] 38573: - Re-implemented Message Type read/write mode, as the previous implementation was incorrect. - Fixed a bug in Byte Stream mode that prevented threads from waking from a wait. - Implemented NpfsPeekPipe.

mjmartin at svn.reactos.org mjmartin at svn.reactos.org
Mon Jan 5 01:08:43 CET 2009


Author: mjmartin
Date: Sun Jan  4 18:08:43 2009
New Revision: 38573

URL: http://svn.reactos.org/svn/reactos?rev=38573&view=rev
Log:
- Re-implemented Message Type read/write mode, as the previous implementation was incorrect.
- Fixed a bug in Byte Stream mode that prevented threads from waking from a wait.
- Implemented NpfsPeekPipe.


Modified:
    trunk/reactos/drivers/filesystems/npfs/fsctrl.c
    trunk/reactos/drivers/filesystems/npfs/rw.c

Modified: trunk/reactos/drivers/filesystems/npfs/fsctrl.c
URL: http://svn.reactos.org/svn/reactos/trunk/reactos/drivers/filesystems/npfs/fsctrl.c?rev=38573&r1=38572&r2=38573&view=diff
==============================================================================
--- trunk/reactos/drivers/filesystems/npfs/fsctrl.c [iso-8859-1] (original)
+++ trunk/reactos/drivers/filesystems/npfs/fsctrl.c [iso-8859-1] Sun Jan  4 18:08:43 2009
@@ -5,6 +5,7 @@
 * PURPOSE:    Named pipe filesystem
 * PROGRAMMER: David Welch <welch at cwcom.net>
 *             Eric Kohl
+*             Michael Martin
 */
 
 /* INCLUDES ******************************************************************/
@@ -368,11 +369,15 @@
 	PNPFS_FCB Fcb;
 	PNPFS_CCB Ccb;
 	NTSTATUS Status;
-
-	DPRINT1("NpfsPeekPipe\n");
+	ULONG MessageCount = 0;
+	ULONG MessageLength;
+	ULONG ReadDataAvailable;
+	PVOID BufferPtr;
+
+	DPRINT("NpfsPeekPipe\n");
 
 	OutputBufferLength = IoStack->Parameters.DeviceIoControl.OutputBufferLength;
-	DPRINT1("OutputBufferLength: %lu\n", OutputBufferLength);
+	DPRINT("OutputBufferLength: %lu\n", OutputBufferLength);
 
 	/* Validate parameters */
 	if (OutputBufferLength < sizeof(FILE_PIPE_PEEK_BUFFER))
@@ -391,16 +396,61 @@
 	Reply->ReadDataAvailable = Ccb->ReadDataAvailable;
 	DPRINT("ReadDataAvailable: %lu\n", Ccb->ReadDataAvailable);
 
-	Reply->NumberOfMessages = 0; /* FIXME */
-	Reply->MessageLength = 0; /* FIXME */
-	Reply->Data[0] = 0; /* FIXME */
-
-	//  Irp->IoStatus.Information = sizeof(FILE_PIPE_PEEK_BUFFER);
-
-	//  Status = STATUS_SUCCESS;
-	Status = STATUS_NOT_IMPLEMENTED;
-
-	DPRINT1("NpfsPeekPipe done\n");
+	if (Ccb->Fcb->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
+	{
+		DPRINT("Byte Stream Mode\n");
+		Reply->MessageLength = Ccb->ReadDataAvailable;
+		DPRINT("Reply->MessageLength  %d\n",Reply->MessageLength );
+	}
+	else
+	{
+		DPRINT("Message Mode\n");
+		ReadDataAvailable=Ccb->ReadDataAvailable;
+
+		if (ReadDataAvailable > 0)
+		{
+			memcpy(&Reply->MessageLength,Ccb->Data,sizeof(ULONG));
+			BufferPtr = Ccb->Data;
+			/* NOTE: Modifying the structure in header file to keep track of NumberOfMessage would be better */
+			while ((ReadDataAvailable > 0) && (BufferPtr < Ccb->WritePtr))
+			{
+				memcpy(&MessageLength, BufferPtr, sizeof(MessageLength));
+
+				ASSERT(MessageLength > 0);
+
+				DPRINT("MessageLength = %d\n",MessageLength);
+				MessageCount++;
+				ReadDataAvailable -= MessageLength;
+
+				/* If its the first message, copy the Message if the size of buffer is large enough */
+				if ((MessageCount==1) && (Reply->Data[0]) 
+					&& (OutputBufferLength >= (sizeof(FILE_PIPE_PEEK_BUFFER) + MessageLength)))
+				{
+					memcpy(&Reply->Data[0], (PVOID)((ULONG)BufferPtr + sizeof(MessageLength)), MessageLength);
+				}
+				BufferPtr =(PVOID)((ULONG)BufferPtr + MessageLength + sizeof(MessageLength));
+				DPRINT("Message %d\n",MessageCount);
+				DPRINT("ReadDataAvailable: %lu\n", ReadDataAvailable);
+			}
+
+			if (ReadDataAvailable != 0)
+			{
+				DPRINT1("This should never happen! Possible memory corruption.\n");
+				return STATUS_UNSUCCESSFUL;
+			}
+		}
+	}
+
+	Reply->NumberOfMessages = MessageCount;
+	if (MessageCount > 0)
+		Reply->Data[0] = 0;
+
+	Irp->IoStatus.Information = OutputBufferLength;
+	Irp->IoStatus.Status = STATUS_SUCCESS;
+
+	Status = STATUS_SUCCESS;
+
+	DPRINT("NpfsPeekPipe done\n");
 
 	return Status;
 }

Modified: trunk/reactos/drivers/filesystems/npfs/rw.c
URL: http://svn.reactos.org/svn/reactos/trunk/reactos/drivers/filesystems/npfs/rw.c?rev=38573&r1=38572&r2=38573&view=diff
==============================================================================
--- trunk/reactos/drivers/filesystems/npfs/rw.c [iso-8859-1] (original)
+++ trunk/reactos/drivers/filesystems/npfs/rw.c [iso-8859-1] Sun Jan  4 18:08:43 2009
@@ -4,6 +4,7 @@
 * FILE:       drivers/fs/np/rw.c
 * PURPOSE:    Named pipe filesystem
 * PROGRAMMER: David Welch <welch at cwcom.net>
+*             Michael Martin
 */
 
 /* INCLUDES ******************************************************************/
@@ -300,7 +301,7 @@
 	KEVENT Event;
 	ULONG Length;
 	ULONG Information = 0;
-	ULONG CopyLength;
+	ULONG CopyLength = 0;
 	ULONG TempLength;
 	BOOLEAN IsOriginalRequest = TRUE;
 	PVOID Buffer;
@@ -316,12 +317,16 @@
 	}
 
 	FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
+	DPRINT("FileObject %p\n", FileObject);
+	DPRINT("Pipe name %wZ\n", &FileObject->FileName);
 	Ccb = FileObject->FsContext2;
 	Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
 
-	if (Ccb->OtherSide == NULL)
-	{
-		DPRINT("Pipe is NOT connected!\n");
+	if ((Ccb->OtherSide == NULL) && (Ccb->ReadDataAvailable == 0))
+	{
+		/* Its ok if the other side has been Disconnect, but if we have data still in the buffer
+                   , need to still be able to read it. Currently this is a HAXXXX */
+		DPRINT1("Pipe is NO longer connected and no data exist in buffer!\n");
 		if (Ccb->PipeState == FILE_PIPE_LISTENING_STATE)
 			Status = STATUS_PIPE_LISTENING;
 		else if (Ccb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
@@ -420,7 +425,7 @@
 				{
 					break;
 				}
-				if (Ccb->PipeState != FILE_PIPE_CONNECTED_STATE)
+				if ((Ccb->PipeState != FILE_PIPE_CONNECTED_STATE) && (Ccb->ReadDataAvailable == 0))
 				{
 					DPRINT("PipeState: %x\n", Ccb->PipeState);
 					Status = STATUS_PIPE_BROKEN;
@@ -450,7 +455,7 @@
 					if (NT_SUCCESS(Status))
 					{
 						Status = STATUS_PENDING;
-                        goto done;
+						goto done;
 					}
 					ExAcquireFastMutex(&Ccb->DataListLock);
 					break;
@@ -489,7 +494,7 @@
 					Ccb->WriteQuotaAvailable += CopyLength;
 				}
 
-				if (Length == 0)
+				if ((Length == 0) || (Ccb->ReadDataAvailable == 0))
 				{
 					if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
 					{
@@ -503,37 +508,82 @@
 			{
 				DPRINT("Message mode\n");
 
-				/* Message mode */
+				/* For Message mode, the Message length will be stored in the buffer preceeding the Message. */
+
 				if (Ccb->ReadDataAvailable)
 				{
-					/* Truncate the message if the receive buffer is too small */
-					CopyLength = min(Ccb->ReadDataAvailable, Length);
-					memcpy(Buffer, Ccb->Data, CopyLength);
-
+					ULONG NextMessageLength=0;
+					//HexDump(Ccb->Data, (ULONG)Ccb->WritePtr - (ULONG)Ccb->Data);
+
+					/*First get the size of the message */
+					memcpy(&NextMessageLength, Ccb->Data, sizeof(NextMessageLength));
+
+					if (NextMessageLength == 0) 
+					{
+						DPRINT1("This should never happen! Possible memory corruption.\n");
 #ifndef NDEBUG
-					DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
-					HexDump((PUCHAR)Buffer, CopyLength);
+						HexDump(Ccb->Data, (ULONG)Ccb->WritePtr - (ULONG)Ccb->Data);
 #endif
-
-					Information = CopyLength;
-
-					if (Ccb->ReadDataAvailable > Length)
+						break;
+					}
+
+					/* Use the smaller value */
+					CopyLength = min(NextMessageLength, Length);
+					/* retrieve the message from the buffer */
+					memcpy(Buffer, (PVOID)((ULONG)Ccb->Data + sizeof(NextMessageLength)), CopyLength);
+
+
+					if (Ccb->ReadDataAvailable > CopyLength)
 					{
-						memmove(Ccb->Data, (PVOID)((ULONG_PTR)Ccb->Data + Length),
-							Ccb->ReadDataAvailable - Length);
-						Ccb->ReadDataAvailable -= Length;
-						Status = STATUS_MORE_ENTRIES;
+						if (CopyLength < NextMessageLength)
+						{
+							/* Client only requested part of the message */
+
+							/* Calculate the remaining message new size */
+							ULONG NewMessageSize = NextMessageLength-CopyLength;
+							/* Write a new Message size to buffer for the part of the message still there */
+							memcpy(Ccb->Data, &NewMessageSize, sizeof(NewMessageSize));
+
+							/* Move the memory starting from end of partial Message just retrieved */
+							memmove((PVOID)((ULONG_PTR)Ccb->Data + sizeof(NewMessageSize)), 
+								(PVOID)((ULONG_PTR) Ccb->Data + CopyLength + sizeof(NewMessageSize)), 
+								(ULONG)Ccb->WritePtr - ((ULONG)Ccb->Data + sizeof(NewMessageSize)) - CopyLength);
+
+							/* Update the write pointer */
+							Ccb->WritePtr = (PVOID)((ULONG)Ccb->WritePtr - CopyLength);
+						}
+						else
+						{
+							/* Client wanted the entire message */
+							/* Move the memory starting from the next Message just retrieved */
+							memmove(Ccb->Data, 
+								(PVOID)((ULONG_PTR) Ccb->Data + NextMessageLength + sizeof(NextMessageLength)),
+								 (ULONG)Ccb->WritePtr - (ULONG)Ccb->Data - NextMessageLength - sizeof(NextMessageLength));
+
+							/* Update the write pointer */
+							Ccb->WritePtr = (PVOID)((ULONG)Ccb->WritePtr - NextMessageLength);
+						}
 					}
 					else
 					{
+						/* This was the last Message, so just zero this messages for safety sake */
+						memset(Ccb->Data,0,NextMessageLength + sizeof(NextMessageLength));
+						/* reset the write pointer */
+						Ccb->WritePtr = Ccb->Data;
 						KeResetEvent(&Ccb->ReadEvent);
 						if (Ccb->PipeState == FILE_PIPE_CONNECTED_STATE)
 						{
 							KeSetEvent(&Ccb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
 						}
-						Ccb->ReadDataAvailable = 0;
-						Ccb->WriteQuotaAvailable = Ccb->MaxDataLength;
 					}
+#ifndef NDEBUG
+					DPRINT("Length %d Buffer %x\n",CopyLength,Buffer);
+					HexDump((PUCHAR)Buffer, CopyLength);
+#endif
+
+					Information += CopyLength;
+					Ccb->WriteQuotaAvailable +=CopyLength;
+					Ccb->ReadDataAvailable -= CopyLength;
 				}
 
 				if (Information > 0)
@@ -708,9 +758,11 @@
 		if (Fcb->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
 		{
 			DPRINT("Byte stream mode\n");
+
 			while (Length > 0 && ReaderCcb->WriteQuotaAvailable > 0)
 			{
 				CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
+
 				if ((ULONG_PTR)ReaderCcb->WritePtr + CopyLength <= (ULONG_PTR)ReaderCcb->Data + ReaderCcb->MaxDataLength)
 				{
 					memcpy(ReaderCcb->WritePtr, Buffer, CopyLength);
@@ -745,15 +797,23 @@
 		}
 		else
 		{
+			/* For Message Type Pipe, the Pipes memory will be used to store the size of each message */
+                        /* FIXME: Check and verify ReadMode ByteStream */
 			DPRINT("Message mode\n");
 			if (Length > 0)
 			{
 				CopyLength = min(Length, ReaderCcb->WriteQuotaAvailable);
-				memcpy(ReaderCcb->Data, Buffer, CopyLength);
-
-				Information = CopyLength;
-				ReaderCcb->ReadDataAvailable = CopyLength;
-				ReaderCcb->WriteQuotaAvailable = 0;
+				/* First Copy the Length of the message into the pipes buffer */
+				memcpy(ReaderCcb->WritePtr, &CopyLength, sizeof(CopyLength));
+				/* Now the user buffer itself */
+				memcpy((PVOID)((ULONG)ReaderCcb->WritePtr+ sizeof(CopyLength)), Buffer, CopyLength);
+				/* Update the write pointer */
+				ReaderCcb->WritePtr = (PVOID)((ULONG)ReaderCcb->WritePtr + sizeof(CopyLength) + CopyLength);
+
+				Information += CopyLength;
+
+				ReaderCcb->ReadDataAvailable += CopyLength;
+				ReaderCcb->WriteQuotaAvailable -= CopyLength;
 			}
 
 			if (Information > 0)



More information about the Ros-diffs mailing list