1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | import sys import mmap import time from ctypes import * from ctypes.wintypes import BOOL, DWORD, HANDLE, LPCWSTR, LPCVOID, LPVOID class Mmf: def __init__(self, nameOfMmf, data): if(type(nameOfMmf) != type("")): print("nameOfMmf 인자가 문자열이 아닙니다.") return 1 if(type(data) != type(b"")): print("data의 타입이 바이트 타입이 아닙니다.") return 1 self.FILE_MAP_ALL_ACCESS = 0x000F001F # 플래그의 상수값 self.INVALID_HANDLE_VALUE = -1 # 시스템 페이지를 사용하기위한 파일디스크립터(INVALID_FILE_HANDLER) self.SHMEMSIZE = 0x100 # 공유메모리 크기 self.PAGE_READWRITE = 0x04 # 플래그의 상수값 self.kernel32_dll = windll.kernel32 self.msvcrt_dll = cdll.msvcrt self.create_file_mapping_func = self.kernel32_dll.CreateFileMappingW self.create_file_mapping_func.argtypes = (HANDLE, LPVOID, DWORD, DWORD, DWORD, LPCWSTR) self.create_file_mapping_func.restype = HANDLE self.map_view_of_file_func = self.kernel32_dll.MapViewOfFile self.map_view_of_file_func.argtypes = (HANDLE, DWORD, DWORD, DWORD, c_ulonglong) self.map_view_of_file_func.restype = LPVOID self.memcpy_func = self.msvcrt_dll.memcpy self.memcpy_func.argtypes = (c_void_p, c_void_p, c_size_t) self.memcpy_func.restype = LPVOID self.rtl_copy_memory_func = self.kernel32_dll.RtlCopyMemory self.rtl_copy_memory_func.argtypes = (LPVOID, LPCVOID, c_ulonglong) self.unmap_view_of_file_func = self.kernel32_dll.UnmapViewOfFile self.unmap_view_of_file_func.argtypes = (LPCVOID, ) self.unmap_view_of_file_func.restype = BOOL self.memcpy_func = self.msvcrt_dll.memcpy self.memcpy_func.argtypes = (c_void_p, c_void_p, c_size_t) self.memcpy_func.restype = LPVOID self.rtl_copy_memory_func = self.kernel32_dll.RtlCopyMemory self.rtl_copy_memory_func.argtypes = (LPVOID, LPCVOID, c_ulonglong) self.unmap_view_of_file_func = self.kernel32_dll.UnmapViewOfFile self.unmap_view_of_file_func.argtypes = (LPCVOID, ) self.unmap_view_of_file_func.restype = BOOL self.close_handle_func = self.kernel32_dll.CloseHandle self.close_handle_func.argtypes = (HANDLE, ) self.close_handle_func.restype = BOOL self.get_last_error_func = self.kernel32_dll.GetLastError self.getch_func = self.msvcrt_dll._getch ''' C타입 함수 포인터들의 선언 ''' self.file_mapping_name_ptr = c_wchar_p(nameOfMmf) self.data = bytes(data) self.msg_ptr = c_char_p(self.data) self.mapping_handle = self.create_file_mapping_func(self.INVALID_HANDLE_VALUE, 0, self.PAGE_READWRITE, 0, self.SHMEMSIZE, self.file_mapping_name_ptr) print("Mapping object handle: 0x{:016X}".format(self.mapping_handle)) if not self.mapping_handle: print("Could not open file mapping object: {:d}".format(self.get_last_error_func())) return 1 raise WinError() def update(self) : self.mapped_view_ptr = self.map_view_of_file_func(self.mapping_handle, self.FILE_MAP_ALL_ACCESS, 0, 0, self.SHMEMSIZE) print("Mapped view addr: 0x{:016X}".format(self.mapped_view_ptr)) if not self.mapped_view_ptr: print("Could not map view of file: {:d}".format(self.get_last_error_func())) self.close_handle_func(self.mapping_handle) return 1 raise WinError() print("Message length: {:d} chars ({:d} bytes)".format(len(self.data), len(self.data))) self.memcpy_func(self.mapped_view_ptr, self.msg_ptr, len(self.data)) def close() : self.unmap_view_of_file_func(self.mapped_view_ptr) self.close_handle_func(self.mapping_handle) if __name__ == "__main__": print("Python {:s} on {:s}".format(sys.version, sys.platform)) #rtl_copy_memory_func(mapped_view_ptr, msg_ptr, byte_len) mmf = Mmf("123123", b"hello c++") mmf.update() print("Hit a key to exit...") input() mmf.close() | cs |
'IPC(Pipe, Shared Mem)' 카테고리의 다른 글
ctypes mmf 널문자 무시 바이트데이터 수신 예제 (0) | 2019.02.07 |
---|---|
파이썬 MMF read 예제 (0) | 2019.02.01 |
ctypes mmf 예제 (0) | 2019.02.01 |
c/python ipc 구현 블로그 (0) | 2019.01.31 |
python 헤더 정의 (0) | 2019.01.31 |